aboutsummaryrefslogtreecommitdiffstats
path: root/src/cowboy_long_polling.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/cowboy_long_polling.erl')
-rw-r--r--src/cowboy_long_polling.erl191
1 files changed, 191 insertions, 0 deletions
diff --git a/src/cowboy_long_polling.erl b/src/cowboy_long_polling.erl
new file mode 100644
index 0000000..6616a78
--- /dev/null
+++ b/src/cowboy_long_polling.erl
@@ -0,0 +1,191 @@
+%% Copyright (c) 2011-2014, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+%% When using loop handlers, we are receiving data from the socket because we
+%% want to know when the socket gets closed. This is generally not an issue
+%% because these kinds of requests are generally not pipelined, and don't have
+%% a body. If they do have a body, this body is often read in the
+%% <em>init/2</em> callback and this is no problem. Otherwise, this data
+%% accumulates in a buffer until we reach a certain threshold of 5000 bytes
+%% by default. This can be configured through the <em>loop_max_buffer</em>
+%% environment value. The request will be terminated with an
+%% <em>{error, overflow}</em> reason if this threshold is reached.
+-module(cowboy_long_polling).
+-behaviour(cowboy_sub_protocol).
+
+-export([upgrade/6]).
+-export([loop/4]).
+
+-record(state, {
+ env :: cowboy_middleware:env(),
+ hibernate = false :: boolean(),
+ buffer_size = 0 :: non_neg_integer(),
+ max_buffer = 5000 :: non_neg_integer() | infinity,
+ timeout = infinity :: timeout(),
+ timeout_ref = undefined :: undefined | reference(),
+ resp_sent = false :: boolean()
+}).
+
+-spec upgrade(Req, Env, module(), any(), timeout(), run | hibernate)
+ -> {ok, Req, Env} | {suspend, module(), atom(), [any()]}
+ when Req::cowboy_req:req(), Env::cowboy_middleware:env().
+upgrade(Req, Env, Handler, HandlerState, Timeout, run) ->
+ State = #state{env=Env, max_buffer=get_max_buffer(Env), timeout=Timeout},
+ State2 = timeout(State),
+ after_call(Req, State2, Handler, HandlerState);
+upgrade(Req, Env, Handler, HandlerState, Timeout, hibernate) ->
+ State = #state{env=Env, max_buffer=get_max_buffer(Env), hibernate=true, timeout=Timeout},
+ State2 = timeout(State),
+ after_call(Req, State2, Handler, HandlerState).
+
+get_max_buffer(Env) ->
+ case lists:keyfind(loop_max_buffer, 1, Env) of
+ false -> 5000;
+ {_, MaxBuffer} -> MaxBuffer
+ end.
+
+%% Update the state if the response was sent in the callback.
+after_call(Req, State=#state{resp_sent=false}, Handler,
+ HandlerState) ->
+ receive
+ {cowboy_req, resp_sent} ->
+ before_loop(Req, State#state{resp_sent=true}, Handler, HandlerState)
+ after 0 ->
+ before_loop(Req, State, Handler, HandlerState)
+ end;
+after_call(Req, State, Handler, HandlerState) ->
+ before_loop(Req, State, Handler, HandlerState).
+
+before_loop(Req, State=#state{hibernate=true}, Handler, HandlerState) ->
+ [Socket, Transport] = cowboy_req:get([socket, transport], Req),
+ Transport:setopts(Socket, [{active, once}]),
+ {suspend, ?MODULE, loop, [Req, State#state{hibernate=false}, Handler, HandlerState]};
+before_loop(Req, State, Handler, HandlerState) ->
+ [Socket, Transport] = cowboy_req:get([socket, transport], Req),
+ Transport:setopts(Socket, [{active, once}]),
+ loop(Req, State, Handler, HandlerState).
+
+%% Almost the same code can be found in cowboy_websocket.
+timeout(State=#state{timeout=infinity}) ->
+ State#state{timeout_ref=undefined};
+timeout(State=#state{timeout=Timeout,
+ timeout_ref=PrevRef}) ->
+ _ = case PrevRef of
+ undefined -> ignore;
+ PrevRef -> erlang:cancel_timer(PrevRef)
+ end,
+ TRef = erlang:start_timer(Timeout, self(), ?MODULE),
+ State#state{timeout_ref=TRef}.
+
+-spec loop(Req, #state{}, module(), any())
+ -> {ok, Req, cowboy_middleware:env()} | {suspend, module(), atom(), [any()]}
+ when Req::cowboy_req:req().
+loop(Req, State=#state{env=Env, buffer_size=NbBytes,
+ max_buffer=Threshold, timeout_ref=TRef,
+ resp_sent=RespSent}, Handler, HandlerState) ->
+ [Socket, Transport] = cowboy_req:get([socket, transport], Req),
+ {OK, Closed, Error} = Transport:messages(),
+ receive
+ {OK, Socket, Data} ->
+ NbBytes2 = NbBytes + byte_size(Data),
+ if NbBytes2 > Threshold ->
+ _ = if RespSent -> ok; true ->
+ cowboy_req:reply(500, Req)
+ end,
+ _ = cowboy_handler:terminate(Req, Env, Handler, HandlerState, {error, overflow}),
+ exit(normal);
+ true ->
+ Req2 = cowboy_req:append_buffer(Data, Req),
+ State2 = timeout(State#state{buffer_size=NbBytes2}),
+ before_loop(Req2, State2, Handler, HandlerState)
+ end;
+ {Closed, Socket} ->
+ terminate(Req, State, Handler, HandlerState, {error, closed});
+ {Error, Socket, Reason} ->
+ terminate(Req, State, Handler, HandlerState, {error, Reason});
+ {timeout, TRef, ?MODULE} ->
+ after_loop(Req, State, Handler, HandlerState, {normal, timeout});
+ {timeout, OlderTRef, ?MODULE} when is_reference(OlderTRef) ->
+ loop(Req, State, Handler, HandlerState);
+ Message ->
+ %% We set the socket back to {active, false} mode in case
+ %% the handler is going to call recv. We also flush any
+ %% data received after that and put it into the buffer.
+ %% We do not check the size here, if data keeps coming
+ %% we'll error out on the next packet received.
+ Transport:setopts(Socket, [{active, false}]),
+ Req2 = receive {OK, Socket, Data} ->
+ cowboy_req:append_buffer(Data, Req)
+ after 0 ->
+ Req
+ end,
+ call(Req2, State, Handler, HandlerState, Message)
+ end.
+
+call(Req, State=#state{env=Env, resp_sent=RespSent},
+ Handler, HandlerState, Message) ->
+ try Handler:info(Message, Req, HandlerState) of
+ {ok, Req2, HandlerState2} ->
+ after_loop(Req2, State, Handler, HandlerState2, {normal, shutdown});
+ {loop, Req2, HandlerState2} ->
+ after_call(Req2, State, Handler, HandlerState2);
+ {loop, Req2, HandlerState2, hibernate} ->
+ after_call(Req2, State#state{hibernate=true}, Handler, HandlerState2)
+ catch Class:Reason ->
+ Stacktrace = erlang:get_stacktrace(),
+ if RespSent -> ok; true ->
+ cowboy_req:maybe_reply(Stacktrace, Req)
+ end,
+ _ = cowboy_handler:terminate(Req, Env, Handler, HandlerState, {error, overflow}),
+ erlang:Class([
+ {reason, Reason},
+ {mfa, {Handler, info, 3}},
+ {stacktrace, Stacktrace},
+ {req, cowboy_req:to_list(Req)},
+ {state, HandlerState}
+ ])
+ end.
+
+%% It is sometimes important to make a socket passive as it was initially
+%% and as it is expected to be by cowboy_protocol, right after we're done
+%% with loop handling. The browser may freely pipeline a bunch of requests
+%% if previous one was, say, a JSONP long-polling request.
+after_loop(Req, State, Handler, HandlerState, Reason) ->
+ [Socket, Transport] = cowboy_req:get([socket, transport], Req),
+ Transport:setopts(Socket, [{active, false}]),
+ {OK, _Closed, _Error} = Transport:messages(),
+ Req2 = receive
+ {OK, Socket, Data} ->
+ cowboy_req:append_buffer(Data, Req)
+ after 0 ->
+ Req
+ end,
+ terminate(Req2, State, Handler, HandlerState, Reason).
+
+terminate(Req, #state{env=Env, timeout_ref=TRef},
+ Handler, HandlerState, Reason) ->
+ _ = case TRef of
+ undefined -> ignore;
+ TRef -> erlang:cancel_timer(TRef)
+ end,
+ flush_timeouts(),
+ cowboy_handler:terminate(Req, Env, Handler, HandlerState, Reason).
+
+flush_timeouts() ->
+ receive
+ {timeout, TRef, ?MODULE} when is_reference(TRef) ->
+ flush_timeouts()
+ after 0 ->
+ ok
+ end.