aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2019-09-13 10:52:10 +0200
committerLoïc Hoguin <[email protected]>2019-09-13 10:52:10 +0200
commit4194682d4edaee3da34783c46a513698eb1e8d05 (patch)
tree6e43ee4ed6a52e9b1f44c0eede4673b42f6ca21e
parent585c1dcd001c2cb41cc77216aed2cf729fad6cc7 (diff)
downloadgun-4194682d4edaee3da34783c46a513698eb1e8d05.tar.gz
gun-4194682d4edaee3da34783c46a513698eb1e8d05.tar.bz2
gun-4194682d4edaee3da34783c46a513698eb1e8d05.zip
Use cow_http2_machine:ensure_window
Gun was very inefficient at receiving HTTP/2 bodies. Switching to ensure_window and increasing the default window sizes brings the response body reading performance at least on par with the one for HTTP/1.1. This has a small negative impact on message flow control because we stop updating the window later than we did before, increasing the number of extra messages we may send. The exact amount depends on configuration and the exact moment flow control kicks in.
-rw-r--r--src/gun_http2.erl98
-rw-r--r--test/flow_SUITE.erl29
-rw-r--r--test/gun_test.erl2
-rw-r--r--test/rfc7540_SUITE.erl12
4 files changed, 93 insertions, 48 deletions
diff --git a/src/gun_http2.erl b/src/gun_http2.erl
index 1726528..47f670f 100644
--- a/src/gun_http2.erl
+++ b/src/gun_http2.erl
@@ -40,7 +40,6 @@
%% Flow control.
flow :: integer() | infinity,
- flow_window = 0 :: non_neg_integer(),
%% Content handlers state.
handler_state :: undefined | gun_content_handler:state()
@@ -87,6 +86,10 @@ do_check_options([{keepalive, infinity}|Opts]) ->
do_check_options([{keepalive, K}|Opts]) when is_integer(K), K > 0 ->
do_check_options(Opts);
%% @todo Add all http2_machine options.
+do_check_options([{initial_connection_window_size, _}|Opts]) ->
+ do_check_options(Opts);
+do_check_options([{initial_stream_window_size, _}|Opts]) ->
+ do_check_options(Opts);
do_check_options([{max_frame_size_received, _}|Opts]) ->
do_check_options(Opts);
do_check_options([Opt|_]) ->
@@ -94,7 +97,13 @@ do_check_options([Opt|_]) ->
name() -> http2.
-init(Owner, Socket, Transport, Opts) ->
+init(Owner, Socket, Transport, Opts0) ->
+ %% We have different defaults than the protocol in order
+ %% to optimize for performance when receiving responses.
+ Opts = Opts0#{
+ initial_connection_window_size => maps:get(initial_connection_window_size, Opts0, 8000000),
+ initial_stream_window_size => maps:get(initial_stream_window_size, Opts0, 8000000)
+ },
{ok, Preface, HTTP2Machine} = cow_http2_machine:init(client, Opts),
Handlers = maps:get(content_handlers, Opts, [gun_data_h]),
%% @todo Better validate the preface being received.
@@ -209,62 +218,47 @@ maybe_ack(State=#http2_state{socket=Socket, transport=Transport}, Frame) ->
end,
State.
-lingering_data_frame(State=#http2_state{socket=Socket, transport=Transport,
- http2_machine=HTTP2Machine0}, DataLen) ->
- Transport:send(Socket, cow_http2:window_update(DataLen)),
- HTTP2Machine1 = cow_http2_machine:update_window(DataLen, HTTP2Machine0),
- State#http2_state{http2_machine=HTTP2Machine1}.
+lingering_data_frame(State, _DataLen) ->
+ %% We only update the connection's window when receiving
+ %% a lingering data frame.
+ update_window(State).
-data_frame(State=#http2_state{socket=Socket, transport=Transport,
- http2_machine=HTTP2Machine0}, StreamID, IsFin, Data,
- EvHandler, EvHandlerState0) ->
+data_frame(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0) ->
Stream = #stream{ref=StreamRef, reply_to=ReplyTo, flow=Flow0,
- flow_window=FlowWindow0, handler_state=Handlers0} = get_stream_by_id(State, StreamID),
+ handler_state=Handlers0} = get_stream_by_id(State0, StreamID),
{ok, Dec, Handlers} = gun_content_handler:handle(IsFin, Data, Handlers0),
Flow = case Flow0 of
infinity -> infinity;
_ -> Flow0 - Dec
end,
- Size = byte_size(Data),
- FlowWindow = if
- IsFin =:= nofin, Flow =< 0 ->
- FlowWindow0 + Size;
- true ->
- FlowWindow0
- end,
- {HTTP2Machine, EvHandlerState} = case Size of
+ State1 = store_stream(State0, Stream#stream{flow=Flow, handler_state=Handlers}),
+ {State, EvHandlerState} = case byte_size(Data) of
%% We do not send a WINDOW_UPDATE if the DATA frame was of size 0.
0 when IsFin =:= fin ->
EvHandlerState1 = EvHandler:response_end(#{
stream_ref => StreamRef,
reply_to => ReplyTo
}, EvHandlerState0),
- {HTTP2Machine0, EvHandlerState1};
+ {State1, EvHandlerState1};
0 ->
- {HTTP2Machine0, EvHandlerState0};
+ {State1, EvHandlerState0};
_ ->
- Transport:send(Socket, cow_http2:window_update(Size)),
- HTTP2Machine1 = cow_http2_machine:update_window(Size, HTTP2Machine0),
%% We do not send a stream WINDOW_UPDATE when the flow control kicks in
%% (it'll be sent when the flow recovers) or for the last DATA frame.
case IsFin of
nofin when Flow =< 0 ->
- {HTTP2Machine1, EvHandlerState0};
+ {update_window(State1), EvHandlerState0};
nofin ->
- Transport:send(Socket, cow_http2:window_update(StreamID, Size)),
- {cow_http2_machine:update_window(StreamID, Size, HTTP2Machine1),
- EvHandlerState0};
+ {update_window(State1, StreamID), EvHandlerState0};
fin ->
EvHandlerState1 = EvHandler:response_end(#{
stream_ref => StreamRef,
reply_to => ReplyTo
}, EvHandlerState0),
- {HTTP2Machine1, EvHandlerState1}
+ {update_window(State1), EvHandlerState1}
end
end,
- {maybe_delete_stream(store_stream(State#http2_state{http2_machine=HTTP2Machine},
- Stream#stream{flow=Flow, flow_window=FlowWindow, handler_state=Handlers}), StreamID, remote, IsFin),
- EvHandlerState}.
+ {maybe_delete_stream(State, StreamID, remote, IsFin), EvHandlerState}.
headers_frame(State=#http2_state{content_handlers=Handlers0},
StreamID, IsFin, Headers, PseudoHeaders, _BodyLen,
@@ -378,21 +372,18 @@ ignored_frame(State=#http2_state{http2_machine=HTTP2Machine0}) ->
connection_error(State#http2_state{http2_machine=HTTP2Machine}, Error)
end.
-update_flow(State=#http2_state{socket=Socket, transport=Transport,
- http2_machine=HTTP2Machine0}, _ReplyTo, StreamRef, Inc) ->
+update_flow(State, _ReplyTo, StreamRef, Inc) ->
case get_stream_by_ref(State, StreamRef) of
- Stream=#stream{id=StreamID, flow=Flow0, flow_window=FlowWindow} ->
+ Stream=#stream{id=StreamID, flow=Flow0} ->
Flow = case Flow0 of
infinity -> infinity;
_ -> Flow0 + Inc
end,
if
- %% Flow is active again, update the window.
+ %% Flow is active again, update the stream's window.
Flow0 =< 0, Flow > 0 ->
- Transport:send(Socket, cow_http2:window_update(StreamID, FlowWindow)),
- HTTP2Machine = cow_http2_machine:update_window(StreamID, FlowWindow, HTTP2Machine0),
- {state, store_stream(State#http2_state{http2_machine=HTTP2Machine},
- Stream#stream{flow=Flow, flow_window=0})};
+ {state, update_window(store_stream(State,
+ Stream#stream{flow=Flow}), StreamID)};
true ->
{state, store_stream(State, Stream#stream{flow=Flow})}
end;
@@ -400,6 +391,35 @@ update_flow(State=#http2_state{socket=Socket, transport=Transport,
[]
end.
+%% Only update the connection's window.
+update_window(State=#http2_state{socket=Socket, transport=Transport,
+ opts=#{initial_connection_window_size := ConnWindow}, http2_machine=HTTP2Machine0}) ->
+ case cow_http2_machine:ensure_window(ConnWindow, HTTP2Machine0) of
+ ok ->
+ State;
+ {ok, Increment, HTTP2Machine} ->
+ Transport:send(Socket, cow_http2:window_update(Increment)),
+ State#http2_state{http2_machine=HTTP2Machine}
+ end.
+
+%% Update both the connection and the stream's window.
+update_window(State=#http2_state{socket=Socket, transport=Transport,
+ opts=#{initial_connection_window_size := ConnWindow, initial_stream_window_size := StreamWindow},
+ http2_machine=HTTP2Machine0}, StreamID) ->
+ {Data1, HTTP2Machine2} = case cow_http2_machine:ensure_window(ConnWindow, HTTP2Machine0) of
+ ok -> {<<>>, HTTP2Machine0};
+ {ok, Increment1, HTTP2Machine1} -> {cow_http2:window_update(Increment1), HTTP2Machine1}
+ end,
+ {Data2, HTTP2Machine} = case cow_http2_machine:ensure_window(StreamID, StreamWindow, HTTP2Machine2) of
+ ok -> {<<>>, HTTP2Machine2};
+ {ok, Increment2, HTTP2Machine3} -> {cow_http2:window_update(StreamID, Increment2), HTTP2Machine3}
+ end,
+ case {Data1, Data2} of
+ {<<>>, <<>>} -> ok;
+ _ -> Transport:send(Socket, [Data1, Data2])
+ end,
+ State#http2_state{http2_machine=HTTP2Machine}.
+
%% We may have to cancel streams even if we receive multiple
%% GOAWAY frames as the LastStreamID value may be lower than
%% the one previously received.
diff --git a/test/flow_SUITE.erl b/test/flow_SUITE.erl
index 937af26..5076bbc 100644
--- a/test/flow_SUITE.erl
+++ b/test/flow_SUITE.erl
@@ -71,6 +71,8 @@ default_flow_http2(_) ->
flow => 1,
%% We set the max frame size to the same as the initial
%% window size in order to reduce the number of data messages.
+ initial_connection_window_size => 65535,
+ initial_stream_window_size => 65535,
max_frame_size_received => 65535
},
protocols => [http2]
@@ -86,8 +88,11 @@ default_flow_http2(_) ->
%% Then we confirm that we can override it per request.
StreamRef2 = gun:get(ConnPid, "/", [], #{flow => 2}),
{response, nofin, 200, _} = gun:await(ConnPid, StreamRef2),
- %% We set the flow to 2 therefore we will receive *3* data messages
- %% and then nothing because two windows have been fully consumed.
+ %% We set the flow to 2 but due to the ensure_window algorithm
+ %% we end up receiving *5* data messages before flow control kicks in,
+ %% equivalent to 3 SSE events.
+ {data, nofin, _} = gun:await(ConnPid, StreamRef2),
+ {data, nofin, _} = gun:await(ConnPid, StreamRef2),
{data, nofin, _} = gun:await(ConnPid, StreamRef2),
{data, nofin, _} = gun:await(ConnPid, StreamRef2),
{data, nofin, _} = gun:await(ConnPid, StreamRef2),
@@ -132,7 +137,11 @@ flow_http2(_) ->
{ok, ConnPid} = gun:open("localhost", Port, #{
%% We set the max frame size to the same as the initial
%% window size in order to reduce the number of data messages.
- http2_opts => #{max_frame_size_received => 65535},
+ http2_opts => #{
+ initial_connection_window_size => 65535,
+ initial_stream_window_size => 65535,
+ max_frame_size_received => 65535
+ },
protocols => [http2]
}),
{ok, http2} = gun:await_up(ConnPid),
@@ -145,14 +154,16 @@ flow_http2(_) ->
%% We consumed all the window available.
65535 = byte_size(D1) + byte_size(D2),
{error, timeout} = gun:await(ConnPid, StreamRef, 3000),
- %% We then update the flow and get *3* more data messages but no more.
+ %% We then update the flow and get *5* more data messages but no more.
gun:update_flow(ConnPid, StreamRef, 2),
{data, nofin, D3} = gun:await(ConnPid, StreamRef),
{data, nofin, D4} = gun:await(ConnPid, StreamRef),
{data, nofin, D5} = gun:await(ConnPid, StreamRef),
+ {data, nofin, D6} = gun:await(ConnPid, StreamRef),
+ {data, nofin, D7} = gun:await(ConnPid, StreamRef),
%% We consumed all the window available again.
- %% D3 is the end of the truncated D2, D4 is full and D5 truncated.
- 65535 = byte_size(D3) + byte_size(D4) + byte_size(D5),
+ %% D3 is the end of the truncated D2, D4, D5 and D6 are full and D7 truncated.
+ 131070 = byte_size(D3) + byte_size(D4) + byte_size(D5) + byte_size(D6) + byte_size(D7),
{error, timeout} = gun:await(ConnPid, StreamRef, 1000),
gun:close(ConnPid)
after
@@ -302,6 +313,8 @@ sse_flow_http2(_) ->
%% window size in order to reduce the number of data messages.
http2_opts => #{
content_handlers => [gun_sse_h, gun_data_h],
+ initial_connection_window_size => 65535,
+ initial_stream_window_size => 65535,
max_frame_size_received => 65535
},
protocols => [http2]
@@ -314,10 +327,12 @@ sse_flow_http2(_) ->
%% the second event was fully received.
{sse, _} = gun:await(ConnPid, StreamRef),
{error, timeout} = gun:await(ConnPid, StreamRef, 3000),
- %% We then update the flow and get 2 more event messages but no more.
+ %% We then update the flow and get 3 more event messages but no more.
+ %% We get an extra message because of the ensure_window algorithm.
gun:update_flow(ConnPid, StreamRef, 2),
{sse, _} = gun:await(ConnPid, StreamRef),
{sse, _} = gun:await(ConnPid, StreamRef),
+ {sse, _} = gun:await(ConnPid, StreamRef),
{error, timeout} = gun:await(ConnPid, StreamRef, 1000),
gun:close(ConnPid)
after
diff --git a/test/gun_test.erl b/test/gun_test.erl
index e74fcd0..a2cbf6d 100644
--- a/test/gun_test.erl
+++ b/test/gun_test.erl
@@ -79,6 +79,8 @@ http2_handshake(Socket, Transport) ->
%% Receive the SETTINGS from the preface.
{ok, <<Len:24>>} = Transport:recv(Socket, 3, 5000),
{ok, <<4:8, 0:40, _:Len/binary>>} = Transport:recv(Socket, 6 + Len, 5000),
+ %% Receive the WINDOW_UPDATE sent with the preface.
+ {ok, <<4:24, 8:8, 0:40, _:32>>} = Transport:recv(Socket, 13, 5000),
%% Send the SETTINGS ack.
ok = Transport:send(Socket, cow_http2:settings_ack()),
%% Receive the SETTINGS ack.
diff --git a/test/rfc7540_SUITE.erl b/test/rfc7540_SUITE.erl
index 0db9dd0..14e9c0c 100644
--- a/test/rfc7540_SUITE.erl
+++ b/test/rfc7540_SUITE.erl
@@ -98,9 +98,17 @@ lingering_data_counts_toward_connection_window(_) ->
%% Skip RST_STREAM.
{ok, << 4:24, 3:8, 1:40, _:32 >>} = gen_tcp:recv(Socket, 13, 1000),
%% Received a WINDOW_UPDATE frame after we got RST_STREAM.
- {ok, << 4:24, 8:8, 0:40, 1000:32 >>} = gen_tcp:recv(Socket, 13, 1000)
+ {ok, << 4:24, 8:8, 0:40, Increment:32 >>} = gen_tcp:recv(Socket, 13, 1000),
+ true = Increment > 0
end),
- {ok, ConnPid} = gun:open("localhost", Port, #{protocols => [http2]}),
+ {ok, ConnPid} = gun:open("localhost", Port, #{
+ protocols => [http2],
+ http2_opts => #{
+ %% We don't set 65535 because we still want to have an initial WINDOW_UPDATE.
+ initial_connection_window_size => 65536,
+ initial_stream_window_size => 65535
+ }
+ }),
{ok, http2} = gun:await_up(ConnPid),
timer:sleep(100), %% Give enough time for the handshake to fully complete.
%% Step 1.