From 611f9a9b78cab4005892e13dffb7a2c8e44580ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Fri, 2 Aug 2019 14:30:08 +0200 Subject: Add flow control Flow control is disabled by default. The initial flow value must be set to enable it (either for the entire connection or on a per-request basis). Flow applies to all HTTP streams as well as Websocket. HTTP/2 pushed streams receive the same value as their originating stream. --- doc/src/manual/gun.asciidoc | 28 +++ doc/src/manual/gun.update_flow.asciidoc | 67 +++++++ src/gun.erl | 59 ++++-- src/gun_content_handler.erl | 21 ++- src/gun_data_h.erl | 4 +- src/gun_http.erl | 137 +++++++++----- src/gun_http2.erl | 82 ++++++-- src/gun_sse_h.erl | 18 +- src/gun_ws.erl | 54 ++++-- src/gun_ws_h.erl | 8 +- test/flow_SUITE.erl | 325 ++++++++++++++++++++++++++++++++ test/handlers/sse_clock_h.erl | 7 +- test/sse_SUITE.erl | 2 +- 13 files changed, 700 insertions(+), 112 deletions(-) create mode 100644 doc/src/manual/gun.update_flow.asciidoc create mode 100644 test/flow_SUITE.erl diff --git a/doc/src/manual/gun.asciidoc b/doc/src/manual/gun.asciidoc index da5548b..acc1454 100644 --- a/doc/src/manual/gun.asciidoc +++ b/doc/src/manual/gun.asciidoc @@ -46,6 +46,7 @@ Messages: Streams: +* link:man:gun:update_flow(3)[gun:update_flow(3)] - Update a stream's flow control value * link:man:gun:cancel(3)[gun:cancel(3)] - Cancel the given stream * link:man:gun:stream_info(3)[gun:stream_info(3)] - Obtain information about a stream @@ -143,6 +144,7 @@ Handshake timeout for tunneled TLS connections. [source,erlang] ---- http_opts() :: #{ + flow => pos_integer(), keepalive => timeout(), transform_header_name => fun((binary()) -> binary()), version => 'HTTP/1.1' | 'HTTP/1.0' @@ -155,6 +157,11 @@ The default value is given next to the option name: // @todo Document content_handlers and gun_sse_h. +flow - see below:: + +The initial flow control value for all HTTP/1.1 streams. +By default flow control is disabled. + keepalive (infinity):: Time between pings in milliseconds. Since the HTTP protocol has @@ -181,6 +188,7 @@ HTTP version to use. [source,erlang] ---- http2_opts() :: #{ + flow => pos_integer(), keepalive => timeout() } ---- @@ -191,6 +199,11 @@ The default value is given next to the option name: // @todo Document content_handlers and gun_sse_h. +flow - see below:: + +The initial flow control value for all HTTP/2 streams. +By default flow control is disabled. + keepalive (5000):: Time between pings in milliseconds. @@ -328,6 +341,7 @@ Request headers. [source,erlang] ---- req_opts() :: #{ + flow => pos_integer(), reply_to => pid() } ---- @@ -336,6 +350,11 @@ Configuration for a particular request. The default value is given next to the option name: +flow - see below:: + +The initial flow control value for the stream. By default +flow control is disabled. + reply_to (`self()`):: The pid of the process that will receive the response messages. @@ -346,6 +365,7 @@ The pid of the process that will receive the response messages. ---- ws_opts() :: #{ compress => boolean(), + flow => pos_integer(), protocols => [{binary(), module()}] } ---- @@ -360,6 +380,11 @@ Whether to enable permessage-deflate compression. This does not guarantee that compression will be used as it is the server that ultimately decides. Defaults to false. +flow - see below:: + +The initial flow control value for the Websocket connection. +By default flow control is disabled. + protocols ([]):: A non-empty list enables Websocket protocol negotiation. The @@ -378,6 +403,9 @@ undocumented and must be set to `gun_ws_h`. implement different reconnect strategies. * *2.0*: The `transport_opts` option has been split into two options: `tcp_opts` and `tls_opts`. +* *2.0*: Function `gun:update_flow/3` introduced. The `flow` + option was added to request options and HTTP/1.1, + HTTP/2 and Websocket options as well. * *2.0*: Introduce the type `req_headers()` and extend the types accepted for header names for greater interoperability. Header names are automatically diff --git a/doc/src/manual/gun.update_flow.asciidoc b/doc/src/manual/gun.update_flow.asciidoc new file mode 100644 index 0000000..c7990f2 --- /dev/null +++ b/doc/src/manual/gun.update_flow.asciidoc @@ -0,0 +1,67 @@ += gun:update_flow(3) + +== Name + +gun:update_flow - Update a stream's flow control value + +== Description + +[source,erlang] +---- +update_flow(ConnPid, StreamRef, Flow) -> ok + +ConnPid :: pid() +StreamRef :: reference() +Flow :: pos_integer() +---- + +Update a stream's flow control value. + +The flow value can only ever be incremented. + +This function does nothing for streams that have flow +control disabled (which is the default). + +== Arguments + +ConnPid:: + +The pid of the Gun connection process. + +StreamRef:: + +Identifier of the stream for the original request. + +Flow:: + +Flow control value increment. + +== Return value + +The atom `ok` is returned. + +== Changelog + +* *2.0*: Function introduced. + +== Examples + +.Update a stream's flow control value +[source,erlang] +---- +gun:update_flow(ConnPid, StreamRef, 10). +---- + +== See also + +link:man:gun(3)[gun(3)], +link:man:gun:get(3)[gun:get(3)], +link:man:gun:head(3)[gun:head(3)], +link:man:gun:options(3)[gun:options(3)], +link:man:gun:patch(3)[gun:patch(3)], +link:man:gun:post(3)[gun:post(3)], +link:man:gun:put(3)[gun:put(3)], +link:man:gun:delete(3)[gun:delete(3)], +link:man:gun:headers(3)[gun:headers(3)], +link:man:gun:request(3)[gun:request(3)], +link:man:gun:ws_upgrade(3)[gun:ws_upgrade(3)] diff --git a/src/gun.erl b/src/gun.erl index d758ffc..7b06aaf 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -79,6 +79,7 @@ -export([flush/1]). %% Streams. +-export([update_flow/3]). -export([cancel/2]). -export([stream_info/2]). @@ -158,6 +159,7 @@ %% This is of course not required for HTTP/1.1 since the CONNECT takes over %% the entire connection. -type req_opts() :: #{ + flow => pos_integer(), reply_to => pid() }. -export_type([req_opts/0]). @@ -177,6 +179,7 @@ %% @todo keepalive -type ws_opts() :: #{ compress => boolean(), + flow => pos_integer(), protocols => [{binary(), module()}] }. -export_type([ws_opts/0]). @@ -194,6 +197,7 @@ keepalive_ref :: undefined | reference(), socket :: undefined | inet:socket() | ssl:sslsocket() | pid(), transport :: module(), + active = true :: boolean(), messages :: {atom(), atom(), atom()}, protocol :: module(), protocol_state :: any(), @@ -458,9 +462,10 @@ headers(ServerPid, Method, Path, Headers) -> -spec headers(pid(), iodata(), iodata(), req_headers(), req_opts()) -> reference(). headers(ServerPid, Method, Path, Headers, ReqOpts) -> StreamRef = make_ref(), + InitialFlow = maps:get(flow, ReqOpts, infinity), ReplyTo = maps:get(reply_to, ReqOpts, self()), gen_statem:cast(ServerPid, {headers, ReplyTo, StreamRef, - Method, Path, normalize_headers(Headers)}), + Method, Path, normalize_headers(Headers), InitialFlow}), StreamRef. -spec request(pid(), iodata(), iodata(), req_headers(), iodata()) -> reference(). @@ -470,9 +475,10 @@ request(ServerPid, Method, Path, Headers, Body) -> -spec request(pid(), iodata(), iodata(), req_headers(), iodata(), req_opts()) -> reference(). request(ServerPid, Method, Path, Headers, Body, ReqOpts) -> StreamRef = make_ref(), + InitialFlow = maps:get(flow, ReqOpts, infinity), ReplyTo = maps:get(reply_to, ReqOpts, self()), gen_statem:cast(ServerPid, {request, ReplyTo, StreamRef, - Method, Path, normalize_headers(Headers), Body}), + Method, Path, normalize_headers(Headers), Body, InitialFlow}), StreamRef. normalize_headers([]) -> @@ -510,8 +516,10 @@ connect(ServerPid, Destination, Headers) -> -spec connect(pid(), connect_destination(), req_headers(), req_opts()) -> reference(). connect(ServerPid, Destination, Headers, ReqOpts) -> StreamRef = make_ref(), + InitialFlow = maps:get(flow, ReqOpts, infinity), ReplyTo = maps:get(reply_to, ReqOpts, self()), - gen_statem:cast(ServerPid, {connect, ReplyTo, StreamRef, Destination, Headers}), + gen_statem:cast(ServerPid, {connect, ReplyTo, StreamRef, + Destination, Headers, InitialFlow}), StreamRef. %% Awaiting gun messages. @@ -520,6 +528,7 @@ connect(ServerPid, Destination, Headers, ReqOpts) -> -type await_result() :: {inform, 100..199, resp_headers()} | {response, fin | nofin, non_neg_integer(), resp_headers()} | {data, fin | nofin, binary()} + | {sse, cow_sse:event() | fin} | {trailers, resp_headers()} | {push, reference(), binary(), binary(), resp_headers()} | {upgrade, [binary()], resp_headers()} @@ -551,6 +560,8 @@ await(ServerPid, StreamRef, Timeout, MRef) -> {response, IsFin, Status, Headers}; {gun_data, ServerPid, StreamRef, IsFin, Data} -> {data, IsFin, Data}; + {gun_sse, ServerPid, StreamRef, Event} -> + {sse, Event}; {gun_trailers, ServerPid, StreamRef, Trailers} -> {trailers, Trailers}; {gun_push, ServerPid, StreamRef, NewStreamRef, Method, URI, Headers} -> @@ -699,12 +710,20 @@ flush_ref(StreamRef) -> ok end. +%% Flow control. + +-spec update_flow(pid(), reference(), pos_integer()) -> ok. +update_flow(ServerPid, StreamRef, Flow) -> + gen_statem:cast(ServerPid, {update_flow, self(), StreamRef, Flow}). + %% Cancelling a stream. -spec cancel(pid(), reference()) -> ok. cancel(ServerPid, StreamRef) -> gen_statem:cast(ServerPid, {cancel, self(), StreamRef}). +%% Information about a stream. + -spec stream_info(pid(), reference()) -> {ok, map() | undefined} | {error, not_connected}. stream_info(ServerPid, StreamRef) -> gen_statem:call(ServerPid, {stream_info, StreamRef}). @@ -919,11 +938,16 @@ connected(internal, {connected, Socket, Protocol}, {keep_state, keepalive_timeout(active(State#state{socket=Socket, protocol=Protocol, protocol_state=ProtoState}))}; %% Socket events. -connected(info, {OK, Socket, Data}, State=#state{socket=Socket, messages={OK, _, _}, +connected(info, {OK, Socket, Data}, State0=#state{socket=Socket, messages={OK, _, _}, protocol=Protocol, protocol_state=ProtoState, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> {Commands, EvHandlerState} = Protocol:handle(Data, ProtoState, EvHandler, EvHandlerState0), - commands(Commands, active(State#state{event_handler_state=EvHandlerState})); + case commands(Commands, State0#state{event_handler_state=EvHandlerState}) of + {keep_state, State} -> + {keep_state, active(State)}; + Res -> + Res + end; connected(info, {Closed, Socket}, State=#state{socket=Socket, messages={_, Closed, _}}) -> disconnect(State, closed); connected(info, {Error, Socket, Reason}, State=#state{socket=Socket, messages={_, _, Error}}) -> @@ -936,21 +960,21 @@ connected(info, keepalive, State=#state{protocol=Protocol, protocol_state=ProtoS ProtoState2 = Protocol:keepalive(ProtoState), {keep_state, keepalive_timeout(State#state{protocol_state=ProtoState2})}; %% Public HTTP interface. -connected(cast, {headers, ReplyTo, StreamRef, Method, Path, Headers}, +connected(cast, {headers, ReplyTo, StreamRef, Method, Path, Headers, InitialFlow}, State=#state{origin_host=Host, origin_port=Port, protocol=Protocol, protocol_state=ProtoState, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> {ProtoState2, EvHandlerState} = Protocol:headers(ProtoState, StreamRef, ReplyTo, Method, Host, Port, Path, Headers, - EvHandler, EvHandlerState0), + InitialFlow, EvHandler, EvHandlerState0), {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; -connected(cast, {request, ReplyTo, StreamRef, Method, Path, Headers, Body}, +connected(cast, {request, ReplyTo, StreamRef, Method, Path, Headers, Body, InitialFlow}, State=#state{origin_host=Host, origin_port=Port, protocol=Protocol, protocol_state=ProtoState, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> {ProtoState2, EvHandlerState} = Protocol:request(ProtoState, StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body, - EvHandler, EvHandlerState0), + InitialFlow, EvHandler, EvHandlerState0), {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; %% @todo Do we want to reject ReplyTo if it's not the process %% who initiated the connection? For both data and cancel. @@ -960,7 +984,7 @@ connected(cast, {data, ReplyTo, StreamRef, IsFin, Data}, {ProtoState2, EvHandlerState} = Protocol:data(ProtoState, StreamRef, ReplyTo, IsFin, Data, EvHandler, EvHandlerState0), {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; -connected(cast, {connect, ReplyTo, StreamRef, Destination0, Headers}, +connected(cast, {connect, ReplyTo, StreamRef, Destination0, Headers, InitialFlow}, State=#state{protocol=Protocol, protocol_state=ProtoState}) -> %% The protocol option has been deprecated in favor of the protocols option. %% Nobody probably ended up using it, but let's not break the interface. @@ -980,7 +1004,7 @@ connected(cast, {connect, ReplyTo, StreamRef, Destination0, Headers}, _ -> Destination1 end, - ProtoState2 = Protocol:connect(ProtoState, StreamRef, ReplyTo, Destination, Headers), + ProtoState2 = Protocol:connect(ProtoState, StreamRef, ReplyTo, Destination, Headers, InitialFlow), {keep_state, State#state{protocol_state=ProtoState2}}; %% When using gun_tls_proxy we need a separate message to know whether %% the handshake succeeded and whether we need to switch to a different protocol. @@ -1002,6 +1026,15 @@ connected(info, {gun_tls_proxy, Socket, Error = {error, Reason}, HandshakeEvent} error => Reason }, EvHandlerState0), commands([Error], State#state{event_handler_state=EvHandlerState}); +connected(cast, {update_flow, ReplyTo, StreamRef, Flow}, State0=#state{ + protocol=Protocol, protocol_state=ProtoState}) -> + Commands = Protocol:update_flow(ProtoState, ReplyTo, StreamRef, Flow), + case commands(Commands, State0) of + {keep_state, State} -> + {keep_state, active(State)}; + Res -> + Res + end; connected(cast, {cancel, ReplyTo, StreamRef}, State=#state{ protocol=Protocol, protocol_state=ProtoState, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> @@ -1092,6 +1125,8 @@ commands([close|_], State) -> disconnect(State, normal); commands([Error={error, _}|_], State) -> disconnect(State, Error); +commands([{active, Active}|Tail], State) when is_boolean(Active) -> + commands(Tail, State#state{active=Active}); commands([{state, ProtoState}|Tail], State) -> commands(Tail, State#state{protocol_state=ProtoState}); %% Order is important: the origin must be changed before @@ -1179,6 +1214,8 @@ disconnect_flush(State=#state{socket=Socket, messages={OK, Closed, Error}}) -> ok end. +active(State=#state{active=false}) -> + State; active(State=#state{socket=Socket, transport=Transport}) -> Transport:setopts(Socket, [{active, once}]), State. diff --git a/src/gun_content_handler.erl b/src/gun_content_handler.erl index 78e5e9d..fa180d8 100644 --- a/src/gun_content_handler.erl +++ b/src/gun_content_handler.erl @@ -28,7 +28,9 @@ cow_http:headers(), map()) -> {ok, any()} | disable. %% @todo Make fin | nofin its own type. -callback handle(fin | nofin, any(), State) - -> {ok, any(), State} | {done, State} when State::any(). + -> {ok, any(), non_neg_integer(), State} + | {done, non_neg_integer(), State} + when State::any(). -spec init(pid(), any(), cow_http:status(), cow_http:headers(), State) -> State when State::state(). @@ -44,13 +46,18 @@ init(ReplyTo, StreamRef, Status, Headers, [Handler|Tail]) -> disable -> init(ReplyTo, StreamRef, Status, Headers, Tail) end. --spec handle(fin | nofin, any(), State) -> State when State::state(). -handle(_, _, []) -> - []; -handle(IsFin, Data0, [{Mod, State0}|Tail]) -> +-spec handle(fin | nofin, any(), State) -> {ok, non_neg_integer(), State} when State::state(). +handle(IsFin, Data, State) -> + handle(IsFin, Data, State, 0, []). + +handle(_, _, [], Flow, Acc) -> + {ok, Flow, lists:reverse(Acc)}; +handle(IsFin, Data0, [{Mod, State0}|Tail], Flow, Acc) -> case Mod:handle(IsFin, Data0, State0) of - {ok, Data, State} -> [{Mod, State}|handle(IsFin, Data, Tail)]; - {done, State} -> [{Mod, State}|Tail] + {ok, Data, Inc, State} -> + handle(IsFin, Data, Tail, Flow + Inc, [{Mod, State}|Acc]); + {done, Inc, State} -> + {ok, Flow + Inc, lists:reverse([{Mod, State}|Acc], Tail)} end. -spec check_option(list()) -> ok | error. diff --git a/src/gun_data_h.erl b/src/gun_data_h.erl index af2c2cb..d1f8787 100644 --- a/src/gun_data_h.erl +++ b/src/gun_data_h.erl @@ -27,7 +27,7 @@ init(ReplyTo, StreamRef, _, _, _) -> {ok, #state{reply_to=ReplyTo, stream_ref=StreamRef}}. --spec handle(fin | nofin, binary(), State) -> {done, State} when State::#state{}. +-spec handle(fin | nofin, binary(), State) -> {done, 1, State} when State::#state{}. handle(IsFin, Data, State=#state{reply_to=ReplyTo, stream_ref=StreamRef}) -> ReplyTo ! {gun_data, self(), StreamRef, IsFin, Data}, - {done, State}. + {done, 1, State}. diff --git a/src/gun_http.erl b/src/gun_http.erl index 68f9e7d..ec268ad 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -18,12 +18,13 @@ -export([name/0]). -export([init/4]). -export([handle/4]). +-export([update_flow/4]). -export([close/4]). -export([keepalive/1]). --export([headers/10]). --export([request/11]). +-export([headers/11]). +-export([request/12]). -export([data/7]). --export([connect/5]). +-export([connect/6]). -export([cancel/5]). -export([stream_info/2]). -export([down/1]). @@ -43,6 +44,7 @@ -record(stream, { ref :: reference() | connect_info() | websocket_info(), reply_to :: pid(), + flow :: integer() | infinity, method :: binary(), is_alive :: boolean(), handler_state :: undefined | gun_content_handler:state() @@ -52,6 +54,7 @@ owner :: pid(), socket :: inet:socket() | ssl:sslsocket(), transport :: module(), + opts = #{} :: map(), %% @todo version = 'HTTP/1.1' :: cow_http:version(), content_handlers :: gun_content_handler:opt(), connection = keepalive :: keepalive | close, @@ -73,6 +76,8 @@ do_check_options([Opt={content_handlers, Handlers}|Opts]) -> ok -> do_check_options(Opts); error -> {error, {options, {http, Opt}}} end; +do_check_options([{flow, InitialFlow}|Opts]) when is_integer(InitialFlow), InitialFlow > 0 -> + do_check_options(Opts); do_check_options([{keepalive, infinity}|Opts]) -> do_check_options(Opts); do_check_options([{keepalive, K}|Opts]) when is_integer(K), K > 0 -> @@ -87,10 +92,11 @@ do_check_options([Opt|_]) -> name() -> http. init(Owner, Socket, Transport, Opts) -> + %% @todo If we keep the opts we don't need to add these to the state. Version = maps:get(version, Opts, 'HTTP/1.1'), Handlers = maps:get(content_handlers, Opts, [gun_data_h]), TransformHeaderName = maps:get(transform_header_name, Opts, fun (N) -> N end), - #http_state{owner=Owner, socket=Socket, transport=Transport, version=Version, + #http_state{owner=Owner, socket=Socket, transport=Transport, opts=Opts, version=Version, content_handlers=Handlers, transform_header_name=TransformHeaderName}. %% Stop looping when we got no more data. @@ -120,7 +126,7 @@ handle(Data, State=#http_state{in=head, buffer=Buffer, end; %% Everything sent to the socket until it closes is part of the response body. handle(Data, State=#http_state{in=body_close}, _, EvHandlerState) -> - {{state, send_data_if_alive(Data, State, nofin)}, EvHandlerState}; + {send_data(Data, State, nofin), EvHandlerState}; %% Chunked transfer-encoding may contain both data and trailers. handle(Data, State=#http_state{in=body_chunked, in_state=InState, buffer=Buffer, streams=[#stream{ref=StreamRef, reply_to=ReplyTo}|_], @@ -130,21 +136,15 @@ handle(Data, State=#http_state{in=body_chunked, in_state=InState, more -> {{state, State#http_state{buffer=Buffer2}}, EvHandlerState0}; {more, Data2, InState2} -> - {{state, send_data_if_alive(Data2, - State#http_state{buffer= <<>>, in_state=InState2}, - nofin)}, EvHandlerState0}; + {send_data(Data2, State#http_state{buffer= <<>>, in_state=InState2}, nofin), EvHandlerState0}; {more, Data2, Length, InState2} when is_integer(Length) -> %% @todo See if we can recv faster than one message at a time. - {{state, send_data_if_alive(Data2, - State#http_state{buffer= <<>>, in_state=InState2}, - nofin)}, EvHandlerState0}; + {send_data(Data2, State#http_state{buffer= <<>>, in_state=InState2}, nofin), EvHandlerState0}; {more, Data2, Rest, InState2} -> %% @todo See if we can recv faster than one message at a time. - {{state, send_data_if_alive(Data2, - State#http_state{buffer=Rest, in_state=InState2}, - nofin)}, EvHandlerState0}; + {send_data(Data2, State#http_state{buffer=Rest, in_state=InState2}, nofin), EvHandlerState0}; {done, HasTrailers, Rest} -> - %% @todo response_end should be called AFTER send_data_if_alive + %% @todo response_end should be called AFTER send_data {IsFin, EvHandlerState} = case HasTrailers of trailers -> {nofin, EvHandlerState0}; @@ -156,7 +156,8 @@ handle(Data, State=#http_state{in=body_chunked, in_state=InState, {fin, EvHandlerState1} end, %% I suppose it doesn't hurt to append an empty binary. - State1 = send_data_if_alive(<<>>, State, IsFin), + %% We ignore the active command because the stream ended. + [{state, State1}|_] = send_data(<<>>, State, IsFin), case {HasTrailers, Conn} of {trailers, _} -> handle(Rest, State1#http_state{buffer = <<>>, in=body_trailer}, EvHandler, EvHandlerState); @@ -166,7 +167,7 @@ handle(Data, State=#http_state{in=body_chunked, in_state=InState, {[{state, end_stream(State1)}, close], EvHandlerState} end; {done, Data2, HasTrailers, Rest} -> - %% @todo response_end should be called AFTER send_data_if_alive + %% @todo response_end should be called AFTER send_data {IsFin, EvHandlerState} = case HasTrailers of trailers -> {nofin, EvHandlerState0}; @@ -177,7 +178,8 @@ handle(Data, State=#http_state{in=body_chunked, in_state=InState, }, EvHandlerState0), {fin, EvHandlerState1} end, - State1 = send_data_if_alive(Data2, State, IsFin), + %% We ignore the active command because the stream ended. + [{state, State1}|_] = send_data(Data2, State, IsFin), case {HasTrailers, Conn} of {trailers, _} -> handle(Rest, State1#http_state{buffer = <<>>, in=body_trailer}, EvHandler, EvHandlerState); @@ -218,24 +220,24 @@ handle(Data, State=#http_state{in={body, Length}, connection=Conn, if %% More data coming. DataSize < Length -> - {{state, send_data_if_alive(Data, - State#http_state{in={body, Length - DataSize}}, - nofin)}, EvHandlerState0}; + {send_data(Data, State#http_state{in={body, Length - DataSize}}, nofin), EvHandlerState0}; %% Stream finished, no rest. DataSize =:= Length -> - State1 = send_data_if_alive(Data, State, fin), + %% We ignore the active command because the stream ended. + [{state, State1}|_] = send_data(Data, State, fin), EvHandlerState = EvHandler:response_end(#{ stream_ref => StreamRef, reply_to => ReplyTo }, EvHandlerState0), case Conn of - keepalive -> {{state, end_stream(State1)}, EvHandlerState}; + keepalive -> {[{state, end_stream(State1)}, {active, true}], EvHandlerState}; close -> {[{state, end_stream(State1)}, close], EvHandlerState} end; %% Stream finished, rest. true -> << Body:Length/binary, Rest/bits >> = Data, - State1 = send_data_if_alive(Body, State, fin), + %% We ignore the active command because the stream ended. + [{state, State1}|_] = send_data(Body, State, fin), EvHandlerState = EvHandler:response_end(#{ stream_ref => StreamRef, reply_to => ReplyTo @@ -420,20 +422,48 @@ stream_ref({connect, StreamRef, _}) -> StreamRef; stream_ref({websocket, StreamRef, _, _, _}) -> StreamRef; stream_ref(StreamRef) -> StreamRef. -send_data_if_alive(<<>>, State, nofin) -> - State; +%% The state must be first in order to retrieve it when the stream ended. +send_data(<<>>, State, nofin) -> + [{state, State}, {active, true}]; %% @todo What if we receive data when the HEAD method was used? -send_data_if_alive(Data, State=#http_state{streams=[Stream=#stream{ - is_alive=true, handler_state=Handlers0}|Tail]}, IsFin) -> - Handlers = gun_content_handler:handle(IsFin, Data, Handlers0), - State#http_state{streams=[Stream#stream{handler_state=Handlers}|Tail]}; -send_data_if_alive(_, State, _) -> - State. +send_data(Data, State=#http_state{streams=[Stream=#stream{ + flow=Flow0, is_alive=true, handler_state=Handlers0}|Tail]}, IsFin) -> + {ok, Dec, Handlers} = gun_content_handler:handle(IsFin, Data, Handlers0), + Flow = case Flow0 of + infinity -> infinity; + _ -> Flow0 - Dec + end, + [ + {state, State#http_state{streams=[Stream#stream{flow=Flow, handler_state=Handlers}|Tail]}}, + {active, Flow > 0} + ]; +send_data(_, State, _) -> + [{state, State}, {active, true}]. + +%% We only update the active state when the current stream is being updated. +update_flow(State=#http_state{streams=[Stream=#stream{ref=StreamRef, flow=Flow0}|Tail]}, + _ReplyTo, StreamRef, Inc) -> + Flow = case Flow0 of + infinity -> infinity; + _ -> Flow0 + Inc + end, + [ + {state, State#http_state{streams=[Stream#stream{flow=Flow}|Tail]}}, + {active, Flow > 0} + ]; +update_flow(State=#http_state{streams=Streams0}, _ReplyTo, StreamRef, Inc) -> + Streams = [case Ref of + StreamRef when Flow =/= infinity -> + Tuple#stream{flow=Flow + Inc}; + _ -> + Tuple + end || Tuple = #stream{ref=Ref, flow=Flow} <- Streams0], + {state, State#http_state{streams=Streams}}. %% @todo Use Reason. close(_, State=#http_state{in=body_close, streams=[#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]}, EvHandler, EvHandlerState0) -> - _ = send_data_if_alive(<<>>, State, fin), + _ = send_data(<<>>, State, fin), EvHandlerState = EvHandler:response_end(#{ stream_ref => StreamRef, reply_to => ReplyTo @@ -461,23 +491,29 @@ keepalive(State=#http_state{socket=Socket, transport=Transport, out=head}) -> keepalive(State) -> State. -headers(State=#http_state{out=head}, +headers(State=#http_state{opts=Opts, out=head}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers, - EvHandler, EvHandlerState0) -> + InitialFlow0, EvHandler, EvHandlerState0) -> {Conn, Out, EvHandlerState} = send_request(State, StreamRef, ReplyTo, Method, Host, Port, Path, Headers, undefined, EvHandler, EvHandlerState0, ?FUNCTION_NAME), - {new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, ReplyTo, Method), + InitialFlow = initial_flow(InitialFlow0, Opts), + {new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, ReplyTo, Method, InitialFlow), EvHandlerState}. -request(State=#http_state{out=head}, StreamRef, ReplyTo, - Method, Host, Port, Path, Headers, Body, EvHandler, EvHandlerState0) -> +request(State=#http_state{opts=Opts, out=head}, StreamRef, ReplyTo, + Method, Host, Port, Path, Headers, Body, + InitialFlow0, EvHandler, EvHandlerState0) -> {Conn, Out, EvHandlerState} = send_request(State, StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body, EvHandler, EvHandlerState0, ?FUNCTION_NAME), - {new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, ReplyTo, Method), + InitialFlow = initial_flow(InitialFlow0, Opts), + {new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, ReplyTo, Method, InitialFlow), EvHandlerState}. +initial_flow(infinity, #{flow := InitialFlow}) -> InitialFlow; +initial_flow(InitialFlow, _) -> InitialFlow. + send_request(State=#http_state{socket=Socket, transport=Transport, version=Version}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers0, Body, EvHandler, EvHandlerState0, Function) -> @@ -602,12 +638,12 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version, {error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState0} end. -connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _) when Streams =/= [] -> +connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _, _) when Streams =/= [] -> ReplyTo ! {gun_error, self(), StreamRef, {badstate, "CONNECT can only be used with HTTP/1.1 when no other streams are active."}}, State; -connect(State=#http_state{socket=Socket, transport=Transport, version=Version}, - StreamRef, ReplyTo, Destination=#{host := Host0}, Headers0) -> +connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version=Version}, + StreamRef, ReplyTo, Destination=#{host := Host0}, Headers0, InitialFlow0) -> Host = case Host0 of Tuple when is_tuple(Tuple) -> inet:ntoa(Tuple); _ -> Host0 @@ -634,7 +670,8 @@ connect(State=#http_state{socket=Socket, transport=Transport, version=Version}, Transport:send(Socket, [ cow_http:request(<<"CONNECT">>, Authority, Version, Headers) ]), - new_stream(State, {connect, StreamRef, Destination}, ReplyTo, <<"CONNECT">>). + InitialFlow = initial_flow(InitialFlow0, Opts), + new_stream(State, {connect, StreamRef, Destination}, ReplyTo, <<"CONNECT">>, InitialFlow). %% We can't cancel anything, we can just stop forwarding messages to the owner. cancel(State0, StreamRef, ReplyTo, EvHandler, EvHandlerState0) -> @@ -735,9 +772,9 @@ response_io_from_headers(_, Version, _Status, Headers) -> %% Streams. -new_stream(State=#http_state{streams=Streams}, StreamRef, ReplyTo, Method) -> +new_stream(State=#http_state{streams=Streams}, StreamRef, ReplyTo, Method, InitialFlow) -> State#http_state{streams=Streams - ++ [#stream{ref=StreamRef, reply_to=ReplyTo, + ++ [#stream{ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow, method=iolist_to_binary(Method), is_alive=true}]}. is_stream(#http_state{streams=Streams}, StreamRef) -> @@ -787,8 +824,9 @@ ws_upgrade(State=#http_state{owner=ReplyTo, out=head}, {Conn, Out, EvHandlerState} = send_request(State, StreamRef, ReplyTo, <<"GET">>, Host, Port, Path, Headers, undefined, EvHandler, EvHandlerState0, ?FUNCTION_NAME), + InitialFlow = maps:get(flow, WsOpts, infinity), {new_stream(State#http_state{connection=Conn, out=Out}, - {websocket, StreamRef, Key, GunExtensions, WsOpts}, ReplyTo, <<"GET">>), + {websocket, StreamRef, Key, GunExtensions, WsOpts}, ReplyTo, <<"GET">>, InitialFlow), EvHandlerState}. ws_handshake(Buffer, State, StreamRef, Headers, Key, GunExtensions, Opts) -> @@ -853,8 +891,9 @@ ws_handshake_protocols(Buffer, State, StreamRef, Headers, Extensions, Opts) -> end end. -ws_handshake_end(Buffer, #http_state{owner=Owner, socket=Socket, transport=Transport}, - StreamRef, Headers, Extensions, Handler, Opts) -> +%% We know that the most recent stream is the Websocket one. +ws_handshake_end(Buffer, #http_state{owner=Owner, socket=Socket, transport=Transport, + streams=[#stream{flow=InitialFlow}|_]}, StreamRef, Headers, Extensions, Handler, Opts) -> %% Send ourselves the remaining buffer, if any. _ = case Buffer of <<>> -> @@ -863,4 +902,4 @@ ws_handshake_end(Buffer, #http_state{owner=Owner, socket=Socket, transport=Trans {OK, _, _} = Transport:messages(), self() ! {OK, Socket, Buffer} end, - gun_ws:init(Owner, Socket, Transport, StreamRef, Headers, Extensions, Handler, Opts). + gun_ws:init(Owner, Socket, Transport, StreamRef, Headers, Extensions, InitialFlow, Handler, Opts). diff --git a/src/gun_http2.erl b/src/gun_http2.erl index 20e21ec..3b3b79b 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -18,10 +18,11 @@ -export([name/0]). -export([init/4]). -export([handle/4]). +-export([update_flow/4]). -export([close/4]). -export([keepalive/1]). --export([headers/10]). --export([request/11]). +-export([headers/11]). +-export([request/12]). -export([data/7]). -export([cancel/5]). -export([stream_info/2]). @@ -36,6 +37,10 @@ %% Process to send messages to. reply_to :: pid(), + %% Flow control. + flow :: integer() | infinity, + flow_window = 0 :: non_neg_integer(), + %% Content handlers state. handler_state :: undefined | gun_content_handler:state() }). @@ -66,10 +71,15 @@ do_check_options([Opt={content_handlers, Handlers}|Opts]) -> ok -> do_check_options(Opts); error -> {error, {options, {http2, Opt}}} end; +do_check_options([{flow, InitialFlow}|Opts]) when is_integer(InitialFlow), InitialFlow > 0 -> + do_check_options(Opts); do_check_options([{keepalive, infinity}|Opts]) -> do_check_options(Opts); do_check_options([{keepalive, K}|Opts]) when is_integer(K), K > 0 -> do_check_options(Opts); +%% @todo Add all http2_machine options. +do_check_options([{max_frame_size_received, _}|Opts]) -> + do_check_options(Opts); do_check_options([Opt|_]) -> {error, {options, {http2, Opt}}}. @@ -192,10 +202,20 @@ lingering_data_frame(State=#http2_state{socket=Socket, transport=Transport, data_frame(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0}, StreamID, IsFin, Data, EvHandler, EvHandlerState0) -> - Stream = #stream{ref=StreamRef, reply_to=ReplyTo, - handler_state=Handlers0} = get_stream_by_id(State, StreamID), - Handlers = gun_content_handler:handle(IsFin, Data, Handlers0), + Stream = #stream{ref=StreamRef, reply_to=ReplyTo, flow=Flow0, + flow_window=FlowWindow0, handler_state=Handlers0} = get_stream_by_id(State, StreamID), + {ok, Dec, Handlers} = gun_content_handler:handle(IsFin, Data, Handlers0), + Flow = case Flow0 of + infinity -> infinity; + _ -> Flow0 - Dec + end, Size = byte_size(Data), + FlowWindow = if + IsFin =:= nofin, Flow =< 0 -> + FlowWindow0 + Size; + true -> + FlowWindow0 + end, {HTTP2Machine, EvHandlerState} = case Size of %% We do not send a WINDOW_UPDATE if the DATA frame was of size 0. 0 when IsFin =:= fin -> @@ -209,8 +229,11 @@ data_frame(State=#http2_state{socket=Socket, transport=Transport, _ -> Transport:send(Socket, cow_http2:window_update(Size)), HTTP2Machine1 = cow_http2_machine:update_window(Size, HTTP2Machine0), - %% We do not send a stream WINDOW_UPDATE if this was the last DATA frame. + %% We do not send a stream WINDOW_UPDATE when the flow control kicks in + %% (it'll be sent when the flow recovers) or for the last DATA frame. case IsFin of + nofin when Flow =< 0 -> + {HTTP2Machine1, EvHandlerState0}; nofin -> Transport:send(Socket, cow_http2:window_update(StreamID, Size)), {cow_http2_machine:update_window(StreamID, Size, HTTP2Machine1), @@ -224,7 +247,7 @@ data_frame(State=#http2_state{socket=Socket, transport=Transport, end end, {maybe_delete_stream(store_stream(State#http2_state{http2_machine=HTTP2Machine}, - Stream#stream{handler_state=Handlers}), StreamID, remote, IsFin), + Stream#stream{flow=Flow, flow_window=FlowWindow, handler_state=Handlers}), StreamID, remote, IsFin), EvHandlerState}. headers_frame(State=#http2_state{content_handlers=Handlers0}, @@ -294,12 +317,13 @@ rst_stream_frame(State=#http2_state{streams=Streams0}, {State, EvHandlerState0} end. +%% Pushed streams receive the same initial flow value as the parent stream. push_promise_frame(State=#http2_state{streams=Streams}, StreamID, PromisedStreamID, Headers, #{ method := Method, scheme := Scheme, authority := Authority, path := Path}, EvHandler, EvHandlerState0) -> - #stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID), + #stream{ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow} = get_stream_by_id(State, StreamID), PromisedStreamRef = make_ref(), URI = iolist_to_binary([Scheme, <<"://">>, Authority, Path]), ReplyTo ! {gun_push, self(), StreamRef, PromisedStreamRef, Method, URI, Headers}, @@ -311,7 +335,8 @@ push_promise_frame(State=#http2_state{streams=Streams}, uri => URI, headers => Headers }, EvHandlerState0), - NewStream = #stream{id=PromisedStreamID, ref=PromisedStreamRef, reply_to=ReplyTo}, + NewStream = #stream{id=PromisedStreamID, ref=PromisedStreamRef, + reply_to=ReplyTo, flow=InitialFlow}, {State#http2_state{streams=[NewStream|Streams]}, EvHandlerState}. ignored_frame(State=#http2_state{http2_machine=HTTP2Machine0}) -> @@ -322,6 +347,28 @@ ignored_frame(State=#http2_state{http2_machine=HTTP2Machine0}) -> terminate(State#http2_state{http2_machine=HTTP2Machine}, Error) end. +update_flow(State=#http2_state{socket=Socket, transport=Transport, + http2_machine=HTTP2Machine0}, _ReplyTo, StreamRef, Inc) -> + case get_stream_by_ref(State, StreamRef) of + Stream=#stream{id=StreamID, flow=Flow0, flow_window=FlowWindow} -> + Flow = case Flow0 of + infinity -> infinity; + _ -> Flow0 + Inc + end, + if + %% Flow is active again, update the window. + Flow0 =< 0, Flow > 0 -> + Transport:send(Socket, cow_http2:window_update(StreamID, FlowWindow)), + HTTP2Machine = cow_http2_machine:update_window(StreamID, FlowWindow, HTTP2Machine0), + {state, store_stream(State#http2_state{http2_machine=HTTP2Machine}, + Stream#stream{flow=Flow, flow_window=0})}; + true -> + {state, store_stream(State, Stream#stream{flow=Flow})} + end; + false -> + [] + end. + %% @todo Use Reason. close(_, #http2_state{streams=Streams}, _, EvHandlerState) -> {close_streams(Streams), EvHandlerState}. @@ -337,10 +384,10 @@ keepalive(State=#http2_state{socket=Socket, transport=Transport}) -> Transport:send(Socket, cow_http2:ping(0)), State. -headers(State=#http2_state{socket=Socket, transport=Transport, +headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, http2_machine=HTTP2Machine0, streams=Streams}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers0, - EvHandler, EvHandlerState0) -> + InitialFlow0, EvHandler, EvHandlerState0) -> {ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream( iolist_to_binary(Method), HTTP2Machine0), {ok, PseudoHeaders, Headers} = prepare_headers(State, Method, Host, Port, Path, Headers0), @@ -358,14 +405,15 @@ headers(State=#http2_state{socket=Socket, transport=Transport, StreamID, HTTP2Machine1, nofin, PseudoHeaders, Headers), Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)), EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1), - Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo}, + InitialFlow = initial_flow(InitialFlow0, Opts), + Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow}, {State#http2_state{http2_machine=HTTP2Machine, streams=[Stream|Streams]}, EvHandlerState}. -request(State=#http2_state{socket=Socket, transport=Transport, +request(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, http2_machine=HTTP2Machine0, streams=Streams}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers0, Body, - EvHandler, EvHandlerState0) -> + InitialFlow0, EvHandler, EvHandlerState0) -> Headers1 = lists:keystore(<<"content-length">>, 1, Headers0, {<<"content-length">>, integer_to_binary(iolist_size(Body))}), {ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream( @@ -385,11 +433,15 @@ request(State=#http2_state{socket=Socket, transport=Transport, StreamID, HTTP2Machine1, nofin, PseudoHeaders, Headers), Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)), EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1), - Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo}, + InitialFlow = initial_flow(InitialFlow0, Opts), + Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow}, maybe_send_data(State#http2_state{http2_machine=HTTP2Machine, streams=[Stream|Streams]}, StreamID, fin, Body, EvHandler, EvHandlerState). +initial_flow(infinity, #{flow := InitialFlow}) -> InitialFlow; +initial_flow(InitialFlow, _) -> InitialFlow. + prepare_headers(#http2_state{transport=Transport}, Method, Host0, Port, Path, Headers0) -> Authority = case lists:keyfind(<<"host">>, 1, Headers0) of {_, Host} -> Host; diff --git a/src/gun_sse_h.erl b/src/gun_sse_h.erl index 11dc443..9d8836e 100644 --- a/src/gun_sse_h.erl +++ b/src/gun_sse_h.erl @@ -38,18 +38,22 @@ init(ReplyTo, StreamRef, _, Headers, _) -> disable end. --spec handle(_, binary(), State) -> {done, State} when State::#state{}. -handle(IsFin, Data, State=#state{reply_to=ReplyTo, stream_ref=StreamRef, sse_state=SSE0}) -> +-spec handle(_, binary(), State) -> {done, non_neg_integer(), State} when State::#state{}. +handle(IsFin, Data, State) -> + handle(IsFin, Data, State, 0). + +handle(IsFin, Data, State=#state{reply_to=ReplyTo, stream_ref=StreamRef, sse_state=SSE0}, Flow) -> case cow_sse:parse(Data, SSE0) of {event, Event, SSE} -> ReplyTo ! {gun_sse, self(), StreamRef, Event}, - handle(IsFin, <<>>, State#state{sse_state=SSE}); + handle(IsFin, <<>>, State#state{sse_state=SSE}, Flow + 1); {more, SSE} -> - _ = case IsFin of + Inc = case IsFin of fin -> - ReplyTo ! {gun_sse, self(), StreamRef, fin}; + ReplyTo ! {gun_sse, self(), StreamRef, fin}, + 1; _ -> - ok + 0 end, - {done, State#state{sse_state=SSE}} + {done, Flow + Inc, State#state{sse_state=SSE}} end. diff --git a/src/gun_ws.erl b/src/gun_ws.erl index 7acf74e..42cf049 100644 --- a/src/gun_ws.erl +++ b/src/gun_ws.erl @@ -16,8 +16,9 @@ -export([check_options/1]). -export([name/0]). --export([init/8]). +-export([init/9]). -export([handle/4]). +-export([update_flow/4]). -export([close/4]). -export([send/4]). -export([down/1]). @@ -42,6 +43,7 @@ frag_state = undefined :: cow_ws:frag_state(), utf8_state = 0 :: cow_ws:utf8_state(), extensions = #{} :: cow_ws:extensions(), + flow :: integer() | infinity, handler :: module(), handler_state :: any() }). @@ -55,6 +57,8 @@ do_check_options([{compress, B}|Opts]) when B =:= true; B =:= false -> do_check_options(Opts); do_check_options([{default_protocol, M}|Opts]) when is_atom(M) -> do_check_options(Opts); +do_check_options([{flow, InitialFlow}|Opts]) when is_integer(InitialFlow), InitialFlow > 0 -> + do_check_options(Opts); do_check_options([Opt={protocols, L}|Opts]) when is_list(L) -> case lists:usort(lists:flatten([[is_binary(B), is_atom(M)] || {B, M} <- L])) of [true] -> do_check_options(Opts); @@ -67,19 +71,19 @@ do_check_options([Opt|_]) -> name() -> ws. -init(Owner, Socket, Transport, StreamRef, Headers, Extensions, Handler, Opts) -> +init(Owner, Socket, Transport, StreamRef, Headers, Extensions, InitialFlow, Handler, Opts) -> Owner ! {gun_upgrade, self(), StreamRef, [<<"websocket">>], Headers}, - HandlerState = Handler:init(Owner, StreamRef, Headers, Opts), + {ok, HandlerState} = Handler:init(Owner, StreamRef, Headers, Opts), {switch_protocol, ?MODULE, #ws_state{owner=Owner, stream_ref=StreamRef, socket=Socket, transport=Transport, extensions=Extensions, - handler=Handler, handler_state=HandlerState}}. + flow=InitialFlow, handler=Handler, handler_state=HandlerState}}. %% Do not handle anything if we received a close frame. handle(_, State=#ws_state{in=close}, _, EvHandlerState) -> {{state, State}, EvHandlerState}; %% Shortcut for common case when Data is empty after processing a frame. handle(<<>>, State=#ws_state{in=head}, _, EvHandlerState) -> - {{state, State}, EvHandlerState}; + maybe_active(State, EvHandlerState); handle(Data, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef, buffer=Buffer, in=head, frag_state=FragState, extensions=Extensions}, EvHandler, EvHandlerState0) -> @@ -113,7 +117,7 @@ handle(Data, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef, buffer=Buffer, in=#payload{type=Type, rsv=Rsv, len=Len, mask_key=MaskKey}, frag_state=FragState2}, EvHandler, EvHandlerState); more -> - {{state, State#ws_state{buffer=Data2}}, EvHandlerState1}; + maybe_active(State#ws_state{buffer=Data2}, EvHandlerState1); error -> close({error, badframe}, State, EvHandler, EvHandlerState1) end; @@ -130,20 +134,26 @@ handle(Data, State=#ws_state{in=In=#payload{type=Type, rsv=Rsv, len=Len, mask_ke <>, CloseCode, EvHandler, EvHandlerState); {more, CloseCode2, Payload, Utf8State2} -> - {{state, State#ws_state{in=In#payload{close_code=CloseCode2, + maybe_active(State#ws_state{in=In#payload{close_code=CloseCode2, unmasked= <>, - len=Len - byte_size(Data), unmasked_len=2 + byte_size(Data)}, utf8_state=Utf8State2}}, - EvHandlerState}; + len=Len - byte_size(Data), unmasked_len=2 + byte_size(Data)}, utf8_state=Utf8State2}, + EvHandlerState); {more, Payload, Utf8State2} -> - {{state, State#ws_state{in=In#payload{unmasked= <>, - len=Len - byte_size(Data), unmasked_len=UnmaskedLen + byte_size(Data)}, utf8_state=Utf8State2}}, - EvHandlerState}; + maybe_active(State#ws_state{in=In#payload{unmasked= <>, + len=Len - byte_size(Data), unmasked_len=UnmaskedLen + byte_size(Data)}, utf8_state=Utf8State2}, + EvHandlerState); Error = {error, _Reason} -> close(Error, State, EvHandler, EvHandlerState) end. +maybe_active(State=#ws_state{flow=Flow}, EvHandlerState) -> + {[ + {state, State}, + {active, Flow > 0} + ], EvHandlerState}. + dispatch(Rest, State0=#ws_state{owner=ReplyTo, stream_ref=StreamRef, - frag_state=FragState, extensions=Extensions, + frag_state=FragState, extensions=Extensions, flow=Flow0, handler=Handler, handler_state=HandlerState0}, Type, Payload, CloseCode, EvHandler, EvHandlerState0) -> EvHandlerState1 = EvHandler:ws_recv_frame_end(#{ @@ -165,8 +175,12 @@ dispatch(Rest, State0=#ws_state{owner=ReplyTo, stream_ref=StreamRef, {pong, _} -> handle(Rest, State0, EvHandler, EvHandlerState1); Frame -> - HandlerState = Handler:handle(Frame, HandlerState0), - State1 = State0#ws_state{handler_state=HandlerState}, + {ok, Dec, HandlerState} = Handler:handle(Frame, HandlerState0), + Flow = case Flow0 of + infinity -> infinity; + _ -> Flow0 - Dec + end, + State1 = State0#ws_state{flow=Flow, handler_state=HandlerState}, State = case Frame of close -> State1#ws_state{in=close}; {close, _, _} -> State1#ws_state{in=close}; @@ -176,6 +190,16 @@ dispatch(Rest, State0=#ws_state{owner=ReplyTo, stream_ref=StreamRef, handle(Rest, State, EvHandler, EvHandlerState1) end. +update_flow(State=#ws_state{flow=Flow0}, _ReplyTo, _StreamRef, Inc) -> + Flow = case Flow0 of + infinity -> infinity; + _ -> Flow0 + Inc + end, + [ + {state, State#ws_state{flow=Flow}}, + {active, Flow > 0} + ]. + close(Reason, State, EvHandler, EvHandlerState) -> case Reason of normal -> diff --git a/src/gun_ws_h.erl b/src/gun_ws_h.erl index 5ff0646..4859532 100644 --- a/src/gun_ws_h.erl +++ b/src/gun_ws_h.erl @@ -24,15 +24,15 @@ }). init(ReplyTo, StreamRef, _, _) -> - #state{reply_to=ReplyTo, stream_ref=StreamRef}. + {ok, #state{reply_to=ReplyTo, stream_ref=StreamRef}}. handle({fragment, nofin, _, Payload}, State=#state{frag_buffer=SoFar}) -> - State#state{frag_buffer= << SoFar/binary, Payload/binary >>}; + {ok, 0, State#state{frag_buffer= << SoFar/binary, Payload/binary >>}}; handle({fragment, fin, Type, Payload}, State=#state{reply_to=ReplyTo, stream_ref=StreamRef, frag_buffer=SoFar}) -> ReplyTo ! {gun_ws, self(), StreamRef, {Type, << SoFar/binary, Payload/binary >>}}, - State#state{frag_buffer= <<>>}; + {ok, 1, State#state{frag_buffer= <<>>}}; handle(Frame, State=#state{reply_to=ReplyTo, stream_ref=StreamRef}) -> ReplyTo ! {gun_ws, self(), StreamRef, Frame}, - State. + {ok, 1, State}. diff --git a/test/flow_SUITE.erl b/test/flow_SUITE.erl new file mode 100644 index 0000000..937af26 --- /dev/null +++ b/test/flow_SUITE.erl @@ -0,0 +1,325 @@ +%% Copyright (c) 2019, Loïc Hoguin +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +-module(flow_SUITE). +-compile(export_all). +-compile(nowarn_export_all). + +-import(ct_helper, [doc/1]). + +all() -> + [{group, flow}]. + +groups() -> + [{flow, [parallel], ct_helper:all(?MODULE)}]. + +%% Tests. + +default_flow_http(_) -> + doc("Confirm flow control default can be changed and overriden for HTTP/1.1."), + {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{env => #{ + dispatch => cowboy_router:compile([{'_', [{"/", sse_clock_h, date}]}]) + }}), + Port = ranch:get_port(?FUNCTION_NAME), + try + %% First we check that we can set the flow for the entire connection. + {ok, ConnPid1} = gun:open("localhost", Port, #{ + http_opts => #{flow => 1} + }), + {ok, http} = gun:await_up(ConnPid1), + StreamRef1 = gun:get(ConnPid1, "/"), + {response, nofin, 200, _} = gun:await(ConnPid1, StreamRef1), + {data, nofin, _} = gun:await(ConnPid1, StreamRef1), + {error, timeout} = gun:await(ConnPid1, StreamRef1, 1500), + gun:close(ConnPid1), + %% Then we confirm that we can override it per request. + {ok, ConnPid2} = gun:open("localhost", Port, #{ + http_opts => #{flow => 1} + }), + {ok, http} = gun:await_up(ConnPid2), + StreamRef2 = gun:get(ConnPid2, "/", [], #{flow => 2}), + {response, nofin, 200, _} = gun:await(ConnPid2, StreamRef2), + {data, nofin, _} = gun:await(ConnPid2, StreamRef2), + {data, nofin, _} = gun:await(ConnPid2, StreamRef2), + {error, timeout} = gun:await(ConnPid2, StreamRef2, 1500), + gun:close(ConnPid2) + after + cowboy:stop_listener(?FUNCTION_NAME) + end. + +default_flow_http2(_) -> + doc("Confirm flow control default can be changed and overriden for HTTP/2."), + {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{env => #{ + dispatch => cowboy_router:compile([{'_', [{"/", sse_clock_h, 40000}]}]) + }}), + Port = ranch:get_port(?FUNCTION_NAME), + try + %% First we check that we can set the flow for the entire connection. + {ok, ConnPid} = gun:open("localhost", Port, #{ + http2_opts => #{ + flow => 1, + %% We set the max frame size to the same as the initial + %% window size in order to reduce the number of data messages. + max_frame_size_received => 65535 + }, + protocols => [http2] + }), + {ok, http2} = gun:await_up(ConnPid), + StreamRef1 = gun:get(ConnPid, "/"), + {response, nofin, 200, _} = gun:await(ConnPid, StreamRef1), + %% We set the flow to 1 therefore we will receive *2* data messages, + %% and then nothing because the window was fully consumed. + {data, nofin, _} = gun:await(ConnPid, StreamRef1), + {data, nofin, _} = gun:await(ConnPid, StreamRef1), + {error, timeout} = gun:await(ConnPid, StreamRef1, 1500), + %% Then we confirm that we can override it per request. + StreamRef2 = gun:get(ConnPid, "/", [], #{flow => 2}), + {response, nofin, 200, _} = gun:await(ConnPid, StreamRef2), + %% We set the flow to 2 therefore we will receive *3* data messages + %% and then nothing because two windows have been fully consumed. + {data, nofin, _} = gun:await(ConnPid, StreamRef2), + {data, nofin, _} = gun:await(ConnPid, StreamRef2), + {data, nofin, _} = gun:await(ConnPid, StreamRef2), + {error, timeout} = gun:await(ConnPid, StreamRef2, 1500), + gun:close(ConnPid) + after + cowboy:stop_listener(?FUNCTION_NAME) + end. + +flow_http(_) -> + doc("Confirm flow control works as intended for HTTP/1.1."), + {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{env => #{ + dispatch => cowboy_router:compile([{'_', [{"/", sse_clock_h, date}]}]) + }}), + Port = ranch:get_port(?FUNCTION_NAME), + try + {ok, ConnPid} = gun:open("localhost", Port), + {ok, http} = gun:await_up(ConnPid), + StreamRef = gun:get(ConnPid, "/", [], #{flow => 1}), + {response, nofin, 200, _} = gun:await(ConnPid, StreamRef), + %% We set the flow to 1 therefore we will receive 1 data message, + %% and then nothing because Gun doesn't read from the socket. + {data, nofin, _} = gun:await(ConnPid, StreamRef), + {error, timeout} = gun:await(ConnPid, StreamRef, 3000), + %% We then update the flow and get 2 more data messages but no more. + gun:update_flow(ConnPid, StreamRef, 2), + {data, nofin, _} = gun:await(ConnPid, StreamRef), + {data, nofin, _} = gun:await(ConnPid, StreamRef), + {error, timeout} = gun:await(ConnPid, StreamRef, 1000), + gun:close(ConnPid) + after + cowboy:stop_listener(?FUNCTION_NAME) + end. + +flow_http2(_) -> + doc("Confirm flow control works as intended for HTTP/2."), + {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{env => #{ + dispatch => cowboy_router:compile([{'_', [{"/", sse_clock_h, 40000}]}]) + }}), + Port = ranch:get_port(?FUNCTION_NAME), + try + {ok, ConnPid} = gun:open("localhost", Port, #{ + %% We set the max frame size to the same as the initial + %% window size in order to reduce the number of data messages. + http2_opts => #{max_frame_size_received => 65535}, + protocols => [http2] + }), + {ok, http2} = gun:await_up(ConnPid), + StreamRef = gun:get(ConnPid, "/", [], #{flow => 1}), + {response, nofin, 200, _} = gun:await(ConnPid, StreamRef), + %% We set the flow to 1 therefore we will receive *2* data messages, + %% and then nothing because the window was fully consumed. + {data, nofin, D1} = gun:await(ConnPid, StreamRef), + {data, nofin, D2} = gun:await(ConnPid, StreamRef), + %% We consumed all the window available. + 65535 = byte_size(D1) + byte_size(D2), + {error, timeout} = gun:await(ConnPid, StreamRef, 3000), + %% We then update the flow and get *3* more data messages but no more. + gun:update_flow(ConnPid, StreamRef, 2), + {data, nofin, D3} = gun:await(ConnPid, StreamRef), + {data, nofin, D4} = gun:await(ConnPid, StreamRef), + {data, nofin, D5} = gun:await(ConnPid, StreamRef), + %% We consumed all the window available again. + %% D3 is the end of the truncated D2, D4 is full and D5 truncated. + 65535 = byte_size(D3) + byte_size(D4) + byte_size(D5), + {error, timeout} = gun:await(ConnPid, StreamRef, 1000), + gun:close(ConnPid) + after + cowboy:stop_listener(?FUNCTION_NAME) + end. + +flow_ws(_) -> + doc("Confirm flow control works as intended for Websocket."), + {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{env => #{ + dispatch => cowboy_router:compile([{'_', [{"/", ws_echo_h, []}]}]) + }}), + Port = ranch:get_port(?FUNCTION_NAME), + try + {ok, ConnPid} = gun:open("localhost", Port), + {ok, http} = gun:await_up(ConnPid), + StreamRef = gun:ws_upgrade(ConnPid, "/", [], #{flow => 1}), + {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef), + %% We send 2 frames with some time in between to make sure that + %% Gun handles them in separate Protocol:handle calls. + Frame = {text, <<"Hello!">>}, + gun:ws_send(ConnPid, Frame), + timer:sleep(100), + gun:ws_send(ConnPid, Frame), + %% We set the flow to 1 therefore we will receive 1 data message, + %% and then nothing because Gun doesn't read from the socket. + {ws, _} = gun:await(ConnPid, StreamRef), + {error, timeout} = gun:await(ConnPid, StreamRef, 3000), + %% We send 2 more frames. + gun:ws_send(ConnPid, Frame), + timer:sleep(100), + gun:ws_send(ConnPid, Frame), + %% We then update the flow and get 2 more data messages but no more. + gun:update_flow(ConnPid, StreamRef, 2), + {ws, _} = gun:await(ConnPid, StreamRef), + {ws, _} = gun:await(ConnPid, StreamRef), + {error, timeout} = gun:await(ConnPid, StreamRef, 1000), + gun:close(ConnPid) + after + cowboy:stop_listener(?FUNCTION_NAME) + end. + +no_flow_http(_) -> + doc("Ignore flow updates for no-flow streams for HTTP/1.1."), + {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{env => #{ + dispatch => cowboy_router:compile([{'_', [{"/", sse_clock_h, date}]}]) + }}), + Port = ranch:get_port(?FUNCTION_NAME), + try + {ok, ConnPid} = gun:open("localhost", Port), + {ok, http} = gun:await_up(ConnPid), + StreamRef = gun:get(ConnPid, "/", []), + {response, nofin, 200, _} = gun:await(ConnPid, StreamRef), + gun:update_flow(ConnPid, StreamRef, 2), + {data, nofin, _} = gun:await(ConnPid, StreamRef), + {data, nofin, _} = gun:await(ConnPid, StreamRef), + {data, nofin, _} = gun:await(ConnPid, StreamRef), + gun:close(ConnPid) + after + cowboy:stop_listener(?FUNCTION_NAME) + end. + +no_flow_http2(_) -> + doc("Ignore flow updates for no-flow streams for HTTP/2."), + {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{env => #{ + dispatch => cowboy_router:compile([{'_', [{"/", sse_clock_h, date}]}]) + }}), + Port = ranch:get_port(?FUNCTION_NAME), + try + {ok, ConnPid} = gun:open("localhost", Port, #{ + protocols => [http2] + }), + {ok, http2} = gun:await_up(ConnPid), + StreamRef = gun:get(ConnPid, "/", []), + {response, nofin, 200, _} = gun:await(ConnPid, StreamRef), + gun:update_flow(ConnPid, StreamRef, 2), + {data, nofin, _} = gun:await(ConnPid, StreamRef), + {data, nofin, _} = gun:await(ConnPid, StreamRef), + {data, nofin, _} = gun:await(ConnPid, StreamRef), + gun:close(ConnPid) + after + cowboy:stop_listener(?FUNCTION_NAME) + end. + +no_flow_ws(_) -> + doc("Ignore flow updates for no-flow streams for Websocket."), + {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{env => #{ + dispatch => cowboy_router:compile([{'_', [{"/", ws_echo_h, []}]}]) + }}), + Port = ranch:get_port(?FUNCTION_NAME), + try + {ok, ConnPid} = gun:open("localhost", Port), + {ok, http} = gun:await_up(ConnPid), + StreamRef = gun:ws_upgrade(ConnPid, "/", []), + {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef), + gun:update_flow(ConnPid, StreamRef, 2), + Frame = {text, <<"Hello!">>}, + gun:ws_send(ConnPid, Frame), + timer:sleep(100), + gun:ws_send(ConnPid, Frame), + {ws, _} = gun:await(ConnPid, StreamRef), + {ws, _} = gun:await(ConnPid, StreamRef), + gun:close(ConnPid) + after + cowboy:stop_listener(?FUNCTION_NAME) + end. + +sse_flow_http(_) -> + doc("Confirm flow control works as intended for HTTP/1.1 " + "when using the gun_sse_h content handler."), + {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{env => #{ + dispatch => cowboy_router:compile([{'_', [{"/", sse_clock_h, date}]}]) + }}), + Port = ranch:get_port(?FUNCTION_NAME), + try + {ok, ConnPid} = gun:open("localhost", Port, #{ + http_opts => #{content_handlers => [gun_sse_h, gun_data_h]} + }), + {ok, http} = gun:await_up(ConnPid), + StreamRef = gun:get(ConnPid, "/", [], #{flow => 1}), + {response, nofin, 200, _} = gun:await(ConnPid, StreamRef), + %% We set the flow to 1 therefore we will receive 1 event message, + %% and then nothing because Gun doesn't read from the socket. We + %% set the timeout to 2500 to ensure there is only going to be one + %% message queued up. + {sse, _} = gun:await(ConnPid, StreamRef), + {error, timeout} = gun:await(ConnPid, StreamRef, 2500), + %% We then update the flow and get 2 more event messages but no more. + gun:update_flow(ConnPid, StreamRef, 2), + {sse, _} = gun:await(ConnPid, StreamRef), + {sse, _} = gun:await(ConnPid, StreamRef), + {error, timeout} = gun:await(ConnPid, StreamRef, 1000), + gun:close(ConnPid) + after + cowboy:stop_listener(?FUNCTION_NAME) + end. + +sse_flow_http2(_) -> + doc("Confirm flow control works as intended for HTTP/2 " + "when using the gun_sse_h content handler."), + {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{env => #{ + dispatch => cowboy_router:compile([{'_', [{"/", sse_clock_h, 40000}]}]) + }}), + Port = ranch:get_port(?FUNCTION_NAME), + try + {ok, ConnPid} = gun:open("localhost", Port, #{ + %% We set the max frame size to the same as the initial + %% window size in order to reduce the number of data messages. + http2_opts => #{ + content_handlers => [gun_sse_h, gun_data_h], + max_frame_size_received => 65535 + }, + protocols => [http2] + }), + {ok, http2} = gun:await_up(ConnPid), + StreamRef = gun:get(ConnPid, "/", [], #{flow => 1}), + {response, nofin, 200, _} = gun:await(ConnPid, StreamRef), + %% We set the flow to 1 therefore we will receive 1 event message, + %% and then nothing because the window was fully consumed before + %% the second event was fully received. + {sse, _} = gun:await(ConnPid, StreamRef), + {error, timeout} = gun:await(ConnPid, StreamRef, 3000), + %% We then update the flow and get 2 more event messages but no more. + gun:update_flow(ConnPid, StreamRef, 2), + {sse, _} = gun:await(ConnPid, StreamRef), + {sse, _} = gun:await(ConnPid, StreamRef), + {error, timeout} = gun:await(ConnPid, StreamRef, 1000), + gun:close(ConnPid) + after + cowboy:stop_listener(?FUNCTION_NAME) + end. diff --git a/test/handlers/sse_clock_h.erl b/test/handlers/sse_clock_h.erl index 8c18ac5..dcc7c3f 100644 --- a/test/handlers/sse_clock_h.erl +++ b/test/handlers/sse_clock_h.erl @@ -15,6 +15,11 @@ init(Req, State) -> info(timeout, Req, State) -> erlang:send_after(1000, self(), timeout), cowboy_req:stream_events(#{ - data => cowboy_clock:rfc1123() + data => data(State) }, nofin, Req), {ok, Req, State}. + +data(date) -> + cowboy_clock:rfc1123(); +data(Size) when is_integer(Size) -> + lists:duplicate(Size, $0). diff --git a/test/sse_SUITE.erl b/test/sse_SUITE.erl index f950bf7..dc46311 100644 --- a/test/sse_SUITE.erl +++ b/test/sse_SUITE.erl @@ -31,7 +31,7 @@ end_per_suite(Config) -> init_routes() -> [ {"localhost", [ - {"/clock", sse_clock_h, []}, + {"/clock", sse_clock_h, date}, {"/lone_id", sse_lone_id_h, []} ]} ]. -- cgit v1.2.3