diff options
-rw-r--r-- | Makefile | 2 | ||||
-rw-r--r-- | doc/src/manual/cowboy_http2.asciidoc | 52 | ||||
-rw-r--r-- | rebar.config | 2 | ||||
-rw-r--r-- | src/cowboy_http2.erl | 118 | ||||
-rw-r--r-- | test/rfc7540_SUITE.erl | 50 | ||||
-rw-r--r-- | test/rfc8441_SUITE.erl | 5 |
6 files changed, 136 insertions, 93 deletions
@@ -15,7 +15,7 @@ CT_OPTS += -ct_hooks cowboy_ct_hook [] # -boot start_sasl LOCAL_DEPS = crypto DEPS = cowlib ranch -dep_cowlib = git https://github.com/ninenines/cowlib 2.7.3 +dep_cowlib = git https://github.com/ninenines/cowlib master dep_ranch = git https://github.com/ninenines/ranch 1.7.1 DOC_DEPS = asciideck diff --git a/doc/src/manual/cowboy_http2.asciidoc b/doc/src/manual/cowboy_http2.asciidoc index 4907f09..33c801c 100644 --- a/doc/src/manual/cowboy_http2.asciidoc +++ b/doc/src/manual/cowboy_http2.asciidoc @@ -18,21 +18,27 @@ as a Ranch protocol. ---- opts() :: #{ connection_type => worker | supervisor, + connection_window_margin_size => 0..16#7fffffff, + connection_window_update_threshold => 0..16#7fffffff, enable_connect_protocol => boolean(), idle_timeout => timeout(), inactivity_timeout => timeout(), initial_connection_window_size => 65535..16#7fffffff, initial_stream_window_size => 0..16#7fffffff, max_concurrent_streams => non_neg_integer() | infinity, + max_connection_window_size => 0..16#7fffffff, max_decode_table_size => non_neg_integer(), max_encode_table_size => non_neg_integer(), max_frame_size_received => 16384..16777215, max_frame_size_sent => 16384..16777215 | infinity, + max_stream_window_size => 0..16#7fffffff, preface_timeout => timeout(), proxy_header => boolean(), sendfile => boolean(), settings_timeout => timeout(), - stream_handlers => [module()] + stream_handlers => [module()], + stream_window_margin_size => 0..16#7fffffff, + stream_window_update_threshold => 0..16#7fffffff } ---- @@ -51,6 +57,19 @@ connection_type (supervisor):: Whether the connection process also acts as a supervisor. +connection_window_margin_size (65535):: + +Extra amount to be added to the window size when +updating the connection window. This is used to +ensure that there is always some space available in +the window. + +connection_window_update_threshold (163840):: + +The connection window will only get updated when its size +becomes lower than this threshold. This is to avoid sending +too many `WINDOW_UPDATE` frames. + enable_connect_protocol (false):: Whether to enable the extended CONNECT method to allow @@ -84,6 +103,12 @@ max_concurrent_streams (infinity):: Maximum number of concurrent streams allowed on the connection. +max_connection_window_size (16#7fffffff):: + +Maximum connection window size. This is used as an upper bound +when calculating the window size, either when reading the request +body or receiving said body. + max_decode_table_size (4096):: Maximum header table size used by the decoder. This is the value advertised @@ -111,6 +136,12 @@ following the client's advertised maximum. Note that actual frame sizes may be lower than the limit when there is not enough space left in the flow control window. +max_stream_window_size (16#7fffffff):: + +Maximum stream window size. This is used as an upper bound +when calculating the window size, either when reading the request +body or receiving said body. + preface_timeout (5000):: Time in ms Cowboy is willing to wait for the connection preface. @@ -135,8 +166,27 @@ stream_handlers ([cowboy_stream_h]):: Ordered list of stream handlers that will handle all stream events. +stream_window_margin_size (65535):: + +Extra amount to be added to the window size when +updating a stream's window. This is used to +ensure that there is always some space available in +the window. + +stream_window_update_threshold (163840):: + +A stream's window will only get updated when its size +becomes lower than this threshold. This is to avoid sending +too many `WINDOW_UPDATE` frames. + == Changelog +* *2.7*: Add the options `connection_window_margin_size`, + `connection_window_update_threshold`, + `max_connection_window_size`, `max_stream_window_size`, + `stream_window_margin_size` and + `stream_window_update_threshold` to configure + behavior on sending WINDOW_UPDATE frames. * *2.6*: The `proxy_header` and `sendfile` options were added. * *2.4*: Add the options `initial_connection_window_size`, `initial_stream_window_size`, `max_concurrent_streams`, diff --git a/rebar.config b/rebar.config index bb6e0ef..cb76748 100644 --- a/rebar.config +++ b/rebar.config @@ -1,4 +1,4 @@ {deps, [ -{cowlib,".*",{git,"https://github.com/ninenines/cowlib","2.7.3"}},{ranch,".*",{git,"https://github.com/ninenines/ranch","1.7.1"}} +{cowlib,".*",{git,"https://github.com/ninenines/cowlib","master"}},{ranch,".*",{git,"https://github.com/ninenines/ranch","1.7.1"}} ]}. {erl_opts, [debug_info,warn_export_vars,warn_shadow_vars,warn_obsolete_guard,warn_missing_spec,warn_untyped_record]}. diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl index 7c9f070..7039ab2 100644 --- a/src/cowboy_http2.erl +++ b/src/cowboy_http2.erl @@ -30,6 +30,8 @@ compress_buffering => boolean(), compress_threshold => non_neg_integer(), connection_type => worker | supervisor, + connection_window_margin_size => 0..16#7fffffff, + connection_window_update_threshold => 0..16#7fffffff, enable_connect_protocol => boolean(), env => cowboy_middleware:env(), idle_timeout => timeout(), @@ -38,10 +40,12 @@ initial_stream_window_size => 0..16#7fffffff, logger => module(), max_concurrent_streams => non_neg_integer() | infinity, + max_connection_window_size => 0..16#7fffffff, max_decode_table_size => non_neg_integer(), max_encode_table_size => non_neg_integer(), max_frame_size_received => 16384..16777215, max_frame_size_sent => 16384..16777215 | infinity, + max_stream_window_size => 0..16#7fffffff, metrics_callback => cowboy_metrics_h:metrics_callback(), middlewares => [module()], preface_timeout => timeout(), @@ -50,6 +54,8 @@ settings_timeout => timeout(), shutdown_timeout => timeout(), stream_handlers => [module()], + stream_window_margin_size => 0..16#7fffffff, + stream_window_update_threshold => 0..16#7fffffff, tracer_callback => cowboy_tracer_h:tracer_callback(), tracer_match_specs => cowboy_tracer_h:tracer_match_specs(), %% Open ended because configured stream handlers might add options. @@ -57,6 +63,17 @@ }. -export_type([opts/0]). +-record(stream, { + %% Whether the stream is currently stopping. + status = running :: running | stopping, + + %% Flow requested for this stream. + flow = 0 :: non_neg_integer(), + + %% Stream state. + state :: {module, any()} +}). + -record(state, { parent = undefined :: pid(), ref :: ranch:ref(), @@ -81,9 +98,12 @@ http2_status :: sequence | settings | upgrade | connected | closing, http2_machine :: cow_http2_machine:http2_machine(), + %% Flow requested for all streams. + flow = 0 :: non_neg_integer(), + %% Currently active HTTP/2 streams. Streams may be initiated either %% by the client or by the server through PUSH_PROMISE frames. - streams = #{} :: #{cow_http2:streamid() => {running | stopping, {module, any()}}}, + streams = #{} :: #{cow_http2:streamid() => #stream{}}, %% Streams can spawn zero or more children which are then managed %% by this module if operating as a supervisor. @@ -259,8 +279,8 @@ frame(State=#state{http2_machine=HTTP2Machine0}, Frame) -> maybe_ack(State#state{http2_machine=HTTP2Machine}, Frame); {ok, {data, StreamID, IsFin, Data}, HTTP2Machine} -> data_frame(State#state{http2_machine=HTTP2Machine}, StreamID, IsFin, Data); - {ok, {lingering_data, _StreamID, DataLen}, HTTP2Machine} -> - lingering_data_frame(State#state{http2_machine=HTTP2Machine}, DataLen); + {ok, {lingering_data, StreamID, DataLen}, HTTP2Machine} -> + lingering_data_frame(State#state{http2_machine=HTTP2Machine}, StreamID, DataLen); {ok, {headers, StreamID, IsFin, Headers, PseudoHeaders, BodyLen}, HTTP2Machine} -> headers_frame(State#state{http2_machine=HTTP2Machine}, StreamID, IsFin, Headers, PseudoHeaders, BodyLen); @@ -292,30 +312,38 @@ maybe_ack(State=#state{socket=Socket, transport=Transport}, Frame) -> end, State. -data_frame(State=#state{opts=Opts, streams=Streams}, StreamID, IsFin, Data) -> +data_frame(State0=#state{opts=Opts, flow=Flow, streams=Streams}, StreamID, IsFin, Data) -> case Streams of - #{StreamID := {running, StreamState0}} -> + #{StreamID := Stream=#stream{status=running, flow=StreamFlow, state=StreamState0}} -> try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of {Commands, StreamState} -> - commands(State#state{streams=Streams#{StreamID => {running, StreamState}}}, - StreamID, Commands) + %% Remove the amount of data received from the flow. + %% We may receive more data than we requested. We ensure + %% that the flow value doesn't go lower than 0. + Size = byte_size(Data), + State = update_window(State0#state{flow=max(0, Flow - Size), + streams=Streams#{StreamID => Stream#stream{ + flow=max(0, StreamFlow - Size), state=StreamState}}}, + StreamID), + commands(State, StreamID, Commands) catch Class:Exception -> cowboy:log(cowboy_stream:make_error_log(data, [StreamID, IsFin, Data, StreamState0], Class, Exception, erlang:get_stacktrace()), Opts), - reset_stream(State, StreamID, {internal_error, {Class, Exception}, + reset_stream(State0, StreamID, {internal_error, {Class, Exception}, 'Unhandled exception in cowboy_stream:data/4.'}) end; %% We ignore DATA frames for streams that are stopping. #{} -> - State + State0 end. -lingering_data_frame(State=#state{socket=Socket, transport=Transport, - http2_machine=HTTP2Machine0}, DataLen) -> - Transport:send(Socket, cow_http2:window_update(DataLen)), - HTTP2Machine1 = cow_http2_machine:update_window(DataLen, HTTP2Machine0), - State#state{http2_machine=HTTP2Machine1}. +lingering_data_frame(State, _StreamID, _DataLen) -> + %% We do nothing when receiving a lingering DATA frame. + %% We already removed the stream flow from the connection + %% flow and are therefore already accounting for the window + %% being reduced by these frames. + State. headers_frame(State, StreamID, IsFin, Headers, PseudoHeaders=#{method := <<"CONNECT">>}, _) @@ -410,7 +438,7 @@ headers_frame(State=#state{opts=Opts, streams=Streams}, StreamID, Req) -> try cowboy_stream:init(StreamID, Req, Opts) of {Commands, StreamState} -> commands(State#state{ - streams=Streams#{StreamID => {running, StreamState}}}, + streams=Streams#{StreamID => #stream{state=StreamState}}}, StreamID, Commands) catch Class:Exception -> cowboy:log(cowboy_stream:make_error_log(init, @@ -449,7 +477,7 @@ early_error(State0=#state{ref=Ref, opts=Opts, peer=Peer}, rst_stream_frame(State=#state{streams=Streams0, children=Children0}, StreamID, Reason) -> case maps:take(StreamID, Streams0) of - {{_, StreamState}, Streams} -> + {#stream{state=StreamState}, Streams} -> terminate_stream_handler(State, StreamID, Reason, StreamState), Children = cowboy_children:shutdown(Children0, StreamID), State#state{streams=Streams, children=Children}; @@ -494,10 +522,10 @@ down(State=#state{opts=Opts, children=Children0}, Pid, Msg) -> info(State=#state{opts=Opts, streams=Streams}, StreamID, Msg) -> case Streams of - #{StreamID := {IsRunning, StreamState0}} -> + #{StreamID := Stream=#stream{state=StreamState0}} -> try cowboy_stream:info(StreamID, Msg, StreamState0) of {Commands, StreamState} -> - commands(State#state{streams=Streams#{StreamID => {IsRunning, StreamState}}}, + commands(State#state{streams=Streams#{StreamID => Stream#stream{state=StreamState}}}, StreamID, Commands) catch Class:Exception -> cowboy:log(cowboy_stream:make_error_log(info, @@ -586,15 +614,13 @@ commands(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Ma State0 end, commands(State, StreamID, Tail); -commands(State=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0}, - StreamID, [{flow, Size}|Tail]) -> - Transport:send(Socket, [ - cow_http2:window_update(Size), - cow_http2:window_update(StreamID, Size) - ]), - HTTP2Machine1 = cow_http2_machine:update_window(Size, HTTP2Machine0), - HTTP2Machine = cow_http2_machine:update_window(StreamID, Size, HTTP2Machine1), - commands(State#state{http2_machine=HTTP2Machine}, StreamID, Tail); +%% Read the request body. +commands(State0=#state{flow=Flow, streams=Streams}, StreamID, [{flow, Size}|Tail]) -> + #{StreamID := Stream=#stream{flow=StreamFlow}} = Streams, + State = update_window(State0#state{flow=Flow + Size, + streams=Streams#{StreamID => Stream#stream{flow=StreamFlow + Size}}}, + StreamID), + commands(State, StreamID, Tail); %% Supervise a child process. commands(State=#state{children=Children}, StreamID, [{spawn, Pid, Shutdown}|Tail]) -> commands(State#state{children=cowboy_children:up(Children, Pid, StreamID, Shutdown)}, @@ -628,6 +654,25 @@ commands(State=#state{opts=Opts}, StreamID, [Log={log, _, _, _}|Tail]) -> cowboy:log(Log, Opts), commands(State, StreamID, Tail). +%% Tentatively update the window after the flow was updated. + +update_window(State=#state{socket=Socket, transport=Transport, + http2_machine=HTTP2Machine0, flow=Flow, streams=Streams}, StreamID) -> + #{StreamID := #stream{flow=StreamFlow}} = Streams, + {Data1, HTTP2Machine2} = case cow_http2_machine:ensure_window(Flow, HTTP2Machine0) of + ok -> {<<>>, HTTP2Machine0}; + {ok, Increment1, HTTP2Machine1} -> {cow_http2:window_update(Increment1), HTTP2Machine1} + end, + {Data2, HTTP2Machine} = case cow_http2_machine:ensure_window(StreamID, StreamFlow, HTTP2Machine2) of + ok -> {<<>>, HTTP2Machine2}; + {ok, Increment2, HTTP2Machine3} -> {cow_http2:window_update(StreamID, Increment2), HTTP2Machine3} + end, + case {Data1, Data2} of + {<<>>, <<>>} -> ok; + _ -> Transport:send(Socket, [Data1, Data2]) + end, + State#state{http2_machine=HTTP2Machine}. + %% Send the response, trailers or data. send_response(State0, StreamID, StatusCode, Headers, Body) -> @@ -741,7 +786,7 @@ goaway(State, {goaway, _, Reason, _}) -> %% Cancel client-initiated streams that are above LastStreamID. goaway_streams(_, [], _, _, Acc) -> Acc; -goaway_streams(State, [{StreamID, {_, StreamState}}|Tail], LastStreamID, Reason, Acc) +goaway_streams(State, [{StreamID, #stream{state=StreamState}}|Tail], LastStreamID, Reason, Acc) when StreamID > LastStreamID, (StreamID rem 2) =:= 0 -> terminate_stream_handler(State, StreamID, Reason, StreamState), goaway_streams(State, Tail, LastStreamID, Reason, Acc); @@ -780,7 +825,7 @@ terminate_reason({internal_error, _, _}) -> internal_error. terminate_all_streams(_, [], _) -> ok; -terminate_all_streams(State, [{StreamID, {_, StreamState}}|Tail], Reason) -> +terminate_all_streams(State, [{StreamID, #stream{state=StreamState}}|Tail], Reason) -> terminate_stream_handler(State, StreamID, Reason, StreamState), terminate_all_streams(State, Tail, Reason). @@ -820,13 +865,13 @@ stop_stream(State=#state{http2_machine=HTTP2Machine}, StreamID) -> end. stopping(State=#state{streams=Streams}, StreamID) -> - #{StreamID := {_, StreamState}} = Streams, - State#state{streams=Streams#{StreamID => {stopping, StreamState}}}. + #{StreamID := Stream} = Streams, + State#state{streams=Streams#{StreamID => Stream#stream{status=stopping}}}. %% If we finished sending data and the stream is stopping, terminate it. maybe_terminate_stream(State=#state{streams=Streams}, StreamID, fin) -> case Streams of - #{StreamID := {stopping, _}} -> + #{StreamID := #stream{status=stopping}} -> terminate_stream(State, StreamID); _ -> State @@ -849,12 +894,15 @@ terminate_stream(State0=#state{socket=Socket, transport=Transport, end, terminate_stream(State, StreamID, normal). -terminate_stream(State=#state{streams=Streams0, children=Children0}, StreamID, Reason) -> +%% We remove the stream flow from the connection flow. Any further +%% data received for this stream is therefore fully contained within +%% the extra window we allocated for this stream. +terminate_stream(State=#state{flow=Flow, streams=Streams0, children=Children0}, StreamID, Reason) -> case maps:take(StreamID, Streams0) of - {{_, StreamState}, Streams} -> + {#stream{flow=StreamFlow, state=StreamState}, Streams} -> terminate_stream_handler(State, StreamID, Reason, StreamState), Children = cowboy_children:shutdown(Children0, StreamID), - State#state{streams=Streams, children=Children}; + State#state{flow=Flow - StreamFlow, streams=Streams, children=Children}; error -> State end. diff --git a/test/rfc7540_SUITE.erl b/test/rfc7540_SUITE.erl index fe0c4ef..4f27dfa 100644 --- a/test/rfc7540_SUITE.erl +++ b/test/rfc7540_SUITE.erl @@ -3117,56 +3117,6 @@ data_reject_overflow_stream(Config0) -> cowboy:stop_listener(?FUNCTION_NAME) end. -lingering_data_counts_toward_connection_window(Config0) -> - doc("DATA frames received after sending RST_STREAM must be counted " - "toward the connection flow-control window. (RFC7540 5.1)"), - Config = cowboy_test:init_http(?FUNCTION_NAME, #{ - env => #{dispatch => cowboy_router:compile(init_routes(Config0))}, - initial_connection_window_size => 100000 - }, Config0), - try - %% We need to do the handshake manually because a WINDOW_UPDATE - %% frame will be sent to update the connection window. - {ok, Socket} = gen_tcp:connect("localhost", config(port, Config), [binary, {active, false}]), - %% Send a valid preface. - ok = gen_tcp:send(Socket, ["PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n", cow_http2:settings(#{})]), - %% Receive the server preface. - {ok, << Len1:24 >>} = gen_tcp:recv(Socket, 3, 1000), - {ok, << 4:8, 0:40, _:Len1/binary >>} = gen_tcp:recv(Socket, 6 + Len1, 1000), - %% Send the SETTINGS ack. - ok = gen_tcp:send(Socket, cow_http2:settings_ack()), - %% Receive the WINDOW_UPDATE for the connection. - {ok, << 4:24, 8:8, 0:40, _:32 >>} = gen_tcp:recv(Socket, 13, 1000), - %% Receive the SETTINGS ack. - {ok, << 0:24, 4:8, 1:8, 0:32 >>} = gen_tcp:recv(Socket, 9, 1000), - Headers = [ - {<<":method">>, <<"POST">>}, - {<<":scheme">>, <<"http">>}, - {<<":authority">>, <<"localhost">>}, %% @todo Correct port number. - {<<":path">>, <<"/loop_handler_abort">>} - ], - {HeadersBlock, _} = cow_hpack:encode(Headers), - ok = gen_tcp:send(Socket, [ - cow_http2:headers(1, nofin, HeadersBlock), - cow_http2:data(1, nofin, <<0:1000/unit:8>>) - ]), - % Make sure server send RST_STREAM. - timer:sleep(100), - ok = gen_tcp:send(Socket, [ - cow_http2:data(1, nofin, <<0:0/unit:8>>), - cow_http2:data(1, fin, <<0:1000/unit:8>>) - ]), - {ok, << SkipLen:24, 1:8, _:8, 1:32 >>} = gen_tcp:recv(Socket, 9, 1000), - % Skip the header. - {ok, _} = gen_tcp:recv(Socket, SkipLen, 1000), - % Skip RST_STREAM. - {ok, << 4:24, 3:8, 1:40, _:32 >>} = gen_tcp:recv(Socket, 13, 1000), - % Received a WINDOW_UPDATE frame after we got RST_STREAM. - {ok, << 4:24, 8:8, 0:40, 1000:32 >>} = gen_tcp:recv(Socket, 13, 1000) - after - cowboy:stop_listener(?FUNCTION_NAME) - end. - %% (RFC7540 6.9.1) % Frames with zero length with the END_STREAM flag set (that % is, an empty DATA frame) MAY be sent if there is no available space diff --git a/test/rfc8441_SUITE.erl b/test/rfc8441_SUITE.erl index 3105ddb..245658f 100644 --- a/test/rfc8441_SUITE.erl +++ b/test/rfc8441_SUITE.erl @@ -389,15 +389,10 @@ accept_handshake_when_enabled(Config) -> {RespHeaders, _} = cow_hpack:decode(RespHeadersBlock), {_, <<"200">>} = lists:keyfind(<<":status">>, 1, RespHeaders), %% Masked text hello echoed back clear by the server. - %% - %% We receive WINDOW_UPDATE frames before the actual data - %% due to flow control updates every time a data frame is received. Mask = 16#37fa213d, MaskedHello = ws_SUITE:do_mask(<<"Hello">>, Mask, <<>>), ok = gen_tcp:send(Socket, cow_http2:data(1, nofin, <<1:1, 0:3, 1:4, 1:1, 5:7, Mask:32, MaskedHello/binary>>)), - {ok, <<4:24, 8:8, _:72>>} = gen_tcp:recv(Socket, 13, 1000), - {ok, <<4:24, 8:8, _:72>>} = gen_tcp:recv(Socket, 13, 1000), {ok, <<Len2:24, _:8, _:8, _:32>>} = gen_tcp:recv(Socket, 9, 1000), {ok, <<1:1, 0:3, 1:4, 0:1, 5:7, "Hello">>} = gen_tcp:recv(Socket, Len2, 1000), ok. |