diff options
Diffstat (limited to 'src/gun_http2.erl')
-rw-r--r-- | src/gun_http2.erl | 98 |
1 files changed, 59 insertions, 39 deletions
diff --git a/src/gun_http2.erl b/src/gun_http2.erl index 1726528..47f670f 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -40,7 +40,6 @@ %% Flow control. flow :: integer() | infinity, - flow_window = 0 :: non_neg_integer(), %% Content handlers state. handler_state :: undefined | gun_content_handler:state() @@ -87,6 +86,10 @@ do_check_options([{keepalive, infinity}|Opts]) -> do_check_options([{keepalive, K}|Opts]) when is_integer(K), K > 0 -> do_check_options(Opts); %% @todo Add all http2_machine options. +do_check_options([{initial_connection_window_size, _}|Opts]) -> + do_check_options(Opts); +do_check_options([{initial_stream_window_size, _}|Opts]) -> + do_check_options(Opts); do_check_options([{max_frame_size_received, _}|Opts]) -> do_check_options(Opts); do_check_options([Opt|_]) -> @@ -94,7 +97,13 @@ do_check_options([Opt|_]) -> name() -> http2. -init(Owner, Socket, Transport, Opts) -> +init(Owner, Socket, Transport, Opts0) -> + %% We have different defaults than the protocol in order + %% to optimize for performance when receiving responses. + Opts = Opts0#{ + initial_connection_window_size => maps:get(initial_connection_window_size, Opts0, 8000000), + initial_stream_window_size => maps:get(initial_stream_window_size, Opts0, 8000000) + }, {ok, Preface, HTTP2Machine} = cow_http2_machine:init(client, Opts), Handlers = maps:get(content_handlers, Opts, [gun_data_h]), %% @todo Better validate the preface being received. @@ -209,62 +218,47 @@ maybe_ack(State=#http2_state{socket=Socket, transport=Transport}, Frame) -> end, State. -lingering_data_frame(State=#http2_state{socket=Socket, transport=Transport, - http2_machine=HTTP2Machine0}, DataLen) -> - Transport:send(Socket, cow_http2:window_update(DataLen)), - HTTP2Machine1 = cow_http2_machine:update_window(DataLen, HTTP2Machine0), - State#http2_state{http2_machine=HTTP2Machine1}. +lingering_data_frame(State, _DataLen) -> + %% We only update the connection's window when receiving + %% a lingering data frame. + update_window(State). -data_frame(State=#http2_state{socket=Socket, transport=Transport, - http2_machine=HTTP2Machine0}, StreamID, IsFin, Data, - EvHandler, EvHandlerState0) -> +data_frame(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0) -> Stream = #stream{ref=StreamRef, reply_to=ReplyTo, flow=Flow0, - flow_window=FlowWindow0, handler_state=Handlers0} = get_stream_by_id(State, StreamID), + handler_state=Handlers0} = get_stream_by_id(State0, StreamID), {ok, Dec, Handlers} = gun_content_handler:handle(IsFin, Data, Handlers0), Flow = case Flow0 of infinity -> infinity; _ -> Flow0 - Dec end, - Size = byte_size(Data), - FlowWindow = if - IsFin =:= nofin, Flow =< 0 -> - FlowWindow0 + Size; - true -> - FlowWindow0 - end, - {HTTP2Machine, EvHandlerState} = case Size of + State1 = store_stream(State0, Stream#stream{flow=Flow, handler_state=Handlers}), + {State, EvHandlerState} = case byte_size(Data) of %% We do not send a WINDOW_UPDATE if the DATA frame was of size 0. 0 when IsFin =:= fin -> EvHandlerState1 = EvHandler:response_end(#{ stream_ref => StreamRef, reply_to => ReplyTo }, EvHandlerState0), - {HTTP2Machine0, EvHandlerState1}; + {State1, EvHandlerState1}; 0 -> - {HTTP2Machine0, EvHandlerState0}; + {State1, EvHandlerState0}; _ -> - Transport:send(Socket, cow_http2:window_update(Size)), - HTTP2Machine1 = cow_http2_machine:update_window(Size, HTTP2Machine0), %% We do not send a stream WINDOW_UPDATE when the flow control kicks in %% (it'll be sent when the flow recovers) or for the last DATA frame. case IsFin of nofin when Flow =< 0 -> - {HTTP2Machine1, EvHandlerState0}; + {update_window(State1), EvHandlerState0}; nofin -> - Transport:send(Socket, cow_http2:window_update(StreamID, Size)), - {cow_http2_machine:update_window(StreamID, Size, HTTP2Machine1), - EvHandlerState0}; + {update_window(State1, StreamID), EvHandlerState0}; fin -> EvHandlerState1 = EvHandler:response_end(#{ stream_ref => StreamRef, reply_to => ReplyTo }, EvHandlerState0), - {HTTP2Machine1, EvHandlerState1} + {update_window(State1), EvHandlerState1} end end, - {maybe_delete_stream(store_stream(State#http2_state{http2_machine=HTTP2Machine}, - Stream#stream{flow=Flow, flow_window=FlowWindow, handler_state=Handlers}), StreamID, remote, IsFin), - EvHandlerState}. + {maybe_delete_stream(State, StreamID, remote, IsFin), EvHandlerState}. headers_frame(State=#http2_state{content_handlers=Handlers0}, StreamID, IsFin, Headers, PseudoHeaders, _BodyLen, @@ -378,21 +372,18 @@ ignored_frame(State=#http2_state{http2_machine=HTTP2Machine0}) -> connection_error(State#http2_state{http2_machine=HTTP2Machine}, Error) end. -update_flow(State=#http2_state{socket=Socket, transport=Transport, - http2_machine=HTTP2Machine0}, _ReplyTo, StreamRef, Inc) -> +update_flow(State, _ReplyTo, StreamRef, Inc) -> case get_stream_by_ref(State, StreamRef) of - Stream=#stream{id=StreamID, flow=Flow0, flow_window=FlowWindow} -> + Stream=#stream{id=StreamID, flow=Flow0} -> Flow = case Flow0 of infinity -> infinity; _ -> Flow0 + Inc end, if - %% Flow is active again, update the window. + %% Flow is active again, update the stream's window. Flow0 =< 0, Flow > 0 -> - Transport:send(Socket, cow_http2:window_update(StreamID, FlowWindow)), - HTTP2Machine = cow_http2_machine:update_window(StreamID, FlowWindow, HTTP2Machine0), - {state, store_stream(State#http2_state{http2_machine=HTTP2Machine}, - Stream#stream{flow=Flow, flow_window=0})}; + {state, update_window(store_stream(State, + Stream#stream{flow=Flow}), StreamID)}; true -> {state, store_stream(State, Stream#stream{flow=Flow})} end; @@ -400,6 +391,35 @@ update_flow(State=#http2_state{socket=Socket, transport=Transport, [] end. +%% Only update the connection's window. +update_window(State=#http2_state{socket=Socket, transport=Transport, + opts=#{initial_connection_window_size := ConnWindow}, http2_machine=HTTP2Machine0}) -> + case cow_http2_machine:ensure_window(ConnWindow, HTTP2Machine0) of + ok -> + State; + {ok, Increment, HTTP2Machine} -> + Transport:send(Socket, cow_http2:window_update(Increment)), + State#http2_state{http2_machine=HTTP2Machine} + end. + +%% Update both the connection and the stream's window. +update_window(State=#http2_state{socket=Socket, transport=Transport, + opts=#{initial_connection_window_size := ConnWindow, initial_stream_window_size := StreamWindow}, + http2_machine=HTTP2Machine0}, StreamID) -> + {Data1, HTTP2Machine2} = case cow_http2_machine:ensure_window(ConnWindow, HTTP2Machine0) of + ok -> {<<>>, HTTP2Machine0}; + {ok, Increment1, HTTP2Machine1} -> {cow_http2:window_update(Increment1), HTTP2Machine1} + end, + {Data2, HTTP2Machine} = case cow_http2_machine:ensure_window(StreamID, StreamWindow, HTTP2Machine2) of + ok -> {<<>>, HTTP2Machine2}; + {ok, Increment2, HTTP2Machine3} -> {cow_http2:window_update(StreamID, Increment2), HTTP2Machine3} + end, + case {Data1, Data2} of + {<<>>, <<>>} -> ok; + _ -> Transport:send(Socket, [Data1, Data2]) + end, + State#http2_state{http2_machine=HTTP2Machine}. + %% We may have to cancel streams even if we receive multiple %% GOAWAY frames as the LastStreamID value may be lower than %% the one previously received. |