diff options
Diffstat (limited to 'src/cowboy_websocket.erl')
-rw-r--r-- | src/cowboy_websocket.erl | 100 |
1 files changed, 79 insertions, 21 deletions
diff --git a/src/cowboy_websocket.erl b/src/cowboy_websocket.erl index b3600be..e7d8f31 100644 --- a/src/cowboy_websocket.erl +++ b/src/cowboy_websocket.erl @@ -66,6 +66,7 @@ -optional_callbacks([terminate/3]). -type opts() :: #{ + active_n => pos_integer(), compress => boolean(), deflate_opts => cow_ws:deflate_opts(), idle_timeout => timeout(), @@ -85,7 +86,8 @@ handler :: module(), key = undefined :: undefined | binary(), timeout_ref = undefined :: undefined | reference(), - messages = undefined :: undefined | {atom(), atom(), atom()}, + messages = undefined :: undefined | {atom(), atom(), atom()} + | {atom(), atom(), atom(), atom()}, hibernate = false :: boolean(), frag_state = undefined :: cow_ws:frag_state(), frag_buffer = <<>> :: binary(), @@ -300,28 +302,71 @@ takeover(Parent, Ref, Socket, Transport, _Opts, Buffer, %% we still want to process that data if any. case erlang:function_exported(Handler, websocket_init, 1) of true -> handler_call(State, HandlerState, #ps_header{buffer=Buffer}, - websocket_init, undefined, fun parse_header/3); - false -> parse_header(State, HandlerState, #ps_header{buffer=Buffer}) + websocket_init, undefined, fun after_init/3); + false -> after_init(State, HandlerState, #ps_header{buffer=Buffer}) end. -before_loop(State=#state{active=false}, HandlerState, ParseState) -> - loop(State, HandlerState, ParseState); -%% @todo We probably shouldn't do the setopts if we have not received a socket message. -%% @todo We need to hibernate when HTTP/2 is used too. -before_loop(State=#state{socket=Stream={Pid, _}, transport=undefined}, - HandlerState, ParseState) -> +after_init(State=#state{active=true}, HandlerState, ParseState) -> + %% Enable active,N for HTTP/1.1, and auto read_body for HTTP/2. + %% We must do this only after calling websocket_init/1 (if any) + %% to give the handler a chance to disable active mode immediately. + setopts_active(State), + maybe_read_body(State), + parse_header(State, HandlerState, ParseState); +after_init(State, HandlerState, ParseState) -> + parse_header(State, HandlerState, ParseState). + +%% We have two ways of reading the body for Websocket. For HTTP/1.1 +%% we have full control of the socket and can therefore use active,N. +%% For HTTP/2 we are just a stream, and are instead using read_body +%% (automatic mode). Technically HTTP/2 will only go passive after +%% receiving the next data message, while HTTP/1.1 goes passive +%% immediately but there might still be data to be processed in +%% the message queue. + +setopts_active(#state{transport=undefined}) -> + ok; +setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) -> + N = maps:get(active_n, Opts, 100), + Transport:setopts(Socket, [{active, N}]). + +maybe_read_body(#state{socket=Stream={Pid, _}, transport=undefined, active=true}) -> %% @todo Keep Ref around. ReadBodyRef = make_ref(), Pid ! {Stream, {read_body, self(), ReadBodyRef, auto, infinity}}, - loop(State, HandlerState, ParseState); -before_loop(State=#state{socket=Socket, transport=Transport, hibernate=true}, - HandlerState, ParseState) -> - Transport:setopts(Socket, [{active, once}]), + ok; +maybe_read_body(_) -> + ok. + +active(State) -> + setopts_active(State), + maybe_read_body(State), + State#state{active=true}. + +passive(State=#state{transport=undefined}) -> + %% Unfortunately we cannot currently cancel read_body. + %% But that's OK, we will just stop reading the body + %% after the next message. + State#state{active=false}; +passive(State=#state{socket=Socket, transport=Transport, messages=Messages}) -> + Transport:setopts(Socket, [{active, false}]), + flush_passive(Socket, Messages), + State#state{active=false}. + +flush_passive(Socket, Messages) -> + receive + {Passive, Socket} when Passive =:= element(4, Messages); + %% Hardcoded for compatibility with Ranch 1.x. + Passive =:= tcp_passive; Passive =:= ssl_passive -> + flush_passive(Socket, Messages) + after 0 -> + ok + end. + +before_loop(State=#state{hibernate=true}, HandlerState, ParseState) -> proc_lib:hibernate(?MODULE, loop, [State#state{hibernate=false}, HandlerState, ParseState]); -before_loop(State=#state{socket=Socket, transport=Transport}, - HandlerState, ParseState) -> - Transport:setopts(Socket, [{active, once}]), +before_loop(State, HandlerState, ParseState) -> loop(State, HandlerState, ParseState). -spec loop_timeout(#state{}) -> #state{}. @@ -350,22 +395,28 @@ loop(State=#state{parent=Parent, socket=Socket, messages=Messages, terminate(State, HandlerState, {error, closed}); {Error, Socket, Reason} when Error =:= element(3, Messages) -> terminate(State, HandlerState, {error, Reason}); + {Passive, Socket} when Passive =:= element(4, Messages); + %% Hardcoded for compatibility with Ranch 1.x. + Passive =:= tcp_passive; Passive =:= ssl_passive -> + setopts_active(State), + loop(State, HandlerState, ParseState); %% Body reading messages. (HTTP/2) {request_body, _Ref, nofin, Data} -> + maybe_read_body(State), State2 = loop_timeout(State), parse(State2, HandlerState, ParseState, Data); %% @todo We need to handle this case as if it was an {error, closed} %% but not before we finish processing frames. We probably should have %% a check in before_loop to let us stop looping if a flag is set. {request_body, _Ref, fin, _, Data} -> + maybe_read_body(State), State2 = loop_timeout(State), parse(State2, HandlerState, ParseState, Data); %% Timeouts. {timeout, TRef, ?MODULE} -> websocket_close(State, HandlerState, timeout); {timeout, OlderTRef, ?MODULE} when is_reference(OlderTRef) -> - %% @todo This should call before_loop. - loop(State, HandlerState, ParseState); + before_loop(State, HandlerState, ParseState); %% System messages. {'EXIT', Parent, Reason} -> %% @todo We should exit gracefully. @@ -376,8 +427,7 @@ loop(State=#state{parent=Parent, socket=Socket, messages=Messages, %% Calls from supervisor module. {'$gen_call', From, Call} -> cowboy_children:handle_supervisor_call(Call, From, [], ?MODULE), - %% @todo This should call before_loop. - loop(State, HandlerState, ParseState); + before_loop(State, HandlerState, ParseState); Message -> handler_call(State, HandlerState, ParseState, websocket_info, Message, fun before_loop/3) @@ -531,7 +581,15 @@ commands([], State, []) -> commands([], State, Data) -> Result = transport_send(State, nofin, lists:reverse(Data)), {Result, State}; -commands([{active, Active}|Tail], State, Data) when is_boolean(Active) -> +commands([{active, Active}|Tail], State0=#state{active=Active0}, Data) when is_boolean(Active) -> + State = if + Active, not Active0 -> + active(State0); + Active0, not Active -> + passive(State0); + true -> + State0 + end, commands(Tail, State#state{active=Active}, Data); commands([{deflate, Deflate}|Tail], State, Data) when is_boolean(Deflate) -> commands(Tail, State#state{deflate=Deflate}, Data); |