diff options
author | Loïc Hoguin <[email protected]> | 2019-08-08 16:33:09 +0200 |
---|---|---|
committer | Loïc Hoguin <[email protected]> | 2019-09-05 11:28:07 +0200 |
commit | c974b4334e7ab660f9bf95653696c3663c02ead3 (patch) | |
tree | 9e501a4928b261c4fe9adc74d80c47b6b14ae50a /src/gun_http2.erl | |
parent | 491ddf58c0e14824a741852fdc522b390b306ae2 (diff) | |
download | gun-c974b4334e7ab660f9bf95653696c3663c02ead3.tar.gz gun-c974b4334e7ab660f9bf95653696c3663c02ead3.tar.bz2 gun-c974b4334e7ab660f9bf95653696c3663c02ead3.zip |
Implement graceful shutdown
The graceful shutdown is implemented through a new 'closing'
state. This state is entered under different circumstances
depending on the protocol.
The gun:shutdown/1 function is now implemented and documented.
It allows shutting down the connection gracefully regardless
of the current state of the connection and for all protocols.
The behavior is entirely dependent on the protocol.
For HTTP/1.1 the connection stays up only until after the
current stream is complete; other streams are immediately
canceled.
For HTTP/2 a GOAWAY frame is sent and existing streams
continue to be processed. The connection is closed after
all streams are processed and the server's GOAWAY frame
is received.
For Websocket a close frame is sent. The connection is
closed when receiving the server's close frame.
In all cases the closing_timeout option defines how long
we wait, as a maximum, before closing the connection after
the graceful shutdown was started.
The graceful shutdown is also initiated when the owner
process goes away; when sending an HTTP/1.1 request
with the connection: close header; when receiving an
HTTP/1.1 response with the connection: close header;
when receiving an HTTP/1.0 response without a connection
header; when the server sends a GOAWAY HTTP/2 frame;
or when we send or receive a Websocket close frame.
Along with these changes, the gun:ws_send/2 function
now accepts a list of frames as argument. Those frames
may include a close frame that initiates the graceful
shutdown.
Diffstat (limited to 'src/gun_http2.erl')
-rw-r--r-- | src/gun_http2.erl | 144 |
1 files changed, 115 insertions, 29 deletions
diff --git a/src/gun_http2.erl b/src/gun_http2.erl index 3b3b79b..5942037 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -19,6 +19,7 @@ -export([init/4]). -export([handle/4]). -export([update_flow/4]). +-export([closing/4]). -export([close/4]). -export([keepalive/1]). -export([headers/11]). @@ -53,6 +54,10 @@ content_handlers :: gun_content_handler:opt(), buffer = <<>> :: binary(), + %% Current status of the connection. We use this to ensure we are + %% not sending the GOAWAY frame more than once. + status = connected :: connected | goaway | closing, + %% HTTP/2 state machine. http2_machine :: cow_http2_machine:http2_machine(), @@ -66,6 +71,10 @@ check_options(Opts) -> do_check_options([]) -> ok; +do_check_options([{closing_timeout, infinity}|Opts]) -> + do_check_options(Opts); +do_check_options([{closing_timeout, T}|Opts]) when is_integer(T), T > 0 -> + do_check_options(Opts); do_check_options([Opt={content_handlers, Handlers}|Opts]) -> case gun_content_handler:check_option(Handlers) of ok -> do_check_options(Opts); @@ -99,7 +108,8 @@ handle(Data, State=#http2_state{buffer=Buffer}, EvHandler, EvHandlerState) -> parse(<< Buffer/binary, Data/binary >>, State#http2_state{buffer= <<>>}, EvHandler, EvHandlerState). -parse(Data, State0=#http2_state{http2_machine=HTTP2Machine}, EvHandler, EvHandlerState0) -> +parse(Data, State0=#http2_state{status=Status, http2_machine=HTTP2Machine, streams=Streams}, + EvHandler, EvHandlerState0) -> MaxFrameSize = cow_http2_machine:get_local_setting(max_frame_size, HTTP2Machine), case cow_http2:parse(Data, MaxFrameSize) of {ok, Frame, Rest} -> @@ -116,7 +126,14 @@ parse(Data, State0=#http2_state{http2_machine=HTTP2Machine}, EvHandler, EvHandle parse(Rest, reset_stream(State0, StreamID, {stream_error, Reason, Human}), EvHandler, EvHandlerState0); Error = {connection_error, _, _} -> - {terminate(State0, Error), EvHandlerState0}; + {connection_error(State0, Error), EvHandlerState0}; + %% If we both received and sent a GOAWAY frame and there are no streams + %% currently running, we can close the connection immediately. + more when Status =/= connected, Streams =:= [] -> + {[{state, State0#http2_state{buffer=Data, status=closing}}, close], EvHandlerState0}; + %% Otherwise we enter the closing state. + more when Status =:= goaway -> + {[{state, State0#http2_state{buffer=Data, status=closing}}, closing(State0)], EvHandlerState0}; more -> {{state, State0#http2_state{buffer=Data}}, EvHandlerState0} end. @@ -169,9 +186,8 @@ frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame, EvHandler, EvHandl push_promise_frame(State#http2_state{http2_machine=HTTP2Machine}, StreamID, PromisedStreamID, Headers, PseudoHeaders, EvHandler, EvHandlerState); - {ok, Frame={goaway, _StreamID, _Reason, _Data}, HTTP2Machine} -> - {terminate(State#http2_state{http2_machine=HTTP2Machine}, - {stop, Frame, 'Server is going away.'}), + {ok, GoAway={goaway, _, _, _}, HTTP2Machine} -> + {goaway(State#http2_state{http2_machine=HTTP2Machine}, GoAway), EvHandlerState}; {send, SendData, HTTP2Machine} -> send_data(maybe_ack(State#http2_state{http2_machine=HTTP2Machine}, Frame), SendData, @@ -181,7 +197,7 @@ frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame, EvHandler, EvHandl StreamID, {stream_error, Reason, Human}), EvHandlerState}; {error, Error={connection_error, _, _}, HTTP2Machine} -> - {terminate(State#http2_state{http2_machine=HTTP2Machine}, Error), + {connection_error(State#http2_state{http2_machine=HTTP2Machine}, Error), EvHandlerState} end. @@ -318,7 +334,8 @@ rst_stream_frame(State=#http2_state{streams=Streams0}, end. %% Pushed streams receive the same initial flow value as the parent stream. -push_promise_frame(State=#http2_state{streams=Streams}, +push_promise_frame(State=#http2_state{socket=Socket, transport=Transport, + status=Status, http2_machine=HTTP2Machine0, streams=Streams}, StreamID, PromisedStreamID, Headers, #{ method := Method, scheme := Scheme, authority := Authority, path := Path}, @@ -326,25 +343,39 @@ push_promise_frame(State=#http2_state{streams=Streams}, #stream{ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow} = get_stream_by_id(State, StreamID), PromisedStreamRef = make_ref(), URI = iolist_to_binary([Scheme, <<"://">>, Authority, Path]), - ReplyTo ! {gun_push, self(), StreamRef, PromisedStreamRef, Method, URI, Headers}, - EvHandlerState = EvHandler:push_promise_end(#{ + PushPromiseEvent0 = #{ stream_ref => StreamRef, reply_to => ReplyTo, - promised_stream_ref => PromisedStreamRef, method => Method, uri => URI, headers => Headers - }, EvHandlerState0), - NewStream = #stream{id=PromisedStreamID, ref=PromisedStreamRef, - reply_to=ReplyTo, flow=InitialFlow}, - {State#http2_state{streams=[NewStream|Streams]}, EvHandlerState}. + }, + PushPromiseEvent = case Status of + connected -> + ReplyTo ! {gun_push, self(), StreamRef, PromisedStreamRef, Method, URI, Headers}, + PushPromiseEvent0#{promised_stream_ref => PromisedStreamRef}; + _ -> + PushPromiseEvent0 + end, + EvHandlerState = EvHandler:push_promise_end(PushPromiseEvent, EvHandlerState0), + case Status of + connected -> + NewStream = #stream{id=PromisedStreamID, ref=PromisedStreamRef, + reply_to=ReplyTo, flow=InitialFlow}, + {State#http2_state{streams=[NewStream|Streams]}, EvHandlerState}; + %% We cancel the push_promise immediately when we are shutting down. + _ -> + {ok, HTTP2Machine} = cow_http2_machine:reset_stream(PromisedStreamID, HTTP2Machine0), + Transport:send(Socket, cow_http2:rst_stream(PromisedStreamID, cancel)), + {State#http2_state{http2_machine=HTTP2Machine}, EvHandlerState} + end. ignored_frame(State=#http2_state{http2_machine=HTTP2Machine0}) -> case cow_http2_machine:ignored_frame(HTTP2Machine0) of {ok, HTTP2Machine} -> State#http2_state{http2_machine=HTTP2Machine}; {error, Error={connection_error, _, _}, HTTP2Machine} -> - terminate(State#http2_state{http2_machine=HTTP2Machine}, Error) + connection_error(State#http2_state{http2_machine=HTTP2Machine}, Error) end. update_flow(State=#http2_state{socket=Socket, transport=Transport, @@ -369,16 +400,72 @@ update_flow(State=#http2_state{socket=Socket, transport=Transport, [] end. -%% @todo Use Reason. -close(_, #http2_state{streams=Streams}, _, EvHandlerState) -> - {close_streams(Streams), EvHandlerState}. +%% We may have to cancel streams even if we receive multiple +%% GOAWAY frames as the LastStreamID value may be lower than +%% the one previously received. +goaway(State0=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine, + status=Status, streams=Streams0}, {goaway, LastStreamID, Reason, _}) -> + Streams = goaway_streams(Streams0, LastStreamID, {goaway, Reason, 'The connection is going away.'}, []), + State = State0#http2_state{streams=Streams}, + case Status of + connected -> + Transport:send(Socket, cow_http2:goaway( + cow_http2_machine:get_last_streamid(HTTP2Machine), + no_error, <<>>)), + State#http2_state{status=goaway}; + _ -> + State + end. + +%% Cancel server-initiated streams that are above LastStreamID. +goaway_streams([], _, _, Acc) -> + Acc; +goaway_streams([Stream=#stream{id=StreamID}|Tail], LastStreamID, Reason, Acc) + when StreamID > LastStreamID, (StreamID rem 2) =:= 1 -> + close_stream(Stream, Reason), + goaway_streams(Tail, LastStreamID, Reason, Acc); +goaway_streams([Stream|Tail], LastStreamID, Reason, Acc) -> + goaway_streams(Tail, LastStreamID, Reason, [Stream|Acc]). + +%% We are already closing, do nothing. +closing(_, #http2_state{status=closing}, _, EvHandlerState) -> + {[], EvHandlerState}; +closing(Reason0, State=#http2_state{socket=Socket, transport=Transport, + http2_machine=HTTP2Machine}, _, EvHandlerState) -> + Reason = case Reason0 of + normal -> no_error; + owner_down -> no_error; + _ -> internal_error + end, + Transport:send(Socket, cow_http2:goaway( + cow_http2_machine:get_last_streamid(HTTP2Machine), + Reason, <<>>)), + {[ + {state, State#http2_state{status=closing}}, + closing(State) + ], EvHandlerState}. -close_streams([]) -> +closing(#http2_state{opts=Opts}) -> + Timeout = maps:get(closing_timeout, Opts, 15000), + {closing, Timeout}. + +close(Reason, #http2_state{streams=Streams}, _, EvHandlerState) -> + close_streams(Streams, close_reason(Reason)), + EvHandlerState. + +close_reason(closed) -> closed; +close_reason(Reason) -> {closed, Reason}. + +close_streams([], _) -> ok; -close_streams([#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]) -> - ReplyTo ! {gun_error, self(), StreamRef, {closed, - "The connection was lost."}}, - close_streams(Tail). +close_streams([Stream|Tail], Reason) -> + close_stream(Stream, Reason), + close_streams(Tail, Reason). + +%% @todo Do we want an event for this? +close_stream(#stream{ref=StreamRef, reply_to=ReplyTo}, Reason) -> + ReplyTo ! {gun_error, self(), StreamRef, Reason}, + ok. keepalive(State=#http2_state{socket=Socket, transport=Transport}) -> Transport:send(Socket, cow_http2:ping(0)), @@ -429,6 +516,7 @@ request(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, headers => Headers }, EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0), + %% @todo We should not send an empty DATA frame on empty bodies. {ok, IsFin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers( StreamID, HTTP2Machine1, nofin, PseudoHeaders, Headers), Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)), @@ -586,8 +674,9 @@ down(#http2_state{streams=Streams}) -> KilledStreams = [Ref || #stream{ref=Ref} <- Streams], {KilledStreams, []}. -terminate(#http2_state{socket=Socket, transport=Transport, - http2_machine=HTTP2Machine, streams=Streams}, Reason) -> +connection_error(#http2_state{socket=Socket, transport=Transport, + http2_machine=HTTP2Machine, streams=Streams}, + {connection_error, Reason, _}) -> %% The connection is going away either at the request of the server, %% or because an error occurred in the protocol. Inform the streams. %% @todo We should not send duplicate messages to processes. @@ -597,12 +686,9 @@ terminate(#http2_state{socket=Socket, transport=Transport, _ = [ReplyTo ! {gun_error, self(), Reason} || #stream{reply_to=ReplyTo} <- Streams], Transport:send(Socket, cow_http2:goaway( cow_http2_machine:get_last_streamid(HTTP2Machine), - terminate_reason(Reason), <<>>)), + Reason, <<>>)), close. -terminate_reason({connection_error, Reason, _}) -> Reason; -terminate_reason({stop, _, _}) -> no_error. - %% Stream functions. error_stream_closed(State, StreamRef, ReplyTo) -> |