%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2008-2011. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% %CopyrightEnd%
%%----------------------------------------------------------------
%% Purpose:Test Suite for the 'pg2' module.
%%-----------------------------------------------------------------
-module(pg2_SUITE).

-include_lib("test_server/include/test_server.hrl").
-define(datadir, ?config(data_dir, Config)).
-define(privdir, ?config(priv_dir, Config)).

-export([all/0, suite/0,groups/0,init_per_suite/1, end_per_suite/1, 
	 init_per_group/2,end_per_group/2, 
	 init_per_testcase/2, end_per_testcase/2]).

-export([
	 otp_7277/1, otp_8259/1, otp_8653/1,
         compat/1, basic/1]).

%% Default timetrap timeout (set in init_per_testcase).
-define(default_timeout, ?t:minutes(1)).

%% Configuration key under which init_per_testcase/2 stores the name of
%% the running test case.
-define(TESTCASE, testcase_name).
%% Name of the running test case. The expansion refers to a variable
%% named Config, so it can only be used where Config is bound.
-define(testcase, ?config(?TESTCASE, Config)).

%% Internal export.
-export([mk_part_node_and_group/3, part2/4,
         mk_part_node/3, part1/5, p_init/3, start_proc/1, sane/0]).

%% Arm the default timetrap and record both the watchdog handle and the
%% name of the running test case at the front of the configuration.
init_per_testcase(Case, Config) ->
    ?line Watchdog = ?t:timetrap(?default_timeout),
    PerCase = [{?TESTCASE, Case}, {watchdog, Watchdog}],
    PerCase ++ Config.

%% Per-test-case cleanup: kill any slave nodes the case left running, then
%% cancel the timetrap whose handle init_per_testcase/2 stored under the
%% 'watchdog' key.  Always returns ok.
end_per_testcase(_Case, Config) ->
    test_server_ctrl:kill_slavenodes(),
    %% Config is read here, so it must not carry the "unused" underscore.
    Dog = ?config(watchdog, Config),
    test_server:timetrap_cancel(Dog),
    ok.

%% Suite-wide Common Test settings: install the ts_install_cth hook.
suite() ->
    Hooks = [ts_install_cth],
    [{ct_hooks, Hooks}].

%% Every test case is reached through the single 'tickets' group.
all() ->
    GroupNames = [tickets],
    [{group, Name} || Name <- GroupNames].

%% The 'tickets' group, with no group properties, lists the test cases in
%% the order they are given here.
groups() ->
    Cases = [otp_7277, otp_8259, otp_8653, compat, basic],
    Properties = [],
    [{tickets, Properties, Cases}].

%% No suite-level setup is needed; the configuration is passed through
%% unchanged.
init_per_suite(Cfg) ->
    Cfg.

%% No suite-level teardown is needed.
end_per_suite(_Cfg) ->
    ok.

%% No group-level setup is needed; the configuration is passed through
%% unchanged.
init_per_group(_Group, Cfg) ->
    Cfg.

%% No group-level teardown is needed; the configuration is returned as is.
end_per_group(_Group, Cfg) ->
    Cfg.



%% OTP-7277: leaving a group the process never joined must not disturb
%% its membership of another group.
%%
%% A process joins group a, leaves group b (which it is not a member of)
%% and is then killed.  Afterwards group a must have no members and no
%% local members.
otp_7277(doc) ->
    "OTP-7277. Bugfix leave().";
otp_7277(suite) -> [];
otp_7277(Config) when is_list(Config) ->
    ?line ok = pg2:create(a),
    ?line ok = pg2:create(b),
    P = spawn(forever()),
    ?line ok = pg2:join(a, P),
    %% P never joined b; the leave is still expected to return ok.
    ?line ok = pg2:leave(b, P),
    ?line true = exit(P, kill),
    %% The member lists may not be empty right away; if they are not,
    %% wait 100 ms once and then insist that they are.
    case {pg2:get_members(a), pg2:get_local_members(a)} of
        {[], []} ->
            ok;
        _ ->
            timer:sleep(100),
            ?line [] = pg2:get_members(a),
            ?line [] = pg2:get_local_members(a)
    end,
    %% Clean up; the results of the deletions are not checked.
    ?line _ = pg2:delete(a),
    ?line _ = pg2:delete(b),
    ok.

%% ?UNTIL(Seq) re-evaluates Seq until it returns true (see
%% loop_until_true/2).  The expansion refers to a variable named Config,
%% so the macro can only be used where Config is bound.
-define(UNTIL(Seq), loop_until_true(fun() -> Seq end, Config)).
%% Milliseconds slept between two evaluations in loop_until_true/2.
-define(UNTIL_LOOP, 300).

%% Group membership must be merged consistently when two partitions of
%% the network are reconnected.
%%
%% Three peer nodes a, b and c are started.  Node b then runs part2/4,
%% which splits the network into {node(), a} and {b, c} and makes every
%% one of the four nodes run mk_part_node_and_group/3: each node creates
%% the group pg2_otp_8653 and joins a fresh process to it twice.  After
%% the partitions have been reconnected, all four nodes must report the
%% same 8 members (4 nodes, 2 joins each).
%%
%% NOTE(review): the doc string below is identical to the one of
%% otp_8259/1 and names OTP-8259; it looks like a copy-paste leftover --
%% confirm the intended ticket text.
otp_8653(suite) -> [];
otp_8653(doc) ->
    ["OTP-8259. Member was not removed after being killed."];
otp_8653(Config) when is_list(Config) ->
    %% A tighter timetrap than the default one set in init_per_testcase/2.
    Timeout = 15,
    ?line Dog = test_server:timetrap({seconds,Timeout}),

    ?line [A, B, C] = start_nodes([a, b, c], peer, Config),

    ?line wait_for_ready_net(Config),

    % make b and c connected, partitioned from node() and a
    ?line rpc_cast(B, ?MODULE, part2, [Config, node(), A, C]),
    %% part2/4 signals completion through the partition file.
    ?line ?UNTIL(is_ready_partition(Config)),

    % Connect to the other partition.
    ?line pong = net_adm:ping(B),
    timer:sleep(100),
    ?line pong = net_adm:ping(C),
    ?line _ = global:sync(),
    ?line [A, B, C] = lists:sort(nodes()),

    G = pg2_otp_8653,
    %% Poll until every node has the same view of the group and that
    %% view contains all 8 memberships.
    ?line ?UNTIL(begin
                     GA = lists:sort(rpc:call(A, pg2, get_members, [G])),
                     GB = lists:sort(rpc:call(B, pg2, get_members, [G])),
                     GC = lists:sort(rpc:call(C, pg2, get_members, [G])),
                     GT = lists:sort(pg2:get_members(G)),
                     GA =:= GB andalso
                     GB =:= GC andalso
                     GC =:= GT andalso
                     8 =:= length(GA)
                 end),
    ?line ok = pg2:delete(G),
    ?line stop_nodes([A,B,C]),
    ?line test_server:timetrap_cancel(Dog),
    ok.

%% Runs on node b (started through rpc_cast/4 from otp_8653/1).  Splits
%% the network into [Main, A] and [node(), C] and has every node execute
%% mk_part_node_and_group/3.  Anything but a plain 'ok' from
%% make_partition/4, including a caught exception, is a case_clause error.
part2(Config, Main, A, C) ->
    OtherPart = [Main, A],
    MyPart = [node(), C],
    case catch make_partition(Config, OtherPart, MyPart,
                              mk_part_node_and_group) of
        ok -> ok
    end.

%% Per-node half of the partitioning used by otp_8653/1.
%%
%% Waits until this node sees exactly the nodes in MyPart0, then creates
%% the group pg2_otp_8653 and joins one freshly spawned process to it
%% twice.  Progress is signalled by writing "start" and finally "done"
%% to File.
mk_part_node_and_group(File, MyPart0, Config) ->
    touch(File, "start"), % debug
    SortedPart = lists:sort(MyPart0),
    ?UNTIL(is_node_in_part(File, SortedPart)),
    Group = pg2_otp_8653,
    Member = spawn(forever()),
    ok = pg2:create(Group),
    %% Two joins of the same process: this node contributes two entries.
    ok = pg2:join(Group, Member),
    ok = pg2:join(Group, Member),
    touch(File, "done").

%% OTP-8259: a group member that is killed must be removed from the
%% group on all nodes.
%%
%% A process is started on node a, registered globally and joined to the
%% group pg2_otp_8259.  The network is then split into {node(), a} and
%% {b, c}, and a second process is registered under the same global name
%% in the other partition (see part1/5).  When the partitions are
%% reconnected the name clash is resolved by the fun installed in
%% p_init/3, which kills one of the two processes.  The test then waits
%% until the group is empty on every node.
otp_8259(suite) -> [];
otp_8259(doc) ->
    ["OTP-8259. Member was not removed after being killed."];
otp_8259(Config) when is_list(Config) ->
    %% A tighter timetrap than the default one set in init_per_testcase/2.
    Timeout = 15,
    ?line Dog = test_server:timetrap({seconds,Timeout}),

    ?line [A, B, C] = start_nodes([a, b, c], peer, Config),

    ?line wait_for_ready_net(Config),

    G = pg2_otp_8259,
    Name = otp_8259_a_global_name,

    % start different processes in both partitions
    ?line {Pid, yes} = rpc:call(A, ?MODULE, start_proc, [Name]),

    ?line ok = pg2:create(G),
    ?line ok = pg2:join(G, Pid),

    % make b and c connected, partitioned from node() and a
    ?line rpc_cast(B, ?MODULE, part1, [Config, node(), A, C, Name]),
    %% part1/5 signals completion through the partition file.
    ?line ?UNTIL(is_ready_partition(Config)),

    % Connect to the other partition.
    % The resolver on node b will be called.
    ?line pong = net_adm:ping(B),
    timer:sleep(100),
    ?line pong = net_adm:ping(C),
    ?line _ = global:sync(),
    ?line [A, B, C] = lists:sort(nodes()),

    %% Pid has been killed by the resolver.
    %% Pid has been removed from pg2 on all nodes, in particular node B.
    ?line ?UNTIL([] =:= rpc:call(B, pg2, get_members, [G])),
    ?line ?UNTIL([] =:= pg2:get_members(G)),
    ?line ?UNTIL([] =:= rpc:call(A, pg2, get_members, [G])),
    ?line ?UNTIL([] =:= rpc:call(C, pg2, get_members, [G])),

    ?line ok = pg2:delete(G),
    ?line stop_nodes([A,B,C]),
    ?line test_server:timetrap_cancel(Dog),
    ok.

%% Runs on node b (started through rpc_cast/4 from otp_8259/1).  Splits
%% the network into [Main, A] and [node(), C] and then registers a new
%% process under the global name Name in this partition.  Anything but a
%% successful registration, including a caught exception, is a
%% case_clause error.
part1(Config, Main, A, C, Name) ->
    Outcome = (catch begin
                         make_partition(Config, [Main, A], [node(), C]),
                         ?line {_Pid, yes} = start_proc(Name)
                     end),
    case Outcome of
        {_, yes} -> ok
    end.

%% Spawn a process running p_init/3 on the local node and wait for its
%% report.  Returns {Pid, Result}, where Result is whatever
%% global:register_name/3 returned inside the new process.
start_proc(Name) ->
    Parent = self(),
    Here = node(),
    Child = spawn(?MODULE, p_init, [Parent, Name, Here]),
    receive
        {Child, _Result} = Report -> Report
    end.

%% Body of the processes started by start_proc/1.
%%
%% Registers the calling process globally as Name with a custom resolve
%% fun, reports the registration result to Parent as {self(), Result},
%% and then waits in loop/0.  TestServer is the node on which
%% start_proc/1 ran.
p_init(Parent, Name, TestServer) ->
    Resolve =
        fun(_Name, Pid1, Pid2) ->
                %% Order the two candidates by node name.  The one on the
                %% node that sorts first is killed; the other one keeps
                %% the name.
                ByNode = lists:sort([{node(Pid1), Pid1},
                                     {node(Pid2), Pid2}]),
                [{_, Loser}, {_, Winner}] = ByNode,
                %% The kill is routed through TestServer instead of being
                %% issued directly with exit/2, which according to the
                %% original author's note "would ping a".
                rpc:cast(TestServer, erlang, exit, [Loser, kill]),
                Winner
        end,
    Result = global:register_name(Name, self(), Resolve),
    Parent ! {self(), Result},
    loop().

%% Block until the message 'die' arrives, then terminate normally.  Other
%% messages are left unread in the mailbox.
loop() ->
    receive die -> ok end,
    exit(normal).

%% Interoperability with a node running an older release.
%%
%% Skipped unless ?t:is_release_available("r13b") returns true.
%% Otherwise a process is joined twice to group a locally, a slave node
%% is started with start_node_rel(r13, r13b, slave), and the test checks
%% that the old node sees both memberships and that they disappear on
%% both nodes once the process has been killed.
compat(suite) -> [];
compat(doc) ->
    ["OTP-8259. Check that 'exchange' and 'del_member' work."];
compat(Config) when is_list(Config) ->
    case ?t:is_release_available("r13b") of
        true ->
            %% A tighter timetrap than the default one.
            Timeout = 15,
            ?line Dog = test_server:timetrap({seconds,Timeout}),
            Pid = spawn(forever()),
            G = a,
            ?line ok = pg2:create(G),
            %% Join twice, so that the member list has two entries.
            ?line ok = pg2:join(G, Pid),
            ?line ok = pg2:join(G, Pid),
            ?line {ok, A} = start_node_rel(r13, r13b, slave),
            ?line pong = net_adm:ping(A),
            ?line wait_for_ready_net(Config),
            %% pg2 is started explicitly on the new node.
            ?line {ok, _} = rpc:call(A, pg2, start, []),
            ?line ?UNTIL([Pid,Pid] =:= rpc:call(A, pg2, get_members, [a])),
            ?line true = exit(Pid, kill),
            ?line ?UNTIL([] =:= pg2:get_members(a)),
            ?line ?UNTIL([] =:= rpc:call(A, pg2, get_members, [a])),
            ?t:stop_node(A),
            ?line test_server:timetrap_cancel(Dog);
        false ->
	    {skipped, "No support for old node"}
    end.

%% Run the command scripts returned by ts/0, one after the other, after
%% first deleting every group that already exists.
basic(suite) -> [];
basic(doc) ->
    ["OTP-8259. Some basic tests."];
basic(Config) when is_list(Config) ->
    Existing = pg2:which_groups(),
    lists:foreach(fun(Group) -> pg2:delete(Group) end, Existing),
    ?line _ = [do(Commands, Tag, Config) || {Tag, Commands} <- ts()],
    ok.

%% The scripts run by basic/1: a list of {Tag, Commands} pairs.
%%
%% The commands are interpreted by do/4:
%%   {F, Args, Expected}          call pg2:F(Args...) on the local node
%%   {Node, {F, Args, Expected}}  the same call, on the test node Node
%%   {new, P} | {new, Node, P}    spawn a process and call it P
%%   {kill, P} | {kill, Node, P}  kill the process called P
%%   {nodeup, N} | {nodedown, N}  start/stop a peer node called N
%% Process names such as p1 inside Args and Expected are replaced by the
%% corresponding pids before the call (see replace_pids/2).  The expected
%% value 'ignore' accepts any result (see doit/4).  Each script must leave
%% no groups, processes or nodes behind; do/4 checks this at the end.
ts() ->
    [
     %% t1: an empty group has no closest pid.
     {t1,
      [{create,[a],ignore},
       {which_groups,[],[a]},
       {get_closest_pid,[a],{error, {no_process, a}}},
       {delete,[a],ignore}]},
     %% t2: the only member is the closest pid.
     {t2,
      [{create,[a],ignore},
       {join,[a,self()],ok},
       {get_closest_pid,[a],self()},
       {delete,[a],ignore}]},
     %% t3: join/leave on existing and non-existing groups, and repeated
     %% joins of the same process.
     {t3,
      [{create,[a],ignore},
       {new,p1},
       {leave,[a,p1],ok},
       {join,[b,p1],{error,{no_such_group,b}}},
       {leave,[b,p1],{error,{no_such_group,b}}},
       {get_members,[c],{error,{no_such_group,c}}},
       {get_local_members,[c],{error,{no_such_group,c}}},
       {join,[a,p1],ok},
       {leave,[a,p1],ok},
       {join,[a,p1],ok},
       {join,[a,p1],ok},
       {create,[a],ignore},
       {get_closest_pid,[a],p1},
       {leave,[a,p1],ok},
       {get_closest_pid,[a],p1},
       {leave,[a,p1],ok},
       {get_closest_pid,[a],{error,{no_process, a}}},
       {kill,p1},
       {delete,[a],ignore}]},
     %% t4: a killed member disappears from the member lists.
     {t4,
      [{create,[a],ignore},
       {new,p1},
       {join,[a,p1],ok},
       {get_members,[a],[p1]},
       {get_local_members,[a],[p1]},
       {kill,p1},
       {get_members,[a],[]},
       {get_local_members,[a],[]},
       {delete,[a],ignore}]},
     %% t5: groups and members handled from two nodes.
     {t5,
      [{create,[a],ignore},
       {nodeup,n1},
       {create,[a],ignore},
       {join,[a,self()],ok},
       {new,n1,p1},
       {n1,{create,[b],ignore}},
       {join,[a,p1],ok},
       {join,[b,p1],ok},
       {n1,{which_groups,[],[a,b]}},
       {n1,{join,[a,p1],ok}},
       {n1,{join,[b,p1],ok}},
       {leave,[a,self()],ok},
       {n1,{leave,[a,self()],ok}}, % noop
       {n1,{leave,[b,p1],ok}},
       {leave,[b,p1],ok},
       {kill,n1,p1},
       {nodedown,n1},
       {delete,[b],ignore},
       {delete,[a],ignore}]},
     {t6,
      [{create,[a],ignore}, % otp_7277
       {create,[b],ignore},
       {new,p},
       {join,[a,p],ok},
       {leave,[b,p],ok},
       {kill,p},
       {get_members,[a],[]},
       {get_local_members,[a],[]},
       {delete,[a],ignore},
       {delete,[b],ignore}]},
     {t7, % p1 joins twice, the new node gets informed about that
      [{create,[a],ignore},
       {new,p1},
       {join,[a,p1],ok},
       {join,[a,p1],ok},
       {get_members,[a],[p1,p1]},
       {get_local_members,[a],[p1,p1]},
       {nodeup,n1},
       {leave,[a,p1],ok},
       {get_members,[a],[p1]},
       {get_local_members,[a],[p1]},
       {n1,{get_members,[a],[p1]}},
       {leave,[a,p1],ok},
       {get_members,[a],[]},
       {n1,{get_members,[a],[]}},
       {nodedown,n1},
       {delete,[a],ignore},
       {kill,p1}]},
     %% t8: deleting a group that still has members.
     {t8,
      [{create,[a],ignore},
       {new,p1},
       {join,[a,p1],ok},
       {join,[a,p1],ok},
       {delete,[a],ignore},
       {get_members,[a],{error,{no_such_group,a}}},
       {kill,p1}]}
    ].

%% Run one script from ts/0.  Cs is the command list and T its tag.
%% Crashes with badmatch unless the script runs to completion and
%% returns ok; returns {ok, T} otherwise.
do(Cs, T, Config) ->
    ?t:format("*** Test ~p ***~n", [T]),
    Outcome = (catch {do(Cs, [], [], Config), T}),
    {ok, T} = Outcome.

%% Interpreter for the command lists in ts/0.
%%
%% Ps maps process names to pids ({Name, Pid}) and Ns maps node names to
%% started test nodes ({Name, Node}).  Returns ok when the whole list has
%% been executed and nothing has been left behind.

%% {nodeup, N}: start a peer node, start pg2 on it and remember it as N.
do([{nodeup,N} | Cs], Ps, Ns, Config) ->
    [TestNode] = start_nodes([N], peer, Config),
    pr(node(), {nodeup,N,TestNode}),
    global:sync(),
    timer:sleep(100),
    {ok,_} = rpc:call(TestNode, pg2, start, []),
    NNs = [{N,TestNode} | Ns],
    sane(NNs),
    do(Cs, Ps, NNs, Config);
%% {nodedown, N}: stop the node known as N and forget it.
do([{nodedown,N}=C | Cs], Ps, Ns, Config) ->
    {N, TestNode} = lists:keyfind(N, 1, Ns),
    stop_node(TestNode),
    timer:sleep(100),
    pr(node(), C),
    do(Cs, Ps, lists:keydelete(N, 1, Ns), Config);
%% {new, P}: spawn a process on the local node and remember it as P.
do([{new,P} | Cs], Ps, Ns, Config) ->
    NPs = new_proc(node(), P, Ps, Ns),
    do(Cs, NPs, Ns, Config);
%% {new, N, P}: as above, but the process is spawned on node N.
do([{new,N,P} | Cs], Ps, Ns, Config) ->
    NPs = new_proc(N, P, Ps, Ns),
    do(Cs, NPs, Ns, Config);
%% {kill, P}: kill the process known as P, from the local node.
do([{kill,P} | Cs], Ps, Ns, Config) ->
    NPs = killit(node(), P, Ps, Ns),
    do(Cs, NPs, Ns, Config);
%% {kill, N, P}: as above, but the exit signal is sent from node N.
do([{kill,N,P} | Cs], Ps, Ns, Config) ->
    NPs = killit(N, P, Ps, Ns),
    do(Cs, NPs, Ns, Config);
%% {Node, {F, Args, Expected}}: call pg2:F on the given test node.
do([{Node,{_,_,_}=C} | Cs], Ps, Ns, Config) ->
    doit(Node, C, Ps, Ns),
    do(Cs, Ps, Ns, Config);
%% Anything else is taken to be {F, Args, Expected} for the local node.
do([C | Cs], Ps, Ns, _Config = Config) ->
    doit(node(), C, Ps, Ns),
    do(Cs, Ps, Ns, Config);
%% End of script: no nodes, processes, groups or table entries may remain.
do([], Ps, Ns, _Config) ->
    [] = Ns,
    [] = Ps,
    [] = pg2:which_groups(),
    [] = ets:tab2list(pg2_table),
    [] = nodes(),
    ok.

%% Execute one pg2 call from a script.
%%
%% C is {F, Args, Expected}, with process names still in symbolic form;
%% N names the node to run the call on.  The result must be exactly
%% Expected, unless Expected is 'ignore'.  On success the consistency
%% checks in sane/1 are run; on a mismatch the details are printed and
%% {error, {F, Args, Expected, Actual}} is thrown.
doit(N, C, Ps, Ns) ->
    Node = get_node(N, Ns),
    pr(Node, C),
    {F, As, Expected} = replace_pids(C, Ps),
    Actual = rpc:call(Node, erlang, apply, [pg2, F, As]),
    Accepted = (Actual =:= Expected) orelse (Expected =:= ignore),
    case Accepted of
        true ->
            sane(Ns);
        false ->
            ?t:format("~p and ~p: expected ~p, but got ~p~n",
                      [F, As, Expected, Actual]),
            throw({error, {F, As, Expected, Actual}})
    end.

%% Spawn a process that never terminates by itself on the node named N
%% and register it under the symbolic name P.  Returns the extended
%% name-to-pid list.
new_proc(N, P, Ps, Ns) ->
    Target = get_node(N, Ns),
    Body = forever(),
    Spawned = rpc:call(Target, erlang, spawn, [Body]),
    pr(Target, {new, P, Spawned}),
    [{P, Spawned} | Ps].

%% Kill the process known as P by sending it a 'kill' exit signal from
%% the node named N, give the system 100 ms to react, and run the
%% consistency checks.  Returns the name-to-pid list without P.
killit(N, P, Ps, Ns) ->
    {P, Victim} = lists:keyfind(P, 1, Ps),
    Target = get_node(N, Ns),
    pr(Target, {kill, P, Victim}),
    rpc:call(Target, erlang, exit, [Victim, kill]),
    timer:sleep(100),
    sane(Ns),
    lists:keydelete(P, 1, Ps).

%% Print the command C to the test log.  The line is prefixed with the
%% node name when the command concerns another node than the local one.
pr(Node, C) ->
    case Node =/= node() of
        true -> ?t:format("~p: ", [Node]);
        false -> ok
    end,
    ?t:format("do ~p~n", [C]).

%% Translate N into a real node name.  The local node's own name is
%% returned as is; any other value is looked up among the {Name, Node}
%% pairs in Ns and must be present there.
get_node(N, Ns) ->
    case node() of
        N ->
            N;
        _Other ->
            {N, TestNode} = lists:keyfind(N, 1, Ns),
            TestNode
    end.

%% Returns a fun of arity 0 that blocks in a receive with an infinite
%% timeout and never matches a message; used as the body of processes
%% that should stay alive until they are killed.
forever() ->
    fun() ->
            receive
            after infinity ->
                    ok
            end
    end.

%% Walk a term built from tuples and lists and replace every leaf that is
%% a key in Ps (a list of {Name, Pid} pairs) with the associated pid.
%% Leaves that are not keys in Ps are returned unchanged.
replace_pids(Term, Ps) when is_tuple(Term) ->
    Elements = tuple_to_list(Term),
    list_to_tuple(replace_pids(Elements, Ps));
replace_pids([Head | Tail], Ps) ->
    NewHead = replace_pids(Head, Ps),
    [NewHead | replace_pids(Tail, Ps)];
replace_pids(Leaf, Ps) ->
    case lists:keyfind(Leaf, 1, Ps) of
        {Leaf, Pid} -> Pid;
        _NotFound -> Leaf
    end.

%% Check that the pg2 tables are consistent on the local node and on
%% every node in Ns (a list of {Name, Node} pairs).
%%
%% The contents of pg2_table on each node are printed for debugging,
%% sane/0 is run on each node through rpc, and if every node passes, the
%% cross-node checks in wsane/1 are run.  Throws the list of per-node
%% results if the check fails on some node.
sane(Ns) ->
    Nodes = [node()] ++ [NN || {_,NN} <- Ns],
    _ = [?t:format("~p, pg2_table:~n   ~p~n",  % debug
                   [N, rpc:call(N, ets, tab2list, [pg2_table])]) ||
            N <- Nodes],
    R = [case rpc:call(Node, ?MODULE, sane, []) of
             %% rpc:call/4 reports a crashed or unreachable call as
             %% {badrpc, Reason}.  Without this clause a failing sane/0
             %% would fall through to the catch-all below and be counted
             %% as 'ok'.
             {badrpc, Error} ->
                 {error, Node, Error};
             {'EXIT',Error} ->
                 {error, Node, Error};
             _ ->
                 ok
         end || Node <- Nodes],
    case lists:usort(R) of
        [ok] -> wsane(Nodes);
        _ -> throw(R)
    end.

%% Cross-node consistency checks.  Ns is a list of node names (the local
%% node included).  Every check is a pattern match, so a violation shows
%% up as a badmatch whose value carries a tag (gs, ms, part) telling which
%% check failed.
wsane(Ns) ->
    %% Same members on all nodes:
    %% (usort/1 leaves a single element when all nodes give the same answer)
    {[_],gs} =
        {lists:usort([rpc:call(N, pg2, which_groups, []) || N <- Ns]),gs},
    _ = [{[_],ms,G} = {lists:usort([rpc:call(N, pg2, get_members, [G]) ||
                                       N <- Ns]),ms,G} ||
            G <- pg2:which_groups()],
    %% The local members are a partitioning of the members:
    [begin
         LocalMembers =
             lists:sort(lists:append(
                          [rpc:call(N, pg2, get_local_members, [G]) ||
                              N <- Ns])),
         {part, LocalMembers} = {part, lists:sort(pg2:get_members(G))}
     end || G <- pg2:which_groups()],
    %% The closest pid should run on the local node, if possible.
    [[case rpc:call(N, pg2, get_closest_pid, [G]) of
          Pid when is_pid(Pid), node(Pid) =:= N ->
              true =
                  lists:member(Pid, rpc:call(N, pg2, get_local_members, [G]));
%% FIXME. If on another node: member, local = [].
          %% Any other answer (a pid on another node, or an error) is only
          %% acceptable when the node has no local members in the group.
          _ -> [] = rpc:call(N, pg2, get_local_members, [G])
      end || N <- Ns]
     || G <- pg2:which_groups()].

%% Look inside the pg2_table.
%%
%% Checks that the different kinds of entries in the local pg2_table agree
%% with each other.  The entry shapes matched below are:
%%   {{group,G}}
%%   {{member,G,P},Count}
%%   {{ref,P},RPid,Ref,Count}
%%   {{ref,Ref},P}
%%   {{local_member,G,P}}
%%   {{pid,P,G}}
%% Every check is a pattern match; a violation gives a badmatch whose
%% value carries a tag (mg, rp, rp2, lg, lp, pg, pp) telling which check
%% failed.  Returns ok.
sane() ->
    L = ets:tab2list(pg2_table),
    Gs = lists:sort([G || {{group,G}} <- L]),
    MGs = lists:usort([G || {{member,G,_},_} <- L]),
    MPs = lists:usort([P || {{member,_,P},_} <- L]),
    %% Members only exist in existing groups.
    {[],mg,MGs,Gs} = {MGs -- Gs,mg,MGs,Gs},
    %% The two kinds of ref entries list exactly the member pids.
    %% NOTE(review): RPs and RPs2 are compared unsorted against the
    %% sorted MPs, so this presumably relies on the table's order --
    %% confirm.
    RPs = [P || {{ref,P},_RPid,_Ref,_C} <- L],
    {MPs,rp} = {RPs,rp},
    RPs2 = [P || {{ref,_Ref},P} <- L],
    {MPs,rp2} = {RPs2,rp2},
    %% Counters in ref entries are at least 1.
    _ = [true = C >= 1 || {{ref,_P},_RPid,_Ref,C} <- L],
    LGs = lists:usort([G || {{local_member,G,_}} <- L]),
    LPs = lists:usort([P || {{local_member,_,P}} <- L]),
    %% Local members belong to existing groups and are members.
    {[],lg} = {LGs -- Gs,lg},
    {[],lp} = {LPs -- MPs,lp},
    PGs = lists:usort([G || {{pid,_,G}} <- L]),
    PPs = lists:usort([P || {{pid,P,_}} <- L]),
    %% The pid entries refer to existing groups and cover the member pids.
    {[],pg} = {PGs -- Gs,pg},
    {MPs,pp} = {PPs,pp},
    %% Counters in member entries are at least 1.
    _ = [true = C >= 1 || {{member,_,_},C} <- L],
    ok.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%% Mostly copied from global_SUITE.erl
%% (Setting up a partition is quite tricky.)

%% Call Fun over and over, sleeping ?UNTIL_LOOP milliseconds between the
%% calls, until it returns the atom true.  There is no upper limit on the
%% number of attempts.  Config is only passed along.
loop_until_true(Fun, Config) ->
    Done = (Fun() =:= true),
    if
        Done ->
            true;
        true ->
            timer:sleep(?UNTIL_LOOP),
            loop_until_true(Fun, Config)
    end.

%% Start a node called Name that runs the release Rel.
%%
%% Rel is 'this', an atom naming a release (for instance r13b), or a
%% ready-made release specification that is passed on unchanged.  How is
%% handed to test_server:start_node/3 as is.  The directory of this
%% module is added to the code path of the new node.  Returns what
%% test_server:start_node/3 returns.
start_node_rel(Name, Rel, How) ->
    {Release, Compat} =
        if
            Rel =:= this -> {[this], "+R8"};
            is_atom(Rel) -> {[{release, atom_to_list(Rel)}], ""};
            true -> {Rel, ""}
        end,
    ?line Pa = filename:dirname(code:which(?MODULE)),
    CmdLine = Compat ++ " -kernel net_setuptime 100 " " -pa " ++ Pa,
    ?line test_server:start_node(Name, How,
                                 [{args, CmdLine}, {erl, Release}]).

%% Start one node per name in L, all at the same time, and return the
%% node names in the same order as L.  Does not return until the nodes
%% are visible from here and every new node sees all the others.
start_nodes(L, How, Config) ->
    start_nodes2(L, How, 0, Config),
    Started = collect_nodes(0, length(L)),
    ?line ?UNTIL([] =:= Started -- nodes()),
    %% Pinging alone is not enough; verify_nodes/2 below also waits for
    %% nodes() to become correct on the other nodes.
    _ = [net_adm:ping(Node) || Node <- Started],
    verify_nodes(Started, Config),
    Started.

%% Wait until each node in Nodes is connected to the local node and to
%% all other nodes in Nodes.
verify_nodes(Nodes, Config) ->
    Expected = lists:sort([node() | Nodes]),
    verify_nodes(Nodes, Expected, Config).

%% N is the sorted list of all nodes that every node is expected to see
%% (itself included).  While a node's view differs, all nodes known here
%% are asked to ping it before the next attempt.
verify_nodes([], _N, _Config) ->
    [];
verify_nodes([Node | Rest], N, Config) ->
    ?line ?UNTIL(
       case rpc:call(Node, erlang, nodes, []) of
           Seen when is_list(Seen) ->
               N =:= lists:sort([Node | Seen]) orelse
                   begin
                       lists:foreach(fun(Nd) ->
                                             rpc:call(Nd, net_adm, ping,
                                                      [Node])
                                     end,
                                     nodes()),
                       false
                   end;
           _ ->
               false
       end
      ),
    verify_nodes(Rest, N, Config).


%% Start the named nodes in parallel, each from a process of its own.
%% The process handling the node at position N in the list sends
%% {N, Node} to the caller once the node is up; see collect_nodes/2.
start_nodes2([], _How, _, _Config) ->
    [];
start_nodes2([Name | Rest], How, N, Config) ->
    Collector = self(),
    Starter =
        fun() ->
                erlang:display({starting, Name}),
                {ok, Node} = start_node(Name, How, Config),
                erlang:display({started, Name, Node}),
                Collector ! {N, Node},
                %% sleeping is necessary, or with peer nodes, they will
                %% go down again, despite {linked, false}.
                test_server:sleep(100000)
        end,
    spawn(Starter),
    start_nodes2(Rest, How, N + 1, Config).

%% Receive the messages {N, Node}, {N+1, Node}, ... up to but not
%% including Max, in index order regardless of arrival order, and return
%% the nodes as a list.  Blocks until all of them have arrived.
collect_nodes(Max, Max) ->
    [];
collect_nodes(Index, Max) ->
    Node = receive
               {Index, Started} -> Started
           end,
    [Node | collect_nodes(Index + 1, Max)].

%% Start a node without extra command line arguments.
start_node(Name, How, Config) ->
    start_node(Name, How, "", Config).

%% Start an unlinked node.  The actual node name is built from Name0 by
%% node_name/2.  Args is put first on the command line, followed by the
%% options this suite always uses; the directory of this module is added
%% to the code path.  Returns what test_server:start_node/3 returns.
start_node(Name0, How, Args, Config) ->
    Name = node_name(Name0, Config),
    Pa = filename:dirname(code:which(?MODULE)),
    Fixed = "-kernel net_setuptime 100 "
            "-noshell "
            "-pa ",
    CmdLine = Args ++ " " ++ Fixed ++ Pa,
    test_server:start_node(Name, How, [{args, CmdLine}, {linked, false}]).
%% Stop every node in Nodes, in list order.
stop_nodes(Nodes) ->
    lists:foreach(fun stop_node/1, Nodes).

%% Stop one node through the test server.
stop_node(TestNode) ->
    ?t:stop_node(TestNode).

%% Ask the global name server on Node which nodes it knows and return
%% them, together with Node itself, as a sorted list.  If the call fails,
%% a list that cannot be equal to any list of node names is returned.
get_known(Node) ->
    Server = {global_name_server, Node},
    Reply = (catch gen_server:call(Server, get_known, infinity)),
    case Reply of
        {'EXIT', _} ->
            [list, without, nodenames];
        Known when is_list(Known) ->
            lists:sort([Node | Known])
    end.

%% Build a node name from Name, the name of the running test case and the
%% current local date and time:
%%   Name_TestCase__YYYY_MM_DD__HH_MM_SS
%% Returns a string.
node_name(Name, Config) ->
    {{Y,M,D}, {H,Min,S}} = calendar:now_to_local_time(now()),
    Stamp = lists:flatten(
              io_lib:format("~4w_~2..0w_~2..0w__~2..0w_~2..0w_~2..0w",
                            [Y,M,D, H,Min,S])),
    Sep = "_",
    lists:concat([Name, Sep, ?testcase, Sep, Sep, Stamp]).

%% This one runs on one node in Part2.
%% The partition is ready when is_ready_partition(Config) returns (true).
%%
%% Split the network into the two groups of nodes Part1 and Part2, using
%% mk_part_node/3 as the per-node function.
make_partition(Config, Part1, Part2) ->
    make_partition(Config, Part1, Part2, mk_part_node).

%% As make_partition/3, but with the exported function ?MODULE:Function/3
%% as the per-node function.  It is called as Function(File, Part, Config)
%% on every node, where Part is the part the node belongs to and File is a
%% file, unique to that node, in the priv_dir.
make_partition(Config, Part1, Part2, Function) ->
    Dir = ?config(priv_dir, Config),
    Ns = [begin
              %% One status file per node; the millisecond timestamp
              %% keeps the names of different runs apart.
              Name = lists:concat([atom_to_list(N),"_",msec(),".part"]),
              File = filename:join([Dir, Name]),
              file:delete(File),
              rpc_cast(N, ?MODULE, Function, [File, Part, Config], File),
              {N, File}
          end || Part <- [Part1, Part2], N <- Part],
    %% Wait until every node has written "done" to its file.
    all_nodes_files(Ns, "done", Config),
    lists:foreach(fun({_N,File}) -> file:delete(File) end, Ns),
    %% Tell the waiting test case (see is_ready_partition/1) that the
    %% partitioning is complete.
    PartFile = make_partition_file(Config),
    touch(PartFile, "done").

%% Per-node half of make_partition/3.  Waits until this node sees exactly
%% the nodes in MyPart0 and reports progress through File: "start" is
%% written at once and "done" when the node is in its part.
mk_part_node(File, MyPart0, Config) ->
    touch(File, "start"), % debug
    SortedPart = lists:sort(MyPart0),
    _ = ?UNTIL(is_node_in_part(File, SortedPart)),
    touch(File, "done").

%% One attempt at putting the local node in its part of the network.
%%
%% Disconnects from every node outside MyPart (a sorted list of nodes that
%% includes the local node) and returns true if both the global name
%% server and nodes() now show exactly MyPart, and global has no resolvers
%% left.  Returns false otherwise; the caller is expected to try again.
%%
%% The calls to append_to_file are for debugging.
is_node_in_part(File, MyPart) ->
    lists:foreach(fun(N) ->
                          _ = erlang:disconnect_node(N)
                  end, nodes() -- MyPart),
    %% Known and Nodes are bound inside the tuple so that they can be
    %% logged in the failure branches.
    case {(Known = get_known(node())) =:= MyPart,
          (Nodes = lists:sort([node() | nodes()])) =:= MyPart} of
        {true, true} ->
            %% Make sure the resolvers have been terminated,
            %% otherwise they may pop up and send some message.
            %% (This check is probably unnecessary.)
            case element(5, global:info()) of
                [] ->
                    true;
                Rs ->
                    append_to_file(File, {now(), Known, Nodes, Rs}),
                    false
            end;
        _ ->
            append_to_file(File, {now(), Known, Nodes}),
            false
    end.

%% Block until make_partition/4 has written "done" to the partition file,
%% then remove the file and return true.
is_ready_partition(Config) ->
    DoneFile = make_partition_file(Config),
    _ = file_contents(DoneFile, "done", Config),
    _ = file:delete(DoneFile),
    true.

%% Wait until the local node and all nodes currently connected to it form
%% a fully connected network.
wait_for_ready_net(Config) ->
    wait_for_ready_net([node()|nodes()], Config).

%% Wait until every node in Nodes0 agrees that the network consists of
%% exactly Nodes0, both according to its global name server (see
%% get_known/1) and according to erlang:nodes/0.  Returns true.
wait_for_ready_net(Nodes0, Config) ->
    Nodes = lists:sort(Nodes0),
    ?t:format("wait_for_ready_net ~p~n", [Nodes]),
    ?UNTIL(begin
               %% Short-circuit: there is no point in asking every node
               %% for nodes() while the global name servers still
               %% disagree.
               lists:all(fun(N) -> Nodes =:= get_known(N) end, Nodes) andalso
               lists:all(fun(N) ->
                                 %% A failed rpc (a non-list reply) means
                                 %% "not ready yet", as in verify_nodes/3,
                                 %% rather than a crash in the poll.
                                 LNs = rpc:call(N, erlang, nodes, []),
                                 is_list(LNs) andalso
                                     Nodes =:= lists:sort([N | LNs])
                         end, Nodes)
           end).

%% rpc:cast/4 preceded by a ping of the receiving node, to make it less
%% likely that some low-level problem goes unnoticed.  Crashes with a
%% badmatch that shows both node names if the ping fails.
rpc_cast(Node, Module, Function, Args) ->
    Probe = {node(), net_adm:ping(Node), Node},
    {_, pong, Node} = Probe,
    rpc:cast(Node, Module, Function, Args).

%% As rpc_cast/4, but a failed ping does not crash the caller.  Instead
%% the cast is skipped and a description of it is appended to File.
rpc_cast(Node, Module, Function, Args, File) ->
    Ping = net_adm:ping(Node),
    if
        Ping =:= pong ->
            rpc:cast(Node, Module, Function, Args);
        true ->
            %% Maybe we should crash, but it probably doesn't matter.
            append_to_file(File, {now(), {rpc_cast, Node, Module, Function,
                                          Args, Ping}})
    end.

%% Replace the contents of File with the characters in List.  Crashes
%% with a badmatch if the file cannot be written.
touch(File, List) ->
    Bytes = list_to_binary(List),
    ok = file:write_file(File, Bytes).

%% Append Term to File as a pretty-printed term followed by a full stop
%% and a newline.  Crashes with a badmatch if any file operation fails.
append_to_file(File, Term) ->
    {ok, Fd} = file:open(File, [raw,binary,append]),
    Text = io_lib:format("~p.~n", [Term]),
    ok = file:write(Fd, Text),
    ok = file:close(Fd).

%% Wait, one file at a time, until every file in Files (a list of
%% {Node, File} pairs) starts with ContentsList.
all_nodes_files(Files, ContentsList, Config) ->
    HasContents = fun({_N, File}) ->
                          file_contents(File, ContentsList, Config)
                  end,
    lists:all(HasContents, Files).

%% Wait until File exists and starts with the bytes of ContentsList.
%% Returns true.  Failed attempts are not logged.
file_contents(File, ContentsList, Config) ->
    file_contents(File, ContentsList, Config, no_log_file).

%% As file_contents/3, but every failed attempt is appended to LogFile,
%% unless LogFile is the atom no_log_file.
file_contents(File, ContentsList, Config, LogFile) ->
    Contents = list_to_binary(ContentsList),
    Sz = byte_size(Contents),
    ?UNTIL(begin
               case file:read_file(File) of
                   {ok, FileContents}=Reply ->
                       %% split_binary/2 fails when the file is shorter
                       %% than Contents; that is just another miss.
                       case catch split_binary(FileContents, Sz) of
                           {Contents,_} ->
                               true;
                           _ ->
                               log_failed_poll(LogFile,
                                               {File,Contents,Reply}),
                               false
                       end;
                   Reply ->
                       log_failed_poll(LogFile, {File, Contents, Reply}),
                       false
               end
           end).

%% Record a failed attempt in LogFile, ignoring any error while doing so.
%% The marker no_log_file must be filtered out here: file names may be
%% given as atoms, so passing it on would append every failed attempt to
%% a file literally called "no_log_file" in the current directory.
log_failed_poll(no_log_file, _Term) ->
    ok;
log_failed_poll(LogFile, Term) ->
    catch append_to_file(LogFile, Term).

%% Name of the file, in the priv_dir of the test case, through which
%% make_partition/4 tells the test case that the partitioning is done.
make_partition_file(Config) ->
    PrivDir = ?config(priv_dir, Config),
    filename:join(PrivDir, "make_partition_done").

%% The current time in milliseconds.
msec() ->
    Now = now(),
    msec(Now).

%% Convert a {MegaSecs, Secs, MicroSecs} tuple to milliseconds; the
%% microsecond part is truncated.
msec(T) ->
    Seconds = element(1, T) * 1000000 + element(2, T),
    Seconds * 1000 + element(3, T) div 1000.