diff options
Diffstat (limited to 'src/cowboy_stream_h.erl')
-rw-r--r-- | src/cowboy_stream_h.erl | 128 |
1 files changed, 128 insertions, 0 deletions
diff --git a/src/cowboy_stream_h.erl b/src/cowboy_stream_h.erl new file mode 100644 index 0000000..f924e28 --- /dev/null +++ b/src/cowboy_stream_h.erl @@ -0,0 +1,128 @@ +%% Copyright (c) 2016, Loïc Hoguin <[email protected]> +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +-module(cowboy_stream_h). +%% @todo -behaviour(cowboy_stream). + +%% @todo Maybe have a callback for the type of process this is, worker or supervisor. +-export([init/3]). +-export([data/4]). +-export([info/3]). +-export([terminate/3]). + +-export([execute/3]). +-export([resume/5]). + +-record(state, { + pid = undefined :: pid(), + read_body_ref = undefined :: reference(), + read_body_length = 0 :: non_neg_integer(), + read_body_is_fin = nofin :: nofin | fin, + read_body_buffer = <<>> :: binary() +}). + +%% @todo For shutting down children we need to have a timeout before we terminate +%% the stream like supervisors do. So here just send a message to yourself first, +%% and then decide what to do when receiving this message. + +%% @todo proper specs +-spec init(_,_,_) -> _. +init(_StreamID, Req, Opts) -> + Env = maps:get(env, Opts, #{}), + Middlewares = maps:get(middlewares, Opts, [cowboy_router, cowboy_handler]), + Shutdown = maps:get(shutdown, Opts, 5000), + Pid = proc_lib:spawn_link(?MODULE, execute, [Req, Env, Middlewares]), + {[{spawn, Pid, Shutdown}], #state{pid=Pid}}. + +%% If we receive data and stream is waiting for data: +%% If we accumulated enough data or IsFin=fin, send it. +%% If not, buffer it. +%% If not, buffer it. + +%% @todo proper specs +-spec data(_,_,_,_) -> _. +data(_StreamID, IsFin, Data, State=#state{read_body_ref=undefined, read_body_buffer=Buffer}) -> + {[], State#state{read_body_is_fin=IsFin, read_body_buffer= << Buffer/binary, Data/binary >>}}; +data(_StreamID, nofin, Data, State=#state{read_body_length=Length, read_body_buffer=Buffer}) when byte_size(Data) + byte_size(Buffer) < Length -> + {[], State#state{read_body_buffer= << Buffer/binary, Data/binary >>}}; +data(_StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref, read_body_buffer=Buffer}) -> + Pid ! {request_body, Ref, IsFin, << Buffer/binary, Data/binary >>}, + {[], State#state{read_body_ref=undefined, read_body_buffer= <<>>}}. + +%% @todo proper specs +-spec info(_,_,_) -> _. +info(_StreamID, {'EXIT', Pid, normal}, State=#state{pid=Pid}) -> + {[stop], State}; +info(_StreamID, Reason = {'EXIT', Pid, _}, State=#state{pid=Pid}) -> + {[{internal_error, Reason, 'Stream process crashed.'}], State}; +%% Request body, no body buffer but IsFin=fin. +info(_StreamID, {read_body, Ref, _}, State=#state{pid=Pid, read_body_is_fin=fin, read_body_buffer= <<>>}) -> + Pid ! {request_body, Ref, fin, <<>>}, + {[], State}; +%% Request body, body buffered large enough or complete. +info(_StreamID, {read_body, Ref, Length}, State=#state{pid=Pid, read_body_is_fin=IsFin, read_body_buffer=Data}) + when element(1, IsFin) =:= fin; byte_size(Data) >= Length -> + Pid ! {request_body, Ref, IsFin, Data}, + {[], State#state{read_body_buffer= <<>>}}; +%% Request body, not enough to send yet. +info(_StreamID, {read_body, Ref, Length}, State) -> + {[{flow, Length}], State#state{read_body_ref=Ref, read_body_length=Length}}; +%% Response. +info(_StreamID, Response = {response, _, _, _}, State) -> + {[Response], State}; +info(_StreamID, Headers = {headers, _, _}, State) -> + {[Headers], State}; +info(_StreamID, Data = {data, _, _}, State) -> + {[Data], State}; +info(_StreamID, SwitchProtocol = {switch_protocol, _, _, _}, State) -> + {[SwitchProtocol], State}; +%% Stray message. +info(_StreamID, _Msg, State) -> + %% @todo Cleanup if no reply was sent when stream ends. + {[], State}. + +%% @todo proper specs +-spec terminate(_,_,_) -> _. +terminate(_StreamID, _Reason, _State) -> + ok. + +%% Request process. + +%% @todo +%-spec execute(cowboy_req:req(), #state{}, cowboy_middleware:env(), [module()]) +% -> ok. +-spec execute(_, _, _) -> _. +execute(_, _, []) -> + ok; %% @todo Maybe error reason should differ here and there. +execute(Req, Env, [Middleware|Tail]) -> + case Middleware:execute(Req, Env) of + {ok, Req2, Env2} -> + execute(Req2, Env2, Tail); + {suspend, Module, Function, Args} -> + proc_lib:hibernate(?MODULE, resume, [Env, Tail, Module, Function, Args]); + {stop, _Req2} -> + ok %% @todo Maybe error reason should differ here and there. + end. + +-spec resume(cowboy_middleware:env(), [module()], + module(), module(), [any()]) -> ok. +resume(Env, Tail, Module, Function, Args) -> + case apply(Module, Function, Args) of + {ok, Req2, Env2} -> + execute(Req2, Env2, Tail); + {suspend, Module2, Function2, Args2} -> + proc_lib:hibernate(?MODULE, resume, [Env, Tail, Module2, Function2, Args2]); + {stop, _Req2} -> + ok %% @todo Maybe error reason should differ here and there. + end. |