aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2019-08-08 16:33:09 +0200
committerLoïc Hoguin <[email protected]>2019-09-05 11:28:07 +0200
commitc974b4334e7ab660f9bf95653696c3663c02ead3 (patch)
tree9e501a4928b261c4fe9adc74d80c47b6b14ae50a /test
parent491ddf58c0e14824a741852fdc522b390b306ae2 (diff)
downloadgun-c974b4334e7ab660f9bf95653696c3663c02ead3.tar.gz
gun-c974b4334e7ab660f9bf95653696c3663c02ead3.tar.bz2
gun-c974b4334e7ab660f9bf95653696c3663c02ead3.zip
Implement graceful shutdown
The graceful shutdown is implemented through a new 'closing' state. This state is entered under different circumstances depending on the protocol. The gun:shutdown/1 function is now implemented and documented. It allows shutting down the connection gracefully regardless of the current state of the connection and for all protocols. The behavior is entirely dependent on the protocol. For HTTP/1.1 the connection stays up only until after the current stream is complete; other streams are immediately canceled. For HTTP/2 a GOAWAY frame is sent and existing streams continue to be processed. The connection is closed after all streams are processed and the server's GOAWAY frame is received. For Websocket a close frame is sent. The connection is closed when receiving the server's close frame. In all cases the closing_timeout option defines how long we wait, as a maximum, before closing the connection after the graceful shutdown was started. The graceful shutdown is also initiated when the owner process goes away; when sending an HTTP/1.1 request with the connection: close header; when receiving an HTTP/1.1 response with the connection: close header; when receiving an HTTP/1.0 response without a connection header; when the server sends a GOAWAY HTTP/2 frame; or when we send or receive a Websocket close frame. Along with these changes, the gun:ws_send/2 function now accepts a list of frames as argument. Those frames may include a close frame that initiates the graceful shutdown.
Diffstat (limited to 'test')
-rw-r--r--test/gun_SUITE.erl88
-rw-r--r--test/handlers/delayed_hello_h.erl11
-rw-r--r--test/handlers/delayed_push_h.erl13
-rw-r--r--test/handlers/ws_frozen_h.erl23
-rw-r--r--test/handlers/ws_timeout_close_h.erl25
-rw-r--r--test/rfc7540_SUITE.erl2
-rw-r--r--test/shutdown_SUITE.erl609
-rw-r--r--test/ws_SUITE.erl27
8 files changed, 709 insertions, 89 deletions
diff --git a/test/gun_SUITE.erl b/test/gun_SUITE.erl
index 3d3734b..0beee43 100644
--- a/test/gun_SUITE.erl
+++ b/test/gun_SUITE.erl
@@ -90,94 +90,6 @@ do_timeout(Opt, Timeout) ->
gun:close(Pid)
end.
-detect_owner_down(_) ->
- {ok, ListenSocket} = gen_tcp:listen(0, [binary, {active, false}]),
- {ok, {_, Port}} = inet:sockname(ListenSocket),
- Self = self(),
- spawn(fun() ->
- {ok, ConnPid} = gun:open("localhost", Port),
- Self ! {conn, ConnPid},
- gun:await_up(ConnPid),
- timer:sleep(100)
- end),
- {ok, _} = gen_tcp:accept(ListenSocket, 5000),
- Pid = receive
- {conn, C} ->
- C
- after 1000 ->
- error(timeout)
- end,
- Ref = monitor(process, Pid),
- receive
- {'DOWN', Ref, process, Pid, normal} ->
- ok
- after 1000 ->
- true = erlang:is_process_alive(Pid),
- error(timeout)
- end.
-
-detect_owner_down_unexpected(_) ->
- {ok, ListenSocket} = gen_tcp:listen(0, [binary, {active, false}]),
- {ok, {_, Port}} = inet:sockname(ListenSocket),
- Self = self(),
- spawn(fun() ->
- {ok, ConnPid} = gun:open("localhost", Port),
- Self ! {conn, ConnPid},
- gun:await_up(ConnPid),
- timer:sleep(100),
- exit(unexpected)
- end),
- {ok, _} = gen_tcp:accept(ListenSocket, 5000),
- Pid = receive
- {conn, C} ->
- C
- after 1000 ->
- error(timeout)
- end,
- Ref = monitor(process, Pid),
- receive
- {'DOWN', Ref, process, Pid, {shutdown, {owner_down, unexpected}}} ->
- ok
- after 1000 ->
- true = erlang:is_process_alive(Pid),
- error(timeout)
- end.
-
-detect_owner_down_ws(_) ->
- Name = name(),
- {ok, _} = cowboy:start_clear(Name, [], #{env => #{
- dispatch => cowboy_router:compile([{'_', [{"/", ws_echo_h, []}]}])
- }}),
- Port = ranch:get_port(Name),
- Self = self(),
- spawn(fun() ->
- {ok, ConnPid} = gun:open("localhost", Port),
- Self ! {conn, ConnPid},
- gun:await_up(ConnPid),
- gun:ws_upgrade(ConnPid, "/", []),
- receive
- {gun_upgrade, ConnPid, _, [<<"websocket">>], _} ->
- ok
- after 1000 ->
- error(timeout)
- end
- end),
- Pid = receive
- {conn, C} ->
- C
- after 1000 ->
- error(timeout)
- end,
- Ref = monitor(process, Pid),
- receive
- {'DOWN', Ref, process, Pid, normal} ->
- ok
- after 1000 ->
- true = erlang:is_process_alive(Pid),
- error(timeout)
- end,
- cowboy:stop_listener(Name).
-
ignore_empty_data_http(_) ->
doc("When gun:data/4 is called with nofin and empty data, it must be ignored."),
{ok, OriginPid, OriginPort} = init_origin(tcp, http),
diff --git a/test/handlers/delayed_hello_h.erl b/test/handlers/delayed_hello_h.erl
new file mode 100644
index 0000000..68ef1ad
--- /dev/null
+++ b/test/handlers/delayed_hello_h.erl
@@ -0,0 +1,11 @@
+%% Feel free to use, reuse and abuse the code in this file.
+
+-module(delayed_hello_h).
+
+-export([init/2]).
+
+init(Req, Timeout) ->
+ timer:sleep(Timeout),
+ {ok, cowboy_req:reply(200, #{
+ <<"content-type">> => <<"text/plain">>
+ }, <<"Hello world!">>, Req), Timeout}.
diff --git a/test/handlers/delayed_push_h.erl b/test/handlers/delayed_push_h.erl
new file mode 100644
index 0000000..dbb8e56
--- /dev/null
+++ b/test/handlers/delayed_push_h.erl
@@ -0,0 +1,13 @@
+%% Feel free to use, reuse and abuse the code in this file.
+
+-module(delayed_push_h).
+
+-export([init/2]).
+
+init(Req, Timeout) ->
+ timer:sleep(Timeout),
+ cowboy_req:push("/", #{<<"accept">> => <<"text/plain">>}, Req),
+ cowboy_req:push("/empty", #{<<"accept">> => <<"text/plain">>}, Req),
+ {ok, cowboy_req:reply(200, #{
+ <<"content-type">> => <<"text/plain">>
+ }, <<"Hello world!">>, Req), Timeout}.
diff --git a/test/handlers/ws_frozen_h.erl b/test/handlers/ws_frozen_h.erl
new file mode 100644
index 0000000..bac77c2
--- /dev/null
+++ b/test/handlers/ws_frozen_h.erl
@@ -0,0 +1,23 @@
+%% Feel free to use, reuse and abuse the code in this file.
+
+-module(ws_frozen_h).
+
+-export([init/2]).
+-export([websocket_init/1]).
+-export([websocket_handle/2]).
+-export([websocket_info/2]).
+
+init(Req, State) ->
+ {cowboy_websocket, Req, State, #{
+ compress => true
+ }}.
+
+websocket_init(Timeout) ->
+ timer:sleep(Timeout),
+ {ok, undefined}.
+
+websocket_handle(_Frame, State) ->
+ {[], State}.
+
+websocket_info(_Info, State) ->
+ {[], State}.
diff --git a/test/handlers/ws_timeout_close_h.erl b/test/handlers/ws_timeout_close_h.erl
new file mode 100644
index 0000000..6fef168
--- /dev/null
+++ b/test/handlers/ws_timeout_close_h.erl
@@ -0,0 +1,25 @@
+%% Feel free to use, reuse and abuse the code in this file.
+
+-module(ws_timeout_close_h).
+
+-export([init/2]).
+-export([websocket_init/1]).
+-export([websocket_handle/2]).
+-export([websocket_info/2]).
+
+init(Req, State) ->
+ {cowboy_websocket, Req, State, #{
+ compress => true
+ }}.
+
+websocket_init(Timeout) ->
+ _ = erlang:send_after(Timeout, self(), timeout_close),
+ {[], undefined}.
+
+websocket_handle(_Frame, State) ->
+ {[], State}.
+
+websocket_info(timeout_close, State) ->
+ {[{close, 3333, <<>>}], State};
+websocket_info(_Info, State) ->
+ {[], State}.
diff --git a/test/rfc7540_SUITE.erl b/test/rfc7540_SUITE.erl
index f494c9f..507b75a 100644
--- a/test/rfc7540_SUITE.erl
+++ b/test/rfc7540_SUITE.erl
@@ -81,11 +81,11 @@ lingering_data_counts_toward_connection_window(_) ->
{ok, _} = gen_tcp:recv(Socket, SkipLen, 1000),
%% Skip the data.
{ok, <<_:24, 0:8, _:8, 1:32>>} = Transport:recv(Socket, 9, 1000),
+ %% Step 3.
%% Send a HEADERS frame.
{HeadersBlock, _} = cow_hpack:encode([
{<<":status">>, <<"200">>}
]),
- %% Step 3.
ok = Transport:send(Socket, [
cow_http2:headers(1, nofin, HeadersBlock)
]),
diff --git a/test/shutdown_SUITE.erl b/test/shutdown_SUITE.erl
new file mode 100644
index 0000000..e52a3ab
--- /dev/null
+++ b/test/shutdown_SUITE.erl
@@ -0,0 +1,609 @@
+%% Copyright (c) 2019, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+-module(shutdown_SUITE).
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-import(ct_helper, [doc/1]).
+-import(ct_helper, [config/2]).
+-import(gun_test, [init_origin/3]).
+
+all() ->
+ [{group, shutdown}].
+
+groups() ->
+ [{shutdown, [parallel], ct_helper:all(?MODULE)}].
+
+init_per_suite(Config) ->
+ ProtoOpts = #{env => #{
+ dispatch => cowboy_router:compile([{'_', [
+ {"/", hello_h, []},
+ {"/delayed", delayed_hello_h, 500},
+ {"/delayed_push", delayed_push_h, 500},
+ {"/empty", empty_h, []},
+ {"/ws", ws_echo_h, []},
+ {"/ws_frozen", ws_frozen_h, 500},
+ %% This timeout determines how long the test suite will run.
+ {"/ws_frozen_long", ws_frozen_h, 1500},
+ {"/ws_timeout_close", ws_timeout_close_h, 500}
+ ]}])
+ }},
+ {ok, _} = cowboy:start_clear(?MODULE, [], ProtoOpts),
+ OriginPort = ranch:get_port(?MODULE),
+ [{origin_port, OriginPort}|Config].
+
+end_per_suite(_) ->
+ ok = cowboy:stop_listener(?MODULE).
+
+%% Tests.
+%%
+%% This test suite checks that the various ways to shut down
+%% the connection are all working as expected for the different
+%% protocols and scenarios.
+
+not_connected_gun_shutdown(_) ->
+ doc("Confirm that the Gun process shuts down gracefully "
+ "when calling gun:shutdown/1 while it isn't connected."),
+ {ok, ConnPid} = gun:open("localhost", 12345),
+ ConnRef = monitor(process, ConnPid),
+ gun:shutdown(ConnPid),
+ gun_is_down(ConnPid, ConnRef, shutdown).
+
+not_connected_owner_down(_) ->
+ doc("Confirm that the Gun process shuts down when the owner exits normally "
+ "while it isn't connected."),
+ do_not_connected_owner_down(normal, normal).
+
+not_connected_owner_down_error(_) ->
+ doc("Confirm that the Gun process shuts down when the owner exits with an error "
+ "while it isn't connected."),
+ do_not_connected_owner_down(unexpected, {shutdown, {owner_down, unexpected}}).
+
+do_not_connected_owner_down(ExitReason, DownReason) ->
+ Self = self(),
+ spawn(fun() ->
+ {ok, ConnPid} = gun:open("localhost", 12345),
+ Self ! {conn, ConnPid},
+ timer:sleep(500),
+ exit(ExitReason)
+ end),
+ ConnPid = receive {conn, C} -> C after 1000 -> error(timeout) end,
+ ConnRef = monitor(process, ConnPid),
+ gun_is_down(ConnPid, ConnRef, DownReason).
+
+http1_gun_shutdown_no_streams(Config) ->
+ doc("HTTP/1.1: Confirm that the Gun process shuts down gracefully "
+ "when calling gun:shutdown/1 with no active streams."),
+ do_http_gun_shutdown_no_streams(Config, http).
+
+do_http_gun_shutdown_no_streams(Config, Protocol) ->
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{
+ protocols => [Protocol]
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ gun:shutdown(ConnPid),
+ gun_is_down(ConnPid, ConnRef, shutdown).
+
+http1_gun_shutdown_one_stream(Config) ->
+ doc("HTTP/1.1: Confirm that the Gun process shuts down gracefully "
+ "when calling gun:shutdown/1 with one active stream."),
+ do_http_gun_shutdown_one_stream(Config, http).
+
+do_http_gun_shutdown_one_stream(Config, Protocol) ->
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{
+ protocols => [Protocol]
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef = gun:get(ConnPid, "/delayed"),
+ gun:shutdown(ConnPid),
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef),
+ {ok, _} = gun:await_body(ConnPid, StreamRef),
+ gun_is_down(ConnPid, ConnRef, shutdown).
+
+http1_gun_shutdown_pipelined_streams(Config) ->
+ doc("HTTP/1.1: Confirm that the Gun process shuts down gracefully "
+ "when calling gun:shutdown/1 with one active stream and additional pipelined streams."),
+ Protocol = http,
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{
+ protocols => [Protocol]
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef1 = gun:get(ConnPid, "/delayed"),
+ StreamRef2 = gun:get(ConnPid, "/delayed"),
+ StreamRef3 = gun:get(ConnPid, "/delayed"),
+ gun:shutdown(ConnPid),
+ %% Pipelined streams are canceled immediately.
+ {error, {stream_error, {closing, shutdown}}} = gun:await(ConnPid, StreamRef2),
+ {error, {stream_error, {closing, shutdown}}} = gun:await(ConnPid, StreamRef3),
+ %% The active stream is still processed.
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef1),
+ {ok, _} = gun:await_body(ConnPid, StreamRef1),
+ gun_is_down(ConnPid, ConnRef, shutdown).
+
+http1_gun_shutdown_timeout(Config) ->
+ doc("HTTP/1.1: Confirm that the Gun process shuts down when the closing_timeout "
+ "triggers after calling gun:shutdown/1 with one active stream."),
+ do_http_gun_shutdown_timeout(Config, http, http_opts).
+
+do_http_gun_shutdown_timeout(Config, Protocol, ProtoOpts) ->
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{
+ ProtoOpts => #{closing_timeout => 100},
+ protocols => [Protocol]
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef = gun:get(ConnPid, "/delayed"),
+ gun:shutdown(ConnPid),
+ %% The closing timeout occurs before the server gets to send the response.
+ %% We get a 'closed' error instead of 'closing' as a result.
+ {error, {stream_error, {closed, shutdown}}} = gun:await(ConnPid, StreamRef),
+ gun_is_down(ConnPid, ConnRef, shutdown).
+
+http1_owner_down(Config) ->
+ doc("HTTP/1.1: Confirm that the Gun process shuts down when the owner exits normally."),
+ do_http_owner_down(Config, http, normal, normal).
+
+http1_owner_down_error(Config) ->
+ doc("HTTP/1.1: Confirm that the Gun process shuts down when the owner exits with an error."),
+ do_http_owner_down(Config, http, unexpected, {shutdown, {owner_down, unexpected}}).
+
+do_http_owner_down(Config, Protocol, ExitReason, DownReason) ->
+ Self = self(),
+ spawn(fun() ->
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{
+ protocols => [Protocol]
+ }),
+ Self ! {conn, ConnPid},
+ {ok, Protocol} = gun:await_up(ConnPid),
+ timer:sleep(500),
+ exit(ExitReason)
+ end),
+ ConnPid = receive {conn, C} -> C after 1000 -> error(timeout) end,
+ ConnRef = monitor(process, ConnPid),
+ gun_is_down(ConnPid, ConnRef, DownReason).
+
+http1_request_connection_close(Config) ->
+ doc("HTTP/1.1: Confirm that the Gun process shuts down gracefully "
+ "when sending a request with the connection: close header and "
+ "retry is disabled."),
+ Protocol = http,
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{
+ protocols => [Protocol],
+ retry => 0
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef = gun:get(ConnPid, "/", #{
+ <<"connection">> => <<"close">>
+ }),
+ %% We get the response followed by Gun shutting down.
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef),
+ {ok, _} = gun:await_body(ConnPid, StreamRef),
+ gun_is_down(ConnPid, ConnRef, normal).
+
+http1_request_connection_close_pipeline(Config) ->
+ doc("HTTP/1.1: Confirm that the Gun process shuts down gracefully "
+ "when sending a request with the connection: close header and "
+ "retry is disabled. Pipelined requests get canceled."),
+ Protocol = http,
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{
+ protocols => [Protocol],
+ retry => 0
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef1 = gun:get(ConnPid, "/", #{
+ <<"connection">> => <<"close">>
+ }),
+ StreamRef2 = gun:get(ConnPid, "/"),
+ StreamRef3 = gun:get(ConnPid, "/"),
+ %% We get the response, pipelined streams get canceled, followed by Gun shutting down.
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef1),
+ {ok, _} = gun:await_body(ConnPid, StreamRef1),
+ {error, {stream_error, {closed, normal}}} = gun:await(ConnPid, StreamRef2),
+ {error, {stream_error, {closed, normal}}} = gun:await(ConnPid, StreamRef3),
+ gun_is_down(ConnPid, ConnRef, normal).
+
+http1_response_connection_close(_) ->
+ doc("HTTP/1.1: Confirm that the Gun process shuts down gracefully "
+ "when receiving a response with the connection: close header and "
+ "retry is disabled."),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{
+ env => #{dispatch => cowboy_router:compile([{'_', [{"/", hello_h, []}]}])},
+ max_keepalive => 1
+ }),
+ OriginPort = ranch:get_port(?FUNCTION_NAME),
+ try
+ Protocol = http,
+ {ok, ConnPid} = gun:open("localhost", OriginPort, #{
+ protocols => [Protocol],
+ retry => 0
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef = gun:get(ConnPid, "/"),
+ %% We get the response followed by Gun shutting down.
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef),
+ {ok, _} = gun:await_body(ConnPid, StreamRef),
+ gun_is_down(ConnPid, ConnRef, normal)
+ after
+ cowboy:stop_listener(?FUNCTION_NAME)
+ end.
+
+http1_response_connection_close_pipeline(_) ->
+ doc("HTTP/1.1: Confirm that the Gun process shuts down gracefully "
+ "when receiving a response with the connection: close header and "
+ "retry is disabled. Pipelined requests get canceled."),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{
+ env => #{dispatch => cowboy_router:compile([{'_', [{"/", hello_h, []}]}])},
+ max_keepalive => 1
+ }),
+ OriginPort = ranch:get_port(?FUNCTION_NAME),
+ try
+ Protocol = http,
+ {ok, ConnPid} = gun:open("localhost", OriginPort, #{
+ protocols => [Protocol],
+ retry => 0
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef1 = gun:get(ConnPid, "/"),
+ StreamRef2 = gun:get(ConnPid, "/"),
+ StreamRef3 = gun:get(ConnPid, "/"),
+ %% We get the response, pipelined streams get canceled, followed by Gun shutting down.
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef1),
+ {ok, _} = gun:await_body(ConnPid, StreamRef1),
+ {error, {stream_error, {closed, normal}}} = gun:await(ConnPid, StreamRef2),
+ {error, {stream_error, {closed, normal}}} = gun:await(ConnPid, StreamRef3),
+ gun_is_down(ConnPid, ConnRef, normal)
+ after
+ cowboy:stop_listener(?FUNCTION_NAME)
+ end.
+
+http10_connection_close(Config) ->
+ doc("HTTP/1.0: Confirm that the Gun process shuts down gracefully "
+ "when sending a request without a connection header and "
+ "retry is disabled."),
+ Protocol = http,
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{
+ http_opts => #{version => 'HTTP/1.0'},
+ protocols => [Protocol],
+ retry => 0
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef = gun:get(ConnPid, "/"),
+ %% We get the response followed by Gun shutting down.
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef),
+ {ok, _} = gun:await_body(ConnPid, StreamRef),
+ gun_is_down(ConnPid, ConnRef, normal).
+
+http2_gun_shutdown_no_streams(Config) ->
+ doc("HTTP/2: Confirm that the Gun process shuts down gracefully "
+ "when calling gun:shutdown/1 with no active streams."),
+ do_http_gun_shutdown_no_streams(Config, http2).
+
+http2_gun_shutdown_one_stream(Config) ->
+ doc("HTTP/2: Confirm that the Gun process shuts down gracefully "
+ "when calling gun:shutdown/1 with one active stream."),
+ do_http_gun_shutdown_one_stream(Config, http2).
+
+http2_gun_shutdown_many_streams(Config) ->
+ doc("HTTP/2: Confirm that the Gun process shuts down gracefully "
+ "when calling gun:shutdown/1 with many active streams."),
+ Protocol = http2,
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{
+ protocols => [Protocol]
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef1 = gun:get(ConnPid, "/delayed"),
+ StreamRef2 = gun:get(ConnPid, "/delayed"),
+ StreamRef3 = gun:get(ConnPid, "/delayed"),
+ gun:shutdown(ConnPid),
+ %% All streams are processed.
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef1),
+ {ok, _} = gun:await_body(ConnPid, StreamRef1),
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef2),
+ {ok, _} = gun:await_body(ConnPid, StreamRef2),
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef3),
+ {ok, _} = gun:await_body(ConnPid, StreamRef3),
+ gun_is_down(ConnPid, ConnRef, shutdown).
+
+http2_gun_shutdown_timeout(Config) ->
+ doc("HTTP/2: Confirm that the Gun process shuts down when the closing_timeout "
+ "triggers after calling gun:shutdown/1 with one active stream."),
+ do_http_gun_shutdown_timeout(Config, http2, http2_opts).
+
+http2_gun_shutdown_ignore_push_promise(Config) ->
+ doc("HTTP/2: Confirm that the Gun process shuts down gracefully "
+ "when calling gun:shutdown/1 with one active stream. The "
+ "resource pushed by the server after we sent the GOAWAY frame "
+ "must be ignored."),
+ Protocol = http2,
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{
+ protocols => [Protocol]
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef = gun:get(ConnPid, "/delayed_push"),
+ gun:shutdown(ConnPid),
+ %% We do not receive the push streams. Only the response.
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef),
+ {ok, _} = gun:await_body(ConnPid, StreamRef),
+ gun_is_down(ConnPid, ConnRef, shutdown).
+
+http2_owner_down(Config) ->
+ doc("HTTP/2: Confirm that the Gun process shuts down when the owner exits normally."),
+ do_http_owner_down(Config, http2, normal, normal).
+
+http2_owner_down_error(Config) ->
+ doc("HTTP/2: Confirm that the Gun process shuts down when the owner exits with an error."),
+ do_http_owner_down(Config, http2, unexpected, {shutdown, {owner_down, unexpected}}).
+
+http2_server_goaway_no_streams(_) ->
+ doc("HTTP/2: Confirm that the Gun process shuts down gracefully "
+ "when receiving a GOAWAY frame with no active streams and "
+ "retry is disabled."),
+ {ok, _, Port} = init_origin(tcp, http2, fun(_, Socket, Transport) ->
+ Transport:send(Socket, cow_http2:goaway(0, no_error, <<>>)),
+ timer:sleep(500)
+ end),
+ Protocol = http2,
+ {ok, ConnPid} = gun:open("localhost", Port, #{
+ protocols => [Protocol],
+ retry => 0
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ gun_is_down(ConnPid, ConnRef, normal).
+
+http2_server_goaway_one_stream(_) ->
+ doc("HTTP/2: Confirm that the Gun process shuts down gracefully "
+ "when receiving a GOAWAY frame with one active stream and "
+ "retry is disabled."),
+ {ok, _, OriginPort} = init_origin(tcp, http2, fun(_, Socket, Transport) ->
+ %% Receive a HEADERS frame.
+ {ok, <<SkipLen:24, 1:8, _:8, 1:32>>} = Transport:recv(Socket, 9, 1000),
+ %% Skip the header.
+ {ok, _} = gen_tcp:recv(Socket, SkipLen, 1000),
+ %% Skip the data.
+ {ok, <<_:24, 0:8, _:8, 1:32>>} = Transport:recv(Socket, 9, 1000),
+ %% Send a GOAWAY frame.
+ Transport:send(Socket, cow_http2:goaway(1, no_error, <<>>)),
+ %% Wait before sending the response back and closing the connection.
+ timer:sleep(500),
+ %% Send a HEADERS frame.
+ {HeadersBlock, _} = cow_hpack:encode([
+ {<<":status">>, <<"200">>}
+ ]),
+ ok = Transport:send(Socket, [
+ cow_http2:headers(1, fin, HeadersBlock)
+ ]),
+ timer:sleep(500)
+ end),
+ Protocol = http2,
+ {ok, ConnPid} = gun:open("localhost", OriginPort, #{
+ protocols => [Protocol],
+ retry => 0
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ timer:sleep(100), %% Give enough time for the handshake to fully complete.
+ StreamRef = gun:get(ConnPid, "/"),
+ ConnRef = monitor(process, ConnPid),
+ {response, fin, 200, _} = gun:await(ConnPid, StreamRef),
+ gun_is_down(ConnPid, ConnRef, normal).
+
+http2_server_goaway_many_streams(_) ->
+ doc("HTTP/2: Confirm that the Gun process shuts down gracefully "
+ "when receiving a GOAWAY frame with many active streams and "
+ "retry is disabled."),
+ {ok, _, OriginPort} = init_origin(tcp, http2, fun(_, Socket, Transport) ->
+ %% Stream 1.
+ %% Receive a HEADERS frame.
+ {ok, <<SkipLen1:24, 1:8, _:8, 1:32>>} = Transport:recv(Socket, 9, 1000),
+ %% Skip the header.
+ {ok, _} = gen_tcp:recv(Socket, SkipLen1, 1000),
+ %% Skip the data.
+ {ok, <<_:24, 0:8, _:8, 1:32>>} = Transport:recv(Socket, 9, 1000),
+ %% Stream 2.
+ %% Receive a HEADERS frame.
+ {ok, <<SkipLen2:24, 1:8, _:8, 3:32>>} = Transport:recv(Socket, 9, 1000),
+ %% Skip the header.
+ {ok, _} = gen_tcp:recv(Socket, SkipLen2, 1000),
+ %% Skip the data.
+ {ok, <<_:24, 0:8, _:8, 3:32>>} = Transport:recv(Socket, 9, 1000),
+ %% Stream 3.
+ %% Receive a HEADERS frame.
+ {ok, <<SkipLen3:24, 1:8, _:8, 5:32>>} = Transport:recv(Socket, 9, 1000),
+ %% Skip the header.
+ {ok, _} = gen_tcp:recv(Socket, SkipLen3, 1000),
+ %% Skip the data.
+ {ok, <<_:24, 0:8, _:8, 5:32>>} = Transport:recv(Socket, 9, 1000),
+ %% Send a GOAWAY frame.
+ Transport:send(Socket, cow_http2:goaway(5, no_error, <<>>)),
+ %% Wait before sending the responses back and closing the connection.
+ timer:sleep(500),
+ %% Send a HEADERS frame.
+ {HeadersBlock1, State0} = cow_hpack:encode([
+ {<<":status">>, <<"200">>}
+ ]),
+ ok = Transport:send(Socket, [
+ cow_http2:headers(1, fin, HeadersBlock1)
+ ]),
+ {HeadersBlock2, State} = cow_hpack:encode([
+ {<<":status">>, <<"200">>}
+ ], State0),
+ ok = Transport:send(Socket, [
+ cow_http2:headers(3, fin, HeadersBlock2)
+ ]),
+ {HeadersBlock3, _} = cow_hpack:encode([
+ {<<":status">>, <<"200">>}
+ ], State),
+ ok = Transport:send(Socket, [
+ cow_http2:headers(5, fin, HeadersBlock3)
+ ]),
+ timer:sleep(500)
+ end),
+ Protocol = http2,
+ {ok, ConnPid} = gun:open("localhost", OriginPort, #{
+ protocols => [Protocol],
+ retry => 0
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ timer:sleep(100), %% Give enough time for the handshake to fully complete.
+ StreamRef1 = gun:get(ConnPid, "/"),
+ StreamRef2 = gun:get(ConnPid, "/"),
+ StreamRef3 = gun:get(ConnPid, "/"),
+ ConnRef = monitor(process, ConnPid),
+ {response, fin, 200, _} = gun:await(ConnPid, StreamRef1),
+ {response, fin, 200, _} = gun:await(ConnPid, StreamRef2),
+ {response, fin, 200, _} = gun:await(ConnPid, StreamRef3),
+ gun_is_down(ConnPid, ConnRef, normal).
+
+ws_gun_shutdown(Config) ->
+ doc("Websocket: Confirm that the Gun process shuts down gracefully "
+ "when calling gun:shutdown/1."),
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config)),
+ {ok, http} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef = gun:ws_upgrade(ConnPid, "/ws", []),
+ {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),
+ gun:shutdown(ConnPid),
+ gun_is_down(ConnPid, ConnRef, shutdown).
+
+ws_gun_shutdown_timeout(Config) ->
+ doc("Websocket: Confirm that the Gun process shuts down when "
+ "the closing_timeout triggers after calling gun:shutdown/1."),
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{
+ ws_opts => #{closing_timeout => 100}
+ }),
+ {ok, http} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef = gun:ws_upgrade(ConnPid, "/ws_frozen_long", []),
+ {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),
+ gun:shutdown(ConnPid),
+ gun_is_down(ConnPid, ConnRef, shutdown).
+
+ws_owner_down(Config) ->
+ doc("Websocket: Confirm that the Gun process shuts down when the owner exits normally."),
+ do_ws_owner_down(Config, normal, normal).
+
+ws_owner_down_error(Config) ->
+ doc("Websocket: Confirm that the Gun process shuts down when the owner exits with an error."),
+ do_ws_owner_down(Config, unexpected, {shutdown, {owner_down, unexpected}}).
+
+do_ws_owner_down(Config, ExitReason, DownReason) ->
+ Self = self(),
+ spawn(fun() ->
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config)),
+ Self ! {conn, ConnPid},
+ {ok, http} = gun:await_up(ConnPid),
+ StreamRef = gun:ws_upgrade(ConnPid, "/ws", []),
+ {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),
+ timer:sleep(500),
+ exit(ExitReason)
+ end),
+ ConnPid = receive {conn, C} -> C after 1000 -> error(timeout) end,
+ ConnRef = monitor(process, ConnPid),
+ gun_is_down(ConnPid, ConnRef, DownReason).
+
+ws_gun_send_close_frame(Config) ->
+ doc("Websocket: Confirm that the Gun process shuts down gracefully "
+ "when sending a close frame, with retry disabled."),
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{
+ retry => 0
+ }),
+ {ok, http} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef = gun:ws_upgrade(ConnPid, "/ws", []),
+ {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),
+ %% We send a close frame. We expect the same frame back
+ %% before the connection is closed.
+ Frame = {close, 3333, <<>>},
+ gun:ws_send(ConnPid, Frame),
+ {ws, Frame} = gun:await(ConnPid, StreamRef),
+ gun_is_down(ConnPid, ConnRef, normal).
+
+ws_gun_receive_close_frame(Config) ->
+ doc("Websocket: Confirm that the Gun process shuts down gracefully "
+ "when receiving a close frame, with retry disabled."),
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{
+ retry => 0
+ }),
+ {ok, http} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef = gun:ws_upgrade(ConnPid, "/ws_timeout_close", []),
+ {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),
+ %% We expect a close frame before the connection is closed.
+ {ws, {close, 3333, <<>>}} = gun:await(ConnPid, StreamRef),
+ gun_is_down(ConnPid, ConnRef, normal).
+
+closing_gun_shutdown(Config) ->
+ doc("Confirm that the Gun process shuts down gracefully "
+ "when calling gun:shutdown/1 while Gun is closing a connection."),
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config)),
+ {ok, http} = gun:await_up(ConnPid),
+ ConnRef = monitor(process, ConnPid),
+ StreamRef = gun:ws_upgrade(ConnPid, "/ws_frozen", []),
+ {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),
+ %% We send a close frame then immediately call gun:shutdown/1.
+ %% We expect Gun to go down without retrying to reconnect.
+ Frame = {close, 3333, <<>>},
+ gun:ws_send(ConnPid, Frame),
+ gun:shutdown(ConnPid),
+ {ws, Frame} = gun:await(ConnPid, StreamRef),
+ gun_is_down(ConnPid, ConnRef, shutdown).
+
+closing_owner_down(Config) ->
+ doc("Confirm that the Gun process shuts down gracefully "
+ "when the owner exits normally while Gun is closing a connection."),
+ do_closing_owner_down(Config, normal, normal).
+
+closing_owner_down_error(Config) ->
+ doc("Confirm that the Gun process shuts down gracefully "
+ "when the owner exits with an error while Gun is closing a connection."),
+ do_closing_owner_down(Config, unexpected, {shutdown, {owner_down, unexpected}}).
+
+do_closing_owner_down(Config, ExitReason, DownReason) ->
+ Self = self(),
+ spawn(fun() ->
+ {ok, ConnPid} = gun:open("localhost", config(origin_port, Config)),
+ Self ! {conn, ConnPid},
+ {ok, http} = gun:await_up(ConnPid),
+ StreamRef = gun:ws_upgrade(ConnPid, "/ws_frozen", []),
+ {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),
+ gun:ws_send(ConnPid, {close, 3333, <<>>}),
+ timer:sleep(100),
+ exit(ExitReason)
+ end),
+ ConnPid = receive {conn, C} -> C after 1000 -> error(timeout) end,
+ ConnRef = monitor(process, ConnPid),
+ gun_is_down(ConnPid, ConnRef, DownReason).
+
+%% Internal.
+
+gun_is_down(ConnPid, ConnRef, Expected) ->
+ receive
+ {'DOWN', ConnRef, process, ConnPid, Reason} ->
+ Expected = Reason,
+ ok
+ after 1000 ->
+ true = erlang:is_process_alive(ConnPid),
+ error(timeout)
+ end.
diff --git a/test/ws_SUITE.erl b/test/ws_SUITE.erl
index 5cc50ec..1abf046 100644
--- a/test/ws_SUITE.erl
+++ b/test/ws_SUITE.erl
@@ -68,3 +68,30 @@ reject_upgrade(Config) ->
after 1000 ->
error(timeout)
end.
+
+send_many(Config) ->
+ doc("Ensure we can send a list of frames in one gun:ws_send call."),
+ {ok, ConnPid} = gun:open("localhost", config(port, Config)),
+ {ok, _} = gun:await_up(ConnPid),
+ StreamRef = gun:ws_upgrade(ConnPid, "/", []),
+ {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),
+ Frame1 = {text, <<"Hello!">>},
+ Frame2 = {binary, <<"World!">>},
+ gun:ws_send(ConnPid, [Frame1, Frame2]),
+ {ws, Frame1} = gun:await(ConnPid, StreamRef),
+ {ws, Frame2} = gun:await(ConnPid, StreamRef),
+ gun:close(ConnPid).
+
+send_many_close(Config) ->
+ doc("Ensure we can send a list of frames in one gun:ws_send call, including a close frame."),
+ {ok, ConnPid} = gun:open("localhost", config(port, Config)),
+ {ok, _} = gun:await_up(ConnPid),
+ StreamRef = gun:ws_upgrade(ConnPid, "/", []),
+ {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),
+ Frame1 = {text, <<"Hello!">>},
+ Frame2 = {binary, <<"World!">>},
+ gun:ws_send(ConnPid, [Frame1, Frame2, close]),
+ {ws, Frame1} = gun:await(ConnPid, StreamRef),
+ {ws, Frame2} = gun:await(ConnPid, StreamRef),
+ {ws, close} = gun:await(ConnPid, StreamRef),
+ gun:close(ConnPid).