From 49af57d546b5e2fd5aaa9fcd43d09060b9682c5a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Fri, 13 Sep 2019 14:20:04 +0200 Subject: Implement backpressure on cowboy_req:stream_body This should limit the amount of memory that Cowboy is using when a handler is sending data much faster than the network. The new max_stream_buffer_size is a soft limit and only has an effect when the cowboy_stream_h handler is used. --- doc/src/manual/cowboy_http2.asciidoc | 11 ++++++++++- src/cowboy_http2.erl | 36 +++++++++++++++++++++++++++++++++--- src/cowboy_req.erl | 33 +++++++++++++++------------------ src/cowboy_stream_h.erl | 24 ++++++++++++++++++++++-- test/handlers/resp_h.erl | 5 +++++ test/req_SUITE.erl | 7 ++++++- 6 files changed, 91 insertions(+), 25 deletions(-) diff --git a/doc/src/manual/cowboy_http2.asciidoc b/doc/src/manual/cowboy_http2.asciidoc index 19271d4..e899289 100644 --- a/doc/src/manual/cowboy_http2.asciidoc +++ b/doc/src/manual/cowboy_http2.asciidoc @@ -31,6 +31,7 @@ opts() :: #{ max_encode_table_size => non_neg_integer(), max_frame_size_received => 16384..16777215, max_frame_size_sent => 16384..16777215 | infinity, + max_stream_buffer_size => non_neg_integer(), max_stream_window_size => 0..16#7fffffff, preface_timeout => timeout(), proxy_header => boolean(), @@ -136,6 +137,12 @@ following the client's advertised maximum. Note that actual frame sizes may be lower than the limit when there is not enough space left in the flow control window. +max_stream_buffer_size (8000000):: + +Maximum stream buffer size in bytes. This is a soft limit used +to apply backpressure to handlers that send data faster than +the HTTP/2 connection allows. + max_stream_window_size (16#7fffffff):: Maximum stream window size in bytes. This is used as an upper bound @@ -186,7 +193,9 @@ too many `WINDOW_UPDATE` frames. `max_connection_window_size`, `max_stream_window_size`, `stream_window_margin_size` and `stream_window_update_threshold` to configure - behavior on sending WINDOW_UPDATE frames. + behavior on sending WINDOW_UPDATE frames, and + `max_stream_buffer_size` to apply backpressure + when sending data too fast. * *2.6*: The `proxy_header` and `sendfile` options were added. * *2.4*: Add the options `initial_connection_window_size`, `initial_stream_window_size`, `max_concurrent_streams`, diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl index 7039ab2..c4b9767 100644 --- a/src/cowboy_http2.erl +++ b/src/cowboy_http2.erl @@ -45,6 +45,7 @@ max_encode_table_size => non_neg_integer(), max_frame_size_received => 16384..16777215, max_frame_size_sent => 16384..16777215 | infinity, + max_stream_buffer_size => non_neg_integer(), max_stream_window_size => 0..16#7fffffff, metrics_callback => cowboy_metrics_h:metrics_callback(), middlewares => [module()], @@ -292,7 +293,11 @@ frame(State=#state{http2_machine=HTTP2Machine0}, Frame) -> {ok, GoAway={goaway, _, _, _}, HTTP2Machine} -> goaway(State#state{http2_machine=HTTP2Machine}, GoAway); {send, SendData, HTTP2Machine} -> - send_data(maybe_ack(State#state{http2_machine=HTTP2Machine}, Frame), SendData); + %% We may need to send an alarm for each of the streams sending data. + lists:foldl( + fun({StreamID, _, _}, S) -> maybe_send_data_alarm(S, HTTP2Machine0, StreamID) end, + send_data(maybe_ack(State#state{http2_machine=HTTP2Machine}, Frame), SendData), + SendData); {error, {stream_error, StreamID, Reason, Human}, HTTP2Machine} -> reset_stream(State#state{http2_machine=HTTP2Machine}, StreamID, {stream_error, Reason, Human}); @@ -712,7 +717,7 @@ maybe_send_data(State0=#state{http2_machine=HTTP2Machine0}, StreamID, IsFin, Dat end, case cow_http2_machine:send_or_queue_data(StreamID, HTTP2Machine0, IsFin, Data) of {ok, HTTP2Machine} -> - State0#state{http2_machine=HTTP2Machine}; + maybe_send_data_alarm(State0#state{http2_machine=HTTP2Machine}, HTTP2Machine0, StreamID); {send, SendData, HTTP2Machine} -> State = #state{http2_status=Status, streams=Streams} = send_data(State0#state{http2_machine=HTTP2Machine}, SendData), @@ -721,7 +726,7 @@ maybe_send_data(State0=#state{http2_machine=HTTP2Machine0}, StreamID, IsFin, Dat Status =:= closing, Streams =:= #{} -> terminate(State, {stop, normal, 'The connection is going away.'}); true -> - State + maybe_send_data_alarm(State, HTTP2Machine0, StreamID) end end. @@ -759,6 +764,31 @@ send_data_frame(State=#state{socket=Socket, transport=Transport, Transport:send(Socket, cow_http2:headers(StreamID, fin, HeaderBlock)), State#state{http2_machine=HTTP2Machine}. +%% After we have sent or queued data we may need to set or clear an alarm. +%% We do this by comparing the HTTP2Machine buffer state before/after for +%% the relevant streams. +maybe_send_data_alarm(State=#state{opts=Opts, http2_machine=HTTP2Machine}, HTTP2Machine0, StreamID) -> + {ok, BufferSizeBefore} = cow_http2_machine:get_stream_local_buffer_size(StreamID, HTTP2Machine0), + %% When the stream ends up closed after it finished sending data, + %% we do not want to trigger an alarm. We act as if the buffer + %% size did not change. + BufferSizeAfter = case cow_http2_machine:get_stream_local_buffer_size(StreamID, HTTP2Machine) of + {ok, BSA} -> BSA; + {error, closed} -> BufferSizeBefore + end, + MaxBufferSize = maps:get(max_stream_buffer_size, Opts, 8000000), + %% I do not want to document these internal_events yet. I am not yet + %% convinced it should be {alarm, Name, on|off} and not {internal_event, E} + %% or something else entirely. + if + BufferSizeBefore >= MaxBufferSize, BufferSizeAfter < MaxBufferSize -> + info(State, StreamID, {alarm, stream_buffer_full, off}); + BufferSizeBefore < MaxBufferSize, BufferSizeAfter >= MaxBufferSize -> + info(State, StreamID, {alarm, stream_buffer_full, on}); + true -> + State + end. + %% Terminate a stream or the connection. %% We may have to cancel streams even if we receive multiple diff --git a/src/cowboy_req.erl b/src/cowboy_req.erl index 60dc86a..8e64a39 100644 --- a/src/cowboy_req.erl +++ b/src/cowboy_req.erl @@ -826,34 +826,31 @@ stream_body(_, _, #{method := <<"HEAD">>, has_sent_resp := headers}) -> %% is converted to a data tuple, however. stream_body({sendfile, _, 0, _}, nofin, _) -> ok; -stream_body({sendfile, _, 0, _}, IsFin=fin, - #{pid := Pid, streamid := StreamID, has_sent_resp := headers}) -> - Pid ! {{Pid, StreamID}, {data, IsFin, <<>>}}, - ok; -stream_body({sendfile, O, B, P}, IsFin, - #{pid := Pid, streamid := StreamID, has_sent_resp := headers}) +stream_body({sendfile, _, 0, _}, IsFin=fin, Req=#{has_sent_resp := headers}) -> + stream_body({data, IsFin, <<>>}, Req); +stream_body({sendfile, O, B, P}, IsFin, Req=#{has_sent_resp := headers}) when is_integer(O), O >= 0, is_integer(B), B > 0 -> - Pid ! {{Pid, StreamID}, {data, IsFin, {sendfile, O, B, P}}}, - ok; -stream_body(Data, IsFin=nofin, #{pid := Pid, streamid := StreamID, has_sent_resp := headers}) + stream_body({data, IsFin, {sendfile, O, B, P}}, Req); +stream_body(Data, IsFin=nofin, Req=#{has_sent_resp := headers}) when not is_tuple(Data) -> case iolist_size(Data) of 0 -> ok; - _ -> - Pid ! {{Pid, StreamID}, {data, IsFin, Data}}, - ok + _ -> stream_body({data, IsFin, Data}, Req) end; -stream_body(Data, IsFin, #{pid := Pid, streamid := StreamID, has_sent_resp := headers}) +stream_body(Data, IsFin, Req=#{has_sent_resp := headers}) when not is_tuple(Data) -> - Pid ! {{Pid, StreamID}, {data, IsFin, Data}}, - ok. + stream_body({data, IsFin, Data}, Req). + +%% @todo Do we need a timeout? +stream_body(Msg, #{pid := Pid, streamid := StreamID}) -> + Pid ! {{Pid, StreamID}, Msg}, + receive {data_ack, Pid} -> ok end. -spec stream_events(cow_sse:event() | [cow_sse:event()], fin | nofin, req()) -> ok. stream_events(Event, IsFin, Req) when is_map(Event) -> stream_events([Event], IsFin, Req); -stream_events(Events, IsFin, #{pid := Pid, streamid := StreamID, has_sent_resp := headers}) -> - Pid ! {{Pid, StreamID}, {data, IsFin, cow_sse:events(Events)}}, - ok. +stream_events(Events, IsFin, Req=#{has_sent_resp := headers}) -> + stream_body({data, IsFin, cow_sse:events(Events)}, Req). -spec stream_trailers(cowboy:http_headers(), req()) -> ok. stream_trailers(Trailers, #{pid := Pid, streamid := StreamID, has_sent_resp := headers}) -> diff --git a/src/cowboy_stream_h.erl b/src/cowboy_stream_h.erl index 6c10517..cc9e271 100644 --- a/src/cowboy_stream_h.erl +++ b/src/cowboy_stream_h.erl @@ -39,7 +39,8 @@ read_body_length = 0 :: non_neg_integer() | infinity | auto, read_body_is_fin = nofin :: nofin | {fin, non_neg_integer()}, read_body_buffer = <<>> :: binary(), - body_length = 0 :: non_neg_integer() + body_length = 0 :: non_neg_integer(), + stream_body_status = normal :: normal | blocking | blocked }). %% @todo For shutting down children we need to have a timeout before we terminate @@ -219,8 +220,27 @@ info(StreamID, Response={response, _, _, _}, State) -> do_info(StreamID, Response, [Response], State#state{expect=undefined}); info(StreamID, Headers={headers, _, _}, State) -> do_info(StreamID, Headers, [Headers], State#state{expect=undefined}); -info(StreamID, Data={data, _, _}, State) -> +%% Sending data involves the data message and the stream_buffer_full alarm. +%% We stop sending acks when the alarm is on. +info(StreamID, Data={data, _, _}, State0=#state{pid=Pid, stream_body_status=Status}) -> + State = case Status of + normal -> + Pid ! {data_ack, self()}, + State0; + blocking -> + State0#state{stream_body_status=blocked}; + blocked -> + State0 + end, do_info(StreamID, Data, [Data], State); +info(StreamID, Alarm={alarm, stream_buffer_full, on}, State) -> + do_info(StreamID, Alarm, [], State#state{stream_body_status=blocking}); +info(StreamID, Alarm={alarm, stream_buffer_full, off}, State=#state{pid=Pid, stream_body_status=Status}) -> + _ = case Status of + blocking -> ok; + blocked -> Pid ! {data_ack, self()} + end, + do_info(StreamID, Alarm, [], State#state{stream_body_status=normal}); info(StreamID, Trailers={trailers, _}, State) -> do_info(StreamID, Trailers, [Trailers], State); info(StreamID, Push={push, _, _, _, _, _, _, _}, State) -> diff --git a/test/handlers/resp_h.erl b/test/handlers/resp_h.erl index 5e5e766..19405db 100644 --- a/test/handlers/resp_h.erl +++ b/test/handlers/resp_h.erl @@ -221,6 +221,11 @@ do(<<"stream_body">>, Req0, Opts) -> cowboy_req:stream_body(<<"world">>, nofin, Req), cowboy_req:stream_body(<<"!">>, fin, Req), {ok, Req, Opts}; + <<"loop">> -> + Req = cowboy_req:stream_reply(200, Req0), + _ = [cowboy_req:stream_body(<<0:1000000/unit:8>>, nofin, Req) + || _ <- lists:seq(1, 32)], + {ok, Req, Opts}; <<"nofin">> -> Req = cowboy_req:stream_reply(200, Req0), cowboy_req:stream_body(<<"Hello world!">>, nofin, Req), diff --git a/test/req_SUITE.erl b/test/req_SUITE.erl index a30e33f..cbb0991 100644 --- a/test/req_SUITE.erl +++ b/test/req_SUITE.erl @@ -98,7 +98,7 @@ do_get(Path, Headers, Config) -> Ref = gun:get(ConnPid, Path, [{<<"accept-encoding">>, <<"gzip">>}|Headers]), {response, IsFin, Status, RespHeaders} = gun:await(ConnPid, Ref), {ok, RespBody} = case IsFin of - nofin -> gun:await_body(ConnPid, Ref); + nofin -> gun:await_body(ConnPid, Ref, 30000); fin -> {ok, <<>>} end, gun:close(ConnPid), @@ -891,6 +891,11 @@ stream_body_multiple(Config) -> {200, _, <<"Hello world!">>} = do_get("/resp/stream_body/multiple", Config), ok. +stream_body_loop(Config) -> + doc("Streamed body via a fast loop."), + {200, _, <<0:32000000/unit:8>>} = do_get("/resp/stream_body/loop", Config), + ok. + stream_body_nofin(Config) -> doc("Unfinished streamed body."), {200, _, <<"Hello world!">>} = do_get("/resp/stream_body/nofin", Config), -- cgit v1.2.3