%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 1996-2014. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% %CopyrightEnd%
%%
%%
-module(mnesia_dumper).
%% The InitBy arg may be one of the following:
%% scan_decisions Initial scan for decisions
%% startup Initial dump during startup
%% schema_prepare Dump initiated during schema transaction preparation
%% schema_update Dump initiated during schema transaction commit
%% fast_schema_update A schema_update, but ignores the log file
%% user Dump initiated by user
%% write_threshold Automatic dump caused by too many log writes
%% time_threshold Automatic dump caused by timeout
%% Public interface
-export([
get_log_writes/0,
incr_log_writes/0,
needs_dump_ets/1,
raw_dump_table/2,
raw_named_dump_table/2,
start_regulator/0,
opt_dump_log/1,
update/3,
snapshot_dcd/1
]).
%% Internal stuff
-export([regulator_init/1]).
-include("mnesia.hrl").
-include_lib("kernel/include/file.hrl").
-import(mnesia_lib, [fatal/2, dbg_out/2]).
-define(REGULATOR_NAME, mnesia_dumper_load_regulator).
-define(DumpToEtsMultiplier, 4).
%% Returns the number of transaction log writes seen so far: the
%% accumulated trans_log_writes counter plus the writes already consumed
%% from the current window (dump_log_write_threshold minus what is left
%% in trans_log_writes_left).
get_log_writes() ->
    Threshold = mnesia_monitor:get_env(dump_log_write_threshold),
    Accumulated = mnesia_lib:read_counter(trans_log_writes),
    Remaining = mnesia_lib:read_counter(trans_log_writes_left),
    Accumulated + (Threshold - Remaining).
%% Accounts for one more log write by decrementing trans_log_writes_left.
%% While writes are still left in the window nothing more happens
%% ('ignore'); once the counter reaches zero or below, the counters are
%% adjusted and a write_threshold dump is requested.
incr_log_writes() ->
    case mnesia_lib:incr_counter(trans_log_writes_left, -1) of
        Left when Left > 0 ->
            ignore;
        _ ->
            adjust_log_writes(true)
    end.
%% Starts a new write-threshold window: moves the writes consumed in the
%% current window over to trans_log_writes and resets
%% trans_log_writes_left to dump_log_write_threshold.
%%
%% DoCast = true additionally asks mnesia_controller for an asynchronous
%% write_threshold dump before the counters are touched.
%%
%% The work is guarded by a node-local global lock taken with a single
%% retry; if the lock cannot be taken the call returns 'ignore'.
adjust_log_writes(DoCast) ->
    LockId = {mnesia_adjust_log_writes, self()},
    case global:set_lock(LockId, [node()], 1) of
        true ->
            case DoCast of
                true ->
                    mnesia_controller:async_dump_log(write_threshold);
                false ->
                    ignore
            end,
            Threshold = mnesia_monitor:get_env(dump_log_write_threshold),
            Remaining = mnesia_lib:read_counter(trans_log_writes_left),
            %% Don't care if we lost a few writes
            mnesia_lib:set_counter(trans_log_writes_left, Threshold),
            _ = mnesia_lib:incr_counter(trans_log_writes, Threshold - Remaining),
            global:del_lock(LockId, [node()]);
        false ->
            %% Somebody else is sending a dump request
            ignore
    end.
%% Dumps the transaction log on behalf of InitBy, using the registered
%% load regulator process when there is one ('nopid' otherwise).
%% Returns whatever perform_dump/2 returns, or exits.
opt_dump_log(InitBy) ->
    perform_dump(InitBy, regulator_or_nopid()).

%% Looks up the registered load regulator; 'nopid' when none is registered.
regulator_or_nopid() ->
    case whereis(?REGULATOR_NAME) of
        Pid when is_pid(Pid) ->
            Pid;
        undefined ->
            nopid
    end.
%% Writes a fresh .DCD snapshot (via mnesia_log:ets2dcd/1) for every table
%% in Tables that is stored as disc_copies on this node. Tables with any
%% other local storage type are skipped. Always returns 'dumped'.
snapshot_dcd(Tables) ->
    lists:foreach(fun snapshot_dcd_tab/1, Tables),
    dumped.

%% Snapshots one table if, and only if, it is a local disc_copies table.
snapshot_dcd_tab(Tab) ->
    case mnesia_lib:storage_type_at_node(node(), Tab) of
        disc_copies ->
            mnesia_log:ets2dcd(Tab);
        _ ->
            %% Storage type was checked before queueing the op, though
            skip
    end.
%% perform_dump(InitBy, Regulator)
%%
%% Performs the dump requested by InitBy. Regulator is the pid of the
%% load regulator process or 'nopid'.
%%
%% Scan for decisions: only the previous and the latest log file are
%% scanned; the result of scanning the latest log file is returned.
perform_dump(InitBy, Regulator) when InitBy == scan_decisions ->
?eval_debug_fun({?MODULE, perform_dump}, [InitBy]),
dbg_out("Transaction log dump initiated by ~w~n", [InitBy]),
scan_decisions(mnesia_log:previous_log_file(), InitBy, Regulator),
scan_decisions(mnesia_log:latest_log_file(), InitBy, Regulator);
%% Propagate the log into the DAT-files
%%
%% Returns 'dumped' when the log was already dumped, the result of
%% mnesia_log:confirm_log_dump/1 after a dump, or
%% {error, {"Cannot prepare log dump", Reason}} when the log dump could
%% not be prepared.
perform_dump(InitBy, Regulator) ->
?eval_debug_fun({?MODULE, perform_dump}, [InitBy]),
LogState = mnesia_log:prepare_log_dump(InitBy),
dbg_out("Transaction log dump initiated by ~w: ~w~n",
[InitBy, LogState]),
%% Reset the write counters without requesting yet another dump
adjust_log_writes(false),
case LogState of
already_dumped ->
mnesia_recover:allow_garb(),
dumped;
{needs_dump, Diff} ->
U = mnesia_monitor:get_env(dump_log_update_in_place),
Cont = mnesia_log:init_log_dump(),
mnesia_recover:sync(),
try do_perform_dump(Cont, U, InitBy, Regulator, undefined) of
ok ->
?eval_debug_fun({?MODULE, post_dump}, [InitBy]),
case mnesia_monitor:use_dir() of
true ->
mnesia_recover:dump_decision_tab();
false ->
mnesia_log:purge_some_logs()
end,
mnesia_recover:allow_garb(),
%% And now to the crucial point...
mnesia_log:confirm_log_dump(Diff)
%% Any exit other than 'fatal' is handled here; with auto_repair
%% enabled the error is reported and the dump is confirmed anyway,
%% otherwise it is escalated through fatal/2.
catch exit:Reason when Reason =/= fatal ->
case mnesia_monitor:get_env(auto_repair) of
true ->
mnesia_lib:important(error, Reason),
%% Ignore rest of the log
mnesia_log:confirm_log_dump(Diff);
false ->
fatal(error, Reason)
end
end;
{error, Reason} ->
{error, {"Cannot prepare log dump", Reason}}
end.
%% Scans the transaction log file Fname for decisions.
%% Returns 'ok' when the file does not exist. Otherwise the file is opened
%% read_only under the log name previous_log, traversed with
%% do_perform_dump/5 and always closed again afterwards. An exit other
%% than 'fatal' during the traversal is returned as {error, Reason}.
scan_decisions(Fname, InitBy, Regulator) ->
    Exists = mnesia_lib:exists(Fname),
    case Exists of
        true ->
            LogName = previous_log,
            LogHeader = mnesia_log:trans_log_header(),
            AutoRepair = mnesia_monitor:get_env(auto_repair),
            mnesia_log:open_log(LogName, LogHeader, Fname, Exists,
                                AutoRepair, read_only),
            try
                do_perform_dump(start, false, InitBy, Regulator, undefined)
            catch
                exit:Reason when Reason =/= fatal ->
                    {error, Reason}
            after
                mnesia_log:close_log(LogName)
            end;
        false ->
            ok
    end.
%% Walks through the log chunk by chunk, starting at continuation Cont,
%% and applies every record with insert_recs/5. The log version found so
%% far is threaded through the recursion (initially OldVersion).
%%
%% At end of log the opened files are closed with outcome 'ok', the
%% mnesia_dumper_dets bookkeeping is erased and 'ok' is returned.
%% If applying a chunk fails with anything but 'fatal', the files are
%% closed with an error outcome and the process exits with
%% {"Transaction log dump error: ~p~n", [{Reason, StackTrace}]}.
do_perform_dump(Cont, InPlace, InitBy, Regulator, OldVersion) ->
    case mnesia_log:chunk_log(Cont) of
        eof ->
            close_files(InPlace, ok, InitBy),
            erase(mnesia_dumper_dets),
            ok;
        {NextCont, Chunk} ->
            try insert_recs(Chunk, InPlace, InitBy, Regulator, OldVersion) of
                NewVersion ->
                    do_perform_dump(NextCont, InPlace, InitBy, Regulator,
                                    NewVersion)
            catch
                _:Why when Why =/= fatal ->
                    %% Must be fetched first, before any other call
                    Trace = erlang:get_stacktrace(),
                    Reason = {"Transaction log dump error: ~p~n",
                              [{Why, Trace}]},
                    close_files(InPlace, {error, Reason}, InitBy),
                    exit(Reason)
            end
    end.
%% Applies a list of log records in order, passing through the load
%% regulator before each record. Whenever insert_rec/4 returns a log
%% header, its log_version becomes the version used for the remaining
%% records. Returns the log version in effect after the last record.
insert_recs([], _InPlace, _InitBy, _Regulator, Version) ->
    Version;
insert_recs([Rec | Rest], InPlace, InitBy, Regulator, LogV) ->
    regulate(Regulator),
    NextV =
        case insert_rec(Rec, InPlace, InitBy, LogV) of
            #log_header{log_version = HeaderV} ->
                HeaderV;
            _ ->
                LogV
        end,
    insert_recs(Rest, InPlace, InitBy, Regulator, NextV).
%% insert_rec(Rec, InPlace, InitBy, LogV)
%%
%% Handles a single record from the transaction log.
%%
%% Decision scan: commit records and log headers are ignored, everything
%% else is handed to mnesia_recover:note_log_decision/2.
insert_rec(Rec, _InPlace, scan_decisions, _LogV) ->
    case Rec of
        #commit{} ->
            ignore;
        #log_header{} ->
            ignore;
        _ ->
            mnesia_recover:note_log_decision(Rec, scan_decisions)
    end;
%% Commit record: determine the outcome of the transaction and recover it.
%% An aborted transaction only needs work at startup, where its prepared
%% commit is undone.
insert_rec(#commit{decision = Decision} = Rec, InPlace, InitBy, LogV) ->
    case mnesia_recover:wait_for_decision(Decision, InitBy) of
        {Tid, committed} ->
            do_insert_rec(Tid, Rec, InPlace, InitBy, LogV);
        {Tid, aborted} when InitBy =:= startup ->
            mnesia_schema:undo_prepare_commit(Tid, Rec);
        {_Tid, aborted} ->
            ok
    end;
%% Log header: must describe a transaction log of either the current
%% version or one of the accepted older versions. The header itself is
%% returned so that the caller can pick up its version.
insert_rec(#log_header{log_kind = Kind, log_version = Version} = Header,
           _InPlace, _InitBy, _LogV) ->
    CurrentVersion = mnesia_log:version(),
    Accepted = [CurrentVersion, "4.2", "4.1", "4.0"],
    if
        Kind /= trans_log ->
            exit({"Bad kind of transaction log", Header});
        true ->
            case lists:any(fun(V) -> Version == V end, Accepted) of
                true ->
                    ok;
                false ->
                    fatal("Bad version of transaction log: ~p~n", [Header])
            end
    end,
    Header;
%% Any other record is of no interest to the dumper.
insert_rec(_Rec, _InPlace, _InitBy, _LogV) ->
    ok.
%% Applies the operations of a committed transaction: first its schema
%% operations, then its disc_copies operations and, only when the dump is
%% initiated by startup, its disc_only_copies operations.
do_insert_rec(Tid, Rec, InPlace, InitBy, LogV) ->
    insert_schema_ops(Tid, Rec#commit.schema_ops, InPlace, InitBy, LogV),
    insert_ops(Tid, disc_copies, Rec#commit.disc_copies,
               InPlace, InitBy, LogV),
    case InitBy of
        startup ->
            insert_ops(Tid, disc_only_copies, Rec#commit.disc_only_copies,
                       InPlace, InitBy, LogV);
        _ ->
            ignore
    end.

%% Applies the schema operations of a commit record. Unless the schema is
%% kept in ram_copies, the schema file must be opened successfully first.
insert_schema_ops(_Tid, [], _InPlace, _InitBy, _LogV) ->
    ignore;
insert_schema_ops(Tid, SchemaOps, InPlace, InitBy, LogV) ->
    case val({schema, storage_type}) of
        ram_copies ->
            ok;
        Storage ->
            true = open_files(schema, Storage, InPlace, InitBy)
    end,
    insert_ops(Tid, schema_ops, SchemaOps, InPlace, InitBy, LogV).
%% Applies the schema operations of a schema transaction.
%% With no operations nothing is done and 'dumped' is returned. Otherwise
%% the update is performed, the schema commit lock is released and the
%% result of the update is returned.
update(_Tid, [], _DumperMode) ->
    dumped;
update(Tid, SchemaOps, DumperMode) ->
    Result = perform_update(Tid, SchemaOps, DumperMode,
                            mnesia_monitor:use_dir()),
    mnesia_controller:release_schema_commit_lock(),
    Result.
%% perform_update(Tid, SchemaOps, DumperMode, UseDir)
%%
%% Mandatory dump on a node that uses a directory: force a dump of the
%% transaction log (initiated by schema_update) and let the dumper
%% perform the needed updates.
perform_update(_Tid, _SchemaOps, mandatory, true) ->
    InitBy = schema_update,
    ?eval_debug_fun({?MODULE, dump_schema_op}, [InitBy]),
    opt_dump_log(InitBy);
%% Otherwise no full transaction log dump is needed: ignore the log file
%% and perform only the corresponding updates (fast_schema_update).
%% Returns 'ok'. Any failure other than 'fatal' closes the files with an
%% error outcome and is escalated through fatal/2.
perform_update(Tid, SchemaOps, _DumperMode, _UseDir) ->
    InitBy = fast_schema_update,
    InPlace = mnesia_monitor:get_env(dump_log_update_in_place),
    try
        insert_ops(Tid, schema_ops, SchemaOps, InPlace, InitBy,
                   mnesia_log:version()),
        ?eval_debug_fun({?MODULE, post_dump}, [InitBy]),
        close_files(InPlace, ok, InitBy),
        ok
    catch
        _:Why when Why =/= fatal ->
            %% Must be fetched first, before any other call
            Trace = erlang:get_stacktrace(),
            CloseOutcome = {error, {"Schema update error", {Why, Trace}}},
            close_files(InPlace, CloseOutcome, InitBy),
            fatal("Schema update error ~p ~p", [{Why, Trace}, SchemaOps])
    end.
%% Applies a list of operations with insert_op/5.
%% For log version "4.3" and later the operations are applied in list
%% order and 'ok' is returned. For older versions they are applied in
%% reverse list order, and the result of the operation applied last
%% (the head of the list) is returned.
insert_ops(_Tid, _Storage, [], _InPlace, _InitBy, _Ver) ->
    ok;
insert_ops(Tid, Storage, Ops, InPlace, InitBy, Ver) when Ver >= "4.3" ->
    lists:foreach(
      fun(Op) -> insert_op(Tid, Storage, Op, InPlace, InitBy) end,
      Ops);
insert_ops(Tid, Storage, [First | Rest], InPlace, InitBy, Ver)
  when Ver < "4.3" ->
    lists:foreach(
      fun(Op) -> insert_op(Tid, Storage, Op, InPlace, InitBy) end,
      lists:reverse(Rest)),
    insert_op(Tid, Storage, First, InPlace, InitBy).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Normal ops
%% Propagates one operation on Tab to disc, provided that the files of
%% the table could be opened (otherwise 'ignore' is returned).
%% For disc_copies tables other than the schema the operation is appended
%% to the table's log; for everything else it is applied to the dets file.
disc_insert(_Tid, Storage, Tab, Key, Val, Op, InPlace, InitBy) ->
    case open_files(Tab, Storage, InPlace, InitBy) of
        false ->
            ignore;
        true when Storage =:= disc_copies, Tab /= schema ->
            mnesia_log:append({?MODULE, Tab}, {{Tab, Key}, Val, Op}),
            ok;
        true ->
            dets_insert(Op, Tab, Key, Val)
    end.
%% Applies one operation to the dets table Tab.
%%
%% To make update_counter behave better: if nothing has changed in the
%% table except through update_counter, trust that the value in the dets
%% file is correct. Otherwise we would get a double increment.
%% This is not perfect, but update_counter is a dirty op.
%%
%% All other operations record the touched key (or the whole table for
%% clear_table) in the mnesia_dumper_dets bookkeeping.
dets_insert(Op, Tab, Key, Val) ->
    case Op of
        clear_table ->
            dets_cleared(Tab),
            ok = dets:delete_all_objects(Tab);
        delete_object ->
            dets_updated(Tab, Key),
            ok = dets:delete_object(Tab, Val);
        delete ->
            dets_updated(Tab, Key),
            ok = dets:delete(Tab, Key);
        write ->
            dets_updated(Tab, Key),
            ok = dets:insert(Tab, Val);
        update_counter ->
            case dets_incr_counter(Tab, Key) of
                false ->
                    ok;
                true ->
                    dets_apply_counter(Tab, Key, Val)
            end
    end.

%% Applies the increment in Val = {RecName, Incr} to the counter at Key.
%% If dets:update_counter/3 raises an error, a counter object is inserted
%% instead: with value 0 for a negative increment, else with value Incr.
dets_apply_counter(Tab, Key, Val) ->
    {RecName, Incr} = Val,
    try
        _ = dets:update_counter(Tab, Key, Incr)
    catch
        error:_ when Incr < 0 ->
            ok = dets:insert(Tab, {RecName, Key, 0});
        error:_ ->
            ok = dets:insert(Tab, {RecName, Key, Incr})
    end.
%% Records in the process dictionary (key mnesia_dumper_dets) that Key in
%% Tab has been touched during this dump. The bookkeeping is a gb_tree
%% from table to either a gb_set of keys or the atom 'cleared'. A table
%% already marked as cleared is left untouched ('ignore' is returned).
dets_updated(Tab, Key) ->
    Tree =
        case get(mnesia_dumper_dets) of
            undefined -> gb_trees:empty();
            Existing -> Existing
        end,
    case gb_trees:lookup(Tab, Tree) of
        {value, cleared} ->
            ignore;
        {value, Keys} ->
            put(mnesia_dumper_dets,
                gb_trees:update(Tab, gb_sets:add(Key, Keys), Tree));
        none ->
            put(mnesia_dumper_dets,
                gb_trees:insert(Tab, gb_sets:singleton(Key), Tree))
    end.
%% Tells whether an update_counter on Key in Tab should be applied to the
%% dets file: true when the table has been cleared or the key has been
%% touched during this dump, false otherwise (including when there is no
%% bookkeeping at all).
dets_incr_counter(Tab, Key) ->
    case get(mnesia_dumper_dets) of
        undefined ->
            false;
        Tree ->
            case gb_trees:lookup(Tab, Tree) of
                none -> false;
                {value, cleared} -> true;
                {value, Keys} -> gb_sets:is_member(Key, Keys)
            end
    end.
%% Marks Tab as cleared in the mnesia_dumper_dets bookkeeping kept in the
%% process dictionary, replacing any set of touched keys recorded for the
%% table. Returns 'ignore' when the table is already marked as cleared.
dets_cleared(Tab) ->
    Tree =
        case get(mnesia_dumper_dets) of
            undefined -> gb_trees:empty();
            Existing -> Existing
        end,
    case gb_trees:lookup(Tab, Tree) of
        {value, cleared} ->
            ignore;
        _ ->
            put(mnesia_dumper_dets, gb_trees:enter(Tab, cleared, Tree))
    end.
%% insert(Tid, Storage, Tab, Key, ValOrVals, Op, InPlace, InitBy)
%%
%% Applies operation Op on {Tab, Key}. A list of values is applied one
%% value at a time, in list order.
insert(_Tid, _Storage, _Tab, _Key, [], _Op, _InPlace, _InitBy) ->
    ok;
insert(Tid, Storage, Tab, Key, [Val | Tail], Op, InPlace, InitBy) ->
    insert(Tid, Storage, Tab, Key, Val, Op, InPlace, InitBy),
    insert(Tid, Storage, Tab, Key, Tail, Op, InPlace, InitBy);
%% A single value. At startup the operation only goes to disc. Otherwise
%% the running table (and snmp) is updated, and for disc_copies the
%% operation is propagated to disc first. Unknown storage is ignored.
insert(Tid, Storage, Tab, Key, Val, Op, InPlace, InitBy) ->
    Item = {{Tab, Key}, Val, Op},
    case InitBy of
        startup ->
            disc_insert(Tid, Storage, Tab, Key, Val, Op, InPlace, InitBy);
        _ when Storage == unknown ->
            ignore;
        _ when Storage == disc_copies ->
            disc_insert(Tid, Storage, Tab, Key, Val, Op, InPlace, InitBy),
            update_table_and_snmp(Tid, Storage, Tab, Key, Item);
        _ when Storage == ram_copies; Storage == disc_only_copies ->
            update_table_and_snmp(Tid, Storage, Tab, Key, Item)
    end.

%% Applies Item through mnesia_tm and then performs the snmp operations
%% prepared for it.
update_table_and_snmp(Tid, Storage, Tab, Key, Item) ->
    mnesia_tm:do_update_op(Tid, Storage, Item),
    Snmp = mnesia_tm:prepare_snmp(Tab, Key, [Item]),
    mnesia_tm:do_snmp(Tid, Snmp).
%% Removes the disc files of Tab. Nothing is done on a node that does not
%% use a directory. For disc_only_copies tables and for the schema the
%% dets file is closed and deleted; otherwise the .DCL and .DCD files are
%% deleted, after closing the .DCL log if this dumper has it open.
%% The dumper's bookkeeping entry for the table is erased. Results of the
%% file deletions are ignored. Always returns 'ok'.
disc_delete_table(Tab, Storage) ->
    case mnesia_monitor:use_dir() of
        false ->
            ok;
        true ->
            case Storage == disc_only_copies orelse Tab == schema of
                true ->
                    mnesia_monitor:unsafe_close_dets(Tab),
                    file:delete(mnesia_lib:tab2dat(Tab));
                false ->
                    DclFile = mnesia_lib:tab2dcl(Tab),
                    case get({?MODULE, Tab}) of
                        {opened_dumper, dcl} ->
                            del_opened_tab(Tab),
                            %% NOTE(review): the log is opened under the
                            %% name {?MODULE, Tab} elsewhere in this
                            %% module but closed as Tab here - confirm.
                            mnesia_log:unsafe_close_log(Tab);
                        _ ->
                            ok
                    end,
                    file:delete(DclFile),
                    file:delete(mnesia_lib:tab2dcd(Tab))
            end,
            erase({?MODULE, Tab}),
            ok
    end.
%% Deletes the transient indices of a disc_only_copies table, as listed
%% in the index field of its cstruct. For every other storage type there
%% is nothing to delete and 'ok' is returned.
disc_delete_indecies(Tab, Cs, disc_only_copies) ->
    mnesia_index:del_transient(Tab, Cs#cstruct.index, disc_only_copies);
disc_delete_indecies(_Tab, _Cs, _Storage) ->
    ok.
%% insert_op(Tid, Storage, Op, InPlace, InitBy)
%%
%% Applies one logged operation. Op is either a plain record operation
%% {{Tab, Key}, Val, Op}, or a schema operation tuple tagged with 'op'.
%% InPlace is forwarded to the functions that open and write disc files,
%% InitBy tells who initiated the dump.
insert_op(Tid, Storage, {{Tab, Key}, Val, Op}, InPlace, InitBy) ->
%% Propagate to disc only
disc_insert(Tid, Storage, Tab, Key, Val, Op, InPlace, InitBy);
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% NOTE that all operations below will only
%% be performed if the dump is initiated by
%% startup, fast_schema_update or schema_update
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
insert_op(_Tid, schema_ops, _OP, _InPlace, Initby)
when Initby /= startup,
Initby /= fast_schema_update,
Initby /= schema_update ->
ignore;
%% A record operation carried inside a schema transaction.
insert_op(Tid, _, {op, rec, Storage, Item}, InPlace, InitBy) ->
{{Tab, Key}, ValList, Op} = Item,
insert(Tid, Storage, Tab, Key, ValList, Op, InPlace, InitBy);
%% Change the storage type of the table on node N from FromS to ToS.
%% The file handling is only done when N is the local node; the new
%% schema record is written to the schema on disc in any case.
insert_op(Tid, _, {op, change_table_copy_type, N, FromS, ToS, TabDef}, InPlace, InitBy) ->
Cs = mnesia_schema:list2cs(TabDef),
Val = mnesia_schema:insert_cstruct(Tid, Cs, true), % Update ram only
{schema, Tab, _} = Val,
case lists:member(N, val({current, db_nodes})) of
true when InitBy /= startup ->
mnesia_controller:add_active_replica(Tab, N, Cs);
_ ->
ignore
end,
if
N == node() ->
Dmp = mnesia_lib:tab2dmp(Tab),
Dat = mnesia_lib:tab2dat(Tab),
Dcd = mnesia_lib:tab2dcd(Tab),
Dcl = mnesia_lib:tab2dcl(Tab),
case {FromS, ToS} of
{ram_copies, disc_copies} when Tab == schema ->
ok = ensure_rename(Dmp, Dat);
{ram_copies, disc_copies} ->
file:delete(Dcl),
ok = ensure_rename(Dmp, Dcd);
{disc_copies, ram_copies} when Tab == schema ->
mnesia_lib:set(use_dir, false),
mnesia_monitor:unsafe_close_dets(Tab),
ok = file:delete(Dat);
{disc_copies, ram_copies} ->
_ = file:delete(Dcl),
_ = file:delete(Dcd),
ok;
{ram_copies, disc_only_copies} ->
ok = ensure_rename(Dmp, Dat),
true = open_files(Tab, disc_only_copies, InPlace, InitBy),
%% ram_delete_table must be done before init_indecies,
%% it uses info which is reset in init_indecies,
%% it doesn't matter, because init_indecies don't use
%% the ram replica of the table when creating the disc
%% index; Could be improved :)
mnesia_schema:ram_delete_table(Tab, FromS),
PosList = Cs#cstruct.index,
mnesia_index:init_indecies(Tab, disc_only_copies, PosList);
{disc_only_copies, ram_copies} ->
mnesia_monitor:unsafe_close_dets(Tab),
disc_delete_indecies(Tab, Cs, disc_only_copies),
case InitBy of
startup ->
ignore;
_ ->
mnesia_controller:get_disc_copy(Tab),
ok
end,
disc_delete_table(Tab, disc_only_copies);
{disc_copies, disc_only_copies} ->
ok = ensure_rename(Dmp, Dat),
true = open_files(Tab, disc_only_copies, InPlace, InitBy),
mnesia_schema:ram_delete_table(Tab, FromS),
PosList = Cs#cstruct.index,
mnesia_index:init_indecies(Tab, disc_only_copies, PosList),
_ = file:delete(Dcl),
_ = file:delete(Dcd),
ok;
{disc_only_copies, disc_copies} ->
mnesia_monitor:unsafe_close_dets(Tab),
disc_delete_indecies(Tab, Cs, disc_only_copies),
case InitBy of
startup ->
ignore;
_ ->
mnesia_log:ets2dcd(Tab),
mnesia_controller:get_disc_copy(Tab),
disc_delete_table(Tab, disc_only_copies)
end
end;
true ->
ignore
end,
S = val({schema, storage_type}),
disc_insert(Tid, S, schema, Tab, Val, write, InPlace, InitBy);
%% Table transformation: for a local disc_copies table the .DCL log is
%% opened before the new cstruct is inserted.
insert_op(Tid, _, {op, transform, _Fun, TabDef}, InPlace, InitBy) ->
Cs = mnesia_schema:list2cs(TabDef),
case mnesia_lib:cs_to_storage_type(node(), Cs) of
disc_copies ->
open_dcl(Cs#cstruct.name);
_ ->
ignore
end,
insert_cstruct(Tid, Cs, true, InPlace, InitBy);
%%% Operations below this are handled without using the log.
%% Recreate a table as part of a restore: remove what exists of it,
%% create empty storage for it (not at startup and not when the local
%% storage type is unknown) and continue as for create_table.
insert_op(Tid, _, {op, restore_recreate, TabDef}, InPlace, InitBy) ->
Cs = mnesia_schema:list2cs(TabDef),
Tab = Cs#cstruct.name,
Type = Cs#cstruct.type,
Storage = mnesia_lib:cs_to_storage_type(node(), Cs),
%% Delete all possibly existing files and tables
disc_delete_table(Tab, Storage),
disc_delete_indecies(Tab, Cs, Storage),
case InitBy of
startup ->
ignore;
_ ->
case ?catch_val({Tab, cstruct}) of
{'EXIT', _} -> ignore;
_ ->
mnesia_schema:ram_delete_table(Tab, Storage),
mnesia_checkpoint:tm_del_copy(Tab, node())
end
end,
StorageProps = Cs#cstruct.storage_properties,
%% And create new ones..
if
(InitBy == startup) or (Storage == unknown) ->
ignore;
Storage == ram_copies ->
EtsProps = proplists:get_value(ets, StorageProps, []),
Args = [{keypos, 2}, public, named_table, Type | EtsProps],
mnesia_monitor:mktab(Tab, Args);
Storage == disc_copies ->
EtsProps = proplists:get_value(ets, StorageProps, []),
Args = [{keypos, 2}, public, named_table, Type | EtsProps],
mnesia_monitor:mktab(Tab, Args),
File = mnesia_lib:tab2dcd(Tab),
FArg = [{file, File}, {name, {mnesia,create}},
{repair, false}, {mode, read_write}],
{ok, Log} = mnesia_monitor:open_log(FArg),
mnesia_monitor:unsafe_close_log(Log);
Storage == disc_only_copies ->
File = mnesia_lib:tab2dat(Tab),
file:delete(File),
DetsProps = proplists:get_value(dets, StorageProps, []),
Args = [{file, mnesia_lib:tab2dat(Tab)},
{type, mnesia_lib:disk_type(Tab, Type)},
{keypos, 2},
{repair, mnesia_monitor:get_env(auto_repair)}
| DetsProps ],
mnesia_monitor:open_dets(Tab, Args)
end,
insert_op(Tid, ignore, {op, create_table, TabDef}, InPlace, InitBy);
%% Create a table. At startup only the (empty) disc files are created
%% when needed; otherwise replicas and whereabouts are set up and the
%% controller is asked to create the table.
insert_op(Tid, _, {op, create_table, TabDef}, InPlace, InitBy) ->
Cs = mnesia_schema:list2cs(TabDef),
insert_cstruct(Tid, Cs, false, InPlace, InitBy),
Tab = Cs#cstruct.name,
Storage = mnesia_lib:cs_to_storage_type(node(), Cs),
StorageProps = Cs#cstruct.storage_properties,
case InitBy of
startup ->
case Storage of
unknown ->
ignore;
ram_copies ->
ignore;
disc_copies ->
Dcd = mnesia_lib:tab2dcd(Tab),
case mnesia_lib:exists(Dcd) of
true -> ignore;
false ->
mnesia_log:open_log(temp,
mnesia_log:dcd_log_header(),
Dcd,
false,
false,
read_write),
mnesia_log:unsafe_close_log(temp)
end;
_ ->
DetsProps = proplists:get_value(dets, StorageProps, []),
Args = [{file, mnesia_lib:tab2dat(Tab)},
{type, mnesia_lib:disk_type(Tab, Cs#cstruct.type)},
{keypos, 2},
{repair, mnesia_monitor:get_env(auto_repair)}
| DetsProps ],
case mnesia_monitor:open_dets(Tab, Args) of
{ok, _} ->
mnesia_monitor:unsafe_close_dets(Tab);
{error, Error} ->
exit({"Failed to create dets table", Error})
end
end;
_ ->
Copies = mnesia_lib:copy_holders(Cs),
Active = mnesia_lib:intersect(Copies, val({current, db_nodes})),
[mnesia_controller:add_active_replica(Tab, N, Cs) || N <- Active],
case Storage of
unknown ->
mnesia_lib:unset({Tab, create_table}),
case Cs#cstruct.local_content of
true ->
ignore;
false ->
mnesia_lib:set_remote_where_to_read(Tab)
end;
_ ->
case Cs#cstruct.local_content of
true ->
mnesia_lib:set_local_content_whereabouts(Tab);
false ->
mnesia_lib:set({Tab, where_to_read}, node())
end,
case Storage of
ram_copies ->
ignore;
_ ->
%% Indecies are still created by loader
disc_delete_indecies(Tab, Cs, Storage)
%% disc_delete_table(Tab, Storage)
end,
%% Update whereabouts and create table
mnesia_controller:create_table(Tab),
mnesia_lib:unset({Tab, create_table})
end
end;
%% Finish a table dump: with a known Size the .DMP file either replaces
%% the .DCD file or, for an empty table, both files are deleted.
%% Note that the variable Dat below holds the .DCD file name.
insert_op(_Tid, _, {op, dump_table, Size, TabDef}, _InPlace, _InitBy) ->
case Size of
unknown ->
ignore;
_ ->
Cs = mnesia_schema:list2cs(TabDef),
Tab = Cs#cstruct.name,
Dmp = mnesia_lib:tab2dmp(Tab),
Dat = mnesia_lib:tab2dcd(Tab),
case Size of
0 ->
%% Assume that table files already are closed
file:delete(Dmp),
file:delete(Dat);
_ ->
ok = ensure_rename(Dmp, Dat)
end
end;
%% Delete a table: its local files, indices and (except at startup) its
%% ram replica and checkpoint copy, and finally its cstruct.
insert_op(Tid, _, {op, delete_table, TabDef}, InPlace, InitBy) ->
Cs = mnesia_schema:list2cs(TabDef),
Tab = Cs#cstruct.name,
case mnesia_lib:cs_to_storage_type(node(), Cs) of
unknown ->
ignore;
Storage ->
disc_delete_table(Tab, Storage),
disc_delete_indecies(Tab, Cs, Storage),
case InitBy of
startup ->
ignore;
_ ->
mnesia_schema:ram_delete_table(Tab, Storage),
mnesia_checkpoint:tm_del_copy(Tab, node())
end
end,
delete_cstruct(Tid, Cs, InPlace, InitBy);
%% Clear a table that has a local replica.
insert_op(Tid, _, {op, clear_table, TabDef}, InPlace, InitBy) ->
Cs = mnesia_schema:list2cs(TabDef),
Tab = Cs#cstruct.name,
case mnesia_lib:cs_to_storage_type(node(), Cs) of
unknown ->
ignore;
Storage ->
Oid = '_', %%val({Tab, wild_pattern}),
if Storage == disc_copies ->
open_dcl(Cs#cstruct.name);
true ->
ignore
end,
%% Need to catch this, it crashes on ram_copies if
%% the op comes before table is loaded at startup.
?CATCH(insert(Tid, Storage, Tab, '_', Oid, clear_table, InPlace, InitBy))
end;
%% Merge a cstruct received from another node. For the schema table the
%% where_to_commit list is brought in line with the merged cstruct.
insert_op(Tid, _, {op, merge_schema, TabDef}, InPlace, InitBy) ->
Cs = mnesia_schema:list2cs(TabDef),
case Cs#cstruct.name of
schema ->
%% If we bootstrap an empty (diskless) mnesia from another node
%% we might have changed the storage_type of schema.
%% I think this is a good place to do it.
Update = fun(NS = {Node,Storage}) ->
case mnesia_lib:cs_to_storage_type(Node, Cs) of
Storage -> NS;
disc_copies when Node == node() ->
Dir = mnesia_lib:dir(),
ok = mnesia_schema:opt_create_dir(true, Dir),
mnesia_schema:purge_dir(Dir, []),
mnesia_log:purge_all_logs(),
mnesia_lib:set(use_dir, true),
mnesia_log:init(),
Ns = val({current, db_nodes}),
F = fun(U) -> mnesia_recover:log_mnesia_up(U) end,
lists:foreach(F, Ns),
raw_named_dump_table(schema, dat),
temp_set_master_nodes(),
{Node,disc_copies};
CSstorage ->
{Node,CSstorage}
end
end,
W2C0 = val({schema, where_to_commit}),
W2C = case W2C0 of
{blocked, List} ->
{blocked,lists:map(Update,List)};
List ->
lists:map(Update,List)
end,
if W2C == W2C0 -> ignore;
true -> mnesia_lib:set({schema, where_to_commit}, W2C)
end;
_ ->
ignore
end,
insert_cstruct(Tid, Cs, false, InPlace, InitBy);
%% Remove the replica of a table from Node.
%% NOTE(review): the 'if' below has no branch for Tab == schema combined
%% with a Storage other than ram_copies; such an op would raise
%% if_clause - confirm that it cannot occur.
insert_op(Tid, _, {op, del_table_copy, Storage, Node, TabDef}, InPlace, InitBy) ->
Cs = mnesia_schema:list2cs(TabDef),
Tab = Cs#cstruct.name,
if
Tab == schema, Storage == ram_copies ->
insert_cstruct(Tid, Cs, true, InPlace, InitBy);
Tab /= schema ->
mnesia_controller:del_active_replica(Tab, Node),
mnesia_lib:del({Tab, Storage}, Node),
if
Node == node() ->
case Cs#cstruct.local_content of
true -> mnesia_lib:set({Tab, where_to_read}, nowhere);
false -> mnesia_lib:set_remote_where_to_read(Tab)
end,
mnesia_lib:del({schema, local_tables}, Tab),
mnesia_lib:set({Tab, storage_type}, unknown),
insert_cstruct(Tid, Cs, true, InPlace, InitBy),
disc_delete_table(Tab, Storage),
disc_delete_indecies(Tab, Cs, Storage),
mnesia_schema:ram_delete_table(Tab, Storage),
mnesia_checkpoint:tm_del_copy(Tab, Node);
true ->
case val({Tab, where_to_read}) of
Node ->
mnesia_lib:set_remote_where_to_read(Tab);
_ ->
ignore
end,
insert_cstruct(Tid, Cs, true, InPlace, InitBy)
end
end;
insert_op(Tid, _, {op, add_table_copy, _Storage, _Node, TabDef}, InPlace, InitBy) ->
%% During prepare commit, the files was created
%% and the replica was announced
Cs = mnesia_schema:list2cs(TabDef),
insert_cstruct(Tid, Cs, true, InPlace, InitBy);
%% Only the cstruct needs to be stored when snmp is added here.
insert_op(Tid, _, {op, add_snmp, _Us, TabDef}, InPlace, InitBy) ->
Cs = mnesia_schema:list2cs(TabDef),
insert_cstruct(Tid, Cs, true, InPlace, InitBy);
%% Remove the snmp index of a table, if there is one, unless we are at
%% startup or have no local replica.
insert_op(Tid, _, {op, del_snmp, TabDef}, InPlace, InitBy) ->
Cs = mnesia_schema:list2cs(TabDef),
Tab = Cs#cstruct.name,
Storage = mnesia_lib:cs_to_storage_type(node(), Cs),
if
InitBy /= startup,
Storage /= unknown ->
case ?catch_val({Tab, {index, snmp}}) of
{'EXIT', _} ->
ignore;
Stab ->
mnesia_snmp_hook:delete_table(Tab, Stab),
mnesia_lib:unset({Tab, {index, snmp}})
end;
true ->
ignore
end,
insert_cstruct(Tid, Cs, true, InPlace, InitBy);
%% Add an index on position Pos. At startup the index is only
%% initialized for disc_only_copies tables; otherwise it is initialized
%% unless the table cannot be read anywhere.
insert_op(Tid, _, {op, add_index, Pos, TabDef}, InPlace, InitBy) ->
Cs = mnesia_schema:list2cs(TabDef),
Tab = insert_cstruct(Tid, Cs, true, InPlace, InitBy),
Storage = mnesia_lib:cs_to_storage_type(node(), Cs),
case InitBy of
startup when Storage == disc_only_copies ->
true = open_files(Tab, Storage, InPlace, InitBy),
mnesia_index:init_indecies(Tab, Storage, [Pos]);
startup ->
ignore;
_ ->
case val({Tab,where_to_read}) of
nowhere -> ignore;
_ ->
mnesia_index:init_indecies(Tab, Storage, [Pos])
end
end;
%% Delete the index on position Pos. At startup this is only done for
%% disc_only_copies tables.
insert_op(Tid, _, {op, del_index, Pos, TabDef}, InPlace, InitBy) ->
Cs = mnesia_schema:list2cs(TabDef),
Tab = Cs#cstruct.name,
Storage = mnesia_lib:cs_to_storage_type(node(), Cs),
case InitBy of
startup when Storage == disc_only_copies ->
mnesia_index:del_index_table(Tab, Storage, Pos);
startup ->
ignore;
_ ->
mnesia_index:del_index_table(Tab, Storage, Pos)
end,
insert_cstruct(Tid, Cs, true, InPlace, InitBy);
%% Change the access mode of a table; the controller is only informed
%% when we are not at startup.
insert_op(Tid, _, {op, change_table_access_mode,TabDef, _OldAccess, _Access}, InPlace, InitBy) ->
Cs = mnesia_schema:list2cs(TabDef),
case InitBy of
startup -> ignore;
_ -> mnesia_controller:change_table_access_mode(Cs)
end,
insert_cstruct(Tid, Cs, true, InPlace, InitBy);
%% Change the majority setting of a table; the controller is only
%% informed when we are not at startup.
insert_op(Tid, _, {op, change_table_majority,TabDef, _OldAccess, _Access}, InPlace, InitBy) ->
Cs = mnesia_schema:list2cs(TabDef),
case InitBy of
startup -> ignore;
_ -> mnesia_controller:change_table_majority(Cs)
end,
insert_cstruct(Tid, Cs, true, InPlace, InitBy);
%% The remaining operations only need the new cstruct to be stored
%% (delete_property additionally unsets the property in ram).
insert_op(Tid, _, {op, change_table_load_order, TabDef, _OldLevel, _Level}, InPlace, InitBy) ->
Cs = mnesia_schema:list2cs(TabDef),
insert_cstruct(Tid, Cs, true, InPlace, InitBy);
insert_op(Tid, _, {op, delete_property, TabDef, PropKey}, InPlace, InitBy) ->
Cs = mnesia_schema:list2cs(TabDef),
Tab = Cs#cstruct.name,
mnesia_lib:unset({Tab, user_property, PropKey}),
insert_cstruct(Tid, Cs, true, InPlace, InitBy);
insert_op(Tid, _, {op, write_property, TabDef, _Prop}, InPlace, InitBy) ->
Cs = mnesia_schema:list2cs(TabDef),
insert_cstruct(Tid, Cs, true, InPlace, InitBy);
insert_op(Tid, _, {op, change_table_frag, _Change, TabDef}, InPlace, InitBy) ->
Cs = mnesia_schema:list2cs(TabDef),
insert_cstruct(Tid, Cs, true, InPlace, InitBy).
%% Makes sure that the disc file the dumper writes to for Tab is open.
%% Returns true when operations on Tab shall be written to disc, false
%% when they shall be skipped: unknown or ram_copies storage, a table
%% that has already been dumped in full, or a table whose setorbag value
%% is not available.
%% The state per table is kept in the process dictionary under the key
%% {?MODULE, Tab}.
open_files(Tab, Storage, UpdateInPlace, InitBy)
  when Storage /= unknown, Storage /= ram_copies ->
    case get({?MODULE, Tab}) of
        {opened_dumper, _} ->
            true;
        already_dumped ->
            false;
        undefined ->
            case ?catch_val({Tab, setorbag}) of
                {'EXIT', _} ->
                    false;
                _Type when Storage =:= disc_copies, Tab /= schema ->
                    open_disc_copies(Tab, InitBy);
                Type ->
                    open_dat_file(Tab, Type, UpdateInPlace)
            end
    end;
open_files(_Tab, _Storage, _UpdateInPlace, _InitBy) ->
    false.

%% Opens the dets file of Tab (or a temporary copy of it, depending on
%% UpdateInPlace) and remembers it as opened by the dumper.
open_dat_file(Tab, Type, UpdateInPlace) ->
    StorageProps = val({Tab, storage_properties}),
    DetsProps = proplists:get_value(dets, StorageProps, []),
    Fname = prepare_open(Tab, UpdateInPlace),
    Args = [{file, Fname},
            {keypos, 2},
            {repair, mnesia_monitor:get_env(auto_repair)},
            {type, mnesia_lib:disk_type(Tab, Type)}
            | DetsProps],
    {ok, _} = mnesia_monitor:open_dets(Tab, Args),
    put({?MODULE, Tab}, {opened_dumper, dat}),
    true.
%% Prepares the dump of a disc_copies table.
%% When needs_dump_ets/1 says no, or at startup, the table's .DCL log is
%% opened for appending and true is returned. Otherwise the whole ets
%% table is dumped to a new .DCD file right away, the table is marked as
%% already_dumped and false is returned.
open_disc_copies(Tab, InitBy) ->
    DumpEts = needs_dump_ets(Tab),
    case DumpEts == false orelse InitBy == startup of
        true ->
            DclFile = mnesia_lib:tab2dcl(Tab),
            mnesia_log:open_log({?MODULE, Tab},
                                mnesia_log:dcl_log_header(),
                                DclFile,
                                mnesia_lib:exists(DclFile),
                                mnesia_monitor:get_env(auto_repair),
                                read_write),
            put({?MODULE, Tab}, {opened_dumper, dcl}),
            true;
        false ->
            mnesia_log:ets2dcd(Tab),
            put({?MODULE, Tab}, already_dumped),
            false
    end.
%% Decides whether the whole ets table should be dumped to a new .DCD
%% file instead of appending to the .DCL log.
%% Returns false when there is no .DCL file, true when the .DCD file
%% cannot be inspected, and otherwise whether the size of the .DCD file
%% is at most the size of the .DCL file multiplied by dc_dump_limit
%% (?DumpToEtsMultiplier when dc_dump_limit is not available).
needs_dump_ets(Tab) ->
    DclFile = mnesia_lib:tab2dcl(Tab),
    case file:read_file_info(DclFile) of
        {ok, DclInfo} ->
            DcdFile = mnesia_lib:tab2dcd(Tab),
            case file:read_file_info(DcdFile) of
                {ok, DcdInfo} ->
                    Multiplier =
                        case ?catch_val(dc_dump_limit) of
                            {'EXIT', _} -> ?DumpToEtsMultiplier;
                            Limit -> Limit
                        end,
                    DcdInfo#file_info.size =<
                        (DclInfo#file_info.size * Multiplier);
                {error, Reason} ->
                    mnesia_lib:dbg_out("File ~p info_error ~p ~n",
                                       [DcdFile, Reason]),
                    true
            end;
        {error, enoent} ->
            false
    end.
%% Always opens the dcl file for writing, overriding the already_dumped
%% mechanism; used for schema transactions.
%% Nothing is done when the dumper already has a file open for the
%% table. Always returns true.
open_dcl(Tab) ->
    case get({?MODULE, Tab}) of
        {opened_dumper, _} ->
            true;
        _NotOpened ->
            %% undefined or already_dumped
            DclFile = mnesia_lib:tab2dcl(Tab),
            mnesia_log:open_log({?MODULE, Tab},
                                mnesia_log:dcl_log_header(),
                                DclFile,
                                mnesia_lib:exists(DclFile),
                                mnesia_monitor:get_env(auto_repair),
                                read_write),
            put({?MODULE, Tab}, {opened_dumper, dcl}),
            true
    end.
%% Returns the name of the file the dumper shall open for the dets table
%% Tab. With update in place this is the .DAT file itself; otherwise the
%% .DAT file is first copied to a temporary file and the name of the copy
%% is returned. A failing copy is escalated through fatal/2.
prepare_open(Tab, UpdateInPlace) ->
    Dat = mnesia_lib:tab2dat(Tab),
    case UpdateInPlace of
        false ->
            Tmp = mnesia_lib:tab2tmp(Tab),
            try
                ok = mnesia_lib:copy_file(Dat, Tmp)
            catch
                error:Error ->
                    fatal("Cannot copy dets file ~p to ~p: ~p~n",
                          [Dat, Tmp, Error])
            end,
            Tmp;
        true ->
            Dat
    end.
%% Forgets the dumper's bookkeeping entry for Tab by erasing the key
%% {?MODULE, Tab} from the process dictionary. Returns the erased value,
%% or 'undefined' when there was none.
del_opened_tab(Tab) ->
    DictKey = {?MODULE, Tab},
    erase(DictKey).
%% Closes every file that the dumper has opened, as recorded in the
%% process dictionary. Outcome is 'ok' or an error term and is passed on
%% together with UpdateInPlace to the per-file close logic.
close_files(UpdateInPlace, Outcome, InitBy) ->
    ProcessDict = get(),
    close_files(UpdateInPlace, Outcome, InitBy, ProcessDict).
%% Goes through a list of process dictionary entries, in order, and
%% closes the files behind the entries that belong to the dumper.
%% Returns 'ok'.
close_files(InPlace, Outcome, InitBy, Entries) ->
    lists:foreach(
      fun(Entry) -> close_entry(InPlace, Outcome, InitBy, Entry) end,
      Entries).

%% Handles one process dictionary entry.
%% already_dumped: only the entry is erased.
%% opened_dumper: the entry is erased and the file is closed according to
%% the current storage type of the table; disc_only_copies tables are
%% left open unless the dump was initiated by startup.
%% Entries not owned by the dumper are left alone.
close_entry(_InPlace, _Outcome, _InitBy, {{?MODULE, Tab}, already_dumped}) ->
    erase({?MODULE, Tab});
close_entry(InPlace, Outcome, InitBy,
            {{?MODULE, Tab}, {opened_dumper, Type}}) ->
    erase({?MODULE, Tab}),
    case val({Tab, storage_type}) of
        disc_only_copies when InitBy /= startup ->
            ignore;
        disc_copies when Tab /= schema ->
            mnesia_log:close_log({?MODULE, Tab});
        Storage ->
            do_close(InPlace, Outcome, Tab, Type, Storage)
    end;
close_entry(_InPlace, _Outcome, _InitBy, _Other) ->
    ok.
%% do_close(InPlace, Outcome, Tab, FileType, Storage)
%%
%% Closes one file opened by the dumper.
%% If the storage type is unknown at close time the files are cleaned up;
%% this can happen if the timing is right and a dirty_write conflicts
%% with schema operations.
do_close(_InPlace, _Outcome, Tab, dcl, unknown) ->
    mnesia_log:close_log({?MODULE, Tab}),
    file:delete(mnesia_lib:tab2dcl(Tab));
do_close(_InPlace, _Outcome, Tab, dcl, _Storage) ->
    %% To be safe, can it happen?
    mnesia_log:close_log({?MODULE, Tab});
%% Dets file: after closing it, an in-place file is kept (or deleted when
%% the storage type is unknown), while a temporary file replaces the .DAT
%% file on success and is deleted otherwise.
do_close(InPlace, Outcome, Tab, dat, Storage) ->
    mnesia_monitor:close_dets(Tab),
    case {InPlace == true, Storage == unknown, Outcome == ok} of
        {true, true, _} ->
            file:delete(mnesia_lib:tab2dat(Tab));
        {true, false, _} ->
            %% Update in place
            ok;
        {false, false, true} ->
            %% Success: swap tmp files with dat files
            TabDat = mnesia_lib:tab2dat(Tab),
            ok = file:rename(mnesia_lib:tab2tmp(Tab), TabDat);
        {false, _, _} ->
            file:delete(mnesia_lib:tab2tmp(Tab))
    end.
%% Renames From to To in a way that may be repeated: if From no longer
%% exists but To does, the rename is considered to be done already.
%% Returns the result of file:rename/2, 'ok', or
%% {error, {rename_failed, From, To}} when neither file exists.
ensure_rename(From, To) ->
    case mnesia_lib:exists(From) of
        true ->
            file:rename(From, To);
        false ->
            already_renamed(From, To)
    end.

%% From is gone: fine if To is in place, an error otherwise.
already_renamed(From, To) ->
    case mnesia_lib:exists(To) of
        true ->
            ok;
        false ->
            {error, {rename_failed, From, To}}
    end.
%% Inserts the cstruct Cs through mnesia_schema and writes the resulting
%% schema record to the schema on disc. Returns the name of the table.
insert_cstruct(Tid, Cs, KeepWhereabouts, InPlace, InitBy) ->
    SchemaRec = mnesia_schema:insert_cstruct(Tid, Cs, KeepWhereabouts),
    {schema, Tab, _} = SchemaRec,
    SchemaStorage = val({schema, storage_type}),
    disc_insert(Tid, SchemaStorage, schema, Tab, SchemaRec, write,
                InPlace, InitBy),
    Tab.
%% Deletes the cstruct Cs through mnesia_schema and deletes the
%% corresponding schema record from the schema on disc. Returns the name
%% of the table.
delete_cstruct(Tid, Cs, InPlace, InitBy) ->
    SchemaRec = mnesia_schema:delete_cstruct(Tid, Cs),
    {schema, Tab, _} = SchemaRec,
    SchemaStorage = val({schema, storage_type}),
    disc_insert(Tid, SchemaStorage, schema, Tab, SchemaRec, delete,
                InPlace, InitBy),
    Tab.
%% Logs, for every local table, all nodes holding a replica of the table
%% (with one occurrence of the local node removed) as master nodes.
%% Returns 'ok'.
temp_set_master_nodes() ->
    LocalTabs = val({schema, local_tables}),
    MastersOf =
        fun(Tab) ->
                DiscNodes = val({Tab, disc_copies}),
                RamNodes = val({Tab, ram_copies}),
                DiscOnlyNodes = val({Tab, disc_only_copies}),
                {Tab, (DiscNodes ++ RamNodes ++ DiscOnlyNodes) -- [node()]}
        end,
    Masters = [MastersOf(Tab) || Tab <- LocalTabs],
    %% UseDir = false since we don't want to remember these
    %% masternodes and we are running (really soon anyway) since we want
    %% this to be known during table loading.
    mnesia_recover:log_master_nodes(Masters, false, yes),
    ok.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Raw dump of table. Dumper must have unique access to the ets table.
%% raw_named_dump_table(Tab, Ftype)
%%
%% Dumps the ets table Tab to a dets file. Ftype selects the target file:
%% 'dat' for the .DAT file and 'dmp' for the .DMP file of the table.
%% The table is first written to a temporary file which is then renamed
%% to the target name.
%% Exits with {has_no_disc, node()} on a node that does not use a
%% directory, and with a descriptive tuple when opening the dets file or
%% dumping to it fails.
raw_named_dump_table(Tab, Ftype) ->
case mnesia_monitor:use_dir() of
true ->
mnesia_lib:lock_table(Tab),
TmpFname = mnesia_lib:tab2tmp(Tab),
Fname =
case Ftype of
dat -> mnesia_lib:tab2dat(Tab);
dmp -> mnesia_lib:tab2dmp(Tab)
end,
%% Remove leftovers; the results are deliberately not checked
file:delete(TmpFname),
file:delete(Fname),
TabSize = ?ets_info(Tab, size),
TabRef = Tab,
DiskType = mnesia_lib:disk_type(Tab),
Args = [{file, TmpFname},
{keypos, 2},
%% {ram_file, true},
{estimated_no_objects, TabSize + 256},
{repair, mnesia_monitor:get_env(auto_repair)},
{type, DiskType}],
case mnesia_lib:dets_sync_open(TabRef, Args) of
{ok, TabRef} ->
Storage = ram_copies,
mnesia_lib:db_fixtable(Storage, Tab, true),
try
ok = raw_dump_table(TabRef, Tab),
ok = file:rename(TmpFname, Fname)
catch _:Reason ->
?SAFE(file:delete(TmpFname)),
exit({"Dump of table to disc failed", Reason})
after
%% Always undo the fixtable, close the dets file and release the
%% table lock, whether or not the dump succeeded
mnesia_lib:db_fixtable(Storage, Tab, false),
mnesia_lib:dets_sync_close(Tab),
mnesia_lib:unlock_table(Tab)
end;
{error, Reason} ->
mnesia_lib:unlock_table(Tab),
exit({"Open of file before dump to disc failed", Reason})
end;
false ->
exit({has_no_disc, node()})
end.
%% Copies the contents of the ets table EtsRef into the open dets table
%% DetsRef with dets:from_ets/2 and returns its result.
raw_dump_table(DetsRef, EtsRef) ->
    Result = dets:from_ets(DetsRef, EtsRef),
    Result.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Load regulator
%%
%% This is a poor man's substitute for a fair scheduler algorithm
%% in the Erlang emulator. The mnesia_dumper process performs many
%% costly BIF invocations and must pay for this. But since the
%% Emulator does not handle this properly we must compensate for
%% this with some form of load regulation of ourselves in order to
%% not steal all computation power in the Erlang Emulator and make
%% other processes starve. Hopefully this is a temporary solution.
%% Starts the load regulator process when the dump_log_load_regulation
%% environment parameter is true.
%% Returns the pid of the regulator, or 'nopid' when load regulation is
%% turned off. A failure to start the process is escalated through
%% fatal/2.
start_regulator() ->
    case mnesia_monitor:get_env(dump_log_load_regulation) of
        false ->
            nopid;
        true ->
            N = ?REGULATOR_NAME,
            case mnesia_monitor:start_proc(N, ?MODULE, regulator_init, [self()]) of
                {ok, Pid} ->
                    Pid;
                {error, Reason} ->
                    %% One ~p per argument: the name and the reason.
                    %% (~n is a newline and consumes no argument, so
                    %% using it for the name left one argument too many
                    %% and made the formatting itself fail.)
                    fatal("Failed to start ~p: ~p~n", [N, Reason])
            end
    end.
%% Entry point of the load regulator process: lowers its own priority,
%% registers itself under ?REGULATOR_NAME, acknowledges the start to
%% Parent with {ok, Pid} and enters the regulator loop.
regulator_init(Parent) ->
    %% No need for trapping exits.
    %% Using low priority causes the regulation
    process_flag(priority, low),
    Self = self(),
    register(?REGULATOR_NAME, Self),
    proc_lib:init_ack(Parent, {ok, Self}),
    regulator_loop().
%% Main loop of the load regulator process.
%% {regulate, From} is answered with {regulated, self()} and the loop
%% goes on; {stop, From} is answered with {stopped, self()} and the
%% process exits normally.
regulator_loop() ->
    receive
        {stop, Client} ->
            Client ! {stopped, self()},
            exit(normal);
        {regulate, Client} ->
            Client ! {regulated, self()},
            regulator_loop()
    end.
%% Passes through the load regulator: sends a regulate request to the
%% regulator process and waits until it has answered. With 'nopid' there
%% is no regulator and the call returns at once. Returns 'ok'.
regulate(nopid) ->
    ok;
regulate(RegulatorPid) ->
    Request = {regulate, self()},
    RegulatorPid ! Request,
    receive
        {regulated, RegulatorPid} ->
            ok
    end.
%% Looks up the value of Var with ?catch_val; when that lookup fails the
%% value is taken from mnesia_lib:other_val/1 instead.
val(Var) ->
    case ?catch_val(Var) of
        {'EXIT', _} ->
            mnesia_lib:other_val(Var);
        Found ->
            Found
    end.