aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2019-08-02 14:30:08 +0200
committerLoïc Hoguin <[email protected]>2019-08-05 19:57:13 +0200
commit611f9a9b78cab4005892e13dffb7a2c8e44580ee (patch)
treed8d3fc407110ea12333ba122cf711326e82a7070
parent145b9af4bdbb85e2f83959ee8abaa4d9207a4529 (diff)
downloadgun-611f9a9b78cab4005892e13dffb7a2c8e44580ee.tar.gz
gun-611f9a9b78cab4005892e13dffb7a2c8e44580ee.tar.bz2
gun-611f9a9b78cab4005892e13dffb7a2c8e44580ee.zip
Add flow control
Flow control is disabled by default. The initial flow value must be set to enable it (either for the entire connection or on a per-request basis). Flow applies to all HTTP streams as well as Websocket. HTTP/2 pushed streams receive the same value as their originating stream.
-rw-r--r--doc/src/manual/gun.asciidoc28
-rw-r--r--doc/src/manual/gun.update_flow.asciidoc67
-rw-r--r--src/gun.erl59
-rw-r--r--src/gun_content_handler.erl21
-rw-r--r--src/gun_data_h.erl4
-rw-r--r--src/gun_http.erl137
-rw-r--r--src/gun_http2.erl82
-rw-r--r--src/gun_sse_h.erl18
-rw-r--r--src/gun_ws.erl54
-rw-r--r--src/gun_ws_h.erl8
-rw-r--r--test/flow_SUITE.erl325
-rw-r--r--test/handlers/sse_clock_h.erl7
-rw-r--r--test/sse_SUITE.erl2
13 files changed, 700 insertions, 112 deletions
diff --git a/doc/src/manual/gun.asciidoc b/doc/src/manual/gun.asciidoc
index da5548b..acc1454 100644
--- a/doc/src/manual/gun.asciidoc
+++ b/doc/src/manual/gun.asciidoc
@@ -46,6 +46,7 @@ Messages:
Streams:
+* link:man:gun:update_flow(3)[gun:update_flow(3)] - Update a stream's flow control value
* link:man:gun:cancel(3)[gun:cancel(3)] - Cancel the given stream
* link:man:gun:stream_info(3)[gun:stream_info(3)] - Obtain information about a stream
@@ -143,6 +144,7 @@ Handshake timeout for tunneled TLS connections.
[source,erlang]
----
http_opts() :: #{
+ flow => pos_integer(),
keepalive => timeout(),
transform_header_name => fun((binary()) -> binary()),
version => 'HTTP/1.1' | 'HTTP/1.0'
@@ -155,6 +157,11 @@ The default value is given next to the option name:
// @todo Document content_handlers and gun_sse_h.
+flow - see below::
+
+The initial flow control value for all HTTP/1.1 streams.
+By default flow control is disabled.
+
keepalive (infinity)::
Time between pings in milliseconds. Since the HTTP protocol has
@@ -181,6 +188,7 @@ HTTP version to use.
[source,erlang]
----
http2_opts() :: #{
+ flow => pos_integer(),
keepalive => timeout()
}
----
@@ -191,6 +199,11 @@ The default value is given next to the option name:
// @todo Document content_handlers and gun_sse_h.
+flow - see below::
+
+The initial flow control value for all HTTP/2 streams.
+By default flow control is disabled.
+
keepalive (5000)::
Time between pings in milliseconds.
@@ -328,6 +341,7 @@ Request headers.
[source,erlang]
----
req_opts() :: #{
+ flow => pos_integer(),
reply_to => pid()
}
----
@@ -336,6 +350,11 @@ Configuration for a particular request.
The default value is given next to the option name:
+flow - see below::
+
+The initial flow control value for the stream. By default
+flow control is disabled.
+
reply_to (`self()`)::
The pid of the process that will receive the response messages.
@@ -346,6 +365,7 @@ The pid of the process that will receive the response messages.
----
ws_opts() :: #{
compress => boolean(),
+ flow => pos_integer(),
protocols => [{binary(), module()}]
}
----
@@ -360,6 +380,11 @@ Whether to enable permessage-deflate compression. This does
not guarantee that compression will be used as it is the
server that ultimately decides. Defaults to false.
+flow - see below::
+
+The initial flow control value for the Websocket connection.
+By default flow control is disabled.
+
protocols ([])::
A non-empty list enables Websocket protocol negotiation. The
@@ -378,6 +403,9 @@ undocumented and must be set to `gun_ws_h`.
implement different reconnect strategies.
* *2.0*: The `transport_opts` option has been split into
two options: `tcp_opts` and `tls_opts`.
+* *2.0*: Function `gun:update_flow/3` introduced. The `flow`
+ option was added to request options and HTTP/1.1,
+ HTTP/2 and Websocket options as well.
* *2.0*: Introduce the type `req_headers()` and extend the
types accepted for header names for greater
interoperability. Header names are automatically
diff --git a/doc/src/manual/gun.update_flow.asciidoc b/doc/src/manual/gun.update_flow.asciidoc
new file mode 100644
index 0000000..c7990f2
--- /dev/null
+++ b/doc/src/manual/gun.update_flow.asciidoc
@@ -0,0 +1,67 @@
+= gun:update_flow(3)
+
+== Name
+
+gun:update_flow - Update a stream's flow control value
+
+== Description
+
+[source,erlang]
+----
+update_flow(ConnPid, StreamRef, Flow) -> ok
+
+ConnPid :: pid()
+StreamRef :: reference()
+Flow :: pos_integer()
+----
+
+Update a stream's flow control value.
+
+The flow value can only ever be incremented.
+
+This function does nothing for streams that have flow
+control disabled (which is the default).
+
+== Arguments
+
+ConnPid::
+
+The pid of the Gun connection process.
+
+StreamRef::
+
+Identifier of the stream for the original request.
+
+Flow::
+
+Flow control value increment.
+
+== Return value
+
+The atom `ok` is returned.
+
+== Changelog
+
+* *2.0*: Function introduced.
+
+== Examples
+
+.Update a stream's flow control value
+[source,erlang]
+----
+gun:update_flow(ConnPid, StreamRef, 10).
+----
+
+== See also
+
+link:man:gun(3)[gun(3)],
+link:man:gun:get(3)[gun:get(3)],
+link:man:gun:head(3)[gun:head(3)],
+link:man:gun:options(3)[gun:options(3)],
+link:man:gun:patch(3)[gun:patch(3)],
+link:man:gun:post(3)[gun:post(3)],
+link:man:gun:put(3)[gun:put(3)],
+link:man:gun:delete(3)[gun:delete(3)],
+link:man:gun:headers(3)[gun:headers(3)],
+link:man:gun:request(3)[gun:request(3)],
+link:man:gun:ws_upgrade(3)[gun:ws_upgrade(3)]
diff --git a/src/gun.erl b/src/gun.erl
index d758ffc..7b06aaf 100644
--- a/src/gun.erl
+++ b/src/gun.erl
@@ -79,6 +79,7 @@
-export([flush/1]).
%% Streams.
+-export([update_flow/3]).
-export([cancel/2]).
-export([stream_info/2]).
@@ -158,6 +159,7 @@
%% This is of course not required for HTTP/1.1 since the CONNECT takes over
%% the entire connection.
-type req_opts() :: #{
+ flow => pos_integer(),
reply_to => pid()
}.
-export_type([req_opts/0]).
@@ -177,6 +179,7 @@
%% @todo keepalive
-type ws_opts() :: #{
compress => boolean(),
+ flow => pos_integer(),
protocols => [{binary(), module()}]
}.
-export_type([ws_opts/0]).
@@ -194,6 +197,7 @@
keepalive_ref :: undefined | reference(),
socket :: undefined | inet:socket() | ssl:sslsocket() | pid(),
transport :: module(),
+ active = true :: boolean(),
messages :: {atom(), atom(), atom()},
protocol :: module(),
protocol_state :: any(),
@@ -458,9 +462,10 @@ headers(ServerPid, Method, Path, Headers) ->
-spec headers(pid(), iodata(), iodata(), req_headers(), req_opts()) -> reference().
headers(ServerPid, Method, Path, Headers, ReqOpts) ->
StreamRef = make_ref(),
+ InitialFlow = maps:get(flow, ReqOpts, infinity),
ReplyTo = maps:get(reply_to, ReqOpts, self()),
gen_statem:cast(ServerPid, {headers, ReplyTo, StreamRef,
- Method, Path, normalize_headers(Headers)}),
+ Method, Path, normalize_headers(Headers), InitialFlow}),
StreamRef.
-spec request(pid(), iodata(), iodata(), req_headers(), iodata()) -> reference().
@@ -470,9 +475,10 @@ request(ServerPid, Method, Path, Headers, Body) ->
-spec request(pid(), iodata(), iodata(), req_headers(), iodata(), req_opts()) -> reference().
request(ServerPid, Method, Path, Headers, Body, ReqOpts) ->
StreamRef = make_ref(),
+ InitialFlow = maps:get(flow, ReqOpts, infinity),
ReplyTo = maps:get(reply_to, ReqOpts, self()),
gen_statem:cast(ServerPid, {request, ReplyTo, StreamRef,
- Method, Path, normalize_headers(Headers), Body}),
+ Method, Path, normalize_headers(Headers), Body, InitialFlow}),
StreamRef.
normalize_headers([]) ->
@@ -510,8 +516,10 @@ connect(ServerPid, Destination, Headers) ->
-spec connect(pid(), connect_destination(), req_headers(), req_opts()) -> reference().
connect(ServerPid, Destination, Headers, ReqOpts) ->
StreamRef = make_ref(),
+ InitialFlow = maps:get(flow, ReqOpts, infinity),
ReplyTo = maps:get(reply_to, ReqOpts, self()),
- gen_statem:cast(ServerPid, {connect, ReplyTo, StreamRef, Destination, Headers}),
+ gen_statem:cast(ServerPid, {connect, ReplyTo, StreamRef,
+ Destination, Headers, InitialFlow}),
StreamRef.
%% Awaiting gun messages.
@@ -520,6 +528,7 @@ connect(ServerPid, Destination, Headers, ReqOpts) ->
-type await_result() :: {inform, 100..199, resp_headers()}
| {response, fin | nofin, non_neg_integer(), resp_headers()}
| {data, fin | nofin, binary()}
+ | {sse, cow_sse:event() | fin}
| {trailers, resp_headers()}
| {push, reference(), binary(), binary(), resp_headers()}
| {upgrade, [binary()], resp_headers()}
@@ -551,6 +560,8 @@ await(ServerPid, StreamRef, Timeout, MRef) ->
{response, IsFin, Status, Headers};
{gun_data, ServerPid, StreamRef, IsFin, Data} ->
{data, IsFin, Data};
+ {gun_sse, ServerPid, StreamRef, Event} ->
+ {sse, Event};
{gun_trailers, ServerPid, StreamRef, Trailers} ->
{trailers, Trailers};
{gun_push, ServerPid, StreamRef, NewStreamRef, Method, URI, Headers} ->
@@ -699,12 +710,20 @@ flush_ref(StreamRef) ->
ok
end.
+%% Flow control.
+
+-spec update_flow(pid(), reference(), pos_integer()) -> ok.
+update_flow(ServerPid, StreamRef, Flow) ->
+ gen_statem:cast(ServerPid, {update_flow, self(), StreamRef, Flow}).
+
%% Cancelling a stream.
-spec cancel(pid(), reference()) -> ok.
cancel(ServerPid, StreamRef) ->
gen_statem:cast(ServerPid, {cancel, self(), StreamRef}).
+%% Information about a stream.
+
-spec stream_info(pid(), reference()) -> {ok, map() | undefined} | {error, not_connected}.
stream_info(ServerPid, StreamRef) ->
gen_statem:call(ServerPid, {stream_info, StreamRef}).
@@ -919,11 +938,16 @@ connected(internal, {connected, Socket, Protocol},
{keep_state, keepalive_timeout(active(State#state{socket=Socket,
protocol=Protocol, protocol_state=ProtoState}))};
%% Socket events.
-connected(info, {OK, Socket, Data}, State=#state{socket=Socket, messages={OK, _, _},
+connected(info, {OK, Socket, Data}, State0=#state{socket=Socket, messages={OK, _, _},
protocol=Protocol, protocol_state=ProtoState,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
{Commands, EvHandlerState} = Protocol:handle(Data, ProtoState, EvHandler, EvHandlerState0),
- commands(Commands, active(State#state{event_handler_state=EvHandlerState}));
+ case commands(Commands, State0#state{event_handler_state=EvHandlerState}) of
+ {keep_state, State} ->
+ {keep_state, active(State)};
+ Res ->
+ Res
+ end;
connected(info, {Closed, Socket}, State=#state{socket=Socket, messages={_, Closed, _}}) ->
disconnect(State, closed);
connected(info, {Error, Socket, Reason}, State=#state{socket=Socket, messages={_, _, Error}}) ->
@@ -936,21 +960,21 @@ connected(info, keepalive, State=#state{protocol=Protocol, protocol_state=ProtoS
ProtoState2 = Protocol:keepalive(ProtoState),
{keep_state, keepalive_timeout(State#state{protocol_state=ProtoState2})};
%% Public HTTP interface.
-connected(cast, {headers, ReplyTo, StreamRef, Method, Path, Headers},
+connected(cast, {headers, ReplyTo, StreamRef, Method, Path, Headers, InitialFlow},
State=#state{origin_host=Host, origin_port=Port,
protocol=Protocol, protocol_state=ProtoState,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
{ProtoState2, EvHandlerState} = Protocol:headers(ProtoState,
StreamRef, ReplyTo, Method, Host, Port, Path, Headers,
- EvHandler, EvHandlerState0),
+ InitialFlow, EvHandler, EvHandlerState0),
{keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}};
-connected(cast, {request, ReplyTo, StreamRef, Method, Path, Headers, Body},
+connected(cast, {request, ReplyTo, StreamRef, Method, Path, Headers, Body, InitialFlow},
State=#state{origin_host=Host, origin_port=Port,
protocol=Protocol, protocol_state=ProtoState,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
{ProtoState2, EvHandlerState} = Protocol:request(ProtoState,
StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body,
- EvHandler, EvHandlerState0),
+ InitialFlow, EvHandler, EvHandlerState0),
{keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}};
%% @todo Do we want to reject ReplyTo if it's not the process
%% who initiated the connection? For both data and cancel.
@@ -960,7 +984,7 @@ connected(cast, {data, ReplyTo, StreamRef, IsFin, Data},
{ProtoState2, EvHandlerState} = Protocol:data(ProtoState,
StreamRef, ReplyTo, IsFin, Data, EvHandler, EvHandlerState0),
{keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}};
-connected(cast, {connect, ReplyTo, StreamRef, Destination0, Headers},
+connected(cast, {connect, ReplyTo, StreamRef, Destination0, Headers, InitialFlow},
State=#state{protocol=Protocol, protocol_state=ProtoState}) ->
%% The protocol option has been deprecated in favor of the protocols option.
%% Nobody probably ended up using it, but let's not break the interface.
@@ -980,7 +1004,7 @@ connected(cast, {connect, ReplyTo, StreamRef, Destination0, Headers},
_ ->
Destination1
end,
- ProtoState2 = Protocol:connect(ProtoState, StreamRef, ReplyTo, Destination, Headers),
+ ProtoState2 = Protocol:connect(ProtoState, StreamRef, ReplyTo, Destination, Headers, InitialFlow),
{keep_state, State#state{protocol_state=ProtoState2}};
%% When using gun_tls_proxy we need a separate message to know whether
%% the handshake succeeded and whether we need to switch to a different protocol.
@@ -1002,6 +1026,15 @@ connected(info, {gun_tls_proxy, Socket, Error = {error, Reason}, HandshakeEvent}
error => Reason
}, EvHandlerState0),
commands([Error], State#state{event_handler_state=EvHandlerState});
+connected(cast, {update_flow, ReplyTo, StreamRef, Flow}, State0=#state{
+ protocol=Protocol, protocol_state=ProtoState}) ->
+ Commands = Protocol:update_flow(ProtoState, ReplyTo, StreamRef, Flow),
+ case commands(Commands, State0) of
+ {keep_state, State} ->
+ {keep_state, active(State)};
+ Res ->
+ Res
+ end;
connected(cast, {cancel, ReplyTo, StreamRef}, State=#state{
protocol=Protocol, protocol_state=ProtoState,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
@@ -1092,6 +1125,8 @@ commands([close|_], State) ->
disconnect(State, normal);
commands([Error={error, _}|_], State) ->
disconnect(State, Error);
+commands([{active, Active}|Tail], State) when is_boolean(Active) ->
+ commands(Tail, State#state{active=Active});
commands([{state, ProtoState}|Tail], State) ->
commands(Tail, State#state{protocol_state=ProtoState});
%% Order is important: the origin must be changed before
@@ -1179,6 +1214,8 @@ disconnect_flush(State=#state{socket=Socket, messages={OK, Closed, Error}}) ->
ok
end.
+active(State=#state{active=false}) ->
+ State;
active(State=#state{socket=Socket, transport=Transport}) ->
Transport:setopts(Socket, [{active, once}]),
State.
diff --git a/src/gun_content_handler.erl b/src/gun_content_handler.erl
index 78e5e9d..fa180d8 100644
--- a/src/gun_content_handler.erl
+++ b/src/gun_content_handler.erl
@@ -28,7 +28,9 @@
cow_http:headers(), map()) -> {ok, any()} | disable.
%% @todo Make fin | nofin its own type.
-callback handle(fin | nofin, any(), State)
- -> {ok, any(), State} | {done, State} when State::any().
+ -> {ok, any(), non_neg_integer(), State}
+ | {done, non_neg_integer(), State}
+ when State::any().
-spec init(pid(), any(), cow_http:status(),
cow_http:headers(), State) -> State when State::state().
@@ -44,13 +46,18 @@ init(ReplyTo, StreamRef, Status, Headers, [Handler|Tail]) ->
disable -> init(ReplyTo, StreamRef, Status, Headers, Tail)
end.
--spec handle(fin | nofin, any(), State) -> State when State::state().
-handle(_, _, []) ->
- [];
-handle(IsFin, Data0, [{Mod, State0}|Tail]) ->
+-spec handle(fin | nofin, any(), State) -> {ok, non_neg_integer(), State} when State::state().
+handle(IsFin, Data, State) ->
+ handle(IsFin, Data, State, 0, []).
+
+handle(_, _, [], Flow, Acc) ->
+ {ok, Flow, lists:reverse(Acc)};
+handle(IsFin, Data0, [{Mod, State0}|Tail], Flow, Acc) ->
case Mod:handle(IsFin, Data0, State0) of
- {ok, Data, State} -> [{Mod, State}|handle(IsFin, Data, Tail)];
- {done, State} -> [{Mod, State}|Tail]
+ {ok, Data, Inc, State} ->
+ handle(IsFin, Data, Tail, Flow + Inc, [{Mod, State}|Acc]);
+ {done, Inc, State} ->
+ {ok, Flow + Inc, lists:reverse([{Mod, State}|Acc], Tail)}
end.
-spec check_option(list()) -> ok | error.
diff --git a/src/gun_data_h.erl b/src/gun_data_h.erl
index af2c2cb..d1f8787 100644
--- a/src/gun_data_h.erl
+++ b/src/gun_data_h.erl
@@ -27,7 +27,7 @@
init(ReplyTo, StreamRef, _, _, _) ->
{ok, #state{reply_to=ReplyTo, stream_ref=StreamRef}}.
--spec handle(fin | nofin, binary(), State) -> {done, State} when State::#state{}.
+-spec handle(fin | nofin, binary(), State) -> {done, 1, State} when State::#state{}.
handle(IsFin, Data, State=#state{reply_to=ReplyTo, stream_ref=StreamRef}) ->
ReplyTo ! {gun_data, self(), StreamRef, IsFin, Data},
- {done, State}.
+ {done, 1, State}.
diff --git a/src/gun_http.erl b/src/gun_http.erl
index 68f9e7d..ec268ad 100644
--- a/src/gun_http.erl
+++ b/src/gun_http.erl
@@ -18,12 +18,13 @@
-export([name/0]).
-export([init/4]).
-export([handle/4]).
+-export([update_flow/4]).
-export([close/4]).
-export([keepalive/1]).
--export([headers/10]).
--export([request/11]).
+-export([headers/11]).
+-export([request/12]).
-export([data/7]).
--export([connect/5]).
+-export([connect/6]).
-export([cancel/5]).
-export([stream_info/2]).
-export([down/1]).
@@ -43,6 +44,7 @@
-record(stream, {
ref :: reference() | connect_info() | websocket_info(),
reply_to :: pid(),
+ flow :: integer() | infinity,
method :: binary(),
is_alive :: boolean(),
handler_state :: undefined | gun_content_handler:state()
@@ -52,6 +54,7 @@
owner :: pid(),
socket :: inet:socket() | ssl:sslsocket(),
transport :: module(),
+ opts = #{} :: map(), %% @todo
version = 'HTTP/1.1' :: cow_http:version(),
content_handlers :: gun_content_handler:opt(),
connection = keepalive :: keepalive | close,
@@ -73,6 +76,8 @@ do_check_options([Opt={content_handlers, Handlers}|Opts]) ->
ok -> do_check_options(Opts);
error -> {error, {options, {http, Opt}}}
end;
+do_check_options([{flow, InitialFlow}|Opts]) when is_integer(InitialFlow), InitialFlow > 0 ->
+ do_check_options(Opts);
do_check_options([{keepalive, infinity}|Opts]) ->
do_check_options(Opts);
do_check_options([{keepalive, K}|Opts]) when is_integer(K), K > 0 ->
@@ -87,10 +92,11 @@ do_check_options([Opt|_]) ->
name() -> http.
init(Owner, Socket, Transport, Opts) ->
+ %% @todo If we keep the opts we don't need to add these to the state.
Version = maps:get(version, Opts, 'HTTP/1.1'),
Handlers = maps:get(content_handlers, Opts, [gun_data_h]),
TransformHeaderName = maps:get(transform_header_name, Opts, fun (N) -> N end),
- #http_state{owner=Owner, socket=Socket, transport=Transport, version=Version,
+ #http_state{owner=Owner, socket=Socket, transport=Transport, opts=Opts, version=Version,
content_handlers=Handlers, transform_header_name=TransformHeaderName}.
%% Stop looping when we got no more data.
@@ -120,7 +126,7 @@ handle(Data, State=#http_state{in=head, buffer=Buffer,
end;
%% Everything sent to the socket until it closes is part of the response body.
handle(Data, State=#http_state{in=body_close}, _, EvHandlerState) ->
- {{state, send_data_if_alive(Data, State, nofin)}, EvHandlerState};
+ {send_data(Data, State, nofin), EvHandlerState};
%% Chunked transfer-encoding may contain both data and trailers.
handle(Data, State=#http_state{in=body_chunked, in_state=InState,
buffer=Buffer, streams=[#stream{ref=StreamRef, reply_to=ReplyTo}|_],
@@ -130,21 +136,15 @@ handle(Data, State=#http_state{in=body_chunked, in_state=InState,
more ->
{{state, State#http_state{buffer=Buffer2}}, EvHandlerState0};
{more, Data2, InState2} ->
- {{state, send_data_if_alive(Data2,
- State#http_state{buffer= <<>>, in_state=InState2},
- nofin)}, EvHandlerState0};
+ {send_data(Data2, State#http_state{buffer= <<>>, in_state=InState2}, nofin), EvHandlerState0};
{more, Data2, Length, InState2} when is_integer(Length) ->
%% @todo See if we can recv faster than one message at a time.
- {{state, send_data_if_alive(Data2,
- State#http_state{buffer= <<>>, in_state=InState2},
- nofin)}, EvHandlerState0};
+ {send_data(Data2, State#http_state{buffer= <<>>, in_state=InState2}, nofin), EvHandlerState0};
{more, Data2, Rest, InState2} ->
%% @todo See if we can recv faster than one message at a time.
- {{state, send_data_if_alive(Data2,
- State#http_state{buffer=Rest, in_state=InState2},
- nofin)}, EvHandlerState0};
+ {send_data(Data2, State#http_state{buffer=Rest, in_state=InState2}, nofin), EvHandlerState0};
{done, HasTrailers, Rest} ->
- %% @todo response_end should be called AFTER send_data_if_alive
+ %% @todo response_end should be called AFTER send_data
{IsFin, EvHandlerState} = case HasTrailers of
trailers ->
{nofin, EvHandlerState0};
@@ -156,7 +156,8 @@ handle(Data, State=#http_state{in=body_chunked, in_state=InState,
{fin, EvHandlerState1}
end,
%% I suppose it doesn't hurt to append an empty binary.
- State1 = send_data_if_alive(<<>>, State, IsFin),
+ %% We ignore the active command because the stream ended.
+ [{state, State1}|_] = send_data(<<>>, State, IsFin),
case {HasTrailers, Conn} of
{trailers, _} ->
handle(Rest, State1#http_state{buffer = <<>>, in=body_trailer}, EvHandler, EvHandlerState);
@@ -166,7 +167,7 @@ handle(Data, State=#http_state{in=body_chunked, in_state=InState,
{[{state, end_stream(State1)}, close], EvHandlerState}
end;
{done, Data2, HasTrailers, Rest} ->
- %% @todo response_end should be called AFTER send_data_if_alive
+ %% @todo response_end should be called AFTER send_data
{IsFin, EvHandlerState} = case HasTrailers of
trailers ->
{nofin, EvHandlerState0};
@@ -177,7 +178,8 @@ handle(Data, State=#http_state{in=body_chunked, in_state=InState,
}, EvHandlerState0),
{fin, EvHandlerState1}
end,
- State1 = send_data_if_alive(Data2, State, IsFin),
+ %% We ignore the active command because the stream ended.
+ [{state, State1}|_] = send_data(Data2, State, IsFin),
case {HasTrailers, Conn} of
{trailers, _} ->
handle(Rest, State1#http_state{buffer = <<>>, in=body_trailer}, EvHandler, EvHandlerState);
@@ -218,24 +220,24 @@ handle(Data, State=#http_state{in={body, Length}, connection=Conn,
if
%% More data coming.
DataSize < Length ->
- {{state, send_data_if_alive(Data,
- State#http_state{in={body, Length - DataSize}},
- nofin)}, EvHandlerState0};
+ {send_data(Data, State#http_state{in={body, Length - DataSize}}, nofin), EvHandlerState0};
%% Stream finished, no rest.
DataSize =:= Length ->
- State1 = send_data_if_alive(Data, State, fin),
+ %% We ignore the active command because the stream ended.
+ [{state, State1}|_] = send_data(Data, State, fin),
EvHandlerState = EvHandler:response_end(#{
stream_ref => StreamRef,
reply_to => ReplyTo
}, EvHandlerState0),
case Conn of
- keepalive -> {{state, end_stream(State1)}, EvHandlerState};
+ keepalive -> {[{state, end_stream(State1)}, {active, true}], EvHandlerState};
close -> {[{state, end_stream(State1)}, close], EvHandlerState}
end;
%% Stream finished, rest.
true ->
<< Body:Length/binary, Rest/bits >> = Data,
- State1 = send_data_if_alive(Body, State, fin),
+ %% We ignore the active command because the stream ended.
+ [{state, State1}|_] = send_data(Body, State, fin),
EvHandlerState = EvHandler:response_end(#{
stream_ref => StreamRef,
reply_to => ReplyTo
@@ -420,20 +422,48 @@ stream_ref({connect, StreamRef, _}) -> StreamRef;
stream_ref({websocket, StreamRef, _, _, _}) -> StreamRef;
stream_ref(StreamRef) -> StreamRef.
-send_data_if_alive(<<>>, State, nofin) ->
- State;
+%% The state must be first in order to retrieve it when the stream ended.
+send_data(<<>>, State, nofin) ->
+ [{state, State}, {active, true}];
%% @todo What if we receive data when the HEAD method was used?
-send_data_if_alive(Data, State=#http_state{streams=[Stream=#stream{
- is_alive=true, handler_state=Handlers0}|Tail]}, IsFin) ->
- Handlers = gun_content_handler:handle(IsFin, Data, Handlers0),
- State#http_state{streams=[Stream#stream{handler_state=Handlers}|Tail]};
-send_data_if_alive(_, State, _) ->
- State.
+send_data(Data, State=#http_state{streams=[Stream=#stream{
+ flow=Flow0, is_alive=true, handler_state=Handlers0}|Tail]}, IsFin) ->
+ {ok, Dec, Handlers} = gun_content_handler:handle(IsFin, Data, Handlers0),
+ Flow = case Flow0 of
+ infinity -> infinity;
+ _ -> Flow0 - Dec
+ end,
+ [
+ {state, State#http_state{streams=[Stream#stream{flow=Flow, handler_state=Handlers}|Tail]}},
+ {active, Flow > 0}
+ ];
+send_data(_, State, _) ->
+ [{state, State}, {active, true}].
+
+%% We only update the active state when the current stream is being updated.
+update_flow(State=#http_state{streams=[Stream=#stream{ref=StreamRef, flow=Flow0}|Tail]},
+ _ReplyTo, StreamRef, Inc) ->
+ Flow = case Flow0 of
+ infinity -> infinity;
+ _ -> Flow0 + Inc
+ end,
+ [
+ {state, State#http_state{streams=[Stream#stream{flow=Flow}|Tail]}},
+ {active, Flow > 0}
+ ];
+update_flow(State=#http_state{streams=Streams0}, _ReplyTo, StreamRef, Inc) ->
+ Streams = [case Ref of
+ StreamRef when Flow =/= infinity ->
+ Tuple#stream{flow=Flow + Inc};
+ _ ->
+ Tuple
+ end || Tuple = #stream{ref=Ref, flow=Flow} <- Streams0],
+ {state, State#http_state{streams=Streams}}.
%% @todo Use Reason.
close(_, State=#http_state{in=body_close, streams=[#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]},
EvHandler, EvHandlerState0) ->
- _ = send_data_if_alive(<<>>, State, fin),
+ _ = send_data(<<>>, State, fin),
EvHandlerState = EvHandler:response_end(#{
stream_ref => StreamRef,
reply_to => ReplyTo
@@ -461,23 +491,29 @@ keepalive(State=#http_state{socket=Socket, transport=Transport, out=head}) ->
keepalive(State) ->
State.
-headers(State=#http_state{out=head},
+headers(State=#http_state{opts=Opts, out=head},
StreamRef, ReplyTo, Method, Host, Port, Path, Headers,
- EvHandler, EvHandlerState0) ->
+ InitialFlow0, EvHandler, EvHandlerState0) ->
{Conn, Out, EvHandlerState} = send_request(State, StreamRef, ReplyTo,
Method, Host, Port, Path, Headers, undefined,
EvHandler, EvHandlerState0, ?FUNCTION_NAME),
- {new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, ReplyTo, Method),
+ InitialFlow = initial_flow(InitialFlow0, Opts),
+ {new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, ReplyTo, Method, InitialFlow),
EvHandlerState}.
-request(State=#http_state{out=head}, StreamRef, ReplyTo,
- Method, Host, Port, Path, Headers, Body, EvHandler, EvHandlerState0) ->
+request(State=#http_state{opts=Opts, out=head}, StreamRef, ReplyTo,
+ Method, Host, Port, Path, Headers, Body,
+ InitialFlow0, EvHandler, EvHandlerState0) ->
{Conn, Out, EvHandlerState} = send_request(State, StreamRef, ReplyTo,
Method, Host, Port, Path, Headers, Body,
EvHandler, EvHandlerState0, ?FUNCTION_NAME),
- {new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, ReplyTo, Method),
+ InitialFlow = initial_flow(InitialFlow0, Opts),
+ {new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, ReplyTo, Method, InitialFlow),
EvHandlerState}.
+initial_flow(infinity, #{flow := InitialFlow}) -> InitialFlow;
+initial_flow(InitialFlow, _) -> InitialFlow.
+
send_request(State=#http_state{socket=Socket, transport=Transport, version=Version},
StreamRef, ReplyTo, Method, Host, Port, Path, Headers0, Body,
EvHandler, EvHandlerState0, Function) ->
@@ -602,12 +638,12 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version,
{error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState0}
end.
-connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _) when Streams =/= [] ->
+connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _, _) when Streams =/= [] ->
ReplyTo ! {gun_error, self(), StreamRef, {badstate,
"CONNECT can only be used with HTTP/1.1 when no other streams are active."}},
State;
-connect(State=#http_state{socket=Socket, transport=Transport, version=Version},
- StreamRef, ReplyTo, Destination=#{host := Host0}, Headers0) ->
+connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version=Version},
+ StreamRef, ReplyTo, Destination=#{host := Host0}, Headers0, InitialFlow0) ->
Host = case Host0 of
Tuple when is_tuple(Tuple) -> inet:ntoa(Tuple);
_ -> Host0
@@ -634,7 +670,8 @@ connect(State=#http_state{socket=Socket, transport=Transport, version=Version},
Transport:send(Socket, [
cow_http:request(<<"CONNECT">>, Authority, Version, Headers)
]),
- new_stream(State, {connect, StreamRef, Destination}, ReplyTo, <<"CONNECT">>).
+ InitialFlow = initial_flow(InitialFlow0, Opts),
+ new_stream(State, {connect, StreamRef, Destination}, ReplyTo, <<"CONNECT">>, InitialFlow).
%% We can't cancel anything, we can just stop forwarding messages to the owner.
cancel(State0, StreamRef, ReplyTo, EvHandler, EvHandlerState0) ->
@@ -735,9 +772,9 @@ response_io_from_headers(_, Version, _Status, Headers) ->
%% Streams.
-new_stream(State=#http_state{streams=Streams}, StreamRef, ReplyTo, Method) ->
+new_stream(State=#http_state{streams=Streams}, StreamRef, ReplyTo, Method, InitialFlow) ->
State#http_state{streams=Streams
- ++ [#stream{ref=StreamRef, reply_to=ReplyTo,
+ ++ [#stream{ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow,
method=iolist_to_binary(Method), is_alive=true}]}.
is_stream(#http_state{streams=Streams}, StreamRef) ->
@@ -787,8 +824,9 @@ ws_upgrade(State=#http_state{owner=ReplyTo, out=head},
{Conn, Out, EvHandlerState} = send_request(State, StreamRef, ReplyTo,
<<"GET">>, Host, Port, Path, Headers, undefined,
EvHandler, EvHandlerState0, ?FUNCTION_NAME),
+ InitialFlow = maps:get(flow, WsOpts, infinity),
{new_stream(State#http_state{connection=Conn, out=Out},
- {websocket, StreamRef, Key, GunExtensions, WsOpts}, ReplyTo, <<"GET">>),
+ {websocket, StreamRef, Key, GunExtensions, WsOpts}, ReplyTo, <<"GET">>, InitialFlow),
EvHandlerState}.
ws_handshake(Buffer, State, StreamRef, Headers, Key, GunExtensions, Opts) ->
@@ -853,8 +891,9 @@ ws_handshake_protocols(Buffer, State, StreamRef, Headers, Extensions, Opts) ->
end
end.
-ws_handshake_end(Buffer, #http_state{owner=Owner, socket=Socket, transport=Transport},
- StreamRef, Headers, Extensions, Handler, Opts) ->
+%% We know that the most recent stream is the Websocket one.
+ws_handshake_end(Buffer, #http_state{owner=Owner, socket=Socket, transport=Transport,
+ streams=[#stream{flow=InitialFlow}|_]}, StreamRef, Headers, Extensions, Handler, Opts) ->
%% Send ourselves the remaining buffer, if any.
_ = case Buffer of
<<>> ->
@@ -863,4 +902,4 @@ ws_handshake_end(Buffer, #http_state{owner=Owner, socket=Socket, transport=Trans
{OK, _, _} = Transport:messages(),
self() ! {OK, Socket, Buffer}
end,
- gun_ws:init(Owner, Socket, Transport, StreamRef, Headers, Extensions, Handler, Opts).
+ gun_ws:init(Owner, Socket, Transport, StreamRef, Headers, Extensions, InitialFlow, Handler, Opts).
diff --git a/src/gun_http2.erl b/src/gun_http2.erl
index 20e21ec..3b3b79b 100644
--- a/src/gun_http2.erl
+++ b/src/gun_http2.erl
@@ -18,10 +18,11 @@
-export([name/0]).
-export([init/4]).
-export([handle/4]).
+-export([update_flow/4]).
-export([close/4]).
-export([keepalive/1]).
--export([headers/10]).
--export([request/11]).
+-export([headers/11]).
+-export([request/12]).
-export([data/7]).
-export([cancel/5]).
-export([stream_info/2]).
@@ -36,6 +37,10 @@
%% Process to send messages to.
reply_to :: pid(),
+ %% Flow control.
+ flow :: integer() | infinity,
+ flow_window = 0 :: non_neg_integer(),
+
%% Content handlers state.
handler_state :: undefined | gun_content_handler:state()
}).
@@ -66,10 +71,15 @@ do_check_options([Opt={content_handlers, Handlers}|Opts]) ->
ok -> do_check_options(Opts);
error -> {error, {options, {http2, Opt}}}
end;
+do_check_options([{flow, InitialFlow}|Opts]) when is_integer(InitialFlow), InitialFlow > 0 ->
+ do_check_options(Opts);
do_check_options([{keepalive, infinity}|Opts]) ->
do_check_options(Opts);
do_check_options([{keepalive, K}|Opts]) when is_integer(K), K > 0 ->
do_check_options(Opts);
+%% @todo Add all http2_machine options.
+do_check_options([{max_frame_size_received, _}|Opts]) ->
+ do_check_options(Opts);
do_check_options([Opt|_]) ->
{error, {options, {http2, Opt}}}.
@@ -192,10 +202,20 @@ lingering_data_frame(State=#http2_state{socket=Socket, transport=Transport,
data_frame(State=#http2_state{socket=Socket, transport=Transport,
http2_machine=HTTP2Machine0}, StreamID, IsFin, Data,
EvHandler, EvHandlerState0) ->
- Stream = #stream{ref=StreamRef, reply_to=ReplyTo,
- handler_state=Handlers0} = get_stream_by_id(State, StreamID),
- Handlers = gun_content_handler:handle(IsFin, Data, Handlers0),
+ Stream = #stream{ref=StreamRef, reply_to=ReplyTo, flow=Flow0,
+ flow_window=FlowWindow0, handler_state=Handlers0} = get_stream_by_id(State, StreamID),
+ {ok, Dec, Handlers} = gun_content_handler:handle(IsFin, Data, Handlers0),
+ Flow = case Flow0 of
+ infinity -> infinity;
+ _ -> Flow0 - Dec
+ end,
Size = byte_size(Data),
+ FlowWindow = if
+ IsFin =:= nofin, Flow =< 0 ->
+ FlowWindow0 + Size;
+ true ->
+ FlowWindow0
+ end,
{HTTP2Machine, EvHandlerState} = case Size of
%% We do not send a WINDOW_UPDATE if the DATA frame was of size 0.
0 when IsFin =:= fin ->
@@ -209,8 +229,11 @@ data_frame(State=#http2_state{socket=Socket, transport=Transport,
_ ->
Transport:send(Socket, cow_http2:window_update(Size)),
HTTP2Machine1 = cow_http2_machine:update_window(Size, HTTP2Machine0),
- %% We do not send a stream WINDOW_UPDATE if this was the last DATA frame.
+ %% We do not send a stream WINDOW_UPDATE when the flow control kicks in
+ %% (it'll be sent when the flow recovers) or for the last DATA frame.
case IsFin of
+ nofin when Flow =< 0 ->
+ {HTTP2Machine1, EvHandlerState0};
nofin ->
Transport:send(Socket, cow_http2:window_update(StreamID, Size)),
{cow_http2_machine:update_window(StreamID, Size, HTTP2Machine1),
@@ -224,7 +247,7 @@ data_frame(State=#http2_state{socket=Socket, transport=Transport,
end
end,
{maybe_delete_stream(store_stream(State#http2_state{http2_machine=HTTP2Machine},
- Stream#stream{handler_state=Handlers}), StreamID, remote, IsFin),
+ Stream#stream{flow=Flow, flow_window=FlowWindow, handler_state=Handlers}), StreamID, remote, IsFin),
EvHandlerState}.
headers_frame(State=#http2_state{content_handlers=Handlers0},
@@ -294,12 +317,13 @@ rst_stream_frame(State=#http2_state{streams=Streams0},
{State, EvHandlerState0}
end.
+%% Pushed streams receive the same initial flow value as the parent stream.
push_promise_frame(State=#http2_state{streams=Streams},
StreamID, PromisedStreamID, Headers, #{
method := Method, scheme := Scheme,
authority := Authority, path := Path},
EvHandler, EvHandlerState0) ->
- #stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID),
+ #stream{ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow} = get_stream_by_id(State, StreamID),
PromisedStreamRef = make_ref(),
URI = iolist_to_binary([Scheme, <<"://">>, Authority, Path]),
ReplyTo ! {gun_push, self(), StreamRef, PromisedStreamRef, Method, URI, Headers},
@@ -311,7 +335,8 @@ push_promise_frame(State=#http2_state{streams=Streams},
uri => URI,
headers => Headers
}, EvHandlerState0),
- NewStream = #stream{id=PromisedStreamID, ref=PromisedStreamRef, reply_to=ReplyTo},
+ NewStream = #stream{id=PromisedStreamID, ref=PromisedStreamRef,
+ reply_to=ReplyTo, flow=InitialFlow},
{State#http2_state{streams=[NewStream|Streams]}, EvHandlerState}.
ignored_frame(State=#http2_state{http2_machine=HTTP2Machine0}) ->
@@ -322,6 +347,28 @@ ignored_frame(State=#http2_state{http2_machine=HTTP2Machine0}) ->
terminate(State#http2_state{http2_machine=HTTP2Machine}, Error)
end.
+update_flow(State=#http2_state{socket=Socket, transport=Transport,
+ http2_machine=HTTP2Machine0}, _ReplyTo, StreamRef, Inc) ->
+ case get_stream_by_ref(State, StreamRef) of
+ Stream=#stream{id=StreamID, flow=Flow0, flow_window=FlowWindow} ->
+ Flow = case Flow0 of
+ infinity -> infinity;
+ _ -> Flow0 + Inc
+ end,
+ if
+ %% Flow is active again, update the window.
+ Flow0 =< 0, Flow > 0 ->
+ Transport:send(Socket, cow_http2:window_update(StreamID, FlowWindow)),
+ HTTP2Machine = cow_http2_machine:update_window(StreamID, FlowWindow, HTTP2Machine0),
+ {state, store_stream(State#http2_state{http2_machine=HTTP2Machine},
+ Stream#stream{flow=Flow, flow_window=0})};
+ true ->
+ {state, store_stream(State, Stream#stream{flow=Flow})}
+ end;
+ false ->
+ []
+ end.
+
%% @todo Use Reason.
close(_, #http2_state{streams=Streams}, _, EvHandlerState) ->
{close_streams(Streams), EvHandlerState}.
@@ -337,10 +384,10 @@ keepalive(State=#http2_state{socket=Socket, transport=Transport}) ->
Transport:send(Socket, cow_http2:ping(0)),
State.
-headers(State=#http2_state{socket=Socket, transport=Transport,
+headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
http2_machine=HTTP2Machine0, streams=Streams},
StreamRef, ReplyTo, Method, Host, Port, Path, Headers0,
- EvHandler, EvHandlerState0) ->
+ InitialFlow0, EvHandler, EvHandlerState0) ->
{ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream(
iolist_to_binary(Method), HTTP2Machine0),
{ok, PseudoHeaders, Headers} = prepare_headers(State, Method, Host, Port, Path, Headers0),
@@ -358,14 +405,15 @@ headers(State=#http2_state{socket=Socket, transport=Transport,
StreamID, HTTP2Machine1, nofin, PseudoHeaders, Headers),
Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)),
EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1),
- Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo},
+ InitialFlow = initial_flow(InitialFlow0, Opts),
+ Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow},
{State#http2_state{http2_machine=HTTP2Machine, streams=[Stream|Streams]},
EvHandlerState}.
-request(State=#http2_state{socket=Socket, transport=Transport,
+request(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
http2_machine=HTTP2Machine0, streams=Streams},
StreamRef, ReplyTo, Method, Host, Port, Path, Headers0, Body,
- EvHandler, EvHandlerState0) ->
+ InitialFlow0, EvHandler, EvHandlerState0) ->
Headers1 = lists:keystore(<<"content-length">>, 1, Headers0,
{<<"content-length">>, integer_to_binary(iolist_size(Body))}),
{ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream(
@@ -385,11 +433,15 @@ request(State=#http2_state{socket=Socket, transport=Transport,
StreamID, HTTP2Machine1, nofin, PseudoHeaders, Headers),
Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)),
EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1),
- Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo},
+ InitialFlow = initial_flow(InitialFlow0, Opts),
+ Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow},
maybe_send_data(State#http2_state{http2_machine=HTTP2Machine,
streams=[Stream|Streams]}, StreamID, fin, Body,
EvHandler, EvHandlerState).
+initial_flow(infinity, #{flow := InitialFlow}) -> InitialFlow;
+initial_flow(InitialFlow, _) -> InitialFlow.
+
prepare_headers(#http2_state{transport=Transport}, Method, Host0, Port, Path, Headers0) ->
Authority = case lists:keyfind(<<"host">>, 1, Headers0) of
{_, Host} -> Host;
diff --git a/src/gun_sse_h.erl b/src/gun_sse_h.erl
index 11dc443..9d8836e 100644
--- a/src/gun_sse_h.erl
+++ b/src/gun_sse_h.erl
@@ -38,18 +38,22 @@ init(ReplyTo, StreamRef, _, Headers, _) ->
disable
end.
--spec handle(_, binary(), State) -> {done, State} when State::#state{}.
-handle(IsFin, Data, State=#state{reply_to=ReplyTo, stream_ref=StreamRef, sse_state=SSE0}) ->
+-spec handle(_, binary(), State) -> {done, non_neg_integer(), State} when State::#state{}.
+handle(IsFin, Data, State) ->
+ handle(IsFin, Data, State, 0).
+
+handle(IsFin, Data, State=#state{reply_to=ReplyTo, stream_ref=StreamRef, sse_state=SSE0}, Flow) ->
case cow_sse:parse(Data, SSE0) of
{event, Event, SSE} ->
ReplyTo ! {gun_sse, self(), StreamRef, Event},
- handle(IsFin, <<>>, State#state{sse_state=SSE});
+ handle(IsFin, <<>>, State#state{sse_state=SSE}, Flow + 1);
{more, SSE} ->
- _ = case IsFin of
+ Inc = case IsFin of
fin ->
- ReplyTo ! {gun_sse, self(), StreamRef, fin};
+ ReplyTo ! {gun_sse, self(), StreamRef, fin},
+ 1;
_ ->
- ok
+ 0
end,
- {done, State#state{sse_state=SSE}}
+ {done, Flow + Inc, State#state{sse_state=SSE}}
end.
diff --git a/src/gun_ws.erl b/src/gun_ws.erl
index 7acf74e..42cf049 100644
--- a/src/gun_ws.erl
+++ b/src/gun_ws.erl
@@ -16,8 +16,9 @@
-export([check_options/1]).
-export([name/0]).
--export([init/8]).
+-export([init/9]).
-export([handle/4]).
+-export([update_flow/4]).
-export([close/4]).
-export([send/4]).
-export([down/1]).
@@ -42,6 +43,7 @@
frag_state = undefined :: cow_ws:frag_state(),
utf8_state = 0 :: cow_ws:utf8_state(),
extensions = #{} :: cow_ws:extensions(),
+ flow :: integer() | infinity,
handler :: module(),
handler_state :: any()
}).
@@ -55,6 +57,8 @@ do_check_options([{compress, B}|Opts]) when B =:= true; B =:= false ->
do_check_options(Opts);
do_check_options([{default_protocol, M}|Opts]) when is_atom(M) ->
do_check_options(Opts);
+do_check_options([{flow, InitialFlow}|Opts]) when is_integer(InitialFlow), InitialFlow > 0 ->
+ do_check_options(Opts);
do_check_options([Opt={protocols, L}|Opts]) when is_list(L) ->
case lists:usort(lists:flatten([[is_binary(B), is_atom(M)] || {B, M} <- L])) of
[true] -> do_check_options(Opts);
@@ -67,19 +71,19 @@ do_check_options([Opt|_]) ->
name() -> ws.
-init(Owner, Socket, Transport, StreamRef, Headers, Extensions, Handler, Opts) ->
+init(Owner, Socket, Transport, StreamRef, Headers, Extensions, InitialFlow, Handler, Opts) ->
Owner ! {gun_upgrade, self(), StreamRef, [<<"websocket">>], Headers},
- HandlerState = Handler:init(Owner, StreamRef, Headers, Opts),
+ {ok, HandlerState} = Handler:init(Owner, StreamRef, Headers, Opts),
{switch_protocol, ?MODULE, #ws_state{owner=Owner, stream_ref=StreamRef,
socket=Socket, transport=Transport, extensions=Extensions,
- handler=Handler, handler_state=HandlerState}}.
+ flow=InitialFlow, handler=Handler, handler_state=HandlerState}}.
%% Do not handle anything if we received a close frame.
handle(_, State=#ws_state{in=close}, _, EvHandlerState) ->
{{state, State}, EvHandlerState};
%% Shortcut for common case when Data is empty after processing a frame.
handle(<<>>, State=#ws_state{in=head}, _, EvHandlerState) ->
- {{state, State}, EvHandlerState};
+ maybe_active(State, EvHandlerState);
handle(Data, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef, buffer=Buffer,
in=head, frag_state=FragState, extensions=Extensions},
EvHandler, EvHandlerState0) ->
@@ -113,7 +117,7 @@ handle(Data, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef, buffer=Buffer,
in=#payload{type=Type, rsv=Rsv, len=Len, mask_key=MaskKey},
frag_state=FragState2}, EvHandler, EvHandlerState);
more ->
- {{state, State#ws_state{buffer=Data2}}, EvHandlerState1};
+ maybe_active(State#ws_state{buffer=Data2}, EvHandlerState1);
error ->
close({error, badframe}, State, EvHandler, EvHandlerState1)
end;
@@ -130,20 +134,26 @@ handle(Data, State=#ws_state{in=In=#payload{type=Type, rsv=Rsv, len=Len, mask_ke
<<Unmasked/binary, Payload/binary>>, CloseCode,
EvHandler, EvHandlerState);
{more, CloseCode2, Payload, Utf8State2} ->
- {{state, State#ws_state{in=In#payload{close_code=CloseCode2,
+ maybe_active(State#ws_state{in=In#payload{close_code=CloseCode2,
unmasked= <<Unmasked/binary, Payload/binary>>,
- len=Len - byte_size(Data), unmasked_len=2 + byte_size(Data)}, utf8_state=Utf8State2}},
- EvHandlerState};
+ len=Len - byte_size(Data), unmasked_len=2 + byte_size(Data)}, utf8_state=Utf8State2},
+ EvHandlerState);
{more, Payload, Utf8State2} ->
- {{state, State#ws_state{in=In#payload{unmasked= <<Unmasked/binary, Payload/binary>>,
- len=Len - byte_size(Data), unmasked_len=UnmaskedLen + byte_size(Data)}, utf8_state=Utf8State2}},
- EvHandlerState};
+ maybe_active(State#ws_state{in=In#payload{unmasked= <<Unmasked/binary, Payload/binary>>,
+ len=Len - byte_size(Data), unmasked_len=UnmaskedLen + byte_size(Data)}, utf8_state=Utf8State2},
+ EvHandlerState);
Error = {error, _Reason} ->
close(Error, State, EvHandler, EvHandlerState)
end.
+maybe_active(State=#ws_state{flow=Flow}, EvHandlerState) ->
+ {[
+ {state, State},
+ {active, Flow > 0}
+ ], EvHandlerState}.
+
dispatch(Rest, State0=#ws_state{owner=ReplyTo, stream_ref=StreamRef,
- frag_state=FragState, extensions=Extensions,
+ frag_state=FragState, extensions=Extensions, flow=Flow0,
handler=Handler, handler_state=HandlerState0},
Type, Payload, CloseCode, EvHandler, EvHandlerState0) ->
EvHandlerState1 = EvHandler:ws_recv_frame_end(#{
@@ -165,8 +175,12 @@ dispatch(Rest, State0=#ws_state{owner=ReplyTo, stream_ref=StreamRef,
{pong, _} ->
handle(Rest, State0, EvHandler, EvHandlerState1);
Frame ->
- HandlerState = Handler:handle(Frame, HandlerState0),
- State1 = State0#ws_state{handler_state=HandlerState},
+ {ok, Dec, HandlerState} = Handler:handle(Frame, HandlerState0),
+ Flow = case Flow0 of
+ infinity -> infinity;
+ _ -> Flow0 - Dec
+ end,
+ State1 = State0#ws_state{flow=Flow, handler_state=HandlerState},
State = case Frame of
close -> State1#ws_state{in=close};
{close, _, _} -> State1#ws_state{in=close};
@@ -176,6 +190,16 @@ dispatch(Rest, State0=#ws_state{owner=ReplyTo, stream_ref=StreamRef,
handle(Rest, State, EvHandler, EvHandlerState1)
end.
+update_flow(State=#ws_state{flow=Flow0}, _ReplyTo, _StreamRef, Inc) ->
+ Flow = case Flow0 of
+ infinity -> infinity;
+ _ -> Flow0 + Inc
+ end,
+ [
+ {state, State#ws_state{flow=Flow}},
+ {active, Flow > 0}
+ ].
+
close(Reason, State, EvHandler, EvHandlerState) ->
case Reason of
normal ->
diff --git a/src/gun_ws_h.erl b/src/gun_ws_h.erl
index 5ff0646..4859532 100644
--- a/src/gun_ws_h.erl
+++ b/src/gun_ws_h.erl
@@ -24,15 +24,15 @@
}).
init(ReplyTo, StreamRef, _, _) ->
- #state{reply_to=ReplyTo, stream_ref=StreamRef}.
+ {ok, #state{reply_to=ReplyTo, stream_ref=StreamRef}}.
handle({fragment, nofin, _, Payload},
State=#state{frag_buffer=SoFar}) ->
- State#state{frag_buffer= << SoFar/binary, Payload/binary >>};
+ {ok, 0, State#state{frag_buffer= << SoFar/binary, Payload/binary >>}};
handle({fragment, fin, Type, Payload},
State=#state{reply_to=ReplyTo, stream_ref=StreamRef, frag_buffer=SoFar}) ->
ReplyTo ! {gun_ws, self(), StreamRef, {Type, << SoFar/binary, Payload/binary >>}},
- State#state{frag_buffer= <<>>};
+ {ok, 1, State#state{frag_buffer= <<>>}};
handle(Frame, State=#state{reply_to=ReplyTo, stream_ref=StreamRef}) ->
ReplyTo ! {gun_ws, self(), StreamRef, Frame},
- State.
+ {ok, 1, State}.
diff --git a/test/flow_SUITE.erl b/test/flow_SUITE.erl
new file mode 100644
index 0000000..937af26
--- /dev/null
+++ b/test/flow_SUITE.erl
@@ -0,0 +1,325 @@
+%% Copyright (c) 2019, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+-module(flow_SUITE).
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-import(ct_helper, [doc/1]).
+
+all() ->
+ [{group, flow}].
+
+groups() ->
+ [{flow, [parallel], ct_helper:all(?MODULE)}].
+
+%% Tests.
+
+default_flow_http(_) ->
+ doc("Confirm flow control default can be changed and overriden for HTTP/1.1."),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{env => #{
+ dispatch => cowboy_router:compile([{'_', [{"/", sse_clock_h, date}]}])
+ }}),
+ Port = ranch:get_port(?FUNCTION_NAME),
+ try
+ %% First we check that we can set the flow for the entire connection.
+ {ok, ConnPid1} = gun:open("localhost", Port, #{
+ http_opts => #{flow => 1}
+ }),
+ {ok, http} = gun:await_up(ConnPid1),
+ StreamRef1 = gun:get(ConnPid1, "/"),
+ {response, nofin, 200, _} = gun:await(ConnPid1, StreamRef1),
+ {data, nofin, _} = gun:await(ConnPid1, StreamRef1),
+ {error, timeout} = gun:await(ConnPid1, StreamRef1, 1500),
+ gun:close(ConnPid1),
+ %% Then we confirm that we can override it per request.
+ {ok, ConnPid2} = gun:open("localhost", Port, #{
+ http_opts => #{flow => 1}
+ }),
+ {ok, http} = gun:await_up(ConnPid2),
+ StreamRef2 = gun:get(ConnPid2, "/", [], #{flow => 2}),
+ {response, nofin, 200, _} = gun:await(ConnPid2, StreamRef2),
+ {data, nofin, _} = gun:await(ConnPid2, StreamRef2),
+ {data, nofin, _} = gun:await(ConnPid2, StreamRef2),
+ {error, timeout} = gun:await(ConnPid2, StreamRef2, 1500),
+ gun:close(ConnPid2)
+ after
+ cowboy:stop_listener(?FUNCTION_NAME)
+ end.
+
+default_flow_http2(_) ->
+ doc("Confirm flow control default can be changed and overriden for HTTP/2."),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{env => #{
+ dispatch => cowboy_router:compile([{'_', [{"/", sse_clock_h, 40000}]}])
+ }}),
+ Port = ranch:get_port(?FUNCTION_NAME),
+ try
+ %% First we check that we can set the flow for the entire connection.
+ {ok, ConnPid} = gun:open("localhost", Port, #{
+ http2_opts => #{
+ flow => 1,
+ %% We set the max frame size to the same as the initial
+ %% window size in order to reduce the number of data messages.
+ max_frame_size_received => 65535
+ },
+ protocols => [http2]
+ }),
+ {ok, http2} = gun:await_up(ConnPid),
+ StreamRef1 = gun:get(ConnPid, "/"),
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef1),
+ %% We set the flow to 1 therefore we will receive *2* data messages,
+ %% and then nothing because the window was fully consumed.
+ {data, nofin, _} = gun:await(ConnPid, StreamRef1),
+ {data, nofin, _} = gun:await(ConnPid, StreamRef1),
+ {error, timeout} = gun:await(ConnPid, StreamRef1, 1500),
+ %% Then we confirm that we can override it per request.
+ StreamRef2 = gun:get(ConnPid, "/", [], #{flow => 2}),
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef2),
+ %% We set the flow to 2 therefore we will receive *3* data messages
+ %% and then nothing because two windows have been fully consumed.
+ {data, nofin, _} = gun:await(ConnPid, StreamRef2),
+ {data, nofin, _} = gun:await(ConnPid, StreamRef2),
+ {data, nofin, _} = gun:await(ConnPid, StreamRef2),
+ {error, timeout} = gun:await(ConnPid, StreamRef2, 1500),
+ gun:close(ConnPid)
+ after
+ cowboy:stop_listener(?FUNCTION_NAME)
+ end.
+
+flow_http(_) ->
+ doc("Confirm flow control works as intended for HTTP/1.1."),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{env => #{
+ dispatch => cowboy_router:compile([{'_', [{"/", sse_clock_h, date}]}])
+ }}),
+ Port = ranch:get_port(?FUNCTION_NAME),
+ try
+ {ok, ConnPid} = gun:open("localhost", Port),
+ {ok, http} = gun:await_up(ConnPid),
+ StreamRef = gun:get(ConnPid, "/", [], #{flow => 1}),
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef),
+ %% We set the flow to 1 therefore we will receive 1 data message,
+ %% and then nothing because Gun doesn't read from the socket.
+ {data, nofin, _} = gun:await(ConnPid, StreamRef),
+ {error, timeout} = gun:await(ConnPid, StreamRef, 3000),
+ %% We then update the flow and get 2 more data messages but no more.
+ gun:update_flow(ConnPid, StreamRef, 2),
+ {data, nofin, _} = gun:await(ConnPid, StreamRef),
+ {data, nofin, _} = gun:await(ConnPid, StreamRef),
+ {error, timeout} = gun:await(ConnPid, StreamRef, 1000),
+ gun:close(ConnPid)
+ after
+ cowboy:stop_listener(?FUNCTION_NAME)
+ end.
+
+flow_http2(_) ->
+ doc("Confirm flow control works as intended for HTTP/2."),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{env => #{
+ dispatch => cowboy_router:compile([{'_', [{"/", sse_clock_h, 40000}]}])
+ }}),
+ Port = ranch:get_port(?FUNCTION_NAME),
+ try
+ {ok, ConnPid} = gun:open("localhost", Port, #{
+ %% We set the max frame size to the same as the initial
+ %% window size in order to reduce the number of data messages.
+ http2_opts => #{max_frame_size_received => 65535},
+ protocols => [http2]
+ }),
+ {ok, http2} = gun:await_up(ConnPid),
+ StreamRef = gun:get(ConnPid, "/", [], #{flow => 1}),
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef),
+ %% We set the flow to 1 therefore we will receive *2* data messages,
+ %% and then nothing because the window was fully consumed.
+ {data, nofin, D1} = gun:await(ConnPid, StreamRef),
+ {data, nofin, D2} = gun:await(ConnPid, StreamRef),
+ %% We consumed all the window available.
+ 65535 = byte_size(D1) + byte_size(D2),
+ {error, timeout} = gun:await(ConnPid, StreamRef, 3000),
+ %% We then update the flow and get *3* more data messages but no more.
+ gun:update_flow(ConnPid, StreamRef, 2),
+ {data, nofin, D3} = gun:await(ConnPid, StreamRef),
+ {data, nofin, D4} = gun:await(ConnPid, StreamRef),
+ {data, nofin, D5} = gun:await(ConnPid, StreamRef),
+ %% We consumed all the window available again.
+ %% D3 is the end of the truncated D2, D4 is full and D5 truncated.
+ 65535 = byte_size(D3) + byte_size(D4) + byte_size(D5),
+ {error, timeout} = gun:await(ConnPid, StreamRef, 1000),
+ gun:close(ConnPid)
+ after
+ cowboy:stop_listener(?FUNCTION_NAME)
+ end.
+
+flow_ws(_) ->
+ doc("Confirm flow control works as intended for Websocket."),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{env => #{
+ dispatch => cowboy_router:compile([{'_', [{"/", ws_echo_h, []}]}])
+ }}),
+ Port = ranch:get_port(?FUNCTION_NAME),
+ try
+ {ok, ConnPid} = gun:open("localhost", Port),
+ {ok, http} = gun:await_up(ConnPid),
+ StreamRef = gun:ws_upgrade(ConnPid, "/", [], #{flow => 1}),
+ {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),
+ %% We send 2 frames with some time in between to make sure that
+ %% Gun handles them in separate Protocol:handle calls.
+ Frame = {text, <<"Hello!">>},
+ gun:ws_send(ConnPid, Frame),
+ timer:sleep(100),
+ gun:ws_send(ConnPid, Frame),
+ %% We set the flow to 1 therefore we will receive 1 data message,
+ %% and then nothing because Gun doesn't read from the socket.
+ {ws, _} = gun:await(ConnPid, StreamRef),
+ {error, timeout} = gun:await(ConnPid, StreamRef, 3000),
+ %% We send 2 more frames.
+ gun:ws_send(ConnPid, Frame),
+ timer:sleep(100),
+ gun:ws_send(ConnPid, Frame),
+ %% We then update the flow and get 2 more data messages but no more.
+ gun:update_flow(ConnPid, StreamRef, 2),
+ {ws, _} = gun:await(ConnPid, StreamRef),
+ {ws, _} = gun:await(ConnPid, StreamRef),
+ {error, timeout} = gun:await(ConnPid, StreamRef, 1000),
+ gun:close(ConnPid)
+ after
+ cowboy:stop_listener(?FUNCTION_NAME)
+ end.
+
+no_flow_http(_) ->
+ doc("Ignore flow updates for no-flow streams for HTTP/1.1."),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{env => #{
+ dispatch => cowboy_router:compile([{'_', [{"/", sse_clock_h, date}]}])
+ }}),
+ Port = ranch:get_port(?FUNCTION_NAME),
+ try
+ {ok, ConnPid} = gun:open("localhost", Port),
+ {ok, http} = gun:await_up(ConnPid),
+ StreamRef = gun:get(ConnPid, "/", []),
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef),
+ gun:update_flow(ConnPid, StreamRef, 2),
+ {data, nofin, _} = gun:await(ConnPid, StreamRef),
+ {data, nofin, _} = gun:await(ConnPid, StreamRef),
+ {data, nofin, _} = gun:await(ConnPid, StreamRef),
+ gun:close(ConnPid)
+ after
+ cowboy:stop_listener(?FUNCTION_NAME)
+ end.
+
+no_flow_http2(_) ->
+ doc("Ignore flow updates for no-flow streams for HTTP/2."),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{env => #{
+ dispatch => cowboy_router:compile([{'_', [{"/", sse_clock_h, date}]}])
+ }}),
+ Port = ranch:get_port(?FUNCTION_NAME),
+ try
+ {ok, ConnPid} = gun:open("localhost", Port, #{
+ protocols => [http2]
+ }),
+ {ok, http2} = gun:await_up(ConnPid),
+ StreamRef = gun:get(ConnPid, "/", []),
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef),
+ gun:update_flow(ConnPid, StreamRef, 2),
+ {data, nofin, _} = gun:await(ConnPid, StreamRef),
+ {data, nofin, _} = gun:await(ConnPid, StreamRef),
+ {data, nofin, _} = gun:await(ConnPid, StreamRef),
+ gun:close(ConnPid)
+ after
+ cowboy:stop_listener(?FUNCTION_NAME)
+ end.
+
+no_flow_ws(_) ->
+ doc("Ignore flow updates for no-flow streams for Websocket."),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{env => #{
+ dispatch => cowboy_router:compile([{'_', [{"/", ws_echo_h, []}]}])
+ }}),
+ Port = ranch:get_port(?FUNCTION_NAME),
+ try
+ {ok, ConnPid} = gun:open("localhost", Port),
+ {ok, http} = gun:await_up(ConnPid),
+ StreamRef = gun:ws_upgrade(ConnPid, "/", []),
+ {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),
+ gun:update_flow(ConnPid, StreamRef, 2),
+ Frame = {text, <<"Hello!">>},
+ gun:ws_send(ConnPid, Frame),
+ timer:sleep(100),
+ gun:ws_send(ConnPid, Frame),
+ {ws, _} = gun:await(ConnPid, StreamRef),
+ {ws, _} = gun:await(ConnPid, StreamRef),
+ gun:close(ConnPid)
+ after
+ cowboy:stop_listener(?FUNCTION_NAME)
+ end.
+
+sse_flow_http(_) ->
+ doc("Confirm flow control works as intended for HTTP/1.1 "
+ "when using the gun_sse_h content handler."),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{env => #{
+ dispatch => cowboy_router:compile([{'_', [{"/", sse_clock_h, date}]}])
+ }}),
+ Port = ranch:get_port(?FUNCTION_NAME),
+ try
+ {ok, ConnPid} = gun:open("localhost", Port, #{
+ http_opts => #{content_handlers => [gun_sse_h, gun_data_h]}
+ }),
+ {ok, http} = gun:await_up(ConnPid),
+ StreamRef = gun:get(ConnPid, "/", [], #{flow => 1}),
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef),
+ %% We set the flow to 1 therefore we will receive 1 event message,
+ %% and then nothing because Gun doesn't read from the socket. We
+ %% set the timeout to 2500 to ensure there is only going to be one
+ %% message queued up.
+ {sse, _} = gun:await(ConnPid, StreamRef),
+ {error, timeout} = gun:await(ConnPid, StreamRef, 2500),
+ %% We then update the flow and get 2 more event messages but no more.
+ gun:update_flow(ConnPid, StreamRef, 2),
+ {sse, _} = gun:await(ConnPid, StreamRef),
+ {sse, _} = gun:await(ConnPid, StreamRef),
+ {error, timeout} = gun:await(ConnPid, StreamRef, 1000),
+ gun:close(ConnPid)
+ after
+ cowboy:stop_listener(?FUNCTION_NAME)
+ end.
+
+sse_flow_http2(_) ->
+ doc("Confirm flow control works as intended for HTTP/2 "
+ "when using the gun_sse_h content handler."),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{env => #{
+ dispatch => cowboy_router:compile([{'_', [{"/", sse_clock_h, 40000}]}])
+ }}),
+ Port = ranch:get_port(?FUNCTION_NAME),
+ try
+ {ok, ConnPid} = gun:open("localhost", Port, #{
+ %% We set the max frame size to the same as the initial
+ %% window size in order to reduce the number of data messages.
+ http2_opts => #{
+ content_handlers => [gun_sse_h, gun_data_h],
+ max_frame_size_received => 65535
+ },
+ protocols => [http2]
+ }),
+ {ok, http2} = gun:await_up(ConnPid),
+ StreamRef = gun:get(ConnPid, "/", [], #{flow => 1}),
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef),
+ %% We set the flow to 1 therefore we will receive 1 event message,
+ %% and then nothing because the window was fully consumed before
+ %% the second event was fully received.
+ {sse, _} = gun:await(ConnPid, StreamRef),
+ {error, timeout} = gun:await(ConnPid, StreamRef, 3000),
+ %% We then update the flow and get 2 more event messages but no more.
+ gun:update_flow(ConnPid, StreamRef, 2),
+ {sse, _} = gun:await(ConnPid, StreamRef),
+ {sse, _} = gun:await(ConnPid, StreamRef),
+ {error, timeout} = gun:await(ConnPid, StreamRef, 1000),
+ gun:close(ConnPid)
+ after
+ cowboy:stop_listener(?FUNCTION_NAME)
+ end.
diff --git a/test/handlers/sse_clock_h.erl b/test/handlers/sse_clock_h.erl
index 8c18ac5..dcc7c3f 100644
--- a/test/handlers/sse_clock_h.erl
+++ b/test/handlers/sse_clock_h.erl
@@ -15,6 +15,11 @@ init(Req, State) ->
info(timeout, Req, State) ->
erlang:send_after(1000, self(), timeout),
cowboy_req:stream_events(#{
- data => cowboy_clock:rfc1123()
+ data => data(State)
}, nofin, Req),
{ok, Req, State}.
+
+data(date) ->
+ cowboy_clock:rfc1123();
+data(Size) when is_integer(Size) ->
+ lists:duplicate(Size, $0).
diff --git a/test/sse_SUITE.erl b/test/sse_SUITE.erl
index f950bf7..dc46311 100644
--- a/test/sse_SUITE.erl
+++ b/test/sse_SUITE.erl
@@ -31,7 +31,7 @@ end_per_suite(Config) ->
init_routes() -> [
{"localhost", [
- {"/clock", sse_clock_h, []},
+ {"/clock", sse_clock_h, date},
{"/lone_id", sse_lone_id_h, []}
]}
].