aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2025-08-21 18:09:42 +0200
committerLoïc Hoguin <[email protected]>2025-09-15 13:09:23 +0200
commit8da6ca11e8ea4e93def78bd0299decd6f409bc43 (patch)
treee897bf997175a113e02acb2b7f4c179c7528aeb4
parenta8c717718a3f4dd7b4bc67fe7bebe3a4e7a7ed74 (diff)
downloadcowboy-8da6ca11e8ea4e93def78bd0299decd6f409bc43.tar.gz
cowboy-8da6ca11e8ea4e93def78bd0299decd6f409bc43.tar.bz2
cowboy-8da6ca11e8ea4e93def78bd0299decd6f409bc43.zip
New data delivery mechanism for HTTP/2+ Websocketdirect-data_delivery-for-h2-websocket
A new data_delivery mechanism called 'relay' has been added. It bypasses stream handlers (and the buffering in cowboy_stream_h) and sends the data directly to the process implementing Websocket (and should work for other similar protocols like HTTP/2 WebTransport). Flow control in HTTP/2 is maintained in a simpler way, via a configured flow value that is used to maintain the window to a reasonable value when data is received. The 'relay' data_delivery has been implemented for both HTTP/2 and HTTP/3. It has not been implemented for HTTP/1.1 since switching protocol there overrides the connection process. HTTP/2 Websocket is now better tested. A bug was fixed with the 'stream_handlers' data_delivery where active mode would not be reenabled if it was disabled at some point. The Websocket performance suite has been updated to include tests that do not use Gun. Websocket modules used by the performance suite use the 'relay' data_delivery now. Performance is improved significantly with 'relay', between 10% and 20% faster. HTTP/2 Websocket performance is not on par with HTTP/1.1 still, but the remaining difference is thought to be from the HTTP/2 overhead and flow control.
-rw-r--r--doc/src/manual/cowboy_websocket.asciidoc38
-rw-r--r--src/cowboy_http2.erl59
-rw-r--r--src/cowboy_http3.erl54
-rw-r--r--src/cowboy_quicer.erl13
-rw-r--r--src/cowboy_websocket.erl62
-rw-r--r--src/cowboy_webtransport.erl1
-rw-r--r--test/handlers/ws_active_commands_h.erl4
-rw-r--r--test/handlers/ws_deflate_commands_h.erl7
-rw-r--r--test/handlers/ws_handle_commands_h.erl14
-rw-r--r--test/handlers/ws_ignore.erl1
-rw-r--r--test/handlers/ws_info_commands_h.erl14
-rw-r--r--test/handlers/ws_init_commands_h.erl14
-rw-r--r--test/handlers/ws_init_h.erl4
-rw-r--r--test/handlers/ws_set_options_commands_h.erl7
-rw-r--r--test/handlers/ws_shutdown_reason_commands_h.erl4
-rw-r--r--test/rfc7231_SUITE.erl2
-rw-r--r--test/rfc8441_SUITE.erl3
-rw-r--r--test/ws_SUITE.erl2
-rw-r--r--test/ws_SUITE_data/ws_echo.erl1
-rw-r--r--test/ws_handler_SUITE.erl82
-rw-r--r--test/ws_perf_SUITE.erl187
21 files changed, 494 insertions, 79 deletions
diff --git a/doc/src/manual/cowboy_websocket.asciidoc b/doc/src/manual/cowboy_websocket.asciidoc
index 319dbae..f30ffca 100644
--- a/doc/src/manual/cowboy_websocket.asciidoc
+++ b/doc/src/manual/cowboy_websocket.asciidoc
@@ -200,14 +200,16 @@ Cowboy does it automatically for you.
[source,erlang]
----
opts() :: #{
- active_n => pos_integer(),
- compress => boolean(),
- deflate_opts => cow_ws:deflate_opts()
- dynamic_buffer => false | {pos_integer(), pos_integer()},
- idle_timeout => timeout(),
- max_frame_size => non_neg_integer() | infinity,
- req_filter => fun((cowboy_req:req()) -> map()),
- validate_utf8 => boolean()
+ active_n => pos_integer(),
+ compress => boolean(),
+ data_delivery => stream_handlers | relay,
+ data_delivery_flow => pos_integer(),
+ deflate_opts => cow_ws:deflate_opts(),
+ dynamic_buffer => false | {pos_integer(), pos_integer()},
+ idle_timeout => timeout(),
+ max_frame_size => non_neg_integer() | infinity,
+ req_filter => fun((cowboy_req:req()) -> map()),
+ validate_utf8 => boolean()
}
----
@@ -241,6 +243,22 @@ Whether to enable the Websocket frame compression
extension. Frames will only be compressed for the
clients that support this extension.
+data_delivery (stream_handlers)::
+
+HTTP/2+ only. Determines how data will be delivered
+to the Websocket session process. `stream_handlers`
+is the default and makes data go through stream
+handlers. `relay` is a faster method introduced in
+Cowboy 2.14 and sends data directly. `relay` is
+intended to become the default in Cowboy 3.0.
+
+data_delivery_flow (pos_integer())::
+
+When the `relay` data delivery method is used,
+this value may be used to decide how much the
+flow control window should be for the Websocket
+stream. Currently only applies to HTTP/2.
+
deflate_opts (#{})::
Configuration for the permessage-deflate Websocket
@@ -299,6 +317,10 @@ normal circumstances if necessary.
== Changelog
+* *2.14*: The `data_delivery` and `data_delivery_flow` options
+ were added. The `relay` data delivery mechanism
+ provides a better way of forwarding data to
+ HTTP/2+ Websocket session processes.
* *2.13*: The `active_n` default value was changed to `1`.
* *2.13*: The `dynamic_buffer` option was added.
* *2.13*: The `max_frame_size` option can now be set dynamically.
diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl
index 0d22fa1..5cb0585 100644
--- a/src/cowboy_http2.erl
+++ b/src/cowboy_http2.erl
@@ -82,8 +82,13 @@
-export_type([opts/0]).
-record(stream, {
- %% Whether the stream is currently stopping.
- status = running :: running | stopping,
+ %% Whether the stream is currently in a special state.
+ %%
+ %% - The running state is the normal state of a stream.
+ %% - The relaying state is used by extended CONNECT protocols to
+ %% use a 'relay' data_delivery method.
+ %% - The stopping state indicates the stream used the 'stop' command.
+ status = running :: running | {relaying, non_neg_integer(), pid()} | stopping,
%% Flow requested for this stream.
flow = 0 :: non_neg_integer(),
@@ -327,6 +332,8 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
%% Messages pertaining to a stream.
{{Pid, StreamID}, Msg} when Pid =:= self() ->
before_loop(info(State, StreamID, Msg), Buffer);
+ {'$cowboy_relay_command', {Pid, StreamID}, RelayCommand} when Pid =:= self() ->
+ before_loop(relay_command(State, StreamID, RelayCommand), Buffer);
%% Exit signal from children.
Msg = {'EXIT', Pid, _} ->
before_loop(down(State, Pid, Msg), Buffer);
@@ -520,6 +527,14 @@ data_frame(State0=#state{opts=Opts, flow=Flow0, streams=Streams}, StreamID, IsFi
reset_stream(State0, StreamID, {internal_error, {Class, Exception},
'Unhandled exception in cowboy_stream:data/4.'})
end;
+ %% Stream handlers are not used for the data when relaying.
+ #{StreamID := #stream{status={relaying, _, RelayPid}}} ->
+ RelayPid ! {'$cowboy_relay_data', {self(), StreamID}, IsFin, Data},
+ %% We keep a steady flow using the configured flow value.
+ %% Because we do not change the 'flow' value the update_window/2
+ %% function will always maintain this value (of course with
+ %% thresholds applying).
+ update_window(State0, StreamID);
%% We ignore DATA frames for streams that are stopping.
#{} ->
State0
@@ -866,6 +881,26 @@ commands(State=#state{socket=Socket, transport=Transport, http2_status=upgrade},
commands(State, StreamID, Tail);
%% Use a different protocol within the stream (CONNECT :protocol).
%% @todo Make sure we error out when the feature is disabled.
+%% There are two data_delivery: stream_handlers and relay.
+%% The former just has the data go through stream handlers
+%% like normal requests. The latter relays data directly.
+%%
+%% @todo When relaying there might be some data that is
+%% in stream handlers and that need to be received,
+%% depending on whether the protocol sends data
+%% before processing the response.
+commands(State0=#state{flow=Flow, streams=Streams}, StreamID,
+ [{switch_protocol, Headers, _Mod, ModState=#{data_delivery := relay}}|Tail]) ->
+ State1 = info(State0, StreamID, {headers, 200, Headers}),
+ #{StreamID := Stream} = Streams,
+ #{data_delivery_pid := RelayPid} = ModState,
+ %% WINDOW_UPDATE frames updating the window will be sent after
+ %% the first DATA frame has been received.
+ RelayFlow = maps:get(data_delivery_flow, ModState, 131072),
+ State = State1#state{flow=Flow + RelayFlow, streams=Streams#{StreamID => Stream#stream{
+ status={relaying, RelayFlow, RelayPid},
+ flow=RelayFlow}}},
+ commands(State, StreamID, Tail);
commands(State0, StreamID, [{switch_protocol, Headers, _Mod, _ModState}|Tail]) ->
State = info(State0, StreamID, {headers, 200, Headers}),
commands(State, StreamID, Tail);
@@ -881,6 +916,26 @@ commands(State=#state{opts=Opts}, StreamID, [Log={log, _, _, _}|Tail]) ->
cowboy:log(Log, Opts),
commands(State, StreamID, Tail).
+%% Relay data delivery commands.
+
+relay_command(State, StreamID, DataCmd = {data, _, _}) ->
+ commands(State, StreamID, [DataCmd]);
+%% When going active mode again we set the RelayFlow again
+%% and update the window if necessary.
+relay_command(State=#state{flow=Flow, streams=Streams}, StreamID, active) ->
+ #{StreamID := Stream} = Streams,
+ #stream{status={relaying, RelayFlow, _}} = Stream,
+ update_window(State#state{flow=Flow + RelayFlow,
+ streams=Streams#{StreamID => Stream#stream{flow=RelayFlow}}},
+ StreamID);
+%% When going passive mode we don't update the window
+%% since we have not incremented it.
+relay_command(State=#state{flow=Flow, streams=Streams}, StreamID, passive) ->
+ #{StreamID := Stream} = Streams,
+ #stream{flow=StreamFlow} = Stream,
+ State#state{flow=Flow - StreamFlow,
+ streams=Streams#{StreamID => Stream#stream{flow=0}}}.
+
%% Tentatively update the window after the flow was updated.
update_window(State0=#state{socket=Socket, transport=Transport,
diff --git a/src/cowboy_http3.erl b/src/cowboy_http3.erl
index 9aa6be5..740b9e3 100644
--- a/src/cowboy_http3.erl
+++ b/src/cowboy_http3.erl
@@ -67,6 +67,7 @@
%% Whether the stream is currently in a special state.
status :: header | {unidi, control | encoder | decoder}
| normal | {data | ignore, non_neg_integer()} | stopping
+ | {relaying, normal | {data, non_neg_integer()}, pid()}
| {webtransport_session, normal | {ignore, non_neg_integer()}}
| {webtransport_stream, cow_http3:stream_id()},
@@ -164,6 +165,8 @@ loop(State0=#state{opts=Opts, children=Children}) ->
%% Messages pertaining to a stream.
{{Pid, StreamID}, Msg} when Pid =:= self() ->
loop(info(State0, StreamID, Msg));
+ {'$cowboy_relay_command', {Pid, StreamID}, RelayCommand} when Pid =:= self() ->
+ loop(relay_command(State0, StreamID, RelayCommand));
%% WebTransport commands.
{'$webtransport_commands', SessionID, Commands} ->
loop(webtransport_commands(State0, SessionID, Commands));
@@ -299,6 +302,22 @@ parse1(State, Stream=#stream{status={data, Len}, id=StreamID}, Data, IsFin) ->
parse(frame(State, Stream#stream{status=normal}, {data, Data1}, FrameIsFin),
StreamID, Rest, IsFin)
end;
+%% This clause mirrors the {data, Len} clause.
+parse1(State, Stream=#stream{status={relaying, {data, Len}, RelayPid}, id=StreamID},
+ Data, IsFin) ->
+ DataLen = byte_size(Data),
+ if
+ DataLen < Len ->
+ %% We don't have the full frame but this is the end of the
+ %% data we have. So FrameIsFin is equivalent to IsFin here.
+ loop(frame(State, Stream#stream{status={relaying, {data, Len - DataLen}, RelayPid}},
+ {data, Data}, IsFin));
+ true ->
+ <<Data1:Len/binary, Rest/bits>> = Data,
+ FrameIsFin = is_fin(IsFin, Rest),
+ parse(frame(State, Stream#stream{status={relaying, normal, RelayPid}},
+ {data, Data1}, FrameIsFin), StreamID, Rest, IsFin)
+ end;
parse1(State, Stream=#stream{status={ignore, Len}, id=StreamID}, Data, IsFin) ->
DataLen = byte_size(Data),
if
@@ -311,7 +330,7 @@ parse1(State, Stream=#stream{status={ignore, Len}, id=StreamID}, Data, IsFin) ->
end;
%% @todo Clause that discards receiving data for stopping streams.
%% We may receive a few more frames after we abort receiving.
-parse1(State=#state{opts=Opts}, Stream=#stream{id=StreamID}, Data, IsFin) ->
+parse1(State=#state{opts=Opts}, Stream=#stream{status=Status0, id=StreamID}, Data, IsFin) ->
case cow_http3:parse(Data) of
{ok, Frame, Rest} ->
FrameIsFin = is_fin(IsFin, Rest),
@@ -322,6 +341,10 @@ parse1(State=#state{opts=Opts}, Stream=#stream{id=StreamID}, Data, IsFin) ->
{more, Frame = {data, _}, Len} ->
%% We're at the end of the data so FrameIsFin is equivalent to IsFin.
case IsFin of
+ nofin when element(1, Status0) =:= relaying ->
+ %% The stream will be stored at the end of processing commands.
+ Status = setelement(2, Status0, {data, Len}),
+ loop(frame(State, Stream#stream{status=Status}, Frame, nofin));
nofin ->
%% The stream will be stored at the end of processing commands.
loop(frame(State, Stream#stream{status={data, Len}}, Frame, nofin));
@@ -432,6 +455,9 @@ frame(State=#state{http3_machine=HTTP3Machine0},
terminate(State#state{http3_machine=HTTP3Machine}, Error)
end.
+data_frame(State, Stream=#stream{status={relaying, _, RelayPid}, id=StreamID}, IsFin, Data) ->
+ RelayPid ! {'$cowboy_relay_data', {self(), StreamID}, IsFin, Data},
+ stream_store(State, Stream);
data_frame(State=#state{opts=Opts},
Stream=#stream{id=StreamID, state=StreamState0}, IsFin, Data) ->
try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of
@@ -767,6 +793,18 @@ commands(State0, Stream0=#stream{id=StreamID},
},
%% @todo We must propagate the buffer to capsule handling if any.
commands(State#state{http3_machine=HTTP3Machine}, Stream, Tail);
+%% There are two data_delivery: stream_handlers and relay.
+%% The former just has the data go through stream handlers
+%% like normal requests. The latter relays data directly.
+commands(State0, Stream0=#stream{id=StreamID},
+ [{switch_protocol, Headers, _Mod, ModState=#{data_delivery := relay}}|Tail]) ->
+ State = info(stream_store(State0, Stream0), StreamID, {headers, 200, Headers}),
+ Stream1 = #stream{status=normal} = stream_get(State, StreamID),
+ #{data_delivery_pid := RelayPid} = ModState,
+ %% We do not set data_delivery_flow because it is managed by quicer
+ %% and we do not have an easy way to modify it.
+ Stream = Stream1#stream{status={relaying, normal, RelayPid}},
+ commands(State, Stream, Tail);
commands(State0, Stream0=#stream{id=StreamID},
[{switch_protocol, Headers, _Mod, _ModState}|Tail]) ->
State = info(stream_store(State0, Stream0), StreamID, {headers, 200, Headers}),
@@ -872,6 +910,20 @@ send_instructions(State=#state{conn=Conn, local_encoder_id=EncoderID},
cowboy_quicer:send(Conn, EncoderID, EncData)),
State.
+%% Relay data delivery commands.
+
+relay_command(State, StreamID, DataCmd = {data, _, _}) ->
+ Stream = stream_get(State, StreamID),
+ commands(State, Stream, [DataCmd]);
+relay_command(State=#state{conn=Conn}, StreamID, active) ->
+ ok = maybe_socket_error(State,
+ cowboy_quicer:setopt(Conn, StreamID, active, true)),
+ State;
+relay_command(State=#state{conn=Conn}, StreamID, passive) ->
+ ok = maybe_socket_error(State,
+ cowboy_quicer:setopt(Conn, StreamID, active, false)),
+ State.
+
%% We mark the stream as being a WebTransport stream
%% and then continue parsing the data as a WebTransport
%% stream. This function is common for incoming unidi
diff --git a/src/cowboy_quicer.erl b/src/cowboy_quicer.erl
index aa52fae..d2f5433 100644
--- a/src/cowboy_quicer.erl
+++ b/src/cowboy_quicer.erl
@@ -25,6 +25,7 @@
%% Streams.
-export([start_bidi_stream/2]).
-export([start_unidi_stream/2]).
+-export([setopt/4]).
-export([send/3]).
-export([send/4]).
-export([send_datagram/2]).
@@ -54,6 +55,9 @@ start_bidi_stream(_, _) -> no_quicer().
-spec start_unidi_stream(_, _) -> no_return().
start_unidi_stream(_, _) -> no_quicer().
+-spec setopt(_, _, _, _) -> no_return().
+setopt(_, _, _, _) -> no_quicer().
+
-spec send(_, _, _) -> no_return().
send(_, _, _) -> no_quicer().
@@ -109,7 +113,7 @@ sockname(Conn) ->
| {error, any()}.
peercert(Conn) ->
- quicer_nif:peercert(Conn).
+ quicer_nif:peercert(Conn).
-spec shutdown(quicer_connection_handle(), quicer_app_errno())
-> ok | {error, any()}.
@@ -154,6 +158,13 @@ start_stream(Conn, InitialData, OpenFlag) ->
Error
end.
+-spec setopt(quicer_connection_handle(), cow_http3:stream_id(), active, boolean())
+ -> ok | {error, any()}.
+
+setopt(_Conn, StreamID, active, Value) ->
+ StreamRef = get({quicer_stream, StreamID}),
+ quicer:setopt(StreamRef, active, Value).
+
-spec send(quicer_connection_handle(), cow_http3:stream_id(), iodata())
-> ok | {error, any()}.
diff --git a/src/cowboy_websocket.erl b/src/cowboy_websocket.erl
index cb30c3f..65289cd 100644
--- a/src/cowboy_websocket.erl
+++ b/src/cowboy_websocket.erl
@@ -68,6 +68,8 @@
-type opts() :: #{
active_n => pos_integer(),
compress => boolean(),
+ data_delivery => stream_handlers | relay,
+ data_delivery_flow => pos_integer(),
deflate_opts => cow_ws:deflate_opts(),
dynamic_buffer => false | {pos_integer(), pos_integer()},
dynamic_buffer_initial_average => non_neg_integer(),
@@ -91,7 +93,7 @@
parent :: undefined | pid(),
ref :: ranch:ref(),
socket = undefined :: inet:socket() | {pid(), cowboy_stream:streamid()} | undefined,
- transport = undefined :: module() | undefined,
+ transport :: module() | {data_delivery, stream_handlers | relay},
opts = #{} :: opts(),
active = true :: boolean(),
handler :: module(),
@@ -149,7 +151,7 @@ upgrade(Req, Env, Handler, HandlerState) ->
%% @todo Immediately crash if a response has already been sent.
upgrade(Req0=#{version := Version}, Env, Handler, HandlerState, Opts) ->
FilteredReq = case maps:get(req_filter, Opts, undefined) of
- undefined -> maps:with([method, version, scheme, host, port, path, qs, peer], Req0);
+ undefined -> maps:with([method, version, scheme, host, port, path, qs, peer, streamid], Req0);
FilterFun -> FilterFun(Req0)
end,
Utf8State = case maps:get(validate_utf8, Opts, true) of
@@ -273,12 +275,27 @@ websocket_handshake(State=#state{key=Key},
%% For HTTP/2 we do not let the process die, we instead keep it
%% for the Websocket stream. This is because in HTTP/2 we only
%% have a stream, it doesn't take over the whole connection.
-websocket_handshake(State, Req=#{ref := Ref, pid := Pid, streamid := StreamID},
+%%
+%% There are two methods of delivering data to the Websocket session:
+%% - 'stream_handlers' is the default and makes the data go
+%% through stream handlers just like when reading a request body;
+%% - 'relay' is a new method where data is sent as a message as
+%% soon as it is received from the socket in a DATA frame.
+websocket_handshake(State=#state{opts=Opts},
+ Req=#{ref := Ref, pid := Pid, streamid := StreamID},
HandlerState, _Env) ->
%% @todo We don't want date and server headers.
Headers = cowboy_req:response_headers(#{}, Req),
- Pid ! {{Pid, StreamID}, {switch_protocol, Headers, ?MODULE, {State, HandlerState}}},
- takeover(Pid, Ref, {Pid, StreamID}, undefined, #{}, <<>>,
+ DataDelivery = maps:get(data_delivery, Opts, stream_handlers),
+ ModState = #{
+ data_delivery => DataDelivery,
+ %% For relay data_delivery. The flow is a hint and may
+ %% not be used by the underlying protocol.
+ data_delivery_pid => self(),
+ data_delivery_flow => maps:get(data_delivery_flow, Opts, 131072)
+ },
+ Pid ! {{Pid, StreamID}, {switch_protocol, Headers, ?MODULE, ModState}},
+ takeover(Pid, Ref, {Pid, StreamID}, {data_delivery, DataDelivery}, #{}, <<>>,
{State, HandlerState}).
%% Connection process.
@@ -301,7 +318,7 @@ websocket_handshake(State, Req=#{ref := Ref, pid := Pid, streamid := StreamID},
-type parse_state() :: #ps_header{} | #ps_payload{}.
-spec takeover(pid(), ranch:ref(), inet:socket() | {pid(), cowboy_stream:streamid()},
- module() | undefined, any(), binary(),
+ module() | {data_delivery, stream_handlers | relay}, any(), binary(),
{#state{}, any()}) -> no_return().
takeover(Parent, Ref, Socket, Transport, Opts, Buffer,
{State0=#state{opts=WsOpts, handler=Handler, req=Req}, HandlerState}) ->
@@ -311,7 +328,7 @@ takeover(Parent, Ref, Socket, Transport, Opts, Buffer,
_ -> ranch:remove_connection(Ref)
end,
Messages = case Transport of
- undefined -> undefined;
+ {data_delivery, _} -> undefined;
_ -> Transport:messages()
end,
State = set_idle_timeout(State0#state{parent=Parent,
@@ -355,13 +372,14 @@ after_init(State, HandlerState, ParseState) ->
%% immediately but there might still be data to be processed in
%% the message queue.
-setopts_active(#state{transport=undefined}) ->
+setopts_active(#state{transport={data_delivery, _}}) ->
ok;
setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
N = maps:get(active_n, Opts, 1),
Transport:setopts(Socket, [{active, N}]).
-maybe_read_body(#state{socket=Stream={Pid, _}, transport=undefined, active=true}) ->
+maybe_read_body(#state{transport={data_delivery, stream_handlers},
+ socket=Stream={Pid, _}, active=true}) ->
%% @todo Keep Ref around.
ReadBodyRef = make_ref(),
Pid ! {Stream, {read_body, self(), ReadBodyRef, auto, infinity}},
@@ -369,16 +387,25 @@ maybe_read_body(#state{socket=Stream={Pid, _}, transport=undefined, active=true}
maybe_read_body(_) ->
ok.
-active(State) ->
+active(State=#state{transport={data_delivery, relay},
+ socket=Stream={Pid, _}}) ->
+ Pid ! {'$cowboy_relay_command', Stream, active},
+ State#state{active=true};
+active(State0) ->
+ State = State0#state{active=true},
setopts_active(State),
maybe_read_body(State),
- State#state{active=true}.
+ State.
-passive(State=#state{transport=undefined}) ->
+passive(State=#state{transport={data_delivery, stream_handlers}}) ->
%% Unfortunately we cannot currently cancel read_body.
%% But that's OK, we will just stop reading the body
%% after the next message.
State#state{active=false};
+passive(State=#state{transport={data_delivery, relay},
+ socket=Stream={Pid, _}}) ->
+ Pid ! {'$cowboy_relay_command', Stream, passive},
+ State#state{active=false};
passive(State=#state{socket=Socket, transport=Transport, messages=Messages}) ->
Transport:setopts(Socket, [{active, false}]),
flush_passive(Socket, Messages),
@@ -454,6 +481,10 @@ loop(State=#state{parent=Parent, socket=Socket, messages=Messages,
{request_body, _Ref, fin, _, Data} ->
maybe_read_body(State),
parse(?reset_idle_timeout(State), HandlerState, ParseState, Data);
+ %% @todo It would be better to check StreamID.
+ %% @todo We must ensure that IsFin=fin is handled like a socket close?
+ {'$cowboy_relay_data', {Pid, _StreamID}, _IsFin, Data} when Pid =:= Parent ->
+ parse(?reset_idle_timeout(State), HandlerState, ParseState, Data);
%% Timeouts.
{timeout, TRef, ?MODULE} ->
tick_idle_timeout(State, HandlerState, ParseState);
@@ -662,9 +693,14 @@ commands([Frame|Tail], State, Data0) ->
commands(Tail, State, Data)
end.
-transport_send(#state{socket=Stream={Pid, _}, transport=undefined}, IsFin, Data) ->
+transport_send(#state{transport={data_delivery, stream_handlers},
+ socket=Stream={Pid, _}}, IsFin, Data) ->
Pid ! {Stream, {data, IsFin, Data}},
ok;
+transport_send(#state{transport={data_delivery, relay},
+ socket=Stream={Pid, _}}, IsFin, Data) ->
+ Pid ! {'$cowboy_relay_command', Stream, {data, IsFin, Data}},
+ ok;
transport_send(#state{socket=Socket, transport=Transport}, _, Data) ->
Transport:send(Socket, Data).
diff --git a/src/cowboy_webtransport.erl b/src/cowboy_webtransport.erl
index 8c8ca39..7bcac55 100644
--- a/src/cowboy_webtransport.erl
+++ b/src/cowboy_webtransport.erl
@@ -277,6 +277,7 @@ handler_terminate(#state{handler=Handler, req=Req}, HandlerState, Reason) ->
%% callback is not implemented.
%%
%% @todo Better type than map() for the cowboy_stream state.
+%% @todo Is this really useful?
-spec info(cowboy_stream:streamid(), any(), State)
-> {cowboy_stream:commands(), State} when State::map().
diff --git a/test/handlers/ws_active_commands_h.erl b/test/handlers/ws_active_commands_h.erl
index 1c615e3..bbf8907 100644
--- a/test/handlers/ws_active_commands_h.erl
+++ b/test/handlers/ws_active_commands_h.erl
@@ -9,8 +9,8 @@
-export([websocket_handle/2]).
-export([websocket_info/2]).
-init(Req, RunOrHibernate) ->
- {cowboy_websocket, Req, RunOrHibernate}.
+init(Req, Opts) ->
+ {cowboy_websocket, Req, maps:get(run_or_hibernate, Opts), Opts}.
websocket_init(State=run) ->
erlang:send_after(1500, self(), active_true),
diff --git a/test/handlers/ws_deflate_commands_h.erl b/test/handlers/ws_deflate_commands_h.erl
index 14236bc..be3e16e 100644
--- a/test/handlers/ws_deflate_commands_h.erl
+++ b/test/handlers/ws_deflate_commands_h.erl
@@ -8,10 +8,11 @@
-export([websocket_handle/2]).
-export([websocket_info/2]).
-init(Req, RunOrHibernate) ->
+init(Req, Opts) ->
+ DataDelivery = maps:get(data_delivery, Opts, stream_handlers),
{cowboy_websocket, Req,
- #{deflate => true, hibernate => RunOrHibernate},
- #{compress => true}}.
+ #{deflate => true, hibernate => maps:get(run_or_hibernate, Opts)},
+ #{compress => true, data_delivery => DataDelivery}}.
websocket_handle(Frame, State=#{deflate := Deflate0, hibernate := run}) ->
Deflate = not Deflate0,
diff --git a/test/handlers/ws_handle_commands_h.erl b/test/handlers/ws_handle_commands_h.erl
index da3ffad..0a8513b 100644
--- a/test/handlers/ws_handle_commands_h.erl
+++ b/test/handlers/ws_handle_commands_h.erl
@@ -9,14 +9,20 @@
-export([websocket_handle/2]).
-export([websocket_info/2]).
-init(Req=#{pid := Pid}, RunOrHibernate) ->
+init(Req, Opts) ->
Commands0 = cowboy_req:header(<<"x-commands">>, Req),
Commands = binary_to_term(base64:decode(Commands0)),
case Commands of
- bad -> ct_helper_error_h:ignore(Pid, cowboy_websocket, handler_call, 6);
- _ -> ok
+ bad ->
+ Pid = case Req of
+ #{version := 'HTTP/2'} -> self();
+ #{pid := Pid0} -> Pid0
+ end,
+ ct_helper_error_h:ignore(Pid, cowboy_websocket, handler_call, 6);
+ _ ->
+ ok
end,
- {cowboy_websocket, Req, {Commands, RunOrHibernate}}.
+ {cowboy_websocket, Req, {Commands, maps:get(run_or_hibernate, Opts)}, Opts}.
websocket_init(State) ->
{[], State}.
diff --git a/test/handlers/ws_ignore.erl b/test/handlers/ws_ignore.erl
index 9fe3322..37595aa 100644
--- a/test/handlers/ws_ignore.erl
+++ b/test/handlers/ws_ignore.erl
@@ -8,6 +8,7 @@
init(Req, _) ->
{cowboy_websocket, Req, undefined, #{
+ data_delivery => relay,
compress => true
}}.
diff --git a/test/handlers/ws_info_commands_h.erl b/test/handlers/ws_info_commands_h.erl
index d596473..2115727 100644
--- a/test/handlers/ws_info_commands_h.erl
+++ b/test/handlers/ws_info_commands_h.erl
@@ -10,14 +10,20 @@
-export([websocket_handle/2]).
-export([websocket_info/2]).
-init(Req=#{pid := Pid}, RunOrHibernate) ->
+init(Req, Opts) ->
Commands0 = cowboy_req:header(<<"x-commands">>, Req),
Commands = binary_to_term(base64:decode(Commands0)),
case Commands of
- bad -> ct_helper_error_h:ignore(Pid, cowboy_websocket, handler_call, 6);
- _ -> ok
+ bad ->
+ Pid = case Req of
+ #{version := 'HTTP/2'} -> self();
+ #{pid := Pid0} -> Pid0
+ end,
+ ct_helper_error_h:ignore(Pid, cowboy_websocket, handler_call, 6);
+ _ ->
+ ok
end,
- {cowboy_websocket, Req, {Commands, RunOrHibernate}}.
+ {cowboy_websocket, Req, {Commands, maps:get(run_or_hibernate, Opts)}, Opts}.
websocket_init(State) ->
self() ! shoot,
diff --git a/test/handlers/ws_init_commands_h.erl b/test/handlers/ws_init_commands_h.erl
index 8bae352..d821b18 100644
--- a/test/handlers/ws_init_commands_h.erl
+++ b/test/handlers/ws_init_commands_h.erl
@@ -9,14 +9,20 @@
-export([websocket_handle/2]).
-export([websocket_info/2]).
-init(Req=#{pid := Pid}, RunOrHibernate) ->
+init(Req, Opts) ->
Commands0 = cowboy_req:header(<<"x-commands">>, Req),
Commands = binary_to_term(base64:decode(Commands0)),
case Commands of
- bad -> ct_helper_error_h:ignore(Pid, cowboy_websocket, handler_call, 6);
- _ -> ok
+ bad ->
+ Pid = case Req of
+ #{version := 'HTTP/2'} -> self();
+ #{pid := Pid0} -> Pid0
+ end,
+ ct_helper_error_h:ignore(Pid, cowboy_websocket, handler_call, 6);
+ _ ->
+ ok
end,
- {cowboy_websocket, Req, {Commands, RunOrHibernate}}.
+ {cowboy_websocket, Req, {Commands, maps:get(run_or_hibernate, Opts)}, Opts}.
websocket_init(State={Commands, run}) ->
{Commands, State};
diff --git a/test/handlers/ws_init_h.erl b/test/handlers/ws_init_h.erl
index bbe9ef9..c44986f 100644
--- a/test/handlers/ws_init_h.erl
+++ b/test/handlers/ws_init_h.erl
@@ -8,9 +8,9 @@
-export([websocket_handle/2]).
-export([websocket_info/2]).
-init(Req, _) ->
+init(Req, Opts) ->
State = binary_to_atom(cowboy_req:qs(Req), latin1),
- {cowboy_websocket, Req, State}.
+ {cowboy_websocket, Req, State, Opts}.
%% Sleep to make sure the HTTP response was sent.
websocket_init(State) ->
diff --git a/test/handlers/ws_set_options_commands_h.erl b/test/handlers/ws_set_options_commands_h.erl
index 1ab0af4..af768d6 100644
--- a/test/handlers/ws_set_options_commands_h.erl
+++ b/test/handlers/ws_set_options_commands_h.erl
@@ -7,9 +7,10 @@
-export([websocket_handle/2]).
-export([websocket_info/2]).
-init(Req, RunOrHibernate) ->
- {cowboy_websocket, Req, RunOrHibernate,
- #{idle_timeout => infinity}}.
+init(Req, Opts) ->
+ DataDelivery = maps:get(data_delivery, Opts, stream_handlers),
+ {cowboy_websocket, Req, maps:get(run_or_hibernate, Opts),
+ #{idle_timeout => infinity, data_delivery => DataDelivery}}.
%% Set the idle_timeout option dynamically.
websocket_handle({text, <<"idle_timeout_short">>}, State=run) ->
diff --git a/test/handlers/ws_shutdown_reason_commands_h.erl b/test/handlers/ws_shutdown_reason_commands_h.erl
index 90b435c..878b70a 100644
--- a/test/handlers/ws_shutdown_reason_commands_h.erl
+++ b/test/handlers/ws_shutdown_reason_commands_h.erl
@@ -10,9 +10,9 @@
-export([websocket_handle/2]).
-export([websocket_info/2]).
-init(Req, RunOrHibernate) ->
+init(Req, Opts) ->
TestPid = list_to_pid(binary_to_list(cowboy_req:header(<<"x-test-pid">>, Req))),
- {cowboy_websocket, Req, {TestPid, RunOrHibernate}}.
+ {cowboy_websocket, Req, {TestPid, maps:get(run_or_hibernate, Opts)}, Opts}.
websocket_init(State={TestPid, RunOrHibernate}) ->
TestPid ! {ws_pid, self()},
diff --git a/test/rfc7231_SUITE.erl b/test/rfc7231_SUITE.erl
index 183fa0f..a42ca3f 100644
--- a/test/rfc7231_SUITE.erl
+++ b/test/rfc7231_SUITE.erl
@@ -44,7 +44,7 @@ init_dispatch(_) ->
{"/echo/:key", echo_h, []},
{"/delay/echo/:key", echo_h, []},
{"/resp/:key[/:arg]", resp_h, []},
- {"/ws", ws_init_h, []}
+ {"/ws", ws_init_h, #{}}
]}]).
%% @todo The documentation should list what methods, headers and status codes
diff --git a/test/rfc8441_SUITE.erl b/test/rfc8441_SUITE.erl
index b788f9f..efa753c 100644
--- a/test/rfc8441_SUITE.erl
+++ b/test/rfc8441_SUITE.erl
@@ -396,6 +396,9 @@ accept_handshake_when_enabled(Config) ->
MaskedHello = ws_SUITE:do_mask(<<"Hello">>, Mask, <<>>),
ok = gen_tcp:send(Socket, cow_http2:data(1, nofin,
<<1:1, 0:3, 1:4, 1:1, 5:7, Mask:32, MaskedHello/binary>>)),
+ %% Ignore expected WINDOW_UPDATE frames.
+ {ok, <<4:24, 8:8, _:72>>} = gen_tcp:recv(Socket, 13, 1000),
+ {ok, <<4:24, 8:8, _:72>>} = gen_tcp:recv(Socket, 13, 1000),
{ok, <<Len2:24, _:8, _:8, _:32>>} = gen_tcp:recv(Socket, 9, 1000),
{ok, <<1:1, 0:3, 1:4, 0:1, 5:7, "Hello">>} = gen_tcp:recv(Socket, Len2, 1000),
ok.
diff --git a/test/ws_SUITE.erl b/test/ws_SUITE.erl
index 6fa4e61..2f1d3e2 100644
--- a/test/ws_SUITE.erl
+++ b/test/ws_SUITE.erl
@@ -42,7 +42,7 @@ init_dispatch() ->
{"localhost", [
{"/ws_echo", ws_echo, []},
{"/ws_echo_timer", ws_echo_timer, []},
- {"/ws_init", ws_init_h, []},
+ {"/ws_init", ws_init_h, #{}},
{"/ws_init_shutdown", ws_init_shutdown, []},
{"/ws_send_many", ws_send_many, [
{sequence, [
diff --git a/test/ws_SUITE_data/ws_echo.erl b/test/ws_SUITE_data/ws_echo.erl
index efdc204..1f3a4be 100644
--- a/test/ws_SUITE_data/ws_echo.erl
+++ b/test/ws_SUITE_data/ws_echo.erl
@@ -8,6 +8,7 @@
init(Req, _) ->
{cowboy_websocket, Req, undefined, #{
+ data_delivery => relay,
compress => true
}}.
diff --git a/test/ws_handler_SUITE.erl b/test/ws_handler_SUITE.erl
index ab9dbe2..00c584d 100644
--- a/test/ws_handler_SUITE.erl
+++ b/test/ws_handler_SUITE.erl
@@ -19,21 +19,40 @@
-import(ct_helper, [config/2]).
-import(ct_helper, [doc/1]).
-import(cowboy_test, [gun_open/1]).
+-import(cowboy_test, [gun_open/2]).
-import(cowboy_test, [gun_down/1]).
%% ct.
all() ->
- [{group, ws}, {group, ws_hibernate}].
+ [
+ {group, h1},
+ {group, h1_hibernate},
+ {group, h2},
+ {group, h2_relay}
+ ].
%% @todo Test against HTTP/2 too.
groups() ->
AllTests = ct_helper:all(?MODULE),
- [{ws, [parallel], AllTests}, {ws_hibernate, [parallel], AllTests}].
-
-init_per_group(Name, Config) ->
+ [
+ {h1, [parallel], AllTests},
+ {h1_hibernate, [parallel], AllTests},
+ %% The websocket_deflate_false test isn't compatible with HTTP/2.
+ {h2, [parallel], AllTests -- [websocket_deflate_false]},
+ {h2_relay, [parallel], AllTests -- [websocket_deflate_false]}
+ ].
+
+init_per_group(Name, Config)
+ when Name =:= h1; Name =:= h1_hibernate ->
cowboy_test:init_http(Name, #{
env => #{dispatch => init_dispatch(Name)}
+ }, Config);
+init_per_group(Name, Config)
+ when Name =:= h2; Name =:= h2_relay ->
+ cowboy_test:init_http2(Name, #{
+ enable_connect_protocol => true,
+ env => #{dispatch => init_dispatch(Name)}
}, Config).
end_per_group(Name, _) ->
@@ -42,25 +61,27 @@ end_per_group(Name, _) ->
%% Dispatch configuration.
init_dispatch(Name) ->
- RunOrHibernate = case Name of
- ws -> run;
- ws_hibernate -> hibernate
+ InitialState = case Name of
+ h1_hibernate -> #{run_or_hibernate => hibernate};
+ h2_relay -> #{run_or_hibernate => run, data_delivery => relay};
+ _ -> #{run_or_hibernate => run}
end,
cowboy_router:compile([{'_', [
- {"/init", ws_init_commands_h, RunOrHibernate},
- {"/handle", ws_handle_commands_h, RunOrHibernate},
- {"/info", ws_info_commands_h, RunOrHibernate},
- {"/trap_exit", ws_init_h, RunOrHibernate},
- {"/active", ws_active_commands_h, RunOrHibernate},
- {"/deflate", ws_deflate_commands_h, RunOrHibernate},
- {"/set_options", ws_set_options_commands_h, RunOrHibernate},
- {"/shutdown_reason", ws_shutdown_reason_commands_h, RunOrHibernate}
+ {"/init", ws_init_commands_h, InitialState},
+ {"/handle", ws_handle_commands_h, InitialState},
+ {"/info", ws_info_commands_h, InitialState},
+ {"/trap_exit", ws_init_h, InitialState},
+ {"/active", ws_active_commands_h, InitialState},
+ {"/deflate", ws_deflate_commands_h, InitialState},
+ {"/set_options", ws_set_options_commands_h, InitialState},
+ {"/shutdown_reason", ws_shutdown_reason_commands_h, InitialState}
]}]).
%% Support functions for testing using Gun.
gun_open_ws(Config, Path, Commands) ->
- ConnPid = gun_open(Config),
+ ConnPid = gun_open(Config, #{http2_opts => #{notify_settings_changed => true}}),
+ do_await_enable_connect_protocol(config(protocol, Config), ConnPid),
StreamRef = gun:ws_upgrade(ConnPid, Path, [
{<<"x-commands">>, base64:encode(term_to_binary(Commands))}
]),
@@ -75,6 +96,13 @@ gun_open_ws(Config, Path, Commands) ->
error(timeout)
end.
+do_await_enable_connect_protocol(http, _) ->
+ ok;
+do_await_enable_connect_protocol(http2, ConnPid) ->
+ {notify, settings_changed, #{enable_connect_protocol := true}}
+ = gun:await(ConnPid, undefined), %% @todo Maybe have a gun:await/1?
+ ok.
+
receive_ws(ConnPid, StreamRef) ->
receive
{gun_ws, ConnPid, StreamRef, Frame} ->
@@ -123,7 +151,14 @@ websocket_info_invalid(Config) ->
do_invalid(Config, Path) ->
{ok, ConnPid, StreamRef} = gun_open_ws(Config, Path, bad),
ensure_handle_is_called(ConnPid, StreamRef, Path),
- gun_down(ConnPid).
+ case config(protocol, Config) of
+ %% HTTP/1.1 closes the connection.
+ http -> gun_down(ConnPid);
+ %% HTTP/2 terminates the stream.
+ http2 ->
+ receive {gun_error, ConnPid, StreamRef, {stream_error, internal_error, _}} -> ok
+ after 500 -> error(timeout) end
+ end.
websocket_init_one_frame(Config) ->
doc("A single frame is received when websocket_init/1 returns it as a command."),
@@ -223,8 +258,12 @@ websocket_active_false(Config) ->
doc("The {active, false} command stops receiving data from the socket. "
"The {active, true} command reenables it."),
{ok, ConnPid, StreamRef} = gun_open_ws(Config, "/active", []),
+ %% We must exhaust the HTTP/2 flow control window
+ %% otherwise the frame will be received even if active mode is disabled.
+ gun:ws_send(ConnPid, StreamRef, {binary, <<0:100000/unit:8>>}),
gun:ws_send(ConnPid, StreamRef, {text, <<"Not received until the handler enables active again.">>}),
{error, timeout} = receive_ws(ConnPid, StreamRef),
+ {ok, {binary, _}} = receive_ws(ConnPid, StreamRef),
{ok, {text, <<"Not received until the handler enables active again.">>}}
= receive_ws(ConnPid, StreamRef),
ok.
@@ -271,7 +310,8 @@ websocket_deflate_ignore_if_not_negotiated(Config) ->
websocket_set_options_idle_timeout(Config) ->
doc("The idle_timeout option can be modified using the "
"command {set_options, Opts} at runtime."),
- ConnPid = gun_open(Config),
+ ConnPid = gun_open(Config, #{http2_opts => #{notify_settings_changed => true}}),
+ do_await_enable_connect_protocol(config(protocol, Config), ConnPid),
StreamRef = gun:ws_upgrade(ConnPid, "/set_options"),
receive
{gun_upgrade, ConnPid, StreamRef, [<<"websocket">>], _} ->
@@ -299,7 +339,8 @@ websocket_set_options_idle_timeout(Config) ->
websocket_set_options_max_frame_size(Config) ->
doc("The max_frame_size option can be modified using the "
"command {set_options, Opts} at runtime."),
- ConnPid = gun_open(Config),
+ ConnPid = gun_open(Config, #{http2_opts => #{notify_settings_changed => true}}),
+ do_await_enable_connect_protocol(config(protocol, Config), ConnPid),
StreamRef = gun:ws_upgrade(ConnPid, "/set_options"),
receive
{gun_upgrade, ConnPid, StreamRef, [<<"websocket">>], _} ->
@@ -334,7 +375,8 @@ websocket_set_options_max_frame_size(Config) ->
websocket_shutdown_reason(Config) ->
doc("The command {shutdown_reason, any()} can be used to "
"change the shutdown reason of a Websocket connection."),
- ConnPid = gun_open(Config),
+ ConnPid = gun_open(Config, #{http2_opts => #{notify_settings_changed => true}}),
+ do_await_enable_connect_protocol(config(protocol, Config), ConnPid),
StreamRef = gun:ws_upgrade(ConnPid, "/shutdown_reason", [
{<<"x-test-pid">>, pid_to_list(self())}
]),
diff --git a/test/ws_perf_SUITE.erl b/test/ws_perf_SUITE.erl
index ff88554..98b4e18 100644
--- a/test/ws_perf_SUITE.erl
+++ b/test/ws_perf_SUITE.erl
@@ -55,14 +55,15 @@ init_per_group(Name, Config) when Name =:= h2c; Name =:= h2c_compress ->
h2c_compress -> {compress, #{stream_handlers => [cowboy_compress_h, cowboy_stream_h]}}
end,
Config1 = cowboy_test:init_http(Name, Opts#{
- connection_window_margin_size => 64*1024,
+ %% The margin sizes must be larger than the larger test message for plain sockets.
+ connection_window_margin_size => 128*1024,
enable_connect_protocol => true,
env => #{dispatch => init_dispatch(Config)},
max_frame_size_sent => 64*1024,
max_frame_size_received => 16384 * 1024 - 1,
max_received_frame_rate => {10_000_000, 1},
stream_window_data_threshold => 1024,
- stream_window_margin_size => 64*1024
+ stream_window_margin_size => 128*1024
}, [{flavor, Flavor}|Config]),
lists:keyreplace(protocol, 1, Config1, {protocol, http2});
init_per_group(ascii, Config) ->
@@ -265,13 +266,14 @@ send_N_16384B(Config) ->
do_send(Config, What, Num, FrameSize) ->
{ok, ConnPid, StreamRef} = do_gun_open_ws("/ws_ignore", Config),
+ %% Prepare the frame data.
FrameType = config(frame_type, Config),
FrameData = case FrameType of
text -> do_text_data(Config, FrameSize);
binary -> rand:bytes(FrameSize)
end,
%% Heat up the processes before doing the real run.
-% do_send_loop(ConnPid, StreamRef, Num, FrameType, FrameData),
+% do_send_loop(Socket, Num, FrameType, Mask, MaskedFrameData),
{Time, _} = timer:tc(?MODULE, do_send_loop, [ConnPid, StreamRef, Num, FrameType, FrameData]),
do_log("~-6s ~-6s ~6s: ~8bµs", [What, FrameType, do_format_size(FrameSize), Time]),
gun:ws_send(ConnPid, StreamRef, close),
@@ -279,12 +281,181 @@ do_send(Config, What, Num, FrameSize) ->
gun_down(ConnPid).
do_send_loop(ConnPid, StreamRef, 0, _, _) ->
- gun:ws_send(ConnPid, StreamRef, {text, <<"CHECK">>}),
- {ok, {text, <<"CHECK">>}} = receive_ws(ConnPid, StreamRef),
- ok;
+ gun:ws_send(ConnPid, StreamRef, {text, <<"CHECK">>}),
+ {ok, {text, <<"CHECK">>}} = receive_ws(ConnPid, StreamRef),
+ ok;
do_send_loop(ConnPid, StreamRef, Num, FrameType, FrameData) ->
- gun:ws_send(ConnPid, StreamRef, {FrameType, FrameData}),
- do_send_loop(ConnPid, StreamRef, Num - 1, FrameType, FrameData).
+ gun:ws_send(ConnPid, StreamRef, {FrameType, FrameData}),
+ do_send_loop(ConnPid, StreamRef, Num - 1, FrameType, FrameData).
+
+tcp_send_N_00000B(Config) ->
+ doc("Send a 0B frame 10000 times."),
+ do_tcp_send(Config, tcps_N, 10000, 0).
+
+tcp_send_N_00256B(Config) ->
+ doc("Send a 256B frame 10000 times."),
+ do_tcp_send(Config, tcps_N, 10000, 256).
+
+tcp_send_N_01024B(Config) ->
+ doc("Send a 1024B frame 10000 times."),
+ do_tcp_send(Config, tcps_N, 10000, 1024).
+
+tcp_send_N_04096B(Config) ->
+ doc("Send a 4096B frame 10000 times."),
+ do_tcp_send(Config, tcps_N, 10000, 4096).
+
+tcp_send_N_16384B(Config) ->
+ doc("Send a 16384B frame 10000 times."),
+ do_tcp_send(Config, tcps_N, 10000, 16384).
+
+do_tcp_send(Config, What, Num, FrameSize) ->
+ {ok, Socket} = do_tcp_handshake(Config, #{}),
+ %% Prepare the frame data.
+ FrameType = config(frame_type, Config),
+ FrameData = case FrameType of
+ text -> do_text_data(Config, FrameSize);
+ binary -> rand:bytes(FrameSize)
+ end,
+ %% Mask the data outside the benchmark to avoid influencing the results.
+ Mask = 16#37fa213d,
+ MaskedFrameData = ws_SUITE:do_mask(FrameData, Mask, <<>>),
+ FrameSizeWithHeader = FrameSize + case FrameSize of
+ N when N =< 125 -> 6;
+ N when N =< 16#ffff -> 8;
+ N when N =< 16#7fffffffffffffff -> 14
+ end,
+ %% Run the benchmark; different function for h1 and h2.
+ {Time, _} = case config(protocol, Config) of
+ http -> timer:tc(?MODULE, do_tcp_send_loop_h1,
+ [Socket, Num, FrameType, Mask, MaskedFrameData]);
+ http2 -> timer:tc(?MODULE, do_tcp_send_loop_h2,
+ [Socket, 65535, Num, FrameType, Mask, MaskedFrameData, FrameSizeWithHeader])
+ end,
+ do_log("~-6s ~-6s ~6s: ~8bµs", [What, FrameType, do_format_size(FrameSize), Time]),
+ gen_tcp:close(Socket).
+
+%% Do a prior knowledge handshake.
+do_tcp_handshake(Config, LocalSettings) ->
+ Protocol = config(protocol, Config),
+ Socket1 = case Protocol of
+ http ->
+ {ok, Socket, _} = ws_SUITE:do_handshake(<<"/ws_ignore">>, Config),
+ Socket;
+ http2 ->
+ {ok, Socket} = gen_tcp:connect("localhost", config(port, Config), [binary, {active, false}]),
+ %% Send a valid preface.
+ ok = gen_tcp:send(Socket, [
+ "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n",
+ cow_http2:settings(LocalSettings)
+ ]),
+ %% Receive the server preface.
+ {ok, << Len:24 >>} = gen_tcp:recv(Socket, 3, 1000),
+ {ok, << 4:8, 0:40, SettingsPayload:Len/binary >>} = gen_tcp:recv(Socket, 6 + Len, 1000),
+ RemoteSettings = cow_http2:parse_settings_payload(SettingsPayload),
+ #{enable_connect_protocol := true} = RemoteSettings,
+ %% Send the SETTINGS ack.
+ ok = gen_tcp:send(Socket, cow_http2:settings_ack()),
+ %% Receive the SETTINGS ack.
+ {ok, << 0:24, 4:8, 1:8, 0:32 >>} = gen_tcp:recv(Socket, 9, 1000),
+ %% Send a CONNECT :protocol request to upgrade the stream to Websocket.
+ {ReqHeadersBlock, _} = cow_hpack:encode([
+ {<<":method">>, <<"CONNECT">>},
+ {<<":protocol">>, <<"websocket">>},
+ {<<":scheme">>, <<"http">>},
+ {<<":path">>, <<"/ws_ignore">>},
+ {<<":authority">>, <<"localhost">>}, %% @todo Correct port number.
+ {<<"sec-websocket-version">>, <<"13">>},
+ {<<"origin">>, <<"http://localhost">>}
+ ]),
+ ok = gen_tcp:send(Socket, cow_http2:headers(1, nofin, ReqHeadersBlock)),
+ %% Receive a 200 response.
+ {ok, << Len1:24, 1:8, _:8, 1:32 >>} = gen_tcp:recv(Socket, 9, 1000),
+ {ok, RespHeadersBlock} = gen_tcp:recv(Socket, Len1, 1000),
+ {RespHeaders, _} = cow_hpack:decode(RespHeadersBlock),
+ {_, <<"200">>} = lists:keyfind(<<":status">>, 1, RespHeaders),
+ Socket
+ end,
+ %% Enable active mode to avoid delays in receiving data at the end of benchmark.
+ ok = inet:setopts(Socket1, [{active, true}]),
+ %% Stream number 1 is ready.
+ {ok, Socket1}.
+
+do_tcp_send_loop_h1(Socket, 0, _, Mask, _) ->
+ MaskedFrameData = ws_SUITE:do_mask(<<"CHECK">>, Mask, <<>>),
+ ok = gen_tcp:send(Socket,
+ <<1:1, 0:3, 1:4, 1:1, 5:7, Mask:32, MaskedFrameData/binary>>),
+ do_tcp_wait_for_check(Socket);
+do_tcp_send_loop_h1(Socket, Num, FrameType, Mask, MaskedFrameData) ->
+ Len = byte_size(MaskedFrameData),
+ LenBits = case Len of
+ N when N =< 125 -> << N:7 >>;
+ N when N =< 16#ffff -> << 126:7, N:16 >>;
+ N when N =< 16#7fffffffffffffff -> << 127:7, N:64 >>
+ end,
+ FrameHeader = <<1:1, 0:3, 2:4, 1:1, LenBits/bits, Mask:32>>,
+ ok = gen_tcp:send(Socket, [
+ FrameHeader,
+ MaskedFrameData
+ ]),
+ do_tcp_send_loop_h1(Socket, Num - 1, FrameType, Mask, MaskedFrameData).
+
+do_tcp_send_loop_h2(Socket, _, 0, _, Mask, _, _) ->
+ MaskedFrameData = ws_SUITE:do_mask(<<"CHECK">>, Mask, <<>>),
+ ok = gen_tcp:send(Socket, cow_http2:data(1, nofin,
+ <<1:1, 0:3, 1:4, 1:1, 5:7, Mask:32, MaskedFrameData/binary>>)),
+ do_tcp_wait_for_check(Socket);
+do_tcp_send_loop_h2(Socket, Window0, Num, FrameType, Mask, MaskedFrameData, FrameSizeWithHeader)
+ when Window0 < FrameSizeWithHeader ->
+ %% The remaining window isn't large enough so
+ %% we wait for WINDOW_UPDATE frames.
+ Window = do_tcp_wait_window_updates(Socket, Window0),
+ do_tcp_send_loop_h2(Socket, Window, Num, FrameType, Mask, MaskedFrameData, FrameSizeWithHeader);
+do_tcp_send_loop_h2(Socket, Window, Num, FrameType, Mask, MaskedFrameData, FrameSizeWithHeader) ->
+ Len = byte_size(MaskedFrameData),
+ LenBits = case Len of
+ N when N =< 125 -> << N:7 >>;
+ N when N =< 16#ffff -> << 126:7, N:16 >>;
+ N when N =< 16#7fffffffffffffff -> << 127:7, N:64 >>
+ end,
+ FrameHeader = <<1:1, 0:3, 2:4, 1:1, LenBits/bits, Mask:32>>,
+ ok = gen_tcp:send(Socket, cow_http2:data(1, nofin, [
+ FrameHeader,
+ MaskedFrameData
+ ])),
+ do_tcp_send_loop_h2(Socket, Window - FrameSizeWithHeader,
+ Num - 1, FrameType, Mask, MaskedFrameData, FrameSizeWithHeader).
+
+do_tcp_wait_window_updates(Socket, Window) ->
+ receive
+ {tcp, Socket, Data} ->
+ do_tcp_wait_window_updates_parse(Socket, Window, Data)
+ after 0 ->
+ Window
+ end.
+
+do_tcp_wait_window_updates_parse(Socket, Window, Data) ->
+ case Data of
+ %% Ignore the connection-wide WINDOW_UPDATE.
+ <<4:24, 8:8, 0:8, 0:32, 0:1, _:31, Rest/bits>> ->
+ do_tcp_wait_window_updates_parse(Socket, Window, Rest);
+ %% Use the stream-specific WINDOW_UPDATE to increment our window.
+ <<4:24, 8:8, 0:8, 1:32, 0:1, Inc:31, Rest/bits>> ->
+ do_tcp_wait_window_updates_parse(Socket, Window + Inc, Rest);
+ %% Other frames are not expected.
+ <<>> ->
+ do_tcp_wait_window_updates(Socket, Window)
+ end.
+
+do_tcp_wait_for_check(Socket) ->
+ receive
+ {tcp, Socket, Data} ->
+ case binary:match(Data, <<"CHECK">>) of
+ nomatch -> do_tcp_wait_for_check(Socket);
+ _ -> ok
+ end
+ after 5000 ->
+ error(timeout)
+ end.
%% Internal.