aboutsummaryrefslogtreecommitdiffstats
path: root/src/gun_http2.erl
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2019-08-02 14:30:08 +0200
committerLoïc Hoguin <[email protected]>2019-08-05 19:57:13 +0200
commit611f9a9b78cab4005892e13dffb7a2c8e44580ee (patch)
treed8d3fc407110ea12333ba122cf711326e82a7070 /src/gun_http2.erl
parent145b9af4bdbb85e2f83959ee8abaa4d9207a4529 (diff)
downloadgun-611f9a9b78cab4005892e13dffb7a2c8e44580ee.tar.gz
gun-611f9a9b78cab4005892e13dffb7a2c8e44580ee.tar.bz2
gun-611f9a9b78cab4005892e13dffb7a2c8e44580ee.zip
Add flow control
Flow control is disabled by default. The initial flow value must be set to enable it (either for the entire connection or on a per-request basis). Flow applies to all HTTP streams as well as Websocket. HTTP/2 pushed streams receive the same value as their originating stream.
Diffstat (limited to 'src/gun_http2.erl')
-rw-r--r--src/gun_http2.erl82
1 files changed, 67 insertions, 15 deletions
diff --git a/src/gun_http2.erl b/src/gun_http2.erl
index 20e21ec..3b3b79b 100644
--- a/src/gun_http2.erl
+++ b/src/gun_http2.erl
@@ -18,10 +18,11 @@
-export([name/0]).
-export([init/4]).
-export([handle/4]).
+-export([update_flow/4]).
-export([close/4]).
-export([keepalive/1]).
--export([headers/10]).
--export([request/11]).
+-export([headers/11]).
+-export([request/12]).
-export([data/7]).
-export([cancel/5]).
-export([stream_info/2]).
@@ -36,6 +37,10 @@
%% Process to send messages to.
reply_to :: pid(),
+ %% Flow control.
+ flow :: integer() | infinity,
+ flow_window = 0 :: non_neg_integer(),
+
%% Content handlers state.
handler_state :: undefined | gun_content_handler:state()
}).
@@ -66,10 +71,15 @@ do_check_options([Opt={content_handlers, Handlers}|Opts]) ->
ok -> do_check_options(Opts);
error -> {error, {options, {http2, Opt}}}
end;
+do_check_options([{flow, InitialFlow}|Opts]) when is_integer(InitialFlow), InitialFlow > 0 ->
+ do_check_options(Opts);
do_check_options([{keepalive, infinity}|Opts]) ->
do_check_options(Opts);
do_check_options([{keepalive, K}|Opts]) when is_integer(K), K > 0 ->
do_check_options(Opts);
+%% @todo Add all http2_machine options.
+do_check_options([{max_frame_size_received, _}|Opts]) ->
+ do_check_options(Opts);
do_check_options([Opt|_]) ->
{error, {options, {http2, Opt}}}.
@@ -192,10 +202,20 @@ lingering_data_frame(State=#http2_state{socket=Socket, transport=Transport,
data_frame(State=#http2_state{socket=Socket, transport=Transport,
http2_machine=HTTP2Machine0}, StreamID, IsFin, Data,
EvHandler, EvHandlerState0) ->
- Stream = #stream{ref=StreamRef, reply_to=ReplyTo,
- handler_state=Handlers0} = get_stream_by_id(State, StreamID),
- Handlers = gun_content_handler:handle(IsFin, Data, Handlers0),
+ Stream = #stream{ref=StreamRef, reply_to=ReplyTo, flow=Flow0,
+ flow_window=FlowWindow0, handler_state=Handlers0} = get_stream_by_id(State, StreamID),
+ {ok, Dec, Handlers} = gun_content_handler:handle(IsFin, Data, Handlers0),
+ Flow = case Flow0 of
+ infinity -> infinity;
+ _ -> Flow0 - Dec
+ end,
Size = byte_size(Data),
+ FlowWindow = if
+ IsFin =:= nofin, Flow =< 0 ->
+ FlowWindow0 + Size;
+ true ->
+ FlowWindow0
+ end,
{HTTP2Machine, EvHandlerState} = case Size of
%% We do not send a WINDOW_UPDATE if the DATA frame was of size 0.
0 when IsFin =:= fin ->
@@ -209,8 +229,11 @@ data_frame(State=#http2_state{socket=Socket, transport=Transport,
_ ->
Transport:send(Socket, cow_http2:window_update(Size)),
HTTP2Machine1 = cow_http2_machine:update_window(Size, HTTP2Machine0),
- %% We do not send a stream WINDOW_UPDATE if this was the last DATA frame.
+ %% We do not send a stream WINDOW_UPDATE when the flow control kicks in
+ %% (it'll be sent when the flow recovers) or for the last DATA frame.
case IsFin of
+ nofin when Flow =< 0 ->
+ {HTTP2Machine1, EvHandlerState0};
nofin ->
Transport:send(Socket, cow_http2:window_update(StreamID, Size)),
{cow_http2_machine:update_window(StreamID, Size, HTTP2Machine1),
@@ -224,7 +247,7 @@ data_frame(State=#http2_state{socket=Socket, transport=Transport,
end
end,
{maybe_delete_stream(store_stream(State#http2_state{http2_machine=HTTP2Machine},
- Stream#stream{handler_state=Handlers}), StreamID, remote, IsFin),
+ Stream#stream{flow=Flow, flow_window=FlowWindow, handler_state=Handlers}), StreamID, remote, IsFin),
EvHandlerState}.
headers_frame(State=#http2_state{content_handlers=Handlers0},
@@ -294,12 +317,13 @@ rst_stream_frame(State=#http2_state{streams=Streams0},
{State, EvHandlerState0}
end.
+%% Pushed streams receive the same initial flow value as the parent stream.
push_promise_frame(State=#http2_state{streams=Streams},
StreamID, PromisedStreamID, Headers, #{
method := Method, scheme := Scheme,
authority := Authority, path := Path},
EvHandler, EvHandlerState0) ->
- #stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID),
+ #stream{ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow} = get_stream_by_id(State, StreamID),
PromisedStreamRef = make_ref(),
URI = iolist_to_binary([Scheme, <<"://">>, Authority, Path]),
ReplyTo ! {gun_push, self(), StreamRef, PromisedStreamRef, Method, URI, Headers},
@@ -311,7 +335,8 @@ push_promise_frame(State=#http2_state{streams=Streams},
uri => URI,
headers => Headers
}, EvHandlerState0),
- NewStream = #stream{id=PromisedStreamID, ref=PromisedStreamRef, reply_to=ReplyTo},
+ NewStream = #stream{id=PromisedStreamID, ref=PromisedStreamRef,
+ reply_to=ReplyTo, flow=InitialFlow},
{State#http2_state{streams=[NewStream|Streams]}, EvHandlerState}.
ignored_frame(State=#http2_state{http2_machine=HTTP2Machine0}) ->
@@ -322,6 +347,28 @@ ignored_frame(State=#http2_state{http2_machine=HTTP2Machine0}) ->
terminate(State#http2_state{http2_machine=HTTP2Machine}, Error)
end.
+update_flow(State=#http2_state{socket=Socket, transport=Transport,
+ http2_machine=HTTP2Machine0}, _ReplyTo, StreamRef, Inc) ->
+ case get_stream_by_ref(State, StreamRef) of
+ Stream=#stream{id=StreamID, flow=Flow0, flow_window=FlowWindow} ->
+ Flow = case Flow0 of
+ infinity -> infinity;
+ _ -> Flow0 + Inc
+ end,
+ if
+ %% Flow is active again, update the window.
+ Flow0 =< 0, Flow > 0 ->
+ Transport:send(Socket, cow_http2:window_update(StreamID, FlowWindow)),
+ HTTP2Machine = cow_http2_machine:update_window(StreamID, FlowWindow, HTTP2Machine0),
+ {state, store_stream(State#http2_state{http2_machine=HTTP2Machine},
+ Stream#stream{flow=Flow, flow_window=0})};
+ true ->
+ {state, store_stream(State, Stream#stream{flow=Flow})}
+ end;
+ false ->
+ []
+ end.
+
%% @todo Use Reason.
close(_, #http2_state{streams=Streams}, _, EvHandlerState) ->
{close_streams(Streams), EvHandlerState}.
@@ -337,10 +384,10 @@ keepalive(State=#http2_state{socket=Socket, transport=Transport}) ->
Transport:send(Socket, cow_http2:ping(0)),
State.
-headers(State=#http2_state{socket=Socket, transport=Transport,
+headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
http2_machine=HTTP2Machine0, streams=Streams},
StreamRef, ReplyTo, Method, Host, Port, Path, Headers0,
- EvHandler, EvHandlerState0) ->
+ InitialFlow0, EvHandler, EvHandlerState0) ->
{ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream(
iolist_to_binary(Method), HTTP2Machine0),
{ok, PseudoHeaders, Headers} = prepare_headers(State, Method, Host, Port, Path, Headers0),
@@ -358,14 +405,15 @@ headers(State=#http2_state{socket=Socket, transport=Transport,
StreamID, HTTP2Machine1, nofin, PseudoHeaders, Headers),
Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)),
EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1),
- Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo},
+ InitialFlow = initial_flow(InitialFlow0, Opts),
+ Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow},
{State#http2_state{http2_machine=HTTP2Machine, streams=[Stream|Streams]},
EvHandlerState}.
-request(State=#http2_state{socket=Socket, transport=Transport,
+request(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
http2_machine=HTTP2Machine0, streams=Streams},
StreamRef, ReplyTo, Method, Host, Port, Path, Headers0, Body,
- EvHandler, EvHandlerState0) ->
+ InitialFlow0, EvHandler, EvHandlerState0) ->
Headers1 = lists:keystore(<<"content-length">>, 1, Headers0,
{<<"content-length">>, integer_to_binary(iolist_size(Body))}),
{ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream(
@@ -385,11 +433,15 @@ request(State=#http2_state{socket=Socket, transport=Transport,
StreamID, HTTP2Machine1, nofin, PseudoHeaders, Headers),
Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)),
EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1),
- Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo},
+ InitialFlow = initial_flow(InitialFlow0, Opts),
+ Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow},
maybe_send_data(State#http2_state{http2_machine=HTTP2Machine,
streams=[Stream|Streams]}, StreamID, fin, Body,
EvHandler, EvHandlerState).
+initial_flow(infinity, #{flow := InitialFlow}) -> InitialFlow;
+initial_flow(InitialFlow, _) -> InitialFlow.
+
prepare_headers(#http2_state{transport=Transport}, Method, Host0, Port, Path, Headers0) ->
Authority = case lists:keyfind(<<"host">>, 1, Headers0) of
{_, Host} -> Host;