From 32db544782f2528ed0916eecb200f75924dcc407 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Thu, 27 Apr 2017 15:23:57 +0200 Subject: Add content handlers and built-in SSE support Content handlers are a chain of modules implementing callbacks that receive the body of responses and may modify it (for example for decompressing the content) or act upon it (like sending a message to the owner process. The gun_sse content handler module can be used to translate text/event-stream events on the fly and deliver them to the owner process as a {gun_sse...} message. This feature is currently not documented and is only tested against a public server. It requires an up to date Cowlib. --- ebin/gun.app | 2 +- src/gun.erl | 4 +- src/gun_content_handler.erl | 69 +++++++++++++++++++++++++++ src/gun_data.erl | 33 +++++++++++++ src/gun_http.erl | 111 ++++++++++++++++++++++++++++---------------- src/gun_http2.erl | 40 +++++++++++----- src/gun_sse.erl | 49 +++++++++++++++++++ test/sse_SUITE.erl | 59 +++++++++++++++++++++++ 8 files changed, 312 insertions(+), 55 deletions(-) create mode 100644 src/gun_content_handler.erl create mode 100644 src/gun_data.erl create mode 100644 src/gun_sse.erl create mode 100644 test/sse_SUITE.erl diff --git a/ebin/gun.app b/ebin/gun.app index cd61220..eefe4cc 100644 --- a/ebin/gun.app +++ b/ebin/gun.app @@ -1,7 +1,7 @@ {application, gun, [ {description, "HTTP/1.1, HTTP/2 and Websocket client for Erlang/OTP."}, {vsn, "1.0.0-pre.2"}, - {modules, ['gun','gun_app','gun_http','gun_http2','gun_spdy','gun_sup','gun_ws']}, + {modules, ['gun','gun_app','gun_content_handler','gun_data','gun_http','gun_http2','gun_spdy','gun_sse','gun_sup','gun_ws']}, {registered, [gun_sup]}, {applications, [kernel,stdlib,ssl,cowlib,ranch]}, {mod, {gun_app, []}}, diff --git a/src/gun.erl b/src/gun.erl index eeaaa1e..3edc698 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -102,8 +102,8 @@ host :: inet:hostname(), port :: inet:port_number(), opts :: opts(), - keepalive_ref :: reference(), - socket :: inet:socket() | ssl:sslsocket(), + keepalive_ref :: undefined | reference(), + socket :: undefined | inet:socket() | ssl:sslsocket(), transport :: module(), protocol :: module(), protocol_state :: any() diff --git a/src/gun_content_handler.erl b/src/gun_content_handler.erl new file mode 100644 index 0000000..b268f56 --- /dev/null +++ b/src/gun_content_handler.erl @@ -0,0 +1,69 @@ +%% Copyright (c) 2017, Loïc Hoguin +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +-module(gun_content_handler). + +-export([init/5]). +-export([handle/3]). +-export([check_option/1]). + +-type opt() :: [module() | {module(), map()}]. +-export_type([opt/0]). + +-type state() :: opt() | [{module(), any()}]. +-export_type([state/0]). + +-callback init(pid(), reference(), cow_http:status(), + cow_http:headers(), map()) -> {ok, any()} | disable. +%% @todo Make fin | nofin its own type. +-callback handle(fin | nofin, any(), State) + -> {ok, any(), State} | {done, State} when State::any(). + +-spec init(pid(), reference(), cow_http:status(), + cow_http:headers(), State) -> State when State::state(). +init(_, _, _, _, []) -> + []; +init(ReplyTo, Owner, Status, Headers, [Handler|Tail]) -> + {Mod, Opts} = case Handler of + Tuple = {_, _} -> Tuple; + Atom -> {Atom, #{}} + end, + case Mod:init(ReplyTo, Owner, Status, Headers, Opts) of + {ok, State} -> [{Mod, State}|init(ReplyTo, Owner, Status, Headers, Tail)]; + disable -> init(ReplyTo, Owner, Status, Headers, Tail) + end. + +-spec handle(fin | nofin, any(), State) -> State when State::state(). +handle(_, _, []) -> + []; +handle(IsFin, Data0, [{Mod, State0}|Tail]) -> + case Mod:handle(IsFin, Data0, State0) of + {ok, Data, State} -> [{Mod, State}|handle(IsFin, Data, Tail)]; + {done, State} -> [{Mod, State}|Tail] + end. + +-spec check_option(list()) -> ok | error. +check_option([]) -> + error; +check_option(Opt) -> + check_option1(Opt). + +check_option1([]) -> + ok; +check_option1([Atom|Tail]) when is_atom(Atom) -> + check_option1(Tail); +check_option1([{Atom, #{}}|Tail]) when is_atom(Atom) -> + check_option1(Tail); +check_option1(_) -> + error. diff --git a/src/gun_data.erl b/src/gun_data.erl new file mode 100644 index 0000000..e70c25e --- /dev/null +++ b/src/gun_data.erl @@ -0,0 +1,33 @@ +%% Copyright (c) 2017, Loïc Hoguin +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +-module(gun_data). +-behavior(gun_content_handler). + +-export([init/5]). +-export([handle/3]). + +-record(state, { + reply_to :: pid(), + stream_ref :: reference() +}). + +-spec init(pid(), reference(), _, _, _) -> {ok, #state{}}. +init(ReplyTo, StreamRef, _, _, _) -> + {ok, #state{reply_to=ReplyTo, stream_ref=StreamRef}}. + +-spec handle(fin | nofin, binary(), State) -> {done, State} when State::#state{}. +handle(IsFin, Data, State=#state{reply_to=ReplyTo, stream_ref=StreamRef}) -> + ReplyTo ! {gun_data, self(), StreamRef, IsFin, Data}, + {done, State}. diff --git a/src/gun_http.erl b/src/gun_http.erl index 65135f9..f5eef71 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -31,17 +31,25 @@ -type websocket_info() :: {websocket, reference(), binary(), [binary()], [], gun:ws_opts()}. %% key, extensions, protocols, options +-record(stream, { + ref :: reference() | websocket_info(), + method :: binary(), + is_alive :: boolean(), + handler_state :: undefined | gun_content_handler:state() +}). + -record(http_state, { owner :: pid(), socket :: inet:socket() | ssl:sslsocket(), transport :: module(), version = 'HTTP/1.1' :: cow_http:version(), + content_handlers :: gun_content_handler:opt(), connection = keepalive :: keepalive | close, buffer = <<>> :: binary(), %% Stream reference, request method and whether the stream is alive. - streams = [] :: [{reference() | websocket_info(), binary(), boolean()}], + streams = [] :: [#stream{}], in = head :: io(), - in_state :: {non_neg_integer(), non_neg_integer()}, + in_state = {0, 0} :: {non_neg_integer(), non_neg_integer()}, out = head :: io() }). @@ -54,6 +62,11 @@ do_check_options([{keepalive, K}|Opts]) when is_integer(K), K > 0 -> do_check_options(Opts); do_check_options([{version, V}|Opts]) when V =:= 'HTTP/1.1'; V =:= 'HTTP/1.0' -> do_check_options(Opts); +do_check_options([Opt={content_handlers, Handlers}|Opts]) -> + case gun_content_handler:check_option(Handlers) of + ok -> do_check_options(Opts); + error -> {error, {options, {http, Opt}}} + end; do_check_options([Opt|_]) -> {error, {options, {http, Opt}}}. @@ -61,7 +74,9 @@ name() -> http. init(Owner, Socket, Transport, Opts) -> Version = maps:get(version, Opts, 'HTTP/1.1'), - #http_state{owner=Owner, socket=Socket, transport=Transport, version=Version}. + Handlers = maps:get(content_handlers, Opts, [gun_data]), + #http_state{owner=Owner, socket=Socket, transport=Transport, version=Version, + content_handlers=Handlers}. %% Stop looping when we got no more data. handle(<<>>, State) -> @@ -78,8 +93,7 @@ handle(Data, State=#http_state{in=head, buffer=Buffer}) -> end; %% Everything sent to the socket until it closes is part of the response body. handle(Data, State=#http_state{in=body_close}) -> - send_data_if_alive(Data, State, nofin), - State; + send_data_if_alive(Data, State, nofin); handle(Data, State=#http_state{in=body_chunked, in_state=InState, buffer=Buffer, connection=Conn}) -> Buffer2 = << Buffer/binary, Data/binary >>, @@ -87,30 +101,33 @@ handle(Data, State=#http_state{in=body_chunked, in_state=InState, more -> State#http_state{buffer=Buffer2}; {more, Data2, InState2} -> - send_data_if_alive(Data2, State, nofin), - State#http_state{buffer= <<>>, in_state=InState2}; + send_data_if_alive(Data2, + State#http_state{buffer= <<>>, in_state=InState2}, + nofin); {more, Data2, Length, InState2} when is_integer(Length) -> %% @todo See if we can recv faster than one message at a time. - send_data_if_alive(Data2, State, nofin), - State#http_state{buffer= <<>>, in_state=InState2}; + send_data_if_alive(Data2, + State#http_state{buffer= <<>>, in_state=InState2}, + nofin); {more, Data2, Rest, InState2} -> %% @todo See if we can recv faster than one message at a time. - send_data_if_alive(Data2, State, nofin), - State#http_state{buffer=Rest, in_state=InState2}; + send_data_if_alive(Data2, + State#http_state{buffer=Rest, in_state=InState2}, + nofin); {done, _TotalLength, Rest} -> %% I suppose it doesn't hurt to append an empty binary. - send_data_if_alive(<<>>, State, fin), + State1 = send_data_if_alive(<<>>, State, fin), case Conn of keepalive -> - handle(Rest, end_stream(State#http_state{buffer= <<>>})); + handle(Rest, end_stream(State1#http_state{buffer= <<>>})); close -> close end; {done, Data2, _TotalLength, Rest} -> - send_data_if_alive(Data2, State, fin), + State1 = send_data_if_alive(Data2, State, fin), case Conn of keepalive -> - handle(Rest, end_stream(State#http_state{buffer= <<>>})); + handle(Rest, end_stream(State1#http_state{buffer= <<>>})); close -> close end @@ -121,27 +138,29 @@ handle(Data, State=#http_state{in={body, Length}, connection=Conn}) -> if %% More data coming. DataSize < Length -> - send_data_if_alive(Data, State, nofin), - State#http_state{in={body, Length - DataSize}}; + send_data_if_alive(Data, + State#http_state{in={body, Length - DataSize}}, + nofin); %% Stream finished, no rest. DataSize =:= Length -> - send_data_if_alive(Data, State, fin), + State1 = send_data_if_alive(Data, State, fin), case Conn of - keepalive -> end_stream(State); + keepalive -> end_stream(State1); close -> close end; %% Stream finished, rest. true -> << Body:Length/binary, Rest/bits >> = Data, - send_data_if_alive(Body, State, fin), + State1 = send_data_if_alive(Body, State, fin), case Conn of - keepalive -> handle(Rest, end_stream(State)); + keepalive -> handle(Rest, end_stream(State1)); close -> close end end. handle_head(Data, State=#http_state{owner=Owner, version=ClientVersion, - connection=Conn, streams=[{StreamRef, Method, IsAlive}|_]}) -> + content_handlers=Handlers0, connection=Conn, + streams=[Stream=#stream{ref=StreamRef, method=Method, is_alive=IsAlive}|Tail]}) -> {Version, Status, _, Rest} = cow_http:parse_status_line(Data), {Headers, Rest2} = cow_http:parse_headers(Rest), case {Status, StreamRef} of @@ -150,7 +169,7 @@ handle_head(Data, State=#http_state{owner=Owner, version=ClientVersion, _ -> In = response_io_from_headers(Method, Version, Status, Headers), IsFin = case In of head -> fin; _ -> nofin end, - case IsAlive of + Handlers = case IsAlive of false -> ok; true -> @@ -160,7 +179,13 @@ handle_head(Data, State=#http_state{owner=Owner, version=ClientVersion, end, Owner ! {gun_response, self(), StreamRef2, IsFin, Status, Headers}, - ok + %% @todo Change to ReplyTo. + case IsFin of + fin -> undefined; + nofin -> + gun_content_handler:init(Owner, StreamRef, + Status, Headers, Handlers0) + end end, Conn2 = if Conn =:= close -> close; @@ -174,33 +199,36 @@ handle_head(Data, State=#http_state{owner=Owner, version=ClientVersion, close; IsFin =:= fin -> handle(Rest2, end_stream(State#http_state{in=In, - in_state={0, 0}, connection=Conn2})); + in_state={0, 0}, connection=Conn2, + streams=[Stream#stream{handler_state=Handlers}|Tail]})); true -> - handle(Rest2, State#http_state{in=In, in_state={0, 0}, connection=Conn2}) + handle(Rest2, State#http_state{in=In, + in_state={0, 0}, connection=Conn2, + streams=[Stream#stream{handler_state=Handlers}|Tail]}) end end. -send_data_if_alive(<<>>, _, nofin) -> - ok; +send_data_if_alive(<<>>, State, nofin) -> + State; %% @todo What if we receive data when the HEAD method was used? -send_data_if_alive(Data, #http_state{owner=Owner, - streams=[{StreamRef, _, true}|_]}, IsFin) -> - Owner ! {gun_data, self(), StreamRef, IsFin, Data}, - ok; -send_data_if_alive(_, _, _) -> - ok. +send_data_if_alive(Data, State=#http_state{streams=[Stream=#stream{ + is_alive=true, handler_state=Handlers0}|Tail]}, IsFin) -> + Handlers = gun_content_handler:handle(IsFin, Data, Handlers0), + State#http_state{streams=[Stream#stream{handler_state=Handlers}|Tail]}; +send_data_if_alive(_, State, _) -> + State. close(State=#http_state{in=body_close, owner=Owner, streams=[_|Tail]}) -> - send_data_if_alive(<<>>, State, fin), + _ = send_data_if_alive(<<>>, State, fin), close_streams(Owner, Tail); close(#http_state{owner=Owner, streams=Streams}) -> close_streams(Owner, Streams). close_streams(_, []) -> ok; -close_streams(Owner, [{_, _, false}|Tail]) -> +close_streams(Owner, [#stream{is_alive=false}|Tail]) -> close_streams(Owner, Tail); -close_streams(Owner, [{StreamRef, _, _}|Tail]) -> +close_streams(Owner, [#stream{ref=StreamRef}|Tail]) -> Owner ! {gun_error, self(), StreamRef, {closed, "The connection was lost."}}, close_streams(Owner, Tail). @@ -256,7 +284,7 @@ data(State=#http_state{streams=[]}, StreamRef, _, _) -> data(State=#http_state{socket=Socket, transport=Transport, version=Version, out=Out, streams=Streams}, StreamRef, IsFin, Data) -> case lists:last(Streams) of - {StreamRef, _, true} -> + #stream{ref=StreamRef, is_alive=true} -> DataLength = iolist_size(Data), case Out of body_chunked when Version =:= 'HTTP/1.1', IsFin =:= fin -> @@ -304,7 +332,7 @@ down(#http_state{streams=Streams}) -> KilledStreams = [case Ref of {websocket, Ref2, _, _, _, _} -> Ref2; _ -> Ref - end || {Ref, _, _} <- Streams], + end || #stream{ref=Ref} <- Streams], {KilledStreams, []}. error_stream_closed(State=#http_state{owner=Owner}, StreamRef) -> @@ -373,7 +401,8 @@ response_io_from_headers(_, Version, _Status, Headers) -> %% Streams. new_stream(State=#http_state{streams=Streams}, StreamRef, Method) -> - State#http_state{streams=Streams ++ [{StreamRef, iolist_to_binary(Method), true}]}. + State#http_state{streams=Streams + ++ [#stream{ref=StreamRef, method=iolist_to_binary(Method), is_alive=true}]}. is_stream(#http_state{streams=Streams}, StreamRef) -> lists:keymember(StreamRef, 1, Streams). @@ -384,7 +413,7 @@ cancel_stream(State=#http_state{streams=Streams}, StreamRef) -> {Ref, false}; _ -> Tuple - end || Tuple = {Ref, _, _} <- Streams], + end || Tuple = #stream{ref=Ref} <- Streams], State#http_state{streams=Streams2}. end_stream(State=#http_state{streams=[_|Tail]}) -> diff --git a/src/gun_http2.erl b/src/gun_http2.erl index 8f9bae7..30ee06e 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -32,13 +32,16 @@ %% Whether we finished sending data. local = nofin :: cowboy_stream:fin(), %% Whether we finished receiving data. - remote = nofin :: cowboy_stream:fin() + remote = nofin :: cowboy_stream:fin(), + %% Content handlers state. + handler_state :: undefined | gun_content_handler:state() }). -record(http2_state, { owner :: pid(), socket :: inet:socket() | ssl:sslsocket(), transport :: module(), + content_handlers :: gun_content_handler:opt(), buffer = <<>> :: binary(), %% @todo local_settings, next_settings, remote_settings @@ -58,18 +61,25 @@ do_check_options([]) -> ok; do_check_options([{keepalive, K}|Opts]) when is_integer(K), K > 0 -> do_check_options(Opts); +do_check_options([Opt={content_handlers, Handlers}|Opts]) -> + case gun_content_handler:check_option(Handlers) of + ok -> do_check_options(Opts); + error -> {error, {options, {http, Opt}}} + end; do_check_options([Opt|_]) -> {error, {options, {http2, Opt}}}. name() -> http2. -init(Owner, Socket, Transport, _Opts) -> +init(Owner, Socket, Transport, Opts) -> %% Send the HTTP/2 preface. Transport:send(Socket, [ << "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n">>, cow_http2:settings(#{}) %% @todo Settings. ]), - #http2_state{owner=Owner, socket=Socket, transport=Transport}. + Handlers = maps:get(content_handlers, Opts, [gun_data]), + #http2_state{owner=Owner, socket=Socket, transport=Transport, + content_handlers=Handlers}. handle(Data, State=#http2_state{buffer=Buffer}) -> parse(<< Buffer/binary, Data/binary >>, State#http2_state{buffer= <<>>}). @@ -89,11 +99,11 @@ parse(Data0, State=#http2_state{buffer=Buffer}) -> end. %% DATA frame. -frame({data, StreamID, IsFin, Data}, State=#http2_state{owner=Owner}) -> +frame({data, StreamID, IsFin, Data}, State) -> case get_stream_by_id(StreamID, State) of - Stream = #stream{ref=StreamRef, remote=nofin} -> - Owner ! {gun_data, self(), StreamRef, IsFin, Data}, - remote_fin(Stream, State, IsFin); + Stream = #stream{remote=nofin, handler_state=Handlers0} -> + Handlers = gun_content_handler:handle(IsFin, Data, Handlers0), + remote_fin(Stream#stream{handler_state=Handlers}, State, IsFin); _ -> %% @todo protocol_error if not existing stream_reset(State, StreamID, {stream_error, stream_closed, @@ -101,7 +111,7 @@ frame({data, StreamID, IsFin, Data}, State=#http2_state{owner=Owner}) -> end; %% Single HEADERS frame headers block. frame({headers, StreamID, IsFin, head_fin, HeaderBlock}, - State=#http2_state{owner=Owner, decode_state=DecodeState0}) -> + State=#http2_state{owner=Owner, decode_state=DecodeState0, content_handlers=Handlers0}) -> case get_stream_by_id(StreamID, State) of Stream = #stream{ref=StreamRef, remote=nofin} -> try cow_hpack:decode(HeaderBlock, DecodeState0) of @@ -109,7 +119,15 @@ frame({headers, StreamID, IsFin, head_fin, HeaderBlock}, case lists:keytake(<<":status">>, 1, Headers0) of {value, {_, Status}, Headers} -> Owner ! {gun_response, self(), StreamRef, IsFin, parse_status(Status), Headers}, - remote_fin(Stream, State#http2_state{decode_state=DecodeState}, IsFin); + %% @todo Change to ReplyTo. + Handlers = case IsFin of + fin -> undefined; + nofin -> + gun_content_handler:init(Owner, StreamRef, + Status, Headers, Handlers0) + end, + remote_fin(Stream#stream{handler_state=Handlers}, + State#http2_state{decode_state=DecodeState}, IsFin); false -> stream_reset(State, StreamID, {stream_error, protocol_error, 'Malformed response; missing :status in HEADERS frame. (RFC7540 8.1.2.4)'}) @@ -368,10 +386,10 @@ delete_stream(StreamID, State=#http2_state{streams=Streams}) -> Streams2 = lists:keydelete(StreamID, #stream.id, Streams), State#http2_state{streams=Streams2}. -remote_fin(_, State, nofin) -> - State; remote_fin(S=#stream{local=fin}, State, fin) -> delete_stream(S#stream.id, State); +%% We always replace the stream in the state because +%% the content handler state has changed. remote_fin(S, State=#http2_state{streams=Streams}, IsFin) -> Streams2 = lists:keyreplace(S#stream.id, #stream.id, Streams, S#stream{remote=IsFin}), diff --git a/src/gun_sse.erl b/src/gun_sse.erl new file mode 100644 index 0000000..4fe7419 --- /dev/null +++ b/src/gun_sse.erl @@ -0,0 +1,49 @@ +%% Copyright (c) 2017, Loïc Hoguin +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +-module(gun_sse). +-behavior(gun_content_handler). + +-export([init/5]). +-export([handle/3]). + +-record(state, { + reply_to :: pid(), + stream_ref :: reference(), + sse_state :: cow_sse:state() +}). + +%% @todo In the future we want to allow different media types. +%% @todo For text/event-stream specifically, the parameters must be ignored. + +-spec init(pid(), reference(), _, cow_http:headers(), _) + -> {ok, #state{}} | disable. +init(ReplyTo, StreamRef, _, Headers, _) -> + case lists:keyfind(<<"content-type">>, 1, Headers) of + {_, <<"text/event-stream">>} -> + {ok, #state{reply_to=ReplyTo, stream_ref=StreamRef, + sse_state=cow_sse:init()}}; + _ -> + disable + end. + +-spec handle(_, binary(), State) -> {done, State} when State::#state{}. +handle(IsFin, Data, State=#state{reply_to=ReplyTo, stream_ref=StreamRef, sse_state=SSE0}) -> + case cow_sse:parse(Data, SSE0) of + {event, Event, SSE} -> + ReplyTo ! {gun_sse, self(), StreamRef, Event}, + handle(IsFin, <<>>, State#state{sse_state=SSE}); + {more, SSE} -> + {done, State#state{sse_state=SSE}} + end. diff --git a/test/sse_SUITE.erl b/test/sse_SUITE.erl new file mode 100644 index 0000000..a036552 --- /dev/null +++ b/test/sse_SUITE.erl @@ -0,0 +1,59 @@ +%% Copyright (c) 2017, Loïc Hoguin +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +-module(sse_SUITE). +-compile(export_all). + +all() -> + [http, http2]. + +http(_) -> + {ok, Pid} = gun:open("sse.now.sh", 443, #{ + protocols => [http], + http_opts => #{content_handlers => [gun_sse, gun_data]} + }), + {ok, http} = gun:await_up(Pid), + common(Pid). + +http2(_) -> + {ok, Pid} = gun:open("sse.now.sh", 443, #{ + protocols => [http2], + http2_opts => #{content_handlers => [gun_sse, gun_data]} + }), + {ok, http2} = gun:await_up(Pid), + common(Pid). + +common(Pid) -> + Ref = gun:get(Pid, "/", [ + {<<"host">>, <<"sse.now.sh">>}, + {<<"accept">>, <<"text/event-stream">>} + ]), + receive + {gun_response, Pid, Ref, nofin, Status, Headers} -> + ct:print("response ~p ~p", [Status, Headers]), + event_loop(Pid, Ref, 3) + after 5000 -> + error(timeout) + end. + +event_loop(_, _, 0) -> + ok; +event_loop(Pid, Ref, N) -> + receive + {gun_sse, Pid, Ref, Event} -> + ct:print("event ~p", [Event]), + event_loop(Pid, Ref, N - 1) + after 10000 -> + error(timeout) + end. -- cgit v1.2.3