aboutsummaryrefslogtreecommitdiffstats
path: root/src/cowboy_stream_h.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/cowboy_stream_h.erl')
-rw-r--r--src/cowboy_stream_h.erl128
1 files changed, 128 insertions, 0 deletions
diff --git a/src/cowboy_stream_h.erl b/src/cowboy_stream_h.erl
new file mode 100644
index 0000000..f924e28
--- /dev/null
+++ b/src/cowboy_stream_h.erl
@@ -0,0 +1,128 @@
+%% Copyright (c) 2016, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+-module(cowboy_stream_h).
+%% @todo -behaviour(cowboy_stream).
+
+%% @todo Maybe have a callback for the type of process this is, worker or supervisor.
+-export([init/3]).
+-export([data/4]).
+-export([info/3]).
+-export([terminate/3]).
+
+-export([execute/3]).
+-export([resume/5]).
+
+-record(state, {
+ pid = undefined :: pid(),
+ read_body_ref = undefined :: reference(),
+ read_body_length = 0 :: non_neg_integer(),
+ read_body_is_fin = nofin :: nofin | fin,
+ read_body_buffer = <<>> :: binary()
+}).
+
+%% @todo For shutting down children we need to have a timeout before we terminate
+%% the stream like supervisors do. So here just send a message to yourself first,
+%% and then decide what to do when receiving this message.
+
+%% @todo proper specs
+-spec init(_,_,_) -> _.
+init(_StreamID, Req, Opts) ->
+ Env = maps:get(env, Opts, #{}),
+ Middlewares = maps:get(middlewares, Opts, [cowboy_router, cowboy_handler]),
+ Shutdown = maps:get(shutdown, Opts, 5000),
+ Pid = proc_lib:spawn_link(?MODULE, execute, [Req, Env, Middlewares]),
+ {[{spawn, Pid, Shutdown}], #state{pid=Pid}}.
+
+%% If we receive data and stream is waiting for data:
+%% If we accumulated enough data or IsFin=fin, send it.
+%% If not, buffer it.
+%% If not, buffer it.
+
+%% @todo proper specs
+-spec data(_,_,_,_) -> _.
+data(_StreamID, IsFin, Data, State=#state{read_body_ref=undefined, read_body_buffer=Buffer}) ->
+ {[], State#state{read_body_is_fin=IsFin, read_body_buffer= << Buffer/binary, Data/binary >>}};
+data(_StreamID, nofin, Data, State=#state{read_body_length=Length, read_body_buffer=Buffer}) when byte_size(Data) + byte_size(Buffer) < Length ->
+ {[], State#state{read_body_buffer= << Buffer/binary, Data/binary >>}};
+data(_StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref, read_body_buffer=Buffer}) ->
+ Pid ! {request_body, Ref, IsFin, << Buffer/binary, Data/binary >>},
+ {[], State#state{read_body_ref=undefined, read_body_buffer= <<>>}}.
+
+%% @todo proper specs
+-spec info(_,_,_) -> _.
+info(_StreamID, {'EXIT', Pid, normal}, State=#state{pid=Pid}) ->
+ {[stop], State};
+info(_StreamID, Reason = {'EXIT', Pid, _}, State=#state{pid=Pid}) ->
+ {[{internal_error, Reason, 'Stream process crashed.'}], State};
+%% Request body, no body buffer but IsFin=fin.
+info(_StreamID, {read_body, Ref, _}, State=#state{pid=Pid, read_body_is_fin=fin, read_body_buffer= <<>>}) ->
+ Pid ! {request_body, Ref, fin, <<>>},
+ {[], State};
+%% Request body, body buffered large enough or complete.
+info(_StreamID, {read_body, Ref, Length}, State=#state{pid=Pid, read_body_is_fin=IsFin, read_body_buffer=Data})
+ when element(1, IsFin) =:= fin; byte_size(Data) >= Length ->
+ Pid ! {request_body, Ref, IsFin, Data},
+ {[], State#state{read_body_buffer= <<>>}};
+%% Request body, not enough to send yet.
+info(_StreamID, {read_body, Ref, Length}, State) ->
+ {[{flow, Length}], State#state{read_body_ref=Ref, read_body_length=Length}};
+%% Response.
+info(_StreamID, Response = {response, _, _, _}, State) ->
+ {[Response], State};
+info(_StreamID, Headers = {headers, _, _}, State) ->
+ {[Headers], State};
+info(_StreamID, Data = {data, _, _}, State) ->
+ {[Data], State};
+info(_StreamID, SwitchProtocol = {switch_protocol, _, _, _}, State) ->
+ {[SwitchProtocol], State};
+%% Stray message.
+info(_StreamID, _Msg, State) ->
+ %% @todo Cleanup if no reply was sent when stream ends.
+ {[], State}.
+
+%% @todo proper specs
+-spec terminate(_,_,_) -> _.
+terminate(_StreamID, _Reason, _State) ->
+ ok.
+
+%% Request process.
+
+%% @todo
+%-spec execute(cowboy_req:req(), #state{}, cowboy_middleware:env(), [module()])
+% -> ok.
+-spec execute(_, _, _) -> _.
+execute(_, _, []) ->
+ ok; %% @todo Maybe error reason should differ here and there.
+execute(Req, Env, [Middleware|Tail]) ->
+ case Middleware:execute(Req, Env) of
+ {ok, Req2, Env2} ->
+ execute(Req2, Env2, Tail);
+ {suspend, Module, Function, Args} ->
+ proc_lib:hibernate(?MODULE, resume, [Env, Tail, Module, Function, Args]);
+ {stop, _Req2} ->
+ ok %% @todo Maybe error reason should differ here and there.
+ end.
+
+-spec resume(cowboy_middleware:env(), [module()],
+ module(), module(), [any()]) -> ok.
+resume(Env, Tail, Module, Function, Args) ->
+ case apply(Module, Function, Args) of
+ {ok, Req2, Env2} ->
+ execute(Req2, Env2, Tail);
+ {suspend, Module2, Function2, Args2} ->
+ proc_lib:hibernate(?MODULE, resume, [Env, Tail, Module2, Function2, Args2]);
+ {stop, _Req2} ->
+ ok %% @todo Maybe error reason should differ here and there.
+ end.