diff options
Diffstat (limited to 'lib/mnesia/src/mnesia_controller.erl')
-rw-r--r-- | lib/mnesia/src/mnesia_controller.erl | 267 |
1 files changed, 132 insertions, 135 deletions
diff --git a/lib/mnesia/src/mnesia_controller.erl b/lib/mnesia/src/mnesia_controller.erl index 6a561394d5..d488a33d67 100644 --- a/lib/mnesia/src/mnesia_controller.erl +++ b/lib/mnesia/src/mnesia_controller.erl @@ -107,14 +107,14 @@ -include("mnesia.hrl"). --define(SERVER_NAME, ?MODULE). +-define(SERVER_NAME, ?MODULE). -record(state, {supervisor, schema_is_merged = false, early_msgs = [], - loader_pid = [], %% Was Pid is now [{Pid,Work}|..] + loader_pid = [], %% Was Pid is now [{Pid,Work}|..] loader_queue, %% Was list is now gb_tree - sender_pid = [], %% Was a pid or undef is now [{Pid,Work}|..] + sender_pid = [], %% Was a pid or undef is now [{Pid,Work}|..] sender_queue = [], late_loader_queue, %% Was list is now gb_tree dumper_pid, %% Dumper or schema commit pid @@ -124,12 +124,12 @@ is_stopping = false }). %% Backwards Comp. Sender_pid is now a list of senders.. -get_senders(#state{sender_pid = Pids}) when is_list(Pids) -> Pids. +get_senders(#state{sender_pid = Pids}) when is_list(Pids) -> Pids. %% Backwards Comp. loader_pid is now a list of loaders.. -get_loaders(#state{loader_pid = Pids}) when is_list(Pids) -> Pids. +get_loaders(#state{loader_pid = Pids}) when is_list(Pids) -> Pids. max_loaders() -> case ?catch_val(no_table_loaders) of - {'EXIT', _} -> + {'EXIT', _} -> mnesia_lib:set(no_table_loaders,1), 1; Val -> Val @@ -153,7 +153,7 @@ max_loaders() -> remote_storage }). --record(disc_load, {table, +-record(disc_load, {table, reason, opt_reply_to }). @@ -184,7 +184,7 @@ max_loaders() -> val(Var) -> case ?catch_val(Var) of - {'EXIT', Reason} -> mnesia_lib:other_val(Var, Reason); + {'EXIT', Reason} -> mnesia_lib:other_val(Var, Reason); Value -> Value end. @@ -199,7 +199,7 @@ sync_dump_log(InitBy) -> async_dump_log(InitBy) -> ?SERVER_NAME ! {async_dump_log, InitBy}. - + %% Wait for tables to be active %% If needed, we will wait for Mnesia to start %% If Mnesia stops, we will wait for Mnesia to restart @@ -227,7 +227,7 @@ do_wait_for_tables(Tabs, Timeout) -> exit(Pid, timeout), reply_wait(Tabs) end. - + reply_wait(Tabs) -> case catch mnesia_lib:active_tables() of {'EXIT', _} -> @@ -270,7 +270,7 @@ rec_tabs([Tab | Tabs], AllTabs, From, Init) -> %% This will trigger an exit signal %% to mnesia_init exit(wait_for_tables_timeout); - + {'EXIT', Init, _} -> %% Oops, mnesia_init stopped, exit(mnesia_stopped) @@ -279,11 +279,8 @@ rec_tabs([], _, _, Init) -> unlink(Init), ok. -%% New function that does exactly what get_cstructs() used to do. -%% When this function is called, we know that the calling node knows -%% how to convert cstructs on the receiving end (should they differ). get_remote_cstructs() -> - call(get_cstructs). + get_cstructs(). %% Sigh not forward compatible always check version %% Old function kept for backwards compatibility; converts cstructs before sending. get_cstructs() -> @@ -319,7 +316,7 @@ get_network_copy(Tab, Cs) -> % We can't let the controller queue this one % because that may cause a deadlock between schema_operations % and initial tableloadings which both takes schema locks. -% But we have to get copier_done msgs when the other side +% But we have to get copier_done msgs when the other side % goes down. call({add_other, self()}), Reason = {dumper,add_table_copy}, @@ -341,14 +338,14 @@ get_network_copy(Tab, Cs) -> ignore end, Res#loader_done.reply; - #loader_done{} -> + #loader_done{} -> Res#loader_done.reply; Else -> {not_loaded, Else} end. %% This functions is invoked from the dumper -%% +%% %% There are two cases here: %% startup -> %% no need for sync, since mnesia_controller not started yet @@ -380,11 +377,11 @@ force_load_table(Tab) when is_atom(Tab), Tab /= schema -> end; force_load_table(Tab) -> {error, {bad_type, Tab}}. - + do_force_load_table(Tab) -> Loaded = ?catch_val({Tab, load_reason}), case Loaded of - unknown -> + unknown -> set({Tab, load_by_force}, true), mnesia_late_loader:async_late_disc_load(node(), [Tab], forced_by_user), wait_for_tables([Tab], infinity); @@ -394,7 +391,7 @@ do_force_load_table(Tab) -> wait_for_tables([Tab], infinity); _ -> ok - end. + end. master_nodes_updated(schema, _Masters) -> ignore; master_nodes_updated(Tab, Masters) -> @@ -438,15 +435,15 @@ connect_nodes(Ns) -> connect_nodes(Ns, fun default_merge/1). connect_nodes(Ns, UserFun) -> - case mnesia:system_info(is_running) of + case mnesia:system_info(is_running) of no -> {error, {node_not_running, node()}}; - yes -> + yes -> Pid = spawn_link(?MODULE,connect_nodes2,[self(),Ns, UserFun]), - receive - {?MODULE, Pid, Res, New} -> + receive + {?MODULE, Pid, Res, New} -> case Res of - ok -> + ok -> mnesia_lib:add_list(extra_db_nodes, New), {ok, New}; {aborted, {throw, Str}} when is_list(Str) -> @@ -454,8 +451,8 @@ connect_nodes(Ns, UserFun) -> {error, {merge_schema_failed, lists:flatten(Str)}}; Else -> {error, Else} - end; - {'EXIT', Pid, Reason} -> + end; + {'EXIT', Pid, Reason} -> {error, Reason} end end. @@ -466,16 +463,16 @@ connect_nodes2(Father, Ns, UserFun) -> {NewC, OldC} = mnesia_recover:connect_nodes(Ns), Connected = NewC ++OldC, New1 = mnesia_lib:intersect(Ns, Connected), - New = New1 -- Current, + New = New1 -- Current, process_flag(trap_exit, true), Res = try_merge_schema(New, [], UserFun), Msg = {schema_is_merged, [], late_merge, []}, multicall([node()|Ns], Msg), - After = val({current, db_nodes}), + After = val({current, db_nodes}), Father ! {?MODULE, self(), Res, mnesia_lib:intersect(Ns,After)}, unlink(Father), ok. - + %% Merge the local schema with the schema on other nodes. %% But first we must let all processes that want to force %% load tables wait until the schema merge is done. @@ -483,7 +480,7 @@ connect_nodes2(Father, Ns, UserFun) -> merge_schema() -> AllNodes = mnesia_lib:all_nodes(), case try_merge_schema(AllNodes, [node()], fun default_merge/1) of - ok -> + ok -> schema_is_merged(); {aborted, {throw, Str}} when is_list(Str) -> fatal("Failed to merge schema: ~s~n", [Str]); @@ -535,7 +532,7 @@ im_running(OldFriends, NewFriends) -> schema_is_merged() -> MsgTag = schema_is_merged, SafeLoads = initial_safe_loads(), - + %% At this point we do not know anything about %% which tables that the other nodes already %% has loaded and therefore we let the normal @@ -545,7 +542,7 @@ schema_is_merged() -> %% that all nodes tells each other directly %% when they have loaded a table and are %% willing to share it. - + try_schedule_late_disc_load(SafeLoads, initial, MsgTag). @@ -589,7 +586,7 @@ remote_call(Node, Func, Args) -> Else -> Else end. - + multicall(Nodes, Msg) -> {Good, Bad} = gen_server:multi_call(Nodes, ?MODULE, Msg, infinity), PatchedGood = [Reply || {_Node, Reply} <- Good], @@ -621,9 +618,9 @@ init([Parent]) -> Msg = {async_dump_log, time_threshold}, {ok, Ref} = timer:send_interval(Interval, Msg), mnesia_dumper:start_regulator(), - + Empty = gb_trees:empty(), - {ok, #state{supervisor = Parent, dump_log_timer_ref = Ref, + {ok, #state{supervisor = Parent, dump_log_timer_ref = Ref, loader_queue = Empty, late_loader_queue = Empty}}. @@ -656,17 +653,17 @@ handle_call(block_controller, From, State) -> handle_call({update,Fun}, From, State) -> Res = (catch Fun()), - reply(From, Res), + reply(From, Res), noreply(State); handle_call(get_cstructs, From, State) -> Tabs = val({schema, tables}), Cstructs = [val({T, cstruct}) || T <- Tabs], Running = val({current, db_nodes}), - reply(From, {cstructs, Cstructs, Running}), + reply(From, {cstructs, Cstructs, Running}), noreply(State); -handle_call({schema_is_merged, [], late_merge, []}, From, +handle_call({schema_is_merged, [], late_merge, []}, From, State = #state{schema_is_merged = Merged}) -> case Merged of {false, Node} when Node == node(From) -> @@ -697,8 +694,8 @@ handle_call(disc_load_intents,From,State = #state{loader_queue=LQ,late_loader_qu handle_call({update_where_to_write, [add, Tab, AddNode], _From}, _Dummy, State) -> Current = val({current, db_nodes}), - Res = - case lists:member(AddNode, Current) and + Res = + case lists:member(AddNode, Current) and (State#state.schema_is_merged == true) of true -> mnesia_lib:add_lsort({Tab, where_to_write}, AddNode), @@ -732,7 +729,7 @@ handle_call({add_active_replica, [Tab, ToNode, RemoteS, AccessMode], From}, noreply(State#state{early_msgs = [{call, Msg, undefined} | Msgs]}) end; -handle_call({unannounce_add_table_copy, [Tab, Node], From}, ReplyTo, State) -> +handle_call({unannounce_add_table_copy, [Tab, Node], From}, ReplyTo, State) -> KnownNode = lists:member(node(From), val({current, db_nodes})), Merged = State#state.schema_is_merged, if @@ -752,16 +749,16 @@ handle_call({unannounce_add_table_copy, [Tab, Node], From}, ReplyTo, State) -> end; handle_call({net_load, Tab, Cs}, From, State) -> - State2 = + State2 = case State#state.schema_is_merged of - true -> + true -> Worker = #net_load{table = Tab, opt_reply_to = From, reason = {dumper,add_table_copy}, cstruct = Cs }, add_worker(Worker, State); - false -> + false -> reply(From, {not_loaded, schema_not_merged}), State end, @@ -804,16 +801,16 @@ handle_call({add_other, Who}, _From, State = #state{others=Others0}) -> handle_call({del_other, Who}, _From, State = #state{others=Others0}) -> Others = lists:delete(Who, Others0), {reply, ok, State#state{others=Others}}; - + handle_call(Msg, _From, State) -> error("~p got unexpected call: ~p~n", [?SERVER_NAME, Msg]), noreply(State). -late_disc_load(TabsR, Reason, RemoteLoaders, From, +late_disc_load(TabsR, Reason, RemoteLoaders, From, State = #state{loader_queue = LQ, late_loader_queue = LLQ}) -> verbose("Intend to load tables: ~p~n", [TabsR]), ?eval_debug_fun({?MODULE, late_disc_load}, - [{tabs, TabsR}, + [{tabs, TabsR}, {reason, Reason}, {loaders, RemoteLoaders}]), @@ -822,14 +819,14 @@ late_disc_load(TabsR, Reason, RemoteLoaders, From, %% Remove deleted tabs and queued/loaded LocalTabs = gb_sets:from_ordset(lists:sort(mnesia_lib:val({schema,local_tables}))), - Filter = fun(TabInfo0, Acc) -> - TabInfo = {Tab,_} = - case TabInfo0 of + Filter = fun(TabInfo0, Acc) -> + TabInfo = {Tab,_} = + case TabInfo0 of {_,_} -> TabInfo0; TabN -> {TabN,Reason} end, case gb_sets:is_member(Tab, LocalTabs) of - true -> + true -> case ?catch_val({Tab, where_to_read}) == node() of true -> Acc; false -> @@ -841,12 +838,12 @@ late_disc_load(TabsR, Reason, RemoteLoaders, From, false -> Acc end end, - + Tabs = lists:foldl(Filter, [], TabsR), - + Nodes = val({current, db_nodes}), LateQueue = late_loaders(Tabs, RemoteLoaders, Nodes, LLQ), - State#state{late_loader_queue = LateQueue}. + State#state{late_loader_queue = LateQueue}. late_loaders([{Tab, Reason} | Tabs], RemoteLoaders, Nodes, LLQ) -> case gb_trees:is_defined(Tab, LLQ) of @@ -859,7 +856,7 @@ late_loaders([{Tab, Reason} | Tabs], RemoteLoaders, Nodes, LLQ) -> LateLoad = #late_load{table=Tab,loaders=LoadNodes,reason=Reason}, late_loaders(Tabs, RemoteLoaders, Nodes, gb_trees:insert(Tab,LateLoad,LLQ)); true -> - late_loaders(Tabs, RemoteLoaders, Nodes, LLQ) + late_loaders(Tabs, RemoteLoaders, Nodes, LLQ) end; late_loaders([], _RemoteLoaders, _Nodes, LLQ) -> LLQ. @@ -899,7 +896,7 @@ late_load_filter([RL | RemoteLoaders], Tab, Nodes, Acc) -> end; late_load_filter([], _Tab, _Nodes, Acc) -> Acc. - + %%---------------------------------------------------------------------- %% Func: handle_cast/2 %% Returns: {noreply, State} | @@ -911,7 +908,7 @@ handle_cast({release_schema_commit_lock, _Owner}, State) -> if State#state.is_stopping == true -> {stop, shutdown, State}; - true -> + true -> case State#state.dumper_queue of [#schema_commit_lock{}|Rest] -> [_Worker | Rest] = State#state.dumper_queue, @@ -932,7 +929,7 @@ handle_cast(unblock_controller, State) -> [_Worker | Rest] = State#state.dumper_queue, State2 = State#state{dumper_pid = undefined, dumper_queue = Rest}, - State3 = opt_start_worker(State2), + State3 = opt_start_worker(State2), noreply(State3) end; @@ -948,31 +945,31 @@ handle_cast({mnesia_down, Node}, State) -> %% Fix if we are late_merging against the node that went down case State#state.schema_is_merged of - {false, Node} -> + {false, Node} -> spawn(?MODULE, call, [{schema_is_merged, [], late_merge, []}]); _ -> ignore end, - + %% Fix internal stuff LateQ = remove_loaders(Alltabs, Node, State#state.late_loader_queue), - + case get_senders(State) ++ get_loaders(State) of [] -> ignore; - Senders -> + Senders -> lists:foreach(fun({Pid,_}) -> Pid ! {copier_done, Node} end, Senders) end, - lists:foreach(fun(Pid) -> Pid ! {copier_done,Node} end, + lists:foreach(fun(Pid) -> Pid ! {copier_done,Node} end, State#state.others), - + Remove = fun(ST) -> node(ST#send_table.receiver_pid) /= Node end, NewSenders = lists:filter(Remove, State#state.sender_queue), Early = remove_early_messages(State#state.early_msgs, Node), - noreply(State#state{sender_queue = NewSenders, - early_msgs = Early, + noreply(State#state{sender_queue = NewSenders, + early_msgs = Early, late_loader_queue = LateQ }); @@ -981,8 +978,8 @@ handle_cast({merging_schema, Node}, State) -> false -> %% This comes from dynamic connect_nodes which are made %% after mnesia:start() and the schema_merge. - ImANewKidInTheBlock = - (val({schema, storage_type}) == ram_copies) + ImANewKidInTheBlock = + (val({schema, storage_type}) == ram_copies) andalso (mnesia_lib:val({schema, local_tables}) == [schema]), case ImANewKidInTheBlock of true -> %% I'm newly started ram_node.. @@ -1000,7 +997,7 @@ handle_cast(Msg, State) when State#state.schema_is_merged /= true -> noreply(State#state{early_msgs = [{cast, Msg} | Msgs]}); %% This must be done after schema_is_merged otherwise adopt_orphan -%% might trigger a table load from wrong nodes as a result of that we don't +%% might trigger a table load from wrong nodes as a result of that we don't %% know which tables we can load safly first. handle_cast({im_running, Node, NewFriends}, State) -> LocalTabs = mnesia_lib:local_active_tables() -- [schema], @@ -1027,7 +1024,7 @@ handle_cast({sync_tabs, Tabs, From}, State) -> handle_cast({i_have_tab, Tab, Node}, State) -> case lists:member(Node, val({current, db_nodes})) of - true -> + true -> State2 = node_has_tabs([Tab], Node, State), noreply(State2); false -> @@ -1043,10 +1040,10 @@ handle_cast({force_load_updated, Tab}, State) -> State2 = node_has_tabs([Tab], SomeNode, State), noreply(State2) end; - + handle_cast({master_nodes_updated, Tab, Masters}, State) -> Active = val({Tab, active_replicas}), - Valid = + Valid = case val({Tab, load_by_force}) of true -> Active; @@ -1066,10 +1063,10 @@ handle_cast({master_nodes_updated, Tab, Masters}, State) -> State2 = node_has_tabs([Tab], SomeNode, State), noreply(State2) end; - + handle_cast({adopt_orphans, Node, Tabs}, State) -> State2 = node_has_tabs(Tabs, Node, State), - + case ?catch_val({node_up,Node}) of true -> ignore; _ -> @@ -1101,7 +1098,7 @@ handle_cast(Msg, State) -> error("~p got unexpected cast: ~p~n", [?SERVER_NAME, Msg]), noreply(State). -handle_sync_tabs([Tab | Tabs], From) -> +handle_sync_tabs([Tab | Tabs], From) -> case val({Tab, where_to_read}) of nowhere -> case get({sync_tab, Tab}) of @@ -1145,7 +1142,7 @@ handle_info(#dumper_done{worker_pid=Pid, worker_res=Res}, State) -> {stop, fatal, State} end; -handle_info(Done = #loader_done{worker_pid=WPid, table_name=Tab}, State0) -> +handle_info(Done = #loader_done{worker_pid=WPid, table_name=Tab}, State0) -> LateQueue0 = State0#state.late_loader_queue, State1 = State0#state{loader_pid = lists:keydelete(WPid,1,get_loaders(State0))}, @@ -1153,7 +1150,7 @@ handle_info(Done = #loader_done{worker_pid=WPid, table_name=Tab}, State0) -> case Done#loader_done.is_loaded of true -> %% Optional table announcement - if + if Done#loader_done.needs_announce == true, Done#loader_done.needs_reply == true -> i_have_tab(Tab), @@ -1187,7 +1184,7 @@ handle_info(Done = #loader_done{worker_pid=WPid, table_name=Tab}, State0) -> State1#state{late_loader_queue=gb_trees:delete_any(Tab, LateQueue0)}; false -> %% Either the node went down or table was not - %% loaded remotly yet + %% loaded remotly yet case Done#loader_done.needs_reply of true -> reply(Done#loader_done.reply_to, @@ -1210,7 +1207,7 @@ handle_info(#sender_done{worker_pid=Pid, worker_res=Res}, State) -> Senders = get_senders(State), {value, {Pid,_Worker}} = lists:keysearch(Pid, 1, Senders), if - Res == ok -> + Res == ok -> State2 = State#state{sender_pid = lists:keydelete(Pid, 1, Senders)}, State3 = opt_start_worker(State2), noreply(State3); @@ -1252,7 +1249,7 @@ handle_info(Msg = {'EXIT', Pid, R}, State) when R /= wait_for_tables_timeout -> {stop, fatal, State}; false -> case lists:keymember(Pid, 1, get_loaders(State)) of - true -> + true -> fatal("Loader crashed: ~p~n state: ~p~n", [R, State]), {stop, fatal, State}; false -> @@ -1338,7 +1335,7 @@ code_change(_OldVsn, State0, _Extra) -> State1 = case State0#state.loader_pid of Pids when is_list(Pids) -> State0; undefined -> State0#state{loader_pid = [],loader_queue=gb_trees:empty()}; - Pid when is_pid(Pid) -> + Pid when is_pid(Pid) -> [Loader|Rest] = State0#state.loader_queue, LQ0 = [{element(2,Rec),Rec} || Rec <- Rest], LQ1 = lists:sort(LQ0), @@ -1346,7 +1343,7 @@ code_change(_OldVsn, State0, _Extra) -> State0#state{loader_pid=[{Pid,Loader}], loader_queue=LQ} end, %% LateLoaderQueue - State = if is_list(State1#state.late_loader_queue) -> + State = if is_list(State1#state.late_loader_queue) -> LLQ0 = State1#state.late_loader_queue, LLQ1 = lists:sort([{element(2,Rec),Rec} || Rec <- LLQ0]), LLQ = gb_trees:from_orddict(LLQ1), @@ -1355,7 +1352,7 @@ code_change(_OldVsn, State0, _Extra) -> State1 end, {ok, State}. - + %%%---------------------------------------------------------------------- %%% Internal functions %%%---------------------------------------------------------------------- @@ -1365,20 +1362,20 @@ maybe_log_mnesia_down(N) -> %% so if we are not running (i.e haven't decided which tables %% to load locally), don't log mnesia_down yet. case mnesia_lib:is_running() of - yes -> + yes -> verbose("Logging mnesia_down ~w~n", [N]), mnesia_recover:log_mnesia_down(N), ok; - _ -> + _ -> Filter = fun(Tab) -> inactive_copy_holders(Tab, N) end, HalfLoadedTabs = lists:any(Filter, val({schema, local_tables}) -- [schema]), - if + if HalfLoadedTabs == true -> verbose("Logging mnesia_down ~w~n", [N]), mnesia_recover:log_mnesia_down(N), - ok; + ok; true -> %% Unfortunately we have not loaded some common %% tables yet, so we cannot rely on the nodedown @@ -1407,7 +1404,7 @@ orphan_tables([Tab | Tabs], Node, Ns, Local, Remote) -> BeingCreated = (?catch_val({Tab, create_table}) == true), Read = val({Tab, where_to_read}), case lists:member(Node, DiscCopyHolders) of - _ when BeingCreated == true -> + _ when BeingCreated == true -> orphan_tables(Tabs, Node, Ns, Local, Remote); _ when Read == node() -> %% Allready loaded orphan_tables(Tabs, Node, Ns, Local, Remote); @@ -1445,13 +1442,13 @@ orphan_tables([], _, _, LocalOrphans, RemoteMasters) -> {LocalOrphans, RemoteMasters}. node_has_tabs([Tab | Tabs], Node, State) when Node /= node() -> - State2 = + State2 = case catch update_whereabouts(Tab, Node, State) of State1 = #state{} -> State1; {'EXIT', R} -> %% Tab was just deleted? case ?catch_val({Tab, cstruct}) of {'EXIT', _} -> State; % yes - _ -> erlang:error(R) + _ -> erlang:error(R) end end, node_has_tabs(Tabs, Node, State2); @@ -1477,14 +1474,14 @@ update_whereabouts(Tab, Node, State) -> true -> lists:member(Node, Masters) end, - + dbg_out("Table ~w is loaded on ~w. s=~w, r=~w, lc=~w, f=~w, m=~w~n", [Tab, Node, Storage, Read, LocalC, ByForce, GoGetIt]), if LocalC == true -> %% Local contents, don't care about other node State; - BeingCreated == true -> + BeingCreated == true -> %% The table is currently being created %% It will be handled elsewhere State; @@ -1501,8 +1498,8 @@ update_whereabouts(Tab, Node, State) -> State end; Storage == unknown -> - %% No own copy, continue to read remotely - add_active_replica(Tab, Node), + %% No own copy, continue to read remotely + add_active_replica(Tab, Node), NodeST = mnesia_lib:storage_type_at_node(Node, Tab), ReadST = mnesia_lib:storage_type_at_node(Read, Tab), if %% Avoid reading from disc_only_copies @@ -1542,16 +1539,16 @@ initial_safe_loads() -> Tabs = val({schema, local_tables}) -- [schema], LastC = fun(T) -> last_consistent_replica(T, Downs) end, lists:zf(LastC, Tabs); - + disc_copies -> Downs = mnesia_recover:get_mnesia_downs(), dbg_out("mnesia_downs = ~p~n", [Downs]), - + Tabs = val({schema, local_tables}) -- [schema], LastC = fun(T) -> last_consistent_replica(T, Downs) end, lists:zf(LastC, Tabs) end. - + last_consistent_replica(Tab, Downs) -> Cs = val({Tab, cstruct}), Storage = mnesia_lib:cs_to_storage_type(node(), Cs), @@ -1628,7 +1625,7 @@ remove_early_messages([], _Node) -> []; remove_early_messages([{call, {add_active_replica, [_, Node, _, _], _}, _}|R], Node) -> remove_early_messages(R, Node); %% Does a reply before queuing -remove_early_messages([{call, {block_table, _, From}, ReplyTo}|R], Node) +remove_early_messages([{call, {block_table, _, From}, ReplyTo}|R], Node) when node(From) == Node -> reply(ReplyTo, ok), %% Remove gen:server waits.. remove_early_messages(R, Node); @@ -1682,9 +1679,9 @@ is_tab_blocked(W2C) when is_list(W2C) -> is_tab_blocked({blocked, W2C}) when is_list(W2C) -> {true, W2C}. -mark_blocked_tab(true, Value) -> +mark_blocked_tab(true, Value) -> {blocked, Value}; -mark_blocked_tab(false, Value) -> +mark_blocked_tab(false, Value) -> Value. %% @@ -1717,7 +1714,7 @@ del_active_replica(Tab, Node) -> update_where_to_wlock(Tab). change_table_access_mode(Cs) -> - W = fun() -> + W = fun() -> Tab = Cs#cstruct.name, lists:foreach(fun(N) -> add_active_replica(Tab, N, Cs) end, val({Tab, active_replicas})) @@ -1746,7 +1743,7 @@ update_where_to_wlock(Tab) -> unannounce_add_table_copy(Tab, To) -> catch del_active_replica(Tab, To), case catch val({Tab , where_to_read}) of - To -> + To -> mnesia_lib:set_remote_where_to_read(Tab); _ -> ignore @@ -1759,7 +1756,7 @@ user_sync_tab(Tab) -> _ -> ignore end, - + case erase({sync_tab, Tab}) of undefined -> ok; @@ -1778,11 +1775,11 @@ i_have_tab(Tab) -> sync_and_block_table_whereabouts(Tab, ToNode, RemoteS, AccessMode) when Tab /= schema -> Current = val({current, db_nodes}), - Ns = + Ns = case lists:member(ToNode, Current) of true -> Current -- [ToNode]; false -> Current - end, + end, remote_call(ToNode, block_table, [Tab]), [remote_call(Node, add_active_replica, [Tab, ToNode, RemoteS, AccessMode]) || Node <- [ToNode | Ns]], @@ -1827,7 +1824,7 @@ get_workers(Timeout) -> {timeout, Timeout} end end. - + info() -> Tabs = mnesia_lib:local_active_tables(), io:format( "---> Active tables <--- ~n", []), @@ -1836,12 +1833,12 @@ info() -> info([Tab | Tail]) -> case val({Tab, storage_type}) of disc_only_copies -> - info_format(Tab, - dets:info(Tab, size), + info_format(Tab, + dets:info(Tab, size), dets:info(Tab, file_size), "bytes on disc"); _ -> - info_format(Tab, + info_format(Tab, ?ets_info(Tab, size), ?ets_info(Tab, memory), "words of mem") @@ -1881,7 +1878,7 @@ handle_early_msg({cast, Msg}, State) -> handle_cast(Msg, State); handle_early_msg({info, Msg}, State) -> handle_info(Msg, State). - + noreply(State) -> {noreply, State}. @@ -1929,7 +1926,7 @@ add_worker(Worker = #send_table{}, State) -> add_worker(Worker = #disc_load{}, State) -> opt_start_worker(add_loader(Worker#disc_load.table,Worker,State)); % Block controller should be used for upgrading mnesia. -add_worker(Worker = #block_controller{}, State) -> +add_worker(Worker = #block_controller{}, State) -> Queue = State#state.dumper_queue, Queue2 = [Worker | Queue], State2 = State#state{dumper_queue = Queue2}, @@ -1938,13 +1935,13 @@ add_worker(Worker = #block_controller{}, State) -> add_loader(Tab,Worker,State = #state{loader_queue=LQ0}) -> case gb_trees:is_defined(Tab, LQ0) of true -> State; - false -> + false -> LQ=gb_trees:insert(Tab, Worker, LQ0), State#state{loader_queue=LQ} end. %% Optionally start a worker -%% +%% %% Dumpers and loaders may run simultaneously %% but neither of them may run during schema commit. %% Loaders may not start if a schema commit is enqueued. @@ -1958,7 +1955,7 @@ opt_start_worker(State) -> %% Great, a worker in queue and neither %% a schema transaction is being %% committed and nor a dumper is running - + %% Start worker but keep him in the queue if is_record(Worker, schema_commit_lock) -> @@ -1966,7 +1963,7 @@ opt_start_worker(State) -> reply(ReplyTo, granted), {Owner, _Tag} = ReplyTo, opt_start_loader(State#state{dumper_pid = Owner}); - + is_record(Worker, dump_log) -> Pid = spawn_link(?MODULE, dump_and_reply, [self(), Worker]), State2 = State#state{dumper_pid = Pid}, @@ -1976,7 +1973,7 @@ opt_start_worker(State) -> %% or sender State3 = opt_start_sender(State2), opt_start_loader(State3); - + is_record(Worker, block_controller) -> case {get_senders(State), get_loaders(State)} of {[], []} -> @@ -1989,7 +1986,7 @@ opt_start_worker(State) -> end end; _ -> - %% Bad luck, try with a loader or sender instead + %% Bad luck, try with a loader or sender instead State2 = opt_start_sender(State), opt_start_loader(State2) end. @@ -1997,8 +1994,8 @@ opt_start_worker(State) -> opt_start_sender(State) -> case State#state.sender_queue of []-> State; %% No need - SenderQ -> - {NewS,Kept} = opt_start_sender2(SenderQ, get_senders(State), + SenderQ -> + {NewS,Kept} = opt_start_sender2(SenderQ, get_senders(State), [], get_loaders(State)), State#state{sender_pid = NewS, sender_queue = Kept} end. @@ -2007,11 +2004,11 @@ opt_start_sender2([], Pids,Kept, _) -> {Pids,Kept}; opt_start_sender2([Sender|R], Pids, Kept, LoaderQ) -> Tab = Sender#send_table.table, Active = val({Tab, active_replicas}), - IgotIt = lists:member(node(), Active), - IsLoading = lists:any(fun({_Pid,Loader}) -> + IgotIt = lists:member(node(), Active), + IsLoading = lists:any(fun({_Pid,Loader}) -> Tab == element(#net_load.table, Loader) end, LoaderQ), - if + if IgotIt, IsLoading -> %% I'm currently finishing loading the table let him wait opt_start_sender2(R,Pids, [Sender|Kept], LoaderQ); @@ -2029,11 +2026,11 @@ opt_start_loader(State = #state{loader_queue = LoaderQ}) -> Current = get_loaders(State), Max = max_loaders(), case gb_trees:is_empty(LoaderQ) of - true -> + true -> State; - _ when length(Current) >= Max -> + _ when length(Current) >= Max -> State; - false -> + false -> SchemaQueue = State#state.dumper_queue, case lists:keymember(schema_commit_lock, 1, SchemaQueue) of false -> @@ -2064,7 +2061,7 @@ already_loading(#disc_load{table=Tab},Loaders) -> already_loading2(Tab, [{_,#net_load{table=Tab}}|_]) -> true; already_loading2(Tab, [{_,#disc_load{table=Tab}}|_]) -> true; -already_loading2(Tab, [_|Rest]) -> already_loading2(Tab,Rest); +already_loading2(Tab, [_|Rest]) -> already_loading2(Tab,Rest); already_loading2(_,[]) -> false. start_remote_sender(Node, Tab, Receiver, Storage) -> @@ -2093,8 +2090,8 @@ send_and_reply(ReplyTo, Worker) -> load_and_reply(ReplyTo, Worker) -> Load = load_table_fun(Worker), - SendAndReply = - fun() -> + SendAndReply = + fun() -> process_flag(trap_exit, true), Done = Load(), ReplyTo ! Done#loader_done{worker_pid = self()}, @@ -2161,7 +2158,7 @@ load_table_fun(#disc_load{table=Tab, reason=Reason, opt_reply_to=ReplyTo}) -> ReadNode == nowhere -> %% Already loaded on other node, lets get it Cs = val({Tab, cstruct}), - fun() -> + fun() -> case mnesia_loader:net_load_table(Tab, Reason, Active, Cs) of {loaded, ok} -> Done#loader_done{needs_sync = true}; @@ -2204,10 +2201,10 @@ filter_active(Tab) -> Active = val({Tab, active_replicas}), Masters = mnesia_recover:get_master_nodes(Tab), Ns = do_filter_active(ByForce, Active, Masters), - %% Reorder the so that we load from fastest first + %% Reorder the so that we load from fastest first LS = ?catch_val({Tab, storage_type}), DOC = val({Tab, disc_only_copies}), - {Good,Worse} = + {Good,Worse} = case LS of disc_only_copies -> G = mnesia_lib:intersect(Ns, DOC), @@ -2218,7 +2215,7 @@ filter_active(Tab) -> end, %% Pick a random node of the fastest Len = length(Good), - if + if Len > 0 -> R = erlang:phash(node(), Len+1), random(R-1,Good,Worse); @@ -2237,5 +2234,5 @@ do_filter_active(false, Active, []) -> Active; do_filter_active(false, Active, Masters) -> mnesia_lib:intersect(Active, Masters). - + |