%% Copyright (c) 2016-2017, Loïc Hoguin %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above %% copyright notice and this permission notice appear in all copies. %% %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -module(cowboy_stream_h). -behavior(cowboy_stream). -export([init/3]). -export([data/4]). -export([info/3]). -export([terminate/3]). -export([early_error/5]). -export([request_process/3]). -export([execute/3]). -export([resume/5]). %% @todo Need to call subsequent handlers. -record(state, { ref = undefined :: ranch:ref(), pid = undefined :: pid(), expect = undefined :: undefined | continue, read_body_ref = undefined :: reference() | undefined, read_body_timer_ref = undefined :: reference() | undefined, read_body_length = 0 :: non_neg_integer() | infinity, read_body_is_fin = nofin :: nofin | {fin, non_neg_integer()}, read_body_buffer = <<>> :: binary(), body_length = 0 :: non_neg_integer() }). %% @todo For shutting down children we need to have a timeout before we terminate %% the stream like supervisors do. So here just send a message to yourself first, %% and then decide what to do when receiving this message. -spec init(cowboy_stream:streamid(), cowboy_req:req(), cowboy:opts()) -> {[{spawn, pid(), timeout()}], #state{}}. init(_StreamID, Req=#{ref := Ref}, Opts) -> Env = maps:get(env, Opts, #{}), Middlewares = maps:get(middlewares, Opts, [cowboy_router, cowboy_handler]), Shutdown = maps:get(shutdown_timeout, Opts, 5000), Pid = proc_lib:spawn_link(?MODULE, request_process, [Req, Env, Middlewares]), Expect = expect(Req), {[{spawn, Pid, Shutdown}], #state{ref=Ref, pid=Pid, expect=Expect}}. %% Ignore the expect header in HTTP/1.0. expect(#{version := 'HTTP/1.0'}) -> undefined; expect(Req) -> try cowboy_req:parse_header(<<"expect">>, Req) of Expect -> Expect catch _:_ -> undefined end. %% If we receive data and stream is waiting for data: %% If we accumulated enough data or IsFin=fin, send it. %% If not, buffer it. %% If not, buffer it. %% %% We always reset the expect field when we receive data, %% since the client started sending the request body before %% we could send a 100 continue response. -spec data(cowboy_stream:streamid(), cowboy_stream:fin(), cowboy_req:resp_body(), State) -> {cowboy_stream:commands(), State} when State::#state{}. data(_StreamID, IsFin, Data, State=#state{ read_body_ref=undefined, read_body_buffer=Buffer, body_length=BodyLen}) -> {[], State#state{ expect=undefined, read_body_is_fin=IsFin, read_body_buffer= << Buffer/binary, Data/binary >>, body_length=BodyLen + byte_size(Data)}}; data(_StreamID, nofin, Data, State=#state{ read_body_length=ReadLen, read_body_buffer=Buffer, body_length=BodyLen}) when byte_size(Data) + byte_size(Buffer) < ReadLen -> {[], State#state{ expect=undefined, read_body_buffer= << Buffer/binary, Data/binary >>, body_length=BodyLen + byte_size(Data)}}; data(_StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref, read_body_timer_ref=TRef, read_body_buffer=Buffer, body_length=BodyLen0}) -> BodyLen = BodyLen0 + byte_size(Data), ok = erlang:cancel_timer(TRef, [{async, true}, {info, false}]), send_request_body(Pid, Ref, IsFin, BodyLen, <>), {[], State#state{ expect=undefined, read_body_ref=undefined, read_body_timer_ref=undefined, read_body_buffer= <<>>, body_length=BodyLen}}. -spec info(cowboy_stream:streamid(), any(), State) -> {cowboy_stream:commands(), State} when State::#state{}. info(_StreamID, {'EXIT', Pid, normal}, State=#state{pid=Pid}) -> {[stop], State}; info(_StreamID, {'EXIT', Pid, {{request_error, Reason, _HumanReadable}, _}}, State=#state{pid=Pid}) -> %% @todo Optionally report the crash to help debugging. %%report_crash(Ref, StreamID, Pid, Reason, Stacktrace), Status = case Reason of timeout -> 408; payload_too_large -> 413; _ -> 400 end, %% @todo Headers? Details in body? More stuff in debug only? {[{error_response, Status, #{<<"content-length">> => <<"0">>}, <<>>}, stop], State}; info(StreamID, Exit = {'EXIT', Pid, {Reason, Stacktrace}}, State=#state{ref=Ref, pid=Pid}) -> report_crash(Ref, StreamID, Pid, Reason, Stacktrace), {[ {error_response, 500, #{<<"content-length">> => <<"0">>}, <<>>}, {internal_error, Exit, 'Stream process crashed.'} ], State}; %% Request body, body buffered large enough or complete. %% %% We do not send a 100 continue response if the client %% already started sending the body. info(_StreamID, {read_body, Ref, Length, _}, State=#state{pid=Pid, read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen}) when IsFin =:= fin; byte_size(Buffer) >= Length -> send_request_body(Pid, Ref, IsFin, BodyLen, Buffer), {[], State#state{read_body_buffer= <<>>}}; %% Request body, not enough to send yet. info(StreamID, {read_body, Ref, Length, Period}, State=#state{expect=Expect}) -> Commands = case Expect of continue -> [{inform, 100, #{}}, {flow, Length}]; undefined -> [{flow, Length}] end, TRef = erlang:send_after(Period, self(), {{self(), StreamID}, {read_body_timeout, Ref}}), {Commands, State#state{ read_body_ref=Ref, read_body_timer_ref=TRef, read_body_length=Length}}; %% Request body reading timeout; send what we got. info(_StreamID, {read_body_timeout, Ref}, State=#state{pid=Pid, read_body_ref=Ref, read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen}) -> send_request_body(Pid, Ref, IsFin, BodyLen, Buffer), {[], State#state{read_body_ref=undefined, read_body_timer_ref=undefined, read_body_buffer= <<>>}}; info(_StreamID, {read_body_timeout, _}, State) -> {[], State}; %% Response. %% %% We reset the expect field when a 100 continue response %% is sent or when any final response is sent. info(_StreamID, Inform = {inform, Status, _}, State0) -> State = case Status of 100 -> State0#state{expect=undefined}; <<"100">> -> State0#state{expect=undefined}; <<"100 ", _/bits>> -> State0#state{expect=undefined}; _ -> State0 end, {[Inform], State}; info(_StreamID, Response = {response, _, _, _}, State) -> {[Response], State#state{expect=undefined}}; info(_StreamID, Headers = {headers, _, _}, State) -> {[Headers], State#state{expect=undefined}}; info(_StreamID, Data = {data, _, _}, State) -> {[Data], State}; info(_StreamID, Trailers = {trailers, _}, State) -> {[Trailers], State}; info(_StreamID, Push = {push, _, _, _, _, _, _, _}, State) -> {[Push], State}; info(_StreamID, SwitchProtocol = {switch_protocol, _, _, _}, State) -> {[SwitchProtocol], State#state{expect=undefined}}; %% Stray message. info(_StreamID, _Info, State) -> {[], State}. -spec terminate(cowboy_stream:streamid(), cowboy_stream:reason(), #state{}) -> ok. terminate(_StreamID, _Reason, _State) -> ok. -spec early_error(cowboy_stream:streamid(), cowboy_stream:reason(), cowboy_stream:partial_req(), Resp, cowboy:opts()) -> Resp when Resp::cowboy_stream:resp_command(). early_error(StreamID, Reason, PartialReq, Resp, Opts) -> cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp, Opts). send_request_body(Pid, Ref, nofin, _, Data) -> Pid ! {request_body, Ref, nofin, Data}, ok; send_request_body(Pid, Ref, fin, BodyLen, Data) -> Pid ! {request_body, Ref, fin, BodyLen, Data}, ok. %% We use ~999999p here instead of ~w because the latter doesn't %% support printable strings. report_crash(_, _, _, normal, _) -> ok; report_crash(_, _, _, shutdown, _) -> ok; report_crash(_, _, _, {shutdown, _}, _) -> ok; report_crash(Ref, StreamID, Pid, Reason, Stacktrace) -> error_logger:error_msg( "Ranch listener ~p, connection process ~p, stream ~p " "had its request process ~p exit with reason " "~999999p and stacktrace ~999999p~n", [Ref, self(), StreamID, Pid, Reason, Stacktrace]). %% Request process. %% We catch all exceptions in order to add the stacktrace to %% the exit reason as it is not propagated by proc_lib otherwise %% and therefore not present in the 'EXIT' message. We want %% the stacktrace in order to simplify debugging of errors. %% %% This + the behavior in proc_lib means that we will get a %% {Reason, Stacktrace} tuple for every exceptions, instead of %% just for errors and throws. %% %% @todo Better spec. -spec request_process(_, _, _) -> _. request_process(Req, Env, Middlewares) -> OTP = erlang:system_info(otp_release), try execute(Req, Env, Middlewares) catch exit:Reason -> Stacktrace = erlang:get_stacktrace(), erlang:raise(exit, {Reason, Stacktrace}, Stacktrace); %% OTP 19 does not propagate any exception stacktraces, %% we therefore add it for every class of exception. _:Reason when OTP =:= "19" -> Stacktrace = erlang:get_stacktrace(), erlang:raise(exit, {Reason, Stacktrace}, Stacktrace); %% @todo I don't think this clause is necessary. Class:Reason -> erlang:raise(Class, Reason, erlang:get_stacktrace()) end. %% @todo %-spec execute(cowboy_req:req(), #state{}, cowboy_middleware:env(), [module()]) % -> ok. -spec execute(_, _, _) -> _. execute(_, _, []) -> ok; %% @todo Maybe error reason should differ here and there. execute(Req, Env, [Middleware|Tail]) -> case Middleware:execute(Req, Env) of {ok, Req2, Env2} -> execute(Req2, Env2, Tail); {suspend, Module, Function, Args} -> proc_lib:hibernate(?MODULE, resume, [Env, Tail, Module, Function, Args]); {stop, _Req2} -> ok %% @todo Maybe error reason should differ here and there. end. -spec resume(cowboy_middleware:env(), [module()], module(), module(), [any()]) -> ok. resume(Env, Tail, Module, Function, Args) -> case apply(Module, Function, Args) of {ok, Req2, Env2} -> execute(Req2, Env2, Tail); {suspend, Module2, Function2, Args2} -> proc_lib:hibernate(?MODULE, resume, [Env, Tail, Module2, Function2, Args2]); {stop, _Req2} -> ok %% @todo Maybe error reason should differ here and there. end.