path: root/src/cowboy_websocket.erl
diff options
Diffstat (limited to 'src/cowboy_websocket.erl')
1 files changed, 79 insertions, 21 deletions
diff --git a/src/cowboy_websocket.erl b/src/cowboy_websocket.erl
index b3600be..e7d8f31 100644
--- a/src/cowboy_websocket.erl
+++ b/src/cowboy_websocket.erl
@@ -66,6 +66,7 @@
-type opts() :: #{
+ active_n => pos_integer(),
compress => boolean(),
deflate_opts => cow_ws:deflate_opts(),
idle_timeout => timeout(),
@@ -85,7 +86,8 @@
handler :: module(),
key = undefined :: undefined | binary(),
timeout_ref = undefined :: undefined | reference(),
- messages = undefined :: undefined | {atom(), atom(), atom()},
+ messages = undefined :: undefined | {atom(), atom(), atom()}
+ | {atom(), atom(), atom(), atom()},
hibernate = false :: boolean(),
frag_state = undefined :: cow_ws:frag_state(),
frag_buffer = <<>> :: binary(),
@@ -300,28 +302,71 @@ takeover(Parent, Ref, Socket, Transport, _Opts, Buffer,
%% we still want to process that data if any.
case erlang:function_exported(Handler, websocket_init, 1) of
true -> handler_call(State, HandlerState, #ps_header{buffer=Buffer},
- websocket_init, undefined, fun parse_header/3);
- false -> parse_header(State, HandlerState, #ps_header{buffer=Buffer})
+ websocket_init, undefined, fun after_init/3);
+ false -> after_init(State, HandlerState, #ps_header{buffer=Buffer})
-before_loop(State=#state{active=false}, HandlerState, ParseState) ->
- loop(State, HandlerState, ParseState);
-%% @todo We probably shouldn't do the setopts if we have not received a socket message.
-%% @todo We need to hibernate when HTTP/2 is used too.
-before_loop(State=#state{socket=Stream={Pid, _}, transport=undefined},
- HandlerState, ParseState) ->
+after_init(State=#state{active=true}, HandlerState, ParseState) ->
+ %% Enable active,N for HTTP/1.1, and auto read_body for HTTP/2.
+ %% We must do this only after calling websocket_init/1 (if any)
+ %% to give the handler a chance to disable active mode immediately.
+ setopts_active(State),
+ maybe_read_body(State),
+ parse_header(State, HandlerState, ParseState);
+after_init(State, HandlerState, ParseState) ->
+ parse_header(State, HandlerState, ParseState).
+%% We have two ways of reading the body for Websocket. For HTTP/1.1
+%% we have full control of the socket and can therefore use active,N.
+%% For HTTP/2 we are just a stream, and are instead using read_body
+%% (automatic mode). Technically HTTP/2 will only go passive after
+%% receiving the next data message, while HTTP/1.1 goes passive
+%% immediately but there might still be data to be processed in
+%% the message queue.
+setopts_active(#state{transport=undefined}) ->
+ ok;
+setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
+ N = maps:get(active_n, Opts, 100),
+ Transport:setopts(Socket, [{active, N}]).
+maybe_read_body(#state{socket=Stream={Pid, _}, transport=undefined, active=true}) ->
%% @todo Keep Ref around.
ReadBodyRef = make_ref(),
Pid ! {Stream, {read_body, self(), ReadBodyRef, auto, infinity}},
- loop(State, HandlerState, ParseState);
-before_loop(State=#state{socket=Socket, transport=Transport, hibernate=true},
- HandlerState, ParseState) ->
- Transport:setopts(Socket, [{active, once}]),
+ ok;
+maybe_read_body(_) ->
+ ok.
+active(State) ->
+ setopts_active(State),
+ maybe_read_body(State),
+ State#state{active=true}.
+passive(State=#state{transport=undefined}) ->
+ %% Unfortunately we cannot currently cancel read_body.
+ %% But that's OK, we will just stop reading the body
+ %% after the next message.
+ State#state{active=false};
+passive(State=#state{socket=Socket, transport=Transport, messages=Messages}) ->
+ Transport:setopts(Socket, [{active, false}]),
+ flush_passive(Socket, Messages),
+ State#state{active=false}.
+flush_passive(Socket, Messages) ->
+ receive
+ {Passive, Socket} when Passive =:= element(4, Messages);
+ %% Hardcoded for compatibility with Ranch 1.x.
+ Passive =:= tcp_passive; Passive =:= ssl_passive ->
+ flush_passive(Socket, Messages)
+ after 0 ->
+ ok
+ end.
+before_loop(State=#state{hibernate=true}, HandlerState, ParseState) ->
proc_lib:hibernate(?MODULE, loop,
[State#state{hibernate=false}, HandlerState, ParseState]);
-before_loop(State=#state{socket=Socket, transport=Transport},
- HandlerState, ParseState) ->
- Transport:setopts(Socket, [{active, once}]),
+before_loop(State, HandlerState, ParseState) ->
loop(State, HandlerState, ParseState).
-spec loop_timeout(#state{}) -> #state{}.
@@ -350,22 +395,28 @@ loop(State=#state{parent=Parent, socket=Socket, messages=Messages,
terminate(State, HandlerState, {error, closed});
{Error, Socket, Reason} when Error =:= element(3, Messages) ->
terminate(State, HandlerState, {error, Reason});
+ {Passive, Socket} when Passive =:= element(4, Messages);
+ %% Hardcoded for compatibility with Ranch 1.x.
+ Passive =:= tcp_passive; Passive =:= ssl_passive ->
+ setopts_active(State),
+ loop(State, HandlerState, ParseState);
%% Body reading messages. (HTTP/2)
{request_body, _Ref, nofin, Data} ->
+ maybe_read_body(State),
State2 = loop_timeout(State),
parse(State2, HandlerState, ParseState, Data);
%% @todo We need to handle this case as if it was an {error, closed}
%% but not before we finish processing frames. We probably should have
%% a check in before_loop to let us stop looping if a flag is set.
{request_body, _Ref, fin, _, Data} ->
+ maybe_read_body(State),
State2 = loop_timeout(State),
parse(State2, HandlerState, ParseState, Data);
%% Timeouts.
{timeout, TRef, ?MODULE} ->
websocket_close(State, HandlerState, timeout);
{timeout, OlderTRef, ?MODULE} when is_reference(OlderTRef) ->
- %% @todo This should call before_loop.
- loop(State, HandlerState, ParseState);
+ before_loop(State, HandlerState, ParseState);
%% System messages.
{'EXIT', Parent, Reason} ->
%% @todo We should exit gracefully.
@@ -376,8 +427,7 @@ loop(State=#state{parent=Parent, socket=Socket, messages=Messages,
%% Calls from supervisor module.
{'$gen_call', From, Call} ->
cowboy_children:handle_supervisor_call(Call, From, [], ?MODULE),
- %% @todo This should call before_loop.
- loop(State, HandlerState, ParseState);
+ before_loop(State, HandlerState, ParseState);
Message ->
handler_call(State, HandlerState, ParseState,
websocket_info, Message, fun before_loop/3)
@@ -531,7 +581,15 @@ commands([], State, []) ->
commands([], State, Data) ->
Result = transport_send(State, nofin, lists:reverse(Data)),
{Result, State};
-commands([{active, Active}|Tail], State, Data) when is_boolean(Active) ->
+commands([{active, Active}|Tail], State0=#state{active=Active0}, Data) when is_boolean(Active) ->
+ State = if
+ Active, not Active0 ->
+ active(State0);
+ Active0, not Active ->
+ passive(State0);
+ true ->
+ State0
+ end,
commands(Tail, State#state{active=Active}, Data);
commands([{deflate, Deflate}|Tail], State, Data) when is_boolean(Deflate) ->
commands(Tail, State#state{deflate=Deflate}, Data);