diff options
author | Loïc Hoguin <[email protected]> | 2019-10-02 20:30:32 +0200 |
---|---|---|
committer | Loïc Hoguin <[email protected]> | 2019-10-02 20:30:32 +0200 |
commit | eaa052616f7c85d4daf4817ceecd5f87fecfc8f4 (patch) | |
tree | 1865efbc9551631607435119839e8af0dc779a8b /src/cowboy_stream_h.erl | |
parent | 20660d7566b63977e80f694724fee890d875ec1b (diff) | |
download | cowboy-eaa052616f7c85d4daf4817ceecd5f87fecfc8f4.tar.gz cowboy-eaa052616f7c85d4daf4817ceecd5f87fecfc8f4.tar.bz2 cowboy-eaa052616f7c85d4daf4817ceecd5f87fecfc8f4.zip |
Ensure we can stream the response body from any process
Diffstat (limited to 'src/cowboy_stream_h.erl')
-rw-r--r-- | src/cowboy_stream_h.erl | 17 |
1 files changed, 13 insertions, 4 deletions
diff --git a/src/cowboy_stream_h.erl b/src/cowboy_stream_h.erl index cc38bb2..9f0e745 100644 --- a/src/cowboy_stream_h.erl +++ b/src/cowboy_stream_h.erl @@ -41,6 +41,7 @@ read_body_is_fin = nofin :: nofin | {fin, non_neg_integer()}, read_body_buffer = <<>> :: binary(), body_length = 0 :: non_neg_integer(), + stream_body_pid = undefined :: pid() | undefined, stream_body_status = normal :: normal | blocking | blocked }). @@ -225,28 +226,36 @@ info(StreamID, Headers={headers, _, _}, State) -> do_info(StreamID, Headers, [Headers], State#state{expect=undefined}); %% Sending data involves the data message, the stream_buffer_full alarm %% and the connection_buffer_full alarm. We stop sending acks when an alarm is on. -info(StreamID, Data={data, _, _}, State0=#state{pid=Pid, stream_body_status=Status}) -> +%% +%% We only apply backpressure when the message includes a pid. Otherwise +%% it is a message from Cowboy, or the user circumventing the backpressure. +%% +%% We currently do not support sending data from multiple processes concurrently. +info(StreamID, Data={data, _, _}, State) -> + do_info(StreamID, Data, [Data], State); +info(StreamID, Data0={data, Pid, _, _}, State0=#state{stream_body_status=Status}) -> State = case Status of normal -> Pid ! {data_ack, self()}, State0; blocking -> - State0#state{stream_body_status=blocked}; + State0#state{stream_body_pid=Pid, stream_body_status=blocked}; blocked -> State0 end, + Data = erlang:delete_element(2, Data0), do_info(StreamID, Data, [Data], State); info(StreamID, Alarm={alarm, Name, on}, State) when Name =:= connection_buffer_full; Name =:= stream_buffer_full -> do_info(StreamID, Alarm, [], State#state{stream_body_status=blocking}); -info(StreamID, Alarm={alarm, Name, off}, State=#state{pid=Pid, stream_body_status=Status}) +info(StreamID, Alarm={alarm, Name, off}, State=#state{stream_body_pid=Pid, stream_body_status=Status}) when Name =:= connection_buffer_full; Name =:= stream_buffer_full -> _ = case Status of normal -> ok; blocking -> ok; blocked -> Pid ! {data_ack, self()} end, - do_info(StreamID, Alarm, [], State#state{stream_body_status=normal}); + do_info(StreamID, Alarm, [], State#state{stream_body_pid=undefined, stream_body_status=normal}); info(StreamID, Trailers={trailers, _}, State) -> do_info(StreamID, Trailers, [Trailers], State); info(StreamID, Push={push, _, _, _, _, _, _, _}, State) -> |