aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2017-04-27 15:23:57 +0200
committerLoïc Hoguin <[email protected]>2017-04-27 15:23:57 +0200
commit32db544782f2528ed0916eecb200f75924dcc407 (patch)
treeef8746ab021a8172ac84e9f72062922ec4263619
parente8c08c95b896bf9d2dd299e5fdbff50f714e8749 (diff)
downloadgun-32db544782f2528ed0916eecb200f75924dcc407.tar.gz
gun-32db544782f2528ed0916eecb200f75924dcc407.tar.bz2
gun-32db544782f2528ed0916eecb200f75924dcc407.zip
Add content handlers and built-in SSE support
Content handlers are a chain of modules implementing callbacks that receive the body of responses and may modify it (for example for decompressing the content) or act upon it (like sending a message to the owner process. The gun_sse content handler module can be used to translate text/event-stream events on the fly and deliver them to the owner process as a {gun_sse...} message. This feature is currently not documented and is only tested against a public server. It requires an up to date Cowlib.
-rw-r--r--ebin/gun.app2
-rw-r--r--src/gun.erl4
-rw-r--r--src/gun_content_handler.erl69
-rw-r--r--src/gun_data.erl33
-rw-r--r--src/gun_http.erl111
-rw-r--r--src/gun_http2.erl40
-rw-r--r--src/gun_sse.erl49
-rw-r--r--test/sse_SUITE.erl59
8 files changed, 312 insertions, 55 deletions
diff --git a/ebin/gun.app b/ebin/gun.app
index cd61220..eefe4cc 100644
--- a/ebin/gun.app
+++ b/ebin/gun.app
@@ -1,7 +1,7 @@
{application, gun, [
{description, "HTTP/1.1, HTTP/2 and Websocket client for Erlang/OTP."},
{vsn, "1.0.0-pre.2"},
- {modules, ['gun','gun_app','gun_http','gun_http2','gun_spdy','gun_sup','gun_ws']},
+ {modules, ['gun','gun_app','gun_content_handler','gun_data','gun_http','gun_http2','gun_spdy','gun_sse','gun_sup','gun_ws']},
{registered, [gun_sup]},
{applications, [kernel,stdlib,ssl,cowlib,ranch]},
{mod, {gun_app, []}},
diff --git a/src/gun.erl b/src/gun.erl
index eeaaa1e..3edc698 100644
--- a/src/gun.erl
+++ b/src/gun.erl
@@ -102,8 +102,8 @@
host :: inet:hostname(),
port :: inet:port_number(),
opts :: opts(),
- keepalive_ref :: reference(),
- socket :: inet:socket() | ssl:sslsocket(),
+ keepalive_ref :: undefined | reference(),
+ socket :: undefined | inet:socket() | ssl:sslsocket(),
transport :: module(),
protocol :: module(),
protocol_state :: any()
diff --git a/src/gun_content_handler.erl b/src/gun_content_handler.erl
new file mode 100644
index 0000000..b268f56
--- /dev/null
+++ b/src/gun_content_handler.erl
@@ -0,0 +1,69 @@
+%% Copyright (c) 2017, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+-module(gun_content_handler).
+
+-export([init/5]).
+-export([handle/3]).
+-export([check_option/1]).
+
+-type opt() :: [module() | {module(), map()}].
+-export_type([opt/0]).
+
+-type state() :: opt() | [{module(), any()}].
+-export_type([state/0]).
+
+-callback init(pid(), reference(), cow_http:status(),
+ cow_http:headers(), map()) -> {ok, any()} | disable.
+%% @todo Make fin | nofin its own type.
+-callback handle(fin | nofin, any(), State)
+ -> {ok, any(), State} | {done, State} when State::any().
+
+-spec init(pid(), reference(), cow_http:status(),
+ cow_http:headers(), State) -> State when State::state().
+init(_, _, _, _, []) ->
+ [];
+init(ReplyTo, Owner, Status, Headers, [Handler|Tail]) ->
+ {Mod, Opts} = case Handler of
+ Tuple = {_, _} -> Tuple;
+ Atom -> {Atom, #{}}
+ end,
+ case Mod:init(ReplyTo, Owner, Status, Headers, Opts) of
+ {ok, State} -> [{Mod, State}|init(ReplyTo, Owner, Status, Headers, Tail)];
+ disable -> init(ReplyTo, Owner, Status, Headers, Tail)
+ end.
+
+-spec handle(fin | nofin, any(), State) -> State when State::state().
+handle(_, _, []) ->
+ [];
+handle(IsFin, Data0, [{Mod, State0}|Tail]) ->
+ case Mod:handle(IsFin, Data0, State0) of
+ {ok, Data, State} -> [{Mod, State}|handle(IsFin, Data, Tail)];
+ {done, State} -> [{Mod, State}|Tail]
+ end.
+
+-spec check_option(list()) -> ok | error.
+check_option([]) ->
+ error;
+check_option(Opt) ->
+ check_option1(Opt).
+
+check_option1([]) ->
+ ok;
+check_option1([Atom|Tail]) when is_atom(Atom) ->
+ check_option1(Tail);
+check_option1([{Atom, #{}}|Tail]) when is_atom(Atom) ->
+ check_option1(Tail);
+check_option1(_) ->
+ error.
diff --git a/src/gun_data.erl b/src/gun_data.erl
new file mode 100644
index 0000000..e70c25e
--- /dev/null
+++ b/src/gun_data.erl
@@ -0,0 +1,33 @@
+%% Copyright (c) 2017, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+-module(gun_data).
+-behavior(gun_content_handler).
+
+-export([init/5]).
+-export([handle/3]).
+
+-record(state, {
+ reply_to :: pid(),
+ stream_ref :: reference()
+}).
+
+-spec init(pid(), reference(), _, _, _) -> {ok, #state{}}.
+init(ReplyTo, StreamRef, _, _, _) ->
+ {ok, #state{reply_to=ReplyTo, stream_ref=StreamRef}}.
+
+-spec handle(fin | nofin, binary(), State) -> {done, State} when State::#state{}.
+handle(IsFin, Data, State=#state{reply_to=ReplyTo, stream_ref=StreamRef}) ->
+ ReplyTo ! {gun_data, self(), StreamRef, IsFin, Data},
+ {done, State}.
diff --git a/src/gun_http.erl b/src/gun_http.erl
index 65135f9..f5eef71 100644
--- a/src/gun_http.erl
+++ b/src/gun_http.erl
@@ -31,17 +31,25 @@
-type websocket_info() :: {websocket, reference(), binary(), [binary()], [], gun:ws_opts()}. %% key, extensions, protocols, options
+-record(stream, {
+ ref :: reference() | websocket_info(),
+ method :: binary(),
+ is_alive :: boolean(),
+ handler_state :: undefined | gun_content_handler:state()
+}).
+
-record(http_state, {
owner :: pid(),
socket :: inet:socket() | ssl:sslsocket(),
transport :: module(),
version = 'HTTP/1.1' :: cow_http:version(),
+ content_handlers :: gun_content_handler:opt(),
connection = keepalive :: keepalive | close,
buffer = <<>> :: binary(),
%% Stream reference, request method and whether the stream is alive.
- streams = [] :: [{reference() | websocket_info(), binary(), boolean()}],
+ streams = [] :: [#stream{}],
in = head :: io(),
- in_state :: {non_neg_integer(), non_neg_integer()},
+ in_state = {0, 0} :: {non_neg_integer(), non_neg_integer()},
out = head :: io()
}).
@@ -54,6 +62,11 @@ do_check_options([{keepalive, K}|Opts]) when is_integer(K), K > 0 ->
do_check_options(Opts);
do_check_options([{version, V}|Opts]) when V =:= 'HTTP/1.1'; V =:= 'HTTP/1.0' ->
do_check_options(Opts);
+do_check_options([Opt={content_handlers, Handlers}|Opts]) ->
+ case gun_content_handler:check_option(Handlers) of
+ ok -> do_check_options(Opts);
+ error -> {error, {options, {http, Opt}}}
+ end;
do_check_options([Opt|_]) ->
{error, {options, {http, Opt}}}.
@@ -61,7 +74,9 @@ name() -> http.
init(Owner, Socket, Transport, Opts) ->
Version = maps:get(version, Opts, 'HTTP/1.1'),
- #http_state{owner=Owner, socket=Socket, transport=Transport, version=Version}.
+ Handlers = maps:get(content_handlers, Opts, [gun_data]),
+ #http_state{owner=Owner, socket=Socket, transport=Transport, version=Version,
+ content_handlers=Handlers}.
%% Stop looping when we got no more data.
handle(<<>>, State) ->
@@ -78,8 +93,7 @@ handle(Data, State=#http_state{in=head, buffer=Buffer}) ->
end;
%% Everything sent to the socket until it closes is part of the response body.
handle(Data, State=#http_state{in=body_close}) ->
- send_data_if_alive(Data, State, nofin),
- State;
+ send_data_if_alive(Data, State, nofin);
handle(Data, State=#http_state{in=body_chunked, in_state=InState,
buffer=Buffer, connection=Conn}) ->
Buffer2 = << Buffer/binary, Data/binary >>,
@@ -87,30 +101,33 @@ handle(Data, State=#http_state{in=body_chunked, in_state=InState,
more ->
State#http_state{buffer=Buffer2};
{more, Data2, InState2} ->
- send_data_if_alive(Data2, State, nofin),
- State#http_state{buffer= <<>>, in_state=InState2};
+ send_data_if_alive(Data2,
+ State#http_state{buffer= <<>>, in_state=InState2},
+ nofin);
{more, Data2, Length, InState2} when is_integer(Length) ->
%% @todo See if we can recv faster than one message at a time.
- send_data_if_alive(Data2, State, nofin),
- State#http_state{buffer= <<>>, in_state=InState2};
+ send_data_if_alive(Data2,
+ State#http_state{buffer= <<>>, in_state=InState2},
+ nofin);
{more, Data2, Rest, InState2} ->
%% @todo See if we can recv faster than one message at a time.
- send_data_if_alive(Data2, State, nofin),
- State#http_state{buffer=Rest, in_state=InState2};
+ send_data_if_alive(Data2,
+ State#http_state{buffer=Rest, in_state=InState2},
+ nofin);
{done, _TotalLength, Rest} ->
%% I suppose it doesn't hurt to append an empty binary.
- send_data_if_alive(<<>>, State, fin),
+ State1 = send_data_if_alive(<<>>, State, fin),
case Conn of
keepalive ->
- handle(Rest, end_stream(State#http_state{buffer= <<>>}));
+ handle(Rest, end_stream(State1#http_state{buffer= <<>>}));
close ->
close
end;
{done, Data2, _TotalLength, Rest} ->
- send_data_if_alive(Data2, State, fin),
+ State1 = send_data_if_alive(Data2, State, fin),
case Conn of
keepalive ->
- handle(Rest, end_stream(State#http_state{buffer= <<>>}));
+ handle(Rest, end_stream(State1#http_state{buffer= <<>>}));
close ->
close
end
@@ -121,27 +138,29 @@ handle(Data, State=#http_state{in={body, Length}, connection=Conn}) ->
if
%% More data coming.
DataSize < Length ->
- send_data_if_alive(Data, State, nofin),
- State#http_state{in={body, Length - DataSize}};
+ send_data_if_alive(Data,
+ State#http_state{in={body, Length - DataSize}},
+ nofin);
%% Stream finished, no rest.
DataSize =:= Length ->
- send_data_if_alive(Data, State, fin),
+ State1 = send_data_if_alive(Data, State, fin),
case Conn of
- keepalive -> end_stream(State);
+ keepalive -> end_stream(State1);
close -> close
end;
%% Stream finished, rest.
true ->
<< Body:Length/binary, Rest/bits >> = Data,
- send_data_if_alive(Body, State, fin),
+ State1 = send_data_if_alive(Body, State, fin),
case Conn of
- keepalive -> handle(Rest, end_stream(State));
+ keepalive -> handle(Rest, end_stream(State1));
close -> close
end
end.
handle_head(Data, State=#http_state{owner=Owner, version=ClientVersion,
- connection=Conn, streams=[{StreamRef, Method, IsAlive}|_]}) ->
+ content_handlers=Handlers0, connection=Conn,
+ streams=[Stream=#stream{ref=StreamRef, method=Method, is_alive=IsAlive}|Tail]}) ->
{Version, Status, _, Rest} = cow_http:parse_status_line(Data),
{Headers, Rest2} = cow_http:parse_headers(Rest),
case {Status, StreamRef} of
@@ -150,7 +169,7 @@ handle_head(Data, State=#http_state{owner=Owner, version=ClientVersion,
_ ->
In = response_io_from_headers(Method, Version, Status, Headers),
IsFin = case In of head -> fin; _ -> nofin end,
- case IsAlive of
+ Handlers = case IsAlive of
false ->
ok;
true ->
@@ -160,7 +179,13 @@ handle_head(Data, State=#http_state{owner=Owner, version=ClientVersion,
end,
Owner ! {gun_response, self(), StreamRef2,
IsFin, Status, Headers},
- ok
+ %% @todo Change to ReplyTo.
+ case IsFin of
+ fin -> undefined;
+ nofin ->
+ gun_content_handler:init(Owner, StreamRef,
+ Status, Headers, Handlers0)
+ end
end,
Conn2 = if
Conn =:= close -> close;
@@ -174,33 +199,36 @@ handle_head(Data, State=#http_state{owner=Owner, version=ClientVersion,
close;
IsFin =:= fin ->
handle(Rest2, end_stream(State#http_state{in=In,
- in_state={0, 0}, connection=Conn2}));
+ in_state={0, 0}, connection=Conn2,
+ streams=[Stream#stream{handler_state=Handlers}|Tail]}));
true ->
- handle(Rest2, State#http_state{in=In, in_state={0, 0}, connection=Conn2})
+ handle(Rest2, State#http_state{in=In,
+ in_state={0, 0}, connection=Conn2,
+ streams=[Stream#stream{handler_state=Handlers}|Tail]})
end
end.
-send_data_if_alive(<<>>, _, nofin) ->
- ok;
+send_data_if_alive(<<>>, State, nofin) ->
+ State;
%% @todo What if we receive data when the HEAD method was used?
-send_data_if_alive(Data, #http_state{owner=Owner,
- streams=[{StreamRef, _, true}|_]}, IsFin) ->
- Owner ! {gun_data, self(), StreamRef, IsFin, Data},
- ok;
-send_data_if_alive(_, _, _) ->
- ok.
+send_data_if_alive(Data, State=#http_state{streams=[Stream=#stream{
+ is_alive=true, handler_state=Handlers0}|Tail]}, IsFin) ->
+ Handlers = gun_content_handler:handle(IsFin, Data, Handlers0),
+ State#http_state{streams=[Stream#stream{handler_state=Handlers}|Tail]};
+send_data_if_alive(_, State, _) ->
+ State.
close(State=#http_state{in=body_close, owner=Owner, streams=[_|Tail]}) ->
- send_data_if_alive(<<>>, State, fin),
+ _ = send_data_if_alive(<<>>, State, fin),
close_streams(Owner, Tail);
close(#http_state{owner=Owner, streams=Streams}) ->
close_streams(Owner, Streams).
close_streams(_, []) ->
ok;
-close_streams(Owner, [{_, _, false}|Tail]) ->
+close_streams(Owner, [#stream{is_alive=false}|Tail]) ->
close_streams(Owner, Tail);
-close_streams(Owner, [{StreamRef, _, _}|Tail]) ->
+close_streams(Owner, [#stream{ref=StreamRef}|Tail]) ->
Owner ! {gun_error, self(), StreamRef, {closed,
"The connection was lost."}},
close_streams(Owner, Tail).
@@ -256,7 +284,7 @@ data(State=#http_state{streams=[]}, StreamRef, _, _) ->
data(State=#http_state{socket=Socket, transport=Transport, version=Version,
out=Out, streams=Streams}, StreamRef, IsFin, Data) ->
case lists:last(Streams) of
- {StreamRef, _, true} ->
+ #stream{ref=StreamRef, is_alive=true} ->
DataLength = iolist_size(Data),
case Out of
body_chunked when Version =:= 'HTTP/1.1', IsFin =:= fin ->
@@ -304,7 +332,7 @@ down(#http_state{streams=Streams}) ->
KilledStreams = [case Ref of
{websocket, Ref2, _, _, _, _} -> Ref2;
_ -> Ref
- end || {Ref, _, _} <- Streams],
+ end || #stream{ref=Ref} <- Streams],
{KilledStreams, []}.
error_stream_closed(State=#http_state{owner=Owner}, StreamRef) ->
@@ -373,7 +401,8 @@ response_io_from_headers(_, Version, _Status, Headers) ->
%% Streams.
new_stream(State=#http_state{streams=Streams}, StreamRef, Method) ->
- State#http_state{streams=Streams ++ [{StreamRef, iolist_to_binary(Method), true}]}.
+ State#http_state{streams=Streams
+ ++ [#stream{ref=StreamRef, method=iolist_to_binary(Method), is_alive=true}]}.
is_stream(#http_state{streams=Streams}, StreamRef) ->
lists:keymember(StreamRef, 1, Streams).
@@ -384,7 +413,7 @@ cancel_stream(State=#http_state{streams=Streams}, StreamRef) ->
{Ref, false};
_ ->
Tuple
- end || Tuple = {Ref, _, _} <- Streams],
+ end || Tuple = #stream{ref=Ref} <- Streams],
State#http_state{streams=Streams2}.
end_stream(State=#http_state{streams=[_|Tail]}) ->
diff --git a/src/gun_http2.erl b/src/gun_http2.erl
index 8f9bae7..30ee06e 100644
--- a/src/gun_http2.erl
+++ b/src/gun_http2.erl
@@ -32,13 +32,16 @@
%% Whether we finished sending data.
local = nofin :: cowboy_stream:fin(),
%% Whether we finished receiving data.
- remote = nofin :: cowboy_stream:fin()
+ remote = nofin :: cowboy_stream:fin(),
+ %% Content handlers state.
+ handler_state :: undefined | gun_content_handler:state()
}).
-record(http2_state, {
owner :: pid(),
socket :: inet:socket() | ssl:sslsocket(),
transport :: module(),
+ content_handlers :: gun_content_handler:opt(),
buffer = <<>> :: binary(),
%% @todo local_settings, next_settings, remote_settings
@@ -58,18 +61,25 @@ do_check_options([]) ->
ok;
do_check_options([{keepalive, K}|Opts]) when is_integer(K), K > 0 ->
do_check_options(Opts);
+do_check_options([Opt={content_handlers, Handlers}|Opts]) ->
+ case gun_content_handler:check_option(Handlers) of
+ ok -> do_check_options(Opts);
+ error -> {error, {options, {http, Opt}}}
+ end;
do_check_options([Opt|_]) ->
{error, {options, {http2, Opt}}}.
name() -> http2.
-init(Owner, Socket, Transport, _Opts) ->
+init(Owner, Socket, Transport, Opts) ->
%% Send the HTTP/2 preface.
Transport:send(Socket, [
<< "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n">>,
cow_http2:settings(#{}) %% @todo Settings.
]),
- #http2_state{owner=Owner, socket=Socket, transport=Transport}.
+ Handlers = maps:get(content_handlers, Opts, [gun_data]),
+ #http2_state{owner=Owner, socket=Socket, transport=Transport,
+ content_handlers=Handlers}.
handle(Data, State=#http2_state{buffer=Buffer}) ->
parse(<< Buffer/binary, Data/binary >>, State#http2_state{buffer= <<>>}).
@@ -89,11 +99,11 @@ parse(Data0, State=#http2_state{buffer=Buffer}) ->
end.
%% DATA frame.
-frame({data, StreamID, IsFin, Data}, State=#http2_state{owner=Owner}) ->
+frame({data, StreamID, IsFin, Data}, State) ->
case get_stream_by_id(StreamID, State) of
- Stream = #stream{ref=StreamRef, remote=nofin} ->
- Owner ! {gun_data, self(), StreamRef, IsFin, Data},
- remote_fin(Stream, State, IsFin);
+ Stream = #stream{remote=nofin, handler_state=Handlers0} ->
+ Handlers = gun_content_handler:handle(IsFin, Data, Handlers0),
+ remote_fin(Stream#stream{handler_state=Handlers}, State, IsFin);
_ ->
%% @todo protocol_error if not existing
stream_reset(State, StreamID, {stream_error, stream_closed,
@@ -101,7 +111,7 @@ frame({data, StreamID, IsFin, Data}, State=#http2_state{owner=Owner}) ->
end;
%% Single HEADERS frame headers block.
frame({headers, StreamID, IsFin, head_fin, HeaderBlock},
- State=#http2_state{owner=Owner, decode_state=DecodeState0}) ->
+ State=#http2_state{owner=Owner, decode_state=DecodeState0, content_handlers=Handlers0}) ->
case get_stream_by_id(StreamID, State) of
Stream = #stream{ref=StreamRef, remote=nofin} ->
try cow_hpack:decode(HeaderBlock, DecodeState0) of
@@ -109,7 +119,15 @@ frame({headers, StreamID, IsFin, head_fin, HeaderBlock},
case lists:keytake(<<":status">>, 1, Headers0) of
{value, {_, Status}, Headers} ->
Owner ! {gun_response, self(), StreamRef, IsFin, parse_status(Status), Headers},
- remote_fin(Stream, State#http2_state{decode_state=DecodeState}, IsFin);
+ %% @todo Change to ReplyTo.
+ Handlers = case IsFin of
+ fin -> undefined;
+ nofin ->
+ gun_content_handler:init(Owner, StreamRef,
+ Status, Headers, Handlers0)
+ end,
+ remote_fin(Stream#stream{handler_state=Handlers},
+ State#http2_state{decode_state=DecodeState}, IsFin);
false ->
stream_reset(State, StreamID, {stream_error, protocol_error,
'Malformed response; missing :status in HEADERS frame. (RFC7540 8.1.2.4)'})
@@ -368,10 +386,10 @@ delete_stream(StreamID, State=#http2_state{streams=Streams}) ->
Streams2 = lists:keydelete(StreamID, #stream.id, Streams),
State#http2_state{streams=Streams2}.
-remote_fin(_, State, nofin) ->
- State;
remote_fin(S=#stream{local=fin}, State, fin) ->
delete_stream(S#stream.id, State);
+%% We always replace the stream in the state because
+%% the content handler state has changed.
remote_fin(S, State=#http2_state{streams=Streams}, IsFin) ->
Streams2 = lists:keyreplace(S#stream.id, #stream.id, Streams,
S#stream{remote=IsFin}),
diff --git a/src/gun_sse.erl b/src/gun_sse.erl
new file mode 100644
index 0000000..4fe7419
--- /dev/null
+++ b/src/gun_sse.erl
@@ -0,0 +1,49 @@
+%% Copyright (c) 2017, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+-module(gun_sse).
+-behavior(gun_content_handler).
+
+-export([init/5]).
+-export([handle/3]).
+
+-record(state, {
+ reply_to :: pid(),
+ stream_ref :: reference(),
+ sse_state :: cow_sse:state()
+}).
+
+%% @todo In the future we want to allow different media types.
+%% @todo For text/event-stream specifically, the parameters must be ignored.
+
+-spec init(pid(), reference(), _, cow_http:headers(), _)
+ -> {ok, #state{}} | disable.
+init(ReplyTo, StreamRef, _, Headers, _) ->
+ case lists:keyfind(<<"content-type">>, 1, Headers) of
+ {_, <<"text/event-stream">>} ->
+ {ok, #state{reply_to=ReplyTo, stream_ref=StreamRef,
+ sse_state=cow_sse:init()}};
+ _ ->
+ disable
+ end.
+
+-spec handle(_, binary(), State) -> {done, State} when State::#state{}.
+handle(IsFin, Data, State=#state{reply_to=ReplyTo, stream_ref=StreamRef, sse_state=SSE0}) ->
+ case cow_sse:parse(Data, SSE0) of
+ {event, Event, SSE} ->
+ ReplyTo ! {gun_sse, self(), StreamRef, Event},
+ handle(IsFin, <<>>, State#state{sse_state=SSE});
+ {more, SSE} ->
+ {done, State#state{sse_state=SSE}}
+ end.
diff --git a/test/sse_SUITE.erl b/test/sse_SUITE.erl
new file mode 100644
index 0000000..a036552
--- /dev/null
+++ b/test/sse_SUITE.erl
@@ -0,0 +1,59 @@
+%% Copyright (c) 2017, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+-module(sse_SUITE).
+-compile(export_all).
+
+all() ->
+ [http, http2].
+
+http(_) ->
+ {ok, Pid} = gun:open("sse.now.sh", 443, #{
+ protocols => [http],
+ http_opts => #{content_handlers => [gun_sse, gun_data]}
+ }),
+ {ok, http} = gun:await_up(Pid),
+ common(Pid).
+
+http2(_) ->
+ {ok, Pid} = gun:open("sse.now.sh", 443, #{
+ protocols => [http2],
+ http2_opts => #{content_handlers => [gun_sse, gun_data]}
+ }),
+ {ok, http2} = gun:await_up(Pid),
+ common(Pid).
+
+common(Pid) ->
+ Ref = gun:get(Pid, "/", [
+ {<<"host">>, <<"sse.now.sh">>},
+ {<<"accept">>, <<"text/event-stream">>}
+ ]),
+ receive
+ {gun_response, Pid, Ref, nofin, Status, Headers} ->
+ ct:print("response ~p ~p", [Status, Headers]),
+ event_loop(Pid, Ref, 3)
+ after 5000 ->
+ error(timeout)
+ end.
+
+event_loop(_, _, 0) ->
+ ok;
+event_loop(Pid, Ref, N) ->
+ receive
+ {gun_sse, Pid, Ref, Event} ->
+ ct:print("event ~p", [Event]),
+ event_loop(Pid, Ref, N - 1)
+ after 10000 ->
+ error(timeout)
+ end.