From 48f417ac8f3d00039e2dc674e312d982336dcfea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Mon, 2 Sep 2019 14:48:28 +0200 Subject: Fix and optimize sending of WINDOW_UPDATE frames For long-running connections it was possible for the connection window to become larger than allowed by the protocol because the window increases claimed by stream handlers were never reclaimed even if no data was consumed. The new code applies heuristics to fix this and reduce the number of WINDOW_UPDATE frames that are sent. It includes six new options to control that behavior: margin, max and threshold for both the connection and stream windows. The margin is some extra space added on top of the requested read size. The max is the maximum window size at any given time. The threshold is a minimum window size that must be reached before we even consider sending more WINDOW_UPDATE frames. We also avoid sending WINDOW_UPDATE frames when there is already enough space in the window, or when the read size is 0. Cowlib is set to master until a new tag is done. --- src/cowboy_http2.erl | 118 ++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 83 insertions(+), 35 deletions(-) (limited to 'src') diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl index 7c9f070..7039ab2 100644 --- a/src/cowboy_http2.erl +++ b/src/cowboy_http2.erl @@ -30,6 +30,8 @@ compress_buffering => boolean(), compress_threshold => non_neg_integer(), connection_type => worker | supervisor, + connection_window_margin_size => 0..16#7fffffff, + connection_window_update_threshold => 0..16#7fffffff, enable_connect_protocol => boolean(), env => cowboy_middleware:env(), idle_timeout => timeout(), @@ -38,10 +40,12 @@ initial_stream_window_size => 0..16#7fffffff, logger => module(), max_concurrent_streams => non_neg_integer() | infinity, + max_connection_window_size => 0..16#7fffffff, max_decode_table_size => non_neg_integer(), max_encode_table_size => non_neg_integer(), max_frame_size_received => 16384..16777215, max_frame_size_sent => 16384..16777215 | infinity, + max_stream_window_size => 0..16#7fffffff, metrics_callback => cowboy_metrics_h:metrics_callback(), middlewares => [module()], preface_timeout => timeout(), @@ -50,6 +54,8 @@ settings_timeout => timeout(), shutdown_timeout => timeout(), stream_handlers => [module()], + stream_window_margin_size => 0..16#7fffffff, + stream_window_update_threshold => 0..16#7fffffff, tracer_callback => cowboy_tracer_h:tracer_callback(), tracer_match_specs => cowboy_tracer_h:tracer_match_specs(), %% Open ended because configured stream handlers might add options. @@ -57,6 +63,17 @@ }. -export_type([opts/0]). +-record(stream, { + %% Whether the stream is currently stopping. + status = running :: running | stopping, + + %% Flow requested for this stream. + flow = 0 :: non_neg_integer(), + + %% Stream state. + state :: {module, any()} +}). + -record(state, { parent = undefined :: pid(), ref :: ranch:ref(), @@ -81,9 +98,12 @@ http2_status :: sequence | settings | upgrade | connected | closing, http2_machine :: cow_http2_machine:http2_machine(), + %% Flow requested for all streams. + flow = 0 :: non_neg_integer(), + %% Currently active HTTP/2 streams. Streams may be initiated either %% by the client or by the server through PUSH_PROMISE frames. - streams = #{} :: #{cow_http2:streamid() => {running | stopping, {module, any()}}}, + streams = #{} :: #{cow_http2:streamid() => #stream{}}, %% Streams can spawn zero or more children which are then managed %% by this module if operating as a supervisor. @@ -259,8 +279,8 @@ frame(State=#state{http2_machine=HTTP2Machine0}, Frame) -> maybe_ack(State#state{http2_machine=HTTP2Machine}, Frame); {ok, {data, StreamID, IsFin, Data}, HTTP2Machine} -> data_frame(State#state{http2_machine=HTTP2Machine}, StreamID, IsFin, Data); - {ok, {lingering_data, _StreamID, DataLen}, HTTP2Machine} -> - lingering_data_frame(State#state{http2_machine=HTTP2Machine}, DataLen); + {ok, {lingering_data, StreamID, DataLen}, HTTP2Machine} -> + lingering_data_frame(State#state{http2_machine=HTTP2Machine}, StreamID, DataLen); {ok, {headers, StreamID, IsFin, Headers, PseudoHeaders, BodyLen}, HTTP2Machine} -> headers_frame(State#state{http2_machine=HTTP2Machine}, StreamID, IsFin, Headers, PseudoHeaders, BodyLen); @@ -292,30 +312,38 @@ maybe_ack(State=#state{socket=Socket, transport=Transport}, Frame) -> end, State. -data_frame(State=#state{opts=Opts, streams=Streams}, StreamID, IsFin, Data) -> +data_frame(State0=#state{opts=Opts, flow=Flow, streams=Streams}, StreamID, IsFin, Data) -> case Streams of - #{StreamID := {running, StreamState0}} -> + #{StreamID := Stream=#stream{status=running, flow=StreamFlow, state=StreamState0}} -> try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of {Commands, StreamState} -> - commands(State#state{streams=Streams#{StreamID => {running, StreamState}}}, - StreamID, Commands) + %% Remove the amount of data received from the flow. + %% We may receive more data than we requested. We ensure + %% that the flow value doesn't go lower than 0. + Size = byte_size(Data), + State = update_window(State0#state{flow=max(0, Flow - Size), + streams=Streams#{StreamID => Stream#stream{ + flow=max(0, StreamFlow - Size), state=StreamState}}}, + StreamID), + commands(State, StreamID, Commands) catch Class:Exception -> cowboy:log(cowboy_stream:make_error_log(data, [StreamID, IsFin, Data, StreamState0], Class, Exception, erlang:get_stacktrace()), Opts), - reset_stream(State, StreamID, {internal_error, {Class, Exception}, + reset_stream(State0, StreamID, {internal_error, {Class, Exception}, 'Unhandled exception in cowboy_stream:data/4.'}) end; %% We ignore DATA frames for streams that are stopping. #{} -> - State + State0 end. -lingering_data_frame(State=#state{socket=Socket, transport=Transport, - http2_machine=HTTP2Machine0}, DataLen) -> - Transport:send(Socket, cow_http2:window_update(DataLen)), - HTTP2Machine1 = cow_http2_machine:update_window(DataLen, HTTP2Machine0), - State#state{http2_machine=HTTP2Machine1}. +lingering_data_frame(State, _StreamID, _DataLen) -> + %% We do nothing when receiving a lingering DATA frame. + %% We already removed the stream flow from the connection + %% flow and are therefore already accounting for the window + %% being reduced by these frames. + State. headers_frame(State, StreamID, IsFin, Headers, PseudoHeaders=#{method := <<"CONNECT">>}, _) @@ -410,7 +438,7 @@ headers_frame(State=#state{opts=Opts, streams=Streams}, StreamID, Req) -> try cowboy_stream:init(StreamID, Req, Opts) of {Commands, StreamState} -> commands(State#state{ - streams=Streams#{StreamID => {running, StreamState}}}, + streams=Streams#{StreamID => #stream{state=StreamState}}}, StreamID, Commands) catch Class:Exception -> cowboy:log(cowboy_stream:make_error_log(init, @@ -449,7 +477,7 @@ early_error(State0=#state{ref=Ref, opts=Opts, peer=Peer}, rst_stream_frame(State=#state{streams=Streams0, children=Children0}, StreamID, Reason) -> case maps:take(StreamID, Streams0) of - {{_, StreamState}, Streams} -> + {#stream{state=StreamState}, Streams} -> terminate_stream_handler(State, StreamID, Reason, StreamState), Children = cowboy_children:shutdown(Children0, StreamID), State#state{streams=Streams, children=Children}; @@ -494,10 +522,10 @@ down(State=#state{opts=Opts, children=Children0}, Pid, Msg) -> info(State=#state{opts=Opts, streams=Streams}, StreamID, Msg) -> case Streams of - #{StreamID := {IsRunning, StreamState0}} -> + #{StreamID := Stream=#stream{state=StreamState0}} -> try cowboy_stream:info(StreamID, Msg, StreamState0) of {Commands, StreamState} -> - commands(State#state{streams=Streams#{StreamID => {IsRunning, StreamState}}}, + commands(State#state{streams=Streams#{StreamID => Stream#stream{state=StreamState}}}, StreamID, Commands) catch Class:Exception -> cowboy:log(cowboy_stream:make_error_log(info, @@ -586,15 +614,13 @@ commands(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Ma State0 end, commands(State, StreamID, Tail); -commands(State=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0}, - StreamID, [{flow, Size}|Tail]) -> - Transport:send(Socket, [ - cow_http2:window_update(Size), - cow_http2:window_update(StreamID, Size) - ]), - HTTP2Machine1 = cow_http2_machine:update_window(Size, HTTP2Machine0), - HTTP2Machine = cow_http2_machine:update_window(StreamID, Size, HTTP2Machine1), - commands(State#state{http2_machine=HTTP2Machine}, StreamID, Tail); +%% Read the request body. +commands(State0=#state{flow=Flow, streams=Streams}, StreamID, [{flow, Size}|Tail]) -> + #{StreamID := Stream=#stream{flow=StreamFlow}} = Streams, + State = update_window(State0#state{flow=Flow + Size, + streams=Streams#{StreamID => Stream#stream{flow=StreamFlow + Size}}}, + StreamID), + commands(State, StreamID, Tail); %% Supervise a child process. commands(State=#state{children=Children}, StreamID, [{spawn, Pid, Shutdown}|Tail]) -> commands(State#state{children=cowboy_children:up(Children, Pid, StreamID, Shutdown)}, @@ -628,6 +654,25 @@ commands(State=#state{opts=Opts}, StreamID, [Log={log, _, _, _}|Tail]) -> cowboy:log(Log, Opts), commands(State, StreamID, Tail). +%% Tentatively update the window after the flow was updated. + +update_window(State=#state{socket=Socket, transport=Transport, + http2_machine=HTTP2Machine0, flow=Flow, streams=Streams}, StreamID) -> + #{StreamID := #stream{flow=StreamFlow}} = Streams, + {Data1, HTTP2Machine2} = case cow_http2_machine:ensure_window(Flow, HTTP2Machine0) of + ok -> {<<>>, HTTP2Machine0}; + {ok, Increment1, HTTP2Machine1} -> {cow_http2:window_update(Increment1), HTTP2Machine1} + end, + {Data2, HTTP2Machine} = case cow_http2_machine:ensure_window(StreamID, StreamFlow, HTTP2Machine2) of + ok -> {<<>>, HTTP2Machine2}; + {ok, Increment2, HTTP2Machine3} -> {cow_http2:window_update(StreamID, Increment2), HTTP2Machine3} + end, + case {Data1, Data2} of + {<<>>, <<>>} -> ok; + _ -> Transport:send(Socket, [Data1, Data2]) + end, + State#state{http2_machine=HTTP2Machine}. + %% Send the response, trailers or data. send_response(State0, StreamID, StatusCode, Headers, Body) -> @@ -741,7 +786,7 @@ goaway(State, {goaway, _, Reason, _}) -> %% Cancel client-initiated streams that are above LastStreamID. goaway_streams(_, [], _, _, Acc) -> Acc; -goaway_streams(State, [{StreamID, {_, StreamState}}|Tail], LastStreamID, Reason, Acc) +goaway_streams(State, [{StreamID, #stream{state=StreamState}}|Tail], LastStreamID, Reason, Acc) when StreamID > LastStreamID, (StreamID rem 2) =:= 0 -> terminate_stream_handler(State, StreamID, Reason, StreamState), goaway_streams(State, Tail, LastStreamID, Reason, Acc); @@ -780,7 +825,7 @@ terminate_reason({internal_error, _, _}) -> internal_error. terminate_all_streams(_, [], _) -> ok; -terminate_all_streams(State, [{StreamID, {_, StreamState}}|Tail], Reason) -> +terminate_all_streams(State, [{StreamID, #stream{state=StreamState}}|Tail], Reason) -> terminate_stream_handler(State, StreamID, Reason, StreamState), terminate_all_streams(State, Tail, Reason). @@ -820,13 +865,13 @@ stop_stream(State=#state{http2_machine=HTTP2Machine}, StreamID) -> end. stopping(State=#state{streams=Streams}, StreamID) -> - #{StreamID := {_, StreamState}} = Streams, - State#state{streams=Streams#{StreamID => {stopping, StreamState}}}. + #{StreamID := Stream} = Streams, + State#state{streams=Streams#{StreamID => Stream#stream{status=stopping}}}. %% If we finished sending data and the stream is stopping, terminate it. maybe_terminate_stream(State=#state{streams=Streams}, StreamID, fin) -> case Streams of - #{StreamID := {stopping, _}} -> + #{StreamID := #stream{status=stopping}} -> terminate_stream(State, StreamID); _ -> State @@ -849,12 +894,15 @@ terminate_stream(State0=#state{socket=Socket, transport=Transport, end, terminate_stream(State, StreamID, normal). -terminate_stream(State=#state{streams=Streams0, children=Children0}, StreamID, Reason) -> +%% We remove the stream flow from the connection flow. Any further +%% data received for this stream is therefore fully contained within +%% the extra window we allocated for this stream. +terminate_stream(State=#state{flow=Flow, streams=Streams0, children=Children0}, StreamID, Reason) -> case maps:take(StreamID, Streams0) of - {{_, StreamState}, Streams} -> + {#stream{flow=StreamFlow, state=StreamState}, Streams} -> terminate_stream_handler(State, StreamID, Reason, StreamState), Children = cowboy_children:shutdown(Children0, StreamID), - State#state{streams=Streams, children=Children}; + State#state{flow=Flow - StreamFlow, streams=Streams, children=Children}; error -> State end. -- cgit v1.2.3