aboutsummaryrefslogtreecommitdiffstats
path: root/src/gun.erl
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2019-08-02 14:30:08 +0200
committerLoïc Hoguin <[email protected]>2019-08-05 19:57:13 +0200
commit611f9a9b78cab4005892e13dffb7a2c8e44580ee (patch)
treed8d3fc407110ea12333ba122cf711326e82a7070 /src/gun.erl
parent145b9af4bdbb85e2f83959ee8abaa4d9207a4529 (diff)
downloadgun-611f9a9b78cab4005892e13dffb7a2c8e44580ee.tar.gz
gun-611f9a9b78cab4005892e13dffb7a2c8e44580ee.tar.bz2
gun-611f9a9b78cab4005892e13dffb7a2c8e44580ee.zip
Add flow control
Flow control is disabled by default. The initial flow value must be set to enable it (either for the entire connection or on a per-request basis). Flow applies to all HTTP streams as well as Websocket. HTTP/2 pushed streams receive the same value as their originating stream.
Diffstat (limited to 'src/gun.erl')
-rw-r--r--src/gun.erl59
1 files changed, 48 insertions, 11 deletions
diff --git a/src/gun.erl b/src/gun.erl
index d758ffc..7b06aaf 100644
--- a/src/gun.erl
+++ b/src/gun.erl
@@ -79,6 +79,7 @@
-export([flush/1]).
%% Streams.
+-export([update_flow/3]).
-export([cancel/2]).
-export([stream_info/2]).
@@ -158,6 +159,7 @@
%% This is of course not required for HTTP/1.1 since the CONNECT takes over
%% the entire connection.
-type req_opts() :: #{
+ flow => pos_integer(),
reply_to => pid()
}.
-export_type([req_opts/0]).
@@ -177,6 +179,7 @@
%% @todo keepalive
-type ws_opts() :: #{
compress => boolean(),
+ flow => pos_integer(),
protocols => [{binary(), module()}]
}.
-export_type([ws_opts/0]).
@@ -194,6 +197,7 @@
keepalive_ref :: undefined | reference(),
socket :: undefined | inet:socket() | ssl:sslsocket() | pid(),
transport :: module(),
+ active = true :: boolean(),
messages :: {atom(), atom(), atom()},
protocol :: module(),
protocol_state :: any(),
@@ -458,9 +462,10 @@ headers(ServerPid, Method, Path, Headers) ->
-spec headers(pid(), iodata(), iodata(), req_headers(), req_opts()) -> reference().
headers(ServerPid, Method, Path, Headers, ReqOpts) ->
StreamRef = make_ref(),
+ InitialFlow = maps:get(flow, ReqOpts, infinity),
ReplyTo = maps:get(reply_to, ReqOpts, self()),
gen_statem:cast(ServerPid, {headers, ReplyTo, StreamRef,
- Method, Path, normalize_headers(Headers)}),
+ Method, Path, normalize_headers(Headers), InitialFlow}),
StreamRef.
-spec request(pid(), iodata(), iodata(), req_headers(), iodata()) -> reference().
@@ -470,9 +475,10 @@ request(ServerPid, Method, Path, Headers, Body) ->
-spec request(pid(), iodata(), iodata(), req_headers(), iodata(), req_opts()) -> reference().
request(ServerPid, Method, Path, Headers, Body, ReqOpts) ->
StreamRef = make_ref(),
+ InitialFlow = maps:get(flow, ReqOpts, infinity),
ReplyTo = maps:get(reply_to, ReqOpts, self()),
gen_statem:cast(ServerPid, {request, ReplyTo, StreamRef,
- Method, Path, normalize_headers(Headers), Body}),
+ Method, Path, normalize_headers(Headers), Body, InitialFlow}),
StreamRef.
normalize_headers([]) ->
@@ -510,8 +516,10 @@ connect(ServerPid, Destination, Headers) ->
-spec connect(pid(), connect_destination(), req_headers(), req_opts()) -> reference().
connect(ServerPid, Destination, Headers, ReqOpts) ->
StreamRef = make_ref(),
+ InitialFlow = maps:get(flow, ReqOpts, infinity),
ReplyTo = maps:get(reply_to, ReqOpts, self()),
- gen_statem:cast(ServerPid, {connect, ReplyTo, StreamRef, Destination, Headers}),
+ gen_statem:cast(ServerPid, {connect, ReplyTo, StreamRef,
+ Destination, Headers, InitialFlow}),
StreamRef.
%% Awaiting gun messages.
@@ -520,6 +528,7 @@ connect(ServerPid, Destination, Headers, ReqOpts) ->
-type await_result() :: {inform, 100..199, resp_headers()}
| {response, fin | nofin, non_neg_integer(), resp_headers()}
| {data, fin | nofin, binary()}
+ | {sse, cow_sse:event() | fin}
| {trailers, resp_headers()}
| {push, reference(), binary(), binary(), resp_headers()}
| {upgrade, [binary()], resp_headers()}
@@ -551,6 +560,8 @@ await(ServerPid, StreamRef, Timeout, MRef) ->
{response, IsFin, Status, Headers};
{gun_data, ServerPid, StreamRef, IsFin, Data} ->
{data, IsFin, Data};
+ {gun_sse, ServerPid, StreamRef, Event} ->
+ {sse, Event};
{gun_trailers, ServerPid, StreamRef, Trailers} ->
{trailers, Trailers};
{gun_push, ServerPid, StreamRef, NewStreamRef, Method, URI, Headers} ->
@@ -699,12 +710,20 @@ flush_ref(StreamRef) ->
ok
end.
+%% Flow control.
+
+-spec update_flow(pid(), reference(), pos_integer()) -> ok.
+update_flow(ServerPid, StreamRef, Flow) ->
+ gen_statem:cast(ServerPid, {update_flow, self(), StreamRef, Flow}).
+
%% Cancelling a stream.
-spec cancel(pid(), reference()) -> ok.
cancel(ServerPid, StreamRef) ->
gen_statem:cast(ServerPid, {cancel, self(), StreamRef}).
+%% Information about a stream.
+
-spec stream_info(pid(), reference()) -> {ok, map() | undefined} | {error, not_connected}.
stream_info(ServerPid, StreamRef) ->
gen_statem:call(ServerPid, {stream_info, StreamRef}).
@@ -919,11 +938,16 @@ connected(internal, {connected, Socket, Protocol},
{keep_state, keepalive_timeout(active(State#state{socket=Socket,
protocol=Protocol, protocol_state=ProtoState}))};
%% Socket events.
-connected(info, {OK, Socket, Data}, State=#state{socket=Socket, messages={OK, _, _},
+connected(info, {OK, Socket, Data}, State0=#state{socket=Socket, messages={OK, _, _},
protocol=Protocol, protocol_state=ProtoState,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
{Commands, EvHandlerState} = Protocol:handle(Data, ProtoState, EvHandler, EvHandlerState0),
- commands(Commands, active(State#state{event_handler_state=EvHandlerState}));
+ case commands(Commands, State0#state{event_handler_state=EvHandlerState}) of
+ {keep_state, State} ->
+ {keep_state, active(State)};
+ Res ->
+ Res
+ end;
connected(info, {Closed, Socket}, State=#state{socket=Socket, messages={_, Closed, _}}) ->
disconnect(State, closed);
connected(info, {Error, Socket, Reason}, State=#state{socket=Socket, messages={_, _, Error}}) ->
@@ -936,21 +960,21 @@ connected(info, keepalive, State=#state{protocol=Protocol, protocol_state=ProtoS
ProtoState2 = Protocol:keepalive(ProtoState),
{keep_state, keepalive_timeout(State#state{protocol_state=ProtoState2})};
%% Public HTTP interface.
-connected(cast, {headers, ReplyTo, StreamRef, Method, Path, Headers},
+connected(cast, {headers, ReplyTo, StreamRef, Method, Path, Headers, InitialFlow},
State=#state{origin_host=Host, origin_port=Port,
protocol=Protocol, protocol_state=ProtoState,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
{ProtoState2, EvHandlerState} = Protocol:headers(ProtoState,
StreamRef, ReplyTo, Method, Host, Port, Path, Headers,
- EvHandler, EvHandlerState0),
+ InitialFlow, EvHandler, EvHandlerState0),
{keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}};
-connected(cast, {request, ReplyTo, StreamRef, Method, Path, Headers, Body},
+connected(cast, {request, ReplyTo, StreamRef, Method, Path, Headers, Body, InitialFlow},
State=#state{origin_host=Host, origin_port=Port,
protocol=Protocol, protocol_state=ProtoState,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
{ProtoState2, EvHandlerState} = Protocol:request(ProtoState,
StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body,
- EvHandler, EvHandlerState0),
+ InitialFlow, EvHandler, EvHandlerState0),
{keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}};
%% @todo Do we want to reject ReplyTo if it's not the process
%% who initiated the connection? For both data and cancel.
@@ -960,7 +984,7 @@ connected(cast, {data, ReplyTo, StreamRef, IsFin, Data},
{ProtoState2, EvHandlerState} = Protocol:data(ProtoState,
StreamRef, ReplyTo, IsFin, Data, EvHandler, EvHandlerState0),
{keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}};
-connected(cast, {connect, ReplyTo, StreamRef, Destination0, Headers},
+connected(cast, {connect, ReplyTo, StreamRef, Destination0, Headers, InitialFlow},
State=#state{protocol=Protocol, protocol_state=ProtoState}) ->
%% The protocol option has been deprecated in favor of the protocols option.
%% Nobody probably ended up using it, but let's not break the interface.
@@ -980,7 +1004,7 @@ connected(cast, {connect, ReplyTo, StreamRef, Destination0, Headers},
_ ->
Destination1
end,
- ProtoState2 = Protocol:connect(ProtoState, StreamRef, ReplyTo, Destination, Headers),
+ ProtoState2 = Protocol:connect(ProtoState, StreamRef, ReplyTo, Destination, Headers, InitialFlow),
{keep_state, State#state{protocol_state=ProtoState2}};
%% When using gun_tls_proxy we need a separate message to know whether
%% the handshake succeeded and whether we need to switch to a different protocol.
@@ -1002,6 +1026,15 @@ connected(info, {gun_tls_proxy, Socket, Error = {error, Reason}, HandshakeEvent}
error => Reason
}, EvHandlerState0),
commands([Error], State#state{event_handler_state=EvHandlerState});
+connected(cast, {update_flow, ReplyTo, StreamRef, Flow}, State0=#state{
+ protocol=Protocol, protocol_state=ProtoState}) ->
+ Commands = Protocol:update_flow(ProtoState, ReplyTo, StreamRef, Flow),
+ case commands(Commands, State0) of
+ {keep_state, State} ->
+ {keep_state, active(State)};
+ Res ->
+ Res
+ end;
connected(cast, {cancel, ReplyTo, StreamRef}, State=#state{
protocol=Protocol, protocol_state=ProtoState,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
@@ -1092,6 +1125,8 @@ commands([close|_], State) ->
disconnect(State, normal);
commands([Error={error, _}|_], State) ->
disconnect(State, Error);
+commands([{active, Active}|Tail], State) when is_boolean(Active) ->
+ commands(Tail, State#state{active=Active});
commands([{state, ProtoState}|Tail], State) ->
commands(Tail, State#state{protocol_state=ProtoState});
%% Order is important: the origin must be changed before
@@ -1179,6 +1214,8 @@ disconnect_flush(State=#state{socket=Socket, messages={OK, Closed, Error}}) ->
ok
end.
+active(State=#state{active=false}) ->
+ State;
active(State=#state{socket=Socket, transport=Transport}) ->
Transport:setopts(Socket, [{active, once}]),
State.