From 611f9a9b78cab4005892e13dffb7a2c8e44580ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Fri, 2 Aug 2019 14:30:08 +0200 Subject: Add flow control Flow control is disabled by default. The initial flow value must be set to enable it (either for the entire connection or on a per-request basis). Flow applies to all HTTP streams as well as Websocket. HTTP/2 pushed streams receive the same value as their originating stream. --- test/flow_SUITE.erl | 325 ++++++++++++++++++++++++++++++++++++++++++ test/handlers/sse_clock_h.erl | 7 +- test/sse_SUITE.erl | 2 +- 3 files changed, 332 insertions(+), 2 deletions(-) create mode 100644 test/flow_SUITE.erl (limited to 'test') diff --git a/test/flow_SUITE.erl b/test/flow_SUITE.erl new file mode 100644 index 0000000..937af26 --- /dev/null +++ b/test/flow_SUITE.erl @@ -0,0 +1,325 @@ +%% Copyright (c) 2019, Loïc Hoguin +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +-module(flow_SUITE). +-compile(export_all). +-compile(nowarn_export_all). + +-import(ct_helper, [doc/1]). + +all() -> + [{group, flow}]. + +groups() -> + [{flow, [parallel], ct_helper:all(?MODULE)}]. + +%% Tests. + +default_flow_http(_) -> + doc("Confirm flow control default can be changed and overriden for HTTP/1.1."), + {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{env => #{ + dispatch => cowboy_router:compile([{'_', [{"/", sse_clock_h, date}]}]) + }}), + Port = ranch:get_port(?FUNCTION_NAME), + try + %% First we check that we can set the flow for the entire connection. + {ok, ConnPid1} = gun:open("localhost", Port, #{ + http_opts => #{flow => 1} + }), + {ok, http} = gun:await_up(ConnPid1), + StreamRef1 = gun:get(ConnPid1, "/"), + {response, nofin, 200, _} = gun:await(ConnPid1, StreamRef1), + {data, nofin, _} = gun:await(ConnPid1, StreamRef1), + {error, timeout} = gun:await(ConnPid1, StreamRef1, 1500), + gun:close(ConnPid1), + %% Then we confirm that we can override it per request. + {ok, ConnPid2} = gun:open("localhost", Port, #{ + http_opts => #{flow => 1} + }), + {ok, http} = gun:await_up(ConnPid2), + StreamRef2 = gun:get(ConnPid2, "/", [], #{flow => 2}), + {response, nofin, 200, _} = gun:await(ConnPid2, StreamRef2), + {data, nofin, _} = gun:await(ConnPid2, StreamRef2), + {data, nofin, _} = gun:await(ConnPid2, StreamRef2), + {error, timeout} = gun:await(ConnPid2, StreamRef2, 1500), + gun:close(ConnPid2) + after + cowboy:stop_listener(?FUNCTION_NAME) + end. + +default_flow_http2(_) -> + doc("Confirm flow control default can be changed and overriden for HTTP/2."), + {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{env => #{ + dispatch => cowboy_router:compile([{'_', [{"/", sse_clock_h, 40000}]}]) + }}), + Port = ranch:get_port(?FUNCTION_NAME), + try + %% First we check that we can set the flow for the entire connection. + {ok, ConnPid} = gun:open("localhost", Port, #{ + http2_opts => #{ + flow => 1, + %% We set the max frame size to the same as the initial + %% window size in order to reduce the number of data messages. + max_frame_size_received => 65535 + }, + protocols => [http2] + }), + {ok, http2} = gun:await_up(ConnPid), + StreamRef1 = gun:get(ConnPid, "/"), + {response, nofin, 200, _} = gun:await(ConnPid, StreamRef1), + %% We set the flow to 1 therefore we will receive *2* data messages, + %% and then nothing because the window was fully consumed. + {data, nofin, _} = gun:await(ConnPid, StreamRef1), + {data, nofin, _} = gun:await(ConnPid, StreamRef1), + {error, timeout} = gun:await(ConnPid, StreamRef1, 1500), + %% Then we confirm that we can override it per request. + StreamRef2 = gun:get(ConnPid, "/", [], #{flow => 2}), + {response, nofin, 200, _} = gun:await(ConnPid, StreamRef2), + %% We set the flow to 2 therefore we will receive *3* data messages + %% and then nothing because two windows have been fully consumed. + {data, nofin, _} = gun:await(ConnPid, StreamRef2), + {data, nofin, _} = gun:await(ConnPid, StreamRef2), + {data, nofin, _} = gun:await(ConnPid, StreamRef2), + {error, timeout} = gun:await(ConnPid, StreamRef2, 1500), + gun:close(ConnPid) + after + cowboy:stop_listener(?FUNCTION_NAME) + end. + +flow_http(_) -> + doc("Confirm flow control works as intended for HTTP/1.1."), + {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{env => #{ + dispatch => cowboy_router:compile([{'_', [{"/", sse_clock_h, date}]}]) + }}), + Port = ranch:get_port(?FUNCTION_NAME), + try + {ok, ConnPid} = gun:open("localhost", Port), + {ok, http} = gun:await_up(ConnPid), + StreamRef = gun:get(ConnPid, "/", [], #{flow => 1}), + {response, nofin, 200, _} = gun:await(ConnPid, StreamRef), + %% We set the flow to 1 therefore we will receive 1 data message, + %% and then nothing because Gun doesn't read from the socket. + {data, nofin, _} = gun:await(ConnPid, StreamRef), + {error, timeout} = gun:await(ConnPid, StreamRef, 3000), + %% We then update the flow and get 2 more data messages but no more. + gun:update_flow(ConnPid, StreamRef, 2), + {data, nofin, _} = gun:await(ConnPid, StreamRef), + {data, nofin, _} = gun:await(ConnPid, StreamRef), + {error, timeout} = gun:await(ConnPid, StreamRef, 1000), + gun:close(ConnPid) + after + cowboy:stop_listener(?FUNCTION_NAME) + end. + +flow_http2(_) -> + doc("Confirm flow control works as intended for HTTP/2."), + {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{env => #{ + dispatch => cowboy_router:compile([{'_', [{"/", sse_clock_h, 40000}]}]) + }}), + Port = ranch:get_port(?FUNCTION_NAME), + try + {ok, ConnPid} = gun:open("localhost", Port, #{ + %% We set the max frame size to the same as the initial + %% window size in order to reduce the number of data messages. + http2_opts => #{max_frame_size_received => 65535}, + protocols => [http2] + }), + {ok, http2} = gun:await_up(ConnPid), + StreamRef = gun:get(ConnPid, "/", [], #{flow => 1}), + {response, nofin, 200, _} = gun:await(ConnPid, StreamRef), + %% We set the flow to 1 therefore we will receive *2* data messages, + %% and then nothing because the window was fully consumed. + {data, nofin, D1} = gun:await(ConnPid, StreamRef), + {data, nofin, D2} = gun:await(ConnPid, StreamRef), + %% We consumed all the window available. + 65535 = byte_size(D1) + byte_size(D2), + {error, timeout} = gun:await(ConnPid, StreamRef, 3000), + %% We then update the flow and get *3* more data messages but no more. + gun:update_flow(ConnPid, StreamRef, 2), + {data, nofin, D3} = gun:await(ConnPid, StreamRef), + {data, nofin, D4} = gun:await(ConnPid, StreamRef), + {data, nofin, D5} = gun:await(ConnPid, StreamRef), + %% We consumed all the window available again. + %% D3 is the end of the truncated D2, D4 is full and D5 truncated. + 65535 = byte_size(D3) + byte_size(D4) + byte_size(D5), + {error, timeout} = gun:await(ConnPid, StreamRef, 1000), + gun:close(ConnPid) + after + cowboy:stop_listener(?FUNCTION_NAME) + end. + +flow_ws(_) -> + doc("Confirm flow control works as intended for Websocket."), + {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{env => #{ + dispatch => cowboy_router:compile([{'_', [{"/", ws_echo_h, []}]}]) + }}), + Port = ranch:get_port(?FUNCTION_NAME), + try + {ok, ConnPid} = gun:open("localhost", Port), + {ok, http} = gun:await_up(ConnPid), + StreamRef = gun:ws_upgrade(ConnPid, "/", [], #{flow => 1}), + {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef), + %% We send 2 frames with some time in between to make sure that + %% Gun handles them in separate Protocol:handle calls. + Frame = {text, <<"Hello!">>}, + gun:ws_send(ConnPid, Frame), + timer:sleep(100), + gun:ws_send(ConnPid, Frame), + %% We set the flow to 1 therefore we will receive 1 data message, + %% and then nothing because Gun doesn't read from the socket. + {ws, _} = gun:await(ConnPid, StreamRef), + {error, timeout} = gun:await(ConnPid, StreamRef, 3000), + %% We send 2 more frames. + gun:ws_send(ConnPid, Frame), + timer:sleep(100), + gun:ws_send(ConnPid, Frame), + %% We then update the flow and get 2 more data messages but no more. + gun:update_flow(ConnPid, StreamRef, 2), + {ws, _} = gun:await(ConnPid, StreamRef), + {ws, _} = gun:await(ConnPid, StreamRef), + {error, timeout} = gun:await(ConnPid, StreamRef, 1000), + gun:close(ConnPid) + after + cowboy:stop_listener(?FUNCTION_NAME) + end. + +no_flow_http(_) -> + doc("Ignore flow updates for no-flow streams for HTTP/1.1."), + {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{env => #{ + dispatch => cowboy_router:compile([{'_', [{"/", sse_clock_h, date}]}]) + }}), + Port = ranch:get_port(?FUNCTION_NAME), + try + {ok, ConnPid} = gun:open("localhost", Port), + {ok, http} = gun:await_up(ConnPid), + StreamRef = gun:get(ConnPid, "/", []), + {response, nofin, 200, _} = gun:await(ConnPid, StreamRef), + gun:update_flow(ConnPid, StreamRef, 2), + {data, nofin, _} = gun:await(ConnPid, StreamRef), + {data, nofin, _} = gun:await(ConnPid, StreamRef), + {data, nofin, _} = gun:await(ConnPid, StreamRef), + gun:close(ConnPid) + after + cowboy:stop_listener(?FUNCTION_NAME) + end. + +no_flow_http2(_) -> + doc("Ignore flow updates for no-flow streams for HTTP/2."), + {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{env => #{ + dispatch => cowboy_router:compile([{'_', [{"/", sse_clock_h, date}]}]) + }}), + Port = ranch:get_port(?FUNCTION_NAME), + try + {ok, ConnPid} = gun:open("localhost", Port, #{ + protocols => [http2] + }), + {ok, http2} = gun:await_up(ConnPid), + StreamRef = gun:get(ConnPid, "/", []), + {response, nofin, 200, _} = gun:await(ConnPid, StreamRef), + gun:update_flow(ConnPid, StreamRef, 2), + {data, nofin, _} = gun:await(ConnPid, StreamRef), + {data, nofin, _} = gun:await(ConnPid, StreamRef), + {data, nofin, _} = gun:await(ConnPid, StreamRef), + gun:close(ConnPid) + after + cowboy:stop_listener(?FUNCTION_NAME) + end. + +no_flow_ws(_) -> + doc("Ignore flow updates for no-flow streams for Websocket."), + {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{env => #{ + dispatch => cowboy_router:compile([{'_', [{"/", ws_echo_h, []}]}]) + }}), + Port = ranch:get_port(?FUNCTION_NAME), + try + {ok, ConnPid} = gun:open("localhost", Port), + {ok, http} = gun:await_up(ConnPid), + StreamRef = gun:ws_upgrade(ConnPid, "/", []), + {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef), + gun:update_flow(ConnPid, StreamRef, 2), + Frame = {text, <<"Hello!">>}, + gun:ws_send(ConnPid, Frame), + timer:sleep(100), + gun:ws_send(ConnPid, Frame), + {ws, _} = gun:await(ConnPid, StreamRef), + {ws, _} = gun:await(ConnPid, StreamRef), + gun:close(ConnPid) + after + cowboy:stop_listener(?FUNCTION_NAME) + end. + +sse_flow_http(_) -> + doc("Confirm flow control works as intended for HTTP/1.1 " + "when using the gun_sse_h content handler."), + {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{env => #{ + dispatch => cowboy_router:compile([{'_', [{"/", sse_clock_h, date}]}]) + }}), + Port = ranch:get_port(?FUNCTION_NAME), + try + {ok, ConnPid} = gun:open("localhost", Port, #{ + http_opts => #{content_handlers => [gun_sse_h, gun_data_h]} + }), + {ok, http} = gun:await_up(ConnPid), + StreamRef = gun:get(ConnPid, "/", [], #{flow => 1}), + {response, nofin, 200, _} = gun:await(ConnPid, StreamRef), + %% We set the flow to 1 therefore we will receive 1 event message, + %% and then nothing because Gun doesn't read from the socket. We + %% set the timeout to 2500 to ensure there is only going to be one + %% message queued up. + {sse, _} = gun:await(ConnPid, StreamRef), + {error, timeout} = gun:await(ConnPid, StreamRef, 2500), + %% We then update the flow and get 2 more event messages but no more. + gun:update_flow(ConnPid, StreamRef, 2), + {sse, _} = gun:await(ConnPid, StreamRef), + {sse, _} = gun:await(ConnPid, StreamRef), + {error, timeout} = gun:await(ConnPid, StreamRef, 1000), + gun:close(ConnPid) + after + cowboy:stop_listener(?FUNCTION_NAME) + end. + +sse_flow_http2(_) -> + doc("Confirm flow control works as intended for HTTP/2 " + "when using the gun_sse_h content handler."), + {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{env => #{ + dispatch => cowboy_router:compile([{'_', [{"/", sse_clock_h, 40000}]}]) + }}), + Port = ranch:get_port(?FUNCTION_NAME), + try + {ok, ConnPid} = gun:open("localhost", Port, #{ + %% We set the max frame size to the same as the initial + %% window size in order to reduce the number of data messages. + http2_opts => #{ + content_handlers => [gun_sse_h, gun_data_h], + max_frame_size_received => 65535 + }, + protocols => [http2] + }), + {ok, http2} = gun:await_up(ConnPid), + StreamRef = gun:get(ConnPid, "/", [], #{flow => 1}), + {response, nofin, 200, _} = gun:await(ConnPid, StreamRef), + %% We set the flow to 1 therefore we will receive 1 event message, + %% and then nothing because the window was fully consumed before + %% the second event was fully received. + {sse, _} = gun:await(ConnPid, StreamRef), + {error, timeout} = gun:await(ConnPid, StreamRef, 3000), + %% We then update the flow and get 2 more event messages but no more. + gun:update_flow(ConnPid, StreamRef, 2), + {sse, _} = gun:await(ConnPid, StreamRef), + {sse, _} = gun:await(ConnPid, StreamRef), + {error, timeout} = gun:await(ConnPid, StreamRef, 1000), + gun:close(ConnPid) + after + cowboy:stop_listener(?FUNCTION_NAME) + end. diff --git a/test/handlers/sse_clock_h.erl b/test/handlers/sse_clock_h.erl index 8c18ac5..dcc7c3f 100644 --- a/test/handlers/sse_clock_h.erl +++ b/test/handlers/sse_clock_h.erl @@ -15,6 +15,11 @@ init(Req, State) -> info(timeout, Req, State) -> erlang:send_after(1000, self(), timeout), cowboy_req:stream_events(#{ - data => cowboy_clock:rfc1123() + data => data(State) }, nofin, Req), {ok, Req, State}. + +data(date) -> + cowboy_clock:rfc1123(); +data(Size) when is_integer(Size) -> + lists:duplicate(Size, $0). diff --git a/test/sse_SUITE.erl b/test/sse_SUITE.erl index f950bf7..dc46311 100644 --- a/test/sse_SUITE.erl +++ b/test/sse_SUITE.erl @@ -31,7 +31,7 @@ end_per_suite(Config) -> init_routes() -> [ {"localhost", [ - {"/clock", sse_clock_h, []}, + {"/clock", sse_clock_h, date}, {"/lone_id", sse_lone_id_h, []} ]} ]. -- cgit v1.2.3