diff options
-rw-r--r-- | doc/src/manual/cowboy_http.asciidoc | 16 | ||||
-rw-r--r-- | doc/src/manual/cowboy_http2.asciidoc | 16 | ||||
-rw-r--r-- | doc/src/manual/cowboy_websocket.asciidoc | 16 | ||||
-rw-r--r-- | src/cowboy.erl | 43 | ||||
-rw-r--r-- | src/cowboy_dynamic_buffer.hrl | 80 | ||||
-rw-r--r-- | src/cowboy_http.erl | 47 | ||||
-rw-r--r-- | src/cowboy_http2.erl | 21 | ||||
-rw-r--r-- | src/cowboy_websocket.erl | 31 | ||||
-rw-r--r-- | test/handlers/read_body_h.erl | 15 | ||||
-rw-r--r-- | test/handlers/ws_ignore.erl | 20 | ||||
-rw-r--r-- | test/http_perf_SUITE.erl | 95 | ||||
-rw-r--r-- | test/ws_perf_SUITE.erl | 134 |
12 files changed, 450 insertions, 84 deletions
diff --git a/doc/src/manual/cowboy_http.asciidoc b/doc/src/manual/cowboy_http.asciidoc index 58f0435..31e2d37 100644 --- a/doc/src/manual/cowboy_http.asciidoc +++ b/doc/src/manual/cowboy_http.asciidoc @@ -20,6 +20,7 @@ opts() :: #{ active_n => pos_integer(), chunked => boolean(), connection_type => worker | supervisor, + dynamic_buffer => false | {pos_integer(), pos_integer()}, http10_keepalive => boolean(), idle_timeout => timeout(), inactivity_timeout => timeout(), @@ -53,7 +54,7 @@ Ranch functions `ranch:get_protocol_options/1` and The default value is given next to the option name: -active_n (100):: +active_n (1):: The number of packets Cowboy will request from the socket at once. This can be used to tweak the performance of the server. Higher @@ -75,6 +76,17 @@ connection_type (supervisor):: Whether the connection process also acts as a supervisor. +dynamic_buffer ({8192, 131072}):: + +Cowboy will dynamically change the socket's `buffer` size +depending on the size of the data it receives from the socket. +This lets Cowboy use the optimal buffer size for the current +workload. ++ +The dynamic buffer size functionality can be disabled by +setting this option to `false`. Cowboy will also disable +it by default when the `buffer` transport option is configured. + http10_keepalive (true):: Whether keep-alive is enabled for HTTP/1.0 connections. @@ -166,6 +178,8 @@ Ordered list of stream handlers that will handle all stream events. == Changelog +* *2.13*: The `active_n` default value was changed to `1`. +* *2.13*: The `dynamic_buffer` option was added. * *2.11*: The `reset_idle_timeout_on_send` option was added. * *2.8*: The `active_n` option was added. * *2.7*: The `initial_stream_flow_size` and `logger` options were added. diff --git a/doc/src/manual/cowboy_http2.asciidoc b/doc/src/manual/cowboy_http2.asciidoc index 1d2619c..a5fcd0b 100644 --- a/doc/src/manual/cowboy_http2.asciidoc +++ b/doc/src/manual/cowboy_http2.asciidoc @@ -21,6 +21,7 @@ opts() :: #{ connection_type => worker | supervisor, connection_window_margin_size => 0..16#7fffffff, connection_window_update_threshold => 0..16#7fffffff, + dynamic_buffer => false | {pos_integer(), pos_integer()}, enable_connect_protocol => boolean(), goaway_initial_timeout => timeout(), goaway_complete_timeout => timeout(), @@ -66,7 +67,7 @@ Ranch functions `ranch:get_protocol_options/1` and The default value is given next to the option name: -active_n (100):: +active_n (1):: The number of packets Cowboy will request from the socket at once. This can be used to tweak the performance of the server. Higher @@ -91,6 +92,17 @@ The connection window will only get updated when its size becomes lower than this threshold, in bytes. This is to avoid sending too many `WINDOW_UPDATE` frames. +dynamic_buffer ({8192, 131072}):: + +Cowboy will dynamically change the socket's `buffer` size +depending on the size of the data it receives from the socket. +This lets Cowboy use the optimal buffer size for the current +workload. ++ +The dynamic buffer size functionality can be disabled by +setting this option to `false`. Cowboy will also disable +it by default when the `buffer` transport option is configured. + enable_connect_protocol (false):: Whether to enable the extended CONNECT method to allow @@ -289,6 +301,8 @@ too many `WINDOW_UPDATE` frames. == Changelog +* *2.13*: The `active_n` default value was changed to `1`. +* *2.13*: The `dynamic_buffer` option was added. * *2.11*: Websocket over HTTP/2 is now considered stable. * *2.11*: The `reset_idle_timeout_on_send` option was added. * *2.11*: Add the option `max_cancel_stream_rate` to protect diff --git a/doc/src/manual/cowboy_websocket.asciidoc b/doc/src/manual/cowboy_websocket.asciidoc index e152182..d5db82f 100644 --- a/doc/src/manual/cowboy_websocket.asciidoc +++ b/doc/src/manual/cowboy_websocket.asciidoc @@ -203,6 +203,7 @@ opts() :: #{ active_n => pos_integer(), compress => boolean(), deflate_opts => cow_ws:deflate_opts() + dynamic_buffer => false | {pos_integer(), pos_integer()}, idle_timeout => timeout(), max_frame_size => non_neg_integer() | infinity, req_filter => fun((cowboy_req:req()) -> map()), @@ -224,7 +225,7 @@ init(Req, State) -> The default value is given next to the option name: -active_n (100):: +active_n (1):: The number of packets Cowboy will request from the socket at once. This can be used to tweak the performance of the server. Higher @@ -248,6 +249,17 @@ options and the zlib compression options. The defaults optimize the compression at the expense of some memory and CPU. +dynamic_buffer ({8192, 131072}):: + +Cowboy will dynamically change the socket's `buffer` size +depending on the size of the data it receives from the socket. +This lets Cowboy use the optimal buffer size for the current +workload. ++ +The dynamic buffer size functionality can be disabled by +setting this option to `false`. Cowboy will also disable +it by default when the `buffer` transport option is configured. + idle_timeout (60000):: Time in milliseconds that Cowboy will keep the @@ -287,6 +299,8 @@ normal circumstances if necessary. == Changelog +* *2.13*: The `active_n` default value was changed to `1`. +* *2.13*: The `dynamic_buffer` option was added. * *2.13*: The `max_frame_size` option can now be set dynamically. * *2.11*: Websocket over HTTP/2 is now considered stable. * *2.11*: HTTP/1.1 Websocket no longer traps exits by default. diff --git a/src/cowboy.erl b/src/cowboy.erl index e5ed831..c685649 100644 --- a/src/cowboy.erl +++ b/src/cowboy.erl @@ -51,8 +51,12 @@ start_clear(Ref, TransOpts0, ProtoOpts0) -> TransOpts1 = ranch:normalize_opts(TransOpts0), - {TransOpts, ConnectionType} = ensure_connection_type(TransOpts1), - ProtoOpts = ProtoOpts0#{connection_type => ConnectionType}, + {TransOpts2, DynamicBuffer} = ensure_dynamic_buffer(TransOpts1, ProtoOpts0), + {TransOpts, ConnectionType} = ensure_connection_type(TransOpts2), + ProtoOpts = ProtoOpts0#{ + connection_type => ConnectionType, + dynamic_buffer => DynamicBuffer + }, ranch:start_listener(Ref, ranch_tcp, TransOpts, cowboy_clear, ProtoOpts). -spec start_tls(ranch:ref(), ranch:opts(), opts()) @@ -60,12 +64,13 @@ start_clear(Ref, TransOpts0, ProtoOpts0) -> start_tls(Ref, TransOpts0, ProtoOpts0) -> TransOpts1 = ranch:normalize_opts(TransOpts0), - SocketOpts = maps:get(socket_opts, TransOpts1, []), - TransOpts2 = TransOpts1#{socket_opts => [ - {alpn_preferred_protocols, [<<"h2">>, <<"http/1.1">>]} - |SocketOpts]}, - {TransOpts, ConnectionType} = ensure_connection_type(TransOpts2), - ProtoOpts = ProtoOpts0#{connection_type => ConnectionType}, + {TransOpts2, DynamicBuffer} = ensure_dynamic_buffer(TransOpts1, ProtoOpts0), + TransOpts3 = ensure_alpn(TransOpts2), + {TransOpts, ConnectionType} = ensure_connection_type(TransOpts3), + ProtoOpts = ProtoOpts0#{ + connection_type => ConnectionType, + dynamic_buffer => DynamicBuffer + }, ranch:start_listener(Ref, ranch_ssl, TransOpts, cowboy_tls, ProtoOpts). %% @todo Experimental function to start a barebone QUIC listener. @@ -77,6 +82,7 @@ start_tls(Ref, TransOpts0, ProtoOpts0) -> -spec start_quic(ranch:ref(), #{socket_opts => [{atom(), _}]}, cowboy_http3:opts()) -> {ok, pid()}. +%% @todo Implement dynamic_buffer for HTTP/3 if/when it applies. start_quic(Ref, TransOpts, ProtoOpts) -> {ok, _} = application:ensure_all_started(quicer), Parent = self(), @@ -139,11 +145,32 @@ port_0() -> end, Port. +ensure_alpn(TransOpts) -> + SocketOpts = maps:get(socket_opts, TransOpts, []), + TransOpts#{socket_opts => [ + {alpn_preferred_protocols, [<<"h2">>, <<"http/1.1">>]} + |SocketOpts]}. + ensure_connection_type(TransOpts=#{connection_type := ConnectionType}) -> {TransOpts, ConnectionType}; ensure_connection_type(TransOpts) -> {TransOpts#{connection_type => supervisor}, supervisor}. +%% Dynamic buffer was set; accept transport options as-is. +%% Note that initial 'buffer' size may be lower than dynamic buffer allows. +ensure_dynamic_buffer(TransOpts, #{dynamic_buffer := DynamicBuffer}) -> + {TransOpts, DynamicBuffer}; +%% Dynamic buffer was not set; define default dynamic buffer +%% only if 'buffer' size was not configured. In that case we +%% set the 'buffer' size to the lowest value. +ensure_dynamic_buffer(TransOpts=#{socket_opts := SocketOpts}, _) -> + case proplists:get_value(buffer, SocketOpts, undefined) of + undefined -> + {TransOpts#{socket_opts => [{buffer, 8192}|SocketOpts]}, {8192, 131072}}; + _ -> + {TransOpts, false} + end. + -spec stop_listener(ranch:ref()) -> ok | {error, not_found}. stop_listener(Ref) -> diff --git a/src/cowboy_dynamic_buffer.hrl b/src/cowboy_dynamic_buffer.hrl new file mode 100644 index 0000000..cb07aab --- /dev/null +++ b/src/cowboy_dynamic_buffer.hrl @@ -0,0 +1,80 @@ +%% Copyright (c) 2025, Loïc Hoguin <[email protected]> +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +%% These functions are common to cowboy_http, cowboy_http2 and +%% cowboy_websocket. It requires the options and the state +%% to use the same field names. + +%% Experiments have shown that the size of the 'buffer' can greatly +%% impact performance: a buffer too small leads to more messages +%% being handled and typically more binary appends; and a buffer +%% too large results in inefficient use of memory which in turn +%% reduces the throughput, presumably because large binary appends +%% are not as efficient as smaller ones, and because while the +%% buffer gets allocated only when there is data, the allocated +%% size remains until the binary is GC and so under-use hurts. +%% +%% The performance of a given 'buffer' size will also depend on +%% how the client is sending data, and on the protocol. For example, +%% HTTP/1.1 doesn't need a very large 'buffer' size for reading +%% request headers, but it does need one for reading large request +%% bodies. At the same time, HTTP/2 performs best reading large +%% request bodies when the 'buffer' size is about half that of +%% HTTP/1.1. +%% +%% It therefore becomes important to resize the buffer dynamically +%% depending on what is currently going on. We do this based on +%% the size of data packets we received from the transport. We +%% maintain a moving average and when that moving average is +%% 90% of the current 'buffer' size, we double the 'buffer' size. +%% When things slow down and the moving average falls below +%% 40% of the current 'buffer' size, we halve the 'buffer' size. +%% +%% To calculate the moving average we do (MovAvg + DataLen) div 2. +%% This means that the moving average will change very quickly when +%% DataLen increases or decreases rapidly. That's OK, we want to +%% be reactive, but also setting the buffer size is a pretty fast +%% operation. The formula could be changed to the following if it +%% became a problem: (MovAvg * N + DataLen) div (N + 1). +%% +%% Note that this works best when active,N uses low values of N. +%% We don't want to accumulate too much data because we resize +%% the buffer. + +init_dynamic_buffer_size(#{dynamic_buffer_initial_size := DynamicBuffer}) -> + DynamicBuffer; +init_dynamic_buffer_size(#{dynamic_buffer := {LowDynamicBuffer, _}}) -> + LowDynamicBuffer; +init_dynamic_buffer_size(_) -> + false. + +maybe_resize_buffer(State=#state{dynamic_buffer_size=false}, _) -> + State; +maybe_resize_buffer(State=#state{transport=Transport, socket=Socket, + opts=#{dynamic_buffer := {LowDynamicBuffer, HighDynamicBuffer}}, + dynamic_buffer_size=BufferSize0, dynamic_buffer_moving_average=MovingAvg0}, Data) -> + DataLen = byte_size(Data), + MovingAvg = (MovingAvg0 + DataLen) div 2, + if + BufferSize0 < HighDynamicBuffer andalso MovingAvg > BufferSize0 * 0.9 -> + BufferSize = min(BufferSize0 * 2, HighDynamicBuffer), + ok = maybe_socket_error(State, Transport:setopts(Socket, [{buffer, BufferSize}])), + State#state{dynamic_buffer_moving_average=MovingAvg, dynamic_buffer_size=BufferSize}; + BufferSize0 > LowDynamicBuffer andalso MovingAvg < BufferSize0 * 0.4 -> + BufferSize = max(BufferSize0 div 2, LowDynamicBuffer), + ok = maybe_socket_error(State, Transport:setopts(Socket, [{buffer, BufferSize}])), + State#state{dynamic_buffer_moving_average=MovingAvg, dynamic_buffer_size=BufferSize}; + true -> + State#state{dynamic_buffer_moving_average=MovingAvg} + end. diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl index 7c62b13..de268a6 100644 --- a/src/cowboy_http.erl +++ b/src/cowboy_http.erl @@ -28,6 +28,9 @@ compress_buffering => boolean(), compress_threshold => non_neg_integer(), connection_type => worker | supervisor, + dynamic_buffer => false | {pos_integer(), pos_integer()}, + dynamic_buffer_initial_average => non_neg_integer(), + dynamic_buffer_initial_size => pos_integer(), env => cowboy_middleware:env(), http10_keepalive => boolean(), idle_timeout => timeout(), @@ -137,6 +140,10 @@ %% Flow requested for the current stream. flow = infinity :: non_neg_integer() | infinity, + %% Dynamic buffer moving average and current buffer size. + dynamic_buffer_size :: pos_integer() | false, + dynamic_buffer_moving_average :: non_neg_integer(), + %% Identifier for the stream currently being written. %% Note that out_streamid =< in_streamid. out_streamid = 1 :: pos_integer(), @@ -181,12 +188,16 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) -> parent=Parent, ref=Ref, socket=Socket, transport=Transport, proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert, + dynamic_buffer_size=init_dynamic_buffer_size(Opts), + dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0), last_streamid=maps:get(max_keepalive, Opts, 1000)}, safe_setopts_active(State), loop(set_timeout(State, request_timeout)). +-include("cowboy_dynamic_buffer.hrl"). + setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) -> - N = maps:get(active_n, Opts, 100), + N = maps:get(active_n, Opts, 1), Transport:setopts(Socket, [{active, N}]). safe_setopts_active(State) -> @@ -220,11 +231,13 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts, receive %% Discard data coming in after the last request %% we want to process was received fully. - {OK, Socket, _} when OK =:= element(1, Messages), InStreamID > LastStreamID -> - loop(State); + {OK, Socket, Data} when OK =:= element(1, Messages), InStreamID > LastStreamID -> + State1 = maybe_resize_buffer(State, Data), + loop(State1); %% Socket messages. {OK, Socket, Data} when OK =:= element(1, Messages) -> - parse(<< Buffer/binary, Data/binary >>, State); + State1 = maybe_resize_buffer(State, Data), + parse(<< Buffer/binary, Data/binary >>, State1); {Closed, Socket} when Closed =:= element(2, Messages) -> terminate(State, {socket_error, closed, 'The socket has been closed.'}); {Error, Socket, Reason} when Error =:= element(3, Messages) -> @@ -885,12 +898,12 @@ is_http2_upgrade(_, _) -> %% Prior knowledge upgrade, without an HTTP/1.1 request. http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport, - proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert}, Buffer) -> + proxy_header=ProxyHeader, peer=Peer, sock=Sock, cert=Cert}, Buffer) -> case Transport:secure() of false -> _ = cancel_timeout(State), - cowboy_http2:init(Parent, Ref, Socket, Transport, - ProxyHeader, Opts, Peer, Sock, Cert, Buffer); + cowboy_http2:init(Parent, Ref, Socket, Transport, ProxyHeader, + opts_for_upgrade(State), Peer, Sock, Cert, Buffer); true -> error_terminate(400, State, {connection_error, protocol_error, 'Clients that support HTTP/2 over TLS MUST use ALPN. (RFC7540 3.4)'}) @@ -898,7 +911,7 @@ http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Tran %% Upgrade via an HTTP/1.1 request. http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport, - proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert}, + proxy_header=ProxyHeader, peer=Peer, sock=Sock, cert=Cert}, Buffer, HTTP2Settings, Req) -> %% @todo %% However if the client sent a body, we need to read the body in full @@ -907,13 +920,22 @@ http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Tran try cow_http_hd:parse_http2_settings(HTTP2Settings) of Settings -> _ = cancel_timeout(State), - cowboy_http2:init(Parent, Ref, Socket, Transport, - ProxyHeader, Opts, Peer, Sock, Cert, Buffer, Settings, Req) + cowboy_http2:init(Parent, Ref, Socket, Transport, ProxyHeader, + opts_for_upgrade(State), Peer, Sock, Cert, Buffer, Settings, Req) catch _:_ -> error_terminate(400, State, {connection_error, protocol_error, 'The HTTP2-Settings header must contain a base64 SETTINGS payload. (RFC7540 3.2, RFC7540 3.2.1)'}) end. +opts_for_upgrade(#state{opts=Opts, dynamic_buffer_size=false}) -> + Opts; +opts_for_upgrade(#state{opts=Opts, dynamic_buffer_size=Size, + dynamic_buffer_moving_average=MovingAvg}) -> + Opts#{ + dynamic_buffer_initial_average => MovingAvg, + dynamic_buffer_initial_size => Size + }. + %% Request body parsing. parse_body(Buffer, State=#state{in_streamid=StreamID, in_state= @@ -1210,7 +1232,7 @@ commands(State0=#state{socket=Socket, transport=Transport, streams=Streams, out_ commands(State, StreamID, Tail); %% Protocol takeover. commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transport, - out_state=OutState, opts=Opts, buffer=Buffer, children=Children}, StreamID, + out_state=OutState, buffer=Buffer, children=Children}, StreamID, [{switch_protocol, Headers, Protocol, InitialState}|_Tail]) -> %% @todo If there's streams opened after this one, fail instead of 101. State1 = cancel_timeout(State0), @@ -1234,7 +1256,8 @@ commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transpor %% Turn off the trap_exit process flag %% since this process will no longer be a supervisor. process_flag(trap_exit, false), - Protocol:takeover(Parent, Ref, Socket, Transport, Opts, Buffer, InitialState); + Protocol:takeover(Parent, Ref, Socket, Transport, + opts_for_upgrade(State), Buffer, InitialState); %% Set options dynamically. commands(State0=#state{overriden_opts=Opts}, StreamID, [{set_options, SetOpts}|Tail]) -> diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl index 0e110cd..a6ffca7 100644 --- a/src/cowboy_http2.erl +++ b/src/cowboy_http2.erl @@ -29,6 +29,9 @@ connection_type => worker | supervisor, connection_window_margin_size => 0..16#7fffffff, connection_window_update_threshold => 0..16#7fffffff, + dynamic_buffer => false | {pos_integer(), pos_integer()}, + dynamic_buffer_initial_average => non_neg_integer(), + dynamic_buffer_initial_size => pos_integer(), enable_connect_protocol => boolean(), env => cowboy_middleware:env(), goaway_initial_timeout => timeout(), @@ -133,6 +136,10 @@ %% Flow requested for all streams. flow = 0 :: non_neg_integer(), + %% Dynamic buffer moving average and current buffer size. + dynamic_buffer_size :: pos_integer() | false, + dynamic_buffer_moving_average :: non_neg_integer(), + %% Currently active HTTP/2 streams. Streams may be initiated either %% by the client or by the server through PUSH_PROMISE frames. streams = #{} :: #{cow_http2:streamid() => #stream{}}, @@ -169,12 +176,15 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) -> {inet:ip_address(), inet:port_number()}, {inet:ip_address(), inet:port_number()}, binary() | undefined, binary()) -> ok. init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer) -> + DynamicBuffer = init_dynamic_buffer_size(Opts), {ok, Preface, HTTP2Machine} = cow_http2_machine:init(server, Opts), %% Send the preface before doing all the init in case we get a socket error. ok = maybe_socket_error(undefined, Transport:send(Socket, Preface)), State = set_idle_timeout(init_rate_limiting(#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport, proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert, + dynamic_buffer_size=DynamicBuffer, + dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0), http2_status=sequence, http2_machine=HTTP2Machine}), 0), safe_setopts_active(State), case Buffer of @@ -216,12 +226,15 @@ add_period(Time, Period) -> Time + Period. binary() | undefined, binary(), map() | undefined, cowboy_req:req()) -> ok. init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer, _Settings, Req=#{method := Method}) -> + DynamicBuffer = init_dynamic_buffer_size(Opts), {ok, Preface, HTTP2Machine0} = cow_http2_machine:init(server, Opts), {ok, StreamID, HTTP2Machine} = cow_http2_machine:init_upgrade_stream(Method, HTTP2Machine0), State0 = #state{parent=Parent, ref=Ref, socket=Socket, transport=Transport, proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert, + dynamic_buffer_size=DynamicBuffer, + dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0), http2_status=upgrade, http2_machine=HTTP2Machine}, State1 = headers_frame(State0#state{ http2_machine=HTTP2Machine}, StreamID, Req), @@ -241,11 +254,14 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer _ -> parse(State, Buffer) end. +-include("cowboy_dynamic_buffer.hrl"). + %% Because HTTP/2 has flow control and Cowboy has other rate limiting %% mechanisms implemented, a very large active_n value should be fine, %% as long as the stream handlers do their work in a timely manner. +%% However large active_n values reduce the impact of dynamic_buffer. setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) -> - N = maps:get(active_n, Opts, 100), + N = maps:get(active_n, Opts, 1), Transport:setopts(Socket, [{active, N}]). safe_setopts_active(State) -> @@ -258,7 +274,8 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, receive %% Socket messages. {OK, Socket, Data} when OK =:= element(1, Messages) -> - parse(State#state{idle_timeout_num=0}, << Buffer/binary, Data/binary >>); + State1 = maybe_resize_buffer(State, Data), + parse(State1#state{idle_timeout_num=0}, << Buffer/binary, Data/binary >>); {Closed, Socket} when Closed =:= element(2, Messages) -> Reason = case State#state.http2_status of closing -> {stop, closed, 'The client is going away.'}; diff --git a/src/cowboy_websocket.erl b/src/cowboy_websocket.erl index 8d3ed08..d2eb99f 100644 --- a/src/cowboy_websocket.erl +++ b/src/cowboy_websocket.erl @@ -69,6 +69,9 @@ active_n => pos_integer(), compress => boolean(), deflate_opts => cow_ws:deflate_opts(), + dynamic_buffer => false | {pos_integer(), pos_integer()}, + dynamic_buffer_initial_average => non_neg_integer(), + dynamic_buffer_initial_size => pos_integer(), idle_timeout => timeout(), max_frame_size => non_neg_integer() | infinity, req_filter => fun((cowboy_req:req()) -> map()), @@ -97,6 +100,11 @@ timeout_num = 0 :: 0..?IDLE_TIMEOUT_TICKS, messages = undefined :: undefined | {atom(), atom(), atom()} | {atom(), atom(), atom(), atom()}, + + %% Dynamic buffer moving average and current buffer size. + dynamic_buffer_size = false :: pos_integer() | false, + dynamic_buffer_moving_average = 0 :: non_neg_integer(), + hibernate = false :: boolean(), frag_state = undefined :: cow_ws:frag_state(), frag_buffer = <<>> :: binary(), @@ -270,7 +278,7 @@ websocket_handshake(State, Req=#{ref := Ref, pid := Pid, streamid := StreamID}, %% @todo We don't want date and server headers. Headers = cowboy_req:response_headers(#{}, Req), Pid ! {{Pid, StreamID}, {switch_protocol, Headers, ?MODULE, {State, HandlerState}}}, - takeover(Pid, Ref, {Pid, StreamID}, undefined, undefined, <<>>, + takeover(Pid, Ref, {Pid, StreamID}, undefined, #{}, <<>>, {State, HandlerState}). %% Connection process. @@ -295,8 +303,8 @@ websocket_handshake(State, Req=#{ref := Ref, pid := Pid, streamid := StreamID}, -spec takeover(pid(), ranch:ref(), inet:socket() | {pid(), cowboy_stream:streamid()}, module() | undefined, any(), binary(), {#state{}, any()}) -> no_return(). -takeover(Parent, Ref, Socket, Transport, _Opts, Buffer, - {State0=#state{handler=Handler, req=Req}, HandlerState}) -> +takeover(Parent, Ref, Socket, Transport, Opts, Buffer, + {State0=#state{opts=WsOpts, handler=Handler, req=Req}, HandlerState}) -> case Req of #{version := 'HTTP/3'} -> ok; %% @todo We should have an option to disable this behavior. @@ -308,7 +316,11 @@ takeover(Parent, Ref, Socket, Transport, _Opts, Buffer, end, State = set_idle_timeout(State0#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport, - key=undefined, messages=Messages}, 0), + opts=WsOpts#{dynamic_buffer => maps:get(dynamic_buffer, Opts, false)}, + key=undefined, messages=Messages, + %% Dynamic buffer only applies to HTTP/1.1 Websocket. + dynamic_buffer_size=init_dynamic_buffer_size(Opts), + dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0)}, 0), %% We call parse_header/3 immediately because there might be %% some data in the buffer that was sent along with the handshake. %% While it is not allowed by the protocol to send frames immediately, @@ -319,6 +331,12 @@ takeover(Parent, Ref, Socket, Transport, _Opts, Buffer, false -> after_init(State, HandlerState, #ps_header{buffer=Buffer}) end. +-include("cowboy_dynamic_buffer.hrl"). + +%% @todo Implement early socket error detection. +maybe_socket_error(_, _) -> + ok. + after_init(State=#state{active=true}, HandlerState, ParseState) -> %% Enable active,N for HTTP/1.1, and auto read_body for HTTP/2. %% We must do this only after calling websocket_init/1 (if any) @@ -340,7 +358,7 @@ after_init(State, HandlerState, ParseState) -> setopts_active(#state{transport=undefined}) -> ok; setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) -> - N = maps:get(active_n, Opts, 100), + N = maps:get(active_n, Opts, 1), Transport:setopts(Socket, [{active, N}]). maybe_read_body(#state{socket=Stream={Pid, _}, transport=undefined, active=true}) -> @@ -414,7 +432,8 @@ loop(State=#state{parent=Parent, socket=Socket, messages=Messages, receive %% Socket messages. (HTTP/1.1) {OK, Socket, Data} when OK =:= element(1, Messages) -> - parse(?reset_idle_timeout(State), HandlerState, ParseState, Data); + State1 = maybe_resize_buffer(State, Data), + parse(?reset_idle_timeout(State1), HandlerState, ParseState, Data); {Closed, Socket} when Closed =:= element(2, Messages) -> terminate(State, HandlerState, {error, closed}); {Error, Socket, Reason} when Error =:= element(3, Messages) -> diff --git a/test/handlers/read_body_h.erl b/test/handlers/read_body_h.erl new file mode 100644 index 0000000..a0de3b3 --- /dev/null +++ b/test/handlers/read_body_h.erl @@ -0,0 +1,15 @@ +%% This module reads the request body fully and send a 204 response. + +-module(read_body_h). + +-export([init/2]). + +init(Req0, Opts) -> + {ok, Req} = read_body(Req0), + {ok, cowboy_req:reply(200, #{}, Req), Opts}. + +read_body(Req0) -> + case cowboy_req:read_body(Req0) of + {ok, _, Req} -> {ok, Req}; + {more, _, Req} -> read_body(Req) + end. diff --git a/test/handlers/ws_ignore.erl b/test/handlers/ws_ignore.erl new file mode 100644 index 0000000..9fe3322 --- /dev/null +++ b/test/handlers/ws_ignore.erl @@ -0,0 +1,20 @@ +%% Feel free to use, reuse and abuse the code in this file. + +-module(ws_ignore). + +-export([init/2]). +-export([websocket_handle/2]). +-export([websocket_info/2]). + +init(Req, _) -> + {cowboy_websocket, Req, undefined, #{ + compress => true + }}. + +websocket_handle({text, <<"CHECK">>}, State) -> + {[{text, <<"CHECK">>}], State}; +websocket_handle(_Frame, State) -> + {[], State}. + +websocket_info(_Info, State) -> + {[], State}. diff --git a/test/http_perf_SUITE.erl b/test/http_perf_SUITE.erl index 8702b9d..161f2b5 100644 --- a/test/http_perf_SUITE.erl +++ b/test/http_perf_SUITE.erl @@ -32,7 +32,7 @@ groups() -> init_per_suite(Config) -> do_log("", []), %% Optionally enable `perf` for the current node. -% spawn(fun() -> ct:pal(os:cmd("perf record -g -F 9999 -o /tmp/ws_perf.data -p " ++ os:getpid() ++ " -- sleep 60")) end), +% spawn(fun() -> ct:pal(os:cmd("perf record -g -F 9999 -o /tmp/http_perf.data -p " ++ os:getpid() ++ " -- sleep 60")) end), Config. end_per_suite(_) -> @@ -43,7 +43,16 @@ init_per_group(Name, Config) -> %% HTTP/1.1 max_keepalive => infinity, %% HTTP/2 - max_received_frame_rate => {10_000_000, 1} + %% @todo Must configure Gun for performance too. + connection_window_margin_size => 64*1024, + enable_connect_protocol => true, + env => #{dispatch => init_dispatch(Config)}, + max_frame_size_sent => 64*1024, + max_frame_size_received => 16384 * 1024 - 1, + max_received_frame_rate => {10_000_000, 1}, + stream_window_data_threshold => 1024, + stream_window_margin_size => 64*1024 + })]. end_per_group(Name, _) -> @@ -54,10 +63,19 @@ end_per_group(Name, _) -> init_dispatch(_) -> cowboy_router:compile([{'_', [ - {"/", hello_h, []} + {"/", hello_h, []}, + {"/read_body", read_body_h, []} ]}]). -%% Tests. +%% Tests: Hello world. + +plain_h_hello_1(Config) -> + doc("Plain HTTP handler Hello World; 10K requests per 1 client."), + do_bench_get(?FUNCTION_NAME, "/", #{}, 1, 10000, Config). + +plain_h_hello_10(Config) -> + doc("Plain HTTP handler Hello World; 10K requests per 10 clients."), + do_bench_get(?FUNCTION_NAME, "/", #{}, 10, 10000, Config). stream_h_hello_1(Config) -> doc("Stream handler Hello World; 10K requests per 1 client."), @@ -81,51 +99,86 @@ do_stream_h_hello(Config, NumClients) -> do_bench_get(?FUNCTION_NAME, "/", #{}, NumClients, 10000, Config), ranch:set_protocol_options(Ref, ProtoOpts). -plain_h_hello_1(Config) -> - doc("Plain HTTP handler Hello World; 10K requests per 1 client."), - do_bench_get(?FUNCTION_NAME, "/", #{}, 1, 10000, Config). +%% Tests: Large body upload. -plain_h_hello_10(Config) -> - doc("Plain HTTP handler Hello World; 10K requests per 10 clients."), - do_bench_get(?FUNCTION_NAME, "/", #{}, 10, 10000, Config). +plain_h_1M_post_1(Config) -> + doc("Plain HTTP handler body reading; 100 requests per 1 client."), + do_bench_post(?FUNCTION_NAME, "/read_body", #{}, <<0:8_000_000>>, 1, 10000, Config). + +plain_h_1M_post_10(Config) -> + doc("Plain HTTP handler body reading; 100 requests per 10 clients."), + do_bench_post(?FUNCTION_NAME, "/read_body", #{}, <<0:8_000_000>>, 10, 10000, Config). %% Internal. do_bench_get(What, Path, Headers, NumClients, NumRuns, Config) -> - Clients = [spawn_link(?MODULE, do_bench_proc, [self(), What, Path, Headers, NumRuns, Config]) + Clients = [spawn_link(?MODULE, do_bench_get_proc, + [self(), What, Path, Headers, NumRuns, Config]) || _ <- lists:seq(1, NumClients)], _ = [receive {What, ready} -> ok end || _ <- Clients], - {Time, _} = timer:tc(?MODULE, do_bench_get1, [What, Clients]), + {Time, _} = timer:tc(?MODULE, do_bench_wait, [What, Clients]), do_log("~32s: ~8bµs ~8.1freqs/s", [ [atom_to_list(config(group, Config)), $., atom_to_list(What)], Time, (NumClients * NumRuns) / Time * 1_000_000]), ok. -do_bench_get1(What, Clients) -> - _ = [ClientPid ! {What, go} || ClientPid <- Clients], - _ = [receive {What, done} -> ok end || _ <- Clients], +do_bench_get_proc(Parent, What, Path, Headers0, NumRuns, Config) -> + ConnPid = gun_open(Config), + Headers = Headers0#{<<"accept-encoding">> => <<"gzip">>}, + Parent ! {What, ready}, + receive {What, go} -> ok end, + do_bench_get_run(ConnPid, Path, Headers, NumRuns), + Parent ! {What, done}, + gun:close(ConnPid). + +do_bench_get_run(_, _, _, 0) -> + ok; +do_bench_get_run(ConnPid, Path, Headers, Num) -> + Ref = gun:request(ConnPid, <<"GET">>, Path, Headers, <<>>), + {response, IsFin, 200, _RespHeaders} = gun:await(ConnPid, Ref, infinity), + {ok, _} = case IsFin of + nofin -> gun:await_body(ConnPid, Ref, infinity); + fin -> {ok, <<>>} + end, + do_bench_get_run(ConnPid, Path, Headers, Num - 1). + +do_bench_post(What, Path, Headers, Body, NumClients, NumRuns, Config) -> + Clients = [spawn_link(?MODULE, do_bench_post_proc, + [self(), What, Path, Headers, Body, NumRuns, Config]) + || _ <- lists:seq(1, NumClients)], + _ = [receive {What, ready} -> ok end || _ <- Clients], + {Time, _} = timer:tc(?MODULE, do_bench_wait, [What, Clients]), + do_log("~32s: ~8bµs ~8.1freqs/s", [ + [atom_to_list(config(group, Config)), $., atom_to_list(What)], + Time, + (NumClients * NumRuns) / Time * 1_000_000]), ok. -do_bench_proc(Parent, What, Path, Headers0, NumRuns, Config) -> +do_bench_post_proc(Parent, What, Path, Headers0, Body, NumRuns, Config) -> ConnPid = gun_open(Config), Headers = Headers0#{<<"accept-encoding">> => <<"gzip">>}, Parent ! {What, ready}, receive {What, go} -> ok end, - do_bench_run(ConnPid, Path, Headers, NumRuns), + do_bench_post_run(ConnPid, Path, Headers, Body, NumRuns), Parent ! {What, done}, gun:close(ConnPid). -do_bench_run(_, _, _, 0) -> +do_bench_post_run(_, _, _, _, 0) -> ok; -do_bench_run(ConnPid, Path, Headers, Num) -> - Ref = gun:request(ConnPid, <<"GET">>, Path, Headers, <<>>), +do_bench_post_run(ConnPid, Path, Headers, Body, Num) -> + Ref = gun:request(ConnPid, <<"POST">>, Path, Headers, Body), {response, IsFin, 200, _RespHeaders} = gun:await(ConnPid, Ref, infinity), {ok, _} = case IsFin of nofin -> gun:await_body(ConnPid, Ref, infinity); fin -> {ok, <<>>} end, - do_bench_run(ConnPid, Path, Headers, Num - 1). + do_bench_post_run(ConnPid, Path, Headers, Body, Num - 1). + +do_bench_wait(What, Clients) -> + _ = [ClientPid ! {What, go} || ClientPid <- Clients], + _ = [receive {What, done} -> ok end || _ <- Clients], + ok. do_log(Str, Args) -> ct:log(Str, Args), diff --git a/test/ws_perf_SUITE.erl b/test/ws_perf_SUITE.erl index 2fd2db2..2d7a1c0 100644 --- a/test/ws_perf_SUITE.erl +++ b/test/ws_perf_SUITE.erl @@ -60,6 +60,7 @@ init_per_group(Name, Config) when Name =:= h2c; Name =:= h2c_compress -> env => #{dispatch => init_dispatch(Config)}, max_frame_size_sent => 64*1024, max_frame_size_received => 16384 * 1024 - 1, + max_received_frame_rate => {10_000_000, 1}, stream_window_data_threshold => 1024, stream_window_margin_size => 64*1024 }, [{flavor, Flavor}|Config]), @@ -102,13 +103,14 @@ end_per_group(Name, _Config) -> init_dispatch(_Config) -> cowboy_router:compile([ {"localhost", [ - {"/ws_echo", ws_echo, []} + {"/ws_echo", ws_echo, []}, + {"/ws_ignore", ws_ignore, []} ]} ]). %% Support functions for testing using Gun. -do_gun_open_ws(Config) -> +do_gun_open_ws(Path, Config) -> ConnPid = gun_open(Config, #{ http2_opts => #{ connection_window_margin_size => 64*1024, @@ -127,7 +129,7 @@ do_gun_open_ws(Config) -> {notify, settings_changed, #{enable_connect_protocol := true}} = gun:await(ConnPid, undefined) %% @todo Maybe have a gun:await/1? end, - StreamRef = gun:ws_upgrade(ConnPid, "/ws_echo"), + StreamRef = gun:ws_upgrade(ConnPid, Path), receive {gun_upgrade, ConnPid, StreamRef, [<<"websocket">>], _} -> {ok, ConnPid, StreamRef}; @@ -149,72 +151,140 @@ receive_ws(ConnPid, StreamRef) -> %% Tests. -one_00064KiB(Config) -> +echo_1_00064KiB(Config) -> doc("Send and receive a 64KiB frame."), - do_full(Config, one, 1, 64 * 1024). + do_echo(Config, echo_1, 1, 64 * 1024). -one_00256KiB(Config) -> +echo_1_00256KiB(Config) -> doc("Send and receive a 256KiB frame."), - do_full(Config, one, 1, 256 * 1024). + do_echo(Config, echo_1, 1, 256 * 1024). -one_01024KiB(Config) -> +echo_1_01024KiB(Config) -> doc("Send and receive a 1024KiB frame."), - do_full(Config, one, 1, 1024 * 1024). + do_echo(Config, echo_1, 1, 1024 * 1024). -one_04096KiB(Config) -> +echo_1_04096KiB(Config) -> doc("Send and receive a 4096KiB frame."), - do_full(Config, one, 1, 4096 * 1024). + do_echo(Config, echo_1, 1, 4096 * 1024). %% Minus one because frames can only get so big. -one_16384KiB(Config) -> +echo_1_16384KiB(Config) -> doc("Send and receive a 16384KiB - 1 frame."), - do_full(Config, one, 1, 16384 * 1024 - 1). + do_echo(Config, echo_1, 1, 16384 * 1024 - 1). -repeat_00000B(Config) -> +echo_N_00000B(Config) -> doc("Send and receive a 0B frame 1000 times."), - do_full(Config, repeat, 1000, 0). + do_echo(Config, echo_N, 1000, 0). -repeat_00256B(Config) -> +echo_N_00256B(Config) -> doc("Send and receive a 256B frame 1000 times."), - do_full(Config, repeat, 1000, 256). + do_echo(Config, echo_N, 1000, 256). -repeat_01024B(Config) -> +echo_N_01024B(Config) -> doc("Send and receive a 1024B frame 1000 times."), - do_full(Config, repeat, 1000, 1024). + do_echo(Config, echo_N, 1000, 1024). -repeat_04096B(Config) -> +echo_N_04096B(Config) -> doc("Send and receive a 4096B frame 1000 times."), - do_full(Config, repeat, 1000, 4096). + do_echo(Config, echo_N, 1000, 4096). -repeat_16384B(Config) -> +echo_N_16384B(Config) -> doc("Send and receive a 16384B frame 1000 times."), - do_full(Config, repeat, 1000, 16384). + do_echo(Config, echo_N, 1000, 16384). -%repeat_16384B_10K(Config) -> +%echo_N_16384B_10K(Config) -> % doc("Send and receive a 16384B frame 10000 times."), -% do_full(Config, repeat, 10000, 16384). +% do_echo(Config, echo_N, 10000, 16384). -do_full(Config, What, Num, FrameSize) -> - {ok, ConnPid, StreamRef} = do_gun_open_ws(Config), +do_echo(Config, What, Num, FrameSize) -> + {ok, ConnPid, StreamRef} = do_gun_open_ws("/ws_echo", Config), FrameType = config(frame_type, Config), FrameData = case FrameType of text -> do_text_data(Config, FrameSize); binary -> rand:bytes(FrameSize) end, %% Heat up the processes before doing the real run. -% do_full1(ConnPid, StreamRef, Num, FrameType, FrameData), - {Time, _} = timer:tc(?MODULE, do_full1, [ConnPid, StreamRef, Num, FrameType, FrameData]), +% do_echo_loop(ConnPid, StreamRef, Num, FrameType, FrameData), + {Time, _} = timer:tc(?MODULE, do_echo_loop, [ConnPid, StreamRef, Num, FrameType, FrameData]), do_log("~-6s ~-6s ~6s: ~8bµs", [What, FrameType, do_format_size(FrameSize), Time]), gun:ws_send(ConnPid, StreamRef, close), {ok, close} = receive_ws(ConnPid, StreamRef), gun_down(ConnPid). -do_full1(_, _, 0, _, _) -> +do_echo_loop(_, _, 0, _, _) -> ok; -do_full1(ConnPid, StreamRef, Num, FrameType, FrameData) -> +do_echo_loop(ConnPid, StreamRef, Num, FrameType, FrameData) -> gun:ws_send(ConnPid, StreamRef, {FrameType, FrameData}), {ok, {FrameType, FrameData}} = receive_ws(ConnPid, StreamRef), - do_full1(ConnPid, StreamRef, Num - 1, FrameType, FrameData). + do_echo_loop(ConnPid, StreamRef, Num - 1, FrameType, FrameData). + +send_1_00064KiB(Config) -> + doc("Send a 64KiB frame."), + do_send(Config, send_1, 1, 64 * 1024). + +send_1_00256KiB(Config) -> + doc("Send a 256KiB frame."), + do_send(Config, send_1, 1, 256 * 1024). + +send_1_01024KiB(Config) -> + doc("Send a 1024KiB frame."), + do_send(Config, send_1, 1, 1024 * 1024). + +send_1_04096KiB(Config) -> + doc("Send a 4096KiB frame."), + do_send(Config, send_1, 1, 4096 * 1024). + +%% Minus one because frames can only get so big. +send_1_16384KiB(Config) -> + doc("Send a 16384KiB - 1 frame."), + do_send(Config, send_1, 1, 16384 * 1024 - 1). + +send_N_00000B(Config) -> + doc("Send a 0B frame 10000 times."), + do_send(Config, send_N, 10000, 0). + +send_N_00256B(Config) -> + doc("Send a 256B frame 10000 times."), + do_send(Config, send_N, 10000, 256). + +send_N_01024B(Config) -> + doc("Send a 1024B frame 10000 times."), + do_send(Config, send_N, 10000, 1024). + +send_N_04096B(Config) -> + doc("Send a 4096B frame 10000 times."), + do_send(Config, send_N, 10000, 4096). + +send_N_16384B(Config) -> + doc("Send a 16384B frame 10000 times."), + do_send(Config, send_N, 10000, 16384). + +%send_N_16384B_10K(Config) -> +% doc("Send and receive a 16384B frame 10000 times."), +% do_send(Config, send_N, 10000, 16384). + +do_send(Config, What, Num, FrameSize) -> + {ok, ConnPid, StreamRef} = do_gun_open_ws("/ws_ignore", Config), + FrameType = config(frame_type, Config), + FrameData = case FrameType of + text -> do_text_data(Config, FrameSize); + binary -> rand:bytes(FrameSize) + end, + %% Heat up the processes before doing the real run. +% do_send_loop(ConnPid, StreamRef, Num, FrameType, FrameData), + {Time, _} = timer:tc(?MODULE, do_send_loop, [ConnPid, StreamRef, Num, FrameType, FrameData]), + do_log("~-6s ~-6s ~6s: ~8bµs", [What, FrameType, do_format_size(FrameSize), Time]), + gun:ws_send(ConnPid, StreamRef, close), + {ok, close} = receive_ws(ConnPid, StreamRef), + gun_down(ConnPid). + +do_send_loop(ConnPid, StreamRef, 0, _, _) -> + gun:ws_send(ConnPid, StreamRef, {text, <<"CHECK">>}), + {ok, {text, <<"CHECK">>}} = receive_ws(ConnPid, StreamRef), + ok; +do_send_loop(ConnPid, StreamRef, Num, FrameType, FrameData) -> + gun:ws_send(ConnPid, StreamRef, {FrameType, FrameData}), + do_send_loop(ConnPid, StreamRef, Num - 1, FrameType, FrameData). %% Internal. |