aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--doc/src/manual/gun_up.asciidoc2
-rw-r--r--ebin/gun.app2
-rw-r--r--src/gun.erl7
-rw-r--r--src/gun_app.erl1
-rw-r--r--src/gun_conns_sup.erl36
-rw-r--r--src/gun_event.erl2
-rw-r--r--src/gun_http.erl8
-rw-r--r--src/gun_http2.erl2
-rw-r--r--src/gun_pool.erl704
-rw-r--r--src/gun_pool_events_h.erl157
-rw-r--r--src/gun_pools_sup.erl37
-rw-r--r--src/gun_sup.erl13
-rw-r--r--test/handlers/pool_ws_handler.erl13
-rw-r--r--test/pool_SUITE.erl376
14 files changed, 1343 insertions, 17 deletions
diff --git a/doc/src/manual/gun_up.asciidoc b/doc/src/manual/gun_up.asciidoc
index 3853afd..2c05432 100644
--- a/doc/src/manual/gun_up.asciidoc
+++ b/doc/src/manual/gun_up.asciidoc
@@ -11,7 +11,7 @@ gun_up - The connection is up
{gun_up, ConnPid, Protocol}
ConnPid :: pid()
-Protocol :: http | http2 | socks
+Protocol :: http | http2 | raw | socks
----
The connection is up.
diff --git a/ebin/gun.app b/ebin/gun.app
index 779ccbe..4efac80 100644
--- a/ebin/gun.app
+++ b/ebin/gun.app
@@ -1,7 +1,7 @@
{application, 'gun', [
{description, "HTTP/1.1, HTTP/2 and Websocket client for Erlang/OTP."},
{vsn, "2.0.0-rc.1"},
- {modules, ['gun','gun_app','gun_content_handler','gun_cookies','gun_cookies_list','gun_data_h','gun_default_event_h','gun_event','gun_http','gun_http2','gun_protocols','gun_public_suffix','gun_raw','gun_socks','gun_sse_h','gun_sup','gun_tcp','gun_tcp_proxy','gun_tls','gun_tls_proxy','gun_tls_proxy_cb','gun_tls_proxy_http2_connect','gun_tunnel','gun_ws','gun_ws_h']},
+ {modules, ['gun','gun_app','gun_conns_sup','gun_content_handler','gun_cookies','gun_cookies_list','gun_data_h','gun_default_event_h','gun_event','gun_http','gun_http2','gun_pool','gun_pool_events_h','gun_pools_sup','gun_protocols','gun_public_suffix','gun_raw','gun_socks','gun_sse_h','gun_sup','gun_tcp','gun_tcp_proxy','gun_tls','gun_tls_proxy','gun_tls_proxy_cb','gun_tls_proxy_http2_connect','gun_tunnel','gun_ws','gun_ws_h']},
{registered, [gun_sup]},
{applications, [kernel,stdlib,ssl,cowlib]},
{mod, {gun_app, []}},
diff --git a/src/gun.erl b/src/gun.erl
index 8a101a7..d808750 100644
--- a/src/gun.erl
+++ b/src/gun.erl
@@ -98,6 +98,7 @@
-export([start_link/4]).
-export([callback_mode/0]).
-export([init/1]).
+-export([default_transport/1]).
-export([not_connected/3]).
-export([domain_lookup/3]).
-export([connecting/3]).
@@ -321,7 +322,7 @@ do_open(Host, Port, Opts0) ->
case check_options(maps:to_list(Opts)) of
ok ->
Result = case maps:get(supervise, Opts, true) of
- true -> supervisor:start_child(gun_sup, [self(), Host, Port, Opts]);
+ true -> supervisor:start_child(gun_conns_sup, [self(), Host, Port, Opts]);
false -> start_link(self(), Host, Port, Opts)
end,
case Result of
@@ -508,7 +509,7 @@ intermediaries_info([Intermediary=#{transport := Transport0}|Tail], Acc) ->
-spec close(pid()) -> ok.
close(ServerPid) ->
- supervisor:terminate_child(gun_sup, ServerPid).
+ supervisor:terminate_child(gun_conns_sup, ServerPid).
-spec shutdown(pid()) -> ok.
shutdown(ServerPid) ->
@@ -832,6 +833,8 @@ await_up(ServerPid, Timeout, MRef) ->
{error, timeout}
end.
+%% Flushing gun messages.
+
-spec flush(pid() | stream_ref()) -> ok.
flush(ServerPid) when is_pid(ServerPid) ->
flush_pid(ServerPid);
diff --git a/src/gun_app.erl b/src/gun_app.erl
index 848faf2..624e65f 100644
--- a/src/gun_app.erl
+++ b/src/gun_app.erl
@@ -23,6 +23,7 @@
%% API.
start(_Type, _Args) ->
+ gun_pools = ets:new(gun_pools, [ordered_set, public, named_table]),
gun_sup:start_link().
stop(_State) ->
diff --git a/src/gun_conns_sup.erl b/src/gun_conns_sup.erl
new file mode 100644
index 0000000..bee266b
--- /dev/null
+++ b/src/gun_conns_sup.erl
@@ -0,0 +1,36 @@
+%% Copyright (c) 2013-2021, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+-module(gun_conns_sup).
+-behaviour(supervisor).
+
+%% API.
+-export([start_link/0]).
+
+%% supervisor.
+-export([init/1]).
+
+%% API.
+
+-spec start_link() -> {ok, pid()}.
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+%% supervisor.
+
+init([]) ->
+ Procs = [
+ #{id => gun, start => {gun, start_link, []}, restart => temporary}
+ ],
+ {ok, {#{strategy => simple_one_for_one}, Procs}}.
diff --git a/src/gun_event.erl b/src/gun_event.erl
index 71da77b..ab6020b 100644
--- a/src/gun_event.erl
+++ b/src/gun_event.erl
@@ -83,7 +83,7 @@
-type request_start_event() :: #{
stream_ref := gun:stream_ref(),
reply_to := pid(),
- function := headers | request | ws_upgrade,
+ function := headers | request | ws_upgrade, %% @todo connect?
method := iodata(),
scheme => binary(),
authority := iodata(),
diff --git a/src/gun_http.erl b/src/gun_http.erl
index cc541e2..aaa6d15 100644
--- a/src/gun_http.erl
+++ b/src/gun_http.erl
@@ -35,7 +35,7 @@
-export([down/1]).
-export([ws_upgrade/11]).
-%% Functions shared with gun_http2.
+%% Functions shared with gun_http2 and gun_pool.
-export([host_header/3]).
-type io() :: head | {body, non_neg_integer()} | body_close | body_chunked | body_trailer.
@@ -607,7 +607,7 @@ send_request(State=#http_state{socket=Socket, transport=Transport, version=Versi
end,
{Authority, Headers3} = case lists:keyfind(<<"host">>, 1, Headers2) of
false ->
- Authority0 = host_header(Transport, Host, Port),
+ Authority0 = host_header(Transport:name(), Host, Port),
{Authority0, [{<<"host">>, Authority0}|Headers2]};
{_, Authority1} ->
{Authority1, Headers2}
@@ -648,7 +648,7 @@ send_request(State=#http_state{socket=Socket, transport=Transport, version=Versi
end,
{Authority, Conn, Out, CookieStore, EvHandlerState}.
-host_header(Transport, Host0, Port) ->
+host_header(TransportName, Host0, Port) ->
Host = case Host0 of
{local, _SocketPath} -> <<>>;
Tuple when tuple_size(Tuple) =:= 8 -> [$[, inet:ntoa(Tuple), $]]; %% IPv6.
@@ -656,7 +656,7 @@ host_header(Transport, Host0, Port) ->
Atom when is_atom(Atom) -> atom_to_list(Atom);
_ -> Host0
end,
- case {Transport:name(), Port} of
+ case {TransportName, Port} of
{tcp, 80} -> Host;
{tls, 443} -> Host;
_ -> [Host, $:, integer_to_binary(Port)]
diff --git a/src/gun_http2.erl b/src/gun_http2.erl
index 2778b4f..ec51235 100644
--- a/src/gun_http2.erl
+++ b/src/gun_http2.erl
@@ -979,7 +979,7 @@ prepare_headers(State=#http2_state{transport=Transport},
Scheme = scheme(State),
Authority = case lists:keyfind(<<"host">>, 1, Headers0) of
{_, Host} -> Host;
- _ -> gun_http:host_header(Transport, Host0, Port)
+ _ -> gun_http:host_header(Transport:name(), Host0, Port)
end,
%% @todo We also must remove any header found in the connection header.
%% @todo Much of this is duplicated in cow_http2_machine; sort things out.
diff --git a/src/gun_pool.erl b/src/gun_pool.erl
new file mode 100644
index 0000000..56734ee
--- /dev/null
+++ b/src/gun_pool.erl
@@ -0,0 +1,704 @@
+%% Copyright (c) 2021, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+-module(gun_pool).
+-behaviour(gen_statem).
+
+%% Pools.
+-export([start_pool/3]).
+-export([stop_pool/2]).
+-export([stop_pool/3]).
+% @todo shutdown pool?
+-export([info/0]).
+-export([info/1]).
+-export([info/2]).
+-export([await_up/1]).
+-export([await_up/2]).
+-export([checkout/2]). %% Use responsibly!
+
+%% Requests.
+-export([delete/2]).
+-export([delete/3]).
+-export([get/2]).
+-export([get/3]).
+-export([head/2]).
+-export([head/3]).
+-export([options/2]).
+-export([options/3]).
+-export([patch/2]).
+-export([patch/3]).
+-export([patch/4]).
+-export([post/2]).
+-export([post/3]).
+-export([post/4]).
+-export([put/2]).
+-export([put/3]).
+-export([put/4]).
+
+%% Generic requests interface.
+-export([headers/3]).
+-export([headers/4]).
+-export([request/4]).
+-export([request/5]).
+
+%% Streaming data.
+-export([data/3]).
+
+%% Tunneling. (HTTP/2+ only.)
+%% @todo -export([connect/2]).
+%% @todo -export([connect/3]).
+%% @todo -export([connect/4]).
+
+%% Cookies.
+%% @todo -export([gc_cookies/1]).
+%% @todo -export([session_gc_cookies/1]).
+
+%% Awaiting gun messages.
+-export([await/1]).
+-export([await/2]).
+-export([await/3]).
+-export([await_body/1]).
+-export([await_body/2]).
+-export([await_body/3]).
+
+%% Flushing gun messages.
+-export([flush/1]).
+
+%% Streams.
+-export([update_flow/2]).
+-export([cancel/1]).
+-export([stream_info/1]).
+
+%% Websocket. (HTTP/2+ only for upgrade.)
+%% -export([ws_upgrade/1]).
+%% -export([ws_upgrade/2]).
+%% -export([ws_upgrade/3]).
+-export([ws_send/2]).
+%% -export([ws_send/3]). (HTTP/2+ only.)
+
+%% Internals.
+-export([callback_mode/0]).
+-export([start_link/3]).
+-export([init/1]).
+-export([degraded/3]).
+-export([operational/3]).
+-export([terminate/3]).
+
+-type setup_msg() :: {gun_up, pid(), http | http2 | raw | socks}
+ | {gun_upgrade, pid(), gun:stream_ref(), [<<"websocket">>], [{binary(), binary()}]}.
+
+-type opts() :: #{
+ conn_opts => gun:opts(),
+ scope => any(),
+ setup_fun => {fun((pid(), setup_msg(), any()) -> any()), any()},
+ size => non_neg_integer()
+}.
+-export_type([opts/0]).
+
+-type pool_stream_ref() :: {pid(), gun:stream_ref()}.
+-export_type([pool_stream_ref/0]).
+
+-type error_result() :: {error, pool_not_found | no_connection_available, atom()}. %% @todo {pool_start_error, SupError}
+
+-type request_result() :: {async, pool_stream_ref()}
+ %% @todo {sync, ...} perhaps with Status, Headers, Body, and Extra info such as intermediate responses.
+ | error_result().
+
+-type req_opts() :: #{
+ %% Options common with normal Gun.
+ flow => pos_integer(),
+ reply_to => pid(),
+% @todo tunnel => stream_ref(),
+
+ %% Options specific to pools.
+ checkout_call_timeout => timeout(),
+ checkout_retry => [pos_integer()],
+ scope => any(),
+ start_pool_if_missing => boolean()
+}.
+-export_type([req_opts/0]).
+
+-type ws_send_opts() :: #{
+ authority => iodata(),
+
+ %% Options specific to pools.
+ checkout_call_timeout => timeout(),
+ checkout_retry => [pos_integer()],
+ scope => any(),
+ start_pool_if_missing => boolean()
+}.
+-export_type([ws_send_opts/0]).
+
+%% @todo tunnel
+-type meta() :: #{pid() => #{ws => gun:stream_ref()}}.
+
+-record(state, {
+ host :: inet:hostname() | inet:ip_address(),
+ port :: inet:port_number(),
+ opts :: gun:opts(),
+ table :: ets:tid(),
+ conns :: #{pid() => down | {setup, any()} | {up, http | http2 | ws | raw, map()}},
+ conns_meta = #{} :: meta(),
+ await_up = [] :: [{pid(), any()}]
+}).
+
+%% Pool management.
+
+-spec start_pool(inet:hostname() | inet:ip_address(), inet:port_number(), opts())
+ -> {ok, pid()} | {error, any()}.
+start_pool(Host, Port, Opts) ->
+ supervisor:start_child(gun_pools_sup, [Host, Port, Opts]).
+
+-spec stop_pool(inet:hostname() | inet:ip_address(), inet:port_number())
+ -> ok.
+stop_pool(Host, Port) ->
+ stop_pool(Host, Port, #{}).
+
+-spec stop_pool(inet:hostname() | inet:ip_address(), inet:port_number(), req_opts())
+ -> ok | {error, pool_not_found, atom()}.
+stop_pool(Host, Port, ReqOpts) ->
+ case get_pool(iolist_to_binary([Host, $:, integer_to_binary(Port)]), ReqOpts) of
+ undefined ->
+ {error, pool_not_found,
+ 'No pool was found for the given scope and authority.'};
+ ManagerPid ->
+ supervisor:terminate_child(gun_pools_sup, ManagerPid)
+ end.
+
+-spec info() -> [map()].
+info() ->
+ ets:foldl(fun({{Scope, _}, ManagerPid}, Acc) ->
+ {StateName, Info} = info(ManagerPid),
+ [Info#{scope => Scope, state => StateName}|Acc]
+ end, [], gun_pools).
+
+-spec info(pid() | binary()) -> undefined | {degraded | operational, map()}.
+info(ManagerPid) when is_pid(ManagerPid) ->
+ gen_statem:call(ManagerPid, info);
+info(Authority) ->
+ info(Authority, default).
+
+-spec info(binary(), any()) -> undefined | {degraded | operational, map()}.
+info(Authority, Scope) ->
+ case ets:lookup(gun_pools, {Scope, Authority}) of
+ [] ->
+ undefined;
+ [{_, ManagerPid}] ->
+ gen_statem:call(ManagerPid, info)
+ end.
+
+-spec await_up(pid() | binary()) -> ok | {error, pool_not_found, atom()}.
+await_up(ManagerPid) when is_pid(ManagerPid) ->
+ gen_statem:call(ManagerPid, await_up, 5000);
+await_up(Authority) ->
+ await_up(Authority, default).
+
+-spec await_up(binary(), any()) -> ok | {error, pool_not_found, atom()}.
+await_up(Authority, Scope) ->
+ case ets:lookup(gun_pools, {Scope, Authority}) of
+ [] ->
+ {error, pool_not_found,
+ 'No pool was found for the given scope and authority.'};
+ [{_, ManagerPid}] ->
+ gen_statem:call(ManagerPid, await_up, 5000)
+ end.
+
+-spec checkout(pid(), req_opts()) -> undefined | {pid(), map()}.
+checkout(ManagerPid, ReqOpts=#{checkout_retry := Retry}) when is_list(Retry) ->
+ CallTimeout = maps:get(checkout_call_timeout, ReqOpts, 5000),
+ case gen_server:call(ManagerPid, {checkout, ReqOpts}, CallTimeout) of
+ undefined ->
+ checkout_retry(ManagerPid, ReqOpts, CallTimeout, Retry);
+ Result ->
+ Result
+ end;
+checkout(ManagerPid, ReqOpts) ->
+ CallTimeout = maps:get(checkout_call_timeout, ReqOpts, 5000),
+ gen_server:call(ManagerPid, {checkout, ReqOpts}, CallTimeout).
+
+%% When the checkout_retry option is used, and the first call resulted
+%% in no connection being given out, we wait for the configured amount
+%% of time then try again. We loop over the wait times until there is
+%% none.
+checkout_retry(_, _, _, []) ->
+ undefined;
+checkout_retry(ManagerPid, ReqOpts, CallTimeout, [Wait|Retry]) ->
+ timer:sleep(Wait),
+ case gen_server:call(ManagerPid, {checkout, ReqOpts}, CallTimeout) of
+ undefined ->
+ checkout_retry(ManagerPid, ReqOpts, CallTimeout, Retry);
+ Result ->
+ Result
+ end.
+
+%% Requests.
+
+-spec delete(iodata(), gun:req_headers()) -> request_result().
+delete(Path, Headers) ->
+ request(<<"DELETE">>, Path, Headers, <<>>).
+
+-spec delete(iodata(), gun:req_headers(), req_opts()) -> request_result().
+delete(Path, Headers, ReqOpts) ->
+ request(<<"DELETE">>, Path, Headers, <<>>, ReqOpts).
+
+-spec get(iodata(), gun:req_headers()) -> request_result().
+get(Path, Headers) ->
+ request(<<"GET">>, Path, Headers, <<>>).
+
+-spec get(iodata(), gun:req_headers(), req_opts()) -> request_result().
+get(Path, Headers, ReqOpts) ->
+ request(<<"GET">>, Path, Headers, <<>>, ReqOpts).
+
+-spec head(iodata(), gun:req_headers()) -> request_result().
+head(Path, Headers) ->
+ request(<<"HEAD">>, Path, Headers, <<>>).
+
+-spec head(iodata(), gun:req_headers(), req_opts()) -> request_result().
+head(Path, Headers, ReqOpts) ->
+ request(<<"HEAD">>, Path, Headers, <<>>, ReqOpts).
+
+-spec options(iodata(), gun:req_headers()) -> request_result().
+options(Path, Headers) ->
+ request(<<"OPTIONS">>, Path, Headers, <<>>).
+
+-spec options(iodata(), gun:req_headers(), req_opts()) -> request_result().
+options(Path, Headers, ReqOpts) ->
+ request(<<"OPTIONS">>, Path, Headers, <<>>, ReqOpts).
+
+-spec patch(iodata(), gun:req_headers()) -> request_result().
+patch(Path, Headers) ->
+ headers(<<"PATCH">>, Path, Headers).
+
+-spec patch(iodata(), gun:req_headers(), iodata() | req_opts()) -> request_result().
+patch(Path, Headers, ReqOpts) when is_map(ReqOpts) ->
+ headers(<<"PATCH">>, Path, Headers, ReqOpts);
+patch(Path, Headers, Body) ->
+ request(<<"PATCH">>, Path, Headers, Body).
+
+-spec patch(iodata(), gun:req_headers(), iodata(), req_opts()) -> request_result().
+patch(Path, Headers, Body, ReqOpts) ->
+ request(<<"PATCH">>, Path, Headers, Body, ReqOpts).
+
+-spec post(iodata(), gun:req_headers()) -> request_result().
+post(Path, Headers) ->
+ headers(<<"POST">>, Path, Headers).
+
+-spec post(iodata(), gun:req_headers(), iodata() | req_opts()) -> request_result().
+post(Path, Headers, ReqOpts) when is_map(ReqOpts) ->
+ headers(<<"POST">>, Path, Headers, ReqOpts);
+post(Path, Headers, Body) ->
+ request(<<"POST">>, Path, Headers, Body).
+
+-spec post(iodata(), gun:req_headers(), iodata(), req_opts()) -> request_result().
+post(Path, Headers, Body, ReqOpts) ->
+ request(<<"POST">>, Path, Headers, Body, ReqOpts).
+
+-spec put(iodata(), gun:req_headers()) -> request_result().
+put(Path, Headers) ->
+ headers(<<"PUT">>, Path, Headers).
+
+-spec put(iodata(), gun:req_headers(), iodata() | req_opts()) -> request_result().
+put(Path, Headers, ReqOpts) when is_map(ReqOpts) ->
+ headers(<<"PUT">>, Path, Headers, ReqOpts);
+put(Path, Headers, Body) ->
+ request(<<"PUT">>, Path, Headers, Body).
+
+-spec put(iodata(), gun:req_headers(), iodata(), req_opts()) -> request_result().
+put(Path, Headers, Body, ReqOpts) ->
+ request(<<"PUT">>, Path, Headers, Body, ReqOpts).
+
+%% Generic requests interface.
+%%
+%% @todo Accept a TargetURI map as well as a normal Path.
+
+-spec headers(iodata(), iodata(), gun:req_headers()) -> request_result().
+headers(Method, Path, Headers) ->
+ headers(Method, Path, Headers, #{}).
+
+-spec headers(iodata(), iodata(), gun:req_headers(), req_opts()) -> request_result().
+headers(Method, Path, Headers, ReqOpts) ->
+ case get_pool(authority(Headers), ReqOpts) of
+ undefined ->
+ {error, pool_not_found,
+ 'No pool was found for the given scope and authority.'};
+ ManagerPid ->
+ case checkout(ManagerPid, ReqOpts) of
+ undefined ->
+ {error, no_connection_available,
+ 'No connection in the pool with enough capacity available to open a new stream.'};
+ {ConnPid, _Meta} ->
+ StreamRef = gun:headers(ConnPid, Method, Path, Headers, ReqOpts),
+ %% @todo Synchronous mode.
+ {async, {ConnPid, StreamRef}}
+ end
+ end.
+
+-spec request(iodata(), iodata(), gun:req_headers(), iodata()) -> request_result().
+request(Method, Path, Headers, Body) ->
+ request(Method, Path, Headers, Body, #{}).
+
+-spec request(iodata(), iodata(), gun:req_headers(), iodata(), req_opts()) -> request_result().
+request(Method, Path, Headers, Body, ReqOpts) ->
+ case get_pool(authority(Headers), ReqOpts) of
+ undefined ->
+ {error, pool_not_found,
+ 'No pool was found for the given scope and authority.'};
+ ManagerPid ->
+ case checkout(ManagerPid, ReqOpts) of
+ undefined ->
+ {error, no_connection_available,
+ 'No connection in the pool with enough capacity available to open a new stream.'};
+ {ConnPid, _Meta} ->
+ StreamRef = gun:request(ConnPid, Method, Path, Headers, Body, ReqOpts),
+ %% @todo Synchronous mode.
+ {async, {ConnPid, StreamRef}}
+ end
+ end.
+
+%% We require the host to be given in the headers for the time being.
+%% @todo Allow passing it in options. Websocket send already does that.
+authority(#{<<"host">> := Authority}) ->
+ Authority;
+authority(Headers) ->
+ {_, Authority} = lists:keyfind(<<"host">>, 1, Headers),
+ Authority.
+
+get_pool(Authority0, ReqOpts) ->
+ Authority = iolist_to_binary(Authority0),
+ %% @todo Perhaps rename this to temporary.
+ %% There's two concepts: temporary pool is started and stops
+ %% when there's no traffic. Dynamic pool simply reduces its number
+ %% of connections when there's no traffic.
+ StartPoolIfMissing = maps:get(start_pool_if_missing, ReqOpts, false),
+ Scope = maps:get(scope, ReqOpts, default),
+ case ets:lookup(gun_pools, {Scope, Authority}) of
+ [] when StartPoolIfMissing ->
+ start_missing_pool(Authority, ReqOpts);
+ [] ->
+ undefined;
+ [{_, ManagerPid}] ->
+ %% @todo With temporary pool, getting a pid here doesn't mean the pool can be used.
+ %% Indeed the manager could be in process of stopping. I suppose we must
+ %% do a check but perhaps it's best to leave that detail to the user
+ %% (they can easily retry and recreate the pool if necessary).
+ ManagerPid
+ end.
+
+start_missing_pool(_Authority, _ReqOpts) ->
+ undefined.
+
+%% Streaming data.
+
+-spec data(pool_stream_ref(), fin | nofin, iodata()) -> ok.
+data({ConnPid, StreamRef}, IsFin, Data) ->
+ gun:data(ConnPid, StreamRef, IsFin, Data).
+
+%% Awaiting gun messages.
+
+-spec await(pool_stream_ref()) -> gun:await_result().
+await({ConnPid, StreamRef}) ->
+ gun:await(ConnPid, StreamRef).
+
+-spec await(pool_stream_ref(), timeout() | reference()) -> gun:await_result().
+await({ConnPid, StreamRef}, MRefOrTimeout) ->
+ gun:await(ConnPid, StreamRef, MRefOrTimeout).
+
+-spec await(pool_stream_ref(), timeout(), reference()) -> gun:await_result().
+await({ConnPid, StreamRef}, Timeout, MRef) ->
+ gun:await(ConnPid, StreamRef, Timeout, MRef).
+
+-spec await_body(pool_stream_ref()) -> gun:await_body_result().
+await_body({ConnPid, StreamRef}) ->
+ gun:await_body(ConnPid, StreamRef).
+
+-spec await_body(pool_stream_ref(), timeout() | reference()) -> gun:await_body_result().
+await_body({ConnPid, StreamRef}, MRefOrTimeout) ->
+ gun:await_body(ConnPid, StreamRef, MRefOrTimeout).
+
+-spec await_body(pool_stream_ref(), timeout(), reference()) -> gun:await_body_result().
+await_body({ConnPid, StreamRef}, Timeout, MRef) ->
+ gun:await_body(ConnPid, StreamRef, Timeout, MRef).
+
+%% Flushing gun messages.
+
+-spec flush(pool_stream_ref()) -> ok.
+flush({ConnPid, _}) ->
+ gun:flush(ConnPid).
+
+%% Flow control.
+
+-spec update_flow(pool_stream_ref(), pos_integer()) -> ok.
+update_flow({ConnPid, StreamRef}, Flow) ->
+ gun:update_flow(ConnPid, StreamRef, Flow).
+
+%% Cancelling a stream.
+
+-spec cancel(pool_stream_ref()) -> ok.
+cancel({ConnPid, StreamRef}) ->
+ gun:cancel(ConnPid, StreamRef).
+
+%% Information about a stream.
+
+-spec stream_info(pool_stream_ref()) -> {ok, map() | undefined} | {error, not_connected}.
+stream_info({ConnPid, StreamRef}) ->
+ gun:stream_info(ConnPid, StreamRef).
+
+%% Websocket.
+
+-spec ws_send(gun:ws_frame() | [gun:ws_frame()], ws_send_opts()) -> ok | error_result().
+ws_send(Frames, WsSendOpts=#{authority := Authority}) ->
+ case get_pool(Authority, WsSendOpts) of
+ undefined ->
+ {error, pool_not_found,
+ 'No pool was found for the given scope and authority.'};
+ ManagerPid ->
+ case checkout(ManagerPid, WsSendOpts) of
+ undefined ->
+ {error, no_connection_available,
+ 'No connection in the pool with enough capacity available to send Websocket frames.'};
+ {ConnPid, #{ws := StreamRef}} ->
+ gun:ws_send(ConnPid, StreamRef, Frames)
+ end
+ end.
+
+%% Pool manager internals.
+%%
+%% The pool manager is responsible for starting connection processes
+%% and restarting them as necessary. It also provides a suitable
+%% connection process to any caller that needs it.
+%%
+%% The pool manager installs an event handler into each connection.
+%% The event handler is responsible for counting the number of
+%% active streams. It updates the gun_pooled_conns ets table
+%% whenever a stream begins or ends.
+%%
+%% A connection is deemed suitable if it is possible to open new
+%% streams. How many streams can be open at any one time depends
+%% on the protocol. For HTTP/2 the manager process keeps track of
+%% the connection's settings to know the maximum. For non-stream
+%% based protocols, there is no limit.
+%%
+%% The connection to be used is otherwise chosen randomly. The
+%% first connection that is suitable is returned. There is no
+%% need to "give back" the connection to the manager.
+
+%% @todo
+%% What should happen if we always fail to reconnect? I suspect we keep the manager
+%% around and propagate errors, the same as if there's no more capacity? Perhaps have alarms?
+
+callback_mode() -> state_functions.
+
+start_link(Host, Port, Opts) ->
+ gen_statem:start_link(?MODULE, {Host, Port, Opts}, []).
+
+init({Host, Port, Opts}) ->
+ process_flag(trap_exit, true),
+ true = ets:insert_new(gun_pools, {gun_pools_key(Host, Port, Opts), self()}),
+ Tid = ets:new(gun_pooled_conns, [ordered_set, public]),
+ Size = maps:get(size, Opts, 8),
+ %% @todo Only start processes in static mode.
+ ConnOpts = conn_opts(Tid, Opts),
+ Conns = maps:from_list([begin
+ {ok, ConnPid} = gun:open(Host, Port, ConnOpts),
+ _ = monitor(process, ConnPid),
+ {ConnPid, down}
+ end || _ <- lists:seq(1, Size)]),
+ State = #state{
+ host=Host,
+ port=Port,
+ opts=Opts,
+ table=Tid,
+ conns=Conns
+ },
+ %% If Size is 0 then we can never be operational.
+ {ok, degraded, State}.
+
+gun_pools_key(Host, Port, Opts) ->
+ Transport = maps:get(transport, Opts, gun:default_transport(Port)),
+ Authority = gun_http:host_header(Transport, Host, Port),
+ Scope = maps:get(scope, Opts, default),
+ {Scope, iolist_to_binary(Authority)}.
+
+conn_opts(Tid, Opts) ->
+ ConnOpts = maps:get(conn_opts, Opts, #{}),
+ EventHandlerState = maps:with([event_handler], ConnOpts),
+ H2Opts = maps:get(http2_opts, ConnOpts, #{}),
+ ConnOpts#{
+ event_handler => {gun_pool_events_h, EventHandlerState#{
+ table => Tid
+ }},
+ http2_opts => H2Opts#{
+ notify_settings_changed => true
+ }
+ }.
+
+%% We use the degraded state as long as at least one connection is degraded.
+%% @todo Probably keep count of connections separately to avoid counting every time.
+degraded(info, Msg={gun_up, ConnPid, _}, StateData=#state{opts=Opts, conns=Conns}) ->
+ #{ConnPid := down} = Conns,
+ %% We optionally run the setup function if one is defined. The
+ %% setup function tells us whether we are fully up or not. The
+ %% setup function may be called repeatedly until the connection
+ %% is established.
+ %%
+ %% @todo It is possible that the connection never does get
+ %% fully established. We should deal with this. We probably
+ %% need to handle all messages.
+ {SetupFun, SetupState0} = setup_fun(Opts),
+ degraded_setup(ConnPid, Msg, StateData, SetupFun, SetupState0);
+%% @todo
+%degraded(info, Msg={gun_tunnel_up, ConnPid, _, _}, StateData0=#state{conns=Conns}) ->
+% ;
+degraded(info, Msg={gun_upgrade, ConnPid, _, _, _},
+ StateData=#state{opts=#{setup_fun := {SetupFun, _}}, conns=Conns}) ->
+ %% @todo Probably shouldn't crash if the state is incorrect, that's programmer error though.
+ #{ConnPid := {setup, SetupState0}} = Conns,
+ %% We run the setup function again using the state previously kept.
+ degraded_setup(ConnPid, Msg, StateData, SetupFun, SetupState0);
+degraded(Type, Event, StateData) ->
+ handle_common(Type, Event, ?FUNCTION_NAME, StateData).
+
+setup_fun(#{setup_fun := SetupFun}) ->
+ SetupFun;
+setup_fun(_) ->
+ {fun (_, {gun_up, _, Protocol}, _) ->
+ {up, Protocol, #{}}
+ end, undefined}.
+
+degraded_setup(ConnPid, Msg, StateData0=#state{conns=Conns, conns_meta=ConnsMeta,
+ await_up=AwaitUp}, SetupFun, SetupState0) ->
+ case SetupFun(ConnPid, Msg, SetupState0) of
+ Setup={setup, _SetupState} ->
+ StateData = StateData0#state{conns=Conns#{ConnPid => Setup}},
+ {keep_state, StateData};
+ %% The Meta is different from Settings. It allows passing around
+ %% Websocket or tunnel stream refs.
+ {up, Protocol, Meta} ->
+ Settings = #{},
+ StateData = StateData0#state{
+ conns=Conns#{ConnPid => {up, Protocol, Settings}},
+ conns_meta=ConnsMeta#{ConnPid => Meta}
+ },
+ case is_degraded(StateData) of
+ true -> {keep_state, StateData};
+ false -> {next_state, operational, StateData#state{await_up=[]},
+ [{reply, ReplyTo, ok} || ReplyTo <- AwaitUp]}
+ end
+ end.
+
+is_degraded(#state{conns=Conns0}) ->
+ Conns = maps:to_list(Conns0),
+ Len = length(Conns),
+ Ups = [up || {_, {up, _, _}} <- Conns],
+ Len =/= length(Ups).
+
+operational(Type, Event, StateData) ->
+ handle_common(Type, Event, ?FUNCTION_NAME, StateData).
+
+handle_common({call, From}, {checkout, _ReqOpts}, _,
+ StateData=#state{conns_meta=ConnsMeta}) ->
+ case find_available_connection(StateData) of
+ none ->
+ {keep_state_and_data, {reply, From, undefined}};
+ ConnPid ->
+ Meta = maps:get(ConnPid, ConnsMeta, #{}),
+ {keep_state_and_data, {reply, From, {ConnPid, Meta}}}
+ end;
+handle_common(info, {gun_notify, ConnPid, settings_changed, Settings}, _, StateData=#state{conns=Conns}) ->
+ %% Assert that the state is correct.
+ {up, http2, _} = maps:get(ConnPid, Conns),
+ {keep_state, StateData#state{conns=Conns#{ConnPid => {up, http2, Settings}}}};
+handle_common(info, {gun_down, ConnPid, Protocol, _Reason, _KilledStreams}, _, StateData=#state{conns=Conns}) ->
+ {up, Protocol, _} = maps:get(ConnPid, Conns),
+ {next_state, degraded, StateData#state{conns=Conns#{ConnPid => down}}};
+%% @todo We do not want to reconnect automatically when the pool is dynamic.
+handle_common(info, {'DOWN', _MRef, process, ConnPid0, _Reason}, _,
+ StateData=#state{host=Host, port=Port, opts=Opts, table=Tid, conns=Conns0, conns_meta=ConnsMeta0}) ->
+ Conns = maps:remove(ConnPid0, Conns0),
+ ConnsMeta = maps:remove(ConnPid0, ConnsMeta0),
+ ConnOpts = conn_opts(Tid, Opts),
+ {ok, ConnPid} = gun:open(Host, Port, ConnOpts),
+ _ = monitor(process, ConnPid),
+ {next_state, degraded, StateData#state{conns=Conns#{ConnPid => down}, conns_meta=ConnsMeta}};
+handle_common({call, From}, info, StateName, #state{host=Host, port=Port,
+ opts=Opts, table=Tid, conns=Conns, conns_meta=ConnsMeta}) ->
+ {keep_state_and_data, {reply, From, {StateName, #{
+ %% @todo Not sure whether all of this should be documented. Maybe not ConnsMeta for now?
+ host => Host,
+ port => Port,
+ opts => Opts,
+ table => Tid,
+ conns => Conns,
+ conns_meta => ConnsMeta
+ }}}};
+handle_common({call, From}, await_up, operational, _) ->
+ {keep_state_and_data, {reply, From, ok}};
+handle_common({call, From}, await_up, _, StateData=#state{await_up=AwaitUp}) ->
+ {keep_state, StateData#state{await_up=[From|AwaitUp]}};
+handle_common(Type, Event, StateName, StateData) ->
+ logger:error("Unexpected event in state ~p of type ~p:~n~w~n~p~n",
+ [StateName, Type, Event, StateData]),
+ keep_state_and_data.
+
+%% We go over every connection and return the first one
+%% we find that has capacity. How we determine whether
+%% capacity is available depends on the protocol. For
+%% HTTP/2 we look into the protocol settings. The
+%% current number of streams is maintained by the
+%% event handler gun_pool_events_h.
+find_available_connection(#state{table=Tid, conns=Conns}) ->
+ I = lists:sort([{rand:uniform(), K} || K <- maps:keys(Conns)]),
+ find_available_connection(I, Conns, Tid).
+
+find_available_connection([], _, _) ->
+ none;
+find_available_connection([{_, ConnPid}|I], Conns, Tid) ->
+ case maps:get(ConnPid, Conns) of
+ {up, Protocol, Settings} ->
+ MaxStreams = max_streams(Protocol, Settings),
+ CurrentStreams = case ets:lookup(Tid, ConnPid) of
+ [] ->
+ 0;
+ [{_, CS}] ->
+ CS
+ end,
+ if
+ CurrentStreams + 1 > MaxStreams ->
+ find_available_connection(I, Conns, Tid);
+ true ->
+ ConnPid
+ end;
+ _ ->
+ find_available_connection(I, Conns, Tid)
+ end.
+
+max_streams(http, _) ->
+ 1;
+max_streams(http2, #{max_concurrent_streams := MaxStreams}) ->
+ MaxStreams;
+max_streams(http2, #{}) ->
+ infinity;
+%% There are no streams or Gun is not aware of streams when
+%% the protocol is Websocket or raw.
+max_streams(ws, _) ->
+ infinity;
+max_streams(raw, _) ->
+ infinity.
+
+terminate(Reason, StateName, #state{host=Host, port=Port, opts=Opts, await_up=AwaitUp}) ->
+ gen_statem:reply([
+ {reply, ReplyTo, {error, {terminate, StateName, Reason}}}
+ || ReplyTo <- AwaitUp]),
+ true = ets:delete(gun_pools, gun_pools_key(Host, Port, Opts)),
+ ok.
diff --git a/src/gun_pool_events_h.erl b/src/gun_pool_events_h.erl
new file mode 100644
index 0000000..1871a00
--- /dev/null
+++ b/src/gun_pool_events_h.erl
@@ -0,0 +1,157 @@
+%% Copyright (c) 2021, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+-module(gun_pool_events_h).
+
+-export([init/2]).
+-export([domain_lookup_start/2]).
+-export([domain_lookup_end/2]).
+-export([connect_start/2]).
+-export([connect_end/2]).
+-export([tls_handshake_start/2]).
+-export([tls_handshake_end/2]).
+-export([request_start/2]).
+-export([request_headers/2]).
+-export([request_end/2]).
+-export([push_promise_start/2]).
+-export([push_promise_end/2]).
+-export([response_start/2]).
+-export([response_inform/2]).
+-export([response_headers/2]).
+-export([response_trailers/2]).
+-export([response_end/2]).
+-export([ws_upgrade/2]).
+-export([ws_recv_frame_start/2]).
+-export([ws_recv_frame_header/2]).
+-export([ws_recv_frame_end/2]).
+-export([ws_send_frame_start/2]).
+-export([ws_send_frame_end/2]).
+-export([protocol_changed/2]).
+-export([origin_changed/2]).
+-export([cancel/2]).
+-export([disconnect/2]).
+-export([terminate/2]).
+
+init(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+domain_lookup_start(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+domain_lookup_end(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+connect_start(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+connect_end(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+tls_handshake_start(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+tls_handshake_end(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+request_start(Event=#{stream_ref := StreamRef}, State=#{table := Tid}) ->
+ _ = ets:update_counter(Tid, self(), +1, {self(), 0}),
+ propagate(Event, State#{
+ StreamRef => {nofin, nofin}
+ }, ?FUNCTION_NAME).
+
+request_headers(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+request_end(Event=#{stream_ref := StreamRef}, State0=#{table := Tid}) ->
+ State = case State0 of
+ #{StreamRef := {nofin, fin}} ->
+ _ = ets:update_counter(Tid, self(), -1),
+ maps:remove(StreamRef, State0);
+ #{StreamRef := {nofin, IsFin}} ->
+ State0#{StreamRef => {fin, IsFin}}
+ end,
+ propagate(Event, State, ?FUNCTION_NAME).
+
+push_promise_start(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+push_promise_end(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+response_start(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+response_inform(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+response_headers(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+response_trailers(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+response_end(Event=#{stream_ref := StreamRef}, State0=#{table := Tid}) ->
+ State = case State0 of
+ #{StreamRef := {fin, nofin}} ->
+ _ = ets:update_counter(Tid, self(), -1),
+ maps:remove(StreamRef, State0);
+ #{StreamRef := {IsFin, nofin}} ->
+ State0#{StreamRef => {IsFin, fin}}
+ end,
+ propagate(Event, State, ?FUNCTION_NAME).
+
+ws_upgrade(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+ws_recv_frame_start(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+ws_recv_frame_header(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+ws_recv_frame_end(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+ws_send_frame_start(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+ws_send_frame_end(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+protocol_changed(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+origin_changed(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+cancel(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+disconnect(Event, State=#{table := Tid}) ->
+ %% The ets:delete/2 call might fail when the pool has shut down.
+ try
+ true = ets:delete(Tid, self())
+ catch _:_ ->
+ ok
+ end,
+ propagate(Event, maps:with([event_handler, table], State), ?FUNCTION_NAME).
+
+terminate(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+propagate(Event, State=#{event_handler := {Mod, ModState0}}, Fun) ->
+ ModState = Mod:Fun(Event, ModState0),
+ State#{event_handler => {Mod, ModState}};
+propagate(_, State, _) ->
+ State.
diff --git a/src/gun_pools_sup.erl b/src/gun_pools_sup.erl
new file mode 100644
index 0000000..157b41b
--- /dev/null
+++ b/src/gun_pools_sup.erl
@@ -0,0 +1,37 @@
+%% Copyright (c) 2021, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+-module(gun_pools_sup).
+-behaviour(supervisor).
+
+%% API.
+-export([start_link/0]).
+
+%% supervisor.
+-export([init/1]).
+
+%% API.
+
+-spec start_link() -> {ok, pid()}.
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+%% supervisor.
+
+init([]) ->
+ %% @todo Review restart strategies.
+ Procs = [
+ #{id => gun_pool, start => {gun_pool, start_link, []}, restart => transient}
+ ],
+ {ok, {#{strategy => simple_one_for_one}, Procs}}.
diff --git a/src/gun_sup.erl b/src/gun_sup.erl
index dbe6698..a1270d7 100644
--- a/src/gun_sup.erl
+++ b/src/gun_sup.erl
@@ -12,7 +12,6 @@
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-%% @private
-module(gun_sup).
-behaviour(supervisor).
@@ -22,17 +21,17 @@
%% supervisor.
-export([init/1]).
--define(SUPERVISOR, ?MODULE).
-
%% API.
-spec start_link() -> {ok, pid()}.
start_link() ->
- supervisor:start_link({local, ?SUPERVISOR}, ?MODULE, []).
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
%% supervisor.
init([]) ->
- Procs = [{gun, {gun, start_link, []},
- temporary, 5000, worker, [gun]}],
- {ok, {{simple_one_for_one, 10, 10}, Procs}}.
+ Procs = [
+ #{id => gun_conns_sup, start => {gun_conns_sup, start_link, []}, type => supervisor},
+ #{id => gun_pools_sup, start => {gun_pools_sup, start_link, []}, type => supervisor}
+ ],
+ {ok, {#{}, Procs}}.
diff --git a/test/handlers/pool_ws_handler.erl b/test/handlers/pool_ws_handler.erl
new file mode 100644
index 0000000..5475e88
--- /dev/null
+++ b/test/handlers/pool_ws_handler.erl
@@ -0,0 +1,13 @@
+%% Feel free to use, reuse and abuse the code in this file.
+
+-module(pool_ws_handler).
+
+-export([init/4]).
+-export([handle/2]).
+
+init(_, _, _, #{user_opts := ReplyTo}) ->
+ {ok, ReplyTo}.
+
+handle(Frame, ReplyTo) ->
+ ReplyTo ! Frame,
+ {ok, 0, ReplyTo}.
diff --git a/test/pool_SUITE.erl b/test/pool_SUITE.erl
new file mode 100644
index 0000000..a48b75d
--- /dev/null
+++ b/test/pool_SUITE.erl
@@ -0,0 +1,376 @@
+%% Copyright (c) 2021, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+-module(pool_SUITE).
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-import(ct_helper, [doc/1]).
+-import(ct_helper, [config/2]).
+-import(gun_test, [receive_from/1]).
+
+all() ->
+ ct_helper:all(?MODULE).
+
+init_per_suite(Config) ->
+ {ok, _} = cowboy:start_clear({?MODULE, tcp}, [], do_proto_opts()),
+ Port = ranch:get_port({?MODULE, tcp}),
+ [{port, Port}|Config].
+
+end_per_suite(_) ->
+ ExtraListeners = [
+ max_streams_h2_size_1,
+ max_streams_h2_size_2,
+ reconnect_h1
+ ],
+ _ = [cowboy:stop_listener(Listener) || Listener <- ExtraListeners],
+ ok.
+
+do_proto_opts() ->
+ Routes = [
+ {"/", hello_h, []},
+ {"/delay", delayed_hello_h, 3000},
+ {"/ws", ws_echo_h, []}
+ ],
+ #{
+ env => #{dispatch => cowboy_router:compile([{'_', Routes}])}
+ }.
+
+%% Tests.
+
+hello_pool_h1(Config) ->
+ doc("Confirm the pool can be used for HTTP/1.1 connections."),
+ Port = config(port, Config),
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http]},
+ scope => ?FUNCTION_NAME
+ }),
+ gun_pool:await_up(ManagerPid),
+ Streams = [{async, _} = gun_pool:get("/",
+ #{<<"host">> => ["localhost:", integer_to_binary(Port)]},
+ #{scope => ?FUNCTION_NAME}
+ ) || _ <- lists:seq(1, 8)],
+ _ = [begin
+ {response, nofin, 200, _} = gun_pool:await(StreamRef),
+ {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef)
+ end || {async, StreamRef} <- Streams].
+
+hello_pool_h2(Config) ->
+ doc("Confirm the pool can be used for HTTP/2 connections."),
+ Port = config(port, Config),
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http2]},
+ scope => ?FUNCTION_NAME
+ }),
+ gun_pool:await_up(ManagerPid),
+ Streams = [{async, _} = gun_pool:get("/",
+ #{<<"host">> => ["localhost:", integer_to_binary(Port)]},
+ #{scope => ?FUNCTION_NAME}
+ ) || _ <- lists:seq(1, 800)],
+ _ = [begin
+ {response, nofin, 200, _} = gun_pool:await(StreamRef),
+ {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef)
+ end || {async, StreamRef} <- Streams].
+
+hello_pool_ws(Config) ->
+ doc("Confirm the pool can be used for HTTP/1.1 connections upgraded to Websocket."),
+ Port = config(port, Config),
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{
+ protocols => [http],
+ ws_opts => #{
+ default_protocol => pool_ws_handler,
+ user_opts => self()
+ }
+ },
+ scope => ?FUNCTION_NAME,
+ setup_fun => {fun
+ (ConnPid, {gun_up, _, http}, SetupState) ->
+ _ = gun:ws_upgrade(ConnPid, "/ws"),
+ {setup, SetupState};
+ (_, {gun_upgrade, _, StreamRef, _, _}, _) ->
+ {up, ws, #{ws => StreamRef}};
+ (ConnPid, Msg, SetupState) ->
+ ct:pal("Unexpected setup message for ~p: ~p", [ConnPid, Msg]),
+ {setup, SetupState}
+ end, undefined}
+ }),
+ gun_pool:await_up(ManagerPid),
+ _ = [gun_pool:ws_send({text, <<"Hello world!">>}, #{
+ authority => ["localhost:", integer_to_binary(Port)],
+ scope => ?FUNCTION_NAME
+ }) || _ <- lists:seq(1, 8)],
+ %% The pool_ws_handler module sends frames back to us.
+ _ = [receive
+ {text, <<"Hello world!">>} ->
+ ok
+ end || _ <- lists:seq(1, 8)].
+
+max_streams_h1(Config) ->
+ doc("Confirm requests are rejected when the maximum number "
+ "of streams is reached for HTTP/1.1 connections."),
+ Port = config(port, Config),
+ Authority = ["localhost:", integer_to_binary(Port)],
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http]},
+ scope => ?FUNCTION_NAME,
+ size => 1
+ }),
+ gun_pool:await_up(ManagerPid),
+ {async, _} = gun_pool:get("/delay",
+ #{<<"host">> => Authority}, #{scope => ?FUNCTION_NAME}),
+ timer:sleep(500),
+ {error, no_connection_available, _} = gun_pool:get("/delay",
+ #{<<"host">> => Authority}, #{scope => ?FUNCTION_NAME}).
+
+max_streams_h1_retry(Config) ->
+ doc("Confirm connection checkout is retried when the maximum number "
+ "of streams is reached for HTTP/1.1 connections."),
+ Port = config(port, Config),
+ Authority = ["localhost:", integer_to_binary(Port)],
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http]},
+ scope => ?FUNCTION_NAME,
+ size => 1
+ }),
+ gun_pool:await_up(ManagerPid),
+ {async, _} = gun_pool:get("/delay",
+ #{<<"host">> => Authority}, #{scope => ?FUNCTION_NAME}),
+ timer:sleep(500),
+ {error, no_connection_available, _} = gun_pool:get("/delay",
+ #{<<"host">> => Authority}, #{scope => ?FUNCTION_NAME}),
+ {async, _} = gun_pool:get("/delay", #{<<"host">> => Authority}, #{
+ checkout_retry => [100, 500, 500, 500, 500, 500, 500],
+ scope => ?FUNCTION_NAME
+ }).
+
+max_streams_h2_size_1(_) ->
+ doc("Confirm requests are rejected when the maximum number "
+ "of streams is reached for HTTP/2 connections."),
+ ProtoOpts = do_proto_opts(),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], ProtoOpts#{
+ max_concurrent_streams => 5
+ }),
+ Port = ranch:get_port(?FUNCTION_NAME),
+ Authority = ["localhost:", integer_to_binary(Port)],
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http2]},
+ size => 1
+ }),
+ gun_pool:await_up(ManagerPid),
+ [{async, _} = gun_pool:get("/delay", #{<<"host">> => Authority}) || _ <- lists:seq(1, 5)],
+ timer:sleep(500),
+ {error, no_connection_available, _} = gun_pool:get("/delay", #{<<"host">> => Authority}).
+
+max_streams_h2_size_1_retry(_) ->
+ doc("Confirm connection checkout is retried when the maximum number "
+ "of streams is reached for HTTP/2 connections."),
+ ProtoOpts = do_proto_opts(),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], ProtoOpts#{
+ max_concurrent_streams => 5
+ }),
+ Port = ranch:get_port(?FUNCTION_NAME),
+ Authority = ["localhost:", integer_to_binary(Port)],
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http2]},
+ size => 1
+ }),
+ gun_pool:await_up(ManagerPid),
+ [{async, _} = gun_pool:get("/delay", #{<<"host">> => Authority}) || _ <- lists:seq(1, 5)],
+ timer:sleep(500),
+ {error, no_connection_available, _} = gun_pool:get("/delay", #{<<"host">> => Authority}),
+ {async, _} = gun_pool:get("/delay", #{<<"host">> => Authority}, #{
+ checkout_retry => [100, 500, 500, 500, 500, 500, 500]
+ }).
+
+max_streams_h2_size_2(_) ->
+ doc("Confirm requests are rejected when the maximum number "
+ "of streams is reached for HTTP/2 connections."),
+ ProtoOpts = do_proto_opts(),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], ProtoOpts#{
+ max_concurrent_streams => 5
+ }),
+ Port = ranch:get_port(?FUNCTION_NAME),
+ Authority = ["localhost:", integer_to_binary(Port)],
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http2]},
+ size => 2
+ }),
+ gun_pool:await_up(ManagerPid),
+ [begin
+ {async, _} = gun_pool:get("/delay", #{<<"host">> => Authority}),
+ %% We need to wait a bit for the request to be sent because the
+ %% request is sent and counted asynchronously.
+ timer:sleep(10)
+ end || _ <- lists:seq(1, 10)],
+ timer:sleep(500),
+ {error, no_connection_available, _} = gun_pool:get("/delay", #{<<"host">> => Authority}).
+
+max_streams_h2_size_2_retry(_) ->
+ doc("Confirm connection checkout is retried when the maximum number "
+ "of streams is reached for HTTP/2 connections."),
+ ProtoOpts = do_proto_opts(),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], ProtoOpts#{
+ max_concurrent_streams => 5
+ }),
+ Port = ranch:get_port(?FUNCTION_NAME),
+ Authority = ["localhost:", integer_to_binary(Port)],
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http2]},
+ size => 2
+ }),
+ gun_pool:await_up(ManagerPid),
+ [begin
+ {async, _} = gun_pool:get("/delay", #{<<"host">> => Authority}),
+ %% We need to wait a bit for the request to be sent because the
+ %% request is sent and counted asynchronously.
+ timer:sleep(10)
+ end || _ <- lists:seq(1, 10)],
+ timer:sleep(500),
+ {error, no_connection_available, _} = gun_pool:get("/delay", #{<<"host">> => Authority}),
+ {async, _} = gun_pool:get("/delay", #{<<"host">> => Authority}, #{
+ checkout_retry => [100, 500, 500, 500, 500, 500, 500]
+ }).
+
+kill_restart_h1(Config) ->
+ doc("Confirm the Gun process is restarted and the pool operational "
+ "after an HTTP/1.1 Gun process has crashed."),
+ Port = config(port, Config),
+ Authority = ["localhost:", integer_to_binary(Port)],
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http]},
+ scope => ?FUNCTION_NAME
+ }),
+ gun_pool:await_up(ManagerPid),
+ Streams1 = [{async, _} = gun_pool:get("/",
+ #{<<"host">> => Authority},
+ #{scope => ?FUNCTION_NAME}
+ ) || _ <- lists:seq(1, 8)],
+ _ = [begin
+ {response, nofin, 200, _} = gun_pool:await(StreamRef),
+ {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef)
+ end || {async, StreamRef} <- Streams1],
+ %% Get a connection and kill the process.
+ {operational, #{conns := Conns}} = gun_pool:info(ManagerPid),
+ ConnPid = hd(maps:keys(Conns)),
+ MRef = monitor(process, ConnPid),
+ exit(ConnPid, {shutdown, ?FUNCTION_NAME}),
+ receive {'DOWN', MRef, process, ConnPid, _} -> ok end,
+ {degraded, _} = gun_pool:info(ManagerPid),
+ gun_pool:await_up(ManagerPid),
+ Streams2 = [{async, _} = gun_pool:get("/",
+ #{<<"host">> => Authority},
+ #{scope => ?FUNCTION_NAME}
+ ) || _ <- lists:seq(1, 8)],
+ _ = [begin
+ {response, nofin, 200, _} = gun_pool:await(StreamRef),
+ {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef)
+ end || {async, StreamRef} <- Streams2].
+
+kill_restart_h2(Config) ->
+ doc("Confirm the Gun process is restarted and the pool operational "
+ "after an HTTP/2 Gun process has crashed."),
+ Port = config(port, Config),
+ Authority = ["localhost:", integer_to_binary(Port)],
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http2]},
+ scope => ?FUNCTION_NAME
+ }),
+ gun_pool:await_up(ManagerPid),
+ Streams1 = [{async, _} = gun_pool:get("/",
+ #{<<"host">> => Authority},
+ #{scope => ?FUNCTION_NAME}
+ ) || _ <- lists:seq(1, 800)],
+ _ = [begin
+ {response, nofin, 200, _} = gun_pool:await(StreamRef),
+ {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef)
+ end || {async, StreamRef} <- Streams1],
+ %% Get a connection and kill the process.
+ {operational, #{conns := Conns}} = gun_pool:info(ManagerPid),
+ ConnPid = hd(maps:keys(Conns)),
+ MRef = monitor(process, ConnPid),
+ exit(ConnPid, {shutdown, ?FUNCTION_NAME}),
+ receive {'DOWN', MRef, process, ConnPid, _} -> ok end,
+ {degraded, _} = gun_pool:info(ManagerPid),
+ gun_pool:await_up(ManagerPid),
+ Streams2 = [{async, _} = gun_pool:get("/",
+ #{<<"host">> => Authority},
+ #{scope => ?FUNCTION_NAME}
+ ) || _ <- lists:seq(1, 800)],
+ _ = [begin
+ {response, nofin, 200, _} = gun_pool:await(StreamRef),
+ {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef)
+ end || {async, StreamRef} <- Streams2].
+
+%% @todo kill_restart_ws
+
+reconnect_h1(_) ->
+ doc("Confirm the Gun process reconnects automatically for HTTP/1.1 connections."),
+ ProtoOpts = do_proto_opts(),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], ProtoOpts#{
+ idle_timeout => 500,
+ scope => ?FUNCTION_NAME
+ }),
+ Port = ranch:get_port(?FUNCTION_NAME),
+ Authority = ["localhost:", integer_to_binary(Port)],
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http]}
+ }),
+ gun_pool:await_up(ManagerPid),
+ Streams1 = [{async, _} = gun_pool:get("/", #{<<"host">> => Authority}) || _ <- lists:seq(1, 8)],
+ _ = [begin
+ {response, nofin, 200, _} = gun_pool:await(StreamRef),
+ {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef)
+ end || {async, StreamRef} <- Streams1],
+ %% Wait for the idle timeout to trigger.
+ timer:sleep(600),
+% {degraded, _} = gun_pool:info(ManagerPid),
+ gun_pool:await_up(ManagerPid),
+ Streams2 = [{async, _} = gun_pool:get("/", #{<<"host">> => Authority}) || _ <- lists:seq(1, 8)],
+ _ = [begin
+ {response, nofin, 200, _} = gun_pool:await(StreamRef),
+ {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef)
+ end || {async, StreamRef} <- Streams2].
+
+reconnect_h2(Config) ->
+ doc("Confirm the Gun process reconnects automatically for HTTP/2 connections."),
+ Port = config(port, Config),
+ Authority = ["localhost:", integer_to_binary(Port)],
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http2]},
+ scope => ?FUNCTION_NAME
+ }),
+ gun_pool:await_up(ManagerPid),
+ Streams1 = [{async, _} = gun_pool:get("/",
+ #{<<"host">> => Authority},
+ #{scope => ?FUNCTION_NAME}
+ ) || _ <- lists:seq(1, 800)],
+ _ = [begin
+ {response, nofin, 200, _} = gun_pool:await(StreamRef),
+ {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef)
+ end || {async, StreamRef} <- Streams1],
+ %% Wait for the idle timeout to trigger.
+ timer:sleep(600),
+% {degraded, _} = gun_pool:info(ManagerPid),
+ gun_pool:await_up(ManagerPid),
+ Streams2 = [{async, _} = gun_pool:get("/",
+ #{<<"host">> => Authority},
+ #{scope => ?FUNCTION_NAME}
+ ) || _ <- lists:seq(1, 800)],
+ _ = [begin
+ {response, nofin, 200, _} = gun_pool:await(StreamRef),
+ {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef)
+ end || {async, StreamRef} <- Streams2].
+
+%% @todo reconnect_ws