aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--doc/src/manual/cowboy_http.asciidoc16
-rw-r--r--doc/src/manual/cowboy_http2.asciidoc16
-rw-r--r--doc/src/manual/cowboy_websocket.asciidoc16
-rw-r--r--src/cowboy.erl43
-rw-r--r--src/cowboy_dynamic_buffer.hrl80
-rw-r--r--src/cowboy_http.erl47
-rw-r--r--src/cowboy_http2.erl21
-rw-r--r--src/cowboy_websocket.erl31
-rw-r--r--test/handlers/read_body_h.erl15
-rw-r--r--test/handlers/ws_ignore.erl20
-rw-r--r--test/http_perf_SUITE.erl95
-rw-r--r--test/ws_perf_SUITE.erl134
12 files changed, 450 insertions, 84 deletions
diff --git a/doc/src/manual/cowboy_http.asciidoc b/doc/src/manual/cowboy_http.asciidoc
index 58f0435..31e2d37 100644
--- a/doc/src/manual/cowboy_http.asciidoc
+++ b/doc/src/manual/cowboy_http.asciidoc
@@ -20,6 +20,7 @@ opts() :: #{
active_n => pos_integer(),
chunked => boolean(),
connection_type => worker | supervisor,
+ dynamic_buffer => false | {pos_integer(), pos_integer()},
http10_keepalive => boolean(),
idle_timeout => timeout(),
inactivity_timeout => timeout(),
@@ -53,7 +54,7 @@ Ranch functions `ranch:get_protocol_options/1` and
The default value is given next to the option name:
-active_n (100)::
+active_n (1)::
The number of packets Cowboy will request from the socket at once.
This can be used to tweak the performance of the server. Higher
@@ -75,6 +76,17 @@ connection_type (supervisor)::
Whether the connection process also acts as a supervisor.
+dynamic_buffer ({8192, 131072})::
+
+Cowboy will dynamically change the socket's `buffer` size
+depending on the size of the data it receives from the socket.
+This lets Cowboy use the optimal buffer size for the current
+workload.
++
+The dynamic buffer size functionality can be disabled by
+setting this option to `false`. Cowboy will also disable
+it by default when the `buffer` transport option is configured.
+
http10_keepalive (true)::
Whether keep-alive is enabled for HTTP/1.0 connections.
@@ -166,6 +178,8 @@ Ordered list of stream handlers that will handle all stream events.
== Changelog
+* *2.13*: The `active_n` default value was changed to `1`.
+* *2.13*: The `dynamic_buffer` option was added.
* *2.11*: The `reset_idle_timeout_on_send` option was added.
* *2.8*: The `active_n` option was added.
* *2.7*: The `initial_stream_flow_size` and `logger` options were added.
diff --git a/doc/src/manual/cowboy_http2.asciidoc b/doc/src/manual/cowboy_http2.asciidoc
index 1d2619c..a5fcd0b 100644
--- a/doc/src/manual/cowboy_http2.asciidoc
+++ b/doc/src/manual/cowboy_http2.asciidoc
@@ -21,6 +21,7 @@ opts() :: #{
connection_type => worker | supervisor,
connection_window_margin_size => 0..16#7fffffff,
connection_window_update_threshold => 0..16#7fffffff,
+ dynamic_buffer => false | {pos_integer(), pos_integer()},
enable_connect_protocol => boolean(),
goaway_initial_timeout => timeout(),
goaway_complete_timeout => timeout(),
@@ -66,7 +67,7 @@ Ranch functions `ranch:get_protocol_options/1` and
The default value is given next to the option name:
-active_n (100)::
+active_n (1)::
The number of packets Cowboy will request from the socket at once.
This can be used to tweak the performance of the server. Higher
@@ -91,6 +92,17 @@ The connection window will only get updated when its size
becomes lower than this threshold, in bytes. This is to
avoid sending too many `WINDOW_UPDATE` frames.
+dynamic_buffer ({8192, 131072})::
+
+Cowboy will dynamically change the socket's `buffer` size
+depending on the size of the data it receives from the socket.
+This lets Cowboy use the optimal buffer size for the current
+workload.
++
+The dynamic buffer size functionality can be disabled by
+setting this option to `false`. Cowboy will also disable
+it by default when the `buffer` transport option is configured.
+
enable_connect_protocol (false)::
Whether to enable the extended CONNECT method to allow
@@ -289,6 +301,8 @@ too many `WINDOW_UPDATE` frames.
== Changelog
+* *2.13*: The `active_n` default value was changed to `1`.
+* *2.13*: The `dynamic_buffer` option was added.
* *2.11*: Websocket over HTTP/2 is now considered stable.
* *2.11*: The `reset_idle_timeout_on_send` option was added.
* *2.11*: Add the option `max_cancel_stream_rate` to protect
diff --git a/doc/src/manual/cowboy_websocket.asciidoc b/doc/src/manual/cowboy_websocket.asciidoc
index e152182..d5db82f 100644
--- a/doc/src/manual/cowboy_websocket.asciidoc
+++ b/doc/src/manual/cowboy_websocket.asciidoc
@@ -203,6 +203,7 @@ opts() :: #{
active_n => pos_integer(),
compress => boolean(),
deflate_opts => cow_ws:deflate_opts()
+ dynamic_buffer => false | {pos_integer(), pos_integer()},
idle_timeout => timeout(),
max_frame_size => non_neg_integer() | infinity,
req_filter => fun((cowboy_req:req()) -> map()),
@@ -224,7 +225,7 @@ init(Req, State) ->
The default value is given next to the option name:
-active_n (100)::
+active_n (1)::
The number of packets Cowboy will request from the socket at once.
This can be used to tweak the performance of the server. Higher
@@ -248,6 +249,17 @@ options and the zlib compression options. The
defaults optimize the compression at the expense
of some memory and CPU.
+dynamic_buffer ({8192, 131072})::
+
+Cowboy will dynamically change the socket's `buffer` size
+depending on the size of the data it receives from the socket.
+This lets Cowboy use the optimal buffer size for the current
+workload.
++
+The dynamic buffer size functionality can be disabled by
+setting this option to `false`. Cowboy will also disable
+it by default when the `buffer` transport option is configured.
+
idle_timeout (60000)::
Time in milliseconds that Cowboy will keep the
@@ -287,6 +299,8 @@ normal circumstances if necessary.
== Changelog
+* *2.13*: The `active_n` default value was changed to `1`.
+* *2.13*: The `dynamic_buffer` option was added.
* *2.13*: The `max_frame_size` option can now be set dynamically.
* *2.11*: Websocket over HTTP/2 is now considered stable.
* *2.11*: HTTP/1.1 Websocket no longer traps exits by default.
diff --git a/src/cowboy.erl b/src/cowboy.erl
index e5ed831..c685649 100644
--- a/src/cowboy.erl
+++ b/src/cowboy.erl
@@ -51,8 +51,12 @@
start_clear(Ref, TransOpts0, ProtoOpts0) ->
TransOpts1 = ranch:normalize_opts(TransOpts0),
- {TransOpts, ConnectionType} = ensure_connection_type(TransOpts1),
- ProtoOpts = ProtoOpts0#{connection_type => ConnectionType},
+ {TransOpts2, DynamicBuffer} = ensure_dynamic_buffer(TransOpts1, ProtoOpts0),
+ {TransOpts, ConnectionType} = ensure_connection_type(TransOpts2),
+ ProtoOpts = ProtoOpts0#{
+ connection_type => ConnectionType,
+ dynamic_buffer => DynamicBuffer
+ },
ranch:start_listener(Ref, ranch_tcp, TransOpts, cowboy_clear, ProtoOpts).
-spec start_tls(ranch:ref(), ranch:opts(), opts())
@@ -60,12 +64,13 @@ start_clear(Ref, TransOpts0, ProtoOpts0) ->
start_tls(Ref, TransOpts0, ProtoOpts0) ->
TransOpts1 = ranch:normalize_opts(TransOpts0),
- SocketOpts = maps:get(socket_opts, TransOpts1, []),
- TransOpts2 = TransOpts1#{socket_opts => [
- {alpn_preferred_protocols, [<<"h2">>, <<"http/1.1">>]}
- |SocketOpts]},
- {TransOpts, ConnectionType} = ensure_connection_type(TransOpts2),
- ProtoOpts = ProtoOpts0#{connection_type => ConnectionType},
+ {TransOpts2, DynamicBuffer} = ensure_dynamic_buffer(TransOpts1, ProtoOpts0),
+ TransOpts3 = ensure_alpn(TransOpts2),
+ {TransOpts, ConnectionType} = ensure_connection_type(TransOpts3),
+ ProtoOpts = ProtoOpts0#{
+ connection_type => ConnectionType,
+ dynamic_buffer => DynamicBuffer
+ },
ranch:start_listener(Ref, ranch_ssl, TransOpts, cowboy_tls, ProtoOpts).
%% @todo Experimental function to start a barebone QUIC listener.
@@ -77,6 +82,7 @@ start_tls(Ref, TransOpts0, ProtoOpts0) ->
-spec start_quic(ranch:ref(), #{socket_opts => [{atom(), _}]}, cowboy_http3:opts())
-> {ok, pid()}.
+%% @todo Implement dynamic_buffer for HTTP/3 if/when it applies.
start_quic(Ref, TransOpts, ProtoOpts) ->
{ok, _} = application:ensure_all_started(quicer),
Parent = self(),
@@ -139,11 +145,32 @@ port_0() ->
end,
Port.
+ensure_alpn(TransOpts) ->
+ SocketOpts = maps:get(socket_opts, TransOpts, []),
+ TransOpts#{socket_opts => [
+ {alpn_preferred_protocols, [<<"h2">>, <<"http/1.1">>]}
+ |SocketOpts]}.
+
ensure_connection_type(TransOpts=#{connection_type := ConnectionType}) ->
{TransOpts, ConnectionType};
ensure_connection_type(TransOpts) ->
{TransOpts#{connection_type => supervisor}, supervisor}.
+%% Dynamic buffer was set; accept transport options as-is.
+%% Note that initial 'buffer' size may be lower than dynamic buffer allows.
+ensure_dynamic_buffer(TransOpts, #{dynamic_buffer := DynamicBuffer}) ->
+ {TransOpts, DynamicBuffer};
+%% Dynamic buffer was not set; define default dynamic buffer
+%% only if 'buffer' size was not configured. In that case we
+%% set the 'buffer' size to the lowest value.
+ensure_dynamic_buffer(TransOpts=#{socket_opts := SocketOpts}, _) ->
+ case proplists:get_value(buffer, SocketOpts, undefined) of
+ undefined ->
+ {TransOpts#{socket_opts => [{buffer, 8192}|SocketOpts]}, {8192, 131072}};
+ _ ->
+ {TransOpts, false}
+ end.
+
-spec stop_listener(ranch:ref()) -> ok | {error, not_found}.
stop_listener(Ref) ->
diff --git a/src/cowboy_dynamic_buffer.hrl b/src/cowboy_dynamic_buffer.hrl
new file mode 100644
index 0000000..cb07aab
--- /dev/null
+++ b/src/cowboy_dynamic_buffer.hrl
@@ -0,0 +1,80 @@
+%% Copyright (c) 2025, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+%% These functions are common to cowboy_http, cowboy_http2 and
+%% cowboy_websocket. It requires the options and the state
+%% to use the same field names.
+
+%% Experiments have shown that the size of the 'buffer' can greatly
+%% impact performance: a buffer too small leads to more messages
+%% being handled and typically more binary appends; and a buffer
+%% too large results in inefficient use of memory which in turn
+%% reduces the throughput, presumably because large binary appends
+%% are not as efficient as smaller ones, and because while the
+%% buffer gets allocated only when there is data, the allocated
+%% size remains until the binary is GC and so under-use hurts.
+%%
+%% The performance of a given 'buffer' size will also depend on
+%% how the client is sending data, and on the protocol. For example,
+%% HTTP/1.1 doesn't need a very large 'buffer' size for reading
+%% request headers, but it does need one for reading large request
+%% bodies. At the same time, HTTP/2 performs best reading large
+%% request bodies when the 'buffer' size is about half that of
+%% HTTP/1.1.
+%%
+%% It therefore becomes important to resize the buffer dynamically
+%% depending on what is currently going on. We do this based on
+%% the size of data packets we received from the transport. We
+%% maintain a moving average and when that moving average is
+%% 90% of the current 'buffer' size, we double the 'buffer' size.
+%% When things slow down and the moving average falls below
+%% 40% of the current 'buffer' size, we halve the 'buffer' size.
+%%
+%% To calculate the moving average we do (MovAvg + DataLen) div 2.
+%% This means that the moving average will change very quickly when
+%% DataLen increases or decreases rapidly. That's OK, we want to
+%% be reactive, but also setting the buffer size is a pretty fast
+%% operation. The formula could be changed to the following if it
+%% became a problem: (MovAvg * N + DataLen) div (N + 1).
+%%
+%% Note that this works best when active,N uses low values of N.
+%% We don't want to accumulate too much data because we resize
+%% the buffer.
+
+init_dynamic_buffer_size(#{dynamic_buffer_initial_size := DynamicBuffer}) ->
+ DynamicBuffer;
+init_dynamic_buffer_size(#{dynamic_buffer := {LowDynamicBuffer, _}}) ->
+ LowDynamicBuffer;
+init_dynamic_buffer_size(_) ->
+ false.
+
+maybe_resize_buffer(State=#state{dynamic_buffer_size=false}, _) ->
+ State;
+maybe_resize_buffer(State=#state{transport=Transport, socket=Socket,
+ opts=#{dynamic_buffer := {LowDynamicBuffer, HighDynamicBuffer}},
+ dynamic_buffer_size=BufferSize0, dynamic_buffer_moving_average=MovingAvg0}, Data) ->
+ DataLen = byte_size(Data),
+ MovingAvg = (MovingAvg0 + DataLen) div 2,
+ if
+ BufferSize0 < HighDynamicBuffer andalso MovingAvg > BufferSize0 * 0.9 ->
+ BufferSize = min(BufferSize0 * 2, HighDynamicBuffer),
+ ok = maybe_socket_error(State, Transport:setopts(Socket, [{buffer, BufferSize}])),
+ State#state{dynamic_buffer_moving_average=MovingAvg, dynamic_buffer_size=BufferSize};
+ BufferSize0 > LowDynamicBuffer andalso MovingAvg < BufferSize0 * 0.4 ->
+ BufferSize = max(BufferSize0 div 2, LowDynamicBuffer),
+ ok = maybe_socket_error(State, Transport:setopts(Socket, [{buffer, BufferSize}])),
+ State#state{dynamic_buffer_moving_average=MovingAvg, dynamic_buffer_size=BufferSize};
+ true ->
+ State#state{dynamic_buffer_moving_average=MovingAvg}
+ end.
diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl
index 7c62b13..de268a6 100644
--- a/src/cowboy_http.erl
+++ b/src/cowboy_http.erl
@@ -28,6 +28,9 @@
compress_buffering => boolean(),
compress_threshold => non_neg_integer(),
connection_type => worker | supervisor,
+ dynamic_buffer => false | {pos_integer(), pos_integer()},
+ dynamic_buffer_initial_average => non_neg_integer(),
+ dynamic_buffer_initial_size => pos_integer(),
env => cowboy_middleware:env(),
http10_keepalive => boolean(),
idle_timeout => timeout(),
@@ -137,6 +140,10 @@
%% Flow requested for the current stream.
flow = infinity :: non_neg_integer() | infinity,
+ %% Dynamic buffer moving average and current buffer size.
+ dynamic_buffer_size :: pos_integer() | false,
+ dynamic_buffer_moving_average :: non_neg_integer(),
+
%% Identifier for the stream currently being written.
%% Note that out_streamid =< in_streamid.
out_streamid = 1 :: pos_integer(),
@@ -181,12 +188,16 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
parent=Parent, ref=Ref, socket=Socket,
transport=Transport, proxy_header=ProxyHeader, opts=Opts,
peer=Peer, sock=Sock, cert=Cert,
+ dynamic_buffer_size=init_dynamic_buffer_size(Opts),
+ dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0),
last_streamid=maps:get(max_keepalive, Opts, 1000)},
safe_setopts_active(State),
loop(set_timeout(State, request_timeout)).
+-include("cowboy_dynamic_buffer.hrl").
+
setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
- N = maps:get(active_n, Opts, 100),
+ N = maps:get(active_n, Opts, 1),
Transport:setopts(Socket, [{active, N}]).
safe_setopts_active(State) ->
@@ -220,11 +231,13 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
receive
%% Discard data coming in after the last request
%% we want to process was received fully.
- {OK, Socket, _} when OK =:= element(1, Messages), InStreamID > LastStreamID ->
- loop(State);
+ {OK, Socket, Data} when OK =:= element(1, Messages), InStreamID > LastStreamID ->
+ State1 = maybe_resize_buffer(State, Data),
+ loop(State1);
%% Socket messages.
{OK, Socket, Data} when OK =:= element(1, Messages) ->
- parse(<< Buffer/binary, Data/binary >>, State);
+ State1 = maybe_resize_buffer(State, Data),
+ parse(<< Buffer/binary, Data/binary >>, State1);
{Closed, Socket} when Closed =:= element(2, Messages) ->
terminate(State, {socket_error, closed, 'The socket has been closed.'});
{Error, Socket, Reason} when Error =:= element(3, Messages) ->
@@ -885,12 +898,12 @@ is_http2_upgrade(_, _) ->
%% Prior knowledge upgrade, without an HTTP/1.1 request.
http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport,
- proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert}, Buffer) ->
+ proxy_header=ProxyHeader, peer=Peer, sock=Sock, cert=Cert}, Buffer) ->
case Transport:secure() of
false ->
_ = cancel_timeout(State),
- cowboy_http2:init(Parent, Ref, Socket, Transport,
- ProxyHeader, Opts, Peer, Sock, Cert, Buffer);
+ cowboy_http2:init(Parent, Ref, Socket, Transport, ProxyHeader,
+ opts_for_upgrade(State), Peer, Sock, Cert, Buffer);
true ->
error_terminate(400, State, {connection_error, protocol_error,
'Clients that support HTTP/2 over TLS MUST use ALPN. (RFC7540 3.4)'})
@@ -898,7 +911,7 @@ http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Tran
%% Upgrade via an HTTP/1.1 request.
http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport,
- proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert},
+ proxy_header=ProxyHeader, peer=Peer, sock=Sock, cert=Cert},
Buffer, HTTP2Settings, Req) ->
%% @todo
%% However if the client sent a body, we need to read the body in full
@@ -907,13 +920,22 @@ http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Tran
try cow_http_hd:parse_http2_settings(HTTP2Settings) of
Settings ->
_ = cancel_timeout(State),
- cowboy_http2:init(Parent, Ref, Socket, Transport,
- ProxyHeader, Opts, Peer, Sock, Cert, Buffer, Settings, Req)
+ cowboy_http2:init(Parent, Ref, Socket, Transport, ProxyHeader,
+ opts_for_upgrade(State), Peer, Sock, Cert, Buffer, Settings, Req)
catch _:_ ->
error_terminate(400, State, {connection_error, protocol_error,
'The HTTP2-Settings header must contain a base64 SETTINGS payload. (RFC7540 3.2, RFC7540 3.2.1)'})
end.
+opts_for_upgrade(#state{opts=Opts, dynamic_buffer_size=false}) ->
+ Opts;
+opts_for_upgrade(#state{opts=Opts, dynamic_buffer_size=Size,
+ dynamic_buffer_moving_average=MovingAvg}) ->
+ Opts#{
+ dynamic_buffer_initial_average => MovingAvg,
+ dynamic_buffer_initial_size => Size
+ }.
+
%% Request body parsing.
parse_body(Buffer, State=#state{in_streamid=StreamID, in_state=
@@ -1210,7 +1232,7 @@ commands(State0=#state{socket=Socket, transport=Transport, streams=Streams, out_
commands(State, StreamID, Tail);
%% Protocol takeover.
commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transport,
- out_state=OutState, opts=Opts, buffer=Buffer, children=Children}, StreamID,
+ out_state=OutState, buffer=Buffer, children=Children}, StreamID,
[{switch_protocol, Headers, Protocol, InitialState}|_Tail]) ->
%% @todo If there's streams opened after this one, fail instead of 101.
State1 = cancel_timeout(State0),
@@ -1234,7 +1256,8 @@ commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transpor
%% Turn off the trap_exit process flag
%% since this process will no longer be a supervisor.
process_flag(trap_exit, false),
- Protocol:takeover(Parent, Ref, Socket, Transport, Opts, Buffer, InitialState);
+ Protocol:takeover(Parent, Ref, Socket, Transport,
+ opts_for_upgrade(State), Buffer, InitialState);
%% Set options dynamically.
commands(State0=#state{overriden_opts=Opts},
StreamID, [{set_options, SetOpts}|Tail]) ->
diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl
index 0e110cd..a6ffca7 100644
--- a/src/cowboy_http2.erl
+++ b/src/cowboy_http2.erl
@@ -29,6 +29,9 @@
connection_type => worker | supervisor,
connection_window_margin_size => 0..16#7fffffff,
connection_window_update_threshold => 0..16#7fffffff,
+ dynamic_buffer => false | {pos_integer(), pos_integer()},
+ dynamic_buffer_initial_average => non_neg_integer(),
+ dynamic_buffer_initial_size => pos_integer(),
enable_connect_protocol => boolean(),
env => cowboy_middleware:env(),
goaway_initial_timeout => timeout(),
@@ -133,6 +136,10 @@
%% Flow requested for all streams.
flow = 0 :: non_neg_integer(),
+ %% Dynamic buffer moving average and current buffer size.
+ dynamic_buffer_size :: pos_integer() | false,
+ dynamic_buffer_moving_average :: non_neg_integer(),
+
%% Currently active HTTP/2 streams. Streams may be initiated either
%% by the client or by the server through PUSH_PROMISE frames.
streams = #{} :: #{cow_http2:streamid() => #stream{}},
@@ -169,12 +176,15 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
{inet:ip_address(), inet:port_number()}, {inet:ip_address(), inet:port_number()},
binary() | undefined, binary()) -> ok.
init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer) ->
+ DynamicBuffer = init_dynamic_buffer_size(Opts),
{ok, Preface, HTTP2Machine} = cow_http2_machine:init(server, Opts),
%% Send the preface before doing all the init in case we get a socket error.
ok = maybe_socket_error(undefined, Transport:send(Socket, Preface)),
State = set_idle_timeout(init_rate_limiting(#state{parent=Parent, ref=Ref, socket=Socket,
transport=Transport, proxy_header=ProxyHeader,
opts=Opts, peer=Peer, sock=Sock, cert=Cert,
+ dynamic_buffer_size=DynamicBuffer,
+ dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0),
http2_status=sequence, http2_machine=HTTP2Machine}), 0),
safe_setopts_active(State),
case Buffer of
@@ -216,12 +226,15 @@ add_period(Time, Period) -> Time + Period.
binary() | undefined, binary(), map() | undefined, cowboy_req:req()) -> ok.
init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer,
_Settings, Req=#{method := Method}) ->
+ DynamicBuffer = init_dynamic_buffer_size(Opts),
{ok, Preface, HTTP2Machine0} = cow_http2_machine:init(server, Opts),
{ok, StreamID, HTTP2Machine}
= cow_http2_machine:init_upgrade_stream(Method, HTTP2Machine0),
State0 = #state{parent=Parent, ref=Ref, socket=Socket,
transport=Transport, proxy_header=ProxyHeader,
opts=Opts, peer=Peer, sock=Sock, cert=Cert,
+ dynamic_buffer_size=DynamicBuffer,
+ dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0),
http2_status=upgrade, http2_machine=HTTP2Machine},
State1 = headers_frame(State0#state{
http2_machine=HTTP2Machine}, StreamID, Req),
@@ -241,11 +254,14 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer
_ -> parse(State, Buffer)
end.
+-include("cowboy_dynamic_buffer.hrl").
+
%% Because HTTP/2 has flow control and Cowboy has other rate limiting
%% mechanisms implemented, a very large active_n value should be fine,
%% as long as the stream handlers do their work in a timely manner.
+%% However large active_n values reduce the impact of dynamic_buffer.
setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
- N = maps:get(active_n, Opts, 100),
+ N = maps:get(active_n, Opts, 1),
Transport:setopts(Socket, [{active, N}]).
safe_setopts_active(State) ->
@@ -258,7 +274,8 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
receive
%% Socket messages.
{OK, Socket, Data} when OK =:= element(1, Messages) ->
- parse(State#state{idle_timeout_num=0}, << Buffer/binary, Data/binary >>);
+ State1 = maybe_resize_buffer(State, Data),
+ parse(State1#state{idle_timeout_num=0}, << Buffer/binary, Data/binary >>);
{Closed, Socket} when Closed =:= element(2, Messages) ->
Reason = case State#state.http2_status of
closing -> {stop, closed, 'The client is going away.'};
diff --git a/src/cowboy_websocket.erl b/src/cowboy_websocket.erl
index 8d3ed08..d2eb99f 100644
--- a/src/cowboy_websocket.erl
+++ b/src/cowboy_websocket.erl
@@ -69,6 +69,9 @@
active_n => pos_integer(),
compress => boolean(),
deflate_opts => cow_ws:deflate_opts(),
+ dynamic_buffer => false | {pos_integer(), pos_integer()},
+ dynamic_buffer_initial_average => non_neg_integer(),
+ dynamic_buffer_initial_size => pos_integer(),
idle_timeout => timeout(),
max_frame_size => non_neg_integer() | infinity,
req_filter => fun((cowboy_req:req()) -> map()),
@@ -97,6 +100,11 @@
timeout_num = 0 :: 0..?IDLE_TIMEOUT_TICKS,
messages = undefined :: undefined | {atom(), atom(), atom()}
| {atom(), atom(), atom(), atom()},
+
+ %% Dynamic buffer moving average and current buffer size.
+ dynamic_buffer_size = false :: pos_integer() | false,
+ dynamic_buffer_moving_average = 0 :: non_neg_integer(),
+
hibernate = false :: boolean(),
frag_state = undefined :: cow_ws:frag_state(),
frag_buffer = <<>> :: binary(),
@@ -270,7 +278,7 @@ websocket_handshake(State, Req=#{ref := Ref, pid := Pid, streamid := StreamID},
%% @todo We don't want date and server headers.
Headers = cowboy_req:response_headers(#{}, Req),
Pid ! {{Pid, StreamID}, {switch_protocol, Headers, ?MODULE, {State, HandlerState}}},
- takeover(Pid, Ref, {Pid, StreamID}, undefined, undefined, <<>>,
+ takeover(Pid, Ref, {Pid, StreamID}, undefined, #{}, <<>>,
{State, HandlerState}).
%% Connection process.
@@ -295,8 +303,8 @@ websocket_handshake(State, Req=#{ref := Ref, pid := Pid, streamid := StreamID},
-spec takeover(pid(), ranch:ref(), inet:socket() | {pid(), cowboy_stream:streamid()},
module() | undefined, any(), binary(),
{#state{}, any()}) -> no_return().
-takeover(Parent, Ref, Socket, Transport, _Opts, Buffer,
- {State0=#state{handler=Handler, req=Req}, HandlerState}) ->
+takeover(Parent, Ref, Socket, Transport, Opts, Buffer,
+ {State0=#state{opts=WsOpts, handler=Handler, req=Req}, HandlerState}) ->
case Req of
#{version := 'HTTP/3'} -> ok;
%% @todo We should have an option to disable this behavior.
@@ -308,7 +316,11 @@ takeover(Parent, Ref, Socket, Transport, _Opts, Buffer,
end,
State = set_idle_timeout(State0#state{parent=Parent,
ref=Ref, socket=Socket, transport=Transport,
- key=undefined, messages=Messages}, 0),
+ opts=WsOpts#{dynamic_buffer => maps:get(dynamic_buffer, Opts, false)},
+ key=undefined, messages=Messages,
+ %% Dynamic buffer only applies to HTTP/1.1 Websocket.
+ dynamic_buffer_size=init_dynamic_buffer_size(Opts),
+ dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0)}, 0),
%% We call parse_header/3 immediately because there might be
%% some data in the buffer that was sent along with the handshake.
%% While it is not allowed by the protocol to send frames immediately,
@@ -319,6 +331,12 @@ takeover(Parent, Ref, Socket, Transport, _Opts, Buffer,
false -> after_init(State, HandlerState, #ps_header{buffer=Buffer})
end.
+-include("cowboy_dynamic_buffer.hrl").
+
+%% @todo Implement early socket error detection.
+maybe_socket_error(_, _) ->
+ ok.
+
after_init(State=#state{active=true}, HandlerState, ParseState) ->
%% Enable active,N for HTTP/1.1, and auto read_body for HTTP/2.
%% We must do this only after calling websocket_init/1 (if any)
@@ -340,7 +358,7 @@ after_init(State, HandlerState, ParseState) ->
setopts_active(#state{transport=undefined}) ->
ok;
setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
- N = maps:get(active_n, Opts, 100),
+ N = maps:get(active_n, Opts, 1),
Transport:setopts(Socket, [{active, N}]).
maybe_read_body(#state{socket=Stream={Pid, _}, transport=undefined, active=true}) ->
@@ -414,7 +432,8 @@ loop(State=#state{parent=Parent, socket=Socket, messages=Messages,
receive
%% Socket messages. (HTTP/1.1)
{OK, Socket, Data} when OK =:= element(1, Messages) ->
- parse(?reset_idle_timeout(State), HandlerState, ParseState, Data);
+ State1 = maybe_resize_buffer(State, Data),
+ parse(?reset_idle_timeout(State1), HandlerState, ParseState, Data);
{Closed, Socket} when Closed =:= element(2, Messages) ->
terminate(State, HandlerState, {error, closed});
{Error, Socket, Reason} when Error =:= element(3, Messages) ->
diff --git a/test/handlers/read_body_h.erl b/test/handlers/read_body_h.erl
new file mode 100644
index 0000000..a0de3b3
--- /dev/null
+++ b/test/handlers/read_body_h.erl
@@ -0,0 +1,15 @@
+%% This module reads the request body fully and send a 204 response.
+
+-module(read_body_h).
+
+-export([init/2]).
+
+init(Req0, Opts) ->
+ {ok, Req} = read_body(Req0),
+ {ok, cowboy_req:reply(200, #{}, Req), Opts}.
+
+read_body(Req0) ->
+ case cowboy_req:read_body(Req0) of
+ {ok, _, Req} -> {ok, Req};
+ {more, _, Req} -> read_body(Req)
+ end.
diff --git a/test/handlers/ws_ignore.erl b/test/handlers/ws_ignore.erl
new file mode 100644
index 0000000..9fe3322
--- /dev/null
+++ b/test/handlers/ws_ignore.erl
@@ -0,0 +1,20 @@
+%% Feel free to use, reuse and abuse the code in this file.
+
+-module(ws_ignore).
+
+-export([init/2]).
+-export([websocket_handle/2]).
+-export([websocket_info/2]).
+
+init(Req, _) ->
+ {cowboy_websocket, Req, undefined, #{
+ compress => true
+ }}.
+
+websocket_handle({text, <<"CHECK">>}, State) ->
+ {[{text, <<"CHECK">>}], State};
+websocket_handle(_Frame, State) ->
+ {[], State}.
+
+websocket_info(_Info, State) ->
+ {[], State}.
diff --git a/test/http_perf_SUITE.erl b/test/http_perf_SUITE.erl
index 8702b9d..161f2b5 100644
--- a/test/http_perf_SUITE.erl
+++ b/test/http_perf_SUITE.erl
@@ -32,7 +32,7 @@ groups() ->
init_per_suite(Config) ->
do_log("", []),
%% Optionally enable `perf` for the current node.
-% spawn(fun() -> ct:pal(os:cmd("perf record -g -F 9999 -o /tmp/ws_perf.data -p " ++ os:getpid() ++ " -- sleep 60")) end),
+% spawn(fun() -> ct:pal(os:cmd("perf record -g -F 9999 -o /tmp/http_perf.data -p " ++ os:getpid() ++ " -- sleep 60")) end),
Config.
end_per_suite(_) ->
@@ -43,7 +43,16 @@ init_per_group(Name, Config) ->
%% HTTP/1.1
max_keepalive => infinity,
%% HTTP/2
- max_received_frame_rate => {10_000_000, 1}
+ %% @todo Must configure Gun for performance too.
+ connection_window_margin_size => 64*1024,
+ enable_connect_protocol => true,
+ env => #{dispatch => init_dispatch(Config)},
+ max_frame_size_sent => 64*1024,
+ max_frame_size_received => 16384 * 1024 - 1,
+ max_received_frame_rate => {10_000_000, 1},
+ stream_window_data_threshold => 1024,
+ stream_window_margin_size => 64*1024
+
})].
end_per_group(Name, _) ->
@@ -54,10 +63,19 @@ end_per_group(Name, _) ->
init_dispatch(_) ->
cowboy_router:compile([{'_', [
- {"/", hello_h, []}
+ {"/", hello_h, []},
+ {"/read_body", read_body_h, []}
]}]).
-%% Tests.
+%% Tests: Hello world.
+
+plain_h_hello_1(Config) ->
+ doc("Plain HTTP handler Hello World; 10K requests per 1 client."),
+ do_bench_get(?FUNCTION_NAME, "/", #{}, 1, 10000, Config).
+
+plain_h_hello_10(Config) ->
+ doc("Plain HTTP handler Hello World; 10K requests per 10 clients."),
+ do_bench_get(?FUNCTION_NAME, "/", #{}, 10, 10000, Config).
stream_h_hello_1(Config) ->
doc("Stream handler Hello World; 10K requests per 1 client."),
@@ -81,51 +99,86 @@ do_stream_h_hello(Config, NumClients) ->
do_bench_get(?FUNCTION_NAME, "/", #{}, NumClients, 10000, Config),
ranch:set_protocol_options(Ref, ProtoOpts).
-plain_h_hello_1(Config) ->
- doc("Plain HTTP handler Hello World; 10K requests per 1 client."),
- do_bench_get(?FUNCTION_NAME, "/", #{}, 1, 10000, Config).
+%% Tests: Large body upload.
-plain_h_hello_10(Config) ->
- doc("Plain HTTP handler Hello World; 10K requests per 10 clients."),
- do_bench_get(?FUNCTION_NAME, "/", #{}, 10, 10000, Config).
+plain_h_1M_post_1(Config) ->
+ doc("Plain HTTP handler body reading; 100 requests per 1 client."),
+ do_bench_post(?FUNCTION_NAME, "/read_body", #{}, <<0:8_000_000>>, 1, 10000, Config).
+
+plain_h_1M_post_10(Config) ->
+ doc("Plain HTTP handler body reading; 100 requests per 10 clients."),
+ do_bench_post(?FUNCTION_NAME, "/read_body", #{}, <<0:8_000_000>>, 10, 10000, Config).
%% Internal.
do_bench_get(What, Path, Headers, NumClients, NumRuns, Config) ->
- Clients = [spawn_link(?MODULE, do_bench_proc, [self(), What, Path, Headers, NumRuns, Config])
+ Clients = [spawn_link(?MODULE, do_bench_get_proc,
+ [self(), What, Path, Headers, NumRuns, Config])
|| _ <- lists:seq(1, NumClients)],
_ = [receive {What, ready} -> ok end || _ <- Clients],
- {Time, _} = timer:tc(?MODULE, do_bench_get1, [What, Clients]),
+ {Time, _} = timer:tc(?MODULE, do_bench_wait, [What, Clients]),
do_log("~32s: ~8bµs ~8.1freqs/s", [
[atom_to_list(config(group, Config)), $., atom_to_list(What)],
Time,
(NumClients * NumRuns) / Time * 1_000_000]),
ok.
-do_bench_get1(What, Clients) ->
- _ = [ClientPid ! {What, go} || ClientPid <- Clients],
- _ = [receive {What, done} -> ok end || _ <- Clients],
+do_bench_get_proc(Parent, What, Path, Headers0, NumRuns, Config) ->
+ ConnPid = gun_open(Config),
+ Headers = Headers0#{<<"accept-encoding">> => <<"gzip">>},
+ Parent ! {What, ready},
+ receive {What, go} -> ok end,
+ do_bench_get_run(ConnPid, Path, Headers, NumRuns),
+ Parent ! {What, done},
+ gun:close(ConnPid).
+
+do_bench_get_run(_, _, _, 0) ->
+ ok;
+do_bench_get_run(ConnPid, Path, Headers, Num) ->
+ Ref = gun:request(ConnPid, <<"GET">>, Path, Headers, <<>>),
+ {response, IsFin, 200, _RespHeaders} = gun:await(ConnPid, Ref, infinity),
+ {ok, _} = case IsFin of
+ nofin -> gun:await_body(ConnPid, Ref, infinity);
+ fin -> {ok, <<>>}
+ end,
+ do_bench_get_run(ConnPid, Path, Headers, Num - 1).
+
+do_bench_post(What, Path, Headers, Body, NumClients, NumRuns, Config) ->
+ Clients = [spawn_link(?MODULE, do_bench_post_proc,
+ [self(), What, Path, Headers, Body, NumRuns, Config])
+ || _ <- lists:seq(1, NumClients)],
+ _ = [receive {What, ready} -> ok end || _ <- Clients],
+ {Time, _} = timer:tc(?MODULE, do_bench_wait, [What, Clients]),
+ do_log("~32s: ~8bµs ~8.1freqs/s", [
+ [atom_to_list(config(group, Config)), $., atom_to_list(What)],
+ Time,
+ (NumClients * NumRuns) / Time * 1_000_000]),
ok.
-do_bench_proc(Parent, What, Path, Headers0, NumRuns, Config) ->
+do_bench_post_proc(Parent, What, Path, Headers0, Body, NumRuns, Config) ->
ConnPid = gun_open(Config),
Headers = Headers0#{<<"accept-encoding">> => <<"gzip">>},
Parent ! {What, ready},
receive {What, go} -> ok end,
- do_bench_run(ConnPid, Path, Headers, NumRuns),
+ do_bench_post_run(ConnPid, Path, Headers, Body, NumRuns),
Parent ! {What, done},
gun:close(ConnPid).
-do_bench_run(_, _, _, 0) ->
+do_bench_post_run(_, _, _, _, 0) ->
ok;
-do_bench_run(ConnPid, Path, Headers, Num) ->
- Ref = gun:request(ConnPid, <<"GET">>, Path, Headers, <<>>),
+do_bench_post_run(ConnPid, Path, Headers, Body, Num) ->
+ Ref = gun:request(ConnPid, <<"POST">>, Path, Headers, Body),
{response, IsFin, 200, _RespHeaders} = gun:await(ConnPid, Ref, infinity),
{ok, _} = case IsFin of
nofin -> gun:await_body(ConnPid, Ref, infinity);
fin -> {ok, <<>>}
end,
- do_bench_run(ConnPid, Path, Headers, Num - 1).
+ do_bench_post_run(ConnPid, Path, Headers, Body, Num - 1).
+
+do_bench_wait(What, Clients) ->
+ _ = [ClientPid ! {What, go} || ClientPid <- Clients],
+ _ = [receive {What, done} -> ok end || _ <- Clients],
+ ok.
do_log(Str, Args) ->
ct:log(Str, Args),
diff --git a/test/ws_perf_SUITE.erl b/test/ws_perf_SUITE.erl
index 2fd2db2..2d7a1c0 100644
--- a/test/ws_perf_SUITE.erl
+++ b/test/ws_perf_SUITE.erl
@@ -60,6 +60,7 @@ init_per_group(Name, Config) when Name =:= h2c; Name =:= h2c_compress ->
env => #{dispatch => init_dispatch(Config)},
max_frame_size_sent => 64*1024,
max_frame_size_received => 16384 * 1024 - 1,
+ max_received_frame_rate => {10_000_000, 1},
stream_window_data_threshold => 1024,
stream_window_margin_size => 64*1024
}, [{flavor, Flavor}|Config]),
@@ -102,13 +103,14 @@ end_per_group(Name, _Config) ->
init_dispatch(_Config) ->
cowboy_router:compile([
{"localhost", [
- {"/ws_echo", ws_echo, []}
+ {"/ws_echo", ws_echo, []},
+ {"/ws_ignore", ws_ignore, []}
]}
]).
%% Support functions for testing using Gun.
-do_gun_open_ws(Config) ->
+do_gun_open_ws(Path, Config) ->
ConnPid = gun_open(Config, #{
http2_opts => #{
connection_window_margin_size => 64*1024,
@@ -127,7 +129,7 @@ do_gun_open_ws(Config) ->
{notify, settings_changed, #{enable_connect_protocol := true}}
= gun:await(ConnPid, undefined) %% @todo Maybe have a gun:await/1?
end,
- StreamRef = gun:ws_upgrade(ConnPid, "/ws_echo"),
+ StreamRef = gun:ws_upgrade(ConnPid, Path),
receive
{gun_upgrade, ConnPid, StreamRef, [<<"websocket">>], _} ->
{ok, ConnPid, StreamRef};
@@ -149,72 +151,140 @@ receive_ws(ConnPid, StreamRef) ->
%% Tests.
-one_00064KiB(Config) ->
+echo_1_00064KiB(Config) ->
doc("Send and receive a 64KiB frame."),
- do_full(Config, one, 1, 64 * 1024).
+ do_echo(Config, echo_1, 1, 64 * 1024).
-one_00256KiB(Config) ->
+echo_1_00256KiB(Config) ->
doc("Send and receive a 256KiB frame."),
- do_full(Config, one, 1, 256 * 1024).
+ do_echo(Config, echo_1, 1, 256 * 1024).
-one_01024KiB(Config) ->
+echo_1_01024KiB(Config) ->
doc("Send and receive a 1024KiB frame."),
- do_full(Config, one, 1, 1024 * 1024).
+ do_echo(Config, echo_1, 1, 1024 * 1024).
-one_04096KiB(Config) ->
+echo_1_04096KiB(Config) ->
doc("Send and receive a 4096KiB frame."),
- do_full(Config, one, 1, 4096 * 1024).
+ do_echo(Config, echo_1, 1, 4096 * 1024).
%% Minus one because frames can only get so big.
-one_16384KiB(Config) ->
+echo_1_16384KiB(Config) ->
doc("Send and receive a 16384KiB - 1 frame."),
- do_full(Config, one, 1, 16384 * 1024 - 1).
+ do_echo(Config, echo_1, 1, 16384 * 1024 - 1).
-repeat_00000B(Config) ->
+echo_N_00000B(Config) ->
doc("Send and receive a 0B frame 1000 times."),
- do_full(Config, repeat, 1000, 0).
+ do_echo(Config, echo_N, 1000, 0).
-repeat_00256B(Config) ->
+echo_N_00256B(Config) ->
doc("Send and receive a 256B frame 1000 times."),
- do_full(Config, repeat, 1000, 256).
+ do_echo(Config, echo_N, 1000, 256).
-repeat_01024B(Config) ->
+echo_N_01024B(Config) ->
doc("Send and receive a 1024B frame 1000 times."),
- do_full(Config, repeat, 1000, 1024).
+ do_echo(Config, echo_N, 1000, 1024).
-repeat_04096B(Config) ->
+echo_N_04096B(Config) ->
doc("Send and receive a 4096B frame 1000 times."),
- do_full(Config, repeat, 1000, 4096).
+ do_echo(Config, echo_N, 1000, 4096).
-repeat_16384B(Config) ->
+echo_N_16384B(Config) ->
doc("Send and receive a 16384B frame 1000 times."),
- do_full(Config, repeat, 1000, 16384).
+ do_echo(Config, echo_N, 1000, 16384).
-%repeat_16384B_10K(Config) ->
+%echo_N_16384B_10K(Config) ->
% doc("Send and receive a 16384B frame 10000 times."),
-% do_full(Config, repeat, 10000, 16384).
+% do_echo(Config, echo_N, 10000, 16384).
-do_full(Config, What, Num, FrameSize) ->
- {ok, ConnPid, StreamRef} = do_gun_open_ws(Config),
+do_echo(Config, What, Num, FrameSize) ->
+ {ok, ConnPid, StreamRef} = do_gun_open_ws("/ws_echo", Config),
FrameType = config(frame_type, Config),
FrameData = case FrameType of
text -> do_text_data(Config, FrameSize);
binary -> rand:bytes(FrameSize)
end,
%% Heat up the processes before doing the real run.
-% do_full1(ConnPid, StreamRef, Num, FrameType, FrameData),
- {Time, _} = timer:tc(?MODULE, do_full1, [ConnPid, StreamRef, Num, FrameType, FrameData]),
+% do_echo_loop(ConnPid, StreamRef, Num, FrameType, FrameData),
+ {Time, _} = timer:tc(?MODULE, do_echo_loop, [ConnPid, StreamRef, Num, FrameType, FrameData]),
do_log("~-6s ~-6s ~6s: ~8bµs", [What, FrameType, do_format_size(FrameSize), Time]),
gun:ws_send(ConnPid, StreamRef, close),
{ok, close} = receive_ws(ConnPid, StreamRef),
gun_down(ConnPid).
-do_full1(_, _, 0, _, _) ->
+do_echo_loop(_, _, 0, _, _) ->
ok;
-do_full1(ConnPid, StreamRef, Num, FrameType, FrameData) ->
+do_echo_loop(ConnPid, StreamRef, Num, FrameType, FrameData) ->
gun:ws_send(ConnPid, StreamRef, {FrameType, FrameData}),
{ok, {FrameType, FrameData}} = receive_ws(ConnPid, StreamRef),
- do_full1(ConnPid, StreamRef, Num - 1, FrameType, FrameData).
+ do_echo_loop(ConnPid, StreamRef, Num - 1, FrameType, FrameData).
+
+send_1_00064KiB(Config) ->
+ doc("Send a 64KiB frame."),
+ do_send(Config, send_1, 1, 64 * 1024).
+
+send_1_00256KiB(Config) ->
+ doc("Send a 256KiB frame."),
+ do_send(Config, send_1, 1, 256 * 1024).
+
+send_1_01024KiB(Config) ->
+ doc("Send a 1024KiB frame."),
+ do_send(Config, send_1, 1, 1024 * 1024).
+
+send_1_04096KiB(Config) ->
+ doc("Send a 4096KiB frame."),
+ do_send(Config, send_1, 1, 4096 * 1024).
+
+%% Minus one because frames can only get so big.
+send_1_16384KiB(Config) ->
+ doc("Send a 16384KiB - 1 frame."),
+ do_send(Config, send_1, 1, 16384 * 1024 - 1).
+
+send_N_00000B(Config) ->
+ doc("Send a 0B frame 10000 times."),
+ do_send(Config, send_N, 10000, 0).
+
+send_N_00256B(Config) ->
+ doc("Send a 256B frame 10000 times."),
+ do_send(Config, send_N, 10000, 256).
+
+send_N_01024B(Config) ->
+ doc("Send a 1024B frame 10000 times."),
+ do_send(Config, send_N, 10000, 1024).
+
+send_N_04096B(Config) ->
+ doc("Send a 4096B frame 10000 times."),
+ do_send(Config, send_N, 10000, 4096).
+
+send_N_16384B(Config) ->
+ doc("Send a 16384B frame 10000 times."),
+ do_send(Config, send_N, 10000, 16384).
+
+%send_N_16384B_10K(Config) ->
+% doc("Send and receive a 16384B frame 10000 times."),
+% do_send(Config, send_N, 10000, 16384).
+
+do_send(Config, What, Num, FrameSize) ->
+ {ok, ConnPid, StreamRef} = do_gun_open_ws("/ws_ignore", Config),
+ FrameType = config(frame_type, Config),
+ FrameData = case FrameType of
+ text -> do_text_data(Config, FrameSize);
+ binary -> rand:bytes(FrameSize)
+ end,
+ %% Heat up the processes before doing the real run.
+% do_send_loop(ConnPid, StreamRef, Num, FrameType, FrameData),
+ {Time, _} = timer:tc(?MODULE, do_send_loop, [ConnPid, StreamRef, Num, FrameType, FrameData]),
+ do_log("~-6s ~-6s ~6s: ~8bµs", [What, FrameType, do_format_size(FrameSize), Time]),
+ gun:ws_send(ConnPid, StreamRef, close),
+ {ok, close} = receive_ws(ConnPid, StreamRef),
+ gun_down(ConnPid).
+
+do_send_loop(ConnPid, StreamRef, 0, _, _) ->
+ gun:ws_send(ConnPid, StreamRef, {text, <<"CHECK">>}),
+ {ok, {text, <<"CHECK">>}} = receive_ws(ConnPid, StreamRef),
+ ok;
+do_send_loop(ConnPid, StreamRef, Num, FrameType, FrameData) ->
+ gun:ws_send(ConnPid, StreamRef, {FrameType, FrameData}),
+ do_send_loop(ConnPid, StreamRef, Num - 1, FrameType, FrameData).
%% Internal.