aboutsummaryrefslogtreecommitdiffstats
path: root/lib/mnesia/src/mnesia_controller.erl
diff options
context:
space:
mode:
authorErlang/OTP <[email protected]>2009-11-20 14:54:40 +0000
committerErlang/OTP <[email protected]>2009-11-20 14:54:40 +0000
commit84adefa331c4159d432d22840663c38f155cd4c1 (patch)
treebff9a9c66adda4df2106dfd0e5c053ab182a12bd /lib/mnesia/src/mnesia_controller.erl
downloadotp-84adefa331c4159d432d22840663c38f155cd4c1.tar.gz
otp-84adefa331c4159d432d22840663c38f155cd4c1.tar.bz2
otp-84adefa331c4159d432d22840663c38f155cd4c1.zip
The R13B03 release.OTP_R13B03
Diffstat (limited to 'lib/mnesia/src/mnesia_controller.erl')
-rw-r--r--lib/mnesia/src/mnesia_controller.erl2182
1 files changed, 2182 insertions, 0 deletions
diff --git a/lib/mnesia/src/mnesia_controller.erl b/lib/mnesia/src/mnesia_controller.erl
new file mode 100644
index 0000000000..9bc480e619
--- /dev/null
+++ b/lib/mnesia/src/mnesia_controller.erl
@@ -0,0 +1,2182 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 1996-2009. All Rights Reserved.
+%%
+%% The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved online at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% %CopyrightEnd%
+%%
+
+%%
+%% The mnesia_init process loads tables from local disc or from
+%% another nodes. It also coordinates updates of the info about
+%% where we can read and write tables.
+%%
+%% Tables may need to be loaded initially at startup of the local
+%% node or when other nodes announces that they already have loaded
+%% tables that we also want.
+%%
+%% Initially we set the load request queue to those tables that we
+%% safely can load locally, i.e. tables where we have the last
+%% consistent replica and we have received mnesia_down from all
+%% other nodes holding the table. Then we let the mnesia_init
+%% process enter its normal working state.
+%%
+%% When we need to load a table we append a request to the load
+%% request queue. All other requests are regarded as high priority
+%% and are processed immediately (e.g. update table whereabouts).
+%% We processes the load request queue as a "background" job..
+
+-module(mnesia_controller).
+
+-behaviour(gen_server).
+
+%% Mnesia internal stuff
+-export([
+ start/0,
+ i_have_tab/1,
+ info/0,
+ get_info/1,
+ get_workers/1,
+ force_load_table/1,
+ async_dump_log/1,
+ sync_dump_log/1,
+ connect_nodes/1,
+ wait_for_schema_commit_lock/0,
+ release_schema_commit_lock/0,
+ create_table/1,
+ get_disc_copy/1,
+ get_cstructs/0,
+ sync_and_block_table_whereabouts/4,
+ sync_del_table_copy_whereabouts/2,
+ block_table/1,
+ unblock_table/1,
+ block_controller/0,
+ unblock_controller/0,
+ unannounce_add_table_copy/2,
+ master_nodes_updated/2,
+ mnesia_down/1,
+ add_active_replica/2,
+ add_active_replica/3,
+ add_active_replica/4,
+ update/1,
+ change_table_access_mode/1,
+ del_active_replica/2,
+ wait_for_tables/2,
+ get_network_copy/2,
+ merge_schema/0,
+ start_remote_sender/4,
+ schedule_late_disc_load/2
+ ]).
+
+%% gen_server callbacks
+-export([init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ terminate/2,
+ code_change/3]).
+
+%% Module internal stuff
+-export([call/1,
+ cast/1,
+ dump_and_reply/2,
+ load_and_reply/2,
+ send_and_reply/2,
+ wait_for_tables_init/2,
+ connect_nodes2/2
+ ]).
+
+-import(mnesia_lib, [set/2, add/2]).
+-import(mnesia_lib, [fatal/2, error/2, verbose/2, dbg_out/2]).
+
+-include("mnesia.hrl").
+
+-define(SERVER_NAME, ?MODULE).
+
+-record(state, {supervisor,
+ schema_is_merged = false,
+ early_msgs = [],
+ loader_pid = [], %% Was Pid is now [{Pid,Work}|..]
+ loader_queue, %% Was list is now gb_tree
+ sender_pid = [], %% Was a pid or undef is now [{Pid,Work}|..]
+ sender_queue = [],
+ late_loader_queue, %% Was list is now gb_tree
+ dumper_pid, %% Dumper or schema commit pid
+ dumper_queue = [], %% Dumper or schema commit queue
+ others = [], %% Processes that needs the copier_done msg
+ dump_log_timer_ref,
+ is_stopping = false
+ }).
+%% Backwards Comp. Sender_pid is now a list of senders..
+get_senders(#state{sender_pid = Pids}) when is_list(Pids) -> Pids.
+%% Backwards Comp. loader_pid is now a list of loaders..
+get_loaders(#state{loader_pid = Pids}) when is_list(Pids) -> Pids.
+max_loaders() ->
+ case ?catch_val(no_table_loaders) of
+ {'EXIT', _} ->
+ mnesia_lib:set(no_table_loaders,1),
+ 1;
+ Val -> Val
+ end.
+
+-record(schema_commit_lock, {owner}).
+-record(block_controller, {owner}).
+
+-record(dump_log, {initiated_by,
+ opt_reply_to
+ }).
+
+-record(net_load, {table,
+ reason,
+ opt_reply_to,
+ cstruct = unknown
+ }).
+
+-record(send_table, {table,
+ receiver_pid,
+ remote_storage
+ }).
+
+-record(disc_load, {table,
+ reason,
+ opt_reply_to
+ }).
+
+-record(late_load, {table,
+ reason,
+ opt_reply_to,
+ loaders
+ }).
+
+-record(loader_done, {worker_pid,
+ is_loaded,
+ table_name,
+ needs_announce,
+ needs_sync,
+ needs_reply,
+ reply_to,
+ reply}).
+
+-record(sender_done, {worker_pid,
+ worker_res,
+ table_name
+ }).
+
+-record(dumper_done, {worker_pid,
+ worker_res
+ }).
+
+val(Var) ->
+ case ?catch_val(Var) of
+ {'EXIT', Reason} -> mnesia_lib:other_val(Var, Reason);
+ Value -> Value
+ end.
+
+start() ->
+ gen_server:start_link({local, ?SERVER_NAME}, ?MODULE, [self()],
+ [{timeout, infinity}
+ %% ,{debug, [trace]}
+ ]).
+
+sync_dump_log(InitBy) ->
+ call({sync_dump_log, InitBy}).
+
+async_dump_log(InitBy) ->
+ ?SERVER_NAME ! {async_dump_log, InitBy}.
+
+%% Wait for tables to be active
+%% If needed, we will wait for Mnesia to start
+%% If Mnesia stops, we will wait for Mnesia to restart
+%% We will wait even if the list of tables is empty
+%%
+wait_for_tables(Tabs, Timeout) when is_list(Tabs), Timeout == infinity ->
+ do_wait_for_tables(Tabs, Timeout);
+wait_for_tables(Tabs, Timeout) when is_list(Tabs),
+ is_integer(Timeout), Timeout >= 0 ->
+ do_wait_for_tables(Tabs, Timeout);
+wait_for_tables(Tabs, Timeout) ->
+ {error, {badarg, Tabs, Timeout}}.
+
+do_wait_for_tables(Tabs, 0) ->
+ reply_wait(Tabs);
+do_wait_for_tables(Tabs, Timeout) ->
+ Pid = spawn_link(?MODULE, wait_for_tables_init, [self(), Tabs]),
+ receive
+ {?SERVER_NAME, Pid, Res} ->
+ Res;
+ {'EXIT', Pid, _} ->
+ reply_wait(Tabs)
+ after Timeout ->
+ unlink(Pid),
+ exit(Pid, timeout),
+ reply_wait(Tabs)
+ end.
+
+reply_wait(Tabs) ->
+ case catch mnesia_lib:active_tables() of
+ {'EXIT', _} ->
+ {error, {node_not_running, node()}};
+ Active when is_list(Active) ->
+ case Tabs -- Active of
+ [] ->
+ ok;
+ BadTabs ->
+ {timeout, BadTabs}
+ end
+ end.
+
+wait_for_tables_init(From, Tabs) ->
+ process_flag(trap_exit, true),
+ Res = wait_for_init(From, Tabs, whereis(?SERVER_NAME)),
+ From ! {?SERVER_NAME, self(), Res},
+ unlink(From),
+ exit(normal).
+
+wait_for_init(From, Tabs, Init) ->
+ case catch link(Init) of
+ {'EXIT', _} ->
+ %% Mnesia is not started
+ {error, {node_not_running, node()}};
+ true when is_pid(Init) ->
+ cast({sync_tabs, Tabs, self()}),
+ rec_tabs(Tabs, Tabs, From, Init)
+ end.
+
+sync_reply(Waiter, Tab) ->
+ Waiter ! {?SERVER_NAME, {tab_synced, Tab}}.
+
+rec_tabs([Tab | Tabs], AllTabs, From, Init) ->
+ receive
+ {?SERVER_NAME, {tab_synced, Tab}} ->
+ rec_tabs(Tabs, AllTabs, From, Init);
+
+ {'EXIT', From, _} ->
+ %% This will trigger an exit signal
+ %% to mnesia_init
+ exit(wait_for_tables_timeout);
+
+ {'EXIT', Init, _} ->
+ %% Oops, mnesia_init stopped,
+ exit(mnesia_stopped)
+ end;
+rec_tabs([], _, _, Init) ->
+ unlink(Init),
+ ok.
+
+get_cstructs() ->
+ call(get_cstructs).
+
+update(Fun) ->
+ call({update,Fun}).
+
+
+mnesia_down(Node) ->
+ case cast({mnesia_down, Node}) of
+ {error, _} -> mnesia_monitor:mnesia_down(?SERVER_NAME, Node);
+ _Pid -> ok
+ end.
+wait_for_schema_commit_lock() ->
+ link(whereis(?SERVER_NAME)),
+ unsafe_call(wait_for_schema_commit_lock).
+
+block_controller() ->
+ call(block_controller).
+
+unblock_controller() ->
+ cast(unblock_controller).
+
+release_schema_commit_lock() ->
+ cast({release_schema_commit_lock, self()}),
+ unlink(whereis(?SERVER_NAME)).
+
+%% Special for preparation of add table copy
+get_network_copy(Tab, Cs) ->
+% We can't let the controller queue this one
+% because that may cause a deadlock between schema_operations
+% and initial tableloadings which both takes schema locks.
+% But we have to get copier_done msgs when the other side
+% goes down.
+ call({add_other, self()}),
+ Reason = {dumper,add_table_copy},
+ Work = #net_load{table = Tab,reason = Reason,cstruct = Cs},
+ %% I'll need this cause it's linked trough the subscriber
+ %% might be solved by using monitor in subscr instead.
+ process_flag(trap_exit, true),
+ Load = load_table_fun(Work),
+ Res = (catch Load()),
+ process_flag(trap_exit, false),
+ call({del_other, self()}),
+ case Res of
+ #loader_done{is_loaded = true} ->
+ Tab = Res#loader_done.table_name,
+ case Res#loader_done.needs_announce of
+ true ->
+ i_have_tab(Tab);
+ false ->
+ ignore
+ end,
+ Res#loader_done.reply;
+ #loader_done{} ->
+ Res#loader_done.reply;
+ Else ->
+ {not_loaded, Else}
+ end.
+
+%% This functions is invoked from the dumper
+%%
+%% There are two cases here:
+%% startup ->
+%% no need for sync, since mnesia_controller not started yet
+%% schema_trans ->
+%% already synced with mnesia_controller since the dumper
+%% is syncronously started from mnesia_controller
+
+create_table(Tab) ->
+ {loaded, ok} = mnesia_loader:disc_load_table(Tab, {dumper,create_table}).
+
+get_disc_copy(Tab) ->
+ disc_load_table(Tab, {dumper,change_table_copy_type}, undefined).
+
+%% Returns ok instead of yes
+force_load_table(Tab) when is_atom(Tab), Tab /= schema ->
+ case ?catch_val({Tab, storage_type}) of
+ ram_copies ->
+ do_force_load_table(Tab);
+ disc_copies ->
+ do_force_load_table(Tab);
+ disc_only_copies ->
+ do_force_load_table(Tab);
+ unknown ->
+ set({Tab, load_by_force}, true),
+ cast({force_load_updated, Tab}),
+ wait_for_tables([Tab], infinity);
+ {'EXIT', _} ->
+ {error, {no_exists, Tab}}
+ end;
+force_load_table(Tab) ->
+ {error, {bad_type, Tab}}.
+
+do_force_load_table(Tab) ->
+ Loaded = ?catch_val({Tab, load_reason}),
+ case Loaded of
+ unknown ->
+ set({Tab, load_by_force}, true),
+ mnesia_late_loader:async_late_disc_load(node(), [Tab], forced_by_user),
+ wait_for_tables([Tab], infinity);
+ {'EXIT', _} ->
+ set({Tab, load_by_force}, true),
+ mnesia_late_loader:async_late_disc_load(node(), [Tab], forced_by_user),
+ wait_for_tables([Tab], infinity);
+ _ ->
+ ok
+ end.
+master_nodes_updated(schema, _Masters) ->
+ ignore;
+master_nodes_updated(Tab, Masters) ->
+ cast({master_nodes_updated, Tab, Masters}).
+
+schedule_late_disc_load(Tabs, Reason) ->
+ MsgTag = late_disc_load,
+ try_schedule_late_disc_load(Tabs, Reason, MsgTag).
+
+try_schedule_late_disc_load(Tabs, _Reason, MsgTag)
+ when Tabs == [], MsgTag /= schema_is_merged ->
+ ignore;
+try_schedule_late_disc_load(Tabs, Reason, MsgTag) ->
+ GetIntents =
+ fun() ->
+ Item = mnesia_late_disc_load,
+ Nodes = val({current, db_nodes}),
+ mnesia:lock({global, Item, Nodes}, write),
+ case multicall(Nodes -- [node()], disc_load_intents) of
+ {Replies, []} ->
+ call({MsgTag, Tabs, Reason, Replies}),
+ done;
+ {_, BadNodes} ->
+ %% Some nodes did not respond, lets try again
+ {retry, BadNodes}
+ end
+ end,
+ case mnesia:transaction(GetIntents) of
+ {atomic, done} ->
+ done;
+ {atomic, {retry, BadNodes}} ->
+ verbose("Retry late_load_tables because bad nodes: ~p~n",
+ [BadNodes]),
+ try_schedule_late_disc_load(Tabs, Reason, MsgTag);
+ {aborted, AbortReason} ->
+ fatal("Cannot late_load_tables~p: ~p~n",
+ [[Tabs, Reason, MsgTag], AbortReason])
+ end.
+
+connect_nodes(Ns) ->
+ case mnesia:system_info(is_running) of
+ no ->
+ {error, {node_not_running, node()}};
+ yes ->
+ Pid = spawn_link(?MODULE,connect_nodes2,[self(),Ns]),
+ receive
+ {?MODULE, Pid, Res, New} ->
+ case Res of
+ ok ->
+ mnesia_lib:add_list(extra_db_nodes, New),
+ {ok, New};
+ {aborted, {throw, Str}} when is_list(Str) ->
+ %%mnesia_recover:disconnect_nodes(New),
+ {error, {merge_schema_failed, lists:flatten(Str)}};
+ Else ->
+ {error, Else}
+ end;
+ {'EXIT', Pid, Reason} ->
+ {error, Reason}
+ end
+ end.
+
+connect_nodes2(Father, Ns) ->
+ Current = val({current, db_nodes}),
+ abcast([node()|Ns], {merging_schema, node()}),
+ {NewC, OldC} = mnesia_recover:connect_nodes(Ns),
+ Connected = NewC ++OldC,
+ New1 = mnesia_lib:intersect(Ns, Connected),
+ New = New1 -- Current,
+ process_flag(trap_exit, true),
+ Res = try_merge_schema(New),
+ Msg = {schema_is_merged, [], late_merge, []},
+ multicall([node()|Ns], Msg),
+ After = val({current, db_nodes}),
+ Father ! {?MODULE, self(), Res, mnesia_lib:intersect(Ns,After)},
+ unlink(Father),
+ ok.
+
+%% Merge the local schema with the schema on other nodes.
+%% But first we must let all processes that want to force
+%% load tables wait until the schema merge is done.
+
+merge_schema() ->
+ AllNodes = mnesia_lib:all_nodes(),
+ case try_merge_schema(AllNodes) of
+ ok ->
+ schema_is_merged();
+ {aborted, {throw, Str}} when is_list(Str) ->
+ fatal("Failed to merge schema: ~s~n", [Str]);
+ Else ->
+ fatal("Failed to merge schema: ~p~n", [Else])
+ end.
+
+try_merge_schema(Nodes) ->
+ case mnesia_schema:merge_schema() of
+ {atomic, not_merged} ->
+ %% No more nodes that we need to merge the schema with
+ ok;
+ {atomic, {merged, OldFriends, NewFriends}} ->
+ %% Check if new nodes has been added to the schema
+ Diff = mnesia_lib:all_nodes() -- [node() | Nodes],
+ mnesia_recover:connect_nodes(Diff),
+
+ %% Tell everybody to adopt orphan tables
+ im_running(OldFriends, NewFriends),
+ im_running(NewFriends, OldFriends),
+
+ try_merge_schema(Nodes);
+ {atomic, {"Cannot get cstructs", Node, Reason}} ->
+ dbg_out("Cannot get cstructs, Node ~p ~p~n", [Node, Reason]),
+ timer:sleep(1000), % Avoid a endless loop look alike
+ try_merge_schema(Nodes);
+ Other ->
+ Other
+ end.
+
+im_running(OldFriends, NewFriends) ->
+ abcast(OldFriends, {im_running, node(), NewFriends}).
+
+schema_is_merged() ->
+ MsgTag = schema_is_merged,
+ SafeLoads = initial_safe_loads(),
+
+ %% At this point we do not know anything about
+ %% which tables that the other nodes already
+ %% has loaded and therefore we let the normal
+ %% processing of the loader_queue take care
+ %% of it, since we at that time point will
+ %% know the whereabouts. We rely on the fact
+ %% that all nodes tells each other directly
+ %% when they have loaded a table and are
+ %% willing to share it.
+
+ try_schedule_late_disc_load(SafeLoads, initial, MsgTag).
+
+
+cast(Msg) ->
+ case whereis(?SERVER_NAME) of
+ undefined ->{error, {node_not_running, node()}};
+ Pid -> gen_server:cast(Pid, Msg)
+ end.
+
+abcast(Nodes, Msg) ->
+ gen_server:abcast(Nodes, ?SERVER_NAME, Msg).
+
+unsafe_call(Msg) ->
+ case whereis(?SERVER_NAME) of
+ undefined -> {error, {node_not_running, node()}};
+ Pid -> gen_server:call(Pid, Msg, infinity)
+ end.
+
+call(Msg) ->
+ case whereis(?SERVER_NAME) of
+ undefined ->
+ {error, {node_not_running, node()}};
+ Pid ->
+ link(Pid),
+ Res = gen_server:call(Pid, Msg, infinity),
+ unlink(Pid),
+
+ %% We get an exit signal if server dies
+ receive
+ {'EXIT', Pid, _Reason} ->
+ {error, {node_not_running, node()}}
+ after 0 ->
+ Res
+ end
+ end.
+
+remote_call(Node, Func, Args) ->
+ case catch gen_server:call({?MODULE, Node}, {Func, Args, self()}, infinity) of
+ {'EXIT', Error} ->
+ {error, Error};
+ Else ->
+ Else
+ end.
+
+multicall(Nodes, Msg) ->
+ {Good, Bad} = gen_server:multi_call(Nodes, ?MODULE, Msg, infinity),
+ PatchedGood = [Reply || {_Node, Reply} <- Good],
+ {PatchedGood, Bad}. %% Make the replies look like rpc:multicalls..
+%% rpc:multicall(Nodes, ?MODULE, call, [Msg]).
+
+%%%----------------------------------------------------------------------
+%%% Callback functions from gen_server
+%%%----------------------------------------------------------------------
+
+%%----------------------------------------------------------------------
+%% Func: init/1
+%% Returns: {ok, State} |
+%% {ok, State, Timeout} |
+%% {stop, Reason}
+%%----------------------------------------------------------------------
+init([Parent]) ->
+ process_flag(trap_exit, true),
+ mnesia_lib:verbose("~p starting: ~p~n", [?SERVER_NAME, self()]),
+
+ %% Handshake and initialize transaction recovery
+ %% for new nodes detected in the schema
+ All = mnesia_lib:all_nodes(),
+ Diff = All -- [node() | val(original_nodes)],
+ mnesia_lib:unset(original_nodes),
+ mnesia_recover:connect_nodes(Diff),
+
+ Interval = mnesia_monitor:get_env(dump_log_time_threshold),
+ Msg = {async_dump_log, time_threshold},
+ {ok, Ref} = timer:send_interval(Interval, Msg),
+ mnesia_dumper:start_regulator(),
+
+ Empty = gb_trees:empty(),
+ {ok, #state{supervisor = Parent, dump_log_timer_ref = Ref,
+ loader_queue = Empty,
+ late_loader_queue = Empty}}.
+
+%%----------------------------------------------------------------------
+%% Func: handle_call/3
+%% Returns: {reply, Reply, State} |
+%% {reply, Reply, State, Timeout} |
+%% {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, Reply, State} | (terminate/2 is called)
+%% {stop, Reason, Reply, State} (terminate/2 is called)
+%%----------------------------------------------------------------------
+
+handle_call({sync_dump_log, InitBy}, From, State) ->
+ Worker = #dump_log{initiated_by = InitBy,
+ opt_reply_to = From
+ },
+ State2 = add_worker(Worker, State),
+ noreply(State2);
+
+handle_call(wait_for_schema_commit_lock, From, State) ->
+ Worker = #schema_commit_lock{owner = From},
+ State2 = add_worker(Worker, State),
+ noreply(State2);
+
+handle_call(block_controller, From, State) ->
+ Worker = #block_controller{owner = From},
+ State2 = add_worker(Worker, State),
+ noreply(State2);
+
+handle_call({update,Fun}, From, State) ->
+ Res = (catch Fun()),
+ reply(From, Res),
+ noreply(State);
+
+handle_call(get_cstructs, From, State) ->
+ Tabs = val({schema, tables}),
+ Cstructs = [val({T, cstruct}) || T <- Tabs],
+ Running = val({current, db_nodes}),
+ reply(From, {cstructs, Cstructs, Running}),
+ noreply(State);
+
+handle_call({schema_is_merged, [], late_merge, []}, From,
+ State = #state{schema_is_merged = Merged}) ->
+ case Merged of
+ {false, Node} when Node == node(From) ->
+ Msgs = State#state.early_msgs,
+ State1 = State#state{early_msgs = [], schema_is_merged = true},
+ handle_early_msgs(lists:reverse(Msgs), State1);
+ _ ->
+ %% Ooops this came to early, before we have merged :-)
+ %% or it came to late or from a node we don't care about
+ reply(From, ignore),
+ noreply(State)
+ end;
+
+handle_call({schema_is_merged, TabsR, Reason, RemoteLoaders}, From, State) ->
+ State2 = late_disc_load(TabsR, Reason, RemoteLoaders, From, State),
+
+ %% Handle early messages
+ Msgs = State2#state.early_msgs,
+ State3 = State2#state{early_msgs = [], schema_is_merged = true},
+ handle_early_msgs(lists:reverse(Msgs), State3);
+
+handle_call(disc_load_intents,From,State = #state{loader_queue=LQ,late_loader_queue=LLQ}) ->
+ LQTabs = gb_trees:keys(LQ),
+ LLQTabs = gb_trees:keys(LLQ),
+ ActiveTabs = lists:sort(mnesia_lib:local_active_tables()),
+ reply(From, {ok, node(), ordsets:union([LQTabs,LLQTabs,ActiveTabs])}),
+ noreply(State);
+
+handle_call({update_where_to_write, [add, Tab, AddNode], _From}, _Dummy, State) ->
+ Current = val({current, db_nodes}),
+ Res =
+ case lists:member(AddNode, Current) and
+ (State#state.schema_is_merged == true) of
+ true ->
+ mnesia_lib:add_lsort({Tab, where_to_write}, AddNode);
+ false ->
+ ignore
+ end,
+ {reply, Res, State};
+
+handle_call({add_active_replica, [Tab, ToNode, RemoteS, AccessMode], From},
+ ReplyTo, State) ->
+ KnownNode = lists:member(ToNode, val({current, db_nodes})),
+ Merged = State#state.schema_is_merged,
+ if
+ KnownNode == false ->
+ reply(ReplyTo, ignore),
+ noreply(State);
+ Merged == true ->
+ Res = case ?catch_val({Tab, cstruct}) of
+ {'EXIT', _} -> %% Tab deleted
+ deleted;
+ _ ->
+ add_active_replica(Tab, ToNode, RemoteS, AccessMode)
+ end,
+ reply(ReplyTo, Res),
+ noreply(State);
+ true -> %% Schema is not merged
+ Msg = {add_active_replica, [Tab, ToNode, RemoteS, AccessMode], From},
+ Msgs = State#state.early_msgs,
+ reply(ReplyTo, ignore), %% Reply ignore and add data after schema merge
+ noreply(State#state{early_msgs = [{call, Msg, undefined} | Msgs]})
+ end;
+
+handle_call({unannounce_add_table_copy, [Tab, Node], From}, ReplyTo, State) ->
+ KnownNode = lists:member(node(From), val({current, db_nodes})),
+ Merged = State#state.schema_is_merged,
+ if
+ KnownNode == false ->
+ reply(ReplyTo, ignore),
+ noreply(State);
+ Merged == true ->
+ Res = unannounce_add_table_copy(Tab, Node),
+ reply(ReplyTo, Res),
+ noreply(State);
+ true -> %% Schema is not merged
+ Msg = {unannounce_add_table_copy, [Tab, Node], From},
+ Msgs = State#state.early_msgs,
+ reply(ReplyTo, ignore), %% Reply ignore and add data after schema merge
+ %% Set ReplyTO to undefined so we don't reply twice
+ noreply(State#state{early_msgs = [{call, Msg, undefined} | Msgs]})
+ end;
+
+handle_call({net_load, Tab, Cs}, From, State) ->
+ State2 =
+ case State#state.schema_is_merged of
+ true ->
+ Worker = #net_load{table = Tab,
+ opt_reply_to = From,
+ reason = {dumper,add_table_copy},
+ cstruct = Cs
+ },
+ add_worker(Worker, State);
+ false ->
+ reply(From, {not_loaded, schema_not_merged}),
+ State
+ end,
+ noreply(State2);
+
+handle_call(Msg, From, State) when State#state.schema_is_merged /= true ->
+ %% Buffer early messages
+ Msgs = State#state.early_msgs,
+ noreply(State#state{early_msgs = [{call, Msg, From} | Msgs]});
+
+handle_call({late_disc_load, Tabs, Reason, RemoteLoaders}, From, State) ->
+ State2 = late_disc_load(Tabs, Reason, RemoteLoaders, From, State),
+ noreply(State2);
+
+handle_call({unblock_table, Tab}, _Dummy, State) ->
+ Var = {Tab, where_to_commit},
+ case val(Var) of
+ {blocked, List} ->
+ set(Var, List); % where_to_commit
+ _ ->
+ ignore
+ end,
+ {reply, ok, State};
+
+handle_call({block_table, [Tab], From}, _Dummy, State) ->
+ case lists:member(node(From), val({current, db_nodes})) of
+ true ->
+ block_table(Tab);
+ false ->
+ ignore
+ end,
+ {reply, ok, State};
+
+handle_call({check_w2r, _Node, Tab}, _From, State) ->
+ {reply, val({Tab, where_to_read}), State};
+
+handle_call({add_other, Who}, _From, State = #state{others=Others0}) ->
+ Others = [Who|Others0],
+ {reply, ok, State#state{others=Others}};
+handle_call({del_other, Who}, _From, State = #state{others=Others0}) ->
+ Others = lists:delete(Who, Others0),
+ {reply, ok, State#state{others=Others}};
+
+handle_call(Msg, _From, State) ->
+ error("~p got unexpected call: ~p~n", [?SERVER_NAME, Msg]),
+ noreply(State).
+
+late_disc_load(TabsR, Reason, RemoteLoaders, From,
+ State = #state{loader_queue = LQ, late_loader_queue = LLQ}) ->
+ verbose("Intend to load tables: ~p~n", [TabsR]),
+ ?eval_debug_fun({?MODULE, late_disc_load},
+ [{tabs, TabsR},
+ {reason, Reason},
+ {loaders, RemoteLoaders}]),
+
+ reply(From, queued),
+ %% RemoteLoaders is a list of {ok, Node, Tabs} tuples
+
+ %% Remove deleted tabs and queued/loaded
+ LocalTabs = gb_sets:from_ordset(lists:sort(mnesia_lib:val({schema,local_tables}))),
+ Filter = fun(TabInfo0, Acc) ->
+ TabInfo = {Tab,_} =
+ case TabInfo0 of
+ {_,_} -> TabInfo0;
+ TabN -> {TabN,Reason}
+ end,
+ case gb_sets:is_member(Tab, LocalTabs) of
+ true ->
+ case ?catch_val({Tab, where_to_read}) == node() of
+ true -> Acc;
+ false ->
+ case gb_trees:is_defined(Tab,LQ) of
+ true -> Acc;
+ false -> [TabInfo | Acc]
+ end
+ end;
+ false -> Acc
+ end
+ end,
+
+ Tabs = lists:foldl(Filter, [], TabsR),
+
+ Nodes = val({current, db_nodes}),
+ LateQueue = late_loaders(Tabs, RemoteLoaders, Nodes, LLQ),
+ State#state{late_loader_queue = LateQueue}.
+
+late_loaders([{Tab, Reason} | Tabs], RemoteLoaders, Nodes, LLQ) ->
+ case gb_trees:is_defined(Tab, LLQ) of
+ false ->
+ LoadNodes = late_load_filter(RemoteLoaders, Tab, Nodes, []),
+ case LoadNodes of
+ [] -> cast({disc_load, Tab, Reason}); % Ugly cast
+ _ -> ignore
+ end,
+ LateLoad = #late_load{table=Tab,loaders=LoadNodes,reason=Reason},
+ late_loaders(Tabs, RemoteLoaders, Nodes, gb_trees:insert(Tab,LateLoad,LLQ));
+ true ->
+ late_loaders(Tabs, RemoteLoaders, Nodes, LLQ)
+ end;
+late_loaders([], _RemoteLoaders, _Nodes, LLQ) ->
+ LLQ.
+
+late_load_filter([{error, _} | RemoteLoaders], Tab, Nodes, Acc) ->
+ late_load_filter(RemoteLoaders, Tab, Nodes, Acc);
+late_load_filter([{badrpc, _} | RemoteLoaders], Tab, Nodes, Acc) ->
+ late_load_filter(RemoteLoaders, Tab, Nodes, Acc);
+late_load_filter([RL | RemoteLoaders], Tab, Nodes, Acc) ->
+ {ok, Node, Intents} = RL,
+ Access = val({Tab, access_mode}),
+ LocalC = val({Tab, local_content}),
+ StillActive = lists:member(Node, Nodes),
+ RemoteIntent = lists:member(Tab, Intents),
+ if
+ Access == read_write,
+ LocalC == false,
+ StillActive == true,
+ RemoteIntent == true ->
+ Masters = mnesia_recover:get_master_nodes(Tab),
+ case lists:member(Node, Masters) of
+ true ->
+ %% The other node is master node for
+ %% the table, accept his load intent
+ late_load_filter(RemoteLoaders, Tab, Nodes, [Node | Acc]);
+ false when Masters == [] ->
+ %% The table has no master nodes
+ %% accept his load intent
+ late_load_filter(RemoteLoaders, Tab, Nodes, [Node | Acc]);
+ false ->
+ %% Some one else is master node for
+ %% the table, ignore his load intent
+ late_load_filter(RemoteLoaders, Tab, Nodes, Acc)
+ end;
+ true ->
+ late_load_filter(RemoteLoaders, Tab, Nodes, Acc)
+ end;
+late_load_filter([], _Tab, _Nodes, Acc) ->
+ Acc.
+
+%%----------------------------------------------------------------------
+%% Func: handle_cast/2
+%% Returns: {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State} (terminate/2 is called)
+%%----------------------------------------------------------------------
+
+handle_cast({release_schema_commit_lock, _Owner}, State) ->
+ if
+ State#state.is_stopping == true ->
+ {stop, shutdown, State};
+ true ->
+ case State#state.dumper_queue of
+ [#schema_commit_lock{}|Rest] ->
+ [_Worker | Rest] = State#state.dumper_queue,
+ State2 = State#state{dumper_pid = undefined,
+ dumper_queue = Rest},
+ State3 = opt_start_worker(State2),
+ noreply(State3);
+ _ ->
+ noreply(State)
+ end
+ end;
+
+handle_cast(unblock_controller, State) ->
+ if
+ State#state.is_stopping == true ->
+ {stop, shutdown, State};
+ is_record(hd(State#state.dumper_queue), block_controller) ->
+ [_Worker | Rest] = State#state.dumper_queue,
+ State2 = State#state{dumper_pid = undefined,
+ dumper_queue = Rest},
+ State3 = opt_start_worker(State2),
+ noreply(State3)
+ end;
+
+handle_cast({mnesia_down, Node}, State) ->
+ maybe_log_mnesia_down(Node),
+ mnesia_lib:del({current, db_nodes}, Node),
+ mnesia_checkpoint:tm_mnesia_down(Node),
+ Alltabs = val({schema, tables}),
+ reconfigure_tables(Node, Alltabs),
+ %% Done from (external point of view)
+ mnesia_monitor:mnesia_down(?SERVER_NAME, Node),
+
+ %% Fix if we are late_merging against the node that went down
+ case State#state.schema_is_merged of
+ {false, Node} ->
+ spawn(?MODULE, call, [{schema_is_merged, [], late_merge, []}]);
+ _ ->
+ ignore
+ end,
+
+ %% Fix internal stuff
+ LateQ = remove_loaders(Alltabs, Node, State#state.late_loader_queue),
+
+ case get_senders(State) ++ get_loaders(State) of
+ [] -> ignore;
+ Senders ->
+ lists:foreach(fun({Pid,_}) -> Pid ! {copier_done, Node} end,
+ Senders)
+ end,
+ lists:foreach(fun(Pid) -> Pid ! {copier_done,Node} end,
+ State#state.others),
+
+ Remove = fun(ST) ->
+ node(ST#send_table.receiver_pid) /= Node
+ end,
+ NewSenders = lists:filter(Remove, State#state.sender_queue),
+ Early = remove_early_messages(State#state.early_msgs, Node),
+ noreply(State#state{sender_queue = NewSenders,
+ early_msgs = Early,
+ late_loader_queue = LateQ
+ });
+
+handle_cast({merging_schema, Node}, State) ->
+ case State#state.schema_is_merged of
+ false ->
+ %% This comes from dynamic connect_nodes which are made
+ %% after mnesia:start() and the schema_merge.
+ ImANewKidInTheBlock =
+ (val({schema, storage_type}) == ram_copies)
+ andalso (mnesia_lib:val({schema, local_tables}) == [schema]),
+ case ImANewKidInTheBlock of
+ true -> %% I'm newly started ram_node..
+ noreply(State#state{schema_is_merged = {false, Node}});
+ false ->
+ noreply(State)
+ end;
+ _ -> %% Already merging schema.
+ noreply(State)
+ end;
+
+handle_cast(Msg, State) when State#state.schema_is_merged /= true ->
+ %% Buffer early messages
+ Msgs = State#state.early_msgs,
+ noreply(State#state{early_msgs = [{cast, Msg} | Msgs]});
+
+%% This must be done after schema_is_merged otherwise adopt_orphan
+%% might trigger a table load from wrong nodes as a result of that we don't
+%% know which tables we can load safly first.
+handle_cast({im_running, _Node, NewFriends}, State) ->
+ LocalTabs = mnesia_lib:local_active_tables() -- [schema],
+ RemoveLocalOnly = fun(Tab) -> not val({Tab, local_content}) end,
+ Tabs = lists:filter(RemoveLocalOnly, LocalTabs),
+ Ns = mnesia_lib:intersect(NewFriends, val({current, db_nodes})),
+ abcast(Ns, {adopt_orphans, node(), Tabs}),
+ noreply(State);
+
+handle_cast({disc_load, Tab, Reason}, State) ->
+ Worker = #disc_load{table = Tab, reason = Reason},
+ State2 = add_worker(Worker, State),
+ noreply(State2);
+
+handle_cast(Worker = #send_table{}, State) ->
+ State2 = add_worker(Worker, State),
+ noreply(State2);
+
+handle_cast({sync_tabs, Tabs, From}, State) ->
+ %% user initiated wait_for_tables
+ handle_sync_tabs(Tabs, From),
+ noreply(State);
+
+handle_cast({i_have_tab, Tab, Node}, State) ->
+ case lists:member(Node, val({current, db_nodes})) of
+ true ->
+ State2 = node_has_tabs([Tab], Node, State),
+ noreply(State2);
+ false ->
+ noreply(State)
+ end;
+
+handle_cast({force_load_updated, Tab}, State) ->
+ case val({Tab, active_replicas}) of
+ [] ->
+ %% No valid replicas
+ noreply(State);
+ [SomeNode | _] ->
+ State2 = node_has_tabs([Tab], SomeNode, State),
+ noreply(State2)
+ end;
+
+handle_cast({master_nodes_updated, Tab, Masters}, State) ->
+ Active = val({Tab, active_replicas}),
+ Valid =
+ case val({Tab, load_by_force}) of
+ true ->
+ Active;
+ false ->
+ if
+ Masters == [] ->
+ Active;
+ true ->
+ mnesia_lib:intersect(Masters, Active)
+ end
+ end,
+ case Valid of
+ [] ->
+ %% No valid replicas
+ noreply(State);
+ [SomeNode | _] ->
+ State2 = node_has_tabs([Tab], SomeNode, State),
+ noreply(State2)
+ end;
+
+handle_cast({adopt_orphans, Node, Tabs}, State) ->
+
+ State2 = node_has_tabs(Tabs, Node, State),
+
+ %% Register the other node as up and running
+ mnesia_recover:log_mnesia_up(Node),
+ verbose("Logging mnesia_up ~w~n",[Node]),
+ mnesia_lib:report_system_event({mnesia_up, Node}),
+
+ %% Load orphan tables
+ LocalTabs = val({schema, local_tables}) -- [schema],
+ Nodes = val({current, db_nodes}),
+ {LocalOrphans, RemoteMasters} =
+ orphan_tables(LocalTabs, Node, Nodes, [], []),
+ Reason = {adopt_orphan, node()},
+ mnesia_late_loader:async_late_disc_load(node(), LocalOrphans, Reason),
+
+ Fun =
+ fun(N) ->
+ RemoteOrphans =
+ [Tab || {Tab, Ns} <- RemoteMasters,
+ lists:member(N, Ns)],
+ mnesia_late_loader:maybe_async_late_disc_load(N, RemoteOrphans, Reason)
+ end,
+ lists:foreach(Fun, Nodes),
+ noreply(State2);
+
+handle_cast(Msg, State) ->
+ error("~p got unexpected cast: ~p~n", [?SERVER_NAME, Msg]),
+ noreply(State).
+
+handle_sync_tabs([Tab | Tabs], From) ->
+ case val({Tab, where_to_read}) of
+ nowhere ->
+ case get({sync_tab, Tab}) of
+ undefined ->
+ put({sync_tab, Tab}, [From]);
+ Pids ->
+ put({sync_tab, Tab}, [From | Pids])
+ end;
+ _ ->
+ sync_reply(From, Tab)
+ end,
+ handle_sync_tabs(Tabs, From);
+handle_sync_tabs([], _From) ->
+ ok.
+
+%%----------------------------------------------------------------------
+%% Func: handle_info/2
+%% Returns: {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State} (terminate/2 is called)
+%%----------------------------------------------------------------------
+
+handle_info({async_dump_log, InitBy}, State) ->
+ Worker = #dump_log{initiated_by = InitBy},
+ State2 = add_worker(Worker, State),
+ noreply(State2);
+
+handle_info(#dumper_done{worker_pid=Pid, worker_res=Res}, State) ->
+ if
+ State#state.is_stopping == true ->
+ {stop, shutdown, State};
+ Res == dumped, Pid == State#state.dumper_pid ->
+ [Worker | Rest] = State#state.dumper_queue,
+ reply(Worker#dump_log.opt_reply_to, Res),
+ State2 = State#state{dumper_pid = undefined,
+ dumper_queue = Rest},
+ State3 = opt_start_worker(State2),
+ noreply(State3);
+ true ->
+ fatal("Dumper failed: ~p~n state: ~p~n", [Res, State]),
+ {stop, fatal, State}
+ end;
+
+handle_info(Done = #loader_done{worker_pid=WPid, table_name=Tab}, State0) ->
+ LateQueue0 = State0#state.late_loader_queue,
+ State1 = State0#state{loader_pid = lists:keydelete(WPid,1,get_loaders(State0))},
+
+ State2 =
+ case Done#loader_done.is_loaded of
+ true ->
+ %% Optional table announcement
+ if
+ Done#loader_done.needs_announce == true,
+ Done#loader_done.needs_reply == true ->
+ i_have_tab(Tab),
+ %% Should be {dumper,add_table_copy} only
+ reply(Done#loader_done.reply_to,
+ Done#loader_done.reply);
+ Done#loader_done.needs_reply == true ->
+ %% Should be {dumper,add_table_copy} only
+ reply(Done#loader_done.reply_to,
+ Done#loader_done.reply);
+ Done#loader_done.needs_announce == true, Tab == schema ->
+ i_have_tab(Tab);
+ Done#loader_done.needs_announce == true ->
+ i_have_tab(Tab),
+ %% Local node needs to perform user_sync_tab/1
+ Ns = val({current, db_nodes}),
+ abcast(Ns, {i_have_tab, Tab, node()});
+ Tab == schema ->
+ ignore;
+ true ->
+ %% Local node needs to perform user_sync_tab/1
+ Ns = val({current, db_nodes}),
+ AlreadyKnows = val({Tab, active_replicas}),
+ abcast(Ns -- AlreadyKnows, {i_have_tab, Tab, node()})
+ end,
+ %% Optional user sync
+ case Done#loader_done.needs_sync of
+ true -> user_sync_tab(Tab);
+ false -> ignore
+ end,
+ State1#state{late_loader_queue=gb_trees:delete_any(Tab, LateQueue0)};
+ false ->
+ %% Either the node went down or table was not
+ %% loaded remotly yet
+ case Done#loader_done.needs_reply of
+ true ->
+ reply(Done#loader_done.reply_to,
+ Done#loader_done.reply);
+ false ->
+ ignore
+ end,
+ case ?catch_val({Tab, active_replicas}) of
+ [_|_] -> % still available elsewhere
+ {value,{_,Worker}} = lists:keysearch(WPid,1,get_loaders(State0)),
+ add_loader(Tab,Worker,State1);
+ _ ->
+ State1
+ end
+ end,
+ State3 = opt_start_worker(State2),
+ noreply(State3);
+
+handle_info(#sender_done{worker_pid=Pid, worker_res=Res}, State) ->
+ Senders = get_senders(State),
+ {value, {Pid,_Worker}} = lists:keysearch(Pid, 1, Senders),
+ if
+ Res == ok ->
+ State2 = State#state{sender_pid = lists:keydelete(Pid, 1, Senders)},
+ State3 = opt_start_worker(State2),
+ noreply(State3);
+ true ->
+ %% No need to send any message to the table receiver
+ %% since it will soon get a mnesia_down anyway
+ fatal("Sender failed: ~p~n state: ~p~n", [Res, State]),
+ {stop, fatal, State}
+ end;
+
+handle_info({'EXIT', Pid, R}, State) when Pid == State#state.supervisor ->
+ catch set(mnesia_status, stopping),
+ case State#state.dumper_pid of
+ undefined ->
+ dbg_out("~p was ~p~n", [?SERVER_NAME, R]),
+ {stop, shutdown, State};
+ _ ->
+ noreply(State#state{is_stopping = true})
+ end;
+
+handle_info({'EXIT', Pid, R}, State) when Pid == State#state.dumper_pid ->
+ case State#state.dumper_queue of
+ [#schema_commit_lock{}|Workers] -> %% Schema trans crashed or was killed
+ dbg_out("WARNING: Dumper ~p exited ~p~n", [Pid, R]),
+ State2 = State#state{dumper_queue = Workers, dumper_pid = undefined},
+ State3 = opt_start_worker(State2),
+ noreply(State3);
+ _Other ->
+ fatal("Dumper or schema commit crashed: ~p~n state: ~p~n", [R, State]),
+ {stop, fatal, State}
+ end;
+
+handle_info(Msg = {'EXIT', Pid, R}, State) when R /= wait_for_tables_timeout ->
+ case lists:keymember(Pid, 1, get_senders(State)) of
+ true ->
+ %% No need to send any message to the table receiver
+ %% since it will soon get a mnesia_down anyway
+ fatal("Sender crashed: ~p~n state: ~p~n", [{Pid,R}, State]),
+ {stop, fatal, State};
+ false ->
+ case lists:keymember(Pid, 1, get_loaders(State)) of
+ true ->
+ fatal("Loader crashed: ~p~n state: ~p~n", [R, State]),
+ {stop, fatal, State};
+ false ->
+ error("~p got unexpected info: ~p~n", [?SERVER_NAME, Msg]),
+ noreply(State)
+ end
+ end;
+
+handle_info({From, get_state}, State) ->
+ From ! {?SERVER_NAME, State},
+ noreply(State);
+
+%% No real need for buffering
+handle_info(Msg, State) when State#state.schema_is_merged /= true ->
+ %% Buffer early messages
+ Msgs = State#state.early_msgs,
+ noreply(State#state{early_msgs = [{info, Msg} | Msgs]});
+
+handle_info({'EXIT', Pid, wait_for_tables_timeout}, State) ->
+ sync_tab_timeout(Pid, get()),
+ noreply(State);
+
+handle_info(Msg, State) ->
+ error("~p got unexpected info: ~p~n", [?SERVER_NAME, Msg]),
+ noreply(State).
+
+sync_tab_timeout(Pid, [{{sync_tab, Tab}, Pids} | Tail]) ->
+ case lists:delete(Pid, Pids) of
+ [] ->
+ erase({sync_tab, Tab});
+ Pids2 ->
+ put({sync_tab, Tab}, Pids2)
+ end,
+ sync_tab_timeout(Pid, Tail);
+sync_tab_timeout(Pid, [_ | Tail]) ->
+ sync_tab_timeout(Pid, Tail);
+sync_tab_timeout(_Pid, []) ->
+ ok.
+
+%% Pick the load record that has the highest load order
+%% Returns {BestLoad, RemainingQueue} or {none, []} if queue is empty
+pick_next(Queue) ->
+ List = gb_trees:values(Queue),
+ case pick_next(List, none, none) of
+ none -> {none, gb_trees:empty()};
+ {Tab, Worker} -> {Worker, gb_trees:delete(Tab,Queue)}
+ end.
+
+pick_next([Head = #net_load{table=Tab}| Tail], Load, Order) ->
+ select_best(Head, Tail, ?catch_val({Tab, load_order}), Load, Order);
+pick_next([Head = #disc_load{table=Tab}| Tail], Load, Order) ->
+ select_best(Head, Tail, ?catch_val({Tab, load_order}), Load, Order);
+pick_next([], none, _Order) ->
+ none;
+pick_next([], Load, _Order) ->
+ {element(2,Load), Load}.
+
+select_best(_Head, Tail, {'EXIT', _WHAT}, Load, Order) ->
+ %% Table have been deleted drop it.
+ pick_next(Tail, Load, Order);
+select_best(Load, Tail, Order, none, none) ->
+ pick_next(Tail, Load, Order);
+select_best(Load, Tail, Order, _OldLoad, OldOrder) when Order > OldOrder ->
+ pick_next(Tail, Load, Order);
+select_best(_Load, Tail, _Order, OldLoad, OldOrder) ->
+ pick_next(Tail, OldLoad, OldOrder).
+
+%%----------------------------------------------------------------------
+%% Func: terminate/2
+%% Purpose: Shutdown the server
+%% Returns: any (ignored by gen_server)
+%%----------------------------------------------------------------------
+terminate(Reason, State) ->
+ mnesia_monitor:terminate_proc(?SERVER_NAME, Reason, State).
+
+%%----------------------------------------------------------------------
+%% Func: code_change/3
+%% Purpose: Upgrade process when its code is to be changed
+%% Returns: {ok, NewState}
+%%----------------------------------------------------------------------
+code_change(_OldVsn, State0, _Extra) ->
+ %% Loader Queue
+ State1 = case State0#state.loader_pid of
+ Pids when is_list(Pids) -> State0;
+ undefined -> State0#state{loader_pid = [],loader_queue=gb_trees:empty()};
+ Pid when is_pid(Pid) ->
+ [Loader|Rest] = State0#state.loader_queue,
+ LQ0 = [{element(2,Rec),Rec} || Rec <- Rest],
+ LQ1 = lists:sort(LQ0),
+ LQ = gb_trees:from_orddict(LQ1),
+ State0#state{loader_pid=[{Pid,Loader}], loader_queue=LQ}
+ end,
+ %% LateLoaderQueue
+ State = if is_list(State1#state.late_loader_queue) ->
+ LLQ0 = State1#state.late_loader_queue,
+ LLQ1 = lists:sort([{element(2,Rec),Rec} || Rec <- LLQ0]),
+ LLQ = gb_trees:from_orddict(LLQ1),
+ State1#state{late_loader_queue=LLQ};
+ true ->
+ State1
+ end,
+ {ok, State}.
+
+%%%----------------------------------------------------------------------
+%%% Internal functions
+%%%----------------------------------------------------------------------
+
+maybe_log_mnesia_down(N) ->
+ %% We use mnesia_down when deciding which tables to load locally,
+ %% so if we are not running (i.e haven't decided which tables
+ %% to load locally), don't log mnesia_down yet.
+ case mnesia_lib:is_running() of
+ yes ->
+ verbose("Logging mnesia_down ~w~n", [N]),
+ mnesia_recover:log_mnesia_down(N),
+ ok;
+ _ ->
+ Filter = fun(Tab) ->
+ inactive_copy_holders(Tab, N)
+ end,
+ HalfLoadedTabs = lists:any(Filter, val({schema, local_tables}) -- [schema]),
+ if
+ HalfLoadedTabs == true ->
+ verbose("Logging mnesia_down ~w~n", [N]),
+ mnesia_recover:log_mnesia_down(N),
+ ok;
+ true ->
+ %% Unfortunately we have not loaded some common
+ %% tables yet, so we cannot rely on the nodedown
+ log_later %% BUGBUG handle this case!!!
+ end
+ end.
+
+inactive_copy_holders(Tab, Node) ->
+ Cs = val({Tab, cstruct}),
+ case mnesia_lib:cs_to_storage_type(Node, Cs) of
+ unknown ->
+ false;
+ _Storage ->
+ mnesia_lib:not_active_here(Tab)
+ end.
+
+orphan_tables([Tab | Tabs], Node, Ns, Local, Remote) ->
+ Cs = val({Tab, cstruct}),
+ CopyHolders = mnesia_lib:copy_holders(Cs),
+ RamCopyHolders = Cs#cstruct.ram_copies,
+ DiscCopyHolders = CopyHolders -- RamCopyHolders,
+ DiscNodes = val({schema, disc_copies}),
+ LocalContent = Cs#cstruct.local_content,
+ RamCopyHoldersOnDiscNodes = mnesia_lib:intersect(RamCopyHolders, DiscNodes),
+ Active = val({Tab, active_replicas}),
+ BeingCreated = (?catch_val({Tab, create_table}) == true),
+ Read = val({Tab, where_to_read}),
+ case lists:member(Node, DiscCopyHolders) of
+ _ when BeingCreated == true ->
+ orphan_tables(Tabs, Node, Ns, Local, Remote);
+ _ when Read == node() -> %% Allready loaded
+ orphan_tables(Tabs, Node, Ns, Local, Remote);
+ true when Active == [] ->
+ case DiscCopyHolders -- Ns of
+ [] ->
+ %% We're last up and the other nodes have not
+ %% loaded the table. Lets load it if we are
+ %% the smallest node.
+ case lists:min(DiscCopyHolders) of
+ Min when Min == node() ->
+ case mnesia_recover:get_master_nodes(Tab) of
+ [] ->
+ L = [Tab | Local],
+ orphan_tables(Tabs, Node, Ns, L, Remote);
+ Masters ->
+ R = [{Tab, Masters} | Remote],
+ orphan_tables(Tabs, Node, Ns, Local, R)
+ end;
+ _ ->
+ orphan_tables(Tabs, Node, Ns, Local, Remote)
+ end;
+ _ ->
+ orphan_tables(Tabs, Node, Ns, Local, Remote)
+ end;
+ false when Active == [], DiscCopyHolders == [], RamCopyHoldersOnDiscNodes == [] ->
+ %% Special case when all replicas resides on disc less nodes
+ orphan_tables(Tabs, Node, Ns, [Tab | Local], Remote);
+ _ when LocalContent == true ->
+ orphan_tables(Tabs, Node, Ns, [Tab | Local], Remote);
+ _ ->
+ orphan_tables(Tabs, Node, Ns, Local, Remote)
+ end;
+orphan_tables([], _, _, LocalOrphans, RemoteMasters) ->
+ {LocalOrphans, RemoteMasters}.
+
+node_has_tabs([Tab | Tabs], Node, State) when Node /= node() ->
+ State2 =
+ case catch update_whereabouts(Tab, Node, State) of
+ State1 = #state{} -> State1;
+ {'EXIT', R} -> %% Tab was just deleted?
+ case ?catch_val({Tab, cstruct}) of
+ {'EXIT', _} -> State; % yes
+ _ -> erlang:error(R)
+ end
+ end,
+ node_has_tabs(Tabs, Node, State2);
+node_has_tabs([Tab | Tabs], Node, State) ->
+ user_sync_tab(Tab),
+ node_has_tabs(Tabs, Node, State);
+node_has_tabs([], _Node, State) ->
+ State.
+
+update_whereabouts(Tab, Node, State) ->
+ Storage = val({Tab, storage_type}),
+ Read = val({Tab, where_to_read}),
+ LocalC = val({Tab, local_content}),
+ BeingCreated = (?catch_val({Tab, create_table}) == true),
+ Masters = mnesia_recover:get_master_nodes(Tab),
+ ByForce = val({Tab, load_by_force}),
+ GoGetIt =
+ if
+ ByForce == true ->
+ true;
+ Masters == [] ->
+ true;
+ true ->
+ lists:member(Node, Masters)
+ end,
+
+ dbg_out("Table ~w is loaded on ~w. s=~w, r=~w, lc=~w, f=~w, m=~w~n",
+ [Tab, Node, Storage, Read, LocalC, ByForce, GoGetIt]),
+ if
+ LocalC == true ->
+ %% Local contents, don't care about other node
+ State;
+ BeingCreated == true ->
+ %% The table is currently being created
+ %% It will be handled elsewhere
+ State;
+ Storage == unknown, Read == nowhere ->
+ %% No own copy, time to read remotely
+ %% if the other node is a good node
+ add_active_replica(Tab, Node),
+ case GoGetIt of
+ true ->
+ set({Tab, where_to_read}, Node),
+ user_sync_tab(Tab),
+ State;
+ false ->
+ State
+ end;
+ Storage == unknown ->
+ %% No own copy, continue to read remotely
+ add_active_replica(Tab, Node),
+ NodeST = mnesia_lib:storage_type_at_node(Node, Tab),
+ ReadST = mnesia_lib:storage_type_at_node(Read, Tab),
+ if %% Avoid reading from disc_only_copies
+ NodeST == disc_only_copies ->
+ ignore;
+ ReadST == disc_only_copies ->
+ mnesia_lib:set_remote_where_to_read(Tab);
+ true ->
+ ignore
+ end,
+ user_sync_tab(Tab),
+ State;
+ Read == nowhere ->
+ %% Own copy, go and get a copy of the table
+ %% if the other node is master or if there
+ %% are no master at all
+ add_active_replica(Tab, Node),
+ case GoGetIt of
+ true ->
+ Worker = #net_load{table = Tab,
+ reason = {active_remote, Node}},
+ add_worker(Worker, State);
+ false ->
+ State
+ end;
+ true ->
+ %% We already have an own copy
+ add_active_replica(Tab, Node),
+ user_sync_tab(Tab),
+ State
+ end.
+
+initial_safe_loads() ->
+ case val({schema, storage_type}) of
+ ram_copies ->
+ Downs = [],
+ Tabs = val({schema, local_tables}) -- [schema],
+ LastC = fun(T) -> last_consistent_replica(T, Downs) end,
+ lists:zf(LastC, Tabs);
+
+ disc_copies ->
+ Downs = mnesia_recover:get_mnesia_downs(),
+ dbg_out("mnesia_downs = ~p~n", [Downs]),
+
+ Tabs = val({schema, local_tables}) -- [schema],
+ LastC = fun(T) -> last_consistent_replica(T, Downs) end,
+ lists:zf(LastC, Tabs)
+ end.
+
+last_consistent_replica(Tab, Downs) ->
+ Cs = val({Tab, cstruct}),
+ Storage = mnesia_lib:cs_to_storage_type(node(), Cs),
+ Ram = Cs#cstruct.ram_copies,
+ Disc = Cs#cstruct.disc_copies,
+ DiscOnly = Cs#cstruct.disc_only_copies,
+ BetterCopies0 = mnesia_lib:remote_copy_holders(Cs) -- Downs,
+ BetterCopies = BetterCopies0 -- Ram,
+ AccessMode = Cs#cstruct.access_mode,
+ Copies = mnesia_lib:copy_holders(Cs),
+ Masters = mnesia_recover:get_master_nodes(Tab),
+ LocalMaster0 = lists:member(node(), Masters),
+ LocalContent = Cs#cstruct.local_content,
+ RemoteMaster =
+ if
+ Masters == [] -> false;
+ true -> not LocalMaster0
+ end,
+ LocalMaster =
+ if
+ Masters == [] -> false;
+ true -> LocalMaster0
+ end,
+ if
+ Copies == [node()] ->
+ %% Only one copy holder and it is local.
+ %% It may also be a local contents table
+ {true, {Tab, local_only}};
+ LocalContent == true ->
+ {true, {Tab, local_content}};
+ LocalMaster == true ->
+ %% We have a local master
+ {true, {Tab, local_master}};
+ RemoteMaster == true ->
+ %% Wait for remote master copy
+ false;
+ Storage == ram_copies ->
+ if
+ Disc == [], DiscOnly == [] ->
+ %% Nobody has copy on disc
+ {true, {Tab, ram_only}};
+ true ->
+ %% Some other node has copy on disc
+ false
+ end;
+ AccessMode == read_only ->
+ %% No one has been able to update the table,
+ %% i.e. all disc resident copies are equal
+ {true, {Tab, read_only}};
+ BetterCopies /= [], Masters /= [node()] ->
+ %% There are better copies on other nodes
+ %% and we do not have the only master copy
+ false;
+ true ->
+ {true, {Tab, initial}}
+ end.
+
+reconfigure_tables(N, [Tab |Tail]) ->
+ del_active_replica(Tab, N),
+ case val({Tab, where_to_read}) of
+ N -> mnesia_lib:set_remote_where_to_read(Tab);
+ _ -> ignore
+ end,
+ reconfigure_tables(N, Tail);
+reconfigure_tables(_, []) ->
+ ok.
+
+remove_loaders([Tab| Tabs], N, Loaders) ->
+ LateQ = drop_loaders(Tab, N, Loaders),
+ remove_loaders(Tabs, N, LateQ);
+remove_loaders([],_, LateQ) -> LateQ.
+
+remove_early_messages([], _Node) ->
+ [];
+remove_early_messages([{call, {add_active_replica, [_, Node, _, _], _}, _}|R], Node) ->
+ remove_early_messages(R, Node); %% Does a reply before queuing
+remove_early_messages([{call, {block_table, _, From}, ReplyTo}|R], Node)
+ when node(From) == Node ->
+ reply(ReplyTo, ok), %% Remove gen:server waits..
+ remove_early_messages(R, Node);
+remove_early_messages([{cast, {i_have_tab, _Tab, Node}}|R], Node) ->
+ remove_early_messages(R, Node);
+remove_early_messages([{cast, {adopt_orphans, Node, _Tabs}}|R], Node) ->
+ remove_early_messages(R, Node);
+remove_early_messages([M|R],Node) ->
+ [M|remove_early_messages(R,Node)].
+
+%% Drop loader from late load queue and possibly trigger a disc_load
+drop_loaders(Tab, Node, LLQ) ->
+ case gb_trees:lookup(Tab,LLQ) of
+ none ->
+ LLQ;
+ {value, H} ->
+ %% Check if it is time to issue a disc_load request
+ case H#late_load.loaders of
+ [Node] ->
+ Reason = {H#late_load.reason, last_loader_down, Node},
+ cast({disc_load, Tab, Reason}); % Ugly cast
+ _ ->
+ ignore
+ end,
+ %% Drop the node from the list of loaders
+ H2 = H#late_load{loaders = H#late_load.loaders -- [Node]},
+ gb_trees:update(Tab, H2, LLQ)
+ end.
+
+add_active_replica(Tab, Node) ->
+ add_active_replica(Tab, Node, val({Tab, cstruct})).
+
+add_active_replica(Tab, Node, Cs = #cstruct{}) ->
+ Storage = mnesia_lib:schema_cs_to_storage_type(Node, Cs),
+ AccessMode = Cs#cstruct.access_mode,
+ add_active_replica(Tab, Node, Storage, AccessMode).
+
+%% Block table primitives
+
+block_table(Tab) ->
+ Var = {Tab, where_to_commit},
+ Old = val(Var),
+ New = {blocked, Old},
+ set(Var, New). % where_to_commit
+
+unblock_table(Tab) ->
+ call({unblock_table, Tab}).
+
+is_tab_blocked(W2C) when is_list(W2C) ->
+ {false, W2C};
+is_tab_blocked({blocked, W2C}) when is_list(W2C) ->
+ {true, W2C}.
+
+mark_blocked_tab(true, Value) ->
+ {blocked, Value};
+mark_blocked_tab(false, Value) ->
+ Value.
+
+%%
+
+add_active_replica(Tab, Node, Storage, AccessMode) ->
+ Var = {Tab, where_to_commit},
+ {Blocked, Old} = is_tab_blocked(val(Var)),
+ Del = lists:keydelete(Node, 1, Old),
+ case AccessMode of
+ read_write ->
+ New = lists:sort([{Node, Storage} | Del]),
+ set(Var, mark_blocked_tab(Blocked, New)), % where_to_commit
+ mnesia_lib:add_lsort({Tab, where_to_write}, Node);
+ read_only ->
+ set(Var, mark_blocked_tab(Blocked, Del)),
+ mnesia_lib:del({Tab, where_to_write}, Node)
+ end,
+ add({Tab, active_replicas}, Node).
+
+del_active_replica(Tab, Node) ->
+ Var = {Tab, where_to_commit},
+ {Blocked, Old} = is_tab_blocked(val(Var)),
+ Del = lists:keydelete(Node, 1, Old),
+ New = lists:sort(Del),
+ set(Var, mark_blocked_tab(Blocked, New)), % where_to_commit
+ mnesia_lib:del({Tab, active_replicas}, Node),
+ mnesia_lib:del({Tab, where_to_write}, Node).
+
+change_table_access_mode(Cs) ->
+ W = fun() ->
+ Tab = Cs#cstruct.name,
+ lists:foreach(fun(N) -> add_active_replica(Tab, N, Cs) end,
+ val({Tab, active_replicas}))
+ end,
+ update(W).
+
+
+%% node To now has tab loaded, but this must be undone
+%% This code is rpc:call'ed from the tab_copier process
+%% when it has *not* released it's table lock
+unannounce_add_table_copy(Tab, To) ->
+ catch del_active_replica(Tab, To),
+ case catch val({Tab , where_to_read}) of
+ To ->
+ mnesia_lib:set_remote_where_to_read(Tab);
+ _ ->
+ ignore
+ end.
+
+user_sync_tab(Tab) ->
+ case val(debug) of
+ trace ->
+ mnesia_subscr:subscribe(whereis(mnesia_event), {table, Tab});
+ _ ->
+ ignore
+ end,
+
+ case erase({sync_tab, Tab}) of
+ undefined ->
+ ok;
+ Pids ->
+ lists:foreach(fun(Pid) -> sync_reply(Pid, Tab) end, Pids)
+ end.
+
+i_have_tab(Tab) ->
+ case val({Tab, local_content}) of
+ true ->
+ mnesia_lib:set_local_content_whereabouts(Tab);
+ false ->
+ set({Tab, where_to_read}, node())
+ end,
+ add_active_replica(Tab, node()).
+
+sync_and_block_table_whereabouts(Tab, ToNode, RemoteS, AccessMode) when Tab /= schema ->
+ Current = val({current, db_nodes}),
+ Ns =
+ case lists:member(ToNode, Current) of
+ true -> Current -- [ToNode];
+ false -> Current
+ end,
+ remote_call(ToNode, block_table, [Tab]),
+ [remote_call(Node, add_active_replica, [Tab, ToNode, RemoteS, AccessMode]) ||
+ Node <- [ToNode | Ns]],
+ ok.
+
+sync_del_table_copy_whereabouts(Tab, ToNode) when Tab /= schema ->
+ Current = val({current, db_nodes}),
+ Ns =
+ case lists:member(ToNode, Current) of
+ true -> Current;
+ false -> [ToNode | Current]
+ end,
+ Args = [Tab, ToNode],
+ [remote_call(Node, unannounce_add_table_copy, Args) || Node <- Ns],
+ ok.
+
+get_info(Timeout) ->
+ case whereis(?SERVER_NAME) of
+ undefined ->
+ {timeout, Timeout};
+ Pid ->
+ Pid ! {self(), get_state},
+ receive
+ {?SERVER_NAME, State = #state{loader_queue=LQ,late_loader_queue=LLQ}} ->
+ {info,State#state{loader_queue=gb_trees:to_list(LQ),
+ late_loader_queue=gb_trees:to_list(LLQ)}}
+ after Timeout ->
+ {timeout, Timeout}
+ end
+ end.
+
+get_workers(Timeout) ->
+ case whereis(?SERVER_NAME) of
+ undefined ->
+ {timeout, Timeout};
+ Pid ->
+ Pid ! {self(), get_state},
+ receive
+ {?SERVER_NAME, State = #state{}} ->
+ {workers, get_loaders(State), get_senders(State), State#state.dumper_pid}
+ after Timeout ->
+ {timeout, Timeout}
+ end
+ end.
+
+info() ->
+ Tabs = mnesia_lib:local_active_tables(),
+ io:format( "---> Active tables <--- ~n", []),
+ info(Tabs).
+
+info([Tab | Tail]) ->
+ case val({Tab, storage_type}) of
+ disc_only_copies ->
+ info_format(Tab,
+ dets:info(Tab, size),
+ dets:info(Tab, file_size),
+ "bytes on disc");
+ _ ->
+ info_format(Tab,
+ ?ets_info(Tab, size),
+ ?ets_info(Tab, memory),
+ "words of mem")
+ end,
+ info(Tail);
+info([]) -> ok.
+
+
+info_format(Tab, Size, Mem, Media) ->
+ StrT = mnesia_lib:pad_name(atom_to_list(Tab), 15, []),
+ StrS = mnesia_lib:pad_name(integer_to_list(Size), 8, []),
+ StrM = mnesia_lib:pad_name(integer_to_list(Mem), 8, []),
+ io:format("~s: with ~s records occupying ~s ~s~n",
+ [StrT, StrS, StrM, Media]).
+
+%% Handle early arrived messages
+handle_early_msgs([Msg | Msgs], State) ->
+ %% The messages are in reverse order
+ case handle_early_msg(Msg, State) of
+%% {stop, Reason, Reply, State2} -> % Will not happen according to dialyzer
+%% {stop, Reason, Reply, State2};
+ {stop, Reason, State2} ->
+ {stop, Reason, State2};
+ {noreply, State2} ->
+ handle_early_msgs(Msgs, State2);
+ {reply, Reply, State2} ->
+ {call, _Call, From} = Msg,
+ reply(From, Reply),
+ handle_early_msgs(Msgs, State2)
+ end;
+handle_early_msgs([], State) ->
+ noreply(State).
+
+handle_early_msg({call, Msg, From}, State) ->
+ handle_call(Msg, From, State);
+handle_early_msg({cast, Msg}, State) ->
+ handle_cast(Msg, State);
+handle_early_msg({info, Msg}, State) ->
+ handle_info(Msg, State).
+
+noreply(State) ->
+ {noreply, State}.
+
+reply(undefined, Reply) ->
+ Reply;
+reply(ReplyTo, Reply) ->
+ gen_server:reply(ReplyTo, Reply),
+ Reply.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% Worker management
+
+%% Returns new State
+add_worker(Worker = #dump_log{}, State) ->
+ InitBy = Worker#dump_log.initiated_by,
+ Queue = State#state.dumper_queue,
+ case lists:keymember(InitBy, #dump_log.initiated_by, Queue) of
+ true when Worker#dump_log.opt_reply_to == undefined ->
+ %% The same threshold has been exceeded again,
+ %% before we have had the possibility to
+ %% process the older one.
+ DetectedBy = {dump_log, InitBy},
+ Event = {mnesia_overload, DetectedBy},
+ mnesia_lib:report_system_event(Event);
+ _ ->
+ ignore
+ end,
+ Queue2 = Queue ++ [Worker],
+ State2 = State#state{dumper_queue = Queue2},
+ opt_start_worker(State2);
+add_worker(Worker = #schema_commit_lock{}, State) ->
+ Queue = State#state.dumper_queue,
+ Queue2 = Queue ++ [Worker],
+ State2 = State#state{dumper_queue = Queue2},
+ opt_start_worker(State2);
+add_worker(Worker = #net_load{}, State) ->
+ opt_start_worker(add_loader(Worker#net_load.table,Worker,State));
+add_worker(Worker = #send_table{}, State) ->
+ Queue = State#state.sender_queue,
+ State2 = State#state{sender_queue = Queue ++ [Worker]},
+ opt_start_worker(State2);
+add_worker(Worker = #disc_load{}, State) ->
+ opt_start_worker(add_loader(Worker#disc_load.table,Worker,State));
+% Block controller should be used for upgrading mnesia.
+add_worker(Worker = #block_controller{}, State) ->
+ Queue = State#state.dumper_queue,
+ Queue2 = [Worker | Queue],
+ State2 = State#state{dumper_queue = Queue2},
+ opt_start_worker(State2).
+
+add_loader(Tab,Worker,State = #state{loader_queue=LQ0}) ->
+ case gb_trees:is_defined(Tab, LQ0) of
+ true -> State;
+ false ->
+ LQ=gb_trees:insert(Tab, Worker, LQ0),
+ State#state{loader_queue=LQ}
+ end.
+
+%% Optionally start a worker
+%%
+%% Dumpers and loaders may run simultaneously
+%% but neither of them may run during schema commit.
+%% Loaders may not start if a schema commit is enqueued.
+opt_start_worker(State) when State#state.is_stopping == true ->
+ State;
+opt_start_worker(State) ->
+ %% Prioritize dumper and schema commit
+ %% by checking them first
+ case State#state.dumper_queue of
+ [Worker | _Rest] when State#state.dumper_pid == undefined ->
+ %% Great, a worker in queue and neither
+ %% a schema transaction is being
+ %% committed and nor a dumper is running
+
+ %% Start worker but keep him in the queue
+ if
+ is_record(Worker, schema_commit_lock) ->
+ ReplyTo = Worker#schema_commit_lock.owner,
+ reply(ReplyTo, granted),
+ {Owner, _Tag} = ReplyTo,
+ opt_start_loader(State#state{dumper_pid = Owner});
+
+ is_record(Worker, dump_log) ->
+ Pid = spawn_link(?MODULE, dump_and_reply, [self(), Worker]),
+ State2 = State#state{dumper_pid = Pid},
+
+ %% If the worker was a dumper we may
+ %% possibly be able to start a loader
+ %% or sender
+ State3 = opt_start_sender(State2),
+ opt_start_loader(State3);
+
+ is_record(Worker, block_controller) ->
+ case {get_senders(State), get_loaders(State)} of
+ {[], []} ->
+ ReplyTo = Worker#block_controller.owner,
+ reply(ReplyTo, granted),
+ {Owner, _Tag} = ReplyTo,
+ State#state{dumper_pid = Owner};
+ _ ->
+ State
+ end
+ end;
+ _ ->
+ %% Bad luck, try with a loader or sender instead
+ State2 = opt_start_sender(State),
+ opt_start_loader(State2)
+ end.
+
+opt_start_sender(State) ->
+ case State#state.sender_queue of
+ []-> State; %% No need
+ SenderQ ->
+ {NewS,Kept} = opt_start_sender2(SenderQ, get_senders(State),
+ [], get_loaders(State)),
+ State#state{sender_pid = NewS, sender_queue = Kept}
+ end.
+
+opt_start_sender2([], Pids,Kept, _) -> {Pids,Kept};
+opt_start_sender2([Sender|R], Pids, Kept, LoaderQ) ->
+ Tab = Sender#send_table.table,
+ Active = val({Tab, active_replicas}),
+ IgotIt = lists:member(node(), Active),
+ IsLoading = lists:any(fun({_Pid,Loader}) ->
+ Tab == element(#net_load.table, Loader)
+ end, LoaderQ),
+ if
+ IgotIt, IsLoading ->
+ %% I'm currently finishing loading the table let him wait
+ opt_start_sender2(R,Pids, [Sender|Kept], LoaderQ);
+ IgotIt ->
+ %% Start worker but keep him in the queue
+ Pid = spawn_link(?MODULE, send_and_reply,[self(), Sender]),
+ opt_start_sender2(R,[{Pid,Sender}|Pids],Kept,LoaderQ);
+ true ->
+ verbose("Send table failed ~p not active on this node ~n", [Tab]),
+ Sender#send_table.receiver_pid ! {copier_done, node()},
+ opt_start_sender2(R,Pids, Kept, LoaderQ)
+ end.
+
+opt_start_loader(State = #state{loader_queue = LoaderQ}) ->
+ Current = get_loaders(State),
+ Max = max_loaders(),
+ case gb_trees:is_empty(LoaderQ) of
+ true ->
+ State;
+ _ when length(Current) >= Max ->
+ State;
+ false ->
+ SchemaQueue = State#state.dumper_queue,
+ case lists:keymember(schema_commit_lock, 1, SchemaQueue) of
+ false ->
+ case pick_next(LoaderQ) of
+ {none,Rest} ->
+ State#state{loader_queue=Rest};
+ {Worker,Rest} ->
+ case already_loading(Worker, get_loaders(State)) of
+ true ->
+ opt_start_loader(State#state{loader_queue = Rest});
+ false ->
+ %% Start worker but keep him in the queue
+ Pid = load_and_reply(self(), Worker),
+ State#state{loader_pid=[{Pid,Worker}|get_loaders(State)],
+ loader_queue = Rest}
+ end
+ end;
+ true ->
+ %% Bad luck, we must wait for the schema commit
+ State
+ end
+ end.
+
+already_loading(#net_load{table=Tab},Loaders) ->
+ already_loading2(Tab,Loaders);
+already_loading(#disc_load{table=Tab},Loaders) ->
+ already_loading2(Tab,Loaders).
+
+already_loading2(Tab, [{_,#net_load{table=Tab}}|_]) -> true;
+already_loading2(Tab, [{_,#disc_load{table=Tab}}|_]) -> true;
+already_loading2(Tab, [_|Rest]) -> already_loading2(Tab,Rest);
+already_loading2(_,[]) -> false.
+
+start_remote_sender(Node, Tab, Receiver, Storage) ->
+ Msg = #send_table{table = Tab,
+ receiver_pid = Receiver,
+ remote_storage = Storage},
+ gen_server:cast({?SERVER_NAME, Node}, Msg).
+
+dump_and_reply(ReplyTo, Worker) ->
+ %% No trap_exit, die intentionally instead
+ Res = mnesia_dumper:opt_dump_log(Worker#dump_log.initiated_by),
+ ReplyTo ! #dumper_done{worker_pid = self(),
+ worker_res = Res},
+ unlink(ReplyTo),
+ exit(normal).
+
+send_and_reply(ReplyTo, Worker) ->
+ %% No trap_exit, die intentionally instead
+ Res = mnesia_loader:send_table(Worker#send_table.receiver_pid,
+ Worker#send_table.table,
+ Worker#send_table.remote_storage),
+ ReplyTo ! #sender_done{worker_pid = self(),
+ worker_res = Res},
+ unlink(ReplyTo),
+ exit(normal).
+
+load_and_reply(ReplyTo, Worker) ->
+ Load = load_table_fun(Worker),
+ SendAndReply =
+ fun() ->
+ process_flag(trap_exit, true),
+ Done = Load(),
+ ReplyTo ! Done#loader_done{worker_pid = self()},
+ unlink(ReplyTo),
+ exit(normal)
+ end,
+ spawn_link(SendAndReply).
+
+%% Now it is time to load the table
+%% but first we must check if it still is neccessary
+load_table_fun(#net_load{cstruct=Cs, table=Tab, reason=Reason, opt_reply_to=ReplyTo}) ->
+ LocalC = val({Tab, local_content}),
+ AccessMode = val({Tab, access_mode}),
+ ReadNode = val({Tab, where_to_read}),
+ Active = filter_active(Tab),
+ Done = #loader_done{is_loaded = true,
+ table_name = Tab,
+ needs_announce = false,
+ needs_sync = false,
+ needs_reply = (ReplyTo /= undefined),
+ reply_to = ReplyTo,
+ reply = {loaded, ok}
+ },
+ if
+ ReadNode == node() ->
+ %% Already loaded locally
+ fun() -> Done end;
+ LocalC == true ->
+ fun() ->
+ Res = mnesia_loader:disc_load_table(Tab, load_local_content),
+ Done#loader_done{reply = Res, needs_announce = true, needs_sync = true}
+ end;
+ AccessMode == read_only, Reason /= {dumper,add_table_copy} ->
+ fun() -> disc_load_table(Tab, Reason, ReplyTo) end;
+ true ->
+ fun() ->
+ %% Either we cannot read the table yet
+ %% or someone is moving a replica between
+ %% two nodes
+ Res = mnesia_loader:net_load_table(Tab, Reason, Active, Cs),
+ case Res of
+ {loaded, ok} ->
+ Done#loader_done{needs_sync = true,
+ reply = Res};
+ {not_loaded, _} ->
+ Done#loader_done{is_loaded = false,
+ reply = Res}
+ end
+ end
+ end;
+load_table_fun(#disc_load{table=Tab, reason=Reason, opt_reply_to=ReplyTo}) ->
+ ReadNode = val({Tab, where_to_read}),
+ Active = filter_active(Tab),
+ Done = #loader_done{is_loaded = true,
+ table_name = Tab,
+ needs_announce = false,
+ needs_sync = false,
+ needs_reply = false
+ },
+ if
+ Active == [], ReadNode == nowhere ->
+ %% Not loaded anywhere, lets load it from disc
+ fun() -> disc_load_table(Tab, Reason, ReplyTo) end;
+ ReadNode == nowhere ->
+ %% Already loaded on other node, lets get it
+ Cs = val({Tab, cstruct}),
+ fun() ->
+ case mnesia_loader:net_load_table(Tab, Reason, Active, Cs) of
+ {loaded, ok} ->
+ Done#loader_done{needs_sync = true};
+ {not_loaded, storage_unknown} ->
+ Done#loader_done{is_loaded = false};
+ {not_loaded, ErrReason} ->
+ Done#loader_done{is_loaded = false,
+ reply = {not_loaded,ErrReason}}
+ end
+ end;
+ true ->
+ %% Already readable, do not worry be happy
+ fun() -> Done end
+ end.
+
+disc_load_table(Tab, Reason, ReplyTo) ->
+ Done = #loader_done{is_loaded = true,
+ table_name = Tab,
+ needs_announce = false,
+ needs_sync = false,
+ needs_reply = ReplyTo /= undefined,
+ reply_to = ReplyTo,
+ reply = {loaded, ok}
+ },
+ Res = mnesia_loader:disc_load_table(Tab, Reason),
+ if
+ Res == {loaded, ok} ->
+ Done#loader_done{needs_announce = true,
+ needs_sync = true,
+ reply = Res};
+ ReplyTo /= undefined ->
+ Done#loader_done{is_loaded = false,
+ reply = Res};
+ true ->
+ fatal("Cannot load table ~p from disc: ~p~n", [Tab, Res])
+ end.
+
+filter_active(Tab) ->
+ ByForce = val({Tab, load_by_force}),
+ Active = val({Tab, active_replicas}),
+ Masters = mnesia_recover:get_master_nodes(Tab),
+ Ns = do_filter_active(ByForce, Active, Masters),
+ %% Reorder the so that we load from fastest first
+ LS = ?catch_val({Tab, storage_type}),
+ DOC = val({Tab, disc_only_copies}),
+ {Good,Worse} =
+ case LS of
+ disc_only_copies ->
+ G = mnesia_lib:intersect(Ns, DOC),
+ {G,Ns--G};
+ _ ->
+ G = Ns -- DOC,
+ {G,Ns--G}
+ end,
+ %% Pick a random node of the fastest
+ Len = length(Good),
+ if
+ Len > 0 ->
+ R = erlang:phash(node(), Len+1),
+ random(R-1,Good,Worse);
+ true ->
+ Worse
+ end.
+
+random(N, [H|R], Acc) when N > 0 ->
+ random(N-1,R, [H|Acc]);
+random(0, L, Acc) ->
+ L ++ Acc.
+
+do_filter_active(true, Active, _Masters) ->
+ Active;
+do_filter_active(false, Active, []) ->
+ Active;
+do_filter_active(false, Active, Masters) ->
+ mnesia_lib:intersect(Active, Masters).
+
+