aboutsummaryrefslogtreecommitdiffstats
path: root/src/gun_pool.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/gun_pool.erl')
-rw-r--r--src/gun_pool.erl704
1 files changed, 704 insertions, 0 deletions
diff --git a/src/gun_pool.erl b/src/gun_pool.erl
new file mode 100644
index 0000000..56734ee
--- /dev/null
+++ b/src/gun_pool.erl
@@ -0,0 +1,704 @@
+%% Copyright (c) 2021, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+-module(gun_pool).
+-behaviour(gen_statem).
+
+%% Pools.
+-export([start_pool/3]).
+-export([stop_pool/2]).
+-export([stop_pool/3]).
+% @todo shutdown pool?
+-export([info/0]).
+-export([info/1]).
+-export([info/2]).
+-export([await_up/1]).
+-export([await_up/2]).
+-export([checkout/2]). %% Use responsibly!
+
+%% Requests.
+-export([delete/2]).
+-export([delete/3]).
+-export([get/2]).
+-export([get/3]).
+-export([head/2]).
+-export([head/3]).
+-export([options/2]).
+-export([options/3]).
+-export([patch/2]).
+-export([patch/3]).
+-export([patch/4]).
+-export([post/2]).
+-export([post/3]).
+-export([post/4]).
+-export([put/2]).
+-export([put/3]).
+-export([put/4]).
+
+%% Generic requests interface.
+-export([headers/3]).
+-export([headers/4]).
+-export([request/4]).
+-export([request/5]).
+
+%% Streaming data.
+-export([data/3]).
+
+%% Tunneling. (HTTP/2+ only.)
+%% @todo -export([connect/2]).
+%% @todo -export([connect/3]).
+%% @todo -export([connect/4]).
+
+%% Cookies.
+%% @todo -export([gc_cookies/1]).
+%% @todo -export([session_gc_cookies/1]).
+
+%% Awaiting gun messages.
+-export([await/1]).
+-export([await/2]).
+-export([await/3]).
+-export([await_body/1]).
+-export([await_body/2]).
+-export([await_body/3]).
+
+%% Flushing gun messages.
+-export([flush/1]).
+
+%% Streams.
+-export([update_flow/2]).
+-export([cancel/1]).
+-export([stream_info/1]).
+
+%% Websocket. (HTTP/2+ only for upgrade.)
+%% -export([ws_upgrade/1]).
+%% -export([ws_upgrade/2]).
+%% -export([ws_upgrade/3]).
+-export([ws_send/2]).
+%% -export([ws_send/3]). (HTTP/2+ only.)
+
+%% Internals.
+-export([callback_mode/0]).
+-export([start_link/3]).
+-export([init/1]).
+-export([degraded/3]).
+-export([operational/3]).
+-export([terminate/3]).
+
+-type setup_msg() :: {gun_up, pid(), http | http2 | raw | socks}
+ | {gun_upgrade, pid(), gun:stream_ref(), [<<"websocket">>], [{binary(), binary()}]}.
+
+-type opts() :: #{
+ conn_opts => gun:opts(),
+ scope => any(),
+ setup_fun => {fun((pid(), setup_msg(), any()) -> any()), any()},
+ size => non_neg_integer()
+}.
+-export_type([opts/0]).
+
+-type pool_stream_ref() :: {pid(), gun:stream_ref()}.
+-export_type([pool_stream_ref/0]).
+
+-type error_result() :: {error, pool_not_found | no_connection_available, atom()}. %% @todo {pool_start_error, SupError}
+
+-type request_result() :: {async, pool_stream_ref()}
+ %% @todo {sync, ...} perhaps with Status, Headers, Body, and Extra info such as intermediate responses.
+ | error_result().
+
+-type req_opts() :: #{
+ %% Options common with normal Gun.
+ flow => pos_integer(),
+ reply_to => pid(),
+% @todo tunnel => stream_ref(),
+
+ %% Options specific to pools.
+ checkout_call_timeout => timeout(),
+ checkout_retry => [pos_integer()],
+ scope => any(),
+ start_pool_if_missing => boolean()
+}.
+-export_type([req_opts/0]).
+
+-type ws_send_opts() :: #{
+ authority => iodata(),
+
+ %% Options specific to pools.
+ checkout_call_timeout => timeout(),
+ checkout_retry => [pos_integer()],
+ scope => any(),
+ start_pool_if_missing => boolean()
+}.
+-export_type([ws_send_opts/0]).
+
+%% @todo tunnel
+-type meta() :: #{pid() => #{ws => gun:stream_ref()}}.
+
+-record(state, {
+ host :: inet:hostname() | inet:ip_address(),
+ port :: inet:port_number(),
+ opts :: gun:opts(),
+ table :: ets:tid(),
+ conns :: #{pid() => down | {setup, any()} | {up, http | http2 | ws | raw, map()}},
+ conns_meta = #{} :: meta(),
+ await_up = [] :: [{pid(), any()}]
+}).
+
+%% Pool management.
+
+-spec start_pool(inet:hostname() | inet:ip_address(), inet:port_number(), opts())
+ -> {ok, pid()} | {error, any()}.
+start_pool(Host, Port, Opts) ->
+ supervisor:start_child(gun_pools_sup, [Host, Port, Opts]).
+
+-spec stop_pool(inet:hostname() | inet:ip_address(), inet:port_number())
+ -> ok.
+stop_pool(Host, Port) ->
+ stop_pool(Host, Port, #{}).
+
+-spec stop_pool(inet:hostname() | inet:ip_address(), inet:port_number(), req_opts())
+ -> ok | {error, pool_not_found, atom()}.
+stop_pool(Host, Port, ReqOpts) ->
+ case get_pool(iolist_to_binary([Host, $:, integer_to_binary(Port)]), ReqOpts) of
+ undefined ->
+ {error, pool_not_found,
+ 'No pool was found for the given scope and authority.'};
+ ManagerPid ->
+ supervisor:terminate_child(gun_pools_sup, ManagerPid)
+ end.
+
+-spec info() -> [map()].
+info() ->
+ ets:foldl(fun({{Scope, _}, ManagerPid}, Acc) ->
+ {StateName, Info} = info(ManagerPid),
+ [Info#{scope => Scope, state => StateName}|Acc]
+ end, [], gun_pools).
+
+-spec info(pid() | binary()) -> undefined | {degraded | operational, map()}.
+info(ManagerPid) when is_pid(ManagerPid) ->
+ gen_statem:call(ManagerPid, info);
+info(Authority) ->
+ info(Authority, default).
+
+-spec info(binary(), any()) -> undefined | {degraded | operational, map()}.
+info(Authority, Scope) ->
+ case ets:lookup(gun_pools, {Scope, Authority}) of
+ [] ->
+ undefined;
+ [{_, ManagerPid}] ->
+ gen_statem:call(ManagerPid, info)
+ end.
+
+-spec await_up(pid() | binary()) -> ok | {error, pool_not_found, atom()}.
+await_up(ManagerPid) when is_pid(ManagerPid) ->
+ gen_statem:call(ManagerPid, await_up, 5000);
+await_up(Authority) ->
+ await_up(Authority, default).
+
+-spec await_up(binary(), any()) -> ok | {error, pool_not_found, atom()}.
+await_up(Authority, Scope) ->
+ case ets:lookup(gun_pools, {Scope, Authority}) of
+ [] ->
+ {error, pool_not_found,
+ 'No pool was found for the given scope and authority.'};
+ [{_, ManagerPid}] ->
+ gen_statem:call(ManagerPid, await_up, 5000)
+ end.
+
+-spec checkout(pid(), req_opts()) -> undefined | {pid(), map()}.
+checkout(ManagerPid, ReqOpts=#{checkout_retry := Retry}) when is_list(Retry) ->
+ CallTimeout = maps:get(checkout_call_timeout, ReqOpts, 5000),
+ case gen_server:call(ManagerPid, {checkout, ReqOpts}, CallTimeout) of
+ undefined ->
+ checkout_retry(ManagerPid, ReqOpts, CallTimeout, Retry);
+ Result ->
+ Result
+ end;
+checkout(ManagerPid, ReqOpts) ->
+ CallTimeout = maps:get(checkout_call_timeout, ReqOpts, 5000),
+ gen_server:call(ManagerPid, {checkout, ReqOpts}, CallTimeout).
+
+%% When the checkout_retry option is used, and the first call resulted
+%% in no connection being given out, we wait for the configured amount
+%% of time then try again. We loop over the wait times until there is
+%% none.
+checkout_retry(_, _, _, []) ->
+ undefined;
+checkout_retry(ManagerPid, ReqOpts, CallTimeout, [Wait|Retry]) ->
+ timer:sleep(Wait),
+ case gen_server:call(ManagerPid, {checkout, ReqOpts}, CallTimeout) of
+ undefined ->
+ checkout_retry(ManagerPid, ReqOpts, CallTimeout, Retry);
+ Result ->
+ Result
+ end.
+
+%% Requests.
+
+-spec delete(iodata(), gun:req_headers()) -> request_result().
+delete(Path, Headers) ->
+ request(<<"DELETE">>, Path, Headers, <<>>).
+
+-spec delete(iodata(), gun:req_headers(), req_opts()) -> request_result().
+delete(Path, Headers, ReqOpts) ->
+ request(<<"DELETE">>, Path, Headers, <<>>, ReqOpts).
+
+-spec get(iodata(), gun:req_headers()) -> request_result().
+get(Path, Headers) ->
+ request(<<"GET">>, Path, Headers, <<>>).
+
+-spec get(iodata(), gun:req_headers(), req_opts()) -> request_result().
+get(Path, Headers, ReqOpts) ->
+ request(<<"GET">>, Path, Headers, <<>>, ReqOpts).
+
+-spec head(iodata(), gun:req_headers()) -> request_result().
+head(Path, Headers) ->
+ request(<<"HEAD">>, Path, Headers, <<>>).
+
+-spec head(iodata(), gun:req_headers(), req_opts()) -> request_result().
+head(Path, Headers, ReqOpts) ->
+ request(<<"HEAD">>, Path, Headers, <<>>, ReqOpts).
+
+-spec options(iodata(), gun:req_headers()) -> request_result().
+options(Path, Headers) ->
+ request(<<"OPTIONS">>, Path, Headers, <<>>).
+
+-spec options(iodata(), gun:req_headers(), req_opts()) -> request_result().
+options(Path, Headers, ReqOpts) ->
+ request(<<"OPTIONS">>, Path, Headers, <<>>, ReqOpts).
+
+-spec patch(iodata(), gun:req_headers()) -> request_result().
+patch(Path, Headers) ->
+ headers(<<"PATCH">>, Path, Headers).
+
+-spec patch(iodata(), gun:req_headers(), iodata() | req_opts()) -> request_result().
+patch(Path, Headers, ReqOpts) when is_map(ReqOpts) ->
+ headers(<<"PATCH">>, Path, Headers, ReqOpts);
+patch(Path, Headers, Body) ->
+ request(<<"PATCH">>, Path, Headers, Body).
+
+-spec patch(iodata(), gun:req_headers(), iodata(), req_opts()) -> request_result().
+patch(Path, Headers, Body, ReqOpts) ->
+ request(<<"PATCH">>, Path, Headers, Body, ReqOpts).
+
+-spec post(iodata(), gun:req_headers()) -> request_result().
+post(Path, Headers) ->
+ headers(<<"POST">>, Path, Headers).
+
+-spec post(iodata(), gun:req_headers(), iodata() | req_opts()) -> request_result().
+post(Path, Headers, ReqOpts) when is_map(ReqOpts) ->
+ headers(<<"POST">>, Path, Headers, ReqOpts);
+post(Path, Headers, Body) ->
+ request(<<"POST">>, Path, Headers, Body).
+
+-spec post(iodata(), gun:req_headers(), iodata(), req_opts()) -> request_result().
+post(Path, Headers, Body, ReqOpts) ->
+ request(<<"POST">>, Path, Headers, Body, ReqOpts).
+
+-spec put(iodata(), gun:req_headers()) -> request_result().
+put(Path, Headers) ->
+ headers(<<"PUT">>, Path, Headers).
+
+-spec put(iodata(), gun:req_headers(), iodata() | req_opts()) -> request_result().
+put(Path, Headers, ReqOpts) when is_map(ReqOpts) ->
+ headers(<<"PUT">>, Path, Headers, ReqOpts);
+put(Path, Headers, Body) ->
+ request(<<"PUT">>, Path, Headers, Body).
+
+-spec put(iodata(), gun:req_headers(), iodata(), req_opts()) -> request_result().
+put(Path, Headers, Body, ReqOpts) ->
+ request(<<"PUT">>, Path, Headers, Body, ReqOpts).
+
+%% Generic requests interface.
+%%
+%% @todo Accept a TargetURI map as well as a normal Path.
+
+-spec headers(iodata(), iodata(), gun:req_headers()) -> request_result().
+headers(Method, Path, Headers) ->
+ headers(Method, Path, Headers, #{}).
+
+-spec headers(iodata(), iodata(), gun:req_headers(), req_opts()) -> request_result().
+headers(Method, Path, Headers, ReqOpts) ->
+ case get_pool(authority(Headers), ReqOpts) of
+ undefined ->
+ {error, pool_not_found,
+ 'No pool was found for the given scope and authority.'};
+ ManagerPid ->
+ case checkout(ManagerPid, ReqOpts) of
+ undefined ->
+ {error, no_connection_available,
+ 'No connection in the pool with enough capacity available to open a new stream.'};
+ {ConnPid, _Meta} ->
+ StreamRef = gun:headers(ConnPid, Method, Path, Headers, ReqOpts),
+ %% @todo Synchronous mode.
+ {async, {ConnPid, StreamRef}}
+ end
+ end.
+
+-spec request(iodata(), iodata(), gun:req_headers(), iodata()) -> request_result().
+request(Method, Path, Headers, Body) ->
+ request(Method, Path, Headers, Body, #{}).
+
+-spec request(iodata(), iodata(), gun:req_headers(), iodata(), req_opts()) -> request_result().
+request(Method, Path, Headers, Body, ReqOpts) ->
+ case get_pool(authority(Headers), ReqOpts) of
+ undefined ->
+ {error, pool_not_found,
+ 'No pool was found for the given scope and authority.'};
+ ManagerPid ->
+ case checkout(ManagerPid, ReqOpts) of
+ undefined ->
+ {error, no_connection_available,
+ 'No connection in the pool with enough capacity available to open a new stream.'};
+ {ConnPid, _Meta} ->
+ StreamRef = gun:request(ConnPid, Method, Path, Headers, Body, ReqOpts),
+ %% @todo Synchronous mode.
+ {async, {ConnPid, StreamRef}}
+ end
+ end.
+
+%% We require the host to be given in the headers for the time being.
+%% @todo Allow passing it in options. Websocket send already does that.
+authority(#{<<"host">> := Authority}) ->
+ Authority;
+authority(Headers) ->
+ {_, Authority} = lists:keyfind(<<"host">>, 1, Headers),
+ Authority.
+
+get_pool(Authority0, ReqOpts) ->
+ Authority = iolist_to_binary(Authority0),
+ %% @todo Perhaps rename this to temporary.
+ %% There's two concepts: temporary pool is started and stops
+ %% when there's no traffic. Dynamic pool simply reduces its number
+ %% of connections when there's no traffic.
+ StartPoolIfMissing = maps:get(start_pool_if_missing, ReqOpts, false),
+ Scope = maps:get(scope, ReqOpts, default),
+ case ets:lookup(gun_pools, {Scope, Authority}) of
+ [] when StartPoolIfMissing ->
+ start_missing_pool(Authority, ReqOpts);
+ [] ->
+ undefined;
+ [{_, ManagerPid}] ->
+ %% @todo With temporary pool, getting a pid here doesn't mean the pool can be used.
+ %% Indeed the manager could be in process of stopping. I suppose we must
+ %% do a check but perhaps it's best to leave that detail to the user
+ %% (they can easily retry and recreate the pool if necessary).
+ ManagerPid
+ end.
+
+start_missing_pool(_Authority, _ReqOpts) ->
+ undefined.
+
+%% Streaming data.
+
+-spec data(pool_stream_ref(), fin | nofin, iodata()) -> ok.
+data({ConnPid, StreamRef}, IsFin, Data) ->
+ gun:data(ConnPid, StreamRef, IsFin, Data).
+
+%% Awaiting gun messages.
+
+-spec await(pool_stream_ref()) -> gun:await_result().
+await({ConnPid, StreamRef}) ->
+ gun:await(ConnPid, StreamRef).
+
+-spec await(pool_stream_ref(), timeout() | reference()) -> gun:await_result().
+await({ConnPid, StreamRef}, MRefOrTimeout) ->
+ gun:await(ConnPid, StreamRef, MRefOrTimeout).
+
+-spec await(pool_stream_ref(), timeout(), reference()) -> gun:await_result().
+await({ConnPid, StreamRef}, Timeout, MRef) ->
+ gun:await(ConnPid, StreamRef, Timeout, MRef).
+
+-spec await_body(pool_stream_ref()) -> gun:await_body_result().
+await_body({ConnPid, StreamRef}) ->
+ gun:await_body(ConnPid, StreamRef).
+
+-spec await_body(pool_stream_ref(), timeout() | reference()) -> gun:await_body_result().
+await_body({ConnPid, StreamRef}, MRefOrTimeout) ->
+ gun:await_body(ConnPid, StreamRef, MRefOrTimeout).
+
+-spec await_body(pool_stream_ref(), timeout(), reference()) -> gun:await_body_result().
+await_body({ConnPid, StreamRef}, Timeout, MRef) ->
+ gun:await_body(ConnPid, StreamRef, Timeout, MRef).
+
+%% Flushing gun messages.
+
+-spec flush(pool_stream_ref()) -> ok.
+flush({ConnPid, _}) ->
+ gun:flush(ConnPid).
+
+%% Flow control.
+
+-spec update_flow(pool_stream_ref(), pos_integer()) -> ok.
+update_flow({ConnPid, StreamRef}, Flow) ->
+ gun:update_flow(ConnPid, StreamRef, Flow).
+
+%% Cancelling a stream.
+
+-spec cancel(pool_stream_ref()) -> ok.
+cancel({ConnPid, StreamRef}) ->
+ gun:cancel(ConnPid, StreamRef).
+
+%% Information about a stream.
+
+-spec stream_info(pool_stream_ref()) -> {ok, map() | undefined} | {error, not_connected}.
+stream_info({ConnPid, StreamRef}) ->
+ gun:stream_info(ConnPid, StreamRef).
+
+%% Websocket.
+
+-spec ws_send(gun:ws_frame() | [gun:ws_frame()], ws_send_opts()) -> ok | error_result().
+ws_send(Frames, WsSendOpts=#{authority := Authority}) ->
+ case get_pool(Authority, WsSendOpts) of
+ undefined ->
+ {error, pool_not_found,
+ 'No pool was found for the given scope and authority.'};
+ ManagerPid ->
+ case checkout(ManagerPid, WsSendOpts) of
+ undefined ->
+ {error, no_connection_available,
+ 'No connection in the pool with enough capacity available to send Websocket frames.'};
+ {ConnPid, #{ws := StreamRef}} ->
+ gun:ws_send(ConnPid, StreamRef, Frames)
+ end
+ end.
+
+%% Pool manager internals.
+%%
+%% The pool manager is responsible for starting connection processes
+%% and restarting them as necessary. It also provides a suitable
+%% connection process to any caller that needs it.
+%%
+%% The pool manager installs an event handler into each connection.
+%% The event handler is responsible for counting the number of
+%% active streams. It updates the gun_pooled_conns ets table
+%% whenever a stream begins or ends.
+%%
+%% A connection is deemed suitable if it is possible to open new
+%% streams. How many streams can be open at any one time depends
+%% on the protocol. For HTTP/2 the manager process keeps track of
+%% the connection's settings to know the maximum. For non-stream
+%% based protocols, there is no limit.
+%%
+%% The connection to be used is otherwise chosen randomly. The
+%% first connection that is suitable is returned. There is no
+%% need to "give back" the connection to the manager.
+
+%% @todo
+%% What should happen if we always fail to reconnect? I suspect we keep the manager
+%% around and propagate errors, the same as if there's no more capacity? Perhaps have alarms?
+
+callback_mode() -> state_functions.
+
+start_link(Host, Port, Opts) ->
+ gen_statem:start_link(?MODULE, {Host, Port, Opts}, []).
+
+init({Host, Port, Opts}) ->
+ process_flag(trap_exit, true),
+ true = ets:insert_new(gun_pools, {gun_pools_key(Host, Port, Opts), self()}),
+ Tid = ets:new(gun_pooled_conns, [ordered_set, public]),
+ Size = maps:get(size, Opts, 8),
+ %% @todo Only start processes in static mode.
+ ConnOpts = conn_opts(Tid, Opts),
+ Conns = maps:from_list([begin
+ {ok, ConnPid} = gun:open(Host, Port, ConnOpts),
+ _ = monitor(process, ConnPid),
+ {ConnPid, down}
+ end || _ <- lists:seq(1, Size)]),
+ State = #state{
+ host=Host,
+ port=Port,
+ opts=Opts,
+ table=Tid,
+ conns=Conns
+ },
+ %% If Size is 0 then we can never be operational.
+ {ok, degraded, State}.
+
+gun_pools_key(Host, Port, Opts) ->
+ Transport = maps:get(transport, Opts, gun:default_transport(Port)),
+ Authority = gun_http:host_header(Transport, Host, Port),
+ Scope = maps:get(scope, Opts, default),
+ {Scope, iolist_to_binary(Authority)}.
+
+conn_opts(Tid, Opts) ->
+ ConnOpts = maps:get(conn_opts, Opts, #{}),
+ EventHandlerState = maps:with([event_handler], ConnOpts),
+ H2Opts = maps:get(http2_opts, ConnOpts, #{}),
+ ConnOpts#{
+ event_handler => {gun_pool_events_h, EventHandlerState#{
+ table => Tid
+ }},
+ http2_opts => H2Opts#{
+ notify_settings_changed => true
+ }
+ }.
+
+%% We use the degraded state as long as at least one connection is degraded.
+%% @todo Probably keep count of connections separately to avoid counting every time.
+degraded(info, Msg={gun_up, ConnPid, _}, StateData=#state{opts=Opts, conns=Conns}) ->
+ #{ConnPid := down} = Conns,
+ %% We optionally run the setup function if one is defined. The
+ %% setup function tells us whether we are fully up or not. The
+ %% setup function may be called repeatedly until the connection
+ %% is established.
+ %%
+ %% @todo It is possible that the connection never does get
+ %% fully established. We should deal with this. We probably
+ %% need to handle all messages.
+ {SetupFun, SetupState0} = setup_fun(Opts),
+ degraded_setup(ConnPid, Msg, StateData, SetupFun, SetupState0);
+%% @todo
+%degraded(info, Msg={gun_tunnel_up, ConnPid, _, _}, StateData0=#state{conns=Conns}) ->
+% ;
+degraded(info, Msg={gun_upgrade, ConnPid, _, _, _},
+ StateData=#state{opts=#{setup_fun := {SetupFun, _}}, conns=Conns}) ->
+ %% @todo Probably shouldn't crash if the state is incorrect, that's programmer error though.
+ #{ConnPid := {setup, SetupState0}} = Conns,
+ %% We run the setup function again using the state previously kept.
+ degraded_setup(ConnPid, Msg, StateData, SetupFun, SetupState0);
+degraded(Type, Event, StateData) ->
+ handle_common(Type, Event, ?FUNCTION_NAME, StateData).
+
+setup_fun(#{setup_fun := SetupFun}) ->
+ SetupFun;
+setup_fun(_) ->
+ {fun (_, {gun_up, _, Protocol}, _) ->
+ {up, Protocol, #{}}
+ end, undefined}.
+
+degraded_setup(ConnPid, Msg, StateData0=#state{conns=Conns, conns_meta=ConnsMeta,
+ await_up=AwaitUp}, SetupFun, SetupState0) ->
+ case SetupFun(ConnPid, Msg, SetupState0) of
+ Setup={setup, _SetupState} ->
+ StateData = StateData0#state{conns=Conns#{ConnPid => Setup}},
+ {keep_state, StateData};
+ %% The Meta is different from Settings. It allows passing around
+ %% Websocket or tunnel stream refs.
+ {up, Protocol, Meta} ->
+ Settings = #{},
+ StateData = StateData0#state{
+ conns=Conns#{ConnPid => {up, Protocol, Settings}},
+ conns_meta=ConnsMeta#{ConnPid => Meta}
+ },
+ case is_degraded(StateData) of
+ true -> {keep_state, StateData};
+ false -> {next_state, operational, StateData#state{await_up=[]},
+ [{reply, ReplyTo, ok} || ReplyTo <- AwaitUp]}
+ end
+ end.
+
+is_degraded(#state{conns=Conns0}) ->
+ Conns = maps:to_list(Conns0),
+ Len = length(Conns),
+ Ups = [up || {_, {up, _, _}} <- Conns],
+ Len =/= length(Ups).
+
+operational(Type, Event, StateData) ->
+ handle_common(Type, Event, ?FUNCTION_NAME, StateData).
+
+handle_common({call, From}, {checkout, _ReqOpts}, _,
+ StateData=#state{conns_meta=ConnsMeta}) ->
+ case find_available_connection(StateData) of
+ none ->
+ {keep_state_and_data, {reply, From, undefined}};
+ ConnPid ->
+ Meta = maps:get(ConnPid, ConnsMeta, #{}),
+ {keep_state_and_data, {reply, From, {ConnPid, Meta}}}
+ end;
+handle_common(info, {gun_notify, ConnPid, settings_changed, Settings}, _, StateData=#state{conns=Conns}) ->
+ %% Assert that the state is correct.
+ {up, http2, _} = maps:get(ConnPid, Conns),
+ {keep_state, StateData#state{conns=Conns#{ConnPid => {up, http2, Settings}}}};
+handle_common(info, {gun_down, ConnPid, Protocol, _Reason, _KilledStreams}, _, StateData=#state{conns=Conns}) ->
+ {up, Protocol, _} = maps:get(ConnPid, Conns),
+ {next_state, degraded, StateData#state{conns=Conns#{ConnPid => down}}};
+%% @todo We do not want to reconnect automatically when the pool is dynamic.
+handle_common(info, {'DOWN', _MRef, process, ConnPid0, _Reason}, _,
+ StateData=#state{host=Host, port=Port, opts=Opts, table=Tid, conns=Conns0, conns_meta=ConnsMeta0}) ->
+ Conns = maps:remove(ConnPid0, Conns0),
+ ConnsMeta = maps:remove(ConnPid0, ConnsMeta0),
+ ConnOpts = conn_opts(Tid, Opts),
+ {ok, ConnPid} = gun:open(Host, Port, ConnOpts),
+ _ = monitor(process, ConnPid),
+ {next_state, degraded, StateData#state{conns=Conns#{ConnPid => down}, conns_meta=ConnsMeta}};
+handle_common({call, From}, info, StateName, #state{host=Host, port=Port,
+ opts=Opts, table=Tid, conns=Conns, conns_meta=ConnsMeta}) ->
+ {keep_state_and_data, {reply, From, {StateName, #{
+ %% @todo Not sure whether all of this should be documented. Maybe not ConnsMeta for now?
+ host => Host,
+ port => Port,
+ opts => Opts,
+ table => Tid,
+ conns => Conns,
+ conns_meta => ConnsMeta
+ }}}};
+handle_common({call, From}, await_up, operational, _) ->
+ {keep_state_and_data, {reply, From, ok}};
+handle_common({call, From}, await_up, _, StateData=#state{await_up=AwaitUp}) ->
+ {keep_state, StateData#state{await_up=[From|AwaitUp]}};
+handle_common(Type, Event, StateName, StateData) ->
+ logger:error("Unexpected event in state ~p of type ~p:~n~w~n~p~n",
+ [StateName, Type, Event, StateData]),
+ keep_state_and_data.
+
+%% We go over every connection and return the first one
+%% we find that has capacity. How we determine whether
+%% capacity is available depends on the protocol. For
+%% HTTP/2 we look into the protocol settings. The
+%% current number of streams is maintained by the
+%% event handler gun_pool_events_h.
+find_available_connection(#state{table=Tid, conns=Conns}) ->
+ I = lists:sort([{rand:uniform(), K} || K <- maps:keys(Conns)]),
+ find_available_connection(I, Conns, Tid).
+
+find_available_connection([], _, _) ->
+ none;
+find_available_connection([{_, ConnPid}|I], Conns, Tid) ->
+ case maps:get(ConnPid, Conns) of
+ {up, Protocol, Settings} ->
+ MaxStreams = max_streams(Protocol, Settings),
+ CurrentStreams = case ets:lookup(Tid, ConnPid) of
+ [] ->
+ 0;
+ [{_, CS}] ->
+ CS
+ end,
+ if
+ CurrentStreams + 1 > MaxStreams ->
+ find_available_connection(I, Conns, Tid);
+ true ->
+ ConnPid
+ end;
+ _ ->
+ find_available_connection(I, Conns, Tid)
+ end.
+
+max_streams(http, _) ->
+ 1;
+max_streams(http2, #{max_concurrent_streams := MaxStreams}) ->
+ MaxStreams;
+max_streams(http2, #{}) ->
+ infinity;
+%% There are no streams or Gun is not aware of streams when
+%% the protocol is Websocket or raw.
+max_streams(ws, _) ->
+ infinity;
+max_streams(raw, _) ->
+ infinity.
+
+terminate(Reason, StateName, #state{host=Host, port=Port, opts=Opts, await_up=AwaitUp}) ->
+ gen_statem:reply([
+ {reply, ReplyTo, {error, {terminate, StateName, Reason}}}
+ || ReplyTo <- AwaitUp]),
+ true = ets:delete(gun_pools, gun_pools_key(Host, Port, Opts)),
+ ok.