%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 1996-2013
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% %CopyrightEnd%
%%
%%
-module(mnesia_checkpoint).
%% TM callback interface
-export([
tm_add_copy/2,
tm_change_table_copy_type/3,
tm_del_copy/2,
tm_mnesia_down/1,
tm_prepare/1,
tm_retain/4,
tm_retain/5,
tm_enter_pending/1,
tm_enter_pending/3,
tm_exit_pending/1
]).
%% Public interface
-export([
activate/1,
checkpoints/0,
deactivate/1,
deactivate/2,
iterate/6,
most_local_node/2,
really_retain/2,
stop/0,
stop_iteration/1,
tables_and_cookie/1
]).
%% Internal
-export([
call/2,
cast/2,
init/1,
remote_deactivate/1,
start/1
]).
%% sys callback interface
-export([
system_code_change/4,
system_continue/3,
system_terminate/4
]).
-include("mnesia.hrl").
-import(mnesia_lib, [add/2, del/2, set/2, unset/1]).
-import(mnesia_lib, [dbg_out/2]).
%% State of one checkpoint. args2cp/1 builds it from the activate/1 options
%% and the retainer process (init/1, retainer_loop/1) carries it as loop state.
-record(checkpoint_args, {name = {erlang:unique_integer([positive]), node()}, % generated default name
allow_remote = true, % false: filter_remote/2 only accepts the local node as writer
ram_overrides_dump = false, % true | false | [Tab]; arrange_retainers/3 stores the resolved table list
nodes = [], % union of all retainer writers, see writers/1
node = node(),
now, %% unused
cookie = ?unique_cookie, % returned by tables_and_cookie/1
min = [], % tables given with {min, Tabs}
max = [], % tables given with {max, Tabs}
pending_tab, % ets table of #pending{} entries, created in init/1
wait_for_old, % undefined until {activate, Pending} is handled, then a list of tids
is_activated = false, % set to true by maybe_activate/1
ignore_new = [], % tids given with {ignore_new, Tids}
retainers = [], % list of #retainer{}, one per table
iterators = [], % list of #iter{} registered by iter_begin/3
supervisor, % set to self() of the caller in start/1
pid % retainer process pid, set in init/1
}).
%% One retainer per {checkpoint, table}. store is undefined, {ets, Tab} or
%% {dets, Name} (see prepare_tab/3 and retainer_create/5); writers is the list
%% of nodes holding a retainer for the table.
-record(retainer, {cp_name, tab_name, store, writers = [], really_retain = true}).
%% Iteration handle filled in by init_tabs/2. source is table | retainer and
%% val is latest | checkpoint (see get_val/2); pid is the iterating process.
-record(iter, {tab_name, oid_tab, main_tab, retainer_tab, source, val, pid}).
%% Entry stored in a checkpoint's pending table by tm_enter_pending/2 and
%% enter_still_pending/2.
-record(pending, {tid, disc_nodes = [], ram_nodes = []}).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% TM callback functions
%% Sends a synchronous `stop' request to every checkpoint returned by
%% checkpoints/0, one after the other, and returns ok whatever the replies.
stop() ->
    _ = [call(Name, stop) || Name <- checkpoints()],
    ok.
%% Starts a retainer process for the checkpoint described by Cp unless a
%% checkpoint with the same name is already registered on this node.
%% Returns the result of start_retainer/1 or
%% {error, {already_exists, Name, Node}}.
tm_prepare(#checkpoint_args{name = Name} = Cp) ->
    case lists:member(Name, checkpoints()) of
        true ->
            {error, {already_exists, Name, node()}};
        false ->
            start_retainer(Cp)
    end.
%% Tells every local checkpoint (asynchronously) that Node has gone down.
tm_mnesia_down(Node) ->
    Notify = fun(Name) -> cast(Name, {mnesia_down, Node}) end,
    lists:foreach(Notify, checkpoints()).
%% Registers a transaction as pending in every pending-checkpoint table.
%% All arities return the #pending{} record.

%% Builds the #pending{} record from its parts and registers it.
tm_enter_pending(Tid, DiscNs, RamNs) ->
    tm_enter_pending(#pending{tid = Tid, disc_nodes = DiscNs, ram_nodes = RamNs}).

%% Registers Pending in all tables listed under pending_checkpoints.
tm_enter_pending(Pending) ->
    tm_enter_pending(val(pending_checkpoints), Pending).

%% Inserts Pending into each table of the list; the insert is wrapped in ?SAFE.
tm_enter_pending([], Pending) ->
    Pending;
tm_enter_pending([PendingTab | Rest], Pending) ->
    ?SAFE(?ets_insert(PendingTab, Pending)),
    tm_enter_pending(Rest, Pending).
%% Notifies every process listed under pending_checkpoint_pids that the
%% transaction Tid is no longer pending. Returns Tid.
tm_exit_pending(Tid) ->
    tm_exit_pending(val(pending_checkpoint_pids), Tid).
%% Sends {self(), {exit_pending, Tid}} to each pid in Pids and returns Tid.
tm_exit_pending(Pids, Tid) ->
    Notice = {self(), {exit_pending, Tid}},
    lists:foreach(fun(Pid) -> Pid ! Notice end, Pids),
    Tid.
%% Inserts a #pending{} entry (tid only) into Tab for every tid in the list.
enter_still_pending([], _Tab) ->
    ok;
enter_still_pending([Tid | Rest], Tab) ->
    ?ets_insert(Tab, #pending{tid = Tid}),
    enter_still_pending(Rest, Tab).
%% Called from mnesia_tm. Looks up the checkpoints attached to Tab in its
%% commit_work entry and forwards to tm_retain/5. Returns undefined when the
%% commit work does not start with a checkpoints entry.
tm_retain(Tid, Tab, Key, Op) ->
    case val({Tab, commit_work}) of
        [{checkpoints, Names} | _] ->
            tm_retain(Tid, Tab, Key, Op, Names);
        _ ->
            undefined
    end.
%% Reads the current records affected by Op and hands them to the given
%% checkpoints for retention. Returns the old records.
%% clear_table retains every record of the table, grouped per key; any other
%% operation retains only the records stored under Key.
tm_retain(Tid, Tab, _Key, clear_table, Checkpoints) ->
    OldRecs = mnesia_lib:db_match_object(Tab, '_'),
    send_group_retain(OldRecs, Checkpoints, Tid, Tab, []),
    OldRecs;
tm_retain(Tid, Tab, Key, _Op, Checkpoints) ->
    OldRecs = mnesia_lib:db_get(Tab, Key),
    send_retain(Checkpoints, {retain, Tid, Tab, Key, OldRecs}),
    OldRecs.
%% Walks a list of records and sends one retain message per run of
%% consecutive records sharing the same key (element 2). The last argument
%% holds the current run, newest record first.
send_group_retain([Rec | Recs], Checkpoints, Tid, Tab, [Prev | _] = Group)
  when element(2, Rec) /= element(2, Prev) ->
    %% Key changed: flush the finished run and start a new one with Rec.
    flush_group(Group, Checkpoints, Tid, Tab),
    send_group_retain(Recs, Checkpoints, Tid, Tab, [Rec]);
send_group_retain([Rec | Recs], Checkpoints, Tid, Tab, Group) ->
    send_group_retain(Recs, Checkpoints, Tid, Tab, [Rec | Group]);
send_group_retain([], Checkpoints, Tid, Tab, [_ | _] = Group) ->
    flush_group(Group, Checkpoints, Tid, Tab),
    ok;
send_group_retain([], _Checkpoints, _Tid, _Tab, []) ->
    ok.

%% Sends the collected run, restored to its original order, to the checkpoints.
flush_group([Newest | _] = Group, Checkpoints, Tid, Tab) ->
    Key = element(2, Newest),
    OldRecs = lists:reverse(Group),
    send_retain(Checkpoints, {retain, Tid, Tab, Key, OldRecs}).
%% Casts Msg to every checkpoint in the list, in order. Returns ok.
send_retain(Names, Msg) ->
    lists:foreach(fun(Name) -> cast(Name, Msg) end, Names).
%% Informs all checkpoints attached to Tab that a replica has been added on
%% the remote node Node. Only defined for Node /= node().
%% Returns ok or the last {error, Reason} collected by map_call/3.
tm_add_copy(Tab, Node) when Node /= node() ->
    case val({Tab, commit_work}) of
        [{checkpoints, Names} | _] ->
            AddCopy = fun(Name) -> call(Name, {add_copy, Tab, Node}) end,
            map_call(AddCopy, Names, ok);
        _ ->
            ok
    end.
%% Removes the local replica of Tab from checkpoint bookkeeping: first drops
%% table subscriptions, then tells every attached checkpoint about the
%% deleted copy. Only defined for Node == node().
tm_del_copy(Tab, Node) when Node == node() ->
    mnesia_subscr:unsubscribe_table(Tab),
    case val({Tab, commit_work}) of
        [{checkpoints, Names} | _] ->
            DelCopy = fun(Name) -> call(Name, {del_copy, Tab, Node}) end,
            map_call(DelCopy, Names, ok);
        _ ->
            ok
    end.
%% Tells every checkpoint attached to Tab that the storage type of the table
%% changes from From to To.
tm_change_table_copy_type(Tab, From, To) ->
    case val({Tab, commit_work}) of
        [{checkpoints, Names} | _] ->
            Change = fun(Name) -> call(Name, {change_copy, Tab, From, To}) end,
            map_call(Change, Names, ok);
        _ ->
            ok
    end.
%% Applies Fun to every checkpoint name and folds the outcomes into a single
%% result. ok and {error, {no_exists, Name}} leave the running result
%% untouched; any other {error, Reason} replaces it, so the last such error
%% wins.
map_call(_Fun, [], Res) ->
    Res;
map_call(Fun, [Name | Names], Res) ->
    Next =
        case Fun(Name) of
            ok ->
                Res;
            {error, {no_exists, Name}} ->
                Res;
            {error, Reason} ->
                %% BUGBUG: We may end up with some checkpoint retainers
                %% too much in the add_copy case. How do we remove them?
                {error, Reason}
        end,
    map_call(Fun, Names, Next).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Public functions
%% Deactivates the checkpoint Name on all nodes recorded in its state.
%% Returns {error, Reason} when the checkpoint state cannot be fetched.
deactivate(Name) ->
    case call(Name, get_checkpoint) of
        {error, _Reason} = Failure ->
            Failure;
        Cp ->
            Nodes = Cp#checkpoint_args.nodes,
            deactivate(Nodes, Name)
    end.
%% Asks each node in Nodes to deactivate checkpoint Name. The outcome of the
%% multicall is ignored; always returns ok.
deactivate(Nodes, Name) ->
    _ = rpc:multicall(Nodes, ?MODULE, remote_deactivate, [Name]),
    ok.
%% rpc entry point used by deactivate/2: deactivates the local retainer of
%% checkpoint Name.
remote_deactivate(Name) ->
    call(Name, deactivate).
%% Returns the value stored under the `checkpoints' key, i.e. the names
%% registered by init/1 on this node.
checkpoints() ->
    val(checkpoints).
%% Returns {ok, Tabs, Cookie} for checkpoint Name, where Tabs is the min
%% tables followed by the max tables, or {error, Reason} when the checkpoint
%% state cannot be fetched.
tables_and_cookie(Name) ->
    case call(Name, get_checkpoint) of
        {error, _Reason} = Failure ->
            Failure;
        Cp ->
            {ok,
             Cp#checkpoint_args.min ++ Cp#checkpoint_args.max,
             Cp#checkpoint_args.cookie}
    end.
%% Picks the node to read the retainer of Tab from for checkpoint Name:
%% the local node when it is a writer, otherwise the first writer.
%% Returns {ok, Node} or an error when no retainer (or no writer) exists.
most_local_node(Name, Tab) ->
    case ?catch_val({Tab, {retainer, Name}}) of
        {'EXIT', _} ->
            {error, {"No retainer attached to table", [Tab, Name]}};
        R ->
            Writers = R#retainer.writers,
            case lists:member(node(), Writers) of
                true ->
                    {ok, node()};
                false ->
                    case Writers of
                        [First | _] ->
                            {ok, First};
                        [] ->
                            {error, {"No retainer attached to table", [Tab, Name]}}
                    end
            end
    end.
%% Returns the really_retain flag of the retainer that checkpoint Name keeps
%% for table Tab.
really_retain(Name, Tab) ->
    Retainer = val({Tab, {retainer, Name}}),
    Retainer#retainer.really_retain.
%% Activate a checkpoint.
%%
%% A checkpoint is a transaction consistent state that may be used to
%% perform a distributed backup or to rollback the involved tables to
%% their old state. Backups may also be used to restore tables to
%% their old state. Args is a list of the following tuples:
%%
%% {name, Name}
%% Name of checkpoint. Each checkpoint must have a name which
%% is unique on the reachable nodes. The name may be reused when
%% the checkpoint has been deactivated.
%% By default a probably unique name is generated.
%% Multiple checkpoints may be set on the same table.
%%
%% {allow_remote, Bool}
%% false means that all retainers must be local. If the
%% table does not reside locally, the checkpoint fails.
%% true allows retainers on other nodes.
%%
%% {min, MinTabs}
%% Minimize redundancy and only keep checkpoint info together with
%% one replica, preferably at the local node. If any node involved
%% in the checkpoint goes down, the checkpoint is deactivated.
%%
%% {max, MaxTabs}
%% Maximize redundancy and keep checkpoint info together with all
%% replicas. The checkpoint becomes more fault tolerant if the
%% tables have several replicas. When new replicas are added, they
%% will also get a retainer attached to them.
%%
%% {ram_overrides_dump, Bool}
%% {ram_overrides_dump, Tabs}
%% Only applicable for ram_copies. Bool controls which version of
%% the records should be included in the checkpoint state.
%% true means that the latest committed records in ram (i.e. the
%% records that the application accesses) should be included
%% in the checkpoint. false means that the records dumped to
%% dat-files (the records that will be loaded at startup) should
%% be included in the checkpoint. Tabs is a list of tables.
%% Default is false.
%%
%% {ignore_new, TidList}
%% Normally we wait for all pending transactions to complete
%% before we allow iteration over the checkpoint. But in order
%% to cope with checkpoint activation inside a transaction that
%% currently prepares commit (mnesia_init:get_net_work_copy) we
%% need to have the ability to ignore the enclosing transaction.
%% We do not wait for the transactions in TidList to end. The
%% transactions in TidList are regarded as newer than the checkpoint.
%% Validates the option list Args and, when it is acceptable, activates the
%% checkpoint. Returns the result of do_activate/1 or {error, Reason}.
activate(Args) ->
    case args2cp(Args) of
        {error, _Reason} = Failure ->
            Failure;
        {ok, Cp} ->
            do_activate(Cp)
    end.
%% Turns the activate/1 option list into a #checkpoint_args{} record with
%% retainers assigned. Returns {ok, Cp} or {error, Reason}; exits and errors
%% raised while folding the options are converted into {error, Reason}.
args2cp(Args) when not is_list(Args) ->
    {error, {badarg, Args}};
args2cp(Args) ->
    try lists:foldl(fun check_arg/2, #checkpoint_args{}, Args) of
        Cp ->
            case check_tables(Cp) of
                {ok, Overriders, AllTabs} ->
                    arrange_retainers(Cp, Overriders, AllTabs);
                {error, Reason} ->
                    {error, Reason}
            end
    catch
        Class:Reason when Class =:= exit; Class =:= error ->
            {error, Reason}
    end.
%% Fold step of args2cp/1: validates one activate/1 option and stores it in
%% the record. Exits with {already_exists, Name} for a name already in use
%% and with {badarg, Arg} for anything unrecognised.
check_arg({name, Name}, Cp) ->
    case lists:member(Name, checkpoints()) of
        false ->
            %% The name must be usable as part of a retainer file name.
            try
                [_ | _] = tab2retainer({foo, Name}),
                Cp#checkpoint_args{name = Name}
            catch
                _:_ ->
                    exit({badarg, Name})
            end;
        true ->
            exit({already_exists, Name})
    end;
check_arg({allow_remote, Flag}, Cp) when is_boolean(Flag) ->
    Cp#checkpoint_args{allow_remote = Flag};
check_arg({ram_overrides_dump, Value}, Cp) when is_boolean(Value); is_list(Value) ->
    Cp#checkpoint_args{ram_overrides_dump = Value};
check_arg({min, Tabs}, Cp) when is_list(Tabs) ->
    Cp#checkpoint_args{min = Tabs};
check_arg({max, Tabs}, Cp) when is_list(Tabs) ->
    Cp#checkpoint_args{max = Tabs};
check_arg({ignore_new, Tids}, Cp) when is_list(Tids) ->
    Cp#checkpoint_args{ignore_new = Tids};
check_arg(Arg, _Cp) ->
    exit({badarg, Arg}).
%% Checks that the min/max/ram_overrides_dump options are consistent.
%%
%% Returns {ok, Overriders, AllTabs}, where AllTabs is min ++ max and
%% Overriders is the list of tables whose ram contents override the dump, or
%% {error, {combine_error, Name, Options}} naming the conflicting options:
%%   - a table may not be listed in both min and max,
%%   - at least one table must be given,
%%   - every table in a ram_overrides_dump list must be part of the checkpoint.
%%
%% min and max are required to be disjoint, so a ram_overrides_dump table is
%% accepted when it appears in either of them. Requiring membership in both
%% lists, as was done previously, rejected every non-empty list.
check_tables(Cp) ->
    Min = Cp#checkpoint_args.min,
    Max = Cp#checkpoint_args.max,
    AllTabs = Min ++ Max,
    DoubleTabs = [T || T <- Min, lists:member(T, Max)],
    Overriders = Cp#checkpoint_args.ram_overrides_dump,
    if
        DoubleTabs /= [] ->
            {error, {combine_error, Cp#checkpoint_args.name,
                     [{min, DoubleTabs}, {max, DoubleTabs}]}};
        Min == [], Max == [] ->
            {error, {combine_error, Cp#checkpoint_args.name,
                     [{min, Min}, {max, Max}]}};
        Overriders == false ->
            {ok, [], AllTabs};
        Overriders == true ->
            {ok, AllTabs, AllTabs};
        is_list(Overriders) ->
            case [T || T <- Overriders, not lists:member(T, AllTabs)] of
                [] ->
                    {ok, Overriders, AllTabs};
                Outsiders ->
                    {error, {combine_error, Cp#checkpoint_args.name,
                             [{ram_overrides_dump, Outsiders},
                              {min, Min},
                              {max, Max}]}}
            end
    end.
%% Creates one #retainer{} per table with its writer nodes chosen by
%% select_writers/2 and stores them, the resolved overriders and the union
%% of writer nodes in the record. A throw from select_writers/2 is returned
%% as {error, Reason}.
arrange_retainers(Cp, Overriders, AllTabs) ->
    CpName = Cp#checkpoint_args.name,
    try
        [#retainer{cp_name = CpName,
                   tab_name = Tab,
                   writers = select_writers(Cp, Tab)}
         || Tab <- AllTabs]
    of
        Retainers ->
            {ok, Cp#checkpoint_args{ram_overrides_dump = Overriders,
                                    retainers = Retainers,
                                    nodes = writers(Retainers)}}
    catch
        throw:Reason ->
            {error, Reason}
    end.
%% Chooses the nodes that will hold a retainer for Tab.
%% A max table uses every eligible active replica; a min table uses the local
%% node when it is eligible and otherwise the first eligible replica.
%% Throws when no eligible replica exists.
select_writers(Cp, Tab) ->
    case filter_remote(Cp, val({Tab, active_replicas})) of
        [] ->
            throw({"Cannot prepare checkpoint (replica not available)",
                   [Tab, Cp#checkpoint_args.name]});
        Writers ->
            Here = node(),
            IsMax = lists:member(Tab, Cp#checkpoint_args.max),
            IsLocal = lists:member(Here, Writers),
            if
                IsMax -> Writers;
                IsLocal -> [Here];
                true -> [hd(Writers)]
            end
    end.
%% With allow_remote = true all candidate writers are kept. Otherwise only
%% the local node is kept, and only if it is among the candidates.
filter_remote(#checkpoint_args{allow_remote = true}, Writers) ->
    Writers;
filter_remote(_Cp, Writers) ->
    Here = node(),
    [Here || lists:member(Here, Writers)].
%% Collects the writer nodes of all retainers and removes duplicates with
%% mnesia_lib:uniq/1.
writers(Retainers) ->
    Prepend = fun(Retainer, Collected) ->
                      Retainer#retainer.writers ++ Collected
              end,
    mnesia_lib:uniq(lists:foldl(Prepend, [], Retainers)).
%% First activation phase: lets mnesia_tm prepare the checkpoint on all
%% writer nodes, then checks the replies. Fails immediately when any node
%% is reported as bad.
do_activate(Cp) ->
    Name = Cp#checkpoint_args.name,
    Nodes = Cp#checkpoint_args.nodes,
    case mnesia_tm:prepare_checkpoint(Nodes, Cp) of
        {Replies, []} ->
            IgnoreNew = Cp#checkpoint_args.ignore_new,
            check_prep(Replies, Name, Nodes, IgnoreNew);
        {_, BadNodes} ->
            {error, {"Cannot prepare checkpoint (bad nodes)",
                     [Name, BadNodes]}}
    end.
%% Walks the prepare replies. Once every reply is an ok tuple carrying the
%% expected name and ignore list, continues with collect_pending/3; the
%% first error or badrpc reply aborts with {error, ...}.
check_prep([], Name, Nodes, IgnoreNew) ->
    collect_pending(Name, Nodes, IgnoreNew);
check_prep([{ok, Name, IgnoreNew, _Node} | Rest], Name, Nodes, IgnoreNew) ->
    check_prep(Rest, Name, Nodes, IgnoreNew);
check_prep([{error, Reason} | _Rest], Name, _Nodes, _IgnoreNew) ->
    {error, {"Cannot prepare checkpoint (bad reply)",
             [Name, Reason]}};
check_prep([{badrpc, Reason} | _Rest], Name, _Nodes, _IgnoreNew) ->
    {error, {"Cannot prepare checkpoint (badrpc)",
             [Name, Reason]}}.
%% Second activation phase: asks the retainer on every node for its pending
%% transactions and merges them in a temporary ets bag. When some node does
%% not answer the checkpoint is deactivated on all nodes.
%% Any error raised while creating the table or processing the replies is
%% reported as a system_limit error.
collect_pending(Name, Nodes, IgnoreNew) ->
    case rpc:multicall(Nodes, ?MODULE, call, [Name, collect_pending]) of
        {Replies, []} ->
            try
                UnionTab = ?ets_new_table(mnesia_union, [bag]),
                compute_union(Replies, Nodes, Name, UnionTab, IgnoreNew)
            catch
                error:Reason -> %% system limit
                    {error, {system_limit,
                             "Cannot create an ets table pending union",
                             Reason}}
            end;
        {_, BadNodes} ->
            deactivate(Nodes, Name),
            {error, {"Cannot collect from pending checkpoint", Name, BadNodes}}
    end.
%% Adds the pending transactions of every reply to UnionTab and then starts
%% the activation round. The first error or badrpc reply deactivates the
%% checkpoint on all nodes, deletes UnionTab and returns {error, ...}.
compute_union([], Nodes, Name, UnionTab, IgnoreNew) ->
    send_activate(Nodes, Nodes, Name, UnionTab, IgnoreNew);
compute_union([{ok, Pending} | Rest], Nodes, Name, UnionTab, IgnoreNew) ->
    add_pending(Pending, UnionTab),
    compute_union(Rest, Nodes, Name, UnionTab, IgnoreNew);
compute_union([{error, Reason} | _Rest], Nodes, Name, UnionTab, _IgnoreNew) ->
    abort_union(Nodes, Name, UnionTab, Reason);
compute_union([{badrpc, Reason} | _Rest], Nodes, Name, UnionTab, _IgnoreNew) ->
    abort_union(Nodes, Name, UnionTab, {badrpc, Reason}).

%% Tears down the half-built checkpoint and reports Reason.
abort_union(Nodes, Name, UnionTab, Reason) ->
    deactivate(Nodes, Name),
    ?ets_delete_table(UnionTab),
    {error, Reason}.
%% Records, for every pending transaction, each of its disc and ram nodes
%% in UnionTab.
add_pending([], _UnionTab) ->
    ok;
add_pending([Entry | Rest], UnionTab) ->
    Tid = Entry#pending.tid,
    add_pending_node(Entry#pending.disc_nodes, Tid, UnionTab),
    add_pending_node(Entry#pending.ram_nodes, Tid, UnionTab),
    add_pending(Rest, UnionTab).
%% Inserts a {Node, Tid} entry into UnionTab for every node in the list.
add_pending_node([], _Tid, _UnionTab) ->
    ok;
add_pending_node([Node | Rest], Tid, UnionTab) ->
    ?ets_insert(UnionTab, {Node, Tid}),
    add_pending_node(Rest, Tid, UnionTab).
%% Final activation phase: activates the checkpoint on one node at a time,
%% passing along the pending transactions recorded for that node in UnionTab
%% (minus the ones listed in IgnoreNew).
%%
%% Returns {ok, Name, AllNodes} when every node answered `activated'.
%% On the first failure the checkpoint is deactivated on ALL participating
%% nodes - including the failing one and those already activated - so that
%% no retainer is left behind, UnionTab is deleted and {error, ...} is
%% returned. UnionTab is always deleted before returning.
send_activate([Node | Nodes], AllNodes, Name, UnionTab, IgnoreNew) ->
    Pending = [Tid || {_, Tid} <- ?ets_lookup(UnionTab, Node),
                      not lists:member(Tid, IgnoreNew)],
    case rpc:call(Node, ?MODULE, call, [Name, {activate, Pending}]) of
        activated ->
            send_activate(Nodes, AllNodes, Name, UnionTab, IgnoreNew);
        {badrpc, Reason} ->
            deactivate(AllNodes, Name),
            ?ets_delete_table(UnionTab),
            {error, {"Activation failed (bad node)", Name, Node, Reason}};
        {error, Reason} ->
            deactivate(AllNodes, Name),
            ?ets_delete_table(UnionTab),
            {error, {"Activation failed", Name, Node, Reason}}
    end;
send_activate([], AllNodes, Name, UnionTab, _IgnoreNew) ->
    ?ets_delete_table(UnionTab),
    {ok, Name, AllNodes}.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Checkpoint server
%% Sends {self(), Msg} to the local retainer process of checkpoint Name
%% without waiting for an answer. Returns {ok, Pid}, or
%% {error, {no_exists, Name}} when the checkpoint is not registered here.
cast(Name, Msg) ->
    case ?catch_val({checkpoint, Name}) of
        Pid when is_pid(Pid) ->
            Pid ! {self(), Msg},
            {ok, Pid};
        {'EXIT', _} ->
            {error, {no_exists, Name}}
    end.
%% Sends {self(), Msg} to the local retainer process of checkpoint Name and
%% waits for its {Name, self(), Reply} answer.
%%
%% Returns Reply, {error, {no_exists, Name}} when the checkpoint is not
%% registered on this node, or {error, {"Got exit", [Name, Reason]}} when the
%% retainer terminates before answering. There is no timeout.
%%
%% The retainer is monitored for the duration of the call. The monitor is
%% released with the flush option on every path, so that a 'DOWN' message
%% delivered after the reply (the retainer exits right after answering a
%% deactivate request) is not left in the caller's mailbox.
call(Name, Msg) ->
    case ?catch_val({checkpoint, Name}) of
        {'EXIT', _} ->
            {error, {no_exists, Name}};
        Pid when is_pid(Pid) ->
            Monitor = erlang:monitor(process, Pid), % Always local
            Pid ! {self(), Msg},
            Self = self(),
            receive
                {'EXIT', Pid, Reason} ->
                    %% Seen when the caller is linked to the retainer and
                    %% traps exits.
                    erlang:demonitor(Monitor, [flush]),
                    {error, {"Got exit", [Name, Reason]}};
                {'DOWN', Monitor, _, Pid, Reason} ->
                    {error, {"Got exit", [Name, Reason]}};
                {Name, Self, Reply} ->
                    erlang:demonitor(Monitor, [flush]),
                    Reply
            end;
        Error ->
            Error
    end.
%% Asynchronously runs cast(Name, Msg) on every node in Nodes.
abcast(Nodes, Name, Msg) ->
    CastArgs = [Name, Msg],
    rpc:eval_everywhere(Nodes, ?MODULE, cast, CastArgs).
%% Answers a call/2 request by sending {Name, ReplyTo, Reply} to ReplyTo.
%% Nothing is sent when the requester is the atom nopid.
reply(nopid, _Name, _Reply) ->
    ignore;
reply(ReplyTo, Name, Reply) ->
    erlang:send(ReplyTo, {Name, ReplyTo, Reply}).
%% Starts the retainer process for Cp as a child of mnesia_checkpoint_sup.
%% Returns {ok, Name, IgnoreNew, Node} on success and
%% {error, {Text, Name, Node, Reason}} otherwise.
%% The child will never be restarted.
start_retainer(Cp) ->
    Name = Cp#checkpoint_args.name,
    case supervisor:start_child(mnesia_checkpoint_sup, [Cp]) of
        {error, Reason} ->
            {error, {"Cannot create checkpoint retainer",
                     Name, node(), Reason}};
        {ok, _Pid} ->
            {ok, Name, Cp#checkpoint_args.ignore_new, node()}
    end.
%% Start function of the retainer child: records the calling process as
%% supervisor in the state and spawns init/1 through mnesia_monitor.
start(Cp) ->
    State = Cp#checkpoint_args{supervisor = self()},
    ProcName = {?MODULE, Cp#checkpoint_args.name},
    mnesia_monitor:start_proc(ProcName, ?MODULE, init, [State]).
%% Entry point of the retainer process. Creates the pending table, prepares
%% the retainer of every table, registers the checkpoint, acknowledges the
%% start to the supervisor and enters retainer_loop/1. If the pending table
%% cannot be created, a system_limit error is acknowledged instead.
init(Cp) ->
    process_flag(trap_exit, true),
    process_flag(priority, high), %% Needed dets files might starve the system
    Name = Cp#checkpoint_args.name,
    try ?ets_new_table(mnesia_pending_checkpoint, [set, public, {keypos, 2}]) of
        PendingTab ->
            Self = self(),
            Prepared = [prepare_tab(Cp, R) || R <- Cp#checkpoint_args.retainers],
            State = Cp#checkpoint_args{retainers = Prepared,
                                       pid = Self,
                                       pending_tab = PendingTab},
            mnesia_lib:add(pending_checkpoint_pids, Self),
            mnesia_lib:add(pending_checkpoints, PendingTab),
            mnesia_lib:set({checkpoint, Name}, Self),
            mnesia_lib:add(checkpoints, Name),
            mnesia_lib:dbg_out("Checkpoint ~p (~p) started~n", [Name, Self]),
            proc_lib:init_ack(State#checkpoint_args.supervisor, {ok, Self}),
            retainer_loop(State)
    catch
        error:Reason -> %% system limit
            Text = "Cannot create an ets table for pending transactions",
            Failure = {error, {system_limit, Name, Text, Reason}},
            proc_lib:init_ack(Cp#checkpoint_args.supervisor, Failure)
    end.
%% Prepares the retainer R using the current storage type of its table.
prepare_tab(Cp, R) ->
    Storage = val({R#retainer.tab_name, storage_type}),
    prepare_tab(Cp, R, Storage).

%% Publishes the retainer R for its table. When the local node is one of the
%% writers a store is created and the checkpoint is attached to the table;
%% otherwise the retainer is published with an undefined store and R is
%% returned unchanged.
prepare_tab(Cp, R, Storage) ->
    Tab = R#retainer.tab_name,
    Name = R#retainer.cp_name,
    GvarKey = {Tab, {retainer, Name}},
    case lists:member(node(), R#retainer.writers) of
        false ->
            mnesia_lib:set(GvarKey, R#retainer{store = undefined}),
            R;
        true ->
            Created = retainer_create(Cp, R, Tab, Name, Storage),
            mnesia_lib:set(GvarKey, Created),
            %% Keep checkpoint info for table_info & mnesia_session
            mnesia_lib:add({Tab, checkpoints}, Name),
            add_chkp_info(Tab, Name),
            Created
    end.
%% Adds checkpoint Name to the {checkpoints, Names} entry that heads the
%% commit_work list of Tab, creating the entry when it is missing.
add_chkp_info(Tab, Name) ->
    case val({Tab, commit_work}) of
        [{checkpoints, Known} | Others] ->
            case lists:member(Name, Known) of
                true ->
                    ok;
                false ->
                    Updated = [{checkpoints, [Name | Known]} | Others],
                    mnesia_lib:set({Tab, commit_work}, Updated)
            end;
        CommitList ->
            %% OBS checkpoints needs to be first in the list!
            mnesia_lib:set({Tab, commit_work},
                           [{checkpoints, [Name]} | CommitList])
    end.
%% Builds the file name of a disc based retainer in the Mnesia directory:
%% <module>_<Tab>_<printed Name>.RET
tab2retainer({Tab, Name}) ->
    Printed = lists:flatten(io_lib:write(Name)),
    File = lists:concat([?MODULE, "_", Tab, "_", Printed, ".RET"]),
    mnesia_lib:dir(File).
%% Creates the store of a retainer and returns the updated #retainer{}.
%% disc_only_copies tables get a fresh dets file and always really retain;
%% all other storage types get an ets table and leave the really_retain
%% decision to prepare_ram_tab/5.
retainer_create(_Cp, R, Tab, Name, disc_only_copies) ->
    Fname = tab2retainer({Tab, Name}),
    _ = file:delete(Fname),
    DetsArgs = [{file, Fname}, {type, set}, {keypos, 2}, {repair, false}],
    {ok, _} = mnesia_lib:dets_sync_open({Tab, Name}, DetsArgs),
    mnesia_lib:dbg_out("Checkpoint retainer created ~p ~p~n", [Name, Tab]),
    R#retainer{store = {dets, {Tab, Name}}, really_retain = true};
retainer_create(Cp, R, Tab, Name, Storage) ->
    EtsTab = ?ets_new_table(mnesia_retainer, [set, public, {keypos, 2}]),
    IsOverrider = lists:member(Tab, Cp#checkpoint_args.ram_overrides_dump),
    Really = prepare_ram_tab(Tab, EtsTab, Storage,
                             R#retainer.really_retain, IsOverrider),
    mnesia_lib:dbg_out("Checkpoint retainer created ~p ~p~n", [Name, Tab]),
    R#retainer{store = {ets, EtsTab}, really_retain = Really}.
%% Copy the dumped table into retainer if needed
%% If the really_retain flag already has been set to false,
%% it should remain false even if we change storage type
%% while the checkpoint is activated.
%%
%% Arguments: Tab, the retainer ets table T, the storage type, the current
%% really_retain flag and whether Tab is listed in ram_overrides_dump.
%% Returns the new really_retain flag.
%%
%% First clause: a ram_copies table that is still really retained and is not
%% an overrider. The records found in the table's dcd file (if it exists) are
%% loaded into T as {Tab, Key, Records} and retaining is switched off.
prepare_ram_tab(Tab, T, ram_copies, true, false) ->
Fname = mnesia_lib:tab2dcd(Tab),
case mnesia_lib:exists(Fname) of
true ->
Log = mnesia_log:open_log(prepare_ram_tab,
mnesia_log:dcd_log_header(),
Fname, true,
mnesia_monitor:get_env(auto_repair),
read_only),
%% Prepends each logged record to the list already stored under its key.
Add = fun(Rec) ->
Key = element(2, Rec),
Recs =
case ?ets_lookup(T, Key) of
[] -> [];
[{_, _, Old}] -> Old
end,
?ets_insert(T, {Tab, Key, [Rec | Recs]}),
continue
end,
traverse_dcd(mnesia_log:chunk_log(Log, start), Log, Add),
mnesia_log:close_log(Log);
false ->
ok
end,
false;
%% Every other combination keeps the flag it was given.
prepare_ram_tab(_, _, _, ReallyRetain, _) ->
ReallyRetain.
%% Applies Fun to every term read from the dcd log, chunk by chunk, until
%% eof. A leading dcd log header (version "1.0" or later) is skipped.
traverse_dcd(eof, _Log, _Fun) ->
    ok;
traverse_dcd({Cont, [Header | Terms]}, Log, Fun)
  when is_record(Header, log_header),
       Header#log_header.log_kind == dcd_log,
       Header#log_header.log_version >= "1.0" ->
    traverse_dcd({Cont, Terms}, Log, Fun); %% BUGBUG Error handling repaired files
traverse_dcd({Cont, Terms}, Log, Fun) -> %% trashed data??
    lists:foreach(Fun, Terms),
    NextChunk = mnesia_log:chunk_log(Log, Cont),
    traverse_dcd(NextChunk, Log, Fun).
%% Store accessors. A store is {ets, Tab} or {dets, Name}; each accessor
%% dispatches to the matching ets macro or dets function.

%% Looks up the objects stored under Key.
retainer_get({dets, Store}, Key) ->
    dets:lookup(Store, Key);
retainer_get({ets, Store}, Key) ->
    ?ets_lookup(Store, Key).

%% Inserts Val into the store.
retainer_put({dets, Store}, Val) ->
    dets:insert(Store, Val);
retainer_put({ets, Store}, Val) ->
    ?ets_insert(Store, Val).

%% Returns the first key of the store.
retainer_first({dets, Store}) ->
    dets:first(Store);
retainer_first({ets, Store}) ->
    ?ets_first(Store).

%% Returns the key following Key.
retainer_next({dets, Store}, Key) ->
    dets:next(Store, Key);
retainer_next({ets, Store}, Key) ->
    ?ets_next(Store, Key).
%% retainer_next_slot(Tab, Pos) ->
%% case retainer_slot(Tab, Pos) of
%% '$end_of_table' ->
%% '$end_of_table';
%% [] ->
%% retainer_next_slot(Tab, Pos + 1);
%% Recs when is_list(Recs) ->
%% {Pos, Recs}
%% end.
%%
%% retainer_slot({ets, Store}, Pos) -> ?ets_next(Store, Pos);
%% retainer_slot({dets, Store}, Pos) -> dets:slot(Store, Pos).
%% Fixes (Bool = true) or releases (Bool = false) a table for traversal.
%% Accepts a store tuple, or a plain table name whose storage type is then
%% looked up.
retainer_fixtable({ets, Tab}, Bool) ->
    mnesia_lib:db_fixtable(ram_copies, Tab, Bool);
retainer_fixtable({dets, Tab}, Bool) ->
    mnesia_lib:db_fixtable(disc_only_copies, Tab, Bool);
retainer_fixtable(Tab, Bool) when is_atom(Tab) ->
    Storage = val({Tab, storage_type}),
    mnesia_lib:db_fixtable(Storage, Tab, Bool).
%% Destroys a retainer store: an ets table is deleted, a dets table is
%% closed and its file removed.
retainer_delete({dets, Store}) ->
    mnesia_lib:dets_sync_close(Store),
    file:delete(tab2retainer(Store));
retainer_delete({ets, Store}) ->
    ?ets_delete_table(Store).
%% Message loop of the retainer process. Every request has the form
%% {From, Request}; synchronous requests are answered with reply/3.
%%
%% First clause - checkpoint not yet activated. Only administrative requests
%% are received; everything else (for example retain requests) stays in the
%% mailbox until the checkpoint has been activated.
retainer_loop(Cp = #checkpoint_args{is_activated=false, name=Name}) ->
    receive
        %% Adm
        {From, {activate, Pending}} ->
            StillPending = mnesia_recover:still_pending(Pending),
            enter_still_pending(StillPending, Cp#checkpoint_args.pending_tab),
            %% Wait for the still pending transactions whose owner process
            %% lives on another node.
            Local = [Tid || #tid{pid=Pid} = Tid <- StillPending, node(Pid) =/= node()],
            Cp2 = maybe_activate(Cp#checkpoint_args{wait_for_old = Local}),
            reply(From, Name, activated),
            retainer_loop(Cp2);

        {_From, {exit_pending, Tid}} when is_list(Cp#checkpoint_args.wait_for_old) ->
            StillPending = lists:delete(Tid, Cp#checkpoint_args.wait_for_old),
            Cp2 = Cp#checkpoint_args{wait_for_old = StillPending},
            Cp3 = maybe_activate(Cp2),
            retainer_loop(Cp3);

        {From, deactivate} ->
            do_stop(Cp),
            reply(From, Name, deactivated),
            unlink(From),
            exit(shutdown);

        {From, get_checkpoint} ->
            reply(From, Name, Cp),
            retainer_loop(Cp);

        {_From, {add_retainer, R, Node}} ->
            Cp2 = do_add_retainer(Cp, R, Node),
            retainer_loop(Cp2);

        {From, collect_pending} ->
            PendingTab = Cp#checkpoint_args.pending_tab,
            del(pending_checkpoints, PendingTab),
            Pending = ?ets_match_object(PendingTab, '_'),
            reply(From, Name, {ok, Pending}),
            retainer_loop(Cp);

        {_From, {mnesia_down, Node}} ->
            Cp2 = do_del_retainers(Cp, Node),
            retainer_loop(Cp2);

        {'EXIT', Parent, _} when Parent == Cp#checkpoint_args.supervisor ->
            %% do_stop(Cp),
            %% assume that entire Mnesia is terminating
            exit(shutdown);

        {'EXIT', From, _Reason} ->
            %% An iterating process died: drop (and unfix) its iterators.
            Iters = [Iter || Iter <- Cp#checkpoint_args.iterators,
                             check_iter(From, Iter)],
            retainer_loop(Cp#checkpoint_args{iterators = Iters});

        {system, From, Msg} ->
            dbg_out("~p got {system, ~p, ~p}~n", [?MODULE, From, Msg]),
            sys:handle_system_msg(Msg, From, Cp#checkpoint_args.supervisor,
                                  ?MODULE, [], Cp)
    end;

%% Second clause - checkpoint activated.
retainer_loop(Cp = #checkpoint_args{name=Name}) ->
    receive
        {_From, {retain, Tid, Tab, Key, OldRecs}} ->
            R = val({Tab, {retainer, Name}}),
            PendingTab = Cp#checkpoint_args.pending_tab,
            case R#retainer.really_retain of
                true ->
                    Store = R#retainer.store,
                    %% When Tid is registered in the pending table, whatever
                    %% is already retained for Key is removed. Otherwise (the
                    %% match below fails) the old records are stored unless
                    %% the key has been retained before.
                    %% NOTE(review): ets:delete/2 fails for a {dets, _} store
                    %% and then ends up in the catch branch - confirm that
                    %% this is intended.
                    try true = ets:member(PendingTab, Tid),
                        case retainer_get(Store, Key) of
                            [] -> ignore;
                            _ -> ets:delete(element(2,Store), Key)
                        end
                    catch _:_ ->
                            case retainer_get(Store, Key) of
                                [] -> retainer_put(Store, {Tab, Key, OldRecs});
                                _ -> already_retained
                            end
                    end;
                false ->
                    ignore
            end,
            retainer_loop(Cp);

        %% Adm
        {From, get_checkpoint} ->
            reply(From, Name, Cp),
            retainer_loop(Cp);

        {From, {add_copy, Tab, Node}} ->
            {Res, Cp2} = do_add_copy(Cp, Tab, Node),
            reply(From, Name, Res),
            retainer_loop(Cp2);

        {From, {del_copy, Tab, Node}} ->
            Cp2 = do_del_copy(Cp, Tab, Node),
            reply(From, Name, ok),
            retainer_loop(Cp2);

        %% The storage types must not be bound to the same variable as the
        %% sender: the sender is a pid and the types are atoms, so such a
        %% pattern could never match.
        {From, {change_copy, Tab, FromType, ToType}} ->
            Cp2 = do_change_copy(Cp, Tab, FromType, ToType),
            reply(From, Name, ok),
            retainer_loop(Cp2);

        {_From, {add_retainer, R, Node}} ->
            Cp2 = do_add_retainer(Cp, R, Node),
            retainer_loop(Cp2);

        {_From, {del_retainer, R, Node}} ->
            Cp2 = do_del_retainer(Cp, R, Node),
            retainer_loop(Cp2);

        %% Iteration
        {From, {iter_begin, Iter}} ->
            Cp2 = iter_begin(Cp, From, Iter),
            retainer_loop(Cp2);

        {From, {iter_end, Iter}} ->
            retainer_fixtable(Iter#iter.oid_tab, false),
            Iters = Cp#checkpoint_args.iterators -- [Iter],
            reply(From, Name, ok),
            retainer_loop(Cp#checkpoint_args{iterators = Iters});

        {_From, {exit_pending, _Tid}} ->
            retainer_loop(Cp);

        {From, deactivate} ->
            do_stop(Cp),
            reply(From, Name, deactivated),
            unlink(From),
            exit(shutdown);

        {_From, {mnesia_down, Node}} ->
            Cp2 = do_del_retainers(Cp, Node),
            retainer_loop(Cp2);

        {'EXIT', Parent, _} when Parent == Cp#checkpoint_args.supervisor ->
            %% do_stop(Cp),
            %% assume that entire Mnesia is terminating
            exit(shutdown);

        {'EXIT', From, _Reason} ->
            %% An iterating process died: drop (and unfix) its iterators.
            Iters = [Iter || Iter <- Cp#checkpoint_args.iterators,
                             check_iter(From, Iter)],
            retainer_loop(Cp#checkpoint_args{iterators = Iters});

        {system, From, Msg} ->
            dbg_out("~p got {system, ~p, ~p}~n", [?MODULE, From, Msg]),
            sys:handle_system_msg(Msg, From, Cp#checkpoint_args.supervisor,
                                  ?MODULE, [], Cp);
        Msg ->
            %% Any other message is logged and the loop returns, which ends
            %% the retainer process without running do_stop/1. This is the
            %% clause that receives the `stop' request sent by stop/0.
            dbg_out("~p got ~p~n", [?MODULE, Msg])
    end.
%% Marks the checkpoint as activated once there are no old transactions left
%% to wait for; otherwise returns the state unchanged.
maybe_activate(#checkpoint_args{wait_for_old = [], is_activated = false} = Cp) ->
    Cp#checkpoint_args{is_activated = true};
maybe_activate(Cp) ->
    Cp.
%% Starts an iteration for the process From: resolves the tables of the
%% iterator, fixes the table that will be traversed, answers
%% {ok, Iter, RetainerPid} and remembers the iterator in the state.
iter_begin(Cp, From, Iter) ->
    Name = Cp#checkpoint_args.name,
    Retainer = val({Iter#iter.tab_name, {retainer, Name}}),
    Ready = (init_tabs(Retainer, Iter))#iter{pid = From},
    retainer_fixtable(Ready#iter.oid_tab, true),
    Iterators = [Ready | Cp#checkpoint_args.iterators],
    reply(From, Name, {ok, Ready, self()}),
    Cp#checkpoint_args{iterators = Iterators}.
%% Unregisters the checkpoint on this node: removes its pending table, pid
%% and name from the global bookkeeping, deactivates every retainer and
%% releases the tables fixed by ongoing iterations.
do_stop(Cp) ->
    Name = Cp#checkpoint_args.name,
    mnesia_lib:del(pending_checkpoints, Cp#checkpoint_args.pending_tab),
    mnesia_lib:del(pending_checkpoint_pids, self()),
    mnesia_lib:del(checkpoints, Name),
    mnesia_lib:unset({checkpoint, Name}),
    lists:foreach(fun deactivate_tab/1, Cp#checkpoint_args.retainers),
    Unfix = fun(Iter) -> retainer_fixtable(Iter#iter.oid_tab, false) end,
    lists:foreach(Unfix, Cp#checkpoint_args.iterators).
%% Detaches the retainer R from its table. The store is deleted only when
%% the local node is a writer and a store exists. Best effort: any failure
%% is swallowed and reported as ignore.
deactivate_tab(R) ->
    Name = R#retainer.cp_name,
    Tab = R#retainer.tab_name,
    try
        IsWriter = lists:member(node(), R#retainer.writers),
        case {IsWriter, R#retainer.store} of
            {_, undefined} ->
                ignore;
            {true, Store} ->
                retainer_delete(Store);
            {false, _} ->
                ignore
        end,
        mnesia_lib:unset({Tab, {retainer, Name}}),
        %% Keep checkpoint info for table_info & mnesia_session
        mnesia_lib:del({Tab, checkpoints}, Name),
        del_chkp_info(Tab, Name)
    catch
        _:_ -> ignore
    end.
%% Removes checkpoint Name from the {checkpoints, Names} entry heading the
%% commit_work list of Tab; the entry itself is dropped when it gets empty.
del_chkp_info(Tab, Name) ->
    case val({Tab, commit_work}) of
        [{checkpoints, Known} | Others] ->
            Updated =
                case lists:delete(Name, Known) of
                    [] ->
                        %% The only checkpoint was deleted
                        Others;
                    Remaining ->
                        [{checkpoints, Remaining} | Others]
                end,
            mnesia_lib:set({Tab, commit_work}, Updated);
        _ ->
            ignore
    end.
%% Removes Node as writer from every retainer of the checkpoint and
%% recomputes the set of participating nodes.
do_del_retainers(Cp, Node) ->
    Remaining = [do_del_retainer2(Cp, R, Node)
                 || R <- Cp#checkpoint_args.retainers],
    Cp#checkpoint_args{retainers = Remaining, nodes = writers(Remaining)}.
%% Removes Node from the writers of retainer R and publishes the result.
%% When no writer is left the whole checkpoint is stopped: a
%% mnesia_checkpoint_deactivated event is reported and the process exits.
%% When the removed node is the local one the local retainer is torn down.
do_del_retainer2(Cp, R, Node) ->
    Writers = R#retainer.writers -- [Node],
    R2 = R#retainer{writers = Writers},
    GvarKey = {R2#retainer.tab_name, {retainer, R2#retainer.cp_name}},
    mnesia_lib:set(GvarKey, R2),
    case Writers of
        [] ->
            Event = {mnesia_checkpoint_deactivated, Cp#checkpoint_args.name},
            mnesia_lib:report_system_event(Event),
            do_stop(Cp),
            exit(shutdown);
        _ when Node == node() ->
            deactivate_tab(R), % Avoids unnecessary tm_retain accesses
            %% deactivate_tab/1 unsets the entry, so publish it again.
            mnesia_lib:set(GvarKey, R2),
            R2;
        _ ->
            R2
    end.
%% Removes Node as writer from the retainer in the state that corresponds
%% to R0 (same checkpoint and table) and recomputes the node list.
do_del_retainer(Cp, R0, Node) ->
    {Found, Others} = find_retainer(R0, Cp#checkpoint_args.retainers, []),
    Retainers = [do_del_retainer2(Cp, Found, Node) | Others],
    Cp#checkpoint_args{retainers = Retainers, nodes = writers(Retainers)}.
%% Handles deletion of the local replica of Tab: tells the retainers on all
%% other participating nodes to drop this node as writer, then does the
%% same locally. Only defined for the local node.
do_del_copy(Cp, Tab, ThisNode) when ThisNode == node() ->
    Name = Cp#checkpoint_args.name,
    Retainer = val({Tab, {retainer, Name}}),
    Remote = Cp#checkpoint_args.nodes -- [ThisNode],
    abcast(Remote, Name, {del_retainer, Retainer, ThisNode}),
    do_del_retainer(Cp, Retainer, ThisNode).
%% Handles a new replica of Tab on the remote node Node.
%% Returns {Result, NewCp} where Result is ok or {error, Reason}.
%% Only max tables get a retainer on the new replica; for any other table
%% nothing is done.
do_add_copy(Cp, Tab, Node) when Node /= node()->
case lists:member(Tab, Cp#checkpoint_args.max) of
false ->
{ok, Cp};
true ->
Name = Cp#checkpoint_args.name,
R0 = val({Tab, {retainer, Name}}),
W = R0#retainer.writers,
R = R0#retainer{writers = W ++ [Node]},
case lists:member(Node, Cp#checkpoint_args.nodes) of
true ->
%% Node already takes part in the checkpoint: just ship the retainer.
send_retainer(Cp, R, Node);
false ->
%% Node is new to the checkpoint: a retainer process has to be
%% started there before anything can be sent.
case tm_remote_prepare(Node, Cp) of
{ok, Name, _IgnoreNew, Node} ->
case lists:member(schema, Cp#checkpoint_args.max) of
true ->
%% We need to send schema retainer somewhere
RS0 = val({schema, {retainer, Name}}),
WS = RS0#retainer.writers,
RS1 = RS0#retainer{writers = WS ++ [Node]},
{ok, Cp1} = send_retainer(Cp, RS1, Node),
send_retainer(Cp1, R, Node);
false ->
send_retainer(Cp, R, Node)
end;
{badrpc, Reason} ->
{{error, {badrpc, Reason}}, Cp};
{error, Reason} ->
{{error, Reason}, Cp}
end
end
end.
%% Runs tm_prepare/1 for the checkpoint on the remote node Node.
tm_remote_prepare(Node, Cp) ->
    PrepareArgs = [Cp],
    rpc:call(Node, ?MODULE, tm_prepare, PrepareArgs).
%% Installs the writer list carried by R0 in the matching retainer of the
%% state. When the added node is the local one the retainer store is
%% prepared as well. The result is published and the node list recomputed.
do_add_retainer(Cp, R0, Node) ->
    Writers = R0#retainer.writers,
    {Found, Others} = find_retainer(R0, Cp#checkpoint_args.retainers, []),
    Rewritten = Found#retainer{writers = Writers},
    NewRet =
        case Node == node() of
            true -> prepare_tab(Cp, Rewritten);
            false -> Rewritten
        end,
    Retainers = [NewRet | Others],
    mnesia_lib:set({NewRet#retainer.tab_name,
                    {retainer, NewRet#retainer.cp_name}},
                   NewRet),
    Cp#checkpoint_args{retainers = Retainers, nodes = writers(Retainers)}.
%% Finds the retainer with the same checkpoint and table name as the first
%% argument. Returns {Match, Others}, where Others holds all remaining
%% retainers (the ones after the match first, then the skipped ones in
%% reverse order). Fails with function_clause when there is no match.
find_retainer(#retainer{cp_name = CpName, tab_name = Tab},
              [#retainer{cp_name = CpName, tab_name = Tab} = Match | After],
              Skipped) ->
    {Match, After ++ Skipped};
find_retainer(Wanted, [Other | After], Skipped) ->
    find_retainer(Wanted, After, [Other | Skipped]).
%% Makes Node a writer of retainer R: announces the retainer to all other
%% remote participants and to Node itself, streams the locally retained
%% records to Node and finally updates the local state.
%% Returns {ok, NewCp}.
send_retainer(Cp, R, Node) ->
    Name = Cp#checkpoint_args.name,
    Bystanders = (Cp#checkpoint_args.nodes -- [Node]) -- [node()],
    Announcement = {add_retainer, R, Node},
    abcast(Bystanders, Name, Announcement),
    {ok, _} = rpc:call(Node, ?MODULE, cast, [Name, Announcement]),
    Store = R#retainer.store,
    send_retainer2(Node, Name, Store, retainer_first(Store)),
    {ok, do_add_retainer(Cp, R, Node)}.
%% Sends every entry of Store, starting at Key, to the retainer of
%% checkpoint Name on Node as a retain message tagged with
%% {dirty, send_retainer}.
send_retainer2(Node, Name, Store, Key) ->
    case Key of
        '$end_of_table' ->
            ok;
        _ ->
            [{Tab, _, Records}] = retainer_get(Store, Key),
            Retain = {retain, {dirty, send_retainer}, Tab, Key, Records},
            abcast([Node], Name, Retain),
            send_retainer2(Node, Name, Store, retainer_next(Store, Key))
    end.
%% Moves the retainer of Tab to a store matching the new storage type
%% ToType and returns the updated state.
%% The retained data is converted only when disc_only_copies is involved
%% (dets <-> ets); otherwise the freshly prepared retainer is used as is.
do_change_copy(Cp, Tab, FromType, ToType) ->
Name = Cp#checkpoint_args.name,
R = val({Tab, {retainer, Name}}),
%% prepare_tab/3 creates (and publishes) a retainer for the new type.
R2 = prepare_tab(Cp, R, ToType),
{_, Old} = R#retainer.store,
{_, New} = R2#retainer.store,
Fname = tab2retainer({Tab, Name}),
if
FromType == disc_only_copies ->
%% dets -> ets: load the file contents into the new table, drop the file.
mnesia_lib:dets_sync_close(Old),
loaded = mnesia_lib:dets_to_ets(Old, New, Fname, set, no, yes),
ok = file:delete(Fname);
ToType == disc_only_copies ->
%% ets -> dets: dump the old table into the dets file, drop the table.
TabSize = ?ets_info(Old, size),
Props = [{file, Fname},
{type, set},
{keypos, 2},
%% {ram_file, true},
{estimated_no_objects, TabSize + 256},
{repair, false}],
{ok, _} = mnesia_lib:dets_sync_open(New, Props),
ok = mnesia_dumper:raw_dump_table(New, Old),
?ets_delete_table(Old);
true ->
ignore
end,
%% Replace the retainer of Tab in the state; retainers are keyed on tab_name.
Pos = #retainer.tab_name,
Rs = lists:keyreplace(Tab, Pos, Cp#checkpoint_args.retainers, R2),
Cp#checkpoint_args{retainers = Rs, nodes = writers(Rs)}.
%% Filter used when the process From has died. An iterator owned by From is
%% released (its table is unfixed) and false is returned so that it gets
%% dropped; all other iterators are kept (true).
check_iter(From, #iter{pid = From} = Iter) ->
    retainer_fixtable(Iter#iter.oid_tab, false),
    false;
check_iter(_From, _Iter) ->
    true.
%% Fills in the table handles of an iterator: main_tab is the table itself
%% (tagged with the kind of the retainer store), retainer_tab is the
%% retainer store, and oid_tab is whichever of the two the source field
%% selects for traversal.
init_tabs(R, Iter) ->
    {Kind, _} = Store = R#retainer.store,
    Main = {Kind, Iter#iter.tab_name},
    OidTab =
        case Iter#iter.source of
            table -> Main;
            retainer -> Store
        end,
    Iter#iter{main_tab = Main, retainer_tab = Store, oid_tab = OidTab}.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Iteration
%%
%% Iterates over a table and applies Fun(ListOfRecords)
%% with a suitable amount of records, e.g. 1000 or so.
%% ListOfRecords is [] when the iteration is over.
%%
%% OidKind (the Source argument) selects which internal table is iterated
%% over and ValKind (the Val argument) selects which version of the records
%% is picked. Legal values for OidKind are the atoms table and retainer;
%% legal values for ValKind are the atoms latest and checkpoint.
%%
%% The iteration may either be performed over the main table (which
%% contains the latest values of the records, i.e. the values that
%% are visible to the applications) or over the checkpoint retainer
%% (which contains the values as they looked at the point in time when
%% the checkpoint was activated).
%%
%% It is possible to iterate over the main table and pick values
%% from the retainer and vice versa.
%% Runs Fun over the records of Tab as seen through checkpoint Name.
%% Returns {ok, FinalAcc}, or {error, Reason} when the iteration cannot be
%% started, crashes, or is stopped with stop_iteration/1.
iterate(Name, Tab, Fun, Acc, Source, Val) ->
    Request = {iter_begin, #iter{tab_name = Tab, source = Source, val = Val}},
    case call(Name, Request) of
        {error, _} = Failure ->
            Failure;
        {ok, Iter, Pid} ->
            link(Pid), % We don't want any pending fixtable's
            Outcome = ?CATCH(iter(Fun, Acc, Iter)),
            unlink(Pid),
            call(Name, {iter_end, Iter}),
            case Outcome of
                {'EXIT', Reason} -> {error, Reason};
                {error, Reason} -> {error, Reason};
                FinalAcc -> {ok, FinalAcc}
            end
    end.
%% Starts the traversal at the first key of the iterator's oid table.
iter(Fun, Acc, Iter) ->
    FirstKey = retainer_first(Iter#iter.oid_tab),
    iter(Fun, Acc, Iter, FirstKey).

%% Feeds Fun one batch of records at a time, threading the accumulator.
%% The end of the table is signalled to Fun with a final empty batch.
iter(Fun, Acc, Iter, Key) ->
    case get_records(Iter, Key) of
        {'$end_of_table', []} ->
            Fun([], Acc);
        {'$end_of_table', LastBatch} ->
            Fun([], Fun(LastBatch, Acc));
        {NextKey, Batch} ->
            iter(Fun, Fun(Batch, Acc), Iter, NextKey)
    end.
%% Aborts an ongoing iteration by throwing {error, {stopped, Reason}}.
stop_iteration(Reason) ->
    Stopped = {error, {stopped, Reason}},
    throw(Stopped).
%% Collects the records of up to 500 keys starting at Key.
%% Returns {NextKey, Records}; NextKey is '$end_of_table' when the table
%% has been exhausted.
get_records(Iter, Key) ->
    get_records(Iter, Key, 500, []). % 500 keys

%% Stops when the key budget is used up or the table ends; the per-key
%% record lists were collected newest first and are flattened in key order.
get_records(_Iter, Key, Left, Collected)
  when Left =:= 0; Key =:= '$end_of_table' ->
    {Key, lists:append(lists:reverse(Collected))};
get_records(Iter, Key, Left, Collected) ->
    Recs = get_val(Iter, Key),
    NextKey = retainer_next(Iter#iter.oid_tab, Key),
    get_records(Iter, NextKey, Left - 1, [Recs | Collected]).
%% Fetches the records for Key in the version selected by the iterator's
%% val field (latest or checkpoint).
get_val(#iter{val = latest} = Iter, Key) ->
    get_latest_val(Iter, Key);
get_val(#iter{val = checkpoint} = Iter, Key) ->
    get_checkpoint_val(Iter, Key).
%% Current records of Key, read from the main table. When iterating over
%% the retainer the result is preceded by a {Tab, Key} delete marker.
get_latest_val(#iter{source = table} = Iter, Key) ->
    retainer_get(Iter#iter.main_tab, Key);
get_latest_val(#iter{source = retainer} = Iter, Key) ->
    DeleteOid = {Iter#iter.tab_name, Key},
    [DeleteOid | retainer_get(Iter#iter.main_tab, Key)].
%% Records of Key as of the checkpoint. When iterating over the main table
%% they are read from the main table; when iterating over the retainer the
%% retained records are returned, preceded by a {Tab, Key} delete marker.
get_checkpoint_val(#iter{source = table} = Iter, Key) ->
    retainer_get(Iter#iter.main_tab, Key);
get_checkpoint_val(#iter{source = retainer} = Iter, Key) ->
    DeleteOid = {Iter#iter.tab_name, Key},
    [{_, _, Retained}] = [Entry || {_, _, _} = Entry <- check_single(retainer_get(Iter#iter.retainer_tab, Key))],
    [DeleteOid | Retained].

%% Asserts that exactly one three-element entry was found, mirroring the
%% case_clause of a direct match.
check_single([{_, _, _}] = Found) ->
    Found;
check_single(Other) ->
    error({case_clause, Other}).
%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% System upgrade
%% sys callback: resumes the retainer loop with the saved state.
system_continue(_Parent, _Debug, State) ->
    retainer_loop(State).
%% sys callback: unregisters the checkpoint via do_stop/1.
system_terminate(_Reason, _Parent, _Debug, State) ->
    do_stop(State).
%% sys callback: the state needs no conversion on code change.
system_code_change(State, _Module, _OldVsn, _Extra) ->
    {ok, State}.
%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Looks up Var with ?catch_val; when that yields an 'EXIT' tuple the
%% lookup is delegated to mnesia_lib:other_val/1.
val(Var) ->
    case ?catch_val(Var) of
        {'EXIT', _} ->
            mnesia_lib:other_val(Var);
        Found ->
            Found
    end.