aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Makefile4
-rw-r--r--doc/src/guide/introduction.asciidoc2
-rw-r--r--doc/src/manual/cowboy_http.asciidoc10
-rw-r--r--doc/src/manual/cowboy_http2.asciidoc10
-rw-r--r--doc/src/manual/cowboy_websocket.asciidoc12
-rw-r--r--src/cowboy_http.erl117
-rw-r--r--src/cowboy_http2.erl17
-rw-r--r--src/cowboy_stream_h.erl1
-rw-r--r--src/cowboy_websocket.erl100
9 files changed, 208 insertions, 65 deletions
diff --git a/Makefile b/Makefile
index 38a7c81..825c33d 100644
--- a/Makefile
+++ b/Makefile
@@ -29,10 +29,10 @@ dep_gun = git https://github.com/ninenines/gun master
dep_ci.erlang.mk = git https://github.com/ninenines/ci.erlang.mk master
DEP_EARLY_PLUGINS = ci.erlang.mk
-AUTO_CI_OTP ?= OTP-LATEST-20+
+AUTO_CI_OTP ?= OTP-LATEST-22+
AUTO_CI_HIPE ?= OTP-LATEST
# AUTO_CI_ERLLVM ?= OTP-LATEST
-AUTO_CI_WINDOWS ?= OTP-LATEST-20+
+AUTO_CI_WINDOWS ?= OTP-LATEST-22+
# Standard targets.
diff --git a/doc/src/guide/introduction.asciidoc b/doc/src/guide/introduction.asciidoc
index 18023ae..f81c872 100644
--- a/doc/src/guide/introduction.asciidoc
+++ b/doc/src/guide/introduction.asciidoc
@@ -35,7 +35,7 @@ guarantee that the experience will be safe and smooth. You are advised
to perform the necessary testing and security audits prior to deploying
on other platforms.
-Cowboy is developed for Erlang/OTP 20.0 and newer.
+Cowboy is developed for Erlang/OTP 22.0 and newer.
=== License
diff --git a/doc/src/manual/cowboy_http.asciidoc b/doc/src/manual/cowboy_http.asciidoc
index 8d89ea2..b088797 100644
--- a/doc/src/manual/cowboy_http.asciidoc
+++ b/doc/src/manual/cowboy_http.asciidoc
@@ -17,6 +17,7 @@ as a Ranch protocol.
[source,erlang]
----
opts() :: #{
+ active_n => pos_integer(),
chunked => boolean(),
connection_type => worker | supervisor,
http10_keepalive => boolean(),
@@ -51,6 +52,14 @@ Ranch functions `ranch:get_protocol_options/1` and
The default value is given next to the option name:
+active_n (100)::
+
+The number of packets Cowboy will request from the socket at once.
+This can be used to tweak the performance of the server. Higher
+values reduce the number of times Cowboy need to request more
+packets from the port driver at the expense of potentially
+higher memory being used.
+
chunked (true)::
Whether chunked transfer-encoding is enabled for HTTP/1.1 connections.
@@ -151,6 +160,7 @@ Ordered list of stream handlers that will handle all stream events.
== Changelog
+* *2.8*: The `active_n` option was added.
* *2.7*: The `initial_stream_flow_size` and `logger` options were added.
* *2.6*: The `chunked`, `http10_keepalive`, `proxy_header` and `sendfile` options were added.
* *2.5*: The `linger_timeout` option was added.
diff --git a/doc/src/manual/cowboy_http2.asciidoc b/doc/src/manual/cowboy_http2.asciidoc
index ccd3bb3..b8a9258 100644
--- a/doc/src/manual/cowboy_http2.asciidoc
+++ b/doc/src/manual/cowboy_http2.asciidoc
@@ -17,6 +17,7 @@ as a Ranch protocol.
[source,erlang]
----
opts() :: #{
+ active_n => pos_integer(),
connection_type => worker | supervisor,
connection_window_margin_size => 0..16#7fffffff,
connection_window_update_threshold => 0..16#7fffffff,
@@ -59,6 +60,14 @@ Ranch functions `ranch:get_protocol_options/1` and
The default value is given next to the option name:
+active_n (100)::
+
+The number of packets Cowboy will request from the socket at once.
+This can be used to tweak the performance of the server. Higher
+values reduce the number of times Cowboy need to request more
+packets from the port driver at the expense of potentially
+higher memory being used.
+
connection_type (supervisor)::
Whether the connection process also acts as a supervisor.
@@ -226,6 +235,7 @@ too many `WINDOW_UPDATE` frames.
== Changelog
+* *2.8*: The `active_n` option was added.
* *2.7*: Add the options `connection_window_margin_size`,
`connection_window_update_threshold`,
`max_connection_window_size`, `max_stream_window_size`,
diff --git a/doc/src/manual/cowboy_websocket.asciidoc b/doc/src/manual/cowboy_websocket.asciidoc
index a11ca18..5b1558c 100644
--- a/doc/src/manual/cowboy_websocket.asciidoc
+++ b/doc/src/manual/cowboy_websocket.asciidoc
@@ -198,6 +198,7 @@ Cowboy does it automatically for you.
[source,erlang]
----
opts() :: #{
+ active_n => pos_integer(),
compress => boolean(),
deflate_opts => cow_ws:deflate_opts()
idle_timeout => timeout(),
@@ -221,6 +222,16 @@ init(Req, State) ->
The default value is given next to the option name:
+active_n (100)::
+
+The number of packets Cowboy will request from the socket at once.
+This can be used to tweak the performance of the server. Higher
+values reduce the number of times Cowboy need to request more
+packets from the port driver at the expense of potentially
+higher memory being used.
++
+This option does not apply to Websocket over HTTP/2.
+
compress (false)::
Whether to enable the Websocket frame compression
@@ -274,6 +285,7 @@ normal circumstances if necessary.
== Changelog
+* *2.8*: The `active_n` option was added.
* *2.7*: The commands based interface has been documented.
The old interface is now deprecated.
* *2.7*: The command `shutdown_reason` was introduced.
diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl
index 90e3203..ad2ef75 100644
--- a/src/cowboy_http.erl
+++ b/src/cowboy_http.erl
@@ -21,6 +21,7 @@
-export([system_code_change/4]).
-type opts() :: #{
+ active_n => pos_integer(),
chunked => boolean(),
compress_buffering => boolean(),
compress_threshold => non_neg_integer(),
@@ -121,6 +122,9 @@
timer = undefined :: undefined | reference(),
+ %% Whether we are currently receiving data from the socket.
+ active = true :: boolean(),
+
%% Identifier for the stream currently being read (or waiting to be received).
in_streamid = 1 :: pos_integer(),
@@ -173,7 +177,8 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
transport=Transport, proxy_header=ProxyHeader, opts=Opts,
peer=Peer, sock=Sock, cert=Cert,
last_streamid=maps:get(max_keepalive, Opts, 100)},
- before_loop(set_timeout(State, request_timeout));
+ setopts_active(State),
+ loop(set_timeout(State, request_timeout));
{{error, Reason}, _, _} ->
terminate(undefined, {socket_error, Reason,
'A socket error occurred when retrieving the peer name.'});
@@ -185,12 +190,29 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
'A socket error occurred when retrieving the client TLS certificate.'})
end.
-%% Do not read from the socket unless flow is large enough.
-before_loop(State=#state{flow=Flow}) when Flow =< 0 ->
- loop(State);
-before_loop(State=#state{socket=Socket, transport=Transport}) ->
- Transport:setopts(Socket, [{active, once}]),
- loop(State).
+setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
+ N = maps:get(active_n, Opts, 100),
+ Transport:setopts(Socket, [{active, N}]).
+
+active(State) ->
+ setopts_active(State),
+ State#state{active=true}.
+
+passive(State=#state{socket=Socket, transport=Transport}) ->
+ Transport:setopts(Socket, [{active, false}]),
+ Messages = Transport:messages(),
+ flush_passive(Socket, Messages),
+ State#state{active=false}.
+
+flush_passive(Socket, Messages) ->
+ receive
+ {Passive, Socket} when Passive =:= element(4, Messages);
+ %% Hardcoded for compatibility with Ranch 1.x.
+ Passive =:= tcp_passive; Passive =:= ssl_passive ->
+ flush_passive(Socket, Messages)
+ after 0 ->
+ ok
+ end.
loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
buffer=Buffer, timer=TimerRef, children=Children, in_streamid=InStreamID,
@@ -201,7 +223,7 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
%% Discard data coming in after the last request
%% we want to process was received fully.
{OK, Socket, _} when OK =:= element(1, Messages), InStreamID > LastStreamID ->
- before_loop(State);
+ loop(State);
%% Socket messages.
{OK, Socket, Data} when OK =:= element(1, Messages) ->
parse(<< Buffer/binary, Data/binary >>, State);
@@ -209,6 +231,11 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
terminate(State, {socket_error, closed, 'The socket has been closed.'});
{Error, Socket, Reason} when Error =:= element(3, Messages) ->
terminate(State, {socket_error, Reason, 'An error has occurred on the socket.'});
+ {Passive, Socket} when Passive =:= element(4, Messages);
+ %% Hardcoded for compatibility with Ranch 1.x.
+ Passive =:= tcp_passive; Passive =:= ssl_passive ->
+ setopts_active(State),
+ loop(State);
%% Timeouts.
{timeout, Ref, {shutdown, Pid}} ->
cowboy_children:shutdown_timeout(Children, Ref, Pid),
@@ -297,12 +324,12 @@ timeout(State, idle_timeout) ->
'Connection idle longer than configuration allows.'}).
parse(<<>>, State) ->
- before_loop(State#state{buffer= <<>>});
+ loop(State#state{buffer= <<>>});
%% Do not process requests that come in after the last request
%% and discard the buffer if any to save memory.
parse(_, State=#state{in_streamid=InStreamID, in_state=#ps_request_line{},
last_streamid=LastStreamID}) when InStreamID > LastStreamID ->
- before_loop(State#state{buffer= <<>>});
+ loop(State#state{buffer= <<>>});
parse(Buffer, State=#state{in_state=#ps_request_line{empty_lines=EmptyLines}}) ->
after_parse(parse_request(Buffer, State, EmptyLines));
parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=undefined}}) ->
@@ -364,17 +391,26 @@ after_parse({data, StreamID, IsFin, Data, State0=#state{opts=Opts, buffer=Buffer
%% No corresponding stream. We must skip the body of the previous request
%% in order to process the next one.
after_parse({data, _, IsFin, _, State}) ->
- before_loop(set_timeout(State, case IsFin of
+ loop(set_timeout(State, case IsFin of
fin -> request_timeout;
nofin -> idle_timeout
end));
after_parse({more, State}) ->
- before_loop(set_timeout(State, idle_timeout)).
+ loop(set_timeout(State, idle_timeout)).
update_flow(fin, _, State) ->
+ %% This function is only called after parsing, therefore we
+ %% are expecting to be in active mode already.
State#state{flow=infinity};
-update_flow(nofin, Data, State=#state{flow=Flow0}) ->
- State#state{flow=Flow0 - byte_size(Data)}.
+update_flow(nofin, Data, State0=#state{flow=Flow0}) ->
+ Flow = Flow0 - byte_size(Data),
+ State = State0#state{flow=Flow},
+ if
+ Flow0 > 0, Flow =< 0 ->
+ passive(State);
+ true ->
+ State
+ end.
%% Request-line.
@@ -935,8 +971,7 @@ commands(State=#state{out_streamid=Current, streams=Streams0}, StreamID, Command
Stream#stream{queue=Queue ++ Commands}),
State#state{streams=Streams};
%% Read the request body.
-commands(State=#state{socket=Socket, transport=Transport, flow=Flow0}, StreamID,
- [{flow, Size}|Tail]) ->
+commands(State0=#state{flow=Flow0}, StreamID, [{flow, Size}|Tail]) ->
%% We must read *at least* Size of data otherwise functions
%% like cowboy_req:read_body/1,2 will wait indefinitely.
Flow = if
@@ -944,11 +979,11 @@ commands(State=#state{socket=Socket, transport=Transport, flow=Flow0}, StreamID,
true -> Flow0 + Size
end,
%% Reenable active mode if necessary.
- _ = if
+ State = if
Flow0 =< 0, Flow > 0 ->
- Transport:setopts(Socket, [{active, once}]);
+ active(State0);
true ->
- ok
+ State0
end,
commands(State#state{flow=Flow}, StreamID, Tail);
%% Error responses are sent only if a response wasn't sent already.
@@ -1118,14 +1153,14 @@ commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transpor
out_state=OutState, opts=Opts, buffer=Buffer, children=Children}, StreamID,
[{switch_protocol, Headers, Protocol, InitialState}|_Tail]) ->
%% @todo If there's streams opened after this one, fail instead of 101.
- State = cancel_timeout(State0),
+ State1 = cancel_timeout(State0),
%% Before we send the 101 response we need to stop receiving data
%% from the socket, otherwise the data might be receive before the
%% call to flush/0 and we end up inadvertently dropping a packet.
%%
%% @todo Handle cases where the request came with a body. We need
%% to process or skip the body before the upgrade can be completed.
- Transport:setopts(Socket, [{active, false}]),
+ State = passive(State1),
%% Send a 101 response if necessary, then terminate the stream.
#state{streams=Streams} = case OutState of
wait -> info(State, StreamID, {inform, 101, Headers});
@@ -1415,37 +1450,41 @@ terminate_linger(State=#state{socket=Socket, transport=Transport, opts=Opts}) ->
0 ->
ok;
infinity ->
- terminate_linger_loop(State, undefined);
+ terminate_linger_before_loop(State, undefined, Transport:messages());
Timeout ->
TimerRef = erlang:start_timer(Timeout, self(), linger_timeout),
- terminate_linger_loop(State, TimerRef)
+ terminate_linger_before_loop(State, TimerRef, Transport:messages())
end;
{error, _} ->
ok
end.
-terminate_linger_loop(State=#state{socket=Socket, transport=Transport}, TimerRef) ->
- Messages = Transport:messages(),
- %% We may already have a message in the mailbox when we do this
+terminate_linger_before_loop(State, TimerRef, Messages) ->
+ %% We may already be in active mode when we do this
%% but it's OK because we are shutting down anyway.
- case Transport:setopts(Socket, [{active, once}]) of
+ case setopts_active(State) of
ok ->
- receive
- {OK, Socket, _} when OK =:= element(1, Messages) ->
- terminate_linger_loop(State, TimerRef);
- {Closed, Socket} when Closed =:= element(2, Messages) ->
- ok;
- {Error, Socket, _} when Error =:= element(3, Messages) ->
- ok;
- {timeout, TimerRef, linger_timeout} ->
- ok;
- _ ->
- terminate_linger_loop(State, TimerRef)
- end;
+ terminate_linger_loop(State, TimerRef, Messages);
{error, _} ->
ok
end.
+terminate_linger_loop(State=#state{socket=Socket}, TimerRef, Messages) ->
+ receive
+ {OK, Socket, _} when OK =:= element(1, Messages) ->
+ terminate_linger_loop(State, TimerRef, Messages);
+ {Closed, Socket} when Closed =:= element(2, Messages) ->
+ ok;
+ {Error, Socket, _} when Error =:= element(3, Messages) ->
+ ok;
+ {Passive, Socket} when Passive =:= tcp_passive; Passive =:= ssl_passive ->
+ terminate_linger_before_loop(State, TimerRef, Messages);
+ {timeout, TimerRef, linger_timeout} ->
+ ok;
+ _ ->
+ terminate_linger_loop(State, TimerRef, Messages)
+ end.
+
%% System callbacks.
-spec system_continue(_, _, #state{}) -> ok.
diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl
index c8a4c7a..03ec9f8 100644
--- a/src/cowboy_http2.erl
+++ b/src/cowboy_http2.erl
@@ -23,6 +23,7 @@
-export([system_code_change/4]).
-type opts() :: #{
+ active_n => pos_integer(),
compress_buffering => boolean(),
compress_threshold => non_neg_integer(),
connection_type => worker | supervisor,
@@ -163,6 +164,7 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer
opts=Opts, peer=Peer, sock=Sock, cert=Cert,
http2_status=sequence, http2_machine=HTTP2Machine})),
Transport:send(Socket, Preface),
+ setopts_active(State),
case Buffer of
<<>> -> loop(State, Buffer);
_ -> parse(State, Buffer)
@@ -204,15 +206,21 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer
}, ?MODULE, undefined}), %% @todo undefined or #{}?
State = set_timeout(init_rate_limiting(State2#state{http2_status=sequence})),
Transport:send(Socket, Preface),
+ setopts_active(State),
case Buffer of
<<>> -> loop(State, Buffer);
_ -> parse(State, Buffer)
end.
+%% Because HTTP/2 has flow control and Cowboy has other rate limiting
+%% mechanisms implemented, a very large active_n value should be fine,
+%% as long as the stream handlers do their work in a timely manner.
+setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
+ N = maps:get(active_n, Opts, 100),
+ Transport:setopts(Socket, [{active, N}]).
+
loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
opts=Opts, timer=TimerRef, children=Children}, Buffer) ->
- %% @todo This should only be called when data was read.
- Transport:setopts(Socket, [{active, once}]),
Messages = Transport:messages(),
InactivityTimeout = maps:get(inactivity_timeout, Opts, 300000),
receive
@@ -223,6 +231,11 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
terminate(State, {socket_error, closed, 'The socket has been closed.'});
{Error, Socket, Reason} when Error =:= element(3, Messages) ->
terminate(State, {socket_error, Reason, 'An error has occurred on the socket.'});
+ {Passive, Socket} when Passive =:= element(4, Messages);
+ %% Hardcoded for compatibility with Ranch 1.x.
+ Passive =:= tcp_passive; Passive =:= ssl_passive ->
+ setopts_active(State),
+ loop(State, Buffer);
%% System messages.
{'EXIT', Parent, Reason} ->
%% @todo Graceful shutdown here as well?
diff --git a/src/cowboy_stream_h.erl b/src/cowboy_stream_h.erl
index 2a50d6a..9f42acc 100644
--- a/src/cowboy_stream_h.erl
+++ b/src/cowboy_stream_h.erl
@@ -92,6 +92,7 @@ data(StreamID, IsFin, Data, State=#state{read_body_pid=Pid, read_body_ref=Ref,
send_request_body(Pid, Ref, IsFin, BodyLen, Data),
do_data(StreamID, IsFin, Data, [{flow, byte_size(Data)}], State#state{
read_body_ref=undefined,
+ %% @todo This is wrong, it's missing byte_size(Data).
body_length=BodyLen
});
%% Stream is waiting for data but we didn't receive enough to send yet.
diff --git a/src/cowboy_websocket.erl b/src/cowboy_websocket.erl
index b3600be..e7d8f31 100644
--- a/src/cowboy_websocket.erl
+++ b/src/cowboy_websocket.erl
@@ -66,6 +66,7 @@
-optional_callbacks([terminate/3]).
-type opts() :: #{
+ active_n => pos_integer(),
compress => boolean(),
deflate_opts => cow_ws:deflate_opts(),
idle_timeout => timeout(),
@@ -85,7 +86,8 @@
handler :: module(),
key = undefined :: undefined | binary(),
timeout_ref = undefined :: undefined | reference(),
- messages = undefined :: undefined | {atom(), atom(), atom()},
+ messages = undefined :: undefined | {atom(), atom(), atom()}
+ | {atom(), atom(), atom(), atom()},
hibernate = false :: boolean(),
frag_state = undefined :: cow_ws:frag_state(),
frag_buffer = <<>> :: binary(),
@@ -300,28 +302,71 @@ takeover(Parent, Ref, Socket, Transport, _Opts, Buffer,
%% we still want to process that data if any.
case erlang:function_exported(Handler, websocket_init, 1) of
true -> handler_call(State, HandlerState, #ps_header{buffer=Buffer},
- websocket_init, undefined, fun parse_header/3);
- false -> parse_header(State, HandlerState, #ps_header{buffer=Buffer})
+ websocket_init, undefined, fun after_init/3);
+ false -> after_init(State, HandlerState, #ps_header{buffer=Buffer})
end.
-before_loop(State=#state{active=false}, HandlerState, ParseState) ->
- loop(State, HandlerState, ParseState);
-%% @todo We probably shouldn't do the setopts if we have not received a socket message.
-%% @todo We need to hibernate when HTTP/2 is used too.
-before_loop(State=#state{socket=Stream={Pid, _}, transport=undefined},
- HandlerState, ParseState) ->
+after_init(State=#state{active=true}, HandlerState, ParseState) ->
+ %% Enable active,N for HTTP/1.1, and auto read_body for HTTP/2.
+ %% We must do this only after calling websocket_init/1 (if any)
+ %% to give the handler a chance to disable active mode immediately.
+ setopts_active(State),
+ maybe_read_body(State),
+ parse_header(State, HandlerState, ParseState);
+after_init(State, HandlerState, ParseState) ->
+ parse_header(State, HandlerState, ParseState).
+
+%% We have two ways of reading the body for Websocket. For HTTP/1.1
+%% we have full control of the socket and can therefore use active,N.
+%% For HTTP/2 we are just a stream, and are instead using read_body
+%% (automatic mode). Technically HTTP/2 will only go passive after
+%% receiving the next data message, while HTTP/1.1 goes passive
+%% immediately but there might still be data to be processed in
+%% the message queue.
+
+setopts_active(#state{transport=undefined}) ->
+ ok;
+setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
+ N = maps:get(active_n, Opts, 100),
+ Transport:setopts(Socket, [{active, N}]).
+
+maybe_read_body(#state{socket=Stream={Pid, _}, transport=undefined, active=true}) ->
%% @todo Keep Ref around.
ReadBodyRef = make_ref(),
Pid ! {Stream, {read_body, self(), ReadBodyRef, auto, infinity}},
- loop(State, HandlerState, ParseState);
-before_loop(State=#state{socket=Socket, transport=Transport, hibernate=true},
- HandlerState, ParseState) ->
- Transport:setopts(Socket, [{active, once}]),
+ ok;
+maybe_read_body(_) ->
+ ok.
+
+active(State) ->
+ setopts_active(State),
+ maybe_read_body(State),
+ State#state{active=true}.
+
+passive(State=#state{transport=undefined}) ->
+ %% Unfortunately we cannot currently cancel read_body.
+ %% But that's OK, we will just stop reading the body
+ %% after the next message.
+ State#state{active=false};
+passive(State=#state{socket=Socket, transport=Transport, messages=Messages}) ->
+ Transport:setopts(Socket, [{active, false}]),
+ flush_passive(Socket, Messages),
+ State#state{active=false}.
+
+flush_passive(Socket, Messages) ->
+ receive
+ {Passive, Socket} when Passive =:= element(4, Messages);
+ %% Hardcoded for compatibility with Ranch 1.x.
+ Passive =:= tcp_passive; Passive =:= ssl_passive ->
+ flush_passive(Socket, Messages)
+ after 0 ->
+ ok
+ end.
+
+before_loop(State=#state{hibernate=true}, HandlerState, ParseState) ->
proc_lib:hibernate(?MODULE, loop,
[State#state{hibernate=false}, HandlerState, ParseState]);
-before_loop(State=#state{socket=Socket, transport=Transport},
- HandlerState, ParseState) ->
- Transport:setopts(Socket, [{active, once}]),
+before_loop(State, HandlerState, ParseState) ->
loop(State, HandlerState, ParseState).
-spec loop_timeout(#state{}) -> #state{}.
@@ -350,22 +395,28 @@ loop(State=#state{parent=Parent, socket=Socket, messages=Messages,
terminate(State, HandlerState, {error, closed});
{Error, Socket, Reason} when Error =:= element(3, Messages) ->
terminate(State, HandlerState, {error, Reason});
+ {Passive, Socket} when Passive =:= element(4, Messages);
+ %% Hardcoded for compatibility with Ranch 1.x.
+ Passive =:= tcp_passive; Passive =:= ssl_passive ->
+ setopts_active(State),
+ loop(State, HandlerState, ParseState);
%% Body reading messages. (HTTP/2)
{request_body, _Ref, nofin, Data} ->
+ maybe_read_body(State),
State2 = loop_timeout(State),
parse(State2, HandlerState, ParseState, Data);
%% @todo We need to handle this case as if it was an {error, closed}
%% but not before we finish processing frames. We probably should have
%% a check in before_loop to let us stop looping if a flag is set.
{request_body, _Ref, fin, _, Data} ->
+ maybe_read_body(State),
State2 = loop_timeout(State),
parse(State2, HandlerState, ParseState, Data);
%% Timeouts.
{timeout, TRef, ?MODULE} ->
websocket_close(State, HandlerState, timeout);
{timeout, OlderTRef, ?MODULE} when is_reference(OlderTRef) ->
- %% @todo This should call before_loop.
- loop(State, HandlerState, ParseState);
+ before_loop(State, HandlerState, ParseState);
%% System messages.
{'EXIT', Parent, Reason} ->
%% @todo We should exit gracefully.
@@ -376,8 +427,7 @@ loop(State=#state{parent=Parent, socket=Socket, messages=Messages,
%% Calls from supervisor module.
{'$gen_call', From, Call} ->
cowboy_children:handle_supervisor_call(Call, From, [], ?MODULE),
- %% @todo This should call before_loop.
- loop(State, HandlerState, ParseState);
+ before_loop(State, HandlerState, ParseState);
Message ->
handler_call(State, HandlerState, ParseState,
websocket_info, Message, fun before_loop/3)
@@ -531,7 +581,15 @@ commands([], State, []) ->
commands([], State, Data) ->
Result = transport_send(State, nofin, lists:reverse(Data)),
{Result, State};
-commands([{active, Active}|Tail], State, Data) when is_boolean(Active) ->
+commands([{active, Active}|Tail], State0=#state{active=Active0}, Data) when is_boolean(Active) ->
+ State = if
+ Active, not Active0 ->
+ active(State0);
+ Active0, not Active ->
+ passive(State0);
+ true ->
+ State0
+ end,
commands(Tail, State#state{active=Active}, Data);
commands([{deflate, Deflate}|Tail], State, Data) when is_boolean(Deflate) ->
commands(Tail, State#state{deflate=Deflate}, Data);