%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 1997-2014. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% %CopyrightEnd%
%%

%%
-module(mnesia_recover).

-behaviour(gen_server).

-export([
	 allow_garb/0,
	 call/1,
	 connect_nodes/1,
	 disconnect/1,
	 dump_decision_tab/0,
	 get_master_node_info/0,
	 get_master_node_tables/0,
	 get_master_nodes/1,
	 get_mnesia_downs/0,
	 has_mnesia_down/1,
	 incr_trans_tid_serial/0,
	 init/0,
	 log_decision/1,
         log_dump_overload/1,
	 log_master_nodes/3,
	 log_mnesia_down/1,
	 log_mnesia_up/1,
	 mnesia_down/1,
	 note_decision/2,
	 note_log_decision/2,
	 outcome/2,
	 start/0,
	 next_garb/0,
	 next_check_overload/0,
	 still_pending/1,
	 sync_trans_tid_serial/1,
	 sync/0,
	 wait_for_decision/2,
	 what_happened/3
	]).

%% gen_server callbacks
-export([init/1,
	 handle_call/3,
	 handle_cast/2,
	 handle_info/2,
	 terminate/2,
	 code_change/3
	]).

-compile({no_auto_import,[error/2]}).

-include("mnesia.hrl").
-import(mnesia_lib, [set/2, verbose/2, error/2, fatal/2]).

%% Server state of the mnesia_recover gen_server.
%%   supervisor        - pid handed to init/1; an 'EXIT' from it stops the server
%%   unclear_pid       - gen_server 'From' of the pending wait_for_decision call
%%   unclear_decision  - the #decision{} that pending call is waiting on
%%   unclear_waitfor   - remote disc nodes ++ alive ram nodes recorded when
%%                       the wait started
%%   tm_queue_len      - last over-threshold mnesia_tm queue length seen by
%%                       do_check_overload/1, otherwise 0
%%   log_dump_overload - last flag received in a {log_dump_overload, Flag} cast
%%   initiated         - false until the 'init' call has been handled
%%   early_msgs        - calls and casts buffered (newest first) while
%%                       initiated is false
-record(state, {supervisor,
		unclear_pid,
		unclear_decision,
		unclear_waitfor,
		tm_queue_len = 0,
                log_dump_overload = false,
		initiated = false,
		early_msgs = []
	       }).

%%-define(DBG(F, A), mnesia:report_event(list_to_atom(lists:flatten(io_lib:format(F, A))))).
%%-define(DBG(F, A), io:format("DBG: " ++ F, A)).

%% Row format of the transient decision ets tables: the outcome noted
%% for a transaction id. The tables are created with {keypos, 2}, so
%% 'tid' is the key.
-record(transient_decision, {tid, outcome}).

%% Starts the recovery server, locally registered under the module name
%% and linked to the caller. The caller's pid is passed on to init/1.
start() ->
    ServerName = {local, ?MODULE},
    InitArgs = [self()],
    Options = [{timeout, infinity}],
    gen_server:start_link(ServerName, ?MODULE, InitArgs, Options).

%% Sends the synchronous 'init' request to the server through call/1.
init() ->
    Request = init,
    call(Request).

%% Arms a timer that delivers 'garb_decisions' to the currently
%% registered mnesia_recover process in two minutes.
%% Returns the timer reference.
next_garb() ->
    Server = whereis(mnesia_recover),
    Delay = timer:minutes(2),
    erlang:send_after(Delay, Server, garb_decisions).

%% Arms a timer that delivers 'check_overload' to the currently
%% registered mnesia_recover process in ten seconds.
%% Returns the timer reference.
next_check_overload() ->
    Server = whereis(mnesia_recover),
    Delay = timer:seconds(10),
    erlang:send_after(Delay, Server, check_overload).


%% Samples the message queue length of mnesia_tm and updates the
%% overload status. Returns the (possibly updated) #state{}.
%%
%% If mnesia_tm is not registered, or dies between whereis/1 and
%% process_info/2 (which then returns 'undefined'), the state is
%% returned unchanged instead of crashing the recovery server.
do_check_overload(S) ->
    case whereis(mnesia_tm) of
        Pid when is_pid(Pid) ->
            case process_info(Pid, message_queue_len) of
                {message_queue_len, Len} ->
                    do_check_tm_queue_len(Len, S);
                undefined ->
                    %% mnesia_tm terminated after whereis/1
                    S
            end;
        undefined ->
            S
    end.

%% Compares the current queue length with the previously remembered one.
%% Overload is reported only when two consecutive samples both exceed
%% the threshold; a single high sample is merely remembered.
do_check_tm_queue_len(Len, S) ->
    Threshold = 100,
    Prev = S#state.tm_queue_len,
    if
        Len > Threshold, Prev > Threshold ->
            What = {mnesia_tm, message_queue_len, [Prev, Len]},
            mnesia_lib:report_system_event({mnesia_overload, What}),
            mnesia_lib:overload_set(mnesia_tm, true),
            S#state{tm_queue_len = 0};
        Len > Threshold ->
            S#state{tm_queue_len = Len};
        true ->
            mnesia_lib:overload_set(mnesia_tm, false),
            S#state{tm_queue_len = 0}
    end.

%% Asynchronously asks the server to run do_allow_garb/0.
allow_garb() ->
    Request = allow_garb,
    cast(Request).


%% The transaction log has either been switched (latest -> previous) or
%% there is nothing to be dumped. This means that the previous
%% transaction log may only contain commit records which refer to
%% transactions noted in the last two of the 'Prev' tables. All other
%% tables may now be garbed by 'garb_decisions' (after 2 minutes).
%% Max 10 tables are kept.
do_allow_garb() ->
    %% NOTE: the order of the steps below matters.
    Latest = val(latest_transient_decision),
    %% Small tables are left alone: one is created on every dump_log
    %% and it may be small (even empty) for schema transactions, which
    %% are dumped twice.
    Size = ets:info(Latest, size),
    if
        Size > 20 ->
            Older = val(previous_transient_decisions),
            Fresh = create_transient_decision(),
            %% Keep at most 10 tables, drop the rest.
            {Kept, Dropped} = sublist([Latest | Older], 10, []),
            lists:foreach(fun(Tab) -> ?ets_delete_table(Tab) end, Dropped),
            set(previous_transient_decisions, Kept),
            set(latest_transient_decision, Fresh);
        true ->
            ignore
    end.
    
%% Splits List after at most N elements.
%% Returns {Taken, Remaining}, where Taken holds the elements moved onto
%% Acc (in original order) and Remaining is the untouched tail.
sublist(List, N, Acc) ->
    case List of
        [Head | Rest] when N > 0 ->
            sublist(Rest, N - 1, [Head | Acc]);
        _ ->
            {lists:reverse(Acc), List}
    end.

%% Keeps only the first two previous transient decision tables and
%% deletes every table after them. Does nothing when fewer than two
%% tables are recorded.
do_garb_decisions() ->
    case val(previous_transient_decisions) of
        [Newest, NextNewest | Obsolete] ->
            set(previous_transient_decisions, [Newest, NextNewest]),
            lists:map(fun(Tab) -> ?ets_delete_table(Tab) end, Obsolete);
        _TooFew ->
            ignore
    end.

%% Synchronously asks the server to connect to the nodes in Ns.
connect_nodes(Ns) ->
    Request = {connect_nodes, Ns},
    call(Request).

%% Synchronously asks the server to disconnect from Node.
disconnect(Node) ->
    Request = {disconnect, Node},
    call(Request).

%% Asynchronously hands the decision D to the server for logging.
log_decision(D) ->
    Request = {log_decision, D},
    cast(Request).

%% Reads the value stored under Var. If the guarded lookup exits, the
%% request is delegated to mnesia_lib:other_val/1.
val(Var) ->
    case ?catch_val(Var) of
        {'EXIT', _} ->
            mnesia_lib:other_val(Var);
        Found ->
            Found
    end.

%% Synchronous request to the local recovery server.
%% Returns the server's reply, or {error, {node_not_running, node()}}
%% when the server is not registered or an 'EXIT' message from it is
%% waiting in the mailbox once the call has returned.
call(Msg) ->
    NotRunning = {error, {node_not_running, node()}},
    case whereis(?MODULE) of
        undefined ->
            NotRunning;
        Server ->
            %% Link for the duration of the call
            link(Server),
            Reply = gen_server:call(Server, Msg, infinity),
            unlink(Server),
            %% Pick up an 'EXIT' message from the server, if one arrived
            receive
                {'EXIT', Server, _Reason} ->
                    NotRunning
            after 0 ->
                    Reply
            end
    end.

%% Runs call(Msg) on every node in Nodes through rpc:multicall/4.
multicall(Nodes, Msg) ->
    CallArgs = [Msg],
    rpc:multicall(Nodes, ?MODULE, call, CallArgs).

%% Asynchronous request to the local recovery server.
%% Returns 'ignore' when the server is not registered.
cast(Msg) ->
    Server = whereis(?MODULE),
    case Server of
        undefined ->
            ignore;
        _ ->
            gen_server:cast(Server, Msg)
    end.

%% Asynchronously sends Msg to the recovery servers on Nodes.
abcast(Nodes, Msg) ->
    ServerName = ?MODULE,
    gen_server:abcast(Nodes, ServerName, Msg).

%% Records Outcome for Tid in the latest transient decision table.
note_decision(Tid, Outcome) ->
    Entry = #transient_decision{tid = Tid, outcome = Outcome},
    Latest = val(latest_transient_decision),
    ?ets_insert(Latest, Entry).

%% Removes the mnesia_decision entry keyed on Node.
%% The date and time arguments are not used.
note_up(Node, _Date, _Time) ->
    Tab = mnesia_decision,
    ?ets_delete(Tab, Node).
    
%% Stores a mnesia_down marker for Node, with the given date and time,
%% in mnesia_decision.
note_down(Node, Date, Time) ->
    Marker = {mnesia_down, Node, Date, Time},
    ?ets_insert(mnesia_decision, Marker).
    
%% Stores the master nodes of Tab in mnesia_decision. An empty node
%% list removes the entry instead.
note_master_nodes(Tab, Nodes) when is_list(Nodes) ->
    case Nodes of
        [] ->
            ?ets_delete(mnesia_decision, Tab);
        _ ->
            ?ets_insert(mnesia_decision, {master_nodes, Tab, Nodes})
    end.

%% Notes the outcome of decision D.
%% With no disc nodes left, only the filtered outcome is kept in the
%% latest transient table and the mnesia_decision entry is removed.
%% Otherwise the whole decision is stored in mnesia_decision.
note_outcome(#decision{disc_nodes = [], tid = Tid, outcome = Outcome}) ->
    note_decision(Tid, filter_outcome(Outcome)),
    ?ets_delete(mnesia_decision, Tid);
note_outcome(#decision{} = D) ->
    ?ets_insert(mnesia_decision, D).

%% Logs decision D. A decision whose outcome is not 'unclear' is first
%% merged with what is already known about the transaction, and the
%% nodes named in D are told about it. Any other decision is logged
%% without telling anyone.
do_log_decision(D) when D#decision.outcome /= unclear ->
    Known = decision(D#decision.tid),
    do_log_decision(merge_decisions(node(), Known, D), true, D);
do_log_decision(D) ->
    do_log_decision(D, false, undefined).

%% Notes decision D and, when a Mnesia directory is in use and the
%% outcome is not 'unclear', writes it to the log.
%%   DoTell - 'true' to also send im_certain to the other nodes
%%   NodeD  - decision whose disc_nodes/ram_nodes name the nodes to tell
%%            (only read when DoTell is 'true')
do_log_decision(D, DoTell, NodeD) ->
    Outcome = D#decision.outcome,
    %% Once the outcome is final this node is no longer awaited
    Noted =
        case Outcome of
            aborted ->
                D#decision{disc_nodes = D#decision.disc_nodes -- [node()]};
            committed ->
                D#decision{disc_nodes = D#decision.disc_nodes -- [node()]};
            _ ->
                D
        end,
    note_outcome(Noted),
    case mnesia_monitor:use_dir() of
        false ->
            ignore;
        true when Outcome == unclear ->
            ignore;
        true ->
            case DoTell of
                true ->
                    tell_im_certain(NodeD#decision.disc_nodes -- [node()],
                                    Noted),
                    tell_im_certain(NodeD#decision.ram_nodes -- [node()],
                                    Noted);
                _ ->
                    ok
            end,
            mnesia_log:log(Noted)
    end.

%% Sends {im_certain, node(), D} to the recovery servers on Nodes.
%% Returns 'ignore' when there is nobody to tell.
tell_im_certain(Nodes, D) ->
    case Nodes of
        [] ->
            ignore;
        _ ->
            abcast(Nodes, {im_certain, node(), D})
    end.

%% Synchronous round trip to the server using the 'sync' request.
sync() ->
    Request = sync,
    call(Request).

%% Synchronously asks the server to log that Mnesia is up on Node.
log_mnesia_up(Node) ->
    Request = {log_mnesia_up, Node},
    call(Request).

%% Synchronously asks the server to log that Mnesia is down on Node.
log_mnesia_down(Node) ->
    Request = {log_mnesia_down, Node},
    call(Request).

%% Returns the nodes that have a mnesia_down marker in mnesia_decision.
get_mnesia_downs() ->
    Pattern = {mnesia_down, '_', '_', '_'},
    Markers = ?ets_match_object(mnesia_decision, Pattern),
    [Node || {mnesia_down, Node, _, _} <- Markers].

%% Returns true if a mnesia_down marker is stored for Node and false
%% if nothing is stored under that key.
has_mnesia_down(Node) ->
    case ?ets_lookup(mnesia_decision, Node) of
        [] ->
            false;
        [{mnesia_down, Node, _Date, _Time}] ->
            true
    end.
    
%% Removes Node from recover_nodes and notifies the server with a
%% {mnesia_down, Node} cast. Does nothing when recover_nodes cannot be
%% read yet, i.e. the server has not been started.
mnesia_down(Node) ->
    case ?catch_val(recover_nodes) of
        {'EXIT', _} ->
            %% Not started yet
            ignore;
        _Started ->
            mnesia_lib:del(recover_nodes, Node),
            cast({mnesia_down, Node})
    end.

%% Asynchronously passes the log dump overload flag (a boolean) to the
%% server.
log_dump_overload(Flag) when is_boolean(Flag) ->
    Request = {log_dump_overload, Flag},
    cast(Request).

%% Logs the master nodes of every {Tab, Nodes} pair in Args.
%%   IsRunning == yes  - the entries are logged through the server
%%   UseDir == false   - nothing to do when Mnesia is not running
%%   otherwise         - the latest log is opened here, written to and
%%                       closed again by log_master_nodes2/4
%% Returns ok or {error, Reason}.
log_master_nodes(Args, UseDir, IsRunning) ->
    case {IsRunning, UseDir} of
        {yes, _} ->
            log_master_nodes2(Args, UseDir, IsRunning, ok);
        {_, false} ->
            ok;
        _ ->
            LogName = latest_log,
            File = mnesia_log:latest_log_file(),
            Existed = mnesia_lib:exists(File),
            OpenArgs = [{file, File},
                        {name, LogName},
                        {repair, mnesia:system_info(auto_repair)}],
            case disk_log:open(OpenArgs) of
                {error, Reason} ->
                    {error, Reason};
                {ok, LogName} ->
                    log_master_nodes2(Args, UseDir, IsRunning, ok);
                {repaired, LogName, {recovered, _R}, {badbytes, _B}}
                  when Existed == true ->
                    log_master_nodes2(Args, UseDir, IsRunning, ok);
                {repaired, LogName, {recovered, _R}, {badbytes, _B}}
                  when Existed == false ->
                    %% The file did not exist before: write a header first
                    mnesia_log:write_trans_log_header(),
                    log_master_nodes2(Args, UseDir, IsRunning, ok)
            end
    end.

%% Logs each {Tab, Nodes} pair in turn, remembering the most recent
%% error as the overall result. When Mnesia is not running, latest_log
%% is closed once all pairs are done.
log_master_nodes2([{Tab, Nodes} | Tail], UseDir, IsRunning, WorstRes) ->
    Res =
        case IsRunning of
            yes ->
                Reply = call({log_master_nodes, Tab, Nodes, UseDir, IsRunning}),
                mnesia_controller:master_nodes_updated(Tab, Nodes),
                Reply;
            _ ->
                do_log_master_nodes(Tab, Nodes, UseDir, IsRunning)
        end,
    NextWorst =
        case Res of
            ok -> WorstRes;
            {error, Reason} -> {error, Reason}
        end,
    log_master_nodes2(Tail, UseDir, IsRunning, NextWorst);
log_master_nodes2([], _UseDir, yes, WorstRes) ->
    WorstRes;
log_master_nodes2([], _UseDir, _NotRunning, WorstRes) ->
    disk_log:close(latest_log),
    WorstRes.

%% Returns all {master_nodes, Tab, Nodes} entries in mnesia_decision,
%% or [] if the lookup raises an error.
get_master_node_info() ->
    Pattern = {master_nodes, '_', '_'},
    try
        mnesia_lib:db_match_object(ram_copies, mnesia_decision, Pattern)
    catch
        error:_ -> []
    end.

%% Returns the tables that have master nodes recorded.
get_master_node_tables() ->
    [Tab || {master_nodes, Tab, _Nodes} <- get_master_node_info()].

%% Returns element 3 of the mnesia_decision entry keyed on Tab (the
%% node list of a master_nodes entry), or [] if the lookup raises an
%% error.
get_master_nodes(Tab) ->
    try
        ?ets_lookup_element(mnesia_decision, Tab, 3)
    catch
        error:_ -> []
    end.

%% Determines what has happened to transaction Tid.
%% The local server is asked first when this node is one of Nodes; the
%% other nodes are asked only if the local answer is unclear.
%% The default outcome is 'aborted' for asym_trans and 'unclear' for
%% the other protocols (sym_trans and sync_sym_trans).
what_happened(Tid, Protocol, Nodes) ->
    Default =
        if
            Protocol == asym_trans -> aborted;
            true -> unclear
        end,
    Self = node(),
    case lists:member(Self, Nodes) of
        false ->
            what_happened_remotely(Tid, Default, Nodes);
        true ->
            {ok, Outcome} = call({what_happened, Default, Tid}),
            Others = Nodes -- [Self],
            case filter_outcome(Outcome) of
                unclear -> what_happened_remotely(Tid, Default, Others);
                Certain -> Certain
            end
    end.

%% Asks the recovery servers on Nodes about Tid and combines their
%% replies into a single outcome.
what_happened_remotely(Tid, Default, Nodes) ->
    Request = {what_happened, Default, Tid},
    {Replies, _BadNodes} = multicall(Nodes, Request),
    check_what_happened(Replies, 0, 0).

%% Counts the aborted and committed answers among the replies and
%% derives the overall outcome. Unclear answers, errors and badrpc
%% replies are not counted.
check_what_happened([Reply | Rest], Aborts, Commits) ->
    {Aborts2, Commits2} =
        case Reply of
            {ok, R} ->
                case filter_outcome(R) of
                    committed -> {Aborts, Commits + 1};
                    aborted -> {Aborts + 1, Commits};
                    unclear -> {Aborts, Commits}
                end;
            {error, _} ->
                {Aborts, Commits};
            {badrpc, _} ->
                {Aborts, Commits}
        end,
    check_what_happened(Rest, Aborts2, Commits2);
check_what_happened([], Aborts, Commits) ->
    if
        Aborts > 0 -> aborted;                  % Somebody has aborted
        Commits > 0 -> committed;               % All that know have committed
        true -> aborted                         % None of the active nodes knows
    end.

%% Determines what has happened to the transaction, possibly waiting
%% forever for the decision.
%% Returns {Tid, Outcome}; for presume_commit (sym_trans) the first
%% element is {presume_commit, self()} and the outcome is 'committed'.
wait_for_decision(presume_commit, _InitBy) ->
    %% sym_trans
    {{presume_commit, self()}, committed};
wait_for_decision(D, InitBy) when D#decision.outcome == presume_abort ->
    wait_for_decision(D, InitBy, 0).

%% asym_trans: N counts the short retries made while the noted outcome
%% is still presume_abort.
wait_for_decision(D, InitBy, N) ->
    Tid = D#decision.tid,
    MaxRetries = 10,
    case outcome(Tid, D#decision.outcome) of
        committed ->
            {Tid, committed};
        aborted ->
            {Tid, aborted};
        presume_abort when N > MaxRetries ->
            {Tid, aborted};
        presume_abort ->
            %% busy loop for ets decision moving
            timer:sleep(10),
            wait_for_decision(D, InitBy, N + 1);
        _Other when InitBy /= startup ->
            %% Wait a while for active transactions
            %% to end and try again
            timer:sleep(100),
            wait_for_decision(D, InitBy, N);
        _Other ->
            %% At startup: let the server wait for the decision
            {ok, Res} = call({wait_for_decision, D}),
            {Tid, Res}
    end.

%% Returns, in the original order, the transaction ids whose noted
%% outcome is still unclear.
still_pending(Tids) ->
    [Tid || Tid <- Tids, filter_outcome(outcome(Tid, unclear)) =:= unclear].

%% Opens the decision tab file, notes every item found in it and
%% closes the file again.
load_decision_tab() ->
    Start = mnesia_log:open_decision_tab(),
    load_decision_tab(Start, load_decision_tab),
    mnesia_log:close_decision_tab().

%% Reads the decision tab chunk by chunk until eof, noting each chunk.
load_decision_tab(Cont, InitBy) ->
    case mnesia_log:chunk_decision_tab(Cont) of
        eof ->
            ok;
        {NextCont, Decisions} ->
            note_log_decisions(Decisions, InitBy),
            load_decision_tab(NextCont, InitBy)
    end.

%% Dumps DECISION.LOG and PDECISION.LOG and removes them.
%% From now on all decisions are logged in the transaction log file.
%% Does nothing when neither file exists.
convert_old() ->
    HasPrevious = mnesia_lib:exists(mnesia_log:previous_decision_log_file()),
    HasLatest = mnesia_lib:exists(mnesia_log:decision_log_file()),
    case HasPrevious or HasLatest of
        false ->
            ignore;
        true ->
            mnesia_log:open_decision_log(),
            %% The dump is run twice
            dump_decision_log(startup),
            dump_decision_log(startup),
            mnesia_log:close_decision_log(),
            ok = file:delete(mnesia_log:decision_log_file())
    end.

%% Prepares and performs a dump of the decision log.
%% Assumed to be run in the transaction log dumper process.
dump_decision_log(InitBy) ->
    perform_dump_decision_log(mnesia_log:prepare_decision_log_dump(), InitBy).

%% At startup the decision log is read chunk by chunk and every item is
%% noted before the dump is confirmed. For any other initiator the dump
%% is confirmed right away.
perform_dump_decision_log(Cont, startup) ->
    case mnesia_log:chunk_decision_log(Cont) of
        eof ->
            confirm_decision_log_dump();
        {NextCont, Decisions} ->
            note_log_decisions(Decisions, startup),
            perform_dump_decision_log(NextCont, startup)
    end;
perform_dump_decision_log(_Cont, _InitBy) ->
    confirm_decision_log_dump().

%% Saves the decision table first, then confirms the decision log dump.
confirm_decision_log_dump() ->
    _ = dump_decision_tab(),
    mnesia_log:confirm_decision_log_dump().

%% Saves the whole contents of mnesia_decision as a single
%% {decision_list, Items} term.
dump_decision_tab() ->
    Items = mnesia_lib:db_match_object(ram_copies, mnesia_decision, '_'),
    mnesia_log:save_decision_tab({decision_list, Items}).

%% Notes every log item in the list, in order. Returns ok.
note_log_decisions(Items, InitBy) ->
    lists:foreach(fun(Item) -> note_log_decision(Item, InitBy) end, Items).

%% Notes a single item read from a decision log or decision tab.
%% The clauses are tried in order; an item that matches none of the
%% known shapes makes the process exit.

%% A pre_commit decision is noted as unclear.
note_log_decision(NewD, InitBy) when NewD#decision.outcome == pre_commit ->
    note_log_decision(NewD#decision{outcome = unclear}, InitBy);

%% Any other decision: sync the tid serial, then note the outcome.
note_log_decision(NewD, _InitBy) when is_record(NewD, decision) ->
    Tid = NewD#decision.tid,
    sync_trans_tid_serial(Tid),
    note_outcome(NewD);
%% A stored serial is ignored at startup, otherwise synced against.
note_log_decision({trans_tid, serial, _Serial}, startup) ->
    ignore;
note_log_decision({trans_tid, serial, Serial}, _InitBy) ->
    sync_trans_tid_serial(Serial);
note_log_decision({mnesia_up, Node, Date, Time}, _InitBy) ->
    note_up(Node, Date, Time);
note_log_decision({mnesia_down, Node, Date, Time}, _InitBy) ->
    note_down(Node, Date, Time);
note_log_decision({master_nodes, Tab, Nodes}, _InitBy) ->
    note_master_nodes(Tab, Nodes);
%% Decision log header: the current version and "2.0" are accepted.
note_log_decision(H, _InitBy) when H#log_header.log_kind == decision_log ->
    V = mnesia_log:decision_log_version(),
    if
	H#log_header.log_version == V->
	    ok;
	H#log_header.log_version == "2.0" ->
	    %% NOTE(review): the message prints V, the current version,
	    %% not the old version found in the header - confirm intent.
	    verbose("Accepting an old version format of decision log: ~p~n",
		    [V]),
	    ok;
	true ->
	    fatal("Bad version of decision log: ~p~n", [H])
    end;
%% Decision tab header: only the current version is accepted.
note_log_decision(H, _InitBy) when H#log_header.log_kind == decision_tab ->
    V = mnesia_log:decision_tab_version(),
    if
	V == H#log_header.log_version ->
	    ok;
	true ->
	    fatal("Bad version of decision tab: ~p~n", [H])
    end;
%% A saved decision table: note each contained item.
note_log_decision({decision_list, ItemList}, InitBy) ->
    note_log_decisions(ItemList, InitBy);
note_log_decision(BadItem, InitBy) ->
    exit({"Bad decision log item", BadItem, InitBy}).

%% Returns the current transaction id serial, i.e. element 3 of the
%% 'serial' entry in mnesia_decision.
trans_tid_serial() ->
    Key = serial,
    ?ets_lookup_element(mnesia_decision, Key, 3).

%% Stores Val as the transaction id serial in mnesia_decision.
set_trans_tid_serial(Val) ->
    Entry = {trans_tid, serial, Val},
    ?ets_insert(mnesia_decision, Entry).

%% Increments the transaction id serial by one with an ets counter
%% update on the 'serial' entry.
incr_trans_tid_serial() ->
    Step = 1,
    ?ets_update_counter(mnesia_decision, serial, Step).

%% Moves the local transaction id serial past ThatCounter when the
%% given counter is greater than the local one. The argument is either
%% an integer counter or a #tid{} whose counter field is used.
sync_trans_tid_serial(ThatCounter) when is_integer(ThatCounter) ->
    case trans_tid_serial() of
        ThisCounter when ThatCounter > ThisCounter ->
            set_trans_tid_serial(ThatCounter + 1);
        _ ->
            ignore
    end;
sync_trans_tid_serial(Tid) ->
    sync_trans_tid_serial(Tid#tid.counter).



%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% Callback functions from gen_server

%%----------------------------------------------------------------------
%% Func: init/1
%% Returns: {ok, State}          |
%%          {ok, State, Timeout} |
%%          {stop, Reason}
%%----------------------------------------------------------------------
%% gen_server init callback. Traps exits, creates the first transient
%% decision table and resets the list of previous tables and the
%% recover_nodes list. Parent is kept as the supervisor in the state.
init([Parent]) ->
    process_flag(trap_exit, true),
    mnesia_lib:verbose("~p starting: ~p~n", [?MODULE, self()]),
    FirstTab = create_transient_decision(),
    set(latest_transient_decision, FirstTab),
    set(previous_transient_decisions, []),
    set(recover_nodes, []),
    {ok, #state{supervisor = Parent}}.

%% Creates a new public ets set for transient decisions, keyed on the
%% second tuple element.
create_transient_decision() ->
    Options = [{keypos, 2}, set, public],
    ?ets_new_table(mnesia_transient_decision, Options).

%%----------------------------------------------------------------------
%% Func: handle_call/3
%% Returns: {reply, Reply, State}          |
%%          {reply, Reply, State, Timeout} |
%%          {noreply, State}               |
%%          {noreply, State, Timeout}      |
%%          {stop, Reason, Reply, State}   | (terminate/2 is called)
%%----------------------------------------------------------------------

%% gen_server callback for synchronous requests.
%%
%% Until the 'init' request has been handled every other call is
%% buffered in the state and replayed by handle_early_msgs/2.
%% Requests handled once initiated:
%%   {disconnect, Node}                            -> ok
%%   {connect_nodes, Ns}                           -> {GoodNodes, AlreadyConnected}
%%   {what_happened, Default, Tid}                 -> {ok, Outcome}
%%   {wait_for_decision, D}                        -> {ok, aborted}, or the
%%                                                    reply is left pending
%%   {log_mnesia_up, Node}                         -> ok
%%   {log_mnesia_down, Node}                       -> ok
%%   {log_master_nodes, Tab, Nodes, UseDir, IsRun} -> result of
%%                                                    do_log_master_nodes/4
%%   sync                                          -> ok
%% Any other request is reported as an error and is never replied to.

handle_call(init, From, State) when State#state.initiated == false ->
    Args = [{keypos, 2}, set, public, named_table],
    case mnesia_monitor:use_dir() of
        true ->
            ?ets_new_table(mnesia_decision, Args),
            set_trans_tid_serial(0),
            TabFile = mnesia_log:decision_tab_file(),
            case mnesia_lib:exists(TabFile) of
                true ->
                    load_decision_tab();
                false ->
                    ignore
            end,
            convert_old(),
            mnesia_dumper:opt_dump_log(scan_decisions);
        false ->
            ?ets_new_table(mnesia_decision, Args),
            set_trans_tid_serial(0)
    end,
    handle_early_msgs(State, From);

handle_call(Msg, From, State) when State#state.initiated == false ->
    %% Buffer early messages
    Msgs = State#state.early_msgs,
    {noreply, State#state{early_msgs = [{call, Msg, From} | Msgs]}};

handle_call({disconnect, Node}, _From, State) ->
    mnesia_monitor:disconnect(Node),
    mnesia_lib:del(recover_nodes, Node),
    {reply, ok, State};

handle_call({connect_nodes, Ns}, From, State) ->
    %% Determine which nodes we should try to connect
    AlreadyConnected = val(recover_nodes),
    {_, Nodes} = mnesia_lib:search_delete(node(), Ns),
    Check = Nodes -- AlreadyConnected,
    case mnesia_monitor:negotiate_protocol(Check) of
        busy ->
            %% The monitor is disconnecting some nodes; retry
            %% the request (to avoid deadlock).
            erlang:send_after(2, self(), {connect_nodes, Ns, From}),
            {noreply, State};
        [] ->
            %% No good nodes to connect to!
            %% We can't use reply here because this function can be
            %% called from handle_info
            gen_server:reply(From, {[], AlreadyConnected}),
            {noreply, State};
        ProbablyGoodNodes ->
            %% Now we have agreed upon a protocol with some new nodes
            %% and we may use them when we recover transactions.
            %%
            %% Just in case Mnesia was stopped on some of those nodes
            %% between the protocol negotiation and now, we check one
            %% more time the state of Mnesia.
            %%
            %% Of course, there is still a chance that mnesia_down
            %% events occur during this check and we miss them. To
            %% prevent it, handle_cast({mnesia_down, ...}, ...) removes
            %% the down node again, in addition to mnesia_down/1.
            %%
            %% See a comment in handle_cast({mnesia_down, ...}, ...).
            Verify = fun(N) ->
                             Run = mnesia_lib:is_running(N),
                             Run =:= yes orelse Run =:= starting
                     end,
            GoodNodes = [N || N <- ProbablyGoodNodes, Verify(N)],

            mnesia_lib:add_list(recover_nodes, GoodNodes),
            cast({announce_all, GoodNodes}),
            case get_master_nodes(schema) of
                [] ->
                    Context = starting_partitioned_network,
                    mnesia_monitor:detect_inconcistency(GoodNodes, Context);
                _ -> %% If master_nodes is set ignore old inconsistencies
                    ignore
            end,
            gen_server:reply(From, {GoodNodes, AlreadyConnected}),
            {noreply, State}
    end;

handle_call({what_happened, Default, Tid}, _From, State) ->
    sync_trans_tid_serial(Tid),
    Outcome = outcome(Tid, Default),
    {reply, {ok, Outcome}, State};

handle_call({wait_for_decision, D}, From, State) ->
    Recov = val(recover_nodes),
    AliveRam = (mnesia_lib:intersect(D#decision.ram_nodes, Recov) -- [node()]),
    RemoteDisc = D#decision.disc_nodes -- [node()],
    if
        AliveRam == [], RemoteDisc == [] ->
            %% Nobody else to wait for and we may safely abort
            {reply, {ok, aborted}, State};
        true ->
            verbose("Transaction ~p is unclear. "
                    "Wait for disc nodes: ~w ram: ~w~n",
                    [D#decision.tid, RemoteDisc, AliveRam]),
            AliveDisc = mnesia_lib:intersect(RemoteDisc, Recov),
            Msg = {what_decision, node(), D},
            abcast(AliveRam, Msg),
            abcast(AliveDisc, Msg),
            case val(max_wait_for_decision) of
                infinity ->
                    ignore;
                MaxWait ->
                    ForceMsg =  {force_decision, D#decision.tid},
                    {ok, _} = timer:send_after(MaxWait, ForceMsg)
            end,
            %% The reply is left pending; From is kept in the state
            State2 = State#state{unclear_pid = From,
                                 unclear_decision = D,
                                 unclear_waitfor = (RemoteDisc ++ AliveRam)},
            {noreply, State2}
    end;

handle_call({log_mnesia_up, Node}, _From, State) ->
    do_log_mnesia_up(Node),
    {reply, ok, State};

handle_call({log_mnesia_down, Node}, _From, State) ->
    do_log_mnesia_down(Node),
    {reply, ok, State};

handle_call({log_master_nodes, Tab, Nodes, UseDir, IsRunning}, _From, State) ->
    %% Pass the logging result on to the caller instead of always
    %% replying ok; log_master_nodes2/4 matches on ok | {error, Reason}.
    Res = do_log_master_nodes(Tab, Nodes, UseDir, IsRunning),
    {reply, Res, State};

handle_call(sync, _From, State) ->
    {reply, ok, State};

handle_call(Msg, _From, State) ->
    error("~p got unexpected call: ~p~n", [?MODULE, Msg]),
    {noreply, State}.

%% Appends a mnesia_up record for Node, stamped with the current date
%% and time, to latest_log when a Mnesia directory is in use, then
%% notes the event in mnesia_decision.
do_log_mnesia_up(Node) ->
    Date = date(),
    Time = time(),
    Record = {mnesia_up, Node, Date, Time},
    case mnesia_monitor:use_dir() of
        false ->
            ignore;
        true ->
            mnesia_log:append(latest_log, Record),
            disk_log:sync(latest_log)
    end,
    note_up(Node, Date, Time).

%% Appends a mnesia_down record for Node, stamped with the current date
%% and time, to latest_log when a Mnesia directory is in use, then
%% notes the event in mnesia_decision.
do_log_mnesia_down(Node) ->
    Date = date(),
    Time = time(),
    Record = {mnesia_down, Node, Date, Time},
    case mnesia_monitor:use_dir() of
        false ->
            ignore;
        true ->
            mnesia_log:append(latest_log, Record),
            disk_log:sync(latest_log)
    end,
    note_down(Node, Date, Time).

%% Appends the master nodes of Tab to latest_log when UseDir is true,
%% and notes them in mnesia_decision when Mnesia is running.
%% Returns the result of the log append, or ok when no directory is
%% in use.
do_log_master_nodes(Tab, Nodes, UseDir, IsRunning) ->
    Entry = {master_nodes, Tab, Nodes},
    Res =
        case UseDir of
            false ->
                ok;
            true ->
                AppendRes = mnesia_log:append(latest_log, Entry),
                disk_log:sync(latest_log),
                AppendRes
        end,
    case IsRunning of
        yes -> note_master_nodes(Tab, Nodes);
        _NotRunning -> ignore
    end,
    Res.

%%----------------------------------------------------------------------
%% Func: handle_cast/2
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%%----------------------------------------------------------------------

%% gen_server callback for asynchronous requests. Every clause returns
%% {noreply, State}. Casts arriving before the 'init' call has been
%% handled are buffered and replayed later.
handle_cast(Msg, State) when State#state.initiated == false ->
    %% Buffer early messages
    Msgs = State#state.early_msgs,
    {noreply, State#state{early_msgs = [{cast, Msg} | Msgs]}};

%% Another node is certain about a decision: merge it with what is
%% known locally and log the result without telling anyone.
handle_cast({im_certain, Node, NewD}, State) ->
    OldD = decision(NewD#decision.tid),
    MergedD = merge_decisions(Node, OldD, NewD),    
    do_log_decision(MergedD, false, undefined),
    {noreply, State};

handle_cast({log_decision, D}, State) ->
    do_log_decision(D),
    {noreply, State};

handle_cast(allow_garb, State) ->
    do_allow_garb(),
    {noreply, State};

%% Decisions received from Node: add the node to recover_nodes and
%% take its decisions into account.
handle_cast({decisions, Node, Decisions}, State) ->
    mnesia_lib:add(recover_nodes, Node),
    State2 = add_remote_decisions(Node, Decisions, State),
    {noreply, State2};

%% Node asks for our view of a transaction: answer with the local
%% decision if there is one, otherwise echo the decision it sent.
handle_cast({what_decision, Node, OtherD}, State) ->
    Tid = OtherD#decision.tid,
    sync_trans_tid_serial(Tid),
    Decision = 
	case decision(Tid) of
	    no_decision -> OtherD;
	    MyD when is_record(MyD, decision) -> MyD
	end,
    announce([Node], [Decision], [], true),
    {noreply, State};

handle_cast({mnesia_down, Node}, State) ->
    %% The node was already removed from recover_nodes in mnesia_down/1,
    %% but we do it again here in the mnesia_recover process, in case
    %% another event incorrectly added it back. This can happen during
    %% Mnesia startup which takes time between the connection, the
    %% protocol negotiation and the merge of the schema.
    %%
    %% See a comment in handle_call({connect_nodes, ...), ...).
    mnesia_lib:del(recover_nodes, Node),
    case State#state.unclear_decision of
	undefined ->
	    {noreply, State};
	D ->
	    %% Only a ram node of the pending unclear decision is
	    %% passed on to add_remote_decision/3
	    case lists:member(Node, D#decision.ram_nodes) of
		false ->
		    {noreply, State};
		true ->
		    State2 = add_remote_decision(Node, D, State),
		    {noreply, State2}
		end
    end;

handle_cast({announce_all, Nodes}, State) ->
    announce_all(Nodes),
    {noreply, State};

%% The overload flag reported to mnesia_lib is the new flag OR-ed with
%% the previously stored one; the state keeps only the new flag.
handle_cast({log_dump_overload, Flag}, State) when is_boolean(Flag) ->
    Prev = State#state.log_dump_overload,
    Overload = Prev orelse Flag,
    mnesia_lib:overload_set(mnesia_dump_log, Overload),
    {noreply, State#state{log_dump_overload = Flag}};

handle_cast(Msg, State) ->
    error("~p got unexpected cast: ~p~n", [?MODULE, Msg]),
    {noreply, State}.

%%----------------------------------------------------------------------
%% Func: handle_info/2
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%%----------------------------------------------------------------------

%% No need for buffering
%% handle_info(Msg, State) when State#state.initiated == false ->
%%     %% Buffer early messages
%%     Msgs = State#state.early_msgs,
%%     {noreply, State#state{early_msgs = [{info, Msg} | Msgs]}};

%% gen_server callback for plain messages: the connect_nodes retry,
%% the two periodic timers, the forced decision timer and the exit of
%% the supervisor.

%% Retry of a connect_nodes call that found the monitor busy.
handle_info({connect_nodes, Ns, From}, State) ->
    handle_call({connect_nodes,Ns},From,State);

%% Periodic overload check; re-arms its own timer.
handle_info(check_overload, S) ->
    State2 = do_check_overload(S),
    next_check_overload(),
    {noreply, State2};

%% Periodic garbage collection of decision tables; re-arms its own timer.
handle_info(garb_decisions, State) ->
    do_garb_decisions(),
    next_garb(),
    {noreply, State};

handle_info({force_decision, Tid}, State) ->
    %% Enforce a transaction recovery decision,
    %% if we still are waiting for the outcome
    
    case State#state.unclear_decision of
	U when U#decision.tid == Tid ->
	    verbose("Decided to abort transaction ~p since "
		    "max_wait_for_decision has been exceeded~n",
		    [Tid]),
	    D = U#decision{outcome = aborted},
	    State2 = add_remote_decision(node(), D, State),
	    {noreply, State2};
	_ ->
	    %% No pending decision, or it concerns another transaction
	    {noreply, State}
    end;

%% The supervisor has exited: stop with reason shutdown.
handle_info({'EXIT', Pid, R}, State) when Pid == State#state.supervisor ->
    mnesia_lib:dbg_out("~p was ~p~n",[?MODULE, R]),
    {stop, shutdown, State};

handle_info(Msg, State) ->
    error("~p got unexpected info: ~p~n", [?MODULE, Msg]),
    {noreply, State}.

%%----------------------------------------------------------------------
%% Func: terminate/2
%% Purpose: Shutdown the server
%% Returns: any (ignored by gen_server)
%%----------------------------------------------------------------------

%% gen_server terminate callback; the shutdown handling is delegated
%% to mnesia_monitor:terminate_proc/3.
terminate(Reason, State) ->
    Name = ?MODULE,
    mnesia_monitor:terminate_proc(Name, Reason, State).

%%----------------------------------------------------------------------
%% Func: code_change/3
%% Purpose: Upgrade process when its code is to be changed
%% Returns: {ok, NewState}
%%----------------------------------------------------------------------
%% The first clause matches a state tuple with seven fields, i.e. one
%% without log_dump_overload, and rebuilds it as a #state{}; the
%% missing field gets the record default (false).
code_change(_OldVsn, {state,
                      Supervisor,
                      Unclear_pid,
                      Unclear_decision,
                      Unclear_waitfor,
                      Tm_queue_len,
                      Initiated,
                      Early_msgs
                     }, _Extra) ->
    {ok, #state{supervisor = Supervisor,
                unclear_pid = Unclear_pid,
                unclear_decision = Unclear_decision,
                unclear_waitfor = Unclear_waitfor,
                tm_queue_len = Tm_queue_len,
                initiated = Initiated,
                early_msgs = Early_msgs}};
%% A state that is already a #state{} record is kept as is.
code_change(_OldVsn, #state{} = State, _Extra) ->
    {ok, State}.

%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------

%% Marks the server as initiated, replays the buffered early messages
%% and only then answers the 'init' call with ok. Returns the result of
%% the replay as the callback result.
handle_early_msgs(State, From) ->
    Buffered = State#state.early_msgs,
    Initiated = State#state{early_msgs = [], initiated = true},
    Result = do_handle_early_msgs(Buffered, Initiated),
    gen_server:reply(From, ok),
    Result.

%% Replays the buffered messages. The buffer holds the newest message
%% first, so the tail is handled before the head to restore arrival
%% order. A stop result ends the replay.
do_handle_early_msgs([], State) ->
    {noreply, State};
do_handle_early_msgs([Newest | Older], State) ->
    case do_handle_early_msgs(Older, State) of
        {noreply, State2} ->
            handle_early_msg(Newest, State2);
        {stop, Reason, State2} ->
            {stop, Reason, State2}
    end.
    
%% Dispatches one buffered message to the matching callback. A reply
%% produced by handle_call/3 is sent explicitly, so the result never
%% carries a reply.
handle_early_msg({cast, Msg}, State) ->
    handle_cast(Msg, State);
handle_early_msg({info, Msg}, State) ->
    handle_info(Msg, State);
handle_early_msg({call, Msg, From}, State) ->
    case handle_call(Msg, From, State) of
        {reply, Reply, NewState} ->
            gen_server:reply(From, Reply),
            {noreply, NewState};
        NoReply ->
            NoReply
    end.

%% Returns the tables to search for a decision, ordered by hit
%% probability. The latest table is read before the previous ones so
%% that no transaction is missed if the tables are switched meanwhile.
tabs() ->
    Latest = val(latest_transient_decision),
    Older = val(previous_transient_decisions),
    [Latest, mnesia_decision] ++ Older.

%% Looks up what is known about Tid. Returns a #decision{} or
%% no_decision.
decision(Tid) ->
    decision(Tid, tabs()).

%% Searches the tables in order. A transient entry is turned into a
%% #decision{} with no disc or ram nodes.
decision(_Tid, []) ->
    no_decision;
decision(Tid, [Tab | Tabs]) ->
    try ?ets_lookup(Tab, Tid) of
        [] ->
            decision(Tid, Tabs);
        [#transient_decision{tid = FoundTid, outcome = Outcome}] ->
            #decision{tid = FoundTid,
                      outcome = Outcome,
                      disc_nodes = [],
                      ram_nodes = []};
        [D] when is_record(D, decision) ->
            D
    catch
        error:_ ->
            %% Recently switched transient decision table
            decision(Tid, Tabs)
    end.

%% Returns the outcome noted for Tid, or Default when none of the
%% decision tables has an entry for it.
outcome(Tid, Default) ->
    outcome(Tid, Default, tabs()).

%% A failed lookup (no entry or missing table) moves on to the next
%% table.
outcome(_Tid, Default, []) ->
    Default;
outcome(Tid, Default, [Tab | Tabs]) ->
    try
        ?ets_lookup_element(Tab, Tid, 3)
    catch
        error:_ -> outcome(Tid, Default, Tabs)
    end.

%% Reduces an outcome to one of committed | aborted | unclear:
%% presume_abort counts as aborted and pre_commit as unclear.
filter_outcome(Val) ->
    case Val of
        committed -> committed;
        aborted -> aborted;
        presume_abort -> aborted;
        unclear -> unclear;
        pre_commit -> unclear
    end.

%% Turns a presume_abort decision into an aborted one; anything else is
%% returned unchanged.
filter_aborted(#decision{outcome = presume_abort} = D) ->
    D#decision{outcome = aborted};
filter_aborted(Other) ->
    Other.
  
%% Merge old decision D with new (probably remote) decision.
%%   Node  - the node the new decision comes from
%%   D     - what is known locally: a #decision{} or no_decision
%%   NewD0 - the new decision; presume_abort is treated as aborted
%% Returns the merged #decision{}. The inner 'if' clauses are tried in
%% order, so their order is significant.
merge_decisions(Node, D, NewD0) ->
    NewD = filter_aborted(NewD0),
    if
	D == no_decision, node() /= Node ->
	    %% We did not know anything about this txn
	    NewD#decision{disc_nodes = []};
	D == no_decision ->
	    NewD;
	is_record(D, decision) ->
	    %% This node and the sender are dropped from the disc nodes
	    DiscNs = D#decision.disc_nodes -- ([node(), Node]),
	    OldD = filter_aborted(D#decision{disc_nodes = DiscNs}),
%%	    mnesia_lib:dbg_out("merge ~w: NewD = ~w~n D = ~w~n OldD = ~w~n", 
%%			       [Node, NewD, D, OldD]),
	    if
		OldD#decision.outcome == unclear,
		NewD#decision.outcome == unclear ->
		    %% Still unclear: keep the old decision untouched
		    D;

		OldD#decision.outcome == NewD#decision.outcome ->
		    %% We have come to the same decision
		    OldD;

		OldD#decision.outcome == committed,
		NewD#decision.outcome == aborted ->
		    %% Interesting! We have already committed,
		    %% but someone else has aborted. Now we
		    %% have a nice little inconsistency. The
		    %% other guy (or someone else) has
		    %% enforced a recovery decision when
		    %% max_wait_for_decision was exceeded.
		    %% We will pretend that we have obeyed
		    %% the forced recovery decision, but we
		    %% will also generate an event in case the
		    %% application wants to do something clever.
		    Msg = {inconsistent_database, bad_decision, Node},
		    mnesia_lib:report_system_event(Msg),
		    OldD#decision{outcome = aborted};

		OldD#decision.outcome == aborted ->
		    %% aborted overrides anything
		    OldD#decision{outcome = aborted};

		NewD#decision.outcome == aborted ->
		    %% aborted overrides anything
		    OldD#decision{outcome = aborted};

		OldD#decision.outcome == committed,
		NewD#decision.outcome == unclear ->
		    %% committed overrides unclear
		    OldD#decision{outcome = committed};

		OldD#decision.outcome == unclear,
		NewD#decision.outcome == committed ->
		    %% committed overrides unclear
		    OldD#decision{outcome = committed}
	    end
    end.

%% Processes a list of items received from Node, threading the server
%% State through, and returns the final State. Items handled:
%%   #decision{}            - merged via add_remote_decision/3
%%   #transient_decision{}  - converted to a #decision{} with empty
%%                            node lists, then merged the same way
%%   {mnesia_down, _, _, _} - skipped
%%   {trans_tid, serial, S} - syncs the transaction serial and, if we
%%                            have an unclear decision for which Node
%%                            is not a ram node, asks Node about it
add_remote_decisions(_Node, [], State) ->
    State;

add_remote_decisions(Node, [#decision{} = Remote | Rest], State) ->
    NextState = add_remote_decision(Node, Remote, State),
    add_remote_decisions(Node, Rest, NextState);

add_remote_decisions(Node,
                     [#transient_decision{tid = Tid, outcome = Outcome} | Rest],
                     State) ->
    Remote = #decision{tid = Tid,
                       outcome = Outcome,
                       disc_nodes = [],
                       ram_nodes = []},
    NextState = add_remote_decision(Node, Remote, State),
    add_remote_decisions(Node, Rest, NextState);

add_remote_decisions(Node, [{mnesia_down, _, _, _} | Rest], State) ->
    add_remote_decisions(Node, Rest, State);

add_remote_decisions(Node, [{trans_tid, serial, Serial} | Rest], State) ->
    sync_trans_tid_serial(Serial),
    case State#state.unclear_decision of
        undefined ->
            ignored;
        Unclear ->
            case lists:member(Node, Unclear#decision.ram_nodes) of
                false ->
                    abcast([Node], {what_decision, node(), Unclear});
                true ->
                    ignore
            end
    end,
    add_remote_decisions(Node, Rest, State).

%% Incorporates the decision NewD, received from Node, into what we
%% already know about the same transaction, and returns the (possibly
%% updated) server State.
%%
%% Steps, in order:
%%   1. Merge NewD with the locally known decision (if any) and log
%%      the merged decision.
%%   2. If we had a decision before, the merged outcome is not
%%      unclear, and the local node is listed in NewD's disc_nodes or
%%      ram_nodes, tell Node the certain outcome.
%%   3. If the transaction is the one currently recorded as
%%      unclear_decision in State, either resolve it (replying to
%%      unclear_pid and clearing the unclear_* fields) or just remove
%%      Node from the set of nodes still waited for.
add_remote_decision(Node, NewD, State) ->
    Tid = NewD#decision.tid,
    OldD = decision(Tid),
    D = merge_decisions(Node, OldD, NewD),
    do_log_decision(D, false, undefined),
    Outcome = D#decision.outcome,
    if
	OldD == no_decision ->
	    ignore;
	Outcome == unclear ->
	    ignore;
	true ->
	    %% The membership tests are only evaluated in this branch
	    case lists:member(node(), NewD#decision.disc_nodes) or
		 lists:member(node(), NewD#decision.ram_nodes) of
		true ->
		    tell_im_certain([Node], D);
		false ->
		    ignore
	    end
    end,
    case State#state.unclear_decision of
	%% The guard fails (and the clause is skipped) when
	%% unclear_decision is not a #decision{}, e.g. undefined
	U when U#decision.tid == Tid ->
	    WaitFor = State#state.unclear_waitfor -- [Node],
	    if
		Outcome == unclear, WaitFor == [] ->
		    %% Everybody is uncertain, let's abort
		    NewOutcome = aborted,
		    CertainD = D#decision{outcome = NewOutcome, 
					  disc_nodes = [],
					  ram_nodes = []},
		    %% Recipients are taken from D; the decision sent
		    %% (CertainD) carries empty node lists
		    tell_im_certain(D#decision.disc_nodes, CertainD),
		    tell_im_certain(D#decision.ram_nodes, CertainD),
		    do_log_decision(CertainD, false, undefined),
		    verbose("Decided to abort transaction ~p "
			    "since everybody are uncertain ~p~n",
			    [Tid, CertainD]),
		    gen_server:reply(State#state.unclear_pid, {ok, NewOutcome}),
		    State#state{unclear_pid = undefined,
				unclear_decision = undefined,
				unclear_waitfor = undefined};
		Outcome /= unclear ->
		    %% Node knew the outcome: pass it on to the waiter
		    verbose("~p told us that transaction ~p was ~p~n",
			    [Node, Tid, Outcome]),
		    gen_server:reply(State#state.unclear_pid, {ok, Outcome}),
		    State#state{unclear_pid = undefined,
				unclear_decision = undefined,
				unclear_waitfor = undefined};
		Outcome == unclear ->
		    %% Still unclear, but other nodes remain to be heard from
		    State#state{unclear_waitfor = WaitFor}
	    end;
	_ ->
	    State
    end.

%% Sends the current transaction serial to each of ToNodes.
%% Does nothing when there are no nodes to announce to.
announce_all([]) ->
    ok;
announce_all(ToNodes) ->
    SerialItem = {trans_tid, serial, trans_tid_serial()},
    announce(ToNodes, [SerialItem], [], false).
    
%% Distributes each item over the nodes in ToNodes (see arrange/4),
%% starting from the per-node accumulator Acc, and then sends every
%% node its collected items.
announce(ToNodes, Items, Acc, ForceSend) ->
    PerNode = lists:foldl(fun(Item, Sofar) ->
                                  arrange(ToNodes, Item, Sofar, ForceSend)
                          end,
                          Acc,
                          Items),
    send_decisions(PerNode).

%% Sends one {decisions, node(), Decisions} message to each node in
%% the given [{Node, Decisions}] list. Returns ok.
send_decisions(PerNode) ->
    lists:foreach(fun({Node, Decisions}) ->
                          abcast([Node], {decisions, node(), Decisions})
                  end,
                  PerNode).

%% Adds one item to the per-node accumulator Acc for the target nodes.
%% A #decision{} is added for a node only if ForceSend is true or the
%% node is listed among the decision's disc_nodes or ram_nodes.
%% A {trans_tid, serial, Serial} item is added for every node.
arrange([], _Item, Acc, _ForceSend) ->
    Acc;

arrange([To | Rest],
        #decision{disc_nodes = DiscNs, ram_nodes = RamNs} = Decision,
        Acc,
        ForceSend) ->
    NextAcc =
        case ForceSend or lists:member(To, DiscNs) or lists:member(To, RamNs) of
            true ->
                add_decision(To, Decision, Acc);
            false ->
                Acc
        end,
    arrange(Rest, Decision, NextAcc, ForceSend);

arrange([To | Rest], {trans_tid, serial, _Serial} = SerialItem, Acc, ForceSend) ->
    %% Do the lamport thing plus release the others
    %% from uncertainty.
    NextAcc = add_decision(To, SerialItem, Acc),
    arrange(Rest, SerialItem, NextAcc, ForceSend).

%% Prepends Decision to the list kept for Node in a
%% [{Node, Decisions}] accumulator. If Node has no entry yet, a new
%% one is appended at the end. The order of existing entries is kept.
add_decision(Node, Decision, []) ->
    [{Node, [Decision]}];
add_decision(Node, Decision, [{Node, Sofar} | Rest]) ->
    [{Node, [Decision | Sofar]} | Rest];
add_decision(Node, Decision, [Other | Rest]) ->
    [Other | add_decision(Node, Decision, Rest)].