diff options
author | Loïc Hoguin <[email protected]> | 2019-08-08 16:33:09 +0200 |
---|---|---|
committer | Loïc Hoguin <[email protected]> | 2019-09-05 11:28:07 +0200 |
commit | c974b4334e7ab660f9bf95653696c3663c02ead3 (patch) | |
tree | 9e501a4928b261c4fe9adc74d80c47b6b14ae50a /src/gun_ws.erl | |
parent | 491ddf58c0e14824a741852fdc522b390b306ae2 (diff) | |
download | gun-c974b4334e7ab660f9bf95653696c3663c02ead3.tar.gz gun-c974b4334e7ab660f9bf95653696c3663c02ead3.tar.bz2 gun-c974b4334e7ab660f9bf95653696c3663c02ead3.zip |
Implement graceful shutdown
The graceful shutdown is implemented through a new 'closing'
state. This state is entered under different circumstances
depending on the protocol.
The gun:shutdown/1 function is now implemented and documented.
It allows shutting down the connection gracefully regardless
of the current state of the connection and for all protocols.
The behavior is entirely dependent on the protocol.
For HTTP/1.1 the connection stays up only until after the
current stream is complete; other streams are immediately
canceled.
For HTTP/2 a GOAWAY frame is sent and existing streams
continue to be processed. The connection is closed after
all streams are processed and the server's GOAWAY frame
is received.
For Websocket a close frame is sent. The connection is
closed when receiving the server's close frame.
In all cases the closing_timeout option defines how long
we wait, as a maximum, before closing the connection after
the graceful shutdown was started.
The graceful shutdown is also initiated when the owner
process goes away; when sending an HTTP/1.1 request
with the connection: close header; when receiving an
HTTP/1.1 response with the connection: close header;
when receiving an HTTP/1.0 response without a connection
header; when the server sends a GOAWAY HTTP/2 frame;
or when we send or receive a Websocket close frame.
Along with these changes, the gun:ws_send/2 function
now accepts a list of frames as argument. Those frames
may include a close frame that initiates the graceful
shutdown.
Diffstat (limited to 'src/gun_ws.erl')
-rw-r--r-- | src/gun_ws.erl | 94 |
1 files changed, 63 insertions, 31 deletions
diff --git a/src/gun_ws.erl b/src/gun_ws.erl index 42cf049..49911dc 100644 --- a/src/gun_ws.erl +++ b/src/gun_ws.erl @@ -19,6 +19,7 @@ -export([init/9]). -export([handle/4]). -export([update_flow/4]). +-export([closing/4]). -export([close/4]). -export([send/4]). -export([down/1]). @@ -38,8 +39,10 @@ stream_ref :: reference(), socket :: inet:socket() | ssl:sslsocket(), transport :: module(), + opts = #{} :: map(), %% @todo buffer = <<>> :: binary(), in = head :: head | #payload{} | close, + out = head :: head | close, frag_state = undefined :: cow_ws:frag_state(), utf8_state = 0 :: cow_ws:utf8_state(), extensions = #{} :: cow_ws:extensions(), @@ -53,6 +56,10 @@ check_options(Opts) -> do_check_options([]) -> ok; +do_check_options([{closing_timeout, infinity}|Opts]) -> + do_check_options(Opts); +do_check_options([{closing_timeout, T}|Opts]) when is_integer(T), T > 0 -> + do_check_options(Opts); do_check_options([{compress, B}|Opts]) when B =:= true; B =:= false -> do_check_options(Opts); do_check_options([{default_protocol, M}|Opts]) when is_atom(M) -> @@ -75,12 +82,15 @@ init(Owner, Socket, Transport, StreamRef, Headers, Extensions, InitialFlow, Hand Owner ! {gun_upgrade, self(), StreamRef, [<<"websocket">>], Headers}, {ok, HandlerState} = Handler:init(Owner, StreamRef, Headers, Opts), {switch_protocol, ?MODULE, #ws_state{owner=Owner, stream_ref=StreamRef, - socket=Socket, transport=Transport, extensions=Extensions, + socket=Socket, transport=Transport, opts=Opts, extensions=Extensions, flow=InitialFlow, handler=Handler, handler_state=HandlerState}}. %% Do not handle anything if we received a close frame. -handle(_, State=#ws_state{in=close}, _, EvHandlerState) -> - {{state, State}, EvHandlerState}; +%% Initiate or terminate the closing state depending on whether we sent a close yet. +handle(_, State=#ws_state{in=close, out=close}, _, EvHandlerState) -> + {[{state, State}, close], EvHandlerState}; +handle(_, State=#ws_state{in=close}, EvHandler, EvHandlerState) -> + closing(normal, State, EvHandler, EvHandlerState); %% Shortcut for common case when Data is empty after processing a frame. handle(<<>>, State=#ws_state{in=head}, _, EvHandlerState) -> maybe_active(State, EvHandlerState); @@ -119,7 +129,7 @@ handle(Data, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef, buffer=Buffer, more -> maybe_active(State#ws_state{buffer=Data2}, EvHandlerState1); error -> - close({error, badframe}, State, EvHandler, EvHandlerState1) + closing({error, badframe}, State, EvHandler, EvHandlerState1) end; handle(Data, State=#ws_state{in=In=#payload{type=Type, rsv=Rsv, len=Len, mask_key=MaskKey, close_code=CloseCode, unmasked=Unmasked, unmasked_len=UnmaskedLen}, frag_state=FragState, @@ -143,7 +153,7 @@ handle(Data, State=#ws_state{in=In=#payload{type=Type, rsv=Rsv, len=Len, mask_ke len=Len - byte_size(Data), unmasked_len=UnmaskedLen + byte_size(Data)}, utf8_state=Utf8State2}, EvHandlerState); Error = {error, _Reason} -> - close(Error, State, EvHandler, EvHandlerState) + closing(Error, State, EvHandler, EvHandlerState) end. maybe_active(State=#ws_state{flow=Flow}, EvHandlerState) -> @@ -165,11 +175,11 @@ dispatch(Rest, State0=#ws_state{owner=ReplyTo, stream_ref=StreamRef, }, EvHandlerState0), case cow_ws:make_frame(Type, Payload, CloseCode, FragState) of ping -> - {{state, State}, EvHandlerState} = send(pong, State0, EvHandler, EvHandlerState1), - handle(Rest, State, EvHandler, EvHandlerState); + {[], EvHandlerState} = send(pong, State0, EvHandler, EvHandlerState1), + handle(Rest, State0, EvHandler, EvHandlerState); {ping, Payload} -> - {{state, State}, EvHandlerState} = send({pong, Payload}, State0, EvHandler, EvHandlerState1), - handle(Rest, State, EvHandler, EvHandlerState); + {[], EvHandlerState} = send({pong, Payload}, State0, EvHandler, EvHandlerState1), + handle(Rest, State0, EvHandler, EvHandlerState); pong -> handle(Rest, State0, EvHandler, EvHandlerState1); {pong, _} -> @@ -200,26 +210,30 @@ update_flow(State=#ws_state{flow=Flow0}, _ReplyTo, _StreamRef, Inc) -> {active, Flow > 0} ]. -close(Reason, State, EvHandler, EvHandlerState) -> - case Reason of - normal -> - send({close, 1000, <<>>}, State, EvHandler, EvHandlerState); - owner_down -> - send({close, 1001, <<>>}, State, EvHandler, EvHandlerState); - {error, badframe} -> - send({close, 1002, <<>>}, State, EvHandler, EvHandlerState); - {error, badencoding} -> - send({close, 1007, <<>>}, State, EvHandler, EvHandlerState); - %% Socket errors; do nothing. - closed -> - {ok, EvHandlerState}; - {error, _} -> - {ok, EvHandlerState} - end. +%% The user already sent the close frame; do nothing. +closing(_, State=#ws_state{out=close}, _, EvHandlerState) -> + {closing(State), EvHandlerState}; +closing(Reason, State, EvHandler, EvHandlerState) -> + Code = case Reason of + normal -> 1000; + owner_down -> 1001; + shutdown -> 1001; + {error, badframe} -> 1002; + {error, badencoding} -> 1007 + end, + send({close, Code, <<>>}, State, EvHandler, EvHandlerState). +closing(#ws_state{opts=Opts}) -> + Timeout = maps:get(closing_timeout, Opts, 15000), + {closing, Timeout}. + +close(_, _, _, EvHandlerState) -> + EvHandlerState. + +%% Send one frame. send(Frame, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef, - socket=Socket, transport=Transport, extensions=Extensions}, - EvHandler, EvHandlerState0) -> + socket=Socket, transport=Transport, in=In, extensions=Extensions}, + EvHandler, EvHandlerState0) when not is_list(Frame) -> WsSendFrameEvent = #{ stream_ref => StreamRef, reply_to => ReplyTo, @@ -229,10 +243,28 @@ send(Frame, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef, EvHandlerState1 = EvHandler:ws_send_frame_start(WsSendFrameEvent, EvHandlerState0), Transport:send(Socket, cow_ws:masked_frame(Frame, Extensions)), EvHandlerState = EvHandler:ws_send_frame_end(WsSendFrameEvent, EvHandlerState1), - case Frame of - close -> {close, EvHandlerState}; - {close, _, _} -> {close, EvHandlerState}; - _ -> {{state, State}, EvHandlerState} + if + Frame =:= close; element(1, Frame) =:= close -> + {[ + {state, State#ws_state{out=close}}, + %% We can close immediately if we already received a close frame. + case In of + close -> close; + _ -> closing(State) + end + ], EvHandlerState}; + true -> + {[], EvHandlerState} + end; +%% Send many frames. +send([], _, _, EvHandlerState) -> + {[], EvHandlerState}; +send([Frame|Tail], State, EvHandler, EvHandlerState0) -> + case send(Frame, State, EvHandler, EvHandlerState0) of + {[], EvHandlerState} -> + send(Tail, State, EvHandler, EvHandlerState); + Other -> + Other end. %% Websocket has no concept of streams. |