%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 1996-2014. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% %CopyrightEnd%
%%

%%
-module(mnesia_dumper).

%% The InitBy arg may be one of the following:
%% scan_decisions     Initial scan for decisions
%% startup	      Initial dump during startup
%% schema_prepare     Dump initiated during schema transaction preparation
%% schema_update      Dump initiated during schema transaction commit
%% fast_schema_update A schema_update, but ignores the log file
%% user		      Dump initiated by user
%% write_threshold    Automatic dump caused by too many log writes
%% time_threshold     Automatic dump caused by timeout

%% Public interface
-export([
	 get_log_writes/0,
	 incr_log_writes/0,
         needs_dump_ets/1,
	 raw_dump_table/2,
	 raw_named_dump_table/2,
	 start_regulator/0,
	 opt_dump_log/1,
	 update/3,
	 snapshot_dcd/1
	]).

 %% Internal stuff
-export([regulator_init/1]).

-include("mnesia.hrl").
-include_lib("kernel/include/file.hrl").

-import(mnesia_lib, [fatal/2, dbg_out/2]).

-define(REGULATOR_NAME, mnesia_dumper_load_regulator).
-define(DumpToEtsMultiplier, 4).

get_log_writes() ->
    Max = mnesia_monitor:get_env(dump_log_write_threshold),
    Prev = mnesia_lib:read_counter(trans_log_writes),
    Left = mnesia_lib:read_counter(trans_log_writes_left),
    Diff = Max - Left,
    Prev + Diff.

incr_log_writes() ->
    Left = mnesia_lib:incr_counter(trans_log_writes_left, -1),
    if
	Left > 0 ->
	    ignore;
	true ->
	    adjust_log_writes(true)
    end.

adjust_log_writes(DoCast) ->
    Token = {mnesia_adjust_log_writes, self()},
    case global:set_lock(Token, [node()], 1) of
	false ->
	    ignore; %% Somebody else is sending a dump request
	true ->
	    case DoCast of
		false ->
		    ignore;
		true ->
		    mnesia_controller:async_dump_log(write_threshold)
	    end,
	    Max = mnesia_monitor:get_env(dump_log_write_threshold),
	    Left = mnesia_lib:read_counter(trans_log_writes_left),
	    %% Don't care if we lost a few writes
	    mnesia_lib:set_counter(trans_log_writes_left, Max),
	    Diff = Max - Left,
	    _ = mnesia_lib:incr_counter(trans_log_writes, Diff),
	    global:del_lock(Token, [node()])
    end.

%% Returns 'ok' or exits
opt_dump_log(InitBy) ->
    Reg = case whereis(?REGULATOR_NAME) of
	      undefined ->
		  nopid;
	      Pid when is_pid(Pid) ->
		  Pid
	  end,
    perform_dump(InitBy, Reg).

snapshot_dcd(Tables) ->
    lists:foreach(
      fun(Tab) ->
	      case mnesia_lib:storage_type_at_node(node(), Tab) of
		  disc_copies ->
		      mnesia_log:ets2dcd(Tab);
		  _ ->
		      %% Storage type was checked before queueing the op, though
		      skip
	      end
      end, Tables),
    dumped.

%% Scan for decisions
perform_dump(InitBy, Regulator) when InitBy == scan_decisions ->
    ?eval_debug_fun({?MODULE, perform_dump}, [InitBy]),

    dbg_out("Transaction log dump initiated by ~w~n", [InitBy]),
    scan_decisions(mnesia_log:previous_log_file(), InitBy, Regulator),
    scan_decisions(mnesia_log:latest_log_file(), InitBy, Regulator);

%% Propagate the log into the DAT-files
perform_dump(InitBy, Regulator) ->
    ?eval_debug_fun({?MODULE, perform_dump}, [InitBy]),
    LogState = mnesia_log:prepare_log_dump(InitBy),
    dbg_out("Transaction log dump initiated by ~w: ~w~n",
	    [InitBy, LogState]),
    adjust_log_writes(false),
    case LogState of
	already_dumped ->
	    mnesia_recover:allow_garb(),
	    dumped;
	{needs_dump, Diff} ->
	    U = mnesia_monitor:get_env(dump_log_update_in_place),
	    Cont = mnesia_log:init_log_dump(),
	    mnesia_recover:sync(),
	    try do_perform_dump(Cont, U, InitBy, Regulator, undefined) of
		ok ->
		    ?eval_debug_fun({?MODULE, post_dump}, [InitBy]),
		    case mnesia_monitor:use_dir() of
			true ->
			    mnesia_recover:dump_decision_tab();
			false ->
			    mnesia_log:purge_some_logs()
		    end,
		    mnesia_recover:allow_garb(),
		    %% And now to the crucial point...
		    mnesia_log:confirm_log_dump(Diff)
	    catch exit:Reason when Reason =/= fatal ->
		    case mnesia_monitor:get_env(auto_repair) of
			true ->
			    mnesia_lib:important(error, Reason),
			    %% Ignore rest of the log
			    mnesia_log:confirm_log_dump(Diff);
			false ->
			    fatal(error, Reason)
		    end
	    end;
	{error, Reason} ->
	    {error, {"Cannot prepare log dump", Reason}}
    end.

scan_decisions(Fname, InitBy, Regulator) ->
    Exists = mnesia_lib:exists(Fname),
    case Exists of
	false ->
	    ok;
	true ->
	    Header = mnesia_log:trans_log_header(),
	    Name = previous_log,
	    mnesia_log:open_log(Name, Header, Fname, Exists,
				mnesia_monitor:get_env(auto_repair), read_only),
	    Cont = start,
	    try
		do_perform_dump(Cont, false, InitBy, Regulator, undefined)
	    catch exit:Reason when Reason =/= fatal ->
		    {error, Reason}
	    after mnesia_log:close_log(Name)
	    end
    end.

do_perform_dump(Cont, InPlace, InitBy, Regulator, OldVersion) ->
    case mnesia_log:chunk_log(Cont) of
	{C2, Recs} ->
	    try insert_recs(Recs, InPlace, InitBy, Regulator, OldVersion) of
		Version ->
		    do_perform_dump(C2, InPlace, InitBy, Regulator, Version)
	    catch _:R when R =/= fatal ->
		    ST = erlang:get_stacktrace(),
		    Reason = {"Transaction log dump error: ~p~n", [{R, ST}]},
		    close_files(InPlace, {error, Reason}, InitBy),
		    exit(Reason)
	    end;
	eof ->
	    close_files(InPlace, ok, InitBy),
	    erase(mnesia_dumper_dets),
	    ok
    end.

insert_recs([Rec | Recs], InPlace, InitBy, Regulator, LogV) ->
    regulate(Regulator),
    case insert_rec(Rec, InPlace, InitBy, LogV) of
	LogH when is_record(LogH, log_header) ->
	    insert_recs(Recs, InPlace, InitBy, Regulator, LogH#log_header.log_version);
	_ ->
	    insert_recs(Recs, InPlace, InitBy, Regulator, LogV)
    end;

insert_recs([], _InPlace, _InitBy, _Regulator, Version) ->
    Version.

insert_rec(Rec, _InPlace, scan_decisions, _LogV) ->
    if
	is_record(Rec, commit) ->
	    ignore;
	is_record(Rec, log_header) ->
	    ignore;
	true ->
	    mnesia_recover:note_log_decision(Rec, scan_decisions)
    end;
insert_rec(Rec, InPlace, InitBy, LogV) when is_record(Rec, commit) ->
    %% Determine the Outcome of the transaction and recover it
    D = Rec#commit.decision,
    case mnesia_recover:wait_for_decision(D, InitBy) of
	{Tid, committed} ->
	    do_insert_rec(Tid, Rec, InPlace, InitBy, LogV);
	{Tid, aborted} ->
	    case InitBy of
		startup ->
		    mnesia_schema:undo_prepare_commit(Tid, Rec);
		_ ->
		    ok
	    end
    end;
insert_rec(H, _InPlace, _InitBy, _LogV) when is_record(H, log_header) ->
    CurrentVersion = mnesia_log:version(),
    if
        H#log_header.log_kind /= trans_log ->
	    exit({"Bad kind of transaction log", H});
	H#log_header.log_version == CurrentVersion ->
	    ok;
	H#log_header.log_version == "4.2" ->
	    ok;
	H#log_header.log_version == "4.1" ->
	    ok;
	H#log_header.log_version == "4.0" ->
	    ok;
	true ->
	    fatal("Bad version of transaction log: ~p~n", [H])
    end,
    H;

insert_rec(_Rec, _InPlace, _InitBy, _LogV) ->
    ok.

do_insert_rec(Tid, Rec, InPlace, InitBy, LogV) ->
    case Rec#commit.schema_ops of
	[] ->
	    ignore;
	SchemaOps ->
	    case val({schema, storage_type}) of
		ram_copies ->
		    insert_ops(Tid, schema_ops, SchemaOps, InPlace, InitBy, LogV);
	        Storage ->
		    true = open_files(schema, Storage, InPlace, InitBy),
		    insert_ops(Tid, schema_ops, SchemaOps, InPlace, InitBy, LogV)
	    end
    end,
    D = Rec#commit.disc_copies,
    insert_ops(Tid, disc_copies, D, InPlace, InitBy, LogV),
    case InitBy of
	startup ->
	    DO = Rec#commit.disc_only_copies,
	    insert_ops(Tid, disc_only_copies, DO, InPlace, InitBy, LogV);
	_ ->
	    ignore
    end.


update(_Tid, [], _DumperMode) ->
    dumped;
update(Tid, SchemaOps, DumperMode) ->
    UseDir = mnesia_monitor:use_dir(),
    Res = perform_update(Tid, SchemaOps, DumperMode, UseDir),
    mnesia_controller:release_schema_commit_lock(),
    Res.

perform_update(_Tid, _SchemaOps, mandatory, true) ->
    %% Force a dump of the transaction log in order to let the
    %% dumper perform needed updates

    InitBy = schema_update,
    ?eval_debug_fun({?MODULE, dump_schema_op}, [InitBy]),
    opt_dump_log(InitBy);
perform_update(Tid, SchemaOps, _DumperMode, _UseDir) ->
    %% No need for a full transaction log dump.
    %% Ignore the log file and perform only perform
    %% the corresponding updates.

    InitBy = fast_schema_update,
    InPlace = mnesia_monitor:get_env(dump_log_update_in_place),
    try insert_ops(Tid, schema_ops, SchemaOps, InPlace, InitBy,
		   mnesia_log:version()),
	 ?eval_debug_fun({?MODULE, post_dump}, [InitBy]),
	 close_files(InPlace, ok, InitBy),
	 ok
    catch _:Reason when Reason =/= fatal ->
	    ST = erlang:get_stacktrace(),
	    Error = {error, {"Schema update error", {Reason, ST}}},
	    close_files(InPlace, Error, InitBy),
            fatal("Schema update error ~p ~p", [{Reason,ST}, SchemaOps])
    end.

insert_ops(_Tid, _Storage, [], _InPlace, _InitBy, _) ->    ok;
insert_ops(Tid, Storage, [Op], InPlace, InitBy, Ver)  when Ver >= "4.3"->
    insert_op(Tid, Storage, Op, InPlace, InitBy),
    ok;
insert_ops(Tid, Storage, [Op | Ops], InPlace, InitBy, Ver)  when Ver >= "4.3"->
    insert_op(Tid, Storage, Op, InPlace, InitBy),
    insert_ops(Tid, Storage, Ops, InPlace, InitBy, Ver);
insert_ops(Tid, Storage, [Op | Ops], InPlace, InitBy, Ver) when Ver < "4.3" ->
    insert_ops(Tid, Storage, Ops, InPlace, InitBy, Ver),
    insert_op(Tid, Storage, Op, InPlace, InitBy).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Normal ops

disc_insert(_Tid, Storage, Tab, Key, Val, Op, InPlace, InitBy) ->
    case open_files(Tab, Storage, InPlace, InitBy) of
	true ->
	    case Storage of
		disc_copies when Tab /= schema ->
		    mnesia_log:append({?MODULE,Tab}, {{Tab, Key}, Val, Op}),
		    ok;
		_ ->
		    dets_insert(Op,Tab,Key,Val)
	    end;
	false ->
	    ignore
    end.

%% To fix update_counter so that it behaves better.
%% i.e. if nothing have changed in tab except update_counter
%% trust that the value in the dets file is correct.
%% Otherwise we will get a double increment.
%% This is perfect but update_counter is a dirty op.

dets_insert(Op,Tab,Key,Val) ->
    case Op of
	write ->
	    dets_updated(Tab,Key),
	    ok = dets:insert(Tab, Val);
	delete ->
	    dets_updated(Tab,Key),
	    ok = dets:delete(Tab, Key);
	update_counter ->
	    case dets_incr_counter(Tab,Key) of
		true ->
		    {RecName, Incr} = Val,
		    try _ = dets:update_counter(Tab, Key, Incr)
		    catch error:_ when Incr < 0 ->
			    Zero = {RecName, Key, 0},
			    ok = dets:insert(Tab, Zero);
			  error:_ ->
			    Init = {RecName, Key, Incr},
			    ok = dets:insert(Tab, Init)
		    end;
		false ->  ok
	    end;
	delete_object ->
	    dets_updated(Tab,Key),
	    ok = dets:delete_object(Tab, Val);
	clear_table ->
	    dets_cleared(Tab),
	    ok = dets:delete_all_objects(Tab)
    end.

dets_updated(Tab,Key) ->
    case get(mnesia_dumper_dets) of
	undefined ->
	    Empty = gb_trees:empty(),
	    Tree = gb_trees:insert(Tab, gb_sets:singleton(Key), Empty),
	    put(mnesia_dumper_dets, Tree);
	Tree ->
	    case gb_trees:lookup(Tab,Tree) of
		{value, cleared} -> ignore;
		{value, Set} ->
		    T = gb_trees:update(Tab, gb_sets:add(Key, Set), Tree),
		    put(mnesia_dumper_dets, T);
		none ->
		    T = gb_trees:insert(Tab, gb_sets:singleton(Key), Tree),
		    put(mnesia_dumper_dets, T)
	    end
    end.

dets_incr_counter(Tab,Key) ->
    case get(mnesia_dumper_dets) of
	undefined -> false;
	Tree ->
	    case gb_trees:lookup(Tab,Tree) of
		{value, cleared} -> true;
		{value, Set} -> gb_sets:is_member(Key, Set);
		none -> false
	    end
    end.

dets_cleared(Tab) ->
    case get(mnesia_dumper_dets) of
	undefined ->
	    Empty = gb_trees:empty(),
	    Tree = gb_trees:insert(Tab, cleared, Empty),
	    put(mnesia_dumper_dets, Tree);
	Tree ->
	    case gb_trees:lookup(Tab,Tree) of
		{value, cleared} -> ignore;
		_ ->
		    T = gb_trees:enter(Tab, cleared, Tree),
		    put(mnesia_dumper_dets, T)
	    end
    end.

insert(Tid, Storage, Tab, Key, [Val | Tail], Op, InPlace, InitBy) ->
    insert(Tid, Storage, Tab, Key, Val, Op, InPlace, InitBy),
    insert(Tid, Storage, Tab, Key, Tail, Op, InPlace, InitBy);

insert(_Tid, _Storage, _Tab, _Key, [], _Op, _InPlace, _InitBy) ->
    ok;

insert(Tid, Storage, Tab, Key, Val, Op, InPlace, InitBy) ->
    Item = {{Tab, Key}, Val, Op},
    case InitBy of
	startup ->
	    disc_insert(Tid, Storage, Tab, Key, Val, Op, InPlace, InitBy);

	_ when Storage == ram_copies ->
	    mnesia_tm:do_update_op(Tid, Storage, Item),
	    Snmp = mnesia_tm:prepare_snmp(Tab, Key, [Item]),
	    mnesia_tm:do_snmp(Tid, Snmp);

	_ when Storage == disc_copies ->
	    disc_insert(Tid, Storage, Tab, Key, Val, Op, InPlace, InitBy),
	    mnesia_tm:do_update_op(Tid, Storage, Item),
	    Snmp = mnesia_tm:prepare_snmp(Tab, Key, [Item]),
	    mnesia_tm:do_snmp(Tid, Snmp);

	_ when Storage == disc_only_copies ->
	    mnesia_tm:do_update_op(Tid, Storage, Item),
	    Snmp = mnesia_tm:prepare_snmp(Tab, Key, [Item]),
	    mnesia_tm:do_snmp(Tid, Snmp);

	_ when Storage == unknown ->
	    ignore
    end.

disc_delete_table(Tab, Storage) ->
    case mnesia_monitor:use_dir() of
	true ->
	    if
		Storage == disc_only_copies; Tab == schema ->
		    mnesia_monitor:unsafe_close_dets(Tab),
		    Dat = mnesia_lib:tab2dat(Tab),
		    file:delete(Dat),
		    ok;
		true ->
		    DclFile = mnesia_lib:tab2dcl(Tab),
		    case get({?MODULE,Tab}) of
			{opened_dumper, dcl} ->
			    del_opened_tab(Tab),
			    mnesia_log:unsafe_close_log(Tab);
			_ ->
			    ok
		    end,
		    file:delete(DclFile),
		    DcdFile = mnesia_lib:tab2dcd(Tab),
		    file:delete(DcdFile),
		    ok
	    end,
	    erase({?MODULE, Tab}),
	    ok;
	false ->
	    ok
    end.

disc_delete_indecies(_Tab, _Cs, Storage) when Storage /= disc_only_copies ->
    ok;
disc_delete_indecies(Tab, Cs, disc_only_copies) ->
    Indecies = Cs#cstruct.index,
    mnesia_index:del_transient(Tab, Indecies, disc_only_copies).

insert_op(Tid, Storage, {{Tab, Key}, Val, Op}, InPlace, InitBy) ->
    %% Propagate to disc only
    disc_insert(Tid, Storage, Tab, Key, Val, Op, InPlace, InitBy);

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% NOTE that all operations below will only
%% be performed if the dump is initiated by
%% startup or fast_schema_update
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

insert_op(_Tid, schema_ops, _OP, _InPlace, Initby)
  when Initby /= startup,
       Initby /= fast_schema_update,
       Initby /= schema_update ->
    ignore;

insert_op(Tid, _, {op, rec, Storage, Item}, InPlace, InitBy) ->
    {{Tab, Key}, ValList, Op} = Item,
    insert(Tid, Storage, Tab, Key, ValList, Op, InPlace, InitBy);

insert_op(Tid, _, {op, change_table_copy_type, N, FromS, ToS, TabDef}, InPlace, InitBy) ->
    Cs = mnesia_schema:list2cs(TabDef),
    Val = mnesia_schema:insert_cstruct(Tid, Cs, true), % Update ram only
    {schema, Tab, _} = Val,
    case lists:member(N, val({current, db_nodes})) of
	true when InitBy /= startup  ->
	    mnesia_controller:add_active_replica(Tab, N, Cs);
	_ ->
	    ignore
    end,
    if
	N == node() ->
	    Dmp  = mnesia_lib:tab2dmp(Tab),
	    Dat  = mnesia_lib:tab2dat(Tab),
	    Dcd  = mnesia_lib:tab2dcd(Tab),
	    Dcl  = mnesia_lib:tab2dcl(Tab),
	    case {FromS, ToS} of
		{ram_copies, disc_copies} when Tab == schema ->
		    ok = ensure_rename(Dmp, Dat);
		{ram_copies, disc_copies} ->
		    file:delete(Dcl),
		    ok = ensure_rename(Dmp, Dcd);
		{disc_copies, ram_copies} when Tab == schema ->
		    mnesia_lib:set(use_dir, false),
		    mnesia_monitor:unsafe_close_dets(Tab),
		    ok = file:delete(Dat);
		{disc_copies, ram_copies} ->
		    _ = file:delete(Dcl),
		    _ = file:delete(Dcd),
		    ok;
		{ram_copies, disc_only_copies} ->
		    ok = ensure_rename(Dmp, Dat),
		    true = open_files(Tab, disc_only_copies, InPlace, InitBy),
		    %% ram_delete_table must be done before init_indecies,
		    %% it uses info which is reset in init_indecies,
		    %% it doesn't matter, because init_indecies don't use
		    %% the ram replica of the table when creating the disc
		    %% index; Could be improved :)
		    mnesia_schema:ram_delete_table(Tab, FromS),
		    PosList = Cs#cstruct.index,
		    mnesia_index:init_indecies(Tab, disc_only_copies, PosList);
		{disc_only_copies, ram_copies} ->
		    mnesia_monitor:unsafe_close_dets(Tab),
		    disc_delete_indecies(Tab, Cs, disc_only_copies),
		    case InitBy of
			startup ->
			    ignore;
			_ ->
			    mnesia_controller:get_disc_copy(Tab),
			    ok
		    end,
		    disc_delete_table(Tab, disc_only_copies);
		{disc_copies, disc_only_copies} ->
		    ok = ensure_rename(Dmp, Dat),
		    true = open_files(Tab, disc_only_copies, InPlace, InitBy),
		    mnesia_schema:ram_delete_table(Tab, FromS),
		    PosList = Cs#cstruct.index,
		    mnesia_index:init_indecies(Tab, disc_only_copies, PosList),
		    _ = file:delete(Dcl),
		    _ = file:delete(Dcd),
		    ok;
		{disc_only_copies, disc_copies} ->
		    mnesia_monitor:unsafe_close_dets(Tab),
		    disc_delete_indecies(Tab, Cs, disc_only_copies),
		    case InitBy of
			startup ->
			    ignore;
			_ ->
			    mnesia_log:ets2dcd(Tab),
			    mnesia_controller:get_disc_copy(Tab),
			    disc_delete_table(Tab, disc_only_copies)
		    end
	    end;
	true ->
	    ignore
    end,
    S = val({schema, storage_type}),
    disc_insert(Tid, S, schema, Tab, Val, write, InPlace, InitBy);

insert_op(Tid, _, {op, transform, _Fun, TabDef}, InPlace, InitBy) ->
    Cs = mnesia_schema:list2cs(TabDef),
    case mnesia_lib:cs_to_storage_type(node(), Cs) of
	disc_copies ->
	    open_dcl(Cs#cstruct.name);
	_ ->
	    ignore
    end,
    insert_cstruct(Tid, Cs, true, InPlace, InitBy);

%%%  Operations below this are handled without using the logg.

insert_op(Tid, _, {op, restore_recreate, TabDef}, InPlace, InitBy) ->
    Cs = mnesia_schema:list2cs(TabDef),
    Tab = Cs#cstruct.name,
    Type = Cs#cstruct.type,
    Storage = mnesia_lib:cs_to_storage_type(node(), Cs),
    %% Delete all possibly existing files and tables
    disc_delete_table(Tab, Storage),
    disc_delete_indecies(Tab, Cs, Storage),
    case InitBy of
	startup ->
	    ignore;
	_ ->
	    case ?catch_val({Tab, cstruct}) of
		{'EXIT', _} -> ignore;
		_ ->
		    mnesia_schema:ram_delete_table(Tab, Storage),
		    mnesia_checkpoint:tm_del_copy(Tab, node())
	    end
    end,
    StorageProps = Cs#cstruct.storage_properties,
    
    %% And create new ones..
    if
	(InitBy == startup) or (Storage == unknown) ->
	    ignore;
	Storage == ram_copies ->
	    EtsProps = proplists:get_value(ets, StorageProps, []),
	    Args = [{keypos, 2}, public, named_table, Type | EtsProps],
	    mnesia_monitor:mktab(Tab, Args);
	Storage == disc_copies ->
	    EtsProps = proplists:get_value(ets, StorageProps, []),
	    Args = [{keypos, 2}, public, named_table, Type | EtsProps],
	    mnesia_monitor:mktab(Tab, Args),
	    File = mnesia_lib:tab2dcd(Tab),
	    FArg = [{file, File}, {name, {mnesia,create}},
		    {repair, false}, {mode, read_write}],
	    {ok, Log} = mnesia_monitor:open_log(FArg),
	    mnesia_monitor:unsafe_close_log(Log);
	Storage == disc_only_copies ->
	    File = mnesia_lib:tab2dat(Tab),
	    file:delete(File),
	    DetsProps = proplists:get_value(dets, StorageProps, []),
	    Args = [{file, mnesia_lib:tab2dat(Tab)},
		    {type, mnesia_lib:disk_type(Tab, Type)},
		    {keypos, 2},
		    {repair, mnesia_monitor:get_env(auto_repair)} 
		    | DetsProps ],
	    mnesia_monitor:open_dets(Tab, Args)
    end,
    insert_op(Tid, ignore, {op, create_table, TabDef}, InPlace, InitBy);

insert_op(Tid, _, {op, create_table, TabDef}, InPlace, InitBy) ->
    Cs = mnesia_schema:list2cs(TabDef),
    insert_cstruct(Tid, Cs, false, InPlace, InitBy),
    Tab = Cs#cstruct.name,
    Storage = mnesia_lib:cs_to_storage_type(node(), Cs),
    StorageProps = Cs#cstruct.storage_properties,
    case InitBy of
	startup ->
	    case Storage of
		unknown ->
		    ignore;
		ram_copies ->
		    ignore;
		disc_copies ->
		    Dcd = mnesia_lib:tab2dcd(Tab),
		    case mnesia_lib:exists(Dcd) of
			true -> ignore;
			false ->
			    mnesia_log:open_log(temp,
						mnesia_log:dcd_log_header(),
						Dcd,
						false,
						false,
						read_write),
			    mnesia_log:unsafe_close_log(temp)
		    end;
		_ ->
		    DetsProps = proplists:get_value(dets, StorageProps, []),

		    Args = [{file, mnesia_lib:tab2dat(Tab)},
			    {type, mnesia_lib:disk_type(Tab, Cs#cstruct.type)},
			    {keypos, 2},
			    {repair, mnesia_monitor:get_env(auto_repair)} 
			    | DetsProps ],
		    case mnesia_monitor:open_dets(Tab, Args) of
			{ok, _} ->
			    mnesia_monitor:unsafe_close_dets(Tab);
			{error, Error} ->
			    exit({"Failed to create dets table", Error})
		    end
	    end;
	_ ->
	    Copies = mnesia_lib:copy_holders(Cs),
	    Active = mnesia_lib:intersect(Copies, val({current, db_nodes})),
	    [mnesia_controller:add_active_replica(Tab, N, Cs) || N <- Active],
	    
	    case Storage of
		unknown ->
		    mnesia_lib:unset({Tab, create_table}),
		    case Cs#cstruct.local_content of
			true ->
			    ignore;
			false ->
			    mnesia_lib:set_remote_where_to_read(Tab)
		    end;
		_ ->
		    case Cs#cstruct.local_content of
			true ->
			    mnesia_lib:set_local_content_whereabouts(Tab);
			false ->
			    mnesia_lib:set({Tab, where_to_read}, node())
		    end,
		    case Storage of
			ram_copies ->
			    ignore;
			_ ->
			    %% Indecies are still created by loader
			    disc_delete_indecies(Tab, Cs, Storage)
			    %% disc_delete_table(Tab, Storage)
		    end,

		    %% Update whereabouts and create table
		    mnesia_controller:create_table(Tab),
		    mnesia_lib:unset({Tab, create_table})
	    end
    end;

insert_op(_Tid, _, {op, dump_table, Size, TabDef}, _InPlace, _InitBy) ->
    case Size of
	unknown ->
	    ignore;
	_ ->
	    Cs = mnesia_schema:list2cs(TabDef),
	    Tab = Cs#cstruct.name,
	    Dmp = mnesia_lib:tab2dmp(Tab),
	    Dat = mnesia_lib:tab2dcd(Tab),
	    case Size of
		0 ->
	    	    %% Assume that table files already are closed
		    file:delete(Dmp),
		    file:delete(Dat);
		_ ->
		    ok = ensure_rename(Dmp, Dat)
	    end
    end;

insert_op(Tid, _, {op, delete_table, TabDef}, InPlace, InitBy) ->
    Cs = mnesia_schema:list2cs(TabDef),
    Tab = Cs#cstruct.name,
    case mnesia_lib:cs_to_storage_type(node(), Cs) of
	unknown ->
	    ignore;
	Storage ->
	    disc_delete_table(Tab, Storage),
	    disc_delete_indecies(Tab, Cs, Storage),
	    case InitBy of
		startup ->
		    ignore;
		_ ->
		    mnesia_schema:ram_delete_table(Tab, Storage),
		    mnesia_checkpoint:tm_del_copy(Tab, node())
	    end
    end,
    delete_cstruct(Tid, Cs, InPlace, InitBy);

insert_op(Tid, _, {op, clear_table, TabDef}, InPlace, InitBy) ->
    Cs = mnesia_schema:list2cs(TabDef),
    Tab = Cs#cstruct.name,
    case mnesia_lib:cs_to_storage_type(node(), Cs) of
	unknown ->
	    ignore;
	Storage ->
	    Oid = '_', %%val({Tab, wild_pattern}),
	    if Storage == disc_copies ->
		    open_dcl(Cs#cstruct.name);
	       true ->
		    ignore
	    end,
	    %% Need to catch this, it crashes on ram_copies if
	    %% the op comes before table is loaded at startup.
	    ?CATCH(insert(Tid, Storage, Tab, '_', Oid, clear_table, InPlace, InitBy))
    end;

insert_op(Tid, _, {op, merge_schema, TabDef}, InPlace, InitBy) ->
    Cs = mnesia_schema:list2cs(TabDef),
    case Cs#cstruct.name of
	schema ->
	    %% If we bootstrap an empty (diskless) mnesia from another node
	    %% we might have changed the storage_type of schema.
	    %% I think this is a good place to do it.
	    Update = fun(NS = {Node,Storage}) ->
			     case mnesia_lib:cs_to_storage_type(Node, Cs) of
				 Storage -> NS;
				 disc_copies when Node == node() ->
				     Dir = mnesia_lib:dir(),
				     ok = mnesia_schema:opt_create_dir(true, Dir),
				     mnesia_schema:purge_dir(Dir, []),
				     mnesia_log:purge_all_logs(),

				     mnesia_lib:set(use_dir, true),
				     mnesia_log:init(),
				     Ns = val({current, db_nodes}),
				     F = fun(U) -> mnesia_recover:log_mnesia_up(U) end,
				     lists:foreach(F, Ns),
				     raw_named_dump_table(schema, dat),
				     temp_set_master_nodes(),
				     {Node,disc_copies};
				 CSstorage ->
				     {Node,CSstorage}
			     end
		     end,

	    W2C0 = val({schema, where_to_commit}),
	    W2C = case W2C0 of
		      {blocked, List} ->
			  {blocked,lists:map(Update,List)};
		      List ->
			  lists:map(Update,List)
		  end,
	    if W2C == W2C0 -> ignore;
	       true -> mnesia_lib:set({schema, where_to_commit}, W2C)
	    end;
	_ ->
	    ignore
    end,
    insert_cstruct(Tid, Cs, false, InPlace, InitBy);

insert_op(Tid, _, {op, del_table_copy, Storage, Node, TabDef}, InPlace, InitBy) ->
    Cs = mnesia_schema:list2cs(TabDef),
    Tab = Cs#cstruct.name,
    if
	Tab == schema, Storage == ram_copies ->
	    insert_cstruct(Tid, Cs, true, InPlace, InitBy);
        Tab /= schema ->
	    mnesia_controller:del_active_replica(Tab, Node),
	    mnesia_lib:del({Tab, Storage}, Node),
	    if
		Node == node() ->
		    case Cs#cstruct.local_content of
			true -> mnesia_lib:set({Tab, where_to_read}, nowhere);
			false -> mnesia_lib:set_remote_where_to_read(Tab)
		    end,
		    mnesia_lib:del({schema, local_tables}, Tab),
		    mnesia_lib:set({Tab, storage_type}, unknown),
		    insert_cstruct(Tid, Cs, true, InPlace, InitBy),
		    disc_delete_table(Tab, Storage),
		    disc_delete_indecies(Tab, Cs, Storage),
		    mnesia_schema:ram_delete_table(Tab, Storage),
		    mnesia_checkpoint:tm_del_copy(Tab, Node);
		true ->
		    case val({Tab, where_to_read}) of
			Node ->
			    mnesia_lib:set_remote_where_to_read(Tab);
			_  ->
			    ignore
		    end,
		    insert_cstruct(Tid, Cs, true, InPlace, InitBy)
	    end
    end;

insert_op(Tid, _, {op, add_table_copy, _Storage, _Node, TabDef}, InPlace, InitBy) ->
    %% During prepare commit, the files was created
    %% and the replica was announced
    Cs = mnesia_schema:list2cs(TabDef),
    insert_cstruct(Tid, Cs, true, InPlace, InitBy);

insert_op(Tid, _, {op, add_snmp, _Us, TabDef}, InPlace, InitBy) ->
    Cs = mnesia_schema:list2cs(TabDef),
    insert_cstruct(Tid, Cs, true, InPlace, InitBy);

insert_op(Tid, _, {op, del_snmp, TabDef}, InPlace, InitBy) ->
    Cs = mnesia_schema:list2cs(TabDef),
    Tab = Cs#cstruct.name,
    Storage = mnesia_lib:cs_to_storage_type(node(), Cs),
    if
	InitBy /= startup,
	Storage /= unknown ->
	    case ?catch_val({Tab, {index, snmp}}) of
		{'EXIT', _} ->
		    ignore;
		Stab ->
		    mnesia_snmp_hook:delete_table(Tab, Stab),
		    mnesia_lib:unset({Tab, {index, snmp}})
	    end;
	true ->
	    ignore
    end,
    insert_cstruct(Tid, Cs, true, InPlace, InitBy);

insert_op(Tid, _, {op, add_index, Pos, TabDef}, InPlace, InitBy) ->
    Cs = mnesia_schema:list2cs(TabDef),
    Tab = insert_cstruct(Tid, Cs, true, InPlace, InitBy),
    Storage = mnesia_lib:cs_to_storage_type(node(), Cs),
    case InitBy of
	startup when Storage == disc_only_copies ->
	    true = open_files(Tab, Storage, InPlace, InitBy),
	    mnesia_index:init_indecies(Tab, Storage, [Pos]);
	startup ->
	    ignore;
	_  ->
	    case val({Tab,where_to_read}) of
		nowhere -> ignore;
		_ ->
		    mnesia_index:init_indecies(Tab, Storage, [Pos])
	    end
    end;

insert_op(Tid, _, {op, del_index, Pos, TabDef}, InPlace, InitBy) ->
    Cs = mnesia_schema:list2cs(TabDef),
    Tab = Cs#cstruct.name,
    Storage = mnesia_lib:cs_to_storage_type(node(), Cs),
    case InitBy of
	startup when Storage == disc_only_copies ->
	    mnesia_index:del_index_table(Tab, Storage, Pos);
	startup ->
	    ignore;
	_ ->
	    mnesia_index:del_index_table(Tab, Storage, Pos)
    end,
    insert_cstruct(Tid, Cs, true, InPlace, InitBy);

insert_op(Tid, _, {op, change_table_access_mode,TabDef, _OldAccess, _Access}, InPlace, InitBy) ->
    Cs = mnesia_schema:list2cs(TabDef),
    case InitBy of
	startup -> ignore;
	_ -> mnesia_controller:change_table_access_mode(Cs)
    end,
    insert_cstruct(Tid, Cs, true, InPlace, InitBy);

insert_op(Tid, _, {op, change_table_majority,TabDef, _OldAccess, _Access}, InPlace, InitBy) ->
    Cs = mnesia_schema:list2cs(TabDef),
    case InitBy of
	startup -> ignore;
	_ -> mnesia_controller:change_table_majority(Cs)
    end,
    insert_cstruct(Tid, Cs, true, InPlace, InitBy);

insert_op(Tid, _, {op, change_table_load_order, TabDef, _OldLevel, _Level}, InPlace, InitBy) ->
    Cs = mnesia_schema:list2cs(TabDef),
    insert_cstruct(Tid, Cs, true, InPlace, InitBy);

insert_op(Tid, _, {op, delete_property, TabDef, PropKey}, InPlace, InitBy) ->
    Cs = mnesia_schema:list2cs(TabDef),
    Tab = Cs#cstruct.name,
    mnesia_lib:unset({Tab, user_property, PropKey}),
    insert_cstruct(Tid, Cs, true, InPlace, InitBy);

insert_op(Tid, _, {op, write_property, TabDef, _Prop}, InPlace, InitBy) ->
    Cs = mnesia_schema:list2cs(TabDef),
    insert_cstruct(Tid, Cs, true, InPlace, InitBy);

insert_op(Tid, _, {op, change_table_frag, _Change, TabDef}, InPlace, InitBy) ->
    Cs = mnesia_schema:list2cs(TabDef),
    insert_cstruct(Tid, Cs, true, InPlace, InitBy).

open_files(Tab, Storage, UpdateInPlace, InitBy)
  when Storage /= unknown, Storage /= ram_copies ->
    case get({?MODULE, Tab}) of
	undefined ->
	    case ?catch_val({Tab, setorbag}) of
		{'EXIT', _} ->
		    false;
		Type ->
		    case Storage of
			disc_copies when Tab /= schema ->
			    Bool = open_disc_copies(Tab, InitBy),
			    Bool;
			_ ->
			    Props = val({Tab, storage_properties}),
			    DetsProps = proplists:get_value(dets, Props, []),
			    Fname = prepare_open(Tab, UpdateInPlace),
			    Args = [{file, Fname},
				    {keypos, 2},
				    {repair, mnesia_monitor:get_env(auto_repair)},
				    {type, mnesia_lib:disk_type(Tab, Type)} 
				    | DetsProps],
			    {ok, _} = mnesia_monitor:open_dets(Tab, Args),
			    put({?MODULE, Tab}, {opened_dumper, dat}),
			    true
		    end
	    end;
	already_dumped ->
	    false;
	{opened_dumper, _} ->
	    true
    end;
open_files(_Tab, _Storage, _UpdateInPlace, _InitBy) ->
    false.

open_disc_copies(Tab, InitBy) ->
    DumpEts = needs_dump_ets(Tab),
    if
	DumpEts == false; InitBy == startup ->
            DclF = mnesia_lib:tab2dcl(Tab),
	    mnesia_log:open_log({?MODULE,Tab},
				mnesia_log:dcl_log_header(),
				DclF,
				mnesia_lib:exists(DclF),
				mnesia_monitor:get_env(auto_repair),
				read_write),
	    put({?MODULE, Tab}, {opened_dumper, dcl}),
	    true;
	true ->
	    mnesia_log:ets2dcd(Tab),
	    put({?MODULE, Tab}, already_dumped),
	    false
    end.

needs_dump_ets(Tab) ->
    DclF = mnesia_lib:tab2dcl(Tab),
    case file:read_file_info(DclF) of
        {error, enoent} ->
            false;
        {ok, DclInfo} ->
            DcdF =  mnesia_lib:tab2dcd(Tab),
            case file:read_file_info(DcdF) of
                {error, Reason} ->
                    mnesia_lib:dbg_out("File ~p info_error ~p ~n",
                                       [DcdF, Reason]),
                    true;
                {ok, DcdInfo} ->
                    Mul = case ?catch_val(dc_dump_limit) of
                              {'EXIT', _} -> ?DumpToEtsMultiplier;
                              Val -> Val
                          end,
                    DcdInfo#file_info.size =< (DclInfo#file_info.size * Mul)
            end
    end.

%% Always opens the dcl file for writing overriding already_dumped
%% mechanismen, used for schema transactions.
open_dcl(Tab) ->
    case get({?MODULE, Tab}) of
    	{opened_dumper, _} ->
	    true;
	_ -> %% undefined or already_dumped
	    DclF = mnesia_lib:tab2dcl(Tab),
	    mnesia_log:open_log({?MODULE,Tab},
				mnesia_log:dcl_log_header(),
				DclF,
				mnesia_lib:exists(DclF),
				mnesia_monitor:get_env(auto_repair),
				read_write),
	    put({?MODULE, Tab}, {opened_dumper, dcl}),
	    true
    end.

prepare_open(Tab, UpdateInPlace) ->
    Dat =  mnesia_lib:tab2dat(Tab),
    case UpdateInPlace of
	true ->
	    Dat;
	false ->
	    Tmp = mnesia_lib:tab2tmp(Tab),
	    try ok = mnesia_lib:copy_file(Dat, Tmp)
	    catch error:Error ->
		    fatal("Cannot copy dets file ~p to ~p: ~p~n",
			  [Dat, Tmp, Error])
	    end,
	    Tmp
    end.

del_opened_tab(Tab) ->
    erase({?MODULE, Tab}).

close_files(UpdateInPlace, Outcome, InitBy) -> % Update in place
    close_files(UpdateInPlace, Outcome, InitBy, get()).

close_files(InPlace, Outcome, InitBy, [{{?MODULE, Tab}, already_dumped} | Tail]) ->
    erase({?MODULE, Tab}),
    close_files(InPlace, Outcome, InitBy, Tail);
close_files(InPlace, Outcome, InitBy, [{{?MODULE, Tab}, {opened_dumper, Type}} | Tail]) ->
    erase({?MODULE, Tab}),
    case val({Tab, storage_type}) of
	disc_only_copies when InitBy /= startup ->
	    ignore;
	disc_copies when Tab /= schema ->
	    mnesia_log:close_log({?MODULE,Tab});
	Storage ->
	    do_close(InPlace, Outcome, Tab, Type, Storage)
    end,
    close_files(InPlace, Outcome, InitBy, Tail);

close_files(InPlace, Outcome, InitBy, [_ | Tail]) ->
    close_files(InPlace, Outcome, InitBy, Tail);
close_files(_, _, _InitBy, []) ->
    ok.

%% If storage is unknown during close clean up files, this can happen if timing
%% is right and dirty_write conflicts with schema operations.
do_close(_, _, Tab, dcl, unknown) ->
    mnesia_log:close_log({?MODULE,Tab}),
    file:delete(mnesia_lib:tab2dcl(Tab));
do_close(_, _, Tab, dcl, _) ->  %% To be safe, can it happen?
    mnesia_log:close_log({?MODULE,Tab});

do_close(InPlace, Outcome, Tab, dat, Storage) ->
    mnesia_monitor:close_dets(Tab),
    if
	Storage == unknown, InPlace == true  ->
	    file:delete(mnesia_lib:tab2dat(Tab));
	InPlace == true ->
	    %% Update in place
	    ok;
	Outcome == ok, Storage /= unknown ->
	    %% Success: swap tmp files with dat files
	    TabDat = mnesia_lib:tab2dat(Tab),
	    ok = file:rename(mnesia_lib:tab2tmp(Tab), TabDat);
	true ->
	    file:delete(mnesia_lib:tab2tmp(Tab))
    end.


ensure_rename(From, To) ->
    case mnesia_lib:exists(From) of
	true ->
	    file:rename(From, To);
	false ->
	    case mnesia_lib:exists(To) of
		true ->
		    ok;
		false ->
		    {error, {rename_failed, From, To}}
	    end
    end.

insert_cstruct(Tid, Cs, KeepWhereabouts, InPlace, InitBy) ->
    Val = mnesia_schema:insert_cstruct(Tid, Cs, KeepWhereabouts),
    {schema, Tab, _} = Val,
    S = val({schema, storage_type}),
    disc_insert(Tid, S, schema, Tab, Val, write, InPlace, InitBy),
    Tab.

delete_cstruct(Tid, Cs, InPlace, InitBy) ->
    Val = mnesia_schema:delete_cstruct(Tid, Cs),
    {schema, Tab, _} = Val,
    S = val({schema, storage_type}),
    disc_insert(Tid, S, schema, Tab, Val, delete, InPlace, InitBy),
    Tab.


temp_set_master_nodes() ->
    Tabs = val({schema, local_tables}),
    Masters = [{Tab, (val({Tab, disc_copies}) ++
		      val({Tab, ram_copies}) ++
		      val({Tab, disc_only_copies})) -- [node()]}
	       || Tab <- Tabs],
    %% UseDir = false since we don't want to remember these
    %% masternodes and we are running (really soon anyway) since we want this
    %% to be known during table loading.
    mnesia_recover:log_master_nodes(Masters, false, yes),
    ok.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Raw dump of table. Dumper must have unique access to the ets table.

raw_named_dump_table(Tab, Ftype) ->
    case mnesia_monitor:use_dir() of
	true ->
	    mnesia_lib:lock_table(Tab),
	    TmpFname = mnesia_lib:tab2tmp(Tab),
	    Fname =
		case Ftype of
		    dat -> mnesia_lib:tab2dat(Tab);
		    dmp -> mnesia_lib:tab2dmp(Tab)
		end,
	    file:delete(TmpFname),
	    file:delete(Fname),
	    TabSize = ?ets_info(Tab, size),
	    TabRef = Tab,
	    DiskType = mnesia_lib:disk_type(Tab),
	    Args = [{file, TmpFname},
		    {keypos, 2},
		    %%		    {ram_file, true},
		    {estimated_no_objects, TabSize + 256},
		    {repair, mnesia_monitor:get_env(auto_repair)},
		    {type, DiskType}],
	    case mnesia_lib:dets_sync_open(TabRef, Args) of
		{ok, TabRef} ->
		    Storage = ram_copies,
		    mnesia_lib:db_fixtable(Storage, Tab, true),

		    try
			ok = raw_dump_table(TabRef, Tab),
			ok = file:rename(TmpFname, Fname)
		    catch _:Reason ->
			    ?SAFE(file:delete(TmpFname)),
			    exit({"Dump of table to disc failed", Reason})
		    after
			mnesia_lib:db_fixtable(Storage, Tab, false),
			mnesia_lib:dets_sync_close(Tab),
			mnesia_lib:unlock_table(Tab)
		    end;
		{error, Reason} ->
		    mnesia_lib:unlock_table(Tab),
		    exit({"Open of file before dump to disc failed", Reason})
	    end;
	false ->
	    exit({has_no_disc, node()})
    end.

raw_dump_table(DetsRef, EtsRef) ->
    dets:from_ets(DetsRef, EtsRef).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Load regulator
%%
%% This is a poor mans substitute for a fair scheduler algorithm
%% in the Erlang emulator. The mnesia_dumper process performs many
%% costly BIF invokations and must pay for this. But since the
%% Emulator does not handle this properly we must compensate for
%% this with some form of load regulation of ourselves in order to
%% not steal all computation power in the Erlang Emulator ans make
%% other processes starve. Hopefully this is a temporary solution.

start_regulator() ->
    case mnesia_monitor:get_env(dump_log_load_regulation) of
	false ->
	    nopid;
	true ->
	    N = ?REGULATOR_NAME,
	    case mnesia_monitor:start_proc(N, ?MODULE, regulator_init, [self()]) of
		{ok, Pid} ->
		    Pid;
		{error, Reason} ->
		    fatal("Failed to start ~n: ~p~n", [N, Reason])
	    end
    end.

regulator_init(Parent) ->
    %% No need for trapping exits.
    %% Using low priority causes the regulation
    process_flag(priority, low),
    register(?REGULATOR_NAME, self()),
    proc_lib:init_ack(Parent, {ok, self()}),
    regulator_loop().

regulator_loop() ->
    receive
	{regulate, From} ->
	    From ! {regulated, self()},
	    regulator_loop();
	{stop, From} ->
	    From ! {stopped, self()},
	    exit(normal)
    end.

regulate(nopid) ->
    ok;
regulate(RegulatorPid) ->
    RegulatorPid ! {regulate, self()},
    receive
	{regulated, RegulatorPid} -> ok
    end.

val(Var) ->
    case ?catch_val(Var) of
	{'EXIT', _} -> mnesia_lib:other_val(Var);
	Value -> Value
    end.