aboutsummaryrefslogtreecommitdiffstats
path: root/src/cowboy_stream_h.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/cowboy_stream_h.erl')
-rw-r--r--src/cowboy_stream_h.erl17
1 files changed, 13 insertions, 4 deletions
diff --git a/src/cowboy_stream_h.erl b/src/cowboy_stream_h.erl
index cc38bb2..9f0e745 100644
--- a/src/cowboy_stream_h.erl
+++ b/src/cowboy_stream_h.erl
@@ -41,6 +41,7 @@
read_body_is_fin = nofin :: nofin | {fin, non_neg_integer()},
read_body_buffer = <<>> :: binary(),
body_length = 0 :: non_neg_integer(),
+ stream_body_pid = undefined :: pid() | undefined,
stream_body_status = normal :: normal | blocking | blocked
}).
@@ -225,28 +226,36 @@ info(StreamID, Headers={headers, _, _}, State) ->
do_info(StreamID, Headers, [Headers], State#state{expect=undefined});
%% Sending data involves the data message, the stream_buffer_full alarm
%% and the connection_buffer_full alarm. We stop sending acks when an alarm is on.
-info(StreamID, Data={data, _, _}, State0=#state{pid=Pid, stream_body_status=Status}) ->
+%%
+%% We only apply backpressure when the message includes a pid. Otherwise
+%% it is a message from Cowboy, or the user circumventing the backpressure.
+%%
+%% We currently do not support sending data from multiple processes concurrently.
+info(StreamID, Data={data, _, _}, State) ->
+ do_info(StreamID, Data, [Data], State);
+info(StreamID, Data0={data, Pid, _, _}, State0=#state{stream_body_status=Status}) ->
State = case Status of
normal ->
Pid ! {data_ack, self()},
State0;
blocking ->
- State0#state{stream_body_status=blocked};
+ State0#state{stream_body_pid=Pid, stream_body_status=blocked};
blocked ->
State0
end,
+ Data = erlang:delete_element(2, Data0),
do_info(StreamID, Data, [Data], State);
info(StreamID, Alarm={alarm, Name, on}, State)
when Name =:= connection_buffer_full; Name =:= stream_buffer_full ->
do_info(StreamID, Alarm, [], State#state{stream_body_status=blocking});
-info(StreamID, Alarm={alarm, Name, off}, State=#state{pid=Pid, stream_body_status=Status})
+info(StreamID, Alarm={alarm, Name, off}, State=#state{stream_body_pid=Pid, stream_body_status=Status})
when Name =:= connection_buffer_full; Name =:= stream_buffer_full ->
_ = case Status of
normal -> ok;
blocking -> ok;
blocked -> Pid ! {data_ack, self()}
end,
- do_info(StreamID, Alarm, [], State#state{stream_body_status=normal});
+ do_info(StreamID, Alarm, [], State#state{stream_body_pid=undefined, stream_body_status=normal});
info(StreamID, Trailers={trailers, _}, State) ->
do_info(StreamID, Trailers, [Trailers], State);
info(StreamID, Push={push, _, _, _, _, _, _, _}, State) ->