diff options
Diffstat (limited to 'src/cowboy_http2.erl')
-rw-r--r-- | src/cowboy_http2.erl | 118 |
1 files changed, 83 insertions, 35 deletions
diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl index 7c9f070..7039ab2 100644 --- a/src/cowboy_http2.erl +++ b/src/cowboy_http2.erl @@ -30,6 +30,8 @@ compress_buffering => boolean(), compress_threshold => non_neg_integer(), connection_type => worker | supervisor, + connection_window_margin_size => 0..16#7fffffff, + connection_window_update_threshold => 0..16#7fffffff, enable_connect_protocol => boolean(), env => cowboy_middleware:env(), idle_timeout => timeout(), @@ -38,10 +40,12 @@ initial_stream_window_size => 0..16#7fffffff, logger => module(), max_concurrent_streams => non_neg_integer() | infinity, + max_connection_window_size => 0..16#7fffffff, max_decode_table_size => non_neg_integer(), max_encode_table_size => non_neg_integer(), max_frame_size_received => 16384..16777215, max_frame_size_sent => 16384..16777215 | infinity, + max_stream_window_size => 0..16#7fffffff, metrics_callback => cowboy_metrics_h:metrics_callback(), middlewares => [module()], preface_timeout => timeout(), @@ -50,6 +54,8 @@ settings_timeout => timeout(), shutdown_timeout => timeout(), stream_handlers => [module()], + stream_window_margin_size => 0..16#7fffffff, + stream_window_update_threshold => 0..16#7fffffff, tracer_callback => cowboy_tracer_h:tracer_callback(), tracer_match_specs => cowboy_tracer_h:tracer_match_specs(), %% Open ended because configured stream handlers might add options. @@ -57,6 +63,17 @@ }. -export_type([opts/0]). +-record(stream, { + %% Whether the stream is currently stopping. + status = running :: running | stopping, + + %% Flow requested for this stream. + flow = 0 :: non_neg_integer(), + + %% Stream state. + state :: {module, any()} +}). + -record(state, { parent = undefined :: pid(), ref :: ranch:ref(), @@ -81,9 +98,12 @@ http2_status :: sequence | settings | upgrade | connected | closing, http2_machine :: cow_http2_machine:http2_machine(), + %% Flow requested for all streams. + flow = 0 :: non_neg_integer(), + %% Currently active HTTP/2 streams. Streams may be initiated either %% by the client or by the server through PUSH_PROMISE frames. - streams = #{} :: #{cow_http2:streamid() => {running | stopping, {module, any()}}}, + streams = #{} :: #{cow_http2:streamid() => #stream{}}, %% Streams can spawn zero or more children which are then managed %% by this module if operating as a supervisor. @@ -259,8 +279,8 @@ frame(State=#state{http2_machine=HTTP2Machine0}, Frame) -> maybe_ack(State#state{http2_machine=HTTP2Machine}, Frame); {ok, {data, StreamID, IsFin, Data}, HTTP2Machine} -> data_frame(State#state{http2_machine=HTTP2Machine}, StreamID, IsFin, Data); - {ok, {lingering_data, _StreamID, DataLen}, HTTP2Machine} -> - lingering_data_frame(State#state{http2_machine=HTTP2Machine}, DataLen); + {ok, {lingering_data, StreamID, DataLen}, HTTP2Machine} -> + lingering_data_frame(State#state{http2_machine=HTTP2Machine}, StreamID, DataLen); {ok, {headers, StreamID, IsFin, Headers, PseudoHeaders, BodyLen}, HTTP2Machine} -> headers_frame(State#state{http2_machine=HTTP2Machine}, StreamID, IsFin, Headers, PseudoHeaders, BodyLen); @@ -292,30 +312,38 @@ maybe_ack(State=#state{socket=Socket, transport=Transport}, Frame) -> end, State. -data_frame(State=#state{opts=Opts, streams=Streams}, StreamID, IsFin, Data) -> +data_frame(State0=#state{opts=Opts, flow=Flow, streams=Streams}, StreamID, IsFin, Data) -> case Streams of - #{StreamID := {running, StreamState0}} -> + #{StreamID := Stream=#stream{status=running, flow=StreamFlow, state=StreamState0}} -> try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of {Commands, StreamState} -> - commands(State#state{streams=Streams#{StreamID => {running, StreamState}}}, - StreamID, Commands) + %% Remove the amount of data received from the flow. + %% We may receive more data than we requested. We ensure + %% that the flow value doesn't go lower than 0. + Size = byte_size(Data), + State = update_window(State0#state{flow=max(0, Flow - Size), + streams=Streams#{StreamID => Stream#stream{ + flow=max(0, StreamFlow - Size), state=StreamState}}}, + StreamID), + commands(State, StreamID, Commands) catch Class:Exception -> cowboy:log(cowboy_stream:make_error_log(data, [StreamID, IsFin, Data, StreamState0], Class, Exception, erlang:get_stacktrace()), Opts), - reset_stream(State, StreamID, {internal_error, {Class, Exception}, + reset_stream(State0, StreamID, {internal_error, {Class, Exception}, 'Unhandled exception in cowboy_stream:data/4.'}) end; %% We ignore DATA frames for streams that are stopping. #{} -> - State + State0 end. -lingering_data_frame(State=#state{socket=Socket, transport=Transport, - http2_machine=HTTP2Machine0}, DataLen) -> - Transport:send(Socket, cow_http2:window_update(DataLen)), - HTTP2Machine1 = cow_http2_machine:update_window(DataLen, HTTP2Machine0), - State#state{http2_machine=HTTP2Machine1}. +lingering_data_frame(State, _StreamID, _DataLen) -> + %% We do nothing when receiving a lingering DATA frame. + %% We already removed the stream flow from the connection + %% flow and are therefore already accounting for the window + %% being reduced by these frames. + State. headers_frame(State, StreamID, IsFin, Headers, PseudoHeaders=#{method := <<"CONNECT">>}, _) @@ -410,7 +438,7 @@ headers_frame(State=#state{opts=Opts, streams=Streams}, StreamID, Req) -> try cowboy_stream:init(StreamID, Req, Opts) of {Commands, StreamState} -> commands(State#state{ - streams=Streams#{StreamID => {running, StreamState}}}, + streams=Streams#{StreamID => #stream{state=StreamState}}}, StreamID, Commands) catch Class:Exception -> cowboy:log(cowboy_stream:make_error_log(init, @@ -449,7 +477,7 @@ early_error(State0=#state{ref=Ref, opts=Opts, peer=Peer}, rst_stream_frame(State=#state{streams=Streams0, children=Children0}, StreamID, Reason) -> case maps:take(StreamID, Streams0) of - {{_, StreamState}, Streams} -> + {#stream{state=StreamState}, Streams} -> terminate_stream_handler(State, StreamID, Reason, StreamState), Children = cowboy_children:shutdown(Children0, StreamID), State#state{streams=Streams, children=Children}; @@ -494,10 +522,10 @@ down(State=#state{opts=Opts, children=Children0}, Pid, Msg) -> info(State=#state{opts=Opts, streams=Streams}, StreamID, Msg) -> case Streams of - #{StreamID := {IsRunning, StreamState0}} -> + #{StreamID := Stream=#stream{state=StreamState0}} -> try cowboy_stream:info(StreamID, Msg, StreamState0) of {Commands, StreamState} -> - commands(State#state{streams=Streams#{StreamID => {IsRunning, StreamState}}}, + commands(State#state{streams=Streams#{StreamID => Stream#stream{state=StreamState}}}, StreamID, Commands) catch Class:Exception -> cowboy:log(cowboy_stream:make_error_log(info, @@ -586,15 +614,13 @@ commands(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Ma State0 end, commands(State, StreamID, Tail); -commands(State=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0}, - StreamID, [{flow, Size}|Tail]) -> - Transport:send(Socket, [ - cow_http2:window_update(Size), - cow_http2:window_update(StreamID, Size) - ]), - HTTP2Machine1 = cow_http2_machine:update_window(Size, HTTP2Machine0), - HTTP2Machine = cow_http2_machine:update_window(StreamID, Size, HTTP2Machine1), - commands(State#state{http2_machine=HTTP2Machine}, StreamID, Tail); +%% Read the request body. +commands(State0=#state{flow=Flow, streams=Streams}, StreamID, [{flow, Size}|Tail]) -> + #{StreamID := Stream=#stream{flow=StreamFlow}} = Streams, + State = update_window(State0#state{flow=Flow + Size, + streams=Streams#{StreamID => Stream#stream{flow=StreamFlow + Size}}}, + StreamID), + commands(State, StreamID, Tail); %% Supervise a child process. commands(State=#state{children=Children}, StreamID, [{spawn, Pid, Shutdown}|Tail]) -> commands(State#state{children=cowboy_children:up(Children, Pid, StreamID, Shutdown)}, @@ -628,6 +654,25 @@ commands(State=#state{opts=Opts}, StreamID, [Log={log, _, _, _}|Tail]) -> cowboy:log(Log, Opts), commands(State, StreamID, Tail). +%% Tentatively update the window after the flow was updated. + +update_window(State=#state{socket=Socket, transport=Transport, + http2_machine=HTTP2Machine0, flow=Flow, streams=Streams}, StreamID) -> + #{StreamID := #stream{flow=StreamFlow}} = Streams, + {Data1, HTTP2Machine2} = case cow_http2_machine:ensure_window(Flow, HTTP2Machine0) of + ok -> {<<>>, HTTP2Machine0}; + {ok, Increment1, HTTP2Machine1} -> {cow_http2:window_update(Increment1), HTTP2Machine1} + end, + {Data2, HTTP2Machine} = case cow_http2_machine:ensure_window(StreamID, StreamFlow, HTTP2Machine2) of + ok -> {<<>>, HTTP2Machine2}; + {ok, Increment2, HTTP2Machine3} -> {cow_http2:window_update(StreamID, Increment2), HTTP2Machine3} + end, + case {Data1, Data2} of + {<<>>, <<>>} -> ok; + _ -> Transport:send(Socket, [Data1, Data2]) + end, + State#state{http2_machine=HTTP2Machine}. + %% Send the response, trailers or data. send_response(State0, StreamID, StatusCode, Headers, Body) -> @@ -741,7 +786,7 @@ goaway(State, {goaway, _, Reason, _}) -> %% Cancel client-initiated streams that are above LastStreamID. goaway_streams(_, [], _, _, Acc) -> Acc; -goaway_streams(State, [{StreamID, {_, StreamState}}|Tail], LastStreamID, Reason, Acc) +goaway_streams(State, [{StreamID, #stream{state=StreamState}}|Tail], LastStreamID, Reason, Acc) when StreamID > LastStreamID, (StreamID rem 2) =:= 0 -> terminate_stream_handler(State, StreamID, Reason, StreamState), goaway_streams(State, Tail, LastStreamID, Reason, Acc); @@ -780,7 +825,7 @@ terminate_reason({internal_error, _, _}) -> internal_error. terminate_all_streams(_, [], _) -> ok; -terminate_all_streams(State, [{StreamID, {_, StreamState}}|Tail], Reason) -> +terminate_all_streams(State, [{StreamID, #stream{state=StreamState}}|Tail], Reason) -> terminate_stream_handler(State, StreamID, Reason, StreamState), terminate_all_streams(State, Tail, Reason). @@ -820,13 +865,13 @@ stop_stream(State=#state{http2_machine=HTTP2Machine}, StreamID) -> end. stopping(State=#state{streams=Streams}, StreamID) -> - #{StreamID := {_, StreamState}} = Streams, - State#state{streams=Streams#{StreamID => {stopping, StreamState}}}. + #{StreamID := Stream} = Streams, + State#state{streams=Streams#{StreamID => Stream#stream{status=stopping}}}. %% If we finished sending data and the stream is stopping, terminate it. maybe_terminate_stream(State=#state{streams=Streams}, StreamID, fin) -> case Streams of - #{StreamID := {stopping, _}} -> + #{StreamID := #stream{status=stopping}} -> terminate_stream(State, StreamID); _ -> State @@ -849,12 +894,15 @@ terminate_stream(State0=#state{socket=Socket, transport=Transport, end, terminate_stream(State, StreamID, normal). -terminate_stream(State=#state{streams=Streams0, children=Children0}, StreamID, Reason) -> +%% We remove the stream flow from the connection flow. Any further +%% data received for this stream is therefore fully contained within +%% the extra window we allocated for this stream. +terminate_stream(State=#state{flow=Flow, streams=Streams0, children=Children0}, StreamID, Reason) -> case maps:take(StreamID, Streams0) of - {{_, StreamState}, Streams} -> + {#stream{flow=StreamFlow, state=StreamState}, Streams} -> terminate_stream_handler(State, StreamID, Reason, StreamState), Children = cowboy_children:shutdown(Children0, StreamID), - State#state{streams=Streams, children=Children}; + State#state{flow=Flow - StreamFlow, streams=Streams, children=Children}; error -> State end. |