%% ``Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
%% AB. All Rights Reserved.''
%%
%% $Id: mnesia_controller.erl,v 1.3 2010/03/04 13:54:19 maria Exp $
%%
%% The mnesia_init process loads tables from local disc or from
%% other nodes. It also coordinates updates of the info about
%% where we can read and write tables.
%%
%% Tables may need to be loaded initially at startup of the local
%% node, or when other nodes announce that they already have loaded
%% tables that we also want.
%%
%% Initially we set the load request queue to those tables that we
%% can safely load locally, i.e. tables where we have the last
%% consistent replica and we have received mnesia_down from all
%% other nodes holding the table. Then we let the mnesia_init
%% process enter its normal working state.
%%
%% When we need to load a table we append a request to the load
%% request queue. All other requests are regarded as high priority
%% and are processed immediately (e.g. update table whereabouts).
%% We process the load request queue as a "background" job.

-module(mnesia_controller).

-behaviour(gen_server).

%% Mnesia internal stuff
-export([start/0,
         i_have_tab/1,
         info/0,
         get_info/1,
         get_workers/1,
         force_load_table/1,
         async_dump_log/1,
         sync_dump_log/1,
         connect_nodes/1,
         wait_for_schema_commit_lock/0,
         release_schema_commit_lock/0,
         create_table/1,
         get_disc_copy/1,
         get_cstructs/0,
         sync_and_block_table_whereabouts/4,
         sync_del_table_copy_whereabouts/2,
         block_table/1,
         unblock_table/1,
         block_controller/0,
         unblock_controller/0,
         unannounce_add_table_copy/2,
         master_nodes_updated/2,
         mnesia_down/1,
         add_active_replica/2,
         add_active_replica/3,
         add_active_replica/4,
         change_table_access_mode/1,
         del_active_replica/2,
         wait_for_tables/2,
         get_network_copy/2,
         merge_schema/0,
         start_remote_sender/4,
         schedule_late_disc_load/2
        ]).

%% gen_server callbacks
-export([init/1,
         handle_call/3,
         handle_cast/2,
         handle_info/2,
         terminate/2,
         code_change/3]).

%% Module internal stuff
-export([call/1,
         cast/1,
         dump_and_reply/2,
         load_and_reply/2,
         send_and_reply/2,
         wait_for_tables_init/2
        ]).

-import(mnesia_lib, [set/2, add/2]).
-import(mnesia_lib, [fatal/2, error/2, verbose/2, dbg_out/2]).

-include("mnesia.hrl").

-define(SERVER_NAME, ?MODULE).

-record(state, {supervisor,
                schema_is_merged = false,
                early_msgs = [],
                loader_pid,
                loader_queue = [],
                sender_pid,
                sender_queue = [],
                late_loader_queue = [],
                dumper_pid,          % Dumper or schema commit pid
                dumper_queue = [],   % Dumper or schema commit queue
                dump_log_timer_ref,
                is_stopping = false
               }).

-record(worker_reply, {what,
                       pid,
                       result
                      }).

-record(schema_commit_lock, {owner}).
-record(block_controller, {owner}).

-record(dump_log, {initiated_by,
                   opt_reply_to
                  }).

-record(net_load, {table,
                   reason,
                   opt_reply_to,
                   cstruct = unknown
                  }).

-record(send_table, {table,
                     receiver_pid,
                     remote_storage
                    }).

-record(disc_load, {table,
                    reason,
                    opt_reply_to
                   }).

-record(late_load, {table,
                    reason,
                    opt_reply_to,
                    loaders
                   }).

-record(loader_done, {worker_pid,
                      is_loaded,
                      table_name,
                      needs_announce,
                      needs_sync,
                      needs_reply,
                      reply_to,
                      reply}).

-record(sender_done, {worker_pid,
                      worker_res,
                      table_name
                     }).

-record(dumper_done, {worker_pid,
                      worker_res
                     }).

val(Var) ->
    case ?catch_val(Var) of
        {'EXIT', Reason} -> mnesia_lib:other_val(Var, Reason);
        Value -> Value
    end.

start() ->
    gen_server:start_link({local, ?SERVER_NAME}, ?MODULE, [self()],
                          [{timeout, infinity}
                           %% ,{debug, [trace]}
                          ]).

sync_dump_log(InitBy) ->
    call({sync_dump_log, InitBy}).

async_dump_log(InitBy) ->
    ?SERVER_NAME ! {async_dump_log, InitBy}.

%% Wait for tables to be active
%% If needed, we will wait for Mnesia to start
%% If Mnesia stops, we will wait for Mnesia to restart
%% We will wait even if the list of tables is empty
%%
wait_for_tables(Tabs, Timeout) when list(Tabs), Timeout == infinity ->
    do_wait_for_tables(Tabs, Timeout);
wait_for_tables(Tabs, Timeout) when list(Tabs), integer(Timeout), Timeout >= 0 ->
    do_wait_for_tables(Tabs, Timeout);
wait_for_tables(Tabs, Timeout) ->
    {error, {badarg, Tabs, Timeout}}.

do_wait_for_tables(Tabs, 0) ->
    reply_wait(Tabs);
do_wait_for_tables(Tabs, Timeout) ->
    Pid = spawn_link(?MODULE, wait_for_tables_init, [self(), Tabs]),
    receive
        {?SERVER_NAME, Pid, Res} ->
            Res;
        {'EXIT', Pid, _} ->
            reply_wait(Tabs)
    after Timeout ->
            unlink(Pid),
            exit(Pid, timeout),
            reply_wait(Tabs)
    end.

reply_wait(Tabs) ->
    case catch mnesia_lib:active_tables() of
        {'EXIT', _} ->
            {error, {node_not_running, node()}};
        Active when list(Active) ->
            case Tabs -- Active of
                [] -> ok;
                BadTabs -> {timeout, BadTabs}
            end
    end.

wait_for_tables_init(From, Tabs) ->
    process_flag(trap_exit, true),
    Res = wait_for_init(From, Tabs, whereis(?SERVER_NAME)),
    From ! {?SERVER_NAME, self(), Res},
    unlink(From),
    exit(normal).

wait_for_init(From, Tabs, Init) ->
    case catch link(Init) of
        {'EXIT', _} ->
            %% Mnesia is not started
            {error, {node_not_running, node()}};
        true when pid(Init) ->
            cast({sync_tabs, Tabs, self()}),
            rec_tabs(Tabs, Tabs, From, Init)
    end.

sync_reply(Waiter, Tab) ->
    Waiter ! {?SERVER_NAME, {tab_synced, Tab}}.

rec_tabs([Tab | Tabs], AllTabs, From, Init) ->
    receive
        {?SERVER_NAME, {tab_synced, Tab}} ->
            rec_tabs(Tabs, AllTabs, From, Init);

        {'EXIT', From, _} ->
            %% This will trigger an exit signal
            %% to mnesia_init
            exit(wait_for_tables_timeout);

        {'EXIT', Init, _} ->
            %% Oops, mnesia_init stopped
            exit(mnesia_stopped)
    end;
rec_tabs([], _, _, Init) ->
    unlink(Init),
    ok.

get_cstructs() ->
    call(get_cstructs).

mnesia_down(Node) ->
    case cast({mnesia_down, Node}) of
        {error, _} -> mnesia_monitor:mnesia_down(?SERVER_NAME, Node);
        _Pid -> ok
    end.

wait_for_schema_commit_lock() ->
    link(whereis(?SERVER_NAME)),
    unsafe_call(wait_for_schema_commit_lock).

block_controller() ->
    call(block_controller).

unblock_controller() ->
    cast(unblock_controller).

release_schema_commit_lock() ->
    cast({release_schema_commit_lock, self()}),
    unlink(whereis(?SERVER_NAME)).

%% Special for preparation of add table copy
get_network_copy(Tab, Cs) ->
    Work = #net_load{table = Tab,
                     reason = {dumper, add_table_copy},
                     cstruct = Cs
                    },
    Res = (catch load_table(Work)),
    if
        Res#loader_done.is_loaded == true ->
            Tab = Res#loader_done.table_name,
            case Res#loader_done.needs_announce of
                true -> i_have_tab(Tab);
                false -> ignore
            end;
        true ->
            ignore
    end,
    receive
        %% Flush copier done message
        {copier_done, _Node} ->
            ok
    after 500 ->
            %% Avoid hanging if something is wrong and we shall fail
            ignore
    end,
    Res#loader_done.reply.
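
%% Illustrative addition (not part of the original module): the usual way
%% to drive the wait_for_tables/2 machinery above is through the public
%% mnesia API at application startup. The table names are hypothetical:
%%
%%     ok = mnesia:start(),
%%     case mnesia:wait_for_tables([user, session], 30000) of
%%         ok -> ready;
%%         {timeout, BadTabs} -> {not_loaded_in_time, BadTabs};
%%         {error, Reason} -> {error, Reason}
%%     end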

%% This function is invoked from the dumper.
%%
%% There are two cases here:
%% startup ->
%%   no need for sync, since mnesia_controller has not started yet
%% schema_trans ->
%%   already synced with mnesia_controller since the dumper
%%   is synchronously started from mnesia_controller

create_table(Tab) ->
    {loaded, ok} = mnesia_loader:disc_load_table(Tab, {dumper, create_table}).

get_disc_copy(Tab) ->
    disc_load_table(Tab, {dumper, change_table_copy_type}, undefined).

%% Returns ok instead of yes
force_load_table(Tab) when atom(Tab), Tab /= schema ->
    case ?catch_val({Tab, storage_type}) of
        ram_copies ->
            do_force_load_table(Tab);
        disc_copies ->
            do_force_load_table(Tab);
        disc_only_copies ->
            do_force_load_table(Tab);
        unknown ->
            set({Tab, load_by_force}, true),
            cast({force_load_updated, Tab}),
            wait_for_tables([Tab], infinity);
        {'EXIT', _} ->
            {error, {no_exists, Tab}}
    end;
force_load_table(Tab) ->
    {error, {bad_type, Tab}}.

do_force_load_table(Tab) ->
    Loaded = ?catch_val({Tab, load_reason}),
    case Loaded of
        unknown ->
            set({Tab, load_by_force}, true),
            mnesia_late_loader:async_late_disc_load(node(), [Tab], forced_by_user),
            wait_for_tables([Tab], infinity);
        {'EXIT', _} ->
            set({Tab, load_by_force}, true),
            mnesia_late_loader:async_late_disc_load(node(), [Tab], forced_by_user),
            wait_for_tables([Tab], infinity);
        _ ->
            ok
    end.

master_nodes_updated(schema, _Masters) ->
    ignore;
master_nodes_updated(Tab, Masters) ->
    cast({master_nodes_updated, Tab, Masters}).

schedule_late_disc_load(Tabs, Reason) ->
    MsgTag = late_disc_load,
    try_schedule_late_disc_load(Tabs, Reason, MsgTag).

try_schedule_late_disc_load(Tabs, _Reason, MsgTag)
  when Tabs == [], MsgTag /= schema_is_merged ->
    ignore;
try_schedule_late_disc_load(Tabs, Reason, MsgTag) ->
    GetIntents =
        fun() ->
                Item = mnesia_late_disc_load,
                Nodes = val({current, db_nodes}),
                mnesia:lock({global, Item, Nodes}, write),
                case multicall(Nodes -- [node()], disc_load_intents) of
                    {Replies, []} ->
                        call({MsgTag, Tabs, Reason, Replies}),
                        done;
                    {_, BadNodes} ->
                        %% Some nodes did not respond, let's try again
                        {retry, BadNodes}
                end
        end,
    case mnesia:transaction(GetIntents) of
        {'atomic', done} ->
            done;
        {'atomic', {retry, BadNodes}} ->
            verbose("Retry late_load_tables because bad nodes: ~p~n",
                    [BadNodes]),
            try_schedule_late_disc_load(Tabs, Reason, MsgTag);
        {aborted, AbortReason} ->
            fatal("Cannot late_load_tables ~p: ~p~n",
                  [[Tabs, Reason, MsgTag], AbortReason])
    end.

connect_nodes(Ns) ->
    case mnesia:system_info(is_running) of
        no ->
            {error, {node_not_running, node()}};
        yes ->
            {NewC, OldC} = mnesia_recover:connect_nodes(Ns),
            Connected = NewC ++ OldC,
            New1 = mnesia_lib:intersect(Ns, Connected),
            New = New1 -- val({current, db_nodes}),
            case try_merge_schema(New) of
                ok ->
                    mnesia_lib:add_list(extra_db_nodes, New),
                    {ok, New};
                {aborted, {throw, Str}} when list(Str) ->
                    %% mnesia_recover:disconnect_nodes(New),
                    {error, {merge_schema_failed, lists:flatten(Str)}};
                Else ->
                    %% Disconnect nodes where the merge failed!!
                    %% mnesia_recover:disconnect_nodes(New),
                    {error, Else}
            end
    end.

%% Merge the local schema with the schema on other nodes.
%% But first we must let all processes that want to force
%% load tables wait until the schema merge is done.

merge_schema() ->
    AllNodes = mnesia_lib:all_nodes(),
    case try_merge_schema(AllNodes) of
        ok ->
            schema_is_merged();
        {aborted, {throw, Str}} when list(Str) ->
            fatal("Failed to merge schema: ~s~n", [Str]);
        Else ->
            fatal("Failed to merge schema: ~p~n", [Else])
    end.
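
%% Illustrative addition: connect_nodes/1 above is normally reached via the
%% public API when extending a running cluster, e.g. (node name is
%% hypothetical):
%%
%%     {ok, Merged} = mnesia:change_config(extra_db_nodes, ['b@host']),
%%     %% Merged lists the nodes whose schemas were merged into ours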

try_merge_schema(Nodes) ->
    case mnesia_schema:merge_schema() of
        {'atomic', not_merged} ->
            %% No more nodes that we need to merge the schema with
            ok;
        {'atomic', {merged, OldFriends, NewFriends}} ->
            %% Check if new nodes have been added to the schema
            Diff = mnesia_lib:all_nodes() -- [node() | Nodes],
            mnesia_recover:connect_nodes(Diff),

            %% Tell everybody to adopt orphan tables
            im_running(OldFriends, NewFriends),
            im_running(NewFriends, OldFriends),

            try_merge_schema(Nodes);
        {'atomic', {"Cannot get cstructs", Node, Reason}} ->
            dbg_out("Cannot get cstructs, Node ~p ~p~n", [Node, Reason]),
            timer:sleep(1000), % Avoid an endless-loop lookalike
            try_merge_schema(Nodes);
        Other ->
            Other
    end.

im_running(OldFriends, NewFriends) ->
    abcast(OldFriends, {im_running, node(), NewFriends}).

schema_is_merged() ->
    MsgTag = schema_is_merged,
    SafeLoads = initial_safe_loads(),

    %% At this point we do not know anything about
    %% which tables the other nodes have already
    %% loaded, so we let the normal processing of
    %% the loader_queue take care of it, since at
    %% that point in time we will know the
    %% whereabouts. We rely on the fact that all
    %% nodes tell each other directly when they
    %% have loaded a table and are willing to
    %% share it.

    try_schedule_late_disc_load(SafeLoads, initial, MsgTag).

cast(Msg) ->
    case whereis(?SERVER_NAME) of
        undefined -> {error, {node_not_running, node()}};
        Pid -> gen_server:cast(Pid, Msg)
    end.

abcast(Nodes, Msg) ->
    gen_server:abcast(Nodes, ?SERVER_NAME, Msg).

unsafe_call(Msg) ->
    case whereis(?SERVER_NAME) of
        undefined -> {error, {node_not_running, node()}};
        Pid -> gen_server:call(Pid, Msg, infinity)
    end.

call(Msg) ->
    case whereis(?SERVER_NAME) of
        undefined ->
            {error, {node_not_running, node()}};
        Pid ->
            link(Pid),
            Res = gen_server:call(Pid, Msg, infinity),
            unlink(Pid),

            %% We get an exit signal if the server dies
            receive
                {'EXIT', Pid, _Reason} ->
                    {error, {node_not_running, node()}}
            after 0 ->
                    ignore
            end,
            Res
    end.

remote_call(Node, Func, Args) ->
    case catch gen_server:call({?MODULE, Node}, {Func, Args, self()}, infinity) of
        {'EXIT', Error} -> {error, Error};
        Else -> Else
    end.

multicall(Nodes, Msg) ->
    {Good, Bad} = gen_server:multi_call(Nodes, ?MODULE, Msg, infinity),
    PatchedGood = [Reply || {_Node, Reply} <- Good],
    {PatchedGood, Bad}.  %% Make the replies look like rpc:multicall's
%%    rpc:multicall(Nodes, ?MODULE, call, [Msg]).

%%%----------------------------------------------------------------------
%%% Callback functions from gen_server
%%%----------------------------------------------------------------------

%%----------------------------------------------------------------------
%% Func: init/1
%% Returns: {ok, State}          |
%%          {ok, State, Timeout} |
%%          {stop, Reason}
%%----------------------------------------------------------------------
init([Parent]) ->
    process_flag(trap_exit, true),
    mnesia_lib:verbose("~p starting: ~p~n", [?SERVER_NAME, self()]),

    %% Handshake and initialize transaction recovery
    %% for new nodes detected in the schema
    All = mnesia_lib:all_nodes(),
    Diff = All -- [node() | val(original_nodes)],
    mnesia_lib:unset(original_nodes),
    mnesia_recover:connect_nodes(Diff),

    Interval = mnesia_monitor:get_env(dump_log_time_threshold),
    Msg = {async_dump_log, time_threshold},
    {ok, Ref} = timer:send_interval(Interval, Msg),
    mnesia_dumper:start_regulator(),

    {ok, #state{supervisor = Parent, dump_log_timer_ref = Ref}}.
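
%% Illustrative addition: a sketch of the reply shape produced by
%% multicall/2 above for the disc_load_intents request (node and table
%% names are hypothetical):
%%
%%     multicall(['a@h', 'b@h'], disc_load_intents)
%%     %% -> {[{ok, 'a@h', [t1, t2]}, {ok, 'b@h', []}], ['c@h']}
%%
%% i.e. a list of {ok, Node, Tabs} replies plus the list of bad nodes.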

%%----------------------------------------------------------------------
%% Func: handle_call/3
%% Returns: {reply, Reply, State}          |
%%          {reply, Reply, State, Timeout} |
%%          {noreply, State}               |
%%          {noreply, State, Timeout}      |
%%          {stop, Reason, Reply, State}   | (terminate/2 is called)
%%          {stop, Reason, State}            (terminate/2 is called)
%%----------------------------------------------------------------------

handle_call({sync_dump_log, InitBy}, From, State) ->
    Worker = #dump_log{initiated_by = InitBy,
                       opt_reply_to = From
                      },
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_call(wait_for_schema_commit_lock, From, State) ->
    Worker = #schema_commit_lock{owner = From},
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_call(block_controller, From, State) ->
    Worker = #block_controller{owner = From},
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_call(get_cstructs, From, State) ->
    Tabs = val({schema, tables}),
    Cstructs = [val({T, cstruct}) || T <- Tabs],
    Running = val({current, db_nodes}),
    reply(From, {cstructs, Cstructs, Running}),
    noreply(State);

handle_call({schema_is_merged, TabsR, Reason, RemoteLoaders}, From, State) ->
    State2 = late_disc_load(TabsR, Reason, RemoteLoaders, From, State),

    %% Handle early messages
    Msgs = State2#state.early_msgs,
    State3 = State2#state{early_msgs = [], schema_is_merged = true},
    Ns = val({current, db_nodes}),
    dbg_out("Schema is merged ~w, State ~w~n", [Ns, State3]),
    %% dbg_out("handle_early_msgs ~p ~n", [Msgs]), % qqqq
    handle_early_msgs(lists:reverse(Msgs), State3);

handle_call(disc_load_intents, From, State) ->
    Tabs = disc_load_intents(State#state.loader_queue) ++
           disc_load_intents(State#state.late_loader_queue),
    ActiveTabs = mnesia_lib:local_active_tables(),
    reply(From, {ok, node(), mnesia_lib:union(Tabs, ActiveTabs)}),
    noreply(State);

handle_call({update_where_to_write, [add, Tab, AddNode], _From}, _Dummy, State) ->
    %%% dbg_out("update_w2w ~p", [[add, Tab, AddNode]]), %%% qqqq
    Current = val({current, db_nodes}),
    Res =
        case lists:member(AddNode, Current) and
             (State#state.schema_is_merged == true) of
            true ->
                mnesia_lib:add({Tab, where_to_write}, AddNode);
            false ->
                ignore
        end,
    {reply, Res, State};

handle_call({add_active_replica, [Tab, ToNode, RemoteS, AccessMode], From},
            ReplyTo, State) ->
    KnownNode = lists:member(ToNode, val({current, db_nodes})),
    Merged = State#state.schema_is_merged,
    if
        KnownNode == false ->
            reply(ReplyTo, ignore),
            noreply(State);
        Merged == true ->
            Res = add_active_replica(Tab, ToNode, RemoteS, AccessMode),
            reply(ReplyTo, Res),
            noreply(State);
        true ->
            %% Schema is not merged
            Msg = {add_active_replica, [Tab, ToNode, RemoteS, AccessMode], From},
            Msgs = State#state.early_msgs,
            %% Reply ignore and add data after schema merge
            reply(ReplyTo, ignore),
            noreply(State#state{early_msgs = [{call, Msg, undefined} | Msgs]})
    end;

handle_call({unannounce_add_table_copy, [Tab, Node], From}, ReplyTo, State) ->
    KnownNode = lists:member(node(From), val({current, db_nodes})),
    Merged = State#state.schema_is_merged,
    if
        KnownNode == false ->
            reply(ReplyTo, ignore),
            noreply(State);
        Merged == true ->
            Res = unannounce_add_table_copy(Tab, Node),
            reply(ReplyTo, Res),
            noreply(State);
        true ->
            %% Schema is not merged
            Msg = {unannounce_add_table_copy, [Tab, Node], From},
            Msgs = State#state.early_msgs,
            %% Reply ignore and add data after schema merge
            reply(ReplyTo, ignore),
            %% Set ReplyTo to undefined so we don't reply twice
            noreply(State#state{early_msgs = [{call, Msg, undefined} | Msgs]})
    end;

handle_call(Msg, From, State) when
      State#state.schema_is_merged == false ->
    %% Buffer early messages
    %% dbg_out("Buffered early msg ~p ~n", [Msg]), %% qqqq
    Msgs = State#state.early_msgs,
    noreply(State#state{early_msgs = [{call, Msg, From} | Msgs]});

handle_call({net_load, Tab, Cs}, From, State) ->
    Worker = #net_load{table = Tab,
                       opt_reply_to = From,
                       reason = add_table_copy,
                       cstruct = Cs
                      },
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_call({late_disc_load, Tabs, Reason, RemoteLoaders}, From, State) ->
    State2 = late_disc_load(Tabs, Reason, RemoteLoaders, From, State),
    noreply(State2);

handle_call({block_table, [Tab], From}, _Dummy, State) ->
    case lists:member(node(From), val({current, db_nodes})) of
        true ->
            block_table(Tab);
        false ->
            ignore
    end,
    {reply, ok, State};

handle_call({check_w2r, _Node, Tab}, _From, State) ->
    {reply, val({Tab, where_to_read}), State};

handle_call(Msg, _From, State) ->
    error("~p got unexpected call: ~p~n", [?SERVER_NAME, Msg]),
    noreply(State).

disc_load_intents([H | T]) when record(H, disc_load) ->
    [H#disc_load.table | disc_load_intents(T)];
disc_load_intents([H | T]) when record(H, late_load) ->
    [H#late_load.table | disc_load_intents(T)];
disc_load_intents([H | T]) when record(H, net_load) ->
    disc_load_intents(T);
disc_load_intents([]) ->
    [].

late_disc_load(TabsR, Reason, RemoteLoaders, From, State) ->
    verbose("Intend to load tables: ~p~n", [TabsR]),
    ?eval_debug_fun({?MODULE, late_disc_load},
                    [{tabs, TabsR},
                     {reason, Reason},
                     {loaders, RemoteLoaders}]),

    reply(From, queued),

    %% RemoteLoaders is a list of {ok, Node, Tabs} tuples

    %% Remove deleted tabs
    LocalTabs = mnesia_lib:val({schema, local_tables}),
    Filter = fun({Tab, Reas}, Acc) ->
                     case lists:member(Tab, LocalTabs) of
                         true -> [{Tab, Reas} | Acc];
                         false -> Acc
                     end;
                (Tab, Acc) ->
                     case lists:member(Tab, LocalTabs) of
                         true -> [Tab | Acc];
                         false -> Acc
                     end
             end,
    Tabs = lists:foldl(Filter, [], TabsR),

    Nodes = val({current, db_nodes}),
    LateLoaders = late_loaders(Tabs, Reason, RemoteLoaders, Nodes),
    LateQueue = State#state.late_loader_queue ++ LateLoaders,
    State#state{late_loader_queue = LateQueue}.

late_loaders([{Tab, Reason} | Tabs], DefaultReason, RemoteLoaders, Nodes) ->
    LoadNodes = late_load_filter(RemoteLoaders, Tab, Nodes, []),
    case LoadNodes of
        [] -> cast({disc_load, Tab, Reason});  % Ugly cast
        _ -> ignore
    end,
    LateLoad = #late_load{table = Tab, loaders = LoadNodes, reason = Reason},
    [LateLoad | late_loaders(Tabs, DefaultReason, RemoteLoaders, Nodes)];

late_loaders([Tab | Tabs], Reason, RemoteLoaders, Nodes) ->
    Loaders = late_load_filter(RemoteLoaders, Tab, Nodes, []),
    case Loaders of
        [] -> cast({disc_load, Tab, Reason});  % Ugly cast
        _ -> ignore
    end,
    LateLoad = #late_load{table = Tab, loaders = Loaders, reason = Reason},
    [LateLoad | late_loaders(Tabs, Reason, RemoteLoaders, Nodes)];

late_loaders([], _Reason, _RemoteLoaders, _Nodes) ->
    [].
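
%% Illustrative addition: a late_load queue entry built by late_loaders/4
%% might look like this (node and table names are hypothetical):
%%
%%     #late_load{table = t, reason = initial, opt_reply_to = undefined,
%%                loaders = ['a@host', 'b@host']}
%%
%% When no remote loader qualifies, the loaders list is empty and an
%% immediate {disc_load, Tab, Reason} cast is issued instead.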

late_load_filter([{error, _} | RemoteLoaders], Tab, Nodes, Acc) ->
    late_load_filter(RemoteLoaders, Tab, Nodes, Acc);
late_load_filter([{badrpc, _} | RemoteLoaders], Tab, Nodes, Acc) ->
    late_load_filter(RemoteLoaders, Tab, Nodes, Acc);
late_load_filter([RL | RemoteLoaders], Tab, Nodes, Acc) ->
    {ok, Node, Intents} = RL,
    Access = val({Tab, access_mode}),
    LocalC = val({Tab, local_content}),
    StillActive = lists:member(Node, Nodes),
    RemoteIntent = lists:member(Tab, Intents),
    if
        Access == read_write,
        LocalC == false,
        StillActive == true,
        RemoteIntent == true ->
            Masters = mnesia_recover:get_master_nodes(Tab),
            case lists:member(Node, Masters) of
                true ->
                    %% The other node is a master node for
                    %% the table, accept its load intent
                    late_load_filter(RemoteLoaders, Tab, Nodes, [Node | Acc]);
                false when Masters == [] ->
                    %% The table has no master nodes,
                    %% accept the load intent
                    late_load_filter(RemoteLoaders, Tab, Nodes, [Node | Acc]);
                false ->
                    %% Someone else is a master node for
                    %% the table, ignore the load intent
                    late_load_filter(RemoteLoaders, Tab, Nodes, Acc)
            end;
        true ->
            late_load_filter(RemoteLoaders, Tab, Nodes, Acc)
    end;
late_load_filter([], _Tab, _Nodes, Acc) ->
    Acc.

%%----------------------------------------------------------------------
%% Func: handle_cast/2
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}       (terminate/2 is called)
%%----------------------------------------------------------------------

handle_cast({release_schema_commit_lock, _Owner}, State) ->
    if
        State#state.is_stopping == true ->
            {stop, shutdown, State};
        true ->
            case State#state.dumper_queue of
                [#schema_commit_lock{} | Rest] ->
                    State2 = State#state{dumper_pid = undefined,
                                         dumper_queue = Rest},
                    State3 = opt_start_worker(State2),
                    noreply(State3);
                _ ->
                    noreply(State)
            end
    end;

handle_cast(unblock_controller, State) ->
    if
        State#state.is_stopping == true ->
            {stop, shutdown, State};
        record(hd(State#state.dumper_queue), block_controller) ->
            [_Worker | Rest] = State#state.dumper_queue,
            State2 = State#state{dumper_pid = undefined,
                                 dumper_queue = Rest},
            State3 = opt_start_worker(State2),
            noreply(State3)
    end;

handle_cast({mnesia_down, Node}, State) ->
    maybe_log_mnesia_down(Node),
    mnesia_lib:del({current, db_nodes}, Node),
    mnesia_checkpoint:tm_mnesia_down(Node),
    Alltabs = val({schema, tables}),
    State2 = reconfigure_tables(Node, State, Alltabs),
    case State#state.sender_pid of
        undefined -> ignore;
        Pid when pid(Pid) -> Pid ! {copier_done, Node}
    end,
    case State#state.loader_pid of
        undefined -> ignore;
        Pid2 when pid(Pid2) ->
            Pid2 !
                {copier_done, Node}
    end,
    NewSenders =
        case State#state.sender_queue of
            [OldSender | RestSenders] ->
                Remove = fun(ST) ->
                                 node(ST#send_table.receiver_pid) /= Node
                         end,
                NewS = lists:filter(Remove, RestSenders),
                %% Keep the old sender; it will be removed by sender_done
                [OldSender | NewS];
            [] ->
                []
        end,
    Early = remove_early_messages(State2#state.early_msgs, Node),
    mnesia_monitor:mnesia_down(?SERVER_NAME, Node),
    noreply(State2#state{sender_queue = NewSenders, early_msgs = Early});

handle_cast({im_running, _Node, NewFriends}, State) ->
    Tabs = mnesia_lib:local_active_tables() -- [schema],
    Ns = mnesia_lib:intersect(NewFriends, val({current, db_nodes})),
    abcast(Ns, {adopt_orphans, node(), Tabs}),
    noreply(State);

handle_cast(Msg, State) when State#state.schema_is_merged == false ->
    %% Buffer early messages
    Msgs = State#state.early_msgs,
    noreply(State#state{early_msgs = [{cast, Msg} | Msgs]});

handle_cast({disc_load, Tab, Reason}, State) ->
    Worker = #disc_load{table = Tab, reason = Reason},
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_cast(Worker, State) when record(Worker, send_table) ->
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_cast({sync_tabs, Tabs, From}, State) ->
    %% A user-initiated wait_for_tables
    handle_sync_tabs(Tabs, From),
    noreply(State);

handle_cast({i_have_tab, Tab, Node}, State) ->
    case lists:member(Node, val({current, db_nodes})) of
        true ->
            State2 = node_has_tabs([Tab], Node, State),
            noreply(State2);
        false ->
            noreply(State)
    end;

handle_cast({force_load_updated, Tab}, State) ->
    case val({Tab, active_replicas}) of
        [] ->
            %% No valid replicas
            noreply(State);
        [SomeNode | _] ->
            State2 = node_has_tabs([Tab], SomeNode, State),
            noreply(State2)
    end;

handle_cast({master_nodes_updated, Tab, Masters}, State) ->
    Active = val({Tab, active_replicas}),
    Valid =
        case val({Tab, load_by_force}) of
            true ->
                Active;
            false ->
                if
                    Masters == [] ->
                        Active;
                    true ->
                        mnesia_lib:intersect(Masters, Active)
                end
        end,
    case Valid of
        [] ->
            %% No valid replicas
            noreply(State);
        [SomeNode | _] ->
            State2 = node_has_tabs([Tab], SomeNode, State),
            noreply(State2)
    end;

handle_cast({adopt_orphans, Node, Tabs}, State) ->
    State2 = node_has_tabs(Tabs, Node, State),

    %% Register the other node as up and running
    mnesia_recover:log_mnesia_up(Node),
    verbose("Logging mnesia_up ~w~n", [Node]),
    mnesia_lib:report_system_event({mnesia_up, Node}),

    %% Load orphan tables
    LocalTabs = val({schema, local_tables}) -- [schema],
    Nodes = val({current, db_nodes}),
    {LocalOrphans, RemoteMasters} =
        orphan_tables(LocalTabs, Node, Nodes, [], []),
    Reason = {adopt_orphan, node()},
    mnesia_late_loader:async_late_disc_load(node(), LocalOrphans, Reason),

    Fun =
        fun(N) ->
                RemoteOrphans =
                    [Tab || {Tab, Ns} <- RemoteMasters,
                            lists:member(N, Ns)],
                mnesia_late_loader:maybe_async_late_disc_load(N, RemoteOrphans, Reason)
        end,
    lists:foreach(Fun, Nodes),

    Queue = State2#state.loader_queue,
    State3 = State2#state{loader_queue = Queue},
    noreply(State3);

handle_cast(Msg, State) ->
    error("~p got unexpected cast: ~p~n", [?SERVER_NAME, Msg]),
    noreply(State).

handle_sync_tabs([Tab | Tabs], From) ->
    case val({Tab, where_to_read}) of
        nowhere ->
            case get({sync_tab, Tab}) of
                undefined ->
                    put({sync_tab, Tab}, [From]);
                Pids ->
                    put({sync_tab, Tab}, [From | Pids])
            end;
        _ ->
            sync_reply(From, Tab)
    end,
    handle_sync_tabs(Tabs, From);
handle_sync_tabs([], _From) ->
    ok.
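
%% Illustrative addition: handle_sync_tabs/2 keeps the waiters in the
%% process dictionary, e.g. after two waiters for a not yet readable
%% table t the entry is
%%
%%     get({sync_tab, t})  %% -> [Pid2, Pid1]
%%
%% and user_sync_tab/1 later erases the entry and replies to each pid.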

%%----------------------------------------------------------------------
%% Func: handle_info/2
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}       (terminate/2 is called)
%%----------------------------------------------------------------------

handle_info({async_dump_log, InitBy}, State) ->
    Worker = #dump_log{initiated_by = InitBy},
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_info(Done, State) when record(Done, dumper_done) ->
    Pid = Done#dumper_done.worker_pid,
    Res = Done#dumper_done.worker_res,
    if
        State#state.is_stopping == true ->
            {stop, shutdown, State};
        Res == dumped, Pid == State#state.dumper_pid ->
            [Worker | Rest] = State#state.dumper_queue,
            reply(Worker#dump_log.opt_reply_to, Res),
            State2 = State#state{dumper_pid = undefined,
                                 dumper_queue = Rest},
            State3 = opt_start_worker(State2),
            noreply(State3);
        true ->
            fatal("Dumper failed: ~p~n state: ~p~n", [Res, State]),
            {stop, fatal, State}
    end;

handle_info(Done, State) when record(Done, loader_done) ->
    if
        %% Assertion
        Done#loader_done.worker_pid == State#state.loader_pid -> ok
    end,

    [_Worker | Rest] = LoadQ0 = State#state.loader_queue,
    LateQueue0 = State#state.late_loader_queue,
    {LoadQ, LateQueue} =
        case Done#loader_done.is_loaded of
            true ->
                Tab = Done#loader_done.table_name,

                %% Optional user sync
                case Done#loader_done.needs_sync of
                    true -> user_sync_tab(Tab);
                    false -> ignore
                end,

                %% Optional table announcement
                case Done#loader_done.needs_announce of
                    true ->
                        i_have_tab(Tab),
                        case Tab of
                            schema ->
                                ignore;
                            _ ->
                                %% Local node needs to perform user_sync_tab/1
                                Ns = val({current, db_nodes}),
                                abcast(Ns, {i_have_tab, Tab, node()})
                        end;
                    false ->
                        case Tab of
                            schema ->
                                ignore;
                            _ ->
                                %% Local node needs to perform user_sync_tab/1
                                Ns = val({current, db_nodes}),
                                AlreadyKnows = val({Tab, active_replicas}),
                                abcast(Ns -- AlreadyKnows, {i_have_tab, Tab, node()})
                        end
                end,

                %% Optional client reply
                case Done#loader_done.needs_reply of
                    true ->
                        reply(Done#loader_done.reply_to,
                              Done#loader_done.reply);
                    false ->
                        ignore
                end,
                {Rest, reply_late_load(Tab, LateQueue0)};
            false ->
                case Done#loader_done.reply of
                    restart ->
                        {LoadQ0, LateQueue0};
                    _ ->
                        {Rest, LateQueue0}
                end
        end,

    State2 = State#state{loader_pid = undefined,
                         loader_queue = LoadQ,
                         late_loader_queue = LateQueue},
    State3 = opt_start_worker(State2),
    noreply(State3);

handle_info(Done, State) when record(Done, sender_done) ->
    Pid = Done#sender_done.worker_pid,
    Res = Done#sender_done.worker_res,
    if
        Res == ok, Pid == State#state.sender_pid ->
            [Worker | Rest] = State#state.sender_queue,
            Worker#send_table.receiver_pid !
                {copier_done, node()},
            State2 = State#state{sender_pid = undefined,
                                 sender_queue = Rest},
            State3 = opt_start_worker(State2),
            noreply(State3);
        true ->
            %% No need to send any message to the table receiver
            %% since it will soon get a mnesia_down anyway
            fatal("Sender failed: ~p~n state: ~p~n", [Res, State]),
            {stop, fatal, State}
    end;

handle_info({'EXIT', Pid, R}, State) when Pid == State#state.supervisor ->
    catch set(mnesia_status, stopping),
    case State#state.dumper_pid of
        undefined ->
            dbg_out("~p was ~p~n", [?SERVER_NAME, R]),
            {stop, shutdown, State};
        _ ->
            noreply(State#state{is_stopping = true})
    end;

handle_info({'EXIT', Pid, R}, State) when Pid == State#state.dumper_pid ->
    case State#state.dumper_queue of
        [#schema_commit_lock{} | Workers] ->
            %% Schema trans crashed or was killed
            State2 = State#state{dumper_queue = Workers,
                                 dumper_pid = undefined},
            State3 = opt_start_worker(State2),
            noreply(State3);
        _Other ->
            fatal("Dumper or schema commit crashed: ~p~n state: ~p~n",
                  [R, State]),
            {stop, fatal, State}
    end;

handle_info({'EXIT', Pid, R}, State) when Pid == State#state.loader_pid ->
    fatal("Loader crashed: ~p~n state: ~p~n", [R, State]),
    {stop, fatal, State};

handle_info({'EXIT', Pid, R}, State) when Pid == State#state.sender_pid ->
    %% No need to send any message to the table receiver
    %% since it will soon get a mnesia_down anyway
    fatal("Sender crashed: ~p~n state: ~p~n", [R, State]),
    {stop, fatal, State};

handle_info({From, get_state}, State) ->
    From ! {?SERVER_NAME, State},
    noreply(State);

%% No real need for buffering
handle_info(Msg, State) when State#state.schema_is_merged == false ->
    %% Buffer early messages
    Msgs = State#state.early_msgs,
    noreply(State#state{early_msgs = [{info, Msg} | Msgs]});

handle_info({'EXIT', Pid, wait_for_tables_timeout}, State) ->
    sync_tab_timeout(Pid, get()),
    noreply(State);

handle_info(Msg, State) ->
    error("~p got unexpected info: ~p~n", [?SERVER_NAME, Msg]),
    noreply(State).

reply_late_load(Tab, [H | T]) when H#late_load.table == Tab ->
    reply(H#late_load.opt_reply_to, ok),
    reply_late_load(Tab, T);
reply_late_load(Tab, [H | T]) ->
    [H | reply_late_load(Tab, T)];
reply_late_load(_Tab, []) ->
    [].

sync_tab_timeout(Pid, [{{sync_tab, Tab}, Pids} | Tail]) ->
    case lists:delete(Pid, Pids) of
        [] ->
            erase({sync_tab, Tab});
        Pids2 ->
            put({sync_tab, Tab}, Pids2)
    end,
    sync_tab_timeout(Pid, Tail);
sync_tab_timeout(Pid, [_ | Tail]) ->
    sync_tab_timeout(Pid, Tail);
sync_tab_timeout(_Pid, []) ->
    ok.

%% Pick the load record that has the highest load order.
%% Returns {BestLoad, RemainingQueue} or {none, []} if the queue is empty

pick_next(Queue) ->
    pick_next(Queue, none, none, []).

pick_next([Head | Tail], Load, Order, Rest) when record(Head, net_load) ->
    Tab = Head#net_load.table,
    select_best(Head, Tail, val({Tab, load_order}), Load, Order, Rest);
pick_next([Head | Tail], Load, Order, Rest) when record(Head, disc_load) ->
    Tab = Head#disc_load.table,
    select_best(Head, Tail, val({Tab, load_order}), Load, Order, Rest);
pick_next([], Load, _Order, Rest) ->
    {Load, Rest}.

select_best(Load, Tail, Order, none, none, Rest) ->
    pick_next(Tail, Load, Order, Rest);
select_best(Load, Tail, Order, OldLoad, OldOrder, Rest) when Order > OldOrder ->
    pick_next(Tail, Load, Order, [OldLoad | Rest]);
select_best(Load, Tail, _Order, OldLoad, OldOrder, Rest) ->
    pick_next(Tail, OldLoad, OldOrder, [Load | Rest]).
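
%% Illustrative addition: pick_next/1 above prefers the highest load_order,
%% which an application can set per table when it is created:
%%
%%     mnesia:create_table(important, [{load_order, 10} | MoreOpts])
%%
%% A queued load for such a table is then picked before tables that use
%% the default load_order 0.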

%%----------------------------------------------------------------------
%% Func: terminate/2
%% Purpose: Shutdown the server
%% Returns: any (ignored by gen_server)
%%----------------------------------------------------------------------
terminate(Reason, State) ->
    mnesia_monitor:terminate_proc(?SERVER_NAME, Reason, State).

%%----------------------------------------------------------------------
%% Func: code_change/3
%% Purpose: Upgrade process when its code is to be changed
%% Returns: {ok, NewState}
%%----------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------

maybe_log_mnesia_down(N) ->
    %% We use mnesia_down when deciding which tables to load locally,
    %% so if we are not running (i.e. haven't decided which tables
    %% to load locally), don't log mnesia_down yet.
    case mnesia_lib:is_running() of
        yes ->
            verbose("Logging mnesia_down ~w~n", [N]),
            mnesia_recover:log_mnesia_down(N),
            ok;
        _ ->
            Filter = fun(Tab) ->
                             inactive_copy_holders(Tab, N)
                     end,
            HalfLoadedTabs = lists:any(Filter, val({schema, local_tables}) -- [schema]),
            if
                HalfLoadedTabs == true ->
                    verbose("Logging mnesia_down ~w~n", [N]),
                    mnesia_recover:log_mnesia_down(N),
                    ok;
                true ->
                    %% Unfortunately we have not loaded some common
                    %% tables yet, so we cannot rely on the nodedown
                    log_later   %% BUGBUG: handle this case!!!
            end
    end.

inactive_copy_holders(Tab, Node) ->
    Cs = val({Tab, cstruct}),
    case mnesia_lib:cs_to_storage_type(Node, Cs) of
        unknown ->
            false;
        _Storage ->
            mnesia_lib:not_active_here(Tab)
    end.

orphan_tables([Tab | Tabs], Node, Ns, Local, Remote) ->
    Cs = val({Tab, cstruct}),
    CopyHolders = mnesia_lib:copy_holders(Cs),
    RamCopyHolders = Cs#cstruct.ram_copies,
    DiscCopyHolders = CopyHolders -- RamCopyHolders,
    DiscNodes = val({schema, disc_copies}),
    LocalContent = Cs#cstruct.local_content,
    RamCopyHoldersOnDiscNodes = mnesia_lib:intersect(RamCopyHolders, DiscNodes),
    Active = val({Tab, active_replicas}),
    case lists:member(Node, DiscCopyHolders) of
        true when Active == [] ->
            case DiscCopyHolders -- Ns of
                [] ->
                    %% We are the last node up and the other nodes have
                    %% not loaded the table. Load it if we are the
                    %% smallest node.
                    case lists:min(DiscCopyHolders) of
                        Min when Min == node() ->
                            case mnesia_recover:get_master_nodes(Tab) of
                                [] ->
                                    L = [Tab | Local],
                                    orphan_tables(Tabs, Node, Ns, L, Remote);
                                Masters ->
                                    R = [{Tab, Masters} | Remote],
                                    orphan_tables(Tabs, Node, Ns, Local, R)
                            end;
                        _ ->
                            orphan_tables(Tabs, Node, Ns, Local, Remote)
                    end;
                _ ->
                    orphan_tables(Tabs, Node, Ns, Local, Remote)
            end;
        false when Active == [],
                   DiscCopyHolders == [],
                   RamCopyHoldersOnDiscNodes == [] ->
            %% Special case when all replicas reside on discless nodes
            orphan_tables(Tabs, Node, Ns, [Tab | Local], Remote);
        _ when LocalContent == true ->
            orphan_tables(Tabs, Node, Ns, [Tab | Local], Remote);
        _ ->
            orphan_tables(Tabs, Node, Ns, Local, Remote)
    end;
orphan_tables([], _, _, LocalOrphans, RemoteMasters) ->
    {LocalOrphans, RemoteMasters}.

node_has_tabs([Tab | Tabs], Node, State) when Node /= node() ->
    State2 = update_whereabouts(Tab, Node, State),
    node_has_tabs(Tabs, Node, State2);
node_has_tabs([Tab | Tabs], Node, State) ->
    user_sync_tab(Tab),
    node_has_tabs(Tabs, Node, State);
node_has_tabs([], _Node, State) ->
    State.
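
%% Illustrative addition: a two-node sketch of the orphan adoption rule in
%% orphan_tables/5 (node names are hypothetical). If table t has disc
%% copies on 'a@h' and 'b@h', no active replicas, no master nodes, and all
%% disc copy holders are among the connected nodes, then lists:min/1 makes
%% exactly one node, 'a@h', schedule a local load of t.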

update_whereabouts(Tab, Node, State) ->
    Storage = val({Tab, storage_type}),
    Read = val({Tab, where_to_read}),
    LocalC = val({Tab, local_content}),
    BeingCreated = (?catch_val({Tab, create_table}) == true),
    Masters = mnesia_recover:get_master_nodes(Tab),
    ByForce = val({Tab, load_by_force}),
    GoGetIt =
        if
            ByForce == true -> true;
            Masters == [] -> true;
            true -> lists:member(Node, Masters)
        end,

    dbg_out("Table ~w is loaded on ~w. s=~w, r=~w, lc=~w, f=~w, m=~w~n",
            [Tab, Node, Storage, Read, LocalC, ByForce, GoGetIt]),
    if
        LocalC == true ->
            %% Local contents, don't care about other nodes
            State;
        Storage == unknown, Read == nowhere ->
            %% No own copy, time to read remotely
            %% if the other node is a good node
            add_active_replica(Tab, Node),
            case GoGetIt of
                true ->
                    set({Tab, where_to_read}, Node),
                    user_sync_tab(Tab),
                    State;
                false ->
                    State
            end;
        Storage == unknown ->
            %% No own copy, continue to read remotely
            add_active_replica(Tab, Node),
            NodeST = mnesia_lib:storage_type_at_node(Node, Tab),
            ReadST = mnesia_lib:storage_type_at_node(Read, Tab),
            if
                %% Avoid reading from disc_only_copies
                NodeST == disc_only_copies ->
                    ignore;
                ReadST == disc_only_copies ->
                    mnesia_lib:set_remote_where_to_read(Tab);
                true ->
                    ignore
            end,
            user_sync_tab(Tab),
            State;
        BeingCreated == true ->
            %% The table is currently being created
            %% and we shall have an own copy of it.
            %% We will load the (empty) table locally.
            add_active_replica(Tab, Node),
            State;
        Read == nowhere ->
            %% Own copy; go and get a copy of the table
            %% if the other node is a master or if there
            %% are no masters at all
            add_active_replica(Tab, Node),
            case GoGetIt of
                true ->
                    Worker = #net_load{table = Tab,
                                       reason = {active_remote, Node}},
                    add_worker(Worker, State);
                false ->
                    State
            end;
        true ->
            %% We already have an own copy
            add_active_replica(Tab, Node),
            user_sync_tab(Tab),
            State
    end.

initial_safe_loads() ->
    case val({schema, storage_type}) of
        ram_copies ->
            Downs = [],
            Tabs = val({schema, local_tables}) -- [schema],
            LastC = fun(T) -> last_consistent_replica(T, Downs) end,
            lists:zf(LastC, Tabs);

        disc_copies ->
            Downs = mnesia_recover:get_mnesia_downs(),
            dbg_out("mnesia_downs = ~p~n", [Downs]),

            Tabs = val({schema, local_tables}) -- [schema],
            LastC = fun(T) -> last_consistent_replica(T, Downs) end,
            lists:zf(LastC, Tabs)
    end.

last_consistent_replica(Tab, Downs) ->
    Cs = val({Tab, cstruct}),
    Storage = mnesia_lib:cs_to_storage_type(node(), Cs),
    Ram = Cs#cstruct.ram_copies,
    Disc = Cs#cstruct.disc_copies,
    DiscOnly = Cs#cstruct.disc_only_copies,
    BetterCopies0 = mnesia_lib:remote_copy_holders(Cs) -- Downs,
    BetterCopies = BetterCopies0 -- Ram,
    AccessMode = Cs#cstruct.access_mode,
    Copies = mnesia_lib:copy_holders(Cs),
    Masters = mnesia_recover:get_master_nodes(Tab),
    LocalMaster0 = lists:member(node(), Masters),
    LocalContent = Cs#cstruct.local_content,
    RemoteMaster =
        if
            Masters == [] -> false;
            true -> not LocalMaster0
        end,
    LocalMaster =
        if
            Masters == [] -> false;
            true -> LocalMaster0
        end,
    if
        Copies == [node()] ->
            %% Only one copy holder and it is local.
            %% It may also be a local contents table
            {true, {Tab, local_only}};
        LocalContent == true ->
            {true, {Tab, local_content}};
        LocalMaster == true ->
            %% We have a local master
            {true, {Tab, local_master}};
        RemoteMaster == true ->
            %% Wait for remote master copy
            false;
        Storage == ram_copies ->
            if
                Disc == [], DiscOnly == [] ->
                    %% Nobody has a copy on disc
                    {true, {Tab, ram_only}};
                true ->
                    %% Some other node has a copy on disc
                    false
            end;
        AccessMode == read_only ->
            %% No one has been able to update the table, i.e.
            %% all disc-resident copies are equal
            {true, {Tab, read_only}};
        BetterCopies /= [], Masters /= [node()] ->
            %% There are better copies on other nodes
            %% and we do not have the only master copy
            false;
        true ->
            {true, {Tab, initial}}
    end.

reconfigure_tables(N, State, [Tab | Tail]) ->
    del_active_replica(Tab, N),
    case val({Tab, where_to_read}) of
        N -> mnesia_lib:set_remote_where_to_read(Tab);
        _ -> ignore
    end,
    LateQ = drop_loaders(Tab, N, State#state.late_loader_queue),
    reconfigure_tables(N, State#state{late_loader_queue = LateQ}, Tail);
reconfigure_tables(_, State, []) ->
    State.

remove_early_messages([], _Node) ->
    [];
remove_early_messages([{call, {add_active_replica, [_, Node, _, _], _}, _} | R], Node) ->
    remove_early_messages(R, Node);
%% Does a reply before queuing
remove_early_messages([{call, {block_table, _, From}, ReplyTo} | R], Node)
  when node(From) == Node ->
    reply(ReplyTo, ok),  %% Remove gen_server waits..
    remove_early_messages(R, Node);
remove_early_messages([{cast, {i_have_tab, _Tab, Node}} | R], Node) ->
    remove_early_messages(R, Node);
remove_early_messages([{cast, {adopt_orphans, Node, _Tabs}} | R], Node) ->
    remove_early_messages(R, Node);
remove_early_messages([M | R], Node) ->
    [M | remove_early_messages(R, Node)].

%% Drop a loader from the late load queue and possibly trigger a disc_load
drop_loaders(Tab, Node, [H | T]) when H#late_load.table == Tab ->
    %% Check if it is time to issue a disc_load request
    case H#late_load.loaders of
        [Node] ->
            Reason = {H#late_load.reason, last_loader_down, Node},
            cast({disc_load, Tab, Reason});  % Ugly cast
        _ ->
            ignore
    end,
    %% Drop the node from the list of loaders
    H2 = H#late_load{loaders = H#late_load.loaders -- [Node]},
    [H2 | drop_loaders(Tab, Node, T)];
drop_loaders(Tab, Node, [H | T]) ->
    [H | drop_loaders(Tab, Node, T)];
drop_loaders(_, _, []) ->
    [].

add_active_replica(Tab, Node) ->
    add_active_replica(Tab, Node, val({Tab, cstruct})).

add_active_replica(Tab, Node, Cs) when record(Cs, cstruct) ->
    Storage = mnesia_lib:schema_cs_to_storage_type(Node, Cs),
    AccessMode = Cs#cstruct.access_mode,
    add_active_replica(Tab, Node, Storage, AccessMode).

%% Block table primitives

block_table(Tab) ->
    Var = {Tab, where_to_commit},
    Old = val(Var),
    New = {blocked, Old},
    set(Var, New).  % where_to_commit

unblock_table(Tab) ->
    Var = {Tab, where_to_commit},
    New =
        case val(Var) of
            {blocked, List} ->
                List;
            List ->
                List
        end,
    set(Var, New).  % where_to_commit

is_tab_blocked(W2C) when list(W2C) ->
    {false, W2C};
is_tab_blocked({blocked, W2C}) when list(W2C) ->
    {true, W2C}.

mark_blocked_tab(true, Value) ->
    {blocked, Value};
mark_blocked_tab(false, Value) ->
    Value.

%%
add_active_replica(Tab, Node, Storage, AccessMode) ->
    Var = {Tab, where_to_commit},
    {Blocked, Old} = is_tab_blocked(val(Var)),
    Del = lists:keydelete(Node, 1, Old),
    case AccessMode of
        read_write ->
            New = lists:sort([{Node, Storage} | Del]),
            set(Var, mark_blocked_tab(Blocked, New)),  % where_to_commit
            add({Tab, where_to_write}, Node);
        read_only ->
            set(Var, mark_blocked_tab(Blocked, Del)),
            mnesia_lib:del({Tab, where_to_write}, Node)
    end,
    add({Tab, active_replicas}, Node).

del_active_replica(Tab, Node) ->
    Var = {Tab, where_to_commit},
    {Blocked, Old} = is_tab_blocked(val(Var)),
    Del = lists:keydelete(Node, 1, Old),
    New = lists:sort(Del),
    set(Var, mark_blocked_tab(Blocked, New)),  % where_to_commit
    mnesia_lib:del({Tab, active_replicas}, Node),
    mnesia_lib:del({Tab, where_to_write}, Node).

change_table_access_mode(Cs) ->
    Tab = Cs#cstruct.name,
    lists:foreach(fun(N) -> add_active_replica(Tab, N, Cs) end,
                  val({Tab, active_replicas})).
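
%% Illustrative addition: the where_to_commit transformations above, with
%% hypothetical nodes:
%%
%%     val({t, where_to_commit})  %% -> [{'a@h', ram_copies}]
%%     block_table(t),
%%     val({t, where_to_commit})  %% -> {blocked, [{'a@h', ram_copies}]}
%%     unblock_table(t)           %% restores the plain list
%%
%% add_active_replica/4 and del_active_replica/2 preserve a blocked
%% wrapper via is_tab_blocked/1 and mark_blocked_tab/2.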

%% Node To now has the tab loaded, but this must be undone.
%% This code is rpc:call'ed from the tab_copier process
%% when it has *not* released its table lock
unannounce_add_table_copy(Tab, To) ->
    del_active_replica(Tab, To),
    case val({Tab, where_to_read}) of
        To ->
            mnesia_lib:set_remote_where_to_read(Tab);
        _ ->
            ignore
    end.

user_sync_tab(Tab) ->
    case val(debug) of
        trace ->
            mnesia_subscr:subscribe(whereis(mnesia_event), {table, Tab});
        _ ->
            ignore
    end,

    case erase({sync_tab, Tab}) of
        undefined ->
            ok;
        Pids ->
            lists:foreach(fun(Pid) -> sync_reply(Pid, Tab) end, Pids)
    end.

i_have_tab(Tab) ->
    case val({Tab, local_content}) of
        true ->
            mnesia_lib:set_local_content_whereabouts(Tab);
        false ->
            set({Tab, where_to_read}, node())
    end,
    add_active_replica(Tab, node()).

sync_and_block_table_whereabouts(Tab, ToNode, RemoteS, AccessMode) when Tab /= schema ->
    Current = val({current, db_nodes}),
    Ns =
        case lists:member(ToNode, Current) of
            true -> Current -- [ToNode];
            false -> Current
        end,
    remote_call(ToNode, block_table, [Tab]),
    [remote_call(Node, add_active_replica, [Tab, ToNode, RemoteS, AccessMode]) ||
        Node <- [ToNode | Ns]],
    ok.

sync_del_table_copy_whereabouts(Tab, ToNode) when Tab /= schema ->
    Current = val({current, db_nodes}),
    Ns =
        case lists:member(ToNode, Current) of
            true -> Current;
            false -> [ToNode | Current]
        end,
    Args = [Tab, ToNode],
    [remote_call(Node, unannounce_add_table_copy, Args) || Node <- Ns],
    ok.

get_info(Timeout) ->
    case whereis(?SERVER_NAME) of
        undefined ->
            {timeout, Timeout};
        Pid ->
            Pid ! {self(), get_state},
            receive
                {?SERVER_NAME, State} when record(State, state) ->
                    {info, State}
            after Timeout ->
                    {timeout, Timeout}
            end
    end.

get_workers(Timeout) ->
    case whereis(?SERVER_NAME) of
        undefined ->
            {timeout, Timeout};
        Pid ->
            Pid ! {self(), get_state},
            receive
                {?SERVER_NAME, State} when record(State, state) ->
                    {workers, State#state.loader_pid,
                     State#state.sender_pid,
                     State#state.dumper_pid}
            after Timeout ->
                    {timeout, Timeout}
            end
    end.

info() ->
    Tabs = mnesia_lib:local_active_tables(),
    io:format("---> Active tables <--- ~n", []),
    info(Tabs).

info([Tab | Tail]) ->
    case val({Tab, storage_type}) of
        disc_only_copies ->
            info_format(Tab,
                        dets:info(Tab, size),
                        dets:info(Tab, file_size),
                        "bytes on disc");
        _ ->
            info_format(Tab,
                        ?ets_info(Tab, size),
                        ?ets_info(Tab, memory),
                        "words of mem")
    end,
    info(Tail);
info([]) -> ok;
info(Tab) -> info([Tab]).

info_format(Tab, Size, Mem, Media) ->
    StrT = mnesia_lib:pad_name(atom_to_list(Tab), 15, []),
    StrS = mnesia_lib:pad_name(integer_to_list(Size), 8, []),
    StrM = mnesia_lib:pad_name(integer_to_list(Mem), 8, []),
    io:format("~s: with ~s records occupying ~s ~s~n",
              [StrT, StrS, StrM, Media]).

%% Handle early arrived messages
handle_early_msgs([Msg | Msgs], State) ->
    %% The messages are in reverse order
    case handle_early_msg(Msg, State) of
        {stop, Reason, Reply, State2} ->
            {stop, Reason, Reply, State2};
        {stop, Reason, State2} ->
            {stop, Reason, State2};
        {noreply, State2} ->
            handle_early_msgs(Msgs, State2);
        {noreply, State2, _Timeout} ->
            handle_early_msgs(Msgs, State2);
        Else ->
            dbg_out("handle_early_msgs case clause ~p ~n", [Else]),
            erlang:error(Else, [[Msg | Msgs], State])
    end;
handle_early_msgs([], State) ->
    noreply(State).

handle_early_msg({call, Msg, From}, State) ->
    handle_call(Msg, From, State);
handle_early_msg({cast, Msg}, State) ->
    handle_cast(Msg, State);
handle_early_msg({info, Msg}, State) ->
    handle_info(Msg, State).

noreply(State) ->
    {noreply, State}.

reply(undefined, Reply) ->
    Reply;
reply(ReplyTo, Reply) ->
    gen_server:reply(ReplyTo, Reply),
    Reply.
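
%% Illustrative addition: info/0 above prints one line per active table;
%% the output might look roughly like this (figures are made up):
%%
%%     ---> Active tables <---
%%     user           : with 42       records occupying 1337     words of mem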

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Worker management

%% Returns new State
add_worker(Worker, State) when record(Worker, dump_log) ->
    InitBy = Worker#dump_log.initiated_by,
    Queue = State#state.dumper_queue,
    case lists:keymember(InitBy, #dump_log.initiated_by, Queue) of
        false ->
            ignore;
        true when Worker#dump_log.opt_reply_to == undefined ->
            %% The same threshold has been exceeded again,
            %% before we have had the possibility to
            %% process the older one
            DetectedBy = {dump_log, InitBy},
            Event = {mnesia_overload, DetectedBy},
            mnesia_lib:report_system_event(Event);
        _ ->
            %% Catch-all to keep the case exhaustive when a sync dump
            %% with the same initiator is already queued
            ignore
    end,
    Queue2 = Queue ++ [Worker],
    State2 = State#state{dumper_queue = Queue2},
    opt_start_worker(State2);

add_worker(Worker, State) when record(Worker, schema_commit_lock) ->
    Queue = State#state.dumper_queue,
    Queue2 = Queue ++ [Worker],
    State2 = State#state{dumper_queue = Queue2},
    opt_start_worker(State2);

add_worker(Worker, State) when record(Worker, net_load) ->
    Queue = State#state.loader_queue,
    State2 = State#state{loader_queue = Queue ++ [Worker]},
    opt_start_worker(State2);

add_worker(Worker, State) when record(Worker, send_table) ->
    Queue = State#state.sender_queue,
    State2 = State#state{sender_queue = Queue ++ [Worker]},
    opt_start_worker(State2);

add_worker(Worker, State) when record(Worker, disc_load) ->
    Queue = State#state.loader_queue,
    State2 = State#state{loader_queue = Queue ++ [Worker]},
    opt_start_worker(State2);

%% Block controller should be used for upgrading mnesia
add_worker(Worker, State) when record(Worker, block_controller) ->
    Queue = State#state.dumper_queue,
    Queue2 = [Worker | Queue],
    State2 = State#state{dumper_queue = Queue2},
    opt_start_worker(State2).

%% Optionally start a worker
%%
%% Dumpers and loaders may run simultaneously,
%% but neither of them may run during schema commit.
%% Loaders may not start if a schema commit is enqueued.
opt_start_worker(State) when State#state.is_stopping == true ->
    State;
opt_start_worker(State) ->
    %% Prioritize dumper and schema commit
    %% by checking them first
    case State#state.dumper_queue of
        [Worker | _Rest] when State#state.dumper_pid == undefined ->
            %% Great, a worker is in the queue and neither
            %% a schema transaction is being committed
            %% nor a dumper is running

            %% Start the worker but keep it in the queue
            if
                record(Worker, schema_commit_lock) ->
                    ReplyTo = Worker#schema_commit_lock.owner,
                    reply(ReplyTo, granted),
                    {Owner, _Tag} = ReplyTo,
                    State#state{dumper_pid = Owner};

                record(Worker, dump_log) ->
                    Pid = spawn_link(?MODULE, dump_and_reply, [self(), Worker]),
                    State2 = State#state{dumper_pid = Pid},

                    %% If the worker was a dumper, we may
                    %% possibly be able to start a loader
                    %% or sender
                    State3 = opt_start_sender(State2),
                    opt_start_loader(State3);

                record(Worker, block_controller) ->
                    case {State#state.sender_pid, State#state.loader_pid} of
                        {undefined, undefined} ->
                            ReplyTo = Worker#block_controller.owner,
                            reply(ReplyTo, granted),
                            {Owner, _Tag} = ReplyTo,
                            State#state{dumper_pid = Owner};
                        _ ->
                            State
                    end
            end;
        _ ->
            %% Bad luck, try with a loader or sender instead
            State2 = opt_start_sender(State),
            opt_start_loader(State2)
    end.
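
%% Illustrative addition: the scheduling discipline above in one example.
%% Given a state with an idle dumper and a queued dump_log worker,
%%
%%     State  = #state{dumper_pid = undefined,
%%                     dumper_queue = [#dump_log{initiated_by = user}]},
%%     State2 = opt_start_worker(State),
%%
%% State2#state.dumper_pid is then the pid of a dump_and_reply/2 worker,
%% and the worker record stays in the queue until its dumper_done arrives.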

opt_start_sender(State) ->
    case State#state.sender_queue of
        [] ->
            %% No need
            State;

        _ when State#state.sender_pid /= undefined ->
            %% Bad luck, a sender is already running
            State;

        [Sender | _SenderRest] ->
            case State#state.loader_queue of
                [Loader | _LoaderRest]
                  when State#state.loader_pid /= undefined,
                       Loader#net_load.table == Sender#send_table.table ->
                    %% A conflicting loader is running
                    State;
                _ ->
                    SchemaQueue = State#state.dumper_queue,
                    %% Note: the key is the record tag of the queued
                    %% #schema_commit_lock{} workers (the original code
                    %% tested for the atom schema_commit, which can
                    %% never appear in the dumper queue)
                    case lists:keymember(schema_commit_lock, 1, SchemaQueue) of
                        false ->
                            %% Start the worker but keep it in the queue
                            Pid = spawn_link(?MODULE, send_and_reply,
                                             [self(), Sender]),
                            State#state{sender_pid = Pid};
                        true ->
                            %% Bad luck, we must wait for the schema commit
                            State
                    end
            end
    end.

opt_start_loader(State) ->
    LoaderQueue = State#state.loader_queue,
    if
        LoaderQueue == [] ->
            %% No need
            State;

        State#state.loader_pid /= undefined ->
            %% Bad luck, a loader is already running
            State;

        true ->
            SchemaQueue = State#state.dumper_queue,
            case lists:keymember(schema_commit_lock, 1, SchemaQueue) of
                false ->
                    {Worker, Rest} = pick_next(LoaderQueue),

                    %% Start the worker but keep it in the queue
                    Pid = spawn_link(?MODULE, load_and_reply, [self(), Worker]),
                    State#state{loader_pid = Pid,
                                loader_queue = [Worker | Rest]};
                true ->
                    %% Bad luck, we must wait for the schema commit
                    State
            end
    end.

start_remote_sender(Node, Tab, Receiver, Storage) ->
    Msg = #send_table{table = Tab,
                      receiver_pid = Receiver,
                      remote_storage = Storage},
    gen_server:cast({?SERVER_NAME, Node}, Msg).

dump_and_reply(ReplyTo, Worker) ->
    %% No trap_exit, die intentionally instead
    Res = mnesia_dumper:opt_dump_log(Worker#dump_log.initiated_by),
    ReplyTo ! #dumper_done{worker_pid = self(),
                           worker_res = Res},
    unlink(ReplyTo),
    exit(normal).

send_and_reply(ReplyTo, Worker) ->
    %% No trap_exit, die intentionally instead
    Res = mnesia_loader:send_table(Worker#send_table.receiver_pid,
                                   Worker#send_table.table,
                                   Worker#send_table.remote_storage),
    ReplyTo ! #sender_done{worker_pid = self(),
                           worker_res = Res},
    unlink(ReplyTo),
    exit(normal).

load_and_reply(ReplyTo, Worker) ->
    process_flag(trap_exit, true),
    Done = load_table(Worker),
    ReplyTo ! Done#loader_done{worker_pid = self()},
    unlink(ReplyTo),
    exit(normal).
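
%% Illustrative addition: all three worker kinds above share one lifecycle;
%% e.g. a dumper is spawned with spawn_link(?MODULE, dump_and_reply,
%% [self(), Worker]), posts a #dumper_done{} message back, unlinks and
%% exits normally, and the controller dequeues it in handle_info/2 when
%% the done message arrives.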

%% Now it is time to load the table,
%% but first we must check if it is still necessary
load_table(Load) when record(Load, net_load) ->
    Tab = Load#net_load.table,
    ReplyTo = Load#net_load.opt_reply_to,
    Reason = Load#net_load.reason,
    LocalC = val({Tab, local_content}),
    AccessMode = val({Tab, access_mode}),
    ReadNode = val({Tab, where_to_read}),
    Active = filter_active(Tab),
    Done = #loader_done{is_loaded = true,
                        table_name = Tab,
                        needs_announce = false,
                        needs_sync = false,
                        needs_reply = true,
                        reply_to = ReplyTo,
                        reply = {loaded, ok}
                       },
    if
        ReadNode == node() ->
            %% Already loaded locally
            Done;
        LocalC == true ->
            Res = mnesia_loader:disc_load_table(Tab, load_local_content),
            Done#loader_done{reply = Res,
                             needs_announce = true,
                             needs_sync = true};
        AccessMode == read_only ->
            disc_load_table(Tab, Reason, ReplyTo);
        true ->
            %% Either we cannot read the table yet
            %% or someone is moving a replica between
            %% two nodes
            Cs = Load#net_load.cstruct,
            Res = mnesia_loader:net_load_table(Tab, Reason, Active, Cs),
            case Res of
                {loaded, ok} ->
                    Done#loader_done{needs_sync = true,
                                     reply = Res};
                {not_loaded, storage_unknown} ->
                    Done#loader_done{reply = Res};
                {not_loaded, _} ->
                    Done#loader_done{is_loaded = false,
                                     needs_reply = false,
                                     reply = Res}
            end
    end;
load_table(Load) when record(Load, disc_load) ->
    Tab = Load#disc_load.table,
    Reason = Load#disc_load.reason,
    ReplyTo = Load#disc_load.opt_reply_to,
    ReadNode = val({Tab, where_to_read}),
    Active = filter_active(Tab),
    Done = #loader_done{is_loaded = true,
                        table_name = Tab,
                        needs_announce = false,
                        needs_sync = false,
                        needs_reply = false
                       },
    if
        Active == [], ReadNode == nowhere ->
            %% Not loaded anywhere, let's load it from disc
            disc_load_table(Tab, Reason, ReplyTo);
        ReadNode == nowhere ->
            %% Already loaded on another node, let's get it
            Cs = val({Tab, cstruct}),
            case mnesia_loader:net_load_table(Tab, Reason, Active, Cs) of
                {loaded, ok} ->
                    Done#loader_done{needs_sync = true};
                {not_loaded, storage_unknown} ->
                    Done#loader_done{is_loaded = false};
                {not_loaded, ErrReason} ->
                    Done#loader_done{is_loaded = false,
                                     reply = {not_loaded, ErrReason}}
            end;
        true ->
            %% Already readable; don't worry, be happy
            Done
    end.

disc_load_table(Tab, Reason, ReplyTo) ->
    Done = #loader_done{is_loaded = true,
                        table_name = Tab,
                        needs_announce = false,
                        needs_sync = false,
                        needs_reply = true,
                        reply_to = ReplyTo,
                        reply = {loaded, ok}
                       },
    Res = mnesia_loader:disc_load_table(Tab, Reason),
    if
        Res == {loaded, ok} ->
            Done#loader_done{needs_announce = true,
                             needs_sync = true,
                             reply = Res};
        ReplyTo /= undefined ->
            Done#loader_done{is_loaded = false,
                             reply = Res};
        true ->
            fatal("Cannot load table ~p from disc: ~p~n", [Tab, Res])
    end.

filter_active(Tab) ->
    ByForce = val({Tab, load_by_force}),
    Active = val({Tab, active_replicas}),
    Masters = mnesia_recover:get_master_nodes(Tab),
    do_filter_active(ByForce, Active, Masters).

do_filter_active(true, Active, _Masters) ->
    Active;
do_filter_active(false, Active, []) ->
    Active;
do_filter_active(false, Active, Masters) ->
    mnesia_lib:intersect(Active, Masters).
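
%% Illustrative addition: do_filter_active/3 with hypothetical nodes:
%%
%%     do_filter_active(false, ['a@h', 'b@h', 'c@h'], ['b@h'])
%%     %% -> ['b@h']
%%     do_filter_active(true,  ['a@h', 'b@h'], ['c@h'])
%%     %% -> ['a@h', 'b@h']
%%
%% i.e. master nodes restrict which active replicas may serve a load,
%% unless the load is forced.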