From 34f57ebbd39b64f1dc867260aca8a1efc49b4a70 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Wed, 27 Jun 2018 18:07:58 +0200 Subject: Make sure cowboy_stream_h calls subsequent stream handlers --- src/cowboy_stream_h.erl | 136 ++++++++++++++++++++++++++++-------------------- 1 file changed, 80 insertions(+), 56 deletions(-) (limited to 'src/cowboy_stream_h.erl') diff --git a/src/cowboy_stream_h.erl b/src/cowboy_stream_h.erl index d55846a..21651f7 100644 --- a/src/cowboy_stream_h.erl +++ b/src/cowboy_stream_h.erl @@ -29,9 +29,8 @@ -export([execute/3]). -export([resume/5]). -%% @todo Need to call subsequent handlers. - -record(state, { + next :: any(), ref = undefined :: ranch:ref(), pid = undefined :: pid(), expect = undefined :: undefined | continue, @@ -49,13 +48,15 @@ -spec init(cowboy_stream:streamid(), cowboy_req:req(), cowboy:opts()) -> {[{spawn, pid(), timeout()}], #state{}}. -init(_StreamID, Req=#{ref := Ref}, Opts) -> +init(StreamID, Req=#{ref := Ref}, Opts) -> Env = maps:get(env, Opts, #{}), Middlewares = maps:get(middlewares, Opts, [cowboy_router, cowboy_handler]), Shutdown = maps:get(shutdown_timeout, Opts, 5000), Pid = proc_lib:spawn_link(?MODULE, request_process, [Req, Env, Middlewares]), Expect = expect(Req), - {[{spawn, Pid, Shutdown}], #state{ref=Ref, pid=Pid, expect=Expect}}. + {Commands, Next} = cowboy_stream:init(StreamID, Req, Opts), + {[{spawn, Pid, Shutdown}|Commands], + #state{next=Next, ref=Ref, pid=Pid, expect=Expect}}. %% Ignore the expect header in HTTP/1.0. expect(#{version := 'HTTP/1.0'}) -> @@ -81,49 +82,58 @@ expect(Req) -> -spec data(cowboy_stream:streamid(), cowboy_stream:fin(), cowboy_req:resp_body(), State) -> {cowboy_stream:commands(), State} when State::#state{}. %% Stream isn't waiting for data. -data(_StreamID, IsFin, Data, State=#state{ +data(StreamID, IsFin, Data, State=#state{ read_body_ref=undefined, read_body_buffer=Buffer, body_length=BodyLen}) -> - {[], State#state{ + do_data(StreamID, IsFin, Data, [], State#state{ expect=undefined, read_body_is_fin=IsFin, read_body_buffer= << Buffer/binary, Data/binary >>, - body_length=BodyLen + byte_size(Data)}}; + body_length=BodyLen + byte_size(Data) + }); %% Stream is waiting for data using auto mode. %% %% There is no buffering done in auto mode. -data(_StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref, +data(StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref, read_body_length=auto, body_length=BodyLen}) -> send_request_body(Pid, Ref, IsFin, BodyLen, Data), - {[{flow, byte_size(Data)}], State#state{ + do_data(StreamID, IsFin, Data, [{flow, byte_size(Data)}], State#state{ read_body_ref=undefined, - body_length=BodyLen}}; + body_length=BodyLen + }); %% Stream is waiting for data but we didn't receive enough to send yet. -data(_StreamID, nofin, Data, State=#state{ +data(StreamID, IsFin=nofin, Data, State=#state{ read_body_length=ReadLen, read_body_buffer=Buffer, body_length=BodyLen}) when byte_size(Data) + byte_size(Buffer) < ReadLen -> - {[], State#state{ + do_data(StreamID, IsFin, Data, [], State#state{ expect=undefined, read_body_buffer= << Buffer/binary, Data/binary >>, - body_length=BodyLen + byte_size(Data)}}; + body_length=BodyLen + byte_size(Data) + }); %% Stream is waiting for data and we received enough to send. -data(_StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref, +data(StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref, read_body_timer_ref=TRef, read_body_buffer=Buffer, body_length=BodyLen0}) -> BodyLen = BodyLen0 + byte_size(Data), %% @todo Handle the infinity case where no TRef was defined. ok = erlang:cancel_timer(TRef, [{async, true}, {info, false}]), send_request_body(Pid, Ref, IsFin, BodyLen, <>), - {[], State#state{ + do_data(StreamID, IsFin, Data, [], State#state{ expect=undefined, read_body_ref=undefined, read_body_timer_ref=undefined, read_body_buffer= <<>>, - body_length=BodyLen}}. + body_length=BodyLen + }). + +do_data(StreamID, IsFin, Data, Commands1, State=#state{next=Next0}) -> + {Commands2, Next} = cowboy_stream:data(StreamID, IsFin, Data, Next0), + {Commands1 ++ Commands2, State#state{next=Next}}. -spec info(cowboy_stream:streamid(), any(), State) -> {cowboy_stream:commands(), State} when State::#state{}. -info(_StreamID, {'EXIT', Pid, normal}, State=#state{pid=Pid}) -> - {[stop], State}; -info(_StreamID, {'EXIT', Pid, {{request_error, Reason, _HumanReadable}, _}}, State=#state{pid=Pid}) -> +info(StreamID, Info={'EXIT', Pid, normal}, State=#state{pid=Pid}) -> + do_info(StreamID, Info, [stop], State); +info(StreamID, Info={'EXIT', Pid, {{request_error, Reason, _HumanReadable}, _}}, + State=#state{pid=Pid}) -> %% @todo Optionally report the crash to help debugging. %%report_crash(Ref, StreamID, Pid, Reason, Stacktrace), Status = case Reason of @@ -132,80 +142,94 @@ info(_StreamID, {'EXIT', Pid, {{request_error, Reason, _HumanReadable}, _}}, Sta _ -> 400 end, %% @todo Headers? Details in body? More stuff in debug only? - {[{error_response, Status, #{<<"content-length">> => <<"0">>}, <<>>}, stop], State}; -info(StreamID, Exit = {'EXIT', Pid, {Reason, Stacktrace}}, State=#state{ref=Ref, pid=Pid}) -> + do_info(StreamID, Info, [ + {error_response, Status, #{<<"content-length">> => <<"0">>}, <<>>}, + stop + ], State); +info(StreamID, Exit={'EXIT', Pid, {Reason, Stacktrace}}, State=#state{ref=Ref, pid=Pid}) -> report_crash(Ref, StreamID, Pid, Reason, Stacktrace), - {[ + do_info(StreamID, Exit, [ {error_response, 500, #{<<"content-length">> => <<"0">>}, <<>>}, {internal_error, Exit, 'Stream process crashed.'} - ], State}; + ], State); %% Request body, auto mode, no body buffered. -info(_StreamID, {read_body, Ref, auto, infinity}, State=#state{read_body_buffer= <<>>}) -> - {[], State#state{ +info(StreamID, Info={read_body, Ref, auto, infinity}, State=#state{read_body_buffer= <<>>}) -> + do_info(StreamID, Info, [], State#state{ read_body_ref=Ref, - read_body_length=auto}}; + read_body_length=auto + }); %% Request body, auto mode, body buffered or complete. -info(_StreamID, {read_body, Ref, auto, infinity}, State=#state{pid=Pid, +info(StreamID, Info={read_body, Ref, auto, infinity}, State=#state{pid=Pid, read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen}) -> send_request_body(Pid, Ref, IsFin, BodyLen, Buffer), - {[{flow, byte_size(Buffer)}], State#state{read_body_buffer= <<>>}}; + do_info(StreamID, Info, [{flow, byte_size(Buffer)}], + State#state{read_body_buffer= <<>>}); %% Request body, body buffered large enough or complete. %% %% We do not send a 100 continue response if the client %% already started sending the body. -info(_StreamID, {read_body, Ref, Length, _}, State=#state{pid=Pid, +info(StreamID, Info={read_body, Ref, Length, _}, State=#state{pid=Pid, read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen}) when IsFin =:= fin; byte_size(Buffer) >= Length -> send_request_body(Pid, Ref, IsFin, BodyLen, Buffer), - {[], State#state{read_body_buffer= <<>>}}; + do_info(StreamID, Info, [], State#state{read_body_buffer= <<>>}); %% Request body, not enough to send yet. -info(StreamID, {read_body, Ref, Length, Period}, State=#state{expect=Expect}) -> +info(StreamID, Info={read_body, Ref, Length, Period}, State=#state{expect=Expect}) -> Commands = case Expect of continue -> [{inform, 100, #{}}, {flow, Length}]; undefined -> [{flow, Length}] end, %% @todo Handle the case where Period =:= infinity. TRef = erlang:send_after(Period, self(), {{self(), StreamID}, {read_body_timeout, Ref}}), - {Commands, State#state{ + do_info(StreamID, Info, Commands, State#state{ read_body_ref=Ref, read_body_timer_ref=TRef, - read_body_length=Length}}; + read_body_length=Length + }); %% Request body reading timeout; send what we got. -info(_StreamID, {read_body_timeout, Ref}, State=#state{pid=Pid, read_body_ref=Ref, +info(StreamID, Info={read_body_timeout, Ref}, State=#state{pid=Pid, read_body_ref=Ref, read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen}) -> send_request_body(Pid, Ref, IsFin, BodyLen, Buffer), - {[], State#state{read_body_ref=undefined, read_body_timer_ref=undefined, read_body_buffer= <<>>}}; -info(_StreamID, {read_body_timeout, _}, State) -> - {[], State}; + do_info(StreamID, Info, [], State#state{ + read_body_ref=undefined, + read_body_timer_ref=undefined, + read_body_buffer= <<>> + }); +info(StreamID, Info={read_body_timeout, _}, State) -> + do_info(StreamID, Info, [], State); %% Response. %% %% We reset the expect field when a 100 continue response %% is sent or when any final response is sent. -info(_StreamID, Inform = {inform, Status, _}, State0) -> +info(StreamID, Inform={inform, Status, _}, State0) -> State = case cow_http:status_to_integer(Status) of 100 -> State0#state{expect=undefined}; _ -> State0 end, - {[Inform], State}; -info(_StreamID, Response = {response, _, _, _}, State) -> - {[Response], State#state{expect=undefined}}; -info(_StreamID, Headers = {headers, _, _}, State) -> - {[Headers], State#state{expect=undefined}}; -info(_StreamID, Data = {data, _, _}, State) -> - {[Data], State}; -info(_StreamID, Trailers = {trailers, _}, State) -> - {[Trailers], State}; -info(_StreamID, Push = {push, _, _, _, _, _, _, _}, State) -> - {[Push], State}; -info(_StreamID, SwitchProtocol = {switch_protocol, _, _, _}, State) -> - {[SwitchProtocol], State#state{expect=undefined}}; -%% Stray message. -info(_StreamID, _Info, State) -> - {[], State}. + do_info(StreamID, Inform, [Inform], State); +info(StreamID, Response={response, _, _, _}, State) -> + do_info(StreamID, Response, [Response], State#state{expect=undefined}); +info(StreamID, Headers={headers, _, _}, State) -> + do_info(StreamID, Headers, [Headers], State#state{expect=undefined}); +info(StreamID, Data={data, _, _}, State) -> + do_info(StreamID, Data, [Data], State); +info(StreamID, Trailers={trailers, _}, State) -> + do_info(StreamID, Trailers, [Trailers], State); +info(StreamID, Push={push, _, _, _, _, _, _, _}, State) -> + do_info(StreamID, Push, [Push], State); +info(StreamID, SwitchProtocol={switch_protocol, _, _, _}, State) -> + do_info(StreamID, SwitchProtocol, [SwitchProtocol], State#state{expect=undefined}); +%% Unknown message, either stray or meant for a handler down the line. +info(StreamID, Info, State) -> + do_info(StreamID, Info, [], State). + +do_info(StreamID, Info, Commands1, State0=#state{next=Next0}) -> + {Commands2, Next} = cowboy_stream:info(StreamID, Info, Next0), + {Commands1 ++ Commands2, State0#state{next=Next}}. -spec terminate(cowboy_stream:streamid(), cowboy_stream:reason(), #state{}) -> ok. -terminate(_StreamID, _Reason, _State) -> - ok. +terminate(StreamID, Reason, #state{next=Next}) -> + cowboy_stream:terminate(StreamID, Reason, Next). -spec early_error(cowboy_stream:streamid(), cowboy_stream:reason(), cowboy_stream:partial_req(), Resp, cowboy:opts()) -> Resp -- cgit v1.2.3