diff options
author | Erlang/OTP <[email protected]> | 2009-11-20 14:54:40 +0000 |
---|---|---|
committer | Erlang/OTP <[email protected]> | 2009-11-20 14:54:40 +0000 |
commit | 84adefa331c4159d432d22840663c38f155cd4c1 (patch) | |
tree | bff9a9c66adda4df2106dfd0e5c053ab182a12bd /lib/mnesia/src/mnesia_log.erl | |
download | otp-84adefa331c4159d432d22840663c38f155cd4c1.tar.gz otp-84adefa331c4159d432d22840663c38f155cd4c1.tar.bz2 otp-84adefa331c4159d432d22840663c38f155cd4c1.zip |
The R13B03 release.OTP_R13B03
Diffstat (limited to 'lib/mnesia/src/mnesia_log.erl')
-rw-r--r-- | lib/mnesia/src/mnesia_log.erl | 1025 |
1 files changed, 1025 insertions, 0 deletions
diff --git a/lib/mnesia/src/mnesia_log.erl b/lib/mnesia/src/mnesia_log.erl new file mode 100644 index 0000000000..00ec4740ee --- /dev/null +++ b/lib/mnesia/src/mnesia_log.erl @@ -0,0 +1,1025 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 1996-2009. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% + +%% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% +%% This module administers three kinds of log files: +%% +%% 1 The transaction log +%% mnesia_tm appends to the log (via mnesia_log) at the +%% end of each transaction (or dirty write) and +%% mnesia_dumper reads the log and performs the ops in +%% the dat files. The dump_log is done at startup and +%% at intervals controlled by the user. +%% +%% 2 The mnesia_down log +%% mnesia_tm appends to the log (via mnesia_log) when it +%% realizes that mnesia goes up or down on another node. +%% mnesia_init reads the log (via mnesia_log) at startup. +%% +%% 3 The backup log +%% mnesia_schema produces one tiny log when the schema is +%% initially created. mnesia_schema also reads the log +%% when the user wants tables (possibly incl the schema) +%% to be restored. mnesia_log appends to the log when the +%% user wants to produce a real backup. +%% +%% The actual access to the backup media is performed via the +%% mnesia_backup module for both read and write. mnesia_backup +%% uses the disk_log (*), BUT the user may write an own module +%% with the same interface as mnesia_backup and configure +%% Mnesia so the alternate module performs the actual accesses +%% to the backup media. This means that the user may put the +%% backup on medias that Mnesia does not know about possibly on +%% hosts where Erlang is not running. +%% +%% All these logs have to some extent a common structure. +%% They are all using the disk_log module (*) for the basic +%% file structure. The disk_log has a repair feature that +%% can be used to skip erroneous log records if one comes to +%% the conclusion that it is more important to reuse some +%% of the log records than the risque of obtaining inconsistent +%% data. If the data becomes inconsistent it is solely up to the +%% application to make it consistent again. The automatic +%% reparation of the disk_log is very powerful, but use it +%% with extreme care. +%% +%% First in all Mnesia's log file is a mnesia log header. +%% It contains a list with a log_header record as single +%% element. The structure of the log_header may never be +%% changed since it may be written to very old backup files. +%% By holding this record definition stable we can be +%% able to comprahend backups from timepoint 0. It also +%% allows us to use the backup format as an interchange +%% format between Mnesia releases. +%% +%% An op-list is a list of tuples with arity 3. Each tuple +%% has this structure: {Oid, Recs, Op} where Oid is the tuple +%% {Tab, Key}, Recs is a (possibly empty) list of records and +%% Op is an atom. +%% +%% The log file structure for the transaction log is as follows. +%% +%% After the mnesia log section follows an extended record section +%% containing op-lists. There are several values that Op may +%% have, such as write, delete, update_counter, delete_object, +%% and replace. There is no special end of section marker. +%% +%% +-----------------+ +%% | mnesia log head | +%% +-----------------+ +%% | extended record | +%% | section | +%% +-----------------+ +%% +%% The log file structure for the mnesia_down log is as follows. +%% +%% After the mnesia log section follows a mnesia_down section +%% containg lists with yoyo records as single element. +%% +%% +-----------------+ +%% | mnesia log head | +%% +-----------------+ +%% | mnesia_down | +%% | section | +%% +-----------------+ +%% +%% The log file structure for the backup log is as follows. +%% +%% After the mnesia log section follows a schema section +%% containing record lists. A record list is a list of tuples +%% where {schema, Tab} is interpreted as a delete_table(Tab) and +%% {schema, Tab, CreateList} are interpreted as create_table. +%% +%% The record section also contains record lists. In this section +%% {Tab, Key} is interpreted as delete({Tab, Key}) and other tuples +%% as write(Tuple). There is no special end of section marker. +%% +%% +-----------------+ +%% | mnesia log head | +%% +-----------------+ +%% | schema section | +%% +-----------------+ +%% | record section | +%% +-----------------+ +%% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +-module(mnesia_log). + +-export([ + append/2, + backup/1, + backup/2, + backup_checkpoint/2, + backup_checkpoint/3, + backup_log_header/0, + backup_master/2, + chunk_decision_log/1, + chunk_decision_tab/1, + chunk_log/1, + chunk_log/2, + close_decision_log/0, + close_decision_tab/0, + close_log/1, + unsafe_close_log/1, + confirm_log_dump/1, + confirm_decision_log_dump/0, + previous_log_file/0, + previous_decision_log_file/0, + latest_log_file/0, + decision_log_version/0, + decision_log_file/0, + decision_tab_file/0, + decision_tab_version/0, + dcl_version/0, + dcd_version/0, + ets2dcd/1, + ets2dcd/2, + dcd2ets/1, + dcd2ets/2, + init/0, + init_log_dump/0, + log/1, + slog/1, + log_decision/1, + log_files/0, + open_decision_log/0, + trans_log_header/0, + open_decision_tab/0, + dcl_log_header/0, + dcd_log_header/0, + open_log/4, + open_log/6, + prepare_decision_log_dump/0, + prepare_log_dump/1, + save_decision_tab/1, + purge_all_logs/0, + purge_some_logs/0, + stop/0, + tab_copier/3, + version/0, + view/0, + view/1, + write_trans_log_header/0 + ]). + + +-include("mnesia.hrl"). +-import(mnesia_lib, [val/1, dir/1]). +-import(mnesia_lib, [exists/1, fatal/2, error/2, dbg_out/2]). + +trans_log_header() -> log_header(trans_log, version()). +backup_log_header() -> log_header(backup_log, "1.2"). +decision_log_header() -> log_header(decision_log, decision_log_version()). +decision_tab_header() -> log_header(decision_tab, decision_tab_version()). +dcl_log_header() -> log_header(dcl_log, dcl_version()). +dcd_log_header() -> log_header(dcd_log, dcd_version()). + +log_header(Kind, Version) -> + #log_header{log_version=Version, + log_kind=Kind, + mnesia_version=mnesia:system_info(version), + node=node(), + now=now()}. + +version() -> "4.3". + +decision_log_version() -> "3.0". + +decision_tab_version() -> "1.0". + +dcl_version() -> "1.0". +dcd_version() -> "1.0". + +append(Log, Bin) when is_binary(Bin) -> + disk_log:balog(Log, Bin); +append(Log, Term) -> + disk_log:alog(Log, Term). + +%% Synced append +sappend(Log, Bin) when is_binary(Bin) -> + ok = disk_log:blog(Log, Bin); +sappend(Log, Term) -> + ok = disk_log:log(Log, Term). + +%% Write commit records to the latest_log +log(C) when C#commit.disc_copies == [], + C#commit.disc_only_copies == [], + C#commit.schema_ops == [] -> + ignore; +log(C) -> + case mnesia_monitor:use_dir() of + true -> + if + is_record(C, commit) -> + C2 = C#commit{ram_copies = [], snmp = []}, + append(latest_log, C2); + true -> + %% Either a commit record as binary + %% or some decision related info + append(latest_log, C) + end, + mnesia_dumper:incr_log_writes(); + false -> + ignore + end. + +%% Synced + +slog(C) when C#commit.disc_copies == [], + C#commit.disc_only_copies == [], + C#commit.schema_ops == [] -> + ignore; +slog(C) -> + case mnesia_monitor:use_dir() of + true -> + if + is_record(C, commit) -> + C2 = C#commit{ram_copies = [], snmp = []}, + sappend(latest_log, C2); + true -> + %% Either a commit record as binary + %% or some decision related info + sappend(latest_log, C) + end, + mnesia_dumper:incr_log_writes(); + false -> + ignore + end. + + +%% Stuff related to the file LOG + +%% Returns a list of logfiles. The oldest is first. +log_files() -> [previous_log_file(), + latest_log_file(), + decision_tab_file() + ]. + +latest_log_file() -> dir(latest_log_name()). + +previous_log_file() -> dir("PREVIOUS.LOG"). + +decision_log_file() -> dir(decision_log_name()). + +decision_tab_file() -> dir(decision_tab_name()). + +previous_decision_log_file() -> dir("PDECISION.LOG"). + +latest_log_name() -> "LATEST.LOG". + +decision_log_name() -> "DECISION.LOG". + +decision_tab_name() -> "DECISION_TAB.LOG". + +init() -> + case mnesia_monitor:use_dir() of + true -> + Prev = previous_log_file(), + verify_no_exists(Prev), + + Latest = latest_log_file(), + verify_no_exists(Latest), + + Header = trans_log_header(), + open_log(latest_log, Header, Latest); + false -> + ok + end. + +verify_no_exists(Fname) -> + case exists(Fname) of + false -> + ok; + true -> + fatal("Log file exists: ~p~n", [Fname]) + end. + +open_log(Name, Header, Fname) -> + Exists = exists(Fname), + open_log(Name, Header, Fname, Exists). + +open_log(Name, Header, Fname, Exists) -> + Repair = mnesia_monitor:get_env(auto_repair), + open_log(Name, Header, Fname, Exists, Repair). + +open_log(Name, Header, Fname, Exists, Repair) -> + case Name == previous_log of + true -> + open_log(Name, Header, Fname, Exists, Repair, read_only); + false -> + open_log(Name, Header, Fname, Exists, Repair, read_write) + end. + +open_log(Name, Header, Fname, Exists, Repair, Mode) -> + Args = [{file, Fname}, {name, Name}, {repair, Repair}, {mode, Mode}], +%% io:format("~p:open_log: ~p ~p~n", [?MODULE, Name, Fname]), + case mnesia_monitor:open_log(Args) of + {ok, Log} when Exists == true -> + Log; + {ok, Log} -> + write_header(Log, Header), + Log; + {repaired, Log, _, {badbytes, 0}} when Exists == true -> + Log; + {repaired, Log, _, {badbytes, 0}} -> + write_header(Log, Header), + Log; + {repaired, Log, _Recover, BadBytes} -> + mnesia_lib:important("Data may be missing, log ~p repaired: Lost ~p bytes~n", + [Fname, BadBytes]), + Log; + {error, Reason} when Repair == true -> + file:delete(Fname), + mnesia_lib:important("Data may be missing, Corrupt logfile deleted: ~p, ~p ~n", + [Fname, Reason]), + %% Create a new + open_log(Name, Header, Fname, false, false, read_write); + {error, Reason} -> + fatal("Cannot open log file ~p: ~p~n", [Fname, Reason]) + end. + +write_header(Log, Header) -> + append(Log, Header). + +write_trans_log_header() -> + write_header(latest_log, trans_log_header()). + +stop() -> + case mnesia_monitor:use_dir() of + true -> + close_log(latest_log); + false -> + ok + end. + +close_log(Log) -> +%% io:format("mnesia_log:close_log ~p~n", [Log]), +%% io:format("mnesia_log:close_log ~p~n", [Log]), + case disk_log:sync(Log) of + ok -> ok; + {error, {read_only_mode, Log}} -> + ok; + {error, Reason} -> + mnesia_lib:important("Failed syncing ~p to_disk reason ~p ~n", + [Log, Reason]) + end, + mnesia_monitor:close_log(Log). + +unsafe_close_log(Log) -> +%% io:format("mnesia_log:close_log ~p~n", [Log]), + mnesia_monitor:unsafe_close_log(Log). + + +purge_some_logs() -> + mnesia_monitor:unsafe_close_log(latest_log), + file:delete(latest_log_file()), + file:delete(decision_tab_file()). + +purge_all_logs() -> + file:delete(previous_log_file()), + file:delete(latest_log_file()), + file:delete(decision_tab_file()). + +%% Prepare dump by renaming the open logfile if possible +%% Returns a tuple on the following format: {Res, OpenLog} +%% where OpenLog is the file descriptor to log file, ready for append +%% and Res is one of the following: already_dumped, needs_dump or {error, Reason} +prepare_log_dump(InitBy) -> + Diff = mnesia_dumper:get_log_writes() - + mnesia_lib:read_counter(trans_log_writes_prev), + if + Diff == 0, InitBy /= startup -> + already_dumped; + true -> + case mnesia_monitor:use_dir() of + true -> + Prev = previous_log_file(), + prepare_prev(Diff, InitBy, Prev, exists(Prev)); + false -> + already_dumped + end + end. + +prepare_prev(Diff, _, _, true) -> + {needs_dump, Diff}; +prepare_prev(Diff, startup, Prev, false) -> + Latest = latest_log_file(), + case exists(Latest) of + true -> + case file:rename(Latest, Prev) of + ok -> + {needs_dump, Diff}; + {error, Reason} -> + {error, Reason} + end; + false -> + already_dumped + end; +prepare_prev(Diff, _InitBy, Prev, false) -> + Head = trans_log_header(), + case mnesia_monitor:reopen_log(latest_log, Prev, Head) of + ok -> + {needs_dump, Diff}; + {error, Reason} -> + Latest = latest_log_file(), + {error, {"Cannot rename log file", + [Latest, Prev, Reason]}} + end. + +%% Init dump and return PrevLogFileDesc or exit. +init_log_dump() -> + Fname = previous_log_file(), + open_log(previous_log, trans_log_header(), Fname), + start. + + +chunk_log(Cont) -> + chunk_log(previous_log, Cont). + +chunk_log(_Log, eof) -> + eof; +chunk_log(Log, Cont) -> + case catch disk_log:chunk(Log, Cont) of + {error, Reason} -> + fatal("Possibly truncated ~p file: ~p~n", + [Log, Reason]); + {C2, Chunk, _BadBytes} -> + %% Read_only case, should we warn about the bad log file? + %% BUGBUG Should we crash if Repair == false ?? + %% We got to check this !! + mnesia_lib:important("~p repaired, lost ~p bad bytes~n", [Log, _BadBytes]), + {C2, Chunk}; + Other -> + Other + end. + +%% Confirms the dump by closing prev log and delete the file +confirm_log_dump(Updates) -> + case mnesia_monitor:close_log(previous_log) of + ok -> + file:delete(previous_log_file()), + mnesia_lib:incr_counter(trans_log_writes_prev, Updates), + dumped; + {error, Reason} -> + {error, Reason} + end. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Decision log + +open_decision_log() -> + Latest = decision_log_file(), + open_log(decision_log, decision_log_header(), Latest), + start. + +prepare_decision_log_dump() -> + Prev = previous_decision_log_file(), + prepare_decision_log_dump(exists(Prev), Prev). + +prepare_decision_log_dump(false, Prev) -> + Head = decision_log_header(), + case mnesia_monitor:reopen_log(decision_log, Prev, Head) of + ok -> + prepare_decision_log_dump(true, Prev); + {error, Reason} -> + fatal("Cannot rename decision log file ~p -> ~p: ~p~n", + [decision_log_file(), Prev, Reason]) + end; +prepare_decision_log_dump(true, Prev) -> + open_log(previous_decision_log, decision_log_header(), Prev), + start. + +chunk_decision_log(Cont) -> + %% dbg_out("chunk log ~p~n", [Cont]), + chunk_log(previous_decision_log, Cont). + +%% Confirms dump of the decision log +confirm_decision_log_dump() -> + case mnesia_monitor:close_log(previous_decision_log) of + ok -> + file:delete(previous_decision_log_file()); + {error, Reason} -> + fatal("Cannot confirm decision log dump: ~p~n", + [Reason]) + end. + +save_decision_tab(Decisions) -> + Log = decision_tab, + Tmp = mnesia_lib:dir("DECISION_TAB.TMP"), + file:delete(Tmp), + open_log(Log, decision_tab_header(), Tmp), + append(Log, Decisions), + close_log(Log), + TabFile = decision_tab_file(), + ok = file:rename(Tmp, TabFile). + +open_decision_tab() -> + TabFile = decision_tab_file(), + open_log(decision_tab, decision_tab_header(), TabFile), + start. + +close_decision_tab() -> + close_log(decision_tab). + +chunk_decision_tab(Cont) -> + %% dbg_out("chunk tab ~p~n", [Cont]), + chunk_log(decision_tab, Cont). + +close_decision_log() -> + close_log(decision_log). + +log_decision(Decision) -> + append(decision_log, Decision). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Debug functions + +view() -> + lists:foreach(fun(F) -> view(F) end, log_files()). + +view(File) -> + mnesia_lib:show("***** ~p ***** ~n", [File]), + case exists(File) of + false -> + nolog; + true -> + N = view_only, + Args = [{file, File}, {name, N}, {mode, read_only}], + case disk_log:open(Args) of + {ok, N} -> + view_file(start, N); + {repaired, _, _, _} -> + view_file(start, N); + {error, Reason} -> + error("Cannot open log ~p: ~p~n", [File, Reason]) + end + end. + +view_file(C, Log) -> + case disk_log:chunk(Log, C) of + {error, Reason} -> + error("** Possibly truncated FILE ~p~n", [Reason]), + error; + eof -> + disk_log:close(Log), + eof; + {C2, Terms, _BadBytes} -> + dbg_out("Lost ~p bytes in ~p ~n", [_BadBytes, Log]), + lists:foreach(fun(X) -> mnesia_lib:show("~p~n", [X]) end, + Terms), + view_file(C2, Log); + {C2, Terms} -> + lists:foreach(fun(X) -> mnesia_lib:show("~p~n", [X]) end, + Terms), + view_file(C2, Log) + end. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Backup + +-record(backup_args, {name, module, opaque, scope, prev_name, tables, cookie}). + +backup(Opaque) -> + backup(Opaque, []). + +backup(Opaque, Mod) when is_atom(Mod) -> + backup(Opaque, [{module, Mod}]); +backup(Opaque, Args) when is_list(Args) -> + %% Backup all tables with max redundancy + CpArgs = [{ram_overrides_dump, false}, {max, val({schema, tables})}], + case mnesia_checkpoint:activate(CpArgs) of + {ok, Name, _Nodes} -> + Res = backup_checkpoint(Name, Opaque, Args), + mnesia_checkpoint:deactivate(Name), + Res; + {error, Reason} -> + {error, Reason} + end. + +backup_checkpoint(Name, Opaque) -> + backup_checkpoint(Name, Opaque, []). + +backup_checkpoint(Name, Opaque, Mod) when is_atom(Mod) -> + backup_checkpoint(Name, Opaque, [{module, Mod}]); +backup_checkpoint(Name, Opaque, Args) when is_list(Args) -> + DefaultMod = mnesia_monitor:get_env(backup_module), + B = #backup_args{name = Name, + module = DefaultMod, + opaque = Opaque, + scope = global, + tables = all, + prev_name = Name}, + case check_backup_args(Args, B) of + {ok, B2} -> + %% Decentralized backup + %% Incremental + + Self = self(), + Pid = spawn_link(?MODULE, backup_master, [Self, B2]), + receive + {Pid, Self, Res} -> Res + end; + {error, Reason} -> + {error, Reason} + end. + +check_backup_args([Arg | Tail], B) -> + case catch check_backup_arg_type(Arg, B) of + {'EXIT', _Reason} -> + {error, {badarg, Arg}}; + B2 -> + check_backup_args(Tail, B2) + end; + +check_backup_args([], B) -> + {ok, B}. + +check_backup_arg_type(Arg, B) -> + case Arg of + {scope, global} -> + B#backup_args{scope = global}; + {scope, local} -> + B#backup_args{scope = local}; + {module, Mod} -> + Mod2 = mnesia_monitor:do_check_type(backup_module, Mod), + B#backup_args{module = Mod2}; + {incremental, Name} -> + B#backup_args{prev_name = Name}; + {tables, Tabs} when is_list(Tabs) -> + B#backup_args{tables = Tabs} + end. + +backup_master(ClientPid, B) -> + process_flag(trap_exit, true), + case catch do_backup_master(B) of + {'EXIT', Reason} -> + ClientPid ! {self(), ClientPid, {error, {'EXIT', Reason}}}; + Res -> + ClientPid ! {self(), ClientPid, Res} + end, + unlink(ClientPid), + exit(normal). + +do_backup_master(B) -> + Name = B#backup_args.name, + B2 = safe_apply(B, open_write, [B#backup_args.opaque]), + B3 = safe_write(B2, [backup_log_header()]), + case mnesia_checkpoint:tables_and_cookie(Name) of + {ok, AllTabs, Cookie} -> + Tabs = select_tables(AllTabs, B3), + B4 = B3#backup_args{cookie = Cookie}, + %% Always put schema first in backup file + B5 = backup_schema(B4, Tabs), + B6 = lists:foldl(fun backup_tab/2, B5, Tabs -- [schema]), + safe_apply(B6, commit_write, [B6#backup_args.opaque]), + ok; + {error, Reason} -> + abort_write(B3, {?MODULE, backup_master}, [B], {error, Reason}) + end. + +select_tables(AllTabs, B) -> + Tabs = + case B#backup_args.tables of + all -> AllTabs; + SomeTabs when is_list(SomeTabs) -> SomeTabs + end, + case B#backup_args.scope of + global -> + Tabs; + local -> + Name = B#backup_args.name, + [T || T <- Tabs, mnesia_checkpoint:most_local_node(Name, T) == {ok, node()}] + end. + +safe_write(B, []) -> + B; +safe_write(B, Recs) -> + safe_apply(B, write, [B#backup_args.opaque, Recs]). + +backup_schema(B, Tabs) -> + case lists:member(schema, Tabs) of + true -> + backup_tab(schema, B); + false -> + Defs = [{schema, T, mnesia_schema:get_create_list(T)} || T <- Tabs], + safe_write(B, Defs) + end. + +safe_apply(B, write, [_, Items]) when Items == [] -> + B; +safe_apply(B, What, Args) -> + Abort = fun(R) -> abort_write(B, What, Args, R) end, + receive + {'EXIT', Pid, R} -> Abort({'EXIT', Pid, R}) + after 0 -> + Mod = B#backup_args.module, + case catch apply(Mod, What, Args) of + {ok, Opaque} -> B#backup_args{opaque=Opaque}; + {error, R} -> Abort(R); + R -> Abort(R) + end + end. + +abort_write(B, What, Args, Reason) -> + Mod = B#backup_args.module, + Opaque = B#backup_args.opaque, + dbg_out("Failed to perform backup. M=~p:F=~p:A=~p -> ~p~n", + [Mod, What, Args, Reason]), + case catch apply(Mod, abort_write, [Opaque]) of + {ok, _Res} -> + throw({error, Reason}); + Other -> + error("Failed to abort backup. ~p:~p~p -> ~p~n", + [Mod, abort_write, [Opaque], Other]), + throw({error, Reason}) + end. + +backup_tab(Tab, B) -> + Name = B#backup_args.name, + case mnesia_checkpoint:most_local_node(Name, Tab) of + {ok, Node} when Node == node() -> + tab_copier(self(), B, Tab); + {ok, Node} -> + RemoteB = B, + Pid = spawn_link(Node, ?MODULE, tab_copier, [self(), RemoteB, Tab]), + RecName = val({Tab, record_name}), + tab_receiver(Pid, B, Tab, RecName, 0); + {error, Reason} -> + abort_write(B, {?MODULE, backup_tab}, [Tab, B], {error, Reason}) + end. + +tab_copier(Pid, B, Tab) when is_record(B, backup_args) -> + %% Intentional crash at exit + Name = B#backup_args.name, + PrevName = B#backup_args.prev_name, + {FirstName, FirstSource} = select_source(Tab, Name, PrevName), + + ?eval_debug_fun({?MODULE, tab_copier, pre}, [{name, Name}, {tab, Tab}]), + Res = handle_more(Pid, B, Tab, FirstName, FirstSource, Name), + ?eval_debug_fun({?MODULE, tab_copier, post}, [{name, Name}, {tab, Tab}]), + + handle_last(Pid, Res). + +select_source(Tab, Name, PrevName) -> + if + Tab == schema -> + %% Always full backup of schema + {Name, table}; + Name == PrevName -> + %% Full backup + {Name, table}; + true -> + %% Wants incremental backup + case mnesia_checkpoint:most_local_node(PrevName, Tab) of + {ok, Node} when Node == node() -> + %% Accept incremental backup + {PrevName, retainer}; + _ -> + %% Do a full backup anyway + dbg_out("Incremental backup escalated to full backup: ~p~n", [Tab]), + {Name, table} + end + end. + +handle_more(Pid, B, Tab, FirstName, FirstSource, Name) -> + Acc = {0, B}, + case {mnesia_checkpoint:really_retain(Name, Tab), + mnesia_checkpoint:really_retain(FirstName, Tab)} of + {true, true} -> + Acc2 = iterate(B, FirstName, Tab, Pid, FirstSource, latest, first, Acc), + iterate(B, Name, Tab, Pid, retainer, checkpoint, last, Acc2); + {false, false}-> + %% Put the dumped file in the backup + %% instead of the ram table. Does + %% only apply to ram_copies. + iterate(B, Name, Tab, Pid, retainer, checkpoint, last, Acc); + Bad -> + Reason = {"Checkpoints for incremental backup must have same " + "setting of ram_overrides_dump", + Tab, Name, FirstName, Bad}, + abort_write(B, {?MODULE, backup_tab}, [Tab, B], {error, Reason}) + end. + +handle_last(Pid, {_Count, B}) when Pid == self() -> + B; +handle_last(Pid, _Acc) -> + unlink(Pid), + Pid ! {self(), {last, {ok, dummy}}}, + exit(normal). + +iterate(B, Name, Tab, Pid, Source, Age, Pass, Acc) -> + Fun = + if + Pid == self() -> + RecName = val({Tab, record_name}), + fun(Recs, A) -> copy_records(RecName, Tab, Recs, A) end; + true -> + fun(Recs, A) -> send_records(Pid, Tab, Recs, Pass, A) end + end, + case mnesia_checkpoint:iterate(Name, Tab, Fun, Acc, Source, Age) of + {ok, Acc2} -> + Acc2; + {error, Reason} -> + R = {error, {"Tab copier iteration failed", Reason}}, + abort_write(B, {?MODULE, iterate}, [self(), B, Tab], R) + end. + +copy_records(_RecName, _Tab, [], Acc) -> + Acc; +copy_records(RecName, Tab, Recs, {Count, B}) -> + Recs2 = rec_filter(B, Tab, RecName, Recs), + B2 = safe_write(B, Recs2), + {Count + 1, B2}. + +send_records(Pid, Tab, Recs, Pass, {Count, B}) -> + receive + {Pid, more, Count} -> + if + Pass == last, Recs == [] -> + {Count, B}; + true -> + Next = Count + 1, + Pid ! {self(), {more, Next, Recs}}, + {Next, B} + end; + Msg -> + exit({send_records_unexpected_msg, Tab, Msg}) + end. + +tab_receiver(Pid, B, Tab, RecName, Slot) -> + Pid ! {self(), more, Slot}, + receive + {Pid, {more, Next, Recs}} -> + Recs2 = rec_filter(B, Tab, RecName, Recs), + B2 = safe_write(B, Recs2), + tab_receiver(Pid, B2, Tab, RecName, Next); + + {Pid, {last, {ok,_}}} -> + B; + + {'EXIT', Pid, {error, R}} -> + Reason = {error, {"Tab copier crashed", R}}, + abort_write(B, {?MODULE, remote_tab_sender}, [self(), B, Tab], Reason); + {'EXIT', Pid, R} -> + Reason = {error, {"Tab copier crashed", {'EXIT', R}}}, + abort_write(B, {?MODULE, remote_tab_sender}, [self(), B, Tab], Reason); + Msg -> + R = {error, {"Tab receiver got unexpected msg", Msg}}, + abort_write(B, {?MODULE, remote_tab_sender}, [self(), B, Tab], R) + end. + +rec_filter(B, schema, _RecName, Recs) -> + case catch mnesia_bup:refresh_cookie(Recs, B#backup_args.cookie) of + Recs2 when is_list(Recs2) -> + Recs2; + {error, _Reason} -> + %% No schema table cookie + Recs + end; +rec_filter(_B, Tab, Tab, Recs) -> + Recs; +rec_filter(_B, Tab, _RecName, Recs) -> + [setelement(1, Rec, Tab) || Rec <- Recs]. + +ets2dcd(Tab) -> + ets2dcd(Tab, dcd). + +ets2dcd(Tab, Ftype) -> + Fname = + case Ftype of + dcd -> mnesia_lib:tab2dcd(Tab); + dmp -> mnesia_lib:tab2dmp(Tab) + end, + TmpF = mnesia_lib:tab2tmp(Tab), + file:delete(TmpF), + Log = open_log({Tab, ets2dcd}, dcd_log_header(), TmpF, false), + mnesia_lib:db_fixtable(ram_copies, Tab, true), + ok = ets2dcd(mnesia_lib:db_init_chunk(ram_copies, Tab, 1000), Tab, Log), + mnesia_lib:db_fixtable(ram_copies, Tab, false), + close_log(Log), + ok = file:rename(TmpF, Fname), + %% Remove old log data which is now in the new dcd. + %% No one else should be accessing this file! + file:delete(mnesia_lib:tab2dcl(Tab)), + ok. + +ets2dcd('$end_of_table', _Tab, _Log) -> + ok; +ets2dcd({Recs, Cont}, Tab, Log) -> + ok = disk_log:alog_terms(Log, Recs), + ets2dcd(mnesia_lib:db_chunk(ram_copies, Cont), Tab, Log). + +dcd2ets(Tab) -> + dcd2ets(Tab, mnesia_monitor:get_env(auto_repair)). + +dcd2ets(Tab, Rep) -> + Dcd = mnesia_lib:tab2dcd(Tab), + case mnesia_lib:exists(Dcd) of + true -> + Log = open_log({Tab, dcd2ets}, dcd_log_header(), Dcd, + true, Rep, read_only), + Data = chunk_log(Log, start), + ok = insert_dcdchunk(Data, Log, Tab), + close_log(Log), + load_dcl(Tab, Rep); + false -> %% Handle old dets files, and conversion from disc_only to disc. + Fname = mnesia_lib:tab2dat(Tab), + Type = val({Tab, setorbag}), + case mnesia_lib:dets_to_ets(Tab, Tab, Fname, Type, Rep, yes) of + loaded -> + ets2dcd(Tab), + file:delete(Fname), + 0; + {error, Error} -> + erlang:error({"Failed to load table from disc", [Tab, Error]}) + end + end. + +insert_dcdchunk({Cont, [LogH | Rest]}, Log, Tab) + when is_record(LogH, log_header), + LogH#log_header.log_kind == dcd_log, + LogH#log_header.log_version >= "1.0" -> + insert_dcdchunk({Cont, Rest}, Log, Tab); + +insert_dcdchunk({Cont, Recs}, Log, Tab) -> + true = ets:insert(Tab, Recs), + insert_dcdchunk(chunk_log(Log, Cont), Log, Tab); +insert_dcdchunk(eof, _Log, _Tab) -> + ok. + +load_dcl(Tab, Rep) -> + FName = mnesia_lib:tab2dcl(Tab), + case mnesia_lib:exists(FName) of + true -> + Name = {load_dcl,Tab}, + open_log(Name, + dcl_log_header(), + FName, + true, + Rep, + read_only), + FirstChunk = chunk_log(Name, start), + N = insert_logchunk(FirstChunk, Name, 0), + close_log(Name), + N; + false -> + 0 + end. + +insert_logchunk({C2, Recs}, Tab, C) -> + N = add_recs(Recs, C), + insert_logchunk(chunk_log(Tab, C2), Tab, C+N); +insert_logchunk(eof, _Tab, C) -> + C. + +add_recs([{{Tab, _Key}, Val, write} | Rest], N) -> + true = ets:insert(Tab, Val), + add_recs(Rest, N+1); +add_recs([{{Tab, Key}, _Val, delete} | Rest], N) -> + true = ets:delete(Tab, Key), + add_recs(Rest, N+1); +add_recs([{{Tab, _Key}, Val, delete_object} | Rest], N) -> + true = ets:match_delete(Tab, Val), + add_recs(Rest, N+1); +add_recs([{{Tab, Key}, Val, update_counter} | Rest], N) -> + {RecName, Incr} = Val, + case catch ets:update_counter(Tab, Key, Incr) of + CounterVal when is_integer(CounterVal) -> + ok; + _ when Incr < 0 -> + Zero = {RecName, Key, 0}, + true = ets:insert(Tab, Zero); + _ -> + Zero = {RecName, Key, Incr}, + true = ets:insert(Tab, Zero) + end, + add_recs(Rest, N+1); +add_recs([LogH|Rest], N) + when is_record(LogH, log_header), + LogH#log_header.log_kind == dcl_log, + LogH#log_header.log_version >= "1.0" -> + add_recs(Rest, N); +add_recs([{{Tab, _Key}, _Val, clear_table} | Rest], N) -> + true = ets:match_delete(Tab, '_'), + add_recs(Rest, N+ets:info(Tab, size)); +add_recs([], N) -> + N. |