aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Makefile2
-rw-r--r--doc/src/manual/cowboy_http2.asciidoc52
-rw-r--r--rebar.config2
-rw-r--r--src/cowboy_http2.erl118
-rw-r--r--test/rfc7540_SUITE.erl50
-rw-r--r--test/rfc8441_SUITE.erl5
6 files changed, 136 insertions, 93 deletions
diff --git a/Makefile b/Makefile
index 5fab20d..ba97e25 100644
--- a/Makefile
+++ b/Makefile
@@ -15,7 +15,7 @@ CT_OPTS += -ct_hooks cowboy_ct_hook [] # -boot start_sasl
LOCAL_DEPS = crypto
DEPS = cowlib ranch
-dep_cowlib = git https://github.com/ninenines/cowlib 2.7.3
+dep_cowlib = git https://github.com/ninenines/cowlib master
dep_ranch = git https://github.com/ninenines/ranch 1.7.1
DOC_DEPS = asciideck
diff --git a/doc/src/manual/cowboy_http2.asciidoc b/doc/src/manual/cowboy_http2.asciidoc
index 4907f09..33c801c 100644
--- a/doc/src/manual/cowboy_http2.asciidoc
+++ b/doc/src/manual/cowboy_http2.asciidoc
@@ -18,21 +18,27 @@ as a Ranch protocol.
----
opts() :: #{
connection_type => worker | supervisor,
+ connection_window_margin_size => 0..16#7fffffff,
+ connection_window_update_threshold => 0..16#7fffffff,
enable_connect_protocol => boolean(),
idle_timeout => timeout(),
inactivity_timeout => timeout(),
initial_connection_window_size => 65535..16#7fffffff,
initial_stream_window_size => 0..16#7fffffff,
max_concurrent_streams => non_neg_integer() | infinity,
+ max_connection_window_size => 0..16#7fffffff,
max_decode_table_size => non_neg_integer(),
max_encode_table_size => non_neg_integer(),
max_frame_size_received => 16384..16777215,
max_frame_size_sent => 16384..16777215 | infinity,
+ max_stream_window_size => 0..16#7fffffff,
preface_timeout => timeout(),
proxy_header => boolean(),
sendfile => boolean(),
settings_timeout => timeout(),
- stream_handlers => [module()]
+ stream_handlers => [module()],
+ stream_window_margin_size => 0..16#7fffffff,
+ stream_window_update_threshold => 0..16#7fffffff
}
----
@@ -51,6 +57,19 @@ connection_type (supervisor)::
Whether the connection process also acts as a supervisor.
+connection_window_margin_size (65535)::
+
+Extra amount to be added to the window size when
+updating the connection window. This is used to
+ensure that there is always some space available in
+the window.
+
+connection_window_update_threshold (163840)::
+
+The connection window will only get updated when its size
+becomes lower than this threshold. This is to avoid sending
+too many `WINDOW_UPDATE` frames.
+
enable_connect_protocol (false)::
Whether to enable the extended CONNECT method to allow
@@ -84,6 +103,12 @@ max_concurrent_streams (infinity)::
Maximum number of concurrent streams allowed on the connection.
+max_connection_window_size (16#7fffffff)::
+
+Maximum connection window size. This is used as an upper bound
+when calculating the window size, either when reading the request
+body or receiving said body.
+
max_decode_table_size (4096)::
Maximum header table size used by the decoder. This is the value advertised
@@ -111,6 +136,12 @@ following the client's advertised maximum.
Note that actual frame sizes may be lower than the limit when
there is not enough space left in the flow control window.
+max_stream_window_size (16#7fffffff)::
+
+Maximum stream window size. This is used as an upper bound
+when calculating the window size, either when reading the request
+body or receiving said body.
+
preface_timeout (5000)::
Time in ms Cowboy is willing to wait for the connection preface.
@@ -135,8 +166,27 @@ stream_handlers ([cowboy_stream_h])::
Ordered list of stream handlers that will handle all stream events.
+stream_window_margin_size (65535)::
+
+Extra amount to be added to the window size when
+updating a stream's window. This is used to
+ensure that there is always some space available in
+the window.
+
+stream_window_update_threshold (163840)::
+
+A stream's window will only get updated when its size
+becomes lower than this threshold. This is to avoid sending
+too many `WINDOW_UPDATE` frames.
+
== Changelog
+* *2.7*: Add the options `connection_window_margin_size`,
+ `connection_window_update_threshold`,
+ `max_connection_window_size`, `max_stream_window_size`,
+ `stream_window_margin_size` and
+ `stream_window_update_threshold` to configure
+ behavior on sending WINDOW_UPDATE frames.
* *2.6*: The `proxy_header` and `sendfile` options were added.
* *2.4*: Add the options `initial_connection_window_size`,
`initial_stream_window_size`, `max_concurrent_streams`,
diff --git a/rebar.config b/rebar.config
index bb6e0ef..cb76748 100644
--- a/rebar.config
+++ b/rebar.config
@@ -1,4 +1,4 @@
{deps, [
-{cowlib,".*",{git,"https://github.com/ninenines/cowlib","2.7.3"}},{ranch,".*",{git,"https://github.com/ninenines/ranch","1.7.1"}}
+{cowlib,".*",{git,"https://github.com/ninenines/cowlib","master"}},{ranch,".*",{git,"https://github.com/ninenines/ranch","1.7.1"}}
]}.
{erl_opts, [debug_info,warn_export_vars,warn_shadow_vars,warn_obsolete_guard,warn_missing_spec,warn_untyped_record]}.
diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl
index 7c9f070..7039ab2 100644
--- a/src/cowboy_http2.erl
+++ b/src/cowboy_http2.erl
@@ -30,6 +30,8 @@
compress_buffering => boolean(),
compress_threshold => non_neg_integer(),
connection_type => worker | supervisor,
+ connection_window_margin_size => 0..16#7fffffff,
+ connection_window_update_threshold => 0..16#7fffffff,
enable_connect_protocol => boolean(),
env => cowboy_middleware:env(),
idle_timeout => timeout(),
@@ -38,10 +40,12 @@
initial_stream_window_size => 0..16#7fffffff,
logger => module(),
max_concurrent_streams => non_neg_integer() | infinity,
+ max_connection_window_size => 0..16#7fffffff,
max_decode_table_size => non_neg_integer(),
max_encode_table_size => non_neg_integer(),
max_frame_size_received => 16384..16777215,
max_frame_size_sent => 16384..16777215 | infinity,
+ max_stream_window_size => 0..16#7fffffff,
metrics_callback => cowboy_metrics_h:metrics_callback(),
middlewares => [module()],
preface_timeout => timeout(),
@@ -50,6 +54,8 @@
settings_timeout => timeout(),
shutdown_timeout => timeout(),
stream_handlers => [module()],
+ stream_window_margin_size => 0..16#7fffffff,
+ stream_window_update_threshold => 0..16#7fffffff,
tracer_callback => cowboy_tracer_h:tracer_callback(),
tracer_match_specs => cowboy_tracer_h:tracer_match_specs(),
%% Open ended because configured stream handlers might add options.
@@ -57,6 +63,17 @@
}.
-export_type([opts/0]).
+-record(stream, {
+ %% Whether the stream is currently stopping.
+ status = running :: running | stopping,
+
+ %% Flow requested for this stream.
+ flow = 0 :: non_neg_integer(),
+
+ %% Stream state.
+ state :: {module, any()}
+}).
+
-record(state, {
parent = undefined :: pid(),
ref :: ranch:ref(),
@@ -81,9 +98,12 @@
http2_status :: sequence | settings | upgrade | connected | closing,
http2_machine :: cow_http2_machine:http2_machine(),
+ %% Flow requested for all streams.
+ flow = 0 :: non_neg_integer(),
+
%% Currently active HTTP/2 streams. Streams may be initiated either
%% by the client or by the server through PUSH_PROMISE frames.
- streams = #{} :: #{cow_http2:streamid() => {running | stopping, {module, any()}}},
+ streams = #{} :: #{cow_http2:streamid() => #stream{}},
%% Streams can spawn zero or more children which are then managed
%% by this module if operating as a supervisor.
@@ -259,8 +279,8 @@ frame(State=#state{http2_machine=HTTP2Machine0}, Frame) ->
maybe_ack(State#state{http2_machine=HTTP2Machine}, Frame);
{ok, {data, StreamID, IsFin, Data}, HTTP2Machine} ->
data_frame(State#state{http2_machine=HTTP2Machine}, StreamID, IsFin, Data);
- {ok, {lingering_data, _StreamID, DataLen}, HTTP2Machine} ->
- lingering_data_frame(State#state{http2_machine=HTTP2Machine}, DataLen);
+ {ok, {lingering_data, StreamID, DataLen}, HTTP2Machine} ->
+ lingering_data_frame(State#state{http2_machine=HTTP2Machine}, StreamID, DataLen);
{ok, {headers, StreamID, IsFin, Headers, PseudoHeaders, BodyLen}, HTTP2Machine} ->
headers_frame(State#state{http2_machine=HTTP2Machine},
StreamID, IsFin, Headers, PseudoHeaders, BodyLen);
@@ -292,30 +312,38 @@ maybe_ack(State=#state{socket=Socket, transport=Transport}, Frame) ->
end,
State.
-data_frame(State=#state{opts=Opts, streams=Streams}, StreamID, IsFin, Data) ->
+data_frame(State0=#state{opts=Opts, flow=Flow, streams=Streams}, StreamID, IsFin, Data) ->
case Streams of
- #{StreamID := {running, StreamState0}} ->
+ #{StreamID := Stream=#stream{status=running, flow=StreamFlow, state=StreamState0}} ->
try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of
{Commands, StreamState} ->
- commands(State#state{streams=Streams#{StreamID => {running, StreamState}}},
- StreamID, Commands)
+ %% Remove the amount of data received from the flow.
+ %% We may receive more data than we requested. We ensure
+ %% that the flow value doesn't go lower than 0.
+ Size = byte_size(Data),
+ State = update_window(State0#state{flow=max(0, Flow - Size),
+ streams=Streams#{StreamID => Stream#stream{
+ flow=max(0, StreamFlow - Size), state=StreamState}}},
+ StreamID),
+ commands(State, StreamID, Commands)
catch Class:Exception ->
cowboy:log(cowboy_stream:make_error_log(data,
[StreamID, IsFin, Data, StreamState0],
Class, Exception, erlang:get_stacktrace()), Opts),
- reset_stream(State, StreamID, {internal_error, {Class, Exception},
+ reset_stream(State0, StreamID, {internal_error, {Class, Exception},
'Unhandled exception in cowboy_stream:data/4.'})
end;
%% We ignore DATA frames for streams that are stopping.
#{} ->
- State
+ State0
end.
-lingering_data_frame(State=#state{socket=Socket, transport=Transport,
- http2_machine=HTTP2Machine0}, DataLen) ->
- Transport:send(Socket, cow_http2:window_update(DataLen)),
- HTTP2Machine1 = cow_http2_machine:update_window(DataLen, HTTP2Machine0),
- State#state{http2_machine=HTTP2Machine1}.
+lingering_data_frame(State, _StreamID, _DataLen) ->
+ %% We do nothing when receiving a lingering DATA frame.
+ %% We already removed the stream flow from the connection
+ %% flow and are therefore already accounting for the window
+ %% being reduced by these frames.
+ State.
headers_frame(State, StreamID, IsFin, Headers,
PseudoHeaders=#{method := <<"CONNECT">>}, _)
@@ -410,7 +438,7 @@ headers_frame(State=#state{opts=Opts, streams=Streams}, StreamID, Req) ->
try cowboy_stream:init(StreamID, Req, Opts) of
{Commands, StreamState} ->
commands(State#state{
- streams=Streams#{StreamID => {running, StreamState}}},
+ streams=Streams#{StreamID => #stream{state=StreamState}}},
StreamID, Commands)
catch Class:Exception ->
cowboy:log(cowboy_stream:make_error_log(init,
@@ -449,7 +477,7 @@ early_error(State0=#state{ref=Ref, opts=Opts, peer=Peer},
rst_stream_frame(State=#state{streams=Streams0, children=Children0}, StreamID, Reason) ->
case maps:take(StreamID, Streams0) of
- {{_, StreamState}, Streams} ->
+ {#stream{state=StreamState}, Streams} ->
terminate_stream_handler(State, StreamID, Reason, StreamState),
Children = cowboy_children:shutdown(Children0, StreamID),
State#state{streams=Streams, children=Children};
@@ -494,10 +522,10 @@ down(State=#state{opts=Opts, children=Children0}, Pid, Msg) ->
info(State=#state{opts=Opts, streams=Streams}, StreamID, Msg) ->
case Streams of
- #{StreamID := {IsRunning, StreamState0}} ->
+ #{StreamID := Stream=#stream{state=StreamState0}} ->
try cowboy_stream:info(StreamID, Msg, StreamState0) of
{Commands, StreamState} ->
- commands(State#state{streams=Streams#{StreamID => {IsRunning, StreamState}}},
+ commands(State#state{streams=Streams#{StreamID => Stream#stream{state=StreamState}}},
StreamID, Commands)
catch Class:Exception ->
cowboy:log(cowboy_stream:make_error_log(info,
@@ -586,15 +614,13 @@ commands(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Ma
State0
end,
commands(State, StreamID, Tail);
-commands(State=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0},
- StreamID, [{flow, Size}|Tail]) ->
- Transport:send(Socket, [
- cow_http2:window_update(Size),
- cow_http2:window_update(StreamID, Size)
- ]),
- HTTP2Machine1 = cow_http2_machine:update_window(Size, HTTP2Machine0),
- HTTP2Machine = cow_http2_machine:update_window(StreamID, Size, HTTP2Machine1),
- commands(State#state{http2_machine=HTTP2Machine}, StreamID, Tail);
+%% Read the request body.
+commands(State0=#state{flow=Flow, streams=Streams}, StreamID, [{flow, Size}|Tail]) ->
+ #{StreamID := Stream=#stream{flow=StreamFlow}} = Streams,
+ State = update_window(State0#state{flow=Flow + Size,
+ streams=Streams#{StreamID => Stream#stream{flow=StreamFlow + Size}}},
+ StreamID),
+ commands(State, StreamID, Tail);
%% Supervise a child process.
commands(State=#state{children=Children}, StreamID, [{spawn, Pid, Shutdown}|Tail]) ->
commands(State#state{children=cowboy_children:up(Children, Pid, StreamID, Shutdown)},
@@ -628,6 +654,25 @@ commands(State=#state{opts=Opts}, StreamID, [Log={log, _, _, _}|Tail]) ->
cowboy:log(Log, Opts),
commands(State, StreamID, Tail).
+%% Tentatively update the window after the flow was updated.
+
+update_window(State=#state{socket=Socket, transport=Transport,
+ http2_machine=HTTP2Machine0, flow=Flow, streams=Streams}, StreamID) ->
+ #{StreamID := #stream{flow=StreamFlow}} = Streams,
+ {Data1, HTTP2Machine2} = case cow_http2_machine:ensure_window(Flow, HTTP2Machine0) of
+ ok -> {<<>>, HTTP2Machine0};
+ {ok, Increment1, HTTP2Machine1} -> {cow_http2:window_update(Increment1), HTTP2Machine1}
+ end,
+ {Data2, HTTP2Machine} = case cow_http2_machine:ensure_window(StreamID, StreamFlow, HTTP2Machine2) of
+ ok -> {<<>>, HTTP2Machine2};
+ {ok, Increment2, HTTP2Machine3} -> {cow_http2:window_update(StreamID, Increment2), HTTP2Machine3}
+ end,
+ case {Data1, Data2} of
+ {<<>>, <<>>} -> ok;
+ _ -> Transport:send(Socket, [Data1, Data2])
+ end,
+ State#state{http2_machine=HTTP2Machine}.
+
%% Send the response, trailers or data.
send_response(State0, StreamID, StatusCode, Headers, Body) ->
@@ -741,7 +786,7 @@ goaway(State, {goaway, _, Reason, _}) ->
%% Cancel client-initiated streams that are above LastStreamID.
goaway_streams(_, [], _, _, Acc) ->
Acc;
-goaway_streams(State, [{StreamID, {_, StreamState}}|Tail], LastStreamID, Reason, Acc)
+goaway_streams(State, [{StreamID, #stream{state=StreamState}}|Tail], LastStreamID, Reason, Acc)
when StreamID > LastStreamID, (StreamID rem 2) =:= 0 ->
terminate_stream_handler(State, StreamID, Reason, StreamState),
goaway_streams(State, Tail, LastStreamID, Reason, Acc);
@@ -780,7 +825,7 @@ terminate_reason({internal_error, _, _}) -> internal_error.
terminate_all_streams(_, [], _) ->
ok;
-terminate_all_streams(State, [{StreamID, {_, StreamState}}|Tail], Reason) ->
+terminate_all_streams(State, [{StreamID, #stream{state=StreamState}}|Tail], Reason) ->
terminate_stream_handler(State, StreamID, Reason, StreamState),
terminate_all_streams(State, Tail, Reason).
@@ -820,13 +865,13 @@ stop_stream(State=#state{http2_machine=HTTP2Machine}, StreamID) ->
end.
stopping(State=#state{streams=Streams}, StreamID) ->
- #{StreamID := {_, StreamState}} = Streams,
- State#state{streams=Streams#{StreamID => {stopping, StreamState}}}.
+ #{StreamID := Stream} = Streams,
+ State#state{streams=Streams#{StreamID => Stream#stream{status=stopping}}}.
%% If we finished sending data and the stream is stopping, terminate it.
maybe_terminate_stream(State=#state{streams=Streams}, StreamID, fin) ->
case Streams of
- #{StreamID := {stopping, _}} ->
+ #{StreamID := #stream{status=stopping}} ->
terminate_stream(State, StreamID);
_ ->
State
@@ -849,12 +894,15 @@ terminate_stream(State0=#state{socket=Socket, transport=Transport,
end,
terminate_stream(State, StreamID, normal).
-terminate_stream(State=#state{streams=Streams0, children=Children0}, StreamID, Reason) ->
+%% We remove the stream flow from the connection flow. Any further
+%% data received for this stream is therefore fully contained within
+%% the extra window we allocated for this stream.
+terminate_stream(State=#state{flow=Flow, streams=Streams0, children=Children0}, StreamID, Reason) ->
case maps:take(StreamID, Streams0) of
- {{_, StreamState}, Streams} ->
+ {#stream{flow=StreamFlow, state=StreamState}, Streams} ->
terminate_stream_handler(State, StreamID, Reason, StreamState),
Children = cowboy_children:shutdown(Children0, StreamID),
- State#state{streams=Streams, children=Children};
+ State#state{flow=Flow - StreamFlow, streams=Streams, children=Children};
error ->
State
end.
diff --git a/test/rfc7540_SUITE.erl b/test/rfc7540_SUITE.erl
index fe0c4ef..4f27dfa 100644
--- a/test/rfc7540_SUITE.erl
+++ b/test/rfc7540_SUITE.erl
@@ -3117,56 +3117,6 @@ data_reject_overflow_stream(Config0) ->
cowboy:stop_listener(?FUNCTION_NAME)
end.
-lingering_data_counts_toward_connection_window(Config0) ->
- doc("DATA frames received after sending RST_STREAM must be counted "
- "toward the connection flow-control window. (RFC7540 5.1)"),
- Config = cowboy_test:init_http(?FUNCTION_NAME, #{
- env => #{dispatch => cowboy_router:compile(init_routes(Config0))},
- initial_connection_window_size => 100000
- }, Config0),
- try
- %% We need to do the handshake manually because a WINDOW_UPDATE
- %% frame will be sent to update the connection window.
- {ok, Socket} = gen_tcp:connect("localhost", config(port, Config), [binary, {active, false}]),
- %% Send a valid preface.
- ok = gen_tcp:send(Socket, ["PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n", cow_http2:settings(#{})]),
- %% Receive the server preface.
- {ok, << Len1:24 >>} = gen_tcp:recv(Socket, 3, 1000),
- {ok, << 4:8, 0:40, _:Len1/binary >>} = gen_tcp:recv(Socket, 6 + Len1, 1000),
- %% Send the SETTINGS ack.
- ok = gen_tcp:send(Socket, cow_http2:settings_ack()),
- %% Receive the WINDOW_UPDATE for the connection.
- {ok, << 4:24, 8:8, 0:40, _:32 >>} = gen_tcp:recv(Socket, 13, 1000),
- %% Receive the SETTINGS ack.
- {ok, << 0:24, 4:8, 1:8, 0:32 >>} = gen_tcp:recv(Socket, 9, 1000),
- Headers = [
- {<<":method">>, <<"POST">>},
- {<<":scheme">>, <<"http">>},
- {<<":authority">>, <<"localhost">>}, %% @todo Correct port number.
- {<<":path">>, <<"/loop_handler_abort">>}
- ],
- {HeadersBlock, _} = cow_hpack:encode(Headers),
- ok = gen_tcp:send(Socket, [
- cow_http2:headers(1, nofin, HeadersBlock),
- cow_http2:data(1, nofin, <<0:1000/unit:8>>)
- ]),
- % Make sure server send RST_STREAM.
- timer:sleep(100),
- ok = gen_tcp:send(Socket, [
- cow_http2:data(1, nofin, <<0:0/unit:8>>),
- cow_http2:data(1, fin, <<0:1000/unit:8>>)
- ]),
- {ok, << SkipLen:24, 1:8, _:8, 1:32 >>} = gen_tcp:recv(Socket, 9, 1000),
- % Skip the header.
- {ok, _} = gen_tcp:recv(Socket, SkipLen, 1000),
- % Skip RST_STREAM.
- {ok, << 4:24, 3:8, 1:40, _:32 >>} = gen_tcp:recv(Socket, 13, 1000),
- % Received a WINDOW_UPDATE frame after we got RST_STREAM.
- {ok, << 4:24, 8:8, 0:40, 1000:32 >>} = gen_tcp:recv(Socket, 13, 1000)
- after
- cowboy:stop_listener(?FUNCTION_NAME)
- end.
-
%% (RFC7540 6.9.1)
% Frames with zero length with the END_STREAM flag set (that
% is, an empty DATA frame) MAY be sent if there is no available space
diff --git a/test/rfc8441_SUITE.erl b/test/rfc8441_SUITE.erl
index 3105ddb..245658f 100644
--- a/test/rfc8441_SUITE.erl
+++ b/test/rfc8441_SUITE.erl
@@ -389,15 +389,10 @@ accept_handshake_when_enabled(Config) ->
{RespHeaders, _} = cow_hpack:decode(RespHeadersBlock),
{_, <<"200">>} = lists:keyfind(<<":status">>, 1, RespHeaders),
%% Masked text hello echoed back clear by the server.
- %%
- %% We receive WINDOW_UPDATE frames before the actual data
- %% due to flow control updates every time a data frame is received.
Mask = 16#37fa213d,
MaskedHello = ws_SUITE:do_mask(<<"Hello">>, Mask, <<>>),
ok = gen_tcp:send(Socket, cow_http2:data(1, nofin,
<<1:1, 0:3, 1:4, 1:1, 5:7, Mask:32, MaskedHello/binary>>)),
- {ok, <<4:24, 8:8, _:72>>} = gen_tcp:recv(Socket, 13, 1000),
- {ok, <<4:24, 8:8, _:72>>} = gen_tcp:recv(Socket, 13, 1000),
{ok, <<Len2:24, _:8, _:8, _:32>>} = gen_tcp:recv(Socket, 9, 1000),
{ok, <<1:1, 0:3, 1:4, 0:1, 5:7, "Hello">>} = gen_tcp:recv(Socket, Len2, 1000),
ok.