%% This library is free software; you can redistribute it and/or modify
%% it under the terms of the GNU Lesser General Public License as
%% published by the Free Software Foundation; either version 2 of the
%% License, or (at your option) any later version.
%%
%% This library is distributed in the hope that it will be useful, but
%% WITHOUT ANY WARRANTY; without even the implied warranty of
%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
%% Lesser General Public License for more details.
%%
%% You should have received a copy of the GNU Lesser General Public
%% License along with this library; if not, write to the Free Software
%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
%% USA
%%
%% $Id$ 
%%
%% @author Richard Carlsson <richardc@it.uu.se>
%% @copyright 2006 Richard Carlsson
%% @private
%% @see eunit
%% @doc Test runner process tree functions

-module(eunit_proc).

-include("eunit.hrl").
-include("eunit_internal.hrl").

-export([start/4]).

%% This must be exported; see new_group_leader/1 for details.
-export([group_leader_process/1]).

-record(procstate, {ref, id, super, insulator, parent, order}).


%% Spawns test process and returns the process Pid; sends {done,
%% Reference, Pid} to caller when finished. See the function
%% wait_for_task/2 for details about the need for the reference.
%%
%% The `Super' process receives a stream of status messages; see
%% message_super/3 for details.

%% Entry point: wraps Tests in a top-level group and starts it as a local
%% task. Returns the pid of the task's insulator process; the caller later
%% receives {done, Reference, Pid} when the task has finished.
start(Tests, Order, Super, Reference)
  when is_pid(Super), is_reference(Reference) ->
    RootState = #procstate{ref = Reference, id = [], super = Super,
                           order = Order},
    RootGroup = #group{tests = Tests},
    spawn_group(local, RootGroup, RootState).


%% Status messages sent to the supervisor process. (A supervisor does
%% not have to act on these messages - it can e.g. just log them, or
%% even discard them.) Each status message has the following form:
%%
%%   {status, Id, Info}
%%
%% where Id identifies the item that the message pertains to, and the
%% Info part can be one of:
%%
%%   {progress, 'begin', {test | group, Data}}
%%       indicates that the item has been entered, and what type it is;
%%       Data is [{desc,binary()}, {source,Source}, {line,integer()}] for
%%       a test, and [{desc,binary()}, {spawn,SpawnType},
%%       {order,OrderType}] for a group.
%%
%%   {progress, 'end', {Status, Data}}
%%       Status = 'ok' | {error, Exception} | {skipped, Cause} | integer()
%%       Data = [{time,integer()}, {output,binary()}]
%%
%%       where Time is measured in milliseconds and Output is the data
%%       written to the standard output stream during the test; if
%%       Status is {skipped, Cause}, then Cause is a term thrown from
%%       eunit_test:run_testfun/1. For a group item, the Status field is
%%       the number of immediate subitems of the group; this helps the
%%       collation of results. Failure for groups is always signalled
%%       through a cancel message, not through the Status field.
%%
%%   {cancel, Descriptor}
%%       where Descriptor can be:
%%           timeout            a timeout occurred
%%           {blame, Id}        forced to terminate because of item `Id'
%%           {abort, Cause}     the test or group failed to execute
%%           {exit, Reason}     the test process terminated unexpectedly
%%           {startup, Reason}  failed to start a remote test process
%%
%%       where Cause is a term thrown from eunit_data:enter_context/4 or
%%       from eunit_data:iter_next/2, and Reason is an exit term from a
%%       crashed process
%%
%% Note that due to concurrent (and possibly distributed) execution,
%% there are *no* strict ordering guarantees on the status messages,
%% with one exception: a 'begin' message will always arrive before its
%% corresponding 'end' message.

%% Sends {status, Id, Info} to the supervisor process stored in the state.
message_super(Id, Info, #procstate{super = Super}) ->
    Super ! {status, Id, Info}.


%% @TODO implement synchronized mode for insulator/child execution

%% Ideas for synchronized mode:
%%
%% * At each "program point", i.e., before entering a test, entering a
%% group, or leaving a group, the child will synchronize with the
%% insulator to make sure it is ok to proceed.
%%
%% * The insulator can receive controlling messages from higher up in
%% the hierarchy, telling it to pause, resume, single-step, repeat, etc.
%%
%% * Synchronization on entering/leaving groups is necessary in order to
%% get control over things such as subprocess creation/termination and
%% setup/cleanup, making it possible to, e.g., repeat all the tests
%% within a particular subprocess without terminating and restarting it,
%% or repeating tests without repeating the setup/cleanup.
%%
%% * Some tests that depend on state will not be possible to repeat, but
%% require a fresh context setup. There is nothing that can be done
%% about this, and the many tests that are repeatable should not be
%% punished because of it. The user must decide which level to restart.
%%
%% * Question: How do we propagate control messages down the hierarchy
%% (preferably only to the correct insulator process)? An insulator does
%% not currently know whether its child process has spawned subtasks.
%% (The "supervisor" process does not know the Pids of the controlling
%% insulator processes in the tree, and it probably should not be
%% responsible for this anyway.)


%% ---------------------------------------------------------------------
%% Process tree primitives

%% A "task" consists of an insulator process and a child process which
%% handles the actual work. When the child terminates, the insulator
%% process sends {done, Reference, self()} to the process which started
%% the task (the "parent"). The child process is given a State record
%% which contains the process id:s of the parent, the insulator, and the
%% supervisor.

%% @spec (Type, (#procstate{}) -> () -> term(), #procstate{}) -> pid()
%%   Type = local | {remote, Node::atom()}

%% Starts a new task (insulator + child) of the given Type and returns the
%% insulator's pid. The new state records the calling process as the
%% task's parent. For a remote task, this call blocks until the remote
%% insulator confirms startup with {ok, Ref, Pid} or a 'DOWN' arrives; in
%% the latter case a {cancel, {startup, Reason}} status is sent and a fake
%% {done, Ref, Pid} is put in our own mailbox, so that wait_for_task/2
%% behaves as if the task had run and terminated.
start_task(Type, Fun, St0) ->
    St = St0#procstate{parent = self()},
    %% (note: the link here is mainly to propagate signals *downwards*,
    %% so that the insulator can detect if the process that started the
    %% task dies before the task is done)
    F = fun () -> insulator_process(Type, Fun, St) end,
    case Type of
	local ->
	    %% we assume (at least for now) that local spawns can never
	    %% fail in such a way that the process does not start, so a
	    %% new local insulator does not need to synchronize here
	    spawn_link(F);
	{remote, Node} ->
	    %% NOTE(review): per the erlang:spawn_link/2 documentation, an
	    %% unreachable Node also sends a 'noconnection' exit signal
	    %% to this (normally non-trapping) process, which may kill it
	    %% before the 'DOWN' clause below gets to run - confirm
	    Pid = spawn_link(Node, F),
	    %% See the comment above insulator_process/3 for the need for
	    %% the {ok, Reference, Pid} message.
	    Reference = St#procstate.ref,
	    Monitor = erlang:monitor(process, Pid),
	    %% (the DOWN message is guaranteed to arrive after any
	    %% messages sent by the process itself)
	    receive
		{ok, Reference, Pid} ->
		    Pid;
		{'DOWN', Monitor, process, Pid, Reason} ->
		    %% send messages as if the insulator process was
		    %% started, but terminated on its own accord
		    Msg = {startup, Reason},
		    message_super(St#procstate.id, {cancel, Msg}, St),
		    self() ! {done, Reference, Pid}
	    end,
	    %% drop the monitor, and any 'DOWN' message it already sent
	    erlang:demonitor(Monitor, [flush]),
	    Pid
    end.

%% Relatively simple, and hopefully failure-proof insulator process
%% (This is cleaner than temporarily setting up the caller to trap
%% signals, and does not affect the caller's mailbox or other state.)
%%
%% We assume that nobody does a 'kill' on an insulator process - if that
%% should happen, the test framework will hang since the insulator will
%% never send a reply; see below for more.
%%
%% Note that even if the insulator process itself never fails, it is
%% still possible that it does not start properly, if it is spawned
%% remotely (e.g., if the remote node is down). Therefore, a remote
%% insulator must always send an {ok, Reference, self()} message to its
%% parent as soon as it has been spawned; the parent (see start_task/3)
%% waits for either that message or a 'DOWN' notification.

%% @spec (Type, Fun::(#procstate{}) -> (() -> term()), St::#procstate{}) -> no_return()
%%  Type = local | {remote, Node::atom()}

%% Body of an insulator process. It traps exits so that it can observe
%% its child and its parent, acknowledges remote startup to the parent,
%% and spawns a linked child that evaluates Fun(St) and runs the
%% resulting fun. It then enters insulator_wait/4, which never returns.
insulator_process(Type, Fun, St0) ->
    process_flag(trap_exit, true),
    Parent = St0#procstate.parent,
    case Type of
        local ->
            ok;
        _Remote ->
            Parent ! {ok, St0#procstate.ref, self()}
    end,
    St = St0#procstate{insulator = self()},
    %% Fun(St) is evaluated inside the child, not in the insulator
    ChildBody = fun () -> child_process(Fun(St), St) end,
    Child = spawn_link(ChildBody),
    insulator_wait(Child, Parent, [], St).

%% Normally, child processes exit with the reason 'normal' even if the
%% executed tests failed (by throwing exceptions), since the tests are
%% executed within a try-block. Child processes can terminate abnormally
%% for the following reasons:
%%   1) an error in the processing of the test descriptors (a malformed
%%      descriptor, failure in a setup, cleanup or initialization, a
%%      missing module or function, or a failing generator function);
%%   2) an internal error in the test running framework itself;
%%   3) receiving a non-trapped error signal as a consequence of running
%%      test code.
%% Those under point 1 are "expected errors", handled specially in the
%% protocol, while the other two are unexpected errors. (Since alt. 3
%% implies that the test neither reported success nor failure, it can
%% never be considered "proper" behaviour of a test.) Abnormal
%% termination is reported to the supervisor process but otherwise does
%% not affect the insulator compared to normal termination. Child
%% processes can also be killed abruptly by their insulators, in case of
%% a timeout or if a parent process dies.
%%
%% The insulator is the group leader for the child process, and gets all
%% of its standard I/O. The output is buffered and associated with the
%% currently active test or group, and is sent along with the 'end'
%% progress message when the test or group has finished.

%% Main receive loop of an insulator.
%% Child is the worker process and Parent is the process that started
%% the task.
%% Buf is a stack of output buffers. A new empty buffer is pushed on each
%% 'begin' message from the child. The top buffer is popped and converted
%% to a binary on the matching 'end' message. Each buffer holds chunks
%% newest-first.
%% The loop only ends via terminate_insulator/1 or kill_task/2. Both send
%% {done, Ref, self()} to Parent and then exit this process.
%% NOTE(review): an io_request or 'end' from the child while Buf is []
%% would crash on hd/1; this assumes the child always sends 'begin'
%% first - confirm.
insulator_wait(Child, Parent, Buf, St) ->
    receive
	{child, Child, Id, {'begin', Type, Data}} ->
	    message_super(Id, {progress, 'begin', {Type, Data}}, St),
	    insulator_wait(Child, Parent, [[] | Buf], St);
	{child, Child, Id, {'end', Status, Time}} ->
	    Data = [{time, Time}, {output, buffer_to_binary(hd(Buf))}],
	    message_super(Id, {progress, 'end', {Status, Data}}, St),
	    insulator_wait(Child, Parent, tl(Buf), St);
	{child, Child, Id, {skipped, Reason}} ->
	    %% this happens when a subgroup fails to enter the context
	    message_super(Id, {cancel, {abort, Reason}}, St),
	    insulator_wait(Child, Parent, Buf, St);
	{child, Child, Id, {abort, Cause}} ->
	    %% this happens when the child code threw an internal
	    %% eunit_abort; the child exits with reason 'aborted' right
	    %% after sending this message (see child_process/2)
	    exit_messages(Id, {abort, Cause}, St),
	    %% no need to wait for the {'EXIT',Child,_} message
	    terminate_insulator(St);
	{io_request, Child, ReplyAs, Req} ->
	    %% we only collect output from the child process itself, not
	    %% from secondary processes, otherwise we get race problems;
	    %% however, each test runs its personal group leader that
	    %% funnels all output - see the handle_test/2 function
	    Buf1 = io_request(Child, ReplyAs, Req, hd(Buf)),
	    insulator_wait(Child, Parent, [Buf1 | tl(Buf)], St);
	{io_request, From, ReplyAs, Req} when is_pid(From) ->
	    %% (this shouldn't happen anymore, but we keep it safe)
	    %% just ensure the sender gets a reply; ignore the data
	    io_request(From, ReplyAs, Req, []),
	    insulator_wait(Child, Parent, Buf, St);
	{timeout, Child, Id} ->
	    %% sent by the timer armed in set_timeout/2
	    exit_messages(Id, timeout, St),
	    kill_task(Child, St);
	{'EXIT', Child, normal} ->
	    terminate_insulator(St);
	{'EXIT', Child, Reason} ->
	    %% unexpected child crash: blame this task's own Id
	    exit_messages(St#procstate.id, {exit, Reason}, St),
	    terminate_insulator(St);
	{'EXIT', Parent, _} ->
	    %% make sure child processes are cleaned up recursively
	    kill_task(Child, St)
    end.

%% Kills the child outright with the untrappable 'kill' reason, then
%% finishes this insulator. This notifies the parent and exits.
kill_task(Child, St) ->
    _ = exit(Child, kill),
    terminate_insulator(St).

%% Turns an output buffer (a list of chunks, newest first) into a single
%% binary in chronological order. A buffer holding exactly one binary is
%% returned as-is, to avoid copying it.
buffer_to_binary(Buf) ->
    case Buf of
        [Only] when is_binary(Only) ->
            Only;
        _ ->
            Chronological = lists:reverse(Buf),
            list_to_binary(Chronological)
    end.

%% Unlinking before exit avoids polluting the parent process with exit
%% signals from the insulator. The child process is already dead here.

%% Final step of every insulator.
%% It reports {done, Ref, self()} to the parent and unlinks from it, so
%% the parent does not receive this process's exit signal. Then it exits
%% normally. Sending and unlinking are harmless even if the parent is
%% already dead.
terminate_insulator(#procstate{parent = Parent, ref = Ref}) ->
    Parent ! {done, Ref, self()},
    unlink(Parent),
    exit(normal).

%% send cancel messages for the Id of the "causing" item, and also for
%% the Id of the insulator itself, if they are different
%% Reports {cancel, Cause} for the item Id that caused the failure.
%% If this insulator's own Id is a different item, it is additionally
%% cancelled with {blame, Id}. The most specific Id is always reported
%% first.
exit_messages(Id, Cause, St) ->
    message_super(Id, {cancel, Cause}, St),
    OwnId = St#procstate.id,
    if
        OwnId =:= Id ->
            ok;
        true ->
            message_super(OwnId, {cancel, {blame, Id}}, St)
    end.

%% Child processes send all messages via the insulator to ensure proper
%% sequencing with timeouts and exit signals.

%% Sends {child, self(), Id, Data} to this task's insulator, tagged with
%% the current item Id.
message_insulator(Data, #procstate{insulator = Insulator, id = Id}) ->
    Insulator ! {child, self(), Id, Data}.

%% Timeout handling

%% Arms a timer. After Time milliseconds it delivers
%% {timeout, self(), Id} to the insulator. Returns the timer reference.
set_timeout(Time, #procstate{insulator = Insulator, id = Id}) ->
    Expired = {timeout, self(), Id},
    erlang:send_after(Time, Insulator, Expired).

%% Cancels a timer armed by set_timeout/2. Returns the result of
%% erlang:cancel_timer/1: the time left, or false.
clear_timeout(TimerRef) ->
    erlang:cancel_timer(TimerRef).

%% Like with_timeout/3, but uses Default when Time is 'undefined' (no
%% explicit timeout was specified for the item).
with_timeout(Time, Default, F, St) ->
    Effective = case Time of
                    undefined -> Default;
                    _ -> Time
                end,
    with_timeout(Effective, F, St).

%% Runs F() in the calling process. Returns {Value, ElapsedMs}, where
%% ElapsedMs is measured with statistics(wall_clock).
%% For an integer Time, a timer is armed first (see set_timeout/2). If F()
%% is still running after Time ms, the insulator is notified. The timer is
%% always cancelled afterwards, even if F() raises.
%% Integer timeouts are clamped to the range [0, 16#FFFFffff] first.
%% 'infinity' starts no timer at all.
with_timeout(infinity, F, _St) ->
    timed_call(F);
with_timeout(Time, F, St) when is_integer(Time), Time > 16#FFFFffff ->
    with_timeout(16#FFFFffff, F, St);
with_timeout(Time, F, St) when is_integer(Time), Time < 0 ->
    with_timeout(0, F, St);
with_timeout(Time, F, St) when is_integer(Time) ->
    TimerRef = set_timeout(Time, St),
    try
        timed_call(F)
    after
        clear_timeout(TimerRef)
    end.

%% Calls F() and returns {Value, ElapsedMs} using the wall_clock counter.
timed_call(F) ->
    {Start, _} = statistics(wall_clock),
    Value = F(),
    {Stop, _} = statistics(wall_clock),
    {Value, Stop - Start}.

%% The normal behaviour of a child process is not to trap exit
%% signals. The testing framework is not dependent on this, however, so
%% the test code is allowed to enable signal trapping as it pleases.
%% Note that I/O is redirected to the insulator process.

%% @spec (() -> term(), #procstate{}) -> ok

%% Entry point of a task's worker process.
%% It makes the insulator its group leader, so that all of its I/O is
%% collected there, and then runs Fun().
%% A thrown {eunit_abort, Cause} is reported to the insulator and turned
%% into exit(aborted). Any other exception escapes and terminates this
%% process abnormally; the insulator then reports {exit, Reason}.
child_process(Fun, St) ->
    Insulator = St#procstate.insulator,
    group_leader(Insulator, self()),
    try
        Fun(),
        ok
    catch
        throw:{eunit_abort, Cause} ->
            message_insulator({abort, Cause}, St),
            exit(aborted)
    end.

-ifdef(TEST).
%% Test generator (only compiled with TEST defined).
%% It checks that the process running a test does not trap exits:
%% process_flag(trap_exit, false) returns the previous flag value, so
%% 'false' means trapping was off. As a side effect, the call also
%% leaves the flag off.
child_test_() ->
    [{"test processes do not trap exit signals",
      ?_assertMatch(false, process_flag(trap_exit, false))}].
-endif.

%% @throws abortException()
%% @type abortException() = {eunit_abort, Cause::term()}

%% Aborts the current task by throwing {eunit_abort, Cause}. This is
%% caught by child_process/2 and reported as a cancel/abort.
abort_task(Cause) ->
    erlang:throw({eunit_abort, Cause}).

%% Typically, the process that executes this code is not trapping
%% signals, but it might be - it is outside of our control, since test
%% code can enable or disable trapping at will. Because we cannot rely on
%% process links here, the insulator process of a task must be guaranteed
%% to always send a reply before it terminates.
%%
%% The unique reference guarantees that we don't extract any message
%% from the mailbox unless it belongs to the test framework (and not to
%% the running tests) - it is not possible to use selective receive to
%% match only messages that are tagged with some pid out of a
%% dynamically varying set of pids. When the wait-loop terminates, no
%% such message should remain in the mailbox.

%% Blocks until the single task with insulator Pid has reported done.
wait_for_task(Pid, St) ->
    Pending = sets:from_list([Pid]),
    wait_for_tasks(Pending, St).

%% Blocks until every insulator pid in PidSet has sent
%% {done, Ref, Pid}. Ref is this run's reference, so only messages from
%% the test framework are taken out of the mailbox. A done message for a
%% pid not in the set is consumed and ignored. When a task's done message
%% arrives, its insulator has already reported any anomalies to the
%% supervisor.
wait_for_tasks(PidSet, St) ->
    case sets:size(PidSet) =:= 0 of
        true ->
            ok;
        false ->
            Ref = St#procstate.ref,
            receive
                {done, Ref, Finished} ->
                    Remaining = sets:del_element(Finished, PidSet),
                    wait_for_tasks(Remaining, St)
            end
    end.

%% ---------------------------------------------------------------------
%% Separate testing process

%% TODO: Ability to stop after N failures.
%% TODO: Flow control, starting new job as soon as slot is available

%% Runs the tests T, either sequentially or in parallel according to the
%% current order setting. Returns the number of items run. A parallel
%% limit of 0 means unlimited.
tests(T, St) ->
    #procstate{id = Id, order = Order} = St,
    Iter = eunit_data:iter_init(T, Id),
    case Order of
        inorder ->
            tests_inorder(Iter, St);
        inparallel ->
            tests_inparallel(Iter, 0, St);
        {inparallel, Limit} when is_integer(Limit), Limit >= 0 ->
            tests_inparallel(Iter, Limit, St)
    end.

%% Updates the state's item Id to the iterator's current position.
set_id(I, St) ->
    CurrentId = eunit_data:iter_id(I),
    St#procstate{id = CurrentId}.

%% Runs all items of I sequentially in this process. Returns the count.
tests_inorder(I, St) ->
    InitialCount = 0,
    tests_inorder(I, InitialCount, St).

%% Sequential loop: handles each item in turn. Returns the final count,
%% which becomes the group's status (its number of immediate subitems).
tests_inorder(Iter, Count, St) ->
    case get_next_item(Iter) of
        none ->
            Count;
        {Item, NextIter} ->
            handle_item(Item, set_id(NextIter, St)),
            tests_inorder(NextIter, Count + 1, St)
    end.

%% Runs the items of I as separate tasks, at most K0 at a time (K0 = 0
%% means no limit). Returns the number of items.
tests_inparallel(I, K0, St) ->
    NoChildren = sets:new(),
    tests_inparallel(I, 0, St, K0, K0, NoChildren).

%% Parallel loop.
%% Arguments: Count is the number of items started so far, and Slots is
%% how many more may be started in the current batch. Limit is the batch
%% size (0 = unlimited), and Children is the set of running task pids.
%% A full batch is drained completely before the next one starts.
tests_inparallel(Iter, Count, St, Slots, Limit, Children)
  when Slots =< 0, Limit > 0 ->
    wait_for_tasks(Children, St),
    tests_inparallel(Iter, Count, St, Limit, Limit, sets:new());
tests_inparallel(Iter, Count, St, Slots, Limit, Children) ->
    case get_next_item(Iter) of
        none ->
            wait_for_tasks(Children, St),
            Count;
        {Item, NextIter} ->
            Task = spawn_item(Item, set_id(NextIter, St)),
            Running = sets:add_element(Task, Children),
            tests_inparallel(NextIter, Count + 1, St, Slots - 1, Limit,
                             Running)
    end.

%% this starts a new separate task for an inparallel-item (which might
%% be a group and in that case might cause yet another spawn in the
%% handle_group() function, but it might also be just a single test)
%% Starts a separate local task that handles one inparallel item, which
%% may be a single test or a whole group. Returns the task's insulator
%% pid. Items run in parallel are always spawned locally.
spawn_item(T, St0) ->
    MakeBody = fun (St) -> fun () -> handle_item(T, St) end end,
    start_task(local, MakeBody, St0).

%% Advances the iterator. Any term thrown by eunit_data:iter_next/1 is
%% re-thrown as a task abort (see abort_task/1).
get_next_item(I) ->
    try
        eunit_data:iter_next(I)
    catch
        throw:Reason -> abort_task(Reason)
    end.

%% Dispatches one item to the runner for its kind (group or test).
handle_item(Item, St) ->
    case Item of
        #group{} -> handle_group(Item, St);
        #test{} -> handle_test(Item, St)
    end.

%% Runs a single #test{} item in the current process and reports
%% 'begin'/'end' to the insulator.
%% While the test code runs, this process's group leader is a fresh
%% collector process G1 (new_group_leader/1). G1 is spawn_link'ed, so it
%% dies if this process crashes or is killed. Afterwards the old group
%% leader is restored, and the collected output is re-emitted through it,
%% before the 'end' message.
%% NOTE(review): if an exception escapes with_timeout/4 (run_test/1 only
%% catches throws), the group leader is never restored; the linked G1 then
%% dies with this process - confirm this is the intended cleanup.
handle_test(T, St) ->
    Data = [{desc, T#test.desc}, {source, T#test.location},
	    {line, T#test.line}],
    message_insulator({'begin', test, Data}, St),

    %% each test case runs under a fresh group leader process
    G0 = group_leader(),
    Runner = self(),
    G1 = new_group_leader(Runner),
    group_leader(G1, self()),

    %% run the actual test, handling timeouts and getting the total run
    %% time of the test code (and nothing else)
    {Status, Time} = with_timeout(T#test.timeout, ?DEFAULT_TEST_TIMEOUT,
				  fun () -> run_test(T) end, St),

    %% restore group leader, get the collected output, and re-emit it so
    %% that it all seems to come from this process, and always comes
    %% before the 'end' message for this test
    group_leader(G0, self()),
    Output = group_leader_sync(G1),
    io:put_chars(Output),

    message_insulator({'end', Status, Time}, St),
    ok.

%% @spec (#test{}) -> ok | {error, eunit_lib:exception()}
%%                  | {skipped, eunit_test:wrapperError()}

%% Executes the test fun.
%% Returns ok on success, {error, Exception} on failure, or
%% {skipped, WrapperError} if eunit_test:run_testfun/1 threw. The test's
%% return value is discarded.
run_test(#test{f = F}) ->
    try eunit_test:run_testfun(F) of
        {ok, _Ignored} -> ok;
        {error, _} = Failure -> Failure
    catch
        throw:WrapperError -> {skipped, WrapperError}
    end.

%% If the group specifies its own order, make it the current order;
%% otherwise keep the inherited one.
set_group_order(#group{order = Order}, St) ->
    case Order of
        undefined -> St;
        _ -> St#procstate{order = Order}
    end.

%% Runs a group item.
%% The group's own order setting is applied first. Without a spawn type
%% the group runs in this process; otherwise it runs in a new task of that
%% type, and this call waits for the task to finish.
handle_group(T, St0) ->
    St = set_group_order(T, St0),
    case T#group.spawn of
        undefined ->
            run_group(T, St);
        SpawnType ->
            Task = spawn_group(SpawnType, T, St),
            wait_for_task(Task, St)
    end.

%% Starts a task of the given Type (local or {remote, Node}) that runs
%% the group T. Returns the task's insulator pid.
spawn_group(Type, T, St0) ->
    MakeBody = fun (St) -> fun () -> run_group(T, St) end end,
    start_task(Type, MakeBody, St0).

%% Runs a group in the current process.
%% It sends 'begin', enters the group's context (if any), and runs its
%% tests under the group timeout. On success it sends 'end' with the
%% subitem count. The setup/cleanup runs outside the group timeout, so no
%% timer is started if the setup fails.
%% A thrown {context_error, Why, Trace} (from context entry via
%% with_context/2) is reported as {skipped, {Why, Trace}}. The group is
%% then treated as aborted, but processing of other items continues.
run_group(T, St) ->
    Timeout = T#group.timeout,
    BeginData = [{desc, T#group.desc}, {spawn, T#group.spawn},
                 {order, T#group.order}],
    message_insulator({'begin', group, BeginData}, St),
    Enter = fun (G) -> enter_group(G, Timeout, St) end,
    try with_context(T, Enter) of
        {Status, Time} ->
            message_insulator({'end', Status, Time}, St)
    catch
        throw:{context_error, Why, Trace} ->
            message_insulator({skipped, {Why, Trace}}, St)
    end,
    ok.

%% Runs the group's tests under its timeout, defaulting to
%% ?DEFAULT_GROUP_TIMEOUT. Returns {SubitemCount, ElapsedMs}.
enter_group(T, Timeout, St) ->
    RunTests = fun () -> tests(T, St) end,
    with_timeout(Timeout, ?DEFAULT_GROUP_TIMEOUT, RunTests, St).

%% Applies F to the group's tests. If the group has a #context{}, this is
%% done through eunit_data:enter_context/3.
with_context(#group{context = undefined, tests = Tests}, F) ->
    F(Tests);
with_context(#group{context = #context{} = Ctx, tests = Tests}, F) ->
    eunit_data:enter_context(Ctx, Tests, F).

%% Group leader process for test cases - collects I/O output requests.

%% Spawns (and links to) a collecting group leader that reports back to
%% Runner.
%% spawn_link/3 with an explicit module and function is required here.
%% The UNDER_EUNIT macro (in eunit.hrl) inspects the group leader's
%% 'current function'. A fun-based spawn would show erlang:apply/2 during
%% early startup and fool the macro.
new_group_leader(Runner) ->
    Args = [Runner],
    spawn_link(?MODULE, group_leader_process, Args).

%% Entry point of a per-test group leader. It waits without a timeout and
%% starts with an empty output buffer.
group_leader_process(Runner) ->
    EmptyBuf = [],
    group_leader_loop(Runner, infinity, EmptyBuf).

%% Receive loop of a per-test group leader.
%% Runner receives the collected output. Wait is the receive timeout:
%% infinity until 'stop' arrives, then 0. Buf holds output chunks
%% newest-first.
%% After 'stop', the loop keeps draining immediately pending I/O requests.
%% Once the mailbox is empty it sends {self(), OutputBinary} to Runner.
%% The function then returns, so the process terminates.
group_leader_loop(Runner, Wait, Buf) ->
    receive
	{io_request, From, ReplyAs, Req} ->
	    %% run this part under normal priority always (the previous
	    %% priority, e.g. 'low' after 'stop', is restored afterwards)
	    P = process_flag(priority, normal),
	    Buf1 = io_request(From, ReplyAs, Req, Buf),
	    process_flag(priority, P),
	    group_leader_loop(Runner, Wait, Buf1);
	stop ->
	    %% quitting time: make a minimal pause, go low on priority,
	    %% set receive-timeout to zero and schedule out again
	    receive after 2 -> ok end,
	    process_flag(priority, low),
	    group_leader_loop(Runner, 0, Buf);
	_ ->
	    %% discard any other messages
	    group_leader_loop(Runner, Wait, Buf)
    after Wait ->
	    %% no more messages and nothing to wait for; we ought to
	    %% have collected all immediately pending output now
	    process_flag(priority, normal),
	    Runner ! {self(), buffer_to_binary(Buf)}
    end.

%% Tells group leader G to stop and waits for the output it collected.
%% G answers with {G, Binary}.
group_leader_sync(G) ->
    G ! stop,
    receive
        {G, Collected} -> Collected
    end.

%% Implementation of buffering I/O for group leader processes. (Note that
%% each batch of characters is just pushed on the buffer, so it needs to
%% be reversed when it is flushed.)

%% Serves one I/O request from From.
%% The request is handled against Buf, the reply is sent back tagged with
%% ReplyAs, and the updated buffer is returned.
io_request(From, ReplyAs, Req, Buf) ->
    {Reply, NewBuf} = io_request(Req, Buf),
    _ = io_reply(From, ReplyAs, Reply),
    NewBuf.

%% Sends an I/O protocol reply, {io_reply, ReplyAs, Reply}, to From.
io_reply(From, ReplyAs, Reply) ->
    Message = {io_reply, ReplyAs, Reply},
    From ! Message.

%% Handles one unwrapped I/O protocol request against an output buffer.
%% Buf holds output chunks, newest first.
%% Every clause returns a 2-tuple {Reply, NewBuf}. The caller,
%% io_request/4, matches on exactly that shape and sends Reply back.
%% - Output requests append to the buffer.
%% - Input requests always get 'eof'; tests have nothing to read.
%% - getopts and geometry queries get {error, enotsup}.
%% - Anything else gets {error, request}.
io_request({put_chars, Chars}, Buf) ->
    {ok, [Chars | Buf]};
io_request({put_chars, M, F, As}, Buf) ->
    try apply(M, F, As) of
	Chars -> {ok, [Chars | Buf]}
    catch
	%% NOTE(review): erlang:get_stacktrace/0 is deprecated in newer
	%% OTP releases (replaced by the C:T:Stack catch syntax); it is
	%% kept here for compatibility with the older releases this file
	%% targets
	C:T -> {{error, {C,T,erlang:get_stacktrace()}}, Buf}
    end;
io_request({put_chars, _Enc, Chars}, Buf) ->
    io_request({put_chars, Chars}, Buf);
io_request({put_chars, _Enc, Mod, Func, Args}, Buf) ->
    io_request({put_chars, Mod, Func, Args}, Buf);
io_request({get_chars, _Enc, _Prompt, _N}, Buf) ->
    {eof, Buf};
io_request({get_chars, _Prompt, _N}, Buf) ->
    {eof, Buf};
io_request({get_line, _Prompt}, Buf) ->
    {eof, Buf};
io_request({get_line, _Enc, _Prompt}, Buf) ->
    {eof, Buf};
io_request({get_until, _Prompt, _M, _F, _As}, Buf) ->
    {eof, Buf};
io_request({get_until, _Enc, _Prompt, _M, _F, _As}, Buf) ->
    {eof, Buf};
io_request({setopts, _Opts}, Buf) ->
    {ok, Buf};
io_request(getopts, Buf) ->
    %% must be {Reply, Buf}: a 3-tuple here made io_request/4 crash
    %% with badmatch instead of replying {error, enotsup}
    {{error, enotsup}, Buf};
io_request({get_geometry, columns}, Buf) ->
    {{error, enotsup}, Buf};
io_request({get_geometry, rows}, Buf) ->
    {{error, enotsup}, Buf};
io_request({requests, Reqs}, Buf) ->
    io_requests(Reqs, {ok, Buf});
io_request(_, Buf) ->
    {{error, request}, Buf}.

%% Processes a batch of requests in order. Stops at the first request
%% whose reply is not 'ok', and returns that {Reply, Buf}.
io_requests([R | Rs], {ok, Buf}) ->
    io_requests(Rs, io_request(R, Buf));
io_requests(_, Result) ->
    Result.