aboutsummaryrefslogblamecommitdiffstats
path: root/lib/dialyzer/test/r9c_SUITE_data/src/mnesia/mnesia_controller.erl
blob: 1d7f29bfbd2cce69685b22c0c76ecb2603fab813 (plain) (tree)
1
2
3
4
5
6
7
8
9
10




                                                                        
  



                                                                         
  


                                                                         
  



















































































                                                                       
                              





































                                                                    
                          





























                                     
                                                              













                                                                   
 




























                                                                      
 









































                                                           
 




























                                                                     
 


























                                                                           
  






























                                                                             
 


                                            
                  








                                                                                   
        






































                                                                       

                                          

                                                
              



                                                            
 
                                         
                     


















                                                                       
             



















                                                                  
 


                                                                         
                                                                










                                                         
 








                                                  
 











































                                                                                  
 






























                                                                         
 











































                                                                             
                                                             












                                                                                  

                                               


























                                                                                     
                                                                              
































































                                                                                     
                                   







                                                         
                                     









                                                         
 
                                          
 































































                                                                              
 










                                                                        
               



















                                                                 
                                              
















                                                         
                









































                                                                         
               














                                                           
 

                                                           
           


















                                                             
 


                                                  
 



                                                      
 






                                                                          
 







                                                                                       
 


























































                                                                        
 





                                                         
 




                                                   
 




















                                                                                     
                           












                                                         
                              


















































































































































                                                                                   
              


                                                     
            



                                                                                        
              


                                                             
                       



















































































                                                                                          
 


















                                                                        

                                                     












































                                                                       
 


                                                      
 



                                                                    
 








































































                                                                                       
                                                                        













































                                                                        
         












                                                
                                
                     
                                 






































                                                                       
             











                                                                         
 

















                                                                                        
        


                                             
            










































                                                                                                     
 







                                                 

                                             


                                                  
                            


























                                                             
               











                                                                   
 















































                                                                           
                                                                  





                                                
  












                                                                    
 






                                                              
 








                                                                                
 











                                                                            
                                                            












                                                     
 
































                                                                           
 
































































































































































                                                                                    
%% ``The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved via the world wide web at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
%% AB. All Rights Reserved.''
%%
%%     $Id: mnesia_controller.erl,v 1.3 2010/03/04 13:54:19 maria Exp $
%%
%% The mnesia_init process loads tables from local disc or from
%% another nodes. It also coordinates updates of the info about
%% where we can read and write tables.
%%
%% Tables may need to be loaded initially at startup of the local
%% node or when other nodes announces that they already have loaded
%% tables that we also want.
%%
%% Initially we set the load request queue to those tables that we
%% safely can load locally, i.e. tables where we have the last
%% consistent replica and we have received mnesia_down from all
%% other nodes holding the table. Then we let the mnesia_init
%% process enter its normal working state.
%%
%% When we need to load a table we append a request to the load
%% request queue. All other requests are regarded as high priority
%% and are processed immediately (e.g. update table whereabouts).
%% We processes the load request queue as a "background" job..

-module(mnesia_controller).

-behaviour(gen_server).

%% Mnesia internal stuff
-export([
	 start/0,
	 i_have_tab/1,
	 info/0,
	 get_info/1,
	 get_workers/1,
	 force_load_table/1,
	 async_dump_log/1,
	 sync_dump_log/1,
	 connect_nodes/1,
	 wait_for_schema_commit_lock/0,
	 release_schema_commit_lock/0,
	 create_table/1,
	 get_disc_copy/1,
	 get_cstructs/0,
	 sync_and_block_table_whereabouts/4,
	 sync_del_table_copy_whereabouts/2,
	 block_table/1,
	 unblock_table/1,
	 block_controller/0,
	 unblock_controller/0,
	 unannounce_add_table_copy/2,
	 master_nodes_updated/2,
	 mnesia_down/1,
	 add_active_replica/2,
	 add_active_replica/3,
	 add_active_replica/4,
	 change_table_access_mode/1,
	 del_active_replica/2,
	 wait_for_tables/2,
	 get_network_copy/2,
	 merge_schema/0,
	 start_remote_sender/4,
	 schedule_late_disc_load/2
	]).

%% gen_server callbacks
-export([init/1,
	 handle_call/3,
	 handle_cast/2,
	 handle_info/2,
	 terminate/2,
	 code_change/3]).

%% Module internal stuff
-export([call/1,
	 cast/1,
	 dump_and_reply/2,
	 load_and_reply/2,
	 send_and_reply/2,
	 wait_for_tables_init/2
	]).

-import(mnesia_lib, [set/2, add/2]).
-import(mnesia_lib, [fatal/2, error/2, verbose/2, dbg_out/2]).

-include("mnesia.hrl").

-define(SERVER_NAME, ?MODULE).

-record(state, {supervisor,
		schema_is_merged = false,
		early_msgs = [],
		loader_pid,
		loader_queue = [],
		sender_pid,
		sender_queue =  [],
		late_loader_queue = [],
		dumper_pid,          % Dumper or schema commit pid
		dumper_queue = [],   % Dumper or schema commit queue
		dump_log_timer_ref,
		is_stopping = false
	       }).

-record(worker_reply, {what,
		       pid,
		       result
		      }).

-record(schema_commit_lock, {owner}).
-record(block_controller, {owner}).

-record(dump_log, {initiated_by,
		   opt_reply_to
		  }).

-record(net_load, {table,
		   reason,
		   opt_reply_to,
		   cstruct = unknown
		  }).

-record(send_table, {table,
		     receiver_pid,
		     remote_storage
		    }).

-record(disc_load, {table,
		    reason,
		    opt_reply_to
		   }).

-record(late_load, {table,
		    reason,
		    opt_reply_to,
		    loaders
		   }).

-record(loader_done, {worker_pid,
		      is_loaded,
		      table_name,
		      needs_announce,
		      needs_sync,
		      needs_reply,
		      reply_to,
		      reply}).

-record(sender_done, {worker_pid,
		      worker_res,
		      table_name
		     }).

-record(dumper_done, {worker_pid,
		      worker_res
		     }).

val(Var) ->
    case ?catch_val(Var) of
	{'EXIT', Reason} -> mnesia_lib:other_val(Var, Reason);
	Value -> Value
    end.

start() ->
    gen_server:start_link({local, ?SERVER_NAME}, ?MODULE, [self()],
			  [{timeout, infinity}
			   %% ,{debug, [trace]}
			  ]).

sync_dump_log(InitBy) ->
    call({sync_dump_log, InitBy}).

async_dump_log(InitBy) ->
    ?SERVER_NAME ! {async_dump_log, InitBy}.

%% Wait for tables to be active
%% If needed, we will wait for Mnesia to start
%% If Mnesia stops, we will wait for Mnesia to restart
%% We will wait even if the list of tables is empty
%%
wait_for_tables(Tabs, Timeout) when list(Tabs), Timeout == infinity ->
    do_wait_for_tables(Tabs, Timeout);
wait_for_tables(Tabs, Timeout) when list(Tabs),
                                    integer(Timeout), Timeout >= 0 ->
    do_wait_for_tables(Tabs, Timeout);
wait_for_tables(Tabs, Timeout) ->
    {error, {badarg, Tabs, Timeout}}.

do_wait_for_tables(Tabs, 0) ->
    reply_wait(Tabs);
do_wait_for_tables(Tabs, Timeout) ->
    Pid = spawn_link(?MODULE, wait_for_tables_init, [self(), Tabs]),
    receive
	{?SERVER_NAME, Pid, Res} ->
	    Res;

	{'EXIT', Pid, _} ->
	    reply_wait(Tabs)

    after Timeout ->
	    unlink(Pid),
	    exit(Pid, timeout),
	    reply_wait(Tabs)
    end.

reply_wait(Tabs) ->
    case catch mnesia_lib:active_tables() of
	{'EXIT', _} ->
	    {error, {node_not_running, node()}};
	Active when list(Active) ->
	    case Tabs -- Active of
		[] ->
		    ok;
		BadTabs ->
		    {timeout, BadTabs}
	    end
    end.

wait_for_tables_init(From, Tabs) ->
    process_flag(trap_exit, true),
    Res = wait_for_init(From, Tabs, whereis(?SERVER_NAME)),
    From ! {?SERVER_NAME, self(), Res},
    unlink(From),
    exit(normal).

wait_for_init(From, Tabs, Init) ->
    case catch link(Init) of
	{'EXIT', _} ->
	    %% Mnesia is not started
	    {error, {node_not_running, node()}};
	true when pid(Init) ->
	    cast({sync_tabs, Tabs, self()}),
	    rec_tabs(Tabs, Tabs, From, Init)
    end.

sync_reply(Waiter, Tab) ->
    Waiter ! {?SERVER_NAME, {tab_synced, Tab}}.

rec_tabs([Tab | Tabs], AllTabs, From, Init) ->
    receive
	{?SERVER_NAME, {tab_synced, Tab}} ->
	    rec_tabs(Tabs, AllTabs, From, Init);

	{'EXIT', From, _} ->
	    %% This will trigger an exit signal
	    %% to mnesia_init
	    exit(wait_for_tables_timeout);

	{'EXIT', Init, _} ->
	    %% Oops, mnesia_init stopped,
	    exit(mnesia_stopped)
    end;
rec_tabs([], _, _, Init) ->
    unlink(Init),
    ok.

get_cstructs() ->
    call(get_cstructs).

mnesia_down(Node) ->
    case cast({mnesia_down, Node}) of
	{error, _} -> mnesia_monitor:mnesia_down(?SERVER_NAME, Node);
	_Pid ->  ok
    end.
wait_for_schema_commit_lock() ->
    link(whereis(?SERVER_NAME)),
    unsafe_call(wait_for_schema_commit_lock).

block_controller() ->
    call(block_controller).

unblock_controller() ->
    cast(unblock_controller).

release_schema_commit_lock() ->
    cast({release_schema_commit_lock, self()}),
    unlink(whereis(?SERVER_NAME)).

%% Special for preparation of add table copy
get_network_copy(Tab, Cs) ->
    Work = #net_load{table = Tab,
		     reason = {dumper, add_table_copy},
		     cstruct = Cs
		    },
    Res = (catch load_table(Work)),
    if Res#loader_done.is_loaded == true ->
	    Tab = Res#loader_done.table_name,
	    case Res#loader_done.needs_announce of
		true ->
		    i_have_tab(Tab);
		false ->
		    ignore
	    end;
       true -> ignore
    end,

    receive %% Flush copier done message
	{copier_done, _Node} ->
	    ok
    after 500 ->  %% avoid hanging if something is wrong and we shall fail.
	    ignore
    end,
    Res#loader_done.reply.

%% This functions is invoked from the dumper
%%
%% There are two cases here:
%% startup ->
%%   no need for sync, since mnesia_controller not started yet
%% schema_trans ->
%%   already synced with mnesia_controller since the dumper
%%   is syncronously started from mnesia_controller

create_table(Tab) ->
    {loaded, ok} = mnesia_loader:disc_load_table(Tab, {dumper,create_table}).

get_disc_copy(Tab) ->
    disc_load_table(Tab, {dumper,change_table_copy_type}, undefined).

%% Returns ok instead of yes
force_load_table(Tab) when atom(Tab), Tab /= schema ->
    case ?catch_val({Tab, storage_type}) of
	ram_copies ->
	    do_force_load_table(Tab);
	disc_copies ->
	    do_force_load_table(Tab);
	disc_only_copies ->
	    do_force_load_table(Tab);
	unknown ->
	    set({Tab, load_by_force}, true),
	    cast({force_load_updated, Tab}),
	    wait_for_tables([Tab], infinity);
	{'EXIT', _} ->
	    {error, {no_exists, Tab}}
    end;
force_load_table(Tab) ->
    {error, {bad_type, Tab}}.

do_force_load_table(Tab) ->
    Loaded = ?catch_val({Tab, load_reason}),
    case Loaded of
	unknown ->
	    set({Tab, load_by_force}, true),
	    mnesia_late_loader:async_late_disc_load(node(), [Tab], forced_by_user),
	    wait_for_tables([Tab], infinity);
	{'EXIT', _} ->
	    set({Tab, load_by_force}, true),
	    mnesia_late_loader:async_late_disc_load(node(), [Tab], forced_by_user),
	    wait_for_tables([Tab], infinity);
	_ ->
	    ok
    end.
master_nodes_updated(schema, _Masters) ->
    ignore;
master_nodes_updated(Tab, Masters) ->
    cast({master_nodes_updated, Tab, Masters}).

schedule_late_disc_load(Tabs, Reason) ->
    MsgTag = late_disc_load,
    try_schedule_late_disc_load(Tabs, Reason, MsgTag).

try_schedule_late_disc_load(Tabs, _Reason, MsgTag)
  when Tabs == [], MsgTag /= schema_is_merged ->
    ignore;
try_schedule_late_disc_load(Tabs, Reason, MsgTag) ->
    GetIntents =
	fun() ->
		Item = mnesia_late_disc_load,
		Nodes = val({current, db_nodes}),
		mnesia:lock({global, Item, Nodes}, write),
		case multicall(Nodes -- [node()], disc_load_intents) of
		    {Replies, []} ->
			call({MsgTag, Tabs, Reason, Replies}),
			done;
		    {_, BadNodes} ->
			%% Some nodes did not respond, lets try again
			{retry, BadNodes}
		end
	end,
    case mnesia:transaction(GetIntents) of
	{'atomic', done} ->
	    done;
	{'atomic', {retry, BadNodes}} ->
	    verbose("Retry late_load_tables because bad nodes: ~p~n",
		    [BadNodes]),
	    try_schedule_late_disc_load(Tabs, Reason, MsgTag);
	{aborted, AbortReason} ->
	    fatal("Cannot late_load_tables~p: ~p~n",
		  [[Tabs, Reason, MsgTag], AbortReason])
    end.

connect_nodes(Ns) ->
    case mnesia:system_info(is_running) of
	no ->
	    {error, {node_not_running, node()}};
	yes ->
	    {NewC, OldC} = mnesia_recover:connect_nodes(Ns),
	    Connected = NewC ++OldC,
	    New1 = mnesia_lib:intersect(Ns, Connected),
	    New = New1 -- val({current, db_nodes}),

	    case try_merge_schema(New) of
		ok ->
		    mnesia_lib:add_list(extra_db_nodes, New),
		    {ok, New};
		{aborted, {throw, Str}} when list(Str) ->
		    %%mnesia_recover:disconnect_nodes(New),
		    {error, {merge_schema_failed, lists:flatten(Str)}};
		Else ->
		    %% Unconnect nodes where merge failed!!
		    %% mnesia_recover:disconnect_nodes(New),
		    {error, Else}
	    end
    end.

%% Merge the local schema with the schema on other nodes.
%% But first we must let all processes that want to force
%% load tables wait until the schema merge is done.

merge_schema() ->
    AllNodes = mnesia_lib:all_nodes(),
    case try_merge_schema(AllNodes) of
	ok ->
	    schema_is_merged();
	{aborted, {throw, Str}} when list(Str) ->
	    fatal("Failed to merge schema: ~s~n", [Str]);
	Else ->
	    fatal("Failed to merge schema: ~p~n", [Else])
    end.

try_merge_schema(Nodes) ->
    case mnesia_schema:merge_schema() of
	{'atomic', not_merged} ->
	    %% No more nodes that we need to merge the schema with
	    ok;
	{'atomic', {merged, OldFriends, NewFriends}} ->
	    %% Check if new nodes has been added to the schema
	    Diff = mnesia_lib:all_nodes() -- [node() | Nodes],
	    mnesia_recover:connect_nodes(Diff),

	    %% Tell everybody to adopt orphan tables
	    im_running(OldFriends, NewFriends),
	    im_running(NewFriends, OldFriends),

	    try_merge_schema(Nodes);
	{'atomic', {"Cannot get cstructs", Node, Reason}} ->
	    dbg_out("Cannot get cstructs, Node ~p ~p~n", [Node, Reason]),
	    timer:sleep(1000), % Avoid a endless loop look alike
	    try_merge_schema(Nodes);
	Other ->
	    Other
    end.

im_running(OldFriends, NewFriends) ->
    abcast(OldFriends, {im_running, node(), NewFriends}).

schema_is_merged() ->
    MsgTag = schema_is_merged,
    SafeLoads = initial_safe_loads(),

    %% At this point we do not know anything about
    %% which tables that the other nodes already
    %% has loaded and therefore we let the normal
    %% processing of the loader_queue take care
    %% of it, since we at that time point will
    %% know the whereabouts. We rely on the fact
    %% that all nodes tells each other directly
    %% when they have loaded a table and are
    %% willing to share it.

    try_schedule_late_disc_load(SafeLoads, initial, MsgTag).


cast(Msg) ->
    case whereis(?SERVER_NAME) of
	undefined ->{error, {node_not_running, node()}};
	Pid ->  gen_server:cast(Pid, Msg)
    end.

abcast(Nodes, Msg) ->
    gen_server:abcast(Nodes, ?SERVER_NAME, Msg).

unsafe_call(Msg) ->
    case whereis(?SERVER_NAME) of
	undefined -> {error, {node_not_running, node()}};
	Pid -> gen_server:call(Pid, Msg, infinity)
    end.

call(Msg) ->
    case whereis(?SERVER_NAME) of
	undefined ->
	    {error, {node_not_running, node()}};
	Pid ->
	    link(Pid),
	    Res = gen_server:call(Pid, Msg, infinity),
	    unlink(Pid),

	    %% We get an exit signal if server dies
            receive
                {'EXIT', Pid, _Reason} ->
                    {error, {node_not_running, node()}}
            after 0 ->
                    ignore
            end,
	    Res
    end.

remote_call(Node, Func, Args) ->
    case catch gen_server:call({?MODULE, Node}, {Func, Args, self()}, infinity) of
	{'EXIT', Error} ->
	    {error, Error};
	Else ->
	    Else
    end.

multicall(Nodes, Msg) ->
    {Good, Bad} = gen_server:multi_call(Nodes, ?MODULE, Msg, infinity),
    PatchedGood = [Reply || {_Node, Reply} <- Good],
    {PatchedGood, Bad}.  %% Make the replies look like rpc:multicalls..
%%    rpc:multicall(Nodes, ?MODULE, call, [Msg]).

%%%----------------------------------------------------------------------
%%% Callback functions from gen_server
%%%----------------------------------------------------------------------

%%----------------------------------------------------------------------
%% Func: init/1
%% Returns: {ok, State}          |
%%          {ok, State, Timeout} |
%%          {stop, Reason}
%%----------------------------------------------------------------------
init([Parent]) ->
    process_flag(trap_exit, true),
    mnesia_lib:verbose("~p starting: ~p~n", [?SERVER_NAME, self()]),

    %% Handshake and initialize transaction recovery
    %% for new nodes detected in the schema
    All = mnesia_lib:all_nodes(),
    Diff = All -- [node() | val(original_nodes)],
    mnesia_lib:unset(original_nodes),
    mnesia_recover:connect_nodes(Diff),

    Interval = mnesia_monitor:get_env(dump_log_time_threshold),
    Msg = {async_dump_log, time_threshold},
    {ok, Ref} = timer:send_interval(Interval, Msg),
    mnesia_dumper:start_regulator(),

    {ok, #state{supervisor = Parent, dump_log_timer_ref = Ref}}.

%%----------------------------------------------------------------------
%% Func: handle_call/3
%% Returns: {reply, Reply, State}          |
%%          {reply, Reply, State, Timeout} |
%%          {noreply, State}               |
%%          {noreply, State, Timeout}      |
%%          {stop, Reason, Reply, State}   | (terminate/2 is called)
%%          {stop, Reason, Reply, State}     (terminate/2 is called)
%%----------------------------------------------------------------------

handle_call({sync_dump_log, InitBy}, From, State) ->
    Worker = #dump_log{initiated_by = InitBy,
		       opt_reply_to = From
		      },
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_call(wait_for_schema_commit_lock, From, State) ->
    Worker = #schema_commit_lock{owner = From},
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_call(block_controller, From, State) ->
    Worker = #block_controller{owner = From},
    State2 = add_worker(Worker, State),
    noreply(State2);


handle_call(get_cstructs, From, State) ->
    Tabs = val({schema, tables}),
    Cstructs = [val({T, cstruct}) || T <- Tabs],
    Running = val({current, db_nodes}),
    reply(From, {cstructs, Cstructs, Running}),
    noreply(State);

handle_call({schema_is_merged, TabsR, Reason, RemoteLoaders}, From, State) ->
    State2 = late_disc_load(TabsR, Reason, RemoteLoaders, From, State),

    %% Handle early messages
    Msgs = State2#state.early_msgs,
    State3 = State2#state{early_msgs = [], schema_is_merged = true},
    Ns = val({current, db_nodes}),
    dbg_out("Schema is merged ~w, State ~w~n", [Ns, State3]),
%%    dbg_out("handle_early_msgs ~p ~n", [Msgs]), % qqqq
    handle_early_msgs(lists:reverse(Msgs), State3);

handle_call(disc_load_intents, From, State) ->
    Tabs = disc_load_intents(State#state.loader_queue) ++
           disc_load_intents(State#state.late_loader_queue),
    ActiveTabs = mnesia_lib:local_active_tables(),
    reply(From, {ok, node(), mnesia_lib:union(Tabs, ActiveTabs)}),
    noreply(State);

handle_call({update_where_to_write, [add, Tab, AddNode], _From}, _Dummy, State) ->
%%%    dbg_out("update_w2w ~p", [[add, Tab, AddNode]]), %%% qqqq
    Current = val({current, db_nodes}),
    Res =
	case lists:member(AddNode, Current) and
	    State#state.schema_is_merged == true of
	    true ->
		mnesia_lib:add({Tab, where_to_write}, AddNode);
	    false ->
		ignore
	end,
    {reply, Res, State};

handle_call({add_active_replica, [Tab, ToNode, RemoteS, AccessMode], From},
	    ReplyTo, State) ->
    KnownNode = lists:member(ToNode, val({current, db_nodes})),
    Merged = State#state.schema_is_merged,
    if
	KnownNode == false ->
	    reply(ReplyTo, ignore),
	    noreply(State);
	Merged == true ->
	    Res = add_active_replica(Tab, ToNode, RemoteS, AccessMode),
	    reply(ReplyTo, Res),
	    noreply(State);
	true -> %% Schema is not merged
	    Msg = {add_active_replica, [Tab, ToNode, RemoteS, AccessMode], From},
	    Msgs = State#state.early_msgs,
	    reply(ReplyTo, ignore),   %% Reply ignore and add data after schema merge
	    noreply(State#state{early_msgs = [{call, Msg, undefined} | Msgs]})
    end;

handle_call({unannounce_add_table_copy, [Tab, Node], From}, ReplyTo, State) ->
    KnownNode = lists:member(node(From), val({current, db_nodes})),
    Merged = State#state.schema_is_merged,
    if
	KnownNode == false ->
	    reply(ReplyTo, ignore),
	    noreply(State);
	Merged == true ->
	    Res = unannounce_add_table_copy(Tab, Node),
	    reply(ReplyTo, Res),
	    noreply(State);
	true -> %% Schema is not merged
	    Msg = {unannounce_add_table_copy, [Tab, Node], From},
	    Msgs = State#state.early_msgs,
	    reply(ReplyTo, ignore),   %% Reply ignore and add data after schema merge
	    %% Set ReplyTO to undefined so we don't reply twice
	    noreply(State#state{early_msgs = [{call, Msg, undefined} | Msgs]})
    end;

handle_call(Msg, From, State) when State#state.schema_is_merged == false ->
    %% Buffer early messages
%%    dbg_out("Buffered early msg ~p ~n", [Msg]),   %% qqqq
    Msgs = State#state.early_msgs,
    noreply(State#state{early_msgs = [{call, Msg, From} | Msgs]});

handle_call({net_load, Tab, Cs}, From, State) ->
    Worker = #net_load{table = Tab,
		       opt_reply_to = From,
		       reason = add_table_copy,
		       cstruct = Cs
		      },
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_call({late_disc_load, Tabs, Reason, RemoteLoaders}, From, State) ->
    State2 = late_disc_load(Tabs, Reason, RemoteLoaders, From, State),
    noreply(State2);

handle_call({block_table, [Tab], From}, _Dummy, State) ->
    case lists:member(node(From), val({current, db_nodes})) of
	true ->
	    block_table(Tab);
	false ->
	    ignore
    end,
    {reply, ok, State};

handle_call({check_w2r, _Node, Tab}, _From, State) ->
    {reply, val({Tab, where_to_read}), State};

handle_call(Msg, _From, State) ->
    error("~p got unexpected call: ~p~n", [?SERVER_NAME, Msg]),
    noreply(State).

disc_load_intents([H | T]) when record(H, disc_load) ->
    [H#disc_load.table | disc_load_intents(T)];
disc_load_intents([H | T]) when record(H, late_load) ->
    [H#late_load.table | disc_load_intents(T)];
disc_load_intents( [H | T]) when record(H, net_load) ->
    disc_load_intents(T);
disc_load_intents([]) ->
    [].

late_disc_load(TabsR, Reason, RemoteLoaders, From, State) ->
    verbose("Intend to load tables: ~p~n", [TabsR]),
    ?eval_debug_fun({?MODULE, late_disc_load},
		    [{tabs, TabsR},
		     {reason, Reason},
		     {loaders, RemoteLoaders}]),

    reply(From, queued),
    %% RemoteLoaders is a list of {ok, Node, Tabs} tuples

    %% Remove deleted tabs
    LocalTabs = mnesia_lib:val({schema, local_tables}),
    Filter = fun({Tab, Reas}, Acc) ->
		     case lists:member(Tab, LocalTabs) of
			 true -> [{Tab, Reas} | Acc];
		         false -> Acc
		     end;
		(Tab, Acc) ->
		     case lists:member(Tab, LocalTabs) of
			 true -> [Tab | Acc];
		         false -> Acc
		     end
	     end,

    Tabs = lists:foldl(Filter, [], TabsR),

    Nodes = val({current, db_nodes}),
    LateLoaders = late_loaders(Tabs, Reason, RemoteLoaders, Nodes),
    LateQueue = State#state.late_loader_queue ++ LateLoaders,
    State#state{late_loader_queue = LateQueue}.

late_loaders([{Tab, Reason} | Tabs], DefaultReason, RemoteLoaders, Nodes) ->
    LoadNodes = late_load_filter(RemoteLoaders, Tab, Nodes, []),
    case LoadNodes of
	[] ->
	    cast({disc_load, Tab, Reason}); % Ugly cast
	_ ->
	    ignore
    end,
    LateLoad = #late_load{table = Tab, loaders = LoadNodes, reason = Reason},
    [LateLoad | late_loaders(Tabs, DefaultReason, RemoteLoaders, Nodes)];

late_loaders([Tab | Tabs], Reason, RemoteLoaders, Nodes) ->
    Loaders = late_load_filter(RemoteLoaders, Tab, Nodes, []),
    case Loaders of
	[] ->
	    cast({disc_load, Tab, Reason});  % Ugly cast
	_ ->
	    ignore
    end,
    LateLoad = #late_load{table = Tab, loaders = Loaders, reason = Reason},
    [LateLoad | late_loaders(Tabs, Reason, RemoteLoaders, Nodes)];
late_loaders([], _Reason, _RemoteLoaders, _Nodes) ->
    [].

late_load_filter([{error, _} | RemoteLoaders], Tab, Nodes, Acc) ->
    late_load_filter(RemoteLoaders, Tab, Nodes, Acc);
late_load_filter([{badrpc, _} | RemoteLoaders], Tab, Nodes, Acc) ->
    late_load_filter(RemoteLoaders, Tab, Nodes, Acc);
late_load_filter([RL | RemoteLoaders], Tab, Nodes, Acc) ->
    {ok, Node, Intents} = RL,
    Access = val({Tab, access_mode}),
    LocalC = val({Tab, local_content}),
    StillActive = lists:member(Node, Nodes),
    RemoteIntent = lists:member(Tab, Intents),
    if
	Access == read_write,
	LocalC == false,
	StillActive == true,
	RemoteIntent == true ->
	    Masters = mnesia_recover:get_master_nodes(Tab),
	    case lists:member(Node, Masters) of
		true ->
		    %% The other node is master node for
		    %% the table, accept his load intent
		    late_load_filter(RemoteLoaders, Tab, Nodes, [Node | Acc]);
		false when Masters == [] ->
		    %% The table has no master nodes
		    %% accept his load intent
		    late_load_filter(RemoteLoaders, Tab, Nodes, [Node | Acc]);
		false ->
		    %% Some one else is master node for
		    %% the table, ignore his load intent
		    late_load_filter(RemoteLoaders, Tab, Nodes, Acc)
	    end;
	true ->
	    late_load_filter(RemoteLoaders, Tab, Nodes, Acc)
    end;
late_load_filter([], _Tab, _Nodes, Acc) ->
    Acc.

%%----------------------------------------------------------------------
%% Func: handle_cast/2
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%%----------------------------------------------------------------------

handle_cast({release_schema_commit_lock, _Owner}, State) ->
    if
	State#state.is_stopping == true ->
	    {stop, shutdown, State};
	true ->
	    case State#state.dumper_queue of
		[#schema_commit_lock{}|Rest] ->
		    [_Worker | Rest] = State#state.dumper_queue,
		    State2 = State#state{dumper_pid = undefined,
					 dumper_queue = Rest},
		    State3 = opt_start_worker(State2),
		    noreply(State3);
		_ ->
		    noreply(State)
	    end
    end;

handle_cast(unblock_controller, State) ->
    if
	State#state.is_stopping == true ->
	    {stop, shutdown, State};
	record(hd(State#state.dumper_queue), block_controller) ->
	    [_Worker | Rest] = State#state.dumper_queue,
	    State2 = State#state{dumper_pid = undefined,
				 dumper_queue = Rest},
	    State3 = opt_start_worker(State2),
	    noreply(State3)
    end;

handle_cast({mnesia_down, Node}, State) ->
    maybe_log_mnesia_down(Node),
    mnesia_lib:del({current, db_nodes}, Node),
    mnesia_checkpoint:tm_mnesia_down(Node),
    Alltabs = val({schema, tables}),
    State2 = reconfigure_tables(Node, State, Alltabs),
    case State#state.sender_pid of
	undefined -> ignore;
	Pid when pid(Pid) -> Pid ! {copier_done, Node}
    end,
    case State#state.loader_pid of
	undefined -> ignore;
	Pid2 when pid(Pid2) -> Pid2 ! {copier_done, Node}
    end,
    NewSenders =
	case State#state.sender_queue of
	    [OldSender | RestSenders] ->
		Remove = fun(ST) ->
				 node(ST#send_table.receiver_pid) /= Node
			 end,
		NewS = lists:filter(Remove, RestSenders),
		%% Keep old sender it will be removed by sender_done
		[OldSender | NewS];
	    [] ->
		[]
	end,
    Early = remove_early_messages(State2#state.early_msgs, Node),
    mnesia_monitor:mnesia_down(?SERVER_NAME, Node),
    noreply(State2#state{sender_queue = NewSenders, early_msgs = Early});

handle_cast({im_running, _Node, NewFriends}, State) ->
    Tabs = mnesia_lib:local_active_tables() -- [schema],
    Ns = mnesia_lib:intersect(NewFriends, val({current, db_nodes})),
    abcast(Ns, {adopt_orphans, node(), Tabs}),
    noreply(State);

handle_cast(Msg, State) when State#state.schema_is_merged == false ->
    %% Buffer early messages
    Msgs = State#state.early_msgs,
    noreply(State#state{early_msgs = [{cast, Msg} | Msgs]});

handle_cast({disc_load, Tab, Reason}, State) ->
    Worker = #disc_load{table = Tab, reason = Reason},
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_cast(Worker, State) when record(Worker, send_table) ->
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_cast({sync_tabs, Tabs, From}, State) ->
    %% user initiated wait_for_tables
    handle_sync_tabs(Tabs, From),
    noreply(State);

handle_cast({i_have_tab, Tab, Node}, State) ->
    case lists:member(Node, val({current, db_nodes})) of
	true ->
	    State2 = node_has_tabs([Tab], Node, State),
	    noreply(State2);
	false ->
	    noreply(State)
    end;

handle_cast({force_load_updated, Tab}, State) ->
    case val({Tab, active_replicas}) of
	[] ->
	    %% No valid replicas
	    noreply(State);
	[SomeNode | _] ->
	    State2 = node_has_tabs([Tab], SomeNode, State),
	    noreply(State2)
    end;

handle_cast({master_nodes_updated, Tab, Masters}, State) ->
    Active = val({Tab, active_replicas}),
    Valid =
	case val({Tab, load_by_force}) of
	    true ->
		Active;
	    false ->
		if
		    Masters == [] ->
			Active;
		    true ->
			mnesia_lib:intersect(Masters, Active)
		end
	end,
    case Valid of
	[] ->
	    %% No valid replicas
	    noreply(State);
	[SomeNode | _] ->
	    State2 = node_has_tabs([Tab], SomeNode, State),
	    noreply(State2)
    end;

handle_cast({adopt_orphans, Node, Tabs}, State) ->

    State2 = node_has_tabs(Tabs, Node, State),

    %% Register the other node as up and running
    mnesia_recover:log_mnesia_up(Node),
    verbose("Logging mnesia_up ~w~n", [Node]),
    mnesia_lib:report_system_event({mnesia_up, Node}),

    %% Load orphan tables
    LocalTabs = val({schema, local_tables}) -- [schema],
    Nodes = val({current, db_nodes}),
    {LocalOrphans, RemoteMasters} =
	orphan_tables(LocalTabs, Node, Nodes, [], []),
    Reason = {adopt_orphan, node()},
    mnesia_late_loader:async_late_disc_load(node(), LocalOrphans, Reason),

    Fun =
	fun(N) ->
		RemoteOrphans =
		    [Tab || {Tab, Ns} <- RemoteMasters,
			    lists:member(N, Ns)],
		mnesia_late_loader:maybe_async_late_disc_load(N, RemoteOrphans, Reason)
	  end,
    lists:foreach(Fun, Nodes),

    Queue = State2#state.loader_queue,
    State3 = State2#state{loader_queue = Queue},
    noreply(State3);

handle_cast(Msg, State) ->
    error("~p got unexpected cast: ~p~n", [?SERVER_NAME, Msg]),
    noreply(State).

handle_sync_tabs([Tab | Tabs], From) ->
    case val({Tab, where_to_read}) of
	nowhere ->
	    case get({sync_tab, Tab}) of
		undefined ->
		    put({sync_tab, Tab}, [From]);
		Pids ->
		    put({sync_tab, Tab}, [From | Pids])
	    end;
	_ ->
	    sync_reply(From, Tab)
    end,
    handle_sync_tabs(Tabs, From);
handle_sync_tabs([], _From) ->
    ok.

%%----------------------------------------------------------------------
%% Func: handle_info/2
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%%----------------------------------------------------------------------

handle_info({async_dump_log, InitBy}, State) ->
    Worker = #dump_log{initiated_by = InitBy},
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_info(Done, State) when record(Done, dumper_done) ->
    Pid = Done#dumper_done.worker_pid,
    Res = Done#dumper_done.worker_res,
    if
	State#state.is_stopping == true ->
	    {stop, shutdown, State};
	Res == dumped, Pid == State#state.dumper_pid ->
	    [Worker | Rest] = State#state.dumper_queue,
	    reply(Worker#dump_log.opt_reply_to, Res),
	    State2 = State#state{dumper_pid = undefined,
				 dumper_queue = Rest},
	    State3 = opt_start_worker(State2),
	    noreply(State3);
	true ->
	    fatal("Dumper failed: ~p~n state: ~p~n", [Res, State]),
	    {stop, fatal, State}
    end;

handle_info(Done, State) when record(Done, loader_done) ->
    if
	%% Assertion
	Done#loader_done.worker_pid == State#state.loader_pid -> ok
    end,

    [_Worker | Rest] = LoadQ0 = State#state.loader_queue,
    LateQueue0 = State#state.late_loader_queue,
    {LoadQ, LateQueue} =
	case Done#loader_done.is_loaded of
	    true ->
		Tab = Done#loader_done.table_name,

		%% Optional user sync
		case Done#loader_done.needs_sync of
		    true -> user_sync_tab(Tab);
		    false -> ignore
		end,

		%% Optional table announcement
		case Done#loader_done.needs_announce of
		    true ->
			i_have_tab(Tab),
			case Tab of
			    schema ->
				ignore;
			    _ ->
				%% Local node needs to perform user_sync_tab/1
				Ns = val({current, db_nodes}),
				abcast(Ns, {i_have_tab, Tab, node()})
			end;
		    false ->
			case Tab of
			    schema ->
				ignore;
			    _ ->
				%% Local node needs to perform user_sync_tab/1
				Ns = val({current, db_nodes}),
				AlreadyKnows = val({Tab, active_replicas}),
				abcast(Ns -- AlreadyKnows, {i_have_tab, Tab, node()})
			end
		end,

		%% Optional client reply
		case Done#loader_done.needs_reply of
		    true ->
			reply(Done#loader_done.reply_to,
			      Done#loader_done.reply);
		    false ->
			ignore
		end,
		{Rest, reply_late_load(Tab, LateQueue0)};
	    false ->
		case Done#loader_done.reply of
		    restart ->
			{LoadQ0, LateQueue0};
		    _ ->
			{Rest, LateQueue0}
		end
	end,

    State2 = State#state{loader_pid = undefined,
			 loader_queue = LoadQ,
			 late_loader_queue = LateQueue},

    State3 = opt_start_worker(State2),
    noreply(State3);

handle_info(Done, State) when record(Done, sender_done) ->
    Pid = Done#sender_done.worker_pid,
    Res = Done#sender_done.worker_res,
    if
	Res == ok, Pid == State#state.sender_pid ->
	    [Worker | Rest] = State#state.sender_queue,
	    Worker#send_table.receiver_pid ! {copier_done, node()},
	    State2 = State#state{sender_pid = undefined,
				 sender_queue = Rest},
	    State3 = opt_start_worker(State2),
	    noreply(State3);
	true ->
	    %% No need to send any message to the table receiver
	    %% since it will soon get a mnesia_down anyway
	    fatal("Sender failed: ~p~n state: ~p~n", [Res, State]),
	    {stop, fatal, State}
    end;

handle_info({'EXIT', Pid, R}, State) when Pid == State#state.supervisor ->
    catch set(mnesia_status, stopping),
    case State#state.dumper_pid of
	undefined ->
	    dbg_out("~p was ~p~n", [?SERVER_NAME, R]),
	    {stop, shutdown, State};
	_ ->
	    noreply(State#state{is_stopping = true})
    end;

handle_info({'EXIT', Pid, R}, State) when Pid == State#state.dumper_pid ->
    case State#state.dumper_queue of
	[#schema_commit_lock{}|Workers] ->  %% Schema trans crashed or was killed
	    State2 = State#state{dumper_queue = Workers, dumper_pid = undefined},
	    State3 = opt_start_worker(State2),
	    noreply(State3);
	_Other ->
	    fatal("Dumper or schema commit crashed: ~p~n state: ~p~n", [R, State]),
	    {stop, fatal, State}
    end;

handle_info({'EXIT', Pid, R}, State) when Pid == State#state.loader_pid ->
    fatal("Loader crashed: ~p~n state: ~p~n", [R, State]),
    {stop, fatal, State};

handle_info({'EXIT', Pid, R}, State) when Pid == State#state.sender_pid ->
    %% No need to send any message to the table receiver
    %% since it will soon get a mnesia_down anyway
    fatal("Sender crashed: ~p~n state: ~p~n", [R, State]),
    {stop, fatal, State};

handle_info({From, get_state}, State) ->
    From ! {?SERVER_NAME, State},
    noreply(State);

%% No real need for buffering
handle_info(Msg, State) when State#state.schema_is_merged == false ->
    %% Buffer early messages
    Msgs = State#state.early_msgs,
    noreply(State#state{early_msgs = [{info, Msg} | Msgs]});

handle_info({'EXIT', Pid, wait_for_tables_timeout}, State) ->
    sync_tab_timeout(Pid, get()),
    noreply(State);

handle_info(Msg, State) ->
    error("~p got unexpected info: ~p~n", [?SERVER_NAME, Msg]),
    noreply(State).

reply_late_load(Tab, [H | T]) when H#late_load.table == Tab ->
    reply(H#late_load.opt_reply_to, ok),
    reply_late_load(Tab, T);
reply_late_load(Tab, [H | T])  ->
    [H | reply_late_load(Tab, T)];
reply_late_load(_Tab, []) ->
    [].

sync_tab_timeout(Pid, [{{sync_tab, Tab}, Pids} | Tail]) ->
    case lists:delete(Pid, Pids) of
	[] ->
	    erase({sync_tab, Tab});
	Pids2 ->
	    put({sync_tab, Tab}, Pids2)
    end,
    sync_tab_timeout(Pid, Tail);
sync_tab_timeout(Pid, [_ | Tail]) ->
    sync_tab_timeout(Pid, Tail);
sync_tab_timeout(_Pid, []) ->
    ok.

%% Pick the load record that has the highest load order
%% Returns {BestLoad, RemainingQueue} or {none, []} if queue is empty
pick_next(Queue) ->
    pick_next(Queue, none, none, []).

pick_next([Head | Tail], Load, Order, Rest) when record(Head, net_load) ->
    Tab = Head#net_load.table,
    select_best(Head, Tail, val({Tab, load_order}), Load, Order, Rest);
pick_next([Head | Tail], Load, Order, Rest) when record(Head, disc_load) ->
    Tab = Head#disc_load.table,
    select_best(Head, Tail, val({Tab, load_order}), Load, Order, Rest);
pick_next([], Load, _Order, Rest) ->
    {Load, Rest}.

select_best(Load, Tail, Order, none, none, Rest) ->
    pick_next(Tail, Load, Order, Rest);
select_best(Load, Tail, Order, OldLoad, OldOrder, Rest) when Order > OldOrder ->
    pick_next(Tail, Load, Order, [OldLoad | Rest]);
select_best(Load, Tail, _Order, OldLoad, OldOrder, Rest) ->
    pick_next(Tail, OldLoad, OldOrder, [Load | Rest]).

%%----------------------------------------------------------------------
%% Func: terminate/2
%% Purpose: Shutdown the server
%% Returns: any (ignored by gen_server)
%%----------------------------------------------------------------------
terminate(Reason, State) ->
    mnesia_monitor:terminate_proc(?SERVER_NAME, Reason, State).

%%----------------------------------------------------------------------
%% Func: code_change/3
%% Purpose: Upgrade process when its code is to be changed
%% Returns: {ok, NewState}
%%----------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------

maybe_log_mnesia_down(N) ->
    %% We use mnesia_down when deciding which tables to load locally,
    %% so if we are not running (i.e haven't decided which tables
    %% to load locally), don't log mnesia_down yet.
    case mnesia_lib:is_running() of
	yes ->
	    verbose("Logging mnesia_down ~w~n", [N]),
	    mnesia_recover:log_mnesia_down(N),
	    ok;
	_ ->
	    Filter = fun(Tab) ->
			     inactive_copy_holders(Tab, N)
		     end,
	    HalfLoadedTabs = lists:any(Filter, val({schema, local_tables}) -- [schema]),
	    if
		HalfLoadedTabs == true ->
		    verbose("Logging mnesia_down ~w~n", [N]),
		    mnesia_recover:log_mnesia_down(N),
		    ok;
		true ->
		    %% Unfortunately we have not loaded some common
		    %% tables yet, so we cannot rely on the nodedown
		    log_later   %% BUGBUG handle this case!!!
	    end
    end.

inactive_copy_holders(Tab, Node) ->
    Cs = val({Tab, cstruct}),
    case mnesia_lib:cs_to_storage_type(Node, Cs) of
	unknown ->
	    false;
	_Storage ->
	    mnesia_lib:not_active_here(Tab)
    end.

orphan_tables([Tab | Tabs], Node, Ns, Local, Remote) ->
    Cs = val({Tab, cstruct}),
    CopyHolders = mnesia_lib:copy_holders(Cs),
    RamCopyHolders = Cs#cstruct.ram_copies,
    DiscCopyHolders = CopyHolders -- RamCopyHolders,
    DiscNodes = val({schema, disc_copies}),
    LocalContent = Cs#cstruct.local_content,
    RamCopyHoldersOnDiscNodes = mnesia_lib:intersect(RamCopyHolders, DiscNodes),
    Active = val({Tab, active_replicas}),
    case lists:member(Node, DiscCopyHolders) of
	true when Active == [] ->
	    case DiscCopyHolders -- Ns of
		[] ->
		    %% We're last up and the other nodes have not
		    %% loaded the table. Lets load it if we are
		    %% the smallest node.
		    case lists:min(DiscCopyHolders) of
			Min when Min == node() ->
			    case mnesia_recover:get_master_nodes(Tab) of
				[] ->
				    L = [Tab | Local],
				    orphan_tables(Tabs, Node, Ns, L, Remote);
				Masters ->
				    R = [{Tab, Masters} | Remote],
				    orphan_tables(Tabs, Node, Ns, Local, R)
			    end;
			_ ->
			    orphan_tables(Tabs, Node, Ns, Local, Remote)
		    end;
		_ ->
		    orphan_tables(Tabs, Node, Ns, Local, Remote)
	    end;
	false when Active == [], DiscCopyHolders == [], RamCopyHoldersOnDiscNodes == [] ->
	    %% Special case when all replicas resides on disc less nodes
	    orphan_tables(Tabs, Node, Ns, [Tab | Local], Remote);
	_ when LocalContent == true ->
	    orphan_tables(Tabs, Node, Ns, [Tab | Local], Remote);
	_ ->
	    orphan_tables(Tabs, Node, Ns, Local, Remote)
    end;
orphan_tables([], _, _, LocalOrphans, RemoteMasters) ->
    {LocalOrphans, RemoteMasters}.

node_has_tabs([Tab | Tabs], Node, State) when Node /= node() ->
    State2 = update_whereabouts(Tab, Node, State),
    node_has_tabs(Tabs, Node, State2);
node_has_tabs([Tab | Tabs], Node, State) ->
    user_sync_tab(Tab),
    node_has_tabs(Tabs, Node, State);
node_has_tabs([], _Node, State) ->
    State.

update_whereabouts(Tab, Node, State) ->
    Storage = val({Tab, storage_type}),
    Read = val({Tab, where_to_read}),
    LocalC = val({Tab, local_content}),
    BeingCreated = (?catch_val({Tab, create_table}) == true),
    Masters = mnesia_recover:get_master_nodes(Tab),
    ByForce = val({Tab, load_by_force}),
    GoGetIt =
	if
	    ByForce == true ->
		true;
	    Masters == [] ->
		true;
	    true ->
		lists:member(Node, Masters)
	end,

    dbg_out("Table ~w is loaded on ~w. s=~w, r=~w, lc=~w, f=~w, m=~w~n",
	    [Tab, Node, Storage, Read, LocalC, ByForce, GoGetIt]),
    if
	LocalC == true ->
	    %% Local contents, don't care about other node
	    State;
	Storage == unknown, Read == nowhere ->
	    %% No own copy, time to read remotely
	    %% if the other node is a good node
	    add_active_replica(Tab, Node),
	    case GoGetIt of
		true ->
		    set({Tab, where_to_read}, Node),
		    user_sync_tab(Tab),
		    State;
		false ->
		    State
	    end;
	Storage == unknown ->
	    %% No own copy, continue to read remotely
	    add_active_replica(Tab, Node),
	    NodeST = mnesia_lib:storage_type_at_node(Node, Tab),
	    ReadST = mnesia_lib:storage_type_at_node(Read, Tab),
	    if   %% Avoid reading from disc_only_copies
		NodeST == disc_only_copies ->
		    ignore;
		ReadST == disc_only_copies ->
		    mnesia_lib:set_remote_where_to_read(Tab);
		true ->
		    ignore
	    end,
	    user_sync_tab(Tab),
	    State;
	BeingCreated == true ->
	    %% The table is currently being created
	    %% and we shall have an own copy of it.
	    %% We will load the (empty) table locally.
	    add_active_replica(Tab, Node),
	    State;
	Read == nowhere ->
	    %% Own copy, go and get a copy of the table
	    %% if the other node is master or if there
	    %% are no master at all
	    add_active_replica(Tab, Node),
	    case GoGetIt of
		true ->
		    Worker = #net_load{table = Tab,
				       reason = {active_remote, Node}},
		    add_worker(Worker, State);
		false ->
		    State
	    end;
	true ->
	    %% We already have an own copy
	    add_active_replica(Tab, Node),
	    user_sync_tab(Tab),
	    State
    end.

initial_safe_loads() ->
    case val({schema, storage_type}) of
	ram_copies ->
	    Downs = [],
	    Tabs = val({schema, local_tables}) -- [schema],
	    LastC = fun(T) -> last_consistent_replica(T, Downs) end,
	    lists:zf(LastC, Tabs);

	disc_copies ->
	    Downs = mnesia_recover:get_mnesia_downs(),
	    dbg_out("mnesia_downs = ~p~n", [Downs]),

	    Tabs = val({schema, local_tables}) -- [schema],
	    LastC = fun(T) -> last_consistent_replica(T, Downs) end,
	    lists:zf(LastC, Tabs)
    end.

last_consistent_replica(Tab, Downs) ->
    Cs = val({Tab, cstruct}),
    Storage = mnesia_lib:cs_to_storage_type(node(), Cs),
    Ram = Cs#cstruct.ram_copies,
    Disc = Cs#cstruct.disc_copies,
    DiscOnly = Cs#cstruct.disc_only_copies,
    BetterCopies0 = mnesia_lib:remote_copy_holders(Cs) -- Downs,
    BetterCopies = BetterCopies0 -- Ram,
    AccessMode = Cs#cstruct.access_mode,
    Copies = mnesia_lib:copy_holders(Cs),
    Masters = mnesia_recover:get_master_nodes(Tab),
    LocalMaster0 = lists:member(node(), Masters),
    LocalContent = Cs#cstruct.local_content,
    RemoteMaster =
	if
	    Masters == [] -> false;
	    true -> not LocalMaster0
	end,
    LocalMaster =
	if
	    Masters == [] -> false;
	    true -> LocalMaster0
	end,
    if
	Copies == [node()]  ->
	    %% Only one copy holder and it is local.
	    %% It may also be a local contents table
	    {true, {Tab, local_only}};
	LocalContent == true ->
	    {true, {Tab, local_content}};
	LocalMaster == true ->
	    %% We have a local master
	    {true, {Tab, local_master}};
	RemoteMaster == true ->
	    %% Wait for remote master copy
	    false;
	Storage == ram_copies ->
	    if
		Disc == [], DiscOnly == [] ->
		    %% Nobody has copy on disc
		    {true, {Tab, ram_only}};
		true ->
		    %% Some other node has copy on disc
		    false
	    end;
	AccessMode == read_only ->
	    %% No one has been able to update the table,
	    %% i.e. all disc resident copies are equal
	    {true, {Tab, read_only}};
	BetterCopies /= [], Masters /= [node()] ->
	    %% There are better copies on other nodes
	    %% and we do not have the only master copy
	    false;
	true ->
	    {true, {Tab, initial}}
    end.

reconfigure_tables(N, State, [Tab |Tail]) ->
    del_active_replica(Tab, N),
    case val({Tab, where_to_read}) of
	N ->  mnesia_lib:set_remote_where_to_read(Tab);
	_ ->  ignore
    end,
    LateQ = drop_loaders(Tab, N, State#state.late_loader_queue),
    reconfigure_tables(N, State#state{late_loader_queue = LateQ}, Tail);

reconfigure_tables(_, State, []) ->
    State.

remove_early_messages([], _Node) ->
    [];
remove_early_messages([{call, {add_active_replica, [_, Node, _, _], _}, _}|R], Node) ->
    remove_early_messages(R, Node); %% Does a reply before queuing
remove_early_messages([{call, {block_table, _, From}, ReplyTo}|R], Node)
  when node(From) == Node ->
    reply(ReplyTo, ok),  %% Remove gen:server waits..
    remove_early_messages(R, Node);
remove_early_messages([{cast, {i_have_tab, _Tab, Node}}|R], Node) ->
    remove_early_messages(R, Node);
remove_early_messages([{cast, {adopt_orphans, Node, _Tabs}}|R], Node) ->
    remove_early_messages(R, Node);
remove_early_messages([M|R],Node) ->
    [M|remove_early_messages(R,Node)].

%% Drop loader from late load queue and possibly trigger a disc_load
drop_loaders(Tab, Node, [H | T]) when H#late_load.table == Tab ->
    %% Check if it is time to issue a disc_load request
    case H#late_load.loaders of
	[Node] ->
	    Reason = {H#late_load.reason, last_loader_down, Node},
	    cast({disc_load, Tab, Reason});  % Ugly cast
	_ ->
	    ignore
    end,
    %% Drop the node from the list of loaders
    H2 = H#late_load{loaders = H#late_load.loaders -- [Node]},
    [H2 | drop_loaders(Tab, Node, T)];
drop_loaders(Tab, Node, [H | T]) ->
    [H | drop_loaders(Tab, Node, T)];
drop_loaders(_, _, []) ->
    [].

add_active_replica(Tab, Node) ->
    add_active_replica(Tab, Node, val({Tab, cstruct})).

add_active_replica(Tab, Node, Cs) when record(Cs, cstruct) ->
    Storage = mnesia_lib:schema_cs_to_storage_type(Node, Cs),
    AccessMode = Cs#cstruct.access_mode,
    add_active_replica(Tab, Node, Storage, AccessMode).

%% Block table primitives

block_table(Tab) ->
    Var = {Tab, where_to_commit},
    Old = val(Var),
    New = {blocked, Old},
    set(Var, New). % where_to_commit

unblock_table(Tab) ->
    Var = {Tab, where_to_commit},
    New =
	case val(Var) of
	    {blocked, List} ->
		List;
	    List ->
		List
	end,
    set(Var, New). % where_to_commit

is_tab_blocked(W2C) when list(W2C) ->
    {false, W2C};
is_tab_blocked({blocked, W2C}) when list(W2C) ->
    {true, W2C}.

mark_blocked_tab(true, Value) ->
    {blocked, Value};
mark_blocked_tab(false, Value) ->
    Value.

%%

add_active_replica(Tab, Node, Storage, AccessMode) ->
    Var = {Tab, where_to_commit},
    {Blocked, Old} = is_tab_blocked(val(Var)),
    Del = lists:keydelete(Node, 1, Old),
    case AccessMode of
	read_write ->
	    New = lists:sort([{Node, Storage} | Del]),
	    set(Var, mark_blocked_tab(Blocked, New)), % where_to_commit
	    add({Tab, where_to_write}, Node);
	read_only ->
	    set(Var, mark_blocked_tab(Blocked, Del)),
	    mnesia_lib:del({Tab, where_to_write}, Node)
    end,
    add({Tab, active_replicas}, Node).

del_active_replica(Tab, Node) ->
    Var = {Tab, where_to_commit},
    {Blocked, Old} = is_tab_blocked(val(Var)),
    Del = lists:keydelete(Node, 1, Old),
    New = lists:sort(Del),
    set(Var, mark_blocked_tab(Blocked, New)),      % where_to_commit
    mnesia_lib:del({Tab, active_replicas}, Node),
    mnesia_lib:del({Tab, where_to_write}, Node).

change_table_access_mode(Cs) ->
    Tab = Cs#cstruct.name,
    lists:foreach(fun(N) -> add_active_replica(Tab, N, Cs) end,
		  val({Tab, active_replicas})).

%% node To now has tab loaded, but this must be undone
%% This code is rpc:call'ed from the tab_copier process
%% when it has *not* released it's table lock
unannounce_add_table_copy(Tab, To) ->
    del_active_replica(Tab, To),
    case val({Tab , where_to_read}) of
	To ->
	    mnesia_lib:set_remote_where_to_read(Tab);
	_ ->
	    ignore
    end.

user_sync_tab(Tab) ->
    case val(debug) of
	trace ->
	    mnesia_subscr:subscribe(whereis(mnesia_event), {table, Tab});
	_ ->
	    ignore
    end,

    case erase({sync_tab, Tab}) of
	undefined ->
	    ok;
	Pids ->
	    lists:foreach(fun(Pid) -> sync_reply(Pid, Tab) end, Pids)
    end.

i_have_tab(Tab) ->
    case val({Tab, local_content}) of
	true ->
	    mnesia_lib:set_local_content_whereabouts(Tab);
	false ->
	    set({Tab, where_to_read}, node())
    end,
    add_active_replica(Tab, node()).

sync_and_block_table_whereabouts(Tab, ToNode, RemoteS, AccessMode) when Tab /= schema ->
    Current = val({current, db_nodes}),
    Ns =
	case lists:member(ToNode, Current) of
	    true -> Current -- [ToNode];
	    false -> Current
	end,
    remote_call(ToNode, block_table, [Tab]),
    [remote_call(Node, add_active_replica, [Tab, ToNode, RemoteS, AccessMode]) ||
	Node <- [ToNode | Ns]],
    ok.

sync_del_table_copy_whereabouts(Tab, ToNode) when Tab /= schema ->
    Current = val({current, db_nodes}),
    Ns =
	case lists:member(ToNode, Current) of
	    true -> Current;
	    false -> [ToNode | Current]
	end,
    Args = [Tab, ToNode],
    [remote_call(Node, unannounce_add_table_copy, Args) || Node <- Ns],
    ok.

get_info(Timeout) ->
    case whereis(?SERVER_NAME) of
	undefined ->
	    {timeout, Timeout};
	Pid ->
	    Pid ! {self(), get_state},
	    receive
		{?SERVER_NAME, State} when record(State, state) ->
		    {info,State}
	    after Timeout ->
		    {timeout, Timeout}
	    end
    end.

get_workers(Timeout) ->
    case whereis(?SERVER_NAME) of
	undefined ->
	    {timeout, Timeout};
	Pid ->
	    Pid ! {self(), get_state},
	    receive
		{?SERVER_NAME, State} when record(State, state) ->
		    {workers, State#state.loader_pid, State#state.sender_pid, State#state.dumper_pid}
	    after Timeout ->
		    {timeout, Timeout}
	    end
    end.

info() ->
    Tabs = mnesia_lib:local_active_tables(),
    io:format( "---> Active tables <--- ~n", []),
    info(Tabs).

info([Tab | Tail]) ->
    case val({Tab, storage_type}) of
	disc_only_copies ->
	    info_format(Tab,
			dets:info(Tab, size),
			dets:info(Tab, file_size),
			"bytes on disc");
	_ ->
	    info_format(Tab,
			?ets_info(Tab, size),
			?ets_info(Tab, memory),
			"words of mem")
    end,
    info(Tail);
info([]) -> ok;
info(Tab) -> info([Tab]).

info_format(Tab, Size, Mem, Media) ->
    StrT = mnesia_lib:pad_name(atom_to_list(Tab), 15, []),
    StrS = mnesia_lib:pad_name(integer_to_list(Size), 8, []),
    StrM = mnesia_lib:pad_name(integer_to_list(Mem), 8, []),
    io:format("~s: with ~s records occupying ~s ~s~n",
	      [StrT, StrS, StrM, Media]).

%% Handle early arrived messages
handle_early_msgs([Msg | Msgs], State) ->
    %% The messages are in reverse order
    case handle_early_msg(Msg, State) of
        {stop, Reason, Reply, State2} ->
	    {stop, Reason, Reply, State2};
        {stop, Reason, State2} ->
	    {stop, Reason, State2};
	{noreply, State2} ->
	    handle_early_msgs(Msgs, State2);
	{noreply, State2, _Timeout} ->
	    handle_early_msgs(Msgs, State2);
	Else ->
	    dbg_out("handle_early_msgs case clause ~p ~n", [Else]),
	    erlang:error(Else, [[Msg | Msgs], State])
    end;
handle_early_msgs([], State) ->
    noreply(State).

handle_early_msg({call, Msg, From}, State) ->
    handle_call(Msg, From, State);
handle_early_msg({cast, Msg}, State) ->
    handle_cast(Msg, State);
handle_early_msg({info, Msg}, State) ->
    handle_info(Msg, State).

noreply(State) ->
    {noreply, State}.

reply(undefined, Reply) ->
    Reply;
reply(ReplyTo, Reply) ->
    gen_server:reply(ReplyTo, Reply),
    Reply.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Worker management

%% Returns new State
add_worker(Worker, State) when record(Worker, dump_log) ->
    InitBy = Worker#dump_log.initiated_by,
    Queue = State#state.dumper_queue,
    case lists:keymember(InitBy, #dump_log.initiated_by, Queue) of
	false ->
	    ignore;
	true when Worker#dump_log.opt_reply_to == undefined ->
	    %% The same threshold has been exceeded again,
	    %% before we have had the possibility to
	    %% process the older one.
	    DetectedBy = {dump_log, InitBy},
	    Event = {mnesia_overload, DetectedBy},
	    mnesia_lib:report_system_event(Event)
    end,
    Queue2 = Queue ++ [Worker],
    State2 = State#state{dumper_queue = Queue2},
    opt_start_worker(State2);
add_worker(Worker, State) when record(Worker, schema_commit_lock) ->
    Queue = State#state.dumper_queue,
    Queue2 = Queue ++ [Worker],
    State2 = State#state{dumper_queue = Queue2},
    opt_start_worker(State2);
add_worker(Worker, State) when record(Worker, net_load) ->
    Queue = State#state.loader_queue,
    State2 = State#state{loader_queue = Queue ++ [Worker]},
    opt_start_worker(State2);
add_worker(Worker, State) when record(Worker, send_table) ->
    Queue = State#state.sender_queue,
    State2 = State#state{sender_queue = Queue ++ [Worker]},
    opt_start_worker(State2);
add_worker(Worker, State) when record(Worker, disc_load) ->
    Queue = State#state.loader_queue,
    State2 = State#state{loader_queue = Queue ++ [Worker]},
    opt_start_worker(State2);
% Block controller should be used for upgrading mnesia.
add_worker(Worker, State) when record(Worker, block_controller) ->
    Queue = State#state.dumper_queue,
    Queue2 = [Worker | Queue],
    State2 = State#state{dumper_queue = Queue2},
    opt_start_worker(State2).

%% Optionally start a worker
%%
%% Dumpers and loaders may run simultaneously
%% but neither of them may run during schema commit.
%% Loaders may not start if a schema commit is enqueued.
opt_start_worker(State) when State#state.is_stopping == true ->
    State;
opt_start_worker(State) ->
    %% Prioritize dumper and schema commit
    %% by checking them first
    case State#state.dumper_queue of
	[Worker | _Rest] when State#state.dumper_pid == undefined ->
	    %% Great, a worker in queue and neither
	    %% a schema transaction is being
	    %% committed and nor a dumper is running

	    %% Start worker but keep him in the queue
	    if
		record(Worker, schema_commit_lock) ->
		    ReplyTo = Worker#schema_commit_lock.owner,
		    reply(ReplyTo, granted),
		    {Owner, _Tag} = ReplyTo,
		    State#state{dumper_pid = Owner};

		record(Worker, dump_log) ->
		    Pid = spawn_link(?MODULE, dump_and_reply, [self(), Worker]),
		    State2 = State#state{dumper_pid = Pid},

		    %% If the worker was a dumper we may
		    %% possibly be able to start a loader
		    %% or sender
		    State3 = opt_start_sender(State2),
		    opt_start_loader(State3);

		record(Worker, block_controller) ->
		    case {State#state.sender_pid, State#state.loader_pid} of
			{undefined, undefined} ->
			    ReplyTo = Worker#block_controller.owner,
			    reply(ReplyTo, granted),
			    {Owner, _Tag} = ReplyTo,
			    State#state{dumper_pid = Owner};
			_ ->
			    State
		    end
	    end;
	_ ->
	    %% Bad luck, try with a loader or sender instead
	    State2 = opt_start_sender(State),
	    opt_start_loader(State2)
    end.

opt_start_sender(State) ->
    case State#state.sender_queue of
	[]->
	    %% No need
	    State;

	_ when State#state.sender_pid /= undefined ->
	    %% Bad luck, a sender is already running
	    State;

	[Sender | _SenderRest] ->
	    case State#state.loader_queue of
		[Loader | _LoaderRest]
		when State#state.loader_pid /= undefined,
		     Loader#net_load.table == Sender#send_table.table ->
		    %% A conflicting loader is running
		    State;
		_ ->
		    SchemaQueue = State#state.dumper_queue,
		    case lists:keymember(schema_commit, 1, SchemaQueue) of
			false ->

			    %% Start worker but keep him in the queue
			    Pid = spawn_link(?MODULE, send_and_reply,
					     [self(), Sender]),
			    State#state{sender_pid = Pid};
			true ->
			    %% Bad luck, we must wait for the schema commit
			    State
		    end
	    end
    end.

opt_start_loader(State) ->
    LoaderQueue = State#state.loader_queue,
    if
	LoaderQueue == [] ->
	    %% No need
	    State;

	State#state.loader_pid /= undefined ->
	    %% Bad luck, an loader is already running
	    State;

	true ->
	    SchemaQueue = State#state.dumper_queue,
	    case lists:keymember(schema_commit, 1, SchemaQueue) of
		false ->
		    {Worker, Rest} = pick_next(LoaderQueue),

		    %% Start worker but keep him in the queue
		    Pid = spawn_link(?MODULE, load_and_reply, [self(), Worker]),
		    State#state{loader_pid = Pid,
				loader_queue = [Worker | Rest]};
		true ->
		    %% Bad luck, we must wait for the schema commit
		    State
	    end
    end.

start_remote_sender(Node, Tab, Receiver, Storage) ->
    Msg = #send_table{table = Tab,
		      receiver_pid = Receiver,
		      remote_storage = Storage},
    gen_server:cast({?SERVER_NAME, Node}, Msg).

dump_and_reply(ReplyTo, Worker) ->
    %% No trap_exit, die intentionally instead
    Res = mnesia_dumper:opt_dump_log(Worker#dump_log.initiated_by),
    ReplyTo ! #dumper_done{worker_pid = self(),
			   worker_res = Res},
    unlink(ReplyTo),
    exit(normal).

send_and_reply(ReplyTo, Worker) ->
    %% No trap_exit, die intentionally instead
    Res = mnesia_loader:send_table(Worker#send_table.receiver_pid,
				   Worker#send_table.table,
				   Worker#send_table.remote_storage),
    ReplyTo ! #sender_done{worker_pid = self(),
			   worker_res = Res},
    unlink(ReplyTo),
    exit(normal).


load_and_reply(ReplyTo, Worker) ->
    process_flag(trap_exit, true),
    Done = load_table(Worker),
    ReplyTo ! Done#loader_done{worker_pid = self()},
    unlink(ReplyTo),
    exit(normal).

%% Now it is time to load the table
%% but first we must check if it still is neccessary
load_table(Load) when record(Load, net_load) ->
    Tab = Load#net_load.table,
    ReplyTo = Load#net_load.opt_reply_to,
    Reason =  Load#net_load.reason,
    LocalC = val({Tab, local_content}),
    AccessMode = val({Tab, access_mode}),
    ReadNode = val({Tab, where_to_read}),
    Active = filter_active(Tab),
    Done = #loader_done{is_loaded = true,
			table_name = Tab,
			needs_announce = false,
			needs_sync = false,
			needs_reply = true,
			reply_to = ReplyTo,
			reply = {loaded, ok}
		       },
    if
	ReadNode == node() ->
	    %% Already loaded locally
	    Done;
	LocalC == true ->
	    Res = mnesia_loader:disc_load_table(Tab, load_local_content),
	    Done#loader_done{reply = Res, needs_announce = true, needs_sync = true};
	AccessMode == read_only ->
	    disc_load_table(Tab, Reason, ReplyTo);
	true ->
	    %% Either we cannot read the table yet
	    %% or someone is moving a replica between
	    %% two nodes
	    Cs =  Load#net_load.cstruct,
	    Res = mnesia_loader:net_load_table(Tab, Reason, Active, Cs),
	    case Res of
		{loaded, ok} ->
		    Done#loader_done{needs_sync = true,
				     reply = Res};
		{not_loaded, storage_unknown} ->
		    Done#loader_done{reply = Res};
		{not_loaded, _} ->
		    Done#loader_done{is_loaded = false,
				     needs_reply = false,
				     reply = Res}
	    end
    end;

load_table(Load) when record(Load, disc_load) ->
    Tab = Load#disc_load.table,
    Reason =  Load#disc_load.reason,
    ReplyTo = Load#disc_load.opt_reply_to,
    ReadNode = val({Tab, where_to_read}),
    Active = filter_active(Tab),
    Done = #loader_done{is_loaded = true,
			table_name = Tab,
			needs_announce = false,
			needs_sync = false,
			needs_reply = false
		       },
    if
	Active == [], ReadNode == nowhere ->
	    %% Not loaded anywhere, lets load it from disc
	    disc_load_table(Tab, Reason, ReplyTo);
	ReadNode == nowhere ->
	    %% Already loaded on other node, lets get it
	    Cs = val({Tab, cstruct}),
	    case mnesia_loader:net_load_table(Tab, Reason, Active, Cs) of
		{loaded, ok} ->
		    Done#loader_done{needs_sync = true};
		{not_loaded, storage_unknown} ->
		    Done#loader_done{is_loaded = false};
		{not_loaded, ErrReason} ->
		    Done#loader_done{is_loaded = false,
				     reply = {not_loaded,ErrReason}}
	    end;
	true ->
	    %% Already readable, do not worry be happy
	    Done
    end.

disc_load_table(Tab, Reason, ReplyTo) ->
    Done = #loader_done{is_loaded = true,
			table_name = Tab,
			needs_announce = false,
			needs_sync = false,
			needs_reply = true,
			reply_to = ReplyTo,
			reply = {loaded, ok}
		       },
    Res = mnesia_loader:disc_load_table(Tab, Reason),
    if
	Res == {loaded, ok} ->
	    Done#loader_done{needs_announce = true,
			     needs_sync = true,
			     reply = Res};
	ReplyTo /= undefined ->
	    Done#loader_done{is_loaded = false,
			     reply = Res};
	true ->
	    fatal("Cannot load table ~p from disc: ~p~n", [Tab, Res])
    end.

filter_active(Tab) ->
    ByForce = val({Tab, load_by_force}),
    Active = val({Tab, active_replicas}),
    Masters = mnesia_recover:get_master_nodes(Tab),
    do_filter_active(ByForce, Active, Masters).

do_filter_active(true, Active, _Masters) ->
    Active;
do_filter_active(false, Active, []) ->
    Active;
do_filter_active(false, Active, Masters) ->
    mnesia_lib:intersect(Active, Masters).