Initial commit for Gun pools
The approach taken here is very similar to what browsers are doing. A separate pool is created for each host/port/scope. The authority (host header) is used to determine which pool will execute requests. A connection process is semi-randomly chosen, from the connections that have capacity. Maximum capacity is determined by the protocol (the HTTP/2 setting set by the server is used, for example). Multiple processes can process requests/responses on the same connection concurrently. There is no need to "give back" the response to the pool, the number of ongoing streams is maintained via an event handler. The implementation is currently not strict, there may be more attempts to create requests than there is capacity. I'm not sure if it should be made strict or if Gun should just wait before sending requests (it only matters in the HTTP/2 case at the moment). When there is no connection with capacity available in the pool (because they have too many streams, or are reconnecting, or any other reason), checking out fails. There is no timeout to wait for a connection to be available. On the other hand the checkout_retry option allows setting multiple timeouts to retry checking out a connection. Each retry attempt's wait time can have a different value. The initial implementation of this work was sponsored by Kobil and made at the suggestion of Ilya Khaprov.
+%% Copyright (c) 2021, Loïc Hoguin <[email protected]>
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%% Pools.
+% @todo shutdown pool?
+-export([checkout/2]). %% Use responsibly!
+%% Requests.
+%% Generic requests interface.
+%% Streaming data.
+%% Tunneling. (HTTP/2+ only.)
+%% @todo -export([connect/2]).
+%% @todo -export([connect/3]).
+%% @todo -export([connect/4]).
+%% Cookies.
+%% @todo -export([gc_cookies/1]).
+%% @todo -export([session_gc_cookies/1]).
+%% Awaiting gun messages.
+%% Flushing gun messages.
+%% Streams.
+%% Websocket. (HTTP/2+ only for upgrade.)
+%% -export([ws_upgrade/1]).
+%% -export([ws_upgrade/2]).
+%% -export([ws_upgrade/3]).
+%% -export([ws_send/3]). (HTTP/2+ only.)
+%% Internals.
+-type setup_msg() :: {gun_up, pid(), http | http2 | raw | socks}
+ | {gun_upgrade, pid(), gun:stream_ref(), [<<"websocket">>], [{binary(), binary()}]}.
+-type opts() :: #{
+ conn_opts => gun:opts(),
+ scope => any(),
+ setup_fun => {fun((pid(), setup_msg(), any()) -> any()), any()},
+ size => non_neg_integer()
+-type pool_stream_ref() :: {pid(), gun:stream_ref()}.
+-type error_result() :: {error, pool_not_found | no_connection_available, atom()}. %% @todo {pool_start_error, SupError}
+-type request_result() :: {async, pool_stream_ref()}
+ %% @todo {sync, ...} perhaps with Status, Headers, Body, and Extra info such as intermediate responses.
+ | error_result().
+-type req_opts() :: #{
+ %% Options common with normal Gun.
+ flow => pos_integer(),
+ reply_to => pid(),
+% @todo tunnel => stream_ref(),
+ %% Options specific to pools.
+ checkout_call_timeout => timeout(),
+ checkout_retry => [pos_integer()],
+ scope => any(),
+ start_pool_if_missing => boolean()
+-type ws_send_opts() :: #{
+ authority => iodata(),
+ %% Options specific to pools.
+ checkout_call_timeout => timeout(),
+ checkout_retry => [pos_integer()],
+ scope => any(),
+ start_pool_if_missing => boolean()
+%% @todo tunnel
+-type meta() :: #{pid() => #{ws => gun:stream_ref()}}.
+-record(state, {
+ host :: inet:hostname() | inet:ip_address(),
+ port :: inet:port_number(),
+ opts :: gun:opts(),
+ table :: ets:tid(),
+ conns :: #{pid() => down | {setup, any()} | {up, http | http2 | ws | raw, map()}},
+ conns_meta = #{} :: meta(),
+ await_up = [] :: [{pid(), any()}]
+%% Pool management.
+-spec start_pool(inet:hostname() | inet:ip_address(), inet:port_number(), opts())
+ -> {ok, pid()} | {error, any()}.
+start_pool(Host, Port, Opts) ->
+ supervisor:start_child(gun_pools_sup, [Host, Port, Opts]).
+-spec stop_pool(inet:hostname() | inet:ip_address(), inet:port_number())
+ -> ok.
+stop_pool(Host, Port) ->
+ stop_pool(Host, Port, #{}).
+-spec stop_pool(inet:hostname() | inet:ip_address(), inet:port_number(), req_opts())
+ -> ok | {error, pool_not_found, atom()}.
+stop_pool(Host, Port, ReqOpts) ->
+ case get_pool(iolist_to_binary([Host, $:, integer_to_binary(Port)]), ReqOpts) of
+ undefined ->
+ {error, pool_not_found,
+ 'No pool was found for the given scope and authority.'};
+ ManagerPid ->
+ supervisor:terminate_child(gun_pools_sup, ManagerPid)
+ end.
+-spec info() -> [map()].
+info() ->
+ ets:foldl(fun({{Scope, _}, ManagerPid}, Acc) ->
+ {StateName, Info} = info(ManagerPid),
+ [Info#{scope => Scope, state => StateName}|Acc]
+ end, [], gun_pools).
+-spec info(pid() | binary()) -> undefined | {degraded | operational, map()}.
+info(ManagerPid) when is_pid(ManagerPid) ->
+ gen_statem:call(ManagerPid, info);
+info(Authority) ->
+ info(Authority, default).
+-spec info(binary(), any()) -> undefined | {degraded | operational, map()}.
+info(Authority, Scope) ->
+ case ets:lookup(gun_pools, {Scope, Authority}) of
+ [] ->
+ undefined;
+ [{_, ManagerPid}] ->
+ gen_statem:call(ManagerPid, info)
+ end.
+-spec await_up(pid() | binary()) -> ok | {error, pool_not_found, atom()}.
+await_up(ManagerPid) when is_pid(ManagerPid) ->
+ gen_statem:call(ManagerPid, await_up, 5000);
+await_up(Authority) ->
+ await_up(Authority, default).
+-spec await_up(binary(), any()) -> ok | {error, pool_not_found, atom()}.
+await_up(Authority, Scope) ->
+ case ets:lookup(gun_pools, {Scope, Authority}) of
+ [] ->
+ {error, pool_not_found,
+ 'No pool was found for the given scope and authority.'};
+ [{_, ManagerPid}] ->
+ gen_statem:call(ManagerPid, await_up, 5000)
+ end.
+-spec checkout(pid(), req_opts()) -> undefined | {pid(), map()}.
+checkout(ManagerPid, ReqOpts=#{checkout_retry := Retry}) when is_list(Retry) ->
+ CallTimeout = maps:get(checkout_call_timeout, ReqOpts, 5000),
+ case gen_server:call(ManagerPid, {checkout, ReqOpts}, CallTimeout) of
+ undefined ->
+ checkout_retry(ManagerPid, ReqOpts, CallTimeout, Retry);
+ Result ->
+ Result
+ end;
+checkout(ManagerPid, ReqOpts) ->
+ CallTimeout = maps:get(checkout_call_timeout, ReqOpts, 5000),
+ gen_server:call(ManagerPid, {checkout, ReqOpts}, CallTimeout).
+%% When the checkout_retry option is used, and the first call resulted
+%% in no connection being given out, we wait for the configured amount
+%% of time then try again. We loop over the wait times until there is
+%% none.
+checkout_retry(_, _, _, []) ->
+ undefined;
+checkout_retry(ManagerPid, ReqOpts, CallTimeout, [Wait|Retry]) ->
+ timer:sleep(Wait),
+ case gen_server:call(ManagerPid, {checkout, ReqOpts}, CallTimeout) of
+ undefined ->
+ checkout_retry(ManagerPid, ReqOpts, CallTimeout, Retry);
+ Result ->
+ Result
+ end.
+%% Requests.
+-spec delete(iodata(), gun:req_headers()) -> request_result().
+delete(Path, Headers) ->
+ request(<<"DELETE">>, Path, Headers, <<>>).
+-spec delete(iodata(), gun:req_headers(), req_opts()) -> request_result().
+delete(Path, Headers, ReqOpts) ->
+ request(<<"DELETE">>, Path, Headers, <<>>, ReqOpts).
+-spec get(iodata(), gun:req_headers()) -> request_result().
+get(Path, Headers) ->
+ request(<<"GET">>, Path, Headers, <<>>).
+-spec get(iodata(), gun:req_headers(), req_opts()) -> request_result().
+get(Path, Headers, ReqOpts) ->
+ request(<<"GET">>, Path, Headers, <<>>, ReqOpts).
+-spec head(iodata(), gun:req_headers()) -> request_result().
+head(Path, Headers) ->
+ request(<<"HEAD">>, Path, Headers, <<>>).
+-spec head(iodata(), gun:req_headers(), req_opts()) -> request_result().
+head(Path, Headers, ReqOpts) ->
+ request(<<"HEAD">>, Path, Headers, <<>>, ReqOpts).
+-spec options(iodata(), gun:req_headers()) -> request_result().
+options(Path, Headers) ->
+ request(<<"OPTIONS">>, Path, Headers, <<>>).
+-spec options(iodata(), gun:req_headers(), req_opts()) -> request_result().
+options(Path, Headers, ReqOpts) ->
+ request(<<"OPTIONS">>, Path, Headers, <<>>, ReqOpts).
+-spec patch(iodata(), gun:req_headers()) -> request_result().
+patch(Path, Headers) ->
+ headers(<<"PATCH">>, Path, Headers).
+-spec patch(iodata(), gun:req_headers(), iodata() | req_opts()) -> request_result().
+patch(Path, Headers, ReqOpts) when is_map(ReqOpts) ->
+ headers(<<"PATCH">>, Path, Headers, ReqOpts);
+patch(Path, Headers, Body) ->
+ request(<<"PATCH">>, Path, Headers, Body).
+-spec patch(iodata(), gun:req_headers(), iodata(), req_opts()) -> request_result().
+patch(Path, Headers, Body, ReqOpts) ->
+ request(<<"PATCH">>, Path, Headers, Body, ReqOpts).
+-spec post(iodata(), gun:req_headers()) -> request_result().
+post(Path, Headers) ->
+ headers(<<"POST">>, Path, Headers).
+-spec post(iodata(), gun:req_headers(), iodata() | req_opts()) -> request_result().
+post(Path, Headers, ReqOpts) when is_map(ReqOpts) ->
+ headers(<<"POST">>, Path, Headers, ReqOpts);
+post(Path, Headers, Body) ->
+ request(<<"POST">>, Path, Headers, Body).
+-spec post(iodata(), gun:req_headers(), iodata(), req_opts()) -> request_result().
+post(Path, Headers, Body, ReqOpts) ->
+ request(<<"POST">>, Path, Headers, Body, ReqOpts).
+-spec put(iodata(), gun:req_headers()) -> request_result().
+put(Path, Headers) ->
+ headers(<<"PUT">>, Path, Headers).
+-spec put(iodata(), gun:req_headers(), iodata() | req_opts()) -> request_result().
+put(Path, Headers, ReqOpts) when is_map(ReqOpts) ->
+ headers(<<"PUT">>, Path, Headers, ReqOpts);
+put(Path, Headers, Body) ->
+ request(<<"PUT">>, Path, Headers, Body).
+-spec put(iodata(), gun:req_headers(), iodata(), req_opts()) -> request_result().
+put(Path, Headers, Body, ReqOpts) ->
+ request(<<"PUT">>, Path, Headers, Body, ReqOpts).
+%% Generic requests interface.
+%% @todo Accept a TargetURI map as well as a normal Path.
+-spec headers(iodata(), iodata(), gun:req_headers()) -> request_result().
+headers(Method, Path, Headers) ->
+ headers(Method, Path, Headers, #{}).
+-spec headers(iodata(), iodata(), gun:req_headers(), req_opts()) -> request_result().
+headers(Method, Path, Headers, ReqOpts) ->
+ case get_pool(authority(Headers), ReqOpts) of
+ undefined ->
+ {error, pool_not_found,
+ 'No pool was found for the given scope and authority.'};
+ ManagerPid ->
+ case checkout(ManagerPid, ReqOpts) of
+ undefined ->
+ {error, no_connection_available,
+ 'No connection in the pool with enough capacity available to open a new stream.'};
+ {ConnPid, _Meta} ->
+ StreamRef = gun:headers(ConnPid, Method, Path, Headers, ReqOpts),
+ %% @todo Synchronous mode.
+ {async, {ConnPid, StreamRef}}
+ end
+ end.
+-spec request(iodata(), iodata(), gun:req_headers(), iodata()) -> request_result().
+request(Method, Path, Headers, Body) ->
+ request(Method, Path, Headers, Body, #{}).
+-spec request(iodata(), iodata(), gun:req_headers(), iodata(), req_opts()) -> request_result().
+request(Method, Path, Headers, Body, ReqOpts) ->
+ case get_pool(authority(Headers), ReqOpts) of
+ undefined ->
+ {error, pool_not_found,
+ 'No pool was found for the given scope and authority.'};
+ ManagerPid ->
+ case checkout(ManagerPid, ReqOpts) of
+ undefined ->
+ {error, no_connection_available,
+ 'No connection in the pool with enough capacity available to open a new stream.'};
+ {ConnPid, _Meta} ->
+ StreamRef = gun:request(ConnPid, Method, Path, Headers, Body, ReqOpts),
+ %% @todo Synchronous mode.
+ {async, {ConnPid, StreamRef}}
+ end
+ end.
+%% We require the host to be given in the headers for the time being.
+%% @todo Allow passing it in options. Websocket send already does that.
+authority(#{<<"host">> := Authority}) ->
+ Authority;
+authority(Headers) ->
+ {_, Authority} = lists:keyfind(<<"host">>, 1, Headers),
+ Authority.
+get_pool(Authority0, ReqOpts) ->
+ Authority = iolist_to_binary(Authority0),
+ %% @todo Perhaps rename this to temporary.
+ %% There's two concepts: temporary pool is started and stops
+ %% when there's no traffic. Dynamic pool simply reduces its number
+ %% of connections when there's no traffic.
+ StartPoolIfMissing = maps:get(start_pool_if_missing, ReqOpts, false),
+ Scope = maps:get(scope, ReqOpts, default),
+ case ets:lookup(gun_pools, {Scope, Authority}) of
+ [] when StartPoolIfMissing ->
+ start_missing_pool(Authority, ReqOpts);
+ [] ->
+ undefined;
+ [{_, ManagerPid}] ->
+ %% @todo With temporary pool, getting a pid here doesn't mean the pool can be used.
+ %% Indeed the manager could be in process of stopping. I suppose we must
+ %% do a check but perhaps it's best to leave that detail to the user
+ %% (they can easily retry and recreate the pool if necessary).
+ ManagerPid
+ end.
+start_missing_pool(_Authority, _ReqOpts) ->
+ undefined.
+%% Streaming data.
+-spec data(pool_stream_ref(), fin | nofin, iodata()) -> ok.
+data({ConnPid, StreamRef}, IsFin, Data) ->
+ gun:data(ConnPid, StreamRef, IsFin, Data).
+%% Awaiting gun messages.
+-spec await(pool_stream_ref()) -> gun:await_result().
+await({ConnPid, StreamRef}) ->
+ gun:await(ConnPid, StreamRef).
+-spec await(pool_stream_ref(), timeout() | reference()) -> gun:await_result().
+await({ConnPid, StreamRef}, MRefOrTimeout) ->
+ gun:await(ConnPid, StreamRef, MRefOrTimeout).
+-spec await(pool_stream_ref(), timeout(), reference()) -> gun:await_result().
+await({ConnPid, StreamRef}, Timeout, MRef) ->
+ gun:await(ConnPid, StreamRef, Timeout, MRef).
+-spec await_body(pool_stream_ref()) -> gun:await_body_result().
+await_body({ConnPid, StreamRef}) ->
+ gun:await_body(ConnPid, StreamRef).
+-spec await_body(pool_stream_ref(), timeout() | reference()) -> gun:await_body_result().
+await_body({ConnPid, StreamRef}, MRefOrTimeout) ->
+ gun:await_body(ConnPid, StreamRef, MRefOrTimeout).
+-spec await_body(pool_stream_ref(), timeout(), reference()) -> gun:await_body_result().
+await_body({ConnPid, StreamRef}, Timeout, MRef) ->
+ gun:await_body(ConnPid, StreamRef, Timeout, MRef).
+%% Flushing gun messages.
+-spec flush(pool_stream_ref()) -> ok.
+flush({ConnPid, _}) ->
+ gun:flush(ConnPid).
+%% Flow control.
+-spec update_flow(pool_stream_ref(), pos_integer()) -> ok.
+update_flow({ConnPid, StreamRef}, Flow) ->
+ gun:update_flow(ConnPid, StreamRef, Flow).
+%% Cancelling a stream.
+-spec cancel(pool_stream_ref()) -> ok.
+cancel({ConnPid, StreamRef}) ->
+ gun:cancel(ConnPid, StreamRef).
+%% Information about a stream.
+-spec stream_info(pool_stream_ref()) -> {ok, map() | undefined} | {error, not_connected}.
+stream_info({ConnPid, StreamRef}) ->
+ gun:stream_info(ConnPid, StreamRef).
+%% Websocket.
+-spec ws_send(gun:ws_frame() | [gun:ws_frame()], ws_send_opts()) -> ok | error_result().
+ws_send(Frames, WsSendOpts=#{authority := Authority}) ->
+ case get_pool(Authority, WsSendOpts) of
+ undefined ->
+ {error, pool_not_found,
+ 'No pool was found for the given scope and authority.'};
+ ManagerPid ->
+ case checkout(ManagerPid, WsSendOpts) of
+ undefined ->
+ {error, no_connection_available,
+ 'No connection in the pool with enough capacity available to send Websocket frames.'};
+ {ConnPid, #{ws := StreamRef}} ->
+ gun:ws_send(ConnPid, StreamRef, Frames)
+ end
+ end.
+%% Pool manager internals.
+%% The pool manager is responsible for starting connection processes
+%% and restarting them as necessary. It also provides a suitable
+%% connection process to any caller that needs it.
+%% The pool manager installs an event handler into each connection.
+%% The event handler is responsible for counting the number of
+%% active streams. It updates the gun_pooled_conns ets table
+%% whenever a stream begins or ends.
+%% A connection is deemed suitable if it is possible to open new
+%% streams. How many streams can be open at any one time depends
+%% on the protocol. For HTTP/2 the manager process keeps track of
+%% the connection's settings to know the maximum. For non-stream
+%% based protocols, there is no limit.
+%% The connection to be used is otherwise chosen randomly. The
+%% first connection that is suitable is returned. There is no
+%% need to "give back" the connection to the manager.
+%% @todo
+%% What should happen if we always fail to reconnect? I suspect we keep the manager
+%% around and propagate errors, the same as if there's no more capacity? Perhaps have alarms?
+callback_mode() -> state_functions.
+start_link(Host, Port, Opts) ->
+ gen_statem:start_link(?MODULE, {Host, Port, Opts}, []).
+init({Host, Port, Opts}) ->
+ process_flag(trap_exit, true),
+ true = ets:insert_new(gun_pools, {gun_pools_key(Host, Port, Opts), self()}),
+ Tid = ets:new(gun_pooled_conns, [ordered_set, public]),
+ Size = maps:get(size, Opts, 8),
+ %% @todo Only start processes in static mode.
+ ConnOpts = conn_opts(Tid, Opts),
+ Conns = maps:from_list([begin
+ {ok, ConnPid} = gun:open(Host, Port, ConnOpts),
+ _ = monitor(process, ConnPid),
+ {ConnPid, down}
+ end || _ <- lists:seq(1, Size)]),
+ State = #state{
+ host=Host,
+ port=Port,
+ opts=Opts,
+ table=Tid,
+ conns=Conns
+ },
+ %% If Size is 0 then we can never be operational.
+ {ok, degraded, State}.
+gun_pools_key(Host, Port, Opts) ->
+ Transport = maps:get(transport, Opts, gun:default_transport(Port)),
+ Authority = gun_http:host_header(Transport, Host, Port),
+ Scope = maps:get(scope, Opts, default),
+ {Scope, iolist_to_binary(Authority)}.
+conn_opts(Tid, Opts) ->
+ ConnOpts = maps:get(conn_opts, Opts, #{}),
+ EventHandlerState = maps:with([event_handler], ConnOpts),
+ H2Opts = maps:get(http2_opts, ConnOpts, #{}),
+ ConnOpts#{
+ event_handler => {gun_pool_events_h, EventHandlerState#{
+ table => Tid
+ }},
+ http2_opts => H2Opts#{
+ notify_settings_changed => true
+ }
+ }.
+%% We use the degraded state as long as at least one connection is degraded.
+%% @todo Probably keep count of connections separately to avoid counting every time.
+degraded(info, Msg={gun_up, ConnPid, _}, StateData=#state{opts=Opts, conns=Conns}) ->
+ #{ConnPid := down} = Conns,
+ %% We optionally run the setup function if one is defined. The
+ %% setup function tells us whether we are fully up or not. The
+ %% setup function may be called repeatedly until the connection
+ %% is established.
+ %%
+ %% @todo It is possible that the connection never does get
+ %% fully established. We should deal with this. We probably
+ %% need to handle all messages.
+ {SetupFun, SetupState0} = setup_fun(Opts),
+ degraded_setup(ConnPid, Msg, StateData, SetupFun, SetupState0);
+%% @todo
+%degraded(info, Msg={gun_tunnel_up, ConnPid, _, _}, StateData0=#state{conns=Conns}) ->
+% ;
+degraded(info, Msg={gun_upgrade, ConnPid, _, _, _},
+ StateData=#state{opts=#{setup_fun := {SetupFun, _}}, conns=Conns}) ->
+ %% @todo Probably shouldn't crash if the state is incorrect, that's programmer error though.
+ #{ConnPid := {setup, SetupState0}} = Conns,
+ %% We run the setup function again using the state previously kept.
+ degraded_setup(ConnPid, Msg, StateData, SetupFun, SetupState0);
+degraded(Type, Event, StateData) ->
+ handle_common(Type, Event, ?FUNCTION_NAME, StateData).
+setup_fun(#{setup_fun := SetupFun}) ->
+ SetupFun;
+setup_fun(_) ->
+ {fun (_, {gun_up, _, Protocol}, _) ->
+ {up, Protocol, #{}}
+ end, undefined}.
+degraded_setup(ConnPid, Msg, StateData0=#state{conns=Conns, conns_meta=ConnsMeta,
+ await_up=AwaitUp}, SetupFun, SetupState0) ->
+ case SetupFun(ConnPid, Msg, SetupState0) of
+ Setup={setup, _SetupState} ->
+ StateData = StateData0#state{conns=Conns#{ConnPid => Setup}},
+ {keep_state, StateData};
+ %% The Meta is different from Settings. It allows passing around
+ %% Websocket or tunnel stream refs.
+ {up, Protocol, Meta} ->
+ Settings = #{},
+ StateData = StateData0#state{
+ conns=Conns#{ConnPid => {up, Protocol, Settings}},
+ conns_meta=ConnsMeta#{ConnPid => Meta}
+ },
+ case is_degraded(StateData) of
+ true -> {keep_state, StateData};
+ false -> {next_state, operational, StateData#state{await_up=[]},
+ [{reply, ReplyTo, ok} || ReplyTo <- AwaitUp]}
+ end
+ end.
+is_degraded(#state{conns=Conns0}) ->
+ Conns = maps:to_list(Conns0),
+ Len = length(Conns),
+ Ups = [up || {_, {up, _, _}} <- Conns],
+ Len =/= length(Ups).
+operational(Type, Event, StateData) ->
+ handle_common(Type, Event, ?FUNCTION_NAME, StateData).
+handle_common({call, From}, {checkout, _ReqOpts}, _,
+ StateData=#state{conns_meta=ConnsMeta}) ->
+ case find_available_connection(StateData) of
+ none ->
+ {keep_state_and_data, {reply, From, undefined}};
+ ConnPid ->
+ Meta = maps:get(ConnPid, ConnsMeta, #{}),
+ {keep_state_and_data, {reply, From, {ConnPid, Meta}}}
+ end;
+handle_common(info, {gun_notify, ConnPid, settings_changed, Settings}, _, StateData=#state{conns=Conns}) ->
+ %% Assert that the state is correct.
+ {up, http2, _} = maps:get(ConnPid, Conns),
+ {keep_state, StateData#state{conns=Conns#{ConnPid => {up, http2, Settings}}}};
+handle_common(info, {gun_down, ConnPid, Protocol, _Reason, _KilledStreams}, _, StateData=#state{conns=Conns}) ->
+ {up, Protocol, _} = maps:get(ConnPid, Conns),
+ {next_state, degraded, StateData#state{conns=Conns#{ConnPid => down}}};
+%% @todo We do not want to reconnect automatically when the pool is dynamic.
+handle_common(info, {'DOWN', _MRef, process, ConnPid0, _Reason}, _,
+ StateData=#state{host=Host, port=Port, opts=Opts, table=Tid, conns=Conns0, conns_meta=ConnsMeta0}) ->
+ Conns = maps:remove(ConnPid0, Conns0),
+ ConnsMeta = maps:remove(ConnPid0, ConnsMeta0),
+ ConnOpts = conn_opts(Tid, Opts),
+ {ok, ConnPid} = gun:open(Host, Port, ConnOpts),
+ _ = monitor(process, ConnPid),
+ {next_state, degraded, StateData#state{conns=Conns#{ConnPid => down}, conns_meta=ConnsMeta}};
+handle_common({call, From}, info, StateName, #state{host=Host, port=Port,
+ opts=Opts, table=Tid, conns=Conns, conns_meta=ConnsMeta}) ->
+ {keep_state_and_data, {reply, From, {StateName, #{
+ %% @todo Not sure whether all of this should be documented. Maybe not ConnsMeta for now?
+ host => Host,
+ port => Port,
+ opts => Opts,
+ table => Tid,
+ conns => Conns,
+ conns_meta => ConnsMeta
+ }}}};
+handle_common({call, From}, await_up, operational, _) ->
+ {keep_state_and_data, {reply, From, ok}};
+handle_common({call, From}, await_up, _, StateData=#state{await_up=AwaitUp}) ->
+ {keep_state, StateData#state{await_up=[From|AwaitUp]}};
+handle_common(Type, Event, StateName, StateData) ->
+ logger:error("Unexpected event in state ~p of type ~p:~n~w~n~p~n",
+ [StateName, Type, Event, StateData]),
+ keep_state_and_data.
+%% We go over every connection and return the first one
+%% we find that has capacity. How we determine whether
+%% capacity is available depends on the protocol. For
+%% HTTP/2 we look into the protocol settings. The
+%% current number of streams is maintained by the
+%% event handler gun_pool_events_h.
+find_available_connection(#state{table=Tid, conns=Conns}) ->
+ I = lists:sort([{rand:uniform(), K} || K <- maps:keys(Conns)]),
+ find_available_connection(I, Conns, Tid).
+find_available_connection([], _, _) ->
+ none;
+find_available_connection([{_, ConnPid}|I], Conns, Tid) ->
+ case maps:get(ConnPid, Conns) of
+ {up, Protocol, Settings} ->
+ MaxStreams = max_streams(Protocol, Settings),
+ CurrentStreams = case ets:lookup(Tid, ConnPid) of
+ [] ->
+ 0;
+ [{_, CS}] ->
+ CS
+ end,
+ if
+ CurrentStreams + 1 > MaxStreams ->
+ find_available_connection(I, Conns, Tid);
+ true ->
+ ConnPid
+ end;
+ _ ->
+ find_available_connection(I, Conns, Tid)
+ end.
+max_streams(http, _) ->
+ 1;
+max_streams(http2, #{max_concurrent_streams := MaxStreams}) ->
+ MaxStreams;
+max_streams(http2, #{}) ->
+ infinity;
+%% There are no streams or Gun is not aware of streams when
+%% the protocol is Websocket or raw.
+max_streams(ws, _) ->
+ infinity;
+max_streams(raw, _) ->
+ infinity.
+terminate(Reason, StateName, #state{host=Host, port=Port, opts=Opts, await_up=AwaitUp}) ->
+ gen_statem:reply([
+ {reply, ReplyTo, {error, {terminate, StateName, Reason}}}
+ || ReplyTo <- AwaitUp]),
+ true = ets:delete(gun_pools, gun_pools_key(Host, Port, Opts)),
+ ok.