%% Copyright (c) 2016, Loïc Hoguin
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
%% copyright notice and this permission notice appear in all copies.
%%
%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

%% Stream handler that runs the middleware chain for a request in a
%% separate linked process and relays messages between that process
%% and the stream.
-module(cowboy_stream_h).
%% @todo -behaviour(cowboy_stream).
%% @todo Maybe have a callback for the type of process this is, worker or supervisor.

%% Stream handler callbacks.
-export([init/3]).
-export([data/4]).
-export([info/3]).
-export([terminate/3]).

%% Entry points of the request process.
-export([execute/3]).
-export([resume/5]).

-record(state, {
    %% Request process spawned by init/3.
    pid = undefined :: undefined | pid(),
    %% Reference of the pending read_body request; undefined when the
    %% request process is not currently waiting for body data.
    read_body_ref = undefined :: undefined | reference(),
    %% Length given in the pending read_body request.
    read_body_length = 0 :: non_neg_integer(),
    %% IsFin value recorded by data/4 while no read_body was pending.
    read_body_is_fin = nofin :: nofin | fin,
    %% Body data received but not yet sent to the request process.
    read_body_buffer = <<>> :: binary()
}).

%% @todo For shutting down children we need to have a timeout before we terminate
%% the stream like supervisors do. So here just send a message to yourself first,
%% and then decide what to do when receiving this message.

%% Start the request process.
%%
%% Reads `env' (default `#{}'), `middlewares' (default
%% `[cowboy_router, cowboy_handler]') and `shutdown' (default 5000) from
%% Opts, spawns a linked process running execute/3 and returns a `spawn'
%% command for it along with the initial state.

%% @todo proper specs
-spec init(_,_,_) -> _.
init(_StreamID, Req, Opts) ->
    Env = maps:get(env, Opts, #{}),
    Middlewares = maps:get(middlewares, Opts, [cowboy_router, cowboy_handler]),
    Shutdown = maps:get(shutdown, Opts, 5000),
    Pid = proc_lib:spawn_link(?MODULE, execute, [Req, Env, Middlewares]),
    {[{spawn, Pid, Shutdown}], #state{pid=Pid}}.

%% If we receive data and stream is waiting for data:
%%   If we accumulated enough data or IsFin=fin, send it.
%%   If not, buffer it.
%% If not, buffer it.

%% @todo proper specs
-spec data(_,_,_,_) -> _.
%% No read_body pending: buffer the data and remember IsFin.
data(_StreamID, IsFin, Data, State=#state{
        read_body_ref=undefined, read_body_buffer=Buffer}) ->
    {[], State#state{
        read_body_is_fin=IsFin,
        read_body_buffer= << Buffer/binary, Data/binary >>}};
%% read_body pending, more data to come and not enough accumulated: buffer.
data(_StreamID, nofin, Data, State=#state{
        read_body_length=Length, read_body_buffer=Buffer})
        when byte_size(Data) + byte_size(Buffer) < Length ->
    {[], State#state{read_body_buffer= << Buffer/binary, Data/binary >>}};
%% read_body pending and either enough data or IsFin=fin: send everything
%% accumulated to the request process and clear the pending request.
data(_StreamID, IsFin, Data, State=#state{
        pid=Pid, read_body_ref=Ref, read_body_buffer=Buffer}) ->
    Pid ! {request_body, Ref, IsFin, << Buffer/binary, Data/binary >>},
    {[], State#state{read_body_ref=undefined, read_body_buffer= <<>>}}.

%% Handle messages received for the stream: exit signals from the request
%% process, read_body requests, and response messages which are passed
%% through unchanged as commands.

%% @todo proper specs
-spec info(_,_,_) -> _.
%% The request process exited normally: stop the stream.
info(_StreamID, {'EXIT', Pid, normal}, State=#state{pid=Pid}) ->
    {[stop], State};
%% The request process exited with any other reason. The whole 'EXIT'
%% message is given as the reason of the internal_error command.
info(_StreamID, Reason = {'EXIT', Pid, _}, State=#state{pid=Pid}) ->
    {[{internal_error, Reason, 'Stream process crashed.'}], State};
%% Request body, no body buffer but IsFin=fin.
info(_StreamID, {read_body, Ref, _}, State=#state{
        pid=Pid, read_body_is_fin=fin, read_body_buffer= <<>>}) ->
    Pid ! {request_body, Ref, fin, <<>>},
    {[], State};
%% Request body, body buffered large enough or complete.
%%
%% read_body_is_fin holds the atom nofin | fin, so it is compared directly.
%% Calling element/2 on an atom fails the guard alternative, which would
%% leave a complete body shorter than Length undelivered and make the
%% request fall through to the flow clause below.
info(_StreamID, {read_body, Ref, Length}, State=#state{
        pid=Pid, read_body_is_fin=IsFin, read_body_buffer=Data})
        when IsFin =:= fin; byte_size(Data) >= Length ->
    Pid ! {request_body, Ref, IsFin, Data},
    {[], State#state{read_body_buffer= <<>>}};
%% Request body, not enough to send yet. Record the request so that
%% data/4 can answer it, and emit a flow command for Length.
info(_StreamID, {read_body, Ref, Length}, State) ->
    {[{flow, Length}], State#state{read_body_ref=Ref, read_body_length=Length}};
%% Response.
info(_StreamID, Response = {response, _, _, _}, State) ->
    {[Response], State};
info(_StreamID, Headers = {headers, _, _}, State) ->
    {[Headers], State};
info(_StreamID, Data = {data, _, _}, State) ->
    {[Data], State};
info(_StreamID, SwitchProtocol = {switch_protocol, _, _, _}, State) ->
    {[SwitchProtocol], State};
%% Stray message.
info(_StreamID, _Msg, State) ->
    %% @todo Cleanup if no reply was sent when stream ends.
    {[], State}.

%% Nothing to clean up; always returns ok.

%% @todo proper specs
-spec terminate(_,_,_) -> _.
terminate(_StreamID, _Reason, _State) ->
    ok.

%% Request process.

%% Run the middlewares in order, threading Req and Env through them.
%%
%% A middleware returning `{ok, Req, Env}' passes control to the next one,
%% `{suspend, M, F, A}' hibernates the process and continues in resume/5,
%% and `{stop, Req}' ends the chain. Returns ok when the chain ends.

%% @todo
%-spec execute(cowboy_req:req(), #state{}, cowboy_middleware:env(), [module()])
%	-> ok.
-spec execute(_, _, _) -> _.
execute(_, _, []) ->
    ok; %% @todo Maybe error reason should differ here and there.
execute(Req, Env, [Middleware|Tail]) ->
    case Middleware:execute(Req, Env) of
        {ok, Req2, Env2} ->
            execute(Req2, Env2, Tail);
        {suspend, Module, Function, Args} ->
            proc_lib:hibernate(?MODULE, resume, [Env, Tail, Module, Function, Args]);
        {stop, _Req2} ->
            ok %% @todo Maybe error reason should differ here and there.
    end.

%% Continue the middleware chain after a hibernation.
%%
%% Calls Module:Function with Args and handles the result the same way
%% execute/3 handles the result of a middleware; Tail is the list of
%% middlewares that remain to be executed.
-spec resume(cowboy_middleware:env(), [module()], module(), atom(), [any()]) -> ok.
resume(Env, Tail, Module, Function, Args) ->
    case apply(Module, Function, Args) of
        {ok, Req2, Env2} ->
            execute(Req2, Env2, Tail);
        {suspend, Module2, Function2, Args2} ->
            proc_lib:hibernate(?MODULE, resume, [Env, Tail, Module2, Function2, Args2]);
        {stop, _Req2} ->
            ok %% @todo Maybe error reason should differ here and there.
    end.