path: root/src/cowboy_websocket.erl
diff options
authorLoïc Hoguin <[email protected]>2019-12-04 11:17:34 +0100
committerLoïc Hoguin <[email protected]>2020-01-06 12:58:14 +0100
commitdb0d6f8d254f2cc01bd458dc41969e0b96991cc3 (patch)
tree5d77236bc703223fcb36e45cbf3de72de1763d50 /src/cowboy_websocket.erl
parent592029070dea7c1f7b85d465e250ef6842e1a46b (diff)
Use active,N
This reduces the number of times we need to ask for more packets, and as a result we get a fairly large boost in performance, especially with HTTP/1.1. Unfortunately this makes Cowboy require at least Erlang/OTP 21.3+ because the ssl application did not have active,N. For simplicity the version required will be Erlang/OTP 22+. In addition this change improves hibernate handling in cowboy_websocket. Hibernate will now work for HTTP/2 transport as well, and stray or unrelated messages will no longer cancel hibernate (the process will handle the message and go back into hibernation). Thanks go to Stressgrid for benchmarking an early version of this commit: https://stressgrid.com/blog/cowboy_performance_part_2/
Diffstat (limited to 'src/cowboy_websocket.erl')
1 files changed, 79 insertions, 21 deletions
diff --git a/src/cowboy_websocket.erl b/src/cowboy_websocket.erl
index b3600be..e7d8f31 100644
--- a/src/cowboy_websocket.erl
+++ b/src/cowboy_websocket.erl
@@ -66,6 +66,7 @@
-type opts() :: #{
+ active_n => pos_integer(),
compress => boolean(),
deflate_opts => cow_ws:deflate_opts(),
idle_timeout => timeout(),
@@ -85,7 +86,8 @@
handler :: module(),
key = undefined :: undefined | binary(),
timeout_ref = undefined :: undefined | reference(),
- messages = undefined :: undefined | {atom(), atom(), atom()},
+ messages = undefined :: undefined | {atom(), atom(), atom()}
+ | {atom(), atom(), atom(), atom()},
hibernate = false :: boolean(),
frag_state = undefined :: cow_ws:frag_state(),
frag_buffer = <<>> :: binary(),
@@ -300,28 +302,71 @@ takeover(Parent, Ref, Socket, Transport, _Opts, Buffer,
%% we still want to process that data if any.
case erlang:function_exported(Handler, websocket_init, 1) of
true -> handler_call(State, HandlerState, #ps_header{buffer=Buffer},
- websocket_init, undefined, fun parse_header/3);
- false -> parse_header(State, HandlerState, #ps_header{buffer=Buffer})
+ websocket_init, undefined, fun after_init/3);
+ false -> after_init(State, HandlerState, #ps_header{buffer=Buffer})
-before_loop(State=#state{active=false}, HandlerState, ParseState) ->
- loop(State, HandlerState, ParseState);
-%% @todo We probably shouldn't do the setopts if we have not received a socket message.
-%% @todo We need to hibernate when HTTP/2 is used too.
-before_loop(State=#state{socket=Stream={Pid, _}, transport=undefined},
- HandlerState, ParseState) ->
+after_init(State=#state{active=true}, HandlerState, ParseState) ->
+ %% Enable active,N for HTTP/1.1, and auto read_body for HTTP/2.
+ %% We must do this only after calling websocket_init/1 (if any)
+ %% to give the handler a chance to disable active mode immediately.
+ setopts_active(State),
+ maybe_read_body(State),
+ parse_header(State, HandlerState, ParseState);
+after_init(State, HandlerState, ParseState) ->
+ parse_header(State, HandlerState, ParseState).
+%% We have two ways of reading the body for Websocket. For HTTP/1.1
+%% we have full control of the socket and can therefore use active,N.
+%% For HTTP/2 we are just a stream, and are instead using read_body
+%% (automatic mode). Technically HTTP/2 will only go passive after
+%% receiving the next data message, while HTTP/1.1 goes passive
+%% immediately but there might still be data to be processed in
+%% the message queue.
+setopts_active(#state{transport=undefined}) ->
+ ok;
+setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
+ N = maps:get(active_n, Opts, 100),
+ Transport:setopts(Socket, [{active, N}]).
+maybe_read_body(#state{socket=Stream={Pid, _}, transport=undefined, active=true}) ->
%% @todo Keep Ref around.
ReadBodyRef = make_ref(),
Pid ! {Stream, {read_body, self(), ReadBodyRef, auto, infinity}},
- loop(State, HandlerState, ParseState);
-before_loop(State=#state{socket=Socket, transport=Transport, hibernate=true},
- HandlerState, ParseState) ->
- Transport:setopts(Socket, [{active, once}]),
+ ok;
+maybe_read_body(_) ->
+ ok.
+active(State) ->
+ setopts_active(State),
+ maybe_read_body(State),
+ State#state{active=true}.
+passive(State=#state{transport=undefined}) ->
+ %% Unfortunately we cannot currently cancel read_body.
+ %% But that's OK, we will just stop reading the body
+ %% after the next message.
+ State#state{active=false};
+passive(State=#state{socket=Socket, transport=Transport, messages=Messages}) ->
+ Transport:setopts(Socket, [{active, false}]),
+ flush_passive(Socket, Messages),
+ State#state{active=false}.
+flush_passive(Socket, Messages) ->
+ receive
+ {Passive, Socket} when Passive =:= element(4, Messages);
+ %% Hardcoded for compatibility with Ranch 1.x.
+ Passive =:= tcp_passive; Passive =:= ssl_passive ->
+ flush_passive(Socket, Messages)
+ after 0 ->
+ ok
+ end.
+before_loop(State=#state{hibernate=true}, HandlerState, ParseState) ->
proc_lib:hibernate(?MODULE, loop,
[State#state{hibernate=false}, HandlerState, ParseState]);
-before_loop(State=#state{socket=Socket, transport=Transport},
- HandlerState, ParseState) ->
- Transport:setopts(Socket, [{active, once}]),
+before_loop(State, HandlerState, ParseState) ->
loop(State, HandlerState, ParseState).
-spec loop_timeout(#state{}) -> #state{}.
@@ -350,22 +395,28 @@ loop(State=#state{parent=Parent, socket=Socket, messages=Messages,
terminate(State, HandlerState, {error, closed});
{Error, Socket, Reason} when Error =:= element(3, Messages) ->
terminate(State, HandlerState, {error, Reason});
+ {Passive, Socket} when Passive =:= element(4, Messages);
+ %% Hardcoded for compatibility with Ranch 1.x.
+ Passive =:= tcp_passive; Passive =:= ssl_passive ->
+ setopts_active(State),
+ loop(State, HandlerState, ParseState);
%% Body reading messages. (HTTP/2)
{request_body, _Ref, nofin, Data} ->
+ maybe_read_body(State),
State2 = loop_timeout(State),
parse(State2, HandlerState, ParseState, Data);
%% @todo We need to handle this case as if it was an {error, closed}
%% but not before we finish processing frames. We probably should have
%% a check in before_loop to let us stop looping if a flag is set.
{request_body, _Ref, fin, _, Data} ->
+ maybe_read_body(State),
State2 = loop_timeout(State),
parse(State2, HandlerState, ParseState, Data);
%% Timeouts.
{timeout, TRef, ?MODULE} ->
websocket_close(State, HandlerState, timeout);
{timeout, OlderTRef, ?MODULE} when is_reference(OlderTRef) ->
- %% @todo This should call before_loop.
- loop(State, HandlerState, ParseState);
+ before_loop(State, HandlerState, ParseState);
%% System messages.
{'EXIT', Parent, Reason} ->
%% @todo We should exit gracefully.
@@ -376,8 +427,7 @@ loop(State=#state{parent=Parent, socket=Socket, messages=Messages,
%% Calls from supervisor module.
{'$gen_call', From, Call} ->
cowboy_children:handle_supervisor_call(Call, From, [], ?MODULE),
- %% @todo This should call before_loop.
- loop(State, HandlerState, ParseState);
+ before_loop(State, HandlerState, ParseState);
Message ->
handler_call(State, HandlerState, ParseState,
websocket_info, Message, fun before_loop/3)
@@ -531,7 +581,15 @@ commands([], State, []) ->
commands([], State, Data) ->
Result = transport_send(State, nofin, lists:reverse(Data)),
{Result, State};
-commands([{active, Active}|Tail], State, Data) when is_boolean(Active) ->
+commands([{active, Active}|Tail], State0=#state{active=Active0}, Data) when is_boolean(Active) ->
+ State = if
+ Active, not Active0 ->
+ active(State0);
+ Active0, not Active ->
+ passive(State0);
+ true ->
+ State0
+ end,
commands(Tail, State#state{active=Active}, Data);
commands([{deflate, Deflate}|Tail], State, Data) when is_boolean(Deflate) ->
commands(Tail, State#state{deflate=Deflate}, Data);