From c974b4334e7ab660f9bf95653696c3663c02ead3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Thu, 8 Aug 2019 16:33:09 +0200 Subject: Implement graceful shutdown The graceful shutdown is implemented through a new 'closing' state. This state is entered under different circumstances depending on the protocol. The gun:shutdown/1 function is now implemented and documented. It allows shutting down the connection gracefully regardless of the current state of the connection and for all protocols. The behavior is entirely dependent on the protocol. For HTTP/1.1 the connection stays up only until after the current stream is complete; other streams are immediately canceled. For HTTP/2 a GOAWAY frame is sent and existing streams continue to be processed. The connection is closed after all streams are processed and the server's GOAWAY frame is received. For Websocket a close frame is sent. The connection is closed when receiving the server's close frame. In all cases the closing_timeout option defines how long we wait, as a maximum, before closing the connection after the graceful shutdown was started. The graceful shutdown is also initiated when the owner process goes away; when sending an HTTP/1.1 request with the connection: close header; when receiving an HTTP/1.1 response with the connection: close header; when receiving an HTTP/1.0 response without a connection header; when the server sends a GOAWAY HTTP/2 frame; or when we send or receive a Websocket close frame. Along with these changes, the gun:ws_send/2 function now accepts a list of frames as argument. Those frames may include a close frame that initiates the graceful shutdown. --- src/gun_ws.erl | 94 +++++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 63 insertions(+), 31 deletions(-) (limited to 'src/gun_ws.erl') diff --git a/src/gun_ws.erl b/src/gun_ws.erl index 42cf049..49911dc 100644 --- a/src/gun_ws.erl +++ b/src/gun_ws.erl @@ -19,6 +19,7 @@ -export([init/9]). -export([handle/4]). -export([update_flow/4]). +-export([closing/4]). -export([close/4]). -export([send/4]). -export([down/1]). @@ -38,8 +39,10 @@ stream_ref :: reference(), socket :: inet:socket() | ssl:sslsocket(), transport :: module(), + opts = #{} :: map(), %% @todo buffer = <<>> :: binary(), in = head :: head | #payload{} | close, + out = head :: head | close, frag_state = undefined :: cow_ws:frag_state(), utf8_state = 0 :: cow_ws:utf8_state(), extensions = #{} :: cow_ws:extensions(), @@ -53,6 +56,10 @@ check_options(Opts) -> do_check_options([]) -> ok; +do_check_options([{closing_timeout, infinity}|Opts]) -> + do_check_options(Opts); +do_check_options([{closing_timeout, T}|Opts]) when is_integer(T), T > 0 -> + do_check_options(Opts); do_check_options([{compress, B}|Opts]) when B =:= true; B =:= false -> do_check_options(Opts); do_check_options([{default_protocol, M}|Opts]) when is_atom(M) -> @@ -75,12 +82,15 @@ init(Owner, Socket, Transport, StreamRef, Headers, Extensions, InitialFlow, Hand Owner ! {gun_upgrade, self(), StreamRef, [<<"websocket">>], Headers}, {ok, HandlerState} = Handler:init(Owner, StreamRef, Headers, Opts), {switch_protocol, ?MODULE, #ws_state{owner=Owner, stream_ref=StreamRef, - socket=Socket, transport=Transport, extensions=Extensions, + socket=Socket, transport=Transport, opts=Opts, extensions=Extensions, flow=InitialFlow, handler=Handler, handler_state=HandlerState}}. %% Do not handle anything if we received a close frame. -handle(_, State=#ws_state{in=close}, _, EvHandlerState) -> - {{state, State}, EvHandlerState}; +%% Initiate or terminate the closing state depending on whether we sent a close yet. +handle(_, State=#ws_state{in=close, out=close}, _, EvHandlerState) -> + {[{state, State}, close], EvHandlerState}; +handle(_, State=#ws_state{in=close}, EvHandler, EvHandlerState) -> + closing(normal, State, EvHandler, EvHandlerState); %% Shortcut for common case when Data is empty after processing a frame. handle(<<>>, State=#ws_state{in=head}, _, EvHandlerState) -> maybe_active(State, EvHandlerState); @@ -119,7 +129,7 @@ handle(Data, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef, buffer=Buffer, more -> maybe_active(State#ws_state{buffer=Data2}, EvHandlerState1); error -> - close({error, badframe}, State, EvHandler, EvHandlerState1) + closing({error, badframe}, State, EvHandler, EvHandlerState1) end; handle(Data, State=#ws_state{in=In=#payload{type=Type, rsv=Rsv, len=Len, mask_key=MaskKey, close_code=CloseCode, unmasked=Unmasked, unmasked_len=UnmaskedLen}, frag_state=FragState, @@ -143,7 +153,7 @@ handle(Data, State=#ws_state{in=In=#payload{type=Type, rsv=Rsv, len=Len, mask_ke len=Len - byte_size(Data), unmasked_len=UnmaskedLen + byte_size(Data)}, utf8_state=Utf8State2}, EvHandlerState); Error = {error, _Reason} -> - close(Error, State, EvHandler, EvHandlerState) + closing(Error, State, EvHandler, EvHandlerState) end. maybe_active(State=#ws_state{flow=Flow}, EvHandlerState) -> @@ -165,11 +175,11 @@ dispatch(Rest, State0=#ws_state{owner=ReplyTo, stream_ref=StreamRef, }, EvHandlerState0), case cow_ws:make_frame(Type, Payload, CloseCode, FragState) of ping -> - {{state, State}, EvHandlerState} = send(pong, State0, EvHandler, EvHandlerState1), - handle(Rest, State, EvHandler, EvHandlerState); + {[], EvHandlerState} = send(pong, State0, EvHandler, EvHandlerState1), + handle(Rest, State0, EvHandler, EvHandlerState); {ping, Payload} -> - {{state, State}, EvHandlerState} = send({pong, Payload}, State0, EvHandler, EvHandlerState1), - handle(Rest, State, EvHandler, EvHandlerState); + {[], EvHandlerState} = send({pong, Payload}, State0, EvHandler, EvHandlerState1), + handle(Rest, State0, EvHandler, EvHandlerState); pong -> handle(Rest, State0, EvHandler, EvHandlerState1); {pong, _} -> @@ -200,26 +210,30 @@ update_flow(State=#ws_state{flow=Flow0}, _ReplyTo, _StreamRef, Inc) -> {active, Flow > 0} ]. -close(Reason, State, EvHandler, EvHandlerState) -> - case Reason of - normal -> - send({close, 1000, <<>>}, State, EvHandler, EvHandlerState); - owner_down -> - send({close, 1001, <<>>}, State, EvHandler, EvHandlerState); - {error, badframe} -> - send({close, 1002, <<>>}, State, EvHandler, EvHandlerState); - {error, badencoding} -> - send({close, 1007, <<>>}, State, EvHandler, EvHandlerState); - %% Socket errors; do nothing. - closed -> - {ok, EvHandlerState}; - {error, _} -> - {ok, EvHandlerState} - end. +%% The user already sent the close frame; do nothing. +closing(_, State=#ws_state{out=close}, _, EvHandlerState) -> + {closing(State), EvHandlerState}; +closing(Reason, State, EvHandler, EvHandlerState) -> + Code = case Reason of + normal -> 1000; + owner_down -> 1001; + shutdown -> 1001; + {error, badframe} -> 1002; + {error, badencoding} -> 1007 + end, + send({close, Code, <<>>}, State, EvHandler, EvHandlerState). +closing(#ws_state{opts=Opts}) -> + Timeout = maps:get(closing_timeout, Opts, 15000), + {closing, Timeout}. + +close(_, _, _, EvHandlerState) -> + EvHandlerState. + +%% Send one frame. send(Frame, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef, - socket=Socket, transport=Transport, extensions=Extensions}, - EvHandler, EvHandlerState0) -> + socket=Socket, transport=Transport, in=In, extensions=Extensions}, + EvHandler, EvHandlerState0) when not is_list(Frame) -> WsSendFrameEvent = #{ stream_ref => StreamRef, reply_to => ReplyTo, @@ -229,10 +243,28 @@ send(Frame, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef, EvHandlerState1 = EvHandler:ws_send_frame_start(WsSendFrameEvent, EvHandlerState0), Transport:send(Socket, cow_ws:masked_frame(Frame, Extensions)), EvHandlerState = EvHandler:ws_send_frame_end(WsSendFrameEvent, EvHandlerState1), - case Frame of - close -> {close, EvHandlerState}; - {close, _, _} -> {close, EvHandlerState}; - _ -> {{state, State}, EvHandlerState} + if + Frame =:= close; element(1, Frame) =:= close -> + {[ + {state, State#ws_state{out=close}}, + %% We can close immediately if we already received a close frame. + case In of + close -> close; + _ -> closing(State) + end + ], EvHandlerState}; + true -> + {[], EvHandlerState} + end; +%% Send many frames. +send([], _, _, EvHandlerState) -> + {[], EvHandlerState}; +send([Frame|Tail], State, EvHandler, EvHandlerState0) -> + case send(Frame, State, EvHandler, EvHandlerState0) of + {[], EvHandlerState} -> + send(Tail, State, EvHandler, EvHandlerState); + Other -> + Other end. %% Websocket has no concept of streams. -- cgit v1.2.3