aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--doc/src/manual/cowboy_websocket.asciidoc38
-rw-r--r--src/cowboy_http2.erl59
-rw-r--r--src/cowboy_http3.erl54
-rw-r--r--src/cowboy_quicer.erl13
-rw-r--r--src/cowboy_websocket.erl62
-rw-r--r--src/cowboy_webtransport.erl1
-rw-r--r--test/handlers/ws_active_commands_h.erl4
-rw-r--r--test/handlers/ws_deflate_commands_h.erl7
-rw-r--r--test/handlers/ws_handle_commands_h.erl14
-rw-r--r--test/handlers/ws_ignore.erl1
-rw-r--r--test/handlers/ws_info_commands_h.erl14
-rw-r--r--test/handlers/ws_init_commands_h.erl14
-rw-r--r--test/handlers/ws_init_h.erl4
-rw-r--r--test/handlers/ws_set_options_commands_h.erl7
-rw-r--r--test/handlers/ws_shutdown_reason_commands_h.erl4
-rw-r--r--test/rfc7231_SUITE.erl2
-rw-r--r--test/rfc8441_SUITE.erl3
-rw-r--r--test/ws_SUITE.erl2
-rw-r--r--test/ws_SUITE_data/ws_echo.erl1
-rw-r--r--test/ws_handler_SUITE.erl82
-rw-r--r--test/ws_perf_SUITE.erl187
21 files changed, 494 insertions, 79 deletions
diff --git a/doc/src/manual/cowboy_websocket.asciidoc b/doc/src/manual/cowboy_websocket.asciidoc
index 319dbae..f30ffca 100644
--- a/doc/src/manual/cowboy_websocket.asciidoc
+++ b/doc/src/manual/cowboy_websocket.asciidoc
@@ -200,14 +200,16 @@ Cowboy does it automatically for you.
[source,erlang]
----
opts() :: #{
- active_n => pos_integer(),
- compress => boolean(),
- deflate_opts => cow_ws:deflate_opts()
- dynamic_buffer => false | {pos_integer(), pos_integer()},
- idle_timeout => timeout(),
- max_frame_size => non_neg_integer() | infinity,
- req_filter => fun((cowboy_req:req()) -> map()),
- validate_utf8 => boolean()
+ active_n => pos_integer(),
+ compress => boolean(),
+ data_delivery => stream_handlers | relay,
+ data_delivery_flow => pos_integer(),
+ deflate_opts => cow_ws:deflate_opts(),
+ dynamic_buffer => false | {pos_integer(), pos_integer()},
+ idle_timeout => timeout(),
+ max_frame_size => non_neg_integer() | infinity,
+ req_filter => fun((cowboy_req:req()) -> map()),
+ validate_utf8 => boolean()
}
----
@@ -241,6 +243,22 @@ Whether to enable the Websocket frame compression
extension. Frames will only be compressed for the
clients that support this extension.
+data_delivery (stream_handlers)::
+
+HTTP/2+ only. Determines how data will be delivered
+to the Websocket session process. `stream_handlers`
+is the default and makes data go through stream
+handlers. `relay` is a faster method introduced in
+Cowboy 2.14 and sends data directly. `relay` is
+intended to become the default in Cowboy 3.0.
+
+data_delivery_flow (pos_integer())::
+
+When the `relay` data delivery method is used,
+this value may be used to decide how much the
+flow control window should be for the Websocket
+stream. Currently only applies to HTTP/2.
+
deflate_opts (#{})::
Configuration for the permessage-deflate Websocket
@@ -299,6 +317,10 @@ normal circumstances if necessary.
== Changelog
+* *2.14*: The `data_delivery` and `data_delivery_flow` options
+ were added. The `relay` data delivery mechanism
+ provides a better way of forwarding data to
+ HTTP/2+ Websocket session processes.
* *2.13*: The `active_n` default value was changed to `1`.
* *2.13*: The `dynamic_buffer` option was added.
* *2.13*: The `max_frame_size` option can now be set dynamically.
diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl
index 0d22fa1..5cb0585 100644
--- a/src/cowboy_http2.erl
+++ b/src/cowboy_http2.erl
@@ -82,8 +82,13 @@
-export_type([opts/0]).
-record(stream, {
- %% Whether the stream is currently stopping.
- status = running :: running | stopping,
+ %% Whether the stream is currently in a special state.
+ %%
+ %% - The running state is the normal state of a stream.
+ %% - The relaying state is used by extended CONNECT protocols to
+ %% use a 'relay' data_delivery method.
+ %% - The stopping state indicates the stream used the 'stop' command.
+ status = running :: running | {relaying, non_neg_integer(), pid()} | stopping,
%% Flow requested for this stream.
flow = 0 :: non_neg_integer(),
@@ -327,6 +332,8 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
%% Messages pertaining to a stream.
{{Pid, StreamID}, Msg} when Pid =:= self() ->
before_loop(info(State, StreamID, Msg), Buffer);
+ {'$cowboy_relay_command', {Pid, StreamID}, RelayCommand} when Pid =:= self() ->
+ before_loop(relay_command(State, StreamID, RelayCommand), Buffer);
%% Exit signal from children.
Msg = {'EXIT', Pid, _} ->
before_loop(down(State, Pid, Msg), Buffer);
@@ -520,6 +527,14 @@ data_frame(State0=#state{opts=Opts, flow=Flow0, streams=Streams}, StreamID, IsFi
reset_stream(State0, StreamID, {internal_error, {Class, Exception},
'Unhandled exception in cowboy_stream:data/4.'})
end;
+ %% Stream handlers are not used for the data when relaying.
+ #{StreamID := #stream{status={relaying, _, RelayPid}}} ->
+ RelayPid ! {'$cowboy_relay_data', {self(), StreamID}, IsFin, Data},
+ %% We keep a steady flow using the configured flow value.
+ %% Because we do not change the 'flow' value the update_window/2
+ %% function will always maintain this value (of course with
+ %% thresholds applying).
+ update_window(State0, StreamID);
%% We ignore DATA frames for streams that are stopping.
#{} ->
State0
@@ -866,6 +881,26 @@ commands(State=#state{socket=Socket, transport=Transport, http2_status=upgrade},
commands(State, StreamID, Tail);
%% Use a different protocol within the stream (CONNECT :protocol).
%% @todo Make sure we error out when the feature is disabled.
+%% There are two data_delivery: stream_handlers and relay.
+%% The former just has the data go through stream handlers
+%% like normal requests. The latter relays data directly.
+%%
+%% @todo When relaying there might be some data that is
+%% in stream handlers and that need to be received,
+%% depending on whether the protocol sends data
+%% before processing the response.
+commands(State0=#state{flow=Flow, streams=Streams}, StreamID,
+ [{switch_protocol, Headers, _Mod, ModState=#{data_delivery := relay}}|Tail]) ->
+ State1 = info(State0, StreamID, {headers, 200, Headers}),
+ #{StreamID := Stream} = Streams,
+ #{data_delivery_pid := RelayPid} = ModState,
+ %% WINDOW_UPDATE frames updating the window will be sent after
+ %% the first DATA frame has been received.
+ RelayFlow = maps:get(data_delivery_flow, ModState, 131072),
+ State = State1#state{flow=Flow + RelayFlow, streams=Streams#{StreamID => Stream#stream{
+ status={relaying, RelayFlow, RelayPid},
+ flow=RelayFlow}}},
+ commands(State, StreamID, Tail);
commands(State0, StreamID, [{switch_protocol, Headers, _Mod, _ModState}|Tail]) ->
State = info(State0, StreamID, {headers, 200, Headers}),
commands(State, StreamID, Tail);
@@ -881,6 +916,26 @@ commands(State=#state{opts=Opts}, StreamID, [Log={log, _, _, _}|Tail]) ->
cowboy:log(Log, Opts),
commands(State, StreamID, Tail).
+%% Relay data delivery commands.
+
+relay_command(State, StreamID, DataCmd = {data, _, _}) ->
+ commands(State, StreamID, [DataCmd]);
+%% When going active mode again we set the RelayFlow again
+%% and update the window if necessary.
+relay_command(State=#state{flow=Flow, streams=Streams}, StreamID, active) ->
+ #{StreamID := Stream} = Streams,
+ #stream{status={relaying, RelayFlow, _}} = Stream,
+ update_window(State#state{flow=Flow + RelayFlow,
+ streams=Streams#{StreamID => Stream#stream{flow=RelayFlow}}},
+ StreamID);
+%% When going passive mode we don't update the window
+%% since we have not incremented it.
+relay_command(State=#state{flow=Flow, streams=Streams}, StreamID, passive) ->
+ #{StreamID := Stream} = Streams,
+ #stream{flow=StreamFlow} = Stream,
+ State#state{flow=Flow - StreamFlow,
+ streams=Streams#{StreamID => Stream#stream{flow=0}}}.
+
%% Tentatively update the window after the flow was updated.
update_window(State0=#state{socket=Socket, transport=Transport,
diff --git a/src/cowboy_http3.erl b/src/cowboy_http3.erl
index 9aa6be5..740b9e3 100644
--- a/src/cowboy_http3.erl
+++ b/src/cowboy_http3.erl
@@ -67,6 +67,7 @@
%% Whether the stream is currently in a special state.
status :: header | {unidi, control | encoder | decoder}
| normal | {data | ignore, non_neg_integer()} | stopping
+ | {relaying, normal | {data, non_neg_integer()}, pid()}
| {webtransport_session, normal | {ignore, non_neg_integer()}}
| {webtransport_stream, cow_http3:stream_id()},
@@ -164,6 +165,8 @@ loop(State0=#state{opts=Opts, children=Children}) ->
%% Messages pertaining to a stream.
{{Pid, StreamID}, Msg} when Pid =:= self() ->
loop(info(State0, StreamID, Msg));
+ {'$cowboy_relay_command', {Pid, StreamID}, RelayCommand} when Pid =:= self() ->
+ loop(relay_command(State0, StreamID, RelayCommand));
%% WebTransport commands.
{'$webtransport_commands', SessionID, Commands} ->
loop(webtransport_commands(State0, SessionID, Commands));
@@ -299,6 +302,22 @@ parse1(State, Stream=#stream{status={data, Len}, id=StreamID}, Data, IsFin) ->
parse(frame(State, Stream#stream{status=normal}, {data, Data1}, FrameIsFin),
StreamID, Rest, IsFin)
end;
+%% This clause mirrors the {data, Len} clause.
+parse1(State, Stream=#stream{status={relaying, {data, Len}, RelayPid}, id=StreamID},
+ Data, IsFin) ->
+ DataLen = byte_size(Data),
+ if
+ DataLen < Len ->
+ %% We don't have the full frame but this is the end of the
+ %% data we have. So FrameIsFin is equivalent to IsFin here.
+ loop(frame(State, Stream#stream{status={relaying, {data, Len - DataLen}, RelayPid}},
+ {data, Data}, IsFin));
+ true ->
+ <<Data1:Len/binary, Rest/bits>> = Data,
+ FrameIsFin = is_fin(IsFin, Rest),
+ parse(frame(State, Stream#stream{status={relaying, normal, RelayPid}},
+ {data, Data1}, FrameIsFin), StreamID, Rest, IsFin)
+ end;
parse1(State, Stream=#stream{status={ignore, Len}, id=StreamID}, Data, IsFin) ->
DataLen = byte_size(Data),
if
@@ -311,7 +330,7 @@ parse1(State, Stream=#stream{status={ignore, Len}, id=StreamID}, Data, IsFin) ->
end;
%% @todo Clause that discards receiving data for stopping streams.
%% We may receive a few more frames after we abort receiving.
-parse1(State=#state{opts=Opts}, Stream=#stream{id=StreamID}, Data, IsFin) ->
+parse1(State=#state{opts=Opts}, Stream=#stream{status=Status0, id=StreamID}, Data, IsFin) ->
case cow_http3:parse(Data) of
{ok, Frame, Rest} ->
FrameIsFin = is_fin(IsFin, Rest),
@@ -322,6 +341,10 @@ parse1(State=#state{opts=Opts}, Stream=#stream{id=StreamID}, Data, IsFin) ->
{more, Frame = {data, _}, Len} ->
%% We're at the end of the data so FrameIsFin is equivalent to IsFin.
case IsFin of
+ nofin when element(1, Status0) =:= relaying ->
+ %% The stream will be stored at the end of processing commands.
+ Status = setelement(2, Status0, {data, Len}),
+ loop(frame(State, Stream#stream{status=Status}, Frame, nofin));
nofin ->
%% The stream will be stored at the end of processing commands.
loop(frame(State, Stream#stream{status={data, Len}}, Frame, nofin));
@@ -432,6 +455,9 @@ frame(State=#state{http3_machine=HTTP3Machine0},
terminate(State#state{http3_machine=HTTP3Machine}, Error)
end.
+data_frame(State, Stream=#stream{status={relaying, _, RelayPid}, id=StreamID}, IsFin, Data) ->
+ RelayPid ! {'$cowboy_relay_data', {self(), StreamID}, IsFin, Data},
+ stream_store(State, Stream);
data_frame(State=#state{opts=Opts},
Stream=#stream{id=StreamID, state=StreamState0}, IsFin, Data) ->
try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of
@@ -767,6 +793,18 @@ commands(State0, Stream0=#stream{id=StreamID},
},
%% @todo We must propagate the buffer to capsule handling if any.
commands(State#state{http3_machine=HTTP3Machine}, Stream, Tail);
+%% There are two data_delivery: stream_handlers and relay.
+%% The former just has the data go through stream handlers
+%% like normal requests. The latter relays data directly.
+commands(State0, Stream0=#stream{id=StreamID},
+ [{switch_protocol, Headers, _Mod, ModState=#{data_delivery := relay}}|Tail]) ->
+ State = info(stream_store(State0, Stream0), StreamID, {headers, 200, Headers}),
+ Stream1 = #stream{status=normal} = stream_get(State, StreamID),
+ #{data_delivery_pid := RelayPid} = ModState,
+ %% We do not set data_delivery_flow because it is managed by quicer
+ %% and we do not have an easy way to modify it.
+ Stream = Stream1#stream{status={relaying, normal, RelayPid}},
+ commands(State, Stream, Tail);
commands(State0, Stream0=#stream{id=StreamID},
[{switch_protocol, Headers, _Mod, _ModState}|Tail]) ->
State = info(stream_store(State0, Stream0), StreamID, {headers, 200, Headers}),
@@ -872,6 +910,20 @@ send_instructions(State=#state{conn=Conn, local_encoder_id=EncoderID},
cowboy_quicer:send(Conn, EncoderID, EncData)),
State.
+%% Relay data delivery commands.
+
+relay_command(State, StreamID, DataCmd = {data, _, _}) ->
+ Stream = stream_get(State, StreamID),
+ commands(State, Stream, [DataCmd]);
+relay_command(State=#state{conn=Conn}, StreamID, active) ->
+ ok = maybe_socket_error(State,
+ cowboy_quicer:setopt(Conn, StreamID, active, true)),
+ State;
+relay_command(State=#state{conn=Conn}, StreamID, passive) ->
+ ok = maybe_socket_error(State,
+ cowboy_quicer:setopt(Conn, StreamID, active, false)),
+ State.
+
%% We mark the stream as being a WebTransport stream
%% and then continue parsing the data as a WebTransport
%% stream. This function is common for incoming unidi
diff --git a/src/cowboy_quicer.erl b/src/cowboy_quicer.erl
index aa52fae..d2f5433 100644
--- a/src/cowboy_quicer.erl
+++ b/src/cowboy_quicer.erl
@@ -25,6 +25,7 @@
%% Streams.
-export([start_bidi_stream/2]).
-export([start_unidi_stream/2]).
+-export([setopt/4]).
-export([send/3]).
-export([send/4]).
-export([send_datagram/2]).
@@ -54,6 +55,9 @@ start_bidi_stream(_, _) -> no_quicer().
-spec start_unidi_stream(_, _) -> no_return().
start_unidi_stream(_, _) -> no_quicer().
+-spec setopt(_, _, _, _) -> no_return().
+setopt(_, _, _, _) -> no_quicer().
+
-spec send(_, _, _) -> no_return().
send(_, _, _) -> no_quicer().
@@ -109,7 +113,7 @@ sockname(Conn) ->
| {error, any()}.
peercert(Conn) ->
- quicer_nif:peercert(Conn).
+ quicer_nif:peercert(Conn).
-spec shutdown(quicer_connection_handle(), quicer_app_errno())
-> ok | {error, any()}.
@@ -154,6 +158,13 @@ start_stream(Conn, InitialData, OpenFlag) ->
Error
end.
+-spec setopt(quicer_connection_handle(), cow_http3:stream_id(), active, boolean())
+ -> ok | {error, any()}.
+
+setopt(_Conn, StreamID, active, Value) ->
+ StreamRef = get({quicer_stream, StreamID}),
+ quicer:setopt(StreamRef, active, Value).
+
-spec send(quicer_connection_handle(), cow_http3:stream_id(), iodata())
-> ok | {error, any()}.
diff --git a/src/cowboy_websocket.erl b/src/cowboy_websocket.erl
index cb30c3f..65289cd 100644
--- a/src/cowboy_websocket.erl
+++ b/src/cowboy_websocket.erl
@@ -68,6 +68,8 @@
-type opts() :: #{
active_n => pos_integer(),
compress => boolean(),
+ data_delivery => stream_handlers | relay,
+ data_delivery_flow => pos_integer(),
deflate_opts => cow_ws:deflate_opts(),
dynamic_buffer => false | {pos_integer(), pos_integer()},
dynamic_buffer_initial_average => non_neg_integer(),
@@ -91,7 +93,7 @@
parent :: undefined | pid(),
ref :: ranch:ref(),
socket = undefined :: inet:socket() | {pid(), cowboy_stream:streamid()} | undefined,
- transport = undefined :: module() | undefined,
+ transport :: module() | {data_delivery, stream_handlers | relay},
opts = #{} :: opts(),
active = true :: boolean(),
handler :: module(),
@@ -149,7 +151,7 @@ upgrade(Req, Env, Handler, HandlerState) ->
%% @todo Immediately crash if a response has already been sent.
upgrade(Req0=#{version := Version}, Env, Handler, HandlerState, Opts) ->
FilteredReq = case maps:get(req_filter, Opts, undefined) of
- undefined -> maps:with([method, version, scheme, host, port, path, qs, peer], Req0);
+ undefined -> maps:with([method, version, scheme, host, port, path, qs, peer, streamid], Req0);
FilterFun -> FilterFun(Req0)
end,
Utf8State = case maps:get(validate_utf8, Opts, true) of
@@ -273,12 +275,27 @@ websocket_handshake(State=#state{key=Key},
%% For HTTP/2 we do not let the process die, we instead keep it
%% for the Websocket stream. This is because in HTTP/2 we only
%% have a stream, it doesn't take over the whole connection.
-websocket_handshake(State, Req=#{ref := Ref, pid := Pid, streamid := StreamID},
+%%
+%% There are two methods of delivering data to the Websocket session:
+%% - 'stream_handlers' is the default and makes the data go
+%% through stream handlers just like when reading a request body;
+%% - 'relay' is a new method where data is sent as a message as
+%% soon as it is received from the socket in a DATA frame.
+websocket_handshake(State=#state{opts=Opts},
+ Req=#{ref := Ref, pid := Pid, streamid := StreamID},
HandlerState, _Env) ->
%% @todo We don't want date and server headers.
Headers = cowboy_req:response_headers(#{}, Req),
- Pid ! {{Pid, StreamID}, {switch_protocol, Headers, ?MODULE, {State, HandlerState}}},
- takeover(Pid, Ref, {Pid, StreamID}, undefined, #{}, <<>>,
+ DataDelivery = maps:get(data_delivery, Opts, stream_handlers),
+ ModState = #{
+ data_delivery => DataDelivery,
+ %% For relay data_delivery. The flow is a hint and may
+ %% not be used by the underlying protocol.
+ data_delivery_pid => self(),
+ data_delivery_flow => maps:get(data_delivery_flow, Opts, 131072)
+ },
+ Pid ! {{Pid, StreamID}, {switch_protocol, Headers, ?MODULE, ModState}},
+ takeover(Pid, Ref, {Pid, StreamID}, {data_delivery, DataDelivery}, #{}, <<>>,
{State, HandlerState}).
%% Connection process.
@@ -301,7 +318,7 @@ websocket_handshake(State, Req=#{ref := Ref, pid := Pid, streamid := StreamID},
-type parse_state() :: #ps_header{} | #ps_payload{}.
-spec takeover(pid(), ranch:ref(), inet:socket() | {pid(), cowboy_stream:streamid()},
- module() | undefined, any(), binary(),
+ module() | {data_delivery, stream_handlers | relay}, any(), binary(),
{#state{}, any()}) -> no_return().
takeover(Parent, Ref, Socket, Transport, Opts, Buffer,
{State0=#state{opts=WsOpts, handler=Handler, req=Req}, HandlerState}) ->
@@ -311,7 +328,7 @@ takeover(Parent, Ref, Socket, Transport, Opts, Buffer,
_ -> ranch:remove_connection(Ref)
end,
Messages = case Transport of
- undefined -> undefined;
+ {data_delivery, _} -> undefined;
_ -> Transport:messages()
end,
State = set_idle_timeout(State0#state{parent=Parent,
@@ -355,13 +372,14 @@ after_init(State, HandlerState, ParseState) ->
%% immediately but there might still be data to be processed in
%% the message queue.
-setopts_active(#state{transport=undefined}) ->
+setopts_active(#state{transport={data_delivery, _}}) ->
ok;
setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
N = maps:get(active_n, Opts, 1),
Transport:setopts(Socket, [{active, N}]).
-maybe_read_body(#state{socket=Stream={Pid, _}, transport=undefined, active=true}) ->
+maybe_read_body(#state{transport={data_delivery, stream_handlers},
+ socket=Stream={Pid, _}, active=true}) ->
%% @todo Keep Ref around.
ReadBodyRef = make_ref(),
Pid ! {Stream, {read_body, self(), ReadBodyRef, auto, infinity}},
@@ -369,16 +387,25 @@ maybe_read_body(#state{socket=Stream={Pid, _}, transport=undefined, active=true}
maybe_read_body(_) ->
ok.
-active(State) ->
+active(State=#state{transport={data_delivery, relay},
+ socket=Stream={Pid, _}}) ->
+ Pid ! {'$cowboy_relay_command', Stream, active},
+ State#state{active=true};
+active(State0) ->
+ State = State0#state{active=true},
setopts_active(State),
maybe_read_body(State),
- State#state{active=true}.
+ State.
-passive(State=#state{transport=undefined}) ->
+passive(State=#state{transport={data_delivery, stream_handlers}}) ->
%% Unfortunately we cannot currently cancel read_body.
%% But that's OK, we will just stop reading the body
%% after the next message.
State#state{active=false};
+passive(State=#state{transport={data_delivery, relay},
+ socket=Stream={Pid, _}}) ->
+ Pid ! {'$cowboy_relay_command', Stream, passive},
+ State#state{active=false};
passive(State=#state{socket=Socket, transport=Transport, messages=Messages}) ->
Transport:setopts(Socket, [{active, false}]),
flush_passive(Socket, Messages),
@@ -454,6 +481,10 @@ loop(State=#state{parent=Parent, socket=Socket, messages=Messages,
{request_body, _Ref, fin, _, Data} ->
maybe_read_body(State),
parse(?reset_idle_timeout(State), HandlerState, ParseState, Data);
+ %% @todo It would be better to check StreamID.
+ %% @todo We must ensure that IsFin=fin is handled like a socket close?
+ {'$cowboy_relay_data', {Pid, _StreamID}, _IsFin, Data} when Pid =:= Parent ->
+ parse(?reset_idle_timeout(State), HandlerState, ParseState, Data);
%% Timeouts.
{timeout, TRef, ?MODULE} ->
tick_idle_timeout(State, HandlerState, ParseState);
@@ -662,9 +693,14 @@ commands([Frame|Tail], State, Data0) ->
commands(Tail, State, Data)
end.
-transport_send(#state{socket=Stream={Pid, _}, transport=undefined}, IsFin, Data) ->
+transport_send(#state{transport={data_delivery, stream_handlers},
+ socket=Stream={Pid, _}}, IsFin, Data) ->
Pid ! {Stream, {data, IsFin, Data}},
ok;
+transport_send(#state{transport={data_delivery, relay},
+ socket=Stream={Pid, _}}, IsFin, Data) ->
+ Pid ! {'$cowboy_relay_command', Stream, {data, IsFin, Data}},
+ ok;
transport_send(#state{socket=Socket, transport=Transport}, _, Data) ->
Transport:send(Socket, Data).
diff --git a/src/cowboy_webtransport.erl b/src/cowboy_webtransport.erl
index 8c8ca39..7bcac55 100644
--- a/src/cowboy_webtransport.erl
+++ b/src/cowboy_webtransport.erl
@@ -277,6 +277,7 @@ handler_terminate(#state{handler=Handler, req=Req}, HandlerState, Reason) ->
%% callback is not implemented.
%%
%% @todo Better type than map() for the cowboy_stream state.
+%% @todo Is this really useful?
-spec info(cowboy_stream:streamid(), any(), State)
-> {cowboy_stream:commands(), State} when State::map().
diff --git a/test/handlers/ws_active_commands_h.erl b/test/handlers/ws_active_commands_h.erl
index 1c615e3..bbf8907 100644
--- a/test/handlers/ws_active_commands_h.erl
+++ b/test/handlers/ws_active_commands_h.erl
@@ -9,8 +9,8 @@
-export([websocket_handle/2]).
-export([websocket_info/2]).
-init(Req, RunOrHibernate) ->
- {cowboy_websocket, Req, RunOrHibernate}.
+init(Req, Opts) ->
+ {cowboy_websocket, Req, maps:get(run_or_hibernate, Opts), Opts}.
websocket_init(State=run) ->
erlang:send_after(1500, self(), active_true),
diff --git a/test/handlers/ws_deflate_commands_h.erl b/test/handlers/ws_deflate_commands_h.erl
index 14236bc..be3e16e 100644
--- a/test/handlers/ws_deflate_commands_h.erl
+++ b/test/handlers/ws_deflate_commands_h.erl
@@ -8,10 +8,11 @@
-export([websocket_handle/2]).
-export([websocket_info/2]).
-init(Req, RunOrHibernate) ->
+init(Req, Opts) ->
+ DataDelivery = maps:get(data_delivery, Opts, stream_handlers),
{cowboy_websocket, Req,
- #{deflate => true, hibernate => RunOrHibernate},
- #{compress => true}}.
+ #{deflate => true, hibernate => maps:get(run_or_hibernate, Opts)},
+ #{compress => true, data_delivery => DataDelivery}}.
websocket_handle(Frame, State=#{deflate := Deflate0, hibernate := run}) ->
Deflate = not Deflate0,
diff --git a/test/handlers/ws_handle_commands_h.erl b/test/handlers/ws_handle_commands_h.erl
index da3ffad..0a8513b 100644
--- a/test/handlers/ws_handle_commands_h.erl
+++ b/test/handlers/ws_handle_commands_h.erl
@@ -9,14 +9,20 @@
-export([websocket_handle/2]).
-export([websocket_info/2]).
-init(Req=#{pid := Pid}, RunOrHibernate) ->
+init(Req, Opts) ->
Commands0 = cowboy_req:header(<<"x-commands">>, Req),
Commands = binary_to_term(base64:decode(Commands0)),
case Commands of
- bad -> ct_helper_error_h:ignore(Pid, cowboy_websocket, handler_call, 6);
- _ -> ok
+ bad ->
+ Pid = case Req of
+ #{version := 'HTTP/2'} -> self();
+ #{pid := Pid0} -> Pid0
+ end,
+ ct_helper_error_h:ignore(Pid, cowboy_websocket, handler_call, 6);
+ _ ->
+ ok
end,
- {cowboy_websocket, Req, {Commands, RunOrHibernate}}.
+ {cowboy_websocket, Req, {Commands, maps:get(run_or_hibernate, Opts)}, Opts}.
websocket_init(State) ->
{[], State}.
diff --git a/test/handlers/ws_ignore.erl b/test/handlers/ws_ignore.erl
index 9fe3322..37595aa 100644
--- a/test/handlers/ws_ignore.erl
+++ b/test/handlers/ws_ignore.erl
@@ -8,6 +8,7 @@
init(Req, _) ->
{cowboy_websocket, Req, undefined, #{
+ data_delivery => relay,
compress => true
}}.
diff --git a/test/handlers/ws_info_commands_h.erl b/test/handlers/ws_info_commands_h.erl
index d596473..2115727 100644
--- a/test/handlers/ws_info_commands_h.erl
+++ b/test/handlers/ws_info_commands_h.erl
@@ -10,14 +10,20 @@
-export([websocket_handle/2]).
-export([websocket_info/2]).
-init(Req=#{pid := Pid}, RunOrHibernate) ->
+init(Req, Opts) ->
Commands0 = cowboy_req:header(<<"x-commands">>, Req),
Commands = binary_to_term(base64:decode(Commands0)),
case Commands of
- bad -> ct_helper_error_h:ignore(Pid, cowboy_websocket, handler_call, 6);
- _ -> ok
+ bad ->
+ Pid = case Req of
+ #{version := 'HTTP/2'} -> self();
+ #{pid := Pid0} -> Pid0
+ end,
+ ct_helper_error_h:ignore(Pid, cowboy_websocket, handler_call, 6);
+ _ ->
+ ok
end,
- {cowboy_websocket, Req, {Commands, RunOrHibernate}}.
+ {cowboy_websocket, Req, {Commands, maps:get(run_or_hibernate, Opts)}, Opts}.
websocket_init(State) ->
self() ! shoot,
diff --git a/test/handlers/ws_init_commands_h.erl b/test/handlers/ws_init_commands_h.erl
index 8bae352..d821b18 100644
--- a/test/handlers/ws_init_commands_h.erl
+++ b/test/handlers/ws_init_commands_h.erl
@@ -9,14 +9,20 @@
-export([websocket_handle/2]).
-export([websocket_info/2]).
-init(Req=#{pid := Pid}, RunOrHibernate) ->
+init(Req, Opts) ->
Commands0 = cowboy_req:header(<<"x-commands">>, Req),
Commands = binary_to_term(base64:decode(Commands0)),
case Commands of
- bad -> ct_helper_error_h:ignore(Pid, cowboy_websocket, handler_call, 6);
- _ -> ok
+ bad ->
+ Pid = case Req of
+ #{version := 'HTTP/2'} -> self();
+ #{pid := Pid0} -> Pid0
+ end,
+ ct_helper_error_h:ignore(Pid, cowboy_websocket, handler_call, 6);
+ _ ->
+ ok
end,
- {cowboy_websocket, Req, {Commands, RunOrHibernate}}.
+ {cowboy_websocket, Req, {Commands, maps:get(run_or_hibernate, Opts)}, Opts}.
websocket_init(State={Commands, run}) ->
{Commands, State};
diff --git a/test/handlers/ws_init_h.erl b/test/handlers/ws_init_h.erl
index bbe9ef9..c44986f 100644
--- a/test/handlers/ws_init_h.erl
+++ b/test/handlers/ws_init_h.erl
@@ -8,9 +8,9 @@
-export([websocket_handle/2]).
-export([websocket_info/2]).
-init(Req, _) ->
+init(Req, Opts) ->
State = binary_to_atom(cowboy_req:qs(Req), latin1),
- {cowboy_websocket, Req, State}.
+ {cowboy_websocket, Req, State, Opts}.
%% Sleep to make sure the HTTP response was sent.
websocket_init(State) ->
diff --git a/test/handlers/ws_set_options_commands_h.erl b/test/handlers/ws_set_options_commands_h.erl
index 1ab0af4..af768d6 100644
--- a/test/handlers/ws_set_options_commands_h.erl
+++ b/test/handlers/ws_set_options_commands_h.erl
@@ -7,9 +7,10 @@
-export([websocket_handle/2]).
-export([websocket_info/2]).
-init(Req, RunOrHibernate) ->
- {cowboy_websocket, Req, RunOrHibernate,
- #{idle_timeout => infinity}}.
+init(Req, Opts) ->
+ DataDelivery = maps:get(data_delivery, Opts, stream_handlers),
+ {cowboy_websocket, Req, maps:get(run_or_hibernate, Opts),
+ #{idle_timeout => infinity, data_delivery => DataDelivery}}.
%% Set the idle_timeout option dynamically.
websocket_handle({text, <<"idle_timeout_short">>}, State=run) ->
diff --git a/test/handlers/ws_shutdown_reason_commands_h.erl b/test/handlers/ws_shutdown_reason_commands_h.erl
index 90b435c..878b70a 100644
--- a/test/handlers/ws_shutdown_reason_commands_h.erl
+++ b/test/handlers/ws_shutdown_reason_commands_h.erl
@@ -10,9 +10,9 @@
-export([websocket_handle/2]).
-export([websocket_info/2]).
-init(Req, RunOrHibernate) ->
+init(Req, Opts) ->
TestPid = list_to_pid(binary_to_list(cowboy_req:header(<<"x-test-pid">>, Req))),
- {cowboy_websocket, Req, {TestPid, RunOrHibernate}}.
+ {cowboy_websocket, Req, {TestPid, maps:get(run_or_hibernate, Opts)}, Opts}.
websocket_init(State={TestPid, RunOrHibernate}) ->
TestPid ! {ws_pid, self()},
diff --git a/test/rfc7231_SUITE.erl b/test/rfc7231_SUITE.erl
index 183fa0f..a42ca3f 100644
--- a/test/rfc7231_SUITE.erl
+++ b/test/rfc7231_SUITE.erl
@@ -44,7 +44,7 @@ init_dispatch(_) ->
{"/echo/:key", echo_h, []},
{"/delay/echo/:key", echo_h, []},
{"/resp/:key[/:arg]", resp_h, []},
- {"/ws", ws_init_h, []}
+ {"/ws", ws_init_h, #{}}
]}]).
%% @todo The documentation should list what methods, headers and status codes
diff --git a/test/rfc8441_SUITE.erl b/test/rfc8441_SUITE.erl
index b788f9f..efa753c 100644
--- a/test/rfc8441_SUITE.erl
+++ b/test/rfc8441_SUITE.erl
@@ -396,6 +396,9 @@ accept_handshake_when_enabled(Config) ->
MaskedHello = ws_SUITE:do_mask(<<"Hello">>, Mask, <<>>),
ok = gen_tcp:send(Socket, cow_http2:data(1, nofin,
<<1:1, 0:3, 1:4, 1:1, 5:7, Mask:32, MaskedHello/binary>>)),
+ %% Ignore expected WINDOW_UPDATE frames.
+ {ok, <<4:24, 8:8, _:72>>} = gen_tcp:recv(Socket, 13, 1000),
+ {ok, <<4:24, 8:8, _:72>>} = gen_tcp:recv(Socket, 13, 1000),
{ok, <<Len2:24, _:8, _:8, _:32>>} = gen_tcp:recv(Socket, 9, 1000),
{ok, <<1:1, 0:3, 1:4, 0:1, 5:7, "Hello">>} = gen_tcp:recv(Socket, Len2, 1000),
ok.
diff --git a/test/ws_SUITE.erl b/test/ws_SUITE.erl
index 6fa4e61..2f1d3e2 100644
--- a/test/ws_SUITE.erl
+++ b/test/ws_SUITE.erl
@@ -42,7 +42,7 @@ init_dispatch() ->
{"localhost", [
{"/ws_echo", ws_echo, []},
{"/ws_echo_timer", ws_echo_timer, []},
- {"/ws_init", ws_init_h, []},
+ {"/ws_init", ws_init_h, #{}},
{"/ws_init_shutdown", ws_init_shutdown, []},
{"/ws_send_many", ws_send_many, [
{sequence, [
diff --git a/test/ws_SUITE_data/ws_echo.erl b/test/ws_SUITE_data/ws_echo.erl
index efdc204..1f3a4be 100644
--- a/test/ws_SUITE_data/ws_echo.erl
+++ b/test/ws_SUITE_data/ws_echo.erl
@@ -8,6 +8,7 @@
init(Req, _) ->
{cowboy_websocket, Req, undefined, #{
+ data_delivery => relay,
compress => true
}}.
diff --git a/test/ws_handler_SUITE.erl b/test/ws_handler_SUITE.erl
index ab9dbe2..00c584d 100644
--- a/test/ws_handler_SUITE.erl
+++ b/test/ws_handler_SUITE.erl
@@ -19,21 +19,40 @@
-import(ct_helper, [config/2]).
-import(ct_helper, [doc/1]).
-import(cowboy_test, [gun_open/1]).
+-import(cowboy_test, [gun_open/2]).
-import(cowboy_test, [gun_down/1]).
%% ct.
all() ->
- [{group, ws}, {group, ws_hibernate}].
+ [
+ {group, h1},
+ {group, h1_hibernate},
+ {group, h2},
+ {group, h2_relay}
+ ].
%% @todo Test against HTTP/2 too.
groups() ->
AllTests = ct_helper:all(?MODULE),
- [{ws, [parallel], AllTests}, {ws_hibernate, [parallel], AllTests}].
-
-init_per_group(Name, Config) ->
+ [
+ {h1, [parallel], AllTests},
+ {h1_hibernate, [parallel], AllTests},
+ %% The websocket_deflate_false test isn't compatible with HTTP/2.
+ {h2, [parallel], AllTests -- [websocket_deflate_false]},
+ {h2_relay, [parallel], AllTests -- [websocket_deflate_false]}
+ ].
+
+init_per_group(Name, Config)
+ when Name =:= h1; Name =:= h1_hibernate ->
cowboy_test:init_http(Name, #{
env => #{dispatch => init_dispatch(Name)}
+ }, Config);
+init_per_group(Name, Config)
+ when Name =:= h2; Name =:= h2_relay ->
+ cowboy_test:init_http2(Name, #{
+ enable_connect_protocol => true,
+ env => #{dispatch => init_dispatch(Name)}
}, Config).
end_per_group(Name, _) ->
@@ -42,25 +61,27 @@ end_per_group(Name, _) ->
%% Dispatch configuration.
init_dispatch(Name) ->
- RunOrHibernate = case Name of
- ws -> run;
- ws_hibernate -> hibernate
+ InitialState = case Name of
+ h1_hibernate -> #{run_or_hibernate => hibernate};
+ h2_relay -> #{run_or_hibernate => run, data_delivery => relay};
+ _ -> #{run_or_hibernate => run}
end,
cowboy_router:compile([{'_', [
- {"/init", ws_init_commands_h, RunOrHibernate},
- {"/handle", ws_handle_commands_h, RunOrHibernate},
- {"/info", ws_info_commands_h, RunOrHibernate},
- {"/trap_exit", ws_init_h, RunOrHibernate},
- {"/active", ws_active_commands_h, RunOrHibernate},
- {"/deflate", ws_deflate_commands_h, RunOrHibernate},
- {"/set_options", ws_set_options_commands_h, RunOrHibernate},
- {"/shutdown_reason", ws_shutdown_reason_commands_h, RunOrHibernate}
+ {"/init", ws_init_commands_h, InitialState},
+ {"/handle", ws_handle_commands_h, InitialState},
+ {"/info", ws_info_commands_h, InitialState},
+ {"/trap_exit", ws_init_h, InitialState},
+ {"/active", ws_active_commands_h, InitialState},
+ {"/deflate", ws_deflate_commands_h, InitialState},
+ {"/set_options", ws_set_options_commands_h, InitialState},
+ {"/shutdown_reason", ws_shutdown_reason_commands_h, InitialState}
]}]).
%% Support functions for testing using Gun.
gun_open_ws(Config, Path, Commands) ->
- ConnPid = gun_open(Config),
+ ConnPid = gun_open(Config, #{http2_opts => #{notify_settings_changed => true}}),
+ do_await_enable_connect_protocol(config(protocol, Config), ConnPid),
StreamRef = gun:ws_upgrade(ConnPid, Path, [
{<<"x-commands">>, base64:encode(term_to_binary(Commands))}
]),
@@ -75,6 +96,13 @@ gun_open_ws(Config, Path, Commands) ->
error(timeout)
end.
+do_await_enable_connect_protocol(http, _) ->
+ ok;
+do_await_enable_connect_protocol(http2, ConnPid) ->
+ {notify, settings_changed, #{enable_connect_protocol := true}}
+ = gun:await(ConnPid, undefined), %% @todo Maybe have a gun:await/1?
+ ok.
+
receive_ws(ConnPid, StreamRef) ->
receive
{gun_ws, ConnPid, StreamRef, Frame} ->
@@ -123,7 +151,14 @@ websocket_info_invalid(Config) ->
do_invalid(Config, Path) ->
{ok, ConnPid, StreamRef} = gun_open_ws(Config, Path, bad),
ensure_handle_is_called(ConnPid, StreamRef, Path),
- gun_down(ConnPid).
+ case config(protocol, Config) of
+ %% HTTP/1.1 closes the connection.
+ http -> gun_down(ConnPid);
+ %% HTTP/2 terminates the stream.
+ http2 ->
+ receive {gun_error, ConnPid, StreamRef, {stream_error, internal_error, _}} -> ok
+ after 500 -> error(timeout) end
+ end.
websocket_init_one_frame(Config) ->
doc("A single frame is received when websocket_init/1 returns it as a command."),
@@ -223,8 +258,12 @@ websocket_active_false(Config) ->
doc("The {active, false} command stops receiving data from the socket. "
"The {active, true} command reenables it."),
{ok, ConnPid, StreamRef} = gun_open_ws(Config, "/active", []),
+ %% We must exhaust the HTTP/2 flow control window
+ %% otherwise the frame will be received even if active mode is disabled.
+ gun:ws_send(ConnPid, StreamRef, {binary, <<0:100000/unit:8>>}),
gun:ws_send(ConnPid, StreamRef, {text, <<"Not received until the handler enables active again.">>}),
{error, timeout} = receive_ws(ConnPid, StreamRef),
+ {ok, {binary, _}} = receive_ws(ConnPid, StreamRef),
{ok, {text, <<"Not received until the handler enables active again.">>}}
= receive_ws(ConnPid, StreamRef),
ok.
@@ -271,7 +310,8 @@ websocket_deflate_ignore_if_not_negotiated(Config) ->
websocket_set_options_idle_timeout(Config) ->
doc("The idle_timeout option can be modified using the "
"command {set_options, Opts} at runtime."),
- ConnPid = gun_open(Config),
+ ConnPid = gun_open(Config, #{http2_opts => #{notify_settings_changed => true}}),
+ do_await_enable_connect_protocol(config(protocol, Config), ConnPid),
StreamRef = gun:ws_upgrade(ConnPid, "/set_options"),
receive
{gun_upgrade, ConnPid, StreamRef, [<<"websocket">>], _} ->
@@ -299,7 +339,8 @@ websocket_set_options_idle_timeout(Config) ->
websocket_set_options_max_frame_size(Config) ->
doc("The max_frame_size option can be modified using the "
"command {set_options, Opts} at runtime."),
- ConnPid = gun_open(Config),
+ ConnPid = gun_open(Config, #{http2_opts => #{notify_settings_changed => true}}),
+ do_await_enable_connect_protocol(config(protocol, Config), ConnPid),
StreamRef = gun:ws_upgrade(ConnPid, "/set_options"),
receive
{gun_upgrade, ConnPid, StreamRef, [<<"websocket">>], _} ->
@@ -334,7 +375,8 @@ websocket_set_options_max_frame_size(Config) ->
websocket_shutdown_reason(Config) ->
doc("The command {shutdown_reason, any()} can be used to "
"change the shutdown reason of a Websocket connection."),
- ConnPid = gun_open(Config),
+ ConnPid = gun_open(Config, #{http2_opts => #{notify_settings_changed => true}}),
+ do_await_enable_connect_protocol(config(protocol, Config), ConnPid),
StreamRef = gun:ws_upgrade(ConnPid, "/shutdown_reason", [
{<<"x-test-pid">>, pid_to_list(self())}
]),
diff --git a/test/ws_perf_SUITE.erl b/test/ws_perf_SUITE.erl
index ff88554..98b4e18 100644
--- a/test/ws_perf_SUITE.erl
+++ b/test/ws_perf_SUITE.erl
@@ -55,14 +55,15 @@ init_per_group(Name, Config) when Name =:= h2c; Name =:= h2c_compress ->
h2c_compress -> {compress, #{stream_handlers => [cowboy_compress_h, cowboy_stream_h]}}
end,
Config1 = cowboy_test:init_http(Name, Opts#{
- connection_window_margin_size => 64*1024,
+ %% The margin sizes must be larger than the larger test message for plain sockets.
+ connection_window_margin_size => 128*1024,
enable_connect_protocol => true,
env => #{dispatch => init_dispatch(Config)},
max_frame_size_sent => 64*1024,
max_frame_size_received => 16384 * 1024 - 1,
max_received_frame_rate => {10_000_000, 1},
stream_window_data_threshold => 1024,
- stream_window_margin_size => 64*1024
+ stream_window_margin_size => 128*1024
}, [{flavor, Flavor}|Config]),
lists:keyreplace(protocol, 1, Config1, {protocol, http2});
init_per_group(ascii, Config) ->
@@ -265,13 +266,14 @@ send_N_16384B(Config) ->
do_send(Config, What, Num, FrameSize) ->
{ok, ConnPid, StreamRef} = do_gun_open_ws("/ws_ignore", Config),
+ %% Prepare the frame data.
FrameType = config(frame_type, Config),
FrameData = case FrameType of
text -> do_text_data(Config, FrameSize);
binary -> rand:bytes(FrameSize)
end,
%% Heat up the processes before doing the real run.
-% do_send_loop(ConnPid, StreamRef, Num, FrameType, FrameData),
+% do_send_loop(Socket, Num, FrameType, Mask, MaskedFrameData),
{Time, _} = timer:tc(?MODULE, do_send_loop, [ConnPid, StreamRef, Num, FrameType, FrameData]),
do_log("~-6s ~-6s ~6s: ~8bµs", [What, FrameType, do_format_size(FrameSize), Time]),
gun:ws_send(ConnPid, StreamRef, close),
@@ -279,12 +281,181 @@ do_send(Config, What, Num, FrameSize) ->
gun_down(ConnPid).
do_send_loop(ConnPid, StreamRef, 0, _, _) ->
- gun:ws_send(ConnPid, StreamRef, {text, <<"CHECK">>}),
- {ok, {text, <<"CHECK">>}} = receive_ws(ConnPid, StreamRef),
- ok;
+ gun:ws_send(ConnPid, StreamRef, {text, <<"CHECK">>}),
+ {ok, {text, <<"CHECK">>}} = receive_ws(ConnPid, StreamRef),
+ ok;
do_send_loop(ConnPid, StreamRef, Num, FrameType, FrameData) ->
- gun:ws_send(ConnPid, StreamRef, {FrameType, FrameData}),
- do_send_loop(ConnPid, StreamRef, Num - 1, FrameType, FrameData).
+ gun:ws_send(ConnPid, StreamRef, {FrameType, FrameData}),
+ do_send_loop(ConnPid, StreamRef, Num - 1, FrameType, FrameData).
+
+tcp_send_N_00000B(Config) ->
+ doc("Send a 0B frame 10000 times."),
+ do_tcp_send(Config, tcps_N, 10000, 0).
+
+tcp_send_N_00256B(Config) ->
+ doc("Send a 256B frame 10000 times."),
+ do_tcp_send(Config, tcps_N, 10000, 256).
+
+tcp_send_N_01024B(Config) ->
+ doc("Send a 1024B frame 10000 times."),
+ do_tcp_send(Config, tcps_N, 10000, 1024).
+
+tcp_send_N_04096B(Config) ->
+ doc("Send a 4096B frame 10000 times."),
+ do_tcp_send(Config, tcps_N, 10000, 4096).
+
+tcp_send_N_16384B(Config) ->
+ doc("Send a 16384B frame 10000 times."),
+ do_tcp_send(Config, tcps_N, 10000, 16384).
+
+do_tcp_send(Config, What, Num, FrameSize) ->
+ {ok, Socket} = do_tcp_handshake(Config, #{}),
+ %% Prepare the frame data.
+ FrameType = config(frame_type, Config),
+ FrameData = case FrameType of
+ text -> do_text_data(Config, FrameSize);
+ binary -> rand:bytes(FrameSize)
+ end,
+ %% Mask the data outside the benchmark to avoid influencing the results.
+ Mask = 16#37fa213d,
+ MaskedFrameData = ws_SUITE:do_mask(FrameData, Mask, <<>>),
+ FrameSizeWithHeader = FrameSize + case FrameSize of
+ N when N =< 125 -> 6;
+ N when N =< 16#ffff -> 8;
+ N when N =< 16#7fffffffffffffff -> 14
+ end,
+ %% Run the benchmark; different function for h1 and h2.
+ {Time, _} = case config(protocol, Config) of
+ http -> timer:tc(?MODULE, do_tcp_send_loop_h1,
+ [Socket, Num, FrameType, Mask, MaskedFrameData]);
+ http2 -> timer:tc(?MODULE, do_tcp_send_loop_h2,
+ [Socket, 65535, Num, FrameType, Mask, MaskedFrameData, FrameSizeWithHeader])
+ end,
+ do_log("~-6s ~-6s ~6s: ~8bµs", [What, FrameType, do_format_size(FrameSize), Time]),
+ gen_tcp:close(Socket).
+
+%% Do a prior knowledge handshake.
+do_tcp_handshake(Config, LocalSettings) ->
+ Protocol = config(protocol, Config),
+ Socket1 = case Protocol of
+ http ->
+ {ok, Socket, _} = ws_SUITE:do_handshake(<<"/ws_ignore">>, Config),
+ Socket;
+ http2 ->
+ {ok, Socket} = gen_tcp:connect("localhost", config(port, Config), [binary, {active, false}]),
+ %% Send a valid preface.
+ ok = gen_tcp:send(Socket, [
+ "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n",
+ cow_http2:settings(LocalSettings)
+ ]),
+ %% Receive the server preface.
+ {ok, << Len:24 >>} = gen_tcp:recv(Socket, 3, 1000),
+ {ok, << 4:8, 0:40, SettingsPayload:Len/binary >>} = gen_tcp:recv(Socket, 6 + Len, 1000),
+ RemoteSettings = cow_http2:parse_settings_payload(SettingsPayload),
+ #{enable_connect_protocol := true} = RemoteSettings,
+ %% Send the SETTINGS ack.
+ ok = gen_tcp:send(Socket, cow_http2:settings_ack()),
+ %% Receive the SETTINGS ack.
+ {ok, << 0:24, 4:8, 1:8, 0:32 >>} = gen_tcp:recv(Socket, 9, 1000),
+ %% Send a CONNECT :protocol request to upgrade the stream to Websocket.
+ {ReqHeadersBlock, _} = cow_hpack:encode([
+ {<<":method">>, <<"CONNECT">>},
+ {<<":protocol">>, <<"websocket">>},
+ {<<":scheme">>, <<"http">>},
+ {<<":path">>, <<"/ws_ignore">>},
+ {<<":authority">>, <<"localhost">>}, %% @todo Correct port number.
+ {<<"sec-websocket-version">>, <<"13">>},
+ {<<"origin">>, <<"http://localhost">>}
+ ]),
+ ok = gen_tcp:send(Socket, cow_http2:headers(1, nofin, ReqHeadersBlock)),
+ %% Receive a 200 response.
+ {ok, << Len1:24, 1:8, _:8, 1:32 >>} = gen_tcp:recv(Socket, 9, 1000),
+ {ok, RespHeadersBlock} = gen_tcp:recv(Socket, Len1, 1000),
+ {RespHeaders, _} = cow_hpack:decode(RespHeadersBlock),
+ {_, <<"200">>} = lists:keyfind(<<":status">>, 1, RespHeaders),
+ Socket
+ end,
+ %% Enable active mode to avoid delays in receiving data at the end of benchmark.
+ ok = inet:setopts(Socket1, [{active, true}]),
+ %% Stream number 1 is ready.
+ {ok, Socket1}.
+
+do_tcp_send_loop_h1(Socket, 0, _, Mask, _) ->
+ MaskedFrameData = ws_SUITE:do_mask(<<"CHECK">>, Mask, <<>>),
+ ok = gen_tcp:send(Socket,
+ <<1:1, 0:3, 1:4, 1:1, 5:7, Mask:32, MaskedFrameData/binary>>),
+ do_tcp_wait_for_check(Socket);
+do_tcp_send_loop_h1(Socket, Num, FrameType, Mask, MaskedFrameData) ->
+ Len = byte_size(MaskedFrameData),
+ LenBits = case Len of
+ N when N =< 125 -> << N:7 >>;
+ N when N =< 16#ffff -> << 126:7, N:16 >>;
+ N when N =< 16#7fffffffffffffff -> << 127:7, N:64 >>
+ end,
+ FrameHeader = <<1:1, 0:3, 2:4, 1:1, LenBits/bits, Mask:32>>,
+ ok = gen_tcp:send(Socket, [
+ FrameHeader,
+ MaskedFrameData
+ ]),
+ do_tcp_send_loop_h1(Socket, Num - 1, FrameType, Mask, MaskedFrameData).
+
+do_tcp_send_loop_h2(Socket, _, 0, _, Mask, _, _) ->
+ MaskedFrameData = ws_SUITE:do_mask(<<"CHECK">>, Mask, <<>>),
+ ok = gen_tcp:send(Socket, cow_http2:data(1, nofin,
+ <<1:1, 0:3, 1:4, 1:1, 5:7, Mask:32, MaskedFrameData/binary>>)),
+ do_tcp_wait_for_check(Socket);
+do_tcp_send_loop_h2(Socket, Window0, Num, FrameType, Mask, MaskedFrameData, FrameSizeWithHeader)
+ when Window0 < FrameSizeWithHeader ->
+ %% The remaining window isn't large enough so
+ %% we wait for WINDOW_UPDATE frames.
+ Window = do_tcp_wait_window_updates(Socket, Window0),
+ do_tcp_send_loop_h2(Socket, Window, Num, FrameType, Mask, MaskedFrameData, FrameSizeWithHeader);
+do_tcp_send_loop_h2(Socket, Window, Num, FrameType, Mask, MaskedFrameData, FrameSizeWithHeader) ->
+ Len = byte_size(MaskedFrameData),
+ LenBits = case Len of
+ N when N =< 125 -> << N:7 >>;
+ N when N =< 16#ffff -> << 126:7, N:16 >>;
+ N when N =< 16#7fffffffffffffff -> << 127:7, N:64 >>
+ end,
+ FrameHeader = <<1:1, 0:3, 2:4, 1:1, LenBits/bits, Mask:32>>,
+ ok = gen_tcp:send(Socket, cow_http2:data(1, nofin, [
+ FrameHeader,
+ MaskedFrameData
+ ])),
+ do_tcp_send_loop_h2(Socket, Window - FrameSizeWithHeader,
+ Num - 1, FrameType, Mask, MaskedFrameData, FrameSizeWithHeader).
+
+do_tcp_wait_window_updates(Socket, Window) ->
+ receive
+ {tcp, Socket, Data} ->
+ do_tcp_wait_window_updates_parse(Socket, Window, Data)
+ after 0 ->
+ Window
+ end.
+
+do_tcp_wait_window_updates_parse(Socket, Window, Data) ->
+ case Data of
+ %% Ignore the connection-wide WINDOW_UPDATE.
+ <<4:24, 8:8, 0:8, 0:32, 0:1, _:31, Rest/bits>> ->
+ do_tcp_wait_window_updates_parse(Socket, Window, Rest);
+ %% Use the stream-specific WINDOW_UPDATE to increment our window.
+ <<4:24, 8:8, 0:8, 1:32, 0:1, Inc:31, Rest/bits>> ->
+ do_tcp_wait_window_updates_parse(Socket, Window + Inc, Rest);
+ %% Other frames are not expected.
+ <<>> ->
+ do_tcp_wait_window_updates(Socket, Window)
+ end.
+
+do_tcp_wait_for_check(Socket) ->
+ receive
+ {tcp, Socket, Data} ->
+ case binary:match(Data, <<"CHECK">>) of
+ nomatch -> do_tcp_wait_for_check(Socket);
+ _ -> ok
+ end
+ after 5000 ->
+ error(timeout)
+ end.
%% Internal.