From 611f9a9b78cab4005892e13dffb7a2c8e44580ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Fri, 2 Aug 2019 14:30:08 +0200 Subject: Add flow control Flow control is disabled by default. The initial flow value must be set to enable it (either for the entire connection or on a per-request basis). Flow applies to all HTTP streams as well as Websocket. HTTP/2 pushed streams receive the same value as their originating stream. --- src/gun_http.erl | 137 +++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 88 insertions(+), 49 deletions(-) (limited to 'src/gun_http.erl') diff --git a/src/gun_http.erl b/src/gun_http.erl index 68f9e7d..ec268ad 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -18,12 +18,13 @@ -export([name/0]). -export([init/4]). -export([handle/4]). +-export([update_flow/4]). -export([close/4]). -export([keepalive/1]). --export([headers/10]). --export([request/11]). +-export([headers/11]). +-export([request/12]). -export([data/7]). --export([connect/5]). +-export([connect/6]). -export([cancel/5]). -export([stream_info/2]). -export([down/1]). @@ -43,6 +44,7 @@ -record(stream, { ref :: reference() | connect_info() | websocket_info(), reply_to :: pid(), + flow :: integer() | infinity, method :: binary(), is_alive :: boolean(), handler_state :: undefined | gun_content_handler:state() @@ -52,6 +54,7 @@ owner :: pid(), socket :: inet:socket() | ssl:sslsocket(), transport :: module(), + opts = #{} :: map(), %% @todo version = 'HTTP/1.1' :: cow_http:version(), content_handlers :: gun_content_handler:opt(), connection = keepalive :: keepalive | close, @@ -73,6 +76,8 @@ do_check_options([Opt={content_handlers, Handlers}|Opts]) -> ok -> do_check_options(Opts); error -> {error, {options, {http, Opt}}} end; +do_check_options([{flow, InitialFlow}|Opts]) when is_integer(InitialFlow), InitialFlow > 0 -> + do_check_options(Opts); do_check_options([{keepalive, infinity}|Opts]) -> do_check_options(Opts); do_check_options([{keepalive, K}|Opts]) when is_integer(K), K > 0 -> @@ -87,10 +92,11 @@ do_check_options([Opt|_]) -> name() -> http. init(Owner, Socket, Transport, Opts) -> + %% @todo If we keep the opts we don't need to add these to the state. Version = maps:get(version, Opts, 'HTTP/1.1'), Handlers = maps:get(content_handlers, Opts, [gun_data_h]), TransformHeaderName = maps:get(transform_header_name, Opts, fun (N) -> N end), - #http_state{owner=Owner, socket=Socket, transport=Transport, version=Version, + #http_state{owner=Owner, socket=Socket, transport=Transport, opts=Opts, version=Version, content_handlers=Handlers, transform_header_name=TransformHeaderName}. %% Stop looping when we got no more data. @@ -120,7 +126,7 @@ handle(Data, State=#http_state{in=head, buffer=Buffer, end; %% Everything sent to the socket until it closes is part of the response body. handle(Data, State=#http_state{in=body_close}, _, EvHandlerState) -> - {{state, send_data_if_alive(Data, State, nofin)}, EvHandlerState}; + {send_data(Data, State, nofin), EvHandlerState}; %% Chunked transfer-encoding may contain both data and trailers. handle(Data, State=#http_state{in=body_chunked, in_state=InState, buffer=Buffer, streams=[#stream{ref=StreamRef, reply_to=ReplyTo}|_], @@ -130,21 +136,15 @@ handle(Data, State=#http_state{in=body_chunked, in_state=InState, more -> {{state, State#http_state{buffer=Buffer2}}, EvHandlerState0}; {more, Data2, InState2} -> - {{state, send_data_if_alive(Data2, - State#http_state{buffer= <<>>, in_state=InState2}, - nofin)}, EvHandlerState0}; + {send_data(Data2, State#http_state{buffer= <<>>, in_state=InState2}, nofin), EvHandlerState0}; {more, Data2, Length, InState2} when is_integer(Length) -> %% @todo See if we can recv faster than one message at a time. - {{state, send_data_if_alive(Data2, - State#http_state{buffer= <<>>, in_state=InState2}, - nofin)}, EvHandlerState0}; + {send_data(Data2, State#http_state{buffer= <<>>, in_state=InState2}, nofin), EvHandlerState0}; {more, Data2, Rest, InState2} -> %% @todo See if we can recv faster than one message at a time. - {{state, send_data_if_alive(Data2, - State#http_state{buffer=Rest, in_state=InState2}, - nofin)}, EvHandlerState0}; + {send_data(Data2, State#http_state{buffer=Rest, in_state=InState2}, nofin), EvHandlerState0}; {done, HasTrailers, Rest} -> - %% @todo response_end should be called AFTER send_data_if_alive + %% @todo response_end should be called AFTER send_data {IsFin, EvHandlerState} = case HasTrailers of trailers -> {nofin, EvHandlerState0}; @@ -156,7 +156,8 @@ handle(Data, State=#http_state{in=body_chunked, in_state=InState, {fin, EvHandlerState1} end, %% I suppose it doesn't hurt to append an empty binary. - State1 = send_data_if_alive(<<>>, State, IsFin), + %% We ignore the active command because the stream ended. + [{state, State1}|_] = send_data(<<>>, State, IsFin), case {HasTrailers, Conn} of {trailers, _} -> handle(Rest, State1#http_state{buffer = <<>>, in=body_trailer}, EvHandler, EvHandlerState); @@ -166,7 +167,7 @@ handle(Data, State=#http_state{in=body_chunked, in_state=InState, {[{state, end_stream(State1)}, close], EvHandlerState} end; {done, Data2, HasTrailers, Rest} -> - %% @todo response_end should be called AFTER send_data_if_alive + %% @todo response_end should be called AFTER send_data {IsFin, EvHandlerState} = case HasTrailers of trailers -> {nofin, EvHandlerState0}; @@ -177,7 +178,8 @@ handle(Data, State=#http_state{in=body_chunked, in_state=InState, }, EvHandlerState0), {fin, EvHandlerState1} end, - State1 = send_data_if_alive(Data2, State, IsFin), + %% We ignore the active command because the stream ended. + [{state, State1}|_] = send_data(Data2, State, IsFin), case {HasTrailers, Conn} of {trailers, _} -> handle(Rest, State1#http_state{buffer = <<>>, in=body_trailer}, EvHandler, EvHandlerState); @@ -218,24 +220,24 @@ handle(Data, State=#http_state{in={body, Length}, connection=Conn, if %% More data coming. DataSize < Length -> - {{state, send_data_if_alive(Data, - State#http_state{in={body, Length - DataSize}}, - nofin)}, EvHandlerState0}; + {send_data(Data, State#http_state{in={body, Length - DataSize}}, nofin), EvHandlerState0}; %% Stream finished, no rest. DataSize =:= Length -> - State1 = send_data_if_alive(Data, State, fin), + %% We ignore the active command because the stream ended. + [{state, State1}|_] = send_data(Data, State, fin), EvHandlerState = EvHandler:response_end(#{ stream_ref => StreamRef, reply_to => ReplyTo }, EvHandlerState0), case Conn of - keepalive -> {{state, end_stream(State1)}, EvHandlerState}; + keepalive -> {[{state, end_stream(State1)}, {active, true}], EvHandlerState}; close -> {[{state, end_stream(State1)}, close], EvHandlerState} end; %% Stream finished, rest. true -> << Body:Length/binary, Rest/bits >> = Data, - State1 = send_data_if_alive(Body, State, fin), + %% We ignore the active command because the stream ended. + [{state, State1}|_] = send_data(Body, State, fin), EvHandlerState = EvHandler:response_end(#{ stream_ref => StreamRef, reply_to => ReplyTo @@ -420,20 +422,48 @@ stream_ref({connect, StreamRef, _}) -> StreamRef; stream_ref({websocket, StreamRef, _, _, _}) -> StreamRef; stream_ref(StreamRef) -> StreamRef. -send_data_if_alive(<<>>, State, nofin) -> - State; +%% The state must be first in order to retrieve it when the stream ended. +send_data(<<>>, State, nofin) -> + [{state, State}, {active, true}]; %% @todo What if we receive data when the HEAD method was used? -send_data_if_alive(Data, State=#http_state{streams=[Stream=#stream{ - is_alive=true, handler_state=Handlers0}|Tail]}, IsFin) -> - Handlers = gun_content_handler:handle(IsFin, Data, Handlers0), - State#http_state{streams=[Stream#stream{handler_state=Handlers}|Tail]}; -send_data_if_alive(_, State, _) -> - State. +send_data(Data, State=#http_state{streams=[Stream=#stream{ + flow=Flow0, is_alive=true, handler_state=Handlers0}|Tail]}, IsFin) -> + {ok, Dec, Handlers} = gun_content_handler:handle(IsFin, Data, Handlers0), + Flow = case Flow0 of + infinity -> infinity; + _ -> Flow0 - Dec + end, + [ + {state, State#http_state{streams=[Stream#stream{flow=Flow, handler_state=Handlers}|Tail]}}, + {active, Flow > 0} + ]; +send_data(_, State, _) -> + [{state, State}, {active, true}]. + +%% We only update the active state when the current stream is being updated. +update_flow(State=#http_state{streams=[Stream=#stream{ref=StreamRef, flow=Flow0}|Tail]}, + _ReplyTo, StreamRef, Inc) -> + Flow = case Flow0 of + infinity -> infinity; + _ -> Flow0 + Inc + end, + [ + {state, State#http_state{streams=[Stream#stream{flow=Flow}|Tail]}}, + {active, Flow > 0} + ]; +update_flow(State=#http_state{streams=Streams0}, _ReplyTo, StreamRef, Inc) -> + Streams = [case Ref of + StreamRef when Flow =/= infinity -> + Tuple#stream{flow=Flow + Inc}; + _ -> + Tuple + end || Tuple = #stream{ref=Ref, flow=Flow} <- Streams0], + {state, State#http_state{streams=Streams}}. %% @todo Use Reason. close(_, State=#http_state{in=body_close, streams=[#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]}, EvHandler, EvHandlerState0) -> - _ = send_data_if_alive(<<>>, State, fin), + _ = send_data(<<>>, State, fin), EvHandlerState = EvHandler:response_end(#{ stream_ref => StreamRef, reply_to => ReplyTo @@ -461,23 +491,29 @@ keepalive(State=#http_state{socket=Socket, transport=Transport, out=head}) -> keepalive(State) -> State. -headers(State=#http_state{out=head}, +headers(State=#http_state{opts=Opts, out=head}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers, - EvHandler, EvHandlerState0) -> + InitialFlow0, EvHandler, EvHandlerState0) -> {Conn, Out, EvHandlerState} = send_request(State, StreamRef, ReplyTo, Method, Host, Port, Path, Headers, undefined, EvHandler, EvHandlerState0, ?FUNCTION_NAME), - {new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, ReplyTo, Method), + InitialFlow = initial_flow(InitialFlow0, Opts), + {new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, ReplyTo, Method, InitialFlow), EvHandlerState}. -request(State=#http_state{out=head}, StreamRef, ReplyTo, - Method, Host, Port, Path, Headers, Body, EvHandler, EvHandlerState0) -> +request(State=#http_state{opts=Opts, out=head}, StreamRef, ReplyTo, + Method, Host, Port, Path, Headers, Body, + InitialFlow0, EvHandler, EvHandlerState0) -> {Conn, Out, EvHandlerState} = send_request(State, StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body, EvHandler, EvHandlerState0, ?FUNCTION_NAME), - {new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, ReplyTo, Method), + InitialFlow = initial_flow(InitialFlow0, Opts), + {new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, ReplyTo, Method, InitialFlow), EvHandlerState}. +initial_flow(infinity, #{flow := InitialFlow}) -> InitialFlow; +initial_flow(InitialFlow, _) -> InitialFlow. + send_request(State=#http_state{socket=Socket, transport=Transport, version=Version}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers0, Body, EvHandler, EvHandlerState0, Function) -> @@ -602,12 +638,12 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version, {error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState0} end. -connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _) when Streams =/= [] -> +connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _, _) when Streams =/= [] -> ReplyTo ! {gun_error, self(), StreamRef, {badstate, "CONNECT can only be used with HTTP/1.1 when no other streams are active."}}, State; -connect(State=#http_state{socket=Socket, transport=Transport, version=Version}, - StreamRef, ReplyTo, Destination=#{host := Host0}, Headers0) -> +connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version=Version}, + StreamRef, ReplyTo, Destination=#{host := Host0}, Headers0, InitialFlow0) -> Host = case Host0 of Tuple when is_tuple(Tuple) -> inet:ntoa(Tuple); _ -> Host0 @@ -634,7 +670,8 @@ connect(State=#http_state{socket=Socket, transport=Transport, version=Version}, Transport:send(Socket, [ cow_http:request(<<"CONNECT">>, Authority, Version, Headers) ]), - new_stream(State, {connect, StreamRef, Destination}, ReplyTo, <<"CONNECT">>). + InitialFlow = initial_flow(InitialFlow0, Opts), + new_stream(State, {connect, StreamRef, Destination}, ReplyTo, <<"CONNECT">>, InitialFlow). %% We can't cancel anything, we can just stop forwarding messages to the owner. cancel(State0, StreamRef, ReplyTo, EvHandler, EvHandlerState0) -> @@ -735,9 +772,9 @@ response_io_from_headers(_, Version, _Status, Headers) -> %% Streams. -new_stream(State=#http_state{streams=Streams}, StreamRef, ReplyTo, Method) -> +new_stream(State=#http_state{streams=Streams}, StreamRef, ReplyTo, Method, InitialFlow) -> State#http_state{streams=Streams - ++ [#stream{ref=StreamRef, reply_to=ReplyTo, + ++ [#stream{ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow, method=iolist_to_binary(Method), is_alive=true}]}. is_stream(#http_state{streams=Streams}, StreamRef) -> @@ -787,8 +824,9 @@ ws_upgrade(State=#http_state{owner=ReplyTo, out=head}, {Conn, Out, EvHandlerState} = send_request(State, StreamRef, ReplyTo, <<"GET">>, Host, Port, Path, Headers, undefined, EvHandler, EvHandlerState0, ?FUNCTION_NAME), + InitialFlow = maps:get(flow, WsOpts, infinity), {new_stream(State#http_state{connection=Conn, out=Out}, - {websocket, StreamRef, Key, GunExtensions, WsOpts}, ReplyTo, <<"GET">>), + {websocket, StreamRef, Key, GunExtensions, WsOpts}, ReplyTo, <<"GET">>, InitialFlow), EvHandlerState}. ws_handshake(Buffer, State, StreamRef, Headers, Key, GunExtensions, Opts) -> @@ -853,8 +891,9 @@ ws_handshake_protocols(Buffer, State, StreamRef, Headers, Extensions, Opts) -> end end. -ws_handshake_end(Buffer, #http_state{owner=Owner, socket=Socket, transport=Transport}, - StreamRef, Headers, Extensions, Handler, Opts) -> +%% We know that the most recent stream is the Websocket one. +ws_handshake_end(Buffer, #http_state{owner=Owner, socket=Socket, transport=Transport, + streams=[#stream{flow=InitialFlow}|_]}, StreamRef, Headers, Extensions, Handler, Opts) -> %% Send ourselves the remaining buffer, if any. _ = case Buffer of <<>> -> @@ -863,4 +902,4 @@ ws_handshake_end(Buffer, #http_state{owner=Owner, socket=Socket, transport=Trans {OK, _, _} = Transport:messages(), self() ! {OK, Socket, Buffer} end, - gun_ws:init(Owner, Socket, Transport, StreamRef, Headers, Extensions, Handler, Opts). + gun_ws:init(Owner, Socket, Transport, StreamRef, Headers, Extensions, InitialFlow, Handler, Opts). -- cgit v1.2.3