aboutsummaryrefslogblamecommitdiffstats
path: root/src/gun_pool.erl
blob: 5d49a7c1c9e13f423a2b0a3745384b602af88d51 (plain) (tree)

































































































                                                                           
                                                                                     
















































                                                                                                                        
                       

















                                                                                          





                                                                                     
                                                



                                                                               












































                                                                                        
                                                                                 






























































































































































                                                                                                                                  
                                                                                         







































































































































































































































































































































                                                                                                                                      
%% Copyright (c) 2021, Loïc Hoguin <essen@ninenines.eu>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
%% copyright notice and this permission notice appear in all copies.
%%
%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

-module(gun_pool).
-behaviour(gen_statem).

%% Pools.
-export([start_pool/3]).
-export([stop_pool/2]).
-export([stop_pool/3]).
% @todo shutdown pool?
-export([info/0]).
-export([info/1]).
-export([info/2]).
-export([await_up/1]).
-export([await_up/2]).
-export([checkout/2]). %% Use responsibly!

%% Requests.
-export([delete/2]).
-export([delete/3]).
-export([get/2]).
-export([get/3]).
-export([head/2]).
-export([head/3]).
-export([options/2]).
-export([options/3]).
-export([patch/2]).
-export([patch/3]).
-export([patch/4]).
-export([post/2]).
-export([post/3]).
-export([post/4]).
-export([put/2]).
-export([put/3]).
-export([put/4]).

%% Generic requests interface.
-export([headers/3]).
-export([headers/4]).
-export([request/4]).
-export([request/5]).

%% Streaming data.
-export([data/3]).

%% Tunneling. (HTTP/2+ only.)
%% @todo -export([connect/2]).
%% @todo -export([connect/3]).
%% @todo -export([connect/4]).

%% Cookies.
%% @todo -export([gc_cookies/1]).
%% @todo -export([session_gc_cookies/1]).

%% Awaiting gun messages.
-export([await/1]).
-export([await/2]).
-export([await/3]).
-export([await_body/1]).
-export([await_body/2]).
-export([await_body/3]).

%% Flushing gun messages.
-export([flush/1]).

%% Streams.
-export([update_flow/2]).
-export([cancel/1]).
-export([stream_info/1]).

%% Websocket. (HTTP/2+ only for upgrade.)
%% -export([ws_upgrade/1]).
%% -export([ws_upgrade/2]).
%% -export([ws_upgrade/3]).
-export([ws_send/2]).
%% -export([ws_send/3]). (HTTP/2+ only.)

%% Internals.
-export([callback_mode/0]).
-export([start_link/3]).
-export([init/1]).
-export([degraded/3]).
-export([operational/3]).
-export([terminate/3]).

%% Messages given to the setup function: either the initial gun_up
%% message or a subsequent gun_upgrade message for the connection.
-type setup_msg() :: {gun_up, pid(), http | http2 | raw | socks}
	| {gun_upgrade, pid(), gun:stream_ref(), [binary()], [{binary(), binary()}]}.

%% Pool options.
%%
%% conn_opts: options used to open each connection (defaults to #{}).
%% scope: part of the key the pool is registered under (defaults to default).
%% setup_fun: {Fun, InitialState}. Fun is called with the connection pid,
%%   the message received and the current setup state. It must return
%%   {setup, NewState} while setup is ongoing or {up, Protocol, Meta}
%%   when the connection is ready to be used.
%% size: number of connections opened by the pool (defaults to 8).
-type opts() :: #{
	conn_opts => gun:opts(),
	scope => any(),
	setup_fun => {fun((pid(), setup_msg(), any()) -> any()), any()},
	size => non_neg_integer()
}.
-export_type([opts/0]).

%% Reference to a stream opened through the pool: the connection
%% process the stream belongs to and the Gun stream reference.
-type pool_stream_ref() :: {pid(), gun:stream_ref()}.
-export_type([pool_stream_ref/0]).

%% Errors are 3-tuples: the error tag, a machine-readable reason
%% and an atom containing a human-readable description.
-type error_result() :: {error, pool_not_found | no_connection_available, atom()}. %% @todo {pool_start_error, SupError}

%% Result of the request functions.
-type request_result() :: {async, pool_stream_ref()}
	%% @todo {sync, ...} perhaps with Status, Headers, Body, and Extra info such as intermediate responses.
	| error_result().

%% Request options. The whole map is also passed as-is to Gun
%% when the request is performed.
%%
%% checkout_call_timeout: timeout of each checkout call to the
%%   pool manager (defaults to 5000).
%% checkout_retry: list of wait times, in milliseconds, slept through
%%   before each new checkout attempt when no connection was available.
%% scope: scope of the pool to use (defaults to default).
%% start_pool_if_missing: defaults to false.
-type req_opts() :: #{
	%% Options common with normal Gun.
	flow => pos_integer(),
	reply_to => pid(),
%	@todo tunnel => stream_ref(),

	%% Options specific to pools.
	checkout_call_timeout => timeout(),
	checkout_retry => [pos_integer()],
	scope => any(),
	start_pool_if_missing => boolean()
}.
-export_type([req_opts/0]).

%% Options for ws_send/2. The authority is used to find the pool;
%% the pool-specific options behave the same as in req_opts().
-type ws_send_opts() :: #{
	authority => iodata(),

	%% Options specific to pools.
	checkout_call_timeout => timeout(),
	checkout_retry => [pos_integer()],
	scope => any(),
	start_pool_if_missing => boolean()
}.
-export_type([ws_send_opts/0]).

%% Metadata returned by the setup function for each connection
%% and given back to the caller on checkout.
%% @todo tunnel
-type meta() :: #{pid() => #{ws => gun:stream_ref()}}.

%% State of the pool manager process.
%%
%% host, port, opts: the arguments the pool was started with.
%% table: ets table given to the connections' event handler; it is
%%   read on checkout to obtain the current number of streams.
%% conns: setup status of every connection process of the pool.
%% conns_meta: metadata returned by the setup function, per connection.
%% await_up: callers of await_up waiting for the pool to be operational.
-record(state, {
	host :: inet:hostname() | inet:ip_address(),
	port :: inet:port_number(),
	opts :: opts(),
	table :: ets:tid(),
	conns :: #{pid() => down | {setup, any()} | {up, http | http2 | ws | raw, map()}},
	conns_meta = #{} :: meta(),
	await_up = [] :: [{pid(), any()}]
}).

%% Pool management.

%% Start a pool for the given host and port by asking the
%% gun_pools_sup supervisor to start a new child with these
%% arguments. The supervisor's result is returned as-is.
-spec start_pool(inet:hostname() | inet:ip_address(), inet:port_number(), opts())
	-> {ok, pid()} | {error, any()}.
start_pool(Host, Port, Opts) ->
	ChildArgs = [Host, Port, Opts],
	supervisor:start_child(gun_pools_sup, ChildArgs).

%% Stop the pool for the given host and port using empty stop
%% options: the transport is derived from the port and the
%% default scope is used.
%%
%% This is a shortcut for stop_pool/3 and returns the same values,
%% including the error returned when no pool could be found.
-spec stop_pool(inet:hostname() | inet:ip_address(), inet:port_number())
	-> ok | {error, pool_not_found, atom()}.
stop_pool(Host, Port) ->
	stop_pool(Host, Port, #{}).

%% Options used to identify the pool to be stopped.
%%
%% scope: scope the pool was started with (defaults to default).
%% transport: transport used to build the authority; when it is
%%   missing the default transport for the port is used.
-type stop_opts() :: #{
	scope => any(),
	transport => tcp | tls
}.

%% Stop the pool for the given host and port.
%%
%% The authority is built from the transport, host and port and
%% is used along with the scope to look up the pool manager. The
%% manager is then terminated via the gun_pools_sup supervisor.
%%
%% Returns ok on success, or a pool_not_found error when there
%% is no such pool.
-spec stop_pool(inet:hostname() | inet:ip_address(), inet:port_number(), stop_opts())
	-> ok | {error, pool_not_found, atom()}.
stop_pool(Host, Port, StopOpts) ->
	Transport = maps:get(transport, StopOpts, gun:default_transport(Port)),
	Authority = gun_http:host_header(Transport, Host, Port),
	case get_pool(Authority, StopOpts) of
		undefined ->
			{error, pool_not_found,
				'No pool was found for the given scope and authority.'};
		ManagerPid ->
			case supervisor:terminate_child(gun_pools_sup, ManagerPid) of
				ok ->
					ok;
				%% The pool manager stopped between the lookup and the
				%% call to the supervisor. The pool is gone: report it
				%% using the documented error rather than leaking the
				%% supervisor's error tuple.
				{error, not_found} ->
					{error, pool_not_found,
						'No pool was found for the given scope and authority.'}
			end
	end.

%% Return information about every pool found in the gun_pools
%% table. Each element is the info map of the pool manager
%% augmented with the pool's scope and current state name.
-spec info() -> [map()].
info() ->
	ets:foldl(fun info_fold/2, [], gun_pools).

%% Query one pool manager and prepend its info to the accumulator.
info_fold({{Scope, _}, ManagerPid}, Acc) ->
	{StateName, PoolInfo} = info(ManagerPid),
	[PoolInfo#{scope => Scope, state => StateName}|Acc].

%% Return information about a single pool.
%%
%% A pid is taken to be the pool manager and is queried directly.
%% Anything else is taken to be an authority and is looked up
%% in the default scope.
-spec info(pid() | binary()) -> undefined | {degraded | operational, map()}.
info(PidOrAuthority) ->
	if
		is_pid(PidOrAuthority) ->
			gen_statem:call(PidOrAuthority, info);
		true ->
			info(PidOrAuthority, default)
	end.

%% Return information about the pool registered for the given
%% authority and scope, or undefined when there is no such pool.
-spec info(binary(), any()) -> undefined | {degraded | operational, map()}.
info(Authority, Scope) ->
	PoolKey = {Scope, Authority},
	case ets:lookup(gun_pools, PoolKey) of
		[{_, ManagerPid}] ->
			gen_statem:call(ManagerPid, info);
		[] ->
			undefined
	end.

%% Wait for a pool to be operational.
%%
%% A pid is taken to be the pool manager and is called directly
%% with a 5000ms timeout. Anything else is taken to be an
%% authority and is looked up in the default scope.
-spec await_up(pid() | binary()) -> ok | {error, pool_not_found, atom()}.
await_up(PidOrAuthority) ->
	case is_pid(PidOrAuthority) of
		true ->
			gen_statem:call(PidOrAuthority, await_up, 5000);
		false ->
			await_up(PidOrAuthority, default)
	end.

%% Wait for the pool registered for the given authority and
%% scope to be operational. The call to the pool manager uses
%% a 5000ms timeout. An error is returned when there is no
%% such pool.
-spec await_up(binary(), any()) -> ok | {error, pool_not_found, atom()}.
await_up(Authority, Scope) ->
	PoolKey = {Scope, Authority},
	case ets:lookup(gun_pools, PoolKey) of
		[{_, ManagerPid}] ->
			gen_statem:call(ManagerPid, await_up, 5000);
		[] ->
			{error, pool_not_found,
				'No pool was found for the given scope and authority.'}
	end.

%% Ask the pool manager for a connection that can be used.
%%
%% The call uses the checkout_call_timeout option as timeout
%% (5000 when not given). When no connection is available and
%% the checkout_retry option is a list, further attempts are
%% made; otherwise the manager's answer is returned directly.
-spec checkout(pid(), req_opts() | ws_send_opts()) -> undefined | {pid(), map()}.
checkout(ManagerPid, ReqOpts) ->
	CallTimeout = maps:get(checkout_call_timeout, ReqOpts, 5000),
	FirstResult = gen_server:call(ManagerPid, {checkout, ReqOpts}, CallTimeout),
	case {FirstResult, ReqOpts} of
		{undefined, #{checkout_retry := Retry}} when is_list(Retry) ->
			checkout_retry(ManagerPid, ReqOpts, CallTimeout, Retry);
		_ ->
			FirstResult
	end.

%% Retry the checkout after the first attempt gave no connection.
%%
%% The last argument is the list of remaining wait times. We
%% sleep for the first wait time and ask the manager again. We
%% stop as soon as a connection is given out, or return undefined
%% once all wait times have been used.
checkout_retry(ManagerPid, ReqOpts, CallTimeout, [Delay|Remaining]) ->
	ok = timer:sleep(Delay),
	Request = {checkout, ReqOpts},
	case gen_server:call(ManagerPid, Request, CallTimeout) of
		undefined ->
			checkout_retry(ManagerPid, ReqOpts, CallTimeout, Remaining);
		Conn ->
			Conn
	end;
checkout_retry(_, _, _, []) ->
	undefined.

%% Requests.

%% Perform a DELETE request with an empty body and no options.
-spec delete(iodata(), gun:req_headers()) -> request_result().
delete(Path, Headers) ->
	delete(Path, Headers, #{}).

%% Perform a DELETE request with an empty body.
-spec delete(iodata(), gun:req_headers(), req_opts()) -> request_result().
delete(Path, Headers, ReqOpts) ->
	Method = <<"DELETE">>,
	request(Method, Path, Headers, <<>>, ReqOpts).

%% Perform a GET request with an empty body and no options.
-spec get(iodata(), gun:req_headers()) -> request_result().
get(Path, Headers) ->
	get(Path, Headers, #{}).

%% Perform a GET request with an empty body.
-spec get(iodata(), gun:req_headers(), req_opts()) -> request_result().
get(Path, Headers, ReqOpts) ->
	Method = <<"GET">>,
	request(Method, Path, Headers, <<>>, ReqOpts).

%% Perform a HEAD request with an empty body and no options.
-spec head(iodata(), gun:req_headers()) -> request_result().
head(Path, Headers) ->
	head(Path, Headers, #{}).

%% Perform a HEAD request with an empty body.
-spec head(iodata(), gun:req_headers(), req_opts()) -> request_result().
head(Path, Headers, ReqOpts) ->
	Method = <<"HEAD">>,
	request(Method, Path, Headers, <<>>, ReqOpts).

%% Perform an OPTIONS request with an empty body and no options.
-spec options(iodata(), gun:req_headers()) -> request_result().
options(Path, Headers) ->
	options(Path, Headers, #{}).

%% Perform an OPTIONS request with an empty body.
-spec options(iodata(), gun:req_headers(), req_opts()) -> request_result().
options(Path, Headers, ReqOpts) ->
	Method = <<"OPTIONS">>,
	request(Method, Path, Headers, <<>>, ReqOpts).

%% Send the headers of a PATCH request with no options.
%% No body is given here.
-spec patch(iodata(), gun:req_headers()) -> request_result().
patch(Path, Headers) ->
	headers(<<"PATCH">>, Path, Headers, #{}).

%% The third argument is either the request options, in which
%% case only the headers are sent, or the full request body.
-spec patch(iodata(), gun:req_headers(), iodata() | req_opts()) -> request_result().
patch(Path, Headers, BodyOrReqOpts) ->
	case is_map(BodyOrReqOpts) of
		true ->
			headers(<<"PATCH">>, Path, Headers, BodyOrReqOpts);
		false ->
			request(<<"PATCH">>, Path, Headers, BodyOrReqOpts, #{})
	end.

%% Perform a PATCH request with the full body and options.
-spec patch(iodata(), gun:req_headers(), iodata(), req_opts()) -> request_result().
patch(Path, Headers, Body, ReqOpts) ->
	Method = <<"PATCH">>,
	request(Method, Path, Headers, Body, ReqOpts).

%% Send the headers of a POST request with no options.
%% No body is given here.
-spec post(iodata(), gun:req_headers()) -> request_result().
post(Path, Headers) ->
	headers(<<"POST">>, Path, Headers, #{}).

%% The third argument is either the request options, in which
%% case only the headers are sent, or the full request body.
-spec post(iodata(), gun:req_headers(), iodata() | req_opts()) -> request_result().
post(Path, Headers, BodyOrReqOpts) ->
	case is_map(BodyOrReqOpts) of
		true ->
			headers(<<"POST">>, Path, Headers, BodyOrReqOpts);
		false ->
			request(<<"POST">>, Path, Headers, BodyOrReqOpts, #{})
	end.

%% Perform a POST request with the full body and options.
-spec post(iodata(), gun:req_headers(), iodata(), req_opts()) -> request_result().
post(Path, Headers, Body, ReqOpts) ->
	Method = <<"POST">>,
	request(Method, Path, Headers, Body, ReqOpts).

%% Send the headers of a PUT request with no options.
%% No body is given here.
-spec put(iodata(), gun:req_headers()) -> request_result().
put(Path, Headers) ->
	headers(<<"PUT">>, Path, Headers, #{}).

%% The third argument is either the request options, in which
%% case only the headers are sent, or the full request body.
-spec put(iodata(), gun:req_headers(), iodata() | req_opts()) -> request_result().
put(Path, Headers, BodyOrReqOpts) ->
	case is_map(BodyOrReqOpts) of
		true ->
			headers(<<"PUT">>, Path, Headers, BodyOrReqOpts);
		false ->
			request(<<"PUT">>, Path, Headers, BodyOrReqOpts, #{})
	end.

%% Perform a PUT request with the full body and options.
-spec put(iodata(), gun:req_headers(), iodata(), req_opts()) -> request_result().
put(Path, Headers, Body, ReqOpts) ->
	Method = <<"PUT">>,
	request(Method, Path, Headers, Body, ReqOpts).

%% Generic requests interface.
%%
%% @todo Accept a TargetURI map as well as a normal Path.

%% Send the request headers using empty request options.
-spec headers(iodata(), iodata(), gun:req_headers()) -> request_result().
headers(Method, Path, Headers) ->
	DefaultReqOpts = #{},
	headers(Method, Path, Headers, DefaultReqOpts).

%% Send the request headers on a connection from the pool.
%%
%% The pool is found using the host header and the request
%% options. A connection is then checked out and the headers
%% are sent on it. The result identifies both the connection
%% and the stream. An error is returned when there is no pool
%% or no connection available.
-spec headers(iodata(), iodata(), gun:req_headers(), req_opts()) -> request_result().
headers(Method, Path, Headers, ReqOpts) ->
	Authority = authority(Headers),
	case get_pool(Authority, ReqOpts) of
		undefined ->
			{error, pool_not_found,
				'No pool was found for the given scope and authority.'};
		ManagerPid ->
			case checkout(ManagerPid, ReqOpts) of
				{ConnPid, _Meta} ->
					%% @todo Synchronous mode.
					StreamRef = gun:headers(ConnPid, Method, Path, Headers, ReqOpts),
					{async, {ConnPid, StreamRef}};
				undefined ->
					{error, no_connection_available,
						'No connection in the pool with enough capacity available to open a new stream.'}
			end
	end.

%% Perform a full request using empty request options.
-spec request(iodata(), iodata(), gun:req_headers(), iodata()) -> request_result().
request(Method, Path, Headers, Body) ->
	DefaultReqOpts = #{},
	request(Method, Path, Headers, Body, DefaultReqOpts).

%% Perform a full request on a connection from the pool.
%%
%% The pool is found using the host header and the request
%% options. A connection is then checked out and the request
%% is sent on it. The result identifies both the connection
%% and the stream. An error is returned when there is no pool
%% or no connection available.
-spec request(iodata(), iodata(), gun:req_headers(), iodata(), req_opts()) -> request_result().
request(Method, Path, Headers, Body, ReqOpts) ->
	Authority = authority(Headers),
	case get_pool(Authority, ReqOpts) of
		undefined ->
			{error, pool_not_found,
				'No pool was found for the given scope and authority.'};
		ManagerPid ->
			case checkout(ManagerPid, ReqOpts) of
				{ConnPid, _Meta} ->
					%% @todo Synchronous mode.
					StreamRef = gun:request(ConnPid, Method, Path, Headers, Body, ReqOpts),
					{async, {ConnPid, StreamRef}};
				undefined ->
					{error, no_connection_available,
						'No connection in the pool with enough capacity available to open a new stream.'}
			end
	end.

%% Extract the authority from the request headers.
%%
%% For the time being the host must be given in the headers,
%% under the binary name <<"host">>. Headers given as a map are
%% matched directly; anything else is searched as a list of
%% tuples. We crash if the header cannot be found.
%%
%% @todo Allow passing it in options. Websocket send already does that.
authority(Headers) ->
	case Headers of
		#{<<"host">> := HostFromMap} ->
			HostFromMap;
		_ ->
			{_, HostFromList} = lists:keyfind(<<"host">>, 1, Headers),
			HostFromList
	end.

%% Find the pool manager for the given authority.
%%
%% The authority may be any iodata (typically the binary value of
%% the host header); it is converted to a binary before the lookup.
%% The scope is taken from the options and defaults to default.
%%
%% Returns the pid of the pool manager, or undefined when no pool
%% is registered and none could be started.
-spec get_pool(iodata(), req_opts() | ws_send_opts() | stop_opts()) -> pid() | undefined.
get_pool(Authority0, ReqOpts) ->
	Authority = iolist_to_binary(Authority0),
	%% @todo Perhaps rename this to temporary.
	%% There are two concepts: a temporary pool is started and stops
	%% when there's no traffic. A dynamic pool simply reduces its number
	%% of connections when there's no traffic.
	StartPoolIfMissing = maps:get(start_pool_if_missing, ReqOpts, false),
	Scope = maps:get(scope, ReqOpts, default),
	case ets:lookup(gun_pools, {Scope, Authority}) of
		[] when StartPoolIfMissing ->
			start_missing_pool(Authority, ReqOpts);
		[] ->
			undefined;
		[{_, ManagerPid}] ->
			%% @todo With temporary pool, getting a pid here doesn't mean the pool can be used.
			%% Indeed the manager could be in process of stopping. I suppose we must
			%% do a check but perhaps it's best to leave that detail to the user
			%% (they can easily retry and recreate the pool if necessary).
			ManagerPid
	end.

%% Placeholder: no pool is started at this time. Both arguments
%% are ignored and undefined is always returned, which callers
%% treat as the pool not having been found.
start_missing_pool(_, _) ->
	undefined.

%% Streaming data.

%% Send a chunk of request body on the stream. The connection
%% to use is part of the pool stream reference.
-spec data(pool_stream_ref(), fin | nofin, iodata()) -> ok.
data({Pid, Ref}, IsFin, Chunk) ->
	gun:data(Pid, Ref, IsFin, Chunk).

%% Awaiting gun messages.

%% Wait for the next message of the stream. These functions
%% unpack the pool stream reference and forward the call to
%% Gun along with any extra argument.
-spec await(pool_stream_ref()) -> gun:await_result().
await({Pid, Ref}) ->
	gun:await(Pid, Ref).

%% Same as await/1 with either a timeout or a monitor reference.
-spec await(pool_stream_ref(), timeout() | reference()) -> gun:await_result().
await({Pid, Ref}, TimeoutOrMRef) ->
	gun:await(Pid, Ref, TimeoutOrMRef).

%% Same as await/1 with both a timeout and a monitor reference.
-spec await(pool_stream_ref(), timeout(), reference()) -> gun:await_result().
await({Pid, Ref}, Timeout, MonitorRef) ->
	gun:await(Pid, Ref, Timeout, MonitorRef).

%% Wait for the response body of the stream. These functions
%% unpack the pool stream reference and forward the call to
%% Gun along with any extra argument.
-spec await_body(pool_stream_ref()) -> gun:await_body_result().
await_body({Pid, Ref}) ->
	gun:await_body(Pid, Ref).

%% Same as await_body/1 with either a timeout or a monitor reference.
-spec await_body(pool_stream_ref(), timeout() | reference()) -> gun:await_body_result().
await_body({Pid, Ref}, TimeoutOrMRef) ->
	gun:await_body(Pid, Ref, TimeoutOrMRef).

%% Same as await_body/1 with both a timeout and a monitor reference.
-spec await_body(pool_stream_ref(), timeout(), reference()) -> gun:await_body_result().
await_body({Pid, Ref}, Timeout, MonitorRef) ->
	gun:await_body(Pid, Ref, Timeout, MonitorRef).

%% Flushing gun messages.

%% Flush Gun messages. Only the connection pid of the pool
%% stream reference is given to Gun; the stream reference
%% itself is not used.
-spec flush(pool_stream_ref()) -> ok.
flush({Pid, _StreamRef}) ->
	gun:flush(Pid).

%% Flow control.

%% Update the flow value of the stream by forwarding the
%% call to Gun for the stream's connection.
-spec update_flow(pool_stream_ref(), pos_integer()) -> ok.
update_flow({Pid, Ref}, Increment) ->
	gun:update_flow(Pid, Ref, Increment).

%% Cancelling a stream.

%% Cancel the stream by forwarding the call to Gun for
%% the stream's connection.
-spec cancel(pool_stream_ref()) -> ok.
cancel({Pid, Ref}) ->
	gun:cancel(Pid, Ref).

%% Information about a stream.

%% Obtain information about the stream by forwarding the
%% call to Gun for the stream's connection.
-spec stream_info(pool_stream_ref()) -> {ok, map() | undefined} | {error, not_connected}.
stream_info({Pid, Ref}) ->
	gun:stream_info(Pid, Ref).

%% Websocket.

%% Send Websocket frames on a connection from the pool.
%%
%% The pool is found using the authority given in the options.
%% The connection that gets checked out must come with the ws
%% stream reference in its metadata; the frames are sent on
%% that stream. An error is returned when there is no pool or
%% no connection available.
-spec ws_send(gun:ws_frame() | [gun:ws_frame()], ws_send_opts()) -> ok | error_result().
ws_send(Frames, WsSendOpts=#{authority := Authority}) ->
	case get_pool(Authority, WsSendOpts) of
		undefined ->
			{error, pool_not_found,
				'No pool was found for the given scope and authority.'};
		ManagerPid ->
			case checkout(ManagerPid, WsSendOpts) of
				{ConnPid, #{ws := WsStreamRef}} ->
					gun:ws_send(ConnPid, WsStreamRef, Frames);
				undefined ->
					{error, no_connection_available,
						'No connection in the pool with enough capacity available to send Websocket frames.'}
			end
	end.

%% Pool manager internals.
%%
%% The pool manager is responsible for starting connection processes
%% and restarting them as necessary. It also provides a suitable
%% connection process to any caller that needs it.
%%
%% The pool manager installs an event handler into each connection.
%% The event handler is responsible for counting the number of
%% active streams. It updates the gun_pooled_conns ets table
%% whenever a stream begins or ends.
%%
%% A connection is deemed suitable if it is possible to open new
%% streams. How many streams can be open at any one time depends
%% on the protocol. For HTTP/2 the manager process keeps track of
%% the connection's settings to know the maximum. For non-stream
%% based protocols, there is no limit.
%%
%% The connection to be used is otherwise chosen randomly. The
%% first connection that is suitable is returned. There is no
%% need to "give back" the connection to the manager.

%% @todo
%% What should happen if we always fail to reconnect? I suspect we keep the manager
%% around and propagate errors, the same as if there's no more capacity? Perhaps have alarms?

%% gen_statem callback. Each state has its own state function:
%% degraded/3 and operational/3.
callback_mode() ->
	state_functions.

%% Start and link the pool manager process. The host, port and
%% pool options are given to init/1 as a single tuple.
start_link(Host, Port, Opts) ->
	InitArg = {Host, Port, Opts},
	gen_statem:start_link(?MODULE, InitArg, []).

%% Initialize the pool manager.
%%
%% The pool is registered in the gun_pools table; we crash if a
%% pool already exists under the same key. A table is created
%% and given to the connections' event handler. We then open as
%% many connections as the size option requires (8 by default),
%% monitor each of them and consider them down until told otherwise.
init({Host, Port, Opts}) ->
	process_flag(trap_exit, true),
	PoolKey = gun_pools_key(Host, Port, Opts),
	true = ets:insert_new(gun_pools, {PoolKey, self()}),
	Tid = ets:new(gun_pooled_conns, [ordered_set, public]),
	Size = maps:get(size, Opts, 8),
	%% @todo Only start processes in static mode.
	ConnOpts = conn_opts(Tid, Opts),
	Conns = lists:foldl(fun(_, Acc) ->
		{ok, ConnPid} = gun:open(Host, Port, ConnOpts),
		_ = monitor(process, ConnPid),
		Acc#{ConnPid => down}
	end, #{}, lists:seq(1, Size)),
	%% If Size is 0 then we can never be operational.
	{ok, degraded, #state{
		host=Host,
		port=Port,
		opts=Opts,
		table=Tid,
		conns=Conns
	}}.

%% Build the key under which the pool is registered in the
%% gun_pools table: the scope (default when not given) and
%% the authority as a binary. The authority is built from the
%% transport, host and port; the transport defaults to the
%% default transport for the port.
gun_pools_key(Host, Port, Opts) ->
	DefaultTransport = gun:default_transport(Port),
	Transport = maps:get(transport, Opts, DefaultTransport),
	HostHeader = gun_http:host_header(Transport, Host, Port),
	{maps:get(scope, Opts, default), iolist_to_binary(HostHeader)}.

%% Build the options used to open the pool's connections.
%%
%% We start from the conn_opts pool option and override two
%% values. The event handler becomes gun_pool_events_h; its
%% state contains the table and, when one was configured, the
%% user's own event_handler option. The HTTP/2 options are
%% extended to request notifications of settings changes.
conn_opts(Tid, Opts) ->
	UserConnOpts = maps:get(conn_opts, Opts, #{}),
	HandlerState = maps:put(table, Tid,
		maps:with([event_handler], UserConnOpts)),
	H2Opts = maps:put(notify_settings_changed, true,
		maps:get(http2_opts, UserConnOpts, #{})),
	maps:merge(UserConnOpts, #{
		event_handler => {gun_pool_events_h, HandlerState},
		http2_opts => H2Opts
	}).

%% State function for the degraded state.
%%
%% We remain in the degraded state as long as at least one
%% connection is not fully up.
%% @todo Probably keep count of connections separately to avoid counting every time.
degraded(info, UpMsg={gun_up, Pid, _}, Data=#state{opts=PoolOpts, conns=ConnStates}) ->
	%% Assert that we considered this connection to be down.
	#{Pid := down} = ConnStates,
	%% The setup function, when one is defined, decides whether
	%% the connection is fully up or not. It may end up being
	%% called repeatedly until the connection is established.
	%% Without one the connection is up immediately.
	%%
	%% @todo It is possible that the connection never does get
	%% fully established. We should deal with this. We probably
	%% need to handle all messages.
	{Fun, InitialSetupState} = setup_fun(PoolOpts),
	degraded_setup(Pid, UpMsg, Data, Fun, InitialSetupState);
%% @todo Handle {gun_tunnel_up, ConnPid, _, _} messages.
degraded(info, UpgradeMsg={gun_upgrade, Pid, _, _, _},
		Data=#state{opts=#{setup_fun := {Fun, _}}, conns=ConnStates}) ->
	%% The connection must be going through setup. We call the
	%% setup function again with the state it gave us earlier.
	%% @todo Probably shouldn't crash if the state is incorrect, that's programmer error though.
	#{Pid := {setup, PrevSetupState}} = ConnStates,
	degraded_setup(Pid, UpgradeMsg, Data, Fun, PrevSetupState);
degraded(EventType, EventContent, Data) ->
	handle_common(EventType, EventContent, degraded, Data).

%% Return the setup function and its initial state.
%%
%% The setup_fun pool option is used when given. Otherwise we
%% use a function that only accepts the gun_up message and
%% declares the connection up immediately, with the protocol
%% found in the message and no metadata.
setup_fun(Opts) ->
	case Opts of
		#{setup_fun := Configured} ->
			Configured;
		_ ->
			Default = fun (_, {gun_up, _, Protocol}, _) ->
				{up, Protocol, #{}}
			end,
			{Default, undefined}
	end.

%% Run the setup function for a connection and act on its result.
%%
%% {setup, SetupState} means that setup is still ongoing: the
%% tuple is stored as the connection's status. {up, Protocol, Meta}
%% means that the connection can now be used: it is stored as
%% up with empty settings and its metadata is kept. When this
%% was the last connection we were waiting for, we switch to
%% the operational state and reply to all await_up callers.
degraded_setup(ConnPid, Msg, StateData0=#state{conns=Conns, conns_meta=ConnsMeta,
		await_up=AwaitUp}, SetupFun, SetupState0) ->
	case SetupFun(ConnPid, Msg, SetupState0) of
		%% The Meta is different from Settings. It allows passing around
		%% Websocket or tunnel stream refs.
		{up, Protocol, Meta} ->
			NoSettings = #{},
			StateData = StateData0#state{
				conns=Conns#{ConnPid => {up, Protocol, NoSettings}},
				conns_meta=ConnsMeta#{ConnPid => Meta}
			},
			case is_degraded(StateData) of
				false ->
					Replies = lists:map(fun(ReplyTo) ->
						{reply, ReplyTo, ok}
					end, AwaitUp),
					{next_state, operational, StateData#state{await_up=[]}, Replies};
				true ->
					{keep_state, StateData}
			end;
		Ongoing={setup, _SetupState} ->
			{keep_state, StateData0#state{conns=Conns#{ConnPid => Ongoing}}}
	end.

%% The pool is degraded when at least one of its connections
%% has a status other than {up, _, _}.
is_degraded(#state{conns=Conns}) ->
	lists:any(fun
		({up, _, _}) -> false;
		(_) -> true
	end, maps:values(Conns)).

%% State function for the operational state. There is no
%% event specific to this state: everything is handled by
%% handle_common/4.
operational(EventType, EventContent, Data) ->
	handle_common(EventType, EventContent, operational, Data).

%% Handle the events that are processed the same way in both the
%% degraded and operational states. The third argument is the name
%% of the state the event was received in.
%%
%% - {checkout, ReqOpts} calls are replied to with {ConnPid, Meta}
%%   for a connection with capacity, or undefined if there is none.
%% - settings_changed notifications update the settings kept for
%%   an HTTP/2 connection.
%% - gun_down marks the connection as down and makes the pool degraded.
%% - 'DOWN' replaces the dead connection process with a new one
%%   and makes the pool degraded.
%% - info calls are replied to with the state name and a map
%%   describing the pool.
%% - await_up calls are replied to immediately when operational
%%   and are otherwise kept until the pool becomes operational.
%% - Any other event is logged and otherwise ignored.
handle_common({call, From}, {checkout, _ReqOpts}, _,
		StateData=#state{conns_meta=ConnsMeta}) ->
	case find_available_connection(StateData) of
		none ->
			{keep_state_and_data, {reply, From, undefined}};
		ConnPid ->
			Meta = maps:get(ConnPid, ConnsMeta, #{}),
			{keep_state_and_data, {reply, From, {ConnPid, Meta}}}
	end;
handle_common(info, {gun_notify, ConnPid, settings_changed, Settings}, _, StateData=#state{conns=Conns}) ->
	%% Assert that the state is correct.
	{up, http2, _} = maps:get(ConnPid, Conns),
	{keep_state, StateData#state{conns=Conns#{ConnPid => {up, http2, Settings}}}};
handle_common(info, {gun_down, ConnPid, Protocol, _Reason, _KilledStreams}, _, StateData=#state{conns=Conns}) ->
	%% Assert that the state is correct. The connection is normally
	%% up with the protocol found in the message. It may however
	%% also go down while the setup function is still in progress,
	%% in which case the protocol was not recorded. The setup state
	%% is dropped: setup starts over on the next gun_up message.
	case maps:get(ConnPid, Conns) of
		{up, Protocol, _} -> ok;
		{setup, _} -> ok
	end,
	{next_state, degraded, StateData#state{conns=Conns#{ConnPid => down}}};
%% @todo We do not want to reconnect automatically when the pool is dynamic.
handle_common(info, {'DOWN', _MRef, process, ConnPid0, _Reason}, _,
		StateData=#state{host=Host, port=Port, opts=Opts, table=Tid, conns=Conns0, conns_meta=ConnsMeta0}) ->
	%% Forget about the dead connection process and open a new
	%% connection to take its place.
	Conns = maps:remove(ConnPid0, Conns0),
	ConnsMeta = maps:remove(ConnPid0, ConnsMeta0),
	ConnOpts = conn_opts(Tid, Opts),
	{ok, ConnPid} = gun:open(Host, Port, ConnOpts),
	_ = monitor(process, ConnPid),
	{next_state, degraded, StateData#state{conns=Conns#{ConnPid => down}, conns_meta=ConnsMeta}};
handle_common({call, From}, info, StateName, #state{host=Host, port=Port,
		opts=Opts, table=Tid, conns=Conns, conns_meta=ConnsMeta}) ->
	{keep_state_and_data, {reply, From, {StateName, #{
		%% @todo Not sure whether all of this should be documented. Maybe not ConnsMeta for now?
		host => Host,
		port => Port,
		opts => Opts,
		table => Tid,
		conns => Conns,
		conns_meta => ConnsMeta
	}}}};
handle_common({call, From}, await_up, operational, _) ->
	{keep_state_and_data, {reply, From, ok}};
handle_common({call, From}, await_up, _, StateData=#state{await_up=AwaitUp}) ->
	%% The reply is sent when the pool becomes operational,
	%% or when the pool manager terminates.
	{keep_state, StateData#state{await_up=[From|AwaitUp]}};
handle_common(Type, Event, StateName, StateData) ->
	logger:error("Unexpected event in state ~p of type ~p:~n~w~n~p~n",
		[StateName, Type, Event, StateData]),
	keep_state_and_data.

%% Find a connection that can be given out.
%%
%% The connections are tried in a random order and the first
%% one found to have capacity is returned. We return none
%% when no connection has capacity.
%%
%% Only connections that are up are considered. Capacity is
%% determined by comparing the maximum number of streams for
%% the protocol (for HTTP/2 this comes from the protocol
%% settings) with the current number of streams. The latter
%% is read from the table maintained by the event handler
%% gun_pool_events_h; a connection without an entry has no
%% stream.
find_available_connection(#state{table=Tid, conns=Conns}) ->
	Tagged = lists:map(fun(ConnPid) ->
		{rand:uniform(), ConnPid}
	end, maps:keys(Conns)),
	find_available_connection(lists:sort(Tagged), Conns, Tid).

find_available_connection([{_, ConnPid}|Rest], Conns, Tid) ->
	case maps:get(ConnPid, Conns) of
		{up, Protocol, Settings} ->
			Limit = max_streams(Protocol, Settings),
			InUse = case ets:lookup(Tid, ConnPid) of
				[{_, Count}] ->
					Count;
				[] ->
					0
			end,
			if
				InUse + 1 > Limit ->
					find_available_connection(Rest, Conns, Tid);
				true ->
					ConnPid
			end;
		_ ->
			find_available_connection(Rest, Conns, Tid)
	end;
find_available_connection([], _, _) ->
	none.

%% Maximum number of concurrent streams for a connection.
%%
%% HTTP/1.1 connections get a single stream. HTTP/2 connections
%% use the max_concurrent_streams setting when we have it and
%% are otherwise not limited. There are no streams or Gun is
%% not aware of streams when the protocol is Websocket or raw,
%% so there is no limit either.
max_streams(http, _) ->
	1;
max_streams(http2, Settings) when is_map(Settings) ->
	maps:get(max_concurrent_streams, Settings, infinity);
max_streams(Protocol, _) when Protocol =:= ws; Protocol =:= raw ->
	infinity.

%% gen_statem callback. We reply with an error to every caller
%% still waiting in await_up, then remove the pool from the
%% gun_pools table.
terminate(Reason, StateName, #state{host=Host, port=Port, opts=Opts, await_up=AwaitUp}) ->
	Error = {error, {terminate, StateName, Reason}},
	lists:foreach(fun(ReplyTo) ->
		gen_statem:reply(ReplyTo, Error)
	end, AwaitUp),
	PoolKey = gun_pools_key(Host, Port, Opts),
	true = ets:delete(gun_pools, PoolKey),
	ok.