From d38d86c4a93340b1dd2633e1649c257e3f160d63 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Thu, 26 Apr 2018 22:08:05 +0200 Subject: Add options controlling initial control flow windows --- src/cowboy_http2.erl | 48 +++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 39 insertions(+), 9 deletions(-) (limited to 'src/cowboy_http2.erl') diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl index 3bcd402..45a60c2 100644 --- a/src/cowboy_http2.erl +++ b/src/cowboy_http2.erl @@ -27,6 +27,8 @@ enable_connect_protocol => boolean(), env => cowboy_middleware:env(), inactivity_timeout => timeout(), + initial_connection_window_size => 65535..16#7fffffff, + initial_stream_window_size => 0..16#7fffffff, max_concurrent_streams => non_neg_integer() | infinity, max_decode_table_size => non_neg_integer(), max_encode_table_size => non_neg_integer(), @@ -171,6 +173,7 @@ init(Parent, Ref, Socket, Transport, Opts) -> init(Parent, Ref, Socket, Transport, Opts, Peer, Sock, Cert, Buffer) -> State0 = #state{parent=Parent, ref=Ref, socket=Socket, transport=Transport, opts=Opts, peer=Peer, sock=Sock, cert=Cert, + remote_window=maps:get(initial_connection_window_size, Opts, 65535), parse_state={preface, sequence, preface_timeout(Opts)}}, State = settings_init(State0, Opts), preface(State), @@ -186,6 +189,7 @@ init(Parent, Ref, Socket, Transport, Opts, Peer, Sock, Cert, Buffer) -> init(Parent, Ref, Socket, Transport, Opts, Peer, Sock, Cert, Buffer, _Settings, Req) -> State0 = #state{parent=Parent, ref=Ref, socket=Socket, transport=Transport, opts=Opts, peer=Peer, sock=Sock, cert=Cert, + remote_window=maps:get(initial_connection_window_size, Opts, 65535), parse_state={preface, sequence, preface_timeout(Opts)}}, %% @todo Apply settings. %% StreamID from HTTP/1.1 Upgrade requests is always 1. @@ -209,10 +213,11 @@ settings_init(State, Opts) -> header_table_size, 4096), S1 = setting_from_opt(S0, Opts, max_concurrent_streams, max_concurrent_streams, infinity), - %% @todo initial_window_size + S2 = setting_from_opt(S1, Opts, initial_stream_window_size, + initial_window_size, 65535), %% @todo max_frame_size %% @todo max_header_list_size - Settings = setting_from_opt(S1, Opts, enable_connect_protocol, + Settings = setting_from_opt(S2, Opts, enable_connect_protocol, enable_connect_protocol, false), State#state{next_settings=Settings}. @@ -222,9 +227,16 @@ setting_from_opt(Settings, Opts, OptName, SettingName, Default) -> Value -> Settings#{SettingName => Value} end. -preface(#state{socket=Socket, transport=Transport, next_settings=Settings}) -> - %% We send next_settings and use defaults until we get a ack. - Transport:send(Socket, cow_http2:settings(Settings)). +%% We send next_settings and use defaults until we get an ack. +%% +%% We also send a WINDOW_UPDATE frame for the connection when +%% the user specified an initial_connection_window_size. +preface(#state{socket=Socket, transport=Transport, opts=Opts, next_settings=Settings}) -> + MaybeWindowUpdate = case maps:get(initial_connection_window_size, Opts, 65535) of + 65535 -> <<>>; + Size -> cow_http2:window_update(Size - 65535) + end, + Transport:send(Socket, [cow_http2:settings(Settings), MaybeWindowUpdate]). preface_timeout(Opts) -> case maps:get(preface_timeout, Opts, 5000) of @@ -348,11 +360,18 @@ frame(State=#state{client_streamid=LastStreamID}, {data, StreamID, _, _}) when StreamID > LastStreamID -> terminate(State, {connection_error, protocol_error, 'DATA frame received on a stream in idle state. (RFC7540 5.1)'}); +frame(State=#state{remote_window=ConnWindow}, {data, _, _, Data}) + when byte_size(Data) > ConnWindow -> + terminate(State, {connection_error, flow_control_error, + 'DATA frame overflowed the connection flow control window. (RFC7540 6.9, RFC7540 6.9.1)'}); frame(State0=#state{remote_window=ConnWindow, streams=Streams, lingering_streams=Lingering}, {data, StreamID, IsFin, Data}) -> DataLen = byte_size(Data), State = State0#state{remote_window=ConnWindow - DataLen}, case lists:keyfind(StreamID, #stream.id, Streams) of + #stream{remote_window=StreamWindow} when StreamWindow < DataLen -> + stream_reset(State, StreamID, {stream_error, flow_control_error, + 'DATA frame overflowed the stream flow control window. (RFC7540 6.9, RFC7540 6.9.1)'}); Stream = #stream{state=flush, remote=nofin, remote_window=StreamWindow} -> after_commands(State, Stream#stream{remote=IsFin, remote_window=StreamWindow - DataLen}); Stream = #stream{state=StreamState0, remote=nofin, remote_window=StreamWindow} -> @@ -436,7 +455,7 @@ frame(State0=#state{socket=Socket, transport=Transport, opts=Opts, State#state{encode_state=EncodeState}; (initial_window_size, NewWindowSize, State) -> OldWindowSize = maps:get(initial_window_size, Settings0, 65535), - update_stream_windows(State, NewWindowSize - OldWindowSize); + update_streams_local_window(State, NewWindowSize - OldWindowSize); (_, _, State) -> State end, State1, Settings); @@ -448,6 +467,9 @@ frame(State0=#state{local_settings=Local0, next_settings=NextSettings}, settings (header_table_size, MaxSize, State=#state{decode_state=DecodeState0}) -> DecodeState = cow_hpack:set_max_size(MaxSize, DecodeState0), State#state{decode_state=DecodeState}; + (initial_window_size, NewWindowSize, State) -> + OldWindowSize = maps:get(initial_window_size, Local0, 65535), + update_streams_remote_window(State, NewWindowSize - OldWindowSize); (_, _, State) -> State end, State1, NextSettings); @@ -718,14 +740,22 @@ send_data(State=#state{streams=Streams}) -> resume_streams(State, Streams, []). %% When SETTINGS_INITIAL_WINDOW_SIZE changes we need to update -%% the stream windows for all active streams and perhaps resume -%% sending data. -update_stream_windows(State=#state{streams=Streams0}, Increment) -> +%% the local stream windows for all active streams and perhaps +%% resume sending data. +update_streams_local_window(State=#state{streams=Streams0}, Increment) -> Streams = [ S#stream{local_window=StreamWindow + Increment} || S=#stream{local_window=StreamWindow} <- Streams0], resume_streams(State, Streams, []). +%% When we receive an ack to a SETTINGS frame we sent we need to update +%% the remote stream windows for all active streams. +update_streams_remote_window(State=#state{streams=Streams0}, Increment) -> + Streams = [ + S#stream{remote_window=StreamWindow + Increment} + || S=#stream{remote_window=StreamWindow} <- Streams0], + State#state{streams=Streams}. + resume_streams(State, [], Acc) -> State#state{streams=lists:reverse(Acc)}; %% While technically we should never get < 0 here, let's be on the safe side. -- cgit v1.2.3