From 9bba03430c69eb756079ea925995e5a94a52776a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Fri, 19 May 2017 18:13:30 +0200 Subject: Add partial support for h2 flow control Specifically we send WINDOW_UPDATE frames in order to receive more data, but we do not respect the flow control window when we are sending request bodies. --- src/gun_http2.erl | 100 ++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 79 insertions(+), 21 deletions(-) (limited to 'src/gun_http2.erl') diff --git a/src/gun_http2.erl b/src/gun_http2.erl index 2195037..4e108ae 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -32,8 +32,12 @@ reply_to :: pid(), %% Whether we finished sending data. local = nofin :: cowboy_stream:fin(), + %% Local flow control window (how much we can send). + local_window :: integer(), %% Whether we finished receiving data. remote = nofin :: cowboy_stream:fin(), + %% Remote flow control window (how much we accept to receive). + remote_window :: integer(), %% Content handlers state. handler_state :: undefined | gun_content_handler:state() }). @@ -45,7 +49,17 @@ content_handlers :: gun_content_handler:opt(), buffer = <<>> :: binary(), - %% @todo local_settings, next_settings, remote_settings + local_settings = #{ + initial_window_size => 65535, + max_frame_size => 16384 + } :: map(), + remote_settings = #{ + initial_window_size => 65535 + } :: map(), + + %% Connection-wide flow control window. + local_window = 65535 :: integer(), %% How much we can send. + remote_window = 65535 :: integer(), %% How much we accept to receive. streams = [] :: [#stream{}], stream_id = 1 :: non_neg_integer(), @@ -73,14 +87,16 @@ do_check_options([Opt|_]) -> name() -> http2. init(Owner, Socket, Transport, Opts) -> + Handlers = maps:get(content_handlers, Opts, [gun_data]), + State = #http2_state{owner=Owner, socket=Socket, + transport=Transport, content_handlers=Handlers}, + #http2_state{local_settings=Settings} = State, %% Send the HTTP/2 preface. Transport:send(Socket, [ << "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n">>, - cow_http2:settings(#{}) %% @todo Settings. + cow_http2:settings(Settings) ]), - Handlers = maps:get(content_handlers, Opts, [gun_data]), - #http2_state{owner=Owner, socket=Socket, transport=Transport, - content_handlers=Handlers}. + State. handle(Data, State=#http2_state{buffer=Buffer}) -> parse(<< Buffer/binary, Data/binary >>, State#http2_state{buffer= <<>>}). @@ -100,14 +116,18 @@ parse(Data0, State=#http2_state{buffer=Buffer}) -> end. %% DATA frame. -frame({data, StreamID, IsFin, Data}, State) -> - case get_stream_by_id(StreamID, State) of - Stream = #stream{remote=nofin, handler_state=Handlers0} -> +frame({data, StreamID, IsFin, Data}, State0=#http2_state{remote_window=ConnWindow}) -> + case get_stream_by_id(StreamID, State0) of + Stream0 = #stream{remote=nofin, remote_window=StreamWindow, handler_state=Handlers0} -> Handlers = gun_content_handler:handle(IsFin, Data, Handlers0), - remote_fin(Stream#stream{handler_state=Handlers}, State, IsFin); + {Stream, State} = send_window_update( + Stream0#stream{remote_window=StreamWindow - byte_size(Data), + handler_state=Handlers}, + State0#http2_state{remote_window=ConnWindow - byte_size(Data)}), + remote_fin(Stream, State, IsFin); _ -> %% @todo protocol_error if not existing - stream_reset(State, StreamID, {stream_error, stream_closed, + stream_reset(State0, StreamID, {stream_error, stream_closed, 'DATA frame received for a closed or non-existent stream. (RFC7540 6.1)'}) end; %% Single HEADERS frame headers block. @@ -162,10 +182,10 @@ frame({headers, StreamID, IsFin, head_fin, HeaderBlock}, frame({rst_stream, StreamID, Reason}, State) -> stream_reset(State, StreamID, {stream_error, Reason, 'Stream reset by server.'}); %% SETTINGS frame. -frame({settings, _Settings}, State=#http2_state{socket=Socket, transport=Transport}) -> - %% @todo Apply SETTINGS. +frame({settings, Settings}, State=#http2_state{socket=Socket, transport=Transport, + remote_settings=Settings0}) -> Transport:send(Socket, cow_http2:settings_ack()), - State; + State#http2_state{remote_settings=maps:merge(Settings0, Settings)}; %% Ack for a previously sent SETTINGS frame. frame(settings_ack, State) -> %% @todo =#http2_state{next_settings=_NextSettings}) -> %% @todo Apply SETTINGS that require synchronization. @@ -219,18 +239,49 @@ frame({ping_ack, _Opaque}, State) -> frame(Frame={goaway, StreamID, _, _}, State) -> terminate(State, StreamID, {stop, Frame, 'Client is going away.'}); %% Connection-wide WINDOW_UPDATE frame. -frame({window_update, _Increment}, State) -> - %% @todo control flow - State; +frame({window_update, Increment}, State=#http2_state{local_window=ConnWindow}) -> + send_data(State#http2_state{local_window=ConnWindow + Increment}); %% Stream-specific WINDOW_UPDATE frame. -frame({window_update, _StreamID, _Increment}, State) -> - %% @todo stream-specific control flow - State; +frame({window_update, StreamID, Increment}, State0=#http2_state{streams=Streams0}) -> + case lists:keyfind(StreamID, #stream.id, Streams0) of + Stream0 = #stream{local_window=StreamWindow} -> + {State, Stream} = send_data(State0, + Stream0#stream{local_window=StreamWindow + Increment}), + Streams = lists:keystore(StreamID, #stream.id, Streams0, Stream), + State#http2_state{streams=Streams}; + false -> + %% @todo Receiving this frame on a stream in the idle state is an error. + %% WINDOW_UPDATE frames may be received for a short period of time + %% after a stream is closed. They must be ignored. + State0 + end; %% Unexpected CONTINUATION frame. frame({continuation, StreamID, _, _}, State) -> terminate(State, StreamID, {connection_error, protocol_error, 'CONTINUATION frames MUST be preceded by a HEADERS frame. (RFC7540 6.10)'}). +send_window_update(Stream=#stream{id=StreamID, remote_window=StreamWindow0}, + State=#http2_state{socket=Socket, transport=Transport, remote_window=ConnWindow0}) -> + %% @todo We should make the windows configurable. + MinConnWindow = 8000000, + MinStreamWindow = 1000000, + ConnWindow = if + ConnWindow0 =< MinConnWindow -> + Transport:send(Socket, cow_http2:window_update(MinConnWindow)), + ConnWindow0 + MinConnWindow; + true -> + ConnWindow0 + end, + StreamWindow = if + StreamWindow0 =< MinStreamWindow -> + Transport:send(Socket, cow_http2:window_update(StreamID, MinStreamWindow)), + StreamWindow0 + MinStreamWindow; + true -> + StreamWindow0 + end, + {Stream#stream{remote_window=StreamWindow}, + State#http2_state{remote_window=ConnWindow}}. + parse_status(Status) -> << Code:3/binary, _/bits >> = Status, list_to_integer(binary_to_list(Code)). @@ -314,6 +365,9 @@ data(State=#http2_state{socket=Socket, transport=Transport}, error_stream_not_found(State, StreamRef, ReplyTo) end. +send_data(State) -> State. +send_data(State, Stream) -> {State, Stream}. + %% This same function is found in cowboy_http2. send_data(Socket, Transport, StreamID, IsFin, Data, Length) -> if @@ -386,8 +440,12 @@ error_stream_not_found(State, StreamRef, ReplyTo) -> %% @todo probably change order of args and have state first? new_stream(StreamID, StreamRef, ReplyTo, Remote, Local, - State=#http2_state{streams=Streams}) -> - New = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, remote=Remote, local=Local}, + State=#http2_state{streams=Streams, + local_settings=#{initial_window_size := RemoteWindow}, + remote_settings=#{initial_window_size := LocalWindow}}) -> + New = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, + remote=Remote, remote_window=RemoteWindow, + local=Local, local_window=LocalWindow}, State#http2_state{streams=[New|Streams]}. get_stream_by_id(StreamID, #http2_state{streams=Streams}) -> -- cgit v1.2.3