diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl
index 7039ab2..c4b9767 100644
--- a/src/cowboy_http2.erl
+++ b/src/cowboy_http2.erl
@@ -45,6 +45,7 @@
max_encode_table_size => non_neg_integer(),
max_frame_size_received => 16384..16777215,
max_frame_size_sent => 16384..16777215 | infinity,
+ max_stream_buffer_size => non_neg_integer(),
max_stream_window_size => 0..16#7fffffff,
metrics_callback => cowboy_metrics_h:metrics_callback(),
middlewares => [module()],
@@ -292,7 +293,11 @@ frame(State=#state{http2_machine=HTTP2Machine0}, Frame) ->
{ok, GoAway={goaway, _, _, _}, HTTP2Machine} ->
goaway(State#state{http2_machine=HTTP2Machine}, GoAway);
{send, SendData, HTTP2Machine} ->
- send_data(maybe_ack(State#state{http2_machine=HTTP2Machine}, Frame), SendData);
+ %% We may need to send an alarm for each of the streams sending data.
+ lists:foldl(
+ fun({StreamID, _, _}, S) -> maybe_send_data_alarm(S, HTTP2Machine0, StreamID) end,
+ send_data(maybe_ack(State#state{http2_machine=HTTP2Machine}, Frame), SendData),
+ SendData);
{error, {stream_error, StreamID, Reason, Human}, HTTP2Machine} ->
StreamID, {stream_error, Reason, Human});
@@ -712,7 +717,7 @@ maybe_send_data(State0=#state{http2_machine=HTTP2Machine0}, StreamID, IsFin, Dat
case cow_http2_machine:send_or_queue_data(StreamID, HTTP2Machine0, IsFin, Data) of
{ok, HTTP2Machine} ->
- State0#state{http2_machine=HTTP2Machine};
+ maybe_send_data_alarm(State0#state{http2_machine=HTTP2Machine}, HTTP2Machine0, StreamID);
{send, SendData, HTTP2Machine} ->
State = #state{http2_status=Status, streams=Streams}
= send_data(State0#state{http2_machine=HTTP2Machine}, SendData),
@@ -721,7 +726,7 @@ maybe_send_data(State0=#state{http2_machine=HTTP2Machine0}, StreamID, IsFin, Dat
Status =:= closing, Streams =:= #{} ->
terminate(State, {stop, normal, 'The connection is going away.'});
true ->
- State
+ maybe_send_data_alarm(State, HTTP2Machine0, StreamID)
@@ -759,6 +764,31 @@ send_data_frame(State=#state{socket=Socket, transport=Transport,
Transport:send(Socket, cow_http2:headers(StreamID, fin, HeaderBlock)),
+%% After we have sent or queued data we may need to set or clear an alarm.
+%% We do this by comparing the HTTP2Machine buffer state before/after for
+%% the relevant streams.
+maybe_send_data_alarm(State=#state{opts=Opts, http2_machine=HTTP2Machine}, HTTP2Machine0, StreamID) ->
+ {ok, BufferSizeBefore} = cow_http2_machine:get_stream_local_buffer_size(StreamID, HTTP2Machine0),
+ %% When the stream ends up closed after it finished sending data,
+ %% we do not want to trigger an alarm. We act as if the buffer
+ %% size did not change.
+ BufferSizeAfter = case cow_http2_machine:get_stream_local_buffer_size(StreamID, HTTP2Machine) of
+ {ok, BSA} -> BSA;
+ {error, closed} -> BufferSizeBefore
+ end,
+ MaxBufferSize = maps:get(max_stream_buffer_size, Opts, 8000000),
+ %% I do not want to document these internal_events yet. I am not yet
+ %% convinced it should be {alarm, Name, on|off} and not {internal_event, E}
+ %% or something else entirely.
+ if
+ BufferSizeBefore >= MaxBufferSize, BufferSizeAfter < MaxBufferSize ->
+ info(State, StreamID, {alarm, stream_buffer_full, off});
+ BufferSizeBefore < MaxBufferSize, BufferSizeAfter >= MaxBufferSize ->
+ info(State, StreamID, {alarm, stream_buffer_full, on});
+ true ->
+ State
+ end.
%% Terminate a stream or the connection.
%% We may have to cancel streams even if we receive multiple
diff --git a/src/cowboy_req.erl b/src/cowboy_req.erl
index 60dc86a..8e64a39 100644
--- a/src/cowboy_req.erl
+++ b/src/cowboy_req.erl
@@ -826,34 +826,31 @@ stream_body(_, _, #{method := <<"HEAD">>, has_sent_resp := headers}) ->
%% is converted to a data tuple, however.
stream_body({sendfile, _, 0, _}, nofin, _) ->
-stream_body({sendfile, _, 0, _}, IsFin=fin,
- #{pid := Pid, streamid := StreamID, has_sent_resp := headers}) ->
- Pid ! {{Pid, StreamID}, {data, IsFin, <<>>}},
- ok;
-stream_body({sendfile, O, B, P}, IsFin,
- #{pid := Pid, streamid := StreamID, has_sent_resp := headers})
+stream_body({sendfile, _, 0, _}, IsFin=fin, Req=#{has_sent_resp := headers}) ->
+ stream_body({data, IsFin, <<>>}, Req);
+stream_body({sendfile, O, B, P}, IsFin, Req=#{has_sent_resp := headers})
when is_integer(O), O >= 0, is_integer(B), B > 0 ->
- Pid ! {{Pid, StreamID}, {data, IsFin, {sendfile, O, B, P}}},
- ok;
-stream_body(Data, IsFin=nofin, #{pid := Pid, streamid := StreamID, has_sent_resp := headers})
+ stream_body({data, IsFin, {sendfile, O, B, P}}, Req);
+stream_body(Data, IsFin=nofin, Req=#{has_sent_resp := headers})
when not is_tuple(Data) ->
case iolist_size(Data) of
0 -> ok;
- _ ->
- Pid ! {{Pid, StreamID}, {data, IsFin, Data}},
- ok
+ _ -> stream_body({data, IsFin, Data}, Req)
-stream_body(Data, IsFin, #{pid := Pid, streamid := StreamID, has_sent_resp := headers})
+stream_body(Data, IsFin, Req=#{has_sent_resp := headers})
when not is_tuple(Data) ->
- Pid ! {{Pid, StreamID}, {data, IsFin, Data}},
- ok.
+ stream_body({data, IsFin, Data}, Req).
+%% @todo Do we need a timeout?
+stream_body(Msg, #{pid := Pid, streamid := StreamID}) ->
+ Pid ! {{Pid, StreamID}, Msg},
+ receive {data_ack, Pid} -> ok end.
-spec stream_events(cow_sse:event() | [cow_sse:event()], fin | nofin, req()) -> ok.
stream_events(Event, IsFin, Req) when is_map(Event) ->
stream_events([Event], IsFin, Req);
-stream_events(Events, IsFin, #{pid := Pid, streamid := StreamID, has_sent_resp := headers}) ->
- Pid ! {{Pid, StreamID}, {data, IsFin, cow_sse:events(Events)}},
- ok.
+stream_events(Events, IsFin, Req=#{has_sent_resp := headers}) ->
+ stream_body({data, IsFin, cow_sse:events(Events)}, Req).
-spec stream_trailers(cowboy:http_headers(), req()) -> ok.
stream_trailers(Trailers, #{pid := Pid, streamid := StreamID, has_sent_resp := headers}) ->
diff --git a/src/cowboy_stream_h.erl b/src/cowboy_stream_h.erl
index 6c10517..cc9e271 100644
--- a/src/cowboy_stream_h.erl
+++ b/src/cowboy_stream_h.erl
@@ -39,7 +39,8 @@
read_body_length = 0 :: non_neg_integer() | infinity | auto,
read_body_is_fin = nofin :: nofin | {fin, non_neg_integer()},
read_body_buffer = <<>> :: binary(),
- body_length = 0 :: non_neg_integer()
+ body_length = 0 :: non_neg_integer(),
+ stream_body_status = normal :: normal | blocking | blocked
%% @todo For shutting down children we need to have a timeout before we terminate
@@ -219,8 +220,27 @@ info(StreamID, Response={response, _, _, _}, State) ->
do_info(StreamID, Response, [Response], State#state{expect=undefined});
info(StreamID, Headers={headers, _, _}, State) ->
do_info(StreamID, Headers, [Headers], State#state{expect=undefined});
-info(StreamID, Data={data, _, _}, State) ->
+%% Sending data involves the data message and the stream_buffer_full alarm.
+%% We stop sending acks when the alarm is on.
+info(StreamID, Data={data, _, _}, State0=#state{pid=Pid, stream_body_status=Status}) ->
+ State = case Status of
+ normal ->
+ Pid ! {data_ack, self()},
+ State0;
+ blocking ->
+ State0#state{stream_body_status=blocked};
+ blocked ->
+ State0
+ end,
do_info(StreamID, Data, [Data], State);
+info(StreamID, Alarm={alarm, stream_buffer_full, on}, State) ->
+ do_info(StreamID, Alarm, [], State#state{stream_body_status=blocking});
+info(StreamID, Alarm={alarm, stream_buffer_full, off}, State=#state{pid=Pid, stream_body_status=Status}) ->
+ _ = case Status of
+ blocking -> ok;
+ blocked -> Pid ! {data_ack, self()}
+ end,
+ do_info(StreamID, Alarm, [], State#state{stream_body_status=normal});
info(StreamID, Trailers={trailers, _}, State) ->
do_info(StreamID, Trailers, [Trailers], State);
info(StreamID, Push={push, _, _, _, _, _, _, _}, State) ->