aboutsummaryrefslogtreecommitdiffstats
path: root/lib/mnesia/src/mnesia_controller.erl
diff options
context:
space:
mode:
Diffstat (limited to 'lib/mnesia/src/mnesia_controller.erl')
-rw-r--r--lib/mnesia/src/mnesia_controller.erl267
1 files changed, 132 insertions, 135 deletions
diff --git a/lib/mnesia/src/mnesia_controller.erl b/lib/mnesia/src/mnesia_controller.erl
index 6a561394d5..d488a33d67 100644
--- a/lib/mnesia/src/mnesia_controller.erl
+++ b/lib/mnesia/src/mnesia_controller.erl
@@ -107,14 +107,14 @@
-include("mnesia.hrl").
--define(SERVER_NAME, ?MODULE).
+-define(SERVER_NAME, ?MODULE).
-record(state, {supervisor,
schema_is_merged = false,
early_msgs = [],
- loader_pid = [], %% Was Pid is now [{Pid,Work}|..]
+ loader_pid = [], %% Was Pid is now [{Pid,Work}|..]
loader_queue, %% Was list is now gb_tree
- sender_pid = [], %% Was a pid or undef is now [{Pid,Work}|..]
+ sender_pid = [], %% Was a pid or undef is now [{Pid,Work}|..]
sender_queue = [],
late_loader_queue, %% Was list is now gb_tree
dumper_pid, %% Dumper or schema commit pid
@@ -124,12 +124,12 @@
is_stopping = false
}).
%% Backwards Comp. Sender_pid is now a list of senders..
-get_senders(#state{sender_pid = Pids}) when is_list(Pids) -> Pids.
+get_senders(#state{sender_pid = Pids}) when is_list(Pids) -> Pids.
%% Backwards Comp. loader_pid is now a list of loaders..
-get_loaders(#state{loader_pid = Pids}) when is_list(Pids) -> Pids.
+get_loaders(#state{loader_pid = Pids}) when is_list(Pids) -> Pids.
max_loaders() ->
case ?catch_val(no_table_loaders) of
- {'EXIT', _} ->
+ {'EXIT', _} ->
mnesia_lib:set(no_table_loaders,1),
1;
Val -> Val
@@ -153,7 +153,7 @@ max_loaders() ->
remote_storage
}).
--record(disc_load, {table,
+-record(disc_load, {table,
reason,
opt_reply_to
}).
@@ -184,7 +184,7 @@ max_loaders() ->
val(Var) ->
case ?catch_val(Var) of
- {'EXIT', Reason} -> mnesia_lib:other_val(Var, Reason);
+ {'EXIT', Reason} -> mnesia_lib:other_val(Var, Reason);
Value -> Value
end.
@@ -199,7 +199,7 @@ sync_dump_log(InitBy) ->
async_dump_log(InitBy) ->
?SERVER_NAME ! {async_dump_log, InitBy}.
-
+
%% Wait for tables to be active
%% If needed, we will wait for Mnesia to start
%% If Mnesia stops, we will wait for Mnesia to restart
@@ -227,7 +227,7 @@ do_wait_for_tables(Tabs, Timeout) ->
exit(Pid, timeout),
reply_wait(Tabs)
end.
-
+
reply_wait(Tabs) ->
case catch mnesia_lib:active_tables() of
{'EXIT', _} ->
@@ -270,7 +270,7 @@ rec_tabs([Tab | Tabs], AllTabs, From, Init) ->
%% This will trigger an exit signal
%% to mnesia_init
exit(wait_for_tables_timeout);
-
+
{'EXIT', Init, _} ->
%% Oops, mnesia_init stopped,
exit(mnesia_stopped)
@@ -279,11 +279,8 @@ rec_tabs([], _, _, Init) ->
unlink(Init),
ok.
-%% New function that does exactly what get_cstructs() used to do.
-%% When this function is called, we know that the calling node knows
-%% how to convert cstructs on the receiving end (should they differ).
get_remote_cstructs() ->
- call(get_cstructs).
+ get_cstructs(). %% Sigh not forward compatible always check version
%% Old function kept for backwards compatibility; converts cstructs before sending.
get_cstructs() ->
@@ -319,7 +316,7 @@ get_network_copy(Tab, Cs) ->
% We can't let the controller queue this one
% because that may cause a deadlock between schema_operations
% and initial tableloadings which both takes schema locks.
-% But we have to get copier_done msgs when the other side
+% But we have to get copier_done msgs when the other side
% goes down.
call({add_other, self()}),
Reason = {dumper,add_table_copy},
@@ -341,14 +338,14 @@ get_network_copy(Tab, Cs) ->
ignore
end,
Res#loader_done.reply;
- #loader_done{} ->
+ #loader_done{} ->
Res#loader_done.reply;
Else ->
{not_loaded, Else}
end.
%% This functions is invoked from the dumper
-%%
+%%
%% There are two cases here:
%% startup ->
%% no need for sync, since mnesia_controller not started yet
@@ -380,11 +377,11 @@ force_load_table(Tab) when is_atom(Tab), Tab /= schema ->
end;
force_load_table(Tab) ->
{error, {bad_type, Tab}}.
-
+
do_force_load_table(Tab) ->
Loaded = ?catch_val({Tab, load_reason}),
case Loaded of
- unknown ->
+ unknown ->
set({Tab, load_by_force}, true),
mnesia_late_loader:async_late_disc_load(node(), [Tab], forced_by_user),
wait_for_tables([Tab], infinity);
@@ -394,7 +391,7 @@ do_force_load_table(Tab) ->
wait_for_tables([Tab], infinity);
_ ->
ok
- end.
+ end.
master_nodes_updated(schema, _Masters) ->
ignore;
master_nodes_updated(Tab, Masters) ->
@@ -438,15 +435,15 @@ connect_nodes(Ns) ->
connect_nodes(Ns, fun default_merge/1).
connect_nodes(Ns, UserFun) ->
- case mnesia:system_info(is_running) of
+ case mnesia:system_info(is_running) of
no ->
{error, {node_not_running, node()}};
- yes ->
+ yes ->
Pid = spawn_link(?MODULE,connect_nodes2,[self(),Ns, UserFun]),
- receive
- {?MODULE, Pid, Res, New} ->
+ receive
+ {?MODULE, Pid, Res, New} ->
case Res of
- ok ->
+ ok ->
mnesia_lib:add_list(extra_db_nodes, New),
{ok, New};
{aborted, {throw, Str}} when is_list(Str) ->
@@ -454,8 +451,8 @@ connect_nodes(Ns, UserFun) ->
{error, {merge_schema_failed, lists:flatten(Str)}};
Else ->
{error, Else}
- end;
- {'EXIT', Pid, Reason} ->
+ end;
+ {'EXIT', Pid, Reason} ->
{error, Reason}
end
end.
@@ -466,16 +463,16 @@ connect_nodes2(Father, Ns, UserFun) ->
{NewC, OldC} = mnesia_recover:connect_nodes(Ns),
Connected = NewC ++OldC,
New1 = mnesia_lib:intersect(Ns, Connected),
- New = New1 -- Current,
+ New = New1 -- Current,
process_flag(trap_exit, true),
Res = try_merge_schema(New, [], UserFun),
Msg = {schema_is_merged, [], late_merge, []},
multicall([node()|Ns], Msg),
- After = val({current, db_nodes}),
+ After = val({current, db_nodes}),
Father ! {?MODULE, self(), Res, mnesia_lib:intersect(Ns,After)},
unlink(Father),
ok.
-
+
%% Merge the local schema with the schema on other nodes.
%% But first we must let all processes that want to force
%% load tables wait until the schema merge is done.
@@ -483,7 +480,7 @@ connect_nodes2(Father, Ns, UserFun) ->
merge_schema() ->
AllNodes = mnesia_lib:all_nodes(),
case try_merge_schema(AllNodes, [node()], fun default_merge/1) of
- ok ->
+ ok ->
schema_is_merged();
{aborted, {throw, Str}} when is_list(Str) ->
fatal("Failed to merge schema: ~s~n", [Str]);
@@ -535,7 +532,7 @@ im_running(OldFriends, NewFriends) ->
schema_is_merged() ->
MsgTag = schema_is_merged,
SafeLoads = initial_safe_loads(),
-
+
%% At this point we do not know anything about
%% which tables that the other nodes already
%% has loaded and therefore we let the normal
@@ -545,7 +542,7 @@ schema_is_merged() ->
%% that all nodes tells each other directly
%% when they have loaded a table and are
%% willing to share it.
-
+
try_schedule_late_disc_load(SafeLoads, initial, MsgTag).
@@ -589,7 +586,7 @@ remote_call(Node, Func, Args) ->
Else ->
Else
end.
-
+
multicall(Nodes, Msg) ->
{Good, Bad} = gen_server:multi_call(Nodes, ?MODULE, Msg, infinity),
PatchedGood = [Reply || {_Node, Reply} <- Good],
@@ -621,9 +618,9 @@ init([Parent]) ->
Msg = {async_dump_log, time_threshold},
{ok, Ref} = timer:send_interval(Interval, Msg),
mnesia_dumper:start_regulator(),
-
+
Empty = gb_trees:empty(),
- {ok, #state{supervisor = Parent, dump_log_timer_ref = Ref,
+ {ok, #state{supervisor = Parent, dump_log_timer_ref = Ref,
loader_queue = Empty,
late_loader_queue = Empty}}.
@@ -656,17 +653,17 @@ handle_call(block_controller, From, State) ->
handle_call({update,Fun}, From, State) ->
Res = (catch Fun()),
- reply(From, Res),
+ reply(From, Res),
noreply(State);
handle_call(get_cstructs, From, State) ->
Tabs = val({schema, tables}),
Cstructs = [val({T, cstruct}) || T <- Tabs],
Running = val({current, db_nodes}),
- reply(From, {cstructs, Cstructs, Running}),
+ reply(From, {cstructs, Cstructs, Running}),
noreply(State);
-handle_call({schema_is_merged, [], late_merge, []}, From,
+handle_call({schema_is_merged, [], late_merge, []}, From,
State = #state{schema_is_merged = Merged}) ->
case Merged of
{false, Node} when Node == node(From) ->
@@ -697,8 +694,8 @@ handle_call(disc_load_intents,From,State = #state{loader_queue=LQ,late_loader_qu
handle_call({update_where_to_write, [add, Tab, AddNode], _From}, _Dummy, State) ->
Current = val({current, db_nodes}),
- Res =
- case lists:member(AddNode, Current) and
+ Res =
+ case lists:member(AddNode, Current) and
(State#state.schema_is_merged == true) of
true ->
mnesia_lib:add_lsort({Tab, where_to_write}, AddNode),
@@ -732,7 +729,7 @@ handle_call({add_active_replica, [Tab, ToNode, RemoteS, AccessMode], From},
noreply(State#state{early_msgs = [{call, Msg, undefined} | Msgs]})
end;
-handle_call({unannounce_add_table_copy, [Tab, Node], From}, ReplyTo, State) ->
+handle_call({unannounce_add_table_copy, [Tab, Node], From}, ReplyTo, State) ->
KnownNode = lists:member(node(From), val({current, db_nodes})),
Merged = State#state.schema_is_merged,
if
@@ -752,16 +749,16 @@ handle_call({unannounce_add_table_copy, [Tab, Node], From}, ReplyTo, State) ->
end;
handle_call({net_load, Tab, Cs}, From, State) ->
- State2 =
+ State2 =
case State#state.schema_is_merged of
- true ->
+ true ->
Worker = #net_load{table = Tab,
opt_reply_to = From,
reason = {dumper,add_table_copy},
cstruct = Cs
},
add_worker(Worker, State);
- false ->
+ false ->
reply(From, {not_loaded, schema_not_merged}),
State
end,
@@ -804,16 +801,16 @@ handle_call({add_other, Who}, _From, State = #state{others=Others0}) ->
handle_call({del_other, Who}, _From, State = #state{others=Others0}) ->
Others = lists:delete(Who, Others0),
{reply, ok, State#state{others=Others}};
-
+
handle_call(Msg, _From, State) ->
error("~p got unexpected call: ~p~n", [?SERVER_NAME, Msg]),
noreply(State).
-late_disc_load(TabsR, Reason, RemoteLoaders, From,
+late_disc_load(TabsR, Reason, RemoteLoaders, From,
State = #state{loader_queue = LQ, late_loader_queue = LLQ}) ->
verbose("Intend to load tables: ~p~n", [TabsR]),
?eval_debug_fun({?MODULE, late_disc_load},
- [{tabs, TabsR},
+ [{tabs, TabsR},
{reason, Reason},
{loaders, RemoteLoaders}]),
@@ -822,14 +819,14 @@ late_disc_load(TabsR, Reason, RemoteLoaders, From,
%% Remove deleted tabs and queued/loaded
LocalTabs = gb_sets:from_ordset(lists:sort(mnesia_lib:val({schema,local_tables}))),
- Filter = fun(TabInfo0, Acc) ->
- TabInfo = {Tab,_} =
- case TabInfo0 of
+ Filter = fun(TabInfo0, Acc) ->
+ TabInfo = {Tab,_} =
+ case TabInfo0 of
{_,_} -> TabInfo0;
TabN -> {TabN,Reason}
end,
case gb_sets:is_member(Tab, LocalTabs) of
- true ->
+ true ->
case ?catch_val({Tab, where_to_read}) == node() of
true -> Acc;
false ->
@@ -841,12 +838,12 @@ late_disc_load(TabsR, Reason, RemoteLoaders, From,
false -> Acc
end
end,
-
+
Tabs = lists:foldl(Filter, [], TabsR),
-
+
Nodes = val({current, db_nodes}),
LateQueue = late_loaders(Tabs, RemoteLoaders, Nodes, LLQ),
- State#state{late_loader_queue = LateQueue}.
+ State#state{late_loader_queue = LateQueue}.
late_loaders([{Tab, Reason} | Tabs], RemoteLoaders, Nodes, LLQ) ->
case gb_trees:is_defined(Tab, LLQ) of
@@ -859,7 +856,7 @@ late_loaders([{Tab, Reason} | Tabs], RemoteLoaders, Nodes, LLQ) ->
LateLoad = #late_load{table=Tab,loaders=LoadNodes,reason=Reason},
late_loaders(Tabs, RemoteLoaders, Nodes, gb_trees:insert(Tab,LateLoad,LLQ));
true ->
- late_loaders(Tabs, RemoteLoaders, Nodes, LLQ)
+ late_loaders(Tabs, RemoteLoaders, Nodes, LLQ)
end;
late_loaders([], _RemoteLoaders, _Nodes, LLQ) ->
LLQ.
@@ -899,7 +896,7 @@ late_load_filter([RL | RemoteLoaders], Tab, Nodes, Acc) ->
end;
late_load_filter([], _Tab, _Nodes, Acc) ->
Acc.
-
+
%%----------------------------------------------------------------------
%% Func: handle_cast/2
%% Returns: {noreply, State} |
@@ -911,7 +908,7 @@ handle_cast({release_schema_commit_lock, _Owner}, State) ->
if
State#state.is_stopping == true ->
{stop, shutdown, State};
- true ->
+ true ->
case State#state.dumper_queue of
[#schema_commit_lock{}|Rest] ->
[_Worker | Rest] = State#state.dumper_queue,
@@ -932,7 +929,7 @@ handle_cast(unblock_controller, State) ->
[_Worker | Rest] = State#state.dumper_queue,
State2 = State#state{dumper_pid = undefined,
dumper_queue = Rest},
- State3 = opt_start_worker(State2),
+ State3 = opt_start_worker(State2),
noreply(State3)
end;
@@ -948,31 +945,31 @@ handle_cast({mnesia_down, Node}, State) ->
%% Fix if we are late_merging against the node that went down
case State#state.schema_is_merged of
- {false, Node} ->
+ {false, Node} ->
spawn(?MODULE, call, [{schema_is_merged, [], late_merge, []}]);
_ ->
ignore
end,
-
+
%% Fix internal stuff
LateQ = remove_loaders(Alltabs, Node, State#state.late_loader_queue),
-
+
case get_senders(State) ++ get_loaders(State) of
[] -> ignore;
- Senders ->
+ Senders ->
lists:foreach(fun({Pid,_}) -> Pid ! {copier_done, Node} end,
Senders)
end,
- lists:foreach(fun(Pid) -> Pid ! {copier_done,Node} end,
+ lists:foreach(fun(Pid) -> Pid ! {copier_done,Node} end,
State#state.others),
-
+
Remove = fun(ST) ->
node(ST#send_table.receiver_pid) /= Node
end,
NewSenders = lists:filter(Remove, State#state.sender_queue),
Early = remove_early_messages(State#state.early_msgs, Node),
- noreply(State#state{sender_queue = NewSenders,
- early_msgs = Early,
+ noreply(State#state{sender_queue = NewSenders,
+ early_msgs = Early,
late_loader_queue = LateQ
});
@@ -981,8 +978,8 @@ handle_cast({merging_schema, Node}, State) ->
false ->
%% This comes from dynamic connect_nodes which are made
%% after mnesia:start() and the schema_merge.
- ImANewKidInTheBlock =
- (val({schema, storage_type}) == ram_copies)
+ ImANewKidInTheBlock =
+ (val({schema, storage_type}) == ram_copies)
andalso (mnesia_lib:val({schema, local_tables}) == [schema]),
case ImANewKidInTheBlock of
true -> %% I'm newly started ram_node..
@@ -1000,7 +997,7 @@ handle_cast(Msg, State) when State#state.schema_is_merged /= true ->
noreply(State#state{early_msgs = [{cast, Msg} | Msgs]});
%% This must be done after schema_is_merged otherwise adopt_orphan
-%% might trigger a table load from wrong nodes as a result of that we don't
+%% might trigger a table load from wrong nodes as a result of that we don't
%% know which tables we can load safly first.
handle_cast({im_running, Node, NewFriends}, State) ->
LocalTabs = mnesia_lib:local_active_tables() -- [schema],
@@ -1027,7 +1024,7 @@ handle_cast({sync_tabs, Tabs, From}, State) ->
handle_cast({i_have_tab, Tab, Node}, State) ->
case lists:member(Node, val({current, db_nodes})) of
- true ->
+ true ->
State2 = node_has_tabs([Tab], Node, State),
noreply(State2);
false ->
@@ -1043,10 +1040,10 @@ handle_cast({force_load_updated, Tab}, State) ->
State2 = node_has_tabs([Tab], SomeNode, State),
noreply(State2)
end;
-
+
handle_cast({master_nodes_updated, Tab, Masters}, State) ->
Active = val({Tab, active_replicas}),
- Valid =
+ Valid =
case val({Tab, load_by_force}) of
true ->
Active;
@@ -1066,10 +1063,10 @@ handle_cast({master_nodes_updated, Tab, Masters}, State) ->
State2 = node_has_tabs([Tab], SomeNode, State),
noreply(State2)
end;
-
+
handle_cast({adopt_orphans, Node, Tabs}, State) ->
State2 = node_has_tabs(Tabs, Node, State),
-
+
case ?catch_val({node_up,Node}) of
true -> ignore;
_ ->
@@ -1101,7 +1098,7 @@ handle_cast(Msg, State) ->
error("~p got unexpected cast: ~p~n", [?SERVER_NAME, Msg]),
noreply(State).
-handle_sync_tabs([Tab | Tabs], From) ->
+handle_sync_tabs([Tab | Tabs], From) ->
case val({Tab, where_to_read}) of
nowhere ->
case get({sync_tab, Tab}) of
@@ -1145,7 +1142,7 @@ handle_info(#dumper_done{worker_pid=Pid, worker_res=Res}, State) ->
{stop, fatal, State}
end;
-handle_info(Done = #loader_done{worker_pid=WPid, table_name=Tab}, State0) ->
+handle_info(Done = #loader_done{worker_pid=WPid, table_name=Tab}, State0) ->
LateQueue0 = State0#state.late_loader_queue,
State1 = State0#state{loader_pid = lists:keydelete(WPid,1,get_loaders(State0))},
@@ -1153,7 +1150,7 @@ handle_info(Done = #loader_done{worker_pid=WPid, table_name=Tab}, State0) ->
case Done#loader_done.is_loaded of
true ->
%% Optional table announcement
- if
+ if
Done#loader_done.needs_announce == true,
Done#loader_done.needs_reply == true ->
i_have_tab(Tab),
@@ -1187,7 +1184,7 @@ handle_info(Done = #loader_done{worker_pid=WPid, table_name=Tab}, State0) ->
State1#state{late_loader_queue=gb_trees:delete_any(Tab, LateQueue0)};
false ->
%% Either the node went down or table was not
- %% loaded remotly yet
+ %% loaded remotly yet
case Done#loader_done.needs_reply of
true ->
reply(Done#loader_done.reply_to,
@@ -1210,7 +1207,7 @@ handle_info(#sender_done{worker_pid=Pid, worker_res=Res}, State) ->
Senders = get_senders(State),
{value, {Pid,_Worker}} = lists:keysearch(Pid, 1, Senders),
if
- Res == ok ->
+ Res == ok ->
State2 = State#state{sender_pid = lists:keydelete(Pid, 1, Senders)},
State3 = opt_start_worker(State2),
noreply(State3);
@@ -1252,7 +1249,7 @@ handle_info(Msg = {'EXIT', Pid, R}, State) when R /= wait_for_tables_timeout ->
{stop, fatal, State};
false ->
case lists:keymember(Pid, 1, get_loaders(State)) of
- true ->
+ true ->
fatal("Loader crashed: ~p~n state: ~p~n", [R, State]),
{stop, fatal, State};
false ->
@@ -1338,7 +1335,7 @@ code_change(_OldVsn, State0, _Extra) ->
State1 = case State0#state.loader_pid of
Pids when is_list(Pids) -> State0;
undefined -> State0#state{loader_pid = [],loader_queue=gb_trees:empty()};
- Pid when is_pid(Pid) ->
+ Pid when is_pid(Pid) ->
[Loader|Rest] = State0#state.loader_queue,
LQ0 = [{element(2,Rec),Rec} || Rec <- Rest],
LQ1 = lists:sort(LQ0),
@@ -1346,7 +1343,7 @@ code_change(_OldVsn, State0, _Extra) ->
State0#state{loader_pid=[{Pid,Loader}], loader_queue=LQ}
end,
%% LateLoaderQueue
- State = if is_list(State1#state.late_loader_queue) ->
+ State = if is_list(State1#state.late_loader_queue) ->
LLQ0 = State1#state.late_loader_queue,
LLQ1 = lists:sort([{element(2,Rec),Rec} || Rec <- LLQ0]),
LLQ = gb_trees:from_orddict(LLQ1),
@@ -1355,7 +1352,7 @@ code_change(_OldVsn, State0, _Extra) ->
State1
end,
{ok, State}.
-
+
%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------
@@ -1365,20 +1362,20 @@ maybe_log_mnesia_down(N) ->
%% so if we are not running (i.e haven't decided which tables
%% to load locally), don't log mnesia_down yet.
case mnesia_lib:is_running() of
- yes ->
+ yes ->
verbose("Logging mnesia_down ~w~n", [N]),
mnesia_recover:log_mnesia_down(N),
ok;
- _ ->
+ _ ->
Filter = fun(Tab) ->
inactive_copy_holders(Tab, N)
end,
HalfLoadedTabs = lists:any(Filter, val({schema, local_tables}) -- [schema]),
- if
+ if
HalfLoadedTabs == true ->
verbose("Logging mnesia_down ~w~n", [N]),
mnesia_recover:log_mnesia_down(N),
- ok;
+ ok;
true ->
%% Unfortunately we have not loaded some common
%% tables yet, so we cannot rely on the nodedown
@@ -1407,7 +1404,7 @@ orphan_tables([Tab | Tabs], Node, Ns, Local, Remote) ->
BeingCreated = (?catch_val({Tab, create_table}) == true),
Read = val({Tab, where_to_read}),
case lists:member(Node, DiscCopyHolders) of
- _ when BeingCreated == true ->
+ _ when BeingCreated == true ->
orphan_tables(Tabs, Node, Ns, Local, Remote);
_ when Read == node() -> %% Allready loaded
orphan_tables(Tabs, Node, Ns, Local, Remote);
@@ -1445,13 +1442,13 @@ orphan_tables([], _, _, LocalOrphans, RemoteMasters) ->
{LocalOrphans, RemoteMasters}.
node_has_tabs([Tab | Tabs], Node, State) when Node /= node() ->
- State2 =
+ State2 =
case catch update_whereabouts(Tab, Node, State) of
State1 = #state{} -> State1;
{'EXIT', R} -> %% Tab was just deleted?
case ?catch_val({Tab, cstruct}) of
{'EXIT', _} -> State; % yes
- _ -> erlang:error(R)
+ _ -> erlang:error(R)
end
end,
node_has_tabs(Tabs, Node, State2);
@@ -1477,14 +1474,14 @@ update_whereabouts(Tab, Node, State) ->
true ->
lists:member(Node, Masters)
end,
-
+
dbg_out("Table ~w is loaded on ~w. s=~w, r=~w, lc=~w, f=~w, m=~w~n",
[Tab, Node, Storage, Read, LocalC, ByForce, GoGetIt]),
if
LocalC == true ->
%% Local contents, don't care about other node
State;
- BeingCreated == true ->
+ BeingCreated == true ->
%% The table is currently being created
%% It will be handled elsewhere
State;
@@ -1501,8 +1498,8 @@ update_whereabouts(Tab, Node, State) ->
State
end;
Storage == unknown ->
- %% No own copy, continue to read remotely
- add_active_replica(Tab, Node),
+ %% No own copy, continue to read remotely
+ add_active_replica(Tab, Node),
NodeST = mnesia_lib:storage_type_at_node(Node, Tab),
ReadST = mnesia_lib:storage_type_at_node(Read, Tab),
if %% Avoid reading from disc_only_copies
@@ -1542,16 +1539,16 @@ initial_safe_loads() ->
Tabs = val({schema, local_tables}) -- [schema],
LastC = fun(T) -> last_consistent_replica(T, Downs) end,
lists:zf(LastC, Tabs);
-
+
disc_copies ->
Downs = mnesia_recover:get_mnesia_downs(),
dbg_out("mnesia_downs = ~p~n", [Downs]),
-
+
Tabs = val({schema, local_tables}) -- [schema],
LastC = fun(T) -> last_consistent_replica(T, Downs) end,
lists:zf(LastC, Tabs)
end.
-
+
last_consistent_replica(Tab, Downs) ->
Cs = val({Tab, cstruct}),
Storage = mnesia_lib:cs_to_storage_type(node(), Cs),
@@ -1628,7 +1625,7 @@ remove_early_messages([], _Node) ->
[];
remove_early_messages([{call, {add_active_replica, [_, Node, _, _], _}, _}|R], Node) ->
remove_early_messages(R, Node); %% Does a reply before queuing
-remove_early_messages([{call, {block_table, _, From}, ReplyTo}|R], Node)
+remove_early_messages([{call, {block_table, _, From}, ReplyTo}|R], Node)
when node(From) == Node ->
reply(ReplyTo, ok), %% Remove gen:server waits..
remove_early_messages(R, Node);
@@ -1682,9 +1679,9 @@ is_tab_blocked(W2C) when is_list(W2C) ->
is_tab_blocked({blocked, W2C}) when is_list(W2C) ->
{true, W2C}.
-mark_blocked_tab(true, Value) ->
+mark_blocked_tab(true, Value) ->
{blocked, Value};
-mark_blocked_tab(false, Value) ->
+mark_blocked_tab(false, Value) ->
Value.
%%
@@ -1717,7 +1714,7 @@ del_active_replica(Tab, Node) ->
update_where_to_wlock(Tab).
change_table_access_mode(Cs) ->
- W = fun() ->
+ W = fun() ->
Tab = Cs#cstruct.name,
lists:foreach(fun(N) -> add_active_replica(Tab, N, Cs) end,
val({Tab, active_replicas}))
@@ -1746,7 +1743,7 @@ update_where_to_wlock(Tab) ->
unannounce_add_table_copy(Tab, To) ->
catch del_active_replica(Tab, To),
case catch val({Tab , where_to_read}) of
- To ->
+ To ->
mnesia_lib:set_remote_where_to_read(Tab);
_ ->
ignore
@@ -1759,7 +1756,7 @@ user_sync_tab(Tab) ->
_ ->
ignore
end,
-
+
case erase({sync_tab, Tab}) of
undefined ->
ok;
@@ -1778,11 +1775,11 @@ i_have_tab(Tab) ->
sync_and_block_table_whereabouts(Tab, ToNode, RemoteS, AccessMode) when Tab /= schema ->
Current = val({current, db_nodes}),
- Ns =
+ Ns =
case lists:member(ToNode, Current) of
true -> Current -- [ToNode];
false -> Current
- end,
+ end,
remote_call(ToNode, block_table, [Tab]),
[remote_call(Node, add_active_replica, [Tab, ToNode, RemoteS, AccessMode]) ||
Node <- [ToNode | Ns]],
@@ -1827,7 +1824,7 @@ get_workers(Timeout) ->
{timeout, Timeout}
end
end.
-
+
info() ->
Tabs = mnesia_lib:local_active_tables(),
io:format( "---> Active tables <--- ~n", []),
@@ -1836,12 +1833,12 @@ info() ->
info([Tab | Tail]) ->
case val({Tab, storage_type}) of
disc_only_copies ->
- info_format(Tab,
- dets:info(Tab, size),
+ info_format(Tab,
+ dets:info(Tab, size),
dets:info(Tab, file_size),
"bytes on disc");
_ ->
- info_format(Tab,
+ info_format(Tab,
?ets_info(Tab, size),
?ets_info(Tab, memory),
"words of mem")
@@ -1881,7 +1878,7 @@ handle_early_msg({cast, Msg}, State) ->
handle_cast(Msg, State);
handle_early_msg({info, Msg}, State) ->
handle_info(Msg, State).
-
+
noreply(State) ->
{noreply, State}.
@@ -1929,7 +1926,7 @@ add_worker(Worker = #send_table{}, State) ->
add_worker(Worker = #disc_load{}, State) ->
opt_start_worker(add_loader(Worker#disc_load.table,Worker,State));
% Block controller should be used for upgrading mnesia.
-add_worker(Worker = #block_controller{}, State) ->
+add_worker(Worker = #block_controller{}, State) ->
Queue = State#state.dumper_queue,
Queue2 = [Worker | Queue],
State2 = State#state{dumper_queue = Queue2},
@@ -1938,13 +1935,13 @@ add_worker(Worker = #block_controller{}, State) ->
add_loader(Tab,Worker,State = #state{loader_queue=LQ0}) ->
case gb_trees:is_defined(Tab, LQ0) of
true -> State;
- false ->
+ false ->
LQ=gb_trees:insert(Tab, Worker, LQ0),
State#state{loader_queue=LQ}
end.
%% Optionally start a worker
-%%
+%%
%% Dumpers and loaders may run simultaneously
%% but neither of them may run during schema commit.
%% Loaders may not start if a schema commit is enqueued.
@@ -1958,7 +1955,7 @@ opt_start_worker(State) ->
%% Great, a worker in queue and neither
%% a schema transaction is being
%% committed and nor a dumper is running
-
+
%% Start worker but keep him in the queue
if
is_record(Worker, schema_commit_lock) ->
@@ -1966,7 +1963,7 @@ opt_start_worker(State) ->
reply(ReplyTo, granted),
{Owner, _Tag} = ReplyTo,
opt_start_loader(State#state{dumper_pid = Owner});
-
+
is_record(Worker, dump_log) ->
Pid = spawn_link(?MODULE, dump_and_reply, [self(), Worker]),
State2 = State#state{dumper_pid = Pid},
@@ -1976,7 +1973,7 @@ opt_start_worker(State) ->
%% or sender
State3 = opt_start_sender(State2),
opt_start_loader(State3);
-
+
is_record(Worker, block_controller) ->
case {get_senders(State), get_loaders(State)} of
{[], []} ->
@@ -1989,7 +1986,7 @@ opt_start_worker(State) ->
end
end;
_ ->
- %% Bad luck, try with a loader or sender instead
+ %% Bad luck, try with a loader or sender instead
State2 = opt_start_sender(State),
opt_start_loader(State2)
end.
@@ -1997,8 +1994,8 @@ opt_start_worker(State) ->
opt_start_sender(State) ->
case State#state.sender_queue of
[]-> State; %% No need
- SenderQ ->
- {NewS,Kept} = opt_start_sender2(SenderQ, get_senders(State),
+ SenderQ ->
+ {NewS,Kept} = opt_start_sender2(SenderQ, get_senders(State),
[], get_loaders(State)),
State#state{sender_pid = NewS, sender_queue = Kept}
end.
@@ -2007,11 +2004,11 @@ opt_start_sender2([], Pids,Kept, _) -> {Pids,Kept};
opt_start_sender2([Sender|R], Pids, Kept, LoaderQ) ->
Tab = Sender#send_table.table,
Active = val({Tab, active_replicas}),
- IgotIt = lists:member(node(), Active),
- IsLoading = lists:any(fun({_Pid,Loader}) ->
+ IgotIt = lists:member(node(), Active),
+ IsLoading = lists:any(fun({_Pid,Loader}) ->
Tab == element(#net_load.table, Loader)
end, LoaderQ),
- if
+ if
IgotIt, IsLoading ->
%% I'm currently finishing loading the table let him wait
opt_start_sender2(R,Pids, [Sender|Kept], LoaderQ);
@@ -2029,11 +2026,11 @@ opt_start_loader(State = #state{loader_queue = LoaderQ}) ->
Current = get_loaders(State),
Max = max_loaders(),
case gb_trees:is_empty(LoaderQ) of
- true ->
+ true ->
State;
- _ when length(Current) >= Max ->
+ _ when length(Current) >= Max ->
State;
- false ->
+ false ->
SchemaQueue = State#state.dumper_queue,
case lists:keymember(schema_commit_lock, 1, SchemaQueue) of
false ->
@@ -2064,7 +2061,7 @@ already_loading(#disc_load{table=Tab},Loaders) ->
already_loading2(Tab, [{_,#net_load{table=Tab}}|_]) -> true;
already_loading2(Tab, [{_,#disc_load{table=Tab}}|_]) -> true;
-already_loading2(Tab, [_|Rest]) -> already_loading2(Tab,Rest);
+already_loading2(Tab, [_|Rest]) -> already_loading2(Tab,Rest);
already_loading2(_,[]) -> false.
start_remote_sender(Node, Tab, Receiver, Storage) ->
@@ -2093,8 +2090,8 @@ send_and_reply(ReplyTo, Worker) ->
load_and_reply(ReplyTo, Worker) ->
Load = load_table_fun(Worker),
- SendAndReply =
- fun() ->
+ SendAndReply =
+ fun() ->
process_flag(trap_exit, true),
Done = Load(),
ReplyTo ! Done#loader_done{worker_pid = self()},
@@ -2161,7 +2158,7 @@ load_table_fun(#disc_load{table=Tab, reason=Reason, opt_reply_to=ReplyTo}) ->
ReadNode == nowhere ->
%% Already loaded on other node, lets get it
Cs = val({Tab, cstruct}),
- fun() ->
+ fun() ->
case mnesia_loader:net_load_table(Tab, Reason, Active, Cs) of
{loaded, ok} ->
Done#loader_done{needs_sync = true};
@@ -2204,10 +2201,10 @@ filter_active(Tab) ->
Active = val({Tab, active_replicas}),
Masters = mnesia_recover:get_master_nodes(Tab),
Ns = do_filter_active(ByForce, Active, Masters),
- %% Reorder the so that we load from fastest first
+ %% Reorder the so that we load from fastest first
LS = ?catch_val({Tab, storage_type}),
DOC = val({Tab, disc_only_copies}),
- {Good,Worse} =
+ {Good,Worse} =
case LS of
disc_only_copies ->
G = mnesia_lib:intersect(Ns, DOC),
@@ -2218,7 +2215,7 @@ filter_active(Tab) ->
end,
%% Pick a random node of the fastest
Len = length(Good),
- if
+ if
Len > 0 ->
R = erlang:phash(node(), Len+1),
random(R-1,Good,Worse);
@@ -2237,5 +2234,5 @@ do_filter_active(false, Active, []) ->
Active;
do_filter_active(false, Active, Masters) ->
mnesia_lib:intersect(Active, Masters).
-
+