aboutsummaryrefslogtreecommitdiffstats
path: root/src/cowboy_http2.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/cowboy_http2.erl')
-rw-r--r--src/cowboy_http2.erl118
1 files changed, 83 insertions, 35 deletions
diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl
index 7c9f070..7039ab2 100644
--- a/src/cowboy_http2.erl
+++ b/src/cowboy_http2.erl
@@ -30,6 +30,8 @@
compress_buffering => boolean(),
compress_threshold => non_neg_integer(),
connection_type => worker | supervisor,
+ connection_window_margin_size => 0..16#7fffffff,
+ connection_window_update_threshold => 0..16#7fffffff,
enable_connect_protocol => boolean(),
env => cowboy_middleware:env(),
idle_timeout => timeout(),
@@ -38,10 +40,12 @@
initial_stream_window_size => 0..16#7fffffff,
logger => module(),
max_concurrent_streams => non_neg_integer() | infinity,
+ max_connection_window_size => 0..16#7fffffff,
max_decode_table_size => non_neg_integer(),
max_encode_table_size => non_neg_integer(),
max_frame_size_received => 16384..16777215,
max_frame_size_sent => 16384..16777215 | infinity,
+ max_stream_window_size => 0..16#7fffffff,
metrics_callback => cowboy_metrics_h:metrics_callback(),
middlewares => [module()],
preface_timeout => timeout(),
@@ -50,6 +54,8 @@
settings_timeout => timeout(),
shutdown_timeout => timeout(),
stream_handlers => [module()],
+ stream_window_margin_size => 0..16#7fffffff,
+ stream_window_update_threshold => 0..16#7fffffff,
tracer_callback => cowboy_tracer_h:tracer_callback(),
tracer_match_specs => cowboy_tracer_h:tracer_match_specs(),
%% Open ended because configured stream handlers might add options.
@@ -57,6 +63,17 @@
}.
-export_type([opts/0]).
+-record(stream, {
+ %% Whether the stream is currently stopping.
+ status = running :: running | stopping,
+
+ %% Flow requested for this stream.
+ flow = 0 :: non_neg_integer(),
+
+ %% Stream state.
+ state :: {module, any()}
+}).
+
-record(state, {
parent = undefined :: pid(),
ref :: ranch:ref(),
@@ -81,9 +98,12 @@
http2_status :: sequence | settings | upgrade | connected | closing,
http2_machine :: cow_http2_machine:http2_machine(),
+ %% Flow requested for all streams.
+ flow = 0 :: non_neg_integer(),
+
%% Currently active HTTP/2 streams. Streams may be initiated either
%% by the client or by the server through PUSH_PROMISE frames.
- streams = #{} :: #{cow_http2:streamid() => {running | stopping, {module, any()}}},
+ streams = #{} :: #{cow_http2:streamid() => #stream{}},
%% Streams can spawn zero or more children which are then managed
%% by this module if operating as a supervisor.
@@ -259,8 +279,8 @@ frame(State=#state{http2_machine=HTTP2Machine0}, Frame) ->
maybe_ack(State#state{http2_machine=HTTP2Machine}, Frame);
{ok, {data, StreamID, IsFin, Data}, HTTP2Machine} ->
data_frame(State#state{http2_machine=HTTP2Machine}, StreamID, IsFin, Data);
- {ok, {lingering_data, _StreamID, DataLen}, HTTP2Machine} ->
- lingering_data_frame(State#state{http2_machine=HTTP2Machine}, DataLen);
+ {ok, {lingering_data, StreamID, DataLen}, HTTP2Machine} ->
+ lingering_data_frame(State#state{http2_machine=HTTP2Machine}, StreamID, DataLen);
{ok, {headers, StreamID, IsFin, Headers, PseudoHeaders, BodyLen}, HTTP2Machine} ->
headers_frame(State#state{http2_machine=HTTP2Machine},
StreamID, IsFin, Headers, PseudoHeaders, BodyLen);
@@ -292,30 +312,38 @@ maybe_ack(State=#state{socket=Socket, transport=Transport}, Frame) ->
end,
State.
-data_frame(State=#state{opts=Opts, streams=Streams}, StreamID, IsFin, Data) ->
+data_frame(State0=#state{opts=Opts, flow=Flow, streams=Streams}, StreamID, IsFin, Data) ->
case Streams of
- #{StreamID := {running, StreamState0}} ->
+ #{StreamID := Stream=#stream{status=running, flow=StreamFlow, state=StreamState0}} ->
try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of
{Commands, StreamState} ->
- commands(State#state{streams=Streams#{StreamID => {running, StreamState}}},
- StreamID, Commands)
+ %% Remove the amount of data received from the flow.
+ %% We may receive more data than we requested. We ensure
+ %% that the flow value doesn't go lower than 0.
+ Size = byte_size(Data),
+ State = update_window(State0#state{flow=max(0, Flow - Size),
+ streams=Streams#{StreamID => Stream#stream{
+ flow=max(0, StreamFlow - Size), state=StreamState}}},
+ StreamID),
+ commands(State, StreamID, Commands)
catch Class:Exception ->
cowboy:log(cowboy_stream:make_error_log(data,
[StreamID, IsFin, Data, StreamState0],
Class, Exception, erlang:get_stacktrace()), Opts),
- reset_stream(State, StreamID, {internal_error, {Class, Exception},
+ reset_stream(State0, StreamID, {internal_error, {Class, Exception},
'Unhandled exception in cowboy_stream:data/4.'})
end;
%% We ignore DATA frames for streams that are stopping.
#{} ->
- State
+ State0
end.
-lingering_data_frame(State=#state{socket=Socket, transport=Transport,
- http2_machine=HTTP2Machine0}, DataLen) ->
- Transport:send(Socket, cow_http2:window_update(DataLen)),
- HTTP2Machine1 = cow_http2_machine:update_window(DataLen, HTTP2Machine0),
- State#state{http2_machine=HTTP2Machine1}.
+lingering_data_frame(State, _StreamID, _DataLen) ->
+ %% We do nothing when receiving a lingering DATA frame.
+ %% We already removed the stream flow from the connection
+ %% flow and are therefore already accounting for the window
+ %% being reduced by these frames.
+ State.
headers_frame(State, StreamID, IsFin, Headers,
PseudoHeaders=#{method := <<"CONNECT">>}, _)
@@ -410,7 +438,7 @@ headers_frame(State=#state{opts=Opts, streams=Streams}, StreamID, Req) ->
try cowboy_stream:init(StreamID, Req, Opts) of
{Commands, StreamState} ->
commands(State#state{
- streams=Streams#{StreamID => {running, StreamState}}},
+ streams=Streams#{StreamID => #stream{state=StreamState}}},
StreamID, Commands)
catch Class:Exception ->
cowboy:log(cowboy_stream:make_error_log(init,
@@ -449,7 +477,7 @@ early_error(State0=#state{ref=Ref, opts=Opts, peer=Peer},
rst_stream_frame(State=#state{streams=Streams0, children=Children0}, StreamID, Reason) ->
case maps:take(StreamID, Streams0) of
- {{_, StreamState}, Streams} ->
+ {#stream{state=StreamState}, Streams} ->
terminate_stream_handler(State, StreamID, Reason, StreamState),
Children = cowboy_children:shutdown(Children0, StreamID),
State#state{streams=Streams, children=Children};
@@ -494,10 +522,10 @@ down(State=#state{opts=Opts, children=Children0}, Pid, Msg) ->
info(State=#state{opts=Opts, streams=Streams}, StreamID, Msg) ->
case Streams of
- #{StreamID := {IsRunning, StreamState0}} ->
+ #{StreamID := Stream=#stream{state=StreamState0}} ->
try cowboy_stream:info(StreamID, Msg, StreamState0) of
{Commands, StreamState} ->
- commands(State#state{streams=Streams#{StreamID => {IsRunning, StreamState}}},
+ commands(State#state{streams=Streams#{StreamID => Stream#stream{state=StreamState}}},
StreamID, Commands)
catch Class:Exception ->
cowboy:log(cowboy_stream:make_error_log(info,
@@ -586,15 +614,13 @@ commands(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Ma
State0
end,
commands(State, StreamID, Tail);
-commands(State=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0},
- StreamID, [{flow, Size}|Tail]) ->
- Transport:send(Socket, [
- cow_http2:window_update(Size),
- cow_http2:window_update(StreamID, Size)
- ]),
- HTTP2Machine1 = cow_http2_machine:update_window(Size, HTTP2Machine0),
- HTTP2Machine = cow_http2_machine:update_window(StreamID, Size, HTTP2Machine1),
- commands(State#state{http2_machine=HTTP2Machine}, StreamID, Tail);
+%% Read the request body.
+commands(State0=#state{flow=Flow, streams=Streams}, StreamID, [{flow, Size}|Tail]) ->
+ #{StreamID := Stream=#stream{flow=StreamFlow}} = Streams,
+ State = update_window(State0#state{flow=Flow + Size,
+ streams=Streams#{StreamID => Stream#stream{flow=StreamFlow + Size}}},
+ StreamID),
+ commands(State, StreamID, Tail);
%% Supervise a child process.
commands(State=#state{children=Children}, StreamID, [{spawn, Pid, Shutdown}|Tail]) ->
commands(State#state{children=cowboy_children:up(Children, Pid, StreamID, Shutdown)},
@@ -628,6 +654,25 @@ commands(State=#state{opts=Opts}, StreamID, [Log={log, _, _, _}|Tail]) ->
cowboy:log(Log, Opts),
commands(State, StreamID, Tail).
+%% Tentatively update the window after the flow was updated.
+
+update_window(State=#state{socket=Socket, transport=Transport,
+ http2_machine=HTTP2Machine0, flow=Flow, streams=Streams}, StreamID) ->
+ #{StreamID := #stream{flow=StreamFlow}} = Streams,
+ {Data1, HTTP2Machine2} = case cow_http2_machine:ensure_window(Flow, HTTP2Machine0) of
+ ok -> {<<>>, HTTP2Machine0};
+ {ok, Increment1, HTTP2Machine1} -> {cow_http2:window_update(Increment1), HTTP2Machine1}
+ end,
+ {Data2, HTTP2Machine} = case cow_http2_machine:ensure_window(StreamID, StreamFlow, HTTP2Machine2) of
+ ok -> {<<>>, HTTP2Machine2};
+ {ok, Increment2, HTTP2Machine3} -> {cow_http2:window_update(StreamID, Increment2), HTTP2Machine3}
+ end,
+ case {Data1, Data2} of
+ {<<>>, <<>>} -> ok;
+ _ -> Transport:send(Socket, [Data1, Data2])
+ end,
+ State#state{http2_machine=HTTP2Machine}.
+
%% Send the response, trailers or data.
send_response(State0, StreamID, StatusCode, Headers, Body) ->
@@ -741,7 +786,7 @@ goaway(State, {goaway, _, Reason, _}) ->
%% Cancel client-initiated streams that are above LastStreamID.
goaway_streams(_, [], _, _, Acc) ->
Acc;
-goaway_streams(State, [{StreamID, {_, StreamState}}|Tail], LastStreamID, Reason, Acc)
+goaway_streams(State, [{StreamID, #stream{state=StreamState}}|Tail], LastStreamID, Reason, Acc)
when StreamID > LastStreamID, (StreamID rem 2) =:= 0 ->
terminate_stream_handler(State, StreamID, Reason, StreamState),
goaway_streams(State, Tail, LastStreamID, Reason, Acc);
@@ -780,7 +825,7 @@ terminate_reason({internal_error, _, _}) -> internal_error.
terminate_all_streams(_, [], _) ->
ok;
-terminate_all_streams(State, [{StreamID, {_, StreamState}}|Tail], Reason) ->
+terminate_all_streams(State, [{StreamID, #stream{state=StreamState}}|Tail], Reason) ->
terminate_stream_handler(State, StreamID, Reason, StreamState),
terminate_all_streams(State, Tail, Reason).
@@ -820,13 +865,13 @@ stop_stream(State=#state{http2_machine=HTTP2Machine}, StreamID) ->
end.
stopping(State=#state{streams=Streams}, StreamID) ->
- #{StreamID := {_, StreamState}} = Streams,
- State#state{streams=Streams#{StreamID => {stopping, StreamState}}}.
+ #{StreamID := Stream} = Streams,
+ State#state{streams=Streams#{StreamID => Stream#stream{status=stopping}}}.
%% If we finished sending data and the stream is stopping, terminate it.
maybe_terminate_stream(State=#state{streams=Streams}, StreamID, fin) ->
case Streams of
- #{StreamID := {stopping, _}} ->
+ #{StreamID := #stream{status=stopping}} ->
terminate_stream(State, StreamID);
_ ->
State
@@ -849,12 +894,15 @@ terminate_stream(State0=#state{socket=Socket, transport=Transport,
end,
terminate_stream(State, StreamID, normal).
-terminate_stream(State=#state{streams=Streams0, children=Children0}, StreamID, Reason) ->
+%% We remove the stream flow from the connection flow. Any further
+%% data received for this stream is therefore fully contained within
+%% the extra window we allocated for this stream.
+terminate_stream(State=#state{flow=Flow, streams=Streams0, children=Children0}, StreamID, Reason) ->
case maps:take(StreamID, Streams0) of
- {{_, StreamState}, Streams} ->
+ {#stream{flow=StreamFlow, state=StreamState}, Streams} ->
terminate_stream_handler(State, StreamID, Reason, StreamState),
Children = cowboy_children:shutdown(Children0, StreamID),
- State#state{streams=Streams, children=Children};
+ State#state{flow=Flow - StreamFlow, streams=Streams, children=Children};
error ->
State
end.