From 20660d7566b63977e80f694724fee890d875ec1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Wed, 2 Oct 2019 19:12:05 +0200 Subject: Ensure we can read the request body from any process --- src/cowboy_req.erl | 2 +- src/cowboy_stream_h.erl | 17 ++++++++++------- src/cowboy_websocket.erl | 2 +- test/handlers/echo_h.erl | 10 ++++++++++ test/req_SUITE.erl | 6 ++++++ 5 files changed, 28 insertions(+), 9 deletions(-) diff --git a/src/cowboy_req.erl b/src/cowboy_req.erl index 8e64a39..b2756e3 100644 --- a/src/cowboy_req.erl +++ b/src/cowboy_req.erl @@ -491,7 +491,7 @@ read_body(Req=#{pid := Pid, streamid := StreamID}, Opts) -> Period = maps:get(period, Opts, 15000), Timeout = maps:get(timeout, Opts, Period + 1000), Ref = make_ref(), - Pid ! {{Pid, StreamID}, {read_body, Ref, Length, Period}}, + Pid ! {{Pid, StreamID}, {read_body, self(), Ref, Length, Period}}, receive {request_body, Ref, nofin, Body} -> {more, Body, Req}; diff --git a/src/cowboy_stream_h.erl b/src/cowboy_stream_h.erl index 9397726..cc38bb2 100644 --- a/src/cowboy_stream_h.erl +++ b/src/cowboy_stream_h.erl @@ -34,6 +34,7 @@ ref = undefined :: ranch:ref(), pid = undefined :: pid(), expect = undefined :: undefined | continue, + read_body_pid = undefined :: pid() | undefined, read_body_ref = undefined :: reference() | undefined, read_body_timer_ref = undefined :: reference() | undefined, read_body_length = 0 :: non_neg_integer() | infinity | auto, @@ -94,7 +95,7 @@ data(StreamID, IsFin, Data, State=#state{ %% Stream is waiting for data using auto mode. %% %% There is no buffering done in auto mode. -data(StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref, +data(StreamID, IsFin, Data, State=#state{read_body_pid=Pid, read_body_ref=Ref, read_body_length=auto, body_length=BodyLen}) -> send_request_body(Pid, Ref, IsFin, BodyLen, Data), do_data(StreamID, IsFin, Data, [{flow, byte_size(Data)}], State#state{ @@ -111,7 +112,7 @@ data(StreamID, IsFin=nofin, Data, State=#state{ body_length=BodyLen + byte_size(Data) }); %% Stream is waiting for data and we received enough to send. -data(StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref, +data(StreamID, IsFin, Data, State=#state{read_body_pid=Pid, read_body_ref=Ref, read_body_timer_ref=TRef, read_body_buffer=Buffer, body_length=BodyLen0}) -> BodyLen = BodyLen0 + byte_size(Data), %% @todo Handle the infinity case where no TRef was defined. @@ -162,13 +163,14 @@ info(StreamID, Exit={'EXIT', Pid, {Reason, Stacktrace}}, State=#state{ref=Ref, p {error_response, 500, #{<<"content-length">> => <<"0">>}, <<>>} |Commands], State); %% Request body, auto mode, no body buffered. -info(StreamID, Info={read_body, Ref, auto, infinity}, State=#state{read_body_buffer= <<>>}) -> +info(StreamID, Info={read_body, Pid, Ref, auto, infinity}, State=#state{read_body_buffer= <<>>}) -> do_info(StreamID, Info, [], State#state{ + read_body_pid=Pid, read_body_ref=Ref, read_body_length=auto }); %% Request body, auto mode, body buffered or complete. -info(StreamID, Info={read_body, Ref, auto, infinity}, State=#state{pid=Pid, +info(StreamID, Info={read_body, Pid, Ref, auto, infinity}, State=#state{ read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen}) -> send_request_body(Pid, Ref, IsFin, BodyLen, Buffer), do_info(StreamID, Info, [{flow, byte_size(Buffer)}], @@ -177,13 +179,13 @@ info(StreamID, Info={read_body, Ref, auto, infinity}, State=#state{pid=Pid, %% %% We do not send a 100 continue response if the client %% already started sending the body. -info(StreamID, Info={read_body, Ref, Length, _}, State=#state{pid=Pid, +info(StreamID, Info={read_body, Pid, Ref, Length, _}, State=#state{ read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen}) when IsFin =:= fin; byte_size(Buffer) >= Length -> send_request_body(Pid, Ref, IsFin, BodyLen, Buffer), do_info(StreamID, Info, [], State#state{read_body_buffer= <<>>}); %% Request body, not enough to send yet. -info(StreamID, Info={read_body, Ref, Length, Period}, State=#state{expect=Expect}) -> +info(StreamID, Info={read_body, Pid, Ref, Length, Period}, State=#state{expect=Expect}) -> Commands = case Expect of continue -> [{inform, 100, #{}}, {flow, Length}]; undefined -> [{flow, Length}] @@ -191,12 +193,13 @@ info(StreamID, Info={read_body, Ref, Length, Period}, State=#state{expect=Expect %% @todo Handle the case where Period =:= infinity. TRef = erlang:send_after(Period, self(), {{self(), StreamID}, {read_body_timeout, Ref}}), do_info(StreamID, Info, Commands, State#state{ + read_body_pid=Pid, read_body_ref=Ref, read_body_timer_ref=TRef, read_body_length=Length }); %% Request body reading timeout; send what we got. -info(StreamID, Info={read_body_timeout, Ref}, State=#state{pid=Pid, read_body_ref=Ref, +info(StreamID, Info={read_body_timeout, Ref}, State=#state{read_body_pid=Pid, read_body_ref=Ref, read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen}) -> send_request_body(Pid, Ref, IsFin, BodyLen, Buffer), do_info(StreamID, Info, [], State#state{ diff --git a/src/cowboy_websocket.erl b/src/cowboy_websocket.erl index 9fc2752..9540b75 100644 --- a/src/cowboy_websocket.erl +++ b/src/cowboy_websocket.erl @@ -305,7 +305,7 @@ before_loop(State=#state{socket=Stream={Pid, _}, transport=undefined}, HandlerState, ParseState) -> %% @todo Keep Ref around. ReadBodyRef = make_ref(), - Pid ! {Stream, {read_body, ReadBodyRef, auto, infinity}}, + Pid ! {Stream, {read_body, self(), ReadBodyRef, auto, infinity}}, loop(State, HandlerState, ParseState); before_loop(State=#state{socket=Socket, transport=Transport, hibernate=true}, HandlerState, ParseState) -> diff --git a/test/handlers/echo_h.erl b/test/handlers/echo_h.erl index ec37a66..7d0e75b 100644 --- a/test/handlers/echo_h.erl +++ b/test/handlers/echo_h.erl @@ -30,6 +30,16 @@ echo(<<"read_body">>, Req0, Opts) -> Length = cowboy_req:body_length(Req1), {ok, integer_to_binary(Length), Req1}; <<"/opts", _/bits>> -> cowboy_req:read_body(Req0, Opts); + <<"/spawn", _/bits>> -> + Parent = self(), + Pid = spawn_link(fun() -> + Parent ! {self(), cowboy_req:read_body(Req0)} + end), + receive + {Pid, Msg} -> Msg + after 5000 -> + error(timeout) + end; _ -> cowboy_req:read_body(Req0) end, {ok, cowboy_req:reply(200, #{}, Body, Req), Opts}; diff --git a/test/req_SUITE.erl b/test/req_SUITE.erl index eafabfe..72cc0ed 100644 --- a/test/req_SUITE.erl +++ b/test/req_SUITE.erl @@ -57,6 +57,7 @@ init_dispatch(Config) -> {"/opts/:key/timeout", echo_h, #{timeout => 1000, crash => true}}, {"/100-continue/:key", echo_h, []}, {"/full/:key", echo_h, []}, + {"/spawn/:key", echo_h, []}, {"/no/:key", echo_h, []}, {"/direct/:key/[...]", echo_h, []}, {"/:key/[...]", echo_h, []} @@ -488,6 +489,11 @@ do_read_body_timeout(Path, Body, Config) -> {response, _, 500, _} = gun:await(ConnPid, Ref), gun:close(ConnPid). +read_body_spawn(Config) -> + doc("Confirm we can use cowboy_req:read_body/1,2 from another process."), + <<"hello world!">> = do_body("POST", "/spawn/read_body", [], "hello world!", Config), + ok. + read_body_expect_100_continue(Config) -> doc("Request body with a 100-continue expect header."), do_read_body_expect_100_continue("/read_body", Config). -- cgit v1.2.3