%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 1997-2013. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% %CopyrightEnd%
%%
%%
-module(mnesia_recover).
-behaviour(gen_server).
-export([
allow_garb/0,
call/1,
connect_nodes/1,
disconnect/1,
dump_decision_tab/0,
get_master_node_info/0,
get_master_node_tables/0,
get_master_nodes/1,
get_mnesia_downs/0,
has_mnesia_down/1,
incr_trans_tid_serial/0,
init/0,
log_decision/1,
log_dump_overload/1,
log_master_nodes/3,
log_mnesia_down/1,
log_mnesia_up/1,
mnesia_down/1,
note_decision/2,
note_log_decision/2,
outcome/2,
start/0,
next_garb/0,
next_check_overload/0,
still_pending/1,
sync_trans_tid_serial/1,
sync/0,
wait_for_decision/2,
what_happened/3
]).
%% gen_server callbacks
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
]).
-compile({no_auto_import,[error/2]}).
-include("mnesia.hrl").
-import(mnesia_lib, [set/2, verbose/2, error/2, fatal/2]).
-record(state, {supervisor,
unclear_pid,
unclear_decision,
unclear_waitfor,
tm_queue_len = 0,
log_dump_overload = false,
initiated = false,
early_msgs = []
}).
%%-define(DBG(F, A), mnesia:report_event(list_to_atom(lists:flatten(io_lib:format(F, A))))).
%%-define(DBG(F, A), io:format("DBG: " ++ F, A)).
-record(transient_decision, {tid, outcome}).
start() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [self()],
[{timeout, infinity}
%%, {debug, [trace]}
]).
init() ->
call(init).
next_garb() ->
Pid = whereis(mnesia_recover),
erlang:send_after(timer:minutes(2), Pid, garb_decisions).
next_check_overload() ->
Pid = whereis(mnesia_recover),
erlang:send_after(timer:seconds(10), Pid, check_overload).
do_check_overload(S) ->
%% Time to check if mnesia_tm is overloaded
case whereis(mnesia_tm) of
Pid when is_pid(Pid) ->
Threshold = 100,
Prev = S#state.tm_queue_len,
{message_queue_len, Len} =
process_info(Pid, message_queue_len),
if
Len > Threshold, Prev > Threshold ->
What = {mnesia_tm, message_queue_len, [Prev, Len]},
mnesia_lib:report_system_event({mnesia_overload, What}),
mnesia_lib:overload_set(mnesia_tm, true),
S#state{tm_queue_len = 0};
Len > Threshold ->
S#state{tm_queue_len = Len};
true ->
mnesia_lib:overload_set(mnesia_tm, false),
S#state{tm_queue_len = 0}
end;
undefined ->
S
end.
allow_garb() ->
cast(allow_garb).
%% The transaction log has either been swiched (latest -> previous) or
%% there is nothing to be dumped. This means that the previous
%% transaction log only may contain commit records which refers to
%% transactions noted in the last two of the 'Prev' tables. All other
%% tables may now be garbed by 'garb_decisions' (after 2 minutes).
%% Max 10 tables are kept.
do_allow_garb() ->
%% The order of the following stuff is important!
Curr = val(latest_transient_decision),
%% Don't garb small tables, they are created on every
%% dump_log and may be small (empty) for schema transactions
%% which are dumped twice
case ets:info(Curr, size) > 20 of
true ->
Old = val(previous_transient_decisions),
Next = create_transient_decision(),
{Prev, ReallyOld} = sublist([Curr | Old], 10, []),
[?ets_delete_table(Tab) || Tab <- ReallyOld],
set(previous_transient_decisions, Prev),
set(latest_transient_decision, Next);
false ->
ignore
end.
sublist([H|R], N, Acc) when N > 0 ->
sublist(R, N-1, [H| Acc]);
sublist(List, _N, Acc) ->
{lists:reverse(Acc), List}.
do_garb_decisions() ->
case val(previous_transient_decisions) of
[First, Second | Rest] ->
set(previous_transient_decisions, [First, Second]),
[?ets_delete_table(Tab) || Tab <- Rest];
_ ->
ignore
end.
connect_nodes(Ns) ->
call({connect_nodes, Ns}).
disconnect(Node) ->
call({disconnect, Node}).
log_decision(D) ->
cast({log_decision, D}).
val(Var) ->
case ?catch_val(Var) of
{'EXIT', Reason} -> mnesia_lib:other_val(Var, Reason);
Value -> Value
end.
call(Msg) ->
Pid = whereis(?MODULE),
case Pid of
undefined ->
{error, {node_not_running, node()}};
Pid ->
link(Pid),
Res = gen_server:call(Pid, Msg, infinity),
unlink(Pid),
%% We get an exit signal if server dies
receive
{'EXIT', Pid, _Reason} ->
{error, {node_not_running, node()}}
after 0 ->
Res
end
end.
multicall(Nodes, Msg) ->
rpc:multicall(Nodes, ?MODULE, call, [Msg]).
cast(Msg) ->
case whereis(?MODULE) of
undefined -> ignore;
Pid -> gen_server:cast(Pid, Msg)
end.
abcast(Nodes, Msg) ->
gen_server:abcast(Nodes, ?MODULE, Msg).
note_decision(Tid, Outcome) ->
Tab = val(latest_transient_decision),
?ets_insert(Tab, #transient_decision{tid = Tid, outcome = Outcome}).
note_up(Node, _Date, _Time) ->
?ets_delete(mnesia_decision, Node).
note_down(Node, Date, Time) ->
?ets_insert(mnesia_decision, {mnesia_down, Node, Date, Time}).
note_master_nodes(Tab, []) ->
?ets_delete(mnesia_decision, Tab);
note_master_nodes(Tab, Nodes) when is_list(Nodes) ->
Master = {master_nodes, Tab, Nodes},
?ets_insert(mnesia_decision, Master).
note_outcome(D) when D#decision.disc_nodes == [] ->
%% ?DBG("~w: note_tmp_decision: ~w~n", [node(), D]),
note_decision(D#decision.tid, filter_outcome(D#decision.outcome)),
?ets_delete(mnesia_decision, D#decision.tid);
note_outcome(D) when D#decision.disc_nodes /= [] ->
%% ?DBG("~w: note_decision: ~w~n", [node(), D]),
?ets_insert(mnesia_decision, D).
do_log_decision(D) when D#decision.outcome /= unclear ->
OldD = decision(D#decision.tid),
MergedD = merge_decisions(node(), OldD, D),
do_log_decision(MergedD, true, D);
do_log_decision(D) ->
do_log_decision(D, false, undefined).
do_log_decision(D, DoTell, NodeD) ->
DiscNs = D#decision.disc_nodes -- [node()],
Outcome = D#decision.outcome,
D2 =
case Outcome of
aborted -> D#decision{disc_nodes = DiscNs};
committed -> D#decision{disc_nodes = DiscNs};
_ -> D
end,
note_outcome(D2),
case mnesia_monitor:use_dir() of
true ->
if
DoTell == true, Outcome /= unclear ->
tell_im_certain(NodeD#decision.disc_nodes--[node()],D2),
tell_im_certain(NodeD#decision.ram_nodes--[node()], D2),
mnesia_log:log(D2);
Outcome /= unclear ->
mnesia_log:log(D2);
true ->
ignore
end;
false ->
ignore
end.
tell_im_certain([], _D) ->
ignore;
tell_im_certain(Nodes, D) ->
Msg = {im_certain, node(), D},
%% mnesia_lib:verbose("~w: tell: ~w~n", [Msg, Nodes]),
abcast(Nodes, Msg).
sync() ->
call(sync).
log_mnesia_up(Node) ->
call({log_mnesia_up, Node}).
log_mnesia_down(Node) ->
call({log_mnesia_down, Node}).
get_mnesia_downs() ->
Tab = mnesia_decision,
Pat = {mnesia_down, '_', '_', '_'},
Downs = ?ets_match_object(Tab, Pat),
[Node || {mnesia_down, Node, _Date, _Time} <- Downs].
%% Check if we have got a mnesia_down from Node
has_mnesia_down(Node) ->
case ?ets_lookup(mnesia_decision, Node) of
[{mnesia_down, Node, _Date, _Time}] ->
true;
[] ->
false
end.
mnesia_down(Node) ->
case ?catch_val(recover_nodes) of
{'EXIT', _} ->
%% Not started yet
ignore;
_ ->
mnesia_lib:del(recover_nodes, Node),
cast({mnesia_down, Node})
end.
log_dump_overload(Flag) when is_boolean(Flag) ->
cast({log_dump_overload, Flag}).
log_master_nodes(Args, UseDir, IsRunning) ->
if
IsRunning == yes ->
log_master_nodes2(Args, UseDir, IsRunning, ok);
UseDir == false ->
ok;
true ->
Name = latest_log,
Fname = mnesia_log:latest_log_file(),
Exists = mnesia_lib:exists(Fname),
Repair = mnesia:system_info(auto_repair),
OpenArgs = [{file, Fname}, {name, Name}, {repair, Repair}],
case disk_log:open(OpenArgs) of
{ok, Name} ->
log_master_nodes2(Args, UseDir, IsRunning, ok);
{repaired, Name, {recovered, _R}, {badbytes, _B}}
when Exists == true ->
log_master_nodes2(Args, UseDir, IsRunning, ok);
{repaired, Name, {recovered, _R}, {badbytes, _B}}
when Exists == false ->
mnesia_log:write_trans_log_header(),
log_master_nodes2(Args, UseDir, IsRunning, ok);
{error, Reason} ->
{error, Reason}
end
end.
log_master_nodes2([{Tab, Nodes} | Tail], UseDir, IsRunning, WorstRes) ->
Res =
case IsRunning of
yes ->
R = call({log_master_nodes, Tab, Nodes, UseDir, IsRunning}),
mnesia_controller:master_nodes_updated(Tab, Nodes),
R;
_ ->
do_log_master_nodes(Tab, Nodes, UseDir, IsRunning)
end,
case Res of
ok ->
log_master_nodes2(Tail, UseDir, IsRunning, WorstRes);
{error, Reason} ->
log_master_nodes2(Tail, UseDir, IsRunning, {error, Reason})
end;
log_master_nodes2([], _UseDir, IsRunning, WorstRes) ->
case IsRunning of
yes ->
WorstRes;
_ ->
disk_log:close(latest_log),
WorstRes
end.
get_master_node_info() ->
Tab = mnesia_decision,
Pat = {master_nodes, '_', '_'},
case catch mnesia_lib:db_match_object(ram_copies,Tab, Pat) of
{'EXIT', _} ->
[];
Masters ->
Masters
end.
get_master_node_tables() ->
Masters = get_master_node_info(),
[Tab || {master_nodes, Tab, _Nodes} <- Masters].
get_master_nodes(Tab) ->
case catch ?ets_lookup_element(mnesia_decision, Tab, 3) of
{'EXIT', _} -> [];
Nodes -> Nodes
end.
%% Determine what has happened to the transaction
what_happened(Tid, Protocol, Nodes) ->
Default =
case Protocol of
asym_trans -> aborted;
_ -> unclear %% sym_trans and sync_sym_trans
end,
This = node(),
case lists:member(This, Nodes) of
true ->
{ok, Outcome} = call({what_happened, Default, Tid}),
Others = Nodes -- [This],
case filter_outcome(Outcome) of
unclear -> what_happened_remotely(Tid, Default, Others);
aborted -> aborted;
committed -> committed
end;
false ->
what_happened_remotely(Tid, Default, Nodes)
end.
what_happened_remotely(Tid, Default, Nodes) ->
{Replies, _} = multicall(Nodes, {what_happened, Default, Tid}),
check_what_happened(Replies, 0, 0).
check_what_happened([H | T], Aborts, Commits) ->
case H of
{ok, R} ->
case filter_outcome(R) of
committed ->
check_what_happened(T, Aborts, Commits + 1);
aborted ->
check_what_happened(T, Aborts + 1, Commits);
unclear ->
check_what_happened(T, Aborts, Commits)
end;
{error, _} ->
check_what_happened(T, Aborts, Commits);
{badrpc, _} ->
check_what_happened(T, Aborts, Commits)
end;
check_what_happened([], Aborts, Commits) ->
if
Aborts == 0, Commits == 0 -> aborted; % None of the active nodes knows
Aborts > 0 -> aborted; % Someody has aborted
Aborts == 0, Commits > 0 -> committed % All has committed
end.
%% Determine what has happened to the transaction
%% and possibly wait forever for the decision.
wait_for_decision(presume_commit, _InitBy) ->
%% sym_trans
{{presume_commit, self()}, committed};
wait_for_decision(D, InitBy) when D#decision.outcome == presume_abort ->
wait_for_decision(D, InitBy, 0).
wait_for_decision(D, InitBy, N) ->
%% asym_trans
Tid = D#decision.tid,
Max = 10,
Outcome = outcome(Tid, D#decision.outcome),
if
Outcome =:= committed -> {Tid, committed};
Outcome =:= aborted -> {Tid, aborted};
Outcome =:= presume_abort ->
case N > Max of
true -> {Tid, aborted};
false -> % busy loop for ets decision moving
timer:sleep(10),
wait_for_decision(D, InitBy, N+1)
end;
InitBy /= startup ->
%% Wait a while for active transactions
%% to end and try again
timer:sleep(100),
wait_for_decision(D, InitBy, N);
InitBy == startup ->
{ok, Res} = call({wait_for_decision, D}),
{Tid, Res}
end.
still_pending([Tid | Pending]) ->
case filter_outcome(outcome(Tid, unclear)) of
unclear -> [Tid | still_pending(Pending)];
_ -> still_pending(Pending)
end;
still_pending([]) ->
[].
load_decision_tab() ->
Cont = mnesia_log:open_decision_tab(),
load_decision_tab(Cont, load_decision_tab),
mnesia_log:close_decision_tab().
load_decision_tab(eof, _InitBy) ->
ok;
load_decision_tab(Cont, InitBy) ->
case mnesia_log:chunk_decision_tab(Cont) of
{Cont2, Decisions} ->
note_log_decisions(Decisions, InitBy),
load_decision_tab(Cont2, InitBy);
eof ->
ok
end.
%% Dumps DECISION.LOG and PDECISION.LOG and removes them.
%% From now on all decisions are logged in the transaction log file
convert_old() ->
HasOldStuff =
mnesia_lib:exists(mnesia_log:previous_decision_log_file()) or
mnesia_lib:exists(mnesia_log:decision_log_file()),
case HasOldStuff of
true ->
mnesia_log:open_decision_log(),
dump_decision_log(startup),
dump_decision_log(startup),
mnesia_log:close_decision_log(),
Latest = mnesia_log:decision_log_file(),
ok = file:delete(Latest);
false ->
ignore
end.
dump_decision_log(InitBy) ->
%% Assumed to be run in transaction log dumper process
Cont = mnesia_log:prepare_decision_log_dump(),
perform_dump_decision_log(Cont, InitBy).
perform_dump_decision_log(eof, _InitBy) ->
confirm_decision_log_dump();
perform_dump_decision_log(Cont, InitBy) when InitBy == startup ->
case mnesia_log:chunk_decision_log(Cont) of
{Cont2, Decisions} ->
note_log_decisions(Decisions, InitBy),
perform_dump_decision_log(Cont2, InitBy);
eof ->
confirm_decision_log_dump()
end;
perform_dump_decision_log(_Cont, _InitBy) ->
confirm_decision_log_dump().
confirm_decision_log_dump() ->
dump_decision_tab(),
mnesia_log:confirm_decision_log_dump().
dump_decision_tab() ->
Tab = mnesia_decision,
All = mnesia_lib:db_match_object(ram_copies,Tab, '_'),
mnesia_log:save_decision_tab({decision_list, All}).
note_log_decisions([What | Tail], InitBy) ->
note_log_decision(What, InitBy),
note_log_decisions(Tail, InitBy);
note_log_decisions([], _InitBy) ->
ok.
note_log_decision(NewD, InitBy) when NewD#decision.outcome == pre_commit ->
note_log_decision(NewD#decision{outcome = unclear}, InitBy);
note_log_decision(NewD, _InitBy) when is_record(NewD, decision) ->
Tid = NewD#decision.tid,
sync_trans_tid_serial(Tid),
note_outcome(NewD);
note_log_decision({trans_tid, serial, _Serial}, startup) ->
ignore;
note_log_decision({trans_tid, serial, Serial}, _InitBy) ->
sync_trans_tid_serial(Serial);
note_log_decision({mnesia_up, Node, Date, Time}, _InitBy) ->
note_up(Node, Date, Time);
note_log_decision({mnesia_down, Node, Date, Time}, _InitBy) ->
note_down(Node, Date, Time);
note_log_decision({master_nodes, Tab, Nodes}, _InitBy) ->
note_master_nodes(Tab, Nodes);
note_log_decision(H, _InitBy) when H#log_header.log_kind == decision_log ->
V = mnesia_log:decision_log_version(),
if
H#log_header.log_version == V->
ok;
H#log_header.log_version == "2.0" ->
verbose("Accepting an old version format of decision log: ~p~n",
[V]),
ok;
true ->
fatal("Bad version of decision log: ~p~n", [H])
end;
note_log_decision(H, _InitBy) when H#log_header.log_kind == decision_tab ->
V = mnesia_log:decision_tab_version(),
if
V == H#log_header.log_version ->
ok;
true ->
fatal("Bad version of decision tab: ~p~n", [H])
end;
note_log_decision({decision_list, ItemList}, InitBy) ->
note_log_decisions(ItemList, InitBy);
note_log_decision(BadItem, InitBy) ->
exit({"Bad decision log item", BadItem, InitBy}).
trans_tid_serial() ->
?ets_lookup_element(mnesia_decision, serial, 3).
set_trans_tid_serial(Val) ->
?ets_insert(mnesia_decision, {trans_tid, serial, Val}).
incr_trans_tid_serial() ->
?ets_update_counter(mnesia_decision, serial, 1).
sync_trans_tid_serial(ThatCounter) when is_integer(ThatCounter) ->
ThisCounter = trans_tid_serial(),
if
ThatCounter > ThisCounter ->
set_trans_tid_serial(ThatCounter + 1);
true ->
ignore
end;
sync_trans_tid_serial(Tid) ->
sync_trans_tid_serial(Tid#tid.counter).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% Callback functions from gen_server
%%----------------------------------------------------------------------
%% Func: init/1
%% Returns: {ok, State} |
%% {ok, State, Timeout} |
%% {stop, Reason}
%%----------------------------------------------------------------------
init([Parent]) ->
process_flag(trap_exit, true),
mnesia_lib:verbose("~p starting: ~p~n", [?MODULE, self()]),
set(latest_transient_decision, create_transient_decision()),
set(previous_transient_decisions, []),
set(recover_nodes, []),
State = #state{supervisor = Parent},
{ok, State}.
create_transient_decision() ->
?ets_new_table(mnesia_transient_decision, [{keypos, 2}, set, public]).
%%----------------------------------------------------------------------
%% Func: handle_call/3
%% Returns: {reply, Reply, State} |
%% {reply, Reply, State, Timeout} |
%% {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, Reply, State} | (terminate/2 is called)
%%----------------------------------------------------------------------
handle_call(init, From, State) when State#state.initiated == false ->
Args = [{keypos, 2}, set, public, named_table],
case mnesia_monitor:use_dir() of
true ->
?ets_new_table(mnesia_decision, Args),
set_trans_tid_serial(0),
TabFile = mnesia_log:decision_tab_file(),
case mnesia_lib:exists(TabFile) of
true ->
load_decision_tab();
false ->
ignore
end,
convert_old(),
mnesia_dumper:opt_dump_log(scan_decisions);
false ->
?ets_new_table(mnesia_decision, Args),
set_trans_tid_serial(0)
end,
handle_early_msgs(State, From);
handle_call(Msg, From, State) when State#state.initiated == false ->
%% Buffer early messages
Msgs = State#state.early_msgs,
{noreply, State#state{early_msgs = [{call, Msg, From} | Msgs]}};
handle_call({disconnect, Node}, _From, State) ->
mnesia_monitor:disconnect(Node),
mnesia_lib:del(recover_nodes, Node),
{reply, ok, State};
handle_call({connect_nodes, Ns}, From, State) ->
%% Determine which nodes we should try to connect
AlreadyConnected = val(recover_nodes),
{_, Nodes} = mnesia_lib:search_delete(node(), Ns),
Check = Nodes -- AlreadyConnected,
case mnesia_monitor:negotiate_protocol(Check) of
busy ->
%% monitor is disconnecting some nodes retry
%% the req (to avoid deadlock).
erlang:send_after(2, self(), {connect_nodes,Ns,From}),
{noreply, State};
[] ->
%% No good noodes to connect to!
%% We can't use reply here because this function can be
%% called from handle_info
gen_server:reply(From, {[], AlreadyConnected}),
{noreply, State};
GoodNodes ->
%% Now we have agreed upon a protocol with some new nodes
%% and we may use them when we recover transactions
mnesia_lib:add_list(recover_nodes, GoodNodes),
cast({announce_all, GoodNodes}),
case get_master_nodes(schema) of
[] ->
Context = starting_partitioned_network,
mnesia_monitor:detect_inconcistency(GoodNodes, Context);
_ -> %% If master_nodes is set ignore old inconsistencies
ignore
end,
gen_server:reply(From, {GoodNodes, AlreadyConnected}),
{noreply,State}
end;
handle_call({what_happened, Default, Tid}, _From, State) ->
sync_trans_tid_serial(Tid),
Outcome = outcome(Tid, Default),
{reply, {ok, Outcome}, State};
handle_call({wait_for_decision, D}, From, State) ->
Recov = val(recover_nodes),
AliveRam = (mnesia_lib:intersect(D#decision.ram_nodes, Recov) -- [node()]),
RemoteDisc = D#decision.disc_nodes -- [node()],
if
AliveRam == [], RemoteDisc == [] ->
%% No more else to wait for and we may safely abort
{reply, {ok, aborted}, State};
true ->
verbose("Transaction ~p is unclear. "
"Wait for disc nodes: ~w ram: ~w~n",
[D#decision.tid, RemoteDisc, AliveRam]),
AliveDisc = mnesia_lib:intersect(RemoteDisc, Recov),
Msg = {what_decision, node(), D},
abcast(AliveRam, Msg),
abcast(AliveDisc, Msg),
case val(max_wait_for_decision) of
infinity ->
ignore;
MaxWait ->
ForceMsg = {force_decision, D#decision.tid},
{ok, _} = timer:send_after(MaxWait, ForceMsg)
end,
State2 = State#state{unclear_pid = From,
unclear_decision = D,
unclear_waitfor = (RemoteDisc ++ AliveRam)},
{noreply, State2}
end;
handle_call({log_mnesia_up, Node}, _From, State) ->
do_log_mnesia_up(Node),
{reply, ok, State};
handle_call({log_mnesia_down, Node}, _From, State) ->
do_log_mnesia_down(Node),
{reply, ok, State};
handle_call({log_master_nodes, Tab, Nodes, UseDir, IsRunning}, _From, State) ->
do_log_master_nodes(Tab, Nodes, UseDir, IsRunning),
{reply, ok, State};
handle_call(sync, _From, State) ->
{reply, ok, State};
handle_call(Msg, _From, State) ->
error("~p got unexpected call: ~p~n", [?MODULE, Msg]),
{noreply, State}.
do_log_mnesia_up(Node) ->
Yoyo = {mnesia_up, Node, Date = date(), Time = time()},
case mnesia_monitor:use_dir() of
true ->
mnesia_log:append(latest_log, Yoyo),
disk_log:sync(latest_log);
false ->
ignore
end,
note_up(Node, Date, Time).
do_log_mnesia_down(Node) ->
Yoyo = {mnesia_down, Node, Date = date(), Time = time()},
case mnesia_monitor:use_dir() of
true ->
mnesia_log:append(latest_log, Yoyo),
disk_log:sync(latest_log);
false ->
ignore
end,
note_down(Node, Date, Time).
do_log_master_nodes(Tab, Nodes, UseDir, IsRunning) ->
Master = {master_nodes, Tab, Nodes},
Res =
case UseDir of
true ->
LogRes = mnesia_log:append(latest_log, Master),
disk_log:sync(latest_log),
LogRes;
false ->
ok
end,
case IsRunning of
yes ->
note_master_nodes(Tab, Nodes);
_NotRunning ->
ignore
end,
Res.
%%----------------------------------------------------------------------
%% Func: handle_cast/2
%% Returns: {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
%%----------------------------------------------------------------------
handle_cast(Msg, State) when State#state.initiated == false ->
%% Buffer early messages
Msgs = State#state.early_msgs,
{noreply, State#state{early_msgs = [{cast, Msg} | Msgs]}};
handle_cast({im_certain, Node, NewD}, State) ->
OldD = decision(NewD#decision.tid),
MergedD = merge_decisions(Node, OldD, NewD),
do_log_decision(MergedD, false, undefined),
{noreply, State};
handle_cast({log_decision, D}, State) ->
do_log_decision(D),
{noreply, State};
handle_cast(allow_garb, State) ->
do_allow_garb(),
{noreply, State};
handle_cast({decisions, Node, Decisions}, State) ->
mnesia_lib:add(recover_nodes, Node),
State2 = add_remote_decisions(Node, Decisions, State),
{noreply, State2};
handle_cast({what_decision, Node, OtherD}, State) ->
Tid = OtherD#decision.tid,
sync_trans_tid_serial(Tid),
Decision =
case decision(Tid) of
no_decision -> OtherD;
MyD when is_record(MyD, decision) -> MyD
end,
announce([Node], [Decision], [], true),
{noreply, State};
handle_cast({mnesia_down, Node}, State) ->
case State#state.unclear_decision of
undefined ->
{noreply, State};
D ->
case lists:member(Node, D#decision.ram_nodes) of
false ->
{noreply, State};
true ->
State2 = add_remote_decision(Node, D, State),
{noreply, State2}
end
end;
handle_cast({announce_all, Nodes}, State) ->
announce_all(Nodes),
{noreply, State};
handle_cast({log_dump_overload, Flag}, State) when is_boolean(Flag) ->
Prev = State#state.log_dump_overload,
Overload = Prev orelse Flag,
mnesia_lib:overload_set(mnesia_dump_log, Overload),
{noreply, State#state{log_dump_overload = Flag}};
handle_cast(Msg, State) ->
error("~p got unexpected cast: ~p~n", [?MODULE, Msg]),
{noreply, State}.
%%----------------------------------------------------------------------
%% Func: handle_info/2
%% Returns: {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
%%----------------------------------------------------------------------
%% No need for buffering
%% handle_info(Msg, State) when State#state.initiated == false ->
%% %% Buffer early messages
%% Msgs = State#state.early_msgs,
%% {noreply, State#state{early_msgs = [{info, Msg} | Msgs]}};
handle_info({connect_nodes, Ns, From}, State) ->
handle_call({connect_nodes,Ns},From,State);
handle_info(check_overload, S) ->
State2 = do_check_overload(S),
next_check_overload(),
{noreply, State2};
handle_info(garb_decisions, State) ->
do_garb_decisions(),
next_garb(),
{noreply, State};
handle_info({force_decision, Tid}, State) ->
%% Enforce a transaction recovery decision,
%% if we still are waiting for the outcome
case State#state.unclear_decision of
U when U#decision.tid == Tid ->
verbose("Decided to abort transaction ~p since "
"max_wait_for_decision has been exceeded~n",
[Tid]),
D = U#decision{outcome = aborted},
State2 = add_remote_decision(node(), D, State),
{noreply, State2};
_ ->
{noreply, State}
end;
handle_info({'EXIT', Pid, R}, State) when Pid == State#state.supervisor ->
mnesia_lib:dbg_out("~p was ~p~n",[?MODULE, R]),
{stop, shutdown, State};
handle_info(Msg, State) ->
error("~p got unexpected info: ~p~n", [?MODULE, Msg]),
{noreply, State}.
%%----------------------------------------------------------------------
%% Func: terminate/2
%% Purpose: Shutdown the server
%% Returns: any (ignored by gen_server)
%%----------------------------------------------------------------------
terminate(Reason, State) ->
mnesia_monitor:terminate_proc(?MODULE, Reason, State).
%%----------------------------------------------------------------------
%% Func: code_change/3
%% Purpose: Upgrade process when its code is to be changed
%% Returns: {ok, NewState}
%%----------------------------------------------------------------------
code_change(_OldVsn, {state,
Supervisor,
Unclear_pid,
Unclear_decision,
Unclear_waitfor,
Tm_queue_len,
Initiated,
Early_msgs
}, _Extra) ->
{ok, #state{supervisor = Supervisor,
unclear_pid = Unclear_pid,
unclear_decision = Unclear_decision,
unclear_waitfor = Unclear_waitfor,
tm_queue_len = Tm_queue_len,
initiated = Initiated,
early_msgs = Early_msgs}};
code_change(_OldVsn, #state{} = State, _Extra) ->
{ok, State}.
%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------
handle_early_msgs(State, From) ->
Res = do_handle_early_msgs(State#state.early_msgs,
State#state{early_msgs = [],
initiated = true}),
gen_server:reply(From, ok),
Res.
do_handle_early_msgs([Msg | Msgs], State) ->
%% The messages are in reverted order
case do_handle_early_msgs(Msgs, State) of
%% {stop, Reason, Reply, State2} ->
%% {stop, Reason, Reply, State2};
{stop, Reason, State2} ->
{stop, Reason, State2};
{noreply, State2} ->
handle_early_msg(Msg, State2)
end;
do_handle_early_msgs([], State) ->
{noreply, State}.
handle_early_msg({call, Msg, From}, State) ->
case handle_call(Msg, From, State) of
{reply, R, S} ->
gen_server:reply(From, R),
{noreply, S};
Other ->
Other
end;
handle_early_msg({cast, Msg}, State) ->
handle_cast(Msg, State);
handle_early_msg({info, Msg}, State) ->
handle_info(Msg, State).
tabs() ->
Curr = val(latest_transient_decision), % Do not miss any trans even
Prev = val(previous_transient_decisions), % if the tabs are switched
[Curr, mnesia_decision | Prev]. % Ordered by hit probability
decision(Tid) ->
decision(Tid, tabs()).
decision(Tid, [Tab | Tabs]) ->
case catch ?ets_lookup(Tab, Tid) of
[D] when is_record(D, decision) ->
D;
[C] when is_record(C, transient_decision) ->
#decision{tid = C#transient_decision.tid,
outcome = C#transient_decision.outcome,
disc_nodes = [],
ram_nodes = []
};
[] ->
decision(Tid, Tabs);
{'EXIT', _} ->
%% Recently switched transient decision table
decision(Tid, Tabs)
end;
decision(_Tid, []) ->
no_decision.
outcome(Tid, Default) ->
outcome(Tid, Default, tabs()).
outcome(Tid, Default, [Tab | Tabs]) ->
case catch ?ets_lookup_element(Tab, Tid, 3) of
{'EXIT', _} ->
outcome(Tid, Default, Tabs);
Val ->
Val
end;
outcome(_Tid, Default, []) ->
Default.
filter_outcome(Val) ->
case Val of
unclear -> unclear;
aborted -> aborted;
presume_abort -> aborted;
committed -> committed;
pre_commit -> unclear
end.
filter_aborted(D) when D#decision.outcome == presume_abort ->
D#decision{outcome = aborted};
filter_aborted(D) ->
D.
%% Merge old decision D with new (probably remote) decision
merge_decisions(Node, D, NewD0) ->
NewD = filter_aborted(NewD0),
if
D == no_decision, node() /= Node ->
%% We did not know anything about this txn
NewD#decision{disc_nodes = []};
D == no_decision ->
NewD;
is_record(D, decision) ->
DiscNs = D#decision.disc_nodes -- ([node(), Node]),
OldD = filter_aborted(D#decision{disc_nodes = DiscNs}),
%% mnesia_lib:dbg_out("merge ~w: NewD = ~w~n D = ~w~n OldD = ~w~n",
%% [Node, NewD, D, OldD]),
if
OldD#decision.outcome == unclear,
NewD#decision.outcome == unclear ->
D;
OldD#decision.outcome == NewD#decision.outcome ->
%% We have come to the same decision
OldD;
OldD#decision.outcome == committed,
NewD#decision.outcome == aborted ->
%% Interesting! We have already committed,
%% but someone else has aborted. Now we
%% have a nice little inconcistency. The
%% other guy (or some one else) has
%% enforced a recovery decision when
%% max_wait_for_decision was exceeded.
%% We will pretend that we have obeyed
%% the forced recovery decision, but we
%% will also generate an event in case the
%% application wants to do something clever.
Msg = {inconsistent_database, bad_decision, Node},
mnesia_lib:report_system_event(Msg),
OldD#decision{outcome = aborted};
OldD#decision.outcome == aborted ->
%% aborted overrrides anything
OldD#decision{outcome = aborted};
NewD#decision.outcome == aborted ->
%% aborted overrrides anything
OldD#decision{outcome = aborted};
OldD#decision.outcome == committed,
NewD#decision.outcome == unclear ->
%% committed overrides unclear
OldD#decision{outcome = committed};
OldD#decision.outcome == unclear,
NewD#decision.outcome == committed ->
%% committed overrides unclear
OldD#decision{outcome = committed}
end
end.
add_remote_decisions(Node, [D | Tail], State) when is_record(D, decision) ->
State2 = add_remote_decision(Node, D, State),
add_remote_decisions(Node, Tail, State2);
add_remote_decisions(Node, [C | Tail], State)
when is_record(C, transient_decision) ->
D = #decision{tid = C#transient_decision.tid,
outcome = C#transient_decision.outcome,
disc_nodes = [],
ram_nodes = []},
State2 = add_remote_decision(Node, D, State),
add_remote_decisions(Node, Tail, State2);
add_remote_decisions(Node, [{mnesia_down, _, _, _} | Tail], State) ->
add_remote_decisions(Node, Tail, State);
add_remote_decisions(Node, [{trans_tid, serial, Serial} | Tail], State) ->
sync_trans_tid_serial(Serial),
case State#state.unclear_decision of
undefined ->
ignored;
D ->
case lists:member(Node, D#decision.ram_nodes) of
true ->
ignore;
false ->
abcast([Node], {what_decision, node(), D})
end
end,
add_remote_decisions(Node, Tail, State);
add_remote_decisions(_Node, [], State) ->
State.
add_remote_decision(Node, NewD, State) ->
Tid = NewD#decision.tid,
OldD = decision(Tid),
D = merge_decisions(Node, OldD, NewD),
do_log_decision(D, false, undefined),
Outcome = D#decision.outcome,
if
OldD == no_decision ->
ignore;
Outcome == unclear ->
ignore;
true ->
case lists:member(node(), NewD#decision.disc_nodes) or
lists:member(node(), NewD#decision.ram_nodes) of
true ->
tell_im_certain([Node], D);
false ->
ignore
end
end,
case State#state.unclear_decision of
U when U#decision.tid == Tid ->
WaitFor = State#state.unclear_waitfor -- [Node],
if
Outcome == unclear, WaitFor == [] ->
%% Everybody are uncertain, lets abort
NewOutcome = aborted,
CertainD = D#decision{outcome = NewOutcome,
disc_nodes = [],
ram_nodes = []},
tell_im_certain(D#decision.disc_nodes, CertainD),
tell_im_certain(D#decision.ram_nodes, CertainD),
do_log_decision(CertainD, false, undefined),
verbose("Decided to abort transaction ~p "
"since everybody are uncertain ~p~n",
[Tid, CertainD]),
gen_server:reply(State#state.unclear_pid, {ok, NewOutcome}),
State#state{unclear_pid = undefined,
unclear_decision = undefined,
unclear_waitfor = undefined};
Outcome /= unclear ->
verbose("~p told us that transaction ~p was ~p~n",
[Node, Tid, Outcome]),
gen_server:reply(State#state.unclear_pid, {ok, Outcome}),
State#state{unclear_pid = undefined,
unclear_decision = undefined,
unclear_waitfor = undefined};
Outcome == unclear ->
State#state{unclear_waitfor = WaitFor}
end;
_ ->
State
end.
announce_all([]) ->
ok;
announce_all(ToNodes) ->
Tid = trans_tid_serial(),
announce(ToNodes, [{trans_tid,serial,Tid}], [], false).
announce(ToNodes, [Head | Tail], Acc, ForceSend) ->
Acc2 = arrange(ToNodes, Head, Acc, ForceSend),
announce(ToNodes, Tail, Acc2, ForceSend);
announce(_ToNodes, [], Acc, _ForceSend) ->
send_decisions(Acc).
send_decisions([{Node, Decisions} | Tail]) ->
abcast([Node], {decisions, node(), Decisions}),
send_decisions(Tail);
send_decisions([]) ->
ok.
arrange([To | ToNodes], D, Acc, ForceSend) when is_record(D, decision) ->
NeedsAdd = (ForceSend or
lists:member(To, D#decision.disc_nodes) or
lists:member(To, D#decision.ram_nodes)),
case NeedsAdd of
true ->
Acc2 = add_decision(To, D, Acc),
arrange(ToNodes, D, Acc2, ForceSend);
false ->
arrange(ToNodes, D, Acc, ForceSend)
end;
arrange([To | ToNodes], {trans_tid, serial, Serial}, Acc, ForceSend) ->
%% Do the lamport thing plus release the others
%% from uncertainity.
Acc2 = add_decision(To, {trans_tid, serial, Serial}, Acc),
arrange(ToNodes, {trans_tid, serial, Serial}, Acc2, ForceSend);
arrange([], _Decision, Acc, _ForceSend) ->
Acc.
add_decision(Node, Decision, [{Node, Decisions} | Tail]) ->
[{Node, [Decision | Decisions]} | Tail];
add_decision(Node, Decision, [Head | Tail]) ->
[Head | add_decision(Node, Decision, Tail)];
add_decision(Node, Decision, []) ->
[{Node, [Decision]}].