aboutsummaryrefslogtreecommitdiffstats
path: root/src/cowboy_stream_h.erl
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2019-09-13 14:20:04 +0200
committerLoïc Hoguin <[email protected]>2019-09-14 18:21:05 +0200
commit49af57d546b5e2fd5aaa9fcd43d09060b9682c5a (patch)
treea59b73e1039fe33081491bf2a55621a7d2563356 /src/cowboy_stream_h.erl
parent4427108b69fcd1e6a8233a217fa0e99d0564b714 (diff)
downloadcowboy-49af57d546b5e2fd5aaa9fcd43d09060b9682c5a.tar.gz
cowboy-49af57d546b5e2fd5aaa9fcd43d09060b9682c5a.tar.bz2
cowboy-49af57d546b5e2fd5aaa9fcd43d09060b9682c5a.zip
Implement backpressure on cowboy_req:stream_body
This should limit the amount of memory that Cowboy is using when a handler is sending data much faster than the network. The new max_stream_buffer_size is a soft limit and only has an effect when the cowboy_stream_h handler is used.
Diffstat (limited to 'src/cowboy_stream_h.erl')
-rw-r--r--src/cowboy_stream_h.erl24
1 files changed, 22 insertions, 2 deletions
diff --git a/src/cowboy_stream_h.erl b/src/cowboy_stream_h.erl
index 6c10517..cc9e271 100644
--- a/src/cowboy_stream_h.erl
+++ b/src/cowboy_stream_h.erl
@@ -39,7 +39,8 @@
read_body_length = 0 :: non_neg_integer() | infinity | auto,
read_body_is_fin = nofin :: nofin | {fin, non_neg_integer()},
read_body_buffer = <<>> :: binary(),
- body_length = 0 :: non_neg_integer()
+ body_length = 0 :: non_neg_integer(),
+ stream_body_status = normal :: normal | blocking | blocked
}).
%% @todo For shutting down children we need to have a timeout before we terminate
@@ -219,8 +220,27 @@ info(StreamID, Response={response, _, _, _}, State) ->
do_info(StreamID, Response, [Response], State#state{expect=undefined});
info(StreamID, Headers={headers, _, _}, State) ->
do_info(StreamID, Headers, [Headers], State#state{expect=undefined});
-info(StreamID, Data={data, _, _}, State) ->
+%% Sending data involves the data message and the stream_buffer_full alarm.
+%% We stop sending acks when the alarm is on.
+info(StreamID, Data={data, _, _}, State0=#state{pid=Pid, stream_body_status=Status}) ->
+ State = case Status of
+ normal ->
+ Pid ! {data_ack, self()},
+ State0;
+ blocking ->
+ State0#state{stream_body_status=blocked};
+ blocked ->
+ State0
+ end,
do_info(StreamID, Data, [Data], State);
+info(StreamID, Alarm={alarm, stream_buffer_full, on}, State) ->
+ do_info(StreamID, Alarm, [], State#state{stream_body_status=blocking});
+info(StreamID, Alarm={alarm, stream_buffer_full, off}, State=#state{pid=Pid, stream_body_status=Status}) ->
+ _ = case Status of
+ blocking -> ok;
+ blocked -> Pid ! {data_ack, self()}
+ end,
+ do_info(StreamID, Alarm, [], State#state{stream_body_status=normal});
info(StreamID, Trailers={trailers, _}, State) ->
do_info(StreamID, Trailers, [Trailers], State);
info(StreamID, Push={push, _, _, _, _, _, _, _}, State) ->