diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/cowboy.erl | 43 | ||||
-rw-r--r-- | src/cowboy_dynamic_buffer.hrl | 80 | ||||
-rw-r--r-- | src/cowboy_http.erl | 47 | ||||
-rw-r--r-- | src/cowboy_http2.erl | 21 | ||||
-rw-r--r-- | src/cowboy_websocket.erl | 31 |
5 files changed, 194 insertions, 28 deletions
diff --git a/src/cowboy.erl b/src/cowboy.erl index e5ed831..c685649 100644 --- a/src/cowboy.erl +++ b/src/cowboy.erl @@ -51,8 +51,12 @@ start_clear(Ref, TransOpts0, ProtoOpts0) -> TransOpts1 = ranch:normalize_opts(TransOpts0), - {TransOpts, ConnectionType} = ensure_connection_type(TransOpts1), - ProtoOpts = ProtoOpts0#{connection_type => ConnectionType}, + {TransOpts2, DynamicBuffer} = ensure_dynamic_buffer(TransOpts1, ProtoOpts0), + {TransOpts, ConnectionType} = ensure_connection_type(TransOpts2), + ProtoOpts = ProtoOpts0#{ + connection_type => ConnectionType, + dynamic_buffer => DynamicBuffer + }, ranch:start_listener(Ref, ranch_tcp, TransOpts, cowboy_clear, ProtoOpts). -spec start_tls(ranch:ref(), ranch:opts(), opts()) @@ -60,12 +64,13 @@ start_clear(Ref, TransOpts0, ProtoOpts0) -> start_tls(Ref, TransOpts0, ProtoOpts0) -> TransOpts1 = ranch:normalize_opts(TransOpts0), - SocketOpts = maps:get(socket_opts, TransOpts1, []), - TransOpts2 = TransOpts1#{socket_opts => [ - {alpn_preferred_protocols, [<<"h2">>, <<"http/1.1">>]} - |SocketOpts]}, - {TransOpts, ConnectionType} = ensure_connection_type(TransOpts2), - ProtoOpts = ProtoOpts0#{connection_type => ConnectionType}, + {TransOpts2, DynamicBuffer} = ensure_dynamic_buffer(TransOpts1, ProtoOpts0), + TransOpts3 = ensure_alpn(TransOpts2), + {TransOpts, ConnectionType} = ensure_connection_type(TransOpts3), + ProtoOpts = ProtoOpts0#{ + connection_type => ConnectionType, + dynamic_buffer => DynamicBuffer + }, ranch:start_listener(Ref, ranch_ssl, TransOpts, cowboy_tls, ProtoOpts). %% @todo Experimental function to start a barebone QUIC listener. @@ -77,6 +82,7 @@ start_tls(Ref, TransOpts0, ProtoOpts0) -> -spec start_quic(ranch:ref(), #{socket_opts => [{atom(), _}]}, cowboy_http3:opts()) -> {ok, pid()}. +%% @todo Implement dynamic_buffer for HTTP/3 if/when it applies. start_quic(Ref, TransOpts, ProtoOpts) -> {ok, _} = application:ensure_all_started(quicer), Parent = self(), @@ -139,11 +145,32 @@ port_0() -> end, Port. +ensure_alpn(TransOpts) -> + SocketOpts = maps:get(socket_opts, TransOpts, []), + TransOpts#{socket_opts => [ + {alpn_preferred_protocols, [<<"h2">>, <<"http/1.1">>]} + |SocketOpts]}. + ensure_connection_type(TransOpts=#{connection_type := ConnectionType}) -> {TransOpts, ConnectionType}; ensure_connection_type(TransOpts) -> {TransOpts#{connection_type => supervisor}, supervisor}. +%% Dynamic buffer was set; accept transport options as-is. +%% Note that initial 'buffer' size may be lower than dynamic buffer allows. +ensure_dynamic_buffer(TransOpts, #{dynamic_buffer := DynamicBuffer}) -> + {TransOpts, DynamicBuffer}; +%% Dynamic buffer was not set; define default dynamic buffer +%% only if 'buffer' size was not configured. In that case we +%% set the 'buffer' size to the lowest value. +ensure_dynamic_buffer(TransOpts=#{socket_opts := SocketOpts}, _) -> + case proplists:get_value(buffer, SocketOpts, undefined) of + undefined -> + {TransOpts#{socket_opts => [{buffer, 8192}|SocketOpts]}, {8192, 131072}}; + _ -> + {TransOpts, false} + end. + -spec stop_listener(ranch:ref()) -> ok | {error, not_found}. stop_listener(Ref) -> diff --git a/src/cowboy_dynamic_buffer.hrl b/src/cowboy_dynamic_buffer.hrl new file mode 100644 index 0000000..cb07aab --- /dev/null +++ b/src/cowboy_dynamic_buffer.hrl @@ -0,0 +1,80 @@ +%% Copyright (c) 2025, Loïc Hoguin <[email protected]> +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +%% These functions are common to cowboy_http, cowboy_http2 and +%% cowboy_websocket. It requires the options and the state +%% to use the same field names. + +%% Experiments have shown that the size of the 'buffer' can greatly +%% impact performance: a buffer too small leads to more messages +%% being handled and typically more binary appends; and a buffer +%% too large results in inefficient use of memory which in turn +%% reduces the throughput, presumably because large binary appends +%% are not as efficient as smaller ones, and because while the +%% buffer gets allocated only when there is data, the allocated +%% size remains until the binary is GC and so under-use hurts. +%% +%% The performance of a given 'buffer' size will also depend on +%% how the client is sending data, and on the protocol. For example, +%% HTTP/1.1 doesn't need a very large 'buffer' size for reading +%% request headers, but it does need one for reading large request +%% bodies. At the same time, HTTP/2 performs best reading large +%% request bodies when the 'buffer' size is about half that of +%% HTTP/1.1. +%% +%% It therefore becomes important to resize the buffer dynamically +%% depending on what is currently going on. We do this based on +%% the size of data packets we received from the transport. We +%% maintain a moving average and when that moving average is +%% 90% of the current 'buffer' size, we double the 'buffer' size. +%% When things slow down and the moving average falls below +%% 40% of the current 'buffer' size, we halve the 'buffer' size. +%% +%% To calculate the moving average we do (MovAvg + DataLen) div 2. +%% This means that the moving average will change very quickly when +%% DataLen increases or decreases rapidly. That's OK, we want to +%% be reactive, but also setting the buffer size is a pretty fast +%% operation. The formula could be changed to the following if it +%% became a problem: (MovAvg * N + DataLen) div (N + 1). +%% +%% Note that this works best when active,N uses low values of N. +%% We don't want to accumulate too much data because we resize +%% the buffer. + +init_dynamic_buffer_size(#{dynamic_buffer_initial_size := DynamicBuffer}) -> + DynamicBuffer; +init_dynamic_buffer_size(#{dynamic_buffer := {LowDynamicBuffer, _}}) -> + LowDynamicBuffer; +init_dynamic_buffer_size(_) -> + false. + +maybe_resize_buffer(State=#state{dynamic_buffer_size=false}, _) -> + State; +maybe_resize_buffer(State=#state{transport=Transport, socket=Socket, + opts=#{dynamic_buffer := {LowDynamicBuffer, HighDynamicBuffer}}, + dynamic_buffer_size=BufferSize0, dynamic_buffer_moving_average=MovingAvg0}, Data) -> + DataLen = byte_size(Data), + MovingAvg = (MovingAvg0 + DataLen) div 2, + if + BufferSize0 < HighDynamicBuffer andalso MovingAvg > BufferSize0 * 0.9 -> + BufferSize = min(BufferSize0 * 2, HighDynamicBuffer), + ok = maybe_socket_error(State, Transport:setopts(Socket, [{buffer, BufferSize}])), + State#state{dynamic_buffer_moving_average=MovingAvg, dynamic_buffer_size=BufferSize}; + BufferSize0 > LowDynamicBuffer andalso MovingAvg < BufferSize0 * 0.4 -> + BufferSize = max(BufferSize0 div 2, LowDynamicBuffer), + ok = maybe_socket_error(State, Transport:setopts(Socket, [{buffer, BufferSize}])), + State#state{dynamic_buffer_moving_average=MovingAvg, dynamic_buffer_size=BufferSize}; + true -> + State#state{dynamic_buffer_moving_average=MovingAvg} + end. diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl index 7c62b13..de268a6 100644 --- a/src/cowboy_http.erl +++ b/src/cowboy_http.erl @@ -28,6 +28,9 @@ compress_buffering => boolean(), compress_threshold => non_neg_integer(), connection_type => worker | supervisor, + dynamic_buffer => false | {pos_integer(), pos_integer()}, + dynamic_buffer_initial_average => non_neg_integer(), + dynamic_buffer_initial_size => pos_integer(), env => cowboy_middleware:env(), http10_keepalive => boolean(), idle_timeout => timeout(), @@ -137,6 +140,10 @@ %% Flow requested for the current stream. flow = infinity :: non_neg_integer() | infinity, + %% Dynamic buffer moving average and current buffer size. + dynamic_buffer_size :: pos_integer() | false, + dynamic_buffer_moving_average :: non_neg_integer(), + %% Identifier for the stream currently being written. %% Note that out_streamid =< in_streamid. out_streamid = 1 :: pos_integer(), @@ -181,12 +188,16 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) -> parent=Parent, ref=Ref, socket=Socket, transport=Transport, proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert, + dynamic_buffer_size=init_dynamic_buffer_size(Opts), + dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0), last_streamid=maps:get(max_keepalive, Opts, 1000)}, safe_setopts_active(State), loop(set_timeout(State, request_timeout)). +-include("cowboy_dynamic_buffer.hrl"). + setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) -> - N = maps:get(active_n, Opts, 100), + N = maps:get(active_n, Opts, 1), Transport:setopts(Socket, [{active, N}]). safe_setopts_active(State) -> @@ -220,11 +231,13 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts, receive %% Discard data coming in after the last request %% we want to process was received fully. - {OK, Socket, _} when OK =:= element(1, Messages), InStreamID > LastStreamID -> - loop(State); + {OK, Socket, Data} when OK =:= element(1, Messages), InStreamID > LastStreamID -> + State1 = maybe_resize_buffer(State, Data), + loop(State1); %% Socket messages. {OK, Socket, Data} when OK =:= element(1, Messages) -> - parse(<< Buffer/binary, Data/binary >>, State); + State1 = maybe_resize_buffer(State, Data), + parse(<< Buffer/binary, Data/binary >>, State1); {Closed, Socket} when Closed =:= element(2, Messages) -> terminate(State, {socket_error, closed, 'The socket has been closed.'}); {Error, Socket, Reason} when Error =:= element(3, Messages) -> @@ -885,12 +898,12 @@ is_http2_upgrade(_, _) -> %% Prior knowledge upgrade, without an HTTP/1.1 request. http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport, - proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert}, Buffer) -> + proxy_header=ProxyHeader, peer=Peer, sock=Sock, cert=Cert}, Buffer) -> case Transport:secure() of false -> _ = cancel_timeout(State), - cowboy_http2:init(Parent, Ref, Socket, Transport, - ProxyHeader, Opts, Peer, Sock, Cert, Buffer); + cowboy_http2:init(Parent, Ref, Socket, Transport, ProxyHeader, + opts_for_upgrade(State), Peer, Sock, Cert, Buffer); true -> error_terminate(400, State, {connection_error, protocol_error, 'Clients that support HTTP/2 over TLS MUST use ALPN. (RFC7540 3.4)'}) @@ -898,7 +911,7 @@ http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Tran %% Upgrade via an HTTP/1.1 request. http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport, - proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert}, + proxy_header=ProxyHeader, peer=Peer, sock=Sock, cert=Cert}, Buffer, HTTP2Settings, Req) -> %% @todo %% However if the client sent a body, we need to read the body in full @@ -907,13 +920,22 @@ http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Tran try cow_http_hd:parse_http2_settings(HTTP2Settings) of Settings -> _ = cancel_timeout(State), - cowboy_http2:init(Parent, Ref, Socket, Transport, - ProxyHeader, Opts, Peer, Sock, Cert, Buffer, Settings, Req) + cowboy_http2:init(Parent, Ref, Socket, Transport, ProxyHeader, + opts_for_upgrade(State), Peer, Sock, Cert, Buffer, Settings, Req) catch _:_ -> error_terminate(400, State, {connection_error, protocol_error, 'The HTTP2-Settings header must contain a base64 SETTINGS payload. (RFC7540 3.2, RFC7540 3.2.1)'}) end. +opts_for_upgrade(#state{opts=Opts, dynamic_buffer_size=false}) -> + Opts; +opts_for_upgrade(#state{opts=Opts, dynamic_buffer_size=Size, + dynamic_buffer_moving_average=MovingAvg}) -> + Opts#{ + dynamic_buffer_initial_average => MovingAvg, + dynamic_buffer_initial_size => Size + }. + %% Request body parsing. parse_body(Buffer, State=#state{in_streamid=StreamID, in_state= @@ -1210,7 +1232,7 @@ commands(State0=#state{socket=Socket, transport=Transport, streams=Streams, out_ commands(State, StreamID, Tail); %% Protocol takeover. commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transport, - out_state=OutState, opts=Opts, buffer=Buffer, children=Children}, StreamID, + out_state=OutState, buffer=Buffer, children=Children}, StreamID, [{switch_protocol, Headers, Protocol, InitialState}|_Tail]) -> %% @todo If there's streams opened after this one, fail instead of 101. State1 = cancel_timeout(State0), @@ -1234,7 +1256,8 @@ commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transpor %% Turn off the trap_exit process flag %% since this process will no longer be a supervisor. process_flag(trap_exit, false), - Protocol:takeover(Parent, Ref, Socket, Transport, Opts, Buffer, InitialState); + Protocol:takeover(Parent, Ref, Socket, Transport, + opts_for_upgrade(State), Buffer, InitialState); %% Set options dynamically. commands(State0=#state{overriden_opts=Opts}, StreamID, [{set_options, SetOpts}|Tail]) -> diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl index 0e110cd..a6ffca7 100644 --- a/src/cowboy_http2.erl +++ b/src/cowboy_http2.erl @@ -29,6 +29,9 @@ connection_type => worker | supervisor, connection_window_margin_size => 0..16#7fffffff, connection_window_update_threshold => 0..16#7fffffff, + dynamic_buffer => false | {pos_integer(), pos_integer()}, + dynamic_buffer_initial_average => non_neg_integer(), + dynamic_buffer_initial_size => pos_integer(), enable_connect_protocol => boolean(), env => cowboy_middleware:env(), goaway_initial_timeout => timeout(), @@ -133,6 +136,10 @@ %% Flow requested for all streams. flow = 0 :: non_neg_integer(), + %% Dynamic buffer moving average and current buffer size. + dynamic_buffer_size :: pos_integer() | false, + dynamic_buffer_moving_average :: non_neg_integer(), + %% Currently active HTTP/2 streams. Streams may be initiated either %% by the client or by the server through PUSH_PROMISE frames. streams = #{} :: #{cow_http2:streamid() => #stream{}}, @@ -169,12 +176,15 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) -> {inet:ip_address(), inet:port_number()}, {inet:ip_address(), inet:port_number()}, binary() | undefined, binary()) -> ok. init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer) -> + DynamicBuffer = init_dynamic_buffer_size(Opts), {ok, Preface, HTTP2Machine} = cow_http2_machine:init(server, Opts), %% Send the preface before doing all the init in case we get a socket error. ok = maybe_socket_error(undefined, Transport:send(Socket, Preface)), State = set_idle_timeout(init_rate_limiting(#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport, proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert, + dynamic_buffer_size=DynamicBuffer, + dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0), http2_status=sequence, http2_machine=HTTP2Machine}), 0), safe_setopts_active(State), case Buffer of @@ -216,12 +226,15 @@ add_period(Time, Period) -> Time + Period. binary() | undefined, binary(), map() | undefined, cowboy_req:req()) -> ok. init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer, _Settings, Req=#{method := Method}) -> + DynamicBuffer = init_dynamic_buffer_size(Opts), {ok, Preface, HTTP2Machine0} = cow_http2_machine:init(server, Opts), {ok, StreamID, HTTP2Machine} = cow_http2_machine:init_upgrade_stream(Method, HTTP2Machine0), State0 = #state{parent=Parent, ref=Ref, socket=Socket, transport=Transport, proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert, + dynamic_buffer_size=DynamicBuffer, + dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0), http2_status=upgrade, http2_machine=HTTP2Machine}, State1 = headers_frame(State0#state{ http2_machine=HTTP2Machine}, StreamID, Req), @@ -241,11 +254,14 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer _ -> parse(State, Buffer) end. +-include("cowboy_dynamic_buffer.hrl"). + %% Because HTTP/2 has flow control and Cowboy has other rate limiting %% mechanisms implemented, a very large active_n value should be fine, %% as long as the stream handlers do their work in a timely manner. +%% However large active_n values reduce the impact of dynamic_buffer. setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) -> - N = maps:get(active_n, Opts, 100), + N = maps:get(active_n, Opts, 1), Transport:setopts(Socket, [{active, N}]). safe_setopts_active(State) -> @@ -258,7 +274,8 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, receive %% Socket messages. {OK, Socket, Data} when OK =:= element(1, Messages) -> - parse(State#state{idle_timeout_num=0}, << Buffer/binary, Data/binary >>); + State1 = maybe_resize_buffer(State, Data), + parse(State1#state{idle_timeout_num=0}, << Buffer/binary, Data/binary >>); {Closed, Socket} when Closed =:= element(2, Messages) -> Reason = case State#state.http2_status of closing -> {stop, closed, 'The client is going away.'}; diff --git a/src/cowboy_websocket.erl b/src/cowboy_websocket.erl index 8d3ed08..d2eb99f 100644 --- a/src/cowboy_websocket.erl +++ b/src/cowboy_websocket.erl @@ -69,6 +69,9 @@ active_n => pos_integer(), compress => boolean(), deflate_opts => cow_ws:deflate_opts(), + dynamic_buffer => false | {pos_integer(), pos_integer()}, + dynamic_buffer_initial_average => non_neg_integer(), + dynamic_buffer_initial_size => pos_integer(), idle_timeout => timeout(), max_frame_size => non_neg_integer() | infinity, req_filter => fun((cowboy_req:req()) -> map()), @@ -97,6 +100,11 @@ timeout_num = 0 :: 0..?IDLE_TIMEOUT_TICKS, messages = undefined :: undefined | {atom(), atom(), atom()} | {atom(), atom(), atom(), atom()}, + + %% Dynamic buffer moving average and current buffer size. + dynamic_buffer_size = false :: pos_integer() | false, + dynamic_buffer_moving_average = 0 :: non_neg_integer(), + hibernate = false :: boolean(), frag_state = undefined :: cow_ws:frag_state(), frag_buffer = <<>> :: binary(), @@ -270,7 +278,7 @@ websocket_handshake(State, Req=#{ref := Ref, pid := Pid, streamid := StreamID}, %% @todo We don't want date and server headers. Headers = cowboy_req:response_headers(#{}, Req), Pid ! {{Pid, StreamID}, {switch_protocol, Headers, ?MODULE, {State, HandlerState}}}, - takeover(Pid, Ref, {Pid, StreamID}, undefined, undefined, <<>>, + takeover(Pid, Ref, {Pid, StreamID}, undefined, #{}, <<>>, {State, HandlerState}). %% Connection process. @@ -295,8 +303,8 @@ websocket_handshake(State, Req=#{ref := Ref, pid := Pid, streamid := StreamID}, -spec takeover(pid(), ranch:ref(), inet:socket() | {pid(), cowboy_stream:streamid()}, module() | undefined, any(), binary(), {#state{}, any()}) -> no_return(). -takeover(Parent, Ref, Socket, Transport, _Opts, Buffer, - {State0=#state{handler=Handler, req=Req}, HandlerState}) -> +takeover(Parent, Ref, Socket, Transport, Opts, Buffer, + {State0=#state{opts=WsOpts, handler=Handler, req=Req}, HandlerState}) -> case Req of #{version := 'HTTP/3'} -> ok; %% @todo We should have an option to disable this behavior. @@ -308,7 +316,11 @@ takeover(Parent, Ref, Socket, Transport, _Opts, Buffer, end, State = set_idle_timeout(State0#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport, - key=undefined, messages=Messages}, 0), + opts=WsOpts#{dynamic_buffer => maps:get(dynamic_buffer, Opts, false)}, + key=undefined, messages=Messages, + %% Dynamic buffer only applies to HTTP/1.1 Websocket. + dynamic_buffer_size=init_dynamic_buffer_size(Opts), + dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0)}, 0), %% We call parse_header/3 immediately because there might be %% some data in the buffer that was sent along with the handshake. %% While it is not allowed by the protocol to send frames immediately, @@ -319,6 +331,12 @@ takeover(Parent, Ref, Socket, Transport, _Opts, Buffer, false -> after_init(State, HandlerState, #ps_header{buffer=Buffer}) end. +-include("cowboy_dynamic_buffer.hrl"). + +%% @todo Implement early socket error detection. +maybe_socket_error(_, _) -> + ok. + after_init(State=#state{active=true}, HandlerState, ParseState) -> %% Enable active,N for HTTP/1.1, and auto read_body for HTTP/2. %% We must do this only after calling websocket_init/1 (if any) @@ -340,7 +358,7 @@ after_init(State, HandlerState, ParseState) -> setopts_active(#state{transport=undefined}) -> ok; setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) -> - N = maps:get(active_n, Opts, 100), + N = maps:get(active_n, Opts, 1), Transport:setopts(Socket, [{active, N}]). maybe_read_body(#state{socket=Stream={Pid, _}, transport=undefined, active=true}) -> @@ -414,7 +432,8 @@ loop(State=#state{parent=Parent, socket=Socket, messages=Messages, receive %% Socket messages. (HTTP/1.1) {OK, Socket, Data} when OK =:= element(1, Messages) -> - parse(?reset_idle_timeout(State), HandlerState, ParseState, Data); + State1 = maybe_resize_buffer(State, Data), + parse(?reset_idle_timeout(State1), HandlerState, ParseState, Data); {Closed, Socket} when Closed =:= element(2, Messages) -> terminate(State, HandlerState, {error, closed}); {Error, Socket, Reason} when Error =:= element(3, Messages) -> |