aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--doc/src/manual/gun.asciidoc33
-rw-r--r--doc/src/manual/gun.close.asciidoc3
-rw-r--r--doc/src/manual/gun.shutdown.asciidoc67
-rw-r--r--doc/src/manual/gun.ws_send.asciidoc19
-rw-r--r--src/gun.erl300
-rw-r--r--src/gun_event.erl5
-rw-r--r--src/gun_http.erl56
-rw-r--r--src/gun_http2.erl144
-rw-r--r--src/gun_ws.erl94
-rw-r--r--test/gun_SUITE.erl88
-rw-r--r--test/handlers/delayed_hello_h.erl11
-rw-r--r--test/handlers/delayed_push_h.erl13
-rw-r--r--test/handlers/ws_frozen_h.erl23
-rw-r--r--test/handlers/ws_timeout_close_h.erl25
-rw-r--r--test/rfc7540_SUITE.erl2
-rw-r--r--test/shutdown_SUITE.erl609
-rw-r--r--test/ws_SUITE.erl27
17 files changed, 1214 insertions, 305 deletions
diff --git a/doc/src/manual/gun.asciidoc b/doc/src/manual/gun.asciidoc
index acc1454..b9fbbc2 100644
--- a/doc/src/manual/gun.asciidoc
+++ b/doc/src/manual/gun.asciidoc
@@ -16,7 +16,7 @@ Connection:
* link:man:gun:open(3)[gun:open(3)] - Open a connection to the given host and port
* link:man:gun:open_unix(3)[gun:open_unix(3)] - Open a connection to the given Unix domain socket
-// @todo * link:man:gun:shutdown(3)[gun:shutdown(3)] - Gracefully close the connection
+* link:man:gun:shutdown(3)[gun:shutdown(3)] - Gracefully close the connection
* link:man:gun:close(3)[gun:close(3)] - Brutally close the connection
* link:man:gun:info(3)[gun:info(3)] - Obtain information about the connection
@@ -144,6 +144,7 @@ Handshake timeout for tunneled TLS connections.
[source,erlang]
----
http_opts() :: #{
+ closing_timeout => timeout(),
flow => pos_integer(),
keepalive => timeout(),
transform_header_name => fun((binary()) -> binary()),
@@ -157,6 +158,12 @@ The default value is given next to the option name:
// @todo Document content_handlers and gun_sse_h.
+closing_timeout (15000)::
+
+Time to wait before brutally closing the connection when a
+graceful shutdown was requested via a call to
+link:man:gun:shutdown(3)[gun:shutdown(3)].
+
flow - see below::
The initial flow control value for all HTTP/1.1 streams.
@@ -188,8 +195,9 @@ HTTP version to use.
[source,erlang]
----
http2_opts() :: #{
- flow => pos_integer(),
- keepalive => timeout()
+ closing_timeout => timeout(),
+ flow => pos_integer(),
+ keepalive => timeout()
}
----
@@ -199,6 +207,12 @@ The default value is given next to the option name:
// @todo Document content_handlers and gun_sse_h.
+closing_timeout (15000)::
+
+Time to wait before brutally closing the connection when a
+graceful shutdown was requested either via a call to
+link:man:gun:shutdown(3)[gun:shutdown(3)] or by the server.
+
flow - see below::
The initial flow control value for all HTTP/2 streams.
@@ -364,9 +378,10 @@ The pid of the process that will receive the response messages.
[source,erlang]
----
ws_opts() :: #{
- compress => boolean(),
- flow => pos_integer(),
- protocols => [{binary(), module()}]
+ closing_timeout => timeout(),
+ compress => boolean(),
+ flow => pos_integer(),
+ protocols => [{binary(), module()}]
}
----
@@ -374,6 +389,12 @@ Configuration for the Websocket protocol.
The default value is given next to the option name:
+closing_timeout (15000)::
+
+Time to wait before brutally closing the connection when a
+graceful shutdown was requested either via a call to
+link:man:gun:shutdown(3)[gun:shutdown(3)] or by the server.
+
compress (false)::
Whether to enable permessage-deflate compression. This does
diff --git a/doc/src/manual/gun.close.asciidoc b/doc/src/manual/gun.close.asciidoc
index 20fd1bf..cdbe05f 100644
--- a/doc/src/manual/gun.close.asciidoc
+++ b/doc/src/manual/gun.close.asciidoc
@@ -41,4 +41,5 @@ ok = gun:close(ConnPid).
link:man:gun(3)[gun(3)],
link:man:gun:open(3)[gun:open(3)],
-link:man:gun:open_unix(3)[gun:open_unix(3)]
+link:man:gun:open_unix(3)[gun:open_unix(3)],
+link:man:gun:shutdown(3)[gun:shutdown(3)]
diff --git a/doc/src/manual/gun.shutdown.asciidoc b/doc/src/manual/gun.shutdown.asciidoc
new file mode 100644
index 0000000..94db39d
--- /dev/null
+++ b/doc/src/manual/gun.shutdown.asciidoc
@@ -0,0 +1,67 @@
+= gun:shutdown(3)
+
+== Name
+
+gun:shutdown - Gracefully close the connection
+
+== Description
+
+[source,erlang]
+----
+shutdown(ConnPid) -> ok
+
+ConnPid :: pid()
+----
+
+Gracefully close the connection.
+
+Gun will wait for up to `closing_timeout` milliseconds
+before brutally closing the connection. The graceful
+shutdown mechanism varies between the different protocols:
+
+* For HTTP/1.1 there is no such mechanism and Gun will
+ close the connection once the current response is
+ received. Any pipelined requests are immediately
+ terminated.
+
+* For HTTP/2 Gun will send a GOAWAY frame and wait for
+ the existing streams to terminate.
+
+* For Websocket Gun will send a close frame and wait
+ for the server's close frame before closing the
+ connection.
+
+The function returns immediately. The connection may
+therefore still be up for some time after this call.
+
+Gun will not attempt to reconnect once graceful
+shutdown has been initiated.
+
+== Arguments
+
+ConnPid::
+
+The pid of the Gun connection process.
+
+== Return value
+
+The atom `ok` is returned.
+
+== Changelog
+
+* *2.0*: Function introduced.
+
+== Examples
+
+.Gracefully shutdown the connection
+[source,erlang]
+----
+ok = gun:shutdown(ConnPid).
+----
+
+== See also
+
+link:man:gun(3)[gun(3)],
+link:man:gun:open(3)[gun:open(3)],
+link:man:gun:open_unix(3)[gun:open_unix(3)],
+link:man:gun:close(3)[gun:close(3)]
diff --git a/doc/src/manual/gun.ws_send.asciidoc b/doc/src/manual/gun.ws_send.asciidoc
index fbb1025..b39f3f0 100644
--- a/doc/src/manual/gun.ws_send.asciidoc
+++ b/doc/src/manual/gun.ws_send.asciidoc
@@ -30,8 +30,7 @@ The pid of the Gun connection process.
Frames::
-A Websocket frame.
-// @todo One or more Websocket frame(s).
+One or more Websocket frame(s).
== Return value
@@ -49,14 +48,14 @@ The atom `ok` is returned.
gun:ws_send(ConnPid, {text, <<"Hello world!">>}).
----
-//.Send many frames including a close frame
-//[source,erlang]
-//----
-//gun:ws_send(ConnPid, [
-// {text, <<"See you later, world!">>},
-// close
-//]).
-//----
+.Send many frames including a close frame
+[source,erlang]
+----
+gun:ws_send(ConnPid, [
+ {text, <<"See you later, world!">>},
+ close
+]).
+----
== See also
diff --git a/src/gun.erl b/src/gun.erl
index 7b06aaf..bb659e7 100644
--- a/src/gun.erl
+++ b/src/gun.erl
@@ -98,6 +98,7 @@
-export([connecting/3]).
-export([tls_handshake/3]).
-export([connected/3]).
+-export([closing/3]).
-export([terminate/3]).
-type req_headers() :: [{binary() | string() | atom(), iodata()}]
@@ -165,19 +166,24 @@
-export_type([req_opts/0]).
-type http_opts() :: #{
- keepalive => timeout(),
+ closing_timeout => timeout(),
+ flow => pos_integer(),
+ keepalive => timeout(),
transform_header_name => fun((binary()) -> binary()),
- version => 'HTTP/1.1' | 'HTTP/1.0'
+ version => 'HTTP/1.1' | 'HTTP/1.0'
}.
-export_type([http_opts/0]).
-type http2_opts() :: #{
+ closing_timeout => timeout(),
+ flow => pos_integer(),
keepalive => timeout()
}.
-export_type([http2_opts/0]).
%% @todo keepalive
-type ws_opts() :: #{
+ closing_timeout => timeout(),
compress => boolean(),
flow => pos_integer(),
protocols => [{binary(), module()}]
@@ -186,7 +192,7 @@
-record(state, {
owner :: pid(),
- owner_ref :: reference(),
+ status :: {up, reference()} | {down, any()} | shutdown,
host :: inet:hostname() | inet:ip_address(),
port :: inet:port_number(),
origin_scheme :: binary(),
@@ -781,7 +787,7 @@ init({Owner, Host, Port, Opts}) ->
origin_port => Port,
opts => Opts
}, EvHandlerState0),
- State = #state{owner=Owner, owner_ref=OwnerRef,
+ State = #state{owner=Owner, status={up, OwnerRef},
host=Host, port=Port, origin_scheme=OriginScheme,
origin_host=Host, origin_port=Port, opts=Opts,
transport=Transport, messages=Transport:messages(),
@@ -875,11 +881,7 @@ connecting(_, {retries, Retries, LookupInfo}, State=#state{opts=Opts,
}, EvHandlerState1),
{next_state, not_connected, State#state{event_handler_state=EvHandlerState},
{next_event, internal, {retries, Retries, Reason}}}
- end;
-connecting({call, From}, {stream_info, _}, _) ->
- {keep_state_and_data, {reply, From, {error, not_connected}}};
-connecting(Type, Event, State) ->
- handle_common(Type, Event, ?FUNCTION_NAME, State).
+ end.
tls_handshake(_, {retries, Retries, Socket0}, State=#state{opts=Opts,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
@@ -910,11 +912,7 @@ tls_handshake(_, {retries, Retries, Socket0}, State=#state{opts=Opts,
}, EvHandlerState1),
{next_state, not_connected, State#state{event_handler_state=EvHandlerState},
{next_event, internal, {retries, Retries, Reason}}}
- end;
-tls_handshake({call, From}, {stream_info, _}, _) ->
- {keep_state_and_data, {reply, From, {error, not_connected}}};
-tls_handshake(Type, Event, State) ->
- handle_common(Type, Event, ?FUNCTION_NAME, State).
+ end.
ensure_alpn(Protocols0, TransOpts) ->
Protocols = [case P of
@@ -937,28 +935,6 @@ connected(internal, {connected, Socket, Protocol},
Owner ! {gun_up, self(), Protocol:name()},
{keep_state, keepalive_timeout(active(State#state{socket=Socket,
protocol=Protocol, protocol_state=ProtoState}))};
-%% Socket events.
-connected(info, {OK, Socket, Data}, State0=#state{socket=Socket, messages={OK, _, _},
- protocol=Protocol, protocol_state=ProtoState,
- event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
- {Commands, EvHandlerState} = Protocol:handle(Data, ProtoState, EvHandler, EvHandlerState0),
- case commands(Commands, State0#state{event_handler_state=EvHandlerState}) of
- {keep_state, State} ->
- {keep_state, active(State)};
- Res ->
- Res
- end;
-connected(info, {Closed, Socket}, State=#state{socket=Socket, messages={_, Closed, _}}) ->
- disconnect(State, closed);
-connected(info, {Error, Socket, Reason}, State=#state{socket=Socket, messages={_, _, Error}}) ->
- disconnect(State, {error, Reason});
-%% Timeouts.
-%% @todo HTTP/2 requires more timeouts than just the keepalive timeout.
-%% We should have a timeout function in protocols that deal with
-%% received timeouts. Currently the timeout messages are ignored.
-connected(info, keepalive, State=#state{protocol=Protocol, protocol_state=ProtoState}) ->
- ProtoState2 = Protocol:keepalive(ProtoState),
- {keep_state, keepalive_timeout(State#state{protocol_state=ProtoState2})};
%% Public HTTP interface.
connected(cast, {headers, ReplyTo, StreamRef, Method, Path, Headers, InitialFlow},
State=#state{origin_host=Host, origin_port=Port,
@@ -976,14 +952,6 @@ connected(cast, {request, ReplyTo, StreamRef, Method, Path, Headers, Body, Initi
StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body,
InitialFlow, EvHandler, EvHandlerState0),
{keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}};
-%% @todo Do we want to reject ReplyTo if it's not the process
-%% who initiated the connection? For both data and cancel.
-connected(cast, {data, ReplyTo, StreamRef, IsFin, Data},
- State=#state{protocol=Protocol, protocol_state=ProtoState,
- event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
- {ProtoState2, EvHandlerState} = Protocol:data(ProtoState,
- StreamRef, ReplyTo, IsFin, Data, EvHandler, EvHandlerState0),
- {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}};
connected(cast, {connect, ReplyTo, StreamRef, Destination0, Headers, InitialFlow},
State=#state{protocol=Protocol, protocol_state=ProtoState}) ->
%% The protocol option has been deprecated in favor of the protocols option.
@@ -1006,41 +974,6 @@ connected(cast, {connect, ReplyTo, StreamRef, Destination0, Headers, InitialFlow
end,
ProtoState2 = Protocol:connect(ProtoState, StreamRef, ReplyTo, Destination, Headers, InitialFlow),
{keep_state, State#state{protocol_state=ProtoState2}};
-%% When using gun_tls_proxy we need a separate message to know whether
-%% the handshake succeeded and whether we need to switch to a different protocol.
-connected(info, {gun_tls_proxy, Socket, {ok, NewProtocol}, HandshakeEvent},
- State0=#state{socket=Socket, protocol=CurrentProtocol, protocol_state=ProtoState,
- event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
- EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{
- socket => Socket,
- protocol => NewProtocol:name()
- }, EvHandlerState0),
- State = State0#state{event_handler_state=EvHandlerState},
- case NewProtocol of
- CurrentProtocol -> {keep_state, State};
- _ -> commands([{switch_protocol, NewProtocol, ProtoState}], State)
- end;
-connected(info, {gun_tls_proxy, Socket, Error = {error, Reason}, HandshakeEvent},
- State=#state{socket=Socket, event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
- EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{
- error => Reason
- }, EvHandlerState0),
- commands([Error], State#state{event_handler_state=EvHandlerState});
-connected(cast, {update_flow, ReplyTo, StreamRef, Flow}, State0=#state{
- protocol=Protocol, protocol_state=ProtoState}) ->
- Commands = Protocol:update_flow(ProtoState, ReplyTo, StreamRef, Flow),
- case commands(Commands, State0) of
- {keep_state, State} ->
- {keep_state, active(State)};
- Res ->
- Res
- end;
-connected(cast, {cancel, ReplyTo, StreamRef}, State=#state{
- protocol=Protocol, protocol_state=ProtoState,
- event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
- {ProtoState2, EvHandlerState} = Protocol:cancel(ProtoState,
- StreamRef, ReplyTo, EvHandler, EvHandlerState0),
- {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}};
%% Public Websocket interface.
%% @todo Maybe make an interface in the protocol module instead of checking on protocol name.
%% An interface would also make sure that HTTP/1.0 can't upgrade.
@@ -1067,40 +1000,148 @@ connected(cast, {ws_upgrade, ReplyTo, StreamRef, _, _, _}, _) ->
ReplyTo ! {gun_error, self(), StreamRef, {badstate,
"Websocket is only supported over HTTP/1.1."}},
keep_state_and_data;
-connected(cast, {ws_send, Owner, Frame}, State=#state{
+connected(cast, {ws_send, Owner, Frames}, State=#state{
owner=Owner, protocol=Protocol=gun_ws, protocol_state=ProtoState,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
- {Commands, EvHandlerState} = Protocol:send(Frame, ProtoState, EvHandler, EvHandlerState0),
+ {Commands, EvHandlerState} = Protocol:send(Frames, ProtoState, EvHandler, EvHandlerState0),
commands(Commands, State#state{event_handler_state=EvHandlerState});
connected(cast, {ws_send, ReplyTo, _}, _) ->
ReplyTo ! {gun_error, self(), {badstate,
"Connection needs to be upgraded to Websocket "
"before the gun:ws_send/1 function can be used."}},
keep_state_and_data;
-connected({call, From}, {stream_info, StreamRef},
+connected(Type, Event, State) ->
+ handle_common_connected(Type, Event, ?FUNCTION_NAME, State).
+
+%% Switch to the graceful connection close state.
+closing(State=#state{protocol=Protocol, protocol_state=ProtoState,
+ event_handler=EvHandler, event_handler_state=EvHandlerState0}, Reason) ->
+ {Commands, EvHandlerState} = Protocol:closing(Reason, ProtoState, EvHandler, EvHandlerState0),
+ commands(Commands, State#state{event_handler_state=EvHandlerState}).
+
+%% @todo Should explicitly reject ws_send in this state?
+closing(state_timeout, closing_timeout, State=#state{status=Status}) ->
+ Reason = case Status of
+ shutdown -> shutdown;
+ {down, _} -> owner_down;
+ _ -> normal
+ end,
+ disconnect(State, Reason);
+closing(Type, Event, State) ->
+ handle_common_connected(Type, Event, ?FUNCTION_NAME, State).
+
+%% Common events when we have a connection.
+%%
+%% Socket events.
+handle_common_connected(info, {OK, Socket, Data}, _, State0=#state{socket=Socket, messages={OK, _, _},
+ protocol=Protocol, protocol_state=ProtoState,
+ event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
+ {Commands, EvHandlerState} = Protocol:handle(Data, ProtoState, EvHandler, EvHandlerState0),
+ case commands(Commands, State0#state{event_handler_state=EvHandlerState}) of
+ {keep_state, State} ->
+ {keep_state, active(State)};
+ {next_state, closing, State, Actions} ->
+ {next_state, closing, active(State), Actions};
+ Res ->
+ Res
+ end;
+handle_common_connected(info, {Closed, Socket}, _, State=#state{socket=Socket, messages={_, Closed, _}}) ->
+ disconnect(State, closed);
+handle_common_connected(info, {Error, Socket, Reason}, _, State=#state{socket=Socket, messages={_, _, Error}}) ->
+ disconnect(State, {error, Reason});
+%% Timeouts.
+%% @todo HTTP/2 requires more timeouts than just the keepalive timeout.
+%% We should have a timeout function in protocols that deal with
+%% received timeouts. Currently the timeout messages are ignored.
+handle_common_connected(info, keepalive, _, State=#state{protocol=Protocol, protocol_state=ProtoState}) ->
+ ProtoState2 = Protocol:keepalive(ProtoState),
+ {keep_state, keepalive_timeout(State#state{protocol_state=ProtoState2})};
+%% When using gun_tls_proxy we need a separate message to know whether
+%% the handshake succeeded and whether we need to switch to a different protocol.
+handle_common_connected(info, {gun_tls_proxy, Socket, {ok, NewProtocol}, HandshakeEvent}, _,
+ State0=#state{socket=Socket, protocol=CurrentProtocol, protocol_state=ProtoState,
+ event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
+ EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{
+ socket => Socket,
+ protocol => NewProtocol:name()
+ }, EvHandlerState0),
+ State = State0#state{event_handler_state=EvHandlerState},
+ case NewProtocol of
+ CurrentProtocol -> {keep_state, State};
+ _ -> commands([{switch_protocol, NewProtocol, ProtoState}], State)
+ end;
+handle_common_connected(info, {gun_tls_proxy, Socket, Error = {error, Reason}, HandshakeEvent}, _,
+ State=#state{socket=Socket, event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
+ EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{
+ error => Reason
+ }, EvHandlerState0),
+ commands([Error], State#state{event_handler_state=EvHandlerState});
+%% @todo Do we want to reject ReplyTo if it's not the process
+%% who initiated the connection? For both data and cancel.
+handle_common_connected(cast, {data, ReplyTo, StreamRef, IsFin, Data}, _,
+ State=#state{protocol=Protocol, protocol_state=ProtoState,
+ event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
+ {ProtoState2, EvHandlerState} = Protocol:data(ProtoState,
+ StreamRef, ReplyTo, IsFin, Data, EvHandler, EvHandlerState0),
+ {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}};
+handle_common_connected(cast, {update_flow, ReplyTo, StreamRef, Flow}, _, State0=#state{
+ protocol=Protocol, protocol_state=ProtoState}) ->
+ Commands = Protocol:update_flow(ProtoState, ReplyTo, StreamRef, Flow),
+ case commands(Commands, State0) of
+ {keep_state, State} ->
+ {keep_state, active(State)};
+ Res ->
+ Res
+ end;
+handle_common_connected(cast, {cancel, ReplyTo, StreamRef}, _, State=#state{
+ protocol=Protocol, protocol_state=ProtoState,
+ event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
+ {ProtoState2, EvHandlerState} = Protocol:cancel(ProtoState,
+ StreamRef, ReplyTo, EvHandler, EvHandlerState0),
+ {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}};
+handle_common_connected({call, From}, {stream_info, StreamRef}, _,
#state{protocol=Protocol, protocol_state=ProtoState}) ->
{keep_state_and_data, {reply, From, Protocol:stream_info(ProtoState, StreamRef)}};
-connected(Type, Event, State) ->
- handle_common(Type, Event, ?FUNCTION_NAME, State).
+handle_common_connected(Type, Event, StateName, State) ->
+ handle_common(Type, Event, StateName, State).
%% Common events.
-handle_common(cast, {shutdown, Owner}, _, #state{owner=Owner}) ->
- %% @todo Graceful shutdown.
- stop;
+handle_common(cast, {shutdown, Owner}, StateName, State=#state{
+ owner=Owner, status=Status, socket=Socket, transport=Transport, protocol=Protocol}) ->
+ case {Socket, Protocol} of
+ {undefined, _} ->
+ {stop, shutdown};
+ {_, undefined} ->
+ %% @todo This is missing the disconnect event.
+ Transport:close(Socket),
+ {stop, shutdown};
+ _ when StateName =:= closing, element(1, Status) =:= up ->
+ {keep_state, status(State, shutdown)};
+ _ when StateName =:= closing ->
+ keep_state_and_data;
+ _ ->
+ closing(status(State, shutdown), shutdown)
+ end;
%% We stop when the owner is down.
-handle_common(info, {'DOWN', OwnerRef, process, Owner, Reason}, _, State=#state{
- owner=Owner, owner_ref=OwnerRef, socket=Socket, transport=Transport,
- protocol=Protocol, protocol_state=ProtoState,
- event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
- {_, EvHandlerState} = case Protocol of
- undefined -> {ok, EvHandlerState0};
- _ -> Protocol:close(owner_down, ProtoState, EvHandler, EvHandlerState0)
- end,
- _ = case Socket of
- undefined -> ok;
- _ -> Transport:close(Socket)
- end,
- owner_down(Reason, State#state{event_handler_state=EvHandlerState});
+%% @todo We need to demonitor/flush when the status is no longer up.
+handle_common(info, {'DOWN', OwnerRef, process, Owner, Reason}, StateName, State=#state{
+ owner=Owner, status={up, OwnerRef}, socket=Socket, transport=Transport, protocol=Protocol}) ->
+ case Socket of
+ undefined ->
+ owner_down(Reason, State);
+ _ ->
+ case Protocol of
+ undefined ->
+ %% @todo This is missing the disconnect event.
+ Transport:close(Socket),
+ owner_down(Reason, State);
+ %% We are already closing so no need to initiate closing again.
+ _ when StateName =:= closing ->
+ {keep_state, status(State, {down, Reason})};
+ _ ->
+ closing(status(State, {down, Reason}), owner_down)
+ end
+ end;
handle_common({call, From}, _, _, _) ->
{keep_state_and_data, {reply, From, {error, bad_call}}};
%% @todo The ReplyTo patch disabled the notowner behavior.
@@ -1123,6 +1164,9 @@ commands([], State) ->
{keep_state, State};
commands([close|_], State) ->
disconnect(State, normal);
+commands([{closing, Timeout}|_], State) ->
+ {next_state, closing, keepalive_cancel(State),
+ {state_timeout, Timeout, closing_timeout}};
commands([Error={error, _}|_], State) ->
disconnect(State, Error);
commands([{active, Active}|Tail], State) when is_boolean(Active) ->
@@ -1176,33 +1220,37 @@ commands([{switch_protocol, Protocol, _ProtoState0}|Tail], State=#state{
commands(Tail, keepalive_timeout(State#state{protocol=Protocol, protocol_state=ProtoState,
event_handler_state=EvHandlerState})).
-disconnect(State=#state{owner=Owner, opts=Opts,
+disconnect(State0=#state{owner=Owner, status=Status, opts=Opts,
socket=Socket, transport=Transport,
protocol=Protocol, protocol_state=ProtoState,
event_handler=EvHandler, event_handler_state=EvHandlerState0}, Reason) ->
- {_, EvHandlerState1} = Protocol:close(Reason, ProtoState, EvHandler, EvHandlerState0),
- %% @todo Need a special state for orderly shutdown of a connection.
+ EvHandlerState1 = Protocol:close(Reason, ProtoState, EvHandler, EvHandlerState0),
_ = Transport:close(Socket),
- %% We closed the socket, discard any remaining socket events.
- disconnect_flush(State),
- %% @todo Stop keepalive timeout, flush message.
- {KilledStreams, UnprocessedStreams} = Protocol:down(ProtoState),
- Owner ! {gun_down, self(), Protocol:name(), Reason, KilledStreams, UnprocessedStreams},
- %% Trigger the disconnect event.
- DisconnectEvent = #{
- reason => Reason
- },
- EvHandlerState = EvHandler:disconnect(DisconnectEvent, EvHandlerState1),
- Retry = maps:get(retry, Opts, 5),
- case Retry of
- 0 ->
- {stop, {shutdown, Reason}, State#state{event_handler_state=EvHandlerState}};
- _ ->
- {next_state, not_connected,
- keepalive_cancel(State#state{socket=undefined,
- protocol=undefined, protocol_state=undefined,
- event_handler_state=EvHandlerState}),
- {next_event, internal, {retries, Retry - 1, Reason}}}
+ EvHandlerState = EvHandler:disconnect(#{reason => Reason}, EvHandlerState1),
+ State = State0#state{event_handler_state=EvHandlerState},
+ case Status of
+ {down, DownReason} ->
+ owner_down(DownReason, State);
+ shutdown ->
+ {stop, shutdown, State};
+ {up, _} ->
+ %% We closed the socket, discard any remaining socket events.
+ disconnect_flush(State),
+ %% @todo Stop keepalive timeout, flush message.
+ {KilledStreams, UnprocessedStreams} = Protocol:down(ProtoState),
+ Owner ! {gun_down, self(), Protocol:name(), Reason, KilledStreams, UnprocessedStreams},
+ Retry = maps:get(retry, Opts, 5),
+ case Retry of
+ 0 when Reason =:= normal ->
+ {stop, normal, State};
+ 0 ->
+ {stop, {shutdown, Reason}, State};
+ _ ->
+ {next_state, not_connected,
+ keepalive_cancel(State#state{socket=undefined,
+ protocol=undefined, protocol_state=undefined}),
+ {next_event, internal, {retries, Retry - 1, Reason}}}
+ end
end.
disconnect_flush(State=#state{socket=Socket, messages={OK, Closed, Error}}) ->
@@ -1220,6 +1268,12 @@ active(State=#state{socket=Socket, transport=Transport}) ->
Transport:setopts(Socket, [{active, once}]),
State.
+status(State=#state{status={up, OwnerRef}}, NewStatus) ->
+ demonitor(OwnerRef, [flush]),
+ State#state{status=NewStatus};
+status(State, NewStatus) ->
+ State#state{status=NewStatus}.
+
keepalive_timeout(State=#state{opts=Opts, protocol=Protocol}) ->
{ProtoOptsKey, Default} = case Protocol of
gun_http -> {http_opts, infinity};
diff --git a/src/gun_event.erl b/src/gun_event.erl
index 1e158c4..a984796 100644
--- a/src/gun_event.erl
+++ b/src/gun_event.erl
@@ -113,7 +113,10 @@
-type push_promise_end_event() :: #{
stream_ref := reference(),
reply_to := pid(),
- promised_stream_ref := reference(),
+ %% No stream is created if we receive the push_promise while
+ %% in the process of gracefully shutting down the connection.
+ %% The promised stream is canceled immediately.
+ promised_stream_ref => reference(),
method := binary(),
uri := binary(),
headers := [{binary(), iodata()}]
diff --git a/src/gun_http.erl b/src/gun_http.erl
index ec268ad..309772e 100644
--- a/src/gun_http.erl
+++ b/src/gun_http.erl
@@ -19,6 +19,7 @@
-export([init/4]).
-export([handle/4]).
-export([update_flow/4]).
+-export([closing/4]).
-export([close/4]).
-export([keepalive/1]).
-export([headers/11]).
@@ -71,6 +72,10 @@ check_options(Opts) ->
do_check_options([]) ->
ok;
+do_check_options([{closing_timeout, infinity}|Opts]) ->
+ do_check_options(Opts);
+do_check_options([{closing_timeout, T}|Opts]) when is_integer(T), T > 0 ->
+ do_check_options(Opts);
do_check_options([Opt={content_handlers, Handlers}|Opts]) ->
case gun_content_handler:check_option(Handlers) of
ok -> do_check_options(Opts);
@@ -460,26 +465,47 @@ update_flow(State=#http_state{streams=Streams0}, _ReplyTo, StreamRef, Inc) ->
end || Tuple = #stream{ref=Ref, flow=Flow} <- Streams0],
{state, State#http_state{streams=Streams}}.
-%% @todo Use Reason.
-close(_, State=#http_state{in=body_close, streams=[#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]},
- EvHandler, EvHandlerState0) ->
+%% We can immediately close the connection when there's no streams.
+closing(_, #http_state{streams=[]}, _, EvHandlerState) ->
+ {close, EvHandlerState};
+%% Otherwise we set connection: close (even if the header was not sent)
+%% and close any pipelined streams, only keeping the active stream.
+closing(Reason, State=#http_state{streams=[LastStream|Tail]}, _, EvHandlerState) ->
+ close_streams(Tail, {closing, Reason}),
+ {[
+ {state, State#http_state{connection=close, streams=[LastStream]}},
+ closing(State)
+ ], EvHandlerState}.
+
+closing(#http_state{opts=Opts}) ->
+ Timeout = maps:get(closing_timeout, Opts, 15000),
+ {closing, Timeout}.
+
+close(Reason, State=#http_state{in=body_close,
+ streams=[#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]},
+ EvHandler, EvHandlerState) ->
+ %% We may have more than one stream in case we somehow close abruptly.
+ close_streams(Tail, close_reason(Reason)),
_ = send_data(<<>>, State, fin),
- EvHandlerState = EvHandler:response_end(#{
+ EvHandler:response_end(#{
stream_ref => StreamRef,
reply_to => ReplyTo
- }, EvHandlerState0),
- {close_streams(Tail), EvHandlerState};
-close(_, #http_state{streams=Streams}, _, EvHandlerState) ->
- {close_streams(Streams), EvHandlerState}.
+ }, EvHandlerState);
+close(Reason, #http_state{streams=Streams}, _, EvHandlerState) ->
+ close_streams(Streams, close_reason(Reason)),
+ EvHandlerState.
+
+close_reason(closed) -> closed;
+close_reason(Reason) -> {closed, Reason}.
-close_streams([]) ->
+%% @todo Do we want an event for this?
+close_streams([], _) ->
ok;
-close_streams([#stream{is_alive=false}|Tail]) ->
- close_streams(Tail);
-close_streams([#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]) ->
- ReplyTo ! {gun_error, self(), StreamRef, {closed,
- "The connection was lost."}},
- close_streams(Tail).
+close_streams([#stream{is_alive=false}|Tail], Reason) ->
+ close_streams(Tail, Reason);
+close_streams([#stream{ref=StreamRef, reply_to=ReplyTo}|Tail], Reason) ->
+ ReplyTo ! {gun_error, self(), StreamRef, Reason},
+ close_streams(Tail, Reason).
%% We don't send a keep-alive when a CONNECT request was initiated.
keepalive(State=#http_state{streams=[#stream{ref={connect, _, _}}]}) ->
diff --git a/src/gun_http2.erl b/src/gun_http2.erl
index 3b3b79b..5942037 100644
--- a/src/gun_http2.erl
+++ b/src/gun_http2.erl
@@ -19,6 +19,7 @@
-export([init/4]).
-export([handle/4]).
-export([update_flow/4]).
+-export([closing/4]).
-export([close/4]).
-export([keepalive/1]).
-export([headers/11]).
@@ -53,6 +54,10 @@
content_handlers :: gun_content_handler:opt(),
buffer = <<>> :: binary(),
+ %% Current status of the connection. We use this to ensure we are
+ %% not sending the GOAWAY frame more than once.
+ status = connected :: connected | goaway | closing,
+
%% HTTP/2 state machine.
http2_machine :: cow_http2_machine:http2_machine(),
@@ -66,6 +71,10 @@ check_options(Opts) ->
do_check_options([]) ->
ok;
+do_check_options([{closing_timeout, infinity}|Opts]) ->
+ do_check_options(Opts);
+do_check_options([{closing_timeout, T}|Opts]) when is_integer(T), T > 0 ->
+ do_check_options(Opts);
do_check_options([Opt={content_handlers, Handlers}|Opts]) ->
case gun_content_handler:check_option(Handlers) of
ok -> do_check_options(Opts);
@@ -99,7 +108,8 @@ handle(Data, State=#http2_state{buffer=Buffer}, EvHandler, EvHandlerState) ->
parse(<< Buffer/binary, Data/binary >>, State#http2_state{buffer= <<>>},
EvHandler, EvHandlerState).
-parse(Data, State0=#http2_state{http2_machine=HTTP2Machine}, EvHandler, EvHandlerState0) ->
+parse(Data, State0=#http2_state{status=Status, http2_machine=HTTP2Machine, streams=Streams},
+ EvHandler, EvHandlerState0) ->
MaxFrameSize = cow_http2_machine:get_local_setting(max_frame_size, HTTP2Machine),
case cow_http2:parse(Data, MaxFrameSize) of
{ok, Frame, Rest} ->
@@ -116,7 +126,14 @@ parse(Data, State0=#http2_state{http2_machine=HTTP2Machine}, EvHandler, EvHandle
parse(Rest, reset_stream(State0, StreamID, {stream_error, Reason, Human}),
EvHandler, EvHandlerState0);
Error = {connection_error, _, _} ->
- {terminate(State0, Error), EvHandlerState0};
+ {connection_error(State0, Error), EvHandlerState0};
+ %% If we both received and sent a GOAWAY frame and there are no streams
+ %% currently running, we can close the connection immediately.
+ more when Status =/= connected, Streams =:= [] ->
+ {[{state, State0#http2_state{buffer=Data, status=closing}}, close], EvHandlerState0};
+ %% Otherwise we enter the closing state.
+ more when Status =:= goaway ->
+ {[{state, State0#http2_state{buffer=Data, status=closing}}, closing(State0)], EvHandlerState0};
more ->
{{state, State0#http2_state{buffer=Data}}, EvHandlerState0}
end.
@@ -169,9 +186,8 @@ frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame, EvHandler, EvHandl
push_promise_frame(State#http2_state{http2_machine=HTTP2Machine},
StreamID, PromisedStreamID, Headers, PseudoHeaders,
EvHandler, EvHandlerState);
- {ok, Frame={goaway, _StreamID, _Reason, _Data}, HTTP2Machine} ->
- {terminate(State#http2_state{http2_machine=HTTP2Machine},
- {stop, Frame, 'Server is going away.'}),
+ {ok, GoAway={goaway, _, _, _}, HTTP2Machine} ->
+ {goaway(State#http2_state{http2_machine=HTTP2Machine}, GoAway),
EvHandlerState};
{send, SendData, HTTP2Machine} ->
send_data(maybe_ack(State#http2_state{http2_machine=HTTP2Machine}, Frame), SendData,
@@ -181,7 +197,7 @@ frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame, EvHandler, EvHandl
StreamID, {stream_error, Reason, Human}),
EvHandlerState};
{error, Error={connection_error, _, _}, HTTP2Machine} ->
- {terminate(State#http2_state{http2_machine=HTTP2Machine}, Error),
+ {connection_error(State#http2_state{http2_machine=HTTP2Machine}, Error),
EvHandlerState}
end.
@@ -318,7 +334,8 @@ rst_stream_frame(State=#http2_state{streams=Streams0},
end.
%% Pushed streams receive the same initial flow value as the parent stream.
-push_promise_frame(State=#http2_state{streams=Streams},
+push_promise_frame(State=#http2_state{socket=Socket, transport=Transport,
+ status=Status, http2_machine=HTTP2Machine0, streams=Streams},
StreamID, PromisedStreamID, Headers, #{
method := Method, scheme := Scheme,
authority := Authority, path := Path},
@@ -326,25 +343,39 @@ push_promise_frame(State=#http2_state{streams=Streams},
#stream{ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow} = get_stream_by_id(State, StreamID),
PromisedStreamRef = make_ref(),
URI = iolist_to_binary([Scheme, <<"://">>, Authority, Path]),
- ReplyTo ! {gun_push, self(), StreamRef, PromisedStreamRef, Method, URI, Headers},
- EvHandlerState = EvHandler:push_promise_end(#{
+ PushPromiseEvent0 = #{
stream_ref => StreamRef,
reply_to => ReplyTo,
- promised_stream_ref => PromisedStreamRef,
method => Method,
uri => URI,
headers => Headers
- }, EvHandlerState0),
- NewStream = #stream{id=PromisedStreamID, ref=PromisedStreamRef,
- reply_to=ReplyTo, flow=InitialFlow},
- {State#http2_state{streams=[NewStream|Streams]}, EvHandlerState}.
+ },
+ PushPromiseEvent = case Status of
+ connected ->
+ ReplyTo ! {gun_push, self(), StreamRef, PromisedStreamRef, Method, URI, Headers},
+ PushPromiseEvent0#{promised_stream_ref => PromisedStreamRef};
+ _ ->
+ PushPromiseEvent0
+ end,
+ EvHandlerState = EvHandler:push_promise_end(PushPromiseEvent, EvHandlerState0),
+ case Status of
+ connected ->
+ NewStream = #stream{id=PromisedStreamID, ref=PromisedStreamRef,
+ reply_to=ReplyTo, flow=InitialFlow},
+ {State#http2_state{streams=[NewStream|Streams]}, EvHandlerState};
+ %% We cancel the push_promise immediately when we are shutting down.
+ _ ->
+ {ok, HTTP2Machine} = cow_http2_machine:reset_stream(PromisedStreamID, HTTP2Machine0),
+ Transport:send(Socket, cow_http2:rst_stream(PromisedStreamID, cancel)),
+ {State#http2_state{http2_machine=HTTP2Machine}, EvHandlerState}
+ end.
ignored_frame(State=#http2_state{http2_machine=HTTP2Machine0}) ->
case cow_http2_machine:ignored_frame(HTTP2Machine0) of
{ok, HTTP2Machine} ->
State#http2_state{http2_machine=HTTP2Machine};
{error, Error={connection_error, _, _}, HTTP2Machine} ->
- terminate(State#http2_state{http2_machine=HTTP2Machine}, Error)
+ connection_error(State#http2_state{http2_machine=HTTP2Machine}, Error)
end.
update_flow(State=#http2_state{socket=Socket, transport=Transport,
@@ -369,16 +400,72 @@ update_flow(State=#http2_state{socket=Socket, transport=Transport,
[]
end.
-%% @todo Use Reason.
-close(_, #http2_state{streams=Streams}, _, EvHandlerState) ->
- {close_streams(Streams), EvHandlerState}.
+%% We may have to cancel streams even if we receive multiple
+%% GOAWAY frames as the LastStreamID value may be lower than
+%% the one previously received.
+goaway(State0=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine,
+ status=Status, streams=Streams0}, {goaway, LastStreamID, Reason, _}) ->
+ Streams = goaway_streams(Streams0, LastStreamID, {goaway, Reason, 'The connection is going away.'}, []),
+ State = State0#http2_state{streams=Streams},
+ case Status of
+ connected ->
+ Transport:send(Socket, cow_http2:goaway(
+ cow_http2_machine:get_last_streamid(HTTP2Machine),
+ no_error, <<>>)),
+ State#http2_state{status=goaway};
+ _ ->
+ State
+ end.
+
+%% Cancel server-initiated streams that are above LastStreamID.
+goaway_streams([], _, _, Acc) ->
+ Acc;
+goaway_streams([Stream=#stream{id=StreamID}|Tail], LastStreamID, Reason, Acc)
+ when StreamID > LastStreamID, (StreamID rem 2) =:= 1 ->
+ close_stream(Stream, Reason),
+ goaway_streams(Tail, LastStreamID, Reason, Acc);
+goaway_streams([Stream|Tail], LastStreamID, Reason, Acc) ->
+ goaway_streams(Tail, LastStreamID, Reason, [Stream|Acc]).
+
+%% We are already closing, do nothing.
+closing(_, #http2_state{status=closing}, _, EvHandlerState) ->
+ {[], EvHandlerState};
+closing(Reason0, State=#http2_state{socket=Socket, transport=Transport,
+ http2_machine=HTTP2Machine}, _, EvHandlerState) ->
+ Reason = case Reason0 of
+ normal -> no_error;
+ owner_down -> no_error;
+ _ -> internal_error
+ end,
+ Transport:send(Socket, cow_http2:goaway(
+ cow_http2_machine:get_last_streamid(HTTP2Machine),
+ Reason, <<>>)),
+ {[
+ {state, State#http2_state{status=closing}},
+ closing(State)
+ ], EvHandlerState}.
-close_streams([]) ->
+closing(#http2_state{opts=Opts}) ->
+ Timeout = maps:get(closing_timeout, Opts, 15000),
+ {closing, Timeout}.
+
+close(Reason, #http2_state{streams=Streams}, _, EvHandlerState) ->
+ close_streams(Streams, close_reason(Reason)),
+ EvHandlerState.
+
+close_reason(closed) -> closed;
+close_reason(Reason) -> {closed, Reason}.
+
+close_streams([], _) ->
ok;
-close_streams([#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]) ->
- ReplyTo ! {gun_error, self(), StreamRef, {closed,
- "The connection was lost."}},
- close_streams(Tail).
+close_streams([Stream|Tail], Reason) ->
+ close_stream(Stream, Reason),
+ close_streams(Tail, Reason).
+
+%% @todo Do we want an event for this?
+close_stream(#stream{ref=StreamRef, reply_to=ReplyTo}, Reason) ->
+ ReplyTo ! {gun_error, self(), StreamRef, Reason},
+ ok.
keepalive(State=#http2_state{socket=Socket, transport=Transport}) ->
Transport:send(Socket, cow_http2:ping(0)),
@@ -429,6 +516,7 @@ request(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
headers => Headers
},
EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0),
+ %% @todo We should not send an empty DATA frame on empty bodies.
{ok, IsFin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers(
StreamID, HTTP2Machine1, nofin, PseudoHeaders, Headers),
Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)),
@@ -586,8 +674,9 @@ down(#http2_state{streams=Streams}) ->
KilledStreams = [Ref || #stream{ref=Ref} <- Streams],
{KilledStreams, []}.
-terminate(#http2_state{socket=Socket, transport=Transport,
- http2_machine=HTTP2Machine, streams=Streams}, Reason) ->
+connection_error(#http2_state{socket=Socket, transport=Transport,
+ http2_machine=HTTP2Machine, streams=Streams},
+ {connection_error, Reason, _}) ->
%% The connection is going away either at the request of the server,
%% or because an error occurred in the protocol. Inform the streams.
%% @todo We should not send duplicate messages to processes.
@@ -597,12 +686,9 @@ terminate(#http2_state{socket=Socket, transport=Transport,
_ = [ReplyTo ! {gun_error, self(), Reason} || #stream{reply_to=ReplyTo} <- Streams],
Transport:send(Socket, cow_http2:goaway(
cow_http2_machine:get_last_streamid(HTTP2Machine),
- terminate_reason(Reason), <<>>)),
+ Reason, <<>>)),
close.
-terminate_reason({connection_error, Reason, _}) -> Reason;
-terminate_reason({stop, _, _}) -> no_error.
-
%% Stream functions.
error_stream_closed(State, StreamRef, ReplyTo) ->
diff --git a/src/gun_ws.erl b/src/gun_ws.erl
index 42cf049..49911dc 100644
--- a/src/gun_ws.erl
+++ b/src/gun_ws.erl
@@ -19,6 +19,7 @@
-export([init/9]).
-export([handle/4]).
-export([update_flow/4]).
+-export([closing/4]).
-export([close/4]).
-export([send/4]).
-export([down/1]).
@@ -38,8 +39,10 @@
stream_ref :: reference(),
socket :: inet:socket() | ssl:sslsocket(),
transport :: module(),
+ opts = #{} :: map(), %% @todo
buffer = <<>> :: binary(),
in = head :: head | #payload{} | close,
+ out = head :: head | close,
frag_state = undefined :: cow_ws:frag_state(),
utf8_state = 0 :: cow_ws:utf8_state(),
extensions = #{} :: cow_ws:extensions(),
@@ -53,6 +56,10 @@ check_options(Opts) ->
do_check_options([]) ->
ok;
+do_check_options([{closing_timeout, infinity}|Opts]) ->
+ do_check_options(Opts);
+do_check_options([{closing_timeout, T}|Opts]) when is_integer(T), T > 0 ->
+ do_check_options(Opts);
do_check_options([{compress, B}|Opts]) when B =:= true; B =:= false ->
do_check_options(Opts);
do_check_options([{default_protocol, M}|Opts]) when is_atom(M) ->
@@ -75,12 +82,15 @@ init(Owner, Socket, Transport, StreamRef, Headers, Extensions, InitialFlow, Hand
Owner ! {gun_upgrade, self(), StreamRef, [<<"websocket">>], Headers},
{ok, HandlerState} = Handler:init(Owner, StreamRef, Headers, Opts),
{switch_protocol, ?MODULE, #ws_state{owner=Owner, stream_ref=StreamRef,
- socket=Socket, transport=Transport, extensions=Extensions,
+ socket=Socket, transport=Transport, opts=Opts, extensions=Extensions,
flow=InitialFlow, handler=Handler, handler_state=HandlerState}}.
%% Do not handle anything if we received a close frame.
-handle(_, State=#ws_state{in=close}, _, EvHandlerState) ->
- {{state, State}, EvHandlerState};
+%% Initiate or terminate the closing state depending on whether we sent a close yet.
+handle(_, State=#ws_state{in=close, out=close}, _, EvHandlerState) ->
+ {[{state, State}, close], EvHandlerState};
+handle(_, State=#ws_state{in=close}, EvHandler, EvHandlerState) ->
+ closing(normal, State, EvHandler, EvHandlerState);
%% Shortcut for common case when Data is empty after processing a frame.
handle(<<>>, State=#ws_state{in=head}, _, EvHandlerState) ->
maybe_active(State, EvHandlerState);
@@ -119,7 +129,7 @@ handle(Data, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef, buffer=Buffer,
more ->
maybe_active(State#ws_state{buffer=Data2}, EvHandlerState1);
error ->
- close({error, badframe}, State, EvHandler, EvHandlerState1)
+ closing({error, badframe}, State, EvHandler, EvHandlerState1)
end;
handle(Data, State=#ws_state{in=In=#payload{type=Type, rsv=Rsv, len=Len, mask_key=MaskKey,
close_code=CloseCode, unmasked=Unmasked, unmasked_len=UnmaskedLen}, frag_state=FragState,
@@ -143,7 +153,7 @@ handle(Data, State=#ws_state{in=In=#payload{type=Type, rsv=Rsv, len=Len, mask_ke
len=Len - byte_size(Data), unmasked_len=UnmaskedLen + byte_size(Data)}, utf8_state=Utf8State2},
EvHandlerState);
Error = {error, _Reason} ->
- close(Error, State, EvHandler, EvHandlerState)
+ closing(Error, State, EvHandler, EvHandlerState)
end.
maybe_active(State=#ws_state{flow=Flow}, EvHandlerState) ->
@@ -165,11 +175,11 @@ dispatch(Rest, State0=#ws_state{owner=ReplyTo, stream_ref=StreamRef,
}, EvHandlerState0),
case cow_ws:make_frame(Type, Payload, CloseCode, FragState) of
ping ->
- {{state, State}, EvHandlerState} = send(pong, State0, EvHandler, EvHandlerState1),
- handle(Rest, State, EvHandler, EvHandlerState);
+ {[], EvHandlerState} = send(pong, State0, EvHandler, EvHandlerState1),
+ handle(Rest, State0, EvHandler, EvHandlerState);
{ping, Payload} ->
- {{state, State}, EvHandlerState} = send({pong, Payload}, State0, EvHandler, EvHandlerState1),
- handle(Rest, State, EvHandler, EvHandlerState);
+ {[], EvHandlerState} = send({pong, Payload}, State0, EvHandler, EvHandlerState1),
+ handle(Rest, State0, EvHandler, EvHandlerState);
pong ->
handle(Rest, State0, EvHandler, EvHandlerState1);
{pong, _} ->
@@ -200,26 +210,30 @@ update_flow(State=#ws_state{flow=Flow0}, _ReplyTo, _StreamRef, Inc) ->
{active, Flow > 0}
].
-close(Reason, State, EvHandler, EvHandlerState) ->
- case Reason of
- normal ->
- send({close, 1000, <<>>}, State, EvHandler, EvHandlerState);
- owner_down ->
- send({close, 1001, <<>>}, State, EvHandler, EvHandlerState);
- {error, badframe} ->
- send({close, 1002, <<>>}, State, EvHandler, EvHandlerState);
- {error, badencoding} ->
- send({close, 1007, <<>>}, State, EvHandler, EvHandlerState);
- %% Socket errors; do nothing.
- closed ->
- {ok, EvHandlerState};
- {error, _} ->
- {ok, EvHandlerState}
- end.
+%% The user already sent the close frame; do nothing.
+closing(_, State=#ws_state{out=close}, _, EvHandlerState) ->
+ {closing(State), EvHandlerState};
+closing(Reason, State, EvHandler, EvHandlerState) ->
+ Code = case Reason of
+ normal -> 1000;
+ owner_down -> 1001;
+ shutdown -> 1001;
+ {error, badframe} -> 1002;
+ {error, badencoding} -> 1007
+ end,
+ send({close, Code, <<>>}, State, EvHandler, EvHandlerState).
+closing(#ws_state{opts=Opts}) ->
+ Timeout = maps:get(closing_timeout, Opts, 15000),
+ {closing, Timeout}.
+
+close(_, _, _, EvHandlerState) ->
+ EvHandlerState.
+
+%% Send one frame.
send(Frame, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef,
- socket=Socket, transport=Transport, extensions=Extensions},
- EvHandler, EvHandlerState0) ->
+ socket=Socket, transport=Transport, in=In, extensions=Extensions},
+ EvHandler, EvHandlerState0) when not is_list(Frame) ->
WsSendFrameEvent = #{
stream_ref => StreamRef,
reply_to => ReplyTo,
@@ -229,10 +243,28 @@ send(Frame, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef,
EvHandlerState1 = EvHandler:ws_send_frame_start(WsSendFrameEvent, EvHandlerState0),
Transport:send(Socket, cow_ws:masked_frame(Frame, Extensions)),
EvHandlerState = EvHandler:ws_send_frame_end(WsSendFrameEvent, EvHandlerState1),
- case Frame of
- close -> {close, EvHandlerState};
- {close, _, _} -> {close, EvHandlerState};
- _ -> {{state, State}, EvHandlerState}
+ if
+ Frame =:= close; element(1, Frame) =:= close ->
+ {[
+ {state, State#ws_state{out=close}},
+ %% We can close immediately if we already received a close frame.
+ case In of
+ close -> close;
+ _ -> closing(State)
+ end
+ ], EvHandlerState};
+ true ->
+ {[], EvHandlerState}
+ end;
+%% Send many frames.
+send([], _, _, EvHandlerState) ->
+ {[], EvHandlerState};
+send([Frame|Tail], State, EvHandler, EvHandlerState0) ->
+ case send(Frame, State, EvHandler, EvHandlerState0) of
+ {[], EvHandlerState} ->
+ send(Tail, State, EvHandler, EvHandlerState);
+ Other ->
+ Other
end.
%% Websocket has no concept of streams.
diff --git a/test/gun_SUITE.erl b/test/gun_SUITE.erl
index 3d3734b..0beee43 100644
--- a/test/gun_SUITE.erl
+++ b/test/gun_SUITE.erl
@@ -90,94 +90,6 @@ do_timeout(Opt, Timeout) ->
gun:close(Pid)
end.
-detect_owner_down(_) ->
- {ok, ListenSocket} = gen_tcp:listen(0, [binary, {active, false}]),
- {ok, {_, Port}} = inet:sockname(ListenSocket),
- Self = self(),
- spawn(fun() ->
- {ok, ConnPid} = gun:open("localhost", Port),
- Self ! {conn, ConnPid},
- gun:await_up(ConnPid),
- timer:sleep(100)
- end),
- {ok, _} = gen_tcp:accept(ListenSocket, 5000),
- Pid = receive
- {conn, C} ->
- C
- after 1000 ->
- error(timeout)
- end,
- Ref = monitor(process, Pid),
- receive
- {'DOWN', Ref, process, Pid, normal} ->
- ok
- after 1000 ->
- true = erlang:is_process_alive(Pid),
- error(timeout)
- end.
-
-detect_owner_down_unexpected(_) ->
- {ok, ListenSocket} = gen_tcp:listen(0, [binary, {active, false}]),
- {ok, {_, Port}} = inet:sockname(ListenSocket),
- Self = self(),
- spawn(fun() ->
- {ok, ConnPid} = gun:open("localhost", Port),
- Self ! {conn, ConnPid},
- gun:await_up(ConnPid),
- timer:sleep(100),
- exit(unexpected)
- end),
- {ok, _} = gen_tcp:accept(ListenSocket, 5000),
- Pid = receive
- {conn, C} ->
- C
- after 1000 ->
- error(timeout)
- end,
- Ref = monitor(process, Pid),
- receive
- {'DOWN', Ref, process, Pid, {shutdown, {owner_down, unexpected}}} ->
- ok
- after 1000 ->
- true = erlang:is_process_alive(Pid),
- error(timeout)
- end.
-
-detect_owner_down_ws(_) ->
- Name = name(),
- {ok, _} = cowboy:start_clear(Name, [], #{env => #{
- dispatch => cowboy_router:compile([{'_', [{"/", ws_echo_h, []}]}])
- }}),
- Port = ranch:get_port(Name),
- Self = self(),
- spawn(fun() ->
- {ok, ConnPid} = gun:open("localhost", Port),
- Self ! {conn, ConnPid},
- gun:await_up(ConnPid),
- gun:ws_upgrade(ConnPid, "/", []),
- receive
- {gun_upgrade, ConnPid, _, [<<"websocket">>], _} ->
- ok
- after 1000 ->
- error(timeout)
- end
- end),
- Pid = receive
- {conn, C} ->
- C
- after 1000 ->
- error(timeout)
- end,
- Ref = monitor(process, Pid),
- receive
- {'DOWN', Ref, process, Pid, normal} ->
- ok
- after 1000 ->
- true = erlang:is_process_alive(Pid),
- error(timeout)
- end,
- cowboy:stop_listener(Name).
-
ignore_empty_data_http(_) ->
doc("When gun:data/4 is called with nofin and empty data, it must be ignored."),
{ok, OriginPid, OriginPort} = init_origin(tcp, http),
diff --git a/test/handlers/delayed_hello_h.erl b/test/handlers/delayed_hello_h.erl
new file mode 100644
index 0000000..68ef1ad
--- /dev/null
+++ b/test/handlers/delayed_hello_h.erl
@@ -0,0 +1,11 @@
+%% Feel free to use, reuse and abuse the code in this file.
+
+-module(delayed_hello_h).
+
+-export([init/2]).
+
+init(Req, Timeout) ->
+ timer:sleep(Timeout),
+ {ok, cowboy_req:reply(200, #{
+ <<"content-type">> => <<"text/plain">>
+ }, <<"Hello world!">>, Req), Timeout}.
diff --git a/test/handlers/delayed_push_h.erl b/test/handlers/delayed_push_h.erl
new file mode 100644
index 0000000..dbb8e56
--- /dev/null
+++ b/test/handlers/delayed_push_h.erl
@@ -0,0 +1,13 @@
+%% Feel free to use, reuse and abuse the code in this file.
+
+-module(delayed_push_h).
+
+-export([init/2]).
+
+init(Req, Timeout) ->
+ timer:sleep(Timeout),
+ cowboy_req:push("/", #{<<"accept">> => <<"text/plain">>}, Req),
+ cowboy_req:push("/empty", #{<<"accept">> => <<"text/plain">>}, Req),
+ {ok, cowboy_req:reply(200, #{
+ <<"content-type">> => <<"text/plain">>
+ }, <<"Hello world!">>, Req), Timeout}.
diff --git a/test/handlers/ws_frozen_h.erl b/test/handlers/ws_frozen_h.erl
new file mode 100644
index 0000000..bac77c2
--- /dev/null
+++ b/test/handlers/ws_frozen_h.erl
@@ -0,0 +1,23 @@
+%% Feel free to use, reuse and abuse the code in this file.
+
+-module(ws_frozen_h).
+
+-export([init/2]).
+-export([websocket_init/1]).
+-export([websocket_handle/2]).
+-export([websocket_info/2]).
+
+init(Req, State) ->
+ {cowboy_websocket, Req, State, #{
+ compress => true
+ }}.
+
+websocket_init(Timeout) ->
+ timer:sleep(Timeout),
+ {ok, undefined}.
+
+websocket_handle(_Frame, State) ->
+ {[], State}.
+
+websocket_info(_Info, State) ->
+ {[], State}.
diff --git a/test/handlers/ws_timeout_close_h.erl b/test/handlers/ws_timeout_close_h.erl
new file mode 100644
index 0000000..6fef168
--- /dev/null
+++ b/test/handlers/ws_timeout_close_h.erl
@@ -0,0 +1,25 @@
+%% Feel free to use, reuse and abuse the code in this file.
+
+-module(ws_timeout_close_h).
+
+-export([init/2]).
+-export([websocket_init/1]).
+-export([websocket_handle/2]).
+-export([websocket_info/2]).
+
+init(Req, State) ->
+ {cowboy_websocket, Req, State, #{
+ compress => true
+ }}.
+
+websocket_init(Timeout) ->
+ _ = erlang:send_after(Timeout, self(), timeout_close),
+ {[], undefined}.
+
+websocket_handle(_Frame, State) ->
+ {[], State}.
+
+websocket_info(timeout_close, State) ->
+ {[{close, 3333, <<>>}], State};
+websocket_info(_Info, State) ->
+ {[], State}.
diff --git a/test/rfc7540_SUITE.erl b/test/rfc7540_SUITE.erl
index f494c9f..507b75a 100644
--- a/test/rfc7540_SUITE.erl
+++ b/test/rfc7540_SUITE.erl
@@ -81,11 +81,11 @@ lingering_data_counts_toward_connection_window(_) ->
{ok, _} = gen_tcp:recv(Socket, SkipLen, 1000),
%% Skip the data.
{ok, <<_:24, 0:8, _:8, 1:32>>} = Transport:recv(Socket, 9, 1000),
+ %% Step 3.
%% Send a HEADERS frame.
{HeadersBlock, _} = cow_hpack:encode([
{<<":status">>, <<"200">>}
]),
- %% Step 3.
ok = Transport:send(Socket, [
cow_http2:headers(1, nofin, HeadersBlock)
]),
diff --git a/test/shutdown_SUITE.erl b/test/shutdown_SUITE.erl
new file mode 100644
index 0000000..e52a3ab
--- /dev/null
+++ b/test/shutdown_SUITE.erl
@@ -0,0 +1,609 @@
+%% Copyright (c) 2019, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+-module(shutdown_SUITE).
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-import(ct_helper, [doc/1]).
+-import(ct_helper, [config/2]).
+-import(gun_test, [init_origin/3]).
+
+all() ->
+ [{group, shutdown}].
+
+groups() ->
+ [{shutdown, [parallel], ct_helper:all(?MODULE)}].
+
+init_per_suite(Config) ->
+ ProtoOpts = #{env => #{
+ dispatch => cowboy_router:compile([{'_', [
+ {"/", hello_h, []},
+ {"/delayed", delayed_hello_h, 500},
+ {"/delayed_push", delayed_push_h, 500},
+ {"/empty", empty_h, []},
+ {"/ws", ws_echo_h, []},
+ {"/ws_frozen", ws_frozen_h, 500},
+ %% This timeout determines how long the test suite will run.
+ {"/ws_frozen_long", ws_frozen_h, 1500},
+ {"/ws_timeout_close", ws_timeout_close_h, 500}
+ ]}])
+ }},
+ {ok, _} = cowboy:start_clear(?MODULE, [], ProtoOpts),
+ OriginPort = ranch:get_port(?MODULE),
+ [{origin_port, OriginPort}|Config].
+
+end_per_suite(_) ->
+ ok = cowboy:stop_listener(?MODULE).
+
+%% Tests.
+%%
+%% This test suite checks that the various ways to shut down
+%% the connection are all working as expected for the different
+%% protocols and scenarios.
+
+not_connected_gun_shutdown(_) ->
+ doc("Confirm that the Gun process shuts down gracefully "
+ "when calling gun:shutdown/1 while it isn't connected."),
+ {ok, ConnPid} = gun:open("localhost", 12345),
+ ConnRef = monitor(process, ConnPid),
+ gun:shutdown(ConnPid),
+ gun_is_down(ConnPid, ConnRef, shutdown).
+
+not_connected_owner_down(_) ->
+ doc("Confirm that the Gun process shuts down when the owner exits normally "
+ "while it isn't connected."),
+ do_not_connected_owner_down(normal, normal).
+
+not_connected_owner_down_error(_) ->
+ doc("Confirm that the Gun process shuts down when the owner exits with an error "
+ "while it isn't connected."),
+ do_not_connected_owner_down(unexpected, {shutdown, {owner_down, unexpected}}).
+
+do_not_connected_owner_down(ExitReason, DownReason) ->
+ Self = self(),
+ spawn(fun() ->
+ {ok, ConnPid} = gun:open("localhost", 12345),
+ Self ! {conn, ConnPid},
+ timer:sleep(500),
+ exit(ExitReason)
+ end),
+ ConnPid = receive {conn, C} -> C after 1000 -> error(timeout) end,
+ ConnRef = monitor(process, ConnPid),
+ gun_is_down(ConnPid, ConnRef, DownReason).
+
+http1_gun_shutdown_no_streams(Config) ->
+ doc("HTTP/1.1: Confirm that the Gun process shuts down gracefully "
+ "when calling gun:shutdown/1 with no active streams."),
+ do_http_gun_shutdown_no_streams(Config, http).
+
+do_http_gun_shutdown_no_streams(Config, Protocol) ->
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{
+ protocols => [Protocol]
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ gun:shutdown(ConnPid),
+ gun_is_down(ConnPid, ConnRef, shutdown).
+
+http1_gun_shutdown_one_stream(Config) ->
+ doc("HTTP/1.1: Confirm that the Gun process shuts down gracefully "
+ "when calling gun:shutdown/1 with one active stream."),
+ do_http_gun_shutdown_one_stream(Config, http).
+
+do_http_gun_shutdown_one_stream(Config, Protocol) ->
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{
+ protocols => [Protocol]
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef = gun:get(ConnPid, "/delayed"),
+ gun:shutdown(ConnPid),
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef),
+ {ok, _} = gun:await_body(ConnPid, StreamRef),
+ gun_is_down(ConnPid, ConnRef, shutdown).
+
+http1_gun_shutdown_pipelined_streams(Config) ->
+ doc("HTTP/1.1: Confirm that the Gun process shuts down gracefully "
+ "when calling gun:shutdown/1 with one active stream and additional pipelined streams."),
+ Protocol = http,
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{
+ protocols => [Protocol]
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef1 = gun:get(ConnPid, "/delayed"),
+ StreamRef2 = gun:get(ConnPid, "/delayed"),
+ StreamRef3 = gun:get(ConnPid, "/delayed"),
+ gun:shutdown(ConnPid),
+ %% Pipelined streams are canceled immediately.
+ {error, {stream_error, {closing, shutdown}}} = gun:await(ConnPid, StreamRef2),
+ {error, {stream_error, {closing, shutdown}}} = gun:await(ConnPid, StreamRef3),
+ %% The active stream is still processed.
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef1),
+ {ok, _} = gun:await_body(ConnPid, StreamRef1),
+ gun_is_down(ConnPid, ConnRef, shutdown).
+
+http1_gun_shutdown_timeout(Config) ->
+ doc("HTTP/1.1: Confirm that the Gun process shuts down when the closing_timeout "
+ "triggers after calling gun:shutdown/1 with one active stream."),
+ do_http_gun_shutdown_timeout(Config, http, http_opts).
+
+do_http_gun_shutdown_timeout(Config, Protocol, ProtoOpts) ->
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{
+ ProtoOpts => #{closing_timeout => 100},
+ protocols => [Protocol]
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef = gun:get(ConnPid, "/delayed"),
+ gun:shutdown(ConnPid),
+ %% The closing timeout occurs before the server gets to send the response.
+ %% We get a 'closed' error instead of 'closing' as a result.
+ {error, {stream_error, {closed, shutdown}}} = gun:await(ConnPid, StreamRef),
+ gun_is_down(ConnPid, ConnRef, shutdown).
+
+http1_owner_down(Config) ->
+ doc("HTTP/1.1: Confirm that the Gun process shuts down when the owner exits normally."),
+ do_http_owner_down(Config, http, normal, normal).
+
+http1_owner_down_error(Config) ->
+ doc("HTTP/1.1: Confirm that the Gun process shuts down when the owner exits with an error."),
+ do_http_owner_down(Config, http, unexpected, {shutdown, {owner_down, unexpected}}).
+
+do_http_owner_down(Config, Protocol, ExitReason, DownReason) ->
+ Self = self(),
+ spawn(fun() ->
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{
+ protocols => [Protocol]
+ }),
+ Self ! {conn, ConnPid},
+ {ok, Protocol} = gun:await_up(ConnPid),
+ timer:sleep(500),
+ exit(ExitReason)
+ end),
+ ConnPid = receive {conn, C} -> C after 1000 -> error(timeout) end,
+ ConnRef = monitor(process, ConnPid),
+ gun_is_down(ConnPid, ConnRef, DownReason).
+
+http1_request_connection_close(Config) ->
+ doc("HTTP/1.1: Confirm that the Gun process shuts down gracefully "
+ "when sending a request with the connection: close header and "
+ "retry is disabled."),
+ Protocol = http,
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{
+ protocols => [Protocol],
+ retry => 0
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef = gun:get(ConnPid, "/", #{
+ <<"connection">> => <<"close">>
+ }),
+ %% We get the response followed by Gun shutting down.
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef),
+ {ok, _} = gun:await_body(ConnPid, StreamRef),
+ gun_is_down(ConnPid, ConnRef, normal).
+
+http1_request_connection_close_pipeline(Config) ->
+ doc("HTTP/1.1: Confirm that the Gun process shuts down gracefully "
+ "when sending a request with the connection: close header and "
+ "retry is disabled. Pipelined requests get canceled."),
+ Protocol = http,
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{
+ protocols => [Protocol],
+ retry => 0
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef1 = gun:get(ConnPid, "/", #{
+ <<"connection">> => <<"close">>
+ }),
+ StreamRef2 = gun:get(ConnPid, "/"),
+ StreamRef3 = gun:get(ConnPid, "/"),
+ %% We get the response, pipelined streams get canceled, followed by Gun shutting down.
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef1),
+ {ok, _} = gun:await_body(ConnPid, StreamRef1),
+ {error, {stream_error, {closed, normal}}} = gun:await(ConnPid, StreamRef2),
+ {error, {stream_error, {closed, normal}}} = gun:await(ConnPid, StreamRef3),
+ gun_is_down(ConnPid, ConnRef, normal).
+
+http1_response_connection_close(_) ->
+ doc("HTTP/1.1: Confirm that the Gun process shuts down gracefully "
+ "when receiving a response with the connection: close header and "
+ "retry is disabled."),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{
+ env => #{dispatch => cowboy_router:compile([{'_', [{"/", hello_h, []}]}])},
+ max_keepalive => 1
+ }),
+ OriginPort = ranch:get_port(?FUNCTION_NAME),
+ try
+ Protocol = http,
+ {ok, ConnPid} = gun:open("localhost", OriginPort, #{
+ protocols => [Protocol],
+ retry => 0
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef = gun:get(ConnPid, "/"),
+ %% We get the response followed by Gun shutting down.
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef),
+ {ok, _} = gun:await_body(ConnPid, StreamRef),
+ gun_is_down(ConnPid, ConnRef, normal)
+ after
+ cowboy:stop_listener(?FUNCTION_NAME)
+ end.
+
+http1_response_connection_close_pipeline(_) ->
+ doc("HTTP/1.1: Confirm that the Gun process shuts down gracefully "
+ "when receiving a response with the connection: close header and "
+ "retry is disabled. Pipelined requests get canceled."),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{
+ env => #{dispatch => cowboy_router:compile([{'_', [{"/", hello_h, []}]}])},
+ max_keepalive => 1
+ }),
+ OriginPort = ranch:get_port(?FUNCTION_NAME),
+ try
+ Protocol = http,
+ {ok, ConnPid} = gun:open("localhost", OriginPort, #{
+ protocols => [Protocol],
+ retry => 0
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef1 = gun:get(ConnPid, "/"),
+ StreamRef2 = gun:get(ConnPid, "/"),
+ StreamRef3 = gun:get(ConnPid, "/"),
+ %% We get the response, pipelined streams get canceled, followed by Gun shutting down.
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef1),
+ {ok, _} = gun:await_body(ConnPid, StreamRef1),
+ {error, {stream_error, {closed, normal}}} = gun:await(ConnPid, StreamRef2),
+ {error, {stream_error, {closed, normal}}} = gun:await(ConnPid, StreamRef3),
+ gun_is_down(ConnPid, ConnRef, normal)
+ after
+ cowboy:stop_listener(?FUNCTION_NAME)
+ end.
+
+http10_connection_close(Config) ->
+ doc("HTTP/1.0: Confirm that the Gun process shuts down gracefully "
+ "when sending a request without a connection header and "
+ "retry is disabled."),
+ Protocol = http,
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{
+ http_opts => #{version => 'HTTP/1.0'},
+ protocols => [Protocol],
+ retry => 0
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef = gun:get(ConnPid, "/"),
+ %% We get the response followed by Gun shutting down.
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef),
+ {ok, _} = gun:await_body(ConnPid, StreamRef),
+ gun_is_down(ConnPid, ConnRef, normal).
+
+http2_gun_shutdown_no_streams(Config) ->
+ doc("HTTP/2: Confirm that the Gun process shuts down gracefully "
+ "when calling gun:shutdown/1 with no active streams."),
+ do_http_gun_shutdown_no_streams(Config, http2).
+
+http2_gun_shutdown_one_stream(Config) ->
+ doc("HTTP/2: Confirm that the Gun process shuts down gracefully "
+ "when calling gun:shutdown/1 with one active stream."),
+ do_http_gun_shutdown_one_stream(Config, http2).
+
+http2_gun_shutdown_many_streams(Config) ->
+ doc("HTTP/2: Confirm that the Gun process shuts down gracefully "
+ "when calling gun:shutdown/1 with many active streams."),
+ Protocol = http2,
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{
+ protocols => [Protocol]
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef1 = gun:get(ConnPid, "/delayed"),
+ StreamRef2 = gun:get(ConnPid, "/delayed"),
+ StreamRef3 = gun:get(ConnPid, "/delayed"),
+ gun:shutdown(ConnPid),
+ %% All streams are processed.
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef1),
+ {ok, _} = gun:await_body(ConnPid, StreamRef1),
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef2),
+ {ok, _} = gun:await_body(ConnPid, StreamRef2),
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef3),
+ {ok, _} = gun:await_body(ConnPid, StreamRef3),
+ gun_is_down(ConnPid, ConnRef, shutdown).
+
+http2_gun_shutdown_timeout(Config) ->
+ doc("HTTP/2: Confirm that the Gun process shuts down when the closing_timeout "
+ "triggers after calling gun:shutdown/1 with one active stream."),
+ do_http_gun_shutdown_timeout(Config, http2, http2_opts).
+
+http2_gun_shutdown_ignore_push_promise(Config) ->
+ doc("HTTP/2: Confirm that the Gun process shuts down gracefully "
+ "when calling gun:shutdown/1 with one active stream. The "
+ "resource pushed by the server after we sent the GOAWAY frame "
+ "must be ignored."),
+ Protocol = http2,
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{
+ protocols => [Protocol]
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef = gun:get(ConnPid, "/delayed_push"),
+ gun:shutdown(ConnPid),
+ %% We do not receive the push streams. Only the response.
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef),
+ {ok, _} = gun:await_body(ConnPid, StreamRef),
+ gun_is_down(ConnPid, ConnRef, shutdown).
+
+http2_owner_down(Config) ->
+ doc("HTTP/2: Confirm that the Gun process shuts down when the owner exits normally."),
+ do_http_owner_down(Config, http2, normal, normal).
+
+http2_owner_down_error(Config) ->
+ doc("HTTP/2: Confirm that the Gun process shuts down when the owner exits with an error."),
+ do_http_owner_down(Config, http2, unexpected, {shutdown, {owner_down, unexpected}}).
+
+http2_server_goaway_no_streams(_) ->
+ doc("HTTP/2: Confirm that the Gun process shuts down gracefully "
+ "when receiving a GOAWAY frame with no active streams and "
+ "retry is disabled."),
+ {ok, _, Port} = init_origin(tcp, http2, fun(_, Socket, Transport) ->
+ Transport:send(Socket, cow_http2:goaway(0, no_error, <<>>)),
+ timer:sleep(500)
+ end),
+ Protocol = http2,
+ {ok, ConnPid} = gun:open("localhost", Port, #{
+ protocols => [Protocol],
+ retry => 0
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ gun_is_down(ConnPid, ConnRef, normal).
+
+http2_server_goaway_one_stream(_) ->
+ doc("HTTP/2: Confirm that the Gun process shuts down gracefully "
+ "when receiving a GOAWAY frame with one active stream and "
+ "retry is disabled."),
+ {ok, _, OriginPort} = init_origin(tcp, http2, fun(_, Socket, Transport) ->
+ %% Receive a HEADERS frame.
+ {ok, <<SkipLen:24, 1:8, _:8, 1:32>>} = Transport:recv(Socket, 9, 1000),
+ %% Skip the header.
+ {ok, _} = gen_tcp:recv(Socket, SkipLen, 1000),
+ %% Skip the data.
+ {ok, <<_:24, 0:8, _:8, 1:32>>} = Transport:recv(Socket, 9, 1000),
+ %% Send a GOAWAY frame.
+ Transport:send(Socket, cow_http2:goaway(1, no_error, <<>>)),
+ %% Wait before sending the response back and closing the connection.
+ timer:sleep(500),
+ %% Send a HEADERS frame.
+ {HeadersBlock, _} = cow_hpack:encode([
+ {<<":status">>, <<"200">>}
+ ]),
+ ok = Transport:send(Socket, [
+ cow_http2:headers(1, fin, HeadersBlock)
+ ]),
+ timer:sleep(500)
+ end),
+ Protocol = http2,
+ {ok, ConnPid} = gun:open("localhost", OriginPort, #{
+ protocols => [Protocol],
+ retry => 0
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ timer:sleep(100), %% Give enough time for the handshake to fully complete.
+ StreamRef = gun:get(ConnPid, "/"),
+ ConnRef = monitor(process, ConnPid),
+ {response, fin, 200, _} = gun:await(ConnPid, StreamRef),
+ gun_is_down(ConnPid, ConnRef, normal).
+
+http2_server_goaway_many_streams(_) ->
+ doc("HTTP/2: Confirm that the Gun process shuts down gracefully "
+ "when receiving a GOAWAY frame with many active streams and "
+ "retry is disabled."),
+ {ok, _, OriginPort} = init_origin(tcp, http2, fun(_, Socket, Transport) ->
+ %% Stream 1.
+ %% Receive a HEADERS frame.
+ {ok, <<SkipLen1:24, 1:8, _:8, 1:32>>} = Transport:recv(Socket, 9, 1000),
+ %% Skip the header.
+ {ok, _} = gen_tcp:recv(Socket, SkipLen1, 1000),
+ %% Skip the data.
+ {ok, <<_:24, 0:8, _:8, 1:32>>} = Transport:recv(Socket, 9, 1000),
+ %% Stream 2.
+ %% Receive a HEADERS frame.
+ {ok, <<SkipLen2:24, 1:8, _:8, 3:32>>} = Transport:recv(Socket, 9, 1000),
+ %% Skip the header.
+ {ok, _} = gen_tcp:recv(Socket, SkipLen2, 1000),
+ %% Skip the data.
+ {ok, <<_:24, 0:8, _:8, 3:32>>} = Transport:recv(Socket, 9, 1000),
+ %% Stream 3.
+ %% Receive a HEADERS frame.
+ {ok, <<SkipLen3:24, 1:8, _:8, 5:32>>} = Transport:recv(Socket, 9, 1000),
+ %% Skip the header.
+ {ok, _} = gen_tcp:recv(Socket, SkipLen3, 1000),
+ %% Skip the data.
+ {ok, <<_:24, 0:8, _:8, 5:32>>} = Transport:recv(Socket, 9, 1000),
+ %% Send a GOAWAY frame.
+ Transport:send(Socket, cow_http2:goaway(5, no_error, <<>>)),
+ %% Wait before sending the responses back and closing the connection.
+ timer:sleep(500),
+ %% Send a HEADERS frame.
+ {HeadersBlock1, State0} = cow_hpack:encode([
+ {<<":status">>, <<"200">>}
+ ]),
+ ok = Transport:send(Socket, [
+ cow_http2:headers(1, fin, HeadersBlock1)
+ ]),
+ {HeadersBlock2, State} = cow_hpack:encode([
+ {<<":status">>, <<"200">>}
+ ], State0),
+ ok = Transport:send(Socket, [
+ cow_http2:headers(3, fin, HeadersBlock2)
+ ]),
+ {HeadersBlock3, _} = cow_hpack:encode([
+ {<<":status">>, <<"200">>}
+ ], State),
+ ok = Transport:send(Socket, [
+ cow_http2:headers(5, fin, HeadersBlock3)
+ ]),
+ timer:sleep(500)
+ end),
+ Protocol = http2,
+ {ok, ConnPid} = gun:open("localhost", OriginPort, #{
+ protocols => [Protocol],
+ retry => 0
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ timer:sleep(100), %% Give enough time for the handshake to fully complete.
+ StreamRef1 = gun:get(ConnPid, "/"),
+ StreamRef2 = gun:get(ConnPid, "/"),
+ StreamRef3 = gun:get(ConnPid, "/"),
+ ConnRef = monitor(process, ConnPid),
+ {response, fin, 200, _} = gun:await(ConnPid, StreamRef1),
+ {response, fin, 200, _} = gun:await(ConnPid, StreamRef2),
+ {response, fin, 200, _} = gun:await(ConnPid, StreamRef3),
+ gun_is_down(ConnPid, ConnRef, normal).
+
+ws_gun_shutdown(Config) ->
+ doc("Websocket: Confirm that the Gun process shuts down gracefully "
+ "when calling gun:shutdown/1."),
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config)),
+ {ok, http} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef = gun:ws_upgrade(ConnPid, "/ws", []),
+ {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),
+ gun:shutdown(ConnPid),
+ gun_is_down(ConnPid, ConnRef, shutdown).
+
+ws_gun_shutdown_timeout(Config) ->
+ doc("Websocket: Confirm that the Gun process shuts down when "
+ "the closing_timeout triggers after calling gun:shutdown/1."),
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{
+ ws_opts => #{closing_timeout => 100}
+ }),
+ {ok, http} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef = gun:ws_upgrade(ConnPid, "/ws_frozen_long", []),
+ {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),
+ gun:shutdown(ConnPid),
+ gun_is_down(ConnPid, ConnRef, shutdown).
+
+ws_owner_down(Config) ->
+ doc("Websocket: Confirm that the Gun process shuts down when the owner exits normally."),
+ do_ws_owner_down(Config, normal, normal).
+
+ws_owner_down_error(Config) ->
+ doc("Websocket: Confirm that the Gun process shuts down when the owner exits with an error."),
+ do_ws_owner_down(Config, unexpected, {shutdown, {owner_down, unexpected}}).
+
+do_ws_owner_down(Config, ExitReason, DownReason) ->
+ Self = self(),
+ spawn(fun() ->
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config)),
+ Self ! {conn, ConnPid},
+ {ok, http} = gun:await_up(ConnPid),
+ StreamRef = gun:ws_upgrade(ConnPid, "/ws", []),
+ {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),
+ timer:sleep(500),
+ exit(ExitReason)
+ end),
+ ConnPid = receive {conn, C} -> C after 1000 -> error(timeout) end,
+ ConnRef = monitor(process, ConnPid),
+ gun_is_down(ConnPid, ConnRef, DownReason).
+
+ws_gun_send_close_frame(Config) ->
+ doc("Websocket: Confirm that the Gun process shuts down gracefully "
+ "when sending a close frame, with retry disabled."),
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{
+ retry => 0
+ }),
+ {ok, http} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef = gun:ws_upgrade(ConnPid, "/ws", []),
+ {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),
+ %% We send a close frame. We expect the same frame back
+ %% before the connection is closed.
+ Frame = {close, 3333, <<>>},
+ gun:ws_send(ConnPid, Frame),
+ {ws, Frame} = gun:await(ConnPid, StreamRef),
+ gun_is_down(ConnPid, ConnRef, normal).
+
+ws_gun_receive_close_frame(Config) ->
+ doc("Websocket: Confirm that the Gun process shuts down gracefully "
+ "when receiving a close frame, with retry disabled."),
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{
+ retry => 0
+ }),
+ {ok, http} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef = gun:ws_upgrade(ConnPid, "/ws_timeout_close", []),
+ {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),
+ %% We expect a close frame before the connection is closed.
+ {ws, {close, 3333, <<>>}} = gun:await(ConnPid, StreamRef),
+ gun_is_down(ConnPid, ConnRef, normal).
+
+closing_gun_shutdown(Config) ->
+ doc("Confirm that the Gun process shuts down gracefully "
+ "when calling gun:shutdown/1 while Gun is closing a connection."),
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config)),
+ {ok, http} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef = gun:ws_upgrade(ConnPid, "/ws_frozen", []),
+ {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),
+ %% We send a close frame then immediately call gun:shutdown/1.
+ %% We expect Gun to go down without retrying to reconnect.
+ Frame = {close, 3333, <<>>},
+ gun:ws_send(ConnPid, Frame),
+ gun:shutdown(ConnPid),
+ {ws, Frame} = gun:await(ConnPid, StreamRef),
+ gun_is_down(ConnPid, ConnRef, shutdown).
+
+closing_owner_down(Config) ->
+ doc("Confirm that the Gun process shuts down gracefully "
+ "when the owner exits normally while Gun is closing a connection."),
+ do_closing_owner_down(Config, normal, normal).
+
+closing_owner_down_error(Config) ->
+ doc("Confirm that the Gun process shuts down gracefully "
+ "when the owner exits with an error while Gun is closing a connection."),
+ do_closing_owner_down(Config, unexpected, {shutdown, {owner_down, unexpected}}).
+
+do_closing_owner_down(Config, ExitReason, DownReason) ->
+ Self = self(),
+ spawn(fun() ->
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config)),
+ Self ! {conn, ConnPid},
+ {ok, http} = gun:await_up(ConnPid),
+ StreamRef = gun:ws_upgrade(ConnPid, "/ws_frozen", []),
+ {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),
+ gun:ws_send(ConnPid, {close, 3333, <<>>}),
+ timer:sleep(100),
+ exit(ExitReason)
+ end),
+ ConnPid = receive {conn, C} -> C after 1000 -> error(timeout) end,
+ ConnRef = monitor(process, ConnPid),
+ gun_is_down(ConnPid, ConnRef, DownReason).
+
+%% Internal.
+
+gun_is_down(ConnPid, ConnRef, Expected) ->
+ receive
+ {'DOWN', ConnRef, process, ConnPid, Reason} ->
+ Expected = Reason,
+ ok
+ after 1000 ->
+ true = erlang:is_process_alive(ConnPid),
+ error(timeout)
+ end.
diff --git a/test/ws_SUITE.erl b/test/ws_SUITE.erl
index 5cc50ec..1abf046 100644
--- a/test/ws_SUITE.erl
+++ b/test/ws_SUITE.erl
@@ -68,3 +68,30 @@ reject_upgrade(Config) ->
after 1000 ->
error(timeout)
end.
+
+send_many(Config) ->
+ doc("Ensure we can send a list of frames in one gun:ws_send call."),
+ {ok, ConnPid} = gun:open("localhost", config(port, Config)),
+ {ok, _} = gun:await_up(ConnPid),
+ StreamRef = gun:ws_upgrade(ConnPid, "/", []),
+ {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),
+ Frame1 = {text, <<"Hello!">>},
+ Frame2 = {binary, <<"World!">>},
+ gun:ws_send(ConnPid, [Frame1, Frame2]),
+ {ws, Frame1} = gun:await(ConnPid, StreamRef),
+ {ws, Frame2} = gun:await(ConnPid, StreamRef),
+ gun:close(ConnPid).
+
+send_many_close(Config) ->
+ doc("Ensure we can send a list of frames in one gun:ws_send call, including a close frame."),
+ {ok, ConnPid} = gun:open("localhost", config(port, Config)),
+ {ok, _} = gun:await_up(ConnPid),
+ StreamRef = gun:ws_upgrade(ConnPid, "/", []),
+ {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),
+ Frame1 = {text, <<"Hello!">>},
+ Frame2 = {binary, <<"World!">>},
+ gun:ws_send(ConnPid, [Frame1, Frame2, close]),
+ {ws, Frame1} = gun:await(ConnPid, StreamRef),
+ {ws, Frame2} = gun:await(ConnPid, StreamRef),
+ {ws, close} = gun:await(ConnPid, StreamRef),
+ gun:close(ConnPid).