diff options
author | Loïc Hoguin <[email protected]> | 2019-09-13 10:52:10 +0200 |
---|---|---|
committer | Loïc Hoguin <[email protected]> | 2019-09-13 10:52:10 +0200 |
commit | 4194682d4edaee3da34783c46a513698eb1e8d05 (patch) | |
tree | 6e43ee4ed6a52e9b1f44c0eede4673b42f6ca21e | |
parent | 585c1dcd001c2cb41cc77216aed2cf729fad6cc7 (diff) | |
download | gun-4194682d4edaee3da34783c46a513698eb1e8d05.tar.gz gun-4194682d4edaee3da34783c46a513698eb1e8d05.tar.bz2 gun-4194682d4edaee3da34783c46a513698eb1e8d05.zip |
Use cow_http2_machine:ensure_window
Gun was very inefficient at receiving HTTP/2 bodies. Switching
to ensure_window and increasing the default window sizes brings
the response body reading performance at least on par with the
one for HTTP/1.1.
This has a small negative impact on message flow control because
we stop updating the window later than we did before, increasing
the number of extra messages we may send. The exact amount depends
on configuration and the exact moment flow control kicks in.
-rw-r--r-- | src/gun_http2.erl | 98 | ||||
-rw-r--r-- | test/flow_SUITE.erl | 29 | ||||
-rw-r--r-- | test/gun_test.erl | 2 | ||||
-rw-r--r-- | test/rfc7540_SUITE.erl | 12 |
4 files changed, 93 insertions, 48 deletions
diff --git a/src/gun_http2.erl b/src/gun_http2.erl index 1726528..47f670f 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -40,7 +40,6 @@ %% Flow control. flow :: integer() | infinity, - flow_window = 0 :: non_neg_integer(), %% Content handlers state. handler_state :: undefined | gun_content_handler:state() @@ -87,6 +86,10 @@ do_check_options([{keepalive, infinity}|Opts]) -> do_check_options([{keepalive, K}|Opts]) when is_integer(K), K > 0 -> do_check_options(Opts); %% @todo Add all http2_machine options. +do_check_options([{initial_connection_window_size, _}|Opts]) -> + do_check_options(Opts); +do_check_options([{initial_stream_window_size, _}|Opts]) -> + do_check_options(Opts); do_check_options([{max_frame_size_received, _}|Opts]) -> do_check_options(Opts); do_check_options([Opt|_]) -> @@ -94,7 +97,13 @@ do_check_options([Opt|_]) -> name() -> http2. -init(Owner, Socket, Transport, Opts) -> +init(Owner, Socket, Transport, Opts0) -> + %% We have different defaults than the protocol in order + %% to optimize for performance when receiving responses. + Opts = Opts0#{ + initial_connection_window_size => maps:get(initial_connection_window_size, Opts0, 8000000), + initial_stream_window_size => maps:get(initial_stream_window_size, Opts0, 8000000) + }, {ok, Preface, HTTP2Machine} = cow_http2_machine:init(client, Opts), Handlers = maps:get(content_handlers, Opts, [gun_data_h]), %% @todo Better validate the preface being received. @@ -209,62 +218,47 @@ maybe_ack(State=#http2_state{socket=Socket, transport=Transport}, Frame) -> end, State. -lingering_data_frame(State=#http2_state{socket=Socket, transport=Transport, - http2_machine=HTTP2Machine0}, DataLen) -> - Transport:send(Socket, cow_http2:window_update(DataLen)), - HTTP2Machine1 = cow_http2_machine:update_window(DataLen, HTTP2Machine0), - State#http2_state{http2_machine=HTTP2Machine1}. +lingering_data_frame(State, _DataLen) -> + %% We only update the connection's window when receiving + %% a lingering data frame. + update_window(State). -data_frame(State=#http2_state{socket=Socket, transport=Transport, - http2_machine=HTTP2Machine0}, StreamID, IsFin, Data, - EvHandler, EvHandlerState0) -> +data_frame(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0) -> Stream = #stream{ref=StreamRef, reply_to=ReplyTo, flow=Flow0, - flow_window=FlowWindow0, handler_state=Handlers0} = get_stream_by_id(State, StreamID), + handler_state=Handlers0} = get_stream_by_id(State0, StreamID), {ok, Dec, Handlers} = gun_content_handler:handle(IsFin, Data, Handlers0), Flow = case Flow0 of infinity -> infinity; _ -> Flow0 - Dec end, - Size = byte_size(Data), - FlowWindow = if - IsFin =:= nofin, Flow =< 0 -> - FlowWindow0 + Size; - true -> - FlowWindow0 - end, - {HTTP2Machine, EvHandlerState} = case Size of + State1 = store_stream(State0, Stream#stream{flow=Flow, handler_state=Handlers}), + {State, EvHandlerState} = case byte_size(Data) of %% We do not send a WINDOW_UPDATE if the DATA frame was of size 0. 0 when IsFin =:= fin -> EvHandlerState1 = EvHandler:response_end(#{ stream_ref => StreamRef, reply_to => ReplyTo }, EvHandlerState0), - {HTTP2Machine0, EvHandlerState1}; + {State1, EvHandlerState1}; 0 -> - {HTTP2Machine0, EvHandlerState0}; + {State1, EvHandlerState0}; _ -> - Transport:send(Socket, cow_http2:window_update(Size)), - HTTP2Machine1 = cow_http2_machine:update_window(Size, HTTP2Machine0), %% We do not send a stream WINDOW_UPDATE when the flow control kicks in %% (it'll be sent when the flow recovers) or for the last DATA frame. case IsFin of nofin when Flow =< 0 -> - {HTTP2Machine1, EvHandlerState0}; + {update_window(State1), EvHandlerState0}; nofin -> - Transport:send(Socket, cow_http2:window_update(StreamID, Size)), - {cow_http2_machine:update_window(StreamID, Size, HTTP2Machine1), - EvHandlerState0}; + {update_window(State1, StreamID), EvHandlerState0}; fin -> EvHandlerState1 = EvHandler:response_end(#{ stream_ref => StreamRef, reply_to => ReplyTo }, EvHandlerState0), - {HTTP2Machine1, EvHandlerState1} + {update_window(State1), EvHandlerState1} end end, - {maybe_delete_stream(store_stream(State#http2_state{http2_machine=HTTP2Machine}, - Stream#stream{flow=Flow, flow_window=FlowWindow, handler_state=Handlers}), StreamID, remote, IsFin), - EvHandlerState}. + {maybe_delete_stream(State, StreamID, remote, IsFin), EvHandlerState}. headers_frame(State=#http2_state{content_handlers=Handlers0}, StreamID, IsFin, Headers, PseudoHeaders, _BodyLen, @@ -378,21 +372,18 @@ ignored_frame(State=#http2_state{http2_machine=HTTP2Machine0}) -> connection_error(State#http2_state{http2_machine=HTTP2Machine}, Error) end. -update_flow(State=#http2_state{socket=Socket, transport=Transport, - http2_machine=HTTP2Machine0}, _ReplyTo, StreamRef, Inc) -> +update_flow(State, _ReplyTo, StreamRef, Inc) -> case get_stream_by_ref(State, StreamRef) of - Stream=#stream{id=StreamID, flow=Flow0, flow_window=FlowWindow} -> + Stream=#stream{id=StreamID, flow=Flow0} -> Flow = case Flow0 of infinity -> infinity; _ -> Flow0 + Inc end, if - %% Flow is active again, update the window. + %% Flow is active again, update the stream's window. Flow0 =< 0, Flow > 0 -> - Transport:send(Socket, cow_http2:window_update(StreamID, FlowWindow)), - HTTP2Machine = cow_http2_machine:update_window(StreamID, FlowWindow, HTTP2Machine0), - {state, store_stream(State#http2_state{http2_machine=HTTP2Machine}, - Stream#stream{flow=Flow, flow_window=0})}; + {state, update_window(store_stream(State, + Stream#stream{flow=Flow}), StreamID)}; true -> {state, store_stream(State, Stream#stream{flow=Flow})} end; @@ -400,6 +391,35 @@ update_flow(State=#http2_state{socket=Socket, transport=Transport, [] end. +%% Only update the connection's window. +update_window(State=#http2_state{socket=Socket, transport=Transport, + opts=#{initial_connection_window_size := ConnWindow}, http2_machine=HTTP2Machine0}) -> + case cow_http2_machine:ensure_window(ConnWindow, HTTP2Machine0) of + ok -> + State; + {ok, Increment, HTTP2Machine} -> + Transport:send(Socket, cow_http2:window_update(Increment)), + State#http2_state{http2_machine=HTTP2Machine} + end. + +%% Update both the connection and the stream's window. +update_window(State=#http2_state{socket=Socket, transport=Transport, + opts=#{initial_connection_window_size := ConnWindow, initial_stream_window_size := StreamWindow}, + http2_machine=HTTP2Machine0}, StreamID) -> + {Data1, HTTP2Machine2} = case cow_http2_machine:ensure_window(ConnWindow, HTTP2Machine0) of + ok -> {<<>>, HTTP2Machine0}; + {ok, Increment1, HTTP2Machine1} -> {cow_http2:window_update(Increment1), HTTP2Machine1} + end, + {Data2, HTTP2Machine} = case cow_http2_machine:ensure_window(StreamID, StreamWindow, HTTP2Machine2) of + ok -> {<<>>, HTTP2Machine2}; + {ok, Increment2, HTTP2Machine3} -> {cow_http2:window_update(StreamID, Increment2), HTTP2Machine3} + end, + case {Data1, Data2} of + {<<>>, <<>>} -> ok; + _ -> Transport:send(Socket, [Data1, Data2]) + end, + State#http2_state{http2_machine=HTTP2Machine}. + %% We may have to cancel streams even if we receive multiple %% GOAWAY frames as the LastStreamID value may be lower than %% the one previously received. diff --git a/test/flow_SUITE.erl b/test/flow_SUITE.erl index 937af26..5076bbc 100644 --- a/test/flow_SUITE.erl +++ b/test/flow_SUITE.erl @@ -71,6 +71,8 @@ default_flow_http2(_) -> flow => 1, %% We set the max frame size to the same as the initial %% window size in order to reduce the number of data messages. + initial_connection_window_size => 65535, + initial_stream_window_size => 65535, max_frame_size_received => 65535 }, protocols => [http2] @@ -86,8 +88,11 @@ default_flow_http2(_) -> %% Then we confirm that we can override it per request. StreamRef2 = gun:get(ConnPid, "/", [], #{flow => 2}), {response, nofin, 200, _} = gun:await(ConnPid, StreamRef2), - %% We set the flow to 2 therefore we will receive *3* data messages - %% and then nothing because two windows have been fully consumed. + %% We set the flow to 2 but due to the ensure_window algorithm + %% we end up receiving *5* data messages before flow control kicks in, + %% equivalent to 3 SSE events. + {data, nofin, _} = gun:await(ConnPid, StreamRef2), + {data, nofin, _} = gun:await(ConnPid, StreamRef2), {data, nofin, _} = gun:await(ConnPid, StreamRef2), {data, nofin, _} = gun:await(ConnPid, StreamRef2), {data, nofin, _} = gun:await(ConnPid, StreamRef2), @@ -132,7 +137,11 @@ flow_http2(_) -> {ok, ConnPid} = gun:open("localhost", Port, #{ %% We set the max frame size to the same as the initial %% window size in order to reduce the number of data messages. - http2_opts => #{max_frame_size_received => 65535}, + http2_opts => #{ + initial_connection_window_size => 65535, + initial_stream_window_size => 65535, + max_frame_size_received => 65535 + }, protocols => [http2] }), {ok, http2} = gun:await_up(ConnPid), @@ -145,14 +154,16 @@ flow_http2(_) -> %% We consumed all the window available. 65535 = byte_size(D1) + byte_size(D2), {error, timeout} = gun:await(ConnPid, StreamRef, 3000), - %% We then update the flow and get *3* more data messages but no more. + %% We then update the flow and get *5* more data messages but no more. gun:update_flow(ConnPid, StreamRef, 2), {data, nofin, D3} = gun:await(ConnPid, StreamRef), {data, nofin, D4} = gun:await(ConnPid, StreamRef), {data, nofin, D5} = gun:await(ConnPid, StreamRef), + {data, nofin, D6} = gun:await(ConnPid, StreamRef), + {data, nofin, D7} = gun:await(ConnPid, StreamRef), %% We consumed all the window available again. - %% D3 is the end of the truncated D2, D4 is full and D5 truncated. - 65535 = byte_size(D3) + byte_size(D4) + byte_size(D5), + %% D3 is the end of the truncated D2, D4, D5 and D6 are full and D7 truncated. + 131070 = byte_size(D3) + byte_size(D4) + byte_size(D5) + byte_size(D6) + byte_size(D7), {error, timeout} = gun:await(ConnPid, StreamRef, 1000), gun:close(ConnPid) after @@ -302,6 +313,8 @@ sse_flow_http2(_) -> %% window size in order to reduce the number of data messages. http2_opts => #{ content_handlers => [gun_sse_h, gun_data_h], + initial_connection_window_size => 65535, + initial_stream_window_size => 65535, max_frame_size_received => 65535 }, protocols => [http2] @@ -314,10 +327,12 @@ sse_flow_http2(_) -> %% the second event was fully received. {sse, _} = gun:await(ConnPid, StreamRef), {error, timeout} = gun:await(ConnPid, StreamRef, 3000), - %% We then update the flow and get 2 more event messages but no more. + %% We then update the flow and get 3 more event messages but no more. + %% We get an extra message because of the ensure_window algorithm. gun:update_flow(ConnPid, StreamRef, 2), {sse, _} = gun:await(ConnPid, StreamRef), {sse, _} = gun:await(ConnPid, StreamRef), + {sse, _} = gun:await(ConnPid, StreamRef), {error, timeout} = gun:await(ConnPid, StreamRef, 1000), gun:close(ConnPid) after diff --git a/test/gun_test.erl b/test/gun_test.erl index e74fcd0..a2cbf6d 100644 --- a/test/gun_test.erl +++ b/test/gun_test.erl @@ -79,6 +79,8 @@ http2_handshake(Socket, Transport) -> %% Receive the SETTINGS from the preface. {ok, <<Len:24>>} = Transport:recv(Socket, 3, 5000), {ok, <<4:8, 0:40, _:Len/binary>>} = Transport:recv(Socket, 6 + Len, 5000), + %% Receive the WINDOW_UPDATE sent with the preface. + {ok, <<4:24, 8:8, 0:40, _:32>>} = Transport:recv(Socket, 13, 5000), %% Send the SETTINGS ack. ok = Transport:send(Socket, cow_http2:settings_ack()), %% Receive the SETTINGS ack. diff --git a/test/rfc7540_SUITE.erl b/test/rfc7540_SUITE.erl index 0db9dd0..14e9c0c 100644 --- a/test/rfc7540_SUITE.erl +++ b/test/rfc7540_SUITE.erl @@ -98,9 +98,17 @@ lingering_data_counts_toward_connection_window(_) -> %% Skip RST_STREAM. {ok, << 4:24, 3:8, 1:40, _:32 >>} = gen_tcp:recv(Socket, 13, 1000), %% Received a WINDOW_UPDATE frame after we got RST_STREAM. - {ok, << 4:24, 8:8, 0:40, 1000:32 >>} = gen_tcp:recv(Socket, 13, 1000) + {ok, << 4:24, 8:8, 0:40, Increment:32 >>} = gen_tcp:recv(Socket, 13, 1000), + true = Increment > 0 end), - {ok, ConnPid} = gun:open("localhost", Port, #{protocols => [http2]}), + {ok, ConnPid} = gun:open("localhost", Port, #{ + protocols => [http2], + http2_opts => #{ + %% We don't set 65535 because we still want to have an initial WINDOW_UPDATE. + initial_connection_window_size => 65536, + initial_stream_window_size => 65535 + } + }), {ok, http2} = gun:await_up(ConnPid), timer:sleep(100), %% Give enough time for the handshake to fully complete. %% Step 1. |