diff options
Diffstat (limited to 'lib/mnesia/src/mnesia_controller.erl')
-rw-r--r-- | lib/mnesia/src/mnesia_controller.erl | 2182 |
1 files changed, 2182 insertions, 0 deletions
diff --git a/lib/mnesia/src/mnesia_controller.erl b/lib/mnesia/src/mnesia_controller.erl new file mode 100644 index 0000000000..9bc480e619 --- /dev/null +++ b/lib/mnesia/src/mnesia_controller.erl @@ -0,0 +1,2182 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 1996-2009. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% + +%% +%% The mnesia_init process loads tables from local disc or from +%% another nodes. It also coordinates updates of the info about +%% where we can read and write tables. +%% +%% Tables may need to be loaded initially at startup of the local +%% node or when other nodes announces that they already have loaded +%% tables that we also want. +%% +%% Initially we set the load request queue to those tables that we +%% safely can load locally, i.e. tables where we have the last +%% consistent replica and we have received mnesia_down from all +%% other nodes holding the table. Then we let the mnesia_init +%% process enter its normal working state. +%% +%% When we need to load a table we append a request to the load +%% request queue. All other requests are regarded as high priority +%% and are processed immediately (e.g. update table whereabouts). +%% We processes the load request queue as a "background" job.. + +-module(mnesia_controller). + +-behaviour(gen_server). + +%% Mnesia internal stuff +-export([ + start/0, + i_have_tab/1, + info/0, + get_info/1, + get_workers/1, + force_load_table/1, + async_dump_log/1, + sync_dump_log/1, + connect_nodes/1, + wait_for_schema_commit_lock/0, + release_schema_commit_lock/0, + create_table/1, + get_disc_copy/1, + get_cstructs/0, + sync_and_block_table_whereabouts/4, + sync_del_table_copy_whereabouts/2, + block_table/1, + unblock_table/1, + block_controller/0, + unblock_controller/0, + unannounce_add_table_copy/2, + master_nodes_updated/2, + mnesia_down/1, + add_active_replica/2, + add_active_replica/3, + add_active_replica/4, + update/1, + change_table_access_mode/1, + del_active_replica/2, + wait_for_tables/2, + get_network_copy/2, + merge_schema/0, + start_remote_sender/4, + schedule_late_disc_load/2 + ]). + +%% gen_server callbacks +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +%% Module internal stuff +-export([call/1, + cast/1, + dump_and_reply/2, + load_and_reply/2, + send_and_reply/2, + wait_for_tables_init/2, + connect_nodes2/2 + ]). + +-import(mnesia_lib, [set/2, add/2]). +-import(mnesia_lib, [fatal/2, error/2, verbose/2, dbg_out/2]). + +-include("mnesia.hrl"). + +-define(SERVER_NAME, ?MODULE). + +-record(state, {supervisor, + schema_is_merged = false, + early_msgs = [], + loader_pid = [], %% Was Pid is now [{Pid,Work}|..] + loader_queue, %% Was list is now gb_tree + sender_pid = [], %% Was a pid or undef is now [{Pid,Work}|..] + sender_queue = [], + late_loader_queue, %% Was list is now gb_tree + dumper_pid, %% Dumper or schema commit pid + dumper_queue = [], %% Dumper or schema commit queue + others = [], %% Processes that needs the copier_done msg + dump_log_timer_ref, + is_stopping = false + }). +%% Backwards Comp. Sender_pid is now a list of senders.. +get_senders(#state{sender_pid = Pids}) when is_list(Pids) -> Pids. +%% Backwards Comp. loader_pid is now a list of loaders.. +get_loaders(#state{loader_pid = Pids}) when is_list(Pids) -> Pids. +max_loaders() -> + case ?catch_val(no_table_loaders) of + {'EXIT', _} -> + mnesia_lib:set(no_table_loaders,1), + 1; + Val -> Val + end. + +-record(schema_commit_lock, {owner}). +-record(block_controller, {owner}). + +-record(dump_log, {initiated_by, + opt_reply_to + }). + +-record(net_load, {table, + reason, + opt_reply_to, + cstruct = unknown + }). + +-record(send_table, {table, + receiver_pid, + remote_storage + }). + +-record(disc_load, {table, + reason, + opt_reply_to + }). + +-record(late_load, {table, + reason, + opt_reply_to, + loaders + }). + +-record(loader_done, {worker_pid, + is_loaded, + table_name, + needs_announce, + needs_sync, + needs_reply, + reply_to, + reply}). + +-record(sender_done, {worker_pid, + worker_res, + table_name + }). + +-record(dumper_done, {worker_pid, + worker_res + }). + +val(Var) -> + case ?catch_val(Var) of + {'EXIT', Reason} -> mnesia_lib:other_val(Var, Reason); + Value -> Value + end. + +start() -> + gen_server:start_link({local, ?SERVER_NAME}, ?MODULE, [self()], + [{timeout, infinity} + %% ,{debug, [trace]} + ]). + +sync_dump_log(InitBy) -> + call({sync_dump_log, InitBy}). + +async_dump_log(InitBy) -> + ?SERVER_NAME ! {async_dump_log, InitBy}. + +%% Wait for tables to be active +%% If needed, we will wait for Mnesia to start +%% If Mnesia stops, we will wait for Mnesia to restart +%% We will wait even if the list of tables is empty +%% +wait_for_tables(Tabs, Timeout) when is_list(Tabs), Timeout == infinity -> + do_wait_for_tables(Tabs, Timeout); +wait_for_tables(Tabs, Timeout) when is_list(Tabs), + is_integer(Timeout), Timeout >= 0 -> + do_wait_for_tables(Tabs, Timeout); +wait_for_tables(Tabs, Timeout) -> + {error, {badarg, Tabs, Timeout}}. + +do_wait_for_tables(Tabs, 0) -> + reply_wait(Tabs); +do_wait_for_tables(Tabs, Timeout) -> + Pid = spawn_link(?MODULE, wait_for_tables_init, [self(), Tabs]), + receive + {?SERVER_NAME, Pid, Res} -> + Res; + {'EXIT', Pid, _} -> + reply_wait(Tabs) + after Timeout -> + unlink(Pid), + exit(Pid, timeout), + reply_wait(Tabs) + end. + +reply_wait(Tabs) -> + case catch mnesia_lib:active_tables() of + {'EXIT', _} -> + {error, {node_not_running, node()}}; + Active when is_list(Active) -> + case Tabs -- Active of + [] -> + ok; + BadTabs -> + {timeout, BadTabs} + end + end. + +wait_for_tables_init(From, Tabs) -> + process_flag(trap_exit, true), + Res = wait_for_init(From, Tabs, whereis(?SERVER_NAME)), + From ! {?SERVER_NAME, self(), Res}, + unlink(From), + exit(normal). + +wait_for_init(From, Tabs, Init) -> + case catch link(Init) of + {'EXIT', _} -> + %% Mnesia is not started + {error, {node_not_running, node()}}; + true when is_pid(Init) -> + cast({sync_tabs, Tabs, self()}), + rec_tabs(Tabs, Tabs, From, Init) + end. + +sync_reply(Waiter, Tab) -> + Waiter ! {?SERVER_NAME, {tab_synced, Tab}}. + +rec_tabs([Tab | Tabs], AllTabs, From, Init) -> + receive + {?SERVER_NAME, {tab_synced, Tab}} -> + rec_tabs(Tabs, AllTabs, From, Init); + + {'EXIT', From, _} -> + %% This will trigger an exit signal + %% to mnesia_init + exit(wait_for_tables_timeout); + + {'EXIT', Init, _} -> + %% Oops, mnesia_init stopped, + exit(mnesia_stopped) + end; +rec_tabs([], _, _, Init) -> + unlink(Init), + ok. + +get_cstructs() -> + call(get_cstructs). + +update(Fun) -> + call({update,Fun}). + + +mnesia_down(Node) -> + case cast({mnesia_down, Node}) of + {error, _} -> mnesia_monitor:mnesia_down(?SERVER_NAME, Node); + _Pid -> ok + end. +wait_for_schema_commit_lock() -> + link(whereis(?SERVER_NAME)), + unsafe_call(wait_for_schema_commit_lock). + +block_controller() -> + call(block_controller). + +unblock_controller() -> + cast(unblock_controller). + +release_schema_commit_lock() -> + cast({release_schema_commit_lock, self()}), + unlink(whereis(?SERVER_NAME)). + +%% Special for preparation of add table copy +get_network_copy(Tab, Cs) -> +% We can't let the controller queue this one +% because that may cause a deadlock between schema_operations +% and initial tableloadings which both takes schema locks. +% But we have to get copier_done msgs when the other side +% goes down. + call({add_other, self()}), + Reason = {dumper,add_table_copy}, + Work = #net_load{table = Tab,reason = Reason,cstruct = Cs}, + %% I'll need this cause it's linked trough the subscriber + %% might be solved by using monitor in subscr instead. + process_flag(trap_exit, true), + Load = load_table_fun(Work), + Res = (catch Load()), + process_flag(trap_exit, false), + call({del_other, self()}), + case Res of + #loader_done{is_loaded = true} -> + Tab = Res#loader_done.table_name, + case Res#loader_done.needs_announce of + true -> + i_have_tab(Tab); + false -> + ignore + end, + Res#loader_done.reply; + #loader_done{} -> + Res#loader_done.reply; + Else -> + {not_loaded, Else} + end. + +%% This functions is invoked from the dumper +%% +%% There are two cases here: +%% startup -> +%% no need for sync, since mnesia_controller not started yet +%% schema_trans -> +%% already synced with mnesia_controller since the dumper +%% is syncronously started from mnesia_controller + +create_table(Tab) -> + {loaded, ok} = mnesia_loader:disc_load_table(Tab, {dumper,create_table}). + +get_disc_copy(Tab) -> + disc_load_table(Tab, {dumper,change_table_copy_type}, undefined). + +%% Returns ok instead of yes +force_load_table(Tab) when is_atom(Tab), Tab /= schema -> + case ?catch_val({Tab, storage_type}) of + ram_copies -> + do_force_load_table(Tab); + disc_copies -> + do_force_load_table(Tab); + disc_only_copies -> + do_force_load_table(Tab); + unknown -> + set({Tab, load_by_force}, true), + cast({force_load_updated, Tab}), + wait_for_tables([Tab], infinity); + {'EXIT', _} -> + {error, {no_exists, Tab}} + end; +force_load_table(Tab) -> + {error, {bad_type, Tab}}. + +do_force_load_table(Tab) -> + Loaded = ?catch_val({Tab, load_reason}), + case Loaded of + unknown -> + set({Tab, load_by_force}, true), + mnesia_late_loader:async_late_disc_load(node(), [Tab], forced_by_user), + wait_for_tables([Tab], infinity); + {'EXIT', _} -> + set({Tab, load_by_force}, true), + mnesia_late_loader:async_late_disc_load(node(), [Tab], forced_by_user), + wait_for_tables([Tab], infinity); + _ -> + ok + end. +master_nodes_updated(schema, _Masters) -> + ignore; +master_nodes_updated(Tab, Masters) -> + cast({master_nodes_updated, Tab, Masters}). + +schedule_late_disc_load(Tabs, Reason) -> + MsgTag = late_disc_load, + try_schedule_late_disc_load(Tabs, Reason, MsgTag). + +try_schedule_late_disc_load(Tabs, _Reason, MsgTag) + when Tabs == [], MsgTag /= schema_is_merged -> + ignore; +try_schedule_late_disc_load(Tabs, Reason, MsgTag) -> + GetIntents = + fun() -> + Item = mnesia_late_disc_load, + Nodes = val({current, db_nodes}), + mnesia:lock({global, Item, Nodes}, write), + case multicall(Nodes -- [node()], disc_load_intents) of + {Replies, []} -> + call({MsgTag, Tabs, Reason, Replies}), + done; + {_, BadNodes} -> + %% Some nodes did not respond, lets try again + {retry, BadNodes} + end + end, + case mnesia:transaction(GetIntents) of + {atomic, done} -> + done; + {atomic, {retry, BadNodes}} -> + verbose("Retry late_load_tables because bad nodes: ~p~n", + [BadNodes]), + try_schedule_late_disc_load(Tabs, Reason, MsgTag); + {aborted, AbortReason} -> + fatal("Cannot late_load_tables~p: ~p~n", + [[Tabs, Reason, MsgTag], AbortReason]) + end. + +connect_nodes(Ns) -> + case mnesia:system_info(is_running) of + no -> + {error, {node_not_running, node()}}; + yes -> + Pid = spawn_link(?MODULE,connect_nodes2,[self(),Ns]), + receive + {?MODULE, Pid, Res, New} -> + case Res of + ok -> + mnesia_lib:add_list(extra_db_nodes, New), + {ok, New}; + {aborted, {throw, Str}} when is_list(Str) -> + %%mnesia_recover:disconnect_nodes(New), + {error, {merge_schema_failed, lists:flatten(Str)}}; + Else -> + {error, Else} + end; + {'EXIT', Pid, Reason} -> + {error, Reason} + end + end. + +connect_nodes2(Father, Ns) -> + Current = val({current, db_nodes}), + abcast([node()|Ns], {merging_schema, node()}), + {NewC, OldC} = mnesia_recover:connect_nodes(Ns), + Connected = NewC ++OldC, + New1 = mnesia_lib:intersect(Ns, Connected), + New = New1 -- Current, + process_flag(trap_exit, true), + Res = try_merge_schema(New), + Msg = {schema_is_merged, [], late_merge, []}, + multicall([node()|Ns], Msg), + After = val({current, db_nodes}), + Father ! {?MODULE, self(), Res, mnesia_lib:intersect(Ns,After)}, + unlink(Father), + ok. + +%% Merge the local schema with the schema on other nodes. +%% But first we must let all processes that want to force +%% load tables wait until the schema merge is done. + +merge_schema() -> + AllNodes = mnesia_lib:all_nodes(), + case try_merge_schema(AllNodes) of + ok -> + schema_is_merged(); + {aborted, {throw, Str}} when is_list(Str) -> + fatal("Failed to merge schema: ~s~n", [Str]); + Else -> + fatal("Failed to merge schema: ~p~n", [Else]) + end. + +try_merge_schema(Nodes) -> + case mnesia_schema:merge_schema() of + {atomic, not_merged} -> + %% No more nodes that we need to merge the schema with + ok; + {atomic, {merged, OldFriends, NewFriends}} -> + %% Check if new nodes has been added to the schema + Diff = mnesia_lib:all_nodes() -- [node() | Nodes], + mnesia_recover:connect_nodes(Diff), + + %% Tell everybody to adopt orphan tables + im_running(OldFriends, NewFriends), + im_running(NewFriends, OldFriends), + + try_merge_schema(Nodes); + {atomic, {"Cannot get cstructs", Node, Reason}} -> + dbg_out("Cannot get cstructs, Node ~p ~p~n", [Node, Reason]), + timer:sleep(1000), % Avoid a endless loop look alike + try_merge_schema(Nodes); + Other -> + Other + end. + +im_running(OldFriends, NewFriends) -> + abcast(OldFriends, {im_running, node(), NewFriends}). + +schema_is_merged() -> + MsgTag = schema_is_merged, + SafeLoads = initial_safe_loads(), + + %% At this point we do not know anything about + %% which tables that the other nodes already + %% has loaded and therefore we let the normal + %% processing of the loader_queue take care + %% of it, since we at that time point will + %% know the whereabouts. We rely on the fact + %% that all nodes tells each other directly + %% when they have loaded a table and are + %% willing to share it. + + try_schedule_late_disc_load(SafeLoads, initial, MsgTag). + + +cast(Msg) -> + case whereis(?SERVER_NAME) of + undefined ->{error, {node_not_running, node()}}; + Pid -> gen_server:cast(Pid, Msg) + end. + +abcast(Nodes, Msg) -> + gen_server:abcast(Nodes, ?SERVER_NAME, Msg). + +unsafe_call(Msg) -> + case whereis(?SERVER_NAME) of + undefined -> {error, {node_not_running, node()}}; + Pid -> gen_server:call(Pid, Msg, infinity) + end. + +call(Msg) -> + case whereis(?SERVER_NAME) of + undefined -> + {error, {node_not_running, node()}}; + Pid -> + link(Pid), + Res = gen_server:call(Pid, Msg, infinity), + unlink(Pid), + + %% We get an exit signal if server dies + receive + {'EXIT', Pid, _Reason} -> + {error, {node_not_running, node()}} + after 0 -> + Res + end + end. + +remote_call(Node, Func, Args) -> + case catch gen_server:call({?MODULE, Node}, {Func, Args, self()}, infinity) of + {'EXIT', Error} -> + {error, Error}; + Else -> + Else + end. + +multicall(Nodes, Msg) -> + {Good, Bad} = gen_server:multi_call(Nodes, ?MODULE, Msg, infinity), + PatchedGood = [Reply || {_Node, Reply} <- Good], + {PatchedGood, Bad}. %% Make the replies look like rpc:multicalls.. +%% rpc:multicall(Nodes, ?MODULE, call, [Msg]). + +%%%---------------------------------------------------------------------- +%%% Callback functions from gen_server +%%%---------------------------------------------------------------------- + +%%---------------------------------------------------------------------- +%% Func: init/1 +%% Returns: {ok, State} | +%% {ok, State, Timeout} | +%% {stop, Reason} +%%---------------------------------------------------------------------- +init([Parent]) -> + process_flag(trap_exit, true), + mnesia_lib:verbose("~p starting: ~p~n", [?SERVER_NAME, self()]), + + %% Handshake and initialize transaction recovery + %% for new nodes detected in the schema + All = mnesia_lib:all_nodes(), + Diff = All -- [node() | val(original_nodes)], + mnesia_lib:unset(original_nodes), + mnesia_recover:connect_nodes(Diff), + + Interval = mnesia_monitor:get_env(dump_log_time_threshold), + Msg = {async_dump_log, time_threshold}, + {ok, Ref} = timer:send_interval(Interval, Msg), + mnesia_dumper:start_regulator(), + + Empty = gb_trees:empty(), + {ok, #state{supervisor = Parent, dump_log_timer_ref = Ref, + loader_queue = Empty, + late_loader_queue = Empty}}. + +%%---------------------------------------------------------------------- +%% Func: handle_call/3 +%% Returns: {reply, Reply, State} | +%% {reply, Reply, State, Timeout} | +%% {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, Reply, State} | (terminate/2 is called) +%% {stop, Reason, Reply, State} (terminate/2 is called) +%%---------------------------------------------------------------------- + +handle_call({sync_dump_log, InitBy}, From, State) -> + Worker = #dump_log{initiated_by = InitBy, + opt_reply_to = From + }, + State2 = add_worker(Worker, State), + noreply(State2); + +handle_call(wait_for_schema_commit_lock, From, State) -> + Worker = #schema_commit_lock{owner = From}, + State2 = add_worker(Worker, State), + noreply(State2); + +handle_call(block_controller, From, State) -> + Worker = #block_controller{owner = From}, + State2 = add_worker(Worker, State), + noreply(State2); + +handle_call({update,Fun}, From, State) -> + Res = (catch Fun()), + reply(From, Res), + noreply(State); + +handle_call(get_cstructs, From, State) -> + Tabs = val({schema, tables}), + Cstructs = [val({T, cstruct}) || T <- Tabs], + Running = val({current, db_nodes}), + reply(From, {cstructs, Cstructs, Running}), + noreply(State); + +handle_call({schema_is_merged, [], late_merge, []}, From, + State = #state{schema_is_merged = Merged}) -> + case Merged of + {false, Node} when Node == node(From) -> + Msgs = State#state.early_msgs, + State1 = State#state{early_msgs = [], schema_is_merged = true}, + handle_early_msgs(lists:reverse(Msgs), State1); + _ -> + %% Ooops this came to early, before we have merged :-) + %% or it came to late or from a node we don't care about + reply(From, ignore), + noreply(State) + end; + +handle_call({schema_is_merged, TabsR, Reason, RemoteLoaders}, From, State) -> + State2 = late_disc_load(TabsR, Reason, RemoteLoaders, From, State), + + %% Handle early messages + Msgs = State2#state.early_msgs, + State3 = State2#state{early_msgs = [], schema_is_merged = true}, + handle_early_msgs(lists:reverse(Msgs), State3); + +handle_call(disc_load_intents,From,State = #state{loader_queue=LQ,late_loader_queue=LLQ}) -> + LQTabs = gb_trees:keys(LQ), + LLQTabs = gb_trees:keys(LLQ), + ActiveTabs = lists:sort(mnesia_lib:local_active_tables()), + reply(From, {ok, node(), ordsets:union([LQTabs,LLQTabs,ActiveTabs])}), + noreply(State); + +handle_call({update_where_to_write, [add, Tab, AddNode], _From}, _Dummy, State) -> + Current = val({current, db_nodes}), + Res = + case lists:member(AddNode, Current) and + (State#state.schema_is_merged == true) of + true -> + mnesia_lib:add_lsort({Tab, where_to_write}, AddNode); + false -> + ignore + end, + {reply, Res, State}; + +handle_call({add_active_replica, [Tab, ToNode, RemoteS, AccessMode], From}, + ReplyTo, State) -> + KnownNode = lists:member(ToNode, val({current, db_nodes})), + Merged = State#state.schema_is_merged, + if + KnownNode == false -> + reply(ReplyTo, ignore), + noreply(State); + Merged == true -> + Res = case ?catch_val({Tab, cstruct}) of + {'EXIT', _} -> %% Tab deleted + deleted; + _ -> + add_active_replica(Tab, ToNode, RemoteS, AccessMode) + end, + reply(ReplyTo, Res), + noreply(State); + true -> %% Schema is not merged + Msg = {add_active_replica, [Tab, ToNode, RemoteS, AccessMode], From}, + Msgs = State#state.early_msgs, + reply(ReplyTo, ignore), %% Reply ignore and add data after schema merge + noreply(State#state{early_msgs = [{call, Msg, undefined} | Msgs]}) + end; + +handle_call({unannounce_add_table_copy, [Tab, Node], From}, ReplyTo, State) -> + KnownNode = lists:member(node(From), val({current, db_nodes})), + Merged = State#state.schema_is_merged, + if + KnownNode == false -> + reply(ReplyTo, ignore), + noreply(State); + Merged == true -> + Res = unannounce_add_table_copy(Tab, Node), + reply(ReplyTo, Res), + noreply(State); + true -> %% Schema is not merged + Msg = {unannounce_add_table_copy, [Tab, Node], From}, + Msgs = State#state.early_msgs, + reply(ReplyTo, ignore), %% Reply ignore and add data after schema merge + %% Set ReplyTO to undefined so we don't reply twice + noreply(State#state{early_msgs = [{call, Msg, undefined} | Msgs]}) + end; + +handle_call({net_load, Tab, Cs}, From, State) -> + State2 = + case State#state.schema_is_merged of + true -> + Worker = #net_load{table = Tab, + opt_reply_to = From, + reason = {dumper,add_table_copy}, + cstruct = Cs + }, + add_worker(Worker, State); + false -> + reply(From, {not_loaded, schema_not_merged}), + State + end, + noreply(State2); + +handle_call(Msg, From, State) when State#state.schema_is_merged /= true -> + %% Buffer early messages + Msgs = State#state.early_msgs, + noreply(State#state{early_msgs = [{call, Msg, From} | Msgs]}); + +handle_call({late_disc_load, Tabs, Reason, RemoteLoaders}, From, State) -> + State2 = late_disc_load(Tabs, Reason, RemoteLoaders, From, State), + noreply(State2); + +handle_call({unblock_table, Tab}, _Dummy, State) -> + Var = {Tab, where_to_commit}, + case val(Var) of + {blocked, List} -> + set(Var, List); % where_to_commit + _ -> + ignore + end, + {reply, ok, State}; + +handle_call({block_table, [Tab], From}, _Dummy, State) -> + case lists:member(node(From), val({current, db_nodes})) of + true -> + block_table(Tab); + false -> + ignore + end, + {reply, ok, State}; + +handle_call({check_w2r, _Node, Tab}, _From, State) -> + {reply, val({Tab, where_to_read}), State}; + +handle_call({add_other, Who}, _From, State = #state{others=Others0}) -> + Others = [Who|Others0], + {reply, ok, State#state{others=Others}}; +handle_call({del_other, Who}, _From, State = #state{others=Others0}) -> + Others = lists:delete(Who, Others0), + {reply, ok, State#state{others=Others}}; + +handle_call(Msg, _From, State) -> + error("~p got unexpected call: ~p~n", [?SERVER_NAME, Msg]), + noreply(State). + +late_disc_load(TabsR, Reason, RemoteLoaders, From, + State = #state{loader_queue = LQ, late_loader_queue = LLQ}) -> + verbose("Intend to load tables: ~p~n", [TabsR]), + ?eval_debug_fun({?MODULE, late_disc_load}, + [{tabs, TabsR}, + {reason, Reason}, + {loaders, RemoteLoaders}]), + + reply(From, queued), + %% RemoteLoaders is a list of {ok, Node, Tabs} tuples + + %% Remove deleted tabs and queued/loaded + LocalTabs = gb_sets:from_ordset(lists:sort(mnesia_lib:val({schema,local_tables}))), + Filter = fun(TabInfo0, Acc) -> + TabInfo = {Tab,_} = + case TabInfo0 of + {_,_} -> TabInfo0; + TabN -> {TabN,Reason} + end, + case gb_sets:is_member(Tab, LocalTabs) of + true -> + case ?catch_val({Tab, where_to_read}) == node() of + true -> Acc; + false -> + case gb_trees:is_defined(Tab,LQ) of + true -> Acc; + false -> [TabInfo | Acc] + end + end; + false -> Acc + end + end, + + Tabs = lists:foldl(Filter, [], TabsR), + + Nodes = val({current, db_nodes}), + LateQueue = late_loaders(Tabs, RemoteLoaders, Nodes, LLQ), + State#state{late_loader_queue = LateQueue}. + +late_loaders([{Tab, Reason} | Tabs], RemoteLoaders, Nodes, LLQ) -> + case gb_trees:is_defined(Tab, LLQ) of + false -> + LoadNodes = late_load_filter(RemoteLoaders, Tab, Nodes, []), + case LoadNodes of + [] -> cast({disc_load, Tab, Reason}); % Ugly cast + _ -> ignore + end, + LateLoad = #late_load{table=Tab,loaders=LoadNodes,reason=Reason}, + late_loaders(Tabs, RemoteLoaders, Nodes, gb_trees:insert(Tab,LateLoad,LLQ)); + true -> + late_loaders(Tabs, RemoteLoaders, Nodes, LLQ) + end; +late_loaders([], _RemoteLoaders, _Nodes, LLQ) -> + LLQ. + +late_load_filter([{error, _} | RemoteLoaders], Tab, Nodes, Acc) -> + late_load_filter(RemoteLoaders, Tab, Nodes, Acc); +late_load_filter([{badrpc, _} | RemoteLoaders], Tab, Nodes, Acc) -> + late_load_filter(RemoteLoaders, Tab, Nodes, Acc); +late_load_filter([RL | RemoteLoaders], Tab, Nodes, Acc) -> + {ok, Node, Intents} = RL, + Access = val({Tab, access_mode}), + LocalC = val({Tab, local_content}), + StillActive = lists:member(Node, Nodes), + RemoteIntent = lists:member(Tab, Intents), + if + Access == read_write, + LocalC == false, + StillActive == true, + RemoteIntent == true -> + Masters = mnesia_recover:get_master_nodes(Tab), + case lists:member(Node, Masters) of + true -> + %% The other node is master node for + %% the table, accept his load intent + late_load_filter(RemoteLoaders, Tab, Nodes, [Node | Acc]); + false when Masters == [] -> + %% The table has no master nodes + %% accept his load intent + late_load_filter(RemoteLoaders, Tab, Nodes, [Node | Acc]); + false -> + %% Some one else is master node for + %% the table, ignore his load intent + late_load_filter(RemoteLoaders, Tab, Nodes, Acc) + end; + true -> + late_load_filter(RemoteLoaders, Tab, Nodes, Acc) + end; +late_load_filter([], _Tab, _Nodes, Acc) -> + Acc. + +%%---------------------------------------------------------------------- +%% Func: handle_cast/2 +%% Returns: {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} (terminate/2 is called) +%%---------------------------------------------------------------------- + +handle_cast({release_schema_commit_lock, _Owner}, State) -> + if + State#state.is_stopping == true -> + {stop, shutdown, State}; + true -> + case State#state.dumper_queue of + [#schema_commit_lock{}|Rest] -> + [_Worker | Rest] = State#state.dumper_queue, + State2 = State#state{dumper_pid = undefined, + dumper_queue = Rest}, + State3 = opt_start_worker(State2), + noreply(State3); + _ -> + noreply(State) + end + end; + +handle_cast(unblock_controller, State) -> + if + State#state.is_stopping == true -> + {stop, shutdown, State}; + is_record(hd(State#state.dumper_queue), block_controller) -> + [_Worker | Rest] = State#state.dumper_queue, + State2 = State#state{dumper_pid = undefined, + dumper_queue = Rest}, + State3 = opt_start_worker(State2), + noreply(State3) + end; + +handle_cast({mnesia_down, Node}, State) -> + maybe_log_mnesia_down(Node), + mnesia_lib:del({current, db_nodes}, Node), + mnesia_checkpoint:tm_mnesia_down(Node), + Alltabs = val({schema, tables}), + reconfigure_tables(Node, Alltabs), + %% Done from (external point of view) + mnesia_monitor:mnesia_down(?SERVER_NAME, Node), + + %% Fix if we are late_merging against the node that went down + case State#state.schema_is_merged of + {false, Node} -> + spawn(?MODULE, call, [{schema_is_merged, [], late_merge, []}]); + _ -> + ignore + end, + + %% Fix internal stuff + LateQ = remove_loaders(Alltabs, Node, State#state.late_loader_queue), + + case get_senders(State) ++ get_loaders(State) of + [] -> ignore; + Senders -> + lists:foreach(fun({Pid,_}) -> Pid ! {copier_done, Node} end, + Senders) + end, + lists:foreach(fun(Pid) -> Pid ! {copier_done,Node} end, + State#state.others), + + Remove = fun(ST) -> + node(ST#send_table.receiver_pid) /= Node + end, + NewSenders = lists:filter(Remove, State#state.sender_queue), + Early = remove_early_messages(State#state.early_msgs, Node), + noreply(State#state{sender_queue = NewSenders, + early_msgs = Early, + late_loader_queue = LateQ + }); + +handle_cast({merging_schema, Node}, State) -> + case State#state.schema_is_merged of + false -> + %% This comes from dynamic connect_nodes which are made + %% after mnesia:start() and the schema_merge. + ImANewKidInTheBlock = + (val({schema, storage_type}) == ram_copies) + andalso (mnesia_lib:val({schema, local_tables}) == [schema]), + case ImANewKidInTheBlock of + true -> %% I'm newly started ram_node.. + noreply(State#state{schema_is_merged = {false, Node}}); + false -> + noreply(State) + end; + _ -> %% Already merging schema. + noreply(State) + end; + +handle_cast(Msg, State) when State#state.schema_is_merged /= true -> + %% Buffer early messages + Msgs = State#state.early_msgs, + noreply(State#state{early_msgs = [{cast, Msg} | Msgs]}); + +%% This must be done after schema_is_merged otherwise adopt_orphan +%% might trigger a table load from wrong nodes as a result of that we don't +%% know which tables we can load safly first. +handle_cast({im_running, _Node, NewFriends}, State) -> + LocalTabs = mnesia_lib:local_active_tables() -- [schema], + RemoveLocalOnly = fun(Tab) -> not val({Tab, local_content}) end, + Tabs = lists:filter(RemoveLocalOnly, LocalTabs), + Ns = mnesia_lib:intersect(NewFriends, val({current, db_nodes})), + abcast(Ns, {adopt_orphans, node(), Tabs}), + noreply(State); + +handle_cast({disc_load, Tab, Reason}, State) -> + Worker = #disc_load{table = Tab, reason = Reason}, + State2 = add_worker(Worker, State), + noreply(State2); + +handle_cast(Worker = #send_table{}, State) -> + State2 = add_worker(Worker, State), + noreply(State2); + +handle_cast({sync_tabs, Tabs, From}, State) -> + %% user initiated wait_for_tables + handle_sync_tabs(Tabs, From), + noreply(State); + +handle_cast({i_have_tab, Tab, Node}, State) -> + case lists:member(Node, val({current, db_nodes})) of + true -> + State2 = node_has_tabs([Tab], Node, State), + noreply(State2); + false -> + noreply(State) + end; + +handle_cast({force_load_updated, Tab}, State) -> + case val({Tab, active_replicas}) of + [] -> + %% No valid replicas + noreply(State); + [SomeNode | _] -> + State2 = node_has_tabs([Tab], SomeNode, State), + noreply(State2) + end; + +handle_cast({master_nodes_updated, Tab, Masters}, State) -> + Active = val({Tab, active_replicas}), + Valid = + case val({Tab, load_by_force}) of + true -> + Active; + false -> + if + Masters == [] -> + Active; + true -> + mnesia_lib:intersect(Masters, Active) + end + end, + case Valid of + [] -> + %% No valid replicas + noreply(State); + [SomeNode | _] -> + State2 = node_has_tabs([Tab], SomeNode, State), + noreply(State2) + end; + +handle_cast({adopt_orphans, Node, Tabs}, State) -> + + State2 = node_has_tabs(Tabs, Node, State), + + %% Register the other node as up and running + mnesia_recover:log_mnesia_up(Node), + verbose("Logging mnesia_up ~w~n",[Node]), + mnesia_lib:report_system_event({mnesia_up, Node}), + + %% Load orphan tables + LocalTabs = val({schema, local_tables}) -- [schema], + Nodes = val({current, db_nodes}), + {LocalOrphans, RemoteMasters} = + orphan_tables(LocalTabs, Node, Nodes, [], []), + Reason = {adopt_orphan, node()}, + mnesia_late_loader:async_late_disc_load(node(), LocalOrphans, Reason), + + Fun = + fun(N) -> + RemoteOrphans = + [Tab || {Tab, Ns} <- RemoteMasters, + lists:member(N, Ns)], + mnesia_late_loader:maybe_async_late_disc_load(N, RemoteOrphans, Reason) + end, + lists:foreach(Fun, Nodes), + noreply(State2); + +handle_cast(Msg, State) -> + error("~p got unexpected cast: ~p~n", [?SERVER_NAME, Msg]), + noreply(State). + +handle_sync_tabs([Tab | Tabs], From) -> + case val({Tab, where_to_read}) of + nowhere -> + case get({sync_tab, Tab}) of + undefined -> + put({sync_tab, Tab}, [From]); + Pids -> + put({sync_tab, Tab}, [From | Pids]) + end; + _ -> + sync_reply(From, Tab) + end, + handle_sync_tabs(Tabs, From); +handle_sync_tabs([], _From) -> + ok. + +%%---------------------------------------------------------------------- +%% Func: handle_info/2 +%% Returns: {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} (terminate/2 is called) +%%---------------------------------------------------------------------- + +handle_info({async_dump_log, InitBy}, State) -> + Worker = #dump_log{initiated_by = InitBy}, + State2 = add_worker(Worker, State), + noreply(State2); + +handle_info(#dumper_done{worker_pid=Pid, worker_res=Res}, State) -> + if + State#state.is_stopping == true -> + {stop, shutdown, State}; + Res == dumped, Pid == State#state.dumper_pid -> + [Worker | Rest] = State#state.dumper_queue, + reply(Worker#dump_log.opt_reply_to, Res), + State2 = State#state{dumper_pid = undefined, + dumper_queue = Rest}, + State3 = opt_start_worker(State2), + noreply(State3); + true -> + fatal("Dumper failed: ~p~n state: ~p~n", [Res, State]), + {stop, fatal, State} + end; + +handle_info(Done = #loader_done{worker_pid=WPid, table_name=Tab}, State0) -> + LateQueue0 = State0#state.late_loader_queue, + State1 = State0#state{loader_pid = lists:keydelete(WPid,1,get_loaders(State0))}, + + State2 = + case Done#loader_done.is_loaded of + true -> + %% Optional table announcement + if + Done#loader_done.needs_announce == true, + Done#loader_done.needs_reply == true -> + i_have_tab(Tab), + %% Should be {dumper,add_table_copy} only + reply(Done#loader_done.reply_to, + Done#loader_done.reply); + Done#loader_done.needs_reply == true -> + %% Should be {dumper,add_table_copy} only + reply(Done#loader_done.reply_to, + Done#loader_done.reply); + Done#loader_done.needs_announce == true, Tab == schema -> + i_have_tab(Tab); + Done#loader_done.needs_announce == true -> + i_have_tab(Tab), + %% Local node needs to perform user_sync_tab/1 + Ns = val({current, db_nodes}), + abcast(Ns, {i_have_tab, Tab, node()}); + Tab == schema -> + ignore; + true -> + %% Local node needs to perform user_sync_tab/1 + Ns = val({current, db_nodes}), + AlreadyKnows = val({Tab, active_replicas}), + abcast(Ns -- AlreadyKnows, {i_have_tab, Tab, node()}) + end, + %% Optional user sync + case Done#loader_done.needs_sync of + true -> user_sync_tab(Tab); + false -> ignore + end, + State1#state{late_loader_queue=gb_trees:delete_any(Tab, LateQueue0)}; + false -> + %% Either the node went down or table was not + %% loaded remotly yet + case Done#loader_done.needs_reply of + true -> + reply(Done#loader_done.reply_to, + Done#loader_done.reply); + false -> + ignore + end, + case ?catch_val({Tab, active_replicas}) of + [_|_] -> % still available elsewhere + {value,{_,Worker}} = lists:keysearch(WPid,1,get_loaders(State0)), + add_loader(Tab,Worker,State1); + _ -> + State1 + end + end, + State3 = opt_start_worker(State2), + noreply(State3); + +handle_info(#sender_done{worker_pid=Pid, worker_res=Res}, State) -> + Senders = get_senders(State), + {value, {Pid,_Worker}} = lists:keysearch(Pid, 1, Senders), + if + Res == ok -> + State2 = State#state{sender_pid = lists:keydelete(Pid, 1, Senders)}, + State3 = opt_start_worker(State2), + noreply(State3); + true -> + %% No need to send any message to the table receiver + %% since it will soon get a mnesia_down anyway + fatal("Sender failed: ~p~n state: ~p~n", [Res, State]), + {stop, fatal, State} + end; + +handle_info({'EXIT', Pid, R}, State) when Pid == State#state.supervisor -> + catch set(mnesia_status, stopping), + case State#state.dumper_pid of + undefined -> + dbg_out("~p was ~p~n", [?SERVER_NAME, R]), + {stop, shutdown, State}; + _ -> + noreply(State#state{is_stopping = true}) + end; + +handle_info({'EXIT', Pid, R}, State) when Pid == State#state.dumper_pid -> + case State#state.dumper_queue of + [#schema_commit_lock{}|Workers] -> %% Schema trans crashed or was killed + dbg_out("WARNING: Dumper ~p exited ~p~n", [Pid, R]), + State2 = State#state{dumper_queue = Workers, dumper_pid = undefined}, + State3 = opt_start_worker(State2), + noreply(State3); + _Other -> + fatal("Dumper or schema commit crashed: ~p~n state: ~p~n", [R, State]), + {stop, fatal, State} + end; + +handle_info(Msg = {'EXIT', Pid, R}, State) when R /= wait_for_tables_timeout -> + case lists:keymember(Pid, 1, get_senders(State)) of + true -> + %% No need to send any message to the table receiver + %% since it will soon get a mnesia_down anyway + fatal("Sender crashed: ~p~n state: ~p~n", [{Pid,R}, State]), + {stop, fatal, State}; + false -> + case lists:keymember(Pid, 1, get_loaders(State)) of + true -> + fatal("Loader crashed: ~p~n state: ~p~n", [R, State]), + {stop, fatal, State}; + false -> + error("~p got unexpected info: ~p~n", [?SERVER_NAME, Msg]), + noreply(State) + end + end; + +handle_info({From, get_state}, State) -> + From ! {?SERVER_NAME, State}, + noreply(State); + +%% No real need for buffering +handle_info(Msg, State) when State#state.schema_is_merged /= true -> + %% Buffer early messages + Msgs = State#state.early_msgs, + noreply(State#state{early_msgs = [{info, Msg} | Msgs]}); + +handle_info({'EXIT', Pid, wait_for_tables_timeout}, State) -> + sync_tab_timeout(Pid, get()), + noreply(State); + +handle_info(Msg, State) -> + error("~p got unexpected info: ~p~n", [?SERVER_NAME, Msg]), + noreply(State). + +sync_tab_timeout(Pid, [{{sync_tab, Tab}, Pids} | Tail]) -> + case lists:delete(Pid, Pids) of + [] -> + erase({sync_tab, Tab}); + Pids2 -> + put({sync_tab, Tab}, Pids2) + end, + sync_tab_timeout(Pid, Tail); +sync_tab_timeout(Pid, [_ | Tail]) -> + sync_tab_timeout(Pid, Tail); +sync_tab_timeout(_Pid, []) -> + ok. + +%% Pick the load record that has the highest load order +%% Returns {BestLoad, RemainingQueue} or {none, []} if queue is empty +pick_next(Queue) -> + List = gb_trees:values(Queue), + case pick_next(List, none, none) of + none -> {none, gb_trees:empty()}; + {Tab, Worker} -> {Worker, gb_trees:delete(Tab,Queue)} + end. + +pick_next([Head = #net_load{table=Tab}| Tail], Load, Order) -> + select_best(Head, Tail, ?catch_val({Tab, load_order}), Load, Order); +pick_next([Head = #disc_load{table=Tab}| Tail], Load, Order) -> + select_best(Head, Tail, ?catch_val({Tab, load_order}), Load, Order); +pick_next([], none, _Order) -> + none; +pick_next([], Load, _Order) -> + {element(2,Load), Load}. + +select_best(_Head, Tail, {'EXIT', _WHAT}, Load, Order) -> + %% Table have been deleted drop it. + pick_next(Tail, Load, Order); +select_best(Load, Tail, Order, none, none) -> + pick_next(Tail, Load, Order); +select_best(Load, Tail, Order, _OldLoad, OldOrder) when Order > OldOrder -> + pick_next(Tail, Load, Order); +select_best(_Load, Tail, _Order, OldLoad, OldOrder) -> + pick_next(Tail, OldLoad, OldOrder). + +%%---------------------------------------------------------------------- +%% Func: terminate/2 +%% Purpose: Shutdown the server +%% Returns: any (ignored by gen_server) +%%---------------------------------------------------------------------- +terminate(Reason, State) -> + mnesia_monitor:terminate_proc(?SERVER_NAME, Reason, State). + +%%---------------------------------------------------------------------- +%% Func: code_change/3 +%% Purpose: Upgrade process when its code is to be changed +%% Returns: {ok, NewState} +%%---------------------------------------------------------------------- +code_change(_OldVsn, State0, _Extra) -> + %% Loader Queue + State1 = case State0#state.loader_pid of + Pids when is_list(Pids) -> State0; + undefined -> State0#state{loader_pid = [],loader_queue=gb_trees:empty()}; + Pid when is_pid(Pid) -> + [Loader|Rest] = State0#state.loader_queue, + LQ0 = [{element(2,Rec),Rec} || Rec <- Rest], + LQ1 = lists:sort(LQ0), + LQ = gb_trees:from_orddict(LQ1), + State0#state{loader_pid=[{Pid,Loader}], loader_queue=LQ} + end, + %% LateLoaderQueue + State = if is_list(State1#state.late_loader_queue) -> + LLQ0 = State1#state.late_loader_queue, + LLQ1 = lists:sort([{element(2,Rec),Rec} || Rec <- LLQ0]), + LLQ = gb_trees:from_orddict(LLQ1), + State1#state{late_loader_queue=LLQ}; + true -> + State1 + end, + {ok, State}. + +%%%---------------------------------------------------------------------- +%%% Internal functions +%%%---------------------------------------------------------------------- + +maybe_log_mnesia_down(N) -> + %% We use mnesia_down when deciding which tables to load locally, + %% so if we are not running (i.e haven't decided which tables + %% to load locally), don't log mnesia_down yet. + case mnesia_lib:is_running() of + yes -> + verbose("Logging mnesia_down ~w~n", [N]), + mnesia_recover:log_mnesia_down(N), + ok; + _ -> + Filter = fun(Tab) -> + inactive_copy_holders(Tab, N) + end, + HalfLoadedTabs = lists:any(Filter, val({schema, local_tables}) -- [schema]), + if + HalfLoadedTabs == true -> + verbose("Logging mnesia_down ~w~n", [N]), + mnesia_recover:log_mnesia_down(N), + ok; + true -> + %% Unfortunately we have not loaded some common + %% tables yet, so we cannot rely on the nodedown + log_later %% BUGBUG handle this case!!! + end + end. + +inactive_copy_holders(Tab, Node) -> + Cs = val({Tab, cstruct}), + case mnesia_lib:cs_to_storage_type(Node, Cs) of + unknown -> + false; + _Storage -> + mnesia_lib:not_active_here(Tab) + end. + +orphan_tables([Tab | Tabs], Node, Ns, Local, Remote) -> + Cs = val({Tab, cstruct}), + CopyHolders = mnesia_lib:copy_holders(Cs), + RamCopyHolders = Cs#cstruct.ram_copies, + DiscCopyHolders = CopyHolders -- RamCopyHolders, + DiscNodes = val({schema, disc_copies}), + LocalContent = Cs#cstruct.local_content, + RamCopyHoldersOnDiscNodes = mnesia_lib:intersect(RamCopyHolders, DiscNodes), + Active = val({Tab, active_replicas}), + BeingCreated = (?catch_val({Tab, create_table}) == true), + Read = val({Tab, where_to_read}), + case lists:member(Node, DiscCopyHolders) of + _ when BeingCreated == true -> + orphan_tables(Tabs, Node, Ns, Local, Remote); + _ when Read == node() -> %% Allready loaded + orphan_tables(Tabs, Node, Ns, Local, Remote); + true when Active == [] -> + case DiscCopyHolders -- Ns of + [] -> + %% We're last up and the other nodes have not + %% loaded the table. Lets load it if we are + %% the smallest node. + case lists:min(DiscCopyHolders) of + Min when Min == node() -> + case mnesia_recover:get_master_nodes(Tab) of + [] -> + L = [Tab | Local], + orphan_tables(Tabs, Node, Ns, L, Remote); + Masters -> + R = [{Tab, Masters} | Remote], + orphan_tables(Tabs, Node, Ns, Local, R) + end; + _ -> + orphan_tables(Tabs, Node, Ns, Local, Remote) + end; + _ -> + orphan_tables(Tabs, Node, Ns, Local, Remote) + end; + false when Active == [], DiscCopyHolders == [], RamCopyHoldersOnDiscNodes == [] -> + %% Special case when all replicas resides on disc less nodes + orphan_tables(Tabs, Node, Ns, [Tab | Local], Remote); + _ when LocalContent == true -> + orphan_tables(Tabs, Node, Ns, [Tab | Local], Remote); + _ -> + orphan_tables(Tabs, Node, Ns, Local, Remote) + end; +orphan_tables([], _, _, LocalOrphans, RemoteMasters) -> + {LocalOrphans, RemoteMasters}. + +node_has_tabs([Tab | Tabs], Node, State) when Node /= node() -> + State2 = + case catch update_whereabouts(Tab, Node, State) of + State1 = #state{} -> State1; + {'EXIT', R} -> %% Tab was just deleted? + case ?catch_val({Tab, cstruct}) of + {'EXIT', _} -> State; % yes + _ -> erlang:error(R) + end + end, + node_has_tabs(Tabs, Node, State2); +node_has_tabs([Tab | Tabs], Node, State) -> + user_sync_tab(Tab), + node_has_tabs(Tabs, Node, State); +node_has_tabs([], _Node, State) -> + State. + +update_whereabouts(Tab, Node, State) -> + Storage = val({Tab, storage_type}), + Read = val({Tab, where_to_read}), + LocalC = val({Tab, local_content}), + BeingCreated = (?catch_val({Tab, create_table}) == true), + Masters = mnesia_recover:get_master_nodes(Tab), + ByForce = val({Tab, load_by_force}), + GoGetIt = + if + ByForce == true -> + true; + Masters == [] -> + true; + true -> + lists:member(Node, Masters) + end, + + dbg_out("Table ~w is loaded on ~w. s=~w, r=~w, lc=~w, f=~w, m=~w~n", + [Tab, Node, Storage, Read, LocalC, ByForce, GoGetIt]), + if + LocalC == true -> + %% Local contents, don't care about other node + State; + BeingCreated == true -> + %% The table is currently being created + %% It will be handled elsewhere + State; + Storage == unknown, Read == nowhere -> + %% No own copy, time to read remotely + %% if the other node is a good node + add_active_replica(Tab, Node), + case GoGetIt of + true -> + set({Tab, where_to_read}, Node), + user_sync_tab(Tab), + State; + false -> + State + end; + Storage == unknown -> + %% No own copy, continue to read remotely + add_active_replica(Tab, Node), + NodeST = mnesia_lib:storage_type_at_node(Node, Tab), + ReadST = mnesia_lib:storage_type_at_node(Read, Tab), + if %% Avoid reading from disc_only_copies + NodeST == disc_only_copies -> + ignore; + ReadST == disc_only_copies -> + mnesia_lib:set_remote_where_to_read(Tab); + true -> + ignore + end, + user_sync_tab(Tab), + State; + Read == nowhere -> + %% Own copy, go and get a copy of the table + %% if the other node is master or if there + %% are no master at all + add_active_replica(Tab, Node), + case GoGetIt of + true -> + Worker = #net_load{table = Tab, + reason = {active_remote, Node}}, + add_worker(Worker, State); + false -> + State + end; + true -> + %% We already have an own copy + add_active_replica(Tab, Node), + user_sync_tab(Tab), + State + end. + +initial_safe_loads() -> + case val({schema, storage_type}) of + ram_copies -> + Downs = [], + Tabs = val({schema, local_tables}) -- [schema], + LastC = fun(T) -> last_consistent_replica(T, Downs) end, + lists:zf(LastC, Tabs); + + disc_copies -> + Downs = mnesia_recover:get_mnesia_downs(), + dbg_out("mnesia_downs = ~p~n", [Downs]), + + Tabs = val({schema, local_tables}) -- [schema], + LastC = fun(T) -> last_consistent_replica(T, Downs) end, + lists:zf(LastC, Tabs) + end. + +last_consistent_replica(Tab, Downs) -> + Cs = val({Tab, cstruct}), + Storage = mnesia_lib:cs_to_storage_type(node(), Cs), + Ram = Cs#cstruct.ram_copies, + Disc = Cs#cstruct.disc_copies, + DiscOnly = Cs#cstruct.disc_only_copies, + BetterCopies0 = mnesia_lib:remote_copy_holders(Cs) -- Downs, + BetterCopies = BetterCopies0 -- Ram, + AccessMode = Cs#cstruct.access_mode, + Copies = mnesia_lib:copy_holders(Cs), + Masters = mnesia_recover:get_master_nodes(Tab), + LocalMaster0 = lists:member(node(), Masters), + LocalContent = Cs#cstruct.local_content, + RemoteMaster = + if + Masters == [] -> false; + true -> not LocalMaster0 + end, + LocalMaster = + if + Masters == [] -> false; + true -> LocalMaster0 + end, + if + Copies == [node()] -> + %% Only one copy holder and it is local. + %% It may also be a local contents table + {true, {Tab, local_only}}; + LocalContent == true -> + {true, {Tab, local_content}}; + LocalMaster == true -> + %% We have a local master + {true, {Tab, local_master}}; + RemoteMaster == true -> + %% Wait for remote master copy + false; + Storage == ram_copies -> + if + Disc == [], DiscOnly == [] -> + %% Nobody has copy on disc + {true, {Tab, ram_only}}; + true -> + %% Some other node has copy on disc + false + end; + AccessMode == read_only -> + %% No one has been able to update the table, + %% i.e. all disc resident copies are equal + {true, {Tab, read_only}}; + BetterCopies /= [], Masters /= [node()] -> + %% There are better copies on other nodes + %% and we do not have the only master copy + false; + true -> + {true, {Tab, initial}} + end. + +reconfigure_tables(N, [Tab |Tail]) -> + del_active_replica(Tab, N), + case val({Tab, where_to_read}) of + N -> mnesia_lib:set_remote_where_to_read(Tab); + _ -> ignore + end, + reconfigure_tables(N, Tail); +reconfigure_tables(_, []) -> + ok. + +remove_loaders([Tab| Tabs], N, Loaders) -> + LateQ = drop_loaders(Tab, N, Loaders), + remove_loaders(Tabs, N, LateQ); +remove_loaders([],_, LateQ) -> LateQ. + +remove_early_messages([], _Node) -> + []; +remove_early_messages([{call, {add_active_replica, [_, Node, _, _], _}, _}|R], Node) -> + remove_early_messages(R, Node); %% Does a reply before queuing +remove_early_messages([{call, {block_table, _, From}, ReplyTo}|R], Node) + when node(From) == Node -> + reply(ReplyTo, ok), %% Remove gen:server waits.. + remove_early_messages(R, Node); +remove_early_messages([{cast, {i_have_tab, _Tab, Node}}|R], Node) -> + remove_early_messages(R, Node); +remove_early_messages([{cast, {adopt_orphans, Node, _Tabs}}|R], Node) -> + remove_early_messages(R, Node); +remove_early_messages([M|R],Node) -> + [M|remove_early_messages(R,Node)]. + +%% Drop loader from late load queue and possibly trigger a disc_load +drop_loaders(Tab, Node, LLQ) -> + case gb_trees:lookup(Tab,LLQ) of + none -> + LLQ; + {value, H} -> + %% Check if it is time to issue a disc_load request + case H#late_load.loaders of + [Node] -> + Reason = {H#late_load.reason, last_loader_down, Node}, + cast({disc_load, Tab, Reason}); % Ugly cast + _ -> + ignore + end, + %% Drop the node from the list of loaders + H2 = H#late_load{loaders = H#late_load.loaders -- [Node]}, + gb_trees:update(Tab, H2, LLQ) + end. + +add_active_replica(Tab, Node) -> + add_active_replica(Tab, Node, val({Tab, cstruct})). + +add_active_replica(Tab, Node, Cs = #cstruct{}) -> + Storage = mnesia_lib:schema_cs_to_storage_type(Node, Cs), + AccessMode = Cs#cstruct.access_mode, + add_active_replica(Tab, Node, Storage, AccessMode). + +%% Block table primitives + +block_table(Tab) -> + Var = {Tab, where_to_commit}, + Old = val(Var), + New = {blocked, Old}, + set(Var, New). % where_to_commit + +unblock_table(Tab) -> + call({unblock_table, Tab}). + +is_tab_blocked(W2C) when is_list(W2C) -> + {false, W2C}; +is_tab_blocked({blocked, W2C}) when is_list(W2C) -> + {true, W2C}. + +mark_blocked_tab(true, Value) -> + {blocked, Value}; +mark_blocked_tab(false, Value) -> + Value. + +%% + +add_active_replica(Tab, Node, Storage, AccessMode) -> + Var = {Tab, where_to_commit}, + {Blocked, Old} = is_tab_blocked(val(Var)), + Del = lists:keydelete(Node, 1, Old), + case AccessMode of + read_write -> + New = lists:sort([{Node, Storage} | Del]), + set(Var, mark_blocked_tab(Blocked, New)), % where_to_commit + mnesia_lib:add_lsort({Tab, where_to_write}, Node); + read_only -> + set(Var, mark_blocked_tab(Blocked, Del)), + mnesia_lib:del({Tab, where_to_write}, Node) + end, + add({Tab, active_replicas}, Node). + +del_active_replica(Tab, Node) -> + Var = {Tab, where_to_commit}, + {Blocked, Old} = is_tab_blocked(val(Var)), + Del = lists:keydelete(Node, 1, Old), + New = lists:sort(Del), + set(Var, mark_blocked_tab(Blocked, New)), % where_to_commit + mnesia_lib:del({Tab, active_replicas}, Node), + mnesia_lib:del({Tab, where_to_write}, Node). + +change_table_access_mode(Cs) -> + W = fun() -> + Tab = Cs#cstruct.name, + lists:foreach(fun(N) -> add_active_replica(Tab, N, Cs) end, + val({Tab, active_replicas})) + end, + update(W). + + +%% node To now has tab loaded, but this must be undone +%% This code is rpc:call'ed from the tab_copier process +%% when it has *not* released it's table lock +unannounce_add_table_copy(Tab, To) -> + catch del_active_replica(Tab, To), + case catch val({Tab , where_to_read}) of + To -> + mnesia_lib:set_remote_where_to_read(Tab); + _ -> + ignore + end. + +user_sync_tab(Tab) -> + case val(debug) of + trace -> + mnesia_subscr:subscribe(whereis(mnesia_event), {table, Tab}); + _ -> + ignore + end, + + case erase({sync_tab, Tab}) of + undefined -> + ok; + Pids -> + lists:foreach(fun(Pid) -> sync_reply(Pid, Tab) end, Pids) + end. + +i_have_tab(Tab) -> + case val({Tab, local_content}) of + true -> + mnesia_lib:set_local_content_whereabouts(Tab); + false -> + set({Tab, where_to_read}, node()) + end, + add_active_replica(Tab, node()). + +sync_and_block_table_whereabouts(Tab, ToNode, RemoteS, AccessMode) when Tab /= schema -> + Current = val({current, db_nodes}), + Ns = + case lists:member(ToNode, Current) of + true -> Current -- [ToNode]; + false -> Current + end, + remote_call(ToNode, block_table, [Tab]), + [remote_call(Node, add_active_replica, [Tab, ToNode, RemoteS, AccessMode]) || + Node <- [ToNode | Ns]], + ok. + +sync_del_table_copy_whereabouts(Tab, ToNode) when Tab /= schema -> + Current = val({current, db_nodes}), + Ns = + case lists:member(ToNode, Current) of + true -> Current; + false -> [ToNode | Current] + end, + Args = [Tab, ToNode], + [remote_call(Node, unannounce_add_table_copy, Args) || Node <- Ns], + ok. + +get_info(Timeout) -> + case whereis(?SERVER_NAME) of + undefined -> + {timeout, Timeout}; + Pid -> + Pid ! {self(), get_state}, + receive + {?SERVER_NAME, State = #state{loader_queue=LQ,late_loader_queue=LLQ}} -> + {info,State#state{loader_queue=gb_trees:to_list(LQ), + late_loader_queue=gb_trees:to_list(LLQ)}} + after Timeout -> + {timeout, Timeout} + end + end. + +get_workers(Timeout) -> + case whereis(?SERVER_NAME) of + undefined -> + {timeout, Timeout}; + Pid -> + Pid ! {self(), get_state}, + receive + {?SERVER_NAME, State = #state{}} -> + {workers, get_loaders(State), get_senders(State), State#state.dumper_pid} + after Timeout -> + {timeout, Timeout} + end + end. + +info() -> + Tabs = mnesia_lib:local_active_tables(), + io:format( "---> Active tables <--- ~n", []), + info(Tabs). + +info([Tab | Tail]) -> + case val({Tab, storage_type}) of + disc_only_copies -> + info_format(Tab, + dets:info(Tab, size), + dets:info(Tab, file_size), + "bytes on disc"); + _ -> + info_format(Tab, + ?ets_info(Tab, size), + ?ets_info(Tab, memory), + "words of mem") + end, + info(Tail); +info([]) -> ok. + + +info_format(Tab, Size, Mem, Media) -> + StrT = mnesia_lib:pad_name(atom_to_list(Tab), 15, []), + StrS = mnesia_lib:pad_name(integer_to_list(Size), 8, []), + StrM = mnesia_lib:pad_name(integer_to_list(Mem), 8, []), + io:format("~s: with ~s records occupying ~s ~s~n", + [StrT, StrS, StrM, Media]). + +%% Handle early arrived messages +handle_early_msgs([Msg | Msgs], State) -> + %% The messages are in reverse order + case handle_early_msg(Msg, State) of +%% {stop, Reason, Reply, State2} -> % Will not happen according to dialyzer +%% {stop, Reason, Reply, State2}; + {stop, Reason, State2} -> + {stop, Reason, State2}; + {noreply, State2} -> + handle_early_msgs(Msgs, State2); + {reply, Reply, State2} -> + {call, _Call, From} = Msg, + reply(From, Reply), + handle_early_msgs(Msgs, State2) + end; +handle_early_msgs([], State) -> + noreply(State). + +handle_early_msg({call, Msg, From}, State) -> + handle_call(Msg, From, State); +handle_early_msg({cast, Msg}, State) -> + handle_cast(Msg, State); +handle_early_msg({info, Msg}, State) -> + handle_info(Msg, State). + +noreply(State) -> + {noreply, State}. + +reply(undefined, Reply) -> + Reply; +reply(ReplyTo, Reply) -> + gen_server:reply(ReplyTo, Reply), + Reply. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Worker management + +%% Returns new State +add_worker(Worker = #dump_log{}, State) -> + InitBy = Worker#dump_log.initiated_by, + Queue = State#state.dumper_queue, + case lists:keymember(InitBy, #dump_log.initiated_by, Queue) of + true when Worker#dump_log.opt_reply_to == undefined -> + %% The same threshold has been exceeded again, + %% before we have had the possibility to + %% process the older one. + DetectedBy = {dump_log, InitBy}, + Event = {mnesia_overload, DetectedBy}, + mnesia_lib:report_system_event(Event); + _ -> + ignore + end, + Queue2 = Queue ++ [Worker], + State2 = State#state{dumper_queue = Queue2}, + opt_start_worker(State2); +add_worker(Worker = #schema_commit_lock{}, State) -> + Queue = State#state.dumper_queue, + Queue2 = Queue ++ [Worker], + State2 = State#state{dumper_queue = Queue2}, + opt_start_worker(State2); +add_worker(Worker = #net_load{}, State) -> + opt_start_worker(add_loader(Worker#net_load.table,Worker,State)); +add_worker(Worker = #send_table{}, State) -> + Queue = State#state.sender_queue, + State2 = State#state{sender_queue = Queue ++ [Worker]}, + opt_start_worker(State2); +add_worker(Worker = #disc_load{}, State) -> + opt_start_worker(add_loader(Worker#disc_load.table,Worker,State)); +% Block controller should be used for upgrading mnesia. +add_worker(Worker = #block_controller{}, State) -> + Queue = State#state.dumper_queue, + Queue2 = [Worker | Queue], + State2 = State#state{dumper_queue = Queue2}, + opt_start_worker(State2). + +add_loader(Tab,Worker,State = #state{loader_queue=LQ0}) -> + case gb_trees:is_defined(Tab, LQ0) of + true -> State; + false -> + LQ=gb_trees:insert(Tab, Worker, LQ0), + State#state{loader_queue=LQ} + end. + +%% Optionally start a worker +%% +%% Dumpers and loaders may run simultaneously +%% but neither of them may run during schema commit. +%% Loaders may not start if a schema commit is enqueued. +opt_start_worker(State) when State#state.is_stopping == true -> + State; +opt_start_worker(State) -> + %% Prioritize dumper and schema commit + %% by checking them first + case State#state.dumper_queue of + [Worker | _Rest] when State#state.dumper_pid == undefined -> + %% Great, a worker in queue and neither + %% a schema transaction is being + %% committed and nor a dumper is running + + %% Start worker but keep him in the queue + if + is_record(Worker, schema_commit_lock) -> + ReplyTo = Worker#schema_commit_lock.owner, + reply(ReplyTo, granted), + {Owner, _Tag} = ReplyTo, + opt_start_loader(State#state{dumper_pid = Owner}); + + is_record(Worker, dump_log) -> + Pid = spawn_link(?MODULE, dump_and_reply, [self(), Worker]), + State2 = State#state{dumper_pid = Pid}, + + %% If the worker was a dumper we may + %% possibly be able to start a loader + %% or sender + State3 = opt_start_sender(State2), + opt_start_loader(State3); + + is_record(Worker, block_controller) -> + case {get_senders(State), get_loaders(State)} of + {[], []} -> + ReplyTo = Worker#block_controller.owner, + reply(ReplyTo, granted), + {Owner, _Tag} = ReplyTo, + State#state{dumper_pid = Owner}; + _ -> + State + end + end; + _ -> + %% Bad luck, try with a loader or sender instead + State2 = opt_start_sender(State), + opt_start_loader(State2) + end. + +opt_start_sender(State) -> + case State#state.sender_queue of + []-> State; %% No need + SenderQ -> + {NewS,Kept} = opt_start_sender2(SenderQ, get_senders(State), + [], get_loaders(State)), + State#state{sender_pid = NewS, sender_queue = Kept} + end. + +opt_start_sender2([], Pids,Kept, _) -> {Pids,Kept}; +opt_start_sender2([Sender|R], Pids, Kept, LoaderQ) -> + Tab = Sender#send_table.table, + Active = val({Tab, active_replicas}), + IgotIt = lists:member(node(), Active), + IsLoading = lists:any(fun({_Pid,Loader}) -> + Tab == element(#net_load.table, Loader) + end, LoaderQ), + if + IgotIt, IsLoading -> + %% I'm currently finishing loading the table let him wait + opt_start_sender2(R,Pids, [Sender|Kept], LoaderQ); + IgotIt -> + %% Start worker but keep him in the queue + Pid = spawn_link(?MODULE, send_and_reply,[self(), Sender]), + opt_start_sender2(R,[{Pid,Sender}|Pids],Kept,LoaderQ); + true -> + verbose("Send table failed ~p not active on this node ~n", [Tab]), + Sender#send_table.receiver_pid ! {copier_done, node()}, + opt_start_sender2(R,Pids, Kept, LoaderQ) + end. + +opt_start_loader(State = #state{loader_queue = LoaderQ}) -> + Current = get_loaders(State), + Max = max_loaders(), + case gb_trees:is_empty(LoaderQ) of + true -> + State; + _ when length(Current) >= Max -> + State; + false -> + SchemaQueue = State#state.dumper_queue, + case lists:keymember(schema_commit_lock, 1, SchemaQueue) of + false -> + case pick_next(LoaderQ) of + {none,Rest} -> + State#state{loader_queue=Rest}; + {Worker,Rest} -> + case already_loading(Worker, get_loaders(State)) of + true -> + opt_start_loader(State#state{loader_queue = Rest}); + false -> + %% Start worker but keep him in the queue + Pid = load_and_reply(self(), Worker), + State#state{loader_pid=[{Pid,Worker}|get_loaders(State)], + loader_queue = Rest} + end + end; + true -> + %% Bad luck, we must wait for the schema commit + State + end + end. + +already_loading(#net_load{table=Tab},Loaders) -> + already_loading2(Tab,Loaders); +already_loading(#disc_load{table=Tab},Loaders) -> + already_loading2(Tab,Loaders). + +already_loading2(Tab, [{_,#net_load{table=Tab}}|_]) -> true; +already_loading2(Tab, [{_,#disc_load{table=Tab}}|_]) -> true; +already_loading2(Tab, [_|Rest]) -> already_loading2(Tab,Rest); +already_loading2(_,[]) -> false. + +start_remote_sender(Node, Tab, Receiver, Storage) -> + Msg = #send_table{table = Tab, + receiver_pid = Receiver, + remote_storage = Storage}, + gen_server:cast({?SERVER_NAME, Node}, Msg). + +dump_and_reply(ReplyTo, Worker) -> + %% No trap_exit, die intentionally instead + Res = mnesia_dumper:opt_dump_log(Worker#dump_log.initiated_by), + ReplyTo ! #dumper_done{worker_pid = self(), + worker_res = Res}, + unlink(ReplyTo), + exit(normal). + +send_and_reply(ReplyTo, Worker) -> + %% No trap_exit, die intentionally instead + Res = mnesia_loader:send_table(Worker#send_table.receiver_pid, + Worker#send_table.table, + Worker#send_table.remote_storage), + ReplyTo ! #sender_done{worker_pid = self(), + worker_res = Res}, + unlink(ReplyTo), + exit(normal). + +load_and_reply(ReplyTo, Worker) -> + Load = load_table_fun(Worker), + SendAndReply = + fun() -> + process_flag(trap_exit, true), + Done = Load(), + ReplyTo ! Done#loader_done{worker_pid = self()}, + unlink(ReplyTo), + exit(normal) + end, + spawn_link(SendAndReply). + +%% Now it is time to load the table +%% but first we must check if it still is neccessary +load_table_fun(#net_load{cstruct=Cs, table=Tab, reason=Reason, opt_reply_to=ReplyTo}) -> + LocalC = val({Tab, local_content}), + AccessMode = val({Tab, access_mode}), + ReadNode = val({Tab, where_to_read}), + Active = filter_active(Tab), + Done = #loader_done{is_loaded = true, + table_name = Tab, + needs_announce = false, + needs_sync = false, + needs_reply = (ReplyTo /= undefined), + reply_to = ReplyTo, + reply = {loaded, ok} + }, + if + ReadNode == node() -> + %% Already loaded locally + fun() -> Done end; + LocalC == true -> + fun() -> + Res = mnesia_loader:disc_load_table(Tab, load_local_content), + Done#loader_done{reply = Res, needs_announce = true, needs_sync = true} + end; + AccessMode == read_only, Reason /= {dumper,add_table_copy} -> + fun() -> disc_load_table(Tab, Reason, ReplyTo) end; + true -> + fun() -> + %% Either we cannot read the table yet + %% or someone is moving a replica between + %% two nodes + Res = mnesia_loader:net_load_table(Tab, Reason, Active, Cs), + case Res of + {loaded, ok} -> + Done#loader_done{needs_sync = true, + reply = Res}; + {not_loaded, _} -> + Done#loader_done{is_loaded = false, + reply = Res} + end + end + end; +load_table_fun(#disc_load{table=Tab, reason=Reason, opt_reply_to=ReplyTo}) -> + ReadNode = val({Tab, where_to_read}), + Active = filter_active(Tab), + Done = #loader_done{is_loaded = true, + table_name = Tab, + needs_announce = false, + needs_sync = false, + needs_reply = false + }, + if + Active == [], ReadNode == nowhere -> + %% Not loaded anywhere, lets load it from disc + fun() -> disc_load_table(Tab, Reason, ReplyTo) end; + ReadNode == nowhere -> + %% Already loaded on other node, lets get it + Cs = val({Tab, cstruct}), + fun() -> + case mnesia_loader:net_load_table(Tab, Reason, Active, Cs) of + {loaded, ok} -> + Done#loader_done{needs_sync = true}; + {not_loaded, storage_unknown} -> + Done#loader_done{is_loaded = false}; + {not_loaded, ErrReason} -> + Done#loader_done{is_loaded = false, + reply = {not_loaded,ErrReason}} + end + end; + true -> + %% Already readable, do not worry be happy + fun() -> Done end + end. + +disc_load_table(Tab, Reason, ReplyTo) -> + Done = #loader_done{is_loaded = true, + table_name = Tab, + needs_announce = false, + needs_sync = false, + needs_reply = ReplyTo /= undefined, + reply_to = ReplyTo, + reply = {loaded, ok} + }, + Res = mnesia_loader:disc_load_table(Tab, Reason), + if + Res == {loaded, ok} -> + Done#loader_done{needs_announce = true, + needs_sync = true, + reply = Res}; + ReplyTo /= undefined -> + Done#loader_done{is_loaded = false, + reply = Res}; + true -> + fatal("Cannot load table ~p from disc: ~p~n", [Tab, Res]) + end. + +filter_active(Tab) -> + ByForce = val({Tab, load_by_force}), + Active = val({Tab, active_replicas}), + Masters = mnesia_recover:get_master_nodes(Tab), + Ns = do_filter_active(ByForce, Active, Masters), + %% Reorder the so that we load from fastest first + LS = ?catch_val({Tab, storage_type}), + DOC = val({Tab, disc_only_copies}), + {Good,Worse} = + case LS of + disc_only_copies -> + G = mnesia_lib:intersect(Ns, DOC), + {G,Ns--G}; + _ -> + G = Ns -- DOC, + {G,Ns--G} + end, + %% Pick a random node of the fastest + Len = length(Good), + if + Len > 0 -> + R = erlang:phash(node(), Len+1), + random(R-1,Good,Worse); + true -> + Worse + end. + +random(N, [H|R], Acc) when N > 0 -> + random(N-1,R, [H|Acc]); +random(0, L, Acc) -> + L ++ Acc. + +do_filter_active(true, Active, _Masters) -> + Active; +do_filter_active(false, Active, []) -> + Active; +do_filter_active(false, Active, Masters) -> + mnesia_lib:intersect(Active, Masters). + + |