diff options
author | Loïc Hoguin <[email protected]> | 2019-09-13 14:20:04 +0200 |
---|---|---|
committer | Loïc Hoguin <[email protected]> | 2019-09-14 18:21:05 +0200 |
commit | 49af57d546b5e2fd5aaa9fcd43d09060b9682c5a (patch) | |
tree | a59b73e1039fe33081491bf2a55621a7d2563356 /src/cowboy_req.erl | |
parent | 4427108b69fcd1e6a8233a217fa0e99d0564b714 (diff) | |
download | cowboy-49af57d546b5e2fd5aaa9fcd43d09060b9682c5a.tar.gz cowboy-49af57d546b5e2fd5aaa9fcd43d09060b9682c5a.tar.bz2 cowboy-49af57d546b5e2fd5aaa9fcd43d09060b9682c5a.zip |
Implement backpressure on cowboy_req:stream_body
This should limit the amount of memory that Cowboy is using
when a handler is sending data much faster than the network.
The new max_stream_buffer_size is a soft limit and only has
an effect when the cowboy_stream_h handler is used.
Diffstat (limited to 'src/cowboy_req.erl')
-rw-r--r-- | src/cowboy_req.erl | 33 |
1 files changed, 15 insertions, 18 deletions
diff --git a/src/cowboy_req.erl b/src/cowboy_req.erl index 60dc86a..8e64a39 100644 --- a/src/cowboy_req.erl +++ b/src/cowboy_req.erl @@ -826,34 +826,31 @@ stream_body(_, _, #{method := <<"HEAD">>, has_sent_resp := headers}) -> %% is converted to a data tuple, however. stream_body({sendfile, _, 0, _}, nofin, _) -> ok; -stream_body({sendfile, _, 0, _}, IsFin=fin, - #{pid := Pid, streamid := StreamID, has_sent_resp := headers}) -> - Pid ! {{Pid, StreamID}, {data, IsFin, <<>>}}, - ok; -stream_body({sendfile, O, B, P}, IsFin, - #{pid := Pid, streamid := StreamID, has_sent_resp := headers}) +stream_body({sendfile, _, 0, _}, IsFin=fin, Req=#{has_sent_resp := headers}) -> + stream_body({data, IsFin, <<>>}, Req); +stream_body({sendfile, O, B, P}, IsFin, Req=#{has_sent_resp := headers}) when is_integer(O), O >= 0, is_integer(B), B > 0 -> - Pid ! {{Pid, StreamID}, {data, IsFin, {sendfile, O, B, P}}}, - ok; -stream_body(Data, IsFin=nofin, #{pid := Pid, streamid := StreamID, has_sent_resp := headers}) + stream_body({data, IsFin, {sendfile, O, B, P}}, Req); +stream_body(Data, IsFin=nofin, Req=#{has_sent_resp := headers}) when not is_tuple(Data) -> case iolist_size(Data) of 0 -> ok; - _ -> - Pid ! {{Pid, StreamID}, {data, IsFin, Data}}, - ok + _ -> stream_body({data, IsFin, Data}, Req) end; -stream_body(Data, IsFin, #{pid := Pid, streamid := StreamID, has_sent_resp := headers}) +stream_body(Data, IsFin, Req=#{has_sent_resp := headers}) when not is_tuple(Data) -> - Pid ! {{Pid, StreamID}, {data, IsFin, Data}}, - ok. + stream_body({data, IsFin, Data}, Req). + +%% @todo Do we need a timeout? +stream_body(Msg, #{pid := Pid, streamid := StreamID}) -> + Pid ! {{Pid, StreamID}, Msg}, + receive {data_ack, Pid} -> ok end. -spec stream_events(cow_sse:event() | [cow_sse:event()], fin | nofin, req()) -> ok. stream_events(Event, IsFin, Req) when is_map(Event) -> stream_events([Event], IsFin, Req); -stream_events(Events, IsFin, #{pid := Pid, streamid := StreamID, has_sent_resp := headers}) -> - Pid ! {{Pid, StreamID}, {data, IsFin, cow_sse:events(Events)}}, - ok. +stream_events(Events, IsFin, Req=#{has_sent_resp := headers}) -> + stream_body({data, IsFin, cow_sse:events(Events)}, Req). -spec stream_trailers(cowboy:http_headers(), req()) -> ok. stream_trailers(Trailers, #{pid := Pid, streamid := StreamID, has_sent_resp := headers}) -> |