aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2019-08-08 16:33:09 +0200
committerLoïc Hoguin <[email protected]>2019-09-05 11:28:07 +0200
commitc974b4334e7ab660f9bf95653696c3663c02ead3 (patch)
tree9e501a4928b261c4fe9adc74d80c47b6b14ae50a
parent491ddf58c0e14824a741852fdc522b390b306ae2 (diff)
downloadgun-c974b4334e7ab660f9bf95653696c3663c02ead3.tar.gz
gun-c974b4334e7ab660f9bf95653696c3663c02ead3.tar.bz2
gun-c974b4334e7ab660f9bf95653696c3663c02ead3.zip
Implement graceful shutdown
The graceful shutdown is implemented through a new 'closing' state. This state is entered under different circumstances depending on the protocol. The gun:shutdown/1 function is now implemented and documented. It allows shutting down the connection gracefully regardless of the current state of the connection and for all protocols. The behavior is entirely dependent on the protocol. For HTTP/1.1 the connection stays up only until after the current stream is complete; other streams are immediately canceled. For HTTP/2 a GOAWAY frame is sent and existing streams continue to be processed. The connection is closed after all streams are processed and the server's GOAWAY frame is received. For Websocket a close frame is sent. The connection is closed when receiving the server's close frame. In all cases the closing_timeout option defines how long we wait, as a maximum, before closing the connection after the graceful shutdown was started. The graceful shutdown is also initiated when the owner process goes away; when sending an HTTP/1.1 request with the connection: close header; when receiving an HTTP/1.1 response with the connection: close header; when receiving an HTTP/1.0 response without a connection header; when the server sends a GOAWAY HTTP/2 frame; or when we send or receive a Websocket close frame. Along with these changes, the gun:ws_send/2 function now accepts a list of frames as argument. Those frames may include a close frame that initiates the graceful shutdown.
-rw-r--r--doc/src/manual/gun.asciidoc33
-rw-r--r--doc/src/manual/gun.close.asciidoc3
-rw-r--r--doc/src/manual/gun.shutdown.asciidoc67
-rw-r--r--doc/src/manual/gun.ws_send.asciidoc19
-rw-r--r--src/gun.erl300
-rw-r--r--src/gun_event.erl5
-rw-r--r--src/gun_http.erl56
-rw-r--r--src/gun_http2.erl144
-rw-r--r--src/gun_ws.erl94
-rw-r--r--test/gun_SUITE.erl88
-rw-r--r--test/handlers/delayed_hello_h.erl11
-rw-r--r--test/handlers/delayed_push_h.erl13
-rw-r--r--test/handlers/ws_frozen_h.erl23
-rw-r--r--test/handlers/ws_timeout_close_h.erl25
-rw-r--r--test/rfc7540_SUITE.erl2
-rw-r--r--test/shutdown_SUITE.erl609
-rw-r--r--test/ws_SUITE.erl27
17 files changed, 1214 insertions, 305 deletions
diff --git a/doc/src/manual/gun.asciidoc b/doc/src/manual/gun.asciidoc
index acc1454..b9fbbc2 100644
--- a/doc/src/manual/gun.asciidoc
+++ b/doc/src/manual/gun.asciidoc
@@ -16,7 +16,7 @@ Connection:
* link:man:gun:open(3)[gun:open(3)] - Open a connection to the given host and port
* link:man:gun:open_unix(3)[gun:open_unix(3)] - Open a connection to the given Unix domain socket
-// @todo * link:man:gun:shutdown(3)[gun:shutdown(3)] - Gracefully close the connection
+* link:man:gun:shutdown(3)[gun:shutdown(3)] - Gracefully close the connection
* link:man:gun:close(3)[gun:close(3)] - Brutally close the connection
* link:man:gun:info(3)[gun:info(3)] - Obtain information about the connection
@@ -144,6 +144,7 @@ Handshake timeout for tunneled TLS connections.
[source,erlang]
----
http_opts() :: #{
+ closing_timeout => timeout(),
flow => pos_integer(),
keepalive => timeout(),
transform_header_name => fun((binary()) -> binary()),
@@ -157,6 +158,12 @@ The default value is given next to the option name:
// @todo Document content_handlers and gun_sse_h.
+closing_timeout (15000)::
+
+Time to wait before brutally closing the connection when a
+graceful shutdown was requested via a call to
+link:man:gun:shutdown(3)[gun:shutdown(3)].
+
flow - see below::
The initial flow control value for all HTTP/1.1 streams.
@@ -188,8 +195,9 @@ HTTP version to use.
[source,erlang]
----
http2_opts() :: #{
- flow => pos_integer(),
- keepalive => timeout()
+ closing_timeout => timeout(),
+ flow => pos_integer(),
+ keepalive => timeout()
}
----
@@ -199,6 +207,12 @@ The default value is given next to the option name:
// @todo Document content_handlers and gun_sse_h.
+closing_timeout (15000)::
+
+Time to wait before brutally closing the connection when a
+graceful shutdown was requested either via a call to
+link:man:gun:shutdown(3)[gun:shutdown(3)] or by the server.
+
flow - see below::
The initial flow control value for all HTTP/2 streams.
@@ -364,9 +378,10 @@ The pid of the process that will receive the response messages.
[source,erlang]
----
ws_opts() :: #{
- compress => boolean(),
- flow => pos_integer(),
- protocols => [{binary(), module()}]
+ closing_timeout => timeout(),
+ compress => boolean(),
+ flow => pos_integer(),
+ protocols => [{binary(), module()}]
}
----
@@ -374,6 +389,12 @@ Configuration for the Websocket protocol.
The default value is given next to the option name:
+closing_timeout (15000)::
+
+Time to wait before brutally closing the connection when a
+graceful shutdown was requested either via a call to
+link:man:gun:shutdown(3)[gun:shutdown(3)] or by the server.
+
compress (false)::
Whether to enable permessage-deflate compression. This does
diff --git a/doc/src/manual/gun.close.asciidoc b/doc/src/manual/gun.close.asciidoc
index 20fd1bf..cdbe05f 100644
--- a/doc/src/manual/gun.close.asciidoc
+++ b/doc/src/manual/gun.close.asciidoc
@@ -41,4 +41,5 @@ ok = gun:close(ConnPid).
link:man:gun(3)[gun(3)],
link:man:gun:open(3)[gun:open(3)],
-link:man:gun:open_unix(3)[gun:open_unix(3)]
+link:man:gun:open_unix(3)[gun:open_unix(3)],
+link:man:gun:shutdown(3)[gun:shutdown(3)]
diff --git a/doc/src/manual/gun.shutdown.asciidoc b/doc/src/manual/gun.shutdown.asciidoc
new file mode 100644
index 0000000..94db39d
--- /dev/null
+++ b/doc/src/manual/gun.shutdown.asciidoc
@@ -0,0 +1,67 @@
+= gun:shutdown(3)
+
+== Name
+
+gun:shutdown - Gracefully close the connection
+
+== Description
+
+[source,erlang]
+----
+shutdown(ConnPid) -> ok
+
+ConnPid :: pid()
+----
+
+Gracefully close the connection.
+
+Gun will wait for up to `closing_timeout` milliseconds
+before brutally closing the connection. The graceful
+shutdown mechanism varies between the different protocols:
+
+* For HTTP/1.1 there is no such mechanism and Gun will
+ close the connection once the current response is
+ received. Any pipelined requests are immediately
+ terminated.
+
+* For HTTP/2 Gun will send a GOAWAY frame and wait for
+ the existing streams to terminate.
+
+* For Websocket Gun will send a close frame and wait
+ for the server's close frame before closing the
+ connection.
+
+The function returns immediately. The connection may
+therefore still be up for some time after this call.
+
+Gun will not attempt to reconnect once graceful
+shutdown has been initiated.
+
+== Arguments
+
+ConnPid::
+
+The pid of the Gun connection process.
+
+== Return value
+
+The atom `ok` is returned.
+
+== Changelog
+
+* *2.0*: Function introduced.
+
+== Examples
+
+.Gracefully shutdown the connection
+[source,erlang]
+----
+ok = gun:shutdown(ConnPid).
+----
+
+== See also
+
+link:man:gun(3)[gun(3)],
+link:man:gun:open(3)[gun:open(3)],
+link:man:gun:open_unix(3)[gun:open_unix(3)],
+link:man:gun:close(3)[gun:close(3)]
diff --git a/doc/src/manual/gun.ws_send.asciidoc b/doc/src/manual/gun.ws_send.asciidoc
index fbb1025..b39f3f0 100644
--- a/doc/src/manual/gun.ws_send.asciidoc
+++ b/doc/src/manual/gun.ws_send.asciidoc
@@ -30,8 +30,7 @@ The pid of the Gun connection process.
Frames::
-A Websocket frame.
-// @todo One or more Websocket frame(s).
+One or more Websocket frame(s).
== Return value
@@ -49,14 +48,14 @@ The atom `ok` is returned.
gun:ws_send(ConnPid, {text, <<"Hello world!">>}).
----
-//.Send many frames including a close frame
-//[source,erlang]
-//----
-//gun:ws_send(ConnPid, [
-// {text, <<"See you later, world!">>},
-// close
-//]).
-//----
+.Send many frames including a close frame
+[source,erlang]
+----
+gun:ws_send(ConnPid, [
+ {text, <<"See you later, world!">>},
+ close
+]).
+----
== See also
diff --git a/src/gun.erl b/src/gun.erl
index 7b06aaf..bb659e7 100644
--- a/src/gun.erl
+++ b/src/gun.erl
@@ -98,6 +98,7 @@
-export([connecting/3]).
-export([tls_handshake/3]).
-export([connected/3]).
+-export([closing/3]).
-export([terminate/3]).
-type req_headers() :: [{binary() | string() | atom(), iodata()}]
@@ -165,19 +166,24 @@
-export_type([req_opts/0]).
-type http_opts() :: #{
- keepalive => timeout(),
+ closing_timeout => timeout(),
+ flow => pos_integer(),
+ keepalive => timeout(),
transform_header_name => fun((binary()) -> binary()),
- version => 'HTTP/1.1' | 'HTTP/1.0'
+ version => 'HTTP/1.1' | 'HTTP/1.0'
}.
-export_type([http_opts/0]).
-type http2_opts() :: #{
+ closing_timeout => timeout(),
+ flow => pos_integer(),
keepalive => timeout()
}.
-export_type([http2_opts/0]).
%% @todo keepalive
-type ws_opts() :: #{
+ closing_timeout => timeout(),
compress => boolean(),
flow => pos_integer(),
protocols => [{binary(), module()}]
@@ -186,7 +192,7 @@
-record(state, {
owner :: pid(),
- owner_ref :: reference(),
+ status :: {up, reference()} | {down, any()} | shutdown,
host :: inet:hostname() | inet:ip_address(),
port :: inet:port_number(),
origin_scheme :: binary(),
@@ -781,7 +787,7 @@ init({Owner, Host, Port, Opts}) ->
origin_port => Port,
opts => Opts
}, EvHandlerState0),
- State = #state{owner=Owner, owner_ref=OwnerRef,
+ State = #state{owner=Owner, status={up, OwnerRef},
host=Host, port=Port, origin_scheme=OriginScheme,
origin_host=Host, origin_port=Port, opts=Opts,
transport=Transport, messages=Transport:messages(),
@@ -875,11 +881,7 @@ connecting(_, {retries, Retries, LookupInfo}, State=#state{opts=Opts,
}, EvHandlerState1),
{next_state, not_connected, State#state{event_handler_state=EvHandlerState},
{next_event, internal, {retries, Retries, Reason}}}
- end;
-connecting({call, From}, {stream_info, _}, _) ->
- {keep_state_and_data, {reply, From, {error, not_connected}}};
-connecting(Type, Event, State) ->
- handle_common(Type, Event, ?FUNCTION_NAME, State).
+ end.
tls_handshake(_, {retries, Retries, Socket0}, State=#state{opts=Opts,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
@@ -910,11 +912,7 @@ tls_handshake(_, {retries, Retries, Socket0}, State=#state{opts=Opts,
}, EvHandlerState1),
{next_state, not_connected, State#state{event_handler_state=EvHandlerState},
{next_event, internal, {retries, Retries, Reason}}}
- end;
-tls_handshake({call, From}, {stream_info, _}, _) ->
- {keep_state_and_data, {reply, From, {error, not_connected}}};
-tls_handshake(Type, Event, State) ->
- handle_common(Type, Event, ?FUNCTION_NAME, State).
+ end.
ensure_alpn(Protocols0, TransOpts) ->
Protocols = [case P of
@@ -937,28 +935,6 @@ connected(internal, {connected, Socket, Protocol},
Owner ! {gun_up, self(), Protocol:name()},
{keep_state, keepalive_timeout(active(State#state{socket=Socket,
protocol=Protocol, protocol_state=ProtoState}))};
-%% Socket events.
-connected(info, {OK, Socket, Data}, State0=#state{socket=Socket, messages={OK, _, _},
- protocol=Protocol, protocol_state=ProtoState,
- event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
- {Commands, EvHandlerState} = Protocol:handle(Data, ProtoState, EvHandler, EvHandlerState0),
- case commands(Commands, State0#state{event_handler_state=EvHandlerState}) of
- {keep_state, State} ->
- {keep_state, active(State)};
- Res ->
- Res
- end;
-connected(info, {Closed, Socket}, State=#state{socket=Socket, messages={_, Closed, _}}) ->
- disconnect(State, closed);
-connected(info, {Error, Socket, Reason}, State=#state{socket=Socket, messages={_, _, Error}}) ->
- disconnect(State, {error, Reason});
-%% Timeouts.
-%% @todo HTTP/2 requires more timeouts than just the keepalive timeout.
-%% We should have a timeout function in protocols that deal with
-%% received timeouts. Currently the timeout messages are ignored.
-connected(info, keepalive, State=#state{protocol=Protocol, protocol_state=ProtoState}) ->
- ProtoState2 = Protocol:keepalive(ProtoState),
- {keep_state, keepalive_timeout(State#state{protocol_state=ProtoState2})};
%% Public HTTP interface.
connected(cast, {headers, ReplyTo, StreamRef, Method, Path, Headers, InitialFlow},
State=#state{origin_host=Host, origin_port=Port,
@@ -976,14 +952,6 @@ connected(cast, {request, ReplyTo, StreamRef, Method, Path, Headers, Body, Initi
StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body,
InitialFlow, EvHandler, EvHandlerState0),
{keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}};
-%% @todo Do we want to reject ReplyTo if it's not the process
-%% who initiated the connection? For both data and cancel.
-connected(cast, {data, ReplyTo, StreamRef, IsFin, Data},
- State=#state{protocol=Protocol, protocol_state=ProtoState,
- event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
- {ProtoState2, EvHandlerState} = Protocol:data(ProtoState,
- StreamRef, ReplyTo, IsFin, Data, EvHandler, EvHandlerState0),
- {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}};
connected(cast, {connect, ReplyTo, StreamRef, Destination0, Headers, InitialFlow},
State=#state{protocol=Protocol, protocol_state=ProtoState}) ->
%% The protocol option has been deprecated in favor of the protocols option.
@@ -1006,41 +974,6 @@ connected(cast, {connect, ReplyTo, StreamRef, Destination0, Headers, InitialFlow
end,
ProtoState2 = Protocol:connect(ProtoState, StreamRef, ReplyTo, Destination, Headers, InitialFlow),
{keep_state, State#state{protocol_state=ProtoState2}};
-%% When using gun_tls_proxy we need a separate message to know whether
-%% the handshake succeeded and whether we need to switch to a different protocol.
-connected(info, {gun_tls_proxy, Socket, {ok, NewProtocol}, HandshakeEvent},
- State0=#state{socket=Socket, protocol=CurrentProtocol, protocol_state=ProtoState,
- event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
- EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{
- socket => Socket,
- protocol => NewProtocol:name()
- }, EvHandlerState0),
- State = State0#state{event_handler_state=EvHandlerState},
- case NewProtocol of
- CurrentProtocol -> {keep_state, State};
- _ -> commands([{switch_protocol, NewProtocol, ProtoState}], State)
- end;
-connected(info, {gun_tls_proxy, Socket, Error = {error, Reason}, HandshakeEvent},
- State=#state{socket=Socket, event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
- EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{
- error => Reason
- }, EvHandlerState0),
- commands([Error], State#state{event_handler_state=EvHandlerState});
-connected(cast, {update_flow, ReplyTo, StreamRef, Flow}, State0=#state{
- protocol=Protocol, protocol_state=ProtoState}) ->
- Commands = Protocol:update_flow(ProtoState, ReplyTo, StreamRef, Flow),
- case commands(Commands, State0) of
- {keep_state, State} ->
- {keep_state, active(State)};
- Res ->
- Res
- end;
-connected(cast, {cancel, ReplyTo, StreamRef}, State=#state{
- protocol=Protocol, protocol_state=ProtoState,
- event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
- {ProtoState2, EvHandlerState} = Protocol:cancel(ProtoState,
- StreamRef, ReplyTo, EvHandler, EvHandlerState0),
- {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}};
%% Public Websocket interface.
%% @todo Maybe make an interface in the protocol module instead of checking on protocol name.
%% An interface would also make sure that HTTP/1.0 can't upgrade.
@@ -1067,40 +1000,148 @@ connected(cast, {ws_upgrade, ReplyTo, StreamRef, _, _, _}, _) ->
ReplyTo ! {gun_error, self(), StreamRef, {badstate,
"Websocket is only supported over HTTP/1.1."}},
keep_state_and_data;
-connected(cast, {ws_send, Owner, Frame}, State=#state{
+connected(cast, {ws_send, Owner, Frames}, State=#state{
owner=Owner, protocol=Protocol=gun_ws, protocol_state=ProtoState,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
- {Commands, EvHandlerState} = Protocol:send(Frame, ProtoState, EvHandler, EvHandlerState0),
+ {Commands, EvHandlerState} = Protocol:send(Frames, ProtoState, EvHandler, EvHandlerState0),
commands(Commands, State#state{event_handler_state=EvHandlerState});
connected(cast, {ws_send, ReplyTo, _}, _) ->
ReplyTo ! {gun_error, self(), {badstate,
"Connection needs to be upgraded to Websocket "
"before the gun:ws_send/1 function can be used."}},
keep_state_and_data;
-connected({call, From}, {stream_info, StreamRef},
+connected(Type, Event, State) ->
+ handle_common_connected(Type, Event, ?FUNCTION_NAME, State).
+
+%% Switch to the graceful connection close state.
+closing(State=#state{protocol=Protocol, protocol_state=ProtoState,
+ event_handler=EvHandler, event_handler_state=EvHandlerState0}, Reason) ->
+ {Commands, EvHandlerState} = Protocol:closing(Reason, ProtoState, EvHandler, EvHandlerState0),
+ commands(Commands, State#state{event_handler_state=EvHandlerState}).
+
+%% @todo Should explicitly reject ws_send in this state?
+closing(state_timeout, closing_timeout, State=#state{status=Status}) ->
+ Reason = case Status of
+ shutdown -> shutdown;
+ {down, _} -> owner_down;
+ _ -> normal
+ end,
+ disconnect(State, Reason);
+closing(Type, Event, State) ->
+ handle_common_connected(Type, Event, ?FUNCTION_NAME, State).
+
+%% Common events when we have a connection.
+%%
+%% Socket events.
+handle_common_connected(info, {OK, Socket, Data}, _, State0=#state{socket=Socket, messages={OK, _, _},
+ protocol=Protocol, protocol_state=ProtoState,
+ event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
+ {Commands, EvHandlerState} = Protocol:handle(Data, ProtoState, EvHandler, EvHandlerState0),
+ case commands(Commands, State0#state{event_handler_state=EvHandlerState}) of
+ {keep_state, State} ->
+ {keep_state, active(State)};
+ {next_state, closing, State, Actions} ->
+ {next_state, closing, active(State), Actions};
+ Res ->
+ Res
+ end;
+handle_common_connected(info, {Closed, Socket}, _, State=#state{socket=Socket, messages={_, Closed, _}}) ->
+ disconnect(State, closed);
+handle_common_connected(info, {Error, Socket, Reason}, _, State=#state{socket=Socket, messages={_, _, Error}}) ->
+ disconnect(State, {error, Reason});
+%% Timeouts.
+%% @todo HTTP/2 requires more timeouts than just the keepalive timeout.
+%% We should have a timeout function in protocols that deal with
+%% received timeouts. Currently the timeout messages are ignored.
+handle_common_connected(info, keepalive, _, State=#state{protocol=Protocol, protocol_state=ProtoState}) ->
+ ProtoState2 = Protocol:keepalive(ProtoState),
+ {keep_state, keepalive_timeout(State#state{protocol_state=ProtoState2})};
+%% When using gun_tls_proxy we need a separate message to know whether
+%% the handshake succeeded and whether we need to switch to a different protocol.
+handle_common_connected(info, {gun_tls_proxy, Socket, {ok, NewProtocol}, HandshakeEvent}, _,
+ State0=#state{socket=Socket, protocol=CurrentProtocol, protocol_state=ProtoState,
+ event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
+ EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{
+ socket => Socket,
+ protocol => NewProtocol:name()
+ }, EvHandlerState0),
+ State = State0#state{event_handler_state=EvHandlerState},
+ case NewProtocol of
+ CurrentProtocol -> {keep_state, State};
+ _ -> commands([{switch_protocol, NewProtocol, ProtoState}], State)
+ end;
+handle_common_connected(info, {gun_tls_proxy, Socket, Error = {error, Reason}, HandshakeEvent}, _,
+ State=#state{socket=Socket, event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
+ EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{
+ error => Reason
+ }, EvHandlerState0),
+ commands([Error], State#state{event_handler_state=EvHandlerState});
+%% @todo Do we want to reject ReplyTo if it's not the process
+%% who initiated the connection? For both data and cancel.
+handle_common_connected(cast, {data, ReplyTo, StreamRef, IsFin, Data}, _,
+ State=#state{protocol=Protocol, protocol_state=ProtoState,
+ event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
+ {ProtoState2, EvHandlerState} = Protocol:data(ProtoState,
+ StreamRef, ReplyTo, IsFin, Data, EvHandler, EvHandlerState0),
+ {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}};
+handle_common_connected(cast, {update_flow, ReplyTo, StreamRef, Flow}, _, State0=#state{
+ protocol=Protocol, protocol_state=ProtoState}) ->
+ Commands = Protocol:update_flow(ProtoState, ReplyTo, StreamRef, Flow),
+ case commands(Commands, State0) of
+ {keep_state, State} ->
+ {keep_state, active(State)};
+ Res ->
+ Res
+ end;
+handle_common_connected(cast, {cancel, ReplyTo, StreamRef}, _, State=#state{
+ protocol=Protocol, protocol_state=ProtoState,
+ event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
+ {ProtoState2, EvHandlerState} = Protocol:cancel(ProtoState,
+ StreamRef, ReplyTo, EvHandler, EvHandlerState0),
+ {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}};
+handle_common_connected({call, From}, {stream_info, StreamRef}, _,
#state{protocol=Protocol, protocol_state=ProtoState}) ->
{keep_state_and_data, {reply, From, Protocol:stream_info(ProtoState, StreamRef)}};
-connected(Type, Event, State) ->
- handle_common(Type, Event, ?FUNCTION_NAME, State).
+handle_common_connected(Type, Event, StateName, State) ->
+ handle_common(Type, Event, StateName, State).
%% Common events.
-handle_common(cast, {shutdown, Owner}, _, #state{owner=Owner}) ->
- %% @todo Graceful shutdown.
- stop;
+handle_common(cast, {shutdown, Owner}, StateName, State=#state{
+ owner=Owner, status=Status, socket=Socket, transport=Transport, protocol=Protocol}) ->
+ case {Socket, Protocol} of
+ {undefined, _} ->
+ {stop, shutdown};
+ {_, undefined} ->
+ %% @todo This is missing the disconnect event.
+ Transport:close(Socket),
+ {stop, shutdown};
+ _ when StateName =:= closing, element(1, Status) =:= up ->
+ {keep_state, status(State, shutdown)};
+ _ when StateName =:= closing ->
+ keep_state_and_data;
+ _ ->
+ closing(status(State, shutdown), shutdown)
+ end;
%% We stop when the owner is down.
-handle_common(info, {'DOWN', OwnerRef, process, Owner, Reason}, _, State=#state{
- owner=Owner, owner_ref=OwnerRef, socket=Socket, transport=Transport,
- protocol=Protocol, protocol_state=ProtoState,
- event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
- {_, EvHandlerState} = case Protocol of
- undefined -> {ok, EvHandlerState0};
- _ -> Protocol:close(owner_down, ProtoState, EvHandler, EvHandlerState0)
- end,
- _ = case Socket of
- undefined -> ok;
- _ -> Transport:close(Socket)
- end,
- owner_down(Reason, State#state{event_handler_state=EvHandlerState});
+%% @todo We need to demonitor/flush when the status is no longer up.
+handle_common(info, {'DOWN', OwnerRef, process, Owner, Reason}, StateName, State=#state{
+ owner=Owner, status={up, OwnerRef}, socket=Socket, transport=Transport, protocol=Protocol}) ->
+ case Socket of
+ undefined ->
+ owner_down(Reason, State);
+ _ ->
+ case Protocol of
+ undefined ->
+ %% @todo This is missing the disconnect event.
+ Transport:close(Socket),
+ owner_down(Reason, State);
+ %% We are already closing so no need to initiate closing again.
+ _ when StateName =:= closing ->
+ {keep_state, status(State, {down, Reason})};
+ _ ->
+ closing(status(State, {down, Reason}), owner_down)
+ end
+ end;
handle_common({call, From}, _, _, _) ->
{keep_state_and_data, {reply, From, {error, bad_call}}};
%% @todo The ReplyTo patch disabled the notowner behavior.
@@ -1123,6 +1164,9 @@ commands([], State) ->
{keep_state, State};
commands([close|_], State) ->
disconnect(State, normal);
+commands([{closing, Timeout}|_], State) ->
+ {next_state, closing, keepalive_cancel(State),
+ {state_timeout, Timeout, closing_timeout}};
commands([Error={error, _}|_], State) ->
disconnect(State, Error);
commands([{active, Active}|Tail], State) when is_boolean(Active) ->
@@ -1176,33 +1220,37 @@ commands([{switch_protocol, Protocol, _ProtoState0}|Tail], State=#state{
commands(Tail, keepalive_timeout(State#state{protocol=Protocol, protocol_state=ProtoState,
event_handler_state=EvHandlerState})).
-disconnect(State=#state{owner=Owner, opts=Opts,
+disconnect(State0=#state{owner=Owner, status=Status, opts=Opts,
socket=Socket, transport=Transport,
protocol=Protocol, protocol_state=ProtoState,
event_handler=EvHandler, event_handler_state=EvHandlerState0}, Reason) ->
- {_, EvHandlerState1} = Protocol:close(Reason, ProtoState, EvHandler, EvHandlerState0),
- %% @todo Need a special state for orderly shutdown of a connection.
+ EvHandlerState1 = Protocol:close(Reason, ProtoState, EvHandler, EvHandlerState0),
_ = Transport:close(Socket),
- %% We closed the socket, discard any remaining socket events.
- disconnect_flush(State),
- %% @todo Stop keepalive timeout, flush message.
- {KilledStreams, UnprocessedStreams} = Protocol:down(ProtoState),
- Owner ! {gun_down, self(), Protocol:name(), Reason, KilledStreams, UnprocessedStreams},
- %% Trigger the disconnect event.
- DisconnectEvent = #{
- reason => Reason
- },
- EvHandlerState = EvHandler:disconnect(DisconnectEvent, EvHandlerState1),
- Retry = maps:get(retry, Opts, 5),
- case Retry of
- 0 ->
- {stop, {shutdown, Reason}, State#state{event_handler_state=EvHandlerState}};
- _ ->
- {next_state, not_connected,
- keepalive_cancel(State#state{socket=undefined,
- protocol=undefined, protocol_state=undefined,
- event_handler_state=EvHandlerState}),
- {next_event, internal, {retries, Retry - 1, Reason}}}
+ EvHandlerState = EvHandler:disconnect(#{reason => Reason}, EvHandlerState1),
+ State = State0#state{event_handler_state=EvHandlerState},
+ case Status of
+ {down, DownReason} ->
+ owner_down(DownReason, State);
+ shutdown ->
+ {stop, shutdown, State};
+ {up, _} ->
+ %% We closed the socket, discard any remaining socket events.
+ disconnect_flush(State),
+ %% @todo Stop keepalive timeout, flush message.
+ {KilledStreams, UnprocessedStreams} = Protocol:down(ProtoState),
+ Owner ! {gun_down, self(), Protocol:name(), Reason, KilledStreams, UnprocessedStreams},
+ Retry = maps:get(retry, Opts, 5),
+ case Retry of
+ 0 when Reason =:= normal ->
+ {stop, normal, State};
+ 0 ->
+ {stop, {shutdown, Reason}, State};
+ _ ->
+ {next_state, not_connected,
+ keepalive_cancel(State#state{socket=undefined,
+ protocol=undefined, protocol_state=undefined}),
+ {next_event, internal, {retries, Retry - 1, Reason}}}
+ end
end.
disconnect_flush(State=#state{socket=Socket, messages={OK, Closed, Error}}) ->
@@ -1220,6 +1268,12 @@ active(State=#state{socket=Socket, transport=Transport}) ->
Transport:setopts(Socket, [{active, once}]),
State.
+status(State=#state{status={up, OwnerRef}}, NewStatus) ->
+ demonitor(OwnerRef, [flush]),
+ State#state{status=NewStatus};
+status(State, NewStatus) ->
+ State#state{status=NewStatus}.
+
keepalive_timeout(State=#state{opts=Opts, protocol=Protocol}) ->
{ProtoOptsKey, Default} = case Protocol of
gun_http -> {http_opts, infinity};
diff --git a/src/gun_event.erl b/src/gun_event.erl
index 1e158c4..a984796 100644
--- a/src/gun_event.erl
+++ b/src/gun_event.erl
@@ -113,7 +113,10 @@
-type push_promise_end_event() :: #{
stream_ref := reference(),
reply_to := pid(),
- promised_stream_ref := reference(),
+ %% No stream is created if we receive the push_promise while
+ %% in the process of gracefully shutting down the connection.
+ %% The promised stream is canceled immediately.
+ promised_stream_ref => reference(),
method := binary(),
uri := binary(),
headers := [{binary(), iodata()}]
diff --git a/src/gun_http.erl b/src/gun_http.erl
index ec268ad..309772e 100644
--- a/src/gun_http.erl
+++ b/src/gun_http.erl
@@ -19,6 +19,7 @@
-export([init/4]).
-export([handle/4]).
-export([update_flow/4]).
+-export([closing/4]).
-export([close/4]).
-export([keepalive/1]).
-export([headers/11]).
@@ -71,6 +72,10 @@ check_options(Opts) ->
do_check_options([]) ->
ok;
+do_check_options([{closing_timeout, infinity}|Opts]) ->
+ do_check_options(Opts);
+do_check_options([{closing_timeout, T}|Opts]) when is_integer(T), T > 0 ->
+ do_check_options(Opts);
do_check_options([Opt={content_handlers, Handlers}|Opts]) ->
case gun_content_handler:check_option(Handlers) of
ok -> do_check_options(Opts);
@@ -460,26 +465,47 @@ update_flow(State=#http_state{streams=Streams0}, _ReplyTo, StreamRef, Inc) ->
end || Tuple = #stream{ref=Ref, flow=Flow} <- Streams0],
{state, State#http_state{streams=Streams}}.
-%% @todo Use Reason.
-close(_, State=#http_state{in=body_close, streams=[#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]},
- EvHandler, EvHandlerState0) ->
+%% We can immediately close the connection when there's no streams.
+closing(_, #http_state{streams=[]}, _, EvHandlerState) ->
+ {close, EvHandlerState};
+%% Otherwise we set connection: close (even if the header was not sent)
+%% and close any pipelined streams, only keeping the active stream.
+closing(Reason, State=#http_state{streams=[LastStream|Tail]}, _, EvHandlerState) ->
+ close_streams(Tail, {closing, Reason}),
+ {[
+ {state, State#http_state{connection=close, streams=[LastStream]}},
+ closing(State)
+ ], EvHandlerState}.
+
+closing(#http_state{opts=Opts}) ->
+ Timeout = maps:get(closing_timeout, Opts, 15000),
+ {closing, Timeout}.
+
+close(Reason, State=#http_state{in=body_close,
+ streams=[#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]},
+ EvHandler, EvHandlerState) ->
+ %% We may have more than one stream in case we somehow close abruptly.
+ close_streams(Tail, close_reason(Reason)),
_ = send_data(<<>>, State, fin),
- EvHandlerState = EvHandler:response_end(#{
+ EvHandler:response_end(#{
stream_ref => StreamRef,
reply_to => ReplyTo
- }, EvHandlerState0),
- {close_streams(Tail), EvHandlerState};
-close(_, #http_state{streams=Streams}, _, EvHandlerState) ->
- {close_streams(Streams), EvHandlerState}.
+ }, EvHandlerState);
+close(Reason, #http_state{streams=Streams}, _, EvHandlerState) ->
+ close_streams(Streams, close_reason(Reason)),
+ EvHandlerState.
+
+close_reason(closed) -> closed;
+close_reason(Reason) -> {closed, Reason}.
-close_streams([]) ->
+%% @todo Do we want an event for this?
+close_streams([], _) ->
ok;
-close_streams([#stream{is_alive=false}|Tail]) ->
- close_streams(Tail);
-close_streams([#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]) ->
- ReplyTo ! {gun_error, self(), StreamRef, {closed,
- "The connection was lost."}},
- close_streams(Tail).
+close_streams([#stream{is_alive=false}|Tail], Reason) ->
+ close_streams(Tail, Reason);
+close_streams([#stream{ref=StreamRef, reply_to=ReplyTo}|Tail], Reason) ->
+ ReplyTo ! {gun_error, self(), StreamRef, Reason},
+ close_streams(Tail, Reason).
%% We don't send a keep-alive when a CONNECT request was initiated.
keepalive(State=#http_state{streams=[#stream{ref={connect, _, _}}]}) ->
diff --git a/src/gun_http2.erl b/src/gun_http2.erl
index 3b3b79b..5942037 100644
--- a/src/gun_http2.erl
+++ b/src/gun_http2.erl
@@ -19,6 +19,7 @@
-export([init/4]).
-export([handle/4]).
-export([update_flow/4]).
+-export([closing/4]).
-export([close/4]).
-export([keepalive/1]).
-export([headers/11]).
@@ -53,6 +54,10 @@
content_handlers :: gun_content_handler:opt(),
buffer = <<>> :: binary(),
+ %% Current status of the connection. We use this to ensure we are
+ %% not sending the GOAWAY frame more than once.
+ status = connected :: connected | goaway | closing,
+
%% HTTP/2 state machine.
http2_machine :: cow_http2_machine:http2_machine(),
@@ -66,6 +71,10 @@ check_options(Opts) ->
do_check_options([]) ->
ok;
+do_check_options([{closing_timeout, infinity}|Opts]) ->
+ do_check_options(Opts);
+do_check_options([{closing_timeout, T}|Opts]) when is_integer(T), T > 0 ->
+ do_check_options(Opts);
do_check_options([Opt={content_handlers, Handlers}|Opts]) ->
case gun_content_handler:check_option(Handlers) of
ok -> do_check_options(Opts);
@@ -99,7 +108,8 @@ handle(Data, State=#http2_state{buffer=Buffer}, EvHandler, EvHandlerState) ->
parse(<< Buffer/binary, Data/binary >>, State#http2_state{buffer= <<>>},
EvHandler, EvHandlerState).
-parse(Data, State0=#http2_state{http2_machine=HTTP2Machine}, EvHandler, EvHandlerState0) ->
+parse(Data, State0=#http2_state{status=Status, http2_machine=HTTP2Machine, streams=Streams},
+ EvHandler, EvHandlerState0) ->
MaxFrameSize = cow_http2_machine:get_local_setting(max_frame_size, HTTP2Machine),
case cow_http2:parse(Data, MaxFrameSize) of
{ok, Frame, Rest} ->
@@ -116,7 +126,14 @@ parse(Data, State0=#http2_state{http2_machine=HTTP2Machine}, EvHandler, EvHandle
parse(Rest, reset_stream(State0, StreamID, {stream_error, Reason, Human}),
EvHandler, EvHandlerState0);
Error = {connection_error, _, _} ->
- {terminate(State0, Error), EvHandlerState0};
+ {connection_error(State0, Error), EvHandlerState0};
+ %% If we both received and sent a GOAWAY frame and there are no streams
+ %% currently running, we can close the connection immediately.
+ more when Status =/= connected, Streams =:= [] ->
+ {[{state, State0#http2_state{buffer=Data, status=closing}}, close], EvHandlerState0};
+ %% Otherwise we enter the closing state.
+ more when Status =:= goaway ->
+ {[{state, State0#http2_state{buffer=Data, status=closing}}, closing(State0)], EvHandlerState0};
more ->
{{state, State0#http2_state{buffer=Data}}, EvHandlerState0}
end.
@@ -169,9 +186,8 @@ frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame, EvHandler, EvHandl
push_promise_frame(State#http2_state{http2_machine=HTTP2Machine},
StreamID, PromisedStreamID, Headers, PseudoHeaders,
EvHandler, EvHandlerState);
- {ok, Frame={goaway, _StreamID, _Reason, _Data}, HTTP2Machine} ->
- {terminate(State#http2_state{http2_machine=HTTP2Machine},
- {stop, Frame, 'Server is going away.'}),
+ {ok, GoAway={goaway, _, _, _}, HTTP2Machine} ->
+ {goaway(State#http2_state{http2_machine=HTTP2Machine}, GoAway),
EvHandlerState};
{send, SendData, HTTP2Machine} ->
send_data(maybe_ack(State#http2_state{http2_machine=HTTP2Machine}, Frame), SendData,
@@ -181,7 +197,7 @@ frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame, EvHandler, EvHandl
StreamID, {stream_error, Reason, Human}),
EvHandlerState};
{error, Error={connection_error, _, _}, HTTP2Machine} ->
- {terminate(State#http2_state{http2_machine=HTTP2Machine}, Error),
+ {connection_error(State#http2_state{http2_machine=HTTP2Machine}, Error),
EvHandlerState}
end.
@@ -318,7 +334,8 @@ rst_stream_frame(State=#http2_state{streams=Streams0},
end.
%% Pushed streams receive the same initial flow value as the parent stream.
-push_promise_frame(State=#http2_state{streams=Streams},
+push_promise_frame(State=#http2_state{socket=Socket, transport=Transport,
+ status=Status, http2_machine=HTTP2Machine0, streams=Streams},
StreamID, PromisedStreamID, Headers, #{
method := Method, scheme := Scheme,
authority := Authority, path := Path},
@@ -326,25 +343,39 @@ push_promise_frame(State=#http2_state{streams=Streams},
#stream{ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow} = get_stream_by_id(State, StreamID),
PromisedStreamRef = make_ref(),
URI = iolist_to_binary([Scheme, <<"://">>, Authority, Path]),
- ReplyTo ! {gun_push, self(), StreamRef, PromisedStreamRef, Method, URI, Headers},
- EvHandlerState = EvHandler:push_promise_end(#{
+ PushPromiseEvent0 = #{
stream_ref => StreamRef,
reply_to => ReplyTo,
- promised_stream_ref => PromisedStreamRef,
method => Method,
uri => URI,
headers => Headers
- }, EvHandlerState0),
- NewStream = #stream{id=PromisedStreamID, ref=PromisedStreamRef,
- reply_to=ReplyTo, flow=InitialFlow},
- {State#http2_state{streams=[NewStream|Streams]}, EvHandlerState}.
+ },
+ PushPromiseEvent = case Status of
+ connected ->
+ ReplyTo ! {gun_push, self(), StreamRef, PromisedStreamRef, Method, URI, Headers},
+ PushPromiseEvent0#{promised_stream_ref => PromisedStreamRef};
+ _ ->
+ PushPromiseEvent0
+ end,
+ EvHandlerState = EvHandler:push_promise_end(PushPromiseEvent, EvHandlerState0),
+ case Status of
+ connected ->
+ NewStream = #stream{id=PromisedStreamID, ref=PromisedStreamRef,
+ reply_to=ReplyTo, flow=InitialFlow},
+ {State#http2_state{streams=[NewStream|Streams]}, EvHandlerState};
+ %% We cancel the push_promise immediately when we are shutting down.
+ _ ->
+ {ok, HTTP2Machine} = cow_http2_machine:reset_stream(PromisedStreamID, HTTP2Machine0),
+ Transport:send(Socket, cow_http2:rst_stream(PromisedStreamID, cancel)),
+ {State#http2_state{http2_machine=HTTP2Machine}, EvHandlerState}
+ end.
ignored_frame(State=#http2_state{http2_machine=HTTP2Machine0}) ->
case cow_http2_machine:ignored_frame(HTTP2Machine0) of
{ok, HTTP2Machine} ->
State#http2_state{http2_machine=HTTP2Machine};
{error, Error={connection_error, _, _}, HTTP2Machine} ->
- terminate(State#http2_state{http2_machine=HTTP2Machine}, Error)
+ connection_error(State#http2_state{http2_machine=HTTP2Machine}, Error)
end.
update_flow(State=#http2_state{socket=Socket, transport=Transport,
@@ -369,16 +400,72 @@ update_flow(State=#http2_state{socket=Socket, transport=Transport,
[]
end.
-%% @todo Use Reason.
-close(_, #http2_state{streams=Streams}, _, EvHandlerState) ->
- {close_streams(Streams), EvHandlerState}.
+%% We may have to cancel streams even if we receive multiple
+%% GOAWAY frames as the LastStreamID value may be lower than
+%% the one previously received.
+goaway(State0=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine,
+ status=Status, streams=Streams0}, {goaway, LastStreamID, Reason, _}) ->
+ Streams = goaway_streams(Streams0, LastStreamID, {goaway, Reason, 'The connection is going away.'}, []),
+ State = State0#http2_state{streams=Streams},
+ case Status of
+ connected ->
+ Transport:send(Socket, cow_http2:goaway(
+ cow_http2_machine:get_last_streamid(HTTP2Machine),
+ no_error, <<>>)),
+ State#http2_state{status=goaway};
+ _ ->
+ State
+ end.
+
+%% Cancel server-initiated streams that are above LastStreamID.
+goaway_streams([], _, _, Acc) ->
+ Acc;
+goaway_streams([Stream=#stream{id=StreamID}|Tail], LastStreamID, Reason, Acc)
+ when StreamID > LastStreamID, (StreamID rem 2) =:= 1 ->
+ close_stream(Stream, Reason),
+ goaway_streams(Tail, LastStreamID, Reason, Acc);
+goaway_streams([Stream|Tail], LastStreamID, Reason, Acc) ->
+ goaway_streams(Tail, LastStreamID, Reason, [Stream|Acc]).
+
+%% We are already closing, do nothing.
+closing(_, #http2_state{status=closing}, _, EvHandlerState) ->
+ {[], EvHandlerState};
+closing(Reason0, State=#http2_state{socket=Socket, transport=Transport,
+ http2_machine=HTTP2Machine}, _, EvHandlerState) ->
+ Reason = case Reason0 of
+ normal -> no_error;
+ owner_down -> no_error;
+ _ -> internal_error
+ end,
+ Transport:send(Socket, cow_http2:goaway(
+ cow_http2_machine:get_last_streamid(HTTP2Machine),
+ Reason, <<>>)),
+ {[
+ {state, State#http2_state{status=closing}},
+ closing(State)
+ ], EvHandlerState}.
-close_streams([]) ->
+closing(#http2_state{opts=Opts}) ->
+ Timeout = maps:get(closing_timeout, Opts, 15000),
+ {closing, Timeout}.
+
+close(Reason, #http2_state{streams=Streams}, _, EvHandlerState) ->
+ close_streams(Streams, close_reason(Reason)),
+ EvHandlerState.
+
+close_reason(closed) -> closed;
+close_reason(Reason) -> {closed, Reason}.
+
+close_streams([], _) ->
ok;
-close_streams([#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]) ->
- ReplyTo ! {gun_error, self(), StreamRef, {closed,
- "The connection was lost."}},
- close_streams(Tail).
+close_streams([Stream|Tail], Reason) ->
+ close_stream(Stream, Reason),
+ close_streams(Tail, Reason).
+
+%% @todo Do we want an event for this?
+close_stream(#stream{ref=StreamRef, reply_to=ReplyTo}, Reason) ->
+ ReplyTo ! {gun_error, self(), StreamRef, Reason},
+ ok.
keepalive(State=#http2_state{socket=Socket, transport=Transport}) ->
Transport:send(Socket, cow_http2:ping(0)),
@@ -429,6 +516,7 @@ request(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
headers => Headers
},
EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0),
+ %% @todo We should not send an empty DATA frame on empty bodies.
{ok, IsFin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers(
StreamID, HTTP2Machine1, nofin, PseudoHeaders, Headers),
Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)),
@@ -586,8 +674,9 @@ down(#http2_state{streams=Streams}) ->
KilledStreams = [Ref || #stream{ref=Ref} <- Streams],
{KilledStreams, []}.
-terminate(#http2_state{socket=Socket, transport=Transport,
- http2_machine=HTTP2Machine, streams=Streams}, Reason) ->
+connection_error(#http2_state{socket=Socket, transport=Transport,
+ http2_machine=HTTP2Machine, streams=Streams},
+ {connection_error, Reason, _}) ->
%% The connection is going away either at the request of the server,
%% or because an error occurred in the protocol. Inform the streams.
%% @todo We should not send duplicate messages to processes.
@@ -597,12 +686,9 @@ terminate(#http2_state{socket=Socket, transport=Transport,
_ = [ReplyTo ! {gun_error, self(), Reason} || #stream{reply_to=ReplyTo} <- Streams],
Transport:send(Socket, cow_http2:goaway(
cow_http2_machine:get_last_streamid(HTTP2Machine),
- terminate_reason(Reason), <<>>)),
+ Reason, <<>>)),
close.
-terminate_reason({connection_error, Reason, _}) -> Reason;
-terminate_reason({stop, _, _}) -> no_error.
-
%% Stream functions.
error_stream_closed(State, StreamRef, ReplyTo) ->
diff --git a/src/gun_ws.erl b/src/gun_ws.erl
index 42cf049..49911dc 100644
--- a/src/gun_ws.erl
+++ b/src/gun_ws.erl
@@ -19,6 +19,7 @@
-export([init/9]).
-export([handle/4]).
-export([update_flow/4]).
+-export([closing/4]).
-export([close/4]).
-export([send/4]).
-export([down/1]).
@@ -38,8 +39,10 @@
stream_ref :: reference(),
socket :: inet:socket() | ssl:sslsocket(),
transport :: module(),
+ opts = #{} :: map(), %% @todo
buffer = <<>> :: binary(),
in = head :: head | #payload{} | close,
+ out = head :: head | close,
frag_state = undefined :: cow_ws:frag_state(),
utf8_state = 0 :: cow_ws:utf8_state(),
extensions = #{} :: cow_ws:extensions(),
@@ -53,6 +56,10 @@ check_options(Opts) ->
do_check_options([]) ->
ok;
+do_check_options([{closing_timeout, infinity}|Opts]) ->
+ do_check_options(Opts);
+do_check_options([{closing_timeout, T}|Opts]) when is_integer(T), T > 0 ->
+ do_check_options(Opts);
do_check_options([{compress, B}|Opts]) when B =:= true; B =:= false ->
do_check_options(Opts);
do_check_options([{default_protocol, M}|Opts]) when is_atom(M) ->
@@ -75,12 +82,15 @@ init(Owner, Socket, Transport, StreamRef, Headers, Extensions, InitialFlow, Hand
Owner ! {gun_upgrade, self(), StreamRef, [<<"websocket">>], Headers},
{ok, HandlerState} = Handler:init(Owner, StreamRef, Headers, Opts),
{switch_protocol, ?MODULE, #ws_state{owner=Owner, stream_ref=StreamRef,
- socket=Socket, transport=Transport, extensions=Extensions,
+ socket=Socket, transport=Transport, opts=Opts, extensions=Extensions,
flow=InitialFlow, handler=Handler, handler_state=HandlerState}}.
%% Do not handle anything if we received a close frame.
-handle(_, State=#ws_state{in=close}, _, EvHandlerState) ->
- {{state, State}, EvHandlerState};
+%% Initiate or terminate the closing state depending on whether we sent a close yet.
+handle(_, State=#ws_state{in=close, out=close}, _, EvHandlerState) ->
+ {[{state, State}, close], EvHandlerState};
+handle(_, State=#ws_state{in=close}, EvHandler, EvHandlerState) ->
+ closing(normal, State, EvHandler, EvHandlerState);
%% Shortcut for common case when Data is empty after processing a frame.
handle(<<>>, State=#ws_state{in=head}, _, EvHandlerState) ->
maybe_active(State, EvHandlerState);
@@ -119,7 +129,7 @@ handle(Data, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef, buffer=Buffer,
more ->
maybe_active(State#ws_state{buffer=Data2}, EvHandlerState1);
error ->
- close({error, badframe}, State, EvHandler, EvHandlerState1)
+ closing({error, badframe}, State, EvHandler, EvHandlerState1)
end;
handle(Data, State=#ws_state{in=In=#payload{type=Type, rsv=Rsv, len=Len, mask_key=MaskKey,
close_code=CloseCode, unmasked=Unmasked, unmasked_len=UnmaskedLen}, frag_state=FragState,
@@ -143,7 +153,7 @@ handle(Data, State=#ws_state{in=In=#payload{type=Type, rsv=Rsv, len=Len, mask_ke
len=Len - byte_size(Data), unmasked_len=UnmaskedLen + byte_size(Data)}, utf8_state=Utf8State2},
EvHandlerState);
Error = {error, _Reason} ->
- close(Error, State, EvHandler, EvHandlerState)
+ closing(Error, State, EvHandler, EvHandlerState)
end.
maybe_active(State=#ws_state{flow=Flow}, EvHandlerState) ->
@@ -165,11 +175,11 @@ dispatch(Rest, State0=#ws_state{owner=ReplyTo, stream_ref=StreamRef,
}, EvHandlerState0),
case cow_ws:make_frame(Type, Payload, CloseCode, FragState) of
ping ->
- {{state, State}, EvHandlerState} = send(pong, State0, EvHandler, EvHandlerState1),
- handle(Rest, State, EvHandler, EvHandlerState);
+ {[], EvHandlerState} = send(pong, State0, EvHandler, EvHandlerState1),
+ handle(Rest, State0, EvHandler, EvHandlerState);
{ping, Payload} ->
- {{state, State}, EvHandlerState} = send({pong, Payload}, State0, EvHandler, EvHandlerState1),
- handle(Rest, State, EvHandler, EvHandlerState);
+ {[], EvHandlerState} = send({pong, Payload}, State0, EvHandler, EvHandlerState1),
+ handle(Rest, State0, EvHandler, EvHandlerState);
pong ->
handle(Rest, State0, EvHandler, EvHandlerState1);
{pong, _} ->
@@ -200,26 +210,30 @@ update_flow(State=#ws_state{flow=Flow0}, _ReplyTo, _StreamRef, Inc) ->
{active, Flow > 0}
].
-close(Reason, State, EvHandler, EvHandlerState) ->
- case Reason of
- normal ->
- send({close, 1000, <<>>}, State, EvHandler, EvHandlerState);
- owner_down ->
- send({close, 1001, <<>>}, State, EvHandler, EvHandlerState);
- {error, badframe} ->
- send({close, 1002, <<>>}, State, EvHandler, EvHandlerState);
- {error, badencoding} ->
- send({close, 1007, <<>>}, State, EvHandler, EvHandlerState);
- %% Socket errors; do nothing.
- closed ->
- {ok, EvHandlerState};
- {error, _} ->
- {ok, EvHandlerState}
- end.
+%% The user already sent the close frame; do nothing.
+closing(_, State=#ws_state{out=close}, _, EvHandlerState) ->
+ {closing(State), EvHandlerState};
+closing(Reason, State, EvHandler, EvHandlerState) ->
+ Code = case Reason of
+ normal -> 1000;
+ owner_down -> 1001;
+ shutdown -> 1001;
+ {error, badframe} -> 1002;
+ {error, badencoding} -> 1007
+ end,
+ send({close, Code, <<>>}, State, EvHandler, EvHandlerState).
+closing(#ws_state{opts=Opts}) ->
+ Timeout = maps:get(closing_timeout, Opts, 15000),
+ {closing, Timeout}.
+
+close(_, _, _, EvHandlerState) ->
+ EvHandlerState.
+
+%% Send one frame.
send(Frame, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef,
- socket=Socket, transport=Transport, extensions=Extensions},
- EvHandler, EvHandlerState0) ->
+ socket=Socket, transport=Transport, in=In, extensions=Extensions},
+ EvHandler, EvHandlerState0) when not is_list(Frame) ->
WsSendFrameEvent = #{
stream_ref => StreamRef,
reply_to => ReplyTo,
@@ -229,10 +243,28 @@ send(Frame, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef,
EvHandlerState1 = EvHandler:ws_send_frame_start(WsSendFrameEvent, EvHandlerState0),
Transport:send(Socket, cow_ws:masked_frame(Frame, Extensions)),
EvHandlerState = EvHandler:ws_send_frame_end(WsSendFrameEvent, EvHandlerState1),
- case Frame of
- close -> {close, EvHandlerState};
- {close, _, _} -> {close, EvHandlerState};
- _ -> {{state, State}, EvHandlerState}
+ if
+ Frame =:= close; element(1, Frame) =:= close ->
+ {[
+ {state, State#ws_state{out=close}},
+ %% We can close immediately if we already received a close frame.
+ case In of
+ close -> close;
+ _ -> closing(State)
+ end
+ ], EvHandlerState};
+ true ->
+ {[], EvHandlerState}
+ end;
+%% Send many frames.
+send([], _, _, EvHandlerState) ->
+ {[], EvHandlerState};
+send([Frame|Tail], State, EvHandler, EvHandlerState0) ->
+ case send(Frame, State, EvHandler, EvHandlerState0) of
+ {[], EvHandlerState} ->
+ send(Tail, State, EvHandler, EvHandlerState);
+ Other ->
+ Other
end.
%% Websocket has no concept of streams.
diff --git a/test/gun_SUITE.erl b/test/gun_SUITE.erl
index 3d3734b..0beee43 100644
--- a/test/gun_SUITE.erl
+++ b/test/gun_SUITE.erl
@@ -90,94 +90,6 @@ do_timeout(Opt, Timeout) ->
gun:close(Pid)
end.
-detect_owner_down(_) ->
- {ok, ListenSocket} = gen_tcp:listen(0, [binary, {active, false}]),
- {ok, {_, Port}} = inet:sockname(ListenSocket),
- Self = self(),
- spawn(fun() ->
- {ok, ConnPid} = gun:open("localhost", Port),
- Self ! {conn, ConnPid},
- gun:await_up(ConnPid),
- timer:sleep(100)
- end),
- {ok, _} = gen_tcp:accept(ListenSocket, 5000),
- Pid = receive
- {conn, C} ->
- C
- after 1000 ->
- error(timeout)
- end,
- Ref = monitor(process, Pid),
- receive
- {'DOWN', Ref, process, Pid, normal} ->
- ok
- after 1000 ->
- true = erlang:is_process_alive(Pid),
- error(timeout)
- end.
-
-detect_owner_down_unexpected(_) ->
- {ok, ListenSocket} = gen_tcp:listen(0, [binary, {active, false}]),
- {ok, {_, Port}} = inet:sockname(ListenSocket),
- Self = self(),
- spawn(fun() ->
- {ok, ConnPid} = gun:open("localhost", Port),
- Self ! {conn, ConnPid},
- gun:await_up(ConnPid),
- timer:sleep(100),
- exit(unexpected)
- end),
- {ok, _} = gen_tcp:accept(ListenSocket, 5000),
- Pid = receive
- {conn, C} ->
- C
- after 1000 ->
- error(timeout)
- end,
- Ref = monitor(process, Pid),
- receive
- {'DOWN', Ref, process, Pid, {shutdown, {owner_down, unexpected}}} ->
- ok
- after 1000 ->
- true = erlang:is_process_alive(Pid),
- error(timeout)
- end.
-
-detect_owner_down_ws(_) ->
- Name = name(),
- {ok, _} = cowboy:start_clear(Name, [], #{env => #{
- dispatch => cowboy_router:compile([{'_', [{"/", ws_echo_h, []}]}])
- }}),
- Port = ranch:get_port(Name),
- Self = self(),
- spawn(fun() ->
- {ok, ConnPid} = gun:open("localhost", Port),
- Self ! {conn, ConnPid},
- gun:await_up(ConnPid),
- gun:ws_upgrade(ConnPid, "/", []),
- receive
- {gun_upgrade, ConnPid, _, [<<"websocket">>], _} ->
- ok
- after 1000 ->
- error(timeout)
- end
- end),
- Pid = receive
- {conn, C} ->
- C
- after 1000 ->
- error(timeout)
- end,
- Ref = monitor(process, Pid),
- receive
- {'DOWN', Ref, process, Pid, normal} ->
- ok
- after 1000 ->
- true = erlang:is_process_alive(Pid),
- error(timeout)
- end,
- cowboy:stop_listener(Name).
-
ignore_empty_data_http(_) ->
doc("When gun:data/4 is called with nofin and empty data, it must be ignored."),
{ok, OriginPid, OriginPort} = init_origin(tcp, http),
diff --git a/test/handlers/delayed_hello_h.erl b/test/handlers/delayed_hello_h.erl
new file mode 100644
index 0000000..68ef1ad
--- /dev/null
+++ b/test/handlers/delayed_hello_h.erl
@@ -0,0 +1,11 @@
+%% Feel free to use, reuse and abuse the code in this file.
+
+-module(delayed_hello_h).
+
+-export([init/2]).
+
+init(Req, Timeout) ->
+ timer:sleep(Timeout),
+ {ok, cowboy_req:reply(200, #{
+ <<"content-type">> => <<"text/plain">>
+ }, <<"Hello world!">>, Req), Timeout}.
diff --git a/test/handlers/delayed_push_h.erl b/test/handlers/delayed_push_h.erl
new file mode 100644
index 0000000..dbb8e56
--- /dev/null
+++ b/test/handlers/delayed_push_h.erl
@@ -0,0 +1,13 @@
+%% Feel free to use, reuse and abuse the code in this file.
+
+-module(delayed_push_h).
+
+-export([init/2]).
+
+init(Req, Timeout) ->
+ timer:sleep(Timeout),
+ cowboy_req:push("/", #{<<"accept">> => <<"text/plain">>}, Req),
+ cowboy_req:push("/empty", #{<<"accept">> => <<"text/plain">>}, Req),
+ {ok, cowboy_req:reply(200, #{
+ <<"content-type">> => <<"text/plain">>
+ }, <<"Hello world!">>, Req), Timeout}.
diff --git a/test/handlers/ws_frozen_h.erl b/test/handlers/ws_frozen_h.erl
new file mode 100644
index 0000000..bac77c2
--- /dev/null
+++ b/test/handlers/ws_frozen_h.erl
@@ -0,0 +1,23 @@
+%% Feel free to use, reuse and abuse the code in this file.
+
+-module(ws_frozen_h).
+
+-export([init/2]).
+-export([websocket_init/1]).
+-export([websocket_handle/2]).
+-export([websocket_info/2]).
+
+init(Req, State) ->
+ {cowboy_websocket, Req, State, #{
+ compress => true
+ }}.
+
+websocket_init(Timeout) ->
+ timer:sleep(Timeout),
+ {ok, undefined}.
+
+websocket_handle(_Frame, State) ->
+ {[], State}.
+
+websocket_info(_Info, State) ->
+ {[], State}.
diff --git a/test/handlers/ws_timeout_close_h.erl b/test/handlers/ws_timeout_close_h.erl
new file mode 100644
index 0000000..6fef168
--- /dev/null
+++ b/test/handlers/ws_timeout_close_h.erl
@@ -0,0 +1,25 @@
+%% Feel free to use, reuse and abuse the code in this file.
+
+-module(ws_timeout_close_h).
+
+-export([init/2]).
+-export([websocket_init/1]).
+-export([websocket_handle/2]).
+-export([websocket_info/2]).
+
+init(Req, State) ->
+ {cowboy_websocket, Req, State, #{
+ compress => true
+ }}.
+
+websocket_init(Timeout) ->
+ _ = erlang:send_after(Timeout, self(), timeout_close),
+ {[], undefined}.
+
+websocket_handle(_Frame, State) ->
+ {[], State}.
+
+websocket_info(timeout_close, State) ->
+ {[{close, 3333, <<>>}], State};
+websocket_info(_Info, State) ->
+ {[], State}.
diff --git a/test/rfc7540_SUITE.erl b/test/rfc7540_SUITE.erl
index f494c9f..507b75a 100644
--- a/test/rfc7540_SUITE.erl
+++ b/test/rfc7540_SUITE.erl
@@ -81,11 +81,11 @@ lingering_data_counts_toward_connection_window(_) ->
{ok, _} = gen_tcp:recv(Socket, SkipLen, 1000),
%% Skip the data.
{ok, <<_:24, 0:8, _:8, 1:32>>} = Transport:recv(Socket, 9, 1000),
+ %% Step 3.
%% Send a HEADERS frame.
{HeadersBlock, _} = cow_hpack:encode([
{<<":status">>, <<"200">>}
]),
- %% Step 3.
ok = Transport:send(Socket, [
cow_http2:headers(1, nofin, HeadersBlock)
]),
diff --git a/test/shutdown_SUITE.erl b/test/shutdown_SUITE.erl
new file mode 100644
index 0000000..e52a3ab
--- /dev/null
+++ b/test/shutdown_SUITE.erl
@@ -0,0 +1,609 @@
+%% Copyright (c) 2019, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+-module(shutdown_SUITE).
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-import(ct_helper, [doc/1]).
+-import(ct_helper, [config/2]).
+-import(gun_test, [init_origin/3]).
+
+all() ->
+ [{group, shutdown}].
+
+groups() ->
+ [{shutdown, [parallel], ct_helper:all(?MODULE)}].
+
+init_per_suite(Config) ->
+ ProtoOpts = #{env => #{
+ dispatch => cowboy_router:compile([{'_', [
+ {"/", hello_h, []},
+ {"/delayed", delayed_hello_h, 500},
+ {"/delayed_push", delayed_push_h, 500},
+ {"/empty", empty_h, []},
+ {"/ws", ws_echo_h, []},
+ {"/ws_frozen", ws_frozen_h, 500},
+ %% This timeout determines how long the test suite will run.
+ {"/ws_frozen_long", ws_frozen_h, 1500},
+ {"/ws_timeout_close", ws_timeout_close_h, 500}
+ ]}])
+ }},
+ {ok, _} = cowboy:start_clear(?MODULE, [], ProtoOpts),
+ OriginPort = ranch:get_port(?MODULE),
+ [{origin_port, OriginPort}|Config].
+
+end_per_suite(_) ->
+ ok = cowboy:stop_listener(?MODULE).
+
+%% Tests.
+%%
+%% This test suite checks that the various ways to shut down
+%% the connection are all working as expected for the different
+%% protocols and scenarios.
+
+not_connected_gun_shutdown(_) ->
+ doc("Confirm that the Gun process shuts down gracefully "
+ "when calling gun:shutdown/1 while it isn't connected."),
+ {ok, ConnPid} = gun:open("localhost", 12345),
+ ConnRef = monitor(process, ConnPid),
+ gun:shutdown(ConnPid),
+ gun_is_down(ConnPid, ConnRef, shutdown).
+
+not_connected_owner_down(_) ->
+ doc("Confirm that the Gun process shuts down when the owner exits normally "
+ "while it isn't connected."),
+ do_not_connected_owner_down(normal, normal).
+
+not_connected_owner_down_error(_) ->
+ doc("Confirm that the Gun process shuts down when the owner exits with an error "
+ "while it isn't connected."),
+ do_not_connected_owner_down(unexpected, {shutdown, {owner_down, unexpected}}).
+
+do_not_connected_owner_down(ExitReason, DownReason) ->
+ Self = self(),
+ spawn(fun() ->
+ {ok, ConnPid} = gun:open("localhost", 12345),
+ Self ! {conn, ConnPid},
+ timer:sleep(500),
+ exit(ExitReason)
+ end),
+ ConnPid = receive {conn, C} -> C after 1000 -> error(timeout) end,
+ ConnRef = monitor(process, ConnPid),
+ gun_is_down(ConnPid, ConnRef, DownReason).
+
+http1_gun_shutdown_no_streams(Config) ->
+ doc("HTTP/1.1: Confirm that the Gun process shuts down gracefully "
+ "when calling gun:shutdown/1 with no active streams."),
+ do_http_gun_shutdown_no_streams(Config, http).
+
+do_http_gun_shutdown_no_streams(Config, Protocol) ->
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{
+ protocols => [Protocol]
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ gun:shutdown(ConnPid),
+ gun_is_down(ConnPid, ConnRef, shutdown).
+
+http1_gun_shutdown_one_stream(Config) ->
+ doc("HTTP/1.1: Confirm that the Gun process shuts down gracefully "
+ "when calling gun:shutdown/1 with one active stream."),
+ do_http_gun_shutdown_one_stream(Config, http).
+
+do_http_gun_shutdown_one_stream(Config, Protocol) ->
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{
+ protocols => [Protocol]
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef = gun:get(ConnPid, "/delayed"),
+ gun:shutdown(ConnPid),
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef),
+ {ok, _} = gun:await_body(ConnPid, StreamRef),
+ gun_is_down(ConnPid, ConnRef, shutdown).
+
+http1_gun_shutdown_pipelined_streams(Config) ->
+ doc("HTTP/1.1: Confirm that the Gun process shuts down gracefully "
+ "when calling gun:shutdown/1 with one active stream and additional pipelined streams."),
+ Protocol = http,
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{
+ protocols => [Protocol]
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef1 = gun:get(ConnPid, "/delayed"),
+ StreamRef2 = gun:get(ConnPid, "/delayed"),
+ StreamRef3 = gun:get(ConnPid, "/delayed"),
+ gun:shutdown(ConnPid),
+ %% Pipelined streams are canceled immediately.
+ {error, {stream_error, {closing, shutdown}}} = gun:await(ConnPid, StreamRef2),
+ {error, {stream_error, {closing, shutdown}}} = gun:await(ConnPid, StreamRef3),
+ %% The active stream is still processed.
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef1),
+ {ok, _} = gun:await_body(ConnPid, StreamRef1),
+ gun_is_down(ConnPid, ConnRef, shutdown).
+
+http1_gun_shutdown_timeout(Config) ->
+ doc("HTTP/1.1: Confirm that the Gun process shuts down when the closing_timeout "
+ "triggers after calling gun:shutdown/1 with one active stream."),
+ do_http_gun_shutdown_timeout(Config, http, http_opts).
+
+do_http_gun_shutdown_timeout(Config, Protocol, ProtoOpts) ->
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{
+ ProtoOpts => #{closing_timeout => 100},
+ protocols => [Protocol]
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef = gun:get(ConnPid, "/delayed"),
+ gun:shutdown(ConnPid),
+ %% The closing timeout occurs before the server gets to send the response.
+ %% We get a 'closed' error instead of 'closing' as a result.
+ {error, {stream_error, {closed, shutdown}}} = gun:await(ConnPid, StreamRef),
+ gun_is_down(ConnPid, ConnRef, shutdown).
+
+http1_owner_down(Config) ->
+ doc("HTTP/1.1: Confirm that the Gun process shuts down when the owner exits normally."),
+ do_http_owner_down(Config, http, normal, normal).
+
+http1_owner_down_error(Config) ->
+ doc("HTTP/1.1: Confirm that the Gun process shuts down when the owner exits with an error."),
+ do_http_owner_down(Config, http, unexpected, {shutdown, {owner_down, unexpected}}).
+
+do_http_owner_down(Config, Protocol, ExitReason, DownReason) ->
+ Self = self(),
+ spawn(fun() ->
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{
+ protocols => [Protocol]
+ }),
+ Self ! {conn, ConnPid},
+ {ok, Protocol} = gun:await_up(ConnPid),
+ timer:sleep(500),
+ exit(ExitReason)
+ end),
+ ConnPid = receive {conn, C} -> C after 1000 -> error(timeout) end,
+ ConnRef = monitor(process, ConnPid),
+ gun_is_down(ConnPid, ConnRef, DownReason).
+
+http1_request_connection_close(Config) ->
+ doc("HTTP/1.1: Confirm that the Gun process shuts down gracefully "
+ "when sending a request with the connection: close header and "
+ "retry is disabled."),
+ Protocol = http,
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{
+ protocols => [Protocol],
+ retry => 0
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef = gun:get(ConnPid, "/", #{
+ <<"connection">> => <<"close">>
+ }),
+ %% We get the response followed by Gun shutting down.
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef),
+ {ok, _} = gun:await_body(ConnPid, StreamRef),
+ gun_is_down(ConnPid, ConnRef, normal).
+
+http1_request_connection_close_pipeline(Config) ->
+ doc("HTTP/1.1: Confirm that the Gun process shuts down gracefully "
+ "when sending a request with the connection: close header and "
+ "retry is disabled. Pipelined requests get canceled."),
+ Protocol = http,
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{
+ protocols => [Protocol],
+ retry => 0
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef1 = gun:get(ConnPid, "/", #{
+ <<"connection">> => <<"close">>
+ }),
+ StreamRef2 = gun:get(ConnPid, "/"),
+ StreamRef3 = gun:get(ConnPid, "/"),
+ %% We get the response, pipelined streams get canceled, followed by Gun shutting down.
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef1),
+ {ok, _} = gun:await_body(ConnPid, StreamRef1),
+ {error, {stream_error, {closed, normal}}} = gun:await(ConnPid, StreamRef2),
+ {error, {stream_error, {closed, normal}}} = gun:await(ConnPid, StreamRef3),
+ gun_is_down(ConnPid, ConnRef, normal).
+
+http1_response_connection_close(_) ->
+ doc("HTTP/1.1: Confirm that the Gun process shuts down gracefully "
+ "when receiving a response with the connection: close header and "
+ "retry is disabled."),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{
+ env => #{dispatch => cowboy_router:compile([{'_', [{"/", hello_h, []}]}])},
+ max_keepalive => 1
+ }),
+ OriginPort = ranch:get_port(?FUNCTION_NAME),
+ try
+ Protocol = http,
+ {ok, ConnPid} = gun:open("localhost", OriginPort, #{
+ protocols => [Protocol],
+ retry => 0
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef = gun:get(ConnPid, "/"),
+ %% We get the response followed by Gun shutting down.
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef),
+ {ok, _} = gun:await_body(ConnPid, StreamRef),
+ gun_is_down(ConnPid, ConnRef, normal)
+ after
+ cowboy:stop_listener(?FUNCTION_NAME)
+ end.
+
+http1_response_connection_close_pipeline(_) ->
+ doc("HTTP/1.1: Confirm that the Gun process shuts down gracefully "
+ "when receiving a response with the connection: close header and "
+ "retry is disabled. Pipelined requests get canceled."),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{
+ env => #{dispatch => cowboy_router:compile([{'_', [{"/", hello_h, []}]}])},
+ max_keepalive => 1
+ }),
+ OriginPort = ranch:get_port(?FUNCTION_NAME),
+ try
+ Protocol = http,
+ {ok, ConnPid} = gun:open("localhost", OriginPort, #{
+ protocols => [Protocol],
+ retry => 0
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef1 = gun:get(ConnPid, "/"),
+ StreamRef2 = gun:get(ConnPid, "/"),
+ StreamRef3 = gun:get(ConnPid, "/"),
+ %% We get the response, pipelined streams get canceled, followed by Gun shutting down.
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef1),
+ {ok, _} = gun:await_body(ConnPid, StreamRef1),
+ {error, {stream_error, {closed, normal}}} = gun:await(ConnPid, StreamRef2),
+ {error, {stream_error, {closed, normal}}} = gun:await(ConnPid, StreamRef3),
+ gun_is_down(ConnPid, ConnRef, normal)
+ after
+ cowboy:stop_listener(?FUNCTION_NAME)
+ end.
+
+http10_connection_close(Config) ->
+ doc("HTTP/1.0: Confirm that the Gun process shuts down gracefully "
+ "when sending a request without a connection header and "
+ "retry is disabled."),
+ Protocol = http,
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{
+ http_opts => #{version => 'HTTP/1.0'},
+ protocols => [Protocol],
+ retry => 0
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef = gun:get(ConnPid, "/"),
+ %% We get the response followed by Gun shutting down.
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef),
+ {ok, _} = gun:await_body(ConnPid, StreamRef),
+ gun_is_down(ConnPid, ConnRef, normal).
+
+http2_gun_shutdown_no_streams(Config) ->
+ doc("HTTP/2: Confirm that the Gun process shuts down gracefully "
+ "when calling gun:shutdown/1 with no active streams."),
+ do_http_gun_shutdown_no_streams(Config, http2).
+
+http2_gun_shutdown_one_stream(Config) ->
+ doc("HTTP/2: Confirm that the Gun process shuts down gracefully "
+ "when calling gun:shutdown/1 with one active stream."),
+ do_http_gun_shutdown_one_stream(Config, http2).
+
+http2_gun_shutdown_many_streams(Config) ->
+ doc("HTTP/2: Confirm that the Gun process shuts down gracefully "
+ "when calling gun:shutdown/1 with many active streams."),
+ Protocol = http2,
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{
+ protocols => [Protocol]
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef1 = gun:get(ConnPid, "/delayed"),
+ StreamRef2 = gun:get(ConnPid, "/delayed"),
+ StreamRef3 = gun:get(ConnPid, "/delayed"),
+ gun:shutdown(ConnPid),
+ %% All streams are processed.
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef1),
+ {ok, _} = gun:await_body(ConnPid, StreamRef1),
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef2),
+ {ok, _} = gun:await_body(ConnPid, StreamRef2),
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef3),
+ {ok, _} = gun:await_body(ConnPid, StreamRef3),
+ gun_is_down(ConnPid, ConnRef, shutdown).
+
+http2_gun_shutdown_timeout(Config) ->
+ doc("HTTP/2: Confirm that the Gun process shuts down when the closing_timeout "
+ "triggers after calling gun:shutdown/1 with one active stream."),
+ do_http_gun_shutdown_timeout(Config, http2, http2_opts).
+
+http2_gun_shutdown_ignore_push_promise(Config) ->
+ doc("HTTP/2: Confirm that the Gun process shuts down gracefully "
+ "when calling gun:shutdown/1 with one active stream. The "
+ "resource pushed by the server after we sent the GOAWAY frame "
+ "must be ignored."),
+ Protocol = http2,
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{
+ protocols => [Protocol]
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef = gun:get(ConnPid, "/delayed_push"),
+ gun:shutdown(ConnPid),
+ %% We do not receive the push streams. Only the response.
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef),
+ {ok, _} = gun:await_body(ConnPid, StreamRef),
+ gun_is_down(ConnPid, ConnRef, shutdown).
+
+http2_owner_down(Config) ->
+ doc("HTTP/2: Confirm that the Gun process shuts down when the owner exits normally."),
+ do_http_owner_down(Config, http2, normal, normal).
+
+http2_owner_down_error(Config) ->
+ doc("HTTP/2: Confirm that the Gun process shuts down when the owner exits with an error."),
+ do_http_owner_down(Config, http2, unexpected, {shutdown, {owner_down, unexpected}}).
+
+http2_server_goaway_no_streams(_) ->
+ doc("HTTP/2: Confirm that the Gun process shuts down gracefully "
+ "when receiving a GOAWAY frame with no active streams and "
+ "retry is disabled."),
+ {ok, _, Port} = init_origin(tcp, http2, fun(_, Socket, Transport) ->
+ Transport:send(Socket, cow_http2:goaway(0, no_error, <<>>)),
+ timer:sleep(500)
+ end),
+ Protocol = http2,
+ {ok, ConnPid} = gun:open("localhost", Port, #{
+ protocols => [Protocol],
+ retry => 0
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ gun_is_down(ConnPid, ConnRef, normal).
+
+http2_server_goaway_one_stream(_) ->
+ doc("HTTP/2: Confirm that the Gun process shuts down gracefully "
+ "when receiving a GOAWAY frame with one active stream and "
+ "retry is disabled."),
+ {ok, _, OriginPort} = init_origin(tcp, http2, fun(_, Socket, Transport) ->
+ %% Receive a HEADERS frame.
+ {ok, <<SkipLen:24, 1:8, _:8, 1:32>>} = Transport:recv(Socket, 9, 1000),
+ %% Skip the header.
+ {ok, _} = gen_tcp:recv(Socket, SkipLen, 1000),
+ %% Skip the data.
+ {ok, <<_:24, 0:8, _:8, 1:32>>} = Transport:recv(Socket, 9, 1000),
+ %% Send a GOAWAY frame.
+ Transport:send(Socket, cow_http2:goaway(1, no_error, <<>>)),
+ %% Wait before sending the response back and closing the connection.
+ timer:sleep(500),
+ %% Send a HEADERS frame.
+ {HeadersBlock, _} = cow_hpack:encode([
+ {<<":status">>, <<"200">>}
+ ]),
+ ok = Transport:send(Socket, [
+ cow_http2:headers(1, fin, HeadersBlock)
+ ]),
+ timer:sleep(500)
+ end),
+ Protocol = http2,
+ {ok, ConnPid} = gun:open("localhost", OriginPort, #{
+ protocols => [Protocol],
+ retry => 0
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ timer:sleep(100), %% Give enough time for the handshake to fully complete.
+ StreamRef = gun:get(ConnPid, "/"),
+ ConnRef = monitor(process, ConnPid),
+ {response, fin, 200, _} = gun:await(ConnPid, StreamRef),
+ gun_is_down(ConnPid, ConnRef, normal).
+
+http2_server_goaway_many_streams(_) ->
+ doc("HTTP/2: Confirm that the Gun process shuts down gracefully "
+ "when receiving a GOAWAY frame with many active streams and "
+ "retry is disabled."),
+ {ok, _, OriginPort} = init_origin(tcp, http2, fun(_, Socket, Transport) ->
+ %% Stream 1.
+ %% Receive a HEADERS frame.
+ {ok, <<SkipLen1:24, 1:8, _:8, 1:32>>} = Transport:recv(Socket, 9, 1000),
+ %% Skip the header.
+ {ok, _} = gen_tcp:recv(Socket, SkipLen1, 1000),
+ %% Skip the data.
+ {ok, <<_:24, 0:8, _:8, 1:32>>} = Transport:recv(Socket, 9, 1000),
+ %% Stream 2.
+ %% Receive a HEADERS frame.
+ {ok, <<SkipLen2:24, 1:8, _:8, 3:32>>} = Transport:recv(Socket, 9, 1000),
+ %% Skip the header.
+ {ok, _} = gen_tcp:recv(Socket, SkipLen2, 1000),
+ %% Skip the data.
+ {ok, <<_:24, 0:8, _:8, 3:32>>} = Transport:recv(Socket, 9, 1000),
+ %% Stream 3.
+ %% Receive a HEADERS frame.
+ {ok, <<SkipLen3:24, 1:8, _:8, 5:32>>} = Transport:recv(Socket, 9, 1000),
+ %% Skip the header.
+ {ok, _} = gen_tcp:recv(Socket, SkipLen3, 1000),
+ %% Skip the data.
+ {ok, <<_:24, 0:8, _:8, 5:32>>} = Transport:recv(Socket, 9, 1000),
+ %% Send a GOAWAY frame.
+ Transport:send(Socket, cow_http2:goaway(5, no_error, <<>>)),
+ %% Wait before sending the responses back and closing the connection.
+ timer:sleep(500),
+ %% Send a HEADERS frame.
+ {HeadersBlock1, State0} = cow_hpack:encode([
+ {<<":status">>, <<"200">>}
+ ]),
+ ok = Transport:send(Socket, [
+ cow_http2:headers(1, fin, HeadersBlock1)
+ ]),
+ {HeadersBlock2, State} = cow_hpack:encode([
+ {<<":status">>, <<"200">>}
+ ], State0),
+ ok = Transport:send(Socket, [
+ cow_http2:headers(3, fin, HeadersBlock2)
+ ]),
+ {HeadersBlock3, _} = cow_hpack:encode([
+ {<<":status">>, <<"200">>}
+ ], State),
+ ok = Transport:send(Socket, [
+ cow_http2:headers(5, fin, HeadersBlock3)
+ ]),
+ timer:sleep(500)
+ end),
+ Protocol = http2,
+ {ok, ConnPid} = gun:open("localhost", OriginPort, #{
+ protocols => [Protocol],
+ retry => 0
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ timer:sleep(100), %% Give enough time for the handshake to fully complete.
+ StreamRef1 = gun:get(ConnPid, "/"),
+ StreamRef2 = gun:get(ConnPid, "/"),
+ StreamRef3 = gun:get(ConnPid, "/"),
+ ConnRef = monitor(process, ConnPid),
+ {response, fin, 200, _} = gun:await(ConnPid, StreamRef1),
+ {response, fin, 200, _} = gun:await(ConnPid, StreamRef2),
+ {response, fin, 200, _} = gun:await(ConnPid, StreamRef3),
+ gun_is_down(ConnPid, ConnRef, normal).
+
+ws_gun_shutdown(Config) ->
+ doc("Websocket: Confirm that the Gun process shuts down gracefully "
+ "when calling gun:shutdown/1."),
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config)),
+ {ok, http} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef = gun:ws_upgrade(ConnPid, "/ws", []),
+ {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),
+ gun:shutdown(ConnPid),
+ gun_is_down(ConnPid, ConnRef, shutdown).
+
+ws_gun_shutdown_timeout(Config) ->
+ doc("Websocket: Confirm that the Gun process shuts down when "
+ "the closing_timeout triggers after calling gun:shutdown/1."),
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{
+ ws_opts => #{closing_timeout => 100}
+ }),
+ {ok, http} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef = gun:ws_upgrade(ConnPid, "/ws_frozen_long", []),
+ {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),
+ gun:shutdown(ConnPid),
+ gun_is_down(ConnPid, ConnRef, shutdown).
+
+ws_owner_down(Config) ->
+ doc("Websocket: Confirm that the Gun process shuts down when the owner exits normally."),
+ do_ws_owner_down(Config, normal, normal).
+
+ws_owner_down_error(Config) ->
+ doc("Websocket: Confirm that the Gun process shuts down when the owner exits with an error."),
+ do_ws_owner_down(Config, unexpected, {shutdown, {owner_down, unexpected}}).
+
+do_ws_owner_down(Config, ExitReason, DownReason) ->
+ Self = self(),
+ spawn(fun() ->
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config)),
+ Self ! {conn, ConnPid},
+ {ok, http} = gun:await_up(ConnPid),
+ StreamRef = gun:ws_upgrade(ConnPid, "/ws", []),
+ {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),
+ timer:sleep(500),
+ exit(ExitReason)
+ end),
+ ConnPid = receive {conn, C} -> C after 1000 -> error(timeout) end,
+ ConnRef = monitor(process, ConnPid),
+ gun_is_down(ConnPid, ConnRef, DownReason).
+
+ws_gun_send_close_frame(Config) ->
+ doc("Websocket: Confirm that the Gun process shuts down gracefully "
+ "when sending a close frame, with retry disabled."),
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{
+ retry => 0
+ }),
+ {ok, http} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef = gun:ws_upgrade(ConnPid, "/ws", []),
+ {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),
+ %% We send a close frame. We expect the same frame back
+ %% before the connection is closed.
+ Frame = {close, 3333, <<>>},
+ gun:ws_send(ConnPid, Frame),
+ {ws, Frame} = gun:await(ConnPid, StreamRef),
+ gun_is_down(ConnPid, ConnRef, normal).
+
+ws_gun_receive_close_frame(Config) ->
+ doc("Websocket: Confirm that the Gun process shuts down gracefully "
+ "when receiving a close frame, with retry disabled."),
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{
+ retry => 0
+ }),
+ {ok, http} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef = gun:ws_upgrade(ConnPid, "/ws_timeout_close", []),
+ {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),
+ %% We expect a close frame before the connection is closed.
+ {ws, {close, 3333, <<>>}} = gun:await(ConnPid, StreamRef),
+ gun_is_down(ConnPid, ConnRef, normal).
+
+closing_gun_shutdown(Config) ->
+ doc("Confirm that the Gun process shuts down gracefully "
+ "when calling gun:shutdown/1 while Gun is closing a connection."),
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config)),
+ {ok, http} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef = gun:ws_upgrade(ConnPid, "/ws_frozen", []),
+ {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),
+ %% We send a close frame then immediately call gun:shutdown/1.
+ %% We expect Gun to go down without retrying to reconnect.
+ Frame = {close, 3333, <<>>},
+ gun:ws_send(ConnPid, Frame),
+ gun:shutdown(ConnPid),
+ {ws, Frame} = gun:await(ConnPid, StreamRef),
+ gun_is_down(ConnPid, ConnRef, shutdown).
+
+closing_owner_down(Config) ->
+ doc("Confirm that the Gun process shuts down gracefully "
+ "when the owner exits normally while Gun is closing a connection."),
+ do_closing_owner_down(Config, normal, normal).
+
+closing_owner_down_error(Config) ->
+ doc("Confirm that the Gun process shuts down gracefully "
+ "when the owner exits with an error while Gun is closing a connection."),
+ do_closing_owner_down(Config, unexpected, {shutdown, {owner_down, unexpected}}).
+
+do_closing_owner_down(Config, ExitReason, DownReason) ->
+ Self = self(),
+ spawn(fun() ->
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config)),
+ Self ! {conn, ConnPid},
+ {ok, http} = gun:await_up(ConnPid),
+ StreamRef = gun:ws_upgrade(ConnPid, "/ws_frozen", []),
+ {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),
+ gun:ws_send(ConnPid, {close, 3333, <<>>}),
+ timer:sleep(100),
+ exit(ExitReason)
+ end),
+ ConnPid = receive {conn, C} -> C after 1000 -> error(timeout) end,
+ ConnRef = monitor(process, ConnPid),
+ gun_is_down(ConnPid, ConnRef, DownReason).
+
+%% Internal.
+
+gun_is_down(ConnPid, ConnRef, Expected) ->
+ receive
+ {'DOWN', ConnRef, process, ConnPid, Reason} ->
+ Expected = Reason,
+ ok
+ after 1000 ->
+ true = erlang:is_process_alive(ConnPid),
+ error(timeout)
+ end.
diff --git a/test/ws_SUITE.erl b/test/ws_SUITE.erl
index 5cc50ec..1abf046 100644
--- a/test/ws_SUITE.erl
+++ b/test/ws_SUITE.erl
@@ -68,3 +68,30 @@ reject_upgrade(Config) ->
after 1000 ->
error(timeout)
end.
+
+send_many(Config) ->
+ doc("Ensure we can send a list of frames in one gun:ws_send call."),
+ {ok, ConnPid} = gun:open("localhost", config(port, Config)),
+ {ok, _} = gun:await_up(ConnPid),
+ StreamRef = gun:ws_upgrade(ConnPid, "/", []),
+ {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),
+ Frame1 = {text, <<"Hello!">>},
+ Frame2 = {binary, <<"World!">>},
+ gun:ws_send(ConnPid, [Frame1, Frame2]),
+ {ws, Frame1} = gun:await(ConnPid, StreamRef),
+ {ws, Frame2} = gun:await(ConnPid, StreamRef),
+ gun:close(ConnPid).
+
+send_many_close(Config) ->
+ doc("Ensure we can send a list of frames in one gun:ws_send call, including a close frame."),
+ {ok, ConnPid} = gun:open("localhost", config(port, Config)),
+ {ok, _} = gun:await_up(ConnPid),
+ StreamRef = gun:ws_upgrade(ConnPid, "/", []),
+ {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),
+ Frame1 = {text, <<"Hello!">>},
+ Frame2 = {binary, <<"World!">>},
+ gun:ws_send(ConnPid, [Frame1, Frame2, close]),
+ {ws, Frame1} = gun:await(ConnPid, StreamRef),
+ {ws, Frame2} = gun:await(ConnPid, StreamRef),
+ {ws, close} = gun:await(ConnPid, StreamRef),
+ gun:close(ConnPid).