aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/gun_http2.erl98
-rw-r--r--test/flow_SUITE.erl29
-rw-r--r--test/gun_test.erl2
-rw-r--r--test/rfc7540_SUITE.erl12
4 files changed, 93 insertions, 48 deletions
diff --git a/src/gun_http2.erl b/src/gun_http2.erl
index 1726528..47f670f 100644
--- a/src/gun_http2.erl
+++ b/src/gun_http2.erl
@@ -40,7 +40,6 @@
%% Flow control.
flow :: integer() | infinity,
- flow_window = 0 :: non_neg_integer(),
%% Content handlers state.
handler_state :: undefined | gun_content_handler:state()
@@ -87,6 +86,10 @@ do_check_options([{keepalive, infinity}|Opts]) ->
do_check_options([{keepalive, K}|Opts]) when is_integer(K), K > 0 ->
do_check_options(Opts);
%% @todo Add all http2_machine options.
+do_check_options([{initial_connection_window_size, _}|Opts]) ->
+ do_check_options(Opts);
+do_check_options([{initial_stream_window_size, _}|Opts]) ->
+ do_check_options(Opts);
do_check_options([{max_frame_size_received, _}|Opts]) ->
do_check_options(Opts);
do_check_options([Opt|_]) ->
@@ -94,7 +97,13 @@ do_check_options([Opt|_]) ->
name() -> http2.
-init(Owner, Socket, Transport, Opts) ->
+init(Owner, Socket, Transport, Opts0) ->
+ %% We have different defaults than the protocol in order
+ %% to optimize for performance when receiving responses.
+ Opts = Opts0#{
+ initial_connection_window_size => maps:get(initial_connection_window_size, Opts0, 8000000),
+ initial_stream_window_size => maps:get(initial_stream_window_size, Opts0, 8000000)
+ },
{ok, Preface, HTTP2Machine} = cow_http2_machine:init(client, Opts),
Handlers = maps:get(content_handlers, Opts, [gun_data_h]),
%% @todo Better validate the preface being received.
@@ -209,62 +218,47 @@ maybe_ack(State=#http2_state{socket=Socket, transport=Transport}, Frame) ->
end,
State.
-lingering_data_frame(State=#http2_state{socket=Socket, transport=Transport,
- http2_machine=HTTP2Machine0}, DataLen) ->
- Transport:send(Socket, cow_http2:window_update(DataLen)),
- HTTP2Machine1 = cow_http2_machine:update_window(DataLen, HTTP2Machine0),
- State#http2_state{http2_machine=HTTP2Machine1}.
+lingering_data_frame(State, _DataLen) ->
+ %% We only update the connection's window when receiving
+ %% a lingering data frame.
+ update_window(State).
-data_frame(State=#http2_state{socket=Socket, transport=Transport,
- http2_machine=HTTP2Machine0}, StreamID, IsFin, Data,
- EvHandler, EvHandlerState0) ->
+data_frame(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0) ->
Stream = #stream{ref=StreamRef, reply_to=ReplyTo, flow=Flow0,
- flow_window=FlowWindow0, handler_state=Handlers0} = get_stream_by_id(State, StreamID),
+ handler_state=Handlers0} = get_stream_by_id(State0, StreamID),
{ok, Dec, Handlers} = gun_content_handler:handle(IsFin, Data, Handlers0),
Flow = case Flow0 of
infinity -> infinity;
_ -> Flow0 - Dec
end,
- Size = byte_size(Data),
- FlowWindow = if
- IsFin =:= nofin, Flow =< 0 ->
- FlowWindow0 + Size;
- true ->
- FlowWindow0
- end,
- {HTTP2Machine, EvHandlerState} = case Size of
+ State1 = store_stream(State0, Stream#stream{flow=Flow, handler_state=Handlers}),
+ {State, EvHandlerState} = case byte_size(Data) of
%% We do not send a WINDOW_UPDATE if the DATA frame was of size 0.
0 when IsFin =:= fin ->
EvHandlerState1 = EvHandler:response_end(#{
stream_ref => StreamRef,
reply_to => ReplyTo
}, EvHandlerState0),
- {HTTP2Machine0, EvHandlerState1};
+ {State1, EvHandlerState1};
0 ->
- {HTTP2Machine0, EvHandlerState0};
+ {State1, EvHandlerState0};
_ ->
- Transport:send(Socket, cow_http2:window_update(Size)),
- HTTP2Machine1 = cow_http2_machine:update_window(Size, HTTP2Machine0),
%% We do not send a stream WINDOW_UPDATE when the flow control kicks in
%% (it'll be sent when the flow recovers) or for the last DATA frame.
case IsFin of
nofin when Flow =< 0 ->
- {HTTP2Machine1, EvHandlerState0};
+ {update_window(State1), EvHandlerState0};
nofin ->
- Transport:send(Socket, cow_http2:window_update(StreamID, Size)),
- {cow_http2_machine:update_window(StreamID, Size, HTTP2Machine1),
- EvHandlerState0};
+ {update_window(State1, StreamID), EvHandlerState0};
fin ->
EvHandlerState1 = EvHandler:response_end(#{
stream_ref => StreamRef,
reply_to => ReplyTo
}, EvHandlerState0),
- {HTTP2Machine1, EvHandlerState1}
+ {update_window(State1), EvHandlerState1}
end
end,
- {maybe_delete_stream(store_stream(State#http2_state{http2_machine=HTTP2Machine},
- Stream#stream{flow=Flow, flow_window=FlowWindow, handler_state=Handlers}), StreamID, remote, IsFin),
- EvHandlerState}.
+ {maybe_delete_stream(State, StreamID, remote, IsFin), EvHandlerState}.
headers_frame(State=#http2_state{content_handlers=Handlers0},
StreamID, IsFin, Headers, PseudoHeaders, _BodyLen,
@@ -378,21 +372,18 @@ ignored_frame(State=#http2_state{http2_machine=HTTP2Machine0}) ->
connection_error(State#http2_state{http2_machine=HTTP2Machine}, Error)
end.
-update_flow(State=#http2_state{socket=Socket, transport=Transport,
- http2_machine=HTTP2Machine0}, _ReplyTo, StreamRef, Inc) ->
+update_flow(State, _ReplyTo, StreamRef, Inc) ->
case get_stream_by_ref(State, StreamRef) of
- Stream=#stream{id=StreamID, flow=Flow0, flow_window=FlowWindow} ->
+ Stream=#stream{id=StreamID, flow=Flow0} ->
Flow = case Flow0 of
infinity -> infinity;
_ -> Flow0 + Inc
end,
if
- %% Flow is active again, update the window.
+ %% Flow is active again, update the stream's window.
Flow0 =< 0, Flow > 0 ->
- Transport:send(Socket, cow_http2:window_update(StreamID, FlowWindow)),
- HTTP2Machine = cow_http2_machine:update_window(StreamID, FlowWindow, HTTP2Machine0),
- {state, store_stream(State#http2_state{http2_machine=HTTP2Machine},
- Stream#stream{flow=Flow, flow_window=0})};
+ {state, update_window(store_stream(State,
+ Stream#stream{flow=Flow}), StreamID)};
true ->
{state, store_stream(State, Stream#stream{flow=Flow})}
end;
@@ -400,6 +391,35 @@ update_flow(State=#http2_state{socket=Socket, transport=Transport,
[]
end.
+%% Only update the connection's window.
+update_window(State=#http2_state{socket=Socket, transport=Transport,
+ opts=#{initial_connection_window_size := ConnWindow}, http2_machine=HTTP2Machine0}) ->
+ case cow_http2_machine:ensure_window(ConnWindow, HTTP2Machine0) of
+ ok ->
+ State;
+ {ok, Increment, HTTP2Machine} ->
+ Transport:send(Socket, cow_http2:window_update(Increment)),
+ State#http2_state{http2_machine=HTTP2Machine}
+ end.
+
+%% Update both the connection and the stream's window.
+update_window(State=#http2_state{socket=Socket, transport=Transport,
+ opts=#{initial_connection_window_size := ConnWindow, initial_stream_window_size := StreamWindow},
+ http2_machine=HTTP2Machine0}, StreamID) ->
+ {Data1, HTTP2Machine2} = case cow_http2_machine:ensure_window(ConnWindow, HTTP2Machine0) of
+ ok -> {<<>>, HTTP2Machine0};
+ {ok, Increment1, HTTP2Machine1} -> {cow_http2:window_update(Increment1), HTTP2Machine1}
+ end,
+ {Data2, HTTP2Machine} = case cow_http2_machine:ensure_window(StreamID, StreamWindow, HTTP2Machine2) of
+ ok -> {<<>>, HTTP2Machine2};
+ {ok, Increment2, HTTP2Machine3} -> {cow_http2:window_update(StreamID, Increment2), HTTP2Machine3}
+ end,
+ case {Data1, Data2} of
+ {<<>>, <<>>} -> ok;
+ _ -> Transport:send(Socket, [Data1, Data2])
+ end,
+ State#http2_state{http2_machine=HTTP2Machine}.
+
%% We may have to cancel streams even if we receive multiple
%% GOAWAY frames as the LastStreamID value may be lower than
%% the one previously received.
diff --git a/test/flow_SUITE.erl b/test/flow_SUITE.erl
index 937af26..5076bbc 100644
--- a/test/flow_SUITE.erl
+++ b/test/flow_SUITE.erl
@@ -71,6 +71,8 @@ default_flow_http2(_) ->
flow => 1,
%% We set the max frame size to the same as the initial
%% window size in order to reduce the number of data messages.
+ initial_connection_window_size => 65535,
+ initial_stream_window_size => 65535,
max_frame_size_received => 65535
},
protocols => [http2]
@@ -86,8 +88,11 @@ default_flow_http2(_) ->
%% Then we confirm that we can override it per request.
StreamRef2 = gun:get(ConnPid, "/", [], #{flow => 2}),
{response, nofin, 200, _} = gun:await(ConnPid, StreamRef2),
- %% We set the flow to 2 therefore we will receive *3* data messages
- %% and then nothing because two windows have been fully consumed.
+ %% We set the flow to 2 but due to the ensure_window algorithm
+ %% we end up receiving *5* data messages before flow control kicks in,
+ %% equivalent to 3 SSE events.
+ {data, nofin, _} = gun:await(ConnPid, StreamRef2),
+ {data, nofin, _} = gun:await(ConnPid, StreamRef2),
{data, nofin, _} = gun:await(ConnPid, StreamRef2),
{data, nofin, _} = gun:await(ConnPid, StreamRef2),
{data, nofin, _} = gun:await(ConnPid, StreamRef2),
@@ -132,7 +137,11 @@ flow_http2(_) ->
{ok, ConnPid} = gun:open("localhost", Port, #{
%% We set the max frame size to the same as the initial
%% window size in order to reduce the number of data messages.
- http2_opts => #{max_frame_size_received => 65535},
+ http2_opts => #{
+ initial_connection_window_size => 65535,
+ initial_stream_window_size => 65535,
+ max_frame_size_received => 65535
+ },
protocols => [http2]
}),
{ok, http2} = gun:await_up(ConnPid),
@@ -145,14 +154,16 @@ flow_http2(_) ->
%% We consumed all the window available.
65535 = byte_size(D1) + byte_size(D2),
{error, timeout} = gun:await(ConnPid, StreamRef, 3000),
- %% We then update the flow and get *3* more data messages but no more.
+ %% We then update the flow and get *5* more data messages but no more.
gun:update_flow(ConnPid, StreamRef, 2),
{data, nofin, D3} = gun:await(ConnPid, StreamRef),
{data, nofin, D4} = gun:await(ConnPid, StreamRef),
{data, nofin, D5} = gun:await(ConnPid, StreamRef),
+ {data, nofin, D6} = gun:await(ConnPid, StreamRef),
+ {data, nofin, D7} = gun:await(ConnPid, StreamRef),
%% We consumed all the window available again.
- %% D3 is the end of the truncated D2, D4 is full and D5 truncated.
- 65535 = byte_size(D3) + byte_size(D4) + byte_size(D5),
+ %% D3 is the end of the truncated D2, D4, D5 and D6 are full and D7 truncated.
+ 131070 = byte_size(D3) + byte_size(D4) + byte_size(D5) + byte_size(D6) + byte_size(D7),
{error, timeout} = gun:await(ConnPid, StreamRef, 1000),
gun:close(ConnPid)
after
@@ -302,6 +313,8 @@ sse_flow_http2(_) ->
%% window size in order to reduce the number of data messages.
http2_opts => #{
content_handlers => [gun_sse_h, gun_data_h],
+ initial_connection_window_size => 65535,
+ initial_stream_window_size => 65535,
max_frame_size_received => 65535
},
protocols => [http2]
@@ -314,10 +327,12 @@ sse_flow_http2(_) ->
%% the second event was fully received.
{sse, _} = gun:await(ConnPid, StreamRef),
{error, timeout} = gun:await(ConnPid, StreamRef, 3000),
- %% We then update the flow and get 2 more event messages but no more.
+ %% We then update the flow and get 3 more event messages but no more.
+ %% We get an extra message because of the ensure_window algorithm.
gun:update_flow(ConnPid, StreamRef, 2),
{sse, _} = gun:await(ConnPid, StreamRef),
{sse, _} = gun:await(ConnPid, StreamRef),
+ {sse, _} = gun:await(ConnPid, StreamRef),
{error, timeout} = gun:await(ConnPid, StreamRef, 1000),
gun:close(ConnPid)
after
diff --git a/test/gun_test.erl b/test/gun_test.erl
index e74fcd0..a2cbf6d 100644
--- a/test/gun_test.erl
+++ b/test/gun_test.erl
@@ -79,6 +79,8 @@ http2_handshake(Socket, Transport) ->
%% Receive the SETTINGS from the preface.
{ok, <<Len:24>>} = Transport:recv(Socket, 3, 5000),
{ok, <<4:8, 0:40, _:Len/binary>>} = Transport:recv(Socket, 6 + Len, 5000),
+ %% Receive the WINDOW_UPDATE sent with the preface.
+ {ok, <<4:24, 8:8, 0:40, _:32>>} = Transport:recv(Socket, 13, 5000),
%% Send the SETTINGS ack.
ok = Transport:send(Socket, cow_http2:settings_ack()),
%% Receive the SETTINGS ack.
diff --git a/test/rfc7540_SUITE.erl b/test/rfc7540_SUITE.erl
index 0db9dd0..14e9c0c 100644
--- a/test/rfc7540_SUITE.erl
+++ b/test/rfc7540_SUITE.erl
@@ -98,9 +98,17 @@ lingering_data_counts_toward_connection_window(_) ->
%% Skip RST_STREAM.
{ok, << 4:24, 3:8, 1:40, _:32 >>} = gen_tcp:recv(Socket, 13, 1000),
%% Received a WINDOW_UPDATE frame after we got RST_STREAM.
- {ok, << 4:24, 8:8, 0:40, 1000:32 >>} = gen_tcp:recv(Socket, 13, 1000)
+ {ok, << 4:24, 8:8, 0:40, Increment:32 >>} = gen_tcp:recv(Socket, 13, 1000),
+ true = Increment > 0
end),
- {ok, ConnPid} = gun:open("localhost", Port, #{protocols => [http2]}),
+ {ok, ConnPid} = gun:open("localhost", Port, #{
+ protocols => [http2],
+ http2_opts => #{
+ %% We don't set 65535 because we still want to have an initial WINDOW_UPDATE.
+ initial_connection_window_size => 65536,
+ initial_stream_window_size => 65535
+ }
+ }),
{ok, http2} = gun:await_up(ConnPid),
timer:sleep(100), %% Give enough time for the handshake to fully complete.
%% Step 1.