From 611f9a9b78cab4005892e13dffb7a2c8e44580ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Fri, 2 Aug 2019 14:30:08 +0200 Subject: Add flow control Flow control is disabled by default. The initial flow value must be set to enable it (either for the entire connection or on a per-request basis). Flow applies to all HTTP streams as well as Websocket. HTTP/2 pushed streams receive the same value as their originating stream. --- src/gun_http2.erl | 82 +++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 67 insertions(+), 15 deletions(-) (limited to 'src/gun_http2.erl') diff --git a/src/gun_http2.erl b/src/gun_http2.erl index 20e21ec..3b3b79b 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -18,10 +18,11 @@ -export([name/0]). -export([init/4]). -export([handle/4]). +-export([update_flow/4]). -export([close/4]). -export([keepalive/1]). --export([headers/10]). --export([request/11]). +-export([headers/11]). +-export([request/12]). -export([data/7]). -export([cancel/5]). -export([stream_info/2]). @@ -36,6 +37,10 @@ %% Process to send messages to. reply_to :: pid(), + %% Flow control. + flow :: integer() | infinity, + flow_window = 0 :: non_neg_integer(), + %% Content handlers state. handler_state :: undefined | gun_content_handler:state() }). @@ -66,10 +71,15 @@ do_check_options([Opt={content_handlers, Handlers}|Opts]) -> ok -> do_check_options(Opts); error -> {error, {options, {http2, Opt}}} end; +do_check_options([{flow, InitialFlow}|Opts]) when is_integer(InitialFlow), InitialFlow > 0 -> + do_check_options(Opts); do_check_options([{keepalive, infinity}|Opts]) -> do_check_options(Opts); do_check_options([{keepalive, K}|Opts]) when is_integer(K), K > 0 -> do_check_options(Opts); +%% @todo Add all http2_machine options. +do_check_options([{max_frame_size_received, _}|Opts]) -> + do_check_options(Opts); do_check_options([Opt|_]) -> {error, {options, {http2, Opt}}}. @@ -192,10 +202,20 @@ lingering_data_frame(State=#http2_state{socket=Socket, transport=Transport, data_frame(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0}, StreamID, IsFin, Data, EvHandler, EvHandlerState0) -> - Stream = #stream{ref=StreamRef, reply_to=ReplyTo, - handler_state=Handlers0} = get_stream_by_id(State, StreamID), - Handlers = gun_content_handler:handle(IsFin, Data, Handlers0), + Stream = #stream{ref=StreamRef, reply_to=ReplyTo, flow=Flow0, + flow_window=FlowWindow0, handler_state=Handlers0} = get_stream_by_id(State, StreamID), + {ok, Dec, Handlers} = gun_content_handler:handle(IsFin, Data, Handlers0), + Flow = case Flow0 of + infinity -> infinity; + _ -> Flow0 - Dec + end, Size = byte_size(Data), + FlowWindow = if + IsFin =:= nofin, Flow =< 0 -> + FlowWindow0 + Size; + true -> + FlowWindow0 + end, {HTTP2Machine, EvHandlerState} = case Size of %% We do not send a WINDOW_UPDATE if the DATA frame was of size 0. 0 when IsFin =:= fin -> @@ -209,8 +229,11 @@ data_frame(State=#http2_state{socket=Socket, transport=Transport, _ -> Transport:send(Socket, cow_http2:window_update(Size)), HTTP2Machine1 = cow_http2_machine:update_window(Size, HTTP2Machine0), - %% We do not send a stream WINDOW_UPDATE if this was the last DATA frame. + %% We do not send a stream WINDOW_UPDATE when the flow control kicks in + %% (it'll be sent when the flow recovers) or for the last DATA frame. case IsFin of + nofin when Flow =< 0 -> + {HTTP2Machine1, EvHandlerState0}; nofin -> Transport:send(Socket, cow_http2:window_update(StreamID, Size)), {cow_http2_machine:update_window(StreamID, Size, HTTP2Machine1), @@ -224,7 +247,7 @@ data_frame(State=#http2_state{socket=Socket, transport=Transport, end end, {maybe_delete_stream(store_stream(State#http2_state{http2_machine=HTTP2Machine}, - Stream#stream{handler_state=Handlers}), StreamID, remote, IsFin), + Stream#stream{flow=Flow, flow_window=FlowWindow, handler_state=Handlers}), StreamID, remote, IsFin), EvHandlerState}. headers_frame(State=#http2_state{content_handlers=Handlers0}, @@ -294,12 +317,13 @@ rst_stream_frame(State=#http2_state{streams=Streams0}, {State, EvHandlerState0} end. +%% Pushed streams receive the same initial flow value as the parent stream. push_promise_frame(State=#http2_state{streams=Streams}, StreamID, PromisedStreamID, Headers, #{ method := Method, scheme := Scheme, authority := Authority, path := Path}, EvHandler, EvHandlerState0) -> - #stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID), + #stream{ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow} = get_stream_by_id(State, StreamID), PromisedStreamRef = make_ref(), URI = iolist_to_binary([Scheme, <<"://">>, Authority, Path]), ReplyTo ! {gun_push, self(), StreamRef, PromisedStreamRef, Method, URI, Headers}, @@ -311,7 +335,8 @@ push_promise_frame(State=#http2_state{streams=Streams}, uri => URI, headers => Headers }, EvHandlerState0), - NewStream = #stream{id=PromisedStreamID, ref=PromisedStreamRef, reply_to=ReplyTo}, + NewStream = #stream{id=PromisedStreamID, ref=PromisedStreamRef, + reply_to=ReplyTo, flow=InitialFlow}, {State#http2_state{streams=[NewStream|Streams]}, EvHandlerState}. ignored_frame(State=#http2_state{http2_machine=HTTP2Machine0}) -> @@ -322,6 +347,28 @@ ignored_frame(State=#http2_state{http2_machine=HTTP2Machine0}) -> terminate(State#http2_state{http2_machine=HTTP2Machine}, Error) end. +update_flow(State=#http2_state{socket=Socket, transport=Transport, + http2_machine=HTTP2Machine0}, _ReplyTo, StreamRef, Inc) -> + case get_stream_by_ref(State, StreamRef) of + Stream=#stream{id=StreamID, flow=Flow0, flow_window=FlowWindow} -> + Flow = case Flow0 of + infinity -> infinity; + _ -> Flow0 + Inc + end, + if + %% Flow is active again, update the window. + Flow0 =< 0, Flow > 0 -> + Transport:send(Socket, cow_http2:window_update(StreamID, FlowWindow)), + HTTP2Machine = cow_http2_machine:update_window(StreamID, FlowWindow, HTTP2Machine0), + {state, store_stream(State#http2_state{http2_machine=HTTP2Machine}, + Stream#stream{flow=Flow, flow_window=0})}; + true -> + {state, store_stream(State, Stream#stream{flow=Flow})} + end; + false -> + [] + end. + %% @todo Use Reason. close(_, #http2_state{streams=Streams}, _, EvHandlerState) -> {close_streams(Streams), EvHandlerState}. @@ -337,10 +384,10 @@ keepalive(State=#http2_state{socket=Socket, transport=Transport}) -> Transport:send(Socket, cow_http2:ping(0)), State. -headers(State=#http2_state{socket=Socket, transport=Transport, +headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, http2_machine=HTTP2Machine0, streams=Streams}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers0, - EvHandler, EvHandlerState0) -> + InitialFlow0, EvHandler, EvHandlerState0) -> {ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream( iolist_to_binary(Method), HTTP2Machine0), {ok, PseudoHeaders, Headers} = prepare_headers(State, Method, Host, Port, Path, Headers0), @@ -358,14 +405,15 @@ headers(State=#http2_state{socket=Socket, transport=Transport, StreamID, HTTP2Machine1, nofin, PseudoHeaders, Headers), Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)), EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1), - Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo}, + InitialFlow = initial_flow(InitialFlow0, Opts), + Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow}, {State#http2_state{http2_machine=HTTP2Machine, streams=[Stream|Streams]}, EvHandlerState}. -request(State=#http2_state{socket=Socket, transport=Transport, +request(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, http2_machine=HTTP2Machine0, streams=Streams}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers0, Body, - EvHandler, EvHandlerState0) -> + InitialFlow0, EvHandler, EvHandlerState0) -> Headers1 = lists:keystore(<<"content-length">>, 1, Headers0, {<<"content-length">>, integer_to_binary(iolist_size(Body))}), {ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream( @@ -385,11 +433,15 @@ request(State=#http2_state{socket=Socket, transport=Transport, StreamID, HTTP2Machine1, nofin, PseudoHeaders, Headers), Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)), EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1), - Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo}, + InitialFlow = initial_flow(InitialFlow0, Opts), + Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow}, maybe_send_data(State#http2_state{http2_machine=HTTP2Machine, streams=[Stream|Streams]}, StreamID, fin, Body, EvHandler, EvHandlerState). +initial_flow(infinity, #{flow := InitialFlow}) -> InitialFlow; +initial_flow(InitialFlow, _) -> InitialFlow. + prepare_headers(#http2_state{transport=Transport}, Method, Host0, Port, Path, Headers0) -> Authority = case lists:keyfind(<<"host">>, 1, Headers0) of {_, Host} -> Host; -- cgit v1.2.3