aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/cowboy.erl43
-rw-r--r--src/cowboy_dynamic_buffer.hrl80
-rw-r--r--src/cowboy_http.erl47
-rw-r--r--src/cowboy_http2.erl21
-rw-r--r--src/cowboy_websocket.erl31
5 files changed, 194 insertions, 28 deletions
diff --git a/src/cowboy.erl b/src/cowboy.erl
index e5ed831..c685649 100644
--- a/src/cowboy.erl
+++ b/src/cowboy.erl
@@ -51,8 +51,12 @@
start_clear(Ref, TransOpts0, ProtoOpts0) ->
TransOpts1 = ranch:normalize_opts(TransOpts0),
- {TransOpts, ConnectionType} = ensure_connection_type(TransOpts1),
- ProtoOpts = ProtoOpts0#{connection_type => ConnectionType},
+ {TransOpts2, DynamicBuffer} = ensure_dynamic_buffer(TransOpts1, ProtoOpts0),
+ {TransOpts, ConnectionType} = ensure_connection_type(TransOpts2),
+ ProtoOpts = ProtoOpts0#{
+ connection_type => ConnectionType,
+ dynamic_buffer => DynamicBuffer
+ },
ranch:start_listener(Ref, ranch_tcp, TransOpts, cowboy_clear, ProtoOpts).
-spec start_tls(ranch:ref(), ranch:opts(), opts())
@@ -60,12 +64,13 @@ start_clear(Ref, TransOpts0, ProtoOpts0) ->
start_tls(Ref, TransOpts0, ProtoOpts0) ->
TransOpts1 = ranch:normalize_opts(TransOpts0),
- SocketOpts = maps:get(socket_opts, TransOpts1, []),
- TransOpts2 = TransOpts1#{socket_opts => [
- {alpn_preferred_protocols, [<<"h2">>, <<"http/1.1">>]}
- |SocketOpts]},
- {TransOpts, ConnectionType} = ensure_connection_type(TransOpts2),
- ProtoOpts = ProtoOpts0#{connection_type => ConnectionType},
+ {TransOpts2, DynamicBuffer} = ensure_dynamic_buffer(TransOpts1, ProtoOpts0),
+ TransOpts3 = ensure_alpn(TransOpts2),
+ {TransOpts, ConnectionType} = ensure_connection_type(TransOpts3),
+ ProtoOpts = ProtoOpts0#{
+ connection_type => ConnectionType,
+ dynamic_buffer => DynamicBuffer
+ },
ranch:start_listener(Ref, ranch_ssl, TransOpts, cowboy_tls, ProtoOpts).
%% @todo Experimental function to start a barebone QUIC listener.
@@ -77,6 +82,7 @@ start_tls(Ref, TransOpts0, ProtoOpts0) ->
-spec start_quic(ranch:ref(), #{socket_opts => [{atom(), _}]}, cowboy_http3:opts())
-> {ok, pid()}.
+%% @todo Implement dynamic_buffer for HTTP/3 if/when it applies.
start_quic(Ref, TransOpts, ProtoOpts) ->
{ok, _} = application:ensure_all_started(quicer),
Parent = self(),
@@ -139,11 +145,32 @@ port_0() ->
end,
Port.
+ensure_alpn(TransOpts) ->
+ SocketOpts = maps:get(socket_opts, TransOpts, []),
+ TransOpts#{socket_opts => [
+ {alpn_preferred_protocols, [<<"h2">>, <<"http/1.1">>]}
+ |SocketOpts]}.
+
ensure_connection_type(TransOpts=#{connection_type := ConnectionType}) ->
{TransOpts, ConnectionType};
ensure_connection_type(TransOpts) ->
{TransOpts#{connection_type => supervisor}, supervisor}.
+%% Dynamic buffer was set; accept transport options as-is.
+%% Note that initial 'buffer' size may be lower than dynamic buffer allows.
+ensure_dynamic_buffer(TransOpts, #{dynamic_buffer := DynamicBuffer}) ->
+ {TransOpts, DynamicBuffer};
+%% Dynamic buffer was not set; define default dynamic buffer
+%% only if 'buffer' size was not configured. In that case we
+%% set the 'buffer' size to the lowest value.
+ensure_dynamic_buffer(TransOpts=#{socket_opts := SocketOpts}, _) ->
+ case proplists:get_value(buffer, SocketOpts, undefined) of
+ undefined ->
+ {TransOpts#{socket_opts => [{buffer, 8192}|SocketOpts]}, {8192, 131072}};
+ _ ->
+ {TransOpts, false}
+ end.
+
-spec stop_listener(ranch:ref()) -> ok | {error, not_found}.
stop_listener(Ref) ->
diff --git a/src/cowboy_dynamic_buffer.hrl b/src/cowboy_dynamic_buffer.hrl
new file mode 100644
index 0000000..cb07aab
--- /dev/null
+++ b/src/cowboy_dynamic_buffer.hrl
@@ -0,0 +1,80 @@
+%% Copyright (c) 2025, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+%% These functions are common to cowboy_http, cowboy_http2 and
+%% cowboy_websocket. It requires the options and the state
+%% to use the same field names.
+
+%% Experiments have shown that the size of the 'buffer' can greatly
+%% impact performance: a buffer too small leads to more messages
+%% being handled and typically more binary appends; and a buffer
+%% too large results in inefficient use of memory which in turn
+%% reduces the throughput, presumably because large binary appends
+%% are not as efficient as smaller ones, and because while the
+%% buffer gets allocated only when there is data, the allocated
+%% size remains until the binary is GC and so under-use hurts.
+%%
+%% The performance of a given 'buffer' size will also depend on
+%% how the client is sending data, and on the protocol. For example,
+%% HTTP/1.1 doesn't need a very large 'buffer' size for reading
+%% request headers, but it does need one for reading large request
+%% bodies. At the same time, HTTP/2 performs best reading large
+%% request bodies when the 'buffer' size is about half that of
+%% HTTP/1.1.
+%%
+%% It therefore becomes important to resize the buffer dynamically
+%% depending on what is currently going on. We do this based on
+%% the size of data packets we received from the transport. We
+%% maintain a moving average and when that moving average is
+%% 90% of the current 'buffer' size, we double the 'buffer' size.
+%% When things slow down and the moving average falls below
+%% 40% of the current 'buffer' size, we halve the 'buffer' size.
+%%
+%% To calculate the moving average we do (MovAvg + DataLen) div 2.
+%% This means that the moving average will change very quickly when
+%% DataLen increases or decreases rapidly. That's OK, we want to
+%% be reactive, but also setting the buffer size is a pretty fast
+%% operation. The formula could be changed to the following if it
+%% became a problem: (MovAvg * N + DataLen) div (N + 1).
+%%
+%% Note that this works best when active,N uses low values of N.
+%% We don't want to accumulate too much data because we resize
+%% the buffer.
+
+init_dynamic_buffer_size(#{dynamic_buffer_initial_size := DynamicBuffer}) ->
+ DynamicBuffer;
+init_dynamic_buffer_size(#{dynamic_buffer := {LowDynamicBuffer, _}}) ->
+ LowDynamicBuffer;
+init_dynamic_buffer_size(_) ->
+ false.
+
+maybe_resize_buffer(State=#state{dynamic_buffer_size=false}, _) ->
+ State;
+maybe_resize_buffer(State=#state{transport=Transport, socket=Socket,
+ opts=#{dynamic_buffer := {LowDynamicBuffer, HighDynamicBuffer}},
+ dynamic_buffer_size=BufferSize0, dynamic_buffer_moving_average=MovingAvg0}, Data) ->
+ DataLen = byte_size(Data),
+ MovingAvg = (MovingAvg0 + DataLen) div 2,
+ if
+ BufferSize0 < HighDynamicBuffer andalso MovingAvg > BufferSize0 * 0.9 ->
+ BufferSize = min(BufferSize0 * 2, HighDynamicBuffer),
+ ok = maybe_socket_error(State, Transport:setopts(Socket, [{buffer, BufferSize}])),
+ State#state{dynamic_buffer_moving_average=MovingAvg, dynamic_buffer_size=BufferSize};
+ BufferSize0 > LowDynamicBuffer andalso MovingAvg < BufferSize0 * 0.4 ->
+ BufferSize = max(BufferSize0 div 2, LowDynamicBuffer),
+ ok = maybe_socket_error(State, Transport:setopts(Socket, [{buffer, BufferSize}])),
+ State#state{dynamic_buffer_moving_average=MovingAvg, dynamic_buffer_size=BufferSize};
+ true ->
+ State#state{dynamic_buffer_moving_average=MovingAvg}
+ end.
diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl
index 7c62b13..de268a6 100644
--- a/src/cowboy_http.erl
+++ b/src/cowboy_http.erl
@@ -28,6 +28,9 @@
compress_buffering => boolean(),
compress_threshold => non_neg_integer(),
connection_type => worker | supervisor,
+ dynamic_buffer => false | {pos_integer(), pos_integer()},
+ dynamic_buffer_initial_average => non_neg_integer(),
+ dynamic_buffer_initial_size => pos_integer(),
env => cowboy_middleware:env(),
http10_keepalive => boolean(),
idle_timeout => timeout(),
@@ -137,6 +140,10 @@
%% Flow requested for the current stream.
flow = infinity :: non_neg_integer() | infinity,
+ %% Dynamic buffer moving average and current buffer size.
+ dynamic_buffer_size :: pos_integer() | false,
+ dynamic_buffer_moving_average :: non_neg_integer(),
+
%% Identifier for the stream currently being written.
%% Note that out_streamid =< in_streamid.
out_streamid = 1 :: pos_integer(),
@@ -181,12 +188,16 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
parent=Parent, ref=Ref, socket=Socket,
transport=Transport, proxy_header=ProxyHeader, opts=Opts,
peer=Peer, sock=Sock, cert=Cert,
+ dynamic_buffer_size=init_dynamic_buffer_size(Opts),
+ dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0),
last_streamid=maps:get(max_keepalive, Opts, 1000)},
safe_setopts_active(State),
loop(set_timeout(State, request_timeout)).
+-include("cowboy_dynamic_buffer.hrl").
+
setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
- N = maps:get(active_n, Opts, 100),
+ N = maps:get(active_n, Opts, 1),
Transport:setopts(Socket, [{active, N}]).
safe_setopts_active(State) ->
@@ -220,11 +231,13 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
receive
%% Discard data coming in after the last request
%% we want to process was received fully.
- {OK, Socket, _} when OK =:= element(1, Messages), InStreamID > LastStreamID ->
- loop(State);
+ {OK, Socket, Data} when OK =:= element(1, Messages), InStreamID > LastStreamID ->
+ State1 = maybe_resize_buffer(State, Data),
+ loop(State1);
%% Socket messages.
{OK, Socket, Data} when OK =:= element(1, Messages) ->
- parse(<< Buffer/binary, Data/binary >>, State);
+ State1 = maybe_resize_buffer(State, Data),
+ parse(<< Buffer/binary, Data/binary >>, State1);
{Closed, Socket} when Closed =:= element(2, Messages) ->
terminate(State, {socket_error, closed, 'The socket has been closed.'});
{Error, Socket, Reason} when Error =:= element(3, Messages) ->
@@ -885,12 +898,12 @@ is_http2_upgrade(_, _) ->
%% Prior knowledge upgrade, without an HTTP/1.1 request.
http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport,
- proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert}, Buffer) ->
+ proxy_header=ProxyHeader, peer=Peer, sock=Sock, cert=Cert}, Buffer) ->
case Transport:secure() of
false ->
_ = cancel_timeout(State),
- cowboy_http2:init(Parent, Ref, Socket, Transport,
- ProxyHeader, Opts, Peer, Sock, Cert, Buffer);
+ cowboy_http2:init(Parent, Ref, Socket, Transport, ProxyHeader,
+ opts_for_upgrade(State), Peer, Sock, Cert, Buffer);
true ->
error_terminate(400, State, {connection_error, protocol_error,
'Clients that support HTTP/2 over TLS MUST use ALPN. (RFC7540 3.4)'})
@@ -898,7 +911,7 @@ http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Tran
%% Upgrade via an HTTP/1.1 request.
http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport,
- proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert},
+ proxy_header=ProxyHeader, peer=Peer, sock=Sock, cert=Cert},
Buffer, HTTP2Settings, Req) ->
%% @todo
%% However if the client sent a body, we need to read the body in full
@@ -907,13 +920,22 @@ http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Tran
try cow_http_hd:parse_http2_settings(HTTP2Settings) of
Settings ->
_ = cancel_timeout(State),
- cowboy_http2:init(Parent, Ref, Socket, Transport,
- ProxyHeader, Opts, Peer, Sock, Cert, Buffer, Settings, Req)
+ cowboy_http2:init(Parent, Ref, Socket, Transport, ProxyHeader,
+ opts_for_upgrade(State), Peer, Sock, Cert, Buffer, Settings, Req)
catch _:_ ->
error_terminate(400, State, {connection_error, protocol_error,
'The HTTP2-Settings header must contain a base64 SETTINGS payload. (RFC7540 3.2, RFC7540 3.2.1)'})
end.
+opts_for_upgrade(#state{opts=Opts, dynamic_buffer_size=false}) ->
+ Opts;
+opts_for_upgrade(#state{opts=Opts, dynamic_buffer_size=Size,
+ dynamic_buffer_moving_average=MovingAvg}) ->
+ Opts#{
+ dynamic_buffer_initial_average => MovingAvg,
+ dynamic_buffer_initial_size => Size
+ }.
+
%% Request body parsing.
parse_body(Buffer, State=#state{in_streamid=StreamID, in_state=
@@ -1210,7 +1232,7 @@ commands(State0=#state{socket=Socket, transport=Transport, streams=Streams, out_
commands(State, StreamID, Tail);
%% Protocol takeover.
commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transport,
- out_state=OutState, opts=Opts, buffer=Buffer, children=Children}, StreamID,
+ out_state=OutState, buffer=Buffer, children=Children}, StreamID,
[{switch_protocol, Headers, Protocol, InitialState}|_Tail]) ->
%% @todo If there's streams opened after this one, fail instead of 101.
State1 = cancel_timeout(State0),
@@ -1234,7 +1256,8 @@ commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transpor
%% Turn off the trap_exit process flag
%% since this process will no longer be a supervisor.
process_flag(trap_exit, false),
- Protocol:takeover(Parent, Ref, Socket, Transport, Opts, Buffer, InitialState);
+ Protocol:takeover(Parent, Ref, Socket, Transport,
+ opts_for_upgrade(State), Buffer, InitialState);
%% Set options dynamically.
commands(State0=#state{overriden_opts=Opts},
StreamID, [{set_options, SetOpts}|Tail]) ->
diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl
index 0e110cd..a6ffca7 100644
--- a/src/cowboy_http2.erl
+++ b/src/cowboy_http2.erl
@@ -29,6 +29,9 @@
connection_type => worker | supervisor,
connection_window_margin_size => 0..16#7fffffff,
connection_window_update_threshold => 0..16#7fffffff,
+ dynamic_buffer => false | {pos_integer(), pos_integer()},
+ dynamic_buffer_initial_average => non_neg_integer(),
+ dynamic_buffer_initial_size => pos_integer(),
enable_connect_protocol => boolean(),
env => cowboy_middleware:env(),
goaway_initial_timeout => timeout(),
@@ -133,6 +136,10 @@
%% Flow requested for all streams.
flow = 0 :: non_neg_integer(),
+ %% Dynamic buffer moving average and current buffer size.
+ dynamic_buffer_size :: pos_integer() | false,
+ dynamic_buffer_moving_average :: non_neg_integer(),
+
%% Currently active HTTP/2 streams. Streams may be initiated either
%% by the client or by the server through PUSH_PROMISE frames.
streams = #{} :: #{cow_http2:streamid() => #stream{}},
@@ -169,12 +176,15 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
{inet:ip_address(), inet:port_number()}, {inet:ip_address(), inet:port_number()},
binary() | undefined, binary()) -> ok.
init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer) ->
+ DynamicBuffer = init_dynamic_buffer_size(Opts),
{ok, Preface, HTTP2Machine} = cow_http2_machine:init(server, Opts),
%% Send the preface before doing all the init in case we get a socket error.
ok = maybe_socket_error(undefined, Transport:send(Socket, Preface)),
State = set_idle_timeout(init_rate_limiting(#state{parent=Parent, ref=Ref, socket=Socket,
transport=Transport, proxy_header=ProxyHeader,
opts=Opts, peer=Peer, sock=Sock, cert=Cert,
+ dynamic_buffer_size=DynamicBuffer,
+ dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0),
http2_status=sequence, http2_machine=HTTP2Machine}), 0),
safe_setopts_active(State),
case Buffer of
@@ -216,12 +226,15 @@ add_period(Time, Period) -> Time + Period.
binary() | undefined, binary(), map() | undefined, cowboy_req:req()) -> ok.
init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer,
_Settings, Req=#{method := Method}) ->
+ DynamicBuffer = init_dynamic_buffer_size(Opts),
{ok, Preface, HTTP2Machine0} = cow_http2_machine:init(server, Opts),
{ok, StreamID, HTTP2Machine}
= cow_http2_machine:init_upgrade_stream(Method, HTTP2Machine0),
State0 = #state{parent=Parent, ref=Ref, socket=Socket,
transport=Transport, proxy_header=ProxyHeader,
opts=Opts, peer=Peer, sock=Sock, cert=Cert,
+ dynamic_buffer_size=DynamicBuffer,
+ dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0),
http2_status=upgrade, http2_machine=HTTP2Machine},
State1 = headers_frame(State0#state{
http2_machine=HTTP2Machine}, StreamID, Req),
@@ -241,11 +254,14 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer
_ -> parse(State, Buffer)
end.
+-include("cowboy_dynamic_buffer.hrl").
+
%% Because HTTP/2 has flow control and Cowboy has other rate limiting
%% mechanisms implemented, a very large active_n value should be fine,
%% as long as the stream handlers do their work in a timely manner.
+%% However large active_n values reduce the impact of dynamic_buffer.
setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
- N = maps:get(active_n, Opts, 100),
+ N = maps:get(active_n, Opts, 1),
Transport:setopts(Socket, [{active, N}]).
safe_setopts_active(State) ->
@@ -258,7 +274,8 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
receive
%% Socket messages.
{OK, Socket, Data} when OK =:= element(1, Messages) ->
- parse(State#state{idle_timeout_num=0}, << Buffer/binary, Data/binary >>);
+ State1 = maybe_resize_buffer(State, Data),
+ parse(State1#state{idle_timeout_num=0}, << Buffer/binary, Data/binary >>);
{Closed, Socket} when Closed =:= element(2, Messages) ->
Reason = case State#state.http2_status of
closing -> {stop, closed, 'The client is going away.'};
diff --git a/src/cowboy_websocket.erl b/src/cowboy_websocket.erl
index 8d3ed08..d2eb99f 100644
--- a/src/cowboy_websocket.erl
+++ b/src/cowboy_websocket.erl
@@ -69,6 +69,9 @@
active_n => pos_integer(),
compress => boolean(),
deflate_opts => cow_ws:deflate_opts(),
+ dynamic_buffer => false | {pos_integer(), pos_integer()},
+ dynamic_buffer_initial_average => non_neg_integer(),
+ dynamic_buffer_initial_size => pos_integer(),
idle_timeout => timeout(),
max_frame_size => non_neg_integer() | infinity,
req_filter => fun((cowboy_req:req()) -> map()),
@@ -97,6 +100,11 @@
timeout_num = 0 :: 0..?IDLE_TIMEOUT_TICKS,
messages = undefined :: undefined | {atom(), atom(), atom()}
| {atom(), atom(), atom(), atom()},
+
+ %% Dynamic buffer moving average and current buffer size.
+ dynamic_buffer_size = false :: pos_integer() | false,
+ dynamic_buffer_moving_average = 0 :: non_neg_integer(),
+
hibernate = false :: boolean(),
frag_state = undefined :: cow_ws:frag_state(),
frag_buffer = <<>> :: binary(),
@@ -270,7 +278,7 @@ websocket_handshake(State, Req=#{ref := Ref, pid := Pid, streamid := StreamID},
%% @todo We don't want date and server headers.
Headers = cowboy_req:response_headers(#{}, Req),
Pid ! {{Pid, StreamID}, {switch_protocol, Headers, ?MODULE, {State, HandlerState}}},
- takeover(Pid, Ref, {Pid, StreamID}, undefined, undefined, <<>>,
+ takeover(Pid, Ref, {Pid, StreamID}, undefined, #{}, <<>>,
{State, HandlerState}).
%% Connection process.
@@ -295,8 +303,8 @@ websocket_handshake(State, Req=#{ref := Ref, pid := Pid, streamid := StreamID},
-spec takeover(pid(), ranch:ref(), inet:socket() | {pid(), cowboy_stream:streamid()},
module() | undefined, any(), binary(),
{#state{}, any()}) -> no_return().
-takeover(Parent, Ref, Socket, Transport, _Opts, Buffer,
- {State0=#state{handler=Handler, req=Req}, HandlerState}) ->
+takeover(Parent, Ref, Socket, Transport, Opts, Buffer,
+ {State0=#state{opts=WsOpts, handler=Handler, req=Req}, HandlerState}) ->
case Req of
#{version := 'HTTP/3'} -> ok;
%% @todo We should have an option to disable this behavior.
@@ -308,7 +316,11 @@ takeover(Parent, Ref, Socket, Transport, _Opts, Buffer,
end,
State = set_idle_timeout(State0#state{parent=Parent,
ref=Ref, socket=Socket, transport=Transport,
- key=undefined, messages=Messages}, 0),
+ opts=WsOpts#{dynamic_buffer => maps:get(dynamic_buffer, Opts, false)},
+ key=undefined, messages=Messages,
+ %% Dynamic buffer only applies to HTTP/1.1 Websocket.
+ dynamic_buffer_size=init_dynamic_buffer_size(Opts),
+ dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0)}, 0),
%% We call parse_header/3 immediately because there might be
%% some data in the buffer that was sent along with the handshake.
%% While it is not allowed by the protocol to send frames immediately,
@@ -319,6 +331,12 @@ takeover(Parent, Ref, Socket, Transport, _Opts, Buffer,
false -> after_init(State, HandlerState, #ps_header{buffer=Buffer})
end.
+-include("cowboy_dynamic_buffer.hrl").
+
+%% @todo Implement early socket error detection.
+maybe_socket_error(_, _) ->
+ ok.
+
after_init(State=#state{active=true}, HandlerState, ParseState) ->
%% Enable active,N for HTTP/1.1, and auto read_body for HTTP/2.
%% We must do this only after calling websocket_init/1 (if any)
@@ -340,7 +358,7 @@ after_init(State, HandlerState, ParseState) ->
setopts_active(#state{transport=undefined}) ->
ok;
setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
- N = maps:get(active_n, Opts, 100),
+ N = maps:get(active_n, Opts, 1),
Transport:setopts(Socket, [{active, N}]).
maybe_read_body(#state{socket=Stream={Pid, _}, transport=undefined, active=true}) ->
@@ -414,7 +432,8 @@ loop(State=#state{parent=Parent, socket=Socket, messages=Messages,
receive
%% Socket messages. (HTTP/1.1)
{OK, Socket, Data} when OK =:= element(1, Messages) ->
- parse(?reset_idle_timeout(State), HandlerState, ParseState, Data);
+ State1 = maybe_resize_buffer(State, Data),
+ parse(?reset_idle_timeout(State1), HandlerState, ParseState, Data);
{Closed, Socket} when Closed =:= element(2, Messages) ->
terminate(State, HandlerState, {error, closed});
{Error, Socket, Reason} when Error =:= element(3, Messages) ->