diff options
author | Loïc Hoguin <[email protected]> | 2025-08-21 18:09:42 +0200 |
---|---|---|
committer | Loïc Hoguin <[email protected]> | 2025-09-15 13:09:23 +0200 |
commit | 8da6ca11e8ea4e93def78bd0299decd6f409bc43 (patch) | |
tree | e897bf997175a113e02acb2b7f4c179c7528aeb4 /test | |
parent | a8c717718a3f4dd7b4bc67fe7bebe3a4e7a7ed74 (diff) | |
download | cowboy-8da6ca11e8ea4e93def78bd0299decd6f409bc43.tar.gz cowboy-8da6ca11e8ea4e93def78bd0299decd6f409bc43.tar.bz2 cowboy-8da6ca11e8ea4e93def78bd0299decd6f409bc43.zip |
New data delivery mechanism for HTTP/2+ Websocketdirect-data_delivery-for-h2-websocket
A new data_delivery mechanism called 'relay' has been added.
It bypasses stream handlers (and the buffering in cowboy_stream_h)
and sends the data directly to the process implementing
Websocket (and should work for other similar protocols
like HTTP/2 WebTransport).
Flow control in HTTP/2 is maintained in a simpler way,
via a configured flow value that is used to maintain
the window to a reasonable value when data is received.
The 'relay' data_delivery has been implemented for both
HTTP/2 and HTTP/3. It has not been implemented for HTTP/1.1
since switching protocol there overrides the connection process.
HTTP/2 Websocket is now better tested.
A bug was fixed with the 'stream_handlers' data_delivery
where active mode would not be reenabled if it was disabled
at some point.
The Websocket performance suite has been updated to
include tests that do not use Gun. Websocket modules
used by the performance suite use the 'relay' data_delivery
now. Performance is improved significantly with 'relay',
between 10% and 20% faster. HTTP/2 Websocket performance
is not on par with HTTP/1.1 still, but the remaining
difference is thought to be from the HTTP/2 overhead and
flow control.
Diffstat (limited to 'test')
-rw-r--r-- | test/handlers/ws_active_commands_h.erl | 4 | ||||
-rw-r--r-- | test/handlers/ws_deflate_commands_h.erl | 7 | ||||
-rw-r--r-- | test/handlers/ws_handle_commands_h.erl | 14 | ||||
-rw-r--r-- | test/handlers/ws_ignore.erl | 1 | ||||
-rw-r--r-- | test/handlers/ws_info_commands_h.erl | 14 | ||||
-rw-r--r-- | test/handlers/ws_init_commands_h.erl | 14 | ||||
-rw-r--r-- | test/handlers/ws_init_h.erl | 4 | ||||
-rw-r--r-- | test/handlers/ws_set_options_commands_h.erl | 7 | ||||
-rw-r--r-- | test/handlers/ws_shutdown_reason_commands_h.erl | 4 | ||||
-rw-r--r-- | test/rfc7231_SUITE.erl | 2 | ||||
-rw-r--r-- | test/rfc8441_SUITE.erl | 3 | ||||
-rw-r--r-- | test/ws_SUITE.erl | 2 | ||||
-rw-r--r-- | test/ws_SUITE_data/ws_echo.erl | 1 | ||||
-rw-r--r-- | test/ws_handler_SUITE.erl | 82 | ||||
-rw-r--r-- | test/ws_perf_SUITE.erl | 187 |
15 files changed, 292 insertions, 54 deletions
diff --git a/test/handlers/ws_active_commands_h.erl b/test/handlers/ws_active_commands_h.erl index 1c615e3..bbf8907 100644 --- a/test/handlers/ws_active_commands_h.erl +++ b/test/handlers/ws_active_commands_h.erl @@ -9,8 +9,8 @@ -export([websocket_handle/2]). -export([websocket_info/2]). -init(Req, RunOrHibernate) -> - {cowboy_websocket, Req, RunOrHibernate}. +init(Req, Opts) -> + {cowboy_websocket, Req, maps:get(run_or_hibernate, Opts), Opts}. websocket_init(State=run) -> erlang:send_after(1500, self(), active_true), diff --git a/test/handlers/ws_deflate_commands_h.erl b/test/handlers/ws_deflate_commands_h.erl index 14236bc..be3e16e 100644 --- a/test/handlers/ws_deflate_commands_h.erl +++ b/test/handlers/ws_deflate_commands_h.erl @@ -8,10 +8,11 @@ -export([websocket_handle/2]). -export([websocket_info/2]). -init(Req, RunOrHibernate) -> +init(Req, Opts) -> + DataDelivery = maps:get(data_delivery, Opts, stream_handlers), {cowboy_websocket, Req, - #{deflate => true, hibernate => RunOrHibernate}, - #{compress => true}}. + #{deflate => true, hibernate => maps:get(run_or_hibernate, Opts)}, + #{compress => true, data_delivery => DataDelivery}}. websocket_handle(Frame, State=#{deflate := Deflate0, hibernate := run}) -> Deflate = not Deflate0, diff --git a/test/handlers/ws_handle_commands_h.erl b/test/handlers/ws_handle_commands_h.erl index da3ffad..0a8513b 100644 --- a/test/handlers/ws_handle_commands_h.erl +++ b/test/handlers/ws_handle_commands_h.erl @@ -9,14 +9,20 @@ -export([websocket_handle/2]). -export([websocket_info/2]). -init(Req=#{pid := Pid}, RunOrHibernate) -> +init(Req, Opts) -> Commands0 = cowboy_req:header(<<"x-commands">>, Req), Commands = binary_to_term(base64:decode(Commands0)), case Commands of - bad -> ct_helper_error_h:ignore(Pid, cowboy_websocket, handler_call, 6); - _ -> ok + bad -> + Pid = case Req of + #{version := 'HTTP/2'} -> self(); + #{pid := Pid0} -> Pid0 + end, + ct_helper_error_h:ignore(Pid, cowboy_websocket, handler_call, 6); + _ -> + ok end, - {cowboy_websocket, Req, {Commands, RunOrHibernate}}. + {cowboy_websocket, Req, {Commands, maps:get(run_or_hibernate, Opts)}, Opts}. websocket_init(State) -> {[], State}. diff --git a/test/handlers/ws_ignore.erl b/test/handlers/ws_ignore.erl index 9fe3322..37595aa 100644 --- a/test/handlers/ws_ignore.erl +++ b/test/handlers/ws_ignore.erl @@ -8,6 +8,7 @@ init(Req, _) -> {cowboy_websocket, Req, undefined, #{ + data_delivery => relay, compress => true }}. diff --git a/test/handlers/ws_info_commands_h.erl b/test/handlers/ws_info_commands_h.erl index d596473..2115727 100644 --- a/test/handlers/ws_info_commands_h.erl +++ b/test/handlers/ws_info_commands_h.erl @@ -10,14 +10,20 @@ -export([websocket_handle/2]). -export([websocket_info/2]). -init(Req=#{pid := Pid}, RunOrHibernate) -> +init(Req, Opts) -> Commands0 = cowboy_req:header(<<"x-commands">>, Req), Commands = binary_to_term(base64:decode(Commands0)), case Commands of - bad -> ct_helper_error_h:ignore(Pid, cowboy_websocket, handler_call, 6); - _ -> ok + bad -> + Pid = case Req of + #{version := 'HTTP/2'} -> self(); + #{pid := Pid0} -> Pid0 + end, + ct_helper_error_h:ignore(Pid, cowboy_websocket, handler_call, 6); + _ -> + ok end, - {cowboy_websocket, Req, {Commands, RunOrHibernate}}. + {cowboy_websocket, Req, {Commands, maps:get(run_or_hibernate, Opts)}, Opts}. websocket_init(State) -> self() ! shoot, diff --git a/test/handlers/ws_init_commands_h.erl b/test/handlers/ws_init_commands_h.erl index 8bae352..d821b18 100644 --- a/test/handlers/ws_init_commands_h.erl +++ b/test/handlers/ws_init_commands_h.erl @@ -9,14 +9,20 @@ -export([websocket_handle/2]). -export([websocket_info/2]). -init(Req=#{pid := Pid}, RunOrHibernate) -> +init(Req, Opts) -> Commands0 = cowboy_req:header(<<"x-commands">>, Req), Commands = binary_to_term(base64:decode(Commands0)), case Commands of - bad -> ct_helper_error_h:ignore(Pid, cowboy_websocket, handler_call, 6); - _ -> ok + bad -> + Pid = case Req of + #{version := 'HTTP/2'} -> self(); + #{pid := Pid0} -> Pid0 + end, + ct_helper_error_h:ignore(Pid, cowboy_websocket, handler_call, 6); + _ -> + ok end, - {cowboy_websocket, Req, {Commands, RunOrHibernate}}. + {cowboy_websocket, Req, {Commands, maps:get(run_or_hibernate, Opts)}, Opts}. websocket_init(State={Commands, run}) -> {Commands, State}; diff --git a/test/handlers/ws_init_h.erl b/test/handlers/ws_init_h.erl index bbe9ef9..c44986f 100644 --- a/test/handlers/ws_init_h.erl +++ b/test/handlers/ws_init_h.erl @@ -8,9 +8,9 @@ -export([websocket_handle/2]). -export([websocket_info/2]). -init(Req, _) -> +init(Req, Opts) -> State = binary_to_atom(cowboy_req:qs(Req), latin1), - {cowboy_websocket, Req, State}. + {cowboy_websocket, Req, State, Opts}. %% Sleep to make sure the HTTP response was sent. websocket_init(State) -> diff --git a/test/handlers/ws_set_options_commands_h.erl b/test/handlers/ws_set_options_commands_h.erl index 1ab0af4..af768d6 100644 --- a/test/handlers/ws_set_options_commands_h.erl +++ b/test/handlers/ws_set_options_commands_h.erl @@ -7,9 +7,10 @@ -export([websocket_handle/2]). -export([websocket_info/2]). -init(Req, RunOrHibernate) -> - {cowboy_websocket, Req, RunOrHibernate, - #{idle_timeout => infinity}}. +init(Req, Opts) -> + DataDelivery = maps:get(data_delivery, Opts, stream_handlers), + {cowboy_websocket, Req, maps:get(run_or_hibernate, Opts), + #{idle_timeout => infinity, data_delivery => DataDelivery}}. %% Set the idle_timeout option dynamically. websocket_handle({text, <<"idle_timeout_short">>}, State=run) -> diff --git a/test/handlers/ws_shutdown_reason_commands_h.erl b/test/handlers/ws_shutdown_reason_commands_h.erl index 90b435c..878b70a 100644 --- a/test/handlers/ws_shutdown_reason_commands_h.erl +++ b/test/handlers/ws_shutdown_reason_commands_h.erl @@ -10,9 +10,9 @@ -export([websocket_handle/2]). -export([websocket_info/2]). -init(Req, RunOrHibernate) -> +init(Req, Opts) -> TestPid = list_to_pid(binary_to_list(cowboy_req:header(<<"x-test-pid">>, Req))), - {cowboy_websocket, Req, {TestPid, RunOrHibernate}}. + {cowboy_websocket, Req, {TestPid, maps:get(run_or_hibernate, Opts)}, Opts}. websocket_init(State={TestPid, RunOrHibernate}) -> TestPid ! {ws_pid, self()}, diff --git a/test/rfc7231_SUITE.erl b/test/rfc7231_SUITE.erl index 183fa0f..a42ca3f 100644 --- a/test/rfc7231_SUITE.erl +++ b/test/rfc7231_SUITE.erl @@ -44,7 +44,7 @@ init_dispatch(_) -> {"/echo/:key", echo_h, []}, {"/delay/echo/:key", echo_h, []}, {"/resp/:key[/:arg]", resp_h, []}, - {"/ws", ws_init_h, []} + {"/ws", ws_init_h, #{}} ]}]). %% @todo The documentation should list what methods, headers and status codes diff --git a/test/rfc8441_SUITE.erl b/test/rfc8441_SUITE.erl index b788f9f..efa753c 100644 --- a/test/rfc8441_SUITE.erl +++ b/test/rfc8441_SUITE.erl @@ -396,6 +396,9 @@ accept_handshake_when_enabled(Config) -> MaskedHello = ws_SUITE:do_mask(<<"Hello">>, Mask, <<>>), ok = gen_tcp:send(Socket, cow_http2:data(1, nofin, <<1:1, 0:3, 1:4, 1:1, 5:7, Mask:32, MaskedHello/binary>>)), + %% Ignore expected WINDOW_UPDATE frames. + {ok, <<4:24, 8:8, _:72>>} = gen_tcp:recv(Socket, 13, 1000), + {ok, <<4:24, 8:8, _:72>>} = gen_tcp:recv(Socket, 13, 1000), {ok, <<Len2:24, _:8, _:8, _:32>>} = gen_tcp:recv(Socket, 9, 1000), {ok, <<1:1, 0:3, 1:4, 0:1, 5:7, "Hello">>} = gen_tcp:recv(Socket, Len2, 1000), ok. diff --git a/test/ws_SUITE.erl b/test/ws_SUITE.erl index 6fa4e61..2f1d3e2 100644 --- a/test/ws_SUITE.erl +++ b/test/ws_SUITE.erl @@ -42,7 +42,7 @@ init_dispatch() -> {"localhost", [ {"/ws_echo", ws_echo, []}, {"/ws_echo_timer", ws_echo_timer, []}, - {"/ws_init", ws_init_h, []}, + {"/ws_init", ws_init_h, #{}}, {"/ws_init_shutdown", ws_init_shutdown, []}, {"/ws_send_many", ws_send_many, [ {sequence, [ diff --git a/test/ws_SUITE_data/ws_echo.erl b/test/ws_SUITE_data/ws_echo.erl index efdc204..1f3a4be 100644 --- a/test/ws_SUITE_data/ws_echo.erl +++ b/test/ws_SUITE_data/ws_echo.erl @@ -8,6 +8,7 @@ init(Req, _) -> {cowboy_websocket, Req, undefined, #{ + data_delivery => relay, compress => true }}. diff --git a/test/ws_handler_SUITE.erl b/test/ws_handler_SUITE.erl index ab9dbe2..00c584d 100644 --- a/test/ws_handler_SUITE.erl +++ b/test/ws_handler_SUITE.erl @@ -19,21 +19,40 @@ -import(ct_helper, [config/2]). -import(ct_helper, [doc/1]). -import(cowboy_test, [gun_open/1]). +-import(cowboy_test, [gun_open/2]). -import(cowboy_test, [gun_down/1]). %% ct. all() -> - [{group, ws}, {group, ws_hibernate}]. + [ + {group, h1}, + {group, h1_hibernate}, + {group, h2}, + {group, h2_relay} + ]. %% @todo Test against HTTP/2 too. groups() -> AllTests = ct_helper:all(?MODULE), - [{ws, [parallel], AllTests}, {ws_hibernate, [parallel], AllTests}]. - -init_per_group(Name, Config) -> + [ + {h1, [parallel], AllTests}, + {h1_hibernate, [parallel], AllTests}, + %% The websocket_deflate_false test isn't compatible with HTTP/2. + {h2, [parallel], AllTests -- [websocket_deflate_false]}, + {h2_relay, [parallel], AllTests -- [websocket_deflate_false]} + ]. + +init_per_group(Name, Config) + when Name =:= h1; Name =:= h1_hibernate -> cowboy_test:init_http(Name, #{ env => #{dispatch => init_dispatch(Name)} + }, Config); +init_per_group(Name, Config) + when Name =:= h2; Name =:= h2_relay -> + cowboy_test:init_http2(Name, #{ + enable_connect_protocol => true, + env => #{dispatch => init_dispatch(Name)} }, Config). end_per_group(Name, _) -> @@ -42,25 +61,27 @@ end_per_group(Name, _) -> %% Dispatch configuration. init_dispatch(Name) -> - RunOrHibernate = case Name of - ws -> run; - ws_hibernate -> hibernate + InitialState = case Name of + h1_hibernate -> #{run_or_hibernate => hibernate}; + h2_relay -> #{run_or_hibernate => run, data_delivery => relay}; + _ -> #{run_or_hibernate => run} end, cowboy_router:compile([{'_', [ - {"/init", ws_init_commands_h, RunOrHibernate}, - {"/handle", ws_handle_commands_h, RunOrHibernate}, - {"/info", ws_info_commands_h, RunOrHibernate}, - {"/trap_exit", ws_init_h, RunOrHibernate}, - {"/active", ws_active_commands_h, RunOrHibernate}, - {"/deflate", ws_deflate_commands_h, RunOrHibernate}, - {"/set_options", ws_set_options_commands_h, RunOrHibernate}, - {"/shutdown_reason", ws_shutdown_reason_commands_h, RunOrHibernate} + {"/init", ws_init_commands_h, InitialState}, + {"/handle", ws_handle_commands_h, InitialState}, + {"/info", ws_info_commands_h, InitialState}, + {"/trap_exit", ws_init_h, InitialState}, + {"/active", ws_active_commands_h, InitialState}, + {"/deflate", ws_deflate_commands_h, InitialState}, + {"/set_options", ws_set_options_commands_h, InitialState}, + {"/shutdown_reason", ws_shutdown_reason_commands_h, InitialState} ]}]). %% Support functions for testing using Gun. gun_open_ws(Config, Path, Commands) -> - ConnPid = gun_open(Config), + ConnPid = gun_open(Config, #{http2_opts => #{notify_settings_changed => true}}), + do_await_enable_connect_protocol(config(protocol, Config), ConnPid), StreamRef = gun:ws_upgrade(ConnPid, Path, [ {<<"x-commands">>, base64:encode(term_to_binary(Commands))} ]), @@ -75,6 +96,13 @@ gun_open_ws(Config, Path, Commands) -> error(timeout) end. +do_await_enable_connect_protocol(http, _) -> + ok; +do_await_enable_connect_protocol(http2, ConnPid) -> + {notify, settings_changed, #{enable_connect_protocol := true}} + = gun:await(ConnPid, undefined), %% @todo Maybe have a gun:await/1? + ok. + receive_ws(ConnPid, StreamRef) -> receive {gun_ws, ConnPid, StreamRef, Frame} -> @@ -123,7 +151,14 @@ websocket_info_invalid(Config) -> do_invalid(Config, Path) -> {ok, ConnPid, StreamRef} = gun_open_ws(Config, Path, bad), ensure_handle_is_called(ConnPid, StreamRef, Path), - gun_down(ConnPid). + case config(protocol, Config) of + %% HTTP/1.1 closes the connection. + http -> gun_down(ConnPid); + %% HTTP/2 terminates the stream. + http2 -> + receive {gun_error, ConnPid, StreamRef, {stream_error, internal_error, _}} -> ok + after 500 -> error(timeout) end + end. websocket_init_one_frame(Config) -> doc("A single frame is received when websocket_init/1 returns it as a command."), @@ -223,8 +258,12 @@ websocket_active_false(Config) -> doc("The {active, false} command stops receiving data from the socket. " "The {active, true} command reenables it."), {ok, ConnPid, StreamRef} = gun_open_ws(Config, "/active", []), + %% We must exhaust the HTTP/2 flow control window + %% otherwise the frame will be received even if active mode is disabled. + gun:ws_send(ConnPid, StreamRef, {binary, <<0:100000/unit:8>>}), gun:ws_send(ConnPid, StreamRef, {text, <<"Not received until the handler enables active again.">>}), {error, timeout} = receive_ws(ConnPid, StreamRef), + {ok, {binary, _}} = receive_ws(ConnPid, StreamRef), {ok, {text, <<"Not received until the handler enables active again.">>}} = receive_ws(ConnPid, StreamRef), ok. @@ -271,7 +310,8 @@ websocket_deflate_ignore_if_not_negotiated(Config) -> websocket_set_options_idle_timeout(Config) -> doc("The idle_timeout option can be modified using the " "command {set_options, Opts} at runtime."), - ConnPid = gun_open(Config), + ConnPid = gun_open(Config, #{http2_opts => #{notify_settings_changed => true}}), + do_await_enable_connect_protocol(config(protocol, Config), ConnPid), StreamRef = gun:ws_upgrade(ConnPid, "/set_options"), receive {gun_upgrade, ConnPid, StreamRef, [<<"websocket">>], _} -> @@ -299,7 +339,8 @@ websocket_set_options_idle_timeout(Config) -> websocket_set_options_max_frame_size(Config) -> doc("The max_frame_size option can be modified using the " "command {set_options, Opts} at runtime."), - ConnPid = gun_open(Config), + ConnPid = gun_open(Config, #{http2_opts => #{notify_settings_changed => true}}), + do_await_enable_connect_protocol(config(protocol, Config), ConnPid), StreamRef = gun:ws_upgrade(ConnPid, "/set_options"), receive {gun_upgrade, ConnPid, StreamRef, [<<"websocket">>], _} -> @@ -334,7 +375,8 @@ websocket_set_options_max_frame_size(Config) -> websocket_shutdown_reason(Config) -> doc("The command {shutdown_reason, any()} can be used to " "change the shutdown reason of a Websocket connection."), - ConnPid = gun_open(Config), + ConnPid = gun_open(Config, #{http2_opts => #{notify_settings_changed => true}}), + do_await_enable_connect_protocol(config(protocol, Config), ConnPid), StreamRef = gun:ws_upgrade(ConnPid, "/shutdown_reason", [ {<<"x-test-pid">>, pid_to_list(self())} ]), diff --git a/test/ws_perf_SUITE.erl b/test/ws_perf_SUITE.erl index ff88554..98b4e18 100644 --- a/test/ws_perf_SUITE.erl +++ b/test/ws_perf_SUITE.erl @@ -55,14 +55,15 @@ init_per_group(Name, Config) when Name =:= h2c; Name =:= h2c_compress -> h2c_compress -> {compress, #{stream_handlers => [cowboy_compress_h, cowboy_stream_h]}} end, Config1 = cowboy_test:init_http(Name, Opts#{ - connection_window_margin_size => 64*1024, + %% The margin sizes must be larger than the larger test message for plain sockets. + connection_window_margin_size => 128*1024, enable_connect_protocol => true, env => #{dispatch => init_dispatch(Config)}, max_frame_size_sent => 64*1024, max_frame_size_received => 16384 * 1024 - 1, max_received_frame_rate => {10_000_000, 1}, stream_window_data_threshold => 1024, - stream_window_margin_size => 64*1024 + stream_window_margin_size => 128*1024 }, [{flavor, Flavor}|Config]), lists:keyreplace(protocol, 1, Config1, {protocol, http2}); init_per_group(ascii, Config) -> @@ -265,13 +266,14 @@ send_N_16384B(Config) -> do_send(Config, What, Num, FrameSize) -> {ok, ConnPid, StreamRef} = do_gun_open_ws("/ws_ignore", Config), + %% Prepare the frame data. FrameType = config(frame_type, Config), FrameData = case FrameType of text -> do_text_data(Config, FrameSize); binary -> rand:bytes(FrameSize) end, %% Heat up the processes before doing the real run. -% do_send_loop(ConnPid, StreamRef, Num, FrameType, FrameData), +% do_send_loop(Socket, Num, FrameType, Mask, MaskedFrameData), {Time, _} = timer:tc(?MODULE, do_send_loop, [ConnPid, StreamRef, Num, FrameType, FrameData]), do_log("~-6s ~-6s ~6s: ~8bµs", [What, FrameType, do_format_size(FrameSize), Time]), gun:ws_send(ConnPid, StreamRef, close), @@ -279,12 +281,181 @@ do_send(Config, What, Num, FrameSize) -> gun_down(ConnPid). do_send_loop(ConnPid, StreamRef, 0, _, _) -> - gun:ws_send(ConnPid, StreamRef, {text, <<"CHECK">>}), - {ok, {text, <<"CHECK">>}} = receive_ws(ConnPid, StreamRef), - ok; + gun:ws_send(ConnPid, StreamRef, {text, <<"CHECK">>}), + {ok, {text, <<"CHECK">>}} = receive_ws(ConnPid, StreamRef), + ok; do_send_loop(ConnPid, StreamRef, Num, FrameType, FrameData) -> - gun:ws_send(ConnPid, StreamRef, {FrameType, FrameData}), - do_send_loop(ConnPid, StreamRef, Num - 1, FrameType, FrameData). + gun:ws_send(ConnPid, StreamRef, {FrameType, FrameData}), + do_send_loop(ConnPid, StreamRef, Num - 1, FrameType, FrameData). + +tcp_send_N_00000B(Config) -> + doc("Send a 0B frame 10000 times."), + do_tcp_send(Config, tcps_N, 10000, 0). + +tcp_send_N_00256B(Config) -> + doc("Send a 256B frame 10000 times."), + do_tcp_send(Config, tcps_N, 10000, 256). + +tcp_send_N_01024B(Config) -> + doc("Send a 1024B frame 10000 times."), + do_tcp_send(Config, tcps_N, 10000, 1024). + +tcp_send_N_04096B(Config) -> + doc("Send a 4096B frame 10000 times."), + do_tcp_send(Config, tcps_N, 10000, 4096). + +tcp_send_N_16384B(Config) -> + doc("Send a 16384B frame 10000 times."), + do_tcp_send(Config, tcps_N, 10000, 16384). + +do_tcp_send(Config, What, Num, FrameSize) -> + {ok, Socket} = do_tcp_handshake(Config, #{}), + %% Prepare the frame data. + FrameType = config(frame_type, Config), + FrameData = case FrameType of + text -> do_text_data(Config, FrameSize); + binary -> rand:bytes(FrameSize) + end, + %% Mask the data outside the benchmark to avoid influencing the results. + Mask = 16#37fa213d, + MaskedFrameData = ws_SUITE:do_mask(FrameData, Mask, <<>>), + FrameSizeWithHeader = FrameSize + case FrameSize of + N when N =< 125 -> 6; + N when N =< 16#ffff -> 8; + N when N =< 16#7fffffffffffffff -> 14 + end, + %% Run the benchmark; different function for h1 and h2. + {Time, _} = case config(protocol, Config) of + http -> timer:tc(?MODULE, do_tcp_send_loop_h1, + [Socket, Num, FrameType, Mask, MaskedFrameData]); + http2 -> timer:tc(?MODULE, do_tcp_send_loop_h2, + [Socket, 65535, Num, FrameType, Mask, MaskedFrameData, FrameSizeWithHeader]) + end, + do_log("~-6s ~-6s ~6s: ~8bµs", [What, FrameType, do_format_size(FrameSize), Time]), + gen_tcp:close(Socket). + +%% Do a prior knowledge handshake. +do_tcp_handshake(Config, LocalSettings) -> + Protocol = config(protocol, Config), + Socket1 = case Protocol of + http -> + {ok, Socket, _} = ws_SUITE:do_handshake(<<"/ws_ignore">>, Config), + Socket; + http2 -> + {ok, Socket} = gen_tcp:connect("localhost", config(port, Config), [binary, {active, false}]), + %% Send a valid preface. + ok = gen_tcp:send(Socket, [ + "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n", + cow_http2:settings(LocalSettings) + ]), + %% Receive the server preface. + {ok, << Len:24 >>} = gen_tcp:recv(Socket, 3, 1000), + {ok, << 4:8, 0:40, SettingsPayload:Len/binary >>} = gen_tcp:recv(Socket, 6 + Len, 1000), + RemoteSettings = cow_http2:parse_settings_payload(SettingsPayload), + #{enable_connect_protocol := true} = RemoteSettings, + %% Send the SETTINGS ack. + ok = gen_tcp:send(Socket, cow_http2:settings_ack()), + %% Receive the SETTINGS ack. + {ok, << 0:24, 4:8, 1:8, 0:32 >>} = gen_tcp:recv(Socket, 9, 1000), + %% Send a CONNECT :protocol request to upgrade the stream to Websocket. + {ReqHeadersBlock, _} = cow_hpack:encode([ + {<<":method">>, <<"CONNECT">>}, + {<<":protocol">>, <<"websocket">>}, + {<<":scheme">>, <<"http">>}, + {<<":path">>, <<"/ws_ignore">>}, + {<<":authority">>, <<"localhost">>}, %% @todo Correct port number. + {<<"sec-websocket-version">>, <<"13">>}, + {<<"origin">>, <<"http://localhost">>} + ]), + ok = gen_tcp:send(Socket, cow_http2:headers(1, nofin, ReqHeadersBlock)), + %% Receive a 200 response. + {ok, << Len1:24, 1:8, _:8, 1:32 >>} = gen_tcp:recv(Socket, 9, 1000), + {ok, RespHeadersBlock} = gen_tcp:recv(Socket, Len1, 1000), + {RespHeaders, _} = cow_hpack:decode(RespHeadersBlock), + {_, <<"200">>} = lists:keyfind(<<":status">>, 1, RespHeaders), + Socket + end, + %% Enable active mode to avoid delays in receiving data at the end of benchmark. + ok = inet:setopts(Socket1, [{active, true}]), + %% Stream number 1 is ready. + {ok, Socket1}. + +do_tcp_send_loop_h1(Socket, 0, _, Mask, _) -> + MaskedFrameData = ws_SUITE:do_mask(<<"CHECK">>, Mask, <<>>), + ok = gen_tcp:send(Socket, + <<1:1, 0:3, 1:4, 1:1, 5:7, Mask:32, MaskedFrameData/binary>>), + do_tcp_wait_for_check(Socket); +do_tcp_send_loop_h1(Socket, Num, FrameType, Mask, MaskedFrameData) -> + Len = byte_size(MaskedFrameData), + LenBits = case Len of + N when N =< 125 -> << N:7 >>; + N when N =< 16#ffff -> << 126:7, N:16 >>; + N when N =< 16#7fffffffffffffff -> << 127:7, N:64 >> + end, + FrameHeader = <<1:1, 0:3, 2:4, 1:1, LenBits/bits, Mask:32>>, + ok = gen_tcp:send(Socket, [ + FrameHeader, + MaskedFrameData + ]), + do_tcp_send_loop_h1(Socket, Num - 1, FrameType, Mask, MaskedFrameData). + +do_tcp_send_loop_h2(Socket, _, 0, _, Mask, _, _) -> + MaskedFrameData = ws_SUITE:do_mask(<<"CHECK">>, Mask, <<>>), + ok = gen_tcp:send(Socket, cow_http2:data(1, nofin, + <<1:1, 0:3, 1:4, 1:1, 5:7, Mask:32, MaskedFrameData/binary>>)), + do_tcp_wait_for_check(Socket); +do_tcp_send_loop_h2(Socket, Window0, Num, FrameType, Mask, MaskedFrameData, FrameSizeWithHeader) + when Window0 < FrameSizeWithHeader -> + %% The remaining window isn't large enough so + %% we wait for WINDOW_UPDATE frames. + Window = do_tcp_wait_window_updates(Socket, Window0), + do_tcp_send_loop_h2(Socket, Window, Num, FrameType, Mask, MaskedFrameData, FrameSizeWithHeader); +do_tcp_send_loop_h2(Socket, Window, Num, FrameType, Mask, MaskedFrameData, FrameSizeWithHeader) -> + Len = byte_size(MaskedFrameData), + LenBits = case Len of + N when N =< 125 -> << N:7 >>; + N when N =< 16#ffff -> << 126:7, N:16 >>; + N when N =< 16#7fffffffffffffff -> << 127:7, N:64 >> + end, + FrameHeader = <<1:1, 0:3, 2:4, 1:1, LenBits/bits, Mask:32>>, + ok = gen_tcp:send(Socket, cow_http2:data(1, nofin, [ + FrameHeader, + MaskedFrameData + ])), + do_tcp_send_loop_h2(Socket, Window - FrameSizeWithHeader, + Num - 1, FrameType, Mask, MaskedFrameData, FrameSizeWithHeader). + +do_tcp_wait_window_updates(Socket, Window) -> + receive + {tcp, Socket, Data} -> + do_tcp_wait_window_updates_parse(Socket, Window, Data) + after 0 -> + Window + end. + +do_tcp_wait_window_updates_parse(Socket, Window, Data) -> + case Data of + %% Ignore the connection-wide WINDOW_UPDATE. + <<4:24, 8:8, 0:8, 0:32, 0:1, _:31, Rest/bits>> -> + do_tcp_wait_window_updates_parse(Socket, Window, Rest); + %% Use the stream-specific WINDOW_UPDATE to increment our window. + <<4:24, 8:8, 0:8, 1:32, 0:1, Inc:31, Rest/bits>> -> + do_tcp_wait_window_updates_parse(Socket, Window + Inc, Rest); + %% Other frames are not expected. + <<>> -> + do_tcp_wait_window_updates(Socket, Window) + end. + +do_tcp_wait_for_check(Socket) -> + receive + {tcp, Socket, Data} -> + case binary:match(Data, <<"CHECK">>) of + nomatch -> do_tcp_wait_for_check(Socket); + _ -> ok + end + after 5000 -> + error(timeout) + end. %% Internal. |