%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 1996-2014. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% %CopyrightEnd%
%%

%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%
%% This module administers three kinds of log files:
%%
%% 1 The transaction log
%%   mnesia_tm appends to the log (via mnesia_log) at the
%%   end of each transaction (or dirty write) and
%%   mnesia_dumper reads the log and performs the ops in
%%   the dat files. The dump_log is done  at startup and
%%   at intervals controlled by the user.
%%
%% 2 The mnesia_down log
%%   mnesia_tm appends to the log (via mnesia_log) when it
%%   realizes that mnesia goes up or down on another node.
%%   mnesia_init reads the log (via mnesia_log) at startup.
%%
%% 3 The backup log
%%   mnesia_schema produces one tiny log when the schema is
%%   initially created. mnesia_schema also reads the log
%%   when the user wants tables (possibly incl the schema)
%%   to be restored. mnesia_log appends to the log when the
%%   user wants to produce a real backup.
%%
%%   The actual access to the backup media is performed via the
%%   mnesia_backup module for both read and write. mnesia_backup
%%   uses the disk_log (*), BUT the user may write an own module
%%   with the same interface as mnesia_backup and configure
%%   Mnesia so the alternate module performs the actual accesses
%%   to the backup media. This means that the user may put the
%%   backup on medias that Mnesia does not know about possibly on
%%   hosts where Erlang is not running.
%%
%% All these logs have to some extent a common structure.
%% They are all using the disk_log module (*) for the basic
%% file structure. The disk_log has a repair feature that
%% can be used to skip erroneous log records if one comes to
%% the conclusion that it is more important to reuse some
%% of the log records than the risque of obtaining inconsistent
%% data. If the data becomes inconsistent it is solely up to the
%% application to make it consistent again. The automatic
%% reparation of the disk_log is very powerful, but use it
%% with extreme care.
%%
%% First in all Mnesia's log file is a mnesia log header.
%% It contains a list with a log_header record as single
%% element. The structure of the log_header may never be
%% changed since it may be written to very old backup files.
%% By holding this record definition stable we can be
%% able to comprahend backups from timepoint 0. It also
%% allows us to use the backup format as an interchange
%% format between Mnesia releases.
%%
%% An op-list is a list of tuples with arity 3. Each tuple
%% has this structure: {Oid, Recs, Op} where Oid is the tuple
%% {Tab, Key}, Recs is a (possibly empty) list of records and
%% Op is an atom.
%%
%% The log file structure for the transaction log is as follows.
%%
%%    After the mnesia log section follows an extended record section
%%    containing op-lists. There are several values that Op may
%%    have, such as write, delete, update_counter, delete_object,
%%    and replace. There is no special end of section marker.
%%
%%    +-----------------+
%%    | mnesia log head |
%%    +-----------------+
%%    | extended record |
%%    | section         |
%%    +-----------------+
%%
%% The log file structure for the mnesia_down log is as follows.
%%
%%    After the mnesia log section follows a mnesia_down section
%%    containg lists with yoyo records as single element.
%%
%%    +-----------------+
%%    | mnesia log head |
%%    +-----------------+
%%    | mnesia_down     |
%%    | section         |
%%    +-----------------+
%%
%% The log file structure for the backup log is as follows.
%%
%%    After the mnesia log section follows a schema section
%%    containing record lists. A record list is a list of tuples
%%    where {schema, Tab} is interpreted as a delete_table(Tab) and
%%    {schema, Tab, CreateList} are interpreted as create_table.
%%
%%    The record section also contains record lists. In this section
%%    {Tab, Key} is interpreted as delete({Tab, Key}) and other tuples
%%    as write(Tuple). There is no special end of section marker.
%%
%%    +-----------------+
%%    | mnesia log head |
%%    +-----------------+
%%    | schema section  |
%%    +-----------------+
%%    | record section  |
%%    +-----------------+
%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

-module(mnesia_log).

-export([
	 append/2,
	 backup/1,
	 backup/2,
	 backup_checkpoint/2,
	 backup_checkpoint/3,
	 backup_log_header/0,
	 backup_master/2,
	 chunk_decision_log/1,
	 chunk_decision_tab/1,
	 chunk_log/1,
	 chunk_log/2,
	 close_decision_log/0,
	 close_decision_tab/0,
	 close_log/1,
	 unsafe_close_log/1,
	 confirm_log_dump/1,
	 confirm_decision_log_dump/0,
	 previous_log_file/0,
	 previous_decision_log_file/0,
	 latest_log_file/0,
	 decision_log_version/0,
	 decision_log_file/0,
	 decision_tab_file/0,
	 decision_tab_version/0,
	 dcl_version/0,
	 dcd_version/0,
	 ets2dcd/1,
	 ets2dcd/2,
	 dcd2ets/1,
	 dcd2ets/2,
	 init/0,
	 init_log_dump/0,
	 log/1,
	 slog/1,
	 log_decision/1,
	 log_files/0,
	 open_decision_log/0,
	 trans_log_header/0,
	 open_decision_tab/0,
	 dcl_log_header/0,
	 dcd_log_header/0,
	 open_log/4,
	 open_log/6,
	 prepare_decision_log_dump/0,
	 prepare_log_dump/1,
	 save_decision_tab/1,
	 purge_all_logs/0,
	 purge_some_logs/0,
	 stop/0,
	 tab_copier/3,
	 version/0,
	 view/0,
	 view/1,
	 write_trans_log_header/0
	]).


-compile({no_auto_import,[error/2]}).

-include("mnesia.hrl").
-import(mnesia_lib, [val/1, dir/1]).
-import(mnesia_lib, [exists/1, fatal/2, error/2, dbg_out/2]).

trans_log_header() -> log_header(trans_log, version()).
backup_log_header() -> log_header(backup_log, "1.2").
decision_log_header() -> log_header(decision_log, decision_log_version()).
decision_tab_header() -> log_header(decision_tab, decision_tab_version()).
dcl_log_header() -> log_header(dcl_log, dcl_version()).
dcd_log_header() -> log_header(dcd_log, dcd_version()).

log_header(Kind, Version) ->
    #log_header{log_version=Version,
		log_kind=Kind,
		mnesia_version=mnesia:system_info(version),
		node=node(),
		now=now()}.

version() -> "4.3".

decision_log_version() -> "3.0".

decision_tab_version() -> "1.0".

dcl_version() -> "1.0".
dcd_version() -> "1.0".

append(Log, Bin) when is_binary(Bin) ->
    disk_log:balog(Log, Bin);
append(Log, Term) ->
    disk_log:alog(Log, Term).

%% Synced append
sappend(Log, Bin) when is_binary(Bin) ->
    ok = disk_log:blog(Log, Bin);
sappend(Log, Term) ->
    ok = disk_log:log(Log, Term).

%% Write commit records to the latest_log
log(C) when  C#commit.disc_copies == [],
             C#commit.disc_only_copies  == [],
             C#commit.schema_ops == [] ->
    ignore;
log(C) ->
    case mnesia_monitor:use_dir() of
        true ->
	    if
		is_record(C, commit) ->
		    C2 =  C#commit{ram_copies = [], snmp = []},
		    append(latest_log, C2);
		true ->
		    %% Either a commit record as binary
		    %% or some decision related info
		    append(latest_log, C)
	    end,
	    mnesia_dumper:incr_log_writes();
	false ->
	    ignore
    end.

%% Synced

slog(C) when  C#commit.disc_copies == [],
             C#commit.disc_only_copies  == [],
             C#commit.schema_ops == [] ->
    ignore;
slog(C) ->
    case mnesia_monitor:use_dir() of
        true ->
	    if
		is_record(C, commit) ->
		    C2 =  C#commit{ram_copies = [], snmp = []},
		    sappend(latest_log, C2);
		true ->
		    %% Either a commit record as binary
		    %% or some decision related info
		    sappend(latest_log, C)
	    end,
	    mnesia_dumper:incr_log_writes();
	false ->
	    ignore
    end.


%% Stuff related to the file LOG

%% Returns a list of logfiles. The oldest is first.
log_files() -> [previous_log_file(),
		latest_log_file(),
		decision_tab_file()
	       ].

latest_log_file() -> dir(latest_log_name()).

previous_log_file() -> dir("PREVIOUS.LOG").

decision_log_file() -> dir(decision_log_name()).

decision_tab_file() -> dir(decision_tab_name()).

previous_decision_log_file() -> dir("PDECISION.LOG").

latest_log_name() -> "LATEST.LOG".

decision_log_name() -> "DECISION.LOG".

decision_tab_name() -> "DECISION_TAB.LOG".

init() ->
    case mnesia_monitor:use_dir() of
	true ->
	    Prev = previous_log_file(),
	    verify_no_exists(Prev),

	    Latest = latest_log_file(),
	    verify_no_exists(Latest),

	    Header = trans_log_header(),
	    open_log(latest_log, Header, Latest);
	false ->
	    ok
    end.

verify_no_exists(Fname) ->
    case exists(Fname) of
	false ->
	    ok;
	true ->
	    fatal("Log file exists: ~p~n", [Fname])
    end.

open_log(Name, Header, Fname) ->
    Exists = exists(Fname),
    open_log(Name, Header, Fname, Exists).

open_log(Name, Header, Fname, Exists) ->
    Repair = mnesia_monitor:get_env(auto_repair),
    open_log(Name, Header, Fname, Exists, Repair).

open_log(Name, Header, Fname, Exists, Repair) ->
    case Name == previous_log of
	true ->
	    open_log(Name, Header, Fname, Exists, Repair, read_only);
	false ->
	    open_log(Name, Header, Fname, Exists, Repair, read_write)
    end.

open_log(Name, Header, Fname, Exists, Repair, Mode) ->
    Args = [{file, Fname}, {name, Name}, {repair, Repair}, {mode, Mode}],
%%    io:format("~p:open_log: ~p ~p~n", [?MODULE, Name, Fname]),
    case mnesia_monitor:open_log(Args) of
	{ok, Log} when Exists == true ->
	    Log;
	{ok, Log} ->
	    write_header(Log, Header),
	    Log;
	{repaired, Log, _, {badbytes, 0}} when Exists == true ->
	    Log;
	{repaired, Log, _, {badbytes, 0}} ->
	    write_header(Log, Header),
	    Log;
	{repaired, Log, _Recover, BadBytes} ->
	    mnesia_lib:important("Data may be missing, log ~p repaired: Lost ~p bytes~n",
				 [Fname, BadBytes]),
	    Log;
	{error, Reason} when Repair == true ->
	    file:delete(Fname),
	    mnesia_lib:important("Data may be missing, Corrupt logfile deleted: ~p, ~p ~n",
				 [Fname, Reason]),
	    %% Create a new
	    open_log(Name, Header, Fname, false, false, read_write);
	{error, Reason} ->
	    fatal("Cannot open log file ~p: ~p~n", [Fname, Reason])
    end.

write_header(Log, Header) ->
    append(Log, Header).

write_trans_log_header() ->
    write_header(latest_log, trans_log_header()).

stop() ->
    case mnesia_monitor:use_dir() of
        true ->
	    close_log(latest_log);
	false ->
	    ok
    end.

close_log(Log) ->
%%    io:format("mnesia_log:close_log ~p~n", [Log]),
%%    io:format("mnesia_log:close_log ~p~n", [Log]),
    case disk_log:sync(Log) of
	ok -> ok;
	{error, {read_only_mode, Log}} ->
	    ok;
	{error, Reason} ->
	    mnesia_lib:important("Failed syncing ~p to_disk reason ~p ~n",
				 [Log, Reason])
    end,
    mnesia_monitor:close_log(Log).

unsafe_close_log(Log) ->
%%    io:format("mnesia_log:close_log ~p~n", [Log]),
    mnesia_monitor:unsafe_close_log(Log).


purge_some_logs() ->
    mnesia_monitor:unsafe_close_log(latest_log),
    _ = file:delete(latest_log_file()),
    _ = file:delete(decision_tab_file()),
    ok.

purge_all_logs() ->
    _ = file:delete(previous_log_file()),
    _ = file:delete(latest_log_file()),
    _ = file:delete(decision_tab_file()),
    ok.

%% Prepare dump by renaming the open logfile if possible
%% Returns a tuple on the following format: {Res, OpenLog}
%% where OpenLog is the file descriptor to log file, ready for append
%% and Res is one of the following: already_dumped, needs_dump or {error, Reason}
prepare_log_dump(InitBy) ->
    Diff = mnesia_dumper:get_log_writes() -
           mnesia_lib:read_counter(trans_log_writes_prev),
    if
	Diff == 0, InitBy /= startup ->
	    already_dumped;
	true ->
	    case mnesia_monitor:use_dir() of
		true ->
		    Prev = previous_log_file(),
		    prepare_prev(Diff, InitBy, Prev, exists(Prev));
		false ->
		    already_dumped
	    end
    end.

prepare_prev(Diff, _, _, true) ->
    {needs_dump, Diff};
prepare_prev(Diff, startup, Prev, false) ->
    Latest = latest_log_file(),
    case exists(Latest) of
	true ->
	    case file:rename(Latest, Prev) of
		ok ->
		    {needs_dump, Diff};
		{error, Reason} ->
		    {error, Reason}
	    end;
	false ->
	    already_dumped
    end;
prepare_prev(Diff, _InitBy, Prev, false) ->
    Head = trans_log_header(),
    case mnesia_monitor:reopen_log(latest_log, Prev, Head) of
	ok ->
	    {needs_dump, Diff};
	{error, Reason} ->
	    Latest = latest_log_file(),
	    {error, {"Cannot rename log file",
		     [Latest, Prev, Reason]}}
    end.

%% Init dump and return PrevLogFileDesc or exit.
init_log_dump() ->
    Fname = previous_log_file(),
    open_log(previous_log, trans_log_header(), Fname),
    start.


chunk_log(Cont) ->
    chunk_log(previous_log, Cont).

chunk_log(_Log, eof) ->
    eof;
chunk_log(Log, Cont) ->
    case catch disk_log:chunk(Log, Cont) of
	{error, Reason} ->
	    fatal("Possibly truncated ~p file: ~p~n",
		  [Log, Reason]);
	{C2, Chunk, _BadBytes} ->
	    %% Read_only case, should we warn about the bad log file?
	    %% BUGBUG Should we crash if Repair == false ??
	    %% We got to check this !!
	    mnesia_lib:important("~p repaired, lost ~p bad bytes~n", [Log, _BadBytes]),
	    {C2, Chunk};
	Other ->
	    Other
    end.

%% Confirms the dump by closing prev log and delete the file
confirm_log_dump(Updates) ->
    case mnesia_monitor:close_log(previous_log) of
	ok ->
	    file:delete(previous_log_file()),
	    mnesia_lib:incr_counter(trans_log_writes_prev, Updates),
	    dumped;
	{error, Reason} ->
	    {error, Reason}
    end.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Decision log

open_decision_log() ->
    Latest = decision_log_file(),
    open_log(decision_log, decision_log_header(), Latest),
    start.

prepare_decision_log_dump() ->
    Prev = previous_decision_log_file(),
    prepare_decision_log_dump(exists(Prev), Prev).

prepare_decision_log_dump(false, Prev) ->
    Head = decision_log_header(),
    case mnesia_monitor:reopen_log(decision_log, Prev, Head) of
	ok ->
	    prepare_decision_log_dump(true, Prev);
	{error, Reason} ->
	    fatal("Cannot rename decision log file ~p -> ~p: ~p~n",
		     [decision_log_file(), Prev, Reason])
    end;
prepare_decision_log_dump(true, Prev) ->
    open_log(previous_decision_log, decision_log_header(), Prev),
    start.

chunk_decision_log(Cont) ->
    %% dbg_out("chunk log ~p~n", [Cont]),
    chunk_log(previous_decision_log, Cont).

%% Confirms dump of the decision log
confirm_decision_log_dump() ->
    case mnesia_monitor:close_log(previous_decision_log) of
	ok ->
	    file:delete(previous_decision_log_file());
	{error, Reason} ->
	    fatal("Cannot confirm decision log dump: ~p~n",
		  [Reason])
    end.

save_decision_tab(Decisions) ->
    Log = decision_tab,
    Tmp = mnesia_lib:dir("DECISION_TAB.TMP"),
    file:delete(Tmp),
    open_log(Log, decision_tab_header(), Tmp),
    append(Log, Decisions),
    close_log(Log),
    TabFile = decision_tab_file(),
    ok = file:rename(Tmp, TabFile).

open_decision_tab() ->
    TabFile = decision_tab_file(),
    open_log(decision_tab, decision_tab_header(), TabFile),
    start.

close_decision_tab() ->
    close_log(decision_tab).

chunk_decision_tab(Cont) ->
    %% dbg_out("chunk tab ~p~n", [Cont]),
    chunk_log(decision_tab, Cont).

close_decision_log() ->
    close_log(decision_log).

log_decision(Decision) ->
    append(decision_log, Decision).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Debug functions

view() ->
    lists:foreach(fun(F) -> view(F) end, log_files()).

view(File) ->
    mnesia_lib:show("*****  ~p ***** ~n", [File]),
    case exists(File) of
	false ->
	    nolog;
	true ->
	    N = view_only,
	    Args = [{file, File}, {name, N}, {mode, read_only}],
	    case disk_log:open(Args) of
		{ok, N} ->
		    view_file(start, N);
		{repaired, _, _, _} ->
		    view_file(start, N);
		{error, Reason} ->
		    error("Cannot open log ~p: ~p~n", [File, Reason])
	    end
    end.

view_file(C, Log) ->
    case disk_log:chunk(Log, C) of
	{error, Reason} ->
	    error("** Possibly truncated FILE ~p~n", [Reason]),
	    error;
	eof ->
	    disk_log:close(Log),
	    eof;
	{C2, Terms, _BadBytes} ->
	    dbg_out("Lost ~p bytes in ~p ~n", [_BadBytes, Log]),
	    lists:foreach(fun(X) -> mnesia_lib:show("~p~n", [X]) end,
			  Terms),
	    view_file(C2, Log);
	{C2, Terms} ->
	    lists:foreach(fun(X) -> mnesia_lib:show("~p~n", [X]) end,
			  Terms),
	    view_file(C2, Log)
    end.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Backup

-record(backup_args, {name, module, opaque, scope, prev_name, tables, cookie}).

backup(Opaque) ->
    backup(Opaque, []).

backup(Opaque, Mod) when is_atom(Mod) ->
    backup(Opaque, [{module, Mod}]);
backup(Opaque, Args) when is_list(Args) ->
    %% Backup all tables with max redundancy
    CpArgs = [{ram_overrides_dump, false}, {max, val({schema, tables})}],
    case mnesia_checkpoint:activate(CpArgs) of
	{ok, Name, _Nodes} ->
	    Res = backup_checkpoint(Name, Opaque, Args),
	    mnesia_checkpoint:deactivate(Name),
	    Res;
	{error, Reason} ->
	    {error, Reason}
    end.

backup_checkpoint(Name, Opaque) ->
    backup_checkpoint(Name, Opaque, []).

backup_checkpoint(Name, Opaque, Mod) when is_atom(Mod) ->
    backup_checkpoint(Name, Opaque, [{module, Mod}]);
backup_checkpoint(Name, Opaque, Args) when is_list(Args) ->
    DefaultMod = mnesia_monitor:get_env(backup_module),
    B = #backup_args{name = Name,
		     module = DefaultMod,
		     opaque = Opaque,
		     scope = global,
		     tables = all,
		     prev_name = Name},
    case check_backup_args(Args, B) of
	{ok, B2} ->
	    %% Decentralized backup
	    %% Incremental

	    Self = self(),
	    Pid = spawn_link(?MODULE, backup_master, [Self, B2]),
	    receive
		{Pid, Self, Res} -> Res
	    end;
	{error, Reason} ->
	    {error, Reason}
    end.

check_backup_args([Arg | Tail], B) ->
    case catch check_backup_arg_type(Arg, B) of
	{'EXIT', _Reason} ->
	    {error, {badarg, Arg}};
	B2 ->
	    check_backup_args(Tail, B2)
    end;

check_backup_args([], B) ->
    {ok, B}.

check_backup_arg_type(Arg, B) ->
    case Arg of
	{scope, global} ->
	    B#backup_args{scope = global};
	{scope, local} ->
	    B#backup_args{scope = local};
	{module, Mod} ->
	    Mod2 = mnesia_monitor:do_check_type(backup_module, Mod),
	    B#backup_args{module = Mod2};
	{incremental, Name} ->
	    B#backup_args{prev_name = Name};
	{tables, Tabs} when is_list(Tabs) ->
	    B#backup_args{tables = Tabs}
    end.

backup_master(ClientPid, B) ->
    process_flag(trap_exit, true),
    case catch do_backup_master(B) of
	{'EXIT', Reason} ->
	    ClientPid ! {self(), ClientPid, {error, {'EXIT', Reason}}};
	Res ->
	    ClientPid ! {self(), ClientPid, Res}
    end,
    unlink(ClientPid),
    exit(normal).

do_backup_master(B) ->
    Name = B#backup_args.name,
    B2 = safe_apply(B, open_write, [B#backup_args.opaque]),
    B3 = safe_write(B2, [backup_log_header()]),
    case mnesia_checkpoint:tables_and_cookie(Name) of
	{ok, AllTabs, Cookie} ->
	    Tabs = select_tables(AllTabs, B3),
	    B4 = B3#backup_args{cookie = Cookie},
	    %% Always put schema first in backup file
	    B5 = backup_schema(B4, Tabs),
	    B6 = lists:foldl(fun backup_tab/2, B5, Tabs -- [schema]),
	    safe_apply(B6, commit_write, [B6#backup_args.opaque]),
	    ok;
	{error, Reason} ->
	    abort_write(B3, {?MODULE, backup_master}, [B], {error, Reason})
    end.

select_tables(AllTabs, B) ->
    Tabs =
	case B#backup_args.tables of
	    all -> AllTabs;
	    SomeTabs when is_list(SomeTabs) -> SomeTabs
	end,
    case B#backup_args.scope of
	global ->
	    Tabs;
	local ->
	    Name = B#backup_args.name,
	    [T || T <- Tabs, mnesia_checkpoint:most_local_node(Name, T) == {ok, node()}]
    end.

safe_write(B, []) ->
    B;
safe_write(B, Recs) ->
    safe_apply(B, write, [B#backup_args.opaque, Recs]).

backup_schema(B, Tabs) ->
    case lists:member(schema, Tabs) of
	true ->
	    backup_tab(schema, B);
	false ->
	    Defs = [{schema, T, mnesia_schema:get_create_list(T)} || T <- Tabs],
	    safe_write(B, Defs)
    end.

safe_apply(B, write, [_, Items]) when Items == [] ->
    B;
safe_apply(B, What, Args) ->
    Abort = fun(R) -> abort_write(B, What, Args, R) end,
    receive
	{'EXIT', Pid, R} -> Abort({'EXIT', Pid, R})
    after 0 ->
	    Mod = B#backup_args.module,
	    case catch apply(Mod, What, Args) of
		{ok, Opaque} -> B#backup_args{opaque=Opaque};
		{error, R} -> Abort(R);
		R -> Abort(R)
	    end
    end.

abort_write(B, What, Args, Reason) ->
    Mod = B#backup_args.module,
    Opaque = B#backup_args.opaque,
    dbg_out("Failed to perform backup. M=~p:F=~p:A=~p -> ~p~n",
	    [Mod, What, Args, Reason]),
    case catch apply(Mod, abort_write, [Opaque]) of
	{ok, _Res} ->
	    throw({error, Reason});
	Other ->
	    error("Failed to abort backup. ~p:~p~p -> ~p~n",
		  [Mod, abort_write, [Opaque], Other]),
	    throw({error, Reason})
    end.

backup_tab(Tab, B) ->
    Name = B#backup_args.name,
    case mnesia_checkpoint:most_local_node(Name, Tab) of
	{ok, Node} when Node == node() ->
	    tab_copier(self(), B, Tab);
	{ok, Node} ->
	    RemoteB = B,
	    Pid = spawn_link(Node, ?MODULE, tab_copier, [self(), RemoteB, Tab]),
	    RecName = val({Tab, record_name}),
	    tab_receiver(Pid, B, Tab, RecName, 0);
	{error, Reason} ->
	    abort_write(B, {?MODULE, backup_tab}, [Tab, B], {error, Reason})
    end.

tab_copier(Pid, B, Tab) when is_record(B, backup_args) ->
    %% Intentional crash at exit
    Name = B#backup_args.name,
    PrevName = B#backup_args.prev_name,
    {FirstName, FirstSource} = select_source(Tab, Name, PrevName),

    ?eval_debug_fun({?MODULE, tab_copier, pre}, [{name, Name}, {tab, Tab}]),
    Res = handle_more(Pid, B, Tab, FirstName, FirstSource, Name),
    ?eval_debug_fun({?MODULE, tab_copier, post}, [{name, Name}, {tab, Tab}]),

    handle_last(Pid, Res).

select_source(Tab, Name, PrevName) ->
    if
	Tab == schema ->
	    %% Always full backup of schema
	    {Name, table};
	Name == PrevName ->
	    %% Full backup
	    {Name, table};
	true ->
	    %% Wants incremental backup
	    case mnesia_checkpoint:most_local_node(PrevName, Tab) of
		{ok, Node} when Node == node() ->
		    %% Accept incremental backup
		    {PrevName, retainer};
		_ ->
		    %% Do a full backup anyway
		    dbg_out("Incremental backup escalated to full backup: ~p~n", [Tab]),
		    {Name, table}
	    end
    end.

handle_more(Pid, B, Tab, FirstName, FirstSource, Name) ->
    Acc = {0, B},
    case {mnesia_checkpoint:really_retain(Name, Tab),
	  mnesia_checkpoint:really_retain(FirstName, Tab)} of
	{true, true} ->
	    Acc2 = iterate(B, FirstName, Tab, Pid, FirstSource, latest, first, Acc),
	    iterate(B, Name, Tab, Pid, retainer, checkpoint, last, Acc2);
	{false, false}->
	    %% Put the dumped file in the backup
	    %% instead of the ram table. Does
	    %% only apply to ram_copies.
	    iterate(B, Name, Tab, Pid, retainer, checkpoint, last, Acc);
	Bad ->
	    Reason = {"Checkpoints for incremental backup must have same "
		      "setting of ram_overrides_dump",
		      Tab, Name, FirstName, Bad},
	    abort_write(B, {?MODULE, backup_tab}, [Tab, B], {error, Reason})
    end.

handle_last(Pid, {_Count, B}) when Pid == self() ->
    B;
handle_last(Pid, _Acc) ->
    unlink(Pid),
    Pid ! {self(), {last, {ok, dummy}}},
    exit(normal).

iterate(B, Name, Tab, Pid, Source, Age, Pass, Acc) ->
    Fun =
	if
	    Pid == self() ->
		RecName = val({Tab, record_name}),
		fun(Recs, A) -> copy_records(RecName, Tab, Recs, A) end;
	    true ->
		fun(Recs, A) -> send_records(Pid, Tab, Recs, Pass, A) end
	end,
    case mnesia_checkpoint:iterate(Name, Tab, Fun, Acc, Source, Age) of
	{ok, Acc2} ->
	    Acc2;
	{error, Reason} ->
	    R = {error, {"Tab copier iteration failed", Reason}},
	    abort_write(B, {?MODULE, iterate}, [self(), B, Tab], R)
    end.

copy_records(_RecName, _Tab, [], Acc) ->
    Acc;
copy_records(RecName, Tab, Recs, {Count, B}) ->
    Recs2 = rec_filter(B, Tab, RecName, Recs),
    B2 = safe_write(B, Recs2),
    {Count + 1, B2}.

send_records(Pid, Tab, Recs, Pass, {Count, B}) ->
    receive
	{Pid, more, Count} ->
	    if
		Pass == last, Recs == [] ->
		    {Count, B};
		true ->
		    Next = Count + 1,
		    Pid ! {self(), {more, Next, Recs}},
		    {Next, B}
	    end;
	Msg ->
	    exit({send_records_unexpected_msg, Tab, Msg})
    end.

tab_receiver(Pid, B, Tab, RecName, Slot) ->
    Pid ! {self(), more, Slot},
    receive
	{Pid, {more, Next, Recs}} ->
	    Recs2 = rec_filter(B, Tab, RecName, Recs),
	    B2 = safe_write(B, Recs2),
	    tab_receiver(Pid, B2, Tab, RecName, Next);

	{Pid, {last, {ok,_}}} ->
	    B;

	{'EXIT', Pid, {error, R}} ->
	    Reason = {error, {"Tab copier crashed", R}},
	    abort_write(B, {?MODULE, remote_tab_sender}, [self(), B, Tab], Reason);
	{'EXIT', Pid, R} ->
	    Reason = {error, {"Tab copier crashed", {'EXIT', R}}},
	    abort_write(B, {?MODULE, remote_tab_sender}, [self(), B, Tab], Reason);
	Msg ->
	    R = {error, {"Tab receiver got unexpected msg", Msg}},
	    abort_write(B, {?MODULE, remote_tab_sender}, [self(), B, Tab], R)
    end.

rec_filter(B, schema, _RecName, Recs) ->
    case catch mnesia_bup:refresh_cookie(Recs, B#backup_args.cookie) of
	Recs2 when is_list(Recs2) ->
	    Recs2;
	{error, _Reason} ->
	    %% No schema table cookie
	    Recs
    end;
rec_filter(_B, Tab, Tab, Recs) ->
    Recs;
rec_filter(_B, Tab, _RecName, Recs) ->
    [setelement(1, Rec, Tab) || Rec <- Recs].

ets2dcd(Tab) ->
    ets2dcd(Tab, dcd).

ets2dcd(Tab, Ftype) ->
    Fname =
	case Ftype of
	    dcd -> mnesia_lib:tab2dcd(Tab);
	    dmp -> mnesia_lib:tab2dmp(Tab)
	end,
    TmpF = mnesia_lib:tab2tmp(Tab),
    file:delete(TmpF),
    Log  = open_log({Tab, ets2dcd}, dcd_log_header(), TmpF, false),
    mnesia_lib:db_fixtable(ram_copies, Tab, true),
    ok   = ets2dcd(mnesia_lib:db_init_chunk(ram_copies, Tab, 1000), Tab, Log),
    mnesia_lib:db_fixtable(ram_copies, Tab, false),
    close_log(Log),
    ok = file:rename(TmpF, Fname),
    %% Remove old log data which is now in the new dcd.
    %% No one else should be accessing this file!
    file:delete(mnesia_lib:tab2dcl(Tab)),
    ok.

ets2dcd('$end_of_table', _Tab, _Log) ->
    ok;
ets2dcd({Recs, Cont}, Tab, Log) ->
    ok = disk_log:log_terms(Log, Recs),
    ets2dcd(mnesia_lib:db_chunk(ram_copies, Cont), Tab, Log).

dcd2ets(Tab) ->
    dcd2ets(Tab, mnesia_monitor:get_env(auto_repair)).

dcd2ets(Tab, Rep) ->
    Dcd = mnesia_lib:tab2dcd(Tab),
    case mnesia_lib:exists(Dcd) of
	true ->
	    Log = open_log({Tab, dcd2ets}, dcd_log_header(), Dcd,
			   true, Rep, read_only),
	    Data = chunk_log(Log, start),
	    ok = insert_dcdchunk(Data, Log, Tab),
	    close_log(Log),
	    load_dcl(Tab, Rep);
	false -> %% Handle old dets files, and conversion from disc_only to disc.
	    Fname = mnesia_lib:tab2dat(Tab),
	    Type = val({Tab, setorbag}),
	    case mnesia_lib:dets_to_ets(Tab, Tab, Fname, Type, Rep, yes) of
		loaded ->
		    ets2dcd(Tab),
		    file:delete(Fname),
		    0;
		{error, Error} ->
		    erlang:error({"Failed to load table from disc", [Tab, Error]})
	    end
    end.

insert_dcdchunk({Cont, [LogH | Rest]}, Log, Tab)
  when is_record(LogH, log_header),
       LogH#log_header.log_kind == dcd_log,
       LogH#log_header.log_version >= "1.0" ->
    insert_dcdchunk({Cont, Rest}, Log, Tab);

insert_dcdchunk({Cont, Recs}, Log, Tab) ->
    true = ets:insert(Tab, Recs),
    insert_dcdchunk(chunk_log(Log, Cont), Log, Tab);
insert_dcdchunk(eof, _Log, _Tab) ->
    ok.

load_dcl(Tab, Rep) ->
    FName = mnesia_lib:tab2dcl(Tab),
    case mnesia_lib:exists(FName) of
	true ->
	    Name = {load_dcl,Tab},
	    open_log(Name,
		     dcl_log_header(),
		     FName,
		     true,
		     Rep,
		     read_only),
	    FirstChunk = chunk_log(Name, start),
            N = insert_logchunk(FirstChunk, Name, 0),
	    close_log(Name),
	    N;
	false ->
	    0
    end.

insert_logchunk({C2, Recs}, Tab, C) ->
    N = add_recs(Recs, C),
    insert_logchunk(chunk_log(Tab, C2), Tab, C+N);
insert_logchunk(eof, _Tab, C) ->
    C.

add_recs([{{Tab, _Key}, Val, write} | Rest], N) ->
    true = ets:insert(Tab, Val),
    add_recs(Rest, N+1);
add_recs([{{Tab, Key}, _Val, delete} | Rest], N) ->
    true = ets:delete(Tab, Key),
    add_recs(Rest, N+1);
add_recs([{{Tab, _Key}, Val, delete_object} | Rest], N) ->
    true = ets:match_delete(Tab, Val),
    add_recs(Rest, N+1);
add_recs([{{Tab, Key}, Val, update_counter} | Rest], N) ->
    {RecName, Incr} = Val,
    case catch ets:update_counter(Tab, Key, Incr) of
	CounterVal when is_integer(CounterVal) ->
	    ok;
	_ when Incr < 0 ->
	    Zero = {RecName, Key, 0},
	    true = ets:insert(Tab, Zero);
	_ ->
	    Zero = {RecName, Key, Incr},
	    true = ets:insert(Tab, Zero)
    end,
    add_recs(Rest, N+1);
add_recs([LogH|Rest], N)
  when is_record(LogH, log_header),
       LogH#log_header.log_kind == dcl_log,
       LogH#log_header.log_version >= "1.0" ->
    add_recs(Rest, N);
add_recs([{{Tab, _Key}, _Val, clear_table} | Rest], N) ->
    Size = ets:info(Tab, size),
    true = ets:delete_all_objects(Tab),
    add_recs(Rest, N+Size);
add_recs([], N) ->
    N.