aboutsummaryrefslogtreecommitdiffstats
path: root/src/gun_http2.erl
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2017-04-27 15:23:57 +0200
committerLoïc Hoguin <[email protected]>2017-04-27 15:23:57 +0200
commit32db544782f2528ed0916eecb200f75924dcc407 (patch)
treeef8746ab021a8172ac84e9f72062922ec4263619 /src/gun_http2.erl
parente8c08c95b896bf9d2dd299e5fdbff50f714e8749 (diff)
downloadgun-32db544782f2528ed0916eecb200f75924dcc407.tar.gz
gun-32db544782f2528ed0916eecb200f75924dcc407.tar.bz2
gun-32db544782f2528ed0916eecb200f75924dcc407.zip
Add content handlers and built-in SSE support
Content handlers are a chain of modules implementing callbacks that receive the body of responses and may modify it (for example for decompressing the content) or act upon it (like sending a message to the owner process. The gun_sse content handler module can be used to translate text/event-stream events on the fly and deliver them to the owner process as a {gun_sse...} message. This feature is currently not documented and is only tested against a public server. It requires an up to date Cowlib.
Diffstat (limited to 'src/gun_http2.erl')
-rw-r--r--src/gun_http2.erl40
1 files changed, 29 insertions, 11 deletions
diff --git a/src/gun_http2.erl b/src/gun_http2.erl
index 8f9bae7..30ee06e 100644
--- a/src/gun_http2.erl
+++ b/src/gun_http2.erl
@@ -32,13 +32,16 @@
%% Whether we finished sending data.
local = nofin :: cowboy_stream:fin(),
%% Whether we finished receiving data.
- remote = nofin :: cowboy_stream:fin()
+ remote = nofin :: cowboy_stream:fin(),
+ %% Content handlers state.
+ handler_state :: undefined | gun_content_handler:state()
}).
-record(http2_state, {
owner :: pid(),
socket :: inet:socket() | ssl:sslsocket(),
transport :: module(),
+ content_handlers :: gun_content_handler:opt(),
buffer = <<>> :: binary(),
%% @todo local_settings, next_settings, remote_settings
@@ -58,18 +61,25 @@ do_check_options([]) ->
ok;
do_check_options([{keepalive, K}|Opts]) when is_integer(K), K > 0 ->
do_check_options(Opts);
+do_check_options([Opt={content_handlers, Handlers}|Opts]) ->
+ case gun_content_handler:check_option(Handlers) of
+ ok -> do_check_options(Opts);
+ error -> {error, {options, {http, Opt}}}
+ end;
do_check_options([Opt|_]) ->
{error, {options, {http2, Opt}}}.
name() -> http2.
-init(Owner, Socket, Transport, _Opts) ->
+init(Owner, Socket, Transport, Opts) ->
%% Send the HTTP/2 preface.
Transport:send(Socket, [
<< "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n">>,
cow_http2:settings(#{}) %% @todo Settings.
]),
- #http2_state{owner=Owner, socket=Socket, transport=Transport}.
+ Handlers = maps:get(content_handlers, Opts, [gun_data]),
+ #http2_state{owner=Owner, socket=Socket, transport=Transport,
+ content_handlers=Handlers}.
handle(Data, State=#http2_state{buffer=Buffer}) ->
parse(<< Buffer/binary, Data/binary >>, State#http2_state{buffer= <<>>}).
@@ -89,11 +99,11 @@ parse(Data0, State=#http2_state{buffer=Buffer}) ->
end.
%% DATA frame.
-frame({data, StreamID, IsFin, Data}, State=#http2_state{owner=Owner}) ->
+frame({data, StreamID, IsFin, Data}, State) ->
case get_stream_by_id(StreamID, State) of
- Stream = #stream{ref=StreamRef, remote=nofin} ->
- Owner ! {gun_data, self(), StreamRef, IsFin, Data},
- remote_fin(Stream, State, IsFin);
+ Stream = #stream{remote=nofin, handler_state=Handlers0} ->
+ Handlers = gun_content_handler:handle(IsFin, Data, Handlers0),
+ remote_fin(Stream#stream{handler_state=Handlers}, State, IsFin);
_ ->
%% @todo protocol_error if not existing
stream_reset(State, StreamID, {stream_error, stream_closed,
@@ -101,7 +111,7 @@ frame({data, StreamID, IsFin, Data}, State=#http2_state{owner=Owner}) ->
end;
%% Single HEADERS frame headers block.
frame({headers, StreamID, IsFin, head_fin, HeaderBlock},
- State=#http2_state{owner=Owner, decode_state=DecodeState0}) ->
+ State=#http2_state{owner=Owner, decode_state=DecodeState0, content_handlers=Handlers0}) ->
case get_stream_by_id(StreamID, State) of
Stream = #stream{ref=StreamRef, remote=nofin} ->
try cow_hpack:decode(HeaderBlock, DecodeState0) of
@@ -109,7 +119,15 @@ frame({headers, StreamID, IsFin, head_fin, HeaderBlock},
case lists:keytake(<<":status">>, 1, Headers0) of
{value, {_, Status}, Headers} ->
Owner ! {gun_response, self(), StreamRef, IsFin, parse_status(Status), Headers},
- remote_fin(Stream, State#http2_state{decode_state=DecodeState}, IsFin);
+ %% @todo Change to ReplyTo.
+ Handlers = case IsFin of
+ fin -> undefined;
+ nofin ->
+ gun_content_handler:init(Owner, StreamRef,
+ Status, Headers, Handlers0)
+ end,
+ remote_fin(Stream#stream{handler_state=Handlers},
+ State#http2_state{decode_state=DecodeState}, IsFin);
false ->
stream_reset(State, StreamID, {stream_error, protocol_error,
'Malformed response; missing :status in HEADERS frame. (RFC7540 8.1.2.4)'})
@@ -368,10 +386,10 @@ delete_stream(StreamID, State=#http2_state{streams=Streams}) ->
Streams2 = lists:keydelete(StreamID, #stream.id, Streams),
State#http2_state{streams=Streams2}.
-remote_fin(_, State, nofin) ->
- State;
remote_fin(S=#stream{local=fin}, State, fin) ->
delete_stream(S#stream.id, State);
+%% We always replace the stream in the state because
+%% the content handler state has changed.
remote_fin(S, State=#http2_state{streams=Streams}, IsFin) ->
Streams2 = lists:keyreplace(S#stream.id, #stream.id, Streams,
S#stream{remote=IsFin}),