aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/cow_http2_machine.erl65
1 files changed, 50 insertions, 15 deletions
diff --git a/src/cow_http2_machine.erl b/src/cow_http2_machine.erl
index 5c9e66e..412eba0 100644
--- a/src/cow_http2_machine.erl
+++ b/src/cow_http2_machine.erl
@@ -51,6 +51,7 @@
max_stream_window_size => 0..16#7fffffff,
preface_timeout => timeout(),
settings_timeout => timeout(),
+ stream_window_data_threshold => 0..16#7fffffff,
stream_window_margin_size => 0..16#7fffffff,
stream_window_update_threshold => 0..16#7fffffff
}.
@@ -1139,9 +1140,15 @@ prepare_trailers(StreamID, State=#http2_machine{encode_state=EncodeState0}, Trai
| {send, [{cow_http2:streamid(), cow_http2:fin(), [DataOrFileOrTrailers]}], State}
when State::http2_machine(), DataOrFileOrTrailers::
{data, iodata()} | #sendfile{} | {trailers, cow_http:headers()}.
-send_or_queue_data(StreamID, State0, IsFin0, DataOrFileOrTrailers0) ->
+send_or_queue_data(StreamID, State0=#http2_machine{opts=Opts, local_window=ConnWindow},
+ IsFin0, DataOrFileOrTrailers0) ->
%% @todo Probably just ignore if the method was HEAD.
- Stream0 = #stream{local=nofin, te=TE0} = stream_get(StreamID, State0),
+ Stream0 = #stream{
+ local=nofin,
+ local_window=StreamWindow,
+ local_buffer_size=BufferSize,
+ te=TE0
+ } = stream_get(StreamID, State0),
DataOrFileOrTrailers = case DataOrFileOrTrailers0 of
{trailers, _} ->
%% We only accept TE headers containing exactly "trailers" (RFC7540 8.1.2.1).
@@ -1161,11 +1168,26 @@ send_or_queue_data(StreamID, State0, IsFin0, DataOrFileOrTrailers0) ->
_ ->
DataOrFileOrTrailers0
end,
- case send_or_queue_data(Stream0, State0, [], IsFin0, DataOrFileOrTrailers, in) of
- {ok, Stream, State, []} ->
- {ok, stream_store(Stream, State)};
- {ok, Stream=#stream{local=IsFin}, State, SendData} ->
- {send, [{StreamID, IsFin, lists:reverse(SendData)}], stream_store(Stream, State)}
+ SendSize = BufferSize + case DataOrFileOrTrailers of
+ {data, D} -> iolist_size(D);
+ #sendfile{bytes=B} -> B;
+ {trailers, _} -> 0
+ end,
+ MinSendSize = maps:get(stream_window_data_threshold, Opts, 16384),
+ if
+ %% If we cannot send the data all at once and the window
+ %% is smaller than we are willing to send at a minimum,
+ %% we queue the data directly.
+ (StreamWindow < MinSendSize)
+ andalso ((StreamWindow < SendSize) orelse (ConnWindow < SendSize)) ->
+ {ok, stream_store(queue_data(Stream0, IsFin0, DataOrFileOrTrailers, in), State0)};
+ true ->
+ case send_or_queue_data(Stream0, State0, [], IsFin0, DataOrFileOrTrailers, in) of
+ {ok, Stream, State, []} ->
+ {ok, stream_store(Stream, State)};
+ {ok, Stream=#stream{local=IsFin}, State, SendData} ->
+ {send, [{StreamID, IsFin, lists:reverse(SendData)}], stream_store(Stream, State)}
+ end
end.
%% Internal data sending/queuing functions.
@@ -1228,14 +1250,27 @@ send_data_for_one_stream(Stream=#stream{local=IsFin, local_window=StreamWindow,
local_buffer_size=BufferSize}, State=#http2_machine{local_window=ConnWindow}, SendAcc)
when ConnWindow =< 0; IsFin =:= fin; StreamWindow =< 0; BufferSize =:= 0 ->
{ok, Stream, State, lists:reverse(SendAcc)};
-send_data_for_one_stream(Stream0=#stream{local_buffer=Q0, local_buffer_size=BufferSize},
- State0, SendAcc0) ->
- %% We know there is an item in the queue.
- {{value, {IsFin, DataSize, Data}}, Q} = queue:out(Q0),
- Stream1 = Stream0#stream{local_buffer=Q, local_buffer_size=BufferSize - DataSize},
- {ok, Stream, State, SendAcc}
- = send_or_queue_data(Stream1, State0, SendAcc0, IsFin, Data, in_r),
- send_data_for_one_stream(Stream, State, SendAcc).
+send_data_for_one_stream(Stream0=#stream{local_window=StreamWindow,
+ local_buffer=Q0, local_buffer_size=BufferSize},
+ State0=#http2_machine{opts=Opts, local_window=ConnWindow}, SendAcc0) ->
+ MinSendSize = maps:get(stream_window_data_threshold, Opts, 16384),
+ if
+ %% If we cannot send the entire buffer at once and the window
+ %% is smaller than we are willing to send at a minimum, do nothing.
+ %%
+ %% We only do this check the first time we go through this function;
+ %% we want to send as much data as possible IF we send some.
+ (SendAcc0 =:= []) andalso (StreamWindow < MinSendSize)
+ andalso ((StreamWindow < BufferSize) orelse (ConnWindow < BufferSize)) ->
+ {ok, Stream0, State0, []};
+ true ->
+ %% We know there is an item in the queue.
+ {{value, {IsFin, DataSize, Data}}, Q} = queue:out(Q0),
+ Stream1 = Stream0#stream{local_buffer=Q, local_buffer_size=BufferSize - DataSize},
+ {ok, Stream, State, SendAcc}
+ = send_or_queue_data(Stream1, State0, SendAcc0, IsFin, Data, in_r),
+ send_data_for_one_stream(Stream, State, SendAcc)
+ end.
%% We can send trailers immediately if the queue is empty, otherwise we queue.
%% We always send trailer frames even if the window is empty.