%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 1996-2011. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% %CopyrightEnd%
%%
%%
-module(mnesia_bup).
-export([
%% Public interface
iterate/4,
read_schema/2,
fallback_bup/0,
fallback_exists/0,
tm_fallback_start/1,
create_schema/1,
install_fallback/1,
install_fallback/2,
uninstall_fallback/0,
uninstall_fallback/1,
traverse_backup/4,
traverse_backup/6,
make_initial_backup/3,
fallback_to_schema/0,
lookup_schema/2,
schema2bup/1,
refresh_cookie/2,
%% Internal
fallback_receiver/2,
install_fallback_master/2,
uninstall_fallback_master/2,
local_uninstall_fallback/2,
do_traverse_backup/7,
trav_apply/4
]).
-include("mnesia.hrl").
-import(mnesia_lib, [verbose/2, dbg_out/2]).
-record(restore, {mode, bup_module, bup_data}).
-record(fallback_args, {opaque,
scope = global,
module = mnesia_monitor:get_env(backup_module),
use_default_dir = true,
mnesia_dir,
fallback_bup,
fallback_tmp,
skip_tables = [],
keep_tables = [],
default_op = keep_tables
}).
-type fallback_args() :: #fallback_args{}.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Backup iterator
%% Reads schema section and iterates over all records in a backup.
%%
%% Fun(BunchOfRecords, Header, Schema, Acc) is applied when a suitable amount
%% of records has been collected.
%%
%% BunchOfRecords will be [] when the iteration is done.
iterate(Mod, Fun, Opaque, Acc) ->
R = #restore{bup_module = Mod, bup_data = Opaque},
try read_schema_section(R) of
{R2, {Header, Schema, Rest}} ->
try iter(R2, Header, Schema, Fun, Acc, Rest) of
{ok, R3, Res} ->
close_read(R3),
{ok, Res}
catch throw:Err ->
close_read(R2),
Err;
_:Reason ->
close_read(R2),
{error, {Reason, erlang:get_stacktrace()}}
end
catch throw:{error,_} = Err ->
Err
end.
iter(R, Header, Schema, Fun, Acc, []) ->
case safe_apply(R, read, [R#restore.bup_data]) of
{R2, []} ->
Res = Fun([], Header, Schema, Acc),
{ok, R2, Res};
{R2, BupItems} ->
iter(R2, Header, Schema, Fun, Acc, BupItems)
end;
iter(R, Header, Schema, Fun, Acc, BupItems) ->
Acc2 = Fun(BupItems, Header, Schema, Acc),
iter(R, Header, Schema, Fun, Acc2, []).
-spec safe_apply(#restore{}, atom(), list()) -> tuple().
safe_apply(R, write, [_, Items]) when Items =:= [] ->
R;
safe_apply(R, What, Args) ->
Abort = fun(Re) -> abort_restore(R, What, Args, Re) end,
Mod = R#restore.bup_module,
try apply(Mod, What, Args) of
{ok, Opaque, Items} when What =:= read ->
{R#restore{bup_data = Opaque}, Items};
{ok, Opaque} when What =/= read->
R#restore{bup_data = Opaque};
{error, Re} ->
Abort(Re);
Re ->
Abort(Re)
catch _:Re ->
Abort(Re)
end.
abort_restore(R = #restore{bup_module=Mod}, What, Args, Reason) ->
dbg_out("Restore aborted. ~p:~p~p -> ~p~n",
[Mod, What, Args, Reason]),
close_read(R),
throw({error, Reason}).
close_read(#restore{bup_module=Mod, bup_data=Opaque}) ->
?SAFE(Mod:close_read(Opaque)).
fallback_to_schema() ->
Fname = fallback_bup(),
fallback_to_schema(Fname).
fallback_to_schema(Fname) ->
Mod = mnesia_backup,
case read_schema(Mod, Fname) of
{error, Reason} ->
{error, Reason};
Schema ->
try lookup_schema(schema, Schema) of
List -> {ok, fallback, List}
catch throw:_ ->
{error, "No schema in fallback"}
end
end.
%% Opens Opaque reads schema and then close
read_schema(Mod, Opaque) ->
R = #restore{bup_module = Mod, bup_data = Opaque},
try read_schema_section(R) of
{_, {_Header, Schema, _}} -> Schema
catch throw:{error,_} = Error ->
Error
after close_read(R)
end.
%% Open backup media and extract schema
%% rewind backup media and leave it open
%% Returns {R, {Header, Schema}}
read_schema_section(R) ->
{R2, {H, Schema, Rest}} = do_read_schema_section(R),
Schema2 = convert_schema(H#log_header.log_version, Schema),
{R2, {H, Schema2, Rest}}.
do_read_schema_section(R) ->
R2 = safe_apply(R, open_read, [R#restore.bup_data]),
{R3, RawSchema} = safe_apply(R2, read, [R2#restore.bup_data]),
do_read_schema_section(R3, verify_header(RawSchema), []).
do_read_schema_section(R, {ok, B, C, []}, Acc) ->
case safe_apply(R, read, [R#restore.bup_data]) of
{R2, []} ->
{R2, {B, Acc, []}};
{R2, RawSchema} ->
do_read_schema_section(R2, {ok, B, C, RawSchema}, Acc)
end;
do_read_schema_section(R, {ok, B, C, [Head | Tail]}, Acc)
when element(1, Head) =:= schema ->
do_read_schema_section(R, {ok, B, C, Tail}, Acc ++ [Head]);
do_read_schema_section(R, {ok, B, _C, Rest}, Acc) ->
{R, {B, Acc, Rest}};
do_read_schema_section(_R, {error, Reason}, _Acc) ->
throw({error, Reason}).
verify_header([H | RawSchema]) when is_record(H, log_header) ->
Current = mnesia_log:backup_log_header(),
if
H#log_header.log_kind =:= Current#log_header.log_kind ->
Versions = ["0.1", "1.1", Current#log_header.log_version],
case lists:member(H#log_header.log_version, Versions) of
true ->
{ok, H, Current, RawSchema};
false ->
{error, {"Bad header version. Cannot be used as backup.", H}}
end;
true ->
{error, {"Bad kind of header. Cannot be used as backup.", H}}
end;
verify_header(RawSchema) ->
{error, {"Missing header. Cannot be used as backup.", ?CATCH(hd(RawSchema))}}.
refresh_cookie(Schema, NewCookie) ->
case lists:keysearch(schema, 2, Schema) of
{value, {schema, schema, List}} ->
Cs = mnesia_schema:list2cs(List),
Cs2 = Cs#cstruct{cookie = NewCookie},
Item = {schema, schema, mnesia_schema:cs2list(Cs2)},
lists:keyreplace(schema, 2, Schema, Item);
false ->
Reason = "No schema found. Cannot be used as backup.",
throw({error, {Reason, Schema}})
end.
%% Convert schema items from an external backup
%% If backup format is the latest, no conversion is needed
%% All supported backup formats should have their converters
%% here as separate function clauses.
convert_schema("0.1", Schema) ->
convert_0_1(Schema);
convert_schema("1.1", Schema) ->
%% The new backup format is a pure extension of the old one
Current = mnesia_log:backup_log_header(),
convert_schema(Current#log_header.log_version, Schema);
convert_schema(Latest, Schema) ->
H = mnesia_log:backup_log_header(),
if
H#log_header.log_version =:= Latest ->
Schema;
true ->
Reason = "Bad backup header version. Cannot convert schema.",
throw({error, {Reason, H}})
end.
%% Backward compatibility for 0.1
convert_0_1(Schema) ->
case lists:keysearch(schema, 2, Schema) of
{value, {schema, schema, List}} ->
Schema2 = lists:keydelete(schema, 2, Schema),
Cs = mnesia_schema:list2cs(List),
convert_0_1(Schema2, [], Cs);
false ->
List = mnesia_schema:get_initial_schema(disc_copies, [node()]),
Cs = mnesia_schema:list2cs(List),
convert_0_1(Schema, [], Cs)
end.
convert_0_1([{schema, cookie, Cookie} | Schema], Acc, Cs) ->
convert_0_1(Schema, Acc, Cs#cstruct{cookie = Cookie});
convert_0_1([{schema, db_nodes, DbNodes} | Schema], Acc, Cs) ->
convert_0_1(Schema, Acc, Cs#cstruct{disc_copies = DbNodes});
convert_0_1([{schema, version, Version} | Schema], Acc, Cs) ->
convert_0_1(Schema, Acc, Cs#cstruct{version = Version});
convert_0_1([{schema, Tab, Def} | Schema], Acc, Cs) ->
Head =
case lists:keysearch(index, 1, Def) of
{value, {index, PosList}} ->
%% Remove the snmp "index"
P = PosList -- [snmp],
Def2 = lists:keyreplace(index, 1, Def, {index, P}),
{schema, Tab, Def2};
false ->
{schema, Tab, Def}
end,
convert_0_1(Schema, [Head | Acc], Cs);
convert_0_1([Head | Schema], Acc, Cs) ->
convert_0_1(Schema, [Head | Acc], Cs);
convert_0_1([], Acc, Cs) ->
[schema2bup({schema, schema, Cs}) | Acc].
%% Returns Val or throw error
lookup_schema(Key, Schema) ->
case lists:keysearch(Key, 2, Schema) of
{value, {schema, Key, Val}} -> Val;
false -> throw({error, {"Cannot lookup", Key}})
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Backup compatibility
%% Convert internal schema items to backup dito
schema2bup({schema, Tab}) ->
{schema, Tab};
schema2bup({schema, Tab, TableDef}) ->
{schema, Tab, mnesia_schema:cs2list(TableDef)}.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Create schema on the given nodes
%% Requires that old schemas has been deleted
%% Returns ok | {error, Reason}
create_schema([]) ->
create_schema([node()]);
create_schema(Ns) when is_list(Ns) ->
case is_set(Ns) of
true ->
create_schema(Ns, mnesia_schema:ensure_no_schema(Ns));
false ->
{error, {combine_error, Ns}}
end;
create_schema(Ns) ->
{error, {badarg, Ns}}.
is_set(List) when is_list(List) ->
ordsets:is_set(lists:sort(List));
is_set(_) ->
false.
create_schema(Ns, ok) ->
%% Ensure that we access the intended Mnesia
%% directory. This function may not be called
%% during startup since it will cause the
%% application_controller to get into deadlock
case mnesia_lib:ensure_loaded(?APPLICATION) of
ok ->
case mnesia_monitor:get_env(schema_location) of
ram ->
{error, {has_no_disc, node()}};
_ ->
case mnesia_schema:opt_create_dir(true, mnesia_lib:dir()) of
{error, What} ->
{error, What};
ok ->
Mod = mnesia_backup,
Str = mk_str(),
File = mnesia_lib:dir(Str),
file:delete(File),
try make_initial_backup(Ns, File, Mod) of
{ok, _Res} ->
case do_install_fallback(File, Mod) of
ok ->
file:delete(File),
ok;
{error, Reason} ->
{error, Reason}
end
catch throw:{error, Reason} ->
{error, Reason}
end
end
end;
{error, Reason} ->
{error, Reason}
end;
create_schema(_Ns, {error, Reason}) ->
{error, Reason};
create_schema(_Ns, Reason) ->
{error, Reason}.
mk_str() ->
Now = integer_to_list(erlang:unique_integer([positive])),
lists:concat([node()] ++ Now ++ ".TMP").
make_initial_backup(Ns, Opaque, Mod) ->
Orig = mnesia_schema:get_initial_schema(disc_copies, Ns),
Modded = proplists:delete(storage_properties, proplists:delete(majority, Orig)),
Schema = [{schema, schema, Modded}],
O2 = do_apply(Mod, open_write, [Opaque], Opaque),
O3 = do_apply(Mod, write, [O2, [mnesia_log:backup_log_header()]], O2),
O4 = do_apply(Mod, write, [O3, Schema], O3),
O5 = do_apply(Mod, commit_write, [O4], O4),
{ok, O5}.
do_apply(_, write, [_, Items], Opaque) when Items =:= [] ->
Opaque;
do_apply(Mod, What, Args, _Opaque) ->
try apply(Mod, What, Args) of
{ok, Opaque2} -> Opaque2;
{error, Reason} -> throw({error, Reason})
catch _:Reason ->
throw({error, {'EXIT', Reason}})
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Restore
%% Restore schema and possibly other tables from a backup
%% and replicate them to the necessary nodes
%% Requires that old schemas has been deleted
%% Returns ok | {error, Reason}
install_fallback(Opaque) ->
install_fallback(Opaque, []).
install_fallback(Opaque, Args) ->
%% Ensure that we access the intended Mnesia
%% directory. This function may not be called
%% during startup since it will cause the
%% application_controller to get into deadlock
case mnesia_lib:ensure_loaded(?APPLICATION) of
ok ->
do_install_fallback(Opaque, Args);
{error, Reason} ->
{error, Reason}
end.
do_install_fallback(Opaque, Mod) when is_atom(Mod) ->
do_install_fallback(Opaque, [{module, Mod}]);
do_install_fallback(Opaque, Args) when is_list(Args) ->
case check_fallback_args(Args, #fallback_args{opaque = Opaque}) of
{ok, FA} ->
do_install_fallback(FA);
{error, Reason} ->
{error, Reason}
end;
do_install_fallback(_Opaque, Args) ->
{error, {badarg, Args}}.
check_fallback_args([Arg | Tail], FA) ->
try check_fallback_arg_type(Arg, FA) of
FA2 ->
check_fallback_args(Tail, FA2)
catch error:_ ->
{error, {badarg, Arg}}
end;
check_fallback_args([], FA) ->
{ok, FA}.
check_fallback_arg_type(Arg, FA) ->
case Arg of
{scope, global} ->
FA#fallback_args{scope = global};
{scope, local} ->
FA#fallback_args{scope = local};
{module, Mod} ->
Mod2 = mnesia_monitor:do_check_type(backup_module, Mod),
FA#fallback_args{module = Mod2};
{mnesia_dir, Dir} ->
FA#fallback_args{mnesia_dir = Dir,
use_default_dir = false};
{keep_tables, Tabs} ->
atom_list(Tabs),
FA#fallback_args{keep_tables = Tabs};
{skip_tables, Tabs} ->
atom_list(Tabs),
FA#fallback_args{skip_tables = Tabs};
{default_op, keep_tables} ->
FA#fallback_args{default_op = keep_tables};
{default_op, skip_tables} ->
FA#fallback_args{default_op = skip_tables}
end.
atom_list([H | T]) when is_atom(H) ->
atom_list(T);
atom_list([]) ->
ok.
do_install_fallback(FA) ->
Pid = spawn_link(?MODULE, install_fallback_master, [self(), FA]),
Res =
receive
{'EXIT', Pid, Reason} -> % if appl has trapped exit
{error, {'EXIT', Reason}};
{Pid, Res2} ->
case Res2 of
{ok, _} ->
ok;
{error, Reason} ->
{error, {"Cannot install fallback", Reason}}
end
end,
Res.
install_fallback_master(ClientPid, FA) ->
process_flag(trap_exit, true),
State = {start, FA},
Opaque = FA#fallback_args.opaque,
Mod = FA#fallback_args.module,
Res = iterate(Mod, fun restore_recs/4, Opaque, State),
unlink(ClientPid),
ClientPid ! {self(), Res},
exit(shutdown).
restore_recs(_, _, _, stop) ->
throw({error, "restore_recs already stopped"});
restore_recs(Recs, Header, Schema, {start, FA}) ->
%% No records in backup
Schema2 = convert_schema(Header#log_header.log_version, Schema),
CreateList = lookup_schema(schema, Schema2),
try mnesia_schema:list2cs(CreateList) of
Cs ->
Ns = get_fallback_nodes(FA, Cs#cstruct.disc_copies),
global:set_lock({{mnesia_table_lock, schema}, self()}, Ns, infinity),
Args = [self(), FA],
Pids = [spawn_link(N, ?MODULE, fallback_receiver, Args) || N <- Ns],
send_fallback(Pids, {start, Header, Schema2}),
Res = restore_recs(Recs, Header, Schema2, Pids),
global:del_lock({{mnesia_table_lock, schema}, self()}, Ns),
Res
catch _:Reason ->
throw({error, {"Bad schema in restore_recs", Reason}})
end;
restore_recs([], _Header, _Schema, Pids) ->
send_fallback(Pids, swap),
send_fallback(Pids, stop),
stop;
restore_recs(Recs, _, _, Pids) ->
send_fallback(Pids, {records, Recs}),
Pids.
get_fallback_nodes(FA, Ns) ->
This = node(),
case lists:member(This, Ns) of
true ->
case FA#fallback_args.scope of
global -> Ns;
local -> [This]
end;
false ->
throw({error, {"No disc resident schema on local node", Ns}})
end.
send_fallback(Pids, Msg) when is_list(Pids), Pids =/= [] ->
lists:foreach(fun(Pid) -> Pid ! {self(), Msg} end, Pids),
rec_answers(Pids, []).
rec_answers([], Acc) ->
case {lists:keysearch(error, 1, Acc), mnesia_lib:uniq(Acc)} of
{{value, {error, Val}}, _} -> throw({error, Val});
{_, [SameAnswer]} -> SameAnswer;
{_, Other} -> throw({error, {"Different answers", Other}})
end;
rec_answers(Pids, Acc) ->
receive
{'EXIT', Pid, stopped} ->
Pids2 = lists:delete(Pid, Pids),
rec_answers(Pids2, [stopped|Acc]);
{'EXIT', Pid, Reason} ->
Pids2 = lists:delete(Pid, Pids),
rec_answers(Pids2, [{error, {'EXIT', Pid, Reason}}|Acc]);
{Pid, Reply} ->
Pids2 = lists:delete(Pid, Pids),
rec_answers(Pids2, [Reply|Acc])
end.
fallback_exists() ->
Fname = fallback_bup(),
fallback_exists(Fname).
fallback_exists(Fname) ->
case mnesia_monitor:use_dir() of
true ->
mnesia_lib:exists(Fname);
false ->
case ?catch_val(active_fallback) of
{'EXIT', _} -> false;
Bool -> Bool
end
end.
fallback_name() -> "FALLBACK.BUP".
fallback_bup() -> mnesia_lib:dir(fallback_name()).
fallback_tmp_name() -> "FALLBACK.TMP".
%% fallback_full_tmp_name() -> mnesia_lib:dir(fallback_tmp_name()).
-spec fallback_receiver(pid(), fallback_args()) -> no_return().
fallback_receiver(Master, FA) ->
process_flag(trap_exit, true),
Res = try
register(mnesia_fallback, self()),
FA2 = check_fallback_dir(FA),
Bup = FA2#fallback_args.fallback_bup,
false = mnesia_lib:exists(Bup),
Mod = mnesia_backup,
Tmp = FA2#fallback_args.fallback_tmp,
R = #restore{mode = replace,
bup_module = Mod,
bup_data = Tmp},
file:delete(Tmp),
fallback_receiver_loop(Master, R, FA2, schema)
catch
error:_ ->
Reason = {already_exists, node()},
local_fallback_error(Master, Reason);
throw:{error, Reason} ->
local_fallback_error(Master, Reason)
end,
exit(Res).
local_fallback_error(Master, Reason) ->
Master ! {self(), {error, Reason}},
unlink(Master),
exit(Reason).
check_fallback_dir(Master, FA) ->
try check_fallback_dir(FA)
catch throw:{error,Reason} ->
local_fallback_error(Master, Reason)
end.
check_fallback_dir(FA) ->
case mnesia:system_info(schema_location) of
ram ->
Reason = {has_no_disc, node()},
throw({error, Reason});
_ ->
Dir = check_fallback_dir_arg(FA),
Bup = filename:join([Dir, fallback_name()]),
Tmp = filename:join([Dir, fallback_tmp_name()]),
FA#fallback_args{fallback_bup = Bup,
fallback_tmp = Tmp,
mnesia_dir = Dir}
end.
check_fallback_dir_arg(FA) ->
case FA#fallback_args.use_default_dir of
true ->
mnesia_lib:dir();
false when FA#fallback_args.scope =:= local ->
Dir = FA#fallback_args.mnesia_dir,
try mnesia_monitor:do_check_type(dir, Dir)
catch _:_ ->
Reason = {badarg, {dir, Dir}, node()},
throw({error, Reason})
end;
false when FA#fallback_args.scope =:= global ->
Reason = {combine_error, global, dir, node()},
throw({error, Reason})
end.
fallback_receiver_loop(Master, R, FA, State) ->
receive
{Master, {start, Header, Schema}} when State =:= schema ->
Dir = FA#fallback_args.mnesia_dir,
throw_bad_res(ok, mnesia_schema:opt_create_dir(true, Dir)),
R2 = safe_apply(R, open_write, [R#restore.bup_data]),
R3 = safe_apply(R2, write, [R2#restore.bup_data, [Header]]),
BupSchema = [schema2bup(S) || S <- Schema],
R4 = safe_apply(R3, write, [R3#restore.bup_data, BupSchema]),
Master ! {self(), ok},
fallback_receiver_loop(Master, R4, FA, records);
{Master, {records, Recs}} when State =:= records ->
R2 = safe_apply(R, write, [R#restore.bup_data, Recs]),
Master ! {self(), ok},
fallback_receiver_loop(Master, R2, FA, records);
{Master, swap} when State =/= schema ->
?eval_debug_fun({?MODULE, fallback_receiver_loop, pre_swap}, []),
safe_apply(R, commit_write, [R#restore.bup_data]),
Bup = FA#fallback_args.fallback_bup,
Tmp = FA#fallback_args.fallback_tmp,
throw_bad_res(ok, file:rename(Tmp, Bup)),
?SAFE(mnesia_lib:set(active_fallback, true)),
?eval_debug_fun({?MODULE, fallback_receiver_loop, post_swap}, []),
Master ! {self(), ok},
fallback_receiver_loop(Master, R, FA, stop);
{Master, stop} when State =:= stop ->
stopped;
Msg ->
safe_apply(R, abort_write, [R#restore.bup_data]),
Tmp = FA#fallback_args.fallback_tmp,
file:delete(Tmp),
throw({error, "Unexpected msg fallback_receiver_loop", Msg})
end.
throw_bad_res(Expected, Expected) -> Expected;
throw_bad_res(_Expected, {error, Actual}) -> throw({error, Actual});
throw_bad_res(_Expected, Actual) -> throw({error, Actual}).
-record(local_tab, {name,
storage_type,
open,
add,
close,
swap,
record_name,
opened}).
tm_fallback_start(IgnoreFallback) ->
mnesia_schema:lock_schema(),
Res = do_fallback_start(fallback_exists(), IgnoreFallback),
mnesia_schema:unlock_schema(),
case Res of
ok -> ok;
{error, Reason} -> exit(Reason)
end.
do_fallback_start(false, _IgnoreFallback) ->
ok;
do_fallback_start(true, true) ->
verbose("Ignoring fallback at startup, but leaving it active...~n", []),
mnesia_lib:set(active_fallback, true),
ok;
do_fallback_start(true, false) ->
verbose("Starting from fallback...~n", []),
BupFile = fallback_bup(),
Mod = mnesia_backup,
LocalTabs = ?ets_new_table(mnesia_local_tables, [set, public, {keypos, 2}]),
case iterate(Mod, fun restore_tables/4, BupFile, {start, LocalTabs}) of
{ok, _Res} ->
?SAFE(dets:close(schema)),
TmpSchema = mnesia_lib:tab2tmp(schema),
DatSchema = mnesia_lib:tab2dat(schema),
AllLT = ?ets_match_object(LocalTabs, '_'),
?ets_delete_table(LocalTabs),
case file:rename(TmpSchema, DatSchema) of
ok ->
[(LT#local_tab.swap)(LT#local_tab.name, LT) ||
LT <- AllLT, LT#local_tab.name =/= schema],
file:delete(BupFile),
ok;
{error, Reason} ->
file:delete(TmpSchema),
{error, {"Cannot start from fallback. Rename error.", Reason}}
end;
{error, Reason} ->
{error, {"Cannot start from fallback", Reason}}
end.
restore_tables(All=[Rec | Recs], Header, Schema, State={local, LocalTabs, LT}) ->
Tab = element(1, Rec),
if
Tab =:= LT#local_tab.name ->
Key = element(2, Rec),
(LT#local_tab.add)(Tab, Key, Rec, LT),
restore_tables(Recs, Header, Schema, State);
true ->
NewState = {new, LocalTabs},
restore_tables(All, Header, Schema, NewState)
end;
restore_tables(All=[Rec | Recs], Header, Schema, {new, LocalTabs}) ->
Tab = element(1, Rec),
case ?ets_lookup(LocalTabs, Tab) of
[] ->
State = {not_local, LocalTabs, Tab},
restore_tables(Recs, Header, Schema, State);
[LT] when is_record(LT, local_tab) ->
State = {local, LocalTabs, LT},
case LT#local_tab.opened of
true -> ignore;
false ->
(LT#local_tab.open)(Tab, LT),
?ets_insert(LocalTabs,LT#local_tab{opened=true})
end,
restore_tables(All, Header, Schema, State)
end;
restore_tables(All=[Rec | Recs], Header, Schema, S = {not_local, LocalTabs, PrevTab}) ->
Tab = element(1, Rec),
if
Tab =:= PrevTab ->
restore_tables(Recs, Header, Schema, S);
true ->
State = {new, LocalTabs},
restore_tables(All, Header, Schema, State)
end;
restore_tables(Recs, Header, Schema, {start, LocalTabs}) ->
Dir = mnesia_lib:dir(),
OldDir = filename:join([Dir, "OLD_DIR"]),
mnesia_schema:purge_dir(OldDir, []),
mnesia_schema:purge_dir(Dir, [fallback_name()]),
init_dat_files(Schema, LocalTabs),
State = {new, LocalTabs},
restore_tables(Recs, Header, Schema, State);
restore_tables([], _Header, _Schema, State) ->
State.
%% Creates all neccessary dat files and inserts
%% the table definitions in the schema table
%%
%% Returns a list of local_tab tuples for all local tables
init_dat_files(Schema, LocalTabs) ->
TmpFile = mnesia_lib:tab2tmp(schema),
Args = [{file, TmpFile}, {keypos, 2}, {type, set}],
case dets:open_file(schema, Args) of % Assume schema lock
{ok, _} ->
create_dat_files(Schema, LocalTabs),
ok = dets:close(schema),
LocalTab = #local_tab{name = schema,
storage_type = disc_copies,
open = undefined,
add = undefined,
close = undefined,
swap = undefined,
record_name = schema,
opened = false},
?ets_insert(LocalTabs, LocalTab);
{error, Reason} ->
throw({error, {"Cannot open file", schema, Args, Reason}})
end.
create_dat_files([{schema, schema, TabDef} | Tail], LocalTabs) ->
ok = dets:insert(schema, {schema, schema, TabDef}),
create_dat_files(Tail, LocalTabs);
create_dat_files([{schema, Tab, TabDef} | Tail], LocalTabs) ->
TmpFile = mnesia_lib:tab2tmp(Tab),
DatFile = mnesia_lib:tab2dat(Tab),
DclFile = mnesia_lib:tab2dcl(Tab),
DcdFile = mnesia_lib:tab2dcd(Tab),
Expunge = fun() ->
file:delete(DatFile),
file:delete(DclFile),
file:delete(DcdFile)
end,
mnesia_lib:dets_sync_close(Tab),
file:delete(TmpFile),
Cs = mnesia_schema:list2cs(TabDef),
ok = dets:insert(schema, {schema, Tab, TabDef}),
RecName = Cs#cstruct.record_name,
Storage = mnesia_lib:cs_to_storage_type(node(), Cs),
if
Storage =:= unknown ->
ok = dets:delete(schema, {schema, Tab}),
create_dat_files(Tail, LocalTabs);
Storage =:= disc_only_copies ->
Args = [{file, TmpFile}, {keypos, 2},
{type, mnesia_lib:disk_type(Tab, Cs#cstruct.type)}],
Open = fun(T, LT) when T =:= LT#local_tab.name ->
case mnesia_lib:dets_sync_open(T, Args) of
{ok, _} ->
ok;
{error, Reason} ->
throw({error, {"Cannot open file", T, Args, Reason}})
end
end,
Add = fun(T, Key, Rec, LT) when T =:= LT#local_tab.name ->
case Rec of
{_T, Key} ->
ok = dets:delete(T, Key);
(Rec) when T =:= RecName ->
ok = dets:insert(Tab, Rec);
(Rec) ->
Rec2 = setelement(1, Rec, RecName),
ok = dets:insert(T, Rec2)
end
end,
Close = fun(T, LT) when T =:= LT#local_tab.name ->
mnesia_lib:dets_sync_close(T)
end,
Swap = fun(T, LT) when T =:= LT#local_tab.name ->
Expunge(),
case LT#local_tab.opened of
true ->
Close(T,LT);
false ->
Open(T,LT),
Close(T,LT)
end,
case file:rename(TmpFile, DatFile) of
ok ->
ok;
{error, Reason} ->
mnesia_lib:fatal("Cannot rename file ~p -> ~p: ~p~n",
[TmpFile, DatFile, Reason])
end
end,
LocalTab = #local_tab{name = Tab,
storage_type = Storage,
open = Open,
add = Add,
close = Close,
swap = Swap,
record_name = RecName,
opened = false},
?ets_insert(LocalTabs, LocalTab),
create_dat_files(Tail, LocalTabs);
Storage =:= ram_copies; Storage =:= disc_copies ->
Open = fun(T, LT) when T =:= LT#local_tab.name ->
mnesia_log:open_log({?MODULE, T},
mnesia_log:dcl_log_header(),
TmpFile,
false,
false,
read_write)
end,
Add = fun(T, Key, Rec, LT) when T =:= LT#local_tab.name ->
Log = {?MODULE, T},
case Rec of
{_T, Key} ->
mnesia_log:append(Log, {{T, Key}, {T, Key}, delete});
(Rec) when T =:= RecName ->
mnesia_log:append(Log, {{T, Key}, Rec, write});
(Rec) ->
Rec2 = setelement(1, Rec, RecName),
mnesia_log:append(Log, {{T, Key}, Rec2, write})
end
end,
Close = fun(T, LT) when T =:= LT#local_tab.name ->
mnesia_log:close_log({?MODULE, T})
end,
Swap = fun(T, LT) when T =:= LT#local_tab.name ->
Expunge(),
if
Storage =:= ram_copies, LT#local_tab.opened =:= false ->
ok;
true ->
Log = mnesia_log:open_log(fallback_tab,
mnesia_log:dcd_log_header(),
DcdFile,
false),
mnesia_log:close_log(Log),
case LT#local_tab.opened of
true ->
Close(T,LT);
false ->
Open(T,LT),
Close(T,LT)
end,
case file:rename(TmpFile, DclFile) of
ok ->
ok;
{error, Reason} ->
mnesia_lib:fatal("Cannot rename file ~p -> ~p: ~p~n",
[TmpFile, DclFile, Reason])
end
end
end,
LocalTab = #local_tab{name = Tab,
storage_type = Storage,
open = Open,
add = Add,
close = Close,
swap = Swap,
record_name = RecName,
opened = false
},
?ets_insert(LocalTabs, LocalTab),
create_dat_files(Tail, LocalTabs)
end;
create_dat_files([{schema, Tab} | Tail], LocalTabs) ->
?ets_delete(LocalTabs, Tab),
ok = dets:delete(schema, {schema, Tab}),
TmpFile = mnesia_lib:tab2tmp(Tab),
mnesia_lib:dets_sync_close(Tab),
file:delete(TmpFile),
create_dat_files(Tail, LocalTabs);
create_dat_files([], _LocalTabs) ->
ok.
uninstall_fallback() ->
uninstall_fallback([{scope, global}]).
uninstall_fallback(Args) ->
case check_fallback_args(Args, #fallback_args{}) of
{ok, FA} ->
do_uninstall_fallback(FA);
{error, Reason} ->
{error, Reason}
end.
do_uninstall_fallback(FA) ->
%% Ensure that we access the intended Mnesia
%% directory. This function may not be called
%% during startup since it will cause the
%% application_controller to get into deadlock
case mnesia_lib:ensure_loaded(?APPLICATION) of
ok ->
Pid = spawn_link(?MODULE, uninstall_fallback_master, [self(), FA]),
receive
{'EXIT', Pid, Reason} -> % if appl has trapped exit
{error, {'EXIT', Reason}};
{Pid, Res} ->
Res
end;
{error, Reason} ->
{error, Reason}
end.
-spec uninstall_fallback_master(pid(), fallback_args()) -> no_return().
uninstall_fallback_master(ClientPid, FA) ->
process_flag(trap_exit, true),
FA2 = check_fallback_dir(ClientPid, FA), % May exit
Bup = FA2#fallback_args.fallback_bup,
case fallback_to_schema(Bup) of
{ok, fallback, List} ->
Cs = mnesia_schema:list2cs(List),
try get_fallback_nodes(FA, Cs#cstruct.disc_copies) of
Ns when is_list(Ns) ->
do_uninstall(ClientPid, Ns, FA)
catch throw:{error, Reason} ->
local_fallback_error(ClientPid, Reason)
end;
{error, Reason} ->
local_fallback_error(ClientPid, Reason)
end.
do_uninstall(ClientPid, Ns, FA) ->
Args = [self(), FA],
global:set_lock({{mnesia_table_lock, schema}, self()}, Ns, infinity),
Pids = [spawn_link(N, ?MODULE, local_uninstall_fallback, Args) || N <- Ns],
Res = do_uninstall(ClientPid, Pids, [], [], ok),
global:del_lock({{mnesia_table_lock, schema}, self()}, Ns),
ClientPid ! {self(), Res},
unlink(ClientPid),
exit(shutdown).
do_uninstall(ClientPid, [Pid | Pids], GoodPids, BadNodes, Res) ->
receive
%% {'EXIT', ClientPid, _} ->
%% client_exit;
{'EXIT', Pid, Reason} ->
BadNode = node(Pid),
BadRes = {error, {"Uninstall fallback", BadNode, Reason}},
do_uninstall(ClientPid, Pids, GoodPids, [BadNode | BadNodes], BadRes);
{Pid, {error, Reason}} ->
BadNode = node(Pid),
BadRes = {error, {"Uninstall fallback", BadNode, Reason}},
do_uninstall(ClientPid, Pids, GoodPids, [BadNode | BadNodes], BadRes);
{Pid, started} ->
do_uninstall(ClientPid, Pids, [Pid | GoodPids], BadNodes, Res)
end;
do_uninstall(ClientPid, [], GoodPids, [], ok) ->
lists:foreach(fun(Pid) -> Pid ! {self(), do_uninstall} end, GoodPids),
rec_uninstall(ClientPid, GoodPids, ok);
do_uninstall(_ClientPid, [], GoodPids, BadNodes, BadRes) ->
lists:foreach(fun(Pid) -> exit(Pid, shutdown) end, GoodPids),
{error, {node_not_running, BadNodes, BadRes}}.
local_uninstall_fallback(Master, FA) ->
%% Don't trap exit
register(mnesia_fallback, self()), % May exit
FA2 = check_fallback_dir(Master, FA), % May exit
Master ! {self(), started},
receive
{Master, do_uninstall} ->
?eval_debug_fun({?MODULE, uninstall_fallback2, pre_delete}, []),
?SAFE(mnesia_lib:set(active_fallback, false)),
Tmp = FA2#fallback_args.fallback_tmp,
Bup = FA2#fallback_args.fallback_bup,
file:delete(Tmp),
Res = file:delete(Bup),
?eval_debug_fun({?MODULE, uninstall_fallback2, post_delete}, []),
Master ! {self(), Res},
unlink(Master),
exit(normal)
end.
rec_uninstall(ClientPid, [Pid | Pids], AccRes) ->
receive
%% {'EXIT', ClientPid, _} ->
%% exit(shutdown);
{'EXIT', Pid, R} ->
Reason = {node_not_running, {node(Pid), R}},
rec_uninstall(ClientPid, Pids, {error, Reason});
{Pid, ok} ->
rec_uninstall(ClientPid, Pids, AccRes);
{Pid, BadRes} ->
rec_uninstall(ClientPid, Pids, BadRes)
end;
rec_uninstall(_, [], Res) ->
Res.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Backup traversal
%% Iterate over a backup and produce a new backup.
%% Fun(BackupItem, Acc) is applied for each BackupItem.
%%
%% Valid BackupItems are:
%%
%% {schema, Tab} Table to be deleted
%% {schema, Tab, CreateList} Table to be created, CreateList may be empty
%% {schema, db_nodes, DbNodes}List of nodes, defaults to [node()] OLD
%% {schema, version, Version} Schema version OLD
%% {schema, cookie, Cookie} Unique schema cookie OLD
%% {Tab, Key} Oid for record to be deleted
%% Record Record to be inserted.
%%
%% The Fun must return a tuple {BackupItems, NewAcc}
%% where BackupItems is a list of valid BackupItems and
%% NewAcc is a new accumulator value. Once BackupItems
%% that not are schema related has been returned, no more schema
%% items may be returned. The schema related items must always be
%% first in the backup.
%%
%% If TargetMod =:= read_only, no new backup will be created.
%%
%% Opening of the source media will be performed by
%% to SourceMod:open_read(Source)
%%
%% Opening of the target media will be performed by
%% to TargetMod:open_write(Target)
traverse_backup(Source, Target, Fun, Acc) ->
Mod = mnesia_monitor:get_env(backup_module),
traverse_backup(Source, Mod, Target, Mod, Fun, Acc).
traverse_backup(Source, SourceMod, Target, TargetMod, Fun, Acc) ->
Args = [self(), Source, SourceMod, Target, TargetMod, Fun, Acc],
Pid = spawn_link(?MODULE, do_traverse_backup, Args),
receive
{'EXIT', Pid, Reason} ->
{error, {"Backup traversal crashed", Reason}};
{iter_done, Pid, Res} ->
Res
end.
do_traverse_backup(ClientPid, Source, SourceMod, Target, TargetMod, Fun, Acc) ->
process_flag(trap_exit, true),
Iter =
if
TargetMod =/= read_only ->
try do_apply(TargetMod, open_write, [Target], Target)
catch throw:{error, Error} ->
unlink(ClientPid),
ClientPid ! {iter_done, self(), {error, Error}},
exit(Error)
end;
true ->
ignore
end,
A = {start, Fun, Acc, TargetMod, Iter},
Res =
case iterate(SourceMod, fun trav_apply/4, Source, A) of
{ok, {iter, _, Acc2, _, Iter2}} when TargetMod =/= read_only ->
try
do_apply(TargetMod, commit_write, [Iter2], Iter2),
{ok, Acc2}
catch throw:{error, Reason} ->
{error, Reason}
end;
{ok, {iter, _, Acc2, _, _}} ->
{ok, Acc2};
{error, Reason} when TargetMod =/= read_only->
?CATCH(do_apply(TargetMod, abort_write, [Iter], Iter)),
{error, {"Backup traversal failed", Reason}};
{error, Reason} ->
{error, {"Backup traversal failed", Reason}}
end,
unlink(ClientPid),
ClientPid ! {iter_done, self(), Res}.
trav_apply(Recs, _Header, _Schema, {iter, Fun, Acc, Mod, Iter}) ->
{NewRecs, Acc2} = filter_foldl(Fun, Acc, Recs),
if
Mod =/= read_only, NewRecs =/= [] ->
Iter2 = do_apply(Mod, write, [Iter, NewRecs], Iter),
{iter, Fun, Acc2, Mod, Iter2};
true ->
{iter, Fun, Acc2, Mod, Iter}
end;
trav_apply(Recs, Header, Schema, {start, Fun, Acc, Mod, Iter}) ->
Iter2 =
if
Mod =/= read_only ->
do_apply(Mod, write, [Iter, [Header]], Iter);
true ->
Iter
end,
TravAcc = trav_apply(Schema, Header, Schema, {iter, Fun, Acc, Mod, Iter2}),
trav_apply(Recs, Header, Schema, TravAcc).
filter_foldl(Fun, Acc, [Head|Tail]) ->
case Fun(Head, Acc) of
{HeadItems, HeadAcc} when is_list(HeadItems) ->
{TailItems, TailAcc} = filter_foldl(Fun, HeadAcc, Tail),
{HeadItems ++ TailItems, TailAcc};
Other ->
throw({error, {"Fun must return a list", Other}})
end;
filter_foldl(_Fun, Acc, []) ->
{[], Acc}.