%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 1996-2010. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% %CopyrightEnd%
%%
%%
%% The mnesia_init process loads tables from local disc or from
%% other nodes. It also coordinates updates of the info about
%% where we can read and write tables.
%%
%% Tables may need to be loaded initially at startup of the local
%% node or when other nodes announce that they have already loaded
%% tables that we also want.
%%
%% Initially we set the load request queue to those tables that we
%% safely can load locally, i.e. tables where we have the last
%% consistent replica and we have received mnesia_down from all
%% other nodes holding the table. Then we let the mnesia_init
%% process enter its normal working state.
%%
%% When we need to load a table we append a request to the load
%% request queue. All other requests are regarded as high priority
%% and are processed immediately (e.g. update table whereabouts).
%% We process the load request queue as a "background" job.
-module(mnesia_controller).
-behaviour(gen_server).
%% Mnesia internal stuff
-export([
start/0,
i_have_tab/1,
info/0,
get_info/1,
get_workers/1,
force_load_table/1,
async_dump_log/1,
sync_dump_log/1,
connect_nodes/1,
connect_nodes/2,
wait_for_schema_commit_lock/0,
release_schema_commit_lock/0,
create_table/1,
get_disc_copy/1,
get_cstructs/0,
sync_and_block_table_whereabouts/4,
sync_del_table_copy_whereabouts/2,
block_table/1,
unblock_table/1,
block_controller/0,
unblock_controller/0,
unannounce_add_table_copy/2,
master_nodes_updated/2,
mnesia_down/1,
add_active_replica/2,
add_active_replica/3,
add_active_replica/4,
update/1,
change_table_access_mode/1,
del_active_replica/2,
wait_for_tables/2,
get_network_copy/2,
merge_schema/0,
start_remote_sender/4,
schedule_late_disc_load/2
]).
%% gen_server callbacks
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
%% Module internal stuff
-export([call/1,
cast/1,
dump_and_reply/2,
load_and_reply/2,
send_and_reply/2,
wait_for_tables_init/2,
connect_nodes2/3
]).
-import(mnesia_lib, [set/2, add/2]).
-import(mnesia_lib, [fatal/2, error/2, verbose/2, dbg_out/2]).
-include("mnesia.hrl").
-define(SERVER_NAME, ?MODULE).
%% Server state of the mnesia_controller gen_server (created in init/1).
%%   supervisor        - the pid handed to init/1 (the caller of start/0)
%%   schema_is_merged  - false | {false, Node} | true; while it is not
%%                       'true' most requests are buffered in early_msgs
%%   early_msgs        - buffered {call, Msg, From} | {cast, Msg} |
%%                       {info, Msg} entries, newest first
%%   loader_queue and late_loader_queue start out as empty gb_trees.
-record(state, {supervisor,
schema_is_merged = false,
early_msgs = [],
loader_pid = [], %% Was Pid is now [{Pid,Work}|..]
loader_queue, %% Was list is now gb_tree
sender_pid = [], %% Was a pid or undef is now [{Pid,Work}|..]
sender_queue = [],
late_loader_queue, %% Was list is now gb_tree
dumper_pid, %% Dumper or schema commit pid
dumper_queue = [], %% Dumper or schema commit queue
others = [], %% Processes that needs the copier_done msg
dump_log_timer_ref,
is_stopping = false
}).
%% Returns the sender workers stored in the state. The sender_pid
%% field is a list of {Pid, Work} tuples (it used to be a single pid);
%% any other shape fails with function_clause.
get_senders(#state{sender_pid = Senders}) when is_list(Senders) -> Senders.
%% Returns the loader workers stored in the state. The loader_pid
%% field is a list of {Pid, Work} tuples (it used to be a single pid);
%% any other shape fails with function_clause.
get_loaders(#state{loader_pid = Loaders}) when is_list(Loaders) -> Loaders.
%% Reads the no_table_loaders setting. When the lookup fails the
%% value 1 is stored under that key and returned.
max_loaders() ->
    Default = 1,
    case ?catch_val(no_table_loaders) of
        {'EXIT', _Reason} ->
            set(no_table_loaders, Default),
            Default;
        Configured ->
            Configured
    end.
%% Work items handed to add_worker/2 and the completion messages
%% that the controller receives back.

%% Queued by handle_call(wait_for_schema_commit_lock, ...); owner is
%% the gen_server From of the requester.
-record(schema_commit_lock, {owner}).
%% Queued by handle_call(block_controller, ...); owner is the
%% gen_server From of the requester.
-record(block_controller, {owner}).
%% Log dump request; opt_reply_to is set to From for sync_dump_log
%% and left undefined for async_dump_log.
-record(dump_log, {initiated_by,
opt_reply_to
}).
%% Request to load a table over the network.
-record(net_load, {table,
reason,
opt_reply_to,
cstruct = unknown
}).
%% Request to send a table to a receiver; arrives as a cast.
-record(send_table, {table,
receiver_pid,
remote_storage
}).
%% Request to load a table from local disc.
-record(disc_load, {table,
reason,
opt_reply_to
}).
%% Entry kept in the late_loader_queue; loaders is the list of
%% nodes accepted by late_load_filter/4.
-record(late_load, {table,
reason,
opt_reply_to,
loaders
}).
%% Result of a table load.
-record(loader_done, {worker_pid,
is_loaded,
table_name,
needs_announce,
needs_sync,
needs_reply,
reply_to,
reply}).
%% Result reported for a table sender worker.
-record(sender_done, {worker_pid,
worker_res,
table_name
}).
%% Result reported for a dumper worker.
-record(dumper_done, {worker_pid,
worker_res
}).
%% Looks up Var via ?catch_val; if the lookup exits, the decision is
%% delegated to mnesia_lib:other_val/2 together with the exit reason.
val(Var) ->
    case ?catch_val(Var) of
        {'EXIT', Why} ->
            mnesia_lib:other_val(Var, Why);
        Found ->
            Found
    end.
%% Starts the controller as a locally registered gen_server linked to
%% the caller. The caller's pid is passed to init/1.
%% (Add {debug, [trace]} to Opts to trace the server.)
start() ->
    Name = {local, ?SERVER_NAME},
    Args = [self()],
    Opts = [{timeout, infinity}],
    gen_server:start_link(Name, ?MODULE, Args, Opts).
%% Asks the controller to dump the log and waits for the answer.
sync_dump_log(InitBy) ->
    Request = {sync_dump_log, InitBy},
    call(Request).
%% Sends a log dump request to the registered controller without
%% waiting for any answer. Returns the message that was sent.
async_dump_log(InitBy) ->
    Request = {async_dump_log, InitBy},
    ?SERVER_NAME ! Request.
%% Wait for tables to be active.
%% If needed, we will wait for Mnesia to start.
%% If Mnesia stops, we will wait for Mnesia to restart.
%% We will wait even if the list of tables is empty.
%%
%% Timeout is either 'infinity' or a non-negative integer; anything
%% else (or a non-list Tabs) yields {error, {badarg, Tabs, Timeout}}.
wait_for_tables(Tabs, Timeout)
  when is_list(Tabs), Timeout == infinity;
       is_list(Tabs), is_integer(Timeout), Timeout >= 0 ->
    do_wait_for_tables(Tabs, Timeout);
wait_for_tables(Tabs, Timeout) ->
    {error, {badarg, Tabs, Timeout}}.
%% With a zero timeout the current table status is reported at once.
%% Otherwise a linked helper process does the waiting, and we report
%% either its answer or, if it exits or the timeout expires, the
%% status computed by reply_wait/1.
do_wait_for_tables(Tabs, 0) ->
    reply_wait(Tabs);
do_wait_for_tables(Tabs, Timeout) ->
    Waiter = spawn_link(?MODULE, wait_for_tables_init, [self(), Tabs]),
    receive
        {?SERVER_NAME, Waiter, Answer} ->
            Answer;
        {'EXIT', Waiter, _Why} ->
            reply_wait(Tabs)
    after Timeout ->
            %% Detach before killing the helper so its exit
            %% does not propagate to us
            unlink(Waiter),
            exit(Waiter, timeout),
            reply_wait(Tabs)
    end.
%% Compares Tabs with the currently active tables.
%% Returns ok when all are active, {timeout, Missing} otherwise, or
%% {error, {node_not_running, Node}} if the active tables cannot be read.
reply_wait(Tabs) ->
    case catch mnesia_lib:active_tables() of
        {'EXIT', _} ->
            {error, {node_not_running, node()}};
        Active when is_list(Active) ->
            Missing = Tabs -- Active,
            if
                Missing == [] -> ok;
                true -> {timeout, Missing}
            end
    end.
%% Entry point of the helper process spawned by do_wait_for_tables/2.
%% Traps exits, waits for the tables and sends the outcome to From
%% before terminating normally.
wait_for_tables_init(From, Tabs) ->
    process_flag(trap_exit, true),
    Controller = whereis(?SERVER_NAME),
    Outcome = wait_for_init(From, Tabs, Controller),
    From ! {?SERVER_NAME, self(), Outcome},
    unlink(From),
    exit(normal).
%% Links to the controller (Init) and asks it to report when each of
%% Tabs is synced. If the link attempt fails, Mnesia is regarded as
%% not running.
wait_for_init(From, Tabs, Init) ->
    LinkResult = (catch link(Init)),
    case LinkResult of
        {'EXIT', _} ->
            %% Mnesia is not started
            {error, {node_not_running, node()}};
        true when is_pid(Init) ->
            cast({sync_tabs, Tabs, self()}),
            rec_tabs(Tabs, Tabs, From, Init)
    end.
%% Tells a waiting process that Tab is synced.
sync_reply(Waiter, Tab) ->
    Note = {?SERVER_NAME, {tab_synced, Tab}},
    Waiter ! Note.
%% Collects one tab_synced message per table, in list order.
%% Exits if the requester (From) or the controller (Init) dies while
%% we are waiting. Unlinks from the controller when done.
rec_tabs([], _AllTabs, _From, Init) ->
    unlink(Init),
    ok;
rec_tabs([Tab | Remaining], AllTabs, From, Init) ->
    receive
        {?SERVER_NAME, {tab_synced, Tab}} ->
            rec_tabs(Remaining, AllTabs, From, Init);
        {'EXIT', From, _} ->
            %% This will trigger an exit signal
            %% to mnesia_init
            exit(wait_for_tables_timeout);
        {'EXIT', Init, _} ->
            %% Oops, mnesia_init stopped
            exit(mnesia_stopped)
    end.
%% Fetches the table definitions and running nodes via the controller.
get_cstructs() ->
    Request = get_cstructs,
    call(Request).
%% Has the controller process evaluate Fun and returns the result.
update(Fun) ->
    Request = {update, Fun},
    call(Request).
%% Informs the controller that Node is down. If the controller is not
%% running, mnesia_monitor is notified directly instead.
mnesia_down(Node) ->
    Sent = cast({mnesia_down, Node}),
    case Sent of
        {error, _} ->
            mnesia_monitor:mnesia_down(?SERVER_NAME, Node);
        _Delivered ->
            ok
    end.
%% Links the caller to the controller and then queues up for the
%% schema commit lock.
wait_for_schema_commit_lock() ->
    Controller = whereis(?SERVER_NAME),
    link(Controller),
    unsafe_call(wait_for_schema_commit_lock).
%% Queues a block_controller request at the controller and waits
%% for the answer.
block_controller() ->
    Request = block_controller,
    call(Request).
%% Asynchronously asks the controller to drop the block_controller
%% entry at the head of its dumper queue.
unblock_controller() ->
    Request = unblock_controller,
    cast(Request).
%% Asynchronously releases the schema commit lock and removes the
%% link to the controller.
release_schema_commit_lock() ->
    Request = {release_schema_commit_lock, self()},
    cast(Request),
    unlink(whereis(?SERVER_NAME)).
%% Special for preparation of add table copy.
%%
%% Loads Tab (with table definition Cs) in the calling process and
%% returns the loader's reply, or {not_loaded, Reason} if the load did
%% not produce a #loader_done{} result.
get_network_copy(Tab, Cs) ->
    %% We can't let the controller queue this one
    %% because that may cause a deadlock between schema_operations
    %% and initial tableloadings which both take schema locks.
    %% But we have to get copier_done msgs when the other side
    %% goes down.
    call({add_other, self()}),
    Reason = {dumper,add_table_copy},
    Work = #net_load{table = Tab,reason = Reason,cstruct = Cs},
    %% Exits must be trapped while loading since we are linked
    %% through the subscriber; might be solved by using a monitor
    %% in the subscriber instead.
    %% Remember the caller's previous setting so that it can be
    %% restored afterwards instead of being forced to 'false'.
    WasTrapping = process_flag(trap_exit, true),
    Load = load_table_fun(Work),
    Res = (catch Load()),
    process_flag(trap_exit, WasTrapping),
    call({del_other, self()}),
    case Res of
        #loader_done{is_loaded = true} ->
            %% Assert that the loader reported on the table we asked for
            Tab = Res#loader_done.table_name,
            case Res#loader_done.needs_announce of
                true ->
                    i_have_tab(Tab);
                false ->
                    ignore
            end,
            Res#loader_done.reply;
        #loader_done{} ->
            Res#loader_done.reply;
        Else ->
            {not_loaded, Else}
    end.
%% This function is invoked from the dumper.
%%
%% There are two cases here:
%% startup ->
%%   no need for sync, since mnesia_controller is not started yet
%% schema_trans ->
%%   already synced with mnesia_controller since the dumper
%%   is synchronously started from mnesia_controller
%%
%% Crashes with badmatch unless the loader answers {loaded, ok}.
create_table(Tab) ->
    Reason = {dumper,create_table},
    {loaded, ok} = mnesia_loader:disc_load_table(Tab, Reason).
%% Loads Tab from disc on behalf of a change_table_copy_type
%% operation; nobody is registered to receive a reply.
get_disc_copy(Tab) ->
    Reason = {dumper,change_table_copy_type},
    disc_load_table(Tab, Reason, undefined).
%% Returns ok instead of yes.
%%
%% Forces Tab to be loaded. Tables with a local replica are handled by
%% do_force_load_table/1. For tables without one the force flag is set
%% and the controller is notified before waiting for the table.
%% The schema table and non-atom names are rejected with bad_type.
force_load_table(Tab) when is_atom(Tab), Tab /= schema ->
    case ?catch_val({Tab, storage_type}) of
        Storage when Storage == ram_copies;
                     Storage == disc_copies;
                     Storage == disc_only_copies ->
            do_force_load_table(Tab);
        unknown ->
            set({Tab, load_by_force}, true),
            cast({force_load_updated, Tab}),
            wait_for_tables([Tab], infinity);
        {'EXIT', _} ->
            {error, {no_exists, Tab}}
    end;
force_load_table(Tab) ->
    {error, {bad_type, Tab}}.
%% Force loads a table that has a local replica. If the load reason is
%% unknown or cannot be read, the force flag is set, a late disc load
%% is requested and we wait for the table. Otherwise there is nothing
%% to do.
do_force_load_table(Tab) ->
    ForceLoad =
        fun() ->
                set({Tab, load_by_force}, true),
                mnesia_late_loader:async_late_disc_load(node(), [Tab], forced_by_user),
                wait_for_tables([Tab], infinity)
        end,
    case ?catch_val({Tab, load_reason}) of
        unknown ->
            ForceLoad();
        {'EXIT', _} ->
            ForceLoad();
        _AlreadyLoaded ->
            ok
    end.
%% Notifies the controller about new master nodes for a table.
%% Updates for the schema table are ignored.
master_nodes_updated(schema, _Masters) ->
    ignore;
master_nodes_updated(Tab, Masters) ->
    Notice = {master_nodes_updated, Tab, Masters},
    cast(Notice).
%% Schedules a late disc load of Tabs using the late_disc_load
%% message tag.
schedule_late_disc_load(Tabs, Reason) ->
    try_schedule_late_disc_load(Tabs, Reason, late_disc_load).
%% Collects the load intents of all other running nodes inside a
%% transaction holding a global write lock, and passes them on to the
%% local controller together with Tabs. Retries for as long as some
%% node fails to answer. An empty table list is ignored unless the
%% message tag is schema_is_merged. An aborted transaction is fatal.
try_schedule_late_disc_load([], _Reason, MsgTag)
  when MsgTag /= schema_is_merged ->
    ignore;
try_schedule_late_disc_load(Tabs, Reason, MsgTag) ->
    CollectIntents =
        fun() ->
                Nodes = val({current, db_nodes}),
                mnesia:lock({global, mnesia_late_disc_load, Nodes}, write),
                Others = Nodes -- [node()],
                case multicall(Others, disc_load_intents) of
                    {Replies, []} ->
                        call({MsgTag, Tabs, Reason, Replies}),
                        done;
                    {_, BadNodes} ->
                        %% Some nodes did not respond, let's try again
                        {retry, BadNodes}
                end
        end,
    case mnesia:transaction(CollectIntents) of
        {atomic, done} ->
            done;
        {atomic, {retry, Unresponsive}} ->
            verbose("Retry late_load_tables because bad nodes: ~p~n",
                    [Unresponsive]),
            try_schedule_late_disc_load(Tabs, Reason, MsgTag);
        {aborted, AbortReason} ->
            fatal("Cannot late_load_tables~p: ~p~n",
                  [[Tabs, Reason, MsgTag], AbortReason])
    end.
%% Connects to the nodes Ns using the default schema merge function.
connect_nodes(Ns) ->
    MergeFun = fun default_merge/1,
    connect_nodes(Ns, MergeFun).
%% Connects to the nodes Ns, merging schemas with UserFun. The work is
%% done by a linked helper process (connect_nodes2/3).
%% Returns {ok, NewNodes} on success, {error, Reason} otherwise, and
%% {error, {node_not_running, Node}} when Mnesia is not running here.
connect_nodes(Ns, UserFun) ->
    case mnesia:system_info(is_running) of
        no ->
            {error, {node_not_running, node()}};
        yes ->
            Helper = spawn_link(?MODULE,connect_nodes2,[self(),Ns, UserFun]),
            receive
                {?MODULE, Helper, ok, New} ->
                    mnesia_lib:add_list(extra_db_nodes, New),
                    {ok, New};
                {?MODULE, Helper, {aborted, {throw, Str}}, _New}
                  when is_list(Str) ->
                    {error, {merge_schema_failed, lists:flatten(Str)}};
                {?MODULE, Helper, Other, _New} ->
                    {error, Other};
                {'EXIT', Helper, Reason} ->
                    {error, Reason}
            end
    end.
%% Body of the helper process spawned by connect_nodes/2.
%% Announces the merge, connects to Ns, merges the schema with the
%% nodes that were not already known, tells everybody that the schema
%% is merged and finally reports the result to Father.
connect_nodes2(Father, Ns, UserFun) ->
    KnownBefore = val({current, db_nodes}),
    Everybody = [node()|Ns],
    abcast(Everybody, {merging_schema, node()}),
    {NewC, OldC} = mnesia_recover:connect_nodes(Ns),
    Reached = mnesia_lib:intersect(Ns, NewC ++OldC),
    ToMerge = Reached -- KnownBefore,
    process_flag(trap_exit, true),
    Res = try_merge_schema(ToMerge, UserFun),
    multicall([node()|Ns], {schema_is_merged, [], late_merge, []}),
    KnownAfter = val({current, db_nodes}),
    Father ! {?MODULE, self(), Res, mnesia_lib:intersect(Ns,KnownAfter)},
    unlink(Father),
    ok.
%% Merge the local schema with the schema on other nodes.
%% But first we must let all processes that want to force
%% load tables wait until the schema merge is done.
%% Any failure to merge is reported through fatal/2.
merge_schema() ->
    Outcome = try_merge_schema(mnesia_lib:all_nodes(), fun default_merge/1),
    case Outcome of
        ok ->
            schema_is_merged();
        {aborted, {throw, Str}} when is_list(Str) ->
            fatal("Failed to merge schema: ~s~n", [Str]);
        Other ->
            fatal("Failed to merge schema: ~p~n", [Other])
    end.
%% Default merge callback: applies the given fun to an empty list.
default_merge(MergeFun) ->
    MergeFun([]).
%% Repeats mnesia_schema:merge_schema/1 until there is nothing more to
%% merge (returns ok) or the merge gives an unexpected result, which
%% is returned as is.
try_merge_schema(Nodes, UserFun) ->
    MergeResult = mnesia_schema:merge_schema(UserFun),
    case MergeResult of
        {atomic, not_merged} ->
            %% No more nodes that we need to merge the schema with
            ok;
        {atomic, {merged, OldFriends, NewFriends}} ->
            %% Check if new nodes have been added to the schema
            Added = mnesia_lib:all_nodes() -- [node() | Nodes],
            mnesia_recover:connect_nodes(Added),
            %% Tell everybody to adopt orphan tables
            im_running(OldFriends, NewFriends),
            im_running(NewFriends, OldFriends),
            try_merge_schema(Nodes, UserFun);
        {atomic, {"Cannot get cstructs", Node, Reason}} ->
            dbg_out("Cannot get cstructs, Node ~p ~p~n", [Node, Reason]),
            %% Pause so that the retry does not look like an endless loop
            timer:sleep(1000),
            try_merge_schema(Nodes, UserFun);
        Unexpected ->
            Unexpected
    end.
%% Tells the controllers on OldFriends that this node is running,
%% passing along the list NewFriends.
im_running(OldFriends, NewFriends) ->
    Notice = {im_running, node(), NewFriends},
    abcast(OldFriends, Notice).
%% Called when the initial schema merge has succeeded. Schedules the
%% load of the tables returned by initial_safe_loads/0.
schema_is_merged() ->
    %% At this point we do not know anything about
    %% which tables the other nodes already have
    %% loaded and therefore we let the normal
    %% processing of the loader_queue take care
    %% of it, since we at that point in time will
    %% know the whereabouts. We rely on the fact
    %% that all nodes tell each other directly
    %% when they have loaded a table and are
    %% willing to share it.
    SafeLoads = initial_safe_loads(),
    try_schedule_late_disc_load(SafeLoads, initial, schema_is_merged).
%% Sends an asynchronous request to the local controller, or returns
%% {error, {node_not_running, Node}} if it is not registered.
cast(Msg) ->
    Controller = whereis(?SERVER_NAME),
    case Controller of
        undefined ->
            {error, {node_not_running, node()}};
        _ ->
            gen_server:cast(Controller, Msg)
    end.
%% Sends an asynchronous request to the controllers on Nodes.
abcast(Nodes, Msg) ->
    Name = ?SERVER_NAME,
    gen_server:abcast(Nodes, Name, Msg).
%% Synchronous request to the local controller without the temporary
%% link that call/1 sets up. Waits without a time limit.
unsafe_call(Msg) ->
    Controller = whereis(?SERVER_NAME),
    case Controller of
        undefined ->
            {error, {node_not_running, node()}};
        _ ->
            gen_server:call(Controller, Msg, infinity)
    end.
%% Synchronous request to the local controller. The caller is linked
%% to the controller for the duration of the request. If an exit
%% message from the controller is found afterwards, the answer is
%% replaced by {error, {node_not_running, Node}}.
call(Msg) ->
    case whereis(?SERVER_NAME) of
        undefined ->
            {error, {node_not_running, node()}};
        Controller ->
            link(Controller),
            Answer = gen_server:call(Controller, Msg, infinity),
            unlink(Controller),
            %% We get an exit signal if the server dies
            receive
                {'EXIT', Controller, _Reason} ->
                    {error, {node_not_running, node()}}
            after 0 ->
                    Answer
            end
    end.
%% Synchronous request {Func, Args, self()} to the controller on Node.
%% An exit raised by the call is converted into {error, Reason}.
remote_call(Node, Func, Args) ->
    Request = {Func, Args, self()},
    case catch gen_server:call({?MODULE, Node}, Request, infinity) of
        {'EXIT', Error} ->
            {error, Error};
        Answer ->
            Answer
    end.
%% Synchronous request to the controllers on Nodes.
%% Returns {Replies, BadNodes}; the node names are stripped from the
%% replies to make them look like the result of rpc:multicall, i.e.
%% rpc:multicall(Nodes, ?MODULE, call, [Msg]).
multicall(Nodes, Msg) ->
    {Answered, BadNodes} = gen_server:multi_call(Nodes, ?MODULE, Msg, infinity),
    Replies = [Reply || {_Node, Reply} <- Answered],
    {Replies, BadNodes}.
%%%----------------------------------------------------------------------
%%% Callback functions from gen_server
%%%----------------------------------------------------------------------
%%----------------------------------------------------------------------
%% Func: init/1
%% Returns: {ok, State}          |
%%          {ok, State, Timeout} |
%%          {stop, Reason}
%%
%% Traps exits, connects to nodes that have appeared in the schema,
%% starts the periodic log dump timer and the dumper regulator and
%% returns a state with empty loader queues.
%%----------------------------------------------------------------------
init([Parent]) ->
    process_flag(trap_exit, true),
    mnesia_lib:verbose("~p starting: ~p~n", [?SERVER_NAME, self()]),
    %% Handshake and initialize transaction recovery
    %% for new nodes detected in the schema
    AllNodes = mnesia_lib:all_nodes(),
    Original = val(original_nodes),
    NewNodes = AllNodes -- [node() | Original],
    mnesia_lib:unset(original_nodes),
    mnesia_recover:connect_nodes(NewNodes),
    %% Periodic log dump
    Interval = mnesia_monitor:get_env(dump_log_time_threshold),
    {ok, TimerRef} = timer:send_interval(Interval, {async_dump_log, time_threshold}),
    mnesia_dumper:start_regulator(),
    EmptyQueue = gb_trees:empty(),
    State = #state{supervisor = Parent,
                   dump_log_timer_ref = TimerRef,
                   loader_queue = EmptyQueue,
                   late_loader_queue = EmptyQueue},
    {ok, State}.
%%----------------------------------------------------------------------
%% Func: handle_call/3
%% Returns: {reply, Reply, State}          |
%%          {reply, Reply, State, Timeout} |
%%          {noreply, State}               |
%%          {noreply, State, Timeout}      |
%%          {stop, Reason, Reply, State}   | (terminate/2 is called)
%%          {stop, Reason, State}            (terminate/2 is called)
%%
%% The clauses above the "Buffer early messages" clause are served
%% regardless of whether the schema has been merged. The clauses
%% below it are only reached once schema_is_merged is 'true'; until
%% then the requests are stored in early_msgs.
%%----------------------------------------------------------------------
handle_call({sync_dump_log, InitBy}, From, State) ->
    Worker = #dump_log{initiated_by = InitBy,
                       opt_reply_to = From
                      },
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_call(wait_for_schema_commit_lock, From, State) ->
    Worker = #schema_commit_lock{owner = From},
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_call(block_controller, From, State) ->
    Worker = #block_controller{owner = From},
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_call({update,Fun}, From, State) ->
    %% Evaluate Fun in the controller process; a crash in Fun is
    %% returned to the caller instead of killing the server
    Res = (catch Fun()),
    reply(From, Res),
    noreply(State);

handle_call(get_cstructs, From, State) ->
    Tabs = val({schema, tables}),
    Cstructs = [val({T, cstruct}) || T <- Tabs],
    Running = val({current, db_nodes}),
    reply(From, {cstructs, Cstructs, Running}),
    noreply(State);

handle_call({schema_is_merged, [], late_merge, []}, From,
            State = #state{schema_is_merged = Merged}) ->
    %% From is the gen_server {Pid, Tag} tuple, so the node of the
    %% caller must be taken from the pid. (node/1 applied to the
    %% tuple itself always fails, which made the guard below
    %% impossible to satisfy.)
    FromNode =
        case From of
            {FromPid, _Tag} when is_pid(FromPid) -> node(FromPid);
            _ -> undefined
        end,
    case Merged of
        {false, Node} when Node == FromNode ->
            %% The caller waits for an answer from every node,
            %% so answer before processing the buffered messages
            reply(From, ok),
            Msgs = State#state.early_msgs,
            State1 = State#state{early_msgs = [], schema_is_merged = true},
            handle_early_msgs(lists:reverse(Msgs), State1);
        _ ->
            %% Ooops this came too early, before we have merged :-)
            %% or it came too late or from a node we don't care about
            reply(From, ignore),
            noreply(State)
    end;

handle_call({schema_is_merged, TabsR, Reason, RemoteLoaders}, From, State) ->
    State2 = late_disc_load(TabsR, Reason, RemoteLoaders, From, State),
    %% Handle early messages
    Msgs = State2#state.early_msgs,
    State3 = State2#state{early_msgs = [], schema_is_merged = true},
    handle_early_msgs(lists:reverse(Msgs), State3);

handle_call(disc_load_intents,From,State = #state{loader_queue=LQ,late_loader_queue=LLQ}) ->
    %% Report every table that is queued for loading or already active
    LQTabs = gb_trees:keys(LQ),
    LLQTabs = gb_trees:keys(LLQ),
    ActiveTabs = lists:sort(mnesia_lib:local_active_tables()),
    reply(From, {ok, node(), ordsets:union([LQTabs,LLQTabs,ActiveTabs])}),
    noreply(State);

handle_call({update_where_to_write, [add, Tab, AddNode], _From}, _Dummy, State) ->
    Current = val({current, db_nodes}),
    Res =
        case lists:member(AddNode, Current) andalso
            (State#state.schema_is_merged == true) of
            true ->
                mnesia_lib:add_lsort({Tab, where_to_write}, AddNode);
            false ->
                ignore
        end,
    {reply, Res, State};

handle_call({add_active_replica, [Tab, ToNode, RemoteS, AccessMode], From},
            ReplyTo, State) ->
    KnownNode = lists:member(ToNode, val({current, db_nodes})),
    Merged = State#state.schema_is_merged,
    if
        KnownNode == false ->
            reply(ReplyTo, ignore),
            noreply(State);
        Merged == true ->
            Res = case ?catch_val({Tab, cstruct}) of
                      {'EXIT', _} ->  %% Tab deleted
                          deleted;
                      _ ->
                          add_active_replica(Tab, ToNode, RemoteS, AccessMode)
                  end,
            reply(ReplyTo, Res),
            noreply(State);
        true -> %% Schema is not merged
            Msg = {add_active_replica, [Tab, ToNode, RemoteS, AccessMode], From},
            Msgs = State#state.early_msgs,
            reply(ReplyTo, ignore),   %% Reply ignore and add data after schema merge
            %% Set ReplyTo to undefined so we don't reply twice
            noreply(State#state{early_msgs = [{call, Msg, undefined} | Msgs]})
    end;

handle_call({unannounce_add_table_copy, [Tab, Node], From}, ReplyTo, State) ->
    %% Here From is the pid included in the request itself
    KnownNode = lists:member(node(From), val({current, db_nodes})),
    Merged = State#state.schema_is_merged,
    if
        KnownNode == false ->
            reply(ReplyTo, ignore),
            noreply(State);
        Merged == true ->
            Res = unannounce_add_table_copy(Tab, Node),
            reply(ReplyTo, Res),
            noreply(State);
        true -> %% Schema is not merged
            Msg = {unannounce_add_table_copy, [Tab, Node], From},
            Msgs = State#state.early_msgs,
            reply(ReplyTo, ignore),   %% Reply ignore and add data after schema merge
            %% Set ReplyTo to undefined so we don't reply twice
            noreply(State#state{early_msgs = [{call, Msg, undefined} | Msgs]})
    end;

handle_call({net_load, Tab, Cs}, From, State) ->
    State2 =
        case State#state.schema_is_merged of
            true ->
                Worker = #net_load{table = Tab,
                                   opt_reply_to = From,
                                   reason = {dumper,add_table_copy},
                                   cstruct = Cs
                                  },
                add_worker(Worker, State);
            _NotMerged ->
                %% Covers both 'false' and {false, Node}
                reply(From, {not_loaded, schema_not_merged}),
                State
        end,
    noreply(State2);

handle_call(Msg, From, State) when State#state.schema_is_merged /= true ->
    %% Buffer early messages
    Msgs = State#state.early_msgs,
    noreply(State#state{early_msgs = [{call, Msg, From} | Msgs]});

handle_call({late_disc_load, Tabs, Reason, RemoteLoaders}, From, State) ->
    State2 = late_disc_load(Tabs, Reason, RemoteLoaders, From, State),
    noreply(State2);

handle_call({unblock_table, Tab}, _Dummy, State) ->
    Var = {Tab, where_to_commit},
    case val(Var) of
        {blocked, List} ->
            set(Var, List); % where_to_commit
        _ ->
            ignore
    end,
    {reply, ok, State};

handle_call({block_table, [Tab], From}, _Dummy, State) ->
    %% Here From is the pid included in the request itself
    case lists:member(node(From), val({current, db_nodes})) of
        true ->
            block_table(Tab);
        false ->
            ignore
    end,
    {reply, ok, State};

handle_call({check_w2r, _Node, Tab}, _From, State) ->
    {reply, val({Tab, where_to_read}), State};

handle_call({add_other, Who}, _From, State = #state{others=Others0}) ->
    Others = [Who|Others0],
    {reply, ok, State#state{others=Others}};

handle_call({del_other, Who}, _From, State = #state{others=Others0}) ->
    Others = lists:delete(Who, Others0),
    {reply, ok, State#state{others=Others}};

handle_call(Msg, _From, State) ->
    error("~p got unexpected call: ~p~n", [?SERVER_NAME, Msg]),
    noreply(State).
%% Replies 'queued' to From and adds the tables in TabsR that still
%% need loading to the late loader queue. TabsR holds table names or
%% {Tab, Reason} tuples; bare names get the Reason argument.
%% Returns the updated state.
late_disc_load(TabsR, Reason, RemoteLoaders, From,
               State = #state{loader_queue = LQ, late_loader_queue = LLQ}) ->
    verbose("Intend to load tables: ~p~n", [TabsR]),
    ?eval_debug_fun({?MODULE, late_disc_load},
                    [{tabs, TabsR},
                     {reason, Reason},
                     {loaders, RemoteLoaders}]),
    reply(From, queued),
    %% RemoteLoaders is a list of {ok, Node, Tabs} tuples
    %% Remove deleted tabs and queued/loaded
    LocalTabs = gb_sets:from_ordset(lists:sort(mnesia_lib:val({schema,local_tables}))),
    Keep =
        fun(Entry, Acc) ->
                TabInfo = {Tab,_} =
                    case Entry of
                        {_,_} -> Entry;
                        TabN -> {TabN,Reason}
                    end,
                %% Wanted: a local table that is neither readable
                %% here nor already in the loader queue
                Wanted =
                    gb_sets:is_member(Tab, LocalTabs)
                    andalso (?catch_val({Tab, where_to_read}) /= node())
                    andalso (not gb_trees:is_defined(Tab,LQ)),
                case Wanted of
                    true -> [TabInfo | Acc];
                    false -> Acc
                end
        end,
    Tabs = lists:foldl(Keep, [], TabsR),
    Nodes = val({current, db_nodes}),
    State#state{late_loader_queue = late_loaders(Tabs, RemoteLoaders, Nodes, LLQ)}.
%% Inserts a #late_load{} entry for every table that is not already in
%% the late loader queue. When no remote node is accepted as loader
%% for a table, a disc load of it is requested right away.
%% Returns the updated queue.
late_loaders([], _RemoteLoaders, _Nodes, LLQ) ->
    LLQ;
late_loaders([{Tab, Reason} | Rest], RemoteLoaders, Nodes, LLQ) ->
    NextLLQ =
        case gb_trees:is_defined(Tab, LLQ) of
            true ->
                LLQ;
            false ->
                LoadNodes = late_load_filter(RemoteLoaders, Tab, Nodes, []),
                case LoadNodes of
                    [] -> cast({disc_load, Tab, Reason}); % Ugly cast
                    _ -> ignore
                end,
                Entry = #late_load{table=Tab,loaders=LoadNodes,reason=Reason},
                gb_trees:insert(Tab, Entry, LLQ)
        end,
    late_loaders(Rest, RemoteLoaders, Nodes, NextLLQ).
%% Selects the nodes whose load intent for Tab is accepted.
%% Failed replies are skipped. The intent of a node is accepted when
%% the table is read_write and not local_content, the node is still
%% among Nodes, it intends to load Tab, and it is either a master
%% node for Tab or Tab has no master nodes at all.
%% Accepted nodes are prepended to Acc.
late_load_filter([{error, _} | Rest], Tab, Nodes, Acc) ->
    late_load_filter(Rest, Tab, Nodes, Acc);
late_load_filter([{badrpc, _} | Rest], Tab, Nodes, Acc) ->
    late_load_filter(Rest, Tab, Nodes, Acc);
late_load_filter([RL | Rest], Tab, Nodes, Acc) ->
    {ok, Node, Intents} = RL,
    Access = val({Tab, access_mode}),
    LocalC = val({Tab, local_content}),
    StillActive = lists:member(Node, Nodes),
    RemoteIntent = lists:member(Tab, Intents),
    Candidate =
        Access == read_write
        andalso LocalC == false
        andalso StillActive == true
        andalso RemoteIntent == true,
    %% The master nodes are only looked up for candidates
    Accepted =
        Candidate andalso
        begin
            Masters = mnesia_recover:get_master_nodes(Tab),
            %% Accept if the other node is a master node for the
            %% table, or if the table has no master nodes
            lists:member(Node, Masters) orelse Masters == []
        end,
    NextAcc =
        case Accepted of
            true -> [Node | Acc];
            false -> Acc
        end,
    late_load_filter(Rest, Tab, Nodes, NextAcc);
late_load_filter([], _Tab, _Nodes, Acc) ->
    Acc.
%%----------------------------------------------------------------------
%% Func: handle_cast/2
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%%
%% The clauses above the "Buffer early messages" clause are served
%% regardless of whether the schema has been merged. The clauses
%% below it are only reached once schema_is_merged is 'true'; until
%% then the requests are stored in early_msgs.
%%----------------------------------------------------------------------
handle_cast({release_schema_commit_lock, _Owner}, State) ->
    if
        State#state.is_stopping == true ->
            {stop, shutdown, State};
        true ->
            case State#state.dumper_queue of
                [#schema_commit_lock{}|Rest] ->
                    State2 = State#state{dumper_pid = undefined,
                                         dumper_queue = Rest},
                    State3 = opt_start_worker(State2),
                    noreply(State3);
                _ ->
                    noreply(State)
            end
    end;

handle_cast(unblock_controller, State) ->
    if
        State#state.is_stopping == true ->
            {stop, shutdown, State};
        true ->
            case State#state.dumper_queue of
                [#block_controller{} | Rest] ->
                    State2 = State#state{dumper_pid = undefined,
                                         dumper_queue = Rest},
                    State3 = opt_start_worker(State2),
                    noreply(State3);
                _ ->
                    %% The queue is empty or headed by some other
                    %% worker. Report it and carry on instead of
                    %% crashing the controller with if_clause.
                    error("~p got unexpected cast: ~p~n",
                          [?SERVER_NAME, unblock_controller]),
                    noreply(State)
            end
    end;

handle_cast({mnesia_down, Node}, State) ->
    maybe_log_mnesia_down(Node),
    mnesia_lib:del({current, db_nodes}, Node),
    mnesia_checkpoint:tm_mnesia_down(Node),
    Alltabs = val({schema, tables}),
    reconfigure_tables(Node, Alltabs),
    %% Done from (external point of view)
    mnesia_monitor:mnesia_down(?SERVER_NAME, Node),

    %% Fix if we are late_merging against the node that went down
    case State#state.schema_is_merged of
        {false, Node} ->
            spawn(?MODULE, call, [{schema_is_merged, [], late_merge, []}]);
        _ ->
            ignore
    end,

    %% Fix internal stuff
    LateQ = remove_loaders(Alltabs, Node, State#state.late_loader_queue),

    %% Tell all senders, loaders and other registered processes
    %% that the node is gone
    case get_senders(State) ++ get_loaders(State) of
        [] -> ignore;
        Senders ->
            lists:foreach(fun({Pid,_}) -> Pid ! {copier_done, Node} end,
                          Senders)
    end,
    lists:foreach(fun(Pid) -> Pid ! {copier_done,Node} end,
                  State#state.others),

    %% Drop queued send requests whose receiver lived on Node
    Remove = fun(ST) ->
                     node(ST#send_table.receiver_pid) /= Node
             end,
    NewSenders = lists:filter(Remove, State#state.sender_queue),
    Early = remove_early_messages(State#state.early_msgs, Node),
    noreply(State#state{sender_queue = NewSenders,
                        early_msgs = Early,
                        late_loader_queue = LateQ
                       });

handle_cast({merging_schema, Node}, State) ->
    case State#state.schema_is_merged of
        false ->
            %% This comes from dynamic connect_nodes which are made
            %% after mnesia:start() and the schema_merge.
            ImANewKidInTheBlock =
                (val({schema, storage_type}) == ram_copies)
                andalso (mnesia_lib:val({schema, local_tables}) == [schema]),
            case ImANewKidInTheBlock of
                true ->  %% I'm newly started ram_node..
                    noreply(State#state{schema_is_merged = {false, Node}});
                false ->
                    noreply(State)
            end;
        _ -> %% Already merging schema.
            noreply(State)
    end;

handle_cast(Msg, State) when State#state.schema_is_merged /= true ->
    %% Buffer early messages
    Msgs = State#state.early_msgs,
    noreply(State#state{early_msgs = [{cast, Msg} | Msgs]});

%% This must be done after schema_is_merged otherwise adopt_orphan
%% might trigger a table load from wrong nodes as a result of that we don't
%% know which tables we can load safely first.
handle_cast({im_running, _Node, NewFriends}, State) ->
    LocalTabs = mnesia_lib:local_active_tables() -- [schema],
    RemoveLocalOnly = fun(Tab) -> not val({Tab, local_content}) end,
    Tabs = lists:filter(RemoveLocalOnly, LocalTabs),
    Ns = mnesia_lib:intersect(NewFriends, val({current, db_nodes})),
    abcast(Ns, {adopt_orphans, node(), Tabs}),
    noreply(State);

handle_cast({disc_load, Tab, Reason}, State) ->
    Worker = #disc_load{table = Tab, reason = Reason},
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_cast(Worker = #send_table{}, State) ->
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_cast({sync_tabs, Tabs, From}, State) ->
    %% user initiated wait_for_tables
    handle_sync_tabs(Tabs, From),
    noreply(State);

handle_cast({i_have_tab, Tab, Node}, State) ->
    case lists:member(Node, val({current, db_nodes})) of
        true ->
            State2 = node_has_tabs([Tab], Node, State),
            noreply(State2);
        false ->
            noreply(State)
    end;

handle_cast({force_load_updated, Tab}, State) ->
    case val({Tab, active_replicas}) of
        [] ->
            %% No valid replicas
            noreply(State);
        [SomeNode | _] ->
            State2 = node_has_tabs([Tab], SomeNode, State),
            noreply(State2)
    end;

handle_cast({master_nodes_updated, Tab, Masters}, State) ->
    Active = val({Tab, active_replicas}),
    %% Without force load and with master nodes defined, only
    %% active replicas on master nodes are valid
    Valid =
        case val({Tab, load_by_force}) of
            true ->
                Active;
            false ->
                if
                    Masters == [] ->
                        Active;
                    true ->
                        mnesia_lib:intersect(Masters, Active)
                end
        end,
    case Valid of
        [] ->
            %% No valid replicas
            noreply(State);
        [SomeNode | _] ->
            State2 = node_has_tabs([Tab], SomeNode, State),
            noreply(State2)
    end;

handle_cast({adopt_orphans, Node, Tabs}, State) ->
    State2 = node_has_tabs(Tabs, Node, State),

    %% Register the other node as up and running
    mnesia_recover:log_mnesia_up(Node),
    verbose("Logging mnesia_up ~w~n",[Node]),
    mnesia_lib:report_system_event({mnesia_up, Node}),

    %% Load orphan tables
    LocalTabs = val({schema, local_tables}) -- [schema],
    Nodes = val({current, db_nodes}),
    {LocalOrphans, RemoteMasters} =
        orphan_tables(LocalTabs, Node, Nodes, [], []),
    Reason = {adopt_orphan, node()},
    mnesia_late_loader:async_late_disc_load(node(), LocalOrphans, Reason),

    Fun =
        fun(N) ->
                RemoteOrphans =
                    [Tab || {Tab, Ns} <- RemoteMasters,
                            lists:member(N, Ns)],
                mnesia_late_loader:maybe_async_late_disc_load(N, RemoteOrphans, Reason)
        end,
    lists:foreach(Fun, Nodes),
    noreply(State2);

handle_cast(Msg, State) ->
    error("~p got unexpected cast: ~p~n", [?SERVER_NAME, Msg]),
    noreply(State).
%% Serves a wait_for_tables request. For each table that can be read
%% somewhere, From is notified at once. For the others From is added
%% to the list kept under {sync_tab, Tab} in the process dictionary.
handle_sync_tabs([], _From) ->
    ok;
handle_sync_tabs([Tab | Rest], From) ->
    case val({Tab, where_to_read}) of
        nowhere ->
            Key = {sync_tab, Tab},
            Waiting =
                case get(Key) of
                    undefined -> [];
                    Pids -> Pids
                end,
            put(Key, [From | Waiting]);
        _Readable ->
            sync_reply(From, Tab)
    end,
    handle_sync_tabs(Rest, From).
%%----------------------------------------------------------------------
%% Func: handle_info/2
%% Returns: {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
%%----------------------------------------------------------------------
handle_info({async_dump_log, InitBy}, State) ->
Worker = #dump_log{initiated_by = InitBy},
State2 = add_worker(Worker, State),
noreply(State2);
handle_info(#dumper_done{worker_pid=Pid, worker_res=Res}, State) ->
if
State#state.is_stopping == true ->
{stop, shutdown, State};
Res == dumped, Pid == State#state.dumper_pid ->
[Worker | Rest] = State#state.dumper_queue,
reply(Worker#dump_log.opt_reply_to, Res),
State2 = State#state{dumper_pid = undefined,
dumper_queue = Rest},
State3 = opt_start_worker(State2),
noreply(State3);
true ->
fatal("Dumper failed: ~p~n state: ~p~n", [Res, State]),
{stop, fatal, State}
end;
handle_info(Done = #loader_done{worker_pid=WPid, table_name=Tab}, State0) ->
LateQueue0 = State0#state.late_loader_queue,
State1 = State0#state{loader_pid = lists:keydelete(WPid,1,get_loaders(State0))},
State2 =
case Done#loader_done.is_loaded of
true ->
%% Optional table announcement
if
Done#loader_done.needs_announce == true,
Done#loader_done.needs_reply == true ->
i_have_tab(Tab),
%% Should be {dumper,add_table_copy} only
reply(Done#loader_done.reply_to,
Done#loader_done.reply);
Done#loader_done.needs_reply == true ->
%% Should be {dumper,add_table_copy} only
reply(Done#loader_done.reply_to,
Done#loader_done.reply);
Done#loader_done.needs_announce == true, Tab == schema ->
i_have_tab(Tab);
Done#loader_done.needs_announce == true ->
i_have_tab(Tab),
%% Local node needs to perform user_sync_tab/1
Ns = val({current, db_nodes}),
abcast(Ns, {i_have_tab, Tab, node()});
Tab == schema ->
ignore;
true ->
%% Local node needs to perform user_sync_tab/1
Ns = val({current, db_nodes}),
AlreadyKnows = val({Tab, active_replicas}),
abcast(Ns -- AlreadyKnows, {i_have_tab, Tab, node()})
end,
%% Optional user sync
case Done#loader_done.needs_sync of
true -> user_sync_tab(Tab);
false -> ignore
end,
State1#state{late_loader_queue=gb_trees:delete_any(Tab, LateQueue0)};
false ->
%% Either the node went down or table was not
%% loaded remotly yet
case Done#loader_done.needs_reply of
true ->
reply(Done#loader_done.reply_to,
Done#loader_done.reply);
false ->
ignore
end,
case ?catch_val({Tab, active_replicas}) of
[_|_] -> % still available elsewhere
{value,{_,Worker}} = lists:keysearch(WPid,1,get_loaders(State0)),
add_loader(Tab,Worker,State1);
_ ->
State1
end
end,
State3 = opt_start_worker(State2),
noreply(State3);
handle_info(#sender_done{worker_pid=Pid, worker_res=Res}, State) ->
Senders = get_senders(State),
{value, {Pid,_Worker}} = lists:keysearch(Pid, 1, Senders),
if
Res == ok ->
State2 = State#state{sender_pid = lists:keydelete(Pid, 1, Senders)},
State3 = opt_start_worker(State2),
noreply(State3);
true ->
%% No need to send any message to the table receiver
%% since it will soon get a mnesia_down anyway
fatal("Sender failed: ~p~n state: ~p~n", [Res, State]),
{stop, fatal, State}
end;
handle_info({'EXIT', Pid, R}, State) when Pid == State#state.supervisor ->
catch set(mnesia_status, stopping),
case State#state.dumper_pid of
undefined ->
dbg_out("~p was ~p~n", [?SERVER_NAME, R]),
{stop, shutdown, State};
_ ->
noreply(State#state{is_stopping = true})
end;
handle_info({'EXIT', Pid, R}, State) when Pid == State#state.dumper_pid ->
case State#state.dumper_queue of
[#schema_commit_lock{}|Workers] -> %% Schema trans crashed or was killed
dbg_out("WARNING: Dumper ~p exited ~p~n", [Pid, R]),
State2 = State#state{dumper_queue = Workers, dumper_pid = undefined},
State3 = opt_start_worker(State2),
noreply(State3);
_Other ->
fatal("Dumper or schema commit crashed: ~p~n state: ~p~n", [R, State]),
{stop, fatal, State}
end;
handle_info(Msg = {'EXIT', Pid, R}, State) when R /= wait_for_tables_timeout ->
case lists:keymember(Pid, 1, get_senders(State)) of
true ->
%% No need to send any message to the table receiver
%% since it will soon get a mnesia_down anyway
fatal("Sender crashed: ~p~n state: ~p~n", [{Pid,R}, State]),
{stop, fatal, State};
false ->
case lists:keymember(Pid, 1, get_loaders(State)) of
true ->
fatal("Loader crashed: ~p~n state: ~p~n", [R, State]),
{stop, fatal, State};
false ->
error("~p got unexpected info: ~p~n", [?SERVER_NAME, Msg]),
noreply(State)
end
end;
handle_info({From, get_state}, State) ->
From ! {?SERVER_NAME, State},
noreply(State);
%% No real need for buffering
handle_info(Msg, State) when State#state.schema_is_merged /= true ->
%% Buffer early messages
Msgs = State#state.early_msgs,
noreply(State#state{early_msgs = [{info, Msg} | Msgs]});
handle_info({'EXIT', Pid, wait_for_tables_timeout}, State) ->
sync_tab_timeout(Pid, get()),
noreply(State);
handle_info(Msg, State) ->
error("~p got unexpected info: ~p~n", [?SERVER_NAME, Msg]),
noreply(State).
%% Remove Pid from every {sync_tab, Tab} waiter list found in the given
%% process dictionary dump (a list of {Key, Value} pairs).  An entry
%% whose waiter list becomes empty is erased from the process
%% dictionary; otherwise the shortened list is written back.
%% Entries with any other key are left alone.  Returns ok.
sync_tab_timeout(Pid, Dict) ->
    Prune =
        fun({{sync_tab, _Tab} = Key, Waiters}) ->
                case lists:delete(Pid, Waiters) of
                    [] ->
                        erase(Key);
                    Remaining ->
                        put(Key, Remaining)
                end;
           (_Other) ->
                ok
        end,
    lists:foreach(Prune, Dict).
%% Pick the load record that has the highest load order.
%% Returns {BestLoad, RemainingQueue}, where RemainingQueue is Queue
%% without the picked table, or {none, gb_trees:empty()} when no
%% candidate could be selected.
pick_next(Queue) ->
    Candidates = gb_trees:values(Queue),
    case pick_next(Candidates, none, none) of
        {Tab, Worker} ->
            Remaining = gb_trees:delete(Tab, Queue),
            {Worker, Remaining};
        none ->
            {none, gb_trees:empty()}
    end.
%% Walk the list of queued load records, keeping track of the best
%% candidate seen so far (Best) and its load order (BestOrder).
%% Returns none if nothing was selected, else {Tab, BestRecord} where
%% Tab is the second element of the selected record.
pick_next([#net_load{table = Tab} = Candidate | Rest], Best, BestOrder) ->
    TabOrder = ?catch_val({Tab, load_order}),
    select_best(Candidate, Rest, TabOrder, Best, BestOrder);
pick_next([#disc_load{table = Tab} = Candidate | Rest], Best, BestOrder) ->
    TabOrder = ?catch_val({Tab, load_order}),
    select_best(Candidate, Rest, TabOrder, Best, BestOrder);
pick_next([], none, _BestOrder) ->
    none;
pick_next([], Best, _BestOrder) ->
    Tab = element(2, Best),
    {Tab, Best}.
%% Compare one candidate (and the load order looked up for it) with the
%% best candidate so far, then continue with the rest of the list.
select_best(_Candidate, Rest, {'EXIT', _Why}, Best, BestOrder) ->
    %% The load order lookup failed: the table has been deleted, drop it.
    pick_next(Rest, Best, BestOrder);
select_best(Candidate, Rest, CandOrder, none, none) ->
    %% First usable candidate
    pick_next(Rest, Candidate, CandOrder);
select_best(Candidate, Rest, CandOrder, _Best, BestOrder)
  when CandOrder > BestOrder ->
    %% Strictly higher load order wins
    pick_next(Rest, Candidate, CandOrder);
select_best(_Candidate, Rest, _CandOrder, Best, BestOrder) ->
    %% Keep the current best
    pick_next(Rest, Best, BestOrder).
%%----------------------------------------------------------------------
%% Func: terminate/2
%% Purpose: Shutdown the server; the work is delegated to
%%          mnesia_monitor:terminate_proc/3
%% Returns: any (ignored by gen_server)
%%----------------------------------------------------------------------
terminate(Reason, State) ->
    Name = ?SERVER_NAME,
    mnesia_monitor:terminate_proc(Name, Reason, State).
%%----------------------------------------------------------------------
%% Func: code_change/3
%% Purpose: Upgrade process when its code is to be changed.
%%          Converts a state with a single loader pid and list based
%%          loader queues into the representation with a list of
%%          {Pid, Loader} pairs and gb_trees keyed on the second
%%          element of each queued record.
%% Returns: {ok, NewState}
%%----------------------------------------------------------------------
code_change(_OldVsn, State0, _Extra) ->
    %% Turn a list of records into a gb_tree keyed on element 2
    ToTree =
        fun(Recs) ->
                Pairs = lists:sort([{element(2, Rec), Rec} || Rec <- Recs]),
                gb_trees:from_orddict(Pairs)
        end,
    %% Loader Queue
    State1 =
        case State0#state.loader_pid of
            undefined ->
                State0#state{loader_pid = [],
                             loader_queue = gb_trees:empty()};
            Pid when is_pid(Pid) ->
                %% The head of the old queue is the running loader
                [Running | Waiting] = State0#state.loader_queue,
                State0#state{loader_pid = [{Pid, Running}],
                             loader_queue = ToTree(Waiting)};
            Pids when is_list(Pids) ->
                %% Already in the new format
                State0
        end,
    %% LateLoaderQueue
    State =
        case State1#state.late_loader_queue of
            LLQ when is_list(LLQ) ->
                State1#state{late_loader_queue = ToTree(LLQ)};
            _ ->
                State1
        end,
    {ok, State}.
%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------
%% Log mnesia_down for node N, unless it is too early to do so.
%% Returns ok when the event was logged, log_later otherwise.
maybe_log_mnesia_down(N) ->
    %% We use mnesia_down when deciding which tables to load locally,
    %% so if we are not running (i.e haven't decided which tables
    %% to load locally), don't log mnesia_down yet.
    ShouldLog =
        case mnesia_lib:is_running() of
            yes ->
                true;
            _ ->
                %% Not running: log only if N holds a copy of some
                %% local table that is not active here
                UserTabs = val({schema, local_tables}) -- [schema],
                lists:any(fun(Tab) -> inactive_copy_holders(Tab, N) end,
                          UserTabs)
        end,
    case ShouldLog of
        true ->
            verbose("Logging mnesia_down ~w~n", [N]),
            mnesia_recover:log_mnesia_down(N),
            ok;
        false ->
            %% Unfortunately we have not loaded some common
            %% tables yet, so we cannot rely on the nodedown
            log_later %% BUGBUG handle this case!!!
    end.
%% Returns false when Node has no known storage type for Tab according
%% to the table's cstruct; otherwise returns the result of
%% mnesia_lib:not_active_here(Tab).
inactive_copy_holders(Tab, Node) ->
    StorageAtNode = mnesia_lib:cs_to_storage_type(Node, val({Tab, cstruct})),
    StorageAtNode =/= unknown andalso mnesia_lib:not_active_here(Tab).
%% Walk the list of tables and sort out the ones that should be taken
%% care of now.  Accumulates and returns {LocalOrphans, RemoteMasters}:
%% LocalOrphans is a list of table names, RemoteMasters a list of
%% {Tab, MasterNodes} pairs.  Tables matching none of the cases below
%% are skipped.
%% NOTE(review): Node is presumably the node being examined and Ns the
%% set of nodes known to be down/considered -- confirm against caller.
orphan_tables([Tab | Tabs], Node, Ns, Local, Remote) ->
Cs = val({Tab, cstruct}),
CopyHolders = mnesia_lib:copy_holders(Cs),
RamCopyHolders = Cs#cstruct.ram_copies,
%% Every copy holder that is not a ram_copies holder
DiscCopyHolders = CopyHolders -- RamCopyHolders,
DiscNodes = val({schema, disc_copies}),
LocalContent = Cs#cstruct.local_content,
RamCopyHoldersOnDiscNodes = mnesia_lib:intersect(RamCopyHolders, DiscNodes),
Active = val({Tab, active_replicas}),
BeingCreated = (?catch_val({Tab, create_table}) == true),
Read = val({Tab, where_to_read}),
%% The clauses below are tried in order; the guards on the wildcard
%% clauses take precedence over the membership test result.
case lists:member(Node, DiscCopyHolders) of
_ when BeingCreated == true ->
orphan_tables(Tabs, Node, Ns, Local, Remote);
_ when Read == node() -> %% Already loaded
orphan_tables(Tabs, Node, Ns, Local, Remote);
true when Active == [] ->
case DiscCopyHolders -- Ns of
[] ->
%% We're last up and the other nodes have not
%% loaded the table. Lets load it if we are
%% the smallest node.
case lists:min(DiscCopyHolders) of
Min when Min == node() ->
%% Without master nodes the table is a local orphan,
%% otherwise it is reported together with its masters
case mnesia_recover:get_master_nodes(Tab) of
[] ->
L = [Tab | Local],
orphan_tables(Tabs, Node, Ns, L, Remote);
Masters ->
R = [{Tab, Masters} | Remote],
orphan_tables(Tabs, Node, Ns, Local, R)
end;
_ ->
orphan_tables(Tabs, Node, Ns, Local, Remote)
end;
_ ->
orphan_tables(Tabs, Node, Ns, Local, Remote)
end;
false when Active == [], DiscCopyHolders == [], RamCopyHoldersOnDiscNodes == [] ->
%% Special case when all replicas resides on disc less nodes
orphan_tables(Tabs, Node, Ns, [Tab | Local], Remote);
_ when LocalContent == true ->
%% Local content tables are always treated as local orphans
orphan_tables(Tabs, Node, Ns, [Tab | Local], Remote);
_ ->
orphan_tables(Tabs, Node, Ns, Local, Remote)
end;
orphan_tables([], _, _, LocalOrphans, RemoteMasters) ->
{LocalOrphans, RemoteMasters}.
%% Node has the given tables loaded.  For a remote node the whereabouts
%% of each table are updated (which may change the state); for the
%% local node only user_sync_tab/1 is run.  Returns the new state.
node_has_tabs([Tab | Tabs], Node, State) when Node /= node() ->
    NextState =
        case catch update_whereabouts(Tab, Node, State) of
            #state{} = Updated ->
                Updated;
            {'EXIT', Reason} ->
                %% Tab was just deleted?
                case ?catch_val({Tab, cstruct}) of
                    {'EXIT', _} ->
                        %% Yes, the cstruct is gone: ignore the failure
                        State;
                    _ ->
                        erlang:error(Reason)
                end
        end,
    node_has_tabs(Tabs, Node, NextState);
node_has_tabs([Tab | Tabs], Node, State) ->
    user_sync_tab(Tab),
    node_has_tabs(Tabs, Node, State);
node_has_tabs([], _Node, State) ->
    State.
%% Tab is reported as loaded on Node.  Depending on the local storage
%% type and on where the table currently is read, this registers Node
%% as an active replica, possibly redirects where_to_read, and possibly
%% enqueues a #net_load{} worker to fetch a local copy.
%% Returns the (possibly updated) state.
update_whereabouts(Tab, Node, State) ->
Storage = val({Tab, storage_type}),
Read = val({Tab, where_to_read}),
LocalC = val({Tab, local_content}),
BeingCreated = (?catch_val({Tab, create_table}) == true),
Masters = mnesia_recover:get_master_nodes(Tab),
ByForce = val({Tab, load_by_force}),
%% Node is an acceptable source if loading is forced, if there are
%% no master nodes at all, or if Node is one of the masters
GoGetIt =
if
ByForce == true ->
true;
Masters == [] ->
true;
true ->
lists:member(Node, Masters)
end,
dbg_out("Table ~w is loaded on ~w. s=~w, r=~w, lc=~w, f=~w, m=~w~n",
[Tab, Node, Storage, Read, LocalC, ByForce, GoGetIt]),
%% The branches below are tried in order
if
LocalC == true ->
%% Local contents, don't care about other node
State;
BeingCreated == true ->
%% The table is currently being created
%% It will be handled elsewhere
State;
Storage == unknown, Read == nowhere ->
%% No own copy, time to read remotely
%% if the other node is a good node
add_active_replica(Tab, Node),
case GoGetIt of
true ->
set({Tab, where_to_read}, Node),
user_sync_tab(Tab),
State;
false ->
State
end;
Storage == unknown ->
%% No own copy, continue to read remotely
add_active_replica(Tab, Node),
NodeST = mnesia_lib:storage_type_at_node(Node, Tab),
ReadST = mnesia_lib:storage_type_at_node(Read, Tab),
if %% Avoid reading from disc_only_copies
NodeST == disc_only_copies ->
ignore;
ReadST == disc_only_copies ->
%% The current read node is disc_only_copies but Node is
%% not: let mnesia_lib choose the remote read node again
mnesia_lib:set_remote_where_to_read(Tab);
true ->
ignore
end,
user_sync_tab(Tab),
State;
Read == nowhere ->
%% Own copy, go and get a copy of the table
%% if the other node is master or if there
%% are no master at all
add_active_replica(Tab, Node),
case GoGetIt of
true ->
Worker = #net_load{table = Tab,
reason = {active_remote, Node}},
add_worker(Worker, State);
false ->
State
end;
true ->
%% We already have an own copy
add_active_replica(Tab, Node),
user_sync_tab(Tab),
State
end.
%% Compute the initial list of load requests: every local table (except
%% schema) for which last_consistent_replica/2 answers {true, Request}.
%% With a ram_copies schema no mnesia_down information is used; with a
%% disc_copies schema the mnesia_downs known by mnesia_recover are.
initial_safe_loads() ->
    Downs =
        case val({schema, storage_type}) of
            ram_copies ->
                [];
            disc_copies ->
                Logged = mnesia_recover:get_mnesia_downs(),
                dbg_out("mnesia_downs = ~p~n", [Logged]),
                Logged
        end,
    Tabs = val({schema, local_tables}) -- [schema],
    lists:zf(fun(T) -> last_consistent_replica(T, Downs) end, Tabs).
%% Decide whether Tab can be loaded from the local node right away.
%% Downs is a list of nodes that is subtracted from the remote copy
%% holders when looking for better copies.
%% Returns {true, {Tab, Reason}} when the table may be loaded locally
%% (Reason is one of local_only, local_content, local_master, ram_only,
%% read_only or initial), or false when we must wait for another node.
%% The result has the shape expected by lists:zf/2.
last_consistent_replica(Tab, Downs) ->
Cs = val({Tab, cstruct}),
Storage = mnesia_lib:cs_to_storage_type(node(), Cs),
Ram = Cs#cstruct.ram_copies,
Disc = Cs#cstruct.disc_copies,
DiscOnly = Cs#cstruct.disc_only_copies,
%% Remote copy holders that are not in Downs and that are not
%% ram_copies holders
BetterCopies0 = mnesia_lib:remote_copy_holders(Cs) -- Downs,
BetterCopies = BetterCopies0 -- Ram,
AccessMode = Cs#cstruct.access_mode,
Copies = mnesia_lib:copy_holders(Cs),
Masters = mnesia_recover:get_master_nodes(Tab),
LocalMaster0 = lists:member(node(), Masters),
LocalContent = Cs#cstruct.local_content,
%% Master nodes exist but this node is not one of them
RemoteMaster =
if
Masters == [] -> false;
true -> not LocalMaster0
end,
%% Master nodes exist and this node is one of them
LocalMaster =
if
Masters == [] -> false;
true -> LocalMaster0
end,
%% The branches below are tried in order, the first match decides
if
Copies == [node()] ->
%% Only one copy holder and it is local.
%% It may also be a local contents table
{true, {Tab, local_only}};
LocalContent == true ->
{true, {Tab, local_content}};
LocalMaster == true ->
%% We have a local master
{true, {Tab, local_master}};
RemoteMaster == true ->
%% Wait for remote master copy
false;
Storage == ram_copies ->
if
Disc == [], DiscOnly == [] ->
%% Nobody has copy on disc
{true, {Tab, ram_only}};
true ->
%% Some other node has copy on disc
false
end;
AccessMode == read_only ->
%% No one has been able to update the table,
%% i.e. all disc resident copies are equal
{true, {Tab, read_only}};
BetterCopies /= [], Masters /= [node()] ->
%% There are better copies on other nodes
%% and we do not have the only master copy
false;
true ->
{true, {Tab, initial}}
end.
%% Node N no longer holds an active replica of the given tables:
%% remove it as active replica of each table and, if the table was read
%% from N, let mnesia_lib select where to read it from.  Returns ok.
reconfigure_tables(N, Tabs) ->
    Reconfigure =
        fun(Tab) ->
                del_active_replica(Tab, N),
                case val({Tab, where_to_read}) of
                    N ->
                        mnesia_lib:set_remote_where_to_read(Tab);
                    _ ->
                        ignore
                end
        end,
    lists:foreach(Reconfigure, Tabs).
%% Apply drop_loaders/3 for node N to each of the given tables,
%% threading the late loader queue through.  Returns the final queue.
remove_loaders(Tabs, N, Loaders) ->
    lists:foldl(fun(Tab, LateQ) -> drop_loaders(Tab, N, LateQ) end,
                Loaders, Tabs).
%% Filter the list of buffered early messages, dropping those that
%% concern Node: add_active_replica calls for Node, block_table calls
%% issued from Node (these are answered with ok before being dropped),
%% and i_have_tab / adopt_orphans casts from Node.  All other messages
%% are kept, in their original order.
remove_early_messages(Msgs, Node) ->
    Keep =
        fun({call, {add_active_replica, [_, N, _, _], _}, _}) when N =:= Node ->
                %% Does a reply before queuing
                false;
           ({call, {block_table, _, From}, ReplyTo}) when node(From) == Node ->
                %% Remove gen:server waits..
                reply(ReplyTo, ok),
                false;
           ({cast, {i_have_tab, _Tab, N}}) when N =:= Node ->
                false;
           ({cast, {adopt_orphans, N, _Tabs}}) when N =:= Node ->
                false;
           (_Other) ->
                true
        end,
    lists:filter(Keep, Msgs).
%% Drop Node from the loaders of Tab's entry in the late load queue and
%% possibly trigger a disc_load.  A queue without an entry for Tab is
%% returned unchanged.
drop_loaders(Tab, Node, LLQ) ->
    case gb_trees:lookup(Tab, LLQ) of
        {value, Late} ->
            Loaders = Late#late_load.loaders,
            %% Check if it is time to issue a disc_load request,
            %% i.e. Node was the only remaining loader
            case Loaders of
                [Node] ->
                    Why = {Late#late_load.reason, last_loader_down, Node},
                    cast({disc_load, Tab, Why}); % Ugly cast
                _ ->
                    ignore
            end,
            %% Drop the node from the list of loaders
            Pruned = Late#late_load{loaders = Loaders -- [Node]},
            gb_trees:update(Tab, Pruned, LLQ);
        none ->
            LLQ
    end.
%% Register Node as active replica of Tab, using the table's current
%% cstruct to find out storage type and access mode.
add_active_replica(Tab, Node) ->
    Cs = val({Tab, cstruct}),
    add_active_replica(Tab, Node, Cs).
%% Register Node as active replica of Tab; storage type and access mode
%% are taken from the given cstruct.
add_active_replica(Tab, Node, #cstruct{access_mode = AccessMode} = Cs) ->
    StorageAtNode = mnesia_lib:schema_cs_to_storage_type(Node, Cs),
    add_active_replica(Tab, Node, StorageAtNode, AccessMode).
%% Block table primitives
%% Mark Tab as blocked by storing {blocked, NodeList} as its
%% where_to_commit value.  Returns the result of set/2.
%%
%% The operation is idempotent: if the table already is blocked the
%% existing marker is kept as is.  Wrapping unconditionally would store
%% {blocked, {blocked, NodeList}}, a value that is_tab_blocked/1 cannot
%% decode, so every later add_active_replica/4 or del_active_replica/2
%% on the table would crash with function_clause.
block_table(Tab) ->
    Var = {Tab, where_to_commit},
    {_WasBlocked, W2C} = is_tab_blocked(val(Var)),
    set(Var, mark_blocked_tab(true, W2C)). % where_to_commit
%% Ask the controller process to unblock Tab (synchronous call).
unblock_table(Tab) ->
    Request = {unblock_table, Tab},
    call(Request).
%% Decode a where_to_commit value.
%% Returns {IsBlocked, NodeList}: a plain list is not blocked, a list
%% wrapped as {blocked, List} is.
is_tab_blocked({blocked, Nodes}) when is_list(Nodes) ->
    {true, Nodes};
is_tab_blocked(Nodes) when is_list(Nodes) ->
    {false, Nodes}.
%% Encode a where_to_commit value: the value itself when the table is
%% not blocked, {blocked, Value} when it is.
mark_blocked_tab(false, W2C) ->
    W2C;
mark_blocked_tab(true, W2C) ->
    {blocked, W2C}.
%%
%% Register Node as active replica of Tab.
%% For a read_write table Node is (re)inserted as {Node, Storage} in
%% where_to_commit and added to where_to_write; for a read_only table
%% it is removed from both.  The blocked marker of where_to_commit is
%% preserved.  Finally Node is added to the table's active_replicas.
add_active_replica(Tab, Node, Storage, AccessMode) ->
    CommitKey = {Tab, where_to_commit},
    {Blocked, Commit0} = is_tab_blocked(val(CommitKey)),
    Others = lists:keydelete(Node, 1, Commit0),
    case AccessMode of
        read_only ->
            set(CommitKey, mark_blocked_tab(Blocked, Others)),
            mnesia_lib:del({Tab, where_to_write}, Node);
        read_write ->
            Commit = lists:sort([{Node, Storage} | Others]),
            set(CommitKey, mark_blocked_tab(Blocked, Commit)), % where_to_commit
            mnesia_lib:add_lsort({Tab, where_to_write}, Node)
    end,
    add({Tab, active_replicas}, Node).
%% Remove Node from Tab's where_to_commit (keeping the blocked marker),
%% active_replicas and where_to_write.
del_active_replica(Tab, Node) ->
    CommitKey = {Tab, where_to_commit},
    {Blocked, Commit0} = is_tab_blocked(val(CommitKey)),
    Commit = lists:sort(lists:keydelete(Node, 1, Commit0)),
    set(CommitKey, mark_blocked_tab(Blocked, Commit)), % where_to_commit
    mnesia_lib:del({Tab, active_replicas}, Node),
    mnesia_lib:del({Tab, where_to_write}, Node).
%% Re-register every active replica of the table described by Cs, so
%% that where_to_commit and where_to_write reflect the access mode in
%% Cs.  The work is wrapped in a fun and handed over to update/1.
change_table_access_mode(Cs) ->
    Refresh =
        fun() ->
                Tab = Cs#cstruct.name,
                Active = val({Tab, active_replicas}),
                lists:foreach(fun(N) -> add_active_replica(Tab, N, Cs) end,
                              Active)
        end,
    update(Refresh).
%% Node To now has Tab loaded, but this must be undone.
%% This code is rpc:call'ed from the tab_copier process
%% when it has *not* released its table lock.
%% Failures while removing the replica or reading where_to_read are
%% caught and ignored.
unannounce_add_table_copy(Tab, To) ->
    catch del_active_replica(Tab, To),
    ReadNode = (catch val({Tab , where_to_read})),
    if
        ReadNode =:= To ->
            %% We were reading from To, pick another node
            mnesia_lib:set_remote_where_to_read(Tab);
        true ->
            ignore
    end.
%% Tab is now available: when the debug level is trace, subscribe the
%% mnesia_event process to events on the table, then send a sync reply
%% to every process registered under {sync_tab, Tab} in the process
%% dictionary (the entry is erased).  Returns ok.
user_sync_tab(Tab) ->
    case val(debug) of
        trace ->
            EventPid = whereis(mnesia_event),
            mnesia_subscr:subscribe(EventPid, {table, Tab});
        _ ->
            ignore
    end,
    case erase({sync_tab, Tab}) of
        undefined ->
            ok;
        Waiters ->
            lists:foreach(fun(Waiter) -> sync_reply(Waiter, Tab) end, Waiters)
    end.
%% The local node now has Tab loaded: make the table readable locally
%% and register the local node as an active replica.
i_have_tab(Tab) ->
    IsLocalContent = val({Tab, local_content}),
    case IsLocalContent of
        false ->
            set({Tab, where_to_read}, node());
        true ->
            mnesia_lib:set_local_content_whereabouts(Tab)
    end,
    add_active_replica(Tab, node()).
%% Block Tab on ToNode and then announce ToNode as an active replica
%% (with storage RemoteS and the given access mode) on ToNode itself
%% and on every other current db node, one remote call at a time.
%% Not applicable to the schema table.  Returns ok.
sync_and_block_table_whereabouts(Tab, ToNode, RemoteS, AccessMode) when Tab /= schema ->
    Others = lists:delete(ToNode, val({current, db_nodes})),
    remote_call(ToNode, block_table, [Tab]),
    Args = [Tab, ToNode, RemoteS, AccessMode],
    lists:foreach(fun(Node) -> remote_call(Node, add_active_replica, Args) end,
                  [ToNode | Others]),
    ok.
%% Undo the announcement of ToNode as holder of Tab on every current
%% db node, and on ToNode as well if it is not among them.
%% Not applicable to the schema table.  Returns ok.
sync_del_table_copy_whereabouts(Tab, ToNode) when Tab /= schema ->
    Current = val({current, db_nodes}),
    Targets =
        case lists:member(ToNode, Current) of
            false -> [ToNode | Current];
            true -> Current
        end,
    lists:foreach(fun(Node) ->
                          remote_call(Node, unannounce_add_table_copy, [Tab, ToNode])
                  end, Targets),
    ok.
%% Fetch the controller state for inspection.
%% Returns {info, State} with both loader queues converted from
%% gb_trees to lists, or {timeout, Timeout} if the controller is not
%% registered or does not answer within Timeout.
%% NOTE(review): the reply is matched on the server name only, not on a
%% unique reference, so an answer arriving after a timeout stays in the
%% caller's mailbox.
get_info(Timeout) ->
    case whereis(?SERVER_NAME) of
        undefined ->
            {timeout, Timeout};
        Controller ->
            Controller ! {self(), get_state},
            receive
                {?SERVER_NAME, #state{loader_queue = LQ,
                                      late_loader_queue = LLQ} = State} ->
                    Readable = State#state{loader_queue = gb_trees:to_list(LQ),
                                           late_loader_queue = gb_trees:to_list(LLQ)},
                    {info, Readable}
            after Timeout ->
                    {timeout, Timeout}
            end
    end.
%% Fetch the currently running workers from the controller.
%% Returns {workers, Loaders, Senders, DumperPid}, or
%% {timeout, Timeout} if the controller is not registered or does not
%% answer within Timeout.
get_workers(Timeout) ->
    case whereis(?SERVER_NAME) of
        undefined ->
            {timeout, Timeout};
        Controller ->
            Controller ! {self(), get_state},
            receive
                {?SERVER_NAME, #state{} = State} ->
                    Loaders = get_loaders(State),
                    Senders = get_senders(State),
                    {workers, Loaders, Senders, State#state.dumper_pid}
            after Timeout ->
                    {timeout, Timeout}
            end
    end.
%% Print a heading followed by one line per locally active table.
info() ->
    ActiveTabs = mnesia_lib:local_active_tables(),
    io:format( "---> Active tables <--- ~n", []),
    info(ActiveTabs).
%% Print size information for each table in the list: dets figures for
%% disc_only_copies tables, ets figures for all other storage types.
%% Returns ok.
info(Tabs) ->
    Show =
        fun(Tab) ->
                case val({Tab, storage_type}) of
                    disc_only_copies ->
                        info_format(Tab,
                                    dets:info(Tab, size),
                                    dets:info(Tab, file_size),
                                    "bytes on disc");
                    _ ->
                        info_format(Tab,
                                    ?ets_info(Tab, size),
                                    ?ets_info(Tab, memory),
                                    "words of mem")
                end
        end,
    lists:foreach(Show, Tabs).
%% Print one table line.  Tab must be an atom, Size and Mem integers;
%% Media is the unit text printed after the memory figure.
info_format(Tab, Size, Mem, Media) ->
    Pad = fun(Str, Width) -> mnesia_lib:pad_name(Str, Width, []) end,
    Name = Pad(atom_to_list(Tab), 15),
    Records = Pad(integer_to_list(Size), 8),
    Occupied = Pad(integer_to_list(Mem), 8),
    io:format("~s: with ~s records occupying ~s ~s~n",
              [Name, Records, Occupied, Media]).
%% Handle early arrived messages.
%% The buffered messages are processed one at a time, in list order,
%% threading the state through.  A {reply, ...} result is sent to the
%% caller stored in the buffered call; a {stop, ...} result aborts the
%% processing and is returned as is.
%% NOTE(review): the messages are buffered in reverse order, so the
%% caller presumably reverses the list before calling -- confirm.
handle_early_msgs([], State) ->
    noreply(State);
handle_early_msgs([Msg | Msgs], State) ->
    %% A {stop, Reason, Reply, State} result is not handled here
    %% (will not happen according to dialyzer)
    case handle_early_msg(Msg, State) of
        {noreply, State2} ->
            handle_early_msgs(Msgs, State2);
        {reply, Reply, State2} ->
            {call, _Call, From} = Msg,
            reply(From, Reply),
            handle_early_msgs(Msgs, State2);
        {stop, Reason, State2} ->
            {stop, Reason, State2}
    end.
%% Dispatch one buffered message to the gen_server callback that would
%% have handled it had it not been buffered.
handle_early_msg({info, Info}, State) ->
    handle_info(Info, State);
handle_early_msg({cast, Request}, State) ->
    handle_cast(Request, State);
handle_early_msg({call, Request, From}, State) ->
    handle_call(Request, From, State).
%% Build the gen_server {noreply, State} return value.
noreply(NewState) ->
    {noreply, NewState}.
%% Send Reply to the gen_server client ReplyTo, unless ReplyTo is
%% undefined (nobody is waiting).  Always returns Reply.
reply(ReplyTo, Reply) ->
    case ReplyTo of
        undefined ->
            ok;
        _ ->
            gen_server:reply(ReplyTo, Reply)
    end,
    Reply.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Worker management
%%
%% add_worker(Worker, State) enqueues Worker in the queue that matches
%% its record type and then tries to start a worker.
%% Returns new State
add_worker(#dump_log{initiated_by = InitBy, opt_reply_to = ReplyTo} = Worker, State) ->
    Queue = State#state.dumper_queue,
    Pending = lists:keymember(InitBy, #dump_log.initiated_by, Queue),
    if
        Pending, ReplyTo == undefined ->
            %% The same threshold has been exceeded again,
            %% before we have had the possibility to
            %% process the older one.
            mnesia_lib:report_system_event({mnesia_overload, {dump_log, InitBy}});
        true ->
            ignore
    end,
    opt_start_worker(State#state{dumper_queue = Queue ++ [Worker]});
add_worker(#schema_commit_lock{} = Worker, State) ->
    Queue = State#state.dumper_queue,
    opt_start_worker(State#state{dumper_queue = Queue ++ [Worker]});
add_worker(#net_load{table = Tab} = Worker, State) ->
    opt_start_worker(add_loader(Tab, Worker, State));
add_worker(#send_table{} = Worker, State) ->
    Queue = State#state.sender_queue,
    opt_start_worker(State#state{sender_queue = Queue ++ [Worker]});
add_worker(#disc_load{table = Tab} = Worker, State) ->
    opt_start_worker(add_loader(Tab, Worker, State));
add_worker(#block_controller{} = Worker, State) ->
    %% Block controller should be used for upgrading mnesia.
    %% It is put first in the dumper queue.
    Queue = State#state.dumper_queue,
    opt_start_worker(State#state{dumper_queue = [Worker | Queue]}).
%% Enqueue a load request for Tab, unless one already is queued, in
%% which case the state is returned unchanged and Worker is dropped.
add_loader(Tab, Worker, #state{loader_queue = Queue0} = State) ->
    case gb_trees:is_defined(Tab, Queue0) of
        false ->
            State#state{loader_queue = gb_trees:insert(Tab, Worker, Queue0)};
        true ->
            State
    end.
%% Optionally start a worker
%%
%% Dumpers and loaders may run simultaneously
%% but neither of them may run during schema commit.
%% Loaders may not start if a schema commit is enqueued.
%%
%% Returns the new state.  Nothing is started while the controller is
%% stopping.
opt_start_worker(State) when State#state.is_stopping == true ->
State;
opt_start_worker(State) ->
%% Prioritize dumper and schema commit
%% by checking them first
case State#state.dumper_queue of
[Worker | _Rest] when State#state.dumper_pid == undefined ->
%% Great, a worker in queue and neither
%% a schema transaction is being
%% committed and nor a dumper is running
%% Start worker but keep him in the queue
if
is_record(Worker, schema_commit_lock) ->
%% Grant the lock to its owner; the owner's pid is recorded
%% as dumper_pid for as long as the lock is held
ReplyTo = Worker#schema_commit_lock.owner,
reply(ReplyTo, granted),
{Owner, _Tag} = ReplyTo,
opt_start_loader(State#state{dumper_pid = Owner});
is_record(Worker, dump_log) ->
Pid = spawn_link(?MODULE, dump_and_reply, [self(), Worker]),
State2 = State#state{dumper_pid = Pid},
%% If the worker was a dumper we may
%% possibly be able to start a loader
%% or sender
State3 = opt_start_sender(State2),
opt_start_loader(State3);
is_record(Worker, block_controller) ->
%% The controller is only handed over when no sender and no
%% loader is running; otherwise the request stays queued
case {get_senders(State), get_loaders(State)} of
{[], []} ->
ReplyTo = Worker#block_controller.owner,
reply(ReplyTo, granted),
{Owner, _Tag} = ReplyTo,
State#state{dumper_pid = Owner};
_ ->
State
end
end;
_ ->
%% Bad luck, try with a loader or sender instead
State2 = opt_start_sender(State),
opt_start_loader(State2)
end.
%% Try to start the queued table senders.  Returns the state with the
%% started senders added to sender_pid and the senders that must wait
%% left in sender_queue.
opt_start_sender(State) ->
    case State#state.sender_queue of
        [] ->
            %% No need
            State;
        Queued ->
            Running = get_senders(State),
            Loaders = get_loaders(State),
            {Started, Waiting} = opt_start_sender2(Queued, Running, [], Loaders),
            State#state{sender_pid = Started, sender_queue = Waiting}
    end.
%% Go through the queued senders.  Started accumulates {Pid, Sender}
%% for the running senders, Waiting the senders that are kept queued.
%% Loaders is the list of currently running {Pid, Loader} pairs.
%% Returns {Started, Waiting}.
opt_start_sender2([], Started, Waiting, _Loaders) ->
    {Started, Waiting};
opt_start_sender2([Sender | Rest], Started, Waiting, Loaders) ->
    Tab = Sender#send_table.table,
    HaveCopy = lists:member(node(), val({Tab, active_replicas})),
    StillLoading = lists:any(fun({_Pid, Loader}) ->
                                     Tab == element(#net_load.table, Loader)
                             end, Loaders),
    case {HaveCopy, StillLoading} of
        {true, true} ->
            %% I'm currently finishing loading the table let him wait
            opt_start_sender2(Rest, Started, [Sender | Waiting], Loaders);
        {true, false} ->
            %% Start worker
            Pid = spawn_link(?MODULE, send_and_reply, [self(), Sender]),
            opt_start_sender2(Rest, [{Pid, Sender} | Started], Waiting, Loaders);
        {false, _} ->
            %% The table is not active here: tell the receiver and
            %% drop the request
            verbose("Send table failed ~p not active on this node ~n", [Tab]),
            Sender#send_table.receiver_pid ! {copier_done, node()},
            opt_start_sender2(Rest, Started, Waiting, Loaders)
    end.
%% Try to start one loader from the loader queue.
%% Nothing is started when the queue is empty, when max_loaders()
%% loaders already are running, or when a schema commit lock is present
%% in the dumper queue.  Returns the new state.
opt_start_loader(State = #state{loader_queue = LoaderQ}) ->
Current = get_loaders(State),
Max = max_loaders(),
case gb_trees:is_empty(LoaderQ) of
true ->
State;
_ when length(Current) >= Max ->
%% Already running the maximum number of loaders
State;
false ->
SchemaQueue = State#state.dumper_queue,
case lists:keymember(schema_commit_lock, 1, SchemaQueue) of
false ->
case pick_next(LoaderQ) of
{none,Rest} ->
%% Nothing could be picked, continue with the queue
%% handed back by pick_next/1
State#state{loader_queue=Rest};
{Worker,Rest} ->
case already_loading(Worker, get_loaders(State)) of
true ->
%% The table is being loaded already: drop this
%% request and try the next one in the queue
opt_start_loader(State#state{loader_queue = Rest});
false ->
%% Start the worker; it is removed from the queue and
%% tracked in loader_pid as {Pid, Worker}
Pid = load_and_reply(self(), Worker),
State#state{loader_pid=[{Pid,Worker}|get_loaders(State)],
loader_queue = Rest}
end
end;
true ->
%% Bad luck, we must wait for the schema commit
State
end
end.
%% Is the table of the given load request already being loaded by one
%% of the running Loaders ({Pid, LoadRecord} pairs)?
already_loading(#disc_load{table = Tab}, Loaders) ->
    already_loading2(Tab, Loaders);
already_loading(#net_load{table = Tab}, Loaders) ->
    already_loading2(Tab, Loaders).
%% Returns true if any element of the list is a pair whose second
%% element is a #net_load{} or #disc_load{} record for Tab, else false.
already_loading2(Tab, Loaders) ->
    LoadsTab =
        fun({_, #net_load{table = T}}) -> T =:= Tab;
           ({_, #disc_load{table = T}}) -> T =:= Tab;
           (_Other) -> false
        end,
    lists:any(LoadsTab, Loaders).
%% Ask the controller on Node (asynchronously) to send table Tab to the
%% process Receiver, which will store it as Storage.
start_remote_sender(Node, Tab, Receiver, Storage) ->
    Request = #send_table{table = Tab,
                          receiver_pid = Receiver,
                          remote_storage = Storage},
    RemoteController = {?SERVER_NAME, Node},
    gen_server:cast(RemoteController, Request).
%% Body of a dumper process: dump the log, report the result to ReplyTo
%% as a #dumper_done{} message, then unlink and exit normally.
dump_and_reply(ReplyTo, Worker) ->
    %% No trap_exit, die intentionally instead
    InitBy = Worker#dump_log.initiated_by,
    Res = mnesia_dumper:opt_dump_log(InitBy),
    Done = #dumper_done{worker_pid = self(),
                        worker_res = Res},
    ReplyTo ! Done,
    unlink(ReplyTo),
    exit(normal).
%% Body of a sender process: send the table to the receiver, report the
%% result to ReplyTo as a #sender_done{} message, then unlink and exit
%% normally.
send_and_reply(ReplyTo, Worker) ->
    %% No trap_exit, die intentionally instead
    #send_table{receiver_pid = Receiver,
                table = Tab,
                remote_storage = Storage} = Worker,
    Res = mnesia_loader:send_table(Receiver, Tab, Storage),
    Done = #sender_done{worker_pid = self(),
                        worker_res = Res},
    ReplyTo ! Done,
    unlink(ReplyTo),
    exit(normal).
%% Start a linked loader process for Worker and return its pid.
%% The decision on how to load is taken in the calling process (by
%% load_table_fun/1); the spawned process traps exits, runs the load,
%% sends the resulting #loader_done{} (stamped with its own pid) to
%% ReplyTo, then unlinks and exits normally.
load_and_reply(ReplyTo, Worker) ->
    Load = load_table_fun(Worker),
    spawn_link(fun() ->
                       process_flag(trap_exit, true),
                       Done = Load(),
                       ReplyTo ! Done#loader_done{worker_pid = self()},
                       unlink(ReplyTo),
                       exit(normal)
               end).
%% Now it is time to load the table
%% but first we must check if it still is necessary
%%
%% Returns a fun of arity 0 which performs the load (if any is needed)
%% and returns a #loader_done{} record describing the outcome.  The
%% checks on the table's current whereabouts are made when this
%% function is called, not when the returned fun is run.
load_table_fun(#net_load{cstruct=Cs, table=Tab, reason=Reason, opt_reply_to=ReplyTo}) ->
LocalC = val({Tab, local_content}),
AccessMode = val({Tab, access_mode}),
ReadNode = val({Tab, where_to_read}),
Active = filter_active(Tab),
%% Template result: loaded, nothing to announce or sync, reply only
%% if somebody is waiting for it
Done = #loader_done{is_loaded = true,
table_name = Tab,
needs_announce = false,
needs_sync = false,
needs_reply = (ReplyTo /= undefined),
reply_to = ReplyTo,
reply = {loaded, ok}
},
if
ReadNode == node() ->
%% Already loaded locally
fun() -> Done end;
LocalC == true ->
%% Local content tables are always loaded from the local disc
fun() ->
Res = mnesia_loader:disc_load_table(Tab, load_local_content),
Done#loader_done{reply = Res, needs_announce = true, needs_sync = true}
end;
AccessMode == read_only, Reason /= {dumper,add_table_copy} ->
%% Read only table (and not a new table copy): load from disc
fun() -> disc_load_table(Tab, Reason, ReplyTo) end;
true ->
fun() ->
%% Either we cannot read the table yet
%% or someone is moving a replica between
%% two nodes
Res = mnesia_loader:net_load_table(Tab, Reason, Active, Cs),
case Res of
{loaded, ok} ->
Done#loader_done{needs_sync = true,
reply = Res};
{not_loaded, _} ->
Done#loader_done{is_loaded = false,
reply = Res}
end
end
end;
load_table_fun(#disc_load{table=Tab, reason=Reason, opt_reply_to=ReplyTo}) ->
ReadNode = val({Tab, where_to_read}),
Active = filter_active(Tab),
%% Template result: loaded, nothing to announce, sync or reply
Done = #loader_done{is_loaded = true,
table_name = Tab,
needs_announce = false,
needs_sync = false,
needs_reply = false
},
if
Active == [], ReadNode == nowhere ->
%% Not loaded anywhere, lets load it from disc
fun() -> disc_load_table(Tab, Reason, ReplyTo) end;
ReadNode == nowhere ->
%% Already loaded on other node, lets get it
Cs = val({Tab, cstruct}),
fun() ->
case mnesia_loader:net_load_table(Tab, Reason, Active, Cs) of
{loaded, ok} ->
Done#loader_done{needs_sync = true};
{not_loaded, storage_unknown} ->
Done#loader_done{is_loaded = false};
{not_loaded, ErrReason} ->
Done#loader_done{is_loaded = false,
reply = {not_loaded,ErrReason}}
end
end;
true ->
%% Already readable, do not worry be happy
fun() -> Done end
end.
%% Load Tab from the local disc and describe the outcome as a
%% #loader_done{} record.  On success the table needs to be announced
%% and synced.  On failure the error is passed on to ReplyTo if there
%% is one; a failure that nobody waits for is fatal.
disc_load_table(Tab, Reason, ReplyTo) ->
    WantsReply = ReplyTo /= undefined,
    case mnesia_loader:disc_load_table(Tab, Reason) of
        {loaded, ok} = Res ->
            #loader_done{is_loaded = true,
                         table_name = Tab,
                         needs_announce = true,
                         needs_sync = true,
                         needs_reply = WantsReply,
                         reply_to = ReplyTo,
                         reply = Res};
        Res when WantsReply ->
            #loader_done{is_loaded = false,
                         table_name = Tab,
                         needs_announce = false,
                         needs_sync = false,
                         needs_reply = WantsReply,
                         reply_to = ReplyTo,
                         reply = Res};
        Res ->
            fatal("Cannot load table ~p from disc: ~p~n", [Tab, Res])
    end.
%% Return the nodes that Tab may be loaded from, best candidates first.
%% The active replicas are first restricted by do_filter_active/3 and
%% then split into Good and Worse nodes: with local disc_only_copies
%% storage the disc_only_copies holders are Good, otherwise they are
%% Worse.  The Good nodes are reordered by random/3, using a hash of
%% the local node name, and the Worse nodes are appended.
filter_active(Tab) ->
    ByForce = val({Tab, load_by_force}),
    Active = val({Tab, active_replicas}),
    Masters = mnesia_recover:get_master_nodes(Tab),
    Ns = do_filter_active(ByForce, Active, Masters),
    %% Reorder them so that we load from fastest first
    LocalStorage = ?catch_val({Tab, storage_type}),
    DOC = val({Tab, disc_only_copies}),
    Good =
        case LocalStorage of
            disc_only_copies ->
                mnesia_lib:intersect(Ns, DOC);
            _ ->
                Ns -- DOC
        end,
    Worse = Ns -- Good,
    %% Pick a random node of the fastest
    case Good of
        [] ->
            Worse;
        _ ->
            R = erlang:phash(node(), length(Good) + 1),
            random(R - 1, Good, Worse)
    end.
%% Move the first N elements of the list, one at a time, onto the front
%% of Acc (so they end up in reverse order) and return the rest of the
%% list followed by Acc.  N must not exceed the length of the list.
random(0, Remaining, Acc) ->
    Remaining ++ Acc;
random(Steps, [Next | Remaining], Acc) when Steps > 0 ->
    random(Steps - 1, Remaining, [Next | Acc]).
%% Restrict the active replicas to the master nodes, unless loading is
%% forced or no master nodes are defined.
do_filter_active(false, Active, Masters) when Masters =/= [] ->
    mnesia_lib:intersect(Active, Masters);
do_filter_active(false, Active, []) ->
    Active;
do_filter_active(true, Active, _Masters) ->
    Active.