diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/handlers/ws_active_commands_h.erl | 4 | ||||
-rw-r--r-- | test/handlers/ws_deflate_commands_h.erl | 7 | ||||
-rw-r--r-- | test/handlers/ws_handle_commands_h.erl | 14 | ||||
-rw-r--r-- | test/handlers/ws_ignore.erl | 1 | ||||
-rw-r--r-- | test/handlers/ws_info_commands_h.erl | 14 | ||||
-rw-r--r-- | test/handlers/ws_init_commands_h.erl | 14 | ||||
-rw-r--r-- | test/handlers/ws_init_h.erl | 4 | ||||
-rw-r--r-- | test/handlers/ws_set_options_commands_h.erl | 7 | ||||
-rw-r--r-- | test/handlers/ws_shutdown_reason_commands_h.erl | 4 | ||||
-rw-r--r-- | test/rfc7231_SUITE.erl | 2 | ||||
-rw-r--r-- | test/rfc8441_SUITE.erl | 3 | ||||
-rw-r--r-- | test/ws_SUITE.erl | 2 | ||||
-rw-r--r-- | test/ws_SUITE_data/ws_echo.erl | 1 | ||||
-rw-r--r-- | test/ws_handler_SUITE.erl | 82 | ||||
-rw-r--r-- | test/ws_perf_SUITE.erl | 187 |
15 files changed, 292 insertions, 54 deletions
diff --git a/test/handlers/ws_active_commands_h.erl b/test/handlers/ws_active_commands_h.erl index 1c615e3..bbf8907 100644 --- a/test/handlers/ws_active_commands_h.erl +++ b/test/handlers/ws_active_commands_h.erl @@ -9,8 +9,8 @@ -export([websocket_handle/2]). -export([websocket_info/2]). -init(Req, RunOrHibernate) -> - {cowboy_websocket, Req, RunOrHibernate}. +init(Req, Opts) -> + {cowboy_websocket, Req, maps:get(run_or_hibernate, Opts), Opts}. websocket_init(State=run) -> erlang:send_after(1500, self(), active_true), diff --git a/test/handlers/ws_deflate_commands_h.erl b/test/handlers/ws_deflate_commands_h.erl index 14236bc..be3e16e 100644 --- a/test/handlers/ws_deflate_commands_h.erl +++ b/test/handlers/ws_deflate_commands_h.erl @@ -8,10 +8,11 @@ -export([websocket_handle/2]). -export([websocket_info/2]). -init(Req, RunOrHibernate) -> +init(Req, Opts) -> + DataDelivery = maps:get(data_delivery, Opts, stream_handlers), {cowboy_websocket, Req, - #{deflate => true, hibernate => RunOrHibernate}, - #{compress => true}}. + #{deflate => true, hibernate => maps:get(run_or_hibernate, Opts)}, + #{compress => true, data_delivery => DataDelivery}}. websocket_handle(Frame, State=#{deflate := Deflate0, hibernate := run}) -> Deflate = not Deflate0, diff --git a/test/handlers/ws_handle_commands_h.erl b/test/handlers/ws_handle_commands_h.erl index da3ffad..0a8513b 100644 --- a/test/handlers/ws_handle_commands_h.erl +++ b/test/handlers/ws_handle_commands_h.erl @@ -9,14 +9,20 @@ -export([websocket_handle/2]). -export([websocket_info/2]). -init(Req=#{pid := Pid}, RunOrHibernate) -> +init(Req, Opts) -> Commands0 = cowboy_req:header(<<"x-commands">>, Req), Commands = binary_to_term(base64:decode(Commands0)), case Commands of - bad -> ct_helper_error_h:ignore(Pid, cowboy_websocket, handler_call, 6); - _ -> ok + bad -> + Pid = case Req of + #{version := 'HTTP/2'} -> self(); + #{pid := Pid0} -> Pid0 + end, + ct_helper_error_h:ignore(Pid, cowboy_websocket, handler_call, 6); + _ -> + ok end, - {cowboy_websocket, Req, {Commands, RunOrHibernate}}. + {cowboy_websocket, Req, {Commands, maps:get(run_or_hibernate, Opts)}, Opts}. websocket_init(State) -> {[], State}. diff --git a/test/handlers/ws_ignore.erl b/test/handlers/ws_ignore.erl index 9fe3322..37595aa 100644 --- a/test/handlers/ws_ignore.erl +++ b/test/handlers/ws_ignore.erl @@ -8,6 +8,7 @@ init(Req, _) -> {cowboy_websocket, Req, undefined, #{ + data_delivery => relay, compress => true }}. diff --git a/test/handlers/ws_info_commands_h.erl b/test/handlers/ws_info_commands_h.erl index d596473..2115727 100644 --- a/test/handlers/ws_info_commands_h.erl +++ b/test/handlers/ws_info_commands_h.erl @@ -10,14 +10,20 @@ -export([websocket_handle/2]). -export([websocket_info/2]). -init(Req=#{pid := Pid}, RunOrHibernate) -> +init(Req, Opts) -> Commands0 = cowboy_req:header(<<"x-commands">>, Req), Commands = binary_to_term(base64:decode(Commands0)), case Commands of - bad -> ct_helper_error_h:ignore(Pid, cowboy_websocket, handler_call, 6); - _ -> ok + bad -> + Pid = case Req of + #{version := 'HTTP/2'} -> self(); + #{pid := Pid0} -> Pid0 + end, + ct_helper_error_h:ignore(Pid, cowboy_websocket, handler_call, 6); + _ -> + ok end, - {cowboy_websocket, Req, {Commands, RunOrHibernate}}. + {cowboy_websocket, Req, {Commands, maps:get(run_or_hibernate, Opts)}, Opts}. websocket_init(State) -> self() ! shoot, diff --git a/test/handlers/ws_init_commands_h.erl b/test/handlers/ws_init_commands_h.erl index 8bae352..d821b18 100644 --- a/test/handlers/ws_init_commands_h.erl +++ b/test/handlers/ws_init_commands_h.erl @@ -9,14 +9,20 @@ -export([websocket_handle/2]). -export([websocket_info/2]). -init(Req=#{pid := Pid}, RunOrHibernate) -> +init(Req, Opts) -> Commands0 = cowboy_req:header(<<"x-commands">>, Req), Commands = binary_to_term(base64:decode(Commands0)), case Commands of - bad -> ct_helper_error_h:ignore(Pid, cowboy_websocket, handler_call, 6); - _ -> ok + bad -> + Pid = case Req of + #{version := 'HTTP/2'} -> self(); + #{pid := Pid0} -> Pid0 + end, + ct_helper_error_h:ignore(Pid, cowboy_websocket, handler_call, 6); + _ -> + ok end, - {cowboy_websocket, Req, {Commands, RunOrHibernate}}. + {cowboy_websocket, Req, {Commands, maps:get(run_or_hibernate, Opts)}, Opts}. websocket_init(State={Commands, run}) -> {Commands, State}; diff --git a/test/handlers/ws_init_h.erl b/test/handlers/ws_init_h.erl index bbe9ef9..c44986f 100644 --- a/test/handlers/ws_init_h.erl +++ b/test/handlers/ws_init_h.erl @@ -8,9 +8,9 @@ -export([websocket_handle/2]). -export([websocket_info/2]). -init(Req, _) -> +init(Req, Opts) -> State = binary_to_atom(cowboy_req:qs(Req), latin1), - {cowboy_websocket, Req, State}. + {cowboy_websocket, Req, State, Opts}. %% Sleep to make sure the HTTP response was sent. websocket_init(State) -> diff --git a/test/handlers/ws_set_options_commands_h.erl b/test/handlers/ws_set_options_commands_h.erl index 1ab0af4..af768d6 100644 --- a/test/handlers/ws_set_options_commands_h.erl +++ b/test/handlers/ws_set_options_commands_h.erl @@ -7,9 +7,10 @@ -export([websocket_handle/2]). -export([websocket_info/2]). -init(Req, RunOrHibernate) -> - {cowboy_websocket, Req, RunOrHibernate, - #{idle_timeout => infinity}}. +init(Req, Opts) -> + DataDelivery = maps:get(data_delivery, Opts, stream_handlers), + {cowboy_websocket, Req, maps:get(run_or_hibernate, Opts), + #{idle_timeout => infinity, data_delivery => DataDelivery}}. %% Set the idle_timeout option dynamically. websocket_handle({text, <<"idle_timeout_short">>}, State=run) -> diff --git a/test/handlers/ws_shutdown_reason_commands_h.erl b/test/handlers/ws_shutdown_reason_commands_h.erl index 90b435c..878b70a 100644 --- a/test/handlers/ws_shutdown_reason_commands_h.erl +++ b/test/handlers/ws_shutdown_reason_commands_h.erl @@ -10,9 +10,9 @@ -export([websocket_handle/2]). -export([websocket_info/2]). -init(Req, RunOrHibernate) -> +init(Req, Opts) -> TestPid = list_to_pid(binary_to_list(cowboy_req:header(<<"x-test-pid">>, Req))), - {cowboy_websocket, Req, {TestPid, RunOrHibernate}}. + {cowboy_websocket, Req, {TestPid, maps:get(run_or_hibernate, Opts)}, Opts}. websocket_init(State={TestPid, RunOrHibernate}) -> TestPid ! {ws_pid, self()}, diff --git a/test/rfc7231_SUITE.erl b/test/rfc7231_SUITE.erl index 183fa0f..a42ca3f 100644 --- a/test/rfc7231_SUITE.erl +++ b/test/rfc7231_SUITE.erl @@ -44,7 +44,7 @@ init_dispatch(_) -> {"/echo/:key", echo_h, []}, {"/delay/echo/:key", echo_h, []}, {"/resp/:key[/:arg]", resp_h, []}, - {"/ws", ws_init_h, []} + {"/ws", ws_init_h, #{}} ]}]). %% @todo The documentation should list what methods, headers and status codes diff --git a/test/rfc8441_SUITE.erl b/test/rfc8441_SUITE.erl index b788f9f..efa753c 100644 --- a/test/rfc8441_SUITE.erl +++ b/test/rfc8441_SUITE.erl @@ -396,6 +396,9 @@ accept_handshake_when_enabled(Config) -> MaskedHello = ws_SUITE:do_mask(<<"Hello">>, Mask, <<>>), ok = gen_tcp:send(Socket, cow_http2:data(1, nofin, <<1:1, 0:3, 1:4, 1:1, 5:7, Mask:32, MaskedHello/binary>>)), + %% Ignore expected WINDOW_UPDATE frames. + {ok, <<4:24, 8:8, _:72>>} = gen_tcp:recv(Socket, 13, 1000), + {ok, <<4:24, 8:8, _:72>>} = gen_tcp:recv(Socket, 13, 1000), {ok, <<Len2:24, _:8, _:8, _:32>>} = gen_tcp:recv(Socket, 9, 1000), {ok, <<1:1, 0:3, 1:4, 0:1, 5:7, "Hello">>} = gen_tcp:recv(Socket, Len2, 1000), ok. diff --git a/test/ws_SUITE.erl b/test/ws_SUITE.erl index 6fa4e61..2f1d3e2 100644 --- a/test/ws_SUITE.erl +++ b/test/ws_SUITE.erl @@ -42,7 +42,7 @@ init_dispatch() -> {"localhost", [ {"/ws_echo", ws_echo, []}, {"/ws_echo_timer", ws_echo_timer, []}, - {"/ws_init", ws_init_h, []}, + {"/ws_init", ws_init_h, #{}}, {"/ws_init_shutdown", ws_init_shutdown, []}, {"/ws_send_many", ws_send_many, [ {sequence, [ diff --git a/test/ws_SUITE_data/ws_echo.erl b/test/ws_SUITE_data/ws_echo.erl index efdc204..1f3a4be 100644 --- a/test/ws_SUITE_data/ws_echo.erl +++ b/test/ws_SUITE_data/ws_echo.erl @@ -8,6 +8,7 @@ init(Req, _) -> {cowboy_websocket, Req, undefined, #{ + data_delivery => relay, compress => true }}. diff --git a/test/ws_handler_SUITE.erl b/test/ws_handler_SUITE.erl index ab9dbe2..00c584d 100644 --- a/test/ws_handler_SUITE.erl +++ b/test/ws_handler_SUITE.erl @@ -19,21 +19,40 @@ -import(ct_helper, [config/2]). -import(ct_helper, [doc/1]). -import(cowboy_test, [gun_open/1]). +-import(cowboy_test, [gun_open/2]). -import(cowboy_test, [gun_down/1]). %% ct. all() -> - [{group, ws}, {group, ws_hibernate}]. + [ + {group, h1}, + {group, h1_hibernate}, + {group, h2}, + {group, h2_relay} + ]. %% @todo Test against HTTP/2 too. groups() -> AllTests = ct_helper:all(?MODULE), - [{ws, [parallel], AllTests}, {ws_hibernate, [parallel], AllTests}]. - -init_per_group(Name, Config) -> + [ + {h1, [parallel], AllTests}, + {h1_hibernate, [parallel], AllTests}, + %% The websocket_deflate_false test isn't compatible with HTTP/2. + {h2, [parallel], AllTests -- [websocket_deflate_false]}, + {h2_relay, [parallel], AllTests -- [websocket_deflate_false]} + ]. + +init_per_group(Name, Config) + when Name =:= h1; Name =:= h1_hibernate -> cowboy_test:init_http(Name, #{ env => #{dispatch => init_dispatch(Name)} + }, Config); +init_per_group(Name, Config) + when Name =:= h2; Name =:= h2_relay -> + cowboy_test:init_http2(Name, #{ + enable_connect_protocol => true, + env => #{dispatch => init_dispatch(Name)} }, Config). end_per_group(Name, _) -> @@ -42,25 +61,27 @@ end_per_group(Name, _) -> %% Dispatch configuration. init_dispatch(Name) -> - RunOrHibernate = case Name of - ws -> run; - ws_hibernate -> hibernate + InitialState = case Name of + h1_hibernate -> #{run_or_hibernate => hibernate}; + h2_relay -> #{run_or_hibernate => run, data_delivery => relay}; + _ -> #{run_or_hibernate => run} end, cowboy_router:compile([{'_', [ - {"/init", ws_init_commands_h, RunOrHibernate}, - {"/handle", ws_handle_commands_h, RunOrHibernate}, - {"/info", ws_info_commands_h, RunOrHibernate}, - {"/trap_exit", ws_init_h, RunOrHibernate}, - {"/active", ws_active_commands_h, RunOrHibernate}, - {"/deflate", ws_deflate_commands_h, RunOrHibernate}, - {"/set_options", ws_set_options_commands_h, RunOrHibernate}, - {"/shutdown_reason", ws_shutdown_reason_commands_h, RunOrHibernate} + {"/init", ws_init_commands_h, InitialState}, + {"/handle", ws_handle_commands_h, InitialState}, + {"/info", ws_info_commands_h, InitialState}, + {"/trap_exit", ws_init_h, InitialState}, + {"/active", ws_active_commands_h, InitialState}, + {"/deflate", ws_deflate_commands_h, InitialState}, + {"/set_options", ws_set_options_commands_h, InitialState}, + {"/shutdown_reason", ws_shutdown_reason_commands_h, InitialState} ]}]). %% Support functions for testing using Gun. gun_open_ws(Config, Path, Commands) -> - ConnPid = gun_open(Config), + ConnPid = gun_open(Config, #{http2_opts => #{notify_settings_changed => true}}), + do_await_enable_connect_protocol(config(protocol, Config), ConnPid), StreamRef = gun:ws_upgrade(ConnPid, Path, [ {<<"x-commands">>, base64:encode(term_to_binary(Commands))} ]), @@ -75,6 +96,13 @@ gun_open_ws(Config, Path, Commands) -> error(timeout) end. +do_await_enable_connect_protocol(http, _) -> + ok; +do_await_enable_connect_protocol(http2, ConnPid) -> + {notify, settings_changed, #{enable_connect_protocol := true}} + = gun:await(ConnPid, undefined), %% @todo Maybe have a gun:await/1? + ok. + receive_ws(ConnPid, StreamRef) -> receive {gun_ws, ConnPid, StreamRef, Frame} -> @@ -123,7 +151,14 @@ websocket_info_invalid(Config) -> do_invalid(Config, Path) -> {ok, ConnPid, StreamRef} = gun_open_ws(Config, Path, bad), ensure_handle_is_called(ConnPid, StreamRef, Path), - gun_down(ConnPid). + case config(protocol, Config) of + %% HTTP/1.1 closes the connection. + http -> gun_down(ConnPid); + %% HTTP/2 terminates the stream. + http2 -> + receive {gun_error, ConnPid, StreamRef, {stream_error, internal_error, _}} -> ok + after 500 -> error(timeout) end + end. websocket_init_one_frame(Config) -> doc("A single frame is received when websocket_init/1 returns it as a command."), @@ -223,8 +258,12 @@ websocket_active_false(Config) -> doc("The {active, false} command stops receiving data from the socket. " "The {active, true} command reenables it."), {ok, ConnPid, StreamRef} = gun_open_ws(Config, "/active", []), + %% We must exhaust the HTTP/2 flow control window + %% otherwise the frame will be received even if active mode is disabled. + gun:ws_send(ConnPid, StreamRef, {binary, <<0:100000/unit:8>>}), gun:ws_send(ConnPid, StreamRef, {text, <<"Not received until the handler enables active again.">>}), {error, timeout} = receive_ws(ConnPid, StreamRef), + {ok, {binary, _}} = receive_ws(ConnPid, StreamRef), {ok, {text, <<"Not received until the handler enables active again.">>}} = receive_ws(ConnPid, StreamRef), ok. @@ -271,7 +310,8 @@ websocket_deflate_ignore_if_not_negotiated(Config) -> websocket_set_options_idle_timeout(Config) -> doc("The idle_timeout option can be modified using the " "command {set_options, Opts} at runtime."), - ConnPid = gun_open(Config), + ConnPid = gun_open(Config, #{http2_opts => #{notify_settings_changed => true}}), + do_await_enable_connect_protocol(config(protocol, Config), ConnPid), StreamRef = gun:ws_upgrade(ConnPid, "/set_options"), receive {gun_upgrade, ConnPid, StreamRef, [<<"websocket">>], _} -> @@ -299,7 +339,8 @@ websocket_set_options_idle_timeout(Config) -> websocket_set_options_max_frame_size(Config) -> doc("The max_frame_size option can be modified using the " "command {set_options, Opts} at runtime."), - ConnPid = gun_open(Config), + ConnPid = gun_open(Config, #{http2_opts => #{notify_settings_changed => true}}), + do_await_enable_connect_protocol(config(protocol, Config), ConnPid), StreamRef = gun:ws_upgrade(ConnPid, "/set_options"), receive {gun_upgrade, ConnPid, StreamRef, [<<"websocket">>], _} -> @@ -334,7 +375,8 @@ websocket_set_options_max_frame_size(Config) -> websocket_shutdown_reason(Config) -> doc("The command {shutdown_reason, any()} can be used to " "change the shutdown reason of a Websocket connection."), - ConnPid = gun_open(Config), + ConnPid = gun_open(Config, #{http2_opts => #{notify_settings_changed => true}}), + do_await_enable_connect_protocol(config(protocol, Config), ConnPid), StreamRef = gun:ws_upgrade(ConnPid, "/shutdown_reason", [ {<<"x-test-pid">>, pid_to_list(self())} ]), diff --git a/test/ws_perf_SUITE.erl b/test/ws_perf_SUITE.erl index ff88554..98b4e18 100644 --- a/test/ws_perf_SUITE.erl +++ b/test/ws_perf_SUITE.erl @@ -55,14 +55,15 @@ init_per_group(Name, Config) when Name =:= h2c; Name =:= h2c_compress -> h2c_compress -> {compress, #{stream_handlers => [cowboy_compress_h, cowboy_stream_h]}} end, Config1 = cowboy_test:init_http(Name, Opts#{ - connection_window_margin_size => 64*1024, + %% The margin sizes must be larger than the larger test message for plain sockets. + connection_window_margin_size => 128*1024, enable_connect_protocol => true, env => #{dispatch => init_dispatch(Config)}, max_frame_size_sent => 64*1024, max_frame_size_received => 16384 * 1024 - 1, max_received_frame_rate => {10_000_000, 1}, stream_window_data_threshold => 1024, - stream_window_margin_size => 64*1024 + stream_window_margin_size => 128*1024 }, [{flavor, Flavor}|Config]), lists:keyreplace(protocol, 1, Config1, {protocol, http2}); init_per_group(ascii, Config) -> @@ -265,13 +266,14 @@ send_N_16384B(Config) -> do_send(Config, What, Num, FrameSize) -> {ok, ConnPid, StreamRef} = do_gun_open_ws("/ws_ignore", Config), + %% Prepare the frame data. FrameType = config(frame_type, Config), FrameData = case FrameType of text -> do_text_data(Config, FrameSize); binary -> rand:bytes(FrameSize) end, %% Heat up the processes before doing the real run. -% do_send_loop(ConnPid, StreamRef, Num, FrameType, FrameData), +% do_send_loop(Socket, Num, FrameType, Mask, MaskedFrameData), {Time, _} = timer:tc(?MODULE, do_send_loop, [ConnPid, StreamRef, Num, FrameType, FrameData]), do_log("~-6s ~-6s ~6s: ~8bµs", [What, FrameType, do_format_size(FrameSize), Time]), gun:ws_send(ConnPid, StreamRef, close), @@ -279,12 +281,181 @@ do_send(Config, What, Num, FrameSize) -> gun_down(ConnPid). do_send_loop(ConnPid, StreamRef, 0, _, _) -> - gun:ws_send(ConnPid, StreamRef, {text, <<"CHECK">>}), - {ok, {text, <<"CHECK">>}} = receive_ws(ConnPid, StreamRef), - ok; + gun:ws_send(ConnPid, StreamRef, {text, <<"CHECK">>}), + {ok, {text, <<"CHECK">>}} = receive_ws(ConnPid, StreamRef), + ok; do_send_loop(ConnPid, StreamRef, Num, FrameType, FrameData) -> - gun:ws_send(ConnPid, StreamRef, {FrameType, FrameData}), - do_send_loop(ConnPid, StreamRef, Num - 1, FrameType, FrameData). + gun:ws_send(ConnPid, StreamRef, {FrameType, FrameData}), + do_send_loop(ConnPid, StreamRef, Num - 1, FrameType, FrameData). + +tcp_send_N_00000B(Config) -> + doc("Send a 0B frame 10000 times."), + do_tcp_send(Config, tcps_N, 10000, 0). + +tcp_send_N_00256B(Config) -> + doc("Send a 256B frame 10000 times."), + do_tcp_send(Config, tcps_N, 10000, 256). + +tcp_send_N_01024B(Config) -> + doc("Send a 1024B frame 10000 times."), + do_tcp_send(Config, tcps_N, 10000, 1024). + +tcp_send_N_04096B(Config) -> + doc("Send a 4096B frame 10000 times."), + do_tcp_send(Config, tcps_N, 10000, 4096). + +tcp_send_N_16384B(Config) -> + doc("Send a 16384B frame 10000 times."), + do_tcp_send(Config, tcps_N, 10000, 16384). + +do_tcp_send(Config, What, Num, FrameSize) -> + {ok, Socket} = do_tcp_handshake(Config, #{}), + %% Prepare the frame data. + FrameType = config(frame_type, Config), + FrameData = case FrameType of + text -> do_text_data(Config, FrameSize); + binary -> rand:bytes(FrameSize) + end, + %% Mask the data outside the benchmark to avoid influencing the results. + Mask = 16#37fa213d, + MaskedFrameData = ws_SUITE:do_mask(FrameData, Mask, <<>>), + FrameSizeWithHeader = FrameSize + case FrameSize of + N when N =< 125 -> 6; + N when N =< 16#ffff -> 8; + N when N =< 16#7fffffffffffffff -> 14 + end, + %% Run the benchmark; different function for h1 and h2. + {Time, _} = case config(protocol, Config) of + http -> timer:tc(?MODULE, do_tcp_send_loop_h1, + [Socket, Num, FrameType, Mask, MaskedFrameData]); + http2 -> timer:tc(?MODULE, do_tcp_send_loop_h2, + [Socket, 65535, Num, FrameType, Mask, MaskedFrameData, FrameSizeWithHeader]) + end, + do_log("~-6s ~-6s ~6s: ~8bµs", [What, FrameType, do_format_size(FrameSize), Time]), + gen_tcp:close(Socket). + +%% Do a prior knowledge handshake. +do_tcp_handshake(Config, LocalSettings) -> + Protocol = config(protocol, Config), + Socket1 = case Protocol of + http -> + {ok, Socket, _} = ws_SUITE:do_handshake(<<"/ws_ignore">>, Config), + Socket; + http2 -> + {ok, Socket} = gen_tcp:connect("localhost", config(port, Config), [binary, {active, false}]), + %% Send a valid preface. + ok = gen_tcp:send(Socket, [ + "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n", + cow_http2:settings(LocalSettings) + ]), + %% Receive the server preface. + {ok, << Len:24 >>} = gen_tcp:recv(Socket, 3, 1000), + {ok, << 4:8, 0:40, SettingsPayload:Len/binary >>} = gen_tcp:recv(Socket, 6 + Len, 1000), + RemoteSettings = cow_http2:parse_settings_payload(SettingsPayload), + #{enable_connect_protocol := true} = RemoteSettings, + %% Send the SETTINGS ack. + ok = gen_tcp:send(Socket, cow_http2:settings_ack()), + %% Receive the SETTINGS ack. + {ok, << 0:24, 4:8, 1:8, 0:32 >>} = gen_tcp:recv(Socket, 9, 1000), + %% Send a CONNECT :protocol request to upgrade the stream to Websocket. + {ReqHeadersBlock, _} = cow_hpack:encode([ + {<<":method">>, <<"CONNECT">>}, + {<<":protocol">>, <<"websocket">>}, + {<<":scheme">>, <<"http">>}, + {<<":path">>, <<"/ws_ignore">>}, + {<<":authority">>, <<"localhost">>}, %% @todo Correct port number. + {<<"sec-websocket-version">>, <<"13">>}, + {<<"origin">>, <<"http://localhost">>} + ]), + ok = gen_tcp:send(Socket, cow_http2:headers(1, nofin, ReqHeadersBlock)), + %% Receive a 200 response. + {ok, << Len1:24, 1:8, _:8, 1:32 >>} = gen_tcp:recv(Socket, 9, 1000), + {ok, RespHeadersBlock} = gen_tcp:recv(Socket, Len1, 1000), + {RespHeaders, _} = cow_hpack:decode(RespHeadersBlock), + {_, <<"200">>} = lists:keyfind(<<":status">>, 1, RespHeaders), + Socket + end, + %% Enable active mode to avoid delays in receiving data at the end of benchmark. + ok = inet:setopts(Socket1, [{active, true}]), + %% Stream number 1 is ready. + {ok, Socket1}. + +do_tcp_send_loop_h1(Socket, 0, _, Mask, _) -> + MaskedFrameData = ws_SUITE:do_mask(<<"CHECK">>, Mask, <<>>), + ok = gen_tcp:send(Socket, + <<1:1, 0:3, 1:4, 1:1, 5:7, Mask:32, MaskedFrameData/binary>>), + do_tcp_wait_for_check(Socket); +do_tcp_send_loop_h1(Socket, Num, FrameType, Mask, MaskedFrameData) -> + Len = byte_size(MaskedFrameData), + LenBits = case Len of + N when N =< 125 -> << N:7 >>; + N when N =< 16#ffff -> << 126:7, N:16 >>; + N when N =< 16#7fffffffffffffff -> << 127:7, N:64 >> + end, + FrameHeader = <<1:1, 0:3, 2:4, 1:1, LenBits/bits, Mask:32>>, + ok = gen_tcp:send(Socket, [ + FrameHeader, + MaskedFrameData + ]), + do_tcp_send_loop_h1(Socket, Num - 1, FrameType, Mask, MaskedFrameData). + +do_tcp_send_loop_h2(Socket, _, 0, _, Mask, _, _) -> + MaskedFrameData = ws_SUITE:do_mask(<<"CHECK">>, Mask, <<>>), + ok = gen_tcp:send(Socket, cow_http2:data(1, nofin, + <<1:1, 0:3, 1:4, 1:1, 5:7, Mask:32, MaskedFrameData/binary>>)), + do_tcp_wait_for_check(Socket); +do_tcp_send_loop_h2(Socket, Window0, Num, FrameType, Mask, MaskedFrameData, FrameSizeWithHeader) + when Window0 < FrameSizeWithHeader -> + %% The remaining window isn't large enough so + %% we wait for WINDOW_UPDATE frames. + Window = do_tcp_wait_window_updates(Socket, Window0), + do_tcp_send_loop_h2(Socket, Window, Num, FrameType, Mask, MaskedFrameData, FrameSizeWithHeader); +do_tcp_send_loop_h2(Socket, Window, Num, FrameType, Mask, MaskedFrameData, FrameSizeWithHeader) -> + Len = byte_size(MaskedFrameData), + LenBits = case Len of + N when N =< 125 -> << N:7 >>; + N when N =< 16#ffff -> << 126:7, N:16 >>; + N when N =< 16#7fffffffffffffff -> << 127:7, N:64 >> + end, + FrameHeader = <<1:1, 0:3, 2:4, 1:1, LenBits/bits, Mask:32>>, + ok = gen_tcp:send(Socket, cow_http2:data(1, nofin, [ + FrameHeader, + MaskedFrameData + ])), + do_tcp_send_loop_h2(Socket, Window - FrameSizeWithHeader, + Num - 1, FrameType, Mask, MaskedFrameData, FrameSizeWithHeader). + +do_tcp_wait_window_updates(Socket, Window) -> + receive + {tcp, Socket, Data} -> + do_tcp_wait_window_updates_parse(Socket, Window, Data) + after 0 -> + Window + end. + +do_tcp_wait_window_updates_parse(Socket, Window, Data) -> + case Data of + %% Ignore the connection-wide WINDOW_UPDATE. + <<4:24, 8:8, 0:8, 0:32, 0:1, _:31, Rest/bits>> -> + do_tcp_wait_window_updates_parse(Socket, Window, Rest); + %% Use the stream-specific WINDOW_UPDATE to increment our window. + <<4:24, 8:8, 0:8, 1:32, 0:1, Inc:31, Rest/bits>> -> + do_tcp_wait_window_updates_parse(Socket, Window + Inc, Rest); + %% Other frames are not expected. + <<>> -> + do_tcp_wait_window_updates(Socket, Window) + end. + +do_tcp_wait_for_check(Socket) -> + receive + {tcp, Socket, Data} -> + case binary:match(Data, <<"CHECK">>) of + nomatch -> do_tcp_wait_for_check(Socket); + _ -> ok + end + after 5000 -> + error(timeout) + end. %% Internal. |