diff options
author | Loïc Hoguin <[email protected]> | 2019-08-08 16:33:09 +0200 |
---|---|---|
committer | Loïc Hoguin <[email protected]> | 2019-09-05 11:28:07 +0200 |
commit | c974b4334e7ab660f9bf95653696c3663c02ead3 (patch) | |
tree | 9e501a4928b261c4fe9adc74d80c47b6b14ae50a /src/gun.erl | |
parent | 491ddf58c0e14824a741852fdc522b390b306ae2 (diff) | |
download | gun-c974b4334e7ab660f9bf95653696c3663c02ead3.tar.gz gun-c974b4334e7ab660f9bf95653696c3663c02ead3.tar.bz2 gun-c974b4334e7ab660f9bf95653696c3663c02ead3.zip |
Implement graceful shutdown
The graceful shutdown is implemented through a new 'closing'
state. This state is entered under different circumstances
depending on the protocol.
The gun:shutdown/1 function is now implemented and documented.
It allows shutting down the connection gracefully regardless
of the current state of the connection and for all protocols.
The behavior is entirely dependent on the protocol.
For HTTP/1.1 the connection stays up only until after the
current stream is complete; other streams are immediately
canceled.
For HTTP/2 a GOAWAY frame is sent and existing streams
continue to be processed. The connection is closed after
all streams are processed and the server's GOAWAY frame
is received.
For Websocket a close frame is sent. The connection is
closed when receiving the server's close frame.
In all cases the closing_timeout option defines how long
we wait, as a maximum, before closing the connection after
the graceful shutdown was started.
The graceful shutdown is also initiated when the owner
process goes away; when sending an HTTP/1.1 request
with the connection: close header; when receiving an
HTTP/1.1 response with the connection: close header;
when receiving an HTTP/1.0 response without a connection
header; when the server sends a GOAWAY HTTP/2 frame;
or when we send or receive a Websocket close frame.
Along with these changes, the gun:ws_send/2 function
now accepts a list of frames as argument. Those frames
may include a close frame that initiates the graceful
shutdown.
Diffstat (limited to 'src/gun.erl')
-rw-r--r-- | src/gun.erl | 300 |
1 files changed, 177 insertions, 123 deletions
diff --git a/src/gun.erl b/src/gun.erl index 7b06aaf..bb659e7 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -98,6 +98,7 @@ -export([connecting/3]). -export([tls_handshake/3]). -export([connected/3]). +-export([closing/3]). -export([terminate/3]). -type req_headers() :: [{binary() | string() | atom(), iodata()}] @@ -165,19 +166,24 @@ -export_type([req_opts/0]). -type http_opts() :: #{ - keepalive => timeout(), + closing_timeout => timeout(), + flow => pos_integer(), + keepalive => timeout(), transform_header_name => fun((binary()) -> binary()), - version => 'HTTP/1.1' | 'HTTP/1.0' + version => 'HTTP/1.1' | 'HTTP/1.0' }. -export_type([http_opts/0]). -type http2_opts() :: #{ + closing_timeout => timeout(), + flow => pos_integer(), keepalive => timeout() }. -export_type([http2_opts/0]). %% @todo keepalive -type ws_opts() :: #{ + closing_timeout => timeout(), compress => boolean(), flow => pos_integer(), protocols => [{binary(), module()}] @@ -186,7 +192,7 @@ -record(state, { owner :: pid(), - owner_ref :: reference(), + status :: {up, reference()} | {down, any()} | shutdown, host :: inet:hostname() | inet:ip_address(), port :: inet:port_number(), origin_scheme :: binary(), @@ -781,7 +787,7 @@ init({Owner, Host, Port, Opts}) -> origin_port => Port, opts => Opts }, EvHandlerState0), - State = #state{owner=Owner, owner_ref=OwnerRef, + State = #state{owner=Owner, status={up, OwnerRef}, host=Host, port=Port, origin_scheme=OriginScheme, origin_host=Host, origin_port=Port, opts=Opts, transport=Transport, messages=Transport:messages(), @@ -875,11 +881,7 @@ connecting(_, {retries, Retries, LookupInfo}, State=#state{opts=Opts, }, EvHandlerState1), {next_state, not_connected, State#state{event_handler_state=EvHandlerState}, {next_event, internal, {retries, Retries, Reason}}} - end; -connecting({call, From}, {stream_info, _}, _) -> - {keep_state_and_data, {reply, From, {error, not_connected}}}; -connecting(Type, Event, State) -> - handle_common(Type, Event, ?FUNCTION_NAME, State). + end. tls_handshake(_, {retries, Retries, Socket0}, State=#state{opts=Opts, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> @@ -910,11 +912,7 @@ tls_handshake(_, {retries, Retries, Socket0}, State=#state{opts=Opts, }, EvHandlerState1), {next_state, not_connected, State#state{event_handler_state=EvHandlerState}, {next_event, internal, {retries, Retries, Reason}}} - end; -tls_handshake({call, From}, {stream_info, _}, _) -> - {keep_state_and_data, {reply, From, {error, not_connected}}}; -tls_handshake(Type, Event, State) -> - handle_common(Type, Event, ?FUNCTION_NAME, State). + end. ensure_alpn(Protocols0, TransOpts) -> Protocols = [case P of @@ -937,28 +935,6 @@ connected(internal, {connected, Socket, Protocol}, Owner ! {gun_up, self(), Protocol:name()}, {keep_state, keepalive_timeout(active(State#state{socket=Socket, protocol=Protocol, protocol_state=ProtoState}))}; -%% Socket events. -connected(info, {OK, Socket, Data}, State0=#state{socket=Socket, messages={OK, _, _}, - protocol=Protocol, protocol_state=ProtoState, - event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> - {Commands, EvHandlerState} = Protocol:handle(Data, ProtoState, EvHandler, EvHandlerState0), - case commands(Commands, State0#state{event_handler_state=EvHandlerState}) of - {keep_state, State} -> - {keep_state, active(State)}; - Res -> - Res - end; -connected(info, {Closed, Socket}, State=#state{socket=Socket, messages={_, Closed, _}}) -> - disconnect(State, closed); -connected(info, {Error, Socket, Reason}, State=#state{socket=Socket, messages={_, _, Error}}) -> - disconnect(State, {error, Reason}); -%% Timeouts. -%% @todo HTTP/2 requires more timeouts than just the keepalive timeout. -%% We should have a timeout function in protocols that deal with -%% received timeouts. Currently the timeout messages are ignored. -connected(info, keepalive, State=#state{protocol=Protocol, protocol_state=ProtoState}) -> - ProtoState2 = Protocol:keepalive(ProtoState), - {keep_state, keepalive_timeout(State#state{protocol_state=ProtoState2})}; %% Public HTTP interface. connected(cast, {headers, ReplyTo, StreamRef, Method, Path, Headers, InitialFlow}, State=#state{origin_host=Host, origin_port=Port, @@ -976,14 +952,6 @@ connected(cast, {request, ReplyTo, StreamRef, Method, Path, Headers, Body, Initi StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body, InitialFlow, EvHandler, EvHandlerState0), {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; -%% @todo Do we want to reject ReplyTo if it's not the process -%% who initiated the connection? For both data and cancel. -connected(cast, {data, ReplyTo, StreamRef, IsFin, Data}, - State=#state{protocol=Protocol, protocol_state=ProtoState, - event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> - {ProtoState2, EvHandlerState} = Protocol:data(ProtoState, - StreamRef, ReplyTo, IsFin, Data, EvHandler, EvHandlerState0), - {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; connected(cast, {connect, ReplyTo, StreamRef, Destination0, Headers, InitialFlow}, State=#state{protocol=Protocol, protocol_state=ProtoState}) -> %% The protocol option has been deprecated in favor of the protocols option. @@ -1006,41 +974,6 @@ connected(cast, {connect, ReplyTo, StreamRef, Destination0, Headers, InitialFlow end, ProtoState2 = Protocol:connect(ProtoState, StreamRef, ReplyTo, Destination, Headers, InitialFlow), {keep_state, State#state{protocol_state=ProtoState2}}; -%% When using gun_tls_proxy we need a separate message to know whether -%% the handshake succeeded and whether we need to switch to a different protocol. -connected(info, {gun_tls_proxy, Socket, {ok, NewProtocol}, HandshakeEvent}, - State0=#state{socket=Socket, protocol=CurrentProtocol, protocol_state=ProtoState, - event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> - EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{ - socket => Socket, - protocol => NewProtocol:name() - }, EvHandlerState0), - State = State0#state{event_handler_state=EvHandlerState}, - case NewProtocol of - CurrentProtocol -> {keep_state, State}; - _ -> commands([{switch_protocol, NewProtocol, ProtoState}], State) - end; -connected(info, {gun_tls_proxy, Socket, Error = {error, Reason}, HandshakeEvent}, - State=#state{socket=Socket, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> - EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{ - error => Reason - }, EvHandlerState0), - commands([Error], State#state{event_handler_state=EvHandlerState}); -connected(cast, {update_flow, ReplyTo, StreamRef, Flow}, State0=#state{ - protocol=Protocol, protocol_state=ProtoState}) -> - Commands = Protocol:update_flow(ProtoState, ReplyTo, StreamRef, Flow), - case commands(Commands, State0) of - {keep_state, State} -> - {keep_state, active(State)}; - Res -> - Res - end; -connected(cast, {cancel, ReplyTo, StreamRef}, State=#state{ - protocol=Protocol, protocol_state=ProtoState, - event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> - {ProtoState2, EvHandlerState} = Protocol:cancel(ProtoState, - StreamRef, ReplyTo, EvHandler, EvHandlerState0), - {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; %% Public Websocket interface. %% @todo Maybe make an interface in the protocol module instead of checking on protocol name. %% An interface would also make sure that HTTP/1.0 can't upgrade. @@ -1067,40 +1000,148 @@ connected(cast, {ws_upgrade, ReplyTo, StreamRef, _, _, _}, _) -> ReplyTo ! {gun_error, self(), StreamRef, {badstate, "Websocket is only supported over HTTP/1.1."}}, keep_state_and_data; -connected(cast, {ws_send, Owner, Frame}, State=#state{ +connected(cast, {ws_send, Owner, Frames}, State=#state{ owner=Owner, protocol=Protocol=gun_ws, protocol_state=ProtoState, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> - {Commands, EvHandlerState} = Protocol:send(Frame, ProtoState, EvHandler, EvHandlerState0), + {Commands, EvHandlerState} = Protocol:send(Frames, ProtoState, EvHandler, EvHandlerState0), commands(Commands, State#state{event_handler_state=EvHandlerState}); connected(cast, {ws_send, ReplyTo, _}, _) -> ReplyTo ! {gun_error, self(), {badstate, "Connection needs to be upgraded to Websocket " "before the gun:ws_send/1 function can be used."}}, keep_state_and_data; -connected({call, From}, {stream_info, StreamRef}, +connected(Type, Event, State) -> + handle_common_connected(Type, Event, ?FUNCTION_NAME, State). + +%% Switch to the graceful connection close state. +closing(State=#state{protocol=Protocol, protocol_state=ProtoState, + event_handler=EvHandler, event_handler_state=EvHandlerState0}, Reason) -> + {Commands, EvHandlerState} = Protocol:closing(Reason, ProtoState, EvHandler, EvHandlerState0), + commands(Commands, State#state{event_handler_state=EvHandlerState}). + +%% @todo Should explicitly reject ws_send in this state? +closing(state_timeout, closing_timeout, State=#state{status=Status}) -> + Reason = case Status of + shutdown -> shutdown; + {down, _} -> owner_down; + _ -> normal + end, + disconnect(State, Reason); +closing(Type, Event, State) -> + handle_common_connected(Type, Event, ?FUNCTION_NAME, State). + +%% Common events when we have a connection. +%% +%% Socket events. +handle_common_connected(info, {OK, Socket, Data}, _, State0=#state{socket=Socket, messages={OK, _, _}, + protocol=Protocol, protocol_state=ProtoState, + event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> + {Commands, EvHandlerState} = Protocol:handle(Data, ProtoState, EvHandler, EvHandlerState0), + case commands(Commands, State0#state{event_handler_state=EvHandlerState}) of + {keep_state, State} -> + {keep_state, active(State)}; + {next_state, closing, State, Actions} -> + {next_state, closing, active(State), Actions}; + Res -> + Res + end; +handle_common_connected(info, {Closed, Socket}, _, State=#state{socket=Socket, messages={_, Closed, _}}) -> + disconnect(State, closed); +handle_common_connected(info, {Error, Socket, Reason}, _, State=#state{socket=Socket, messages={_, _, Error}}) -> + disconnect(State, {error, Reason}); +%% Timeouts. +%% @todo HTTP/2 requires more timeouts than just the keepalive timeout. +%% We should have a timeout function in protocols that deal with +%% received timeouts. Currently the timeout messages are ignored. +handle_common_connected(info, keepalive, _, State=#state{protocol=Protocol, protocol_state=ProtoState}) -> + ProtoState2 = Protocol:keepalive(ProtoState), + {keep_state, keepalive_timeout(State#state{protocol_state=ProtoState2})}; +%% When using gun_tls_proxy we need a separate message to know whether +%% the handshake succeeded and whether we need to switch to a different protocol. +handle_common_connected(info, {gun_tls_proxy, Socket, {ok, NewProtocol}, HandshakeEvent}, _, + State0=#state{socket=Socket, protocol=CurrentProtocol, protocol_state=ProtoState, + event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> + EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{ + socket => Socket, + protocol => NewProtocol:name() + }, EvHandlerState0), + State = State0#state{event_handler_state=EvHandlerState}, + case NewProtocol of + CurrentProtocol -> {keep_state, State}; + _ -> commands([{switch_protocol, NewProtocol, ProtoState}], State) + end; +handle_common_connected(info, {gun_tls_proxy, Socket, Error = {error, Reason}, HandshakeEvent}, _, + State=#state{socket=Socket, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> + EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{ + error => Reason + }, EvHandlerState0), + commands([Error], State#state{event_handler_state=EvHandlerState}); +%% @todo Do we want to reject ReplyTo if it's not the process +%% who initiated the connection? For both data and cancel. +handle_common_connected(cast, {data, ReplyTo, StreamRef, IsFin, Data}, _, + State=#state{protocol=Protocol, protocol_state=ProtoState, + event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> + {ProtoState2, EvHandlerState} = Protocol:data(ProtoState, + StreamRef, ReplyTo, IsFin, Data, EvHandler, EvHandlerState0), + {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; +handle_common_connected(cast, {update_flow, ReplyTo, StreamRef, Flow}, _, State0=#state{ + protocol=Protocol, protocol_state=ProtoState}) -> + Commands = Protocol:update_flow(ProtoState, ReplyTo, StreamRef, Flow), + case commands(Commands, State0) of + {keep_state, State} -> + {keep_state, active(State)}; + Res -> + Res + end; +handle_common_connected(cast, {cancel, ReplyTo, StreamRef}, _, State=#state{ + protocol=Protocol, protocol_state=ProtoState, + event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> + {ProtoState2, EvHandlerState} = Protocol:cancel(ProtoState, + StreamRef, ReplyTo, EvHandler, EvHandlerState0), + {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; +handle_common_connected({call, From}, {stream_info, StreamRef}, _, #state{protocol=Protocol, protocol_state=ProtoState}) -> {keep_state_and_data, {reply, From, Protocol:stream_info(ProtoState, StreamRef)}}; -connected(Type, Event, State) -> - handle_common(Type, Event, ?FUNCTION_NAME, State). +handle_common_connected(Type, Event, StateName, State) -> + handle_common(Type, Event, StateName, State). %% Common events. -handle_common(cast, {shutdown, Owner}, _, #state{owner=Owner}) -> - %% @todo Graceful shutdown. - stop; +handle_common(cast, {shutdown, Owner}, StateName, State=#state{ + owner=Owner, status=Status, socket=Socket, transport=Transport, protocol=Protocol}) -> + case {Socket, Protocol} of + {undefined, _} -> + {stop, shutdown}; + {_, undefined} -> + %% @todo This is missing the disconnect event. + Transport:close(Socket), + {stop, shutdown}; + _ when StateName =:= closing, element(1, Status) =:= up -> + {keep_state, status(State, shutdown)}; + _ when StateName =:= closing -> + keep_state_and_data; + _ -> + closing(status(State, shutdown), shutdown) + end; %% We stop when the owner is down. -handle_common(info, {'DOWN', OwnerRef, process, Owner, Reason}, _, State=#state{ - owner=Owner, owner_ref=OwnerRef, socket=Socket, transport=Transport, - protocol=Protocol, protocol_state=ProtoState, - event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> - {_, EvHandlerState} = case Protocol of - undefined -> {ok, EvHandlerState0}; - _ -> Protocol:close(owner_down, ProtoState, EvHandler, EvHandlerState0) - end, - _ = case Socket of - undefined -> ok; - _ -> Transport:close(Socket) - end, - owner_down(Reason, State#state{event_handler_state=EvHandlerState}); +%% @todo We need to demonitor/flush when the status is no longer up. +handle_common(info, {'DOWN', OwnerRef, process, Owner, Reason}, StateName, State=#state{ + owner=Owner, status={up, OwnerRef}, socket=Socket, transport=Transport, protocol=Protocol}) -> + case Socket of + undefined -> + owner_down(Reason, State); + _ -> + case Protocol of + undefined -> + %% @todo This is missing the disconnect event. + Transport:close(Socket), + owner_down(Reason, State); + %% We are already closing so no need to initiate closing again. + _ when StateName =:= closing -> + {keep_state, status(State, {down, Reason})}; + _ -> + closing(status(State, {down, Reason}), owner_down) + end + end; handle_common({call, From}, _, _, _) -> {keep_state_and_data, {reply, From, {error, bad_call}}}; %% @todo The ReplyTo patch disabled the notowner behavior. @@ -1123,6 +1164,9 @@ commands([], State) -> {keep_state, State}; commands([close|_], State) -> disconnect(State, normal); +commands([{closing, Timeout}|_], State) -> + {next_state, closing, keepalive_cancel(State), + {state_timeout, Timeout, closing_timeout}}; commands([Error={error, _}|_], State) -> disconnect(State, Error); commands([{active, Active}|Tail], State) when is_boolean(Active) -> @@ -1176,33 +1220,37 @@ commands([{switch_protocol, Protocol, _ProtoState0}|Tail], State=#state{ commands(Tail, keepalive_timeout(State#state{protocol=Protocol, protocol_state=ProtoState, event_handler_state=EvHandlerState})). -disconnect(State=#state{owner=Owner, opts=Opts, +disconnect(State0=#state{owner=Owner, status=Status, opts=Opts, socket=Socket, transport=Transport, protocol=Protocol, protocol_state=ProtoState, event_handler=EvHandler, event_handler_state=EvHandlerState0}, Reason) -> - {_, EvHandlerState1} = Protocol:close(Reason, ProtoState, EvHandler, EvHandlerState0), - %% @todo Need a special state for orderly shutdown of a connection. + EvHandlerState1 = Protocol:close(Reason, ProtoState, EvHandler, EvHandlerState0), _ = Transport:close(Socket), - %% We closed the socket, discard any remaining socket events. - disconnect_flush(State), - %% @todo Stop keepalive timeout, flush message. - {KilledStreams, UnprocessedStreams} = Protocol:down(ProtoState), - Owner ! {gun_down, self(), Protocol:name(), Reason, KilledStreams, UnprocessedStreams}, - %% Trigger the disconnect event. - DisconnectEvent = #{ - reason => Reason - }, - EvHandlerState = EvHandler:disconnect(DisconnectEvent, EvHandlerState1), - Retry = maps:get(retry, Opts, 5), - case Retry of - 0 -> - {stop, {shutdown, Reason}, State#state{event_handler_state=EvHandlerState}}; - _ -> - {next_state, not_connected, - keepalive_cancel(State#state{socket=undefined, - protocol=undefined, protocol_state=undefined, - event_handler_state=EvHandlerState}), - {next_event, internal, {retries, Retry - 1, Reason}}} + EvHandlerState = EvHandler:disconnect(#{reason => Reason}, EvHandlerState1), + State = State0#state{event_handler_state=EvHandlerState}, + case Status of + {down, DownReason} -> + owner_down(DownReason, State); + shutdown -> + {stop, shutdown, State}; + {up, _} -> + %% We closed the socket, discard any remaining socket events. + disconnect_flush(State), + %% @todo Stop keepalive timeout, flush message. + {KilledStreams, UnprocessedStreams} = Protocol:down(ProtoState), + Owner ! {gun_down, self(), Protocol:name(), Reason, KilledStreams, UnprocessedStreams}, + Retry = maps:get(retry, Opts, 5), + case Retry of + 0 when Reason =:= normal -> + {stop, normal, State}; + 0 -> + {stop, {shutdown, Reason}, State}; + _ -> + {next_state, not_connected, + keepalive_cancel(State#state{socket=undefined, + protocol=undefined, protocol_state=undefined}), + {next_event, internal, {retries, Retry - 1, Reason}}} + end end. disconnect_flush(State=#state{socket=Socket, messages={OK, Closed, Error}}) -> @@ -1220,6 +1268,12 @@ active(State=#state{socket=Socket, transport=Transport}) -> Transport:setopts(Socket, [{active, once}]), State. +status(State=#state{status={up, OwnerRef}}, NewStatus) -> + demonitor(OwnerRef, [flush]), + State#state{status=NewStatus}; +status(State, NewStatus) -> + State#state{status=NewStatus}. + keepalive_timeout(State=#state{opts=Opts, protocol=Protocol}) -> {ProtoOptsKey, Default} = case Protocol of gun_http -> {http_opts, infinity}; |