aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2019-08-02 14:30:08 +0200
committerLoïc Hoguin <[email protected]>2019-08-05 19:57:13 +0200
commit611f9a9b78cab4005892e13dffb7a2c8e44580ee (patch)
treed8d3fc407110ea12333ba122cf711326e82a7070 /test
parent145b9af4bdbb85e2f83959ee8abaa4d9207a4529 (diff)
downloadgun-611f9a9b78cab4005892e13dffb7a2c8e44580ee.tar.gz
gun-611f9a9b78cab4005892e13dffb7a2c8e44580ee.tar.bz2
gun-611f9a9b78cab4005892e13dffb7a2c8e44580ee.zip
Add flow control
Flow control is disabled by default. The initial flow value must be set to enable it (either for the entire connection or on a per-request basis). Flow applies to all HTTP streams as well as Websocket. HTTP/2 pushed streams receive the same value as their originating stream.
Diffstat (limited to 'test')
-rw-r--r--test/flow_SUITE.erl325
-rw-r--r--test/handlers/sse_clock_h.erl7
-rw-r--r--test/sse_SUITE.erl2
3 files changed, 332 insertions, 2 deletions
diff --git a/test/flow_SUITE.erl b/test/flow_SUITE.erl
new file mode 100644
index 0000000..937af26
--- /dev/null
+++ b/test/flow_SUITE.erl
@@ -0,0 +1,325 @@
+%% Copyright (c) 2019, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+-module(flow_SUITE).
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-import(ct_helper, [doc/1]).
+
+all() ->
+ [{group, flow}].
+
+groups() ->
+ [{flow, [parallel], ct_helper:all(?MODULE)}].
+
+%% Tests.
+
+default_flow_http(_) ->
+ doc("Confirm flow control default can be changed and overriden for HTTP/1.1."),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{env => #{
+ dispatch => cowboy_router:compile([{'_', [{"/", sse_clock_h, date}]}])
+ }}),
+ Port = ranch:get_port(?FUNCTION_NAME),
+ try
+ %% First we check that we can set the flow for the entire connection.
+ {ok, ConnPid1} = gun:open("localhost", Port, #{
+ http_opts => #{flow => 1}
+ }),
+ {ok, http} = gun:await_up(ConnPid1),
+ StreamRef1 = gun:get(ConnPid1, "/"),
+ {response, nofin, 200, _} = gun:await(ConnPid1, StreamRef1),
+ {data, nofin, _} = gun:await(ConnPid1, StreamRef1),
+ {error, timeout} = gun:await(ConnPid1, StreamRef1, 1500),
+ gun:close(ConnPid1),
+ %% Then we confirm that we can override it per request.
+ {ok, ConnPid2} = gun:open("localhost", Port, #{
+ http_opts => #{flow => 1}
+ }),
+ {ok, http} = gun:await_up(ConnPid2),
+ StreamRef2 = gun:get(ConnPid2, "/", [], #{flow => 2}),
+ {response, nofin, 200, _} = gun:await(ConnPid2, StreamRef2),
+ {data, nofin, _} = gun:await(ConnPid2, StreamRef2),
+ {data, nofin, _} = gun:await(ConnPid2, StreamRef2),
+ {error, timeout} = gun:await(ConnPid2, StreamRef2, 1500),
+ gun:close(ConnPid2)
+ after
+ cowboy:stop_listener(?FUNCTION_NAME)
+ end.
+
+default_flow_http2(_) ->
+ doc("Confirm flow control default can be changed and overriden for HTTP/2."),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{env => #{
+ dispatch => cowboy_router:compile([{'_', [{"/", sse_clock_h, 40000}]}])
+ }}),
+ Port = ranch:get_port(?FUNCTION_NAME),
+ try
+ %% First we check that we can set the flow for the entire connection.
+ {ok, ConnPid} = gun:open("localhost", Port, #{
+ http2_opts => #{
+ flow => 1,
+ %% We set the max frame size to the same as the initial
+ %% window size in order to reduce the number of data messages.
+ max_frame_size_received => 65535
+ },
+ protocols => [http2]
+ }),
+ {ok, http2} = gun:await_up(ConnPid),
+ StreamRef1 = gun:get(ConnPid, "/"),
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef1),
+ %% We set the flow to 1 therefore we will receive *2* data messages,
+ %% and then nothing because the window was fully consumed.
+ {data, nofin, _} = gun:await(ConnPid, StreamRef1),
+ {data, nofin, _} = gun:await(ConnPid, StreamRef1),
+ {error, timeout} = gun:await(ConnPid, StreamRef1, 1500),
+ %% Then we confirm that we can override it per request.
+ StreamRef2 = gun:get(ConnPid, "/", [], #{flow => 2}),
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef2),
+ %% We set the flow to 2 therefore we will receive *3* data messages
+ %% and then nothing because two windows have been fully consumed.
+ {data, nofin, _} = gun:await(ConnPid, StreamRef2),
+ {data, nofin, _} = gun:await(ConnPid, StreamRef2),
+ {data, nofin, _} = gun:await(ConnPid, StreamRef2),
+ {error, timeout} = gun:await(ConnPid, StreamRef2, 1500),
+ gun:close(ConnPid)
+ after
+ cowboy:stop_listener(?FUNCTION_NAME)
+ end.
+
+flow_http(_) ->
+ doc("Confirm flow control works as intended for HTTP/1.1."),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{env => #{
+ dispatch => cowboy_router:compile([{'_', [{"/", sse_clock_h, date}]}])
+ }}),
+ Port = ranch:get_port(?FUNCTION_NAME),
+ try
+ {ok, ConnPid} = gun:open("localhost", Port),
+ {ok, http} = gun:await_up(ConnPid),
+ StreamRef = gun:get(ConnPid, "/", [], #{flow => 1}),
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef),
+ %% We set the flow to 1 therefore we will receive 1 data message,
+ %% and then nothing because Gun doesn't read from the socket.
+ {data, nofin, _} = gun:await(ConnPid, StreamRef),
+ {error, timeout} = gun:await(ConnPid, StreamRef, 3000),
+ %% We then update the flow and get 2 more data messages but no more.
+ gun:update_flow(ConnPid, StreamRef, 2),
+ {data, nofin, _} = gun:await(ConnPid, StreamRef),
+ {data, nofin, _} = gun:await(ConnPid, StreamRef),
+ {error, timeout} = gun:await(ConnPid, StreamRef, 1000),
+ gun:close(ConnPid)
+ after
+ cowboy:stop_listener(?FUNCTION_NAME)
+ end.
+
+flow_http2(_) ->
+ doc("Confirm flow control works as intended for HTTP/2."),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{env => #{
+ dispatch => cowboy_router:compile([{'_', [{"/", sse_clock_h, 40000}]}])
+ }}),
+ Port = ranch:get_port(?FUNCTION_NAME),
+ try
+ {ok, ConnPid} = gun:open("localhost", Port, #{
+ %% We set the max frame size to the same as the initial
+ %% window size in order to reduce the number of data messages.
+ http2_opts => #{max_frame_size_received => 65535},
+ protocols => [http2]
+ }),
+ {ok, http2} = gun:await_up(ConnPid),
+ StreamRef = gun:get(ConnPid, "/", [], #{flow => 1}),
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef),
+ %% We set the flow to 1 therefore we will receive *2* data messages,
+ %% and then nothing because the window was fully consumed.
+ {data, nofin, D1} = gun:await(ConnPid, StreamRef),
+ {data, nofin, D2} = gun:await(ConnPid, StreamRef),
+ %% We consumed all the window available.
+ 65535 = byte_size(D1) + byte_size(D2),
+ {error, timeout} = gun:await(ConnPid, StreamRef, 3000),
+ %% We then update the flow and get *3* more data messages but no more.
+ gun:update_flow(ConnPid, StreamRef, 2),
+ {data, nofin, D3} = gun:await(ConnPid, StreamRef),
+ {data, nofin, D4} = gun:await(ConnPid, StreamRef),
+ {data, nofin, D5} = gun:await(ConnPid, StreamRef),
+ %% We consumed all the window available again.
+ %% D3 is the end of the truncated D2, D4 is full and D5 truncated.
+ 65535 = byte_size(D3) + byte_size(D4) + byte_size(D5),
+ {error, timeout} = gun:await(ConnPid, StreamRef, 1000),
+ gun:close(ConnPid)
+ after
+ cowboy:stop_listener(?FUNCTION_NAME)
+ end.
+
+flow_ws(_) ->
+ doc("Confirm flow control works as intended for Websocket."),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{env => #{
+ dispatch => cowboy_router:compile([{'_', [{"/", ws_echo_h, []}]}])
+ }}),
+ Port = ranch:get_port(?FUNCTION_NAME),
+ try
+ {ok, ConnPid} = gun:open("localhost", Port),
+ {ok, http} = gun:await_up(ConnPid),
+ StreamRef = gun:ws_upgrade(ConnPid, "/", [], #{flow => 1}),
+ {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),
+ %% We send 2 frames with some time in between to make sure that
+ %% Gun handles them in separate Protocol:handle calls.
+ Frame = {text, <<"Hello!">>},
+ gun:ws_send(ConnPid, Frame),
+ timer:sleep(100),
+ gun:ws_send(ConnPid, Frame),
+ %% We set the flow to 1 therefore we will receive 1 data message,
+ %% and then nothing because Gun doesn't read from the socket.
+ {ws, _} = gun:await(ConnPid, StreamRef),
+ {error, timeout} = gun:await(ConnPid, StreamRef, 3000),
+ %% We send 2 more frames.
+ gun:ws_send(ConnPid, Frame),
+ timer:sleep(100),
+ gun:ws_send(ConnPid, Frame),
+ %% We then update the flow and get 2 more data messages but no more.
+ gun:update_flow(ConnPid, StreamRef, 2),
+ {ws, _} = gun:await(ConnPid, StreamRef),
+ {ws, _} = gun:await(ConnPid, StreamRef),
+ {error, timeout} = gun:await(ConnPid, StreamRef, 1000),
+ gun:close(ConnPid)
+ after
+ cowboy:stop_listener(?FUNCTION_NAME)
+ end.
+
+no_flow_http(_) ->
+ doc("Ignore flow updates for no-flow streams for HTTP/1.1."),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{env => #{
+ dispatch => cowboy_router:compile([{'_', [{"/", sse_clock_h, date}]}])
+ }}),
+ Port = ranch:get_port(?FUNCTION_NAME),
+ try
+ {ok, ConnPid} = gun:open("localhost", Port),
+ {ok, http} = gun:await_up(ConnPid),
+ StreamRef = gun:get(ConnPid, "/", []),
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef),
+ gun:update_flow(ConnPid, StreamRef, 2),
+ {data, nofin, _} = gun:await(ConnPid, StreamRef),
+ {data, nofin, _} = gun:await(ConnPid, StreamRef),
+ {data, nofin, _} = gun:await(ConnPid, StreamRef),
+ gun:close(ConnPid)
+ after
+ cowboy:stop_listener(?FUNCTION_NAME)
+ end.
+
+no_flow_http2(_) ->
+ doc("Ignore flow updates for no-flow streams for HTTP/2."),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{env => #{
+ dispatch => cowboy_router:compile([{'_', [{"/", sse_clock_h, date}]}])
+ }}),
+ Port = ranch:get_port(?FUNCTION_NAME),
+ try
+ {ok, ConnPid} = gun:open("localhost", Port, #{
+ protocols => [http2]
+ }),
+ {ok, http2} = gun:await_up(ConnPid),
+ StreamRef = gun:get(ConnPid, "/", []),
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef),
+ gun:update_flow(ConnPid, StreamRef, 2),
+ {data, nofin, _} = gun:await(ConnPid, StreamRef),
+ {data, nofin, _} = gun:await(ConnPid, StreamRef),
+ {data, nofin, _} = gun:await(ConnPid, StreamRef),
+ gun:close(ConnPid)
+ after
+ cowboy:stop_listener(?FUNCTION_NAME)
+ end.
+
+no_flow_ws(_) ->
+ doc("Ignore flow updates for no-flow streams for Websocket."),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{env => #{
+ dispatch => cowboy_router:compile([{'_', [{"/", ws_echo_h, []}]}])
+ }}),
+ Port = ranch:get_port(?FUNCTION_NAME),
+ try
+ {ok, ConnPid} = gun:open("localhost", Port),
+ {ok, http} = gun:await_up(ConnPid),
+ StreamRef = gun:ws_upgrade(ConnPid, "/", []),
+ {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),
+ gun:update_flow(ConnPid, StreamRef, 2),
+ Frame = {text, <<"Hello!">>},
+ gun:ws_send(ConnPid, Frame),
+ timer:sleep(100),
+ gun:ws_send(ConnPid, Frame),
+ {ws, _} = gun:await(ConnPid, StreamRef),
+ {ws, _} = gun:await(ConnPid, StreamRef),
+ gun:close(ConnPid)
+ after
+ cowboy:stop_listener(?FUNCTION_NAME)
+ end.
+
+sse_flow_http(_) ->
+ doc("Confirm flow control works as intended for HTTP/1.1 "
+ "when using the gun_sse_h content handler."),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{env => #{
+ dispatch => cowboy_router:compile([{'_', [{"/", sse_clock_h, date}]}])
+ }}),
+ Port = ranch:get_port(?FUNCTION_NAME),
+ try
+ {ok, ConnPid} = gun:open("localhost", Port, #{
+ http_opts => #{content_handlers => [gun_sse_h, gun_data_h]}
+ }),
+ {ok, http} = gun:await_up(ConnPid),
+ StreamRef = gun:get(ConnPid, "/", [], #{flow => 1}),
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef),
+ %% We set the flow to 1 therefore we will receive 1 event message,
+ %% and then nothing because Gun doesn't read from the socket. We
+ %% set the timeout to 2500 to ensure there is only going to be one
+ %% message queued up.
+ {sse, _} = gun:await(ConnPid, StreamRef),
+ {error, timeout} = gun:await(ConnPid, StreamRef, 2500),
+ %% We then update the flow and get 2 more event messages but no more.
+ gun:update_flow(ConnPid, StreamRef, 2),
+ {sse, _} = gun:await(ConnPid, StreamRef),
+ {sse, _} = gun:await(ConnPid, StreamRef),
+ {error, timeout} = gun:await(ConnPid, StreamRef, 1000),
+ gun:close(ConnPid)
+ after
+ cowboy:stop_listener(?FUNCTION_NAME)
+ end.
+
+sse_flow_http2(_) ->
+ doc("Confirm flow control works as intended for HTTP/2 "
+ "when using the gun_sse_h content handler."),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{env => #{
+ dispatch => cowboy_router:compile([{'_', [{"/", sse_clock_h, 40000}]}])
+ }}),
+ Port = ranch:get_port(?FUNCTION_NAME),
+ try
+ {ok, ConnPid} = gun:open("localhost", Port, #{
+ %% We set the max frame size to the same as the initial
+ %% window size in order to reduce the number of data messages.
+ http2_opts => #{
+ content_handlers => [gun_sse_h, gun_data_h],
+ max_frame_size_received => 65535
+ },
+ protocols => [http2]
+ }),
+ {ok, http2} = gun:await_up(ConnPid),
+ StreamRef = gun:get(ConnPid, "/", [], #{flow => 1}),
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef),
+ %% We set the flow to 1 therefore we will receive 1 event message,
+ %% and then nothing because the window was fully consumed before
+ %% the second event was fully received.
+ {sse, _} = gun:await(ConnPid, StreamRef),
+ {error, timeout} = gun:await(ConnPid, StreamRef, 3000),
+ %% We then update the flow and get 2 more event messages but no more.
+ gun:update_flow(ConnPid, StreamRef, 2),
+ {sse, _} = gun:await(ConnPid, StreamRef),
+ {sse, _} = gun:await(ConnPid, StreamRef),
+ {error, timeout} = gun:await(ConnPid, StreamRef, 1000),
+ gun:close(ConnPid)
+ after
+ cowboy:stop_listener(?FUNCTION_NAME)
+ end.
diff --git a/test/handlers/sse_clock_h.erl b/test/handlers/sse_clock_h.erl
index 8c18ac5..dcc7c3f 100644
--- a/test/handlers/sse_clock_h.erl
+++ b/test/handlers/sse_clock_h.erl
@@ -15,6 +15,11 @@ init(Req, State) ->
info(timeout, Req, State) ->
erlang:send_after(1000, self(), timeout),
cowboy_req:stream_events(#{
- data => cowboy_clock:rfc1123()
+ data => data(State)
}, nofin, Req),
{ok, Req, State}.
+
+data(date) ->
+ cowboy_clock:rfc1123();
+data(Size) when is_integer(Size) ->
+ lists:duplicate(Size, $0).
diff --git a/test/sse_SUITE.erl b/test/sse_SUITE.erl
index f950bf7..dc46311 100644
--- a/test/sse_SUITE.erl
+++ b/test/sse_SUITE.erl
@@ -31,7 +31,7 @@ end_per_suite(Config) ->
init_routes() -> [
{"localhost", [
- {"/clock", sse_clock_h, []},
+ {"/clock", sse_clock_h, date},
{"/lone_id", sse_lone_id_h, []}
]}
].