From 39baed6c800fa9e756f7491063ead399a23083f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Wed, 15 Nov 2017 14:58:49 +0100 Subject: Add preliminary support for trailers in responses This depends on changes in Cowlib that are only available on master. --- Makefile | 2 +- src/cowboy_http.erl | 37 ++++++++++++++++++++++++++++++++++++- src/cowboy_http2.erl | 34 +++++++++++++++++++++++++++++----- src/cowboy_req.erl | 6 ++++++ src/cowboy_stream_h.erl | 2 ++ test/handlers/resp_h.erl | 12 ++++++++++++ test/req_SUITE.erl | 34 ++++++++++++++++++++++++++++++++++ 7 files changed, 120 insertions(+), 7 deletions(-) diff --git a/Makefile b/Makefile index 4d6cf27..74843f5 100644 --- a/Makefile +++ b/Makefile @@ -16,7 +16,7 @@ CT_OPTS += -ct_hooks cowboy_ct_hook [] # -boot start_sasl LOCAL_DEPS = crypto DEPS = cowlib ranch -dep_cowlib = git https://github.com/ninenines/cowlib 2.0.1 +dep_cowlib = git https://github.com/ninenines/cowlib master dep_ranch = git https://github.com/ninenines/ranch 1.4.0 DOC_DEPS = asciideck diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl index e287edc..62454ac 100644 --- a/src/cowboy_http.erl +++ b/src/cowboy_http.erl @@ -69,6 +69,8 @@ state = undefined :: {module(), any()}, %% Client HTTP version for this stream. version = undefined :: cowboy:http_version(), + %% Unparsed te header. Used to know if we can send trailers. + te :: undefined | binary(), %% Commands queued. queue = [] :: cowboy_stream:commands() }). @@ -267,7 +269,9 @@ after_parse({request, Req=#{streamid := StreamID, headers := Headers, version := State0=#state{opts=Opts, streams=Streams0}, Buffer}) -> try cowboy_stream:init(StreamID, Req, Opts) of {Commands, StreamState} -> - Streams = [#stream{id=StreamID, state=StreamState, version=Version}|Streams0], + TE = maps:get(<<"te">>, Headers, undefined), + Streams = [#stream{id=StreamID, state=StreamState, + version=Version, te=TE}|Streams0], State1 = case maybe_req_close(State0, Headers, Version) of close -> State0#state{streams=Streams, last_streamid=StreamID}; keepalive -> State0#state{streams=Streams} @@ -900,6 +904,37 @@ commands(State0=#state{socket=Socket, transport=Transport, streams=Streams}, Str nofin -> State0 end, commands(State, StreamID, Tail); +%% Send trailers. +commands(State=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID, + [{trailers, Trailers}|Tail]) -> + TE = case lists:keyfind(StreamID, #stream.id, Streams) of + %% HTTP/1.0 doesn't support chunked transfer-encoding. + #stream{version='HTTP/1.0'} -> + not_chunked; + %% No TE header was sent. + #stream{te=undefined} -> + no_trailers; + #stream{te=TE0} -> + try cow_http_hd:parse_te(TE0) of + {TE1, _} -> TE1 + catch _:_ -> + %% If we can't parse the TE header, assume we can't send trailers. + no_trailers + end + end, + case TE of + trailers -> + Transport:send(Socket, [ + <<"0\r\n">>, + cow_http:headers(maps:to_list(Trailers)), + <<"\r\n">> + ]); + no_trailers -> + Transport:send(Socket, <<"0\r\n\r\n">>); + not_chunked -> + ok + end, + commands(State#state{out_state=done}, StreamID, Tail); %% Send a file. commands(State0=#state{socket=Socket, transport=Transport}, StreamID, [{sendfile, IsFin, Offset, Bytes, Path}|Tail]) -> diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl index 9e81957..0f48e0e 100644 --- a/src/cowboy_http2.erl +++ b/src/cowboy_http2.erl @@ -49,7 +49,9 @@ %% Whether we finished receiving data. remote = nofin :: cowboy_stream:fin(), %% Remote flow control window (how much we accept to receive). - remote_window :: integer() + remote_window :: integer(), + %% Unparsed te header. Used to know if we can send trailers. + te :: undefined | binary() }). -type stream() :: #stream{}. @@ -537,9 +539,24 @@ commands(State=#state{socket=Socket, transport=Transport, encode_state=EncodeSta commands(State0, Stream0=#stream{local=nofin}, [{data, IsFin, Data}|Tail]) -> {State, Stream} = send_data(State0, Stream0, IsFin, Data), commands(State, Stream, Tail); - %% @todo data when local!=nofin - +%% Send trailers. +commands(State0, Stream0=#stream{local=nofin, te=TE0}, [{trailers, Trailers}|Tail]) -> + %% We only accept TE headers containing exactly "trailers" (RFC7540 8.1.2.1). + TE = try cow_http_hd:parse_te(TE0) of + {trailers, []} -> trailers; + _ -> no_trailers + catch _:_ -> + %% If we can't parse the TE header, assume we can't send trailers. + no_trailers + end, + {State, Stream} = case TE of + trailers -> + send_data(State0, Stream0, fin, {trailers, Trailers}); + no_trailers -> + send_data(State0, Stream0, fin, <<>>) + end, + commands(State, Stream, Tail); %% Send a file. commands(State0, Stream0=#stream{local=nofin}, [{sendfile, IsFin, Offset, Bytes, Path}|Tail]) -> @@ -664,6 +681,12 @@ send_data(State0, Stream0=#stream{local_buffer=Q0, local_buffer_size=BufferSize} send_data(State, Stream, IsFin, Data) -> send_data(State, Stream, IsFin, Data, in). +%% Always send trailer frames even if the window is empty. +send_data(State=#state{socket=Socket, transport=Transport, encode_state=EncodeState0}, + Stream=#stream{id=StreamID}, fin, {trailers, Trailers}, _) -> + {HeaderBlock, EncodeState} = headers_encode(Trailers, EncodeState0), + Transport:send(Socket, cow_http2:headers(StreamID, fin, HeaderBlock)), + {State#state{encode_state=EncodeState}, Stream#stream{local=fin}}; %% Send data immediately if we can, buffer otherwise. %% @todo We might want to print an error if local=fin. send_data(State=#state{local_window=ConnWindow}, @@ -800,13 +823,14 @@ stream_req_init(State=#state{ref=Ref, peer=Peer, sock=Sock, cert=Cert}, stream_handler_init(State=#state{opts=Opts, local_settings=#{initial_window_size := RemoteWindow}, remote_settings=#{initial_window_size := LocalWindow}}, - StreamID, RemoteIsFin, LocalIsFin, Req) -> + StreamID, RemoteIsFin, LocalIsFin, Req=#{headers := Headers}) -> try cowboy_stream:init(StreamID, Req, Opts) of {Commands, StreamState} -> commands(State#state{client_streamid=StreamID}, #stream{id=StreamID, state=StreamState, remote=RemoteIsFin, local=LocalIsFin, - local_window=LocalWindow, remote_window=RemoteWindow}, + local_window=LocalWindow, remote_window=RemoteWindow, + te=maps:get(<<"te">>, Headers, undefined)}, Commands) catch Class:Exception -> cowboy_stream:report_error(init, diff --git a/src/cowboy_req.erl b/src/cowboy_req.erl index 253564d..da3a0a4 100644 --- a/src/cowboy_req.erl +++ b/src/cowboy_req.erl @@ -81,6 +81,7 @@ %% @todo stream_body/2 (nofin) -export([stream_body/3]). %% @todo stream_event/2,3 +-export([stream_trailers/2]). -export([push/3]). -export([push/4]). @@ -774,6 +775,11 @@ stream_body(Data, IsFin, #{pid := Pid, streamid := StreamID, has_sent_resp := he Pid ! {{Pid, StreamID}, {data, IsFin, Data}}, ok. +-spec stream_trailers(cowboy:http_headers(), req()) -> ok. +stream_trailers(Trailers, #{pid := Pid, streamid := StreamID, has_sent_resp := headers}) -> + Pid ! {{Pid, StreamID}, {trailers, Trailers}}, + ok. + -spec push(binary(), cowboy:http_headers(), req()) -> ok. push(Path, Headers, Req) -> push(Path, Headers, Req, #{}). diff --git a/src/cowboy_stream_h.erl b/src/cowboy_stream_h.erl index 8cbdbb0..93b8417 100644 --- a/src/cowboy_stream_h.erl +++ b/src/cowboy_stream_h.erl @@ -166,6 +166,8 @@ info(_StreamID, Headers = {headers, _, _}, State) -> {[Headers], State#state{expect=undefined}}; info(_StreamID, Data = {data, _, _}, State) -> {[Data], State}; +info(_StreamID, Trailers = {trailers, _}, State) -> + {[Trailers], State}; info(_StreamID, Push = {push, _, _, _, _, _, _, _}, State) -> {[Push], State}; info(_StreamID, SwitchProtocol = {switch_protocol, _, _, _}, State) -> diff --git a/test/handlers/resp_h.erl b/test/handlers/resp_h.erl index add90ad..19686ff 100644 --- a/test/handlers/resp_h.erl +++ b/test/handlers/resp_h.erl @@ -204,6 +204,18 @@ do(<<"stream_body">>, Req0, Opts) -> cowboy_req:stream_body(<<0:800000>>, fin, Req0), {ok, Req0, Opts} end; +do(<<"stream_trailers">>, Req0, Opts) -> + case cowboy_req:binding(arg, Req0) of + _ -> + Req = cowboy_req:stream_reply(200, #{ + <<"trailer">> => <<"grpc-status">> + }, Req0), + cowboy_req:stream_body(<<"Hello world!">>, nofin, Req), + cowboy_req:stream_trailers(#{ + <<"grpc-status">> => <<"0">> + }, Req), + {ok, Req, Opts} + end; do(<<"push">>, Req, Opts) -> case cowboy_req:binding(arg, Req) of <<"method">> -> diff --git a/test/req_SUITE.erl b/test/req_SUITE.erl index 862ee53..4c6e2f8 100644 --- a/test/req_SUITE.erl +++ b/test/req_SUITE.erl @@ -841,6 +841,40 @@ stream_body_nofin(Config) -> %% @todo Crash when calling stream_body after calling reply. %% @todo Crash when calling stream_body before calling stream_reply. +stream_trailers(Config) -> + doc("Stream body followed by trailer headers."), + {200, RespHeaders, <<"Hello world!">>, [ + {<<"grpc-status">>, <<"0">>} + ]} = do_trailers("/resp/stream_trailers", Config), + {_, <<"grpc-status">>} = lists:keyfind(<<"trailer">>, 1, RespHeaders), + ok. + +stream_trailers_no_te(Config) -> + doc("Stream body followed by trailer headers."), + ConnPid = gun_open(Config), + Ref = gun:get(ConnPid, "/resp/stream_trailers", [ + {<<"accept-encoding">>, <<"gzip">>} + ]), + {response, nofin, 200, RespHeaders} = gun:await(ConnPid, Ref), + {ok, RespBody} = gun:await_body(ConnPid, Ref), + gun:close(ConnPid). + +do_trailers(Path, Config) -> + ConnPid = gun_open(Config), + Ref = gun:get(ConnPid, Path, [ + {<<"accept-encoding">>, <<"gzip">>}, + {<<"te">>, <<"trailers">>} + ]), + {response, nofin, Status, RespHeaders} = gun:await(ConnPid, Ref), + {ok, RespBody, Trailers} = gun:await_body(ConnPid, Ref), + gun:close(ConnPid), + {Status, RespHeaders, do_decode(RespHeaders, RespBody), Trailers}. + +%% @todo Crash when calling stream_trailers twice. +%% @todo Crash when calling stream_trailers after the fin flag has been set. +%% @todo Crash when calling stream_trailers after calling reply. +%% @todo Crash when calling stream_trailers before calling stream_reply. + %% Tests: Push. %% @todo We want to crash when push is called after reply has been initiated. -- cgit v1.2.3