From a45813c60f0f983a24ea29d491b37f0590fdd087 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Sat, 18 Feb 2017 18:26:20 +0100 Subject: Allow passing options to sub protocols Before this commit we had an issue where configuring a Websocket connection was simply not possible without doing magic, adding callbacks or extra return values. The init/2 function only allowed setting hibernate and timeout options. After this commit, when switching to a different type of handler you can either return {module, Req, State} or {module, Req, State, Opts} where Opts is any value (as far as the sub protocol interface is concerned) and is ultimately checked by the custom handlers. A large protocol like Websocket would accept only a map there, with many different options, while a small interface like loop handlers would allow passing hibernate and nothing else. For Websocket, hibernate must be set from the websocket_init/1 callback, because init/2 executes in a separate process. Sub protocols now have two callbacks: one with the Opts value, one without. The loop handler code was largely reworked and simplified. It does not need to manage a timeout or read from the socket anymore, it's the job of the protocol code. A lot of unnecessary stuff was therefore removed. Websocket compression must now be enabled from the handler options instead of per listener. This means that a project can have two separate Websocket handlers with different options. Compression is still disabled by default, and the idle_timeout value was changed from inifnity to 60000 (60 seconds), as that's safer and is also a good value for mobile devices. --- src/cowboy_handler.erl | 14 ++--- src/cowboy_http.erl | 5 +- src/cowboy_loop.erl | 126 +++++++++++--------------------------------- src/cowboy_rest.erl | 17 +++--- src/cowboy_sub_protocol.erl | 6 ++- src/cowboy_websocket.erl | 34 ++++++++---- 6 files changed, 78 insertions(+), 124 deletions(-) (limited to 'src') diff --git a/src/cowboy_handler.erl b/src/cowboy_handler.erl index 61c1b6a..3249f76 100644 --- a/src/cowboy_handler.erl +++ b/src/cowboy_handler.erl @@ -25,9 +25,7 @@ -callback init(Req, any()) -> {ok | module(), Req, any()} - | {module(), Req, any(), hibernate} - | {module(), Req, any(), timeout()} - | {module(), Req, any(), timeout(), hibernate} + | {module(), Req, any(), any()} when Req::cowboy_req:req(). -callback terminate(any(), cowboy_req:req(), any()) -> ok. @@ -41,13 +39,9 @@ execute(Req, Env=#{handler := Handler, handler_opts := HandlerOpts}) -> Result = terminate(normal, Req2, State, Handler), {ok, Req2, Env#{result => Result}}; {Mod, Req2, State} -> - Mod:upgrade(Req2, Env, Handler, State, infinity, run); - {Mod, Req2, State, hibernate} -> - Mod:upgrade(Req2, Env, Handler, State, infinity, hibernate); - {Mod, Req2, State, Timeout} -> - Mod:upgrade(Req2, Env, Handler, State, Timeout, run); - {Mod, Req2, State, Timeout, hibernate} -> - Mod:upgrade(Req2, Env, Handler, State, Timeout, hibernate) + Mod:upgrade(Req2, Env, Handler, State); + {Mod, Req2, State, Opts} -> + Mod:upgrade(Req2, Env, Handler, State, Opts) catch Class:Reason -> terminate({crash, Class, Reason}, Req, HandlerOpts, Handler), erlang:raise(Class, Reason, erlang:get_stacktrace()) diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl index 8615c85..ac0d915 100644 --- a/src/cowboy_http.erl +++ b/src/cowboy_http.erl @@ -136,11 +136,10 @@ init(Parent, Ref, Socket, Transport, Opts) -> %% Timeouts: %% - waiting for new request (if no stream is currently running) %% -> request_timeout: for whole request/headers, set at init/when we set ps_request_line{} state -%% - waiting for body (if a stream requested the body to be read) -%% -> read_body_timeout: amount of time we wait without receiving any data when reading the body +%% - waiting for new request, or body (when a stream is currently running) +%% -> idle_timeout: amount of time we wait without receiving any data %% - if we skip the body, skip only for a specific duration %% -> skip_body_timeout: also have a skip_body_length -%% - none if we have a stream running and it didn't request the body to be read %% - global %% -> inactivity_timeout: max time to wait without anything happening before giving up diff --git a/src/cowboy_loop.erl b/src/cowboy_loop.erl index 117446a..7492350 100644 --- a/src/cowboy_loop.erl +++ b/src/cowboy_loop.erl @@ -12,27 +12,18 @@ %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -%% When using loop handlers, we are receiving data from the socket because we -%% want to know when the socket gets closed. This is generally not an issue -%% because these kinds of requests are generally not pipelined, and don't have -%% a body. If they do have a body, this body is often read in the -%% init/2 callback and this is no problem. Otherwise, this data -%% accumulates in a buffer until we reach a certain threshold of 5000 bytes -%% by default. This can be configured through the loop_max_buffer -%% environment value. The request will be terminated with an -%% {error, overflow} reason if this threshold is reached. -module(cowboy_loop). -behaviour(cowboy_sub_protocol). --export([upgrade/6]). +-export([upgrade/4]). +-export([upgrade/5]). -export([loop/4]). -callback init(Req, any()) -> {ok | module(), Req, any()} - | {module(), Req, any(), hibernate} - | {module(), Req, any(), timeout()} - | {module(), Req, any(), timeout(), hibernate} + | {module(), Req, any(), any()} when Req::cowboy_req:req(). + -callback info(any(), Req, State) -> {ok, Req, State} | {ok, Req, State, hibernate} @@ -42,97 +33,44 @@ -callback terminate(any(), cowboy_req:req(), any()) -> ok. -optional_callbacks([terminate/3]). --record(state, { - env :: cowboy_middleware:env(), - hibernate = false :: boolean(), - buffer_size = 0 :: non_neg_integer(), - max_buffer = 5000 :: non_neg_integer() | infinity, - timeout = infinity :: timeout(), - timeout_ref = undefined :: undefined | reference() -}). - --spec upgrade(Req, Env, module(), any(), timeout(), run | hibernate) - -> {ok, Req, Env} | {suspend, module(), atom(), [any()]} +-spec upgrade(Req, Env, module(), any()) + -> {ok, Req, Env} | {suspend, ?MODULE, loop, [any()]} when Req::cowboy_req:req(), Env::cowboy_middleware:env(). -upgrade(Req, Env, Handler, HandlerState, Timeout, Hibernate) -> - State = #state{env=Env, max_buffer=get_max_buffer(Env), timeout=Timeout, - hibernate=Hibernate =:= hibernate}, - State2 = timeout(State), - before_loop(Req, State2, Handler, HandlerState). - -get_max_buffer(#{loop_max_buffer := MaxBuffer}) -> MaxBuffer; -get_max_buffer(_) -> 5000. - -before_loop(Req, State=#state{hibernate=true}, Handler, HandlerState) -> - - %% @todo Yeah we can't get the socket anymore. - %% Everything changes since we are a separate process now. - %% Proper flow control at the connection level should be implemented - %% instead of what we have here. - -% [Socket, Transport] = cowboy_req:get([socket, transport], Req), -% Transport:setopts(Socket, [{active, once}]), - {suspend, ?MODULE, loop, [Req, State#state{hibernate=false}, Handler, HandlerState]}; -before_loop(Req, State, Handler, HandlerState) -> - - %% Same here. - -% [Socket, Transport] = cowboy_req:get([socket, transport], Req), -% Transport:setopts(Socket, [{active, once}]), - loop(Req, State, Handler, HandlerState). +upgrade(Req, Env, Handler, HandlerState) -> + loop(Req, Env, Handler, HandlerState). -%% Almost the same code can be found in cowboy_websocket. -timeout(State=#state{timeout=infinity}) -> - State#state{timeout_ref=undefined}; -timeout(State=#state{timeout=Timeout, - timeout_ref=PrevRef}) -> - _ = case PrevRef of - undefined -> ignore%; -% @todo PrevRef -> erlang:cancel_timer(PrevRef) - end, - TRef = erlang:start_timer(Timeout, self(), ?MODULE), - State#state{timeout_ref=TRef}. +-spec upgrade(Req, Env, module(), any(), hibernate) + -> {suspend, ?MODULE, loop, [any()]} + when Req::cowboy_req:req(), Env::cowboy_middleware:env(). +upgrade(Req, Env, Handler, HandlerState, hibernate) -> + suspend(Req, Env, Handler, HandlerState). --spec loop(Req, #state{}, module(), any()) - -> {ok, Req, cowboy_middleware:env()} | {suspend, module(), atom(), [any()]} - when Req::cowboy_req:req(). -loop(Req, State=#state{timeout_ref=TRef}, Handler, HandlerState) -> +-spec loop(Req, Env, module(), any()) + -> {ok, Req, Env} | {suspend, ?MODULE, loop, [any()]} + when Req::cowboy_req:req(), Env::cowboy_middleware:env(). +%% @todo Handle system messages. +loop(Req, Env, Handler, HandlerState) -> receive - {timeout, TRef, ?MODULE} -> - terminate(Req, State, Handler, HandlerState, timeout); - {timeout, OlderTRef, ?MODULE} when is_reference(OlderTRef) -> - loop(Req, State, Handler, HandlerState); Message -> - call(Req, State, Handler, HandlerState, Message) + call(Req, Env, Handler, HandlerState, Message) end. -call(Req, State, Handler, HandlerState, Message) -> - try Handler:info(Message, Req, HandlerState) of - {ok, Req2, HandlerState2} -> - before_loop(Req2, State, Handler, HandlerState2); - {ok, Req2, HandlerState2, hibernate} -> - before_loop(Req2, State#state{hibernate=true}, Handler, HandlerState2); - {stop, Req2, HandlerState2} -> - terminate(Req2, State, Handler, HandlerState2, stop) +call(Req0, Env, Handler, HandlerState0, Message) -> + try Handler:info(Message, Req0, HandlerState0) of + {ok, Req, HandlerState} -> + loop(Req, Env, Handler, HandlerState); + {ok, Req, HandlerState, hibernate} -> + suspend(Req, Env, Handler, HandlerState); + {stop, Req, HandlerState} -> + terminate(Req, Env, Handler, HandlerState, stop) catch Class:Reason -> - cowboy_handler:terminate({crash, Class, Reason}, Req, HandlerState, Handler), + cowboy_handler:terminate({crash, Class, Reason}, Req0, HandlerState0, Handler), erlang:raise(Class, Reason, erlang:get_stacktrace()) end. -terminate(Req, #state{env=Env, timeout_ref=TRef}, - Handler, HandlerState, Reason) -> - _ = case TRef of - undefined -> ignore; - TRef -> erlang:cancel_timer(TRef) - end, - flush_timeouts(), +suspend(Req, Env, Handler, HandlerState) -> + {suspend, ?MODULE, loop, [Req, Env, Handler, HandlerState]}. + +terminate(Req, Env, Handler, HandlerState, Reason) -> Result = cowboy_handler:terminate(Reason, Req, HandlerState, Handler), {ok, Req, Env#{result => Result}}. - -flush_timeouts() -> - receive - {timeout, TRef, ?MODULE} when is_reference(TRef) -> - flush_timeouts() - after 0 -> - ok - end. diff --git a/src/cowboy_rest.erl b/src/cowboy_rest.erl index 49e1946..21d56a5 100644 --- a/src/cowboy_rest.erl +++ b/src/cowboy_rest.erl @@ -17,15 +17,14 @@ -module(cowboy_rest). -behaviour(cowboy_sub_protocol). --export([upgrade/6]). +-export([upgrade/4]). +-export([upgrade/5]). %% Common handler callbacks. -callback init(Req, any()) -> {ok | module(), Req, any()} - | {module(), Req, any(), hibernate} - | {module(), Req, any(), timeout()} - | {module(), Req, any(), timeout(), hibernate} + | {module(), Req, any(), any()} when Req::cowboy_req:req(). -callback terminate(any(), cowboy_req:req(), any()) -> ok. @@ -232,14 +231,20 @@ expires :: undefined | no_call | calendar:datetime() | binary() }). --spec upgrade(Req, Env, module(), any(), infinity, run) +-spec upgrade(Req, Env, module(), any()) -> {ok, Req, Env} when Req::cowboy_req:req(), Env::cowboy_middleware:env(). -upgrade(Req0, Env, Handler, HandlerState, infinity, run) -> +upgrade(Req0, Env, Handler, HandlerState) -> Method = cowboy_req:method(Req0), {ok, Req, Result} = service_available(Req0, #state{method=Method, handler=Handler, handler_state=HandlerState}), {ok, Req, Env#{result => Result}}. +-spec upgrade(Req, Env, module(), any(), any()) + -> {ok, Req, Env} when Req::cowboy_req:req(), Env::cowboy_middleware:env(). +%% cowboy_rest takes no options. +upgrade(Req, Env, Handler, HandlerState, _Opts) -> + upgrade(Req, Env, Handler, HandlerState). + service_available(Req, State) -> expect(Req, State, service_available, true, fun known_methods/2, 503). diff --git a/src/cowboy_sub_protocol.erl b/src/cowboy_sub_protocol.erl index e068b0b..6714289 100644 --- a/src/cowboy_sub_protocol.erl +++ b/src/cowboy_sub_protocol.erl @@ -15,6 +15,10 @@ -module(cowboy_sub_protocol). --callback upgrade(Req, Env, module(), any(), timeout(), run | hibernate) +-callback upgrade(Req, Env, module(), any()) + -> {ok, Req, Env} | {suspend, module(), atom(), [any()]} | {stop, Req} + when Req::cowboy_req:req(), Env::cowboy_middleware:env(). + +-callback upgrade(Req, Env, module(), any(), any()) -> {ok, Req, Env} | {suspend, module(), atom(), [any()]} | {stop, Req} when Req::cowboy_req:req(), Env::cowboy_middleware:env(). diff --git a/src/cowboy_websocket.erl b/src/cowboy_websocket.erl index 62210d9..e20c111 100644 --- a/src/cowboy_websocket.erl +++ b/src/cowboy_websocket.erl @@ -17,7 +17,8 @@ -module(cowboy_websocket). -behaviour(cowboy_sub_protocol). --export([upgrade/6]). +-export([upgrade/4]). +-export([upgrade/5]). -export([takeover/7]). -export([handler_loop/3]). @@ -34,9 +35,7 @@ -callback init(Req, any()) -> {ok | module(), Req, any()} - | {module(), Req, any(), hibernate} - | {module(), Req, any(), timeout()} - | {module(), Req, any(), timeout(), hibernate} + | {module(), Req, any(), any()} when Req::cowboy_req:req(). -callback websocket_init(State) @@ -53,6 +52,12 @@ -callback terminate(any(), cowboy_req:req(), any()) -> ok. -optional_callbacks([terminate/3]). +-type opts() :: #{ + idle_timeout => timeout(), + compress => boolean() +}. +-export_type([opts/0]). + -record(state, { socket = undefined :: inet:socket() | undefined, transport = undefined :: module(), @@ -60,6 +65,7 @@ key = undefined :: undefined | binary(), timeout = infinity :: timeout(), timeout_ref = undefined :: undefined | reference(), + compress = false :: boolean(), messages = undefined :: undefined | {atom(), atom(), atom()}, hibernate = false :: boolean(), frag_state = undefined :: cow_ws:frag_state(), @@ -70,14 +76,22 @@ %% Stream process. --spec upgrade(Req, Env, module(), any(), timeout(), run | hibernate) +-spec upgrade(Req, Env, module(), any()) + -> {ok, Req, Env} + when Req::cowboy_req:req(), Env::cowboy_middleware:env(). +upgrade(Req, Env, Handler, HandlerState) -> + upgrade(Req, Env, Handler, HandlerState, #{}). + +-spec upgrade(Req, Env, module(), any(), opts()) -> {ok, Req, Env} when Req::cowboy_req:req(), Env::cowboy_middleware:env(). %% @todo Immediately crash if a response has already been sent. %% @todo Error out if HTTP/2. -upgrade(Req0, Env, Handler, HandlerState, Timeout, Hibernate) -> - try websocket_upgrade(#state{handler=Handler, timeout=Timeout, - hibernate=Hibernate =:= hibernate}, Req0) of +upgrade(Req0, Env, Handler, HandlerState, Opts) -> + Timeout = maps:get(idle_timeout, Opts, 60000), + Compress = maps:get(compress, Opts, false), + State0 = #state{handler=Handler, timeout=Timeout, compress=Compress}, + try websocket_upgrade(State0, Req0) of {ok, State, Req} -> websocket_handshake(State, Req, HandlerState, Env) catch _:_ -> @@ -104,14 +118,13 @@ websocket_upgrade(State, Req) -> -spec websocket_extensions(#state{}, Req) -> {ok, #state{}, Req} when Req::cowboy_req:req(). -websocket_extensions(State, Req=#{ref := Ref}) -> +websocket_extensions(State=#state{compress=Compress}, Req) -> %% @todo We want different options for this. For example %% * compress everything auto %% * compress only text auto %% * compress only binary auto %% * compress nothing auto (but still enabled it) %% * disable compression - Compress = maps:get(websocket_compress, ranch:get_protocol_options(Ref), false), case {Compress, cowboy_req:parse_header(<<"sec-websocket-extensions">>, Req)} of {true, Extensions} when Extensions =/= undefined -> websocket_extensions(State, Req, Extensions, []); @@ -170,6 +183,7 @@ websocket_handshake(State=#state{key=Key}, {#state{}, any()}) -> ok. takeover(_Parent, Ref, Socket, Transport, _Opts, Buffer, {State0=#state{handler=Handler}, HandlerState}) -> + %% @todo We should have an option to disable this behavior. ranch:remove_connection(Ref), State1 = handler_loop_timeout(State0#state{socket=Socket, transport=Transport}), State = State1#state{key=undefined, messages=Transport:messages()}, -- cgit v1.2.3