path: root/test
diff options
authorLoïc Hoguin <[email protected]>2017-08-14 17:17:44 +0200
committerLoïc Hoguin <[email protected]>2017-08-14 17:17:44 +0200
commita2facaf2da2c95df32ffdc529bd3bdd9f91c080c (patch)
tree6b260b63484eefd0cfbd72659943042f1ea29806 /test
parent58b70a594b7c74494f4859695e94295691b749c5 (diff)
Add tests for the streams shutdown mechanism
Diffstat (limited to 'test')
2 files changed, 142 insertions, 6 deletions
diff --git a/test/handlers/stream_handler_h.erl b/test/handlers/stream_handler_h.erl
index bb4a6da..3ee4932 100644
--- a/test/handlers/stream_handler_h.erl
+++ b/test/handlers/stream_handler_h.erl
@@ -9,21 +9,56 @@
+-record(state, {
+ pid,
+ test
init(StreamID, Req, Opts) ->
- %% @todo Vary behavior depending on x-test-case.
Pid = list_to_pid(binary_to_list(cowboy_req:header(<<"x-test-pid">>, Req))),
+ Test = binary_to_atom(cowboy_req:header(<<"x-test-case">>, Req), latin1),
+ State = #state{pid=Pid, test=Test},
Pid ! {Pid, self(), init, StreamID, Req, Opts},
- {[{headers, 200, #{}}], Pid}.
+ {init_commands(StreamID, Req, State), State}.
+init_commands(_, _, State=#state{test=shutdown_on_stream_stop}) ->
+ Spawn = init_process(false, State),
+ [{headers, 200, #{}}, {spawn, Spawn, 5000}, stop];
+init_commands(_, _, State=#state{test=shutdown_on_socket_close}) ->
+ Spawn = init_process(false, State),
+ [{headers, 200, #{}}, {spawn, Spawn, 5000}];
+init_commands(_, _, State=#state{test=shutdown_timeout_on_stream_stop}) ->
+ Spawn = init_process(true, State),
+ [{headers, 200, #{}}, {spawn, Spawn, 2000}, stop];
+init_commands(_, _, State=#state{test=shutdown_timeout_on_socket_close}) ->
+ Spawn = init_process(true, State),
+ [{headers, 200, #{}}, {spawn, Spawn, 2000}];
+init_commands(_, _, _) ->
+ [{headers, 200, #{}}].
+init_process(TrapExit, #state{pid=Pid}) ->
+ Self = self(),
+ Spawn = spawn_link(fun() ->
+ process_flag(trap_exit, TrapExit),
+ Pid ! {Pid, Self, spawned, self()},
+ receive {Pid, ready} -> ok after 1000 -> error(timeout) end,
+ Self ! {self(), ready},
+ receive after 5000 ->
+ Pid ! {Pid, Self, still_alive, self()}
+ end
+ end),
+ receive {Spawn, ready} -> ok after 1000 -> error(timeout) end,
+ Spawn.
-data(StreamID, IsFin, Data, State=Pid) ->
+data(StreamID, IsFin, Data, State=#state{pid=Pid}) ->
Pid ! {Pid, self(), data, StreamID, IsFin, Data, State},
{[], State}.
-info(StreamID, Info, State=Pid) ->
+info(StreamID, Info, State=#state{pid=Pid}) ->
Pid ! {Pid, self(), info, StreamID, Info, State},
{[], State}.
-terminate(StreamID, Reason, State=Pid) ->
+terminate(StreamID, Reason, State=#state{pid=Pid}) ->
Pid ! {Pid, self(), terminate, StreamID, Reason, State},
diff --git a/test/stream_handler_SUITE.erl b/test/stream_handler_SUITE.erl
index 1cc4f89..6a5ff8e 100644
--- a/test/stream_handler_SUITE.erl
+++ b/test/stream_handler_SUITE.erl
@@ -59,13 +59,114 @@ end_per_group(Name, _) ->
%% Tests.
+shutdown_on_stream_stop(Config) ->
+ doc("Confirm supervised processes are shutdown when stopping the stream."),
+ Self = self(),
+ ConnPid = gun_open(Config),
+ Ref = gun:get(ConnPid, "/long_polling", [
+ {<<"accept-encoding">>, <<"gzip">>},
+ {<<"x-test-case">>, <<"shutdown_on_stream_stop">>},
+ {<<"x-test-pid">>, pid_to_list(Self)}
+ ]),
+ %% Confirm init/3 is called.
+ Pid = receive {Self, P, init, _, _, _} -> P after 1000 -> error(timeout) end,
+ %% Receive the pid of the newly started process and monitor it.
+ Spawn = receive {Self, Pid, spawned, S} -> S after 1000 -> error(timeout) end,
+ MRef = monitor(process, Spawn),
+ Spawn ! {Self, ready},
+ %% Confirm terminate/3 is called, indicating the stream ended.
+ receive {Self, Pid, terminate, _, _, _} -> ok after 1000 -> error(timeout) end,
+ %% We should receive a DOWN message soon after (or before) because the stream
+ %% handler is stopping the stream immediately after the process started.
+ receive {'DOWN', MRef, process, Spawn, shutdown} -> ok after 1000 -> error(timeout) end,
+ %% The response is still sent.
+ {response, nofin, 200, _} = gun:await(ConnPid, Ref),
+ {ok, <<>>} = gun:await_body(ConnPid, Ref),
+ ok.
+shutdown_on_socket_close(Config) ->
+ doc("Confirm supervised processes are shutdown when the socket closes."),
+ Self = self(),
+ ConnPid = gun_open(Config),
+ Ref = gun:get(ConnPid, "/long_polling", [
+ {<<"accept-encoding">>, <<"gzip">>},
+ {<<"x-test-case">>, <<"shutdown_on_socket_close">>},
+ {<<"x-test-pid">>, pid_to_list(Self)}
+ ]),
+ %% Confirm init/3 is called.
+ Pid = receive {Self, P, init, _, _, _} -> P after 1000 -> error(timeout) end,
+ %% Receive the pid of the newly started process and monitor it.
+ Spawn = receive {Self, Pid, spawned, S} -> S after 1000 -> error(timeout) end,
+ MRef = monitor(process, Spawn),
+ Spawn ! {Self, ready},
+ %% Close the socket.
+ ok = gun:close(ConnPid),
+ %% Confirm terminate/3 is called, indicating the stream ended.
+ receive {Self, Pid, terminate, _, _, _} -> ok after 1000 -> error(timeout) end,
+ %% Confirm we receive a DOWN message for the child process.
+ receive {'DOWN', MRef, process, Spawn, shutdown} -> ok after 1000 -> error(timeout) end,
+ ok.
+shutdown_timeout_on_stream_stop(Config) ->
+ doc("Confirm supervised processes are killed "
+ "when the shutdown timeout triggers after stopping the stream."),
+ Self = self(),
+ ConnPid = gun_open(Config),
+ Ref = gun:get(ConnPid, "/long_polling", [
+ {<<"accept-encoding">>, <<"gzip">>},
+ {<<"x-test-case">>, <<"shutdown_timeout_on_stream_stop">>},
+ {<<"x-test-pid">>, pid_to_list(Self)}
+ ]),
+ %% Confirm init/3 is called.
+ Pid = receive {Self, P, init, _, _, _} -> P after 1000 -> error(timeout) end,
+ %% Receive the pid of the newly started process and monitor it.
+ Spawn = receive {Self, Pid, spawned, S} -> S after 1000 -> error(timeout) end,
+ MRef = monitor(process, Spawn),
+ Spawn ! {Self, ready},
+ %% Confirm terminate/3 is called, indicating the stream ended.
+ receive {Self, Pid, terminate, _, _, _} -> ok after 1000 -> error(timeout) end,
+ %% We should NOT receive a DOWN message immediately.
+ receive {'DOWN', MRef, process, Spawn, killed} -> error(killed) after 1500 -> ok end,
+ %% We should received it now.
+ receive {'DOWN', MRef, process, Spawn, killed} -> ok after 1000 -> error(timeout) end,
+ %% The response is still sent.
+ {response, nofin, 200, _} = gun:await(ConnPid, Ref),
+ {ok, <<>>} = gun:await_body(ConnPid, Ref),
+ ok.
+shutdown_timeout_on_socket_close(Config) ->
+ doc("Confirm supervised processes are killed "
+ "when the shutdown timeout triggers after the socket has closed."),
+ Self = self(),
+ ConnPid = gun_open(Config),
+ Ref = gun:get(ConnPid, "/long_polling", [
+ {<<"accept-encoding">>, <<"gzip">>},
+ {<<"x-test-case">>, <<"shutdown_timeout_on_socket_close">>},
+ {<<"x-test-pid">>, pid_to_list(Self)}
+ ]),
+ %% Confirm init/3 is called.
+ Pid = receive {Self, P, init, _, _, _} -> P after 1000 -> error(timeout) end,
+ %% Receive the pid of the newly started process and monitor it.
+ Spawn = receive {Self, Pid, spawned, S} -> S after 1000 -> error(timeout) end,
+ MRef = monitor(process, Spawn),
+ Spawn ! {Self, ready},
+ %% Close the socket.
+ ok = gun:close(ConnPid),
+ %% Confirm terminate/3 is called, indicating the stream ended.
+ receive {Self, Pid, terminate, _, _, _} -> ok after 1000 -> error(timeout) end,
+ %% We should NOT receive a DOWN message immediately.
+ receive {'DOWN', MRef, process, Spawn, killed} -> error(killed) after 1500 -> ok end,
+ %% We should received it now.
+ receive {'DOWN', MRef, process, Spawn, killed} -> ok after 1000 -> error(timeout) end,
+ ok.
terminate_on_socket_close(Config) ->
doc("Confirm terminate/3 is called when the socket gets closed brutally."),
Self = self(),
ConnPid = gun_open(Config),
Ref = gun:get(ConnPid, "/long_polling", [
{<<"accept-encoding">>, <<"gzip">>},
- {<<"x-test-case">>, <<"stream">>},
+ {<<"x-test-case">>, <<"terminate_on_socket_close">>},
{<<"x-test-pid">>, pid_to_list(Self)}
%% Confirm init/3 is called and receive the beginning of the response.