From eaa052616f7c85d4daf4817ceecd5f87fecfc8f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Wed, 2 Oct 2019 20:30:32 +0200 Subject: Ensure we can stream the response body from any process --- src/cowboy_req.erl | 10 +++++----- src/cowboy_stream_h.erl | 17 +++++++++++++---- test/handlers/resp_h.erl | 15 +++++++++++++++ test/req_SUITE.erl | 5 +++++ 4 files changed, 38 insertions(+), 9 deletions(-) diff --git a/src/cowboy_req.erl b/src/cowboy_req.erl index b2756e3..e5ec4a7 100644 --- a/src/cowboy_req.erl +++ b/src/cowboy_req.erl @@ -827,19 +827,19 @@ stream_body(_, _, #{method := <<"HEAD">>, has_sent_resp := headers}) -> stream_body({sendfile, _, 0, _}, nofin, _) -> ok; stream_body({sendfile, _, 0, _}, IsFin=fin, Req=#{has_sent_resp := headers}) -> - stream_body({data, IsFin, <<>>}, Req); + stream_body({data, self(), IsFin, <<>>}, Req); stream_body({sendfile, O, B, P}, IsFin, Req=#{has_sent_resp := headers}) when is_integer(O), O >= 0, is_integer(B), B > 0 -> - stream_body({data, IsFin, {sendfile, O, B, P}}, Req); + stream_body({data, self(), IsFin, {sendfile, O, B, P}}, Req); stream_body(Data, IsFin=nofin, Req=#{has_sent_resp := headers}) when not is_tuple(Data) -> case iolist_size(Data) of 0 -> ok; - _ -> stream_body({data, IsFin, Data}, Req) + _ -> stream_body({data, self(), IsFin, Data}, Req) end; stream_body(Data, IsFin, Req=#{has_sent_resp := headers}) when not is_tuple(Data) -> - stream_body({data, IsFin, Data}, Req). + stream_body({data, self(), IsFin, Data}, Req). %% @todo Do we need a timeout? stream_body(Msg, #{pid := Pid, streamid := StreamID}) -> @@ -850,7 +850,7 @@ stream_body(Msg, #{pid := Pid, streamid := StreamID}) -> stream_events(Event, IsFin, Req) when is_map(Event) -> stream_events([Event], IsFin, Req); stream_events(Events, IsFin, Req=#{has_sent_resp := headers}) -> - stream_body({data, IsFin, cow_sse:events(Events)}, Req). + stream_body({data, self(), IsFin, cow_sse:events(Events)}, Req). -spec stream_trailers(cowboy:http_headers(), req()) -> ok. stream_trailers(Trailers, #{pid := Pid, streamid := StreamID, has_sent_resp := headers}) -> diff --git a/src/cowboy_stream_h.erl b/src/cowboy_stream_h.erl index cc38bb2..9f0e745 100644 --- a/src/cowboy_stream_h.erl +++ b/src/cowboy_stream_h.erl @@ -41,6 +41,7 @@ read_body_is_fin = nofin :: nofin | {fin, non_neg_integer()}, read_body_buffer = <<>> :: binary(), body_length = 0 :: non_neg_integer(), + stream_body_pid = undefined :: pid() | undefined, stream_body_status = normal :: normal | blocking | blocked }). @@ -225,28 +226,36 @@ info(StreamID, Headers={headers, _, _}, State) -> do_info(StreamID, Headers, [Headers], State#state{expect=undefined}); %% Sending data involves the data message, the stream_buffer_full alarm %% and the connection_buffer_full alarm. We stop sending acks when an alarm is on. -info(StreamID, Data={data, _, _}, State0=#state{pid=Pid, stream_body_status=Status}) -> +%% +%% We only apply backpressure when the message includes a pid. Otherwise +%% it is a message from Cowboy, or the user circumventing the backpressure. +%% +%% We currently do not support sending data from multiple processes concurrently. +info(StreamID, Data={data, _, _}, State) -> + do_info(StreamID, Data, [Data], State); +info(StreamID, Data0={data, Pid, _, _}, State0=#state{stream_body_status=Status}) -> State = case Status of normal -> Pid ! {data_ack, self()}, State0; blocking -> - State0#state{stream_body_status=blocked}; + State0#state{stream_body_pid=Pid, stream_body_status=blocked}; blocked -> State0 end, + Data = erlang:delete_element(2, Data0), do_info(StreamID, Data, [Data], State); info(StreamID, Alarm={alarm, Name, on}, State) when Name =:= connection_buffer_full; Name =:= stream_buffer_full -> do_info(StreamID, Alarm, [], State#state{stream_body_status=blocking}); -info(StreamID, Alarm={alarm, Name, off}, State=#state{pid=Pid, stream_body_status=Status}) +info(StreamID, Alarm={alarm, Name, off}, State=#state{stream_body_pid=Pid, stream_body_status=Status}) when Name =:= connection_buffer_full; Name =:= stream_buffer_full -> _ = case Status of normal -> ok; blocking -> ok; blocked -> Pid ! {data_ack, self()} end, - do_info(StreamID, Alarm, [], State#state{stream_body_status=normal}); + do_info(StreamID, Alarm, [], State#state{stream_body_pid=undefined, stream_body_status=normal}); info(StreamID, Trailers={trailers, _}, State) -> do_info(StreamID, Trailers, [Trailers], State); info(StreamID, Push={push, _, _, _, _, _, _, _}, State) -> diff --git a/test/handlers/resp_h.erl b/test/handlers/resp_h.erl index 19405db..bc76d56 100644 --- a/test/handlers/resp_h.erl +++ b/test/handlers/resp_h.erl @@ -247,6 +247,21 @@ do(<<"stream_body">>, Req0, Opts) -> cowboy_req:stream_body(<<"Hello! ">>, nofin, Req), cowboy_req:stream_body({sendfile, 0, AppSize, AppFile}, fin, Req), {ok, Req, Opts}; + <<"spawn">> -> + Req = cowboy_req:stream_reply(200, Req0), + Parent = self(), + Pid = spawn(fun() -> + cowboy_req:stream_body(<<"Hello ">>, nofin, Req), + cowboy_req:stream_body(<<"world">>, nofin, Req), + cowboy_req:stream_body(<<"!">>, fin, Req), + Parent ! {self(), ok} + end), + receive + {Pid, ok} -> ok + after 5000 -> + error(timeout) + end, + {ok, Req, Opts}; _ -> %% Call stream_body without initiating streaming. cowboy_req:stream_body(<<0:800000>>, fin, Req0), diff --git a/test/req_SUITE.erl b/test/req_SUITE.erl index 72cc0ed..2cc8de4 100644 --- a/test/req_SUITE.erl +++ b/test/req_SUITE.erl @@ -942,6 +942,11 @@ stream_body_sendfile_fin(Config) -> {200, _, ExpectedBody} = do_get("/resp/stream_body/sendfile_fin", Config), ok. +stream_body_spawn(Config) -> + doc("Confirm we can use cowboy_req:stream_body/3 from another process."), + {200, _, <<"Hello world!">>} = do_get("/resp/stream_body/spawn", Config), + ok. + stream_body_content_length_multiple(Config) -> doc("Streamed body via multiple calls."), {200, _, <<"Hello world!">>} = do_get("/resp/stream_body_content_length/multiple", Config), -- cgit v1.2.3