From a2facaf2da2c95df32ffdc529bd3bdd9f91c080c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Mon, 14 Aug 2017 17:17:44 +0200 Subject: Add tests for the streams shutdown mechanism --- test/handlers/stream_handler_h.erl | 45 ++++++++++++++-- test/stream_handler_SUITE.erl | 103 ++++++++++++++++++++++++++++++++++++- 2 files changed, 142 insertions(+), 6 deletions(-) diff --git a/test/handlers/stream_handler_h.erl b/test/handlers/stream_handler_h.erl index bb4a6da..3ee4932 100644 --- a/test/handlers/stream_handler_h.erl +++ b/test/handlers/stream_handler_h.erl @@ -9,21 +9,56 @@ -export([terminate/3]). -export([early_error/5]). +-record(state, { + pid, + test +}). + init(StreamID, Req, Opts) -> - %% @todo Vary behavior depending on x-test-case. Pid = list_to_pid(binary_to_list(cowboy_req:header(<<"x-test-pid">>, Req))), + Test = binary_to_atom(cowboy_req:header(<<"x-test-case">>, Req), latin1), + State = #state{pid=Pid, test=Test}, Pid ! {Pid, self(), init, StreamID, Req, Opts}, - {[{headers, 200, #{}}], Pid}. + {init_commands(StreamID, Req, State), State}. + +init_commands(_, _, State=#state{test=shutdown_on_stream_stop}) -> + Spawn = init_process(false, State), + [{headers, 200, #{}}, {spawn, Spawn, 5000}, stop]; +init_commands(_, _, State=#state{test=shutdown_on_socket_close}) -> + Spawn = init_process(false, State), + [{headers, 200, #{}}, {spawn, Spawn, 5000}]; +init_commands(_, _, State=#state{test=shutdown_timeout_on_stream_stop}) -> + Spawn = init_process(true, State), + [{headers, 200, #{}}, {spawn, Spawn, 2000}, stop]; +init_commands(_, _, State=#state{test=shutdown_timeout_on_socket_close}) -> + Spawn = init_process(true, State), + [{headers, 200, #{}}, {spawn, Spawn, 2000}]; +init_commands(_, _, _) -> + [{headers, 200, #{}}]. + +init_process(TrapExit, #state{pid=Pid}) -> + Self = self(), + Spawn = spawn_link(fun() -> + process_flag(trap_exit, TrapExit), + Pid ! {Pid, Self, spawned, self()}, + receive {Pid, ready} -> ok after 1000 -> error(timeout) end, + Self ! {self(), ready}, + receive after 5000 -> + Pid ! {Pid, Self, still_alive, self()} + end + end), + receive {Spawn, ready} -> ok after 1000 -> error(timeout) end, + Spawn. -data(StreamID, IsFin, Data, State=Pid) -> +data(StreamID, IsFin, Data, State=#state{pid=Pid}) -> Pid ! {Pid, self(), data, StreamID, IsFin, Data, State}, {[], State}. -info(StreamID, Info, State=Pid) -> +info(StreamID, Info, State=#state{pid=Pid}) -> Pid ! {Pid, self(), info, StreamID, Info, State}, {[], State}. -terminate(StreamID, Reason, State=Pid) -> +terminate(StreamID, Reason, State=#state{pid=Pid}) -> Pid ! {Pid, self(), terminate, StreamID, Reason, State}, ok. diff --git a/test/stream_handler_SUITE.erl b/test/stream_handler_SUITE.erl index 1cc4f89..6a5ff8e 100644 --- a/test/stream_handler_SUITE.erl +++ b/test/stream_handler_SUITE.erl @@ -59,13 +59,114 @@ end_per_group(Name, _) -> %% Tests. +shutdown_on_stream_stop(Config) -> + doc("Confirm supervised processes are shutdown when stopping the stream."), + Self = self(), + ConnPid = gun_open(Config), + Ref = gun:get(ConnPid, "/long_polling", [ + {<<"accept-encoding">>, <<"gzip">>}, + {<<"x-test-case">>, <<"shutdown_on_stream_stop">>}, + {<<"x-test-pid">>, pid_to_list(Self)} + ]), + %% Confirm init/3 is called. + Pid = receive {Self, P, init, _, _, _} -> P after 1000 -> error(timeout) end, + %% Receive the pid of the newly started process and monitor it. + Spawn = receive {Self, Pid, spawned, S} -> S after 1000 -> error(timeout) end, + MRef = monitor(process, Spawn), + Spawn ! {Self, ready}, + %% Confirm terminate/3 is called, indicating the stream ended. + receive {Self, Pid, terminate, _, _, _} -> ok after 1000 -> error(timeout) end, + %% We should receive a DOWN message soon after (or before) because the stream + %% handler is stopping the stream immediately after the process started. + receive {'DOWN', MRef, process, Spawn, shutdown} -> ok after 1000 -> error(timeout) end, + %% The response is still sent. + {response, nofin, 200, _} = gun:await(ConnPid, Ref), + {ok, <<>>} = gun:await_body(ConnPid, Ref), + ok. + +shutdown_on_socket_close(Config) -> + doc("Confirm supervised processes are shutdown when the socket closes."), + Self = self(), + ConnPid = gun_open(Config), + Ref = gun:get(ConnPid, "/long_polling", [ + {<<"accept-encoding">>, <<"gzip">>}, + {<<"x-test-case">>, <<"shutdown_on_socket_close">>}, + {<<"x-test-pid">>, pid_to_list(Self)} + ]), + %% Confirm init/3 is called. + Pid = receive {Self, P, init, _, _, _} -> P after 1000 -> error(timeout) end, + %% Receive the pid of the newly started process and monitor it. + Spawn = receive {Self, Pid, spawned, S} -> S after 1000 -> error(timeout) end, + MRef = monitor(process, Spawn), + Spawn ! {Self, ready}, + %% Close the socket. + ok = gun:close(ConnPid), + %% Confirm terminate/3 is called, indicating the stream ended. + receive {Self, Pid, terminate, _, _, _} -> ok after 1000 -> error(timeout) end, + %% Confirm we receive a DOWN message for the child process. + receive {'DOWN', MRef, process, Spawn, shutdown} -> ok after 1000 -> error(timeout) end, + ok. + +shutdown_timeout_on_stream_stop(Config) -> + doc("Confirm supervised processes are killed " + "when the shutdown timeout triggers after stopping the stream."), + Self = self(), + ConnPid = gun_open(Config), + Ref = gun:get(ConnPid, "/long_polling", [ + {<<"accept-encoding">>, <<"gzip">>}, + {<<"x-test-case">>, <<"shutdown_timeout_on_stream_stop">>}, + {<<"x-test-pid">>, pid_to_list(Self)} + ]), + %% Confirm init/3 is called. + Pid = receive {Self, P, init, _, _, _} -> P after 1000 -> error(timeout) end, + %% Receive the pid of the newly started process and monitor it. + Spawn = receive {Self, Pid, spawned, S} -> S after 1000 -> error(timeout) end, + MRef = monitor(process, Spawn), + Spawn ! {Self, ready}, + %% Confirm terminate/3 is called, indicating the stream ended. + receive {Self, Pid, terminate, _, _, _} -> ok after 1000 -> error(timeout) end, + %% We should NOT receive a DOWN message immediately. + receive {'DOWN', MRef, process, Spawn, killed} -> error(killed) after 1500 -> ok end, + %% We should received it now. + receive {'DOWN', MRef, process, Spawn, killed} -> ok after 1000 -> error(timeout) end, + %% The response is still sent. + {response, nofin, 200, _} = gun:await(ConnPid, Ref), + {ok, <<>>} = gun:await_body(ConnPid, Ref), + ok. + +shutdown_timeout_on_socket_close(Config) -> + doc("Confirm supervised processes are killed " + "when the shutdown timeout triggers after the socket has closed."), + Self = self(), + ConnPid = gun_open(Config), + Ref = gun:get(ConnPid, "/long_polling", [ + {<<"accept-encoding">>, <<"gzip">>}, + {<<"x-test-case">>, <<"shutdown_timeout_on_socket_close">>}, + {<<"x-test-pid">>, pid_to_list(Self)} + ]), + %% Confirm init/3 is called. + Pid = receive {Self, P, init, _, _, _} -> P after 1000 -> error(timeout) end, + %% Receive the pid of the newly started process and monitor it. + Spawn = receive {Self, Pid, spawned, S} -> S after 1000 -> error(timeout) end, + MRef = monitor(process, Spawn), + Spawn ! {Self, ready}, + %% Close the socket. + ok = gun:close(ConnPid), + %% Confirm terminate/3 is called, indicating the stream ended. + receive {Self, Pid, terminate, _, _, _} -> ok after 1000 -> error(timeout) end, + %% We should NOT receive a DOWN message immediately. + receive {'DOWN', MRef, process, Spawn, killed} -> error(killed) after 1500 -> ok end, + %% We should received it now. + receive {'DOWN', MRef, process, Spawn, killed} -> ok after 1000 -> error(timeout) end, + ok. + terminate_on_socket_close(Config) -> doc("Confirm terminate/3 is called when the socket gets closed brutally."), Self = self(), ConnPid = gun_open(Config), Ref = gun:get(ConnPid, "/long_polling", [ {<<"accept-encoding">>, <<"gzip">>}, - {<<"x-test-case">>, <<"stream">>}, + {<<"x-test-case">>, <<"terminate_on_socket_close">>}, {<<"x-test-pid">>, pid_to_list(Self)} ]), %% Confirm init/3 is called and receive the beginning of the response. -- cgit v1.2.3