path: root/src/cowboy_http2.erl
diff options
authorLoïc Hoguin <[email protected]>2018-04-26 22:08:05 +0200
committerLoïc Hoguin <[email protected]>2018-04-26 22:08:05 +0200
commitd38d86c4a93340b1dd2633e1649c257e3f160d63 (patch)
treebe371e3aab65b4375ab03e71426bb72652e12872 /src/cowboy_http2.erl
parentb2f16d462a9e08b50430814a3028bb5b123a02d2 (diff)
Add options controlling initial control flow windows
Diffstat (limited to 'src/cowboy_http2.erl')
1 files changed, 39 insertions, 9 deletions
diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl
index 3bcd402..45a60c2 100644
--- a/src/cowboy_http2.erl
+++ b/src/cowboy_http2.erl
@@ -27,6 +27,8 @@
enable_connect_protocol => boolean(),
env => cowboy_middleware:env(),
inactivity_timeout => timeout(),
+ initial_connection_window_size => 65535..16#7fffffff,
+ initial_stream_window_size => 0..16#7fffffff,
max_concurrent_streams => non_neg_integer() | infinity,
max_decode_table_size => non_neg_integer(),
max_encode_table_size => non_neg_integer(),
@@ -171,6 +173,7 @@ init(Parent, Ref, Socket, Transport, Opts) ->
init(Parent, Ref, Socket, Transport, Opts, Peer, Sock, Cert, Buffer) ->
State0 = #state{parent=Parent, ref=Ref, socket=Socket,
transport=Transport, opts=Opts, peer=Peer, sock=Sock, cert=Cert,
+ remote_window=maps:get(initial_connection_window_size, Opts, 65535),
parse_state={preface, sequence, preface_timeout(Opts)}},
State = settings_init(State0, Opts),
@@ -186,6 +189,7 @@ init(Parent, Ref, Socket, Transport, Opts, Peer, Sock, Cert, Buffer) ->
init(Parent, Ref, Socket, Transport, Opts, Peer, Sock, Cert, Buffer, _Settings, Req) ->
State0 = #state{parent=Parent, ref=Ref, socket=Socket,
transport=Transport, opts=Opts, peer=Peer, sock=Sock, cert=Cert,
+ remote_window=maps:get(initial_connection_window_size, Opts, 65535),
parse_state={preface, sequence, preface_timeout(Opts)}},
%% @todo Apply settings.
%% StreamID from HTTP/1.1 Upgrade requests is always 1.
@@ -209,10 +213,11 @@ settings_init(State, Opts) ->
header_table_size, 4096),
S1 = setting_from_opt(S0, Opts, max_concurrent_streams,
max_concurrent_streams, infinity),
- %% @todo initial_window_size
+ S2 = setting_from_opt(S1, Opts, initial_stream_window_size,
+ initial_window_size, 65535),
%% @todo max_frame_size
%% @todo max_header_list_size
- Settings = setting_from_opt(S1, Opts, enable_connect_protocol,
+ Settings = setting_from_opt(S2, Opts, enable_connect_protocol,
enable_connect_protocol, false),
@@ -222,9 +227,16 @@ setting_from_opt(Settings, Opts, OptName, SettingName, Default) ->
Value -> Settings#{SettingName => Value}
-preface(#state{socket=Socket, transport=Transport, next_settings=Settings}) ->
- %% We send next_settings and use defaults until we get a ack.
- Transport:send(Socket, cow_http2:settings(Settings)).
+%% We send next_settings and use defaults until we get an ack.
+%% We also send a WINDOW_UPDATE frame for the connection when
+%% the user specified an initial_connection_window_size.
+preface(#state{socket=Socket, transport=Transport, opts=Opts, next_settings=Settings}) ->
+ MaybeWindowUpdate = case maps:get(initial_connection_window_size, Opts, 65535) of
+ 65535 -> <<>>;
+ Size -> cow_http2:window_update(Size - 65535)
+ end,
+ Transport:send(Socket, [cow_http2:settings(Settings), MaybeWindowUpdate]).
preface_timeout(Opts) ->
case maps:get(preface_timeout, Opts, 5000) of
@@ -348,11 +360,18 @@ frame(State=#state{client_streamid=LastStreamID}, {data, StreamID, _, _})
when StreamID > LastStreamID ->
terminate(State, {connection_error, protocol_error,
'DATA frame received on a stream in idle state. (RFC7540 5.1)'});
+frame(State=#state{remote_window=ConnWindow}, {data, _, _, Data})
+ when byte_size(Data) > ConnWindow ->
+ terminate(State, {connection_error, flow_control_error,
+ 'DATA frame overflowed the connection flow control window. (RFC7540 6.9, RFC7540 6.9.1)'});
frame(State0=#state{remote_window=ConnWindow, streams=Streams, lingering_streams=Lingering},
{data, StreamID, IsFin, Data}) ->
DataLen = byte_size(Data),
State = State0#state{remote_window=ConnWindow - DataLen},
case lists:keyfind(StreamID, #stream.id, Streams) of
+ #stream{remote_window=StreamWindow} when StreamWindow < DataLen ->
+ stream_reset(State, StreamID, {stream_error, flow_control_error,
+ 'DATA frame overflowed the stream flow control window. (RFC7540 6.9, RFC7540 6.9.1)'});
Stream = #stream{state=flush, remote=nofin, remote_window=StreamWindow} ->
after_commands(State, Stream#stream{remote=IsFin, remote_window=StreamWindow - DataLen});
Stream = #stream{state=StreamState0, remote=nofin, remote_window=StreamWindow} ->
@@ -436,7 +455,7 @@ frame(State0=#state{socket=Socket, transport=Transport, opts=Opts,
(initial_window_size, NewWindowSize, State) ->
OldWindowSize = maps:get(initial_window_size, Settings0, 65535),
- update_stream_windows(State, NewWindowSize - OldWindowSize);
+ update_streams_local_window(State, NewWindowSize - OldWindowSize);
(_, _, State) ->
end, State1, Settings);
@@ -448,6 +467,9 @@ frame(State0=#state{local_settings=Local0, next_settings=NextSettings}, settings
(header_table_size, MaxSize, State=#state{decode_state=DecodeState0}) ->
DecodeState = cow_hpack:set_max_size(MaxSize, DecodeState0),
+ (initial_window_size, NewWindowSize, State) ->
+ OldWindowSize = maps:get(initial_window_size, Local0, 65535),
+ update_streams_remote_window(State, NewWindowSize - OldWindowSize);
(_, _, State) ->
end, State1, NextSettings);
@@ -718,14 +740,22 @@ send_data(State=#state{streams=Streams}) ->
resume_streams(State, Streams, []).
%% When SETTINGS_INITIAL_WINDOW_SIZE changes we need to update
-%% the stream windows for all active streams and perhaps resume
-%% sending data.
-update_stream_windows(State=#state{streams=Streams0}, Increment) ->
+%% the local stream windows for all active streams and perhaps
+%% resume sending data.
+update_streams_local_window(State=#state{streams=Streams0}, Increment) ->
Streams = [
S#stream{local_window=StreamWindow + Increment}
|| S=#stream{local_window=StreamWindow} <- Streams0],
resume_streams(State, Streams, []).
+%% When we receive an ack to a SETTINGS frame we sent we need to update
+%% the remote stream windows for all active streams.
+update_streams_remote_window(State=#state{streams=Streams0}, Increment) ->
+ Streams = [
+ S#stream{remote_window=StreamWindow + Increment}
+ || S=#stream{remote_window=StreamWindow} <- Streams0],
+ State#state{streams=Streams}.
resume_streams(State, [], Acc) ->
%% While technically we should never get < 0 here, let's be on the safe side.