From 32db544782f2528ed0916eecb200f75924dcc407 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Thu, 27 Apr 2017 15:23:57 +0200 Subject: Add content handlers and built-in SSE support Content handlers are a chain of modules implementing callbacks that receive the body of responses and may modify it (for example for decompressing the content) or act upon it (like sending a message to the owner process. The gun_sse content handler module can be used to translate text/event-stream events on the fly and deliver them to the owner process as a {gun_sse...} message. This feature is currently not documented and is only tested against a public server. It requires an up to date Cowlib. --- src/gun_http2.erl | 40 +++++++++++++++++++++++++++++----------- 1 file changed, 29 insertions(+), 11 deletions(-) (limited to 'src/gun_http2.erl') diff --git a/src/gun_http2.erl b/src/gun_http2.erl index 8f9bae7..30ee06e 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -32,13 +32,16 @@ %% Whether we finished sending data. local = nofin :: cowboy_stream:fin(), %% Whether we finished receiving data. - remote = nofin :: cowboy_stream:fin() + remote = nofin :: cowboy_stream:fin(), + %% Content handlers state. + handler_state :: undefined | gun_content_handler:state() }). -record(http2_state, { owner :: pid(), socket :: inet:socket() | ssl:sslsocket(), transport :: module(), + content_handlers :: gun_content_handler:opt(), buffer = <<>> :: binary(), %% @todo local_settings, next_settings, remote_settings @@ -58,18 +61,25 @@ do_check_options([]) -> ok; do_check_options([{keepalive, K}|Opts]) when is_integer(K), K > 0 -> do_check_options(Opts); +do_check_options([Opt={content_handlers, Handlers}|Opts]) -> + case gun_content_handler:check_option(Handlers) of + ok -> do_check_options(Opts); + error -> {error, {options, {http, Opt}}} + end; do_check_options([Opt|_]) -> {error, {options, {http2, Opt}}}. name() -> http2. -init(Owner, Socket, Transport, _Opts) -> +init(Owner, Socket, Transport, Opts) -> %% Send the HTTP/2 preface. Transport:send(Socket, [ << "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n">>, cow_http2:settings(#{}) %% @todo Settings. ]), - #http2_state{owner=Owner, socket=Socket, transport=Transport}. + Handlers = maps:get(content_handlers, Opts, [gun_data]), + #http2_state{owner=Owner, socket=Socket, transport=Transport, + content_handlers=Handlers}. handle(Data, State=#http2_state{buffer=Buffer}) -> parse(<< Buffer/binary, Data/binary >>, State#http2_state{buffer= <<>>}). @@ -89,11 +99,11 @@ parse(Data0, State=#http2_state{buffer=Buffer}) -> end. %% DATA frame. -frame({data, StreamID, IsFin, Data}, State=#http2_state{owner=Owner}) -> +frame({data, StreamID, IsFin, Data}, State) -> case get_stream_by_id(StreamID, State) of - Stream = #stream{ref=StreamRef, remote=nofin} -> - Owner ! {gun_data, self(), StreamRef, IsFin, Data}, - remote_fin(Stream, State, IsFin); + Stream = #stream{remote=nofin, handler_state=Handlers0} -> + Handlers = gun_content_handler:handle(IsFin, Data, Handlers0), + remote_fin(Stream#stream{handler_state=Handlers}, State, IsFin); _ -> %% @todo protocol_error if not existing stream_reset(State, StreamID, {stream_error, stream_closed, @@ -101,7 +111,7 @@ frame({data, StreamID, IsFin, Data}, State=#http2_state{owner=Owner}) -> end; %% Single HEADERS frame headers block. frame({headers, StreamID, IsFin, head_fin, HeaderBlock}, - State=#http2_state{owner=Owner, decode_state=DecodeState0}) -> + State=#http2_state{owner=Owner, decode_state=DecodeState0, content_handlers=Handlers0}) -> case get_stream_by_id(StreamID, State) of Stream = #stream{ref=StreamRef, remote=nofin} -> try cow_hpack:decode(HeaderBlock, DecodeState0) of @@ -109,7 +119,15 @@ frame({headers, StreamID, IsFin, head_fin, HeaderBlock}, case lists:keytake(<<":status">>, 1, Headers0) of {value, {_, Status}, Headers} -> Owner ! {gun_response, self(), StreamRef, IsFin, parse_status(Status), Headers}, - remote_fin(Stream, State#http2_state{decode_state=DecodeState}, IsFin); + %% @todo Change to ReplyTo. + Handlers = case IsFin of + fin -> undefined; + nofin -> + gun_content_handler:init(Owner, StreamRef, + Status, Headers, Handlers0) + end, + remote_fin(Stream#stream{handler_state=Handlers}, + State#http2_state{decode_state=DecodeState}, IsFin); false -> stream_reset(State, StreamID, {stream_error, protocol_error, 'Malformed response; missing :status in HEADERS frame. (RFC7540 8.1.2.4)'}) @@ -368,10 +386,10 @@ delete_stream(StreamID, State=#http2_state{streams=Streams}) -> Streams2 = lists:keydelete(StreamID, #stream.id, Streams), State#http2_state{streams=Streams2}. -remote_fin(_, State, nofin) -> - State; remote_fin(S=#stream{local=fin}, State, fin) -> delete_stream(S#stream.id, State); +%% We always replace the stream in the state because +%% the content handler state has changed. remote_fin(S, State=#http2_state{streams=Streams}, IsFin) -> Streams2 = lists:keyreplace(S#stream.id, #stream.id, Streams, S#stream{remote=IsFin}), -- cgit v1.2.3