diff options
-rw-r--r-- | Makefile | 1 | ||||
-rw-r--r-- | src/cowboy_websocket.erl | 49 | ||||
-rw-r--r-- | test/ws_perf_SUITE.erl | 10 |
3 files changed, 45 insertions, 15 deletions
@@ -9,6 +9,7 @@ PROJECT_REGISTERED = cowboy_clock PLT_APPS = public_key ssl # ct_helper gun common_test inets CT_OPTS += -ct_hooks cowboy_ct_hook [] # -boot start_sasl +#CT_OPTS += +JPperf true +S 1 # Dependencies. diff --git a/src/cowboy_websocket.erl b/src/cowboy_websocket.erl index 3cc4d30..577de47 100644 --- a/src/cowboy_websocket.erl +++ b/src/cowboy_websocket.erl @@ -76,6 +76,14 @@ }. -export_type([opts/0]). +%% We don't want to reset the idle timeout too often, +%% so we don't reset it on data. Instead we reset the +%% number of ticks we have observed. We divide the +%% timeout value by a value and that value becomes +%% the number of ticks at which point we can drop +%% the connection. This value is the number of ticks. +-define(IDLE_TIMEOUT_TICKS, 10). + -record(state, { parent :: undefined | pid(), ref :: ranch:ref(), @@ -86,6 +94,7 @@ handler :: module(), key = undefined :: undefined | binary(), timeout_ref = undefined :: undefined | reference(), + timeout_num = 0 :: 0..?IDLE_TIMEOUT_TICKS, messages = undefined :: undefined | {atom(), atom(), atom()} | {atom(), atom(), atom(), atom()}, hibernate = false :: boolean(), @@ -297,9 +306,9 @@ takeover(Parent, Ref, Socket, Transport, _Opts, Buffer, undefined -> undefined; _ -> Transport:messages() end, - State = loop_timeout(State0#state{parent=Parent, + State = set_idle_timeout(State0#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport, - key=undefined, messages=Messages}), + key=undefined, messages=Messages}, 0), %% We call parse_header/3 immediately because there might be %% some data in the buffer that was sent along with the handshake. %% While it is not allowed by the protocol to send frames immediately, @@ -373,28 +382,39 @@ before_loop(State=#state{hibernate=true}, HandlerState, ParseState) -> before_loop(State, HandlerState, ParseState) -> loop(State, HandlerState, ParseState). --spec loop_timeout(#state{}) -> #state{}. -loop_timeout(State=#state{opts=Opts, timeout_ref=PrevRef}) -> +-spec set_idle_timeout(#state{}, 0..?IDLE_TIMEOUT_TICKS) -> #state{}. + +set_idle_timeout(State=#state{opts=Opts, timeout_ref=PrevRef}, TimeoutNum) -> + %% Most of the time we don't need to cancel the timer since it + %% will have triggered already. But this call is harmless so + %% it is kept to simplify the code as we do need to cancel when + %% options are changed dynamically. _ = case PrevRef of undefined -> ignore; PrevRef -> erlang:cancel_timer(PrevRef) end, case maps:get(idle_timeout, Opts, 60000) of infinity -> - State#state{timeout_ref=undefined}; + State#state{timeout_ref=undefined, timeout_num=TimeoutNum}; Timeout -> - TRef = erlang:start_timer(Timeout, self(), ?MODULE), - State#state{timeout_ref=TRef} + TRef = erlang:start_timer(Timeout div ?IDLE_TIMEOUT_TICKS, self(), ?MODULE), + State#state{timeout_ref=TRef, timeout_num=TimeoutNum} end. +-define(reset_idle_timeout(State), State#state{timeout_num=0}). + +tick_idle_timeout(State=#state{timeout_num=?IDLE_TIMEOUT_TICKS}, HandlerState, _) -> + websocket_close(State, HandlerState, timeout); +tick_idle_timeout(State=#state{timeout_num=TimeoutNum}, HandlerState, ParseState) -> + before_loop(set_idle_timeout(State, TimeoutNum + 1), HandlerState, ParseState). + -spec loop(#state{}, any(), parse_state()) -> no_return(). loop(State=#state{parent=Parent, socket=Socket, messages=Messages, timeout_ref=TRef}, HandlerState, ParseState) -> receive %% Socket messages. (HTTP/1.1) {OK, Socket, Data} when OK =:= element(1, Messages) -> - State2 = loop_timeout(State), - parse(State2, HandlerState, ParseState, Data); + parse(?reset_idle_timeout(State), HandlerState, ParseState, Data); {Closed, Socket} when Closed =:= element(2, Messages) -> terminate(State, HandlerState, {error, closed}); {Error, Socket, Reason} when Error =:= element(3, Messages) -> @@ -407,18 +427,16 @@ loop(State=#state{parent=Parent, socket=Socket, messages=Messages, %% Body reading messages. (HTTP/2) {request_body, _Ref, nofin, Data} -> maybe_read_body(State), - State2 = loop_timeout(State), - parse(State2, HandlerState, ParseState, Data); + parse(?reset_idle_timeout(State), HandlerState, ParseState, Data); %% @todo We need to handle this case as if it was an {error, closed} %% but not before we finish processing frames. We probably should have %% a check in before_loop to let us stop looping if a flag is set. {request_body, _Ref, fin, _, Data} -> maybe_read_body(State), - State2 = loop_timeout(State), - parse(State2, HandlerState, ParseState, Data); + parse(?reset_idle_timeout(State), HandlerState, ParseState, Data); %% Timeouts. {timeout, TRef, ?MODULE} -> - websocket_close(State, HandlerState, timeout); + tick_idle_timeout(State, HandlerState, ParseState); {timeout, OlderTRef, ?MODULE} when is_reference(OlderTRef) -> before_loop(State, HandlerState, ParseState); %% System messages. @@ -600,7 +618,8 @@ commands([{deflate, Deflate}|Tail], State, Data) when is_boolean(Deflate) -> commands([{set_options, SetOpts}|Tail], State0=#state{opts=Opts}, Data) -> State = case SetOpts of #{idle_timeout := IdleTimeout} -> - loop_timeout(State0#state{opts=Opts#{idle_timeout => IdleTimeout}}); + %% We reset the number of ticks when changing the idle_timeout option. + set_idle_timeout(State0#state{opts=Opts#{idle_timeout => IdleTimeout}}, 0); _ -> State0 end, diff --git a/test/ws_perf_SUITE.erl b/test/ws_perf_SUITE.erl index 2155d69..e02a60c 100644 --- a/test/ws_perf_SUITE.erl +++ b/test/ws_perf_SUITE.erl @@ -37,6 +37,14 @@ groups() -> {japanese, [], SubGroups} ]. +init_per_suite(Config) -> + %% Optionally enable `perf` for the current node. +% spawn(fun() -> ct:pal(os:cmd("perf record -g -F 9999 -o /tmp/ws_perf.data -p " ++ os:getpid() ++ " -- sleep 11")) end), + Config. + +end_per_suite(_Config) -> + ok. + init_per_group(Name=http, Config) -> ct:pal("Websocket over cleartext HTTP/1.1 (~s)", [init_data_info(Config)]), @@ -185,6 +193,8 @@ do_full(Config, What, Num, FrameSize) -> text -> do_text_data(Config, FrameSize); binary -> rand:bytes(FrameSize) end, + %% Heat up the processes before doing the real run. +% do_full1(ConnPid, StreamRef, Num, FrameType, FrameData), {Time, _} = timer:tc(?MODULE, do_full1, [ConnPid, StreamRef, Num, FrameType, FrameData]), do_log("~-6s ~-6s ~6s: ~8bµs", [What, FrameType, do_format_size(FrameSize), Time]), gun:ws_send(ConnPid, StreamRef, close), |