From 611f9a9b78cab4005892e13dffb7a2c8e44580ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Fri, 2 Aug 2019 14:30:08 +0200 Subject: Add flow control Flow control is disabled by default. The initial flow value must be set to enable it (either for the entire connection or on a per-request basis). Flow applies to all HTTP streams as well as Websocket. HTTP/2 pushed streams receive the same value as their originating stream. --- src/gun.erl | 59 ++++++++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 48 insertions(+), 11 deletions(-) (limited to 'src/gun.erl') diff --git a/src/gun.erl b/src/gun.erl index d758ffc..7b06aaf 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -79,6 +79,7 @@ -export([flush/1]). %% Streams. +-export([update_flow/3]). -export([cancel/2]). -export([stream_info/2]). @@ -158,6 +159,7 @@ %% This is of course not required for HTTP/1.1 since the CONNECT takes over %% the entire connection. -type req_opts() :: #{ + flow => pos_integer(), reply_to => pid() }. -export_type([req_opts/0]). @@ -177,6 +179,7 @@ %% @todo keepalive -type ws_opts() :: #{ compress => boolean(), + flow => pos_integer(), protocols => [{binary(), module()}] }. -export_type([ws_opts/0]). @@ -194,6 +197,7 @@ keepalive_ref :: undefined | reference(), socket :: undefined | inet:socket() | ssl:sslsocket() | pid(), transport :: module(), + active = true :: boolean(), messages :: {atom(), atom(), atom()}, protocol :: module(), protocol_state :: any(), @@ -458,9 +462,10 @@ headers(ServerPid, Method, Path, Headers) -> -spec headers(pid(), iodata(), iodata(), req_headers(), req_opts()) -> reference(). headers(ServerPid, Method, Path, Headers, ReqOpts) -> StreamRef = make_ref(), + InitialFlow = maps:get(flow, ReqOpts, infinity), ReplyTo = maps:get(reply_to, ReqOpts, self()), gen_statem:cast(ServerPid, {headers, ReplyTo, StreamRef, - Method, Path, normalize_headers(Headers)}), + Method, Path, normalize_headers(Headers), InitialFlow}), StreamRef. -spec request(pid(), iodata(), iodata(), req_headers(), iodata()) -> reference(). @@ -470,9 +475,10 @@ request(ServerPid, Method, Path, Headers, Body) -> -spec request(pid(), iodata(), iodata(), req_headers(), iodata(), req_opts()) -> reference(). request(ServerPid, Method, Path, Headers, Body, ReqOpts) -> StreamRef = make_ref(), + InitialFlow = maps:get(flow, ReqOpts, infinity), ReplyTo = maps:get(reply_to, ReqOpts, self()), gen_statem:cast(ServerPid, {request, ReplyTo, StreamRef, - Method, Path, normalize_headers(Headers), Body}), + Method, Path, normalize_headers(Headers), Body, InitialFlow}), StreamRef. normalize_headers([]) -> @@ -510,8 +516,10 @@ connect(ServerPid, Destination, Headers) -> -spec connect(pid(), connect_destination(), req_headers(), req_opts()) -> reference(). connect(ServerPid, Destination, Headers, ReqOpts) -> StreamRef = make_ref(), + InitialFlow = maps:get(flow, ReqOpts, infinity), ReplyTo = maps:get(reply_to, ReqOpts, self()), - gen_statem:cast(ServerPid, {connect, ReplyTo, StreamRef, Destination, Headers}), + gen_statem:cast(ServerPid, {connect, ReplyTo, StreamRef, + Destination, Headers, InitialFlow}), StreamRef. %% Awaiting gun messages. @@ -520,6 +528,7 @@ connect(ServerPid, Destination, Headers, ReqOpts) -> -type await_result() :: {inform, 100..199, resp_headers()} | {response, fin | nofin, non_neg_integer(), resp_headers()} | {data, fin | nofin, binary()} + | {sse, cow_sse:event() | fin} | {trailers, resp_headers()} | {push, reference(), binary(), binary(), resp_headers()} | {upgrade, [binary()], resp_headers()} @@ -551,6 +560,8 @@ await(ServerPid, StreamRef, Timeout, MRef) -> {response, IsFin, Status, Headers}; {gun_data, ServerPid, StreamRef, IsFin, Data} -> {data, IsFin, Data}; + {gun_sse, ServerPid, StreamRef, Event} -> + {sse, Event}; {gun_trailers, ServerPid, StreamRef, Trailers} -> {trailers, Trailers}; {gun_push, ServerPid, StreamRef, NewStreamRef, Method, URI, Headers} -> @@ -699,12 +710,20 @@ flush_ref(StreamRef) -> ok end. +%% Flow control. + +-spec update_flow(pid(), reference(), pos_integer()) -> ok. +update_flow(ServerPid, StreamRef, Flow) -> + gen_statem:cast(ServerPid, {update_flow, self(), StreamRef, Flow}). + %% Cancelling a stream. -spec cancel(pid(), reference()) -> ok. cancel(ServerPid, StreamRef) -> gen_statem:cast(ServerPid, {cancel, self(), StreamRef}). +%% Information about a stream. + -spec stream_info(pid(), reference()) -> {ok, map() | undefined} | {error, not_connected}. stream_info(ServerPid, StreamRef) -> gen_statem:call(ServerPid, {stream_info, StreamRef}). @@ -919,11 +938,16 @@ connected(internal, {connected, Socket, Protocol}, {keep_state, keepalive_timeout(active(State#state{socket=Socket, protocol=Protocol, protocol_state=ProtoState}))}; %% Socket events. -connected(info, {OK, Socket, Data}, State=#state{socket=Socket, messages={OK, _, _}, +connected(info, {OK, Socket, Data}, State0=#state{socket=Socket, messages={OK, _, _}, protocol=Protocol, protocol_state=ProtoState, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> {Commands, EvHandlerState} = Protocol:handle(Data, ProtoState, EvHandler, EvHandlerState0), - commands(Commands, active(State#state{event_handler_state=EvHandlerState})); + case commands(Commands, State0#state{event_handler_state=EvHandlerState}) of + {keep_state, State} -> + {keep_state, active(State)}; + Res -> + Res + end; connected(info, {Closed, Socket}, State=#state{socket=Socket, messages={_, Closed, _}}) -> disconnect(State, closed); connected(info, {Error, Socket, Reason}, State=#state{socket=Socket, messages={_, _, Error}}) -> @@ -936,21 +960,21 @@ connected(info, keepalive, State=#state{protocol=Protocol, protocol_state=ProtoS ProtoState2 = Protocol:keepalive(ProtoState), {keep_state, keepalive_timeout(State#state{protocol_state=ProtoState2})}; %% Public HTTP interface. -connected(cast, {headers, ReplyTo, StreamRef, Method, Path, Headers}, +connected(cast, {headers, ReplyTo, StreamRef, Method, Path, Headers, InitialFlow}, State=#state{origin_host=Host, origin_port=Port, protocol=Protocol, protocol_state=ProtoState, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> {ProtoState2, EvHandlerState} = Protocol:headers(ProtoState, StreamRef, ReplyTo, Method, Host, Port, Path, Headers, - EvHandler, EvHandlerState0), + InitialFlow, EvHandler, EvHandlerState0), {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; -connected(cast, {request, ReplyTo, StreamRef, Method, Path, Headers, Body}, +connected(cast, {request, ReplyTo, StreamRef, Method, Path, Headers, Body, InitialFlow}, State=#state{origin_host=Host, origin_port=Port, protocol=Protocol, protocol_state=ProtoState, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> {ProtoState2, EvHandlerState} = Protocol:request(ProtoState, StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body, - EvHandler, EvHandlerState0), + InitialFlow, EvHandler, EvHandlerState0), {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; %% @todo Do we want to reject ReplyTo if it's not the process %% who initiated the connection? For both data and cancel. @@ -960,7 +984,7 @@ connected(cast, {data, ReplyTo, StreamRef, IsFin, Data}, {ProtoState2, EvHandlerState} = Protocol:data(ProtoState, StreamRef, ReplyTo, IsFin, Data, EvHandler, EvHandlerState0), {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; -connected(cast, {connect, ReplyTo, StreamRef, Destination0, Headers}, +connected(cast, {connect, ReplyTo, StreamRef, Destination0, Headers, InitialFlow}, State=#state{protocol=Protocol, protocol_state=ProtoState}) -> %% The protocol option has been deprecated in favor of the protocols option. %% Nobody probably ended up using it, but let's not break the interface. @@ -980,7 +1004,7 @@ connected(cast, {connect, ReplyTo, StreamRef, Destination0, Headers}, _ -> Destination1 end, - ProtoState2 = Protocol:connect(ProtoState, StreamRef, ReplyTo, Destination, Headers), + ProtoState2 = Protocol:connect(ProtoState, StreamRef, ReplyTo, Destination, Headers, InitialFlow), {keep_state, State#state{protocol_state=ProtoState2}}; %% When using gun_tls_proxy we need a separate message to know whether %% the handshake succeeded and whether we need to switch to a different protocol. @@ -1002,6 +1026,15 @@ connected(info, {gun_tls_proxy, Socket, Error = {error, Reason}, HandshakeEvent} error => Reason }, EvHandlerState0), commands([Error], State#state{event_handler_state=EvHandlerState}); +connected(cast, {update_flow, ReplyTo, StreamRef, Flow}, State0=#state{ + protocol=Protocol, protocol_state=ProtoState}) -> + Commands = Protocol:update_flow(ProtoState, ReplyTo, StreamRef, Flow), + case commands(Commands, State0) of + {keep_state, State} -> + {keep_state, active(State)}; + Res -> + Res + end; connected(cast, {cancel, ReplyTo, StreamRef}, State=#state{ protocol=Protocol, protocol_state=ProtoState, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> @@ -1092,6 +1125,8 @@ commands([close|_], State) -> disconnect(State, normal); commands([Error={error, _}|_], State) -> disconnect(State, Error); +commands([{active, Active}|Tail], State) when is_boolean(Active) -> + commands(Tail, State#state{active=Active}); commands([{state, ProtoState}|Tail], State) -> commands(Tail, State#state{protocol_state=ProtoState}); %% Order is important: the origin must be changed before @@ -1179,6 +1214,8 @@ disconnect_flush(State=#state{socket=Socket, messages={OK, Closed, Error}}) -> ok end. +active(State=#state{active=false}) -> + State; active(State=#state{socket=Socket, transport=Transport}) -> Transport:setopts(Socket, [{active, once}]), State. -- cgit v1.2.3