aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/cowboy_handler.erl14
-rw-r--r--src/cowboy_http.erl5
-rw-r--r--src/cowboy_loop.erl126
-rw-r--r--src/cowboy_rest.erl17
-rw-r--r--src/cowboy_sub_protocol.erl6
-rw-r--r--src/cowboy_websocket.erl34
6 files changed, 78 insertions, 124 deletions
diff --git a/src/cowboy_handler.erl b/src/cowboy_handler.erl
index 61c1b6a..3249f76 100644
--- a/src/cowboy_handler.erl
+++ b/src/cowboy_handler.erl
@@ -25,9 +25,7 @@
-callback init(Req, any())
-> {ok | module(), Req, any()}
- | {module(), Req, any(), hibernate}
- | {module(), Req, any(), timeout()}
- | {module(), Req, any(), timeout(), hibernate}
+ | {module(), Req, any(), any()}
when Req::cowboy_req:req().
-callback terminate(any(), cowboy_req:req(), any()) -> ok.
@@ -41,13 +39,9 @@ execute(Req, Env=#{handler := Handler, handler_opts := HandlerOpts}) ->
Result = terminate(normal, Req2, State, Handler),
{ok, Req2, Env#{result => Result}};
{Mod, Req2, State} ->
- Mod:upgrade(Req2, Env, Handler, State, infinity, run);
- {Mod, Req2, State, hibernate} ->
- Mod:upgrade(Req2, Env, Handler, State, infinity, hibernate);
- {Mod, Req2, State, Timeout} ->
- Mod:upgrade(Req2, Env, Handler, State, Timeout, run);
- {Mod, Req2, State, Timeout, hibernate} ->
- Mod:upgrade(Req2, Env, Handler, State, Timeout, hibernate)
+ Mod:upgrade(Req2, Env, Handler, State);
+ {Mod, Req2, State, Opts} ->
+ Mod:upgrade(Req2, Env, Handler, State, Opts)
catch Class:Reason ->
terminate({crash, Class, Reason}, Req, HandlerOpts, Handler),
erlang:raise(Class, Reason, erlang:get_stacktrace())
diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl
index 8615c85..ac0d915 100644
--- a/src/cowboy_http.erl
+++ b/src/cowboy_http.erl
@@ -136,11 +136,10 @@ init(Parent, Ref, Socket, Transport, Opts) ->
%% Timeouts:
%% - waiting for new request (if no stream is currently running)
%% -> request_timeout: for whole request/headers, set at init/when we set ps_request_line{} state
-%% - waiting for body (if a stream requested the body to be read)
-%% -> read_body_timeout: amount of time we wait without receiving any data when reading the body
+%% - waiting for new request, or body (when a stream is currently running)
+%% -> idle_timeout: amount of time we wait without receiving any data
%% - if we skip the body, skip only for a specific duration
%% -> skip_body_timeout: also have a skip_body_length
-%% - none if we have a stream running and it didn't request the body to be read
%% - global
%% -> inactivity_timeout: max time to wait without anything happening before giving up
diff --git a/src/cowboy_loop.erl b/src/cowboy_loop.erl
index 117446a..7492350 100644
--- a/src/cowboy_loop.erl
+++ b/src/cowboy_loop.erl
@@ -12,27 +12,18 @@
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-%% When using loop handlers, we are receiving data from the socket because we
-%% want to know when the socket gets closed. This is generally not an issue
-%% because these kinds of requests are generally not pipelined, and don't have
-%% a body. If they do have a body, this body is often read in the
-%% <em>init/2</em> callback and this is no problem. Otherwise, this data
-%% accumulates in a buffer until we reach a certain threshold of 5000 bytes
-%% by default. This can be configured through the <em>loop_max_buffer</em>
-%% environment value. The request will be terminated with an
-%% <em>{error, overflow}</em> reason if this threshold is reached.
-module(cowboy_loop).
-behaviour(cowboy_sub_protocol).
--export([upgrade/6]).
+-export([upgrade/4]).
+-export([upgrade/5]).
-export([loop/4]).
-callback init(Req, any())
-> {ok | module(), Req, any()}
- | {module(), Req, any(), hibernate}
- | {module(), Req, any(), timeout()}
- | {module(), Req, any(), timeout(), hibernate}
+ | {module(), Req, any(), any()}
when Req::cowboy_req:req().
+
-callback info(any(), Req, State)
-> {ok, Req, State}
| {ok, Req, State, hibernate}
@@ -42,97 +33,44 @@
-callback terminate(any(), cowboy_req:req(), any()) -> ok.
-optional_callbacks([terminate/3]).
--record(state, {
- env :: cowboy_middleware:env(),
- hibernate = false :: boolean(),
- buffer_size = 0 :: non_neg_integer(),
- max_buffer = 5000 :: non_neg_integer() | infinity,
- timeout = infinity :: timeout(),
- timeout_ref = undefined :: undefined | reference()
-}).
-
--spec upgrade(Req, Env, module(), any(), timeout(), run | hibernate)
- -> {ok, Req, Env} | {suspend, module(), atom(), [any()]}
+-spec upgrade(Req, Env, module(), any())
+ -> {ok, Req, Env} | {suspend, ?MODULE, loop, [any()]}
when Req::cowboy_req:req(), Env::cowboy_middleware:env().
-upgrade(Req, Env, Handler, HandlerState, Timeout, Hibernate) ->
- State = #state{env=Env, max_buffer=get_max_buffer(Env), timeout=Timeout,
- hibernate=Hibernate =:= hibernate},
- State2 = timeout(State),
- before_loop(Req, State2, Handler, HandlerState).
-
-get_max_buffer(#{loop_max_buffer := MaxBuffer}) -> MaxBuffer;
-get_max_buffer(_) -> 5000.
-
-before_loop(Req, State=#state{hibernate=true}, Handler, HandlerState) ->
-
- %% @todo Yeah we can't get the socket anymore.
- %% Everything changes since we are a separate process now.
- %% Proper flow control at the connection level should be implemented
- %% instead of what we have here.
-
-% [Socket, Transport] = cowboy_req:get([socket, transport], Req),
-% Transport:setopts(Socket, [{active, once}]),
- {suspend, ?MODULE, loop, [Req, State#state{hibernate=false}, Handler, HandlerState]};
-before_loop(Req, State, Handler, HandlerState) ->
-
- %% Same here.
-
-% [Socket, Transport] = cowboy_req:get([socket, transport], Req),
-% Transport:setopts(Socket, [{active, once}]),
- loop(Req, State, Handler, HandlerState).
+upgrade(Req, Env, Handler, HandlerState) ->
+ loop(Req, Env, Handler, HandlerState).
-%% Almost the same code can be found in cowboy_websocket.
-timeout(State=#state{timeout=infinity}) ->
- State#state{timeout_ref=undefined};
-timeout(State=#state{timeout=Timeout,
- timeout_ref=PrevRef}) ->
- _ = case PrevRef of
- undefined -> ignore%;
-% @todo PrevRef -> erlang:cancel_timer(PrevRef)
- end,
- TRef = erlang:start_timer(Timeout, self(), ?MODULE),
- State#state{timeout_ref=TRef}.
+-spec upgrade(Req, Env, module(), any(), hibernate)
+ -> {suspend, ?MODULE, loop, [any()]}
+ when Req::cowboy_req:req(), Env::cowboy_middleware:env().
+upgrade(Req, Env, Handler, HandlerState, hibernate) ->
+ suspend(Req, Env, Handler, HandlerState).
--spec loop(Req, #state{}, module(), any())
- -> {ok, Req, cowboy_middleware:env()} | {suspend, module(), atom(), [any()]}
- when Req::cowboy_req:req().
-loop(Req, State=#state{timeout_ref=TRef}, Handler, HandlerState) ->
+-spec loop(Req, Env, module(), any())
+ -> {ok, Req, Env} | {suspend, ?MODULE, loop, [any()]}
+ when Req::cowboy_req:req(), Env::cowboy_middleware:env().
+%% @todo Handle system messages.
+loop(Req, Env, Handler, HandlerState) ->
receive
- {timeout, TRef, ?MODULE} ->
- terminate(Req, State, Handler, HandlerState, timeout);
- {timeout, OlderTRef, ?MODULE} when is_reference(OlderTRef) ->
- loop(Req, State, Handler, HandlerState);
Message ->
- call(Req, State, Handler, HandlerState, Message)
+ call(Req, Env, Handler, HandlerState, Message)
end.
-call(Req, State, Handler, HandlerState, Message) ->
- try Handler:info(Message, Req, HandlerState) of
- {ok, Req2, HandlerState2} ->
- before_loop(Req2, State, Handler, HandlerState2);
- {ok, Req2, HandlerState2, hibernate} ->
- before_loop(Req2, State#state{hibernate=true}, Handler, HandlerState2);
- {stop, Req2, HandlerState2} ->
- terminate(Req2, State, Handler, HandlerState2, stop)
+call(Req0, Env, Handler, HandlerState0, Message) ->
+ try Handler:info(Message, Req0, HandlerState0) of
+ {ok, Req, HandlerState} ->
+ loop(Req, Env, Handler, HandlerState);
+ {ok, Req, HandlerState, hibernate} ->
+ suspend(Req, Env, Handler, HandlerState);
+ {stop, Req, HandlerState} ->
+ terminate(Req, Env, Handler, HandlerState, stop)
catch Class:Reason ->
- cowboy_handler:terminate({crash, Class, Reason}, Req, HandlerState, Handler),
+ cowboy_handler:terminate({crash, Class, Reason}, Req0, HandlerState0, Handler),
erlang:raise(Class, Reason, erlang:get_stacktrace())
end.
-terminate(Req, #state{env=Env, timeout_ref=TRef},
- Handler, HandlerState, Reason) ->
- _ = case TRef of
- undefined -> ignore;
- TRef -> erlang:cancel_timer(TRef)
- end,
- flush_timeouts(),
+suspend(Req, Env, Handler, HandlerState) ->
+ {suspend, ?MODULE, loop, [Req, Env, Handler, HandlerState]}.
+
+terminate(Req, Env, Handler, HandlerState, Reason) ->
Result = cowboy_handler:terminate(Reason, Req, HandlerState, Handler),
{ok, Req, Env#{result => Result}}.
-
-flush_timeouts() ->
- receive
- {timeout, TRef, ?MODULE} when is_reference(TRef) ->
- flush_timeouts()
- after 0 ->
- ok
- end.
diff --git a/src/cowboy_rest.erl b/src/cowboy_rest.erl
index 49e1946..21d56a5 100644
--- a/src/cowboy_rest.erl
+++ b/src/cowboy_rest.erl
@@ -17,15 +17,14 @@
-module(cowboy_rest).
-behaviour(cowboy_sub_protocol).
--export([upgrade/6]).
+-export([upgrade/4]).
+-export([upgrade/5]).
%% Common handler callbacks.
-callback init(Req, any())
-> {ok | module(), Req, any()}
- | {module(), Req, any(), hibernate}
- | {module(), Req, any(), timeout()}
- | {module(), Req, any(), timeout(), hibernate}
+ | {module(), Req, any(), any()}
when Req::cowboy_req:req().
-callback terminate(any(), cowboy_req:req(), any()) -> ok.
@@ -232,14 +231,20 @@
expires :: undefined | no_call | calendar:datetime() | binary()
}).
--spec upgrade(Req, Env, module(), any(), infinity, run)
+-spec upgrade(Req, Env, module(), any())
-> {ok, Req, Env} when Req::cowboy_req:req(), Env::cowboy_middleware:env().
-upgrade(Req0, Env, Handler, HandlerState, infinity, run) ->
+upgrade(Req0, Env, Handler, HandlerState) ->
Method = cowboy_req:method(Req0),
{ok, Req, Result} = service_available(Req0, #state{method=Method,
handler=Handler, handler_state=HandlerState}),
{ok, Req, Env#{result => Result}}.
+-spec upgrade(Req, Env, module(), any(), any())
+ -> {ok, Req, Env} when Req::cowboy_req:req(), Env::cowboy_middleware:env().
+%% cowboy_rest takes no options.
+upgrade(Req, Env, Handler, HandlerState, _Opts) ->
+ upgrade(Req, Env, Handler, HandlerState).
+
service_available(Req, State) ->
expect(Req, State, service_available, true, fun known_methods/2, 503).
diff --git a/src/cowboy_sub_protocol.erl b/src/cowboy_sub_protocol.erl
index e068b0b..6714289 100644
--- a/src/cowboy_sub_protocol.erl
+++ b/src/cowboy_sub_protocol.erl
@@ -15,6 +15,10 @@
-module(cowboy_sub_protocol).
--callback upgrade(Req, Env, module(), any(), timeout(), run | hibernate)
+-callback upgrade(Req, Env, module(), any())
+ -> {ok, Req, Env} | {suspend, module(), atom(), [any()]} | {stop, Req}
+ when Req::cowboy_req:req(), Env::cowboy_middleware:env().
+
+-callback upgrade(Req, Env, module(), any(), any())
-> {ok, Req, Env} | {suspend, module(), atom(), [any()]} | {stop, Req}
when Req::cowboy_req:req(), Env::cowboy_middleware:env().
diff --git a/src/cowboy_websocket.erl b/src/cowboy_websocket.erl
index 62210d9..e20c111 100644
--- a/src/cowboy_websocket.erl
+++ b/src/cowboy_websocket.erl
@@ -17,7 +17,8 @@
-module(cowboy_websocket).
-behaviour(cowboy_sub_protocol).
--export([upgrade/6]).
+-export([upgrade/4]).
+-export([upgrade/5]).
-export([takeover/7]).
-export([handler_loop/3]).
@@ -34,9 +35,7 @@
-callback init(Req, any())
-> {ok | module(), Req, any()}
- | {module(), Req, any(), hibernate}
- | {module(), Req, any(), timeout()}
- | {module(), Req, any(), timeout(), hibernate}
+ | {module(), Req, any(), any()}
when Req::cowboy_req:req().
-callback websocket_init(State)
@@ -53,6 +52,12 @@
-callback terminate(any(), cowboy_req:req(), any()) -> ok.
-optional_callbacks([terminate/3]).
+-type opts() :: #{
+ idle_timeout => timeout(),
+ compress => boolean()
+}.
+-export_type([opts/0]).
+
-record(state, {
socket = undefined :: inet:socket() | undefined,
transport = undefined :: module(),
@@ -60,6 +65,7 @@
key = undefined :: undefined | binary(),
timeout = infinity :: timeout(),
timeout_ref = undefined :: undefined | reference(),
+ compress = false :: boolean(),
messages = undefined :: undefined | {atom(), atom(), atom()},
hibernate = false :: boolean(),
frag_state = undefined :: cow_ws:frag_state(),
@@ -70,14 +76,22 @@
%% Stream process.
--spec upgrade(Req, Env, module(), any(), timeout(), run | hibernate)
+-spec upgrade(Req, Env, module(), any())
+ -> {ok, Req, Env}
+ when Req::cowboy_req:req(), Env::cowboy_middleware:env().
+upgrade(Req, Env, Handler, HandlerState) ->
+ upgrade(Req, Env, Handler, HandlerState, #{}).
+
+-spec upgrade(Req, Env, module(), any(), opts())
-> {ok, Req, Env}
when Req::cowboy_req:req(), Env::cowboy_middleware:env().
%% @todo Immediately crash if a response has already been sent.
%% @todo Error out if HTTP/2.
-upgrade(Req0, Env, Handler, HandlerState, Timeout, Hibernate) ->
- try websocket_upgrade(#state{handler=Handler, timeout=Timeout,
- hibernate=Hibernate =:= hibernate}, Req0) of
+upgrade(Req0, Env, Handler, HandlerState, Opts) ->
+ Timeout = maps:get(idle_timeout, Opts, 60000),
+ Compress = maps:get(compress, Opts, false),
+ State0 = #state{handler=Handler, timeout=Timeout, compress=Compress},
+ try websocket_upgrade(State0, Req0) of
{ok, State, Req} ->
websocket_handshake(State, Req, HandlerState, Env)
catch _:_ ->
@@ -104,14 +118,13 @@ websocket_upgrade(State, Req) ->
-spec websocket_extensions(#state{}, Req)
-> {ok, #state{}, Req} when Req::cowboy_req:req().
-websocket_extensions(State, Req=#{ref := Ref}) ->
+websocket_extensions(State=#state{compress=Compress}, Req) ->
%% @todo We want different options for this. For example
%% * compress everything auto
%% * compress only text auto
%% * compress only binary auto
%% * compress nothing auto (but still enabled it)
%% * disable compression
- Compress = maps:get(websocket_compress, ranch:get_protocol_options(Ref), false),
case {Compress, cowboy_req:parse_header(<<"sec-websocket-extensions">>, Req)} of
{true, Extensions} when Extensions =/= undefined ->
websocket_extensions(State, Req, Extensions, []);
@@ -170,6 +183,7 @@ websocket_handshake(State=#state{key=Key},
{#state{}, any()}) -> ok.
takeover(_Parent, Ref, Socket, Transport, _Opts, Buffer,
{State0=#state{handler=Handler}, HandlerState}) ->
+ %% @todo We should have an option to disable this behavior.
ranch:remove_connection(Ref),
State1 = handler_loop_timeout(State0#state{socket=Socket, transport=Transport}),
State = State1#state{key=undefined, messages=Transport:messages()},