aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2019-12-04 11:17:34 +0100
committerLoïc Hoguin <[email protected]>2020-01-06 12:58:14 +0100
commitdb0d6f8d254f2cc01bd458dc41969e0b96991cc3 (patch)
tree5d77236bc703223fcb36e45cbf3de72de1763d50
parent592029070dea7c1f7b85d465e250ef6842e1a46b (diff)
downloadcowboy-db0d6f8d254f2cc01bd458dc41969e0b96991cc3.tar.gz
cowboy-db0d6f8d254f2cc01bd458dc41969e0b96991cc3.tar.bz2
cowboy-db0d6f8d254f2cc01bd458dc41969e0b96991cc3.zip
Use active,N
This reduces the number of times we need to ask for more packets, and as a result we get a fairly large boost in performance, especially with HTTP/1.1. Unfortunately this makes Cowboy require at least Erlang/OTP 21.3+ because the ssl application did not have active,N. For simplicity the version required will be Erlang/OTP 22+. In addition this change improves hibernate handling in cowboy_websocket. Hibernate will now work for HTTP/2 transport as well, and stray or unrelated messages will no longer cancel hibernate (the process will handle the message and go back into hibernation). Thanks go to Stressgrid for benchmarking an early version of this commit: https://stressgrid.com/blog/cowboy_performance_part_2/
-rw-r--r--Makefile4
-rw-r--r--doc/src/guide/introduction.asciidoc2
-rw-r--r--doc/src/manual/cowboy_http.asciidoc10
-rw-r--r--doc/src/manual/cowboy_http2.asciidoc10
-rw-r--r--doc/src/manual/cowboy_websocket.asciidoc12
-rw-r--r--src/cowboy_http.erl117
-rw-r--r--src/cowboy_http2.erl17
-rw-r--r--src/cowboy_stream_h.erl1
-rw-r--r--src/cowboy_websocket.erl100
9 files changed, 208 insertions, 65 deletions
diff --git a/Makefile b/Makefile
index 38a7c81..825c33d 100644
--- a/Makefile
+++ b/Makefile
@@ -29,10 +29,10 @@ dep_gun = git https://github.com/ninenines/gun master
dep_ci.erlang.mk = git https://github.com/ninenines/ci.erlang.mk master
DEP_EARLY_PLUGINS = ci.erlang.mk
-AUTO_CI_OTP ?= OTP-LATEST-20+
+AUTO_CI_OTP ?= OTP-LATEST-22+
AUTO_CI_HIPE ?= OTP-LATEST
# AUTO_CI_ERLLVM ?= OTP-LATEST
-AUTO_CI_WINDOWS ?= OTP-LATEST-20+
+AUTO_CI_WINDOWS ?= OTP-LATEST-22+
# Standard targets.
diff --git a/doc/src/guide/introduction.asciidoc b/doc/src/guide/introduction.asciidoc
index 18023ae..f81c872 100644
--- a/doc/src/guide/introduction.asciidoc
+++ b/doc/src/guide/introduction.asciidoc
@@ -35,7 +35,7 @@ guarantee that the experience will be safe and smooth. You are advised
to perform the necessary testing and security audits prior to deploying
on other platforms.
-Cowboy is developed for Erlang/OTP 20.0 and newer.
+Cowboy is developed for Erlang/OTP 22.0 and newer.
=== License
diff --git a/doc/src/manual/cowboy_http.asciidoc b/doc/src/manual/cowboy_http.asciidoc
index 8d89ea2..b088797 100644
--- a/doc/src/manual/cowboy_http.asciidoc
+++ b/doc/src/manual/cowboy_http.asciidoc
@@ -17,6 +17,7 @@ as a Ranch protocol.
[source,erlang]
----
opts() :: #{
+ active_n => pos_integer(),
chunked => boolean(),
connection_type => worker | supervisor,
http10_keepalive => boolean(),
@@ -51,6 +52,14 @@ Ranch functions `ranch:get_protocol_options/1` and
The default value is given next to the option name:
+active_n (100)::
+
+The number of packets Cowboy will request from the socket at once.
+This can be used to tweak the performance of the server. Higher
+values reduce the number of times Cowboy need to request more
+packets from the port driver at the expense of potentially
+higher memory being used.
+
chunked (true)::
Whether chunked transfer-encoding is enabled for HTTP/1.1 connections.
@@ -151,6 +160,7 @@ Ordered list of stream handlers that will handle all stream events.
== Changelog
+* *2.8*: The `active_n` option was added.
* *2.7*: The `initial_stream_flow_size` and `logger` options were added.
* *2.6*: The `chunked`, `http10_keepalive`, `proxy_header` and `sendfile` options were added.
* *2.5*: The `linger_timeout` option was added.
diff --git a/doc/src/manual/cowboy_http2.asciidoc b/doc/src/manual/cowboy_http2.asciidoc
index ccd3bb3..b8a9258 100644
--- a/doc/src/manual/cowboy_http2.asciidoc
+++ b/doc/src/manual/cowboy_http2.asciidoc
@@ -17,6 +17,7 @@ as a Ranch protocol.
[source,erlang]
----
opts() :: #{
+ active_n => pos_integer(),
connection_type => worker | supervisor,
connection_window_margin_size => 0..16#7fffffff,
connection_window_update_threshold => 0..16#7fffffff,
@@ -59,6 +60,14 @@ Ranch functions `ranch:get_protocol_options/1` and
The default value is given next to the option name:
+active_n (100)::
+
+The number of packets Cowboy will request from the socket at once.
+This can be used to tweak the performance of the server. Higher
+values reduce the number of times Cowboy need to request more
+packets from the port driver at the expense of potentially
+higher memory being used.
+
connection_type (supervisor)::
Whether the connection process also acts as a supervisor.
@@ -226,6 +235,7 @@ too many `WINDOW_UPDATE` frames.
== Changelog
+* *2.8*: The `active_n` option was added.
* *2.7*: Add the options `connection_window_margin_size`,
`connection_window_update_threshold`,
`max_connection_window_size`, `max_stream_window_size`,
diff --git a/doc/src/manual/cowboy_websocket.asciidoc b/doc/src/manual/cowboy_websocket.asciidoc
index a11ca18..5b1558c 100644
--- a/doc/src/manual/cowboy_websocket.asciidoc
+++ b/doc/src/manual/cowboy_websocket.asciidoc
@@ -198,6 +198,7 @@ Cowboy does it automatically for you.
[source,erlang]
----
opts() :: #{
+ active_n => pos_integer(),
compress => boolean(),
deflate_opts => cow_ws:deflate_opts()
idle_timeout => timeout(),
@@ -221,6 +222,16 @@ init(Req, State) ->
The default value is given next to the option name:
+active_n (100)::
+
+The number of packets Cowboy will request from the socket at once.
+This can be used to tweak the performance of the server. Higher
+values reduce the number of times Cowboy need to request more
+packets from the port driver at the expense of potentially
+higher memory being used.
++
+This option does not apply to Websocket over HTTP/2.
+
compress (false)::
Whether to enable the Websocket frame compression
@@ -274,6 +285,7 @@ normal circumstances if necessary.
== Changelog
+* *2.8*: The `active_n` option was added.
* *2.7*: The commands based interface has been documented.
The old interface is now deprecated.
* *2.7*: The command `shutdown_reason` was introduced.
diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl
index 90e3203..ad2ef75 100644
--- a/src/cowboy_http.erl
+++ b/src/cowboy_http.erl
@@ -21,6 +21,7 @@
-export([system_code_change/4]).
-type opts() :: #{
+ active_n => pos_integer(),
chunked => boolean(),
compress_buffering => boolean(),
compress_threshold => non_neg_integer(),
@@ -121,6 +122,9 @@
timer = undefined :: undefined | reference(),
+ %% Whether we are currently receiving data from the socket.
+ active = true :: boolean(),
+
%% Identifier for the stream currently being read (or waiting to be received).
in_streamid = 1 :: pos_integer(),
@@ -173,7 +177,8 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
transport=Transport, proxy_header=ProxyHeader, opts=Opts,
peer=Peer, sock=Sock, cert=Cert,
last_streamid=maps:get(max_keepalive, Opts, 100)},
- before_loop(set_timeout(State, request_timeout));
+ setopts_active(State),
+ loop(set_timeout(State, request_timeout));
{{error, Reason}, _, _} ->
terminate(undefined, {socket_error, Reason,
'A socket error occurred when retrieving the peer name.'});
@@ -185,12 +190,29 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
'A socket error occurred when retrieving the client TLS certificate.'})
end.
-%% Do not read from the socket unless flow is large enough.
-before_loop(State=#state{flow=Flow}) when Flow =< 0 ->
- loop(State);
-before_loop(State=#state{socket=Socket, transport=Transport}) ->
- Transport:setopts(Socket, [{active, once}]),
- loop(State).
+setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
+ N = maps:get(active_n, Opts, 100),
+ Transport:setopts(Socket, [{active, N}]).
+
+active(State) ->
+ setopts_active(State),
+ State#state{active=true}.
+
+passive(State=#state{socket=Socket, transport=Transport}) ->
+ Transport:setopts(Socket, [{active, false}]),
+ Messages = Transport:messages(),
+ flush_passive(Socket, Messages),
+ State#state{active=false}.
+
+flush_passive(Socket, Messages) ->
+ receive
+ {Passive, Socket} when Passive =:= element(4, Messages);
+ %% Hardcoded for compatibility with Ranch 1.x.
+ Passive =:= tcp_passive; Passive =:= ssl_passive ->
+ flush_passive(Socket, Messages)
+ after 0 ->
+ ok
+ end.
loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
buffer=Buffer, timer=TimerRef, children=Children, in_streamid=InStreamID,
@@ -201,7 +223,7 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
%% Discard data coming in after the last request
%% we want to process was received fully.
{OK, Socket, _} when OK =:= element(1, Messages), InStreamID > LastStreamID ->
- before_loop(State);
+ loop(State);
%% Socket messages.
{OK, Socket, Data} when OK =:= element(1, Messages) ->
parse(<< Buffer/binary, Data/binary >>, State);
@@ -209,6 +231,11 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
terminate(State, {socket_error, closed, 'The socket has been closed.'});
{Error, Socket, Reason} when Error =:= element(3, Messages) ->
terminate(State, {socket_error, Reason, 'An error has occurred on the socket.'});
+ {Passive, Socket} when Passive =:= element(4, Messages);
+ %% Hardcoded for compatibility with Ranch 1.x.
+ Passive =:= tcp_passive; Passive =:= ssl_passive ->
+ setopts_active(State),
+ loop(State);
%% Timeouts.
{timeout, Ref, {shutdown, Pid}} ->
cowboy_children:shutdown_timeout(Children, Ref, Pid),
@@ -297,12 +324,12 @@ timeout(State, idle_timeout) ->
'Connection idle longer than configuration allows.'}).
parse(<<>>, State) ->
- before_loop(State#state{buffer= <<>>});
+ loop(State#state{buffer= <<>>});
%% Do not process requests that come in after the last request
%% and discard the buffer if any to save memory.
parse(_, State=#state{in_streamid=InStreamID, in_state=#ps_request_line{},
last_streamid=LastStreamID}) when InStreamID > LastStreamID ->
- before_loop(State#state{buffer= <<>>});
+ loop(State#state{buffer= <<>>});
parse(Buffer, State=#state{in_state=#ps_request_line{empty_lines=EmptyLines}}) ->
after_parse(parse_request(Buffer, State, EmptyLines));
parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=undefined}}) ->
@@ -364,17 +391,26 @@ after_parse({data, StreamID, IsFin, Data, State0=#state{opts=Opts, buffer=Buffer
%% No corresponding stream. We must skip the body of the previous request
%% in order to process the next one.
after_parse({data, _, IsFin, _, State}) ->
- before_loop(set_timeout(State, case IsFin of
+ loop(set_timeout(State, case IsFin of
fin -> request_timeout;
nofin -> idle_timeout
end));
after_parse({more, State}) ->
- before_loop(set_timeout(State, idle_timeout)).
+ loop(set_timeout(State, idle_timeout)).
update_flow(fin, _, State) ->
+ %% This function is only called after parsing, therefore we
+ %% are expecting to be in active mode already.
State#state{flow=infinity};
-update_flow(nofin, Data, State=#state{flow=Flow0}) ->
- State#state{flow=Flow0 - byte_size(Data)}.
+update_flow(nofin, Data, State0=#state{flow=Flow0}) ->
+ Flow = Flow0 - byte_size(Data),
+ State = State0#state{flow=Flow},
+ if
+ Flow0 > 0, Flow =< 0 ->
+ passive(State);
+ true ->
+ State
+ end.
%% Request-line.
@@ -935,8 +971,7 @@ commands(State=#state{out_streamid=Current, streams=Streams0}, StreamID, Command
Stream#stream{queue=Queue ++ Commands}),
State#state{streams=Streams};
%% Read the request body.
-commands(State=#state{socket=Socket, transport=Transport, flow=Flow0}, StreamID,
- [{flow, Size}|Tail]) ->
+commands(State0=#state{flow=Flow0}, StreamID, [{flow, Size}|Tail]) ->
%% We must read *at least* Size of data otherwise functions
%% like cowboy_req:read_body/1,2 will wait indefinitely.
Flow = if
@@ -944,11 +979,11 @@ commands(State=#state{socket=Socket, transport=Transport, flow=Flow0}, StreamID,
true -> Flow0 + Size
end,
%% Reenable active mode if necessary.
- _ = if
+ State = if
Flow0 =< 0, Flow > 0 ->
- Transport:setopts(Socket, [{active, once}]);
+ active(State0);
true ->
- ok
+ State0
end,
commands(State#state{flow=Flow}, StreamID, Tail);
%% Error responses are sent only if a response wasn't sent already.
@@ -1118,14 +1153,14 @@ commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transpor
out_state=OutState, opts=Opts, buffer=Buffer, children=Children}, StreamID,
[{switch_protocol, Headers, Protocol, InitialState}|_Tail]) ->
%% @todo If there's streams opened after this one, fail instead of 101.
- State = cancel_timeout(State0),
+ State1 = cancel_timeout(State0),
%% Before we send the 101 response we need to stop receiving data
%% from the socket, otherwise the data might be receive before the
%% call to flush/0 and we end up inadvertently dropping a packet.
%%
%% @todo Handle cases where the request came with a body. We need
%% to process or skip the body before the upgrade can be completed.
- Transport:setopts(Socket, [{active, false}]),
+ State = passive(State1),
%% Send a 101 response if necessary, then terminate the stream.
#state{streams=Streams} = case OutState of
wait -> info(State, StreamID, {inform, 101, Headers});
@@ -1415,37 +1450,41 @@ terminate_linger(State=#state{socket=Socket, transport=Transport, opts=Opts}) ->
0 ->
ok;
infinity ->
- terminate_linger_loop(State, undefined);
+ terminate_linger_before_loop(State, undefined, Transport:messages());
Timeout ->
TimerRef = erlang:start_timer(Timeout, self(), linger_timeout),
- terminate_linger_loop(State, TimerRef)
+ terminate_linger_before_loop(State, TimerRef, Transport:messages())
end;
{error, _} ->
ok
end.
-terminate_linger_loop(State=#state{socket=Socket, transport=Transport}, TimerRef) ->
- Messages = Transport:messages(),
- %% We may already have a message in the mailbox when we do this
+terminate_linger_before_loop(State, TimerRef, Messages) ->
+ %% We may already be in active mode when we do this
%% but it's OK because we are shutting down anyway.
- case Transport:setopts(Socket, [{active, once}]) of
+ case setopts_active(State) of
ok ->
- receive
- {OK, Socket, _} when OK =:= element(1, Messages) ->
- terminate_linger_loop(State, TimerRef);
- {Closed, Socket} when Closed =:= element(2, Messages) ->
- ok;
- {Error, Socket, _} when Error =:= element(3, Messages) ->
- ok;
- {timeout, TimerRef, linger_timeout} ->
- ok;
- _ ->
- terminate_linger_loop(State, TimerRef)
- end;
+ terminate_linger_loop(State, TimerRef, Messages);
{error, _} ->
ok
end.
+terminate_linger_loop(State=#state{socket=Socket}, TimerRef, Messages) ->
+ receive
+ {OK, Socket, _} when OK =:= element(1, Messages) ->
+ terminate_linger_loop(State, TimerRef, Messages);
+ {Closed, Socket} when Closed =:= element(2, Messages) ->
+ ok;
+ {Error, Socket, _} when Error =:= element(3, Messages) ->
+ ok;
+ {Passive, Socket} when Passive =:= tcp_passive; Passive =:= ssl_passive ->
+ terminate_linger_before_loop(State, TimerRef, Messages);
+ {timeout, TimerRef, linger_timeout} ->
+ ok;
+ _ ->
+ terminate_linger_loop(State, TimerRef, Messages)
+ end.
+
%% System callbacks.
-spec system_continue(_, _, #state{}) -> ok.
diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl
index c8a4c7a..03ec9f8 100644
--- a/src/cowboy_http2.erl
+++ b/src/cowboy_http2.erl
@@ -23,6 +23,7 @@
-export([system_code_change/4]).
-type opts() :: #{
+ active_n => pos_integer(),
compress_buffering => boolean(),
compress_threshold => non_neg_integer(),
connection_type => worker | supervisor,
@@ -163,6 +164,7 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer
opts=Opts, peer=Peer, sock=Sock, cert=Cert,
http2_status=sequence, http2_machine=HTTP2Machine})),
Transport:send(Socket, Preface),
+ setopts_active(State),
case Buffer of
<<>> -> loop(State, Buffer);
_ -> parse(State, Buffer)
@@ -204,15 +206,21 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer
}, ?MODULE, undefined}), %% @todo undefined or #{}?
State = set_timeout(init_rate_limiting(State2#state{http2_status=sequence})),
Transport:send(Socket, Preface),
+ setopts_active(State),
case Buffer of
<<>> -> loop(State, Buffer);
_ -> parse(State, Buffer)
end.
+%% Because HTTP/2 has flow control and Cowboy has other rate limiting
+%% mechanisms implemented, a very large active_n value should be fine,
+%% as long as the stream handlers do their work in a timely manner.
+setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
+ N = maps:get(active_n, Opts, 100),
+ Transport:setopts(Socket, [{active, N}]).
+
loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
opts=Opts, timer=TimerRef, children=Children}, Buffer) ->
- %% @todo This should only be called when data was read.
- Transport:setopts(Socket, [{active, once}]),
Messages = Transport:messages(),
InactivityTimeout = maps:get(inactivity_timeout, Opts, 300000),
receive
@@ -223,6 +231,11 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
terminate(State, {socket_error, closed, 'The socket has been closed.'});
{Error, Socket, Reason} when Error =:= element(3, Messages) ->
terminate(State, {socket_error, Reason, 'An error has occurred on the socket.'});
+ {Passive, Socket} when Passive =:= element(4, Messages);
+ %% Hardcoded for compatibility with Ranch 1.x.
+ Passive =:= tcp_passive; Passive =:= ssl_passive ->
+ setopts_active(State),
+ loop(State, Buffer);
%% System messages.
{'EXIT', Parent, Reason} ->
%% @todo Graceful shutdown here as well?
diff --git a/src/cowboy_stream_h.erl b/src/cowboy_stream_h.erl
index 2a50d6a..9f42acc 100644
--- a/src/cowboy_stream_h.erl
+++ b/src/cowboy_stream_h.erl
@@ -92,6 +92,7 @@ data(StreamID, IsFin, Data, State=#state{read_body_pid=Pid, read_body_ref=Ref,
send_request_body(Pid, Ref, IsFin, BodyLen, Data),
do_data(StreamID, IsFin, Data, [{flow, byte_size(Data)}], State#state{
read_body_ref=undefined,
+ %% @todo This is wrong, it's missing byte_size(Data).
body_length=BodyLen
});
%% Stream is waiting for data but we didn't receive enough to send yet.
diff --git a/src/cowboy_websocket.erl b/src/cowboy_websocket.erl
index b3600be..e7d8f31 100644
--- a/src/cowboy_websocket.erl
+++ b/src/cowboy_websocket.erl
@@ -66,6 +66,7 @@
-optional_callbacks([terminate/3]).
-type opts() :: #{
+ active_n => pos_integer(),
compress => boolean(),
deflate_opts => cow_ws:deflate_opts(),
idle_timeout => timeout(),
@@ -85,7 +86,8 @@
handler :: module(),
key = undefined :: undefined | binary(),
timeout_ref = undefined :: undefined | reference(),
- messages = undefined :: undefined | {atom(), atom(), atom()},
+ messages = undefined :: undefined | {atom(), atom(), atom()}
+ | {atom(), atom(), atom(), atom()},
hibernate = false :: boolean(),
frag_state = undefined :: cow_ws:frag_state(),
frag_buffer = <<>> :: binary(),
@@ -300,28 +302,71 @@ takeover(Parent, Ref, Socket, Transport, _Opts, Buffer,
%% we still want to process that data if any.
case erlang:function_exported(Handler, websocket_init, 1) of
true -> handler_call(State, HandlerState, #ps_header{buffer=Buffer},
- websocket_init, undefined, fun parse_header/3);
- false -> parse_header(State, HandlerState, #ps_header{buffer=Buffer})
+ websocket_init, undefined, fun after_init/3);
+ false -> after_init(State, HandlerState, #ps_header{buffer=Buffer})
end.
-before_loop(State=#state{active=false}, HandlerState, ParseState) ->
- loop(State, HandlerState, ParseState);
-%% @todo We probably shouldn't do the setopts if we have not received a socket message.
-%% @todo We need to hibernate when HTTP/2 is used too.
-before_loop(State=#state{socket=Stream={Pid, _}, transport=undefined},
- HandlerState, ParseState) ->
+after_init(State=#state{active=true}, HandlerState, ParseState) ->
+ %% Enable active,N for HTTP/1.1, and auto read_body for HTTP/2.
+ %% We must do this only after calling websocket_init/1 (if any)
+ %% to give the handler a chance to disable active mode immediately.
+ setopts_active(State),
+ maybe_read_body(State),
+ parse_header(State, HandlerState, ParseState);
+after_init(State, HandlerState, ParseState) ->
+ parse_header(State, HandlerState, ParseState).
+
+%% We have two ways of reading the body for Websocket. For HTTP/1.1
+%% we have full control of the socket and can therefore use active,N.
+%% For HTTP/2 we are just a stream, and are instead using read_body
+%% (automatic mode). Technically HTTP/2 will only go passive after
+%% receiving the next data message, while HTTP/1.1 goes passive
+%% immediately but there might still be data to be processed in
+%% the message queue.
+
+setopts_active(#state{transport=undefined}) ->
+ ok;
+setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
+ N = maps:get(active_n, Opts, 100),
+ Transport:setopts(Socket, [{active, N}]).
+
+maybe_read_body(#state{socket=Stream={Pid, _}, transport=undefined, active=true}) ->
%% @todo Keep Ref around.
ReadBodyRef = make_ref(),
Pid ! {Stream, {read_body, self(), ReadBodyRef, auto, infinity}},
- loop(State, HandlerState, ParseState);
-before_loop(State=#state{socket=Socket, transport=Transport, hibernate=true},
- HandlerState, ParseState) ->
- Transport:setopts(Socket, [{active, once}]),
+ ok;
+maybe_read_body(_) ->
+ ok.
+
+active(State) ->
+ setopts_active(State),
+ maybe_read_body(State),
+ State#state{active=true}.
+
+passive(State=#state{transport=undefined}) ->
+ %% Unfortunately we cannot currently cancel read_body.
+ %% But that's OK, we will just stop reading the body
+ %% after the next message.
+ State#state{active=false};
+passive(State=#state{socket=Socket, transport=Transport, messages=Messages}) ->
+ Transport:setopts(Socket, [{active, false}]),
+ flush_passive(Socket, Messages),
+ State#state{active=false}.
+
+flush_passive(Socket, Messages) ->
+ receive
+ {Passive, Socket} when Passive =:= element(4, Messages);
+ %% Hardcoded for compatibility with Ranch 1.x.
+ Passive =:= tcp_passive; Passive =:= ssl_passive ->
+ flush_passive(Socket, Messages)
+ after 0 ->
+ ok
+ end.
+
+before_loop(State=#state{hibernate=true}, HandlerState, ParseState) ->
proc_lib:hibernate(?MODULE, loop,
[State#state{hibernate=false}, HandlerState, ParseState]);
-before_loop(State=#state{socket=Socket, transport=Transport},
- HandlerState, ParseState) ->
- Transport:setopts(Socket, [{active, once}]),
+before_loop(State, HandlerState, ParseState) ->
loop(State, HandlerState, ParseState).
-spec loop_timeout(#state{}) -> #state{}.
@@ -350,22 +395,28 @@ loop(State=#state{parent=Parent, socket=Socket, messages=Messages,
terminate(State, HandlerState, {error, closed});
{Error, Socket, Reason} when Error =:= element(3, Messages) ->
terminate(State, HandlerState, {error, Reason});
+ {Passive, Socket} when Passive =:= element(4, Messages);
+ %% Hardcoded for compatibility with Ranch 1.x.
+ Passive =:= tcp_passive; Passive =:= ssl_passive ->
+ setopts_active(State),
+ loop(State, HandlerState, ParseState);
%% Body reading messages. (HTTP/2)
{request_body, _Ref, nofin, Data} ->
+ maybe_read_body(State),
State2 = loop_timeout(State),
parse(State2, HandlerState, ParseState, Data);
%% @todo We need to handle this case as if it was an {error, closed}
%% but not before we finish processing frames. We probably should have
%% a check in before_loop to let us stop looping if a flag is set.
{request_body, _Ref, fin, _, Data} ->
+ maybe_read_body(State),
State2 = loop_timeout(State),
parse(State2, HandlerState, ParseState, Data);
%% Timeouts.
{timeout, TRef, ?MODULE} ->
websocket_close(State, HandlerState, timeout);
{timeout, OlderTRef, ?MODULE} when is_reference(OlderTRef) ->
- %% @todo This should call before_loop.
- loop(State, HandlerState, ParseState);
+ before_loop(State, HandlerState, ParseState);
%% System messages.
{'EXIT', Parent, Reason} ->
%% @todo We should exit gracefully.
@@ -376,8 +427,7 @@ loop(State=#state{parent=Parent, socket=Socket, messages=Messages,
%% Calls from supervisor module.
{'$gen_call', From, Call} ->
cowboy_children:handle_supervisor_call(Call, From, [], ?MODULE),
- %% @todo This should call before_loop.
- loop(State, HandlerState, ParseState);
+ before_loop(State, HandlerState, ParseState);
Message ->
handler_call(State, HandlerState, ParseState,
websocket_info, Message, fun before_loop/3)
@@ -531,7 +581,15 @@ commands([], State, []) ->
commands([], State, Data) ->
Result = transport_send(State, nofin, lists:reverse(Data)),
{Result, State};
-commands([{active, Active}|Tail], State, Data) when is_boolean(Active) ->
+commands([{active, Active}|Tail], State0=#state{active=Active0}, Data) when is_boolean(Active) ->
+ State = if
+ Active, not Active0 ->
+ active(State0);
+ Active0, not Active ->
+ passive(State0);
+ true ->
+ State0
+ end,
commands(Tail, State#state{active=Active}, Data);
commands([{deflate, Deflate}|Tail], State, Data) when is_boolean(Deflate) ->
commands(Tail, State#state{deflate=Deflate}, Data);