aboutsummaryrefslogtreecommitdiffstats
path: root/src/gun_http2.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/gun_http2.erl')
-rw-r--r--src/gun_http2.erl144
1 files changed, 115 insertions, 29 deletions
diff --git a/src/gun_http2.erl b/src/gun_http2.erl
index 3b3b79b..5942037 100644
--- a/src/gun_http2.erl
+++ b/src/gun_http2.erl
@@ -19,6 +19,7 @@
-export([init/4]).
-export([handle/4]).
-export([update_flow/4]).
+-export([closing/4]).
-export([close/4]).
-export([keepalive/1]).
-export([headers/11]).
@@ -53,6 +54,10 @@
content_handlers :: gun_content_handler:opt(),
buffer = <<>> :: binary(),
+ %% Current status of the connection. We use this to ensure we are
+ %% not sending the GOAWAY frame more than once.
+ status = connected :: connected | goaway | closing,
+
%% HTTP/2 state machine.
http2_machine :: cow_http2_machine:http2_machine(),
@@ -66,6 +71,10 @@ check_options(Opts) ->
do_check_options([]) ->
ok;
+do_check_options([{closing_timeout, infinity}|Opts]) ->
+ do_check_options(Opts);
+do_check_options([{closing_timeout, T}|Opts]) when is_integer(T), T > 0 ->
+ do_check_options(Opts);
do_check_options([Opt={content_handlers, Handlers}|Opts]) ->
case gun_content_handler:check_option(Handlers) of
ok -> do_check_options(Opts);
@@ -99,7 +108,8 @@ handle(Data, State=#http2_state{buffer=Buffer}, EvHandler, EvHandlerState) ->
parse(<< Buffer/binary, Data/binary >>, State#http2_state{buffer= <<>>},
EvHandler, EvHandlerState).
-parse(Data, State0=#http2_state{http2_machine=HTTP2Machine}, EvHandler, EvHandlerState0) ->
+parse(Data, State0=#http2_state{status=Status, http2_machine=HTTP2Machine, streams=Streams},
+ EvHandler, EvHandlerState0) ->
MaxFrameSize = cow_http2_machine:get_local_setting(max_frame_size, HTTP2Machine),
case cow_http2:parse(Data, MaxFrameSize) of
{ok, Frame, Rest} ->
@@ -116,7 +126,14 @@ parse(Data, State0=#http2_state{http2_machine=HTTP2Machine}, EvHandler, EvHandle
parse(Rest, reset_stream(State0, StreamID, {stream_error, Reason, Human}),
EvHandler, EvHandlerState0);
Error = {connection_error, _, _} ->
- {terminate(State0, Error), EvHandlerState0};
+ {connection_error(State0, Error), EvHandlerState0};
+ %% If we both received and sent a GOAWAY frame and there are no streams
+ %% currently running, we can close the connection immediately.
+ more when Status =/= connected, Streams =:= [] ->
+ {[{state, State0#http2_state{buffer=Data, status=closing}}, close], EvHandlerState0};
+ %% Otherwise we enter the closing state.
+ more when Status =:= goaway ->
+ {[{state, State0#http2_state{buffer=Data, status=closing}}, closing(State0)], EvHandlerState0};
more ->
{{state, State0#http2_state{buffer=Data}}, EvHandlerState0}
end.
@@ -169,9 +186,8 @@ frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame, EvHandler, EvHandl
push_promise_frame(State#http2_state{http2_machine=HTTP2Machine},
StreamID, PromisedStreamID, Headers, PseudoHeaders,
EvHandler, EvHandlerState);
- {ok, Frame={goaway, _StreamID, _Reason, _Data}, HTTP2Machine} ->
- {terminate(State#http2_state{http2_machine=HTTP2Machine},
- {stop, Frame, 'Server is going away.'}),
+ {ok, GoAway={goaway, _, _, _}, HTTP2Machine} ->
+ {goaway(State#http2_state{http2_machine=HTTP2Machine}, GoAway),
EvHandlerState};
{send, SendData, HTTP2Machine} ->
send_data(maybe_ack(State#http2_state{http2_machine=HTTP2Machine}, Frame), SendData,
@@ -181,7 +197,7 @@ frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame, EvHandler, EvHandl
StreamID, {stream_error, Reason, Human}),
EvHandlerState};
{error, Error={connection_error, _, _}, HTTP2Machine} ->
- {terminate(State#http2_state{http2_machine=HTTP2Machine}, Error),
+ {connection_error(State#http2_state{http2_machine=HTTP2Machine}, Error),
EvHandlerState}
end.
@@ -318,7 +334,8 @@ rst_stream_frame(State=#http2_state{streams=Streams0},
end.
%% Pushed streams receive the same initial flow value as the parent stream.
-push_promise_frame(State=#http2_state{streams=Streams},
+push_promise_frame(State=#http2_state{socket=Socket, transport=Transport,
+ status=Status, http2_machine=HTTP2Machine0, streams=Streams},
StreamID, PromisedStreamID, Headers, #{
method := Method, scheme := Scheme,
authority := Authority, path := Path},
@@ -326,25 +343,39 @@ push_promise_frame(State=#http2_state{streams=Streams},
#stream{ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow} = get_stream_by_id(State, StreamID),
PromisedStreamRef = make_ref(),
URI = iolist_to_binary([Scheme, <<"://">>, Authority, Path]),
- ReplyTo ! {gun_push, self(), StreamRef, PromisedStreamRef, Method, URI, Headers},
- EvHandlerState = EvHandler:push_promise_end(#{
+ PushPromiseEvent0 = #{
stream_ref => StreamRef,
reply_to => ReplyTo,
- promised_stream_ref => PromisedStreamRef,
method => Method,
uri => URI,
headers => Headers
- }, EvHandlerState0),
- NewStream = #stream{id=PromisedStreamID, ref=PromisedStreamRef,
- reply_to=ReplyTo, flow=InitialFlow},
- {State#http2_state{streams=[NewStream|Streams]}, EvHandlerState}.
+ },
+ PushPromiseEvent = case Status of
+ connected ->
+ ReplyTo ! {gun_push, self(), StreamRef, PromisedStreamRef, Method, URI, Headers},
+ PushPromiseEvent0#{promised_stream_ref => PromisedStreamRef};
+ _ ->
+ PushPromiseEvent0
+ end,
+ EvHandlerState = EvHandler:push_promise_end(PushPromiseEvent, EvHandlerState0),
+ case Status of
+ connected ->
+ NewStream = #stream{id=PromisedStreamID, ref=PromisedStreamRef,
+ reply_to=ReplyTo, flow=InitialFlow},
+ {State#http2_state{streams=[NewStream|Streams]}, EvHandlerState};
+ %% We cancel the push_promise immediately when we are shutting down.
+ _ ->
+ {ok, HTTP2Machine} = cow_http2_machine:reset_stream(PromisedStreamID, HTTP2Machine0),
+ Transport:send(Socket, cow_http2:rst_stream(PromisedStreamID, cancel)),
+ {State#http2_state{http2_machine=HTTP2Machine}, EvHandlerState}
+ end.
ignored_frame(State=#http2_state{http2_machine=HTTP2Machine0}) ->
case cow_http2_machine:ignored_frame(HTTP2Machine0) of
{ok, HTTP2Machine} ->
State#http2_state{http2_machine=HTTP2Machine};
{error, Error={connection_error, _, _}, HTTP2Machine} ->
- terminate(State#http2_state{http2_machine=HTTP2Machine}, Error)
+ connection_error(State#http2_state{http2_machine=HTTP2Machine}, Error)
end.
update_flow(State=#http2_state{socket=Socket, transport=Transport,
@@ -369,16 +400,72 @@ update_flow(State=#http2_state{socket=Socket, transport=Transport,
[]
end.
-%% @todo Use Reason.
-close(_, #http2_state{streams=Streams}, _, EvHandlerState) ->
- {close_streams(Streams), EvHandlerState}.
+%% We may have to cancel streams even if we receive multiple
+%% GOAWAY frames as the LastStreamID value may be lower than
+%% the one previously received.
+goaway(State0=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine,
+ status=Status, streams=Streams0}, {goaway, LastStreamID, Reason, _}) ->
+ Streams = goaway_streams(Streams0, LastStreamID, {goaway, Reason, 'The connection is going away.'}, []),
+ State = State0#http2_state{streams=Streams},
+ case Status of
+ connected ->
+ Transport:send(Socket, cow_http2:goaway(
+ cow_http2_machine:get_last_streamid(HTTP2Machine),
+ no_error, <<>>)),
+ State#http2_state{status=goaway};
+ _ ->
+ State
+ end.
+
+%% Cancel server-initiated streams that are above LastStreamID.
+goaway_streams([], _, _, Acc) ->
+ Acc;
+goaway_streams([Stream=#stream{id=StreamID}|Tail], LastStreamID, Reason, Acc)
+ when StreamID > LastStreamID, (StreamID rem 2) =:= 1 ->
+ close_stream(Stream, Reason),
+ goaway_streams(Tail, LastStreamID, Reason, Acc);
+goaway_streams([Stream|Tail], LastStreamID, Reason, Acc) ->
+ goaway_streams(Tail, LastStreamID, Reason, [Stream|Acc]).
+
+%% We are already closing, do nothing.
+closing(_, #http2_state{status=closing}, _, EvHandlerState) ->
+ {[], EvHandlerState};
+closing(Reason0, State=#http2_state{socket=Socket, transport=Transport,
+ http2_machine=HTTP2Machine}, _, EvHandlerState) ->
+ Reason = case Reason0 of
+ normal -> no_error;
+ owner_down -> no_error;
+ _ -> internal_error
+ end,
+ Transport:send(Socket, cow_http2:goaway(
+ cow_http2_machine:get_last_streamid(HTTP2Machine),
+ Reason, <<>>)),
+ {[
+ {state, State#http2_state{status=closing}},
+ closing(State)
+ ], EvHandlerState}.
-close_streams([]) ->
+closing(#http2_state{opts=Opts}) ->
+ Timeout = maps:get(closing_timeout, Opts, 15000),
+ {closing, Timeout}.
+
+close(Reason, #http2_state{streams=Streams}, _, EvHandlerState) ->
+ close_streams(Streams, close_reason(Reason)),
+ EvHandlerState.
+
+close_reason(closed) -> closed;
+close_reason(Reason) -> {closed, Reason}.
+
+close_streams([], _) ->
ok;
-close_streams([#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]) ->
- ReplyTo ! {gun_error, self(), StreamRef, {closed,
- "The connection was lost."}},
- close_streams(Tail).
+close_streams([Stream|Tail], Reason) ->
+ close_stream(Stream, Reason),
+ close_streams(Tail, Reason).
+
+%% @todo Do we want an event for this?
+close_stream(#stream{ref=StreamRef, reply_to=ReplyTo}, Reason) ->
+ ReplyTo ! {gun_error, self(), StreamRef, Reason},
+ ok.
keepalive(State=#http2_state{socket=Socket, transport=Transport}) ->
Transport:send(Socket, cow_http2:ping(0)),
@@ -429,6 +516,7 @@ request(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
headers => Headers
},
EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0),
+ %% @todo We should not send an empty DATA frame on empty bodies.
{ok, IsFin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers(
StreamID, HTTP2Machine1, nofin, PseudoHeaders, Headers),
Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)),
@@ -586,8 +674,9 @@ down(#http2_state{streams=Streams}) ->
KilledStreams = [Ref || #stream{ref=Ref} <- Streams],
{KilledStreams, []}.
-terminate(#http2_state{socket=Socket, transport=Transport,
- http2_machine=HTTP2Machine, streams=Streams}, Reason) ->
+connection_error(#http2_state{socket=Socket, transport=Transport,
+ http2_machine=HTTP2Machine, streams=Streams},
+ {connection_error, Reason, _}) ->
%% The connection is going away either at the request of the server,
%% or because an error occurred in the protocol. Inform the streams.
%% @todo We should not send duplicate messages to processes.
@@ -597,12 +686,9 @@ terminate(#http2_state{socket=Socket, transport=Transport,
_ = [ReplyTo ! {gun_error, self(), Reason} || #stream{reply_to=ReplyTo} <- Streams],
Transport:send(Socket, cow_http2:goaway(
cow_http2_machine:get_last_streamid(HTTP2Machine),
- terminate_reason(Reason), <<>>)),
+ Reason, <<>>)),
close.
-terminate_reason({connection_error, Reason, _}) -> Reason;
-terminate_reason({stop, _, _}) -> no_error.
-
%% Stream functions.
error_stream_closed(State, StreamRef, ReplyTo) ->