diff options
author | Loïc Hoguin <[email protected]> | 2019-09-02 14:48:28 +0200 |
---|---|---|
committer | Loïc Hoguin <[email protected]> | 2019-09-05 14:07:38 +0200 |
commit | 48f417ac8f3d00039e2dc674e312d982336dcfea (patch) | |
tree | 232092a3fcbe0364a92c10f074faccdaf389c6da /src | |
parent | aedf6379cc769ce2be1e040de3fe631e6539f442 (diff) | |
download | cowboy-48f417ac8f3d00039e2dc674e312d982336dcfea.tar.gz cowboy-48f417ac8f3d00039e2dc674e312d982336dcfea.tar.bz2 cowboy-48f417ac8f3d00039e2dc674e312d982336dcfea.zip |
Fix and optimize sending of WINDOW_UPDATE frames
For long-running connections it was possible for the connection
window to become larger than allowed by the protocol because the
window increases claimed by stream handlers were never reclaimed
even if no data was consumed.
The new code applies heuristics to fix this and reduce the number
of WINDOW_UPDATE frames that are sent. It includes six new options
to control that behavior: margin, max and threshold for both the
connection and stream windows. The margin is some extra space
added on top of the requested read size. The max is the maximum
window size at any given time. The threshold is a minimum window
size that must be reached before we even consider sending more
WINDOW_UPDATE frames. We also avoid sending WINDOW_UPDATE frames
when there is already enough space in the window, or when the
read size is 0.
Cowlib is set to master until a new tag is done.
Diffstat (limited to 'src')
-rw-r--r-- | src/cowboy_http2.erl | 118 |
1 files changed, 83 insertions, 35 deletions
diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl index 7c9f070..7039ab2 100644 --- a/src/cowboy_http2.erl +++ b/src/cowboy_http2.erl @@ -30,6 +30,8 @@ compress_buffering => boolean(), compress_threshold => non_neg_integer(), connection_type => worker | supervisor, + connection_window_margin_size => 0..16#7fffffff, + connection_window_update_threshold => 0..16#7fffffff, enable_connect_protocol => boolean(), env => cowboy_middleware:env(), idle_timeout => timeout(), @@ -38,10 +40,12 @@ initial_stream_window_size => 0..16#7fffffff, logger => module(), max_concurrent_streams => non_neg_integer() | infinity, + max_connection_window_size => 0..16#7fffffff, max_decode_table_size => non_neg_integer(), max_encode_table_size => non_neg_integer(), max_frame_size_received => 16384..16777215, max_frame_size_sent => 16384..16777215 | infinity, + max_stream_window_size => 0..16#7fffffff, metrics_callback => cowboy_metrics_h:metrics_callback(), middlewares => [module()], preface_timeout => timeout(), @@ -50,6 +54,8 @@ settings_timeout => timeout(), shutdown_timeout => timeout(), stream_handlers => [module()], + stream_window_margin_size => 0..16#7fffffff, + stream_window_update_threshold => 0..16#7fffffff, tracer_callback => cowboy_tracer_h:tracer_callback(), tracer_match_specs => cowboy_tracer_h:tracer_match_specs(), %% Open ended because configured stream handlers might add options. @@ -57,6 +63,17 @@ }. -export_type([opts/0]). +-record(stream, { + %% Whether the stream is currently stopping. + status = running :: running | stopping, + + %% Flow requested for this stream. + flow = 0 :: non_neg_integer(), + + %% Stream state. + state :: {module, any()} +}). + -record(state, { parent = undefined :: pid(), ref :: ranch:ref(), @@ -81,9 +98,12 @@ http2_status :: sequence | settings | upgrade | connected | closing, http2_machine :: cow_http2_machine:http2_machine(), + %% Flow requested for all streams. + flow = 0 :: non_neg_integer(), + %% Currently active HTTP/2 streams. Streams may be initiated either %% by the client or by the server through PUSH_PROMISE frames. - streams = #{} :: #{cow_http2:streamid() => {running | stopping, {module, any()}}}, + streams = #{} :: #{cow_http2:streamid() => #stream{}}, %% Streams can spawn zero or more children which are then managed %% by this module if operating as a supervisor. @@ -259,8 +279,8 @@ frame(State=#state{http2_machine=HTTP2Machine0}, Frame) -> maybe_ack(State#state{http2_machine=HTTP2Machine}, Frame); {ok, {data, StreamID, IsFin, Data}, HTTP2Machine} -> data_frame(State#state{http2_machine=HTTP2Machine}, StreamID, IsFin, Data); - {ok, {lingering_data, _StreamID, DataLen}, HTTP2Machine} -> - lingering_data_frame(State#state{http2_machine=HTTP2Machine}, DataLen); + {ok, {lingering_data, StreamID, DataLen}, HTTP2Machine} -> + lingering_data_frame(State#state{http2_machine=HTTP2Machine}, StreamID, DataLen); {ok, {headers, StreamID, IsFin, Headers, PseudoHeaders, BodyLen}, HTTP2Machine} -> headers_frame(State#state{http2_machine=HTTP2Machine}, StreamID, IsFin, Headers, PseudoHeaders, BodyLen); @@ -292,30 +312,38 @@ maybe_ack(State=#state{socket=Socket, transport=Transport}, Frame) -> end, State. -data_frame(State=#state{opts=Opts, streams=Streams}, StreamID, IsFin, Data) -> +data_frame(State0=#state{opts=Opts, flow=Flow, streams=Streams}, StreamID, IsFin, Data) -> case Streams of - #{StreamID := {running, StreamState0}} -> + #{StreamID := Stream=#stream{status=running, flow=StreamFlow, state=StreamState0}} -> try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of {Commands, StreamState} -> - commands(State#state{streams=Streams#{StreamID => {running, StreamState}}}, - StreamID, Commands) + %% Remove the amount of data received from the flow. + %% We may receive more data than we requested. We ensure + %% that the flow value doesn't go lower than 0. + Size = byte_size(Data), + State = update_window(State0#state{flow=max(0, Flow - Size), + streams=Streams#{StreamID => Stream#stream{ + flow=max(0, StreamFlow - Size), state=StreamState}}}, + StreamID), + commands(State, StreamID, Commands) catch Class:Exception -> cowboy:log(cowboy_stream:make_error_log(data, [StreamID, IsFin, Data, StreamState0], Class, Exception, erlang:get_stacktrace()), Opts), - reset_stream(State, StreamID, {internal_error, {Class, Exception}, + reset_stream(State0, StreamID, {internal_error, {Class, Exception}, 'Unhandled exception in cowboy_stream:data/4.'}) end; %% We ignore DATA frames for streams that are stopping. #{} -> - State + State0 end. -lingering_data_frame(State=#state{socket=Socket, transport=Transport, - http2_machine=HTTP2Machine0}, DataLen) -> - Transport:send(Socket, cow_http2:window_update(DataLen)), - HTTP2Machine1 = cow_http2_machine:update_window(DataLen, HTTP2Machine0), - State#state{http2_machine=HTTP2Machine1}. +lingering_data_frame(State, _StreamID, _DataLen) -> + %% We do nothing when receiving a lingering DATA frame. + %% We already removed the stream flow from the connection + %% flow and are therefore already accounting for the window + %% being reduced by these frames. + State. headers_frame(State, StreamID, IsFin, Headers, PseudoHeaders=#{method := <<"CONNECT">>}, _) @@ -410,7 +438,7 @@ headers_frame(State=#state{opts=Opts, streams=Streams}, StreamID, Req) -> try cowboy_stream:init(StreamID, Req, Opts) of {Commands, StreamState} -> commands(State#state{ - streams=Streams#{StreamID => {running, StreamState}}}, + streams=Streams#{StreamID => #stream{state=StreamState}}}, StreamID, Commands) catch Class:Exception -> cowboy:log(cowboy_stream:make_error_log(init, @@ -449,7 +477,7 @@ early_error(State0=#state{ref=Ref, opts=Opts, peer=Peer}, rst_stream_frame(State=#state{streams=Streams0, children=Children0}, StreamID, Reason) -> case maps:take(StreamID, Streams0) of - {{_, StreamState}, Streams} -> + {#stream{state=StreamState}, Streams} -> terminate_stream_handler(State, StreamID, Reason, StreamState), Children = cowboy_children:shutdown(Children0, StreamID), State#state{streams=Streams, children=Children}; @@ -494,10 +522,10 @@ down(State=#state{opts=Opts, children=Children0}, Pid, Msg) -> info(State=#state{opts=Opts, streams=Streams}, StreamID, Msg) -> case Streams of - #{StreamID := {IsRunning, StreamState0}} -> + #{StreamID := Stream=#stream{state=StreamState0}} -> try cowboy_stream:info(StreamID, Msg, StreamState0) of {Commands, StreamState} -> - commands(State#state{streams=Streams#{StreamID => {IsRunning, StreamState}}}, + commands(State#state{streams=Streams#{StreamID => Stream#stream{state=StreamState}}}, StreamID, Commands) catch Class:Exception -> cowboy:log(cowboy_stream:make_error_log(info, @@ -586,15 +614,13 @@ commands(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Ma State0 end, commands(State, StreamID, Tail); -commands(State=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0}, - StreamID, [{flow, Size}|Tail]) -> - Transport:send(Socket, [ - cow_http2:window_update(Size), - cow_http2:window_update(StreamID, Size) - ]), - HTTP2Machine1 = cow_http2_machine:update_window(Size, HTTP2Machine0), - HTTP2Machine = cow_http2_machine:update_window(StreamID, Size, HTTP2Machine1), - commands(State#state{http2_machine=HTTP2Machine}, StreamID, Tail); +%% Read the request body. +commands(State0=#state{flow=Flow, streams=Streams}, StreamID, [{flow, Size}|Tail]) -> + #{StreamID := Stream=#stream{flow=StreamFlow}} = Streams, + State = update_window(State0#state{flow=Flow + Size, + streams=Streams#{StreamID => Stream#stream{flow=StreamFlow + Size}}}, + StreamID), + commands(State, StreamID, Tail); %% Supervise a child process. commands(State=#state{children=Children}, StreamID, [{spawn, Pid, Shutdown}|Tail]) -> commands(State#state{children=cowboy_children:up(Children, Pid, StreamID, Shutdown)}, @@ -628,6 +654,25 @@ commands(State=#state{opts=Opts}, StreamID, [Log={log, _, _, _}|Tail]) -> cowboy:log(Log, Opts), commands(State, StreamID, Tail). +%% Tentatively update the window after the flow was updated. + +update_window(State=#state{socket=Socket, transport=Transport, + http2_machine=HTTP2Machine0, flow=Flow, streams=Streams}, StreamID) -> + #{StreamID := #stream{flow=StreamFlow}} = Streams, + {Data1, HTTP2Machine2} = case cow_http2_machine:ensure_window(Flow, HTTP2Machine0) of + ok -> {<<>>, HTTP2Machine0}; + {ok, Increment1, HTTP2Machine1} -> {cow_http2:window_update(Increment1), HTTP2Machine1} + end, + {Data2, HTTP2Machine} = case cow_http2_machine:ensure_window(StreamID, StreamFlow, HTTP2Machine2) of + ok -> {<<>>, HTTP2Machine2}; + {ok, Increment2, HTTP2Machine3} -> {cow_http2:window_update(StreamID, Increment2), HTTP2Machine3} + end, + case {Data1, Data2} of + {<<>>, <<>>} -> ok; + _ -> Transport:send(Socket, [Data1, Data2]) + end, + State#state{http2_machine=HTTP2Machine}. + %% Send the response, trailers or data. send_response(State0, StreamID, StatusCode, Headers, Body) -> @@ -741,7 +786,7 @@ goaway(State, {goaway, _, Reason, _}) -> %% Cancel client-initiated streams that are above LastStreamID. goaway_streams(_, [], _, _, Acc) -> Acc; -goaway_streams(State, [{StreamID, {_, StreamState}}|Tail], LastStreamID, Reason, Acc) +goaway_streams(State, [{StreamID, #stream{state=StreamState}}|Tail], LastStreamID, Reason, Acc) when StreamID > LastStreamID, (StreamID rem 2) =:= 0 -> terminate_stream_handler(State, StreamID, Reason, StreamState), goaway_streams(State, Tail, LastStreamID, Reason, Acc); @@ -780,7 +825,7 @@ terminate_reason({internal_error, _, _}) -> internal_error. terminate_all_streams(_, [], _) -> ok; -terminate_all_streams(State, [{StreamID, {_, StreamState}}|Tail], Reason) -> +terminate_all_streams(State, [{StreamID, #stream{state=StreamState}}|Tail], Reason) -> terminate_stream_handler(State, StreamID, Reason, StreamState), terminate_all_streams(State, Tail, Reason). @@ -820,13 +865,13 @@ stop_stream(State=#state{http2_machine=HTTP2Machine}, StreamID) -> end. stopping(State=#state{streams=Streams}, StreamID) -> - #{StreamID := {_, StreamState}} = Streams, - State#state{streams=Streams#{StreamID => {stopping, StreamState}}}. + #{StreamID := Stream} = Streams, + State#state{streams=Streams#{StreamID => Stream#stream{status=stopping}}}. %% If we finished sending data and the stream is stopping, terminate it. maybe_terminate_stream(State=#state{streams=Streams}, StreamID, fin) -> case Streams of - #{StreamID := {stopping, _}} -> + #{StreamID := #stream{status=stopping}} -> terminate_stream(State, StreamID); _ -> State @@ -849,12 +894,15 @@ terminate_stream(State0=#state{socket=Socket, transport=Transport, end, terminate_stream(State, StreamID, normal). -terminate_stream(State=#state{streams=Streams0, children=Children0}, StreamID, Reason) -> +%% We remove the stream flow from the connection flow. Any further +%% data received for this stream is therefore fully contained within +%% the extra window we allocated for this stream. +terminate_stream(State=#state{flow=Flow, streams=Streams0, children=Children0}, StreamID, Reason) -> case maps:take(StreamID, Streams0) of - {{_, StreamState}, Streams} -> + {#stream{flow=StreamFlow, state=StreamState}, Streams} -> terminate_stream_handler(State, StreamID, Reason, StreamState), Children = cowboy_children:shutdown(Children0, StreamID), - State#state{streams=Streams, children=Children}; + State#state{flow=Flow - StreamFlow, streams=Streams, children=Children}; error -> State end. |