From f08f4610a08a187c573da2273494a27894bea54c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Eric=20Meadows-J=C3=B6nsson?= Date: Wed, 16 May 2018 13:28:49 +0200 Subject: Add streaming without chunking for HTTP/1.1 If content-length is set in the response headers we can skip chunked transfer-encoding. --- doc/src/manual/cowboy_req.stream_reply.asciidoc | 4 +- src/cowboy_http.erl | 99 ++++++++++++++++--------- test/handlers/resp_h.erl | 26 +++++++ test/req_SUITE.erl | 49 ++++++++++++ 4 files changed, 140 insertions(+), 38 deletions(-) diff --git a/doc/src/manual/cowboy_req.stream_reply.asciidoc b/doc/src/manual/cowboy_req.stream_reply.asciidoc index 74c1768..82f49c6 100644 --- a/doc/src/manual/cowboy_req.stream_reply.asciidoc +++ b/doc/src/manual/cowboy_req.stream_reply.asciidoc @@ -45,7 +45,9 @@ more efficiently. The streaming method varies depending on the protocol being used. HTTP/2 will use the usual DATA frames. HTTP/1.1 will -use chunked transfer-encoding. HTTP/1.0 will send the body +use chunked transfer-encoding, unless the content-length +response header is set, in which case the body is sent +without chunked transfer-encoding. HTTP/1.0 will send the body unmodified and close the connection at the end if no content-length was set. diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl index 9660902..157f246 100644 --- a/src/cowboy_http.erl +++ b/src/cowboy_http.erl @@ -78,6 +78,10 @@ version = undefined :: cowboy:http_version(), %% Unparsed te header. Used to know if we can send trailers. te :: undefined | binary(), + %% Expected body size. + local_expected_size = undefined :: undefined | non_neg_integer(), + %% Sent body size. + local_sent_size = 0 :: non_neg_integer(), %% Commands queued. queue = [] :: cowboy_stream:commands() }). @@ -113,7 +117,7 @@ out_streamid = 1 :: pos_integer(), %% Whether we finished writing data for the current stream. 
- out_state = wait :: wait | chunked | done, + out_state = wait :: wait | chunked | streaming | done, %% The connection will be closed after this stream. last_streamid = undefined :: pos_integer(), @@ -924,22 +928,29 @@ commands(State0=#state{socket=Socket, transport=Transport, out_state=wait, strea Transport:send(Socket, [Response, Body]), commands(State#state{out_state=done}, StreamID, Tail) end; -%% Send response headers and initiate chunked encoding. -commands(State0=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID, - [{headers, StatusCode, Headers0}|Tail]) -> +%% Send response headers and initiate chunked encoding or streaming. +commands(State0=#state{socket=Socket, transport=Transport, streams=Streams0, out_state=OutState}, + StreamID, [{headers, StatusCode, Headers0}|Tail]) -> %% @todo Same as above (about the last stream in the list). - Stream = #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams), - {State1, Headers1} = case {cow_http:status_to_integer(StatusCode), Version} of - {204, 'HTTP/1.1'} -> + Stream = #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams0), + Status = cow_http:status_to_integer(StatusCode), + ContentLength = maps:get(<<"content-length">>, Headers0, undefined), + {State1, Headers1} = case {Status, ContentLength, Version} of + {204, _, 'HTTP/1.1'} -> {State0#state{out_state=done}, Headers0}; - {_, 'HTTP/1.1'} -> + {_, undefined, 'HTTP/1.1'} -> {State0#state{out_state=chunked}, Headers0#{<<"transfer-encoding">> => <<"chunked">>}}; - %% Close the connection after streaming the data to HTTP/1.0 client. - %% @todo I'm guessing we need to differentiate responses with a content-length and others. - {_, 'HTTP/1.0'} -> - {State0#state{out_state=chunked, last_streamid=StreamID}, Headers0} + %% Close the connection after streaming without content-length to HTTP/1.0 client. 
+ {_, undefined, 'HTTP/1.0'} -> + {State0#state{out_state=streaming, last_streamid=StreamID}, Headers0}; + %% Stream the response body without chunked transfer-encoding. + _ -> + ExpectedSize = cow_http_hd:parse_content_length(ContentLength), + Streams = lists:keyreplace(StreamID, #stream.id, Streams0, + Stream#stream{local_expected_size=ExpectedSize}), + {State0#state{out_state=streaming, streams=Streams}, Headers0} end, - Headers2 = case stream_te(Stream) of + Headers2 = case stream_te(OutState, Stream) of trailers -> Headers1; _ -> maps:remove(<<"trailer">>, Headers1) end, @@ -950,49 +961,60 @@ commands(State0=#state{socket=Socket, transport=Transport, streams=Streams}, Str %% %% @todo WINDOW_UPDATE stuff require us to buffer some data. %% @todo We probably want to allow Data to be the {sendfile, ...} tuple also. -commands(State0=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID, - [{data, IsFin, Data}|Tail]) -> +commands(State0=#state{socket=Socket, transport=Transport, streams=Streams0, out_state=OutState}, + StreamID, [{data, IsFin, Data}|Tail]) -> %% Do not send anything when the user asks to send an empty %% data frame, as that would break the protocol. Size = iolist_size(Data), - case Size of + Stream0 = lists:keyfind(StreamID, #stream.id, Streams0), + Stream = case Size of 0 -> %% We send the last chunk only if version is HTTP/1.1 and IsFin=fin. - case lists:keyfind(StreamID, #stream.id, Streams) of - #stream{method= <<"HEAD">>} -> + case {OutState, Stream0} of + {_, #stream{method= <<"HEAD">>}} -> ok; - #stream{version='HTTP/1.1'} when IsFin =:= fin -> + {chunked, _} when IsFin =:= fin -> Transport:send(Socket, <<"0\r\n\r\n">>); _ -> ok - end; + end, + Stream0; _ -> %% @todo We need to kill the stream if it tries to send data before headers. %% @todo Same as above. 
- case lists:keyfind(StreamID, #stream.id, Streams) of - #stream{method= <<"HEAD">>} -> - ok; - #stream{version='HTTP/1.1'} -> + case {OutState, Stream0} of + {_, #stream{method= <<"HEAD">>}} -> + Stream0; + {chunked, _} -> Transport:send(Socket, [ integer_to_binary(Size, 16), <<"\r\n">>, Data, case IsFin of fin -> <<"\r\n0\r\n\r\n">>; nofin -> <<"\r\n">> end - ]); - #stream{version='HTTP/1.0'} -> - Transport:send(Socket, Data) + ]), + Stream0; + {streaming, #stream{local_sent_size=SentSize0, local_expected_size=ExpectedSize}} -> + SentSize = SentSize0 + Size, + if + %% undefined is > any integer value. + SentSize > ExpectedSize -> + terminate(State0, response_body_too_large); + true -> + Transport:send(Socket, Data), + Stream0#stream{local_sent_size=SentSize} + end end end, State = case IsFin of fin -> State0#state{out_state=done}; nofin -> State0 end, - commands(State, StreamID, Tail); -%% Send trailers. -commands(State=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID, - [{trailers, Trailers}|Tail]) -> - case stream_te(lists:keyfind(StreamID, #stream.id, Streams)) of + Streams = lists:keyreplace(StreamID, #stream.id, Streams0, Stream), + commands(State#state{streams=Streams}, StreamID, Tail); +commands(State=#state{socket=Socket, transport=Transport, streams=Streams, out_state=OutState}, + StreamID, [{trailers, Trailers}|Tail]) -> + case stream_te(OutState, lists:keyfind(StreamID, #stream.id, Streams)) of trailers -> Transport:send(Socket, [ <<"0\r\n">>, @@ -1008,6 +1030,7 @@ commands(State=#state{socket=Socket, transport=Transport, streams=Streams}, Stre %% Send a file. commands(State0=#state{socket=Socket, transport=Transport}, StreamID, [{sendfile, IsFin, Offset, Bytes, Path}|Tail]) -> + %% @todo exit with response_body_too_large if we exceed content-length %% We wrap the sendfile call into a try/catch because on OTP-20 %% and earlier a few different crashes could occur for sockets %% that were closing or closed. 
For example a badarg in @@ -1112,7 +1135,8 @@ stream_reset(State, StreamID, StreamError={internal_error, _, _}) -> stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InState, out_streamid=OutStreamID, out_state=OutState, streams=Streams0, children=Children0}, StreamID, Reason) -> - #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams0), + #stream{version=Version, local_expected_size=ExpectedSize, local_sent_size=SentSize} + = lists:keyfind(StreamID, #stream.id, Streams0), State1 = #state{streams=Streams1} = case OutState of wait when element(1, Reason) =:= internal_error -> info(State0, StreamID, {response, 500, #{<<"content-length">> => <<"0">>}, <<>>}); @@ -1122,6 +1146,8 @@ stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InSta info(State0, StreamID, {response, 204, #{}, <<>>}); chunked when Version =:= 'HTTP/1.1' -> info(State0, StreamID, {data, fin, <<>>}); + streaming when is_integer(ExpectedSize), SentSize < ExpectedSize -> %% Fewer bytes were sent than content-length announced. + terminate(State0, response_body_too_small); _ -> %% done or Version =:= 'HTTP/1.0' State0 end, @@ -1214,13 +1240,12 @@ connection_hd_is_close(Conn) -> Conns = cow_http_hd:parse_connection(iolist_to_binary(Conn)), lists:member(<<"close">>, Conns). -%% HTTP/1.0 doesn't support chunked transfer-encoding. -stream_te(#stream{version='HTTP/1.0'}) -> +stream_te(streaming, _) -> not_chunked; %% No TE header was sent. 
-stream_te(#stream{te=undefined}) -> +stream_te(_, #stream{te=undefined}) -> no_trailers; -stream_te(#stream{te=TE0}) -> +stream_te(_, #stream{te=TE0}) -> try cow_http_hd:parse_te(TE0) of {TE1, _} -> TE1 catch _:_ -> diff --git a/test/handlers/resp_h.erl b/test/handlers/resp_h.erl index 68b5fe0..e924c89 100644 --- a/test/handlers/resp_h.erl +++ b/test/handlers/resp_h.erl @@ -220,6 +220,32 @@ do(<<"stream_body">>, Req0, Opts) -> cowboy_req:stream_body(<<0:800000>>, fin, Req0), {ok, Req0, Opts} end; +do(<<"stream_body_content_length">>, Req0, Opts) -> + case cowboy_req:binding(arg, Req0) of + <<"fin0">> -> + Req1 = cowboy_req:set_resp_header(<<"content-length">>, <<"12">>, Req0), + Req = cowboy_req:stream_reply(200, Req1), + cowboy_req:stream_body(<<"Hello world!">>, nofin, Req), + cowboy_req:stream_body(<<>>, fin, Req), + {ok, Req, Opts}; + <<"multiple">> -> + Req1 = cowboy_req:set_resp_header(<<"content-length">>, <<"12">>, Req0), + Req = cowboy_req:stream_reply(200, Req1), + cowboy_req:stream_body(<<"Hello ">>, nofin, Req), + cowboy_req:stream_body(<<"world">>, nofin, Req), + cowboy_req:stream_body(<<"!">>, fin, Req), + {ok, Req, Opts}; + <<"nofin">> -> + Req1 = cowboy_req:set_resp_header(<<"content-length">>, <<"12">>, Req0), + Req = cowboy_req:stream_reply(200, Req1), + cowboy_req:stream_body(<<"Hello world!">>, nofin, Req), + {ok, Req, Opts}; + <<"nofin-error">> -> + Req1 = cowboy_req:set_resp_header(<<"content-length">>, <<"12">>, Req0), + Req = cowboy_req:stream_reply(200, Req1), + cowboy_req:stream_body(<<"Hello">>, nofin, Req), + {ok, Req, Opts} + end; do(<<"stream_trailers">>, Req0, Opts) -> case cowboy_req:binding(arg, Req0) of <<"large">> -> diff --git a/test/req_SUITE.erl b/test/req_SUITE.erl index 6866123..520bc48 100644 --- a/test/req_SUITE.erl +++ b/test/req_SUITE.erl @@ -146,6 +146,23 @@ do_decode(Headers, Body) -> _ -> Body end. +do_get_error(Path, Config) -> + do_get_error(Path, [], Config). 
+ +do_get_error(Path, Headers, Config) -> + ConnPid = gun_open(Config), + Ref = gun:get(ConnPid, Path, [{<<"accept-encoding">>, <<"gzip">>}|Headers]), + {response, IsFin, Status, RespHeaders} = gun:await(ConnPid, Ref), + Result = case IsFin of + nofin -> gun:await_body(ConnPid, Ref); + fin -> {ok, <<>>} + end, + gun:close(ConnPid), + case Result of + {ok, RespBody} -> {Status, RespHeaders, do_decode(RespHeaders, RespBody)}; + _ -> Result + end. + %% Tests: Request. binding(Config) -> @@ -856,6 +873,38 @@ stream_body_nofin(Config) -> {200, _, <<"Hello world!">>} = do_get("/resp/stream_body/nofin", Config), ok. +stream_body_content_length_multiple(Config) -> + doc("Streamed body via multiple calls."), + {200, _, <<"Hello world!">>} = do_get("/resp/stream_body_content_length/multiple", Config), + ok. + +stream_body_content_length_fin0(Config) -> + doc("Streamed body with last chunk of size 0."), + {200, _, <<"Hello world!">>} = do_get("/resp/stream_body_content_length/fin0", Config), + ok. + +stream_body_content_length_nofin(Config) -> + doc("Unfinished streamed body."), + {200, _, <<"Hello world!">>} = do_get("/resp/stream_body_content_length/nofin", Config), + ok. + +stream_body_content_length_nofin_error(Config) -> + doc("Not all of body sent."), + case config(protocol, Config) of + http -> + case do_get_error("/resp/stream_body_content_length/nofin-error", Config) of + {200, Headers, <<"Hello">>} -> + {_, <<"gzip">>} = lists:keyfind(<<"content-encoding">>, 1, Headers); + {error, {closed, "The connection was lost."}} -> + ok; + {error, timeout} -> + ok + end; + http2 -> + %% @todo HTTP2 should have the same content-length checks + ok + end. + %% @todo Crash when calling stream_body after the fin flag has been set. %% @todo Crash when calling stream_body after calling reply. %% @todo Crash when calling stream_body before calling stream_reply. -- cgit v1.2.3