diff options
Diffstat (limited to 'lib/dialyzer/test/r9c_SUITE_data/src/mnesia/mnesia_controller.erl')
-rw-r--r-- | lib/dialyzer/test/r9c_SUITE_data/src/mnesia/mnesia_controller.erl | 2010 |
1 files changed, 2010 insertions, 0 deletions
diff --git a/lib/dialyzer/test/r9c_SUITE_data/src/mnesia/mnesia_controller.erl b/lib/dialyzer/test/r9c_SUITE_data/src/mnesia/mnesia_controller.erl new file mode 100644 index 0000000000..1d7f29bfbd --- /dev/null +++ b/lib/dialyzer/test/r9c_SUITE_data/src/mnesia/mnesia_controller.erl @@ -0,0 +1,2010 @@ +%% ``The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved via the world wide web at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% The Initial Developer of the Original Code is Ericsson Utvecklings AB. +%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings +%% AB. All Rights Reserved.'' +%% +%% $Id: mnesia_controller.erl,v 1.3 2010/03/04 13:54:19 maria Exp $ +%% +%% The mnesia_init process loads tables from local disc or from +%% another nodes. It also coordinates updates of the info about +%% where we can read and write tables. +%% +%% Tables may need to be loaded initially at startup of the local +%% node or when other nodes announces that they already have loaded +%% tables that we also want. +%% +%% Initially we set the load request queue to those tables that we +%% safely can load locally, i.e. tables where we have the last +%% consistent replica and we have received mnesia_down from all +%% other nodes holding the table. Then we let the mnesia_init +%% process enter its normal working state. +%% +%% When we need to load a table we append a request to the load +%% request queue. All other requests are regarded as high priority +%% and are processed immediately (e.g. update table whereabouts). +%% We processes the load request queue as a "background" job.. + +-module(mnesia_controller). + +-behaviour(gen_server). + +%% Mnesia internal stuff +-export([ + start/0, + i_have_tab/1, + info/0, + get_info/1, + get_workers/1, + force_load_table/1, + async_dump_log/1, + sync_dump_log/1, + connect_nodes/1, + wait_for_schema_commit_lock/0, + release_schema_commit_lock/0, + create_table/1, + get_disc_copy/1, + get_cstructs/0, + sync_and_block_table_whereabouts/4, + sync_del_table_copy_whereabouts/2, + block_table/1, + unblock_table/1, + block_controller/0, + unblock_controller/0, + unannounce_add_table_copy/2, + master_nodes_updated/2, + mnesia_down/1, + add_active_replica/2, + add_active_replica/3, + add_active_replica/4, + change_table_access_mode/1, + del_active_replica/2, + wait_for_tables/2, + get_network_copy/2, + merge_schema/0, + start_remote_sender/4, + schedule_late_disc_load/2 + ]). + +%% gen_server callbacks +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +%% Module internal stuff +-export([call/1, + cast/1, + dump_and_reply/2, + load_and_reply/2, + send_and_reply/2, + wait_for_tables_init/2 + ]). + +-import(mnesia_lib, [set/2, add/2]). +-import(mnesia_lib, [fatal/2, error/2, verbose/2, dbg_out/2]). + +-include("mnesia.hrl"). + +-define(SERVER_NAME, ?MODULE). + +-record(state, {supervisor, + schema_is_merged = false, + early_msgs = [], + loader_pid, + loader_queue = [], + sender_pid, + sender_queue = [], + late_loader_queue = [], + dumper_pid, % Dumper or schema commit pid + dumper_queue = [], % Dumper or schema commit queue + dump_log_timer_ref, + is_stopping = false + }). + +-record(worker_reply, {what, + pid, + result + }). + +-record(schema_commit_lock, {owner}). +-record(block_controller, {owner}). + +-record(dump_log, {initiated_by, + opt_reply_to + }). + +-record(net_load, {table, + reason, + opt_reply_to, + cstruct = unknown + }). + +-record(send_table, {table, + receiver_pid, + remote_storage + }). + +-record(disc_load, {table, + reason, + opt_reply_to + }). + +-record(late_load, {table, + reason, + opt_reply_to, + loaders + }). + +-record(loader_done, {worker_pid, + is_loaded, + table_name, + needs_announce, + needs_sync, + needs_reply, + reply_to, + reply}). + +-record(sender_done, {worker_pid, + worker_res, + table_name + }). + +-record(dumper_done, {worker_pid, + worker_res + }). + +val(Var) -> + case ?catch_val(Var) of + {'EXIT', Reason} -> mnesia_lib:other_val(Var, Reason); + Value -> Value + end. + +start() -> + gen_server:start_link({local, ?SERVER_NAME}, ?MODULE, [self()], + [{timeout, infinity} + %% ,{debug, [trace]} + ]). + +sync_dump_log(InitBy) -> + call({sync_dump_log, InitBy}). + +async_dump_log(InitBy) -> + ?SERVER_NAME ! {async_dump_log, InitBy}. + +%% Wait for tables to be active +%% If needed, we will wait for Mnesia to start +%% If Mnesia stops, we will wait for Mnesia to restart +%% We will wait even if the list of tables is empty +%% +wait_for_tables(Tabs, Timeout) when list(Tabs), Timeout == infinity -> + do_wait_for_tables(Tabs, Timeout); +wait_for_tables(Tabs, Timeout) when list(Tabs), + integer(Timeout), Timeout >= 0 -> + do_wait_for_tables(Tabs, Timeout); +wait_for_tables(Tabs, Timeout) -> + {error, {badarg, Tabs, Timeout}}. + +do_wait_for_tables(Tabs, 0) -> + reply_wait(Tabs); +do_wait_for_tables(Tabs, Timeout) -> + Pid = spawn_link(?MODULE, wait_for_tables_init, [self(), Tabs]), + receive + {?SERVER_NAME, Pid, Res} -> + Res; + + {'EXIT', Pid, _} -> + reply_wait(Tabs) + + after Timeout -> + unlink(Pid), + exit(Pid, timeout), + reply_wait(Tabs) + end. + +reply_wait(Tabs) -> + case catch mnesia_lib:active_tables() of + {'EXIT', _} -> + {error, {node_not_running, node()}}; + Active when list(Active) -> + case Tabs -- Active of + [] -> + ok; + BadTabs -> + {timeout, BadTabs} + end + end. + +wait_for_tables_init(From, Tabs) -> + process_flag(trap_exit, true), + Res = wait_for_init(From, Tabs, whereis(?SERVER_NAME)), + From ! {?SERVER_NAME, self(), Res}, + unlink(From), + exit(normal). + +wait_for_init(From, Tabs, Init) -> + case catch link(Init) of + {'EXIT', _} -> + %% Mnesia is not started + {error, {node_not_running, node()}}; + true when pid(Init) -> + cast({sync_tabs, Tabs, self()}), + rec_tabs(Tabs, Tabs, From, Init) + end. + +sync_reply(Waiter, Tab) -> + Waiter ! {?SERVER_NAME, {tab_synced, Tab}}. + +rec_tabs([Tab | Tabs], AllTabs, From, Init) -> + receive + {?SERVER_NAME, {tab_synced, Tab}} -> + rec_tabs(Tabs, AllTabs, From, Init); + + {'EXIT', From, _} -> + %% This will trigger an exit signal + %% to mnesia_init + exit(wait_for_tables_timeout); + + {'EXIT', Init, _} -> + %% Oops, mnesia_init stopped, + exit(mnesia_stopped) + end; +rec_tabs([], _, _, Init) -> + unlink(Init), + ok. + +get_cstructs() -> + call(get_cstructs). + +mnesia_down(Node) -> + case cast({mnesia_down, Node}) of + {error, _} -> mnesia_monitor:mnesia_down(?SERVER_NAME, Node); + _Pid -> ok + end. +wait_for_schema_commit_lock() -> + link(whereis(?SERVER_NAME)), + unsafe_call(wait_for_schema_commit_lock). + +block_controller() -> + call(block_controller). + +unblock_controller() -> + cast(unblock_controller). + +release_schema_commit_lock() -> + cast({release_schema_commit_lock, self()}), + unlink(whereis(?SERVER_NAME)). + +%% Special for preparation of add table copy +get_network_copy(Tab, Cs) -> + Work = #net_load{table = Tab, + reason = {dumper, add_table_copy}, + cstruct = Cs + }, + Res = (catch load_table(Work)), + if Res#loader_done.is_loaded == true -> + Tab = Res#loader_done.table_name, + case Res#loader_done.needs_announce of + true -> + i_have_tab(Tab); + false -> + ignore + end; + true -> ignore + end, + + receive %% Flush copier done message + {copier_done, _Node} -> + ok + after 500 -> %% avoid hanging if something is wrong and we shall fail. + ignore + end, + Res#loader_done.reply. + +%% This functions is invoked from the dumper +%% +%% There are two cases here: +%% startup -> +%% no need for sync, since mnesia_controller not started yet +%% schema_trans -> +%% already synced with mnesia_controller since the dumper +%% is syncronously started from mnesia_controller + +create_table(Tab) -> + {loaded, ok} = mnesia_loader:disc_load_table(Tab, {dumper,create_table}). + +get_disc_copy(Tab) -> + disc_load_table(Tab, {dumper,change_table_copy_type}, undefined). + +%% Returns ok instead of yes +force_load_table(Tab) when atom(Tab), Tab /= schema -> + case ?catch_val({Tab, storage_type}) of + ram_copies -> + do_force_load_table(Tab); + disc_copies -> + do_force_load_table(Tab); + disc_only_copies -> + do_force_load_table(Tab); + unknown -> + set({Tab, load_by_force}, true), + cast({force_load_updated, Tab}), + wait_for_tables([Tab], infinity); + {'EXIT', _} -> + {error, {no_exists, Tab}} + end; +force_load_table(Tab) -> + {error, {bad_type, Tab}}. + +do_force_load_table(Tab) -> + Loaded = ?catch_val({Tab, load_reason}), + case Loaded of + unknown -> + set({Tab, load_by_force}, true), + mnesia_late_loader:async_late_disc_load(node(), [Tab], forced_by_user), + wait_for_tables([Tab], infinity); + {'EXIT', _} -> + set({Tab, load_by_force}, true), + mnesia_late_loader:async_late_disc_load(node(), [Tab], forced_by_user), + wait_for_tables([Tab], infinity); + _ -> + ok + end. +master_nodes_updated(schema, _Masters) -> + ignore; +master_nodes_updated(Tab, Masters) -> + cast({master_nodes_updated, Tab, Masters}). + +schedule_late_disc_load(Tabs, Reason) -> + MsgTag = late_disc_load, + try_schedule_late_disc_load(Tabs, Reason, MsgTag). + +try_schedule_late_disc_load(Tabs, _Reason, MsgTag) + when Tabs == [], MsgTag /= schema_is_merged -> + ignore; +try_schedule_late_disc_load(Tabs, Reason, MsgTag) -> + GetIntents = + fun() -> + Item = mnesia_late_disc_load, + Nodes = val({current, db_nodes}), + mnesia:lock({global, Item, Nodes}, write), + case multicall(Nodes -- [node()], disc_load_intents) of + {Replies, []} -> + call({MsgTag, Tabs, Reason, Replies}), + done; + {_, BadNodes} -> + %% Some nodes did not respond, lets try again + {retry, BadNodes} + end + end, + case mnesia:transaction(GetIntents) of + {'atomic', done} -> + done; + {'atomic', {retry, BadNodes}} -> + verbose("Retry late_load_tables because bad nodes: ~p~n", + [BadNodes]), + try_schedule_late_disc_load(Tabs, Reason, MsgTag); + {aborted, AbortReason} -> + fatal("Cannot late_load_tables~p: ~p~n", + [[Tabs, Reason, MsgTag], AbortReason]) + end. + +connect_nodes(Ns) -> + case mnesia:system_info(is_running) of + no -> + {error, {node_not_running, node()}}; + yes -> + {NewC, OldC} = mnesia_recover:connect_nodes(Ns), + Connected = NewC ++OldC, + New1 = mnesia_lib:intersect(Ns, Connected), + New = New1 -- val({current, db_nodes}), + + case try_merge_schema(New) of + ok -> + mnesia_lib:add_list(extra_db_nodes, New), + {ok, New}; + {aborted, {throw, Str}} when list(Str) -> + %%mnesia_recover:disconnect_nodes(New), + {error, {merge_schema_failed, lists:flatten(Str)}}; + Else -> + %% Unconnect nodes where merge failed!! + %% mnesia_recover:disconnect_nodes(New), + {error, Else} + end + end. + +%% Merge the local schema with the schema on other nodes. +%% But first we must let all processes that want to force +%% load tables wait until the schema merge is done. + +merge_schema() -> + AllNodes = mnesia_lib:all_nodes(), + case try_merge_schema(AllNodes) of + ok -> + schema_is_merged(); + {aborted, {throw, Str}} when list(Str) -> + fatal("Failed to merge schema: ~s~n", [Str]); + Else -> + fatal("Failed to merge schema: ~p~n", [Else]) + end. + +try_merge_schema(Nodes) -> + case mnesia_schema:merge_schema() of + {'atomic', not_merged} -> + %% No more nodes that we need to merge the schema with + ok; + {'atomic', {merged, OldFriends, NewFriends}} -> + %% Check if new nodes has been added to the schema + Diff = mnesia_lib:all_nodes() -- [node() | Nodes], + mnesia_recover:connect_nodes(Diff), + + %% Tell everybody to adopt orphan tables + im_running(OldFriends, NewFriends), + im_running(NewFriends, OldFriends), + + try_merge_schema(Nodes); + {'atomic', {"Cannot get cstructs", Node, Reason}} -> + dbg_out("Cannot get cstructs, Node ~p ~p~n", [Node, Reason]), + timer:sleep(1000), % Avoid a endless loop look alike + try_merge_schema(Nodes); + Other -> + Other + end. + +im_running(OldFriends, NewFriends) -> + abcast(OldFriends, {im_running, node(), NewFriends}). + +schema_is_merged() -> + MsgTag = schema_is_merged, + SafeLoads = initial_safe_loads(), + + %% At this point we do not know anything about + %% which tables that the other nodes already + %% has loaded and therefore we let the normal + %% processing of the loader_queue take care + %% of it, since we at that time point will + %% know the whereabouts. We rely on the fact + %% that all nodes tells each other directly + %% when they have loaded a table and are + %% willing to share it. + + try_schedule_late_disc_load(SafeLoads, initial, MsgTag). + + +cast(Msg) -> + case whereis(?SERVER_NAME) of + undefined ->{error, {node_not_running, node()}}; + Pid -> gen_server:cast(Pid, Msg) + end. + +abcast(Nodes, Msg) -> + gen_server:abcast(Nodes, ?SERVER_NAME, Msg). + +unsafe_call(Msg) -> + case whereis(?SERVER_NAME) of + undefined -> {error, {node_not_running, node()}}; + Pid -> gen_server:call(Pid, Msg, infinity) + end. + +call(Msg) -> + case whereis(?SERVER_NAME) of + undefined -> + {error, {node_not_running, node()}}; + Pid -> + link(Pid), + Res = gen_server:call(Pid, Msg, infinity), + unlink(Pid), + + %% We get an exit signal if server dies + receive + {'EXIT', Pid, _Reason} -> + {error, {node_not_running, node()}} + after 0 -> + ignore + end, + Res + end. + +remote_call(Node, Func, Args) -> + case catch gen_server:call({?MODULE, Node}, {Func, Args, self()}, infinity) of + {'EXIT', Error} -> + {error, Error}; + Else -> + Else + end. + +multicall(Nodes, Msg) -> + {Good, Bad} = gen_server:multi_call(Nodes, ?MODULE, Msg, infinity), + PatchedGood = [Reply || {_Node, Reply} <- Good], + {PatchedGood, Bad}. %% Make the replies look like rpc:multicalls.. +%% rpc:multicall(Nodes, ?MODULE, call, [Msg]). + +%%%---------------------------------------------------------------------- +%%% Callback functions from gen_server +%%%---------------------------------------------------------------------- + +%%---------------------------------------------------------------------- +%% Func: init/1 +%% Returns: {ok, State} | +%% {ok, State, Timeout} | +%% {stop, Reason} +%%---------------------------------------------------------------------- +init([Parent]) -> + process_flag(trap_exit, true), + mnesia_lib:verbose("~p starting: ~p~n", [?SERVER_NAME, self()]), + + %% Handshake and initialize transaction recovery + %% for new nodes detected in the schema + All = mnesia_lib:all_nodes(), + Diff = All -- [node() | val(original_nodes)], + mnesia_lib:unset(original_nodes), + mnesia_recover:connect_nodes(Diff), + + Interval = mnesia_monitor:get_env(dump_log_time_threshold), + Msg = {async_dump_log, time_threshold}, + {ok, Ref} = timer:send_interval(Interval, Msg), + mnesia_dumper:start_regulator(), + + {ok, #state{supervisor = Parent, dump_log_timer_ref = Ref}}. + +%%---------------------------------------------------------------------- +%% Func: handle_call/3 +%% Returns: {reply, Reply, State} | +%% {reply, Reply, State, Timeout} | +%% {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, Reply, State} | (terminate/2 is called) +%% {stop, Reason, Reply, State} (terminate/2 is called) +%%---------------------------------------------------------------------- + +handle_call({sync_dump_log, InitBy}, From, State) -> + Worker = #dump_log{initiated_by = InitBy, + opt_reply_to = From + }, + State2 = add_worker(Worker, State), + noreply(State2); + +handle_call(wait_for_schema_commit_lock, From, State) -> + Worker = #schema_commit_lock{owner = From}, + State2 = add_worker(Worker, State), + noreply(State2); + +handle_call(block_controller, From, State) -> + Worker = #block_controller{owner = From}, + State2 = add_worker(Worker, State), + noreply(State2); + + +handle_call(get_cstructs, From, State) -> + Tabs = val({schema, tables}), + Cstructs = [val({T, cstruct}) || T <- Tabs], + Running = val({current, db_nodes}), + reply(From, {cstructs, Cstructs, Running}), + noreply(State); + +handle_call({schema_is_merged, TabsR, Reason, RemoteLoaders}, From, State) -> + State2 = late_disc_load(TabsR, Reason, RemoteLoaders, From, State), + + %% Handle early messages + Msgs = State2#state.early_msgs, + State3 = State2#state{early_msgs = [], schema_is_merged = true}, + Ns = val({current, db_nodes}), + dbg_out("Schema is merged ~w, State ~w~n", [Ns, State3]), +%% dbg_out("handle_early_msgs ~p ~n", [Msgs]), % qqqq + handle_early_msgs(lists:reverse(Msgs), State3); + +handle_call(disc_load_intents, From, State) -> + Tabs = disc_load_intents(State#state.loader_queue) ++ + disc_load_intents(State#state.late_loader_queue), + ActiveTabs = mnesia_lib:local_active_tables(), + reply(From, {ok, node(), mnesia_lib:union(Tabs, ActiveTabs)}), + noreply(State); + +handle_call({update_where_to_write, [add, Tab, AddNode], _From}, _Dummy, State) -> +%%% dbg_out("update_w2w ~p", [[add, Tab, AddNode]]), %%% qqqq + Current = val({current, db_nodes}), + Res = + case lists:member(AddNode, Current) and + State#state.schema_is_merged == true of + true -> + mnesia_lib:add({Tab, where_to_write}, AddNode); + false -> + ignore + end, + {reply, Res, State}; + +handle_call({add_active_replica, [Tab, ToNode, RemoteS, AccessMode], From}, + ReplyTo, State) -> + KnownNode = lists:member(ToNode, val({current, db_nodes})), + Merged = State#state.schema_is_merged, + if + KnownNode == false -> + reply(ReplyTo, ignore), + noreply(State); + Merged == true -> + Res = add_active_replica(Tab, ToNode, RemoteS, AccessMode), + reply(ReplyTo, Res), + noreply(State); + true -> %% Schema is not merged + Msg = {add_active_replica, [Tab, ToNode, RemoteS, AccessMode], From}, + Msgs = State#state.early_msgs, + reply(ReplyTo, ignore), %% Reply ignore and add data after schema merge + noreply(State#state{early_msgs = [{call, Msg, undefined} | Msgs]}) + end; + +handle_call({unannounce_add_table_copy, [Tab, Node], From}, ReplyTo, State) -> + KnownNode = lists:member(node(From), val({current, db_nodes})), + Merged = State#state.schema_is_merged, + if + KnownNode == false -> + reply(ReplyTo, ignore), + noreply(State); + Merged == true -> + Res = unannounce_add_table_copy(Tab, Node), + reply(ReplyTo, Res), + noreply(State); + true -> %% Schema is not merged + Msg = {unannounce_add_table_copy, [Tab, Node], From}, + Msgs = State#state.early_msgs, + reply(ReplyTo, ignore), %% Reply ignore and add data after schema merge + %% Set ReplyTO to undefined so we don't reply twice + noreply(State#state{early_msgs = [{call, Msg, undefined} | Msgs]}) + end; + +handle_call(Msg, From, State) when State#state.schema_is_merged == false -> + %% Buffer early messages +%% dbg_out("Buffered early msg ~p ~n", [Msg]), %% qqqq + Msgs = State#state.early_msgs, + noreply(State#state{early_msgs = [{call, Msg, From} | Msgs]}); + +handle_call({net_load, Tab, Cs}, From, State) -> + Worker = #net_load{table = Tab, + opt_reply_to = From, + reason = add_table_copy, + cstruct = Cs + }, + State2 = add_worker(Worker, State), + noreply(State2); + +handle_call({late_disc_load, Tabs, Reason, RemoteLoaders}, From, State) -> + State2 = late_disc_load(Tabs, Reason, RemoteLoaders, From, State), + noreply(State2); + +handle_call({block_table, [Tab], From}, _Dummy, State) -> + case lists:member(node(From), val({current, db_nodes})) of + true -> + block_table(Tab); + false -> + ignore + end, + {reply, ok, State}; + +handle_call({check_w2r, _Node, Tab}, _From, State) -> + {reply, val({Tab, where_to_read}), State}; + +handle_call(Msg, _From, State) -> + error("~p got unexpected call: ~p~n", [?SERVER_NAME, Msg]), + noreply(State). + +disc_load_intents([H | T]) when record(H, disc_load) -> + [H#disc_load.table | disc_load_intents(T)]; +disc_load_intents([H | T]) when record(H, late_load) -> + [H#late_load.table | disc_load_intents(T)]; +disc_load_intents( [H | T]) when record(H, net_load) -> + disc_load_intents(T); +disc_load_intents([]) -> + []. + +late_disc_load(TabsR, Reason, RemoteLoaders, From, State) -> + verbose("Intend to load tables: ~p~n", [TabsR]), + ?eval_debug_fun({?MODULE, late_disc_load}, + [{tabs, TabsR}, + {reason, Reason}, + {loaders, RemoteLoaders}]), + + reply(From, queued), + %% RemoteLoaders is a list of {ok, Node, Tabs} tuples + + %% Remove deleted tabs + LocalTabs = mnesia_lib:val({schema, local_tables}), + Filter = fun({Tab, Reas}, Acc) -> + case lists:member(Tab, LocalTabs) of + true -> [{Tab, Reas} | Acc]; + false -> Acc + end; + (Tab, Acc) -> + case lists:member(Tab, LocalTabs) of + true -> [Tab | Acc]; + false -> Acc + end + end, + + Tabs = lists:foldl(Filter, [], TabsR), + + Nodes = val({current, db_nodes}), + LateLoaders = late_loaders(Tabs, Reason, RemoteLoaders, Nodes), + LateQueue = State#state.late_loader_queue ++ LateLoaders, + State#state{late_loader_queue = LateQueue}. + +late_loaders([{Tab, Reason} | Tabs], DefaultReason, RemoteLoaders, Nodes) -> + LoadNodes = late_load_filter(RemoteLoaders, Tab, Nodes, []), + case LoadNodes of + [] -> + cast({disc_load, Tab, Reason}); % Ugly cast + _ -> + ignore + end, + LateLoad = #late_load{table = Tab, loaders = LoadNodes, reason = Reason}, + [LateLoad | late_loaders(Tabs, DefaultReason, RemoteLoaders, Nodes)]; + +late_loaders([Tab | Tabs], Reason, RemoteLoaders, Nodes) -> + Loaders = late_load_filter(RemoteLoaders, Tab, Nodes, []), + case Loaders of + [] -> + cast({disc_load, Tab, Reason}); % Ugly cast + _ -> + ignore + end, + LateLoad = #late_load{table = Tab, loaders = Loaders, reason = Reason}, + [LateLoad | late_loaders(Tabs, Reason, RemoteLoaders, Nodes)]; +late_loaders([], _Reason, _RemoteLoaders, _Nodes) -> + []. + +late_load_filter([{error, _} | RemoteLoaders], Tab, Nodes, Acc) -> + late_load_filter(RemoteLoaders, Tab, Nodes, Acc); +late_load_filter([{badrpc, _} | RemoteLoaders], Tab, Nodes, Acc) -> + late_load_filter(RemoteLoaders, Tab, Nodes, Acc); +late_load_filter([RL | RemoteLoaders], Tab, Nodes, Acc) -> + {ok, Node, Intents} = RL, + Access = val({Tab, access_mode}), + LocalC = val({Tab, local_content}), + StillActive = lists:member(Node, Nodes), + RemoteIntent = lists:member(Tab, Intents), + if + Access == read_write, + LocalC == false, + StillActive == true, + RemoteIntent == true -> + Masters = mnesia_recover:get_master_nodes(Tab), + case lists:member(Node, Masters) of + true -> + %% The other node is master node for + %% the table, accept his load intent + late_load_filter(RemoteLoaders, Tab, Nodes, [Node | Acc]); + false when Masters == [] -> + %% The table has no master nodes + %% accept his load intent + late_load_filter(RemoteLoaders, Tab, Nodes, [Node | Acc]); + false -> + %% Some one else is master node for + %% the table, ignore his load intent + late_load_filter(RemoteLoaders, Tab, Nodes, Acc) + end; + true -> + late_load_filter(RemoteLoaders, Tab, Nodes, Acc) + end; +late_load_filter([], _Tab, _Nodes, Acc) -> + Acc. + +%%---------------------------------------------------------------------- +%% Func: handle_cast/2 +%% Returns: {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} (terminate/2 is called) +%%---------------------------------------------------------------------- + +handle_cast({release_schema_commit_lock, _Owner}, State) -> + if + State#state.is_stopping == true -> + {stop, shutdown, State}; + true -> + case State#state.dumper_queue of + [#schema_commit_lock{}|Rest] -> + [_Worker | Rest] = State#state.dumper_queue, + State2 = State#state{dumper_pid = undefined, + dumper_queue = Rest}, + State3 = opt_start_worker(State2), + noreply(State3); + _ -> + noreply(State) + end + end; + +handle_cast(unblock_controller, State) -> + if + State#state.is_stopping == true -> + {stop, shutdown, State}; + record(hd(State#state.dumper_queue), block_controller) -> + [_Worker | Rest] = State#state.dumper_queue, + State2 = State#state{dumper_pid = undefined, + dumper_queue = Rest}, + State3 = opt_start_worker(State2), + noreply(State3) + end; + +handle_cast({mnesia_down, Node}, State) -> + maybe_log_mnesia_down(Node), + mnesia_lib:del({current, db_nodes}, Node), + mnesia_checkpoint:tm_mnesia_down(Node), + Alltabs = val({schema, tables}), + State2 = reconfigure_tables(Node, State, Alltabs), + case State#state.sender_pid of + undefined -> ignore; + Pid when pid(Pid) -> Pid ! {copier_done, Node} + end, + case State#state.loader_pid of + undefined -> ignore; + Pid2 when pid(Pid2) -> Pid2 ! {copier_done, Node} + end, + NewSenders = + case State#state.sender_queue of + [OldSender | RestSenders] -> + Remove = fun(ST) -> + node(ST#send_table.receiver_pid) /= Node + end, + NewS = lists:filter(Remove, RestSenders), + %% Keep old sender it will be removed by sender_done + [OldSender | NewS]; + [] -> + [] + end, + Early = remove_early_messages(State2#state.early_msgs, Node), + mnesia_monitor:mnesia_down(?SERVER_NAME, Node), + noreply(State2#state{sender_queue = NewSenders, early_msgs = Early}); + +handle_cast({im_running, _Node, NewFriends}, State) -> + Tabs = mnesia_lib:local_active_tables() -- [schema], + Ns = mnesia_lib:intersect(NewFriends, val({current, db_nodes})), + abcast(Ns, {adopt_orphans, node(), Tabs}), + noreply(State); + +handle_cast(Msg, State) when State#state.schema_is_merged == false -> + %% Buffer early messages + Msgs = State#state.early_msgs, + noreply(State#state{early_msgs = [{cast, Msg} | Msgs]}); + +handle_cast({disc_load, Tab, Reason}, State) -> + Worker = #disc_load{table = Tab, reason = Reason}, + State2 = add_worker(Worker, State), + noreply(State2); + +handle_cast(Worker, State) when record(Worker, send_table) -> + State2 = add_worker(Worker, State), + noreply(State2); + +handle_cast({sync_tabs, Tabs, From}, State) -> + %% user initiated wait_for_tables + handle_sync_tabs(Tabs, From), + noreply(State); + +handle_cast({i_have_tab, Tab, Node}, State) -> + case lists:member(Node, val({current, db_nodes})) of + true -> + State2 = node_has_tabs([Tab], Node, State), + noreply(State2); + false -> + noreply(State) + end; + +handle_cast({force_load_updated, Tab}, State) -> + case val({Tab, active_replicas}) of + [] -> + %% No valid replicas + noreply(State); + [SomeNode | _] -> + State2 = node_has_tabs([Tab], SomeNode, State), + noreply(State2) + end; + +handle_cast({master_nodes_updated, Tab, Masters}, State) -> + Active = val({Tab, active_replicas}), + Valid = + case val({Tab, load_by_force}) of + true -> + Active; + false -> + if + Masters == [] -> + Active; + true -> + mnesia_lib:intersect(Masters, Active) + end + end, + case Valid of + [] -> + %% No valid replicas + noreply(State); + [SomeNode | _] -> + State2 = node_has_tabs([Tab], SomeNode, State), + noreply(State2) + end; + +handle_cast({adopt_orphans, Node, Tabs}, State) -> + + State2 = node_has_tabs(Tabs, Node, State), + + %% Register the other node as up and running + mnesia_recover:log_mnesia_up(Node), + verbose("Logging mnesia_up ~w~n", [Node]), + mnesia_lib:report_system_event({mnesia_up, Node}), + + %% Load orphan tables + LocalTabs = val({schema, local_tables}) -- [schema], + Nodes = val({current, db_nodes}), + {LocalOrphans, RemoteMasters} = + orphan_tables(LocalTabs, Node, Nodes, [], []), + Reason = {adopt_orphan, node()}, + mnesia_late_loader:async_late_disc_load(node(), LocalOrphans, Reason), + + Fun = + fun(N) -> + RemoteOrphans = + [Tab || {Tab, Ns} <- RemoteMasters, + lists:member(N, Ns)], + mnesia_late_loader:maybe_async_late_disc_load(N, RemoteOrphans, Reason) + end, + lists:foreach(Fun, Nodes), + + Queue = State2#state.loader_queue, + State3 = State2#state{loader_queue = Queue}, + noreply(State3); + +handle_cast(Msg, State) -> + error("~p got unexpected cast: ~p~n", [?SERVER_NAME, Msg]), + noreply(State). + +handle_sync_tabs([Tab | Tabs], From) -> + case val({Tab, where_to_read}) of + nowhere -> + case get({sync_tab, Tab}) of + undefined -> + put({sync_tab, Tab}, [From]); + Pids -> + put({sync_tab, Tab}, [From | Pids]) + end; + _ -> + sync_reply(From, Tab) + end, + handle_sync_tabs(Tabs, From); +handle_sync_tabs([], _From) -> + ok. + +%%---------------------------------------------------------------------- +%% Func: handle_info/2 +%% Returns: {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} (terminate/2 is called) +%%---------------------------------------------------------------------- + +handle_info({async_dump_log, InitBy}, State) -> + Worker = #dump_log{initiated_by = InitBy}, + State2 = add_worker(Worker, State), + noreply(State2); + +handle_info(Done, State) when record(Done, dumper_done) -> + Pid = Done#dumper_done.worker_pid, + Res = Done#dumper_done.worker_res, + if + State#state.is_stopping == true -> + {stop, shutdown, State}; + Res == dumped, Pid == State#state.dumper_pid -> + [Worker | Rest] = State#state.dumper_queue, + reply(Worker#dump_log.opt_reply_to, Res), + State2 = State#state{dumper_pid = undefined, + dumper_queue = Rest}, + State3 = opt_start_worker(State2), + noreply(State3); + true -> + fatal("Dumper failed: ~p~n state: ~p~n", [Res, State]), + {stop, fatal, State} + end; + +handle_info(Done, State) when record(Done, loader_done) -> + if + %% Assertion + Done#loader_done.worker_pid == State#state.loader_pid -> ok + end, + + [_Worker | Rest] = LoadQ0 = State#state.loader_queue, + LateQueue0 = State#state.late_loader_queue, + {LoadQ, LateQueue} = + case Done#loader_done.is_loaded of + true -> + Tab = Done#loader_done.table_name, + + %% Optional user sync + case Done#loader_done.needs_sync of + true -> user_sync_tab(Tab); + false -> ignore + end, + + %% Optional table announcement + case Done#loader_done.needs_announce of + true -> + i_have_tab(Tab), + case Tab of + schema -> + ignore; + _ -> + %% Local node needs to perform user_sync_tab/1 + Ns = val({current, db_nodes}), + abcast(Ns, {i_have_tab, Tab, node()}) + end; + false -> + case Tab of + schema -> + ignore; + _ -> + %% Local node needs to perform user_sync_tab/1 + Ns = val({current, db_nodes}), + AlreadyKnows = val({Tab, active_replicas}), + abcast(Ns -- AlreadyKnows, {i_have_tab, Tab, node()}) + end + end, + + %% Optional client reply + case Done#loader_done.needs_reply of + true -> + reply(Done#loader_done.reply_to, + Done#loader_done.reply); + false -> + ignore + end, + {Rest, reply_late_load(Tab, LateQueue0)}; + false -> + case Done#loader_done.reply of + restart -> + {LoadQ0, LateQueue0}; + _ -> + {Rest, LateQueue0} + end + end, + + State2 = State#state{loader_pid = undefined, + loader_queue = LoadQ, + late_loader_queue = LateQueue}, + + State3 = opt_start_worker(State2), + noreply(State3); + +handle_info(Done, State) when record(Done, sender_done) -> + Pid = Done#sender_done.worker_pid, + Res = Done#sender_done.worker_res, + if + Res == ok, Pid == State#state.sender_pid -> + [Worker | Rest] = State#state.sender_queue, + Worker#send_table.receiver_pid ! {copier_done, node()}, + State2 = State#state{sender_pid = undefined, + sender_queue = Rest}, + State3 = opt_start_worker(State2), + noreply(State3); + true -> + %% No need to send any message to the table receiver + %% since it will soon get a mnesia_down anyway + fatal("Sender failed: ~p~n state: ~p~n", [Res, State]), + {stop, fatal, State} + end; + +handle_info({'EXIT', Pid, R}, State) when Pid == State#state.supervisor -> + catch set(mnesia_status, stopping), + case State#state.dumper_pid of + undefined -> + dbg_out("~p was ~p~n", [?SERVER_NAME, R]), + {stop, shutdown, State}; + _ -> + noreply(State#state{is_stopping = true}) + end; + +handle_info({'EXIT', Pid, R}, State) when Pid == State#state.dumper_pid -> + case State#state.dumper_queue of + [#schema_commit_lock{}|Workers] -> %% Schema trans crashed or was killed + State2 = State#state{dumper_queue = Workers, dumper_pid = undefined}, + State3 = opt_start_worker(State2), + noreply(State3); + _Other -> + fatal("Dumper or schema commit crashed: ~p~n state: ~p~n", [R, State]), + {stop, fatal, State} + end; + +handle_info({'EXIT', Pid, R}, State) when Pid == State#state.loader_pid -> + fatal("Loader crashed: ~p~n state: ~p~n", [R, State]), + {stop, fatal, State}; + +handle_info({'EXIT', Pid, R}, State) when Pid == State#state.sender_pid -> + %% No need to send any message to the table receiver + %% since it will soon get a mnesia_down anyway + fatal("Sender crashed: ~p~n state: ~p~n", [R, State]), + {stop, fatal, State}; + +handle_info({From, get_state}, State) -> + From ! {?SERVER_NAME, State}, + noreply(State); + +%% No real need for buffering +handle_info(Msg, State) when State#state.schema_is_merged == false -> + %% Buffer early messages + Msgs = State#state.early_msgs, + noreply(State#state{early_msgs = [{info, Msg} | Msgs]}); + +handle_info({'EXIT', Pid, wait_for_tables_timeout}, State) -> + sync_tab_timeout(Pid, get()), + noreply(State); + +handle_info(Msg, State) -> + error("~p got unexpected info: ~p~n", [?SERVER_NAME, Msg]), + noreply(State). + +reply_late_load(Tab, [H | T]) when H#late_load.table == Tab -> + reply(H#late_load.opt_reply_to, ok), + reply_late_load(Tab, T); +reply_late_load(Tab, [H | T]) -> + [H | reply_late_load(Tab, T)]; +reply_late_load(_Tab, []) -> + []. + +sync_tab_timeout(Pid, [{{sync_tab, Tab}, Pids} | Tail]) -> + case lists:delete(Pid, Pids) of + [] -> + erase({sync_tab, Tab}); + Pids2 -> + put({sync_tab, Tab}, Pids2) + end, + sync_tab_timeout(Pid, Tail); +sync_tab_timeout(Pid, [_ | Tail]) -> + sync_tab_timeout(Pid, Tail); +sync_tab_timeout(_Pid, []) -> + ok. + +%% Pick the load record that has the highest load order +%% Returns {BestLoad, RemainingQueue} or {none, []} if queue is empty +pick_next(Queue) -> + pick_next(Queue, none, none, []). + +pick_next([Head | Tail], Load, Order, Rest) when record(Head, net_load) -> + Tab = Head#net_load.table, + select_best(Head, Tail, val({Tab, load_order}), Load, Order, Rest); +pick_next([Head | Tail], Load, Order, Rest) when record(Head, disc_load) -> + Tab = Head#disc_load.table, + select_best(Head, Tail, val({Tab, load_order}), Load, Order, Rest); +pick_next([], Load, _Order, Rest) -> + {Load, Rest}. + +select_best(Load, Tail, Order, none, none, Rest) -> + pick_next(Tail, Load, Order, Rest); +select_best(Load, Tail, Order, OldLoad, OldOrder, Rest) when Order > OldOrder -> + pick_next(Tail, Load, Order, [OldLoad | Rest]); +select_best(Load, Tail, _Order, OldLoad, OldOrder, Rest) -> + pick_next(Tail, OldLoad, OldOrder, [Load | Rest]). + +%%---------------------------------------------------------------------- +%% Func: terminate/2 +%% Purpose: Shutdown the server +%% Returns: any (ignored by gen_server) +%%---------------------------------------------------------------------- +terminate(Reason, State) -> + mnesia_monitor:terminate_proc(?SERVER_NAME, Reason, State). + +%%---------------------------------------------------------------------- +%% Func: code_change/3 +%% Purpose: Upgrade process when its code is to be changed +%% Returns: {ok, NewState} +%%---------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%---------------------------------------------------------------------- +%%% Internal functions +%%%---------------------------------------------------------------------- + +maybe_log_mnesia_down(N) -> + %% We use mnesia_down when deciding which tables to load locally, + %% so if we are not running (i.e haven't decided which tables + %% to load locally), don't log mnesia_down yet. + case mnesia_lib:is_running() of + yes -> + verbose("Logging mnesia_down ~w~n", [N]), + mnesia_recover:log_mnesia_down(N), + ok; + _ -> + Filter = fun(Tab) -> + inactive_copy_holders(Tab, N) + end, + HalfLoadedTabs = lists:any(Filter, val({schema, local_tables}) -- [schema]), + if + HalfLoadedTabs == true -> + verbose("Logging mnesia_down ~w~n", [N]), + mnesia_recover:log_mnesia_down(N), + ok; + true -> + %% Unfortunately we have not loaded some common + %% tables yet, so we cannot rely on the nodedown + log_later %% BUGBUG handle this case!!! + end + end. + +inactive_copy_holders(Tab, Node) -> + Cs = val({Tab, cstruct}), + case mnesia_lib:cs_to_storage_type(Node, Cs) of + unknown -> + false; + _Storage -> + mnesia_lib:not_active_here(Tab) + end. + +orphan_tables([Tab | Tabs], Node, Ns, Local, Remote) -> + Cs = val({Tab, cstruct}), + CopyHolders = mnesia_lib:copy_holders(Cs), + RamCopyHolders = Cs#cstruct.ram_copies, + DiscCopyHolders = CopyHolders -- RamCopyHolders, + DiscNodes = val({schema, disc_copies}), + LocalContent = Cs#cstruct.local_content, + RamCopyHoldersOnDiscNodes = mnesia_lib:intersect(RamCopyHolders, DiscNodes), + Active = val({Tab, active_replicas}), + case lists:member(Node, DiscCopyHolders) of + true when Active == [] -> + case DiscCopyHolders -- Ns of + [] -> + %% We're last up and the other nodes have not + %% loaded the table. Lets load it if we are + %% the smallest node. + case lists:min(DiscCopyHolders) of + Min when Min == node() -> + case mnesia_recover:get_master_nodes(Tab) of + [] -> + L = [Tab | Local], + orphan_tables(Tabs, Node, Ns, L, Remote); + Masters -> + R = [{Tab, Masters} | Remote], + orphan_tables(Tabs, Node, Ns, Local, R) + end; + _ -> + orphan_tables(Tabs, Node, Ns, Local, Remote) + end; + _ -> + orphan_tables(Tabs, Node, Ns, Local, Remote) + end; + false when Active == [], DiscCopyHolders == [], RamCopyHoldersOnDiscNodes == [] -> + %% Special case when all replicas resides on disc less nodes + orphan_tables(Tabs, Node, Ns, [Tab | Local], Remote); + _ when LocalContent == true -> + orphan_tables(Tabs, Node, Ns, [Tab | Local], Remote); + _ -> + orphan_tables(Tabs, Node, Ns, Local, Remote) + end; +orphan_tables([], _, _, LocalOrphans, RemoteMasters) -> + {LocalOrphans, RemoteMasters}. + +node_has_tabs([Tab | Tabs], Node, State) when Node /= node() -> + State2 = update_whereabouts(Tab, Node, State), + node_has_tabs(Tabs, Node, State2); +node_has_tabs([Tab | Tabs], Node, State) -> + user_sync_tab(Tab), + node_has_tabs(Tabs, Node, State); +node_has_tabs([], _Node, State) -> + State. + +update_whereabouts(Tab, Node, State) -> + Storage = val({Tab, storage_type}), + Read = val({Tab, where_to_read}), + LocalC = val({Tab, local_content}), + BeingCreated = (?catch_val({Tab, create_table}) == true), + Masters = mnesia_recover:get_master_nodes(Tab), + ByForce = val({Tab, load_by_force}), + GoGetIt = + if + ByForce == true -> + true; + Masters == [] -> + true; + true -> + lists:member(Node, Masters) + end, + + dbg_out("Table ~w is loaded on ~w. s=~w, r=~w, lc=~w, f=~w, m=~w~n", + [Tab, Node, Storage, Read, LocalC, ByForce, GoGetIt]), + if + LocalC == true -> + %% Local contents, don't care about other node + State; + Storage == unknown, Read == nowhere -> + %% No own copy, time to read remotely + %% if the other node is a good node + add_active_replica(Tab, Node), + case GoGetIt of + true -> + set({Tab, where_to_read}, Node), + user_sync_tab(Tab), + State; + false -> + State + end; + Storage == unknown -> + %% No own copy, continue to read remotely + add_active_replica(Tab, Node), + NodeST = mnesia_lib:storage_type_at_node(Node, Tab), + ReadST = mnesia_lib:storage_type_at_node(Read, Tab), + if %% Avoid reading from disc_only_copies + NodeST == disc_only_copies -> + ignore; + ReadST == disc_only_copies -> + mnesia_lib:set_remote_where_to_read(Tab); + true -> + ignore + end, + user_sync_tab(Tab), + State; + BeingCreated == true -> + %% The table is currently being created + %% and we shall have an own copy of it. + %% We will load the (empty) table locally. + add_active_replica(Tab, Node), + State; + Read == nowhere -> + %% Own copy, go and get a copy of the table + %% if the other node is master or if there + %% are no master at all + add_active_replica(Tab, Node), + case GoGetIt of + true -> + Worker = #net_load{table = Tab, + reason = {active_remote, Node}}, + add_worker(Worker, State); + false -> + State + end; + true -> + %% We already have an own copy + add_active_replica(Tab, Node), + user_sync_tab(Tab), + State + end. + +initial_safe_loads() -> + case val({schema, storage_type}) of + ram_copies -> + Downs = [], + Tabs = val({schema, local_tables}) -- [schema], + LastC = fun(T) -> last_consistent_replica(T, Downs) end, + lists:zf(LastC, Tabs); + + disc_copies -> + Downs = mnesia_recover:get_mnesia_downs(), + dbg_out("mnesia_downs = ~p~n", [Downs]), + + Tabs = val({schema, local_tables}) -- [schema], + LastC = fun(T) -> last_consistent_replica(T, Downs) end, + lists:zf(LastC, Tabs) + end. + +last_consistent_replica(Tab, Downs) -> + Cs = val({Tab, cstruct}), + Storage = mnesia_lib:cs_to_storage_type(node(), Cs), + Ram = Cs#cstruct.ram_copies, + Disc = Cs#cstruct.disc_copies, + DiscOnly = Cs#cstruct.disc_only_copies, + BetterCopies0 = mnesia_lib:remote_copy_holders(Cs) -- Downs, + BetterCopies = BetterCopies0 -- Ram, + AccessMode = Cs#cstruct.access_mode, + Copies = mnesia_lib:copy_holders(Cs), + Masters = mnesia_recover:get_master_nodes(Tab), + LocalMaster0 = lists:member(node(), Masters), + LocalContent = Cs#cstruct.local_content, + RemoteMaster = + if + Masters == [] -> false; + true -> not LocalMaster0 + end, + LocalMaster = + if + Masters == [] -> false; + true -> LocalMaster0 + end, + if + Copies == [node()] -> + %% Only one copy holder and it is local. + %% It may also be a local contents table + {true, {Tab, local_only}}; + LocalContent == true -> + {true, {Tab, local_content}}; + LocalMaster == true -> + %% We have a local master + {true, {Tab, local_master}}; + RemoteMaster == true -> + %% Wait for remote master copy + false; + Storage == ram_copies -> + if + Disc == [], DiscOnly == [] -> + %% Nobody has copy on disc + {true, {Tab, ram_only}}; + true -> + %% Some other node has copy on disc + false + end; + AccessMode == read_only -> + %% No one has been able to update the table, + %% i.e. all disc resident copies are equal + {true, {Tab, read_only}}; + BetterCopies /= [], Masters /= [node()] -> + %% There are better copies on other nodes + %% and we do not have the only master copy + false; + true -> + {true, {Tab, initial}} + end. + +reconfigure_tables(N, State, [Tab |Tail]) -> + del_active_replica(Tab, N), + case val({Tab, where_to_read}) of + N -> mnesia_lib:set_remote_where_to_read(Tab); + _ -> ignore + end, + LateQ = drop_loaders(Tab, N, State#state.late_loader_queue), + reconfigure_tables(N, State#state{late_loader_queue = LateQ}, Tail); + +reconfigure_tables(_, State, []) -> + State. + +remove_early_messages([], _Node) -> + []; +remove_early_messages([{call, {add_active_replica, [_, Node, _, _], _}, _}|R], Node) -> + remove_early_messages(R, Node); %% Does a reply before queuing +remove_early_messages([{call, {block_table, _, From}, ReplyTo}|R], Node) + when node(From) == Node -> + reply(ReplyTo, ok), %% Remove gen:server waits.. + remove_early_messages(R, Node); +remove_early_messages([{cast, {i_have_tab, _Tab, Node}}|R], Node) -> + remove_early_messages(R, Node); +remove_early_messages([{cast, {adopt_orphans, Node, _Tabs}}|R], Node) -> + remove_early_messages(R, Node); +remove_early_messages([M|R],Node) -> + [M|remove_early_messages(R,Node)]. + +%% Drop loader from late load queue and possibly trigger a disc_load +drop_loaders(Tab, Node, [H | T]) when H#late_load.table == Tab -> + %% Check if it is time to issue a disc_load request + case H#late_load.loaders of + [Node] -> + Reason = {H#late_load.reason, last_loader_down, Node}, + cast({disc_load, Tab, Reason}); % Ugly cast + _ -> + ignore + end, + %% Drop the node from the list of loaders + H2 = H#late_load{loaders = H#late_load.loaders -- [Node]}, + [H2 | drop_loaders(Tab, Node, T)]; +drop_loaders(Tab, Node, [H | T]) -> + [H | drop_loaders(Tab, Node, T)]; +drop_loaders(_, _, []) -> + []. + +add_active_replica(Tab, Node) -> + add_active_replica(Tab, Node, val({Tab, cstruct})). + +add_active_replica(Tab, Node, Cs) when record(Cs, cstruct) -> + Storage = mnesia_lib:schema_cs_to_storage_type(Node, Cs), + AccessMode = Cs#cstruct.access_mode, + add_active_replica(Tab, Node, Storage, AccessMode). + +%% Block table primitives + +block_table(Tab) -> + Var = {Tab, where_to_commit}, + Old = val(Var), + New = {blocked, Old}, + set(Var, New). % where_to_commit + +unblock_table(Tab) -> + Var = {Tab, where_to_commit}, + New = + case val(Var) of + {blocked, List} -> + List; + List -> + List + end, + set(Var, New). % where_to_commit + +is_tab_blocked(W2C) when list(W2C) -> + {false, W2C}; +is_tab_blocked({blocked, W2C}) when list(W2C) -> + {true, W2C}. + +mark_blocked_tab(true, Value) -> + {blocked, Value}; +mark_blocked_tab(false, Value) -> + Value. + +%% + +add_active_replica(Tab, Node, Storage, AccessMode) -> + Var = {Tab, where_to_commit}, + {Blocked, Old} = is_tab_blocked(val(Var)), + Del = lists:keydelete(Node, 1, Old), + case AccessMode of + read_write -> + New = lists:sort([{Node, Storage} | Del]), + set(Var, mark_blocked_tab(Blocked, New)), % where_to_commit + add({Tab, where_to_write}, Node); + read_only -> + set(Var, mark_blocked_tab(Blocked, Del)), + mnesia_lib:del({Tab, where_to_write}, Node) + end, + add({Tab, active_replicas}, Node). + +del_active_replica(Tab, Node) -> + Var = {Tab, where_to_commit}, + {Blocked, Old} = is_tab_blocked(val(Var)), + Del = lists:keydelete(Node, 1, Old), + New = lists:sort(Del), + set(Var, mark_blocked_tab(Blocked, New)), % where_to_commit + mnesia_lib:del({Tab, active_replicas}, Node), + mnesia_lib:del({Tab, where_to_write}, Node). + +change_table_access_mode(Cs) -> + Tab = Cs#cstruct.name, + lists:foreach(fun(N) -> add_active_replica(Tab, N, Cs) end, + val({Tab, active_replicas})). + +%% node To now has tab loaded, but this must be undone +%% This code is rpc:call'ed from the tab_copier process +%% when it has *not* released it's table lock +unannounce_add_table_copy(Tab, To) -> + del_active_replica(Tab, To), + case val({Tab , where_to_read}) of + To -> + mnesia_lib:set_remote_where_to_read(Tab); + _ -> + ignore + end. + +user_sync_tab(Tab) -> + case val(debug) of + trace -> + mnesia_subscr:subscribe(whereis(mnesia_event), {table, Tab}); + _ -> + ignore + end, + + case erase({sync_tab, Tab}) of + undefined -> + ok; + Pids -> + lists:foreach(fun(Pid) -> sync_reply(Pid, Tab) end, Pids) + end. + +i_have_tab(Tab) -> + case val({Tab, local_content}) of + true -> + mnesia_lib:set_local_content_whereabouts(Tab); + false -> + set({Tab, where_to_read}, node()) + end, + add_active_replica(Tab, node()). + +sync_and_block_table_whereabouts(Tab, ToNode, RemoteS, AccessMode) when Tab /= schema -> + Current = val({current, db_nodes}), + Ns = + case lists:member(ToNode, Current) of + true -> Current -- [ToNode]; + false -> Current + end, + remote_call(ToNode, block_table, [Tab]), + [remote_call(Node, add_active_replica, [Tab, ToNode, RemoteS, AccessMode]) || + Node <- [ToNode | Ns]], + ok. + +sync_del_table_copy_whereabouts(Tab, ToNode) when Tab /= schema -> + Current = val({current, db_nodes}), + Ns = + case lists:member(ToNode, Current) of + true -> Current; + false -> [ToNode | Current] + end, + Args = [Tab, ToNode], + [remote_call(Node, unannounce_add_table_copy, Args) || Node <- Ns], + ok. + +get_info(Timeout) -> + case whereis(?SERVER_NAME) of + undefined -> + {timeout, Timeout}; + Pid -> + Pid ! {self(), get_state}, + receive + {?SERVER_NAME, State} when record(State, state) -> + {info,State} + after Timeout -> + {timeout, Timeout} + end + end. + +get_workers(Timeout) -> + case whereis(?SERVER_NAME) of + undefined -> + {timeout, Timeout}; + Pid -> + Pid ! {self(), get_state}, + receive + {?SERVER_NAME, State} when record(State, state) -> + {workers, State#state.loader_pid, State#state.sender_pid, State#state.dumper_pid} + after Timeout -> + {timeout, Timeout} + end + end. + +info() -> + Tabs = mnesia_lib:local_active_tables(), + io:format( "---> Active tables <--- ~n", []), + info(Tabs). + +info([Tab | Tail]) -> + case val({Tab, storage_type}) of + disc_only_copies -> + info_format(Tab, + dets:info(Tab, size), + dets:info(Tab, file_size), + "bytes on disc"); + _ -> + info_format(Tab, + ?ets_info(Tab, size), + ?ets_info(Tab, memory), + "words of mem") + end, + info(Tail); +info([]) -> ok; +info(Tab) -> info([Tab]). + +info_format(Tab, Size, Mem, Media) -> + StrT = mnesia_lib:pad_name(atom_to_list(Tab), 15, []), + StrS = mnesia_lib:pad_name(integer_to_list(Size), 8, []), + StrM = mnesia_lib:pad_name(integer_to_list(Mem), 8, []), + io:format("~s: with ~s records occupying ~s ~s~n", + [StrT, StrS, StrM, Media]). + +%% Handle early arrived messages +handle_early_msgs([Msg | Msgs], State) -> + %% The messages are in reverse order + case handle_early_msg(Msg, State) of + {stop, Reason, Reply, State2} -> + {stop, Reason, Reply, State2}; + {stop, Reason, State2} -> + {stop, Reason, State2}; + {noreply, State2} -> + handle_early_msgs(Msgs, State2); + {noreply, State2, _Timeout} -> + handle_early_msgs(Msgs, State2); + Else -> + dbg_out("handle_early_msgs case clause ~p ~n", [Else]), + erlang:error(Else, [[Msg | Msgs], State]) + end; +handle_early_msgs([], State) -> + noreply(State). + +handle_early_msg({call, Msg, From}, State) -> + handle_call(Msg, From, State); +handle_early_msg({cast, Msg}, State) -> + handle_cast(Msg, State); +handle_early_msg({info, Msg}, State) -> + handle_info(Msg, State). + +noreply(State) -> + {noreply, State}. + +reply(undefined, Reply) -> + Reply; +reply(ReplyTo, Reply) -> + gen_server:reply(ReplyTo, Reply), + Reply. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Worker management + +%% Returns new State +add_worker(Worker, State) when record(Worker, dump_log) -> + InitBy = Worker#dump_log.initiated_by, + Queue = State#state.dumper_queue, + case lists:keymember(InitBy, #dump_log.initiated_by, Queue) of + false -> + ignore; + true when Worker#dump_log.opt_reply_to == undefined -> + %% The same threshold has been exceeded again, + %% before we have had the possibility to + %% process the older one. + DetectedBy = {dump_log, InitBy}, + Event = {mnesia_overload, DetectedBy}, + mnesia_lib:report_system_event(Event) + end, + Queue2 = Queue ++ [Worker], + State2 = State#state{dumper_queue = Queue2}, + opt_start_worker(State2); +add_worker(Worker, State) when record(Worker, schema_commit_lock) -> + Queue = State#state.dumper_queue, + Queue2 = Queue ++ [Worker], + State2 = State#state{dumper_queue = Queue2}, + opt_start_worker(State2); +add_worker(Worker, State) when record(Worker, net_load) -> + Queue = State#state.loader_queue, + State2 = State#state{loader_queue = Queue ++ [Worker]}, + opt_start_worker(State2); +add_worker(Worker, State) when record(Worker, send_table) -> + Queue = State#state.sender_queue, + State2 = State#state{sender_queue = Queue ++ [Worker]}, + opt_start_worker(State2); +add_worker(Worker, State) when record(Worker, disc_load) -> + Queue = State#state.loader_queue, + State2 = State#state{loader_queue = Queue ++ [Worker]}, + opt_start_worker(State2); +% Block controller should be used for upgrading mnesia. +add_worker(Worker, State) when record(Worker, block_controller) -> + Queue = State#state.dumper_queue, + Queue2 = [Worker | Queue], + State2 = State#state{dumper_queue = Queue2}, + opt_start_worker(State2). + +%% Optionally start a worker +%% +%% Dumpers and loaders may run simultaneously +%% but neither of them may run during schema commit. +%% Loaders may not start if a schema commit is enqueued. +opt_start_worker(State) when State#state.is_stopping == true -> + State; +opt_start_worker(State) -> + %% Prioritize dumper and schema commit + %% by checking them first + case State#state.dumper_queue of + [Worker | _Rest] when State#state.dumper_pid == undefined -> + %% Great, a worker in queue and neither + %% a schema transaction is being + %% committed and nor a dumper is running + + %% Start worker but keep him in the queue + if + record(Worker, schema_commit_lock) -> + ReplyTo = Worker#schema_commit_lock.owner, + reply(ReplyTo, granted), + {Owner, _Tag} = ReplyTo, + State#state{dumper_pid = Owner}; + + record(Worker, dump_log) -> + Pid = spawn_link(?MODULE, dump_and_reply, [self(), Worker]), + State2 = State#state{dumper_pid = Pid}, + + %% If the worker was a dumper we may + %% possibly be able to start a loader + %% or sender + State3 = opt_start_sender(State2), + opt_start_loader(State3); + + record(Worker, block_controller) -> + case {State#state.sender_pid, State#state.loader_pid} of + {undefined, undefined} -> + ReplyTo = Worker#block_controller.owner, + reply(ReplyTo, granted), + {Owner, _Tag} = ReplyTo, + State#state{dumper_pid = Owner}; + _ -> + State + end + end; + _ -> + %% Bad luck, try with a loader or sender instead + State2 = opt_start_sender(State), + opt_start_loader(State2) + end. + +opt_start_sender(State) -> + case State#state.sender_queue of + []-> + %% No need + State; + + _ when State#state.sender_pid /= undefined -> + %% Bad luck, a sender is already running + State; + + [Sender | _SenderRest] -> + case State#state.loader_queue of + [Loader | _LoaderRest] + when State#state.loader_pid /= undefined, + Loader#net_load.table == Sender#send_table.table -> + %% A conflicting loader is running + State; + _ -> + SchemaQueue = State#state.dumper_queue, + case lists:keymember(schema_commit, 1, SchemaQueue) of + false -> + + %% Start worker but keep him in the queue + Pid = spawn_link(?MODULE, send_and_reply, + [self(), Sender]), + State#state{sender_pid = Pid}; + true -> + %% Bad luck, we must wait for the schema commit + State + end + end + end. + +opt_start_loader(State) -> + LoaderQueue = State#state.loader_queue, + if + LoaderQueue == [] -> + %% No need + State; + + State#state.loader_pid /= undefined -> + %% Bad luck, an loader is already running + State; + + true -> + SchemaQueue = State#state.dumper_queue, + case lists:keymember(schema_commit, 1, SchemaQueue) of + false -> + {Worker, Rest} = pick_next(LoaderQueue), + + %% Start worker but keep him in the queue + Pid = spawn_link(?MODULE, load_and_reply, [self(), Worker]), + State#state{loader_pid = Pid, + loader_queue = [Worker | Rest]}; + true -> + %% Bad luck, we must wait for the schema commit + State + end + end. + +start_remote_sender(Node, Tab, Receiver, Storage) -> + Msg = #send_table{table = Tab, + receiver_pid = Receiver, + remote_storage = Storage}, + gen_server:cast({?SERVER_NAME, Node}, Msg). + +dump_and_reply(ReplyTo, Worker) -> + %% No trap_exit, die intentionally instead + Res = mnesia_dumper:opt_dump_log(Worker#dump_log.initiated_by), + ReplyTo ! #dumper_done{worker_pid = self(), + worker_res = Res}, + unlink(ReplyTo), + exit(normal). + +send_and_reply(ReplyTo, Worker) -> + %% No trap_exit, die intentionally instead + Res = mnesia_loader:send_table(Worker#send_table.receiver_pid, + Worker#send_table.table, + Worker#send_table.remote_storage), + ReplyTo ! #sender_done{worker_pid = self(), + worker_res = Res}, + unlink(ReplyTo), + exit(normal). + + +load_and_reply(ReplyTo, Worker) -> + process_flag(trap_exit, true), + Done = load_table(Worker), + ReplyTo ! Done#loader_done{worker_pid = self()}, + unlink(ReplyTo), + exit(normal). + +%% Now it is time to load the table +%% but first we must check if it still is neccessary +load_table(Load) when record(Load, net_load) -> + Tab = Load#net_load.table, + ReplyTo = Load#net_load.opt_reply_to, + Reason = Load#net_load.reason, + LocalC = val({Tab, local_content}), + AccessMode = val({Tab, access_mode}), + ReadNode = val({Tab, where_to_read}), + Active = filter_active(Tab), + Done = #loader_done{is_loaded = true, + table_name = Tab, + needs_announce = false, + needs_sync = false, + needs_reply = true, + reply_to = ReplyTo, + reply = {loaded, ok} + }, + if + ReadNode == node() -> + %% Already loaded locally + Done; + LocalC == true -> + Res = mnesia_loader:disc_load_table(Tab, load_local_content), + Done#loader_done{reply = Res, needs_announce = true, needs_sync = true}; + AccessMode == read_only -> + disc_load_table(Tab, Reason, ReplyTo); + true -> + %% Either we cannot read the table yet + %% or someone is moving a replica between + %% two nodes + Cs = Load#net_load.cstruct, + Res = mnesia_loader:net_load_table(Tab, Reason, Active, Cs), + case Res of + {loaded, ok} -> + Done#loader_done{needs_sync = true, + reply = Res}; + {not_loaded, storage_unknown} -> + Done#loader_done{reply = Res}; + {not_loaded, _} -> + Done#loader_done{is_loaded = false, + needs_reply = false, + reply = Res} + end + end; + +load_table(Load) when record(Load, disc_load) -> + Tab = Load#disc_load.table, + Reason = Load#disc_load.reason, + ReplyTo = Load#disc_load.opt_reply_to, + ReadNode = val({Tab, where_to_read}), + Active = filter_active(Tab), + Done = #loader_done{is_loaded = true, + table_name = Tab, + needs_announce = false, + needs_sync = false, + needs_reply = false + }, + if + Active == [], ReadNode == nowhere -> + %% Not loaded anywhere, lets load it from disc + disc_load_table(Tab, Reason, ReplyTo); + ReadNode == nowhere -> + %% Already loaded on other node, lets get it + Cs = val({Tab, cstruct}), + case mnesia_loader:net_load_table(Tab, Reason, Active, Cs) of + {loaded, ok} -> + Done#loader_done{needs_sync = true}; + {not_loaded, storage_unknown} -> + Done#loader_done{is_loaded = false}; + {not_loaded, ErrReason} -> + Done#loader_done{is_loaded = false, + reply = {not_loaded,ErrReason}} + end; + true -> + %% Already readable, do not worry be happy + Done + end. + +disc_load_table(Tab, Reason, ReplyTo) -> + Done = #loader_done{is_loaded = true, + table_name = Tab, + needs_announce = false, + needs_sync = false, + needs_reply = true, + reply_to = ReplyTo, + reply = {loaded, ok} + }, + Res = mnesia_loader:disc_load_table(Tab, Reason), + if + Res == {loaded, ok} -> + Done#loader_done{needs_announce = true, + needs_sync = true, + reply = Res}; + ReplyTo /= undefined -> + Done#loader_done{is_loaded = false, + reply = Res}; + true -> + fatal("Cannot load table ~p from disc: ~p~n", [Tab, Res]) + end. + +filter_active(Tab) -> + ByForce = val({Tab, load_by_force}), + Active = val({Tab, active_replicas}), + Masters = mnesia_recover:get_master_nodes(Tab), + do_filter_active(ByForce, Active, Masters). + +do_filter_active(true, Active, _Masters) -> + Active; +do_filter_active(false, Active, []) -> + Active; +do_filter_active(false, Active, Masters) -> + mnesia_lib:intersect(Active, Masters). |