diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/gun.erl | 7 | ||||
-rw-r--r-- | src/gun_app.erl | 1 | ||||
-rw-r--r-- | src/gun_conns_sup.erl | 36 | ||||
-rw-r--r-- | src/gun_event.erl | 2 | ||||
-rw-r--r-- | src/gun_http.erl | 8 | ||||
-rw-r--r-- | src/gun_http2.erl | 2 | ||||
-rw-r--r-- | src/gun_pool.erl | 704 | ||||
-rw-r--r-- | src/gun_pool_events_h.erl | 157 | ||||
-rw-r--r-- | src/gun_pools_sup.erl | 37 | ||||
-rw-r--r-- | src/gun_sup.erl | 13 |
10 files changed, 952 insertions, 15 deletions
diff --git a/src/gun.erl b/src/gun.erl index 8a101a7..d808750 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -98,6 +98,7 @@ -export([start_link/4]). -export([callback_mode/0]). -export([init/1]). +-export([default_transport/1]). -export([not_connected/3]). -export([domain_lookup/3]). -export([connecting/3]). @@ -321,7 +322,7 @@ do_open(Host, Port, Opts0) -> case check_options(maps:to_list(Opts)) of ok -> Result = case maps:get(supervise, Opts, true) of - true -> supervisor:start_child(gun_sup, [self(), Host, Port, Opts]); + true -> supervisor:start_child(gun_conns_sup, [self(), Host, Port, Opts]); false -> start_link(self(), Host, Port, Opts) end, case Result of @@ -508,7 +509,7 @@ intermediaries_info([Intermediary=#{transport := Transport0}|Tail], Acc) -> -spec close(pid()) -> ok. close(ServerPid) -> - supervisor:terminate_child(gun_sup, ServerPid). + supervisor:terminate_child(gun_conns_sup, ServerPid). -spec shutdown(pid()) -> ok. shutdown(ServerPid) -> @@ -832,6 +833,8 @@ await_up(ServerPid, Timeout, MRef) -> {error, timeout} end. +%% Flushing gun messages. + -spec flush(pid() | stream_ref()) -> ok. flush(ServerPid) when is_pid(ServerPid) -> flush_pid(ServerPid); diff --git a/src/gun_app.erl b/src/gun_app.erl index 848faf2..624e65f 100644 --- a/src/gun_app.erl +++ b/src/gun_app.erl @@ -23,6 +23,7 @@ %% API. start(_Type, _Args) -> + gun_pools = ets:new(gun_pools, [ordered_set, public, named_table]), gun_sup:start_link(). stop(_State) -> diff --git a/src/gun_conns_sup.erl b/src/gun_conns_sup.erl new file mode 100644 index 0000000..bee266b --- /dev/null +++ b/src/gun_conns_sup.erl @@ -0,0 +1,36 @@ +%% Copyright (c) 2013-2021, Loïc Hoguin <[email protected]> +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +-module(gun_conns_sup). +-behaviour(supervisor). + +%% API. +-export([start_link/0]). + +%% supervisor. +-export([init/1]). + +%% API. + +-spec start_link() -> {ok, pid()}. +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +%% supervisor. + +init([]) -> + Procs = [ + #{id => gun, start => {gun, start_link, []}, restart => temporary} + ], + {ok, {#{strategy => simple_one_for_one}, Procs}}. diff --git a/src/gun_event.erl b/src/gun_event.erl index 71da77b..ab6020b 100644 --- a/src/gun_event.erl +++ b/src/gun_event.erl @@ -83,7 +83,7 @@ -type request_start_event() :: #{ stream_ref := gun:stream_ref(), reply_to := pid(), - function := headers | request | ws_upgrade, + function := headers | request | ws_upgrade, %% @todo connect? method := iodata(), scheme => binary(), authority := iodata(), diff --git a/src/gun_http.erl b/src/gun_http.erl index cc541e2..aaa6d15 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -35,7 +35,7 @@ -export([down/1]). -export([ws_upgrade/11]). -%% Functions shared with gun_http2. +%% Functions shared with gun_http2 and gun_pool. -export([host_header/3]). -type io() :: head | {body, non_neg_integer()} | body_close | body_chunked | body_trailer. @@ -607,7 +607,7 @@ send_request(State=#http_state{socket=Socket, transport=Transport, version=Versi end, {Authority, Headers3} = case lists:keyfind(<<"host">>, 1, Headers2) of false -> - Authority0 = host_header(Transport, Host, Port), + Authority0 = host_header(Transport:name(), Host, Port), {Authority0, [{<<"host">>, Authority0}|Headers2]}; {_, Authority1} -> {Authority1, Headers2} @@ -648,7 +648,7 @@ send_request(State=#http_state{socket=Socket, transport=Transport, version=Versi end, {Authority, Conn, Out, CookieStore, EvHandlerState}. -host_header(Transport, Host0, Port) -> +host_header(TransportName, Host0, Port) -> Host = case Host0 of {local, _SocketPath} -> <<>>; Tuple when tuple_size(Tuple) =:= 8 -> [$[, inet:ntoa(Tuple), $]]; %% IPv6. @@ -656,7 +656,7 @@ host_header(Transport, Host0, Port) -> Atom when is_atom(Atom) -> atom_to_list(Atom); _ -> Host0 end, - case {Transport:name(), Port} of + case {TransportName, Port} of {tcp, 80} -> Host; {tls, 443} -> Host; _ -> [Host, $:, integer_to_binary(Port)] diff --git a/src/gun_http2.erl b/src/gun_http2.erl index 2778b4f..ec51235 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -979,7 +979,7 @@ prepare_headers(State=#http2_state{transport=Transport}, Scheme = scheme(State), Authority = case lists:keyfind(<<"host">>, 1, Headers0) of {_, Host} -> Host; - _ -> gun_http:host_header(Transport, Host0, Port) + _ -> gun_http:host_header(Transport:name(), Host0, Port) end, %% @todo We also must remove any header found in the connection header. %% @todo Much of this is duplicated in cow_http2_machine; sort things out. diff --git a/src/gun_pool.erl b/src/gun_pool.erl new file mode 100644 index 0000000..56734ee --- /dev/null +++ b/src/gun_pool.erl @@ -0,0 +1,704 @@ +%% Copyright (c) 2021, Loïc Hoguin <[email protected]> +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +-module(gun_pool). +-behaviour(gen_statem). + +%% Pools. +-export([start_pool/3]). +-export([stop_pool/2]). +-export([stop_pool/3]). +% @todo shutdown pool? +-export([info/0]). +-export([info/1]). +-export([info/2]). +-export([await_up/1]). +-export([await_up/2]). +-export([checkout/2]). %% Use responsibly! + +%% Requests. +-export([delete/2]). +-export([delete/3]). +-export([get/2]). +-export([get/3]). +-export([head/2]). +-export([head/3]). +-export([options/2]). +-export([options/3]). +-export([patch/2]). +-export([patch/3]). +-export([patch/4]). +-export([post/2]). +-export([post/3]). +-export([post/4]). +-export([put/2]). +-export([put/3]). +-export([put/4]). + +%% Generic requests interface. +-export([headers/3]). +-export([headers/4]). +-export([request/4]). +-export([request/5]). + +%% Streaming data. +-export([data/3]). + +%% Tunneling. (HTTP/2+ only.) +%% @todo -export([connect/2]). +%% @todo -export([connect/3]). +%% @todo -export([connect/4]). + +%% Cookies. +%% @todo -export([gc_cookies/1]). +%% @todo -export([session_gc_cookies/1]). + +%% Awaiting gun messages. +-export([await/1]). +-export([await/2]). +-export([await/3]). +-export([await_body/1]). +-export([await_body/2]). +-export([await_body/3]). + +%% Flushing gun messages. +-export([flush/1]). + +%% Streams. +-export([update_flow/2]). +-export([cancel/1]). +-export([stream_info/1]). + +%% Websocket. (HTTP/2+ only for upgrade.) +%% -export([ws_upgrade/1]). +%% -export([ws_upgrade/2]). +%% -export([ws_upgrade/3]). +-export([ws_send/2]). +%% -export([ws_send/3]). (HTTP/2+ only.) + +%% Internals. +-export([callback_mode/0]). +-export([start_link/3]). +-export([init/1]). +-export([degraded/3]). +-export([operational/3]). +-export([terminate/3]). + +-type setup_msg() :: {gun_up, pid(), http | http2 | raw | socks} + | {gun_upgrade, pid(), gun:stream_ref(), [<<"websocket">>], [{binary(), binary()}]}. + +-type opts() :: #{ + conn_opts => gun:opts(), + scope => any(), + setup_fun => {fun((pid(), setup_msg(), any()) -> any()), any()}, + size => non_neg_integer() +}. +-export_type([opts/0]). + +-type pool_stream_ref() :: {pid(), gun:stream_ref()}. +-export_type([pool_stream_ref/0]). + +-type error_result() :: {error, pool_not_found | no_connection_available, atom()}. %% @todo {pool_start_error, SupError} + +-type request_result() :: {async, pool_stream_ref()} + %% @todo {sync, ...} perhaps with Status, Headers, Body, and Extra info such as intermediate responses. + | error_result(). + +-type req_opts() :: #{ + %% Options common with normal Gun. + flow => pos_integer(), + reply_to => pid(), +% @todo tunnel => stream_ref(), + + %% Options specific to pools. + checkout_call_timeout => timeout(), + checkout_retry => [pos_integer()], + scope => any(), + start_pool_if_missing => boolean() +}. +-export_type([req_opts/0]). + +-type ws_send_opts() :: #{ + authority => iodata(), + + %% Options specific to pools. + checkout_call_timeout => timeout(), + checkout_retry => [pos_integer()], + scope => any(), + start_pool_if_missing => boolean() +}. +-export_type([ws_send_opts/0]). + +%% @todo tunnel +-type meta() :: #{pid() => #{ws => gun:stream_ref()}}. + +-record(state, { + host :: inet:hostname() | inet:ip_address(), + port :: inet:port_number(), + opts :: gun:opts(), + table :: ets:tid(), + conns :: #{pid() => down | {setup, any()} | {up, http | http2 | ws | raw, map()}}, + conns_meta = #{} :: meta(), + await_up = [] :: [{pid(), any()}] +}). + +%% Pool management. + +-spec start_pool(inet:hostname() | inet:ip_address(), inet:port_number(), opts()) + -> {ok, pid()} | {error, any()}. +start_pool(Host, Port, Opts) -> + supervisor:start_child(gun_pools_sup, [Host, Port, Opts]). + +-spec stop_pool(inet:hostname() | inet:ip_address(), inet:port_number()) + -> ok. +stop_pool(Host, Port) -> + stop_pool(Host, Port, #{}). + +-spec stop_pool(inet:hostname() | inet:ip_address(), inet:port_number(), req_opts()) + -> ok | {error, pool_not_found, atom()}. +stop_pool(Host, Port, ReqOpts) -> + case get_pool(iolist_to_binary([Host, $:, integer_to_binary(Port)]), ReqOpts) of + undefined -> + {error, pool_not_found, + 'No pool was found for the given scope and authority.'}; + ManagerPid -> + supervisor:terminate_child(gun_pools_sup, ManagerPid) + end. + +-spec info() -> [map()]. +info() -> + ets:foldl(fun({{Scope, _}, ManagerPid}, Acc) -> + {StateName, Info} = info(ManagerPid), + [Info#{scope => Scope, state => StateName}|Acc] + end, [], gun_pools). + +-spec info(pid() | binary()) -> undefined | {degraded | operational, map()}. +info(ManagerPid) when is_pid(ManagerPid) -> + gen_statem:call(ManagerPid, info); +info(Authority) -> + info(Authority, default). + +-spec info(binary(), any()) -> undefined | {degraded | operational, map()}. +info(Authority, Scope) -> + case ets:lookup(gun_pools, {Scope, Authority}) of + [] -> + undefined; + [{_, ManagerPid}] -> + gen_statem:call(ManagerPid, info) + end. + +-spec await_up(pid() | binary()) -> ok | {error, pool_not_found, atom()}. +await_up(ManagerPid) when is_pid(ManagerPid) -> + gen_statem:call(ManagerPid, await_up, 5000); +await_up(Authority) -> + await_up(Authority, default). + +-spec await_up(binary(), any()) -> ok | {error, pool_not_found, atom()}. +await_up(Authority, Scope) -> + case ets:lookup(gun_pools, {Scope, Authority}) of + [] -> + {error, pool_not_found, + 'No pool was found for the given scope and authority.'}; + [{_, ManagerPid}] -> + gen_statem:call(ManagerPid, await_up, 5000) + end. + +-spec checkout(pid(), req_opts()) -> undefined | {pid(), map()}. +checkout(ManagerPid, ReqOpts=#{checkout_retry := Retry}) when is_list(Retry) -> + CallTimeout = maps:get(checkout_call_timeout, ReqOpts, 5000), + case gen_server:call(ManagerPid, {checkout, ReqOpts}, CallTimeout) of + undefined -> + checkout_retry(ManagerPid, ReqOpts, CallTimeout, Retry); + Result -> + Result + end; +checkout(ManagerPid, ReqOpts) -> + CallTimeout = maps:get(checkout_call_timeout, ReqOpts, 5000), + gen_server:call(ManagerPid, {checkout, ReqOpts}, CallTimeout). + +%% When the checkout_retry option is used, and the first call resulted +%% in no connection being given out, we wait for the configured amount +%% of time then try again. We loop over the wait times until there is +%% none. +checkout_retry(_, _, _, []) -> + undefined; +checkout_retry(ManagerPid, ReqOpts, CallTimeout, [Wait|Retry]) -> + timer:sleep(Wait), + case gen_server:call(ManagerPid, {checkout, ReqOpts}, CallTimeout) of + undefined -> + checkout_retry(ManagerPid, ReqOpts, CallTimeout, Retry); + Result -> + Result + end. + +%% Requests. + +-spec delete(iodata(), gun:req_headers()) -> request_result(). +delete(Path, Headers) -> + request(<<"DELETE">>, Path, Headers, <<>>). + +-spec delete(iodata(), gun:req_headers(), req_opts()) -> request_result(). +delete(Path, Headers, ReqOpts) -> + request(<<"DELETE">>, Path, Headers, <<>>, ReqOpts). + +-spec get(iodata(), gun:req_headers()) -> request_result(). +get(Path, Headers) -> + request(<<"GET">>, Path, Headers, <<>>). + +-spec get(iodata(), gun:req_headers(), req_opts()) -> request_result(). +get(Path, Headers, ReqOpts) -> + request(<<"GET">>, Path, Headers, <<>>, ReqOpts). + +-spec head(iodata(), gun:req_headers()) -> request_result(). +head(Path, Headers) -> + request(<<"HEAD">>, Path, Headers, <<>>). + +-spec head(iodata(), gun:req_headers(), req_opts()) -> request_result(). +head(Path, Headers, ReqOpts) -> + request(<<"HEAD">>, Path, Headers, <<>>, ReqOpts). + +-spec options(iodata(), gun:req_headers()) -> request_result(). +options(Path, Headers) -> + request(<<"OPTIONS">>, Path, Headers, <<>>). + +-spec options(iodata(), gun:req_headers(), req_opts()) -> request_result(). +options(Path, Headers, ReqOpts) -> + request(<<"OPTIONS">>, Path, Headers, <<>>, ReqOpts). + +-spec patch(iodata(), gun:req_headers()) -> request_result(). +patch(Path, Headers) -> + headers(<<"PATCH">>, Path, Headers). + +-spec patch(iodata(), gun:req_headers(), iodata() | req_opts()) -> request_result(). +patch(Path, Headers, ReqOpts) when is_map(ReqOpts) -> + headers(<<"PATCH">>, Path, Headers, ReqOpts); +patch(Path, Headers, Body) -> + request(<<"PATCH">>, Path, Headers, Body). + +-spec patch(iodata(), gun:req_headers(), iodata(), req_opts()) -> request_result(). +patch(Path, Headers, Body, ReqOpts) -> + request(<<"PATCH">>, Path, Headers, Body, ReqOpts). + +-spec post(iodata(), gun:req_headers()) -> request_result(). +post(Path, Headers) -> + headers(<<"POST">>, Path, Headers). + +-spec post(iodata(), gun:req_headers(), iodata() | req_opts()) -> request_result(). +post(Path, Headers, ReqOpts) when is_map(ReqOpts) -> + headers(<<"POST">>, Path, Headers, ReqOpts); +post(Path, Headers, Body) -> + request(<<"POST">>, Path, Headers, Body). + +-spec post(iodata(), gun:req_headers(), iodata(), req_opts()) -> request_result(). +post(Path, Headers, Body, ReqOpts) -> + request(<<"POST">>, Path, Headers, Body, ReqOpts). + +-spec put(iodata(), gun:req_headers()) -> request_result(). +put(Path, Headers) -> + headers(<<"PUT">>, Path, Headers). + +-spec put(iodata(), gun:req_headers(), iodata() | req_opts()) -> request_result(). +put(Path, Headers, ReqOpts) when is_map(ReqOpts) -> + headers(<<"PUT">>, Path, Headers, ReqOpts); +put(Path, Headers, Body) -> + request(<<"PUT">>, Path, Headers, Body). + +-spec put(iodata(), gun:req_headers(), iodata(), req_opts()) -> request_result(). +put(Path, Headers, Body, ReqOpts) -> + request(<<"PUT">>, Path, Headers, Body, ReqOpts). + +%% Generic requests interface. +%% +%% @todo Accept a TargetURI map as well as a normal Path. + +-spec headers(iodata(), iodata(), gun:req_headers()) -> request_result(). +headers(Method, Path, Headers) -> + headers(Method, Path, Headers, #{}). + +-spec headers(iodata(), iodata(), gun:req_headers(), req_opts()) -> request_result(). +headers(Method, Path, Headers, ReqOpts) -> + case get_pool(authority(Headers), ReqOpts) of + undefined -> + {error, pool_not_found, + 'No pool was found for the given scope and authority.'}; + ManagerPid -> + case checkout(ManagerPid, ReqOpts) of + undefined -> + {error, no_connection_available, + 'No connection in the pool with enough capacity available to open a new stream.'}; + {ConnPid, _Meta} -> + StreamRef = gun:headers(ConnPid, Method, Path, Headers, ReqOpts), + %% @todo Synchronous mode. + {async, {ConnPid, StreamRef}} + end + end. + +-spec request(iodata(), iodata(), gun:req_headers(), iodata()) -> request_result(). +request(Method, Path, Headers, Body) -> + request(Method, Path, Headers, Body, #{}). + +-spec request(iodata(), iodata(), gun:req_headers(), iodata(), req_opts()) -> request_result(). +request(Method, Path, Headers, Body, ReqOpts) -> + case get_pool(authority(Headers), ReqOpts) of + undefined -> + {error, pool_not_found, + 'No pool was found for the given scope and authority.'}; + ManagerPid -> + case checkout(ManagerPid, ReqOpts) of + undefined -> + {error, no_connection_available, + 'No connection in the pool with enough capacity available to open a new stream.'}; + {ConnPid, _Meta} -> + StreamRef = gun:request(ConnPid, Method, Path, Headers, Body, ReqOpts), + %% @todo Synchronous mode. + {async, {ConnPid, StreamRef}} + end + end. + +%% We require the host to be given in the headers for the time being. +%% @todo Allow passing it in options. Websocket send already does that. +authority(#{<<"host">> := Authority}) -> + Authority; +authority(Headers) -> + {_, Authority} = lists:keyfind(<<"host">>, 1, Headers), + Authority. + +get_pool(Authority0, ReqOpts) -> + Authority = iolist_to_binary(Authority0), + %% @todo Perhaps rename this to temporary. + %% There's two concepts: temporary pool is started and stops + %% when there's no traffic. Dynamic pool simply reduces its number + %% of connections when there's no traffic. + StartPoolIfMissing = maps:get(start_pool_if_missing, ReqOpts, false), + Scope = maps:get(scope, ReqOpts, default), + case ets:lookup(gun_pools, {Scope, Authority}) of + [] when StartPoolIfMissing -> + start_missing_pool(Authority, ReqOpts); + [] -> + undefined; + [{_, ManagerPid}] -> + %% @todo With temporary pool, getting a pid here doesn't mean the pool can be used. + %% Indeed the manager could be in process of stopping. I suppose we must + %% do a check but perhaps it's best to leave that detail to the user + %% (they can easily retry and recreate the pool if necessary). + ManagerPid + end. + +start_missing_pool(_Authority, _ReqOpts) -> + undefined. + +%% Streaming data. + +-spec data(pool_stream_ref(), fin | nofin, iodata()) -> ok. +data({ConnPid, StreamRef}, IsFin, Data) -> + gun:data(ConnPid, StreamRef, IsFin, Data). + +%% Awaiting gun messages. + +-spec await(pool_stream_ref()) -> gun:await_result(). +await({ConnPid, StreamRef}) -> + gun:await(ConnPid, StreamRef). + +-spec await(pool_stream_ref(), timeout() | reference()) -> gun:await_result(). +await({ConnPid, StreamRef}, MRefOrTimeout) -> + gun:await(ConnPid, StreamRef, MRefOrTimeout). + +-spec await(pool_stream_ref(), timeout(), reference()) -> gun:await_result(). +await({ConnPid, StreamRef}, Timeout, MRef) -> + gun:await(ConnPid, StreamRef, Timeout, MRef). + +-spec await_body(pool_stream_ref()) -> gun:await_body_result(). +await_body({ConnPid, StreamRef}) -> + gun:await_body(ConnPid, StreamRef). + +-spec await_body(pool_stream_ref(), timeout() | reference()) -> gun:await_body_result(). +await_body({ConnPid, StreamRef}, MRefOrTimeout) -> + gun:await_body(ConnPid, StreamRef, MRefOrTimeout). + +-spec await_body(pool_stream_ref(), timeout(), reference()) -> gun:await_body_result(). +await_body({ConnPid, StreamRef}, Timeout, MRef) -> + gun:await_body(ConnPid, StreamRef, Timeout, MRef). + +%% Flushing gun messages. + +-spec flush(pool_stream_ref()) -> ok. +flush({ConnPid, _}) -> + gun:flush(ConnPid). + +%% Flow control. + +-spec update_flow(pool_stream_ref(), pos_integer()) -> ok. +update_flow({ConnPid, StreamRef}, Flow) -> + gun:update_flow(ConnPid, StreamRef, Flow). + +%% Cancelling a stream. + +-spec cancel(pool_stream_ref()) -> ok. +cancel({ConnPid, StreamRef}) -> + gun:cancel(ConnPid, StreamRef). + +%% Information about a stream. + +-spec stream_info(pool_stream_ref()) -> {ok, map() | undefined} | {error, not_connected}. +stream_info({ConnPid, StreamRef}) -> + gun:stream_info(ConnPid, StreamRef). + +%% Websocket. + +-spec ws_send(gun:ws_frame() | [gun:ws_frame()], ws_send_opts()) -> ok | error_result(). +ws_send(Frames, WsSendOpts=#{authority := Authority}) -> + case get_pool(Authority, WsSendOpts) of + undefined -> + {error, pool_not_found, + 'No pool was found for the given scope and authority.'}; + ManagerPid -> + case checkout(ManagerPid, WsSendOpts) of + undefined -> + {error, no_connection_available, + 'No connection in the pool with enough capacity available to send Websocket frames.'}; + {ConnPid, #{ws := StreamRef}} -> + gun:ws_send(ConnPid, StreamRef, Frames) + end + end. + +%% Pool manager internals. +%% +%% The pool manager is responsible for starting connection processes +%% and restarting them as necessary. It also provides a suitable +%% connection process to any caller that needs it. +%% +%% The pool manager installs an event handler into each connection. +%% The event handler is responsible for counting the number of +%% active streams. It updates the gun_pooled_conns ets table +%% whenever a stream begins or ends. +%% +%% A connection is deemed suitable if it is possible to open new +%% streams. How many streams can be open at any one time depends +%% on the protocol. For HTTP/2 the manager process keeps track of +%% the connection's settings to know the maximum. For non-stream +%% based protocols, there is no limit. +%% +%% The connection to be used is otherwise chosen randomly. The +%% first connection that is suitable is returned. There is no +%% need to "give back" the connection to the manager. + +%% @todo +%% What should happen if we always fail to reconnect? I suspect we keep the manager +%% around and propagate errors, the same as if there's no more capacity? Perhaps have alarms? + +callback_mode() -> state_functions. + +start_link(Host, Port, Opts) -> + gen_statem:start_link(?MODULE, {Host, Port, Opts}, []). + +init({Host, Port, Opts}) -> + process_flag(trap_exit, true), + true = ets:insert_new(gun_pools, {gun_pools_key(Host, Port, Opts), self()}), + Tid = ets:new(gun_pooled_conns, [ordered_set, public]), + Size = maps:get(size, Opts, 8), + %% @todo Only start processes in static mode. + ConnOpts = conn_opts(Tid, Opts), + Conns = maps:from_list([begin + {ok, ConnPid} = gun:open(Host, Port, ConnOpts), + _ = monitor(process, ConnPid), + {ConnPid, down} + end || _ <- lists:seq(1, Size)]), + State = #state{ + host=Host, + port=Port, + opts=Opts, + table=Tid, + conns=Conns + }, + %% If Size is 0 then we can never be operational. + {ok, degraded, State}. + +gun_pools_key(Host, Port, Opts) -> + Transport = maps:get(transport, Opts, gun:default_transport(Port)), + Authority = gun_http:host_header(Transport, Host, Port), + Scope = maps:get(scope, Opts, default), + {Scope, iolist_to_binary(Authority)}. + +conn_opts(Tid, Opts) -> + ConnOpts = maps:get(conn_opts, Opts, #{}), + EventHandlerState = maps:with([event_handler], ConnOpts), + H2Opts = maps:get(http2_opts, ConnOpts, #{}), + ConnOpts#{ + event_handler => {gun_pool_events_h, EventHandlerState#{ + table => Tid + }}, + http2_opts => H2Opts#{ + notify_settings_changed => true + } + }. + +%% We use the degraded state as long as at least one connection is degraded. +%% @todo Probably keep count of connections separately to avoid counting every time. +degraded(info, Msg={gun_up, ConnPid, _}, StateData=#state{opts=Opts, conns=Conns}) -> + #{ConnPid := down} = Conns, + %% We optionally run the setup function if one is defined. The + %% setup function tells us whether we are fully up or not. The + %% setup function may be called repeatedly until the connection + %% is established. + %% + %% @todo It is possible that the connection never does get + %% fully established. We should deal with this. We probably + %% need to handle all messages. + {SetupFun, SetupState0} = setup_fun(Opts), + degraded_setup(ConnPid, Msg, StateData, SetupFun, SetupState0); +%% @todo +%degraded(info, Msg={gun_tunnel_up, ConnPid, _, _}, StateData0=#state{conns=Conns}) -> +% ; +degraded(info, Msg={gun_upgrade, ConnPid, _, _, _}, + StateData=#state{opts=#{setup_fun := {SetupFun, _}}, conns=Conns}) -> + %% @todo Probably shouldn't crash if the state is incorrect, that's programmer error though. + #{ConnPid := {setup, SetupState0}} = Conns, + %% We run the setup function again using the state previously kept. + degraded_setup(ConnPid, Msg, StateData, SetupFun, SetupState0); +degraded(Type, Event, StateData) -> + handle_common(Type, Event, ?FUNCTION_NAME, StateData). + +setup_fun(#{setup_fun := SetupFun}) -> + SetupFun; +setup_fun(_) -> + {fun (_, {gun_up, _, Protocol}, _) -> + {up, Protocol, #{}} + end, undefined}. + +degraded_setup(ConnPid, Msg, StateData0=#state{conns=Conns, conns_meta=ConnsMeta, + await_up=AwaitUp}, SetupFun, SetupState0) -> + case SetupFun(ConnPid, Msg, SetupState0) of + Setup={setup, _SetupState} -> + StateData = StateData0#state{conns=Conns#{ConnPid => Setup}}, + {keep_state, StateData}; + %% The Meta is different from Settings. It allows passing around + %% Websocket or tunnel stream refs. + {up, Protocol, Meta} -> + Settings = #{}, + StateData = StateData0#state{ + conns=Conns#{ConnPid => {up, Protocol, Settings}}, + conns_meta=ConnsMeta#{ConnPid => Meta} + }, + case is_degraded(StateData) of + true -> {keep_state, StateData}; + false -> {next_state, operational, StateData#state{await_up=[]}, + [{reply, ReplyTo, ok} || ReplyTo <- AwaitUp]} + end + end. + +is_degraded(#state{conns=Conns0}) -> + Conns = maps:to_list(Conns0), + Len = length(Conns), + Ups = [up || {_, {up, _, _}} <- Conns], + Len =/= length(Ups). + +operational(Type, Event, StateData) -> + handle_common(Type, Event, ?FUNCTION_NAME, StateData). + +handle_common({call, From}, {checkout, _ReqOpts}, _, + StateData=#state{conns_meta=ConnsMeta}) -> + case find_available_connection(StateData) of + none -> + {keep_state_and_data, {reply, From, undefined}}; + ConnPid -> + Meta = maps:get(ConnPid, ConnsMeta, #{}), + {keep_state_and_data, {reply, From, {ConnPid, Meta}}} + end; +handle_common(info, {gun_notify, ConnPid, settings_changed, Settings}, _, StateData=#state{conns=Conns}) -> + %% Assert that the state is correct. + {up, http2, _} = maps:get(ConnPid, Conns), + {keep_state, StateData#state{conns=Conns#{ConnPid => {up, http2, Settings}}}}; +handle_common(info, {gun_down, ConnPid, Protocol, _Reason, _KilledStreams}, _, StateData=#state{conns=Conns}) -> + {up, Protocol, _} = maps:get(ConnPid, Conns), + {next_state, degraded, StateData#state{conns=Conns#{ConnPid => down}}}; +%% @todo We do not want to reconnect automatically when the pool is dynamic. +handle_common(info, {'DOWN', _MRef, process, ConnPid0, _Reason}, _, + StateData=#state{host=Host, port=Port, opts=Opts, table=Tid, conns=Conns0, conns_meta=ConnsMeta0}) -> + Conns = maps:remove(ConnPid0, Conns0), + ConnsMeta = maps:remove(ConnPid0, ConnsMeta0), + ConnOpts = conn_opts(Tid, Opts), + {ok, ConnPid} = gun:open(Host, Port, ConnOpts), + _ = monitor(process, ConnPid), + {next_state, degraded, StateData#state{conns=Conns#{ConnPid => down}, conns_meta=ConnsMeta}}; +handle_common({call, From}, info, StateName, #state{host=Host, port=Port, + opts=Opts, table=Tid, conns=Conns, conns_meta=ConnsMeta}) -> + {keep_state_and_data, {reply, From, {StateName, #{ + %% @todo Not sure whether all of this should be documented. Maybe not ConnsMeta for now? + host => Host, + port => Port, + opts => Opts, + table => Tid, + conns => Conns, + conns_meta => ConnsMeta + }}}}; +handle_common({call, From}, await_up, operational, _) -> + {keep_state_and_data, {reply, From, ok}}; +handle_common({call, From}, await_up, _, StateData=#state{await_up=AwaitUp}) -> + {keep_state, StateData#state{await_up=[From|AwaitUp]}}; +handle_common(Type, Event, StateName, StateData) -> + logger:error("Unexpected event in state ~p of type ~p:~n~w~n~p~n", + [StateName, Type, Event, StateData]), + keep_state_and_data. + +%% We go over every connection and return the first one +%% we find that has capacity. How we determine whether +%% capacity is available depends on the protocol. For +%% HTTP/2 we look into the protocol settings. The +%% current number of streams is maintained by the +%% event handler gun_pool_events_h. +find_available_connection(#state{table=Tid, conns=Conns}) -> + I = lists:sort([{rand:uniform(), K} || K <- maps:keys(Conns)]), + find_available_connection(I, Conns, Tid). + +find_available_connection([], _, _) -> + none; +find_available_connection([{_, ConnPid}|I], Conns, Tid) -> + case maps:get(ConnPid, Conns) of + {up, Protocol, Settings} -> + MaxStreams = max_streams(Protocol, Settings), + CurrentStreams = case ets:lookup(Tid, ConnPid) of + [] -> + 0; + [{_, CS}] -> + CS + end, + if + CurrentStreams + 1 > MaxStreams -> + find_available_connection(I, Conns, Tid); + true -> + ConnPid + end; + _ -> + find_available_connection(I, Conns, Tid) + end. + +max_streams(http, _) -> + 1; +max_streams(http2, #{max_concurrent_streams := MaxStreams}) -> + MaxStreams; +max_streams(http2, #{}) -> + infinity; +%% There are no streams or Gun is not aware of streams when +%% the protocol is Websocket or raw. +max_streams(ws, _) -> + infinity; +max_streams(raw, _) -> + infinity. + +terminate(Reason, StateName, #state{host=Host, port=Port, opts=Opts, await_up=AwaitUp}) -> + gen_statem:reply([ + {reply, ReplyTo, {error, {terminate, StateName, Reason}}} + || ReplyTo <- AwaitUp]), + true = ets:delete(gun_pools, gun_pools_key(Host, Port, Opts)), + ok. diff --git a/src/gun_pool_events_h.erl b/src/gun_pool_events_h.erl new file mode 100644 index 0000000..1871a00 --- /dev/null +++ b/src/gun_pool_events_h.erl @@ -0,0 +1,157 @@ +%% Copyright (c) 2021, Loïc Hoguin <[email protected]> +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +-module(gun_pool_events_h). + +-export([init/2]). +-export([domain_lookup_start/2]). +-export([domain_lookup_end/2]). +-export([connect_start/2]). +-export([connect_end/2]). +-export([tls_handshake_start/2]). +-export([tls_handshake_end/2]). +-export([request_start/2]). +-export([request_headers/2]). +-export([request_end/2]). +-export([push_promise_start/2]). +-export([push_promise_end/2]). +-export([response_start/2]). +-export([response_inform/2]). +-export([response_headers/2]). +-export([response_trailers/2]). +-export([response_end/2]). +-export([ws_upgrade/2]). +-export([ws_recv_frame_start/2]). +-export([ws_recv_frame_header/2]). +-export([ws_recv_frame_end/2]). +-export([ws_send_frame_start/2]). +-export([ws_send_frame_end/2]). +-export([protocol_changed/2]). +-export([origin_changed/2]). +-export([cancel/2]). +-export([disconnect/2]). +-export([terminate/2]). + +init(Event, State) -> + propagate(Event, State, ?FUNCTION_NAME). + +domain_lookup_start(Event, State) -> + propagate(Event, State, ?FUNCTION_NAME). + +domain_lookup_end(Event, State) -> + propagate(Event, State, ?FUNCTION_NAME). + +connect_start(Event, State) -> + propagate(Event, State, ?FUNCTION_NAME). + +connect_end(Event, State) -> + propagate(Event, State, ?FUNCTION_NAME). + +tls_handshake_start(Event, State) -> + propagate(Event, State, ?FUNCTION_NAME). + +tls_handshake_end(Event, State) -> + propagate(Event, State, ?FUNCTION_NAME). + +request_start(Event=#{stream_ref := StreamRef}, State=#{table := Tid}) -> + _ = ets:update_counter(Tid, self(), +1, {self(), 0}), + propagate(Event, State#{ + StreamRef => {nofin, nofin} + }, ?FUNCTION_NAME). + +request_headers(Event, State) -> + propagate(Event, State, ?FUNCTION_NAME). + +request_end(Event=#{stream_ref := StreamRef}, State0=#{table := Tid}) -> + State = case State0 of + #{StreamRef := {nofin, fin}} -> + _ = ets:update_counter(Tid, self(), -1), + maps:remove(StreamRef, State0); + #{StreamRef := {nofin, IsFin}} -> + State0#{StreamRef => {fin, IsFin}} + end, + propagate(Event, State, ?FUNCTION_NAME). + +push_promise_start(Event, State) -> + propagate(Event, State, ?FUNCTION_NAME). + +push_promise_end(Event, State) -> + propagate(Event, State, ?FUNCTION_NAME). + +response_start(Event, State) -> + propagate(Event, State, ?FUNCTION_NAME). + +response_inform(Event, State) -> + propagate(Event, State, ?FUNCTION_NAME). + +response_headers(Event, State) -> + propagate(Event, State, ?FUNCTION_NAME). + +response_trailers(Event, State) -> + propagate(Event, State, ?FUNCTION_NAME). + +response_end(Event=#{stream_ref := StreamRef}, State0=#{table := Tid}) -> + State = case State0 of + #{StreamRef := {fin, nofin}} -> + _ = ets:update_counter(Tid, self(), -1), + maps:remove(StreamRef, State0); + #{StreamRef := {IsFin, nofin}} -> + State0#{StreamRef => {IsFin, fin}} + end, + propagate(Event, State, ?FUNCTION_NAME). + +ws_upgrade(Event, State) -> + propagate(Event, State, ?FUNCTION_NAME). + +ws_recv_frame_start(Event, State) -> + propagate(Event, State, ?FUNCTION_NAME). + +ws_recv_frame_header(Event, State) -> + propagate(Event, State, ?FUNCTION_NAME). + +ws_recv_frame_end(Event, State) -> + propagate(Event, State, ?FUNCTION_NAME). + +ws_send_frame_start(Event, State) -> + propagate(Event, State, ?FUNCTION_NAME). + +ws_send_frame_end(Event, State) -> + propagate(Event, State, ?FUNCTION_NAME). + +protocol_changed(Event, State) -> + propagate(Event, State, ?FUNCTION_NAME). + +origin_changed(Event, State) -> + propagate(Event, State, ?FUNCTION_NAME). + +cancel(Event, State) -> + propagate(Event, State, ?FUNCTION_NAME). + +disconnect(Event, State=#{table := Tid}) -> + %% The ets:delete/2 call might fail when the pool has shut down. + try + true = ets:delete(Tid, self()) + catch _:_ -> + ok + end, + propagate(Event, maps:with([event_handler, table], State), ?FUNCTION_NAME). + +terminate(Event, State) -> + propagate(Event, State, ?FUNCTION_NAME). + +propagate(Event, State=#{event_handler := {Mod, ModState0}}, Fun) -> + ModState = Mod:Fun(Event, ModState0), + State#{event_handler => {Mod, ModState}}; +propagate(_, State, _) -> + State. diff --git a/src/gun_pools_sup.erl b/src/gun_pools_sup.erl new file mode 100644 index 0000000..157b41b --- /dev/null +++ b/src/gun_pools_sup.erl @@ -0,0 +1,37 @@ +%% Copyright (c) 2021, Loïc Hoguin <[email protected]> +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +-module(gun_pools_sup). +-behaviour(supervisor). + +%% API. +-export([start_link/0]). + +%% supervisor. +-export([init/1]). + +%% API. + +-spec start_link() -> {ok, pid()}. +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +%% supervisor. + +init([]) -> + %% @todo Review restart strategies. + Procs = [ + #{id => gun_pool, start => {gun_pool, start_link, []}, restart => transient} + ], + {ok, {#{strategy => simple_one_for_one}, Procs}}. diff --git a/src/gun_sup.erl b/src/gun_sup.erl index dbe6698..a1270d7 100644 --- a/src/gun_sup.erl +++ b/src/gun_sup.erl @@ -12,7 +12,6 @@ %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -%% @private -module(gun_sup). -behaviour(supervisor). @@ -22,17 +21,17 @@ %% supervisor. -export([init/1]). --define(SUPERVISOR, ?MODULE). - %% API. -spec start_link() -> {ok, pid()}. start_link() -> - supervisor:start_link({local, ?SUPERVISOR}, ?MODULE, []). + supervisor:start_link({local, ?MODULE}, ?MODULE, []). %% supervisor. init([]) -> - Procs = [{gun, {gun, start_link, []}, - temporary, 5000, worker, [gun]}], - {ok, {{simple_one_for_one, 10, 10}, Procs}}. + Procs = [ + #{id => gun_conns_sup, start => {gun_conns_sup, start_link, []}, type => supervisor}, + #{id => gun_pools_sup, start => {gun_pools_sup, start_link, []}, type => supervisor} + ], + {ok, {#{}, Procs}}. |