From 92672b49aff04fb129bdf488448c074eeccfdb27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Mon, 20 Nov 2017 15:46:23 +0100 Subject: Queue HTTP/2 trailers when there's still data in the buffer --- src/cowboy_http2.erl | 22 ++++++++++++++++------ test/handlers/resp_h.erl | 9 +++++++++ test/req_SUITE.erl | 8 ++++++++ 3 files changed, 33 insertions(+), 6 deletions(-) diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl index 0f48e0e..39a36f7 100644 --- a/src/cowboy_http2.erl +++ b/src/cowboy_http2.erl @@ -46,6 +46,7 @@ {cowboy_stream:fin(), non_neg_integer(), iolist() | {sendfile, non_neg_integer(), pos_integer(), file:name_all()}}), local_buffer_size = 0 :: non_neg_integer(), + local_trailers = undefined :: undefined | cowboy:http_headers(), %% Whether we finished receiving data. remote = nofin :: cowboy_stream:fin(), %% Remote flow control window (how much we accept to receive). @@ -663,6 +664,9 @@ resume_streams(State0, [Stream0|Tail], Acc) -> resume_streams(State1, Tail, [Stream|Acc]) end. +send_data(State, Stream=#stream{local=Local, local_buffer_size=0, local_trailers=Trailers}) + when (Trailers =/= undefined) andalso ((Local =:= idle) orelse (Local =:= nofin)) -> + send_trailers(State, Stream#stream{local_trailers=undefined}, Trailers); %% @todo We might want to print an error if local=fin. %% %% @todo It's possible that the stream terminates. We must remove it. @@ -681,12 +685,12 @@ send_data(State0, Stream0=#stream{local_buffer=Q0, local_buffer_size=BufferSize} send_data(State, Stream, IsFin, Data) -> send_data(State, Stream, IsFin, Data, in). -%% Always send trailer frames even if the window is empty. -send_data(State=#state{socket=Socket, transport=Transport, encode_state=EncodeState0}, - Stream=#stream{id=StreamID}, fin, {trailers, Trailers}, _) -> - {HeaderBlock, EncodeState} = headers_encode(Trailers, EncodeState0), - Transport:send(Socket, cow_http2:headers(StreamID, fin, HeaderBlock)), - {State#state{encode_state=EncodeState}, Stream#stream{local=fin}}; +%% We can send trailers immediately if the queue is empty, otherwise we queue. +%% We always send trailer frames even if the window is empty. +send_data(State, Stream=#stream{local_buffer_size=0}, fin, {trailers, Trailers}, _) -> + send_trailers(State, Stream, Trailers); +send_data(State, Stream, fin, {trailers, Trailers}, _) -> + {State, Stream#stream{local_trailers=Trailers}}; %% Send data immediately if we can, buffer otherwise. %% @todo We might want to print an error if local=fin. send_data(State=#state{local_window=ConnWindow}, @@ -725,6 +729,12 @@ send_data(State=#state{socket=Socket, transport=Transport, local_window=ConnWind end end. +send_trailers(State=#state{socket=Socket, transport=Transport, encode_state=EncodeState0}, + Stream=#stream{id=StreamID}, Trailers) -> + {HeaderBlock, EncodeState} = headers_encode(Trailers, EncodeState0), + Transport:send(Socket, cow_http2:headers(StreamID, fin, HeaderBlock)), + {State#state{encode_state=EncodeState}, Stream#stream{local=fin}}. + queue_data(Stream=#stream{local_buffer=Q0, local_buffer_size=Size0}, IsFin, Data, In) -> DataSize = case Data of {sendfile, _, Bytes, _} -> Bytes; diff --git a/test/handlers/resp_h.erl b/test/handlers/resp_h.erl index 9dae612..ba46213 100644 --- a/test/handlers/resp_h.erl +++ b/test/handlers/resp_h.erl @@ -212,6 +212,15 @@ do(<<"stream_body">>, Req0, Opts) -> end; do(<<"stream_trailers">>, Req0, Opts) -> case cowboy_req:binding(arg, Req0) of + <<"large">> -> + Req = cowboy_req:stream_reply(200, #{ + <<"trailer">> => <<"grpc-status">> + }, Req0), + cowboy_req:stream_body(<<0:800000>>, nofin, Req), + cowboy_req:stream_trailers(#{ + <<"grpc-status">> => <<"0">> + }, Req), + {ok, Req, Opts}; _ -> Req = cowboy_req:stream_reply(200, #{ <<"trailer">> => <<"grpc-status">> diff --git a/test/req_SUITE.erl b/test/req_SUITE.erl index f900067..9fa73e5 100644 --- a/test/req_SUITE.erl +++ b/test/req_SUITE.erl @@ -854,6 +854,14 @@ stream_trailers(Config) -> {_, <<"grpc-status">>} = lists:keyfind(<<"trailer">>, 1, RespHeaders), ok. +stream_trailers_large(Config) -> + doc("Stream large body followed by trailer headers."), + {200, RespHeaders, <<0:800000>>, [ + {<<"grpc-status">>, <<"0">>} + ]} = do_trailers("/resp/stream_trailers/large", Config), + {_, <<"grpc-status">>} = lists:keyfind(<<"trailer">>, 1, RespHeaders), + ok. + stream_trailers_no_te(Config) -> doc("Stream body followed by trailer headers without a te header in the request."), ConnPid = gun_open(Config), -- cgit v1.2.3