%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 1996-2017. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% %CopyrightEnd%
%%
%%
-module(mnesia_monitor).
-behaviour(gen_server).
%% Public exports
-export([
close_dets/1,
close_log/1,
detect_inconcistency/2,
get_env/1,
init/0,
mktab/2,
unsafe_mktab/2,
unsafe_create_external/4,
mnesia_down/2,
needs_protocol_conversion/1,
negotiate_protocol/1,
disconnect/1,
open_dets/2,
unsafe_open_dets/2,
open_log/1,
patch_env/2,
protocol_version/0,
reopen_log/3,
set_env/2,
start/0,
start_proc/4,
sync_log/1,
terminate_proc/3,
unsafe_close_dets/1,
unsafe_close_log/1,
use_dir/0,
do_check_type/2
]).
%% gen_server callbacks
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
]).
%% Internal exports
-export([
call/1,
cast/1,
detect_partitioned_network/2,
has_remote_mnesia_down/1,
negotiate_protocol_impl/2
]).
-compile({no_auto_import,[error/2]}).
-import(mnesia_lib, [dbg_out/2, verbose/2, error/2, fatal/2, set/2]).
-include("mnesia.hrl").
-record(state, {supervisor, pending_negotiators = [],
going_down = [], tm_started = false, early_connects = [],
connecting, mq = [], remote_node_status = []}).
-define(current_protocol_version, {8,4}).
-define(previous_protocol_version, {8,3}).
start() ->
gen_server:start_link({local, ?MODULE}, ?MODULE,
[self()], [{timeout, infinity}
%% ,{debug, [trace]}
]).
init() ->
call(init).
mnesia_down(From, Node) ->
cast({mnesia_down, From, Node}).
mktab(Tab, Args) ->
unsafe_call({mktab, Tab, Args}).
unsafe_mktab(Tab, Args) ->
unsafe_call({unsafe_mktab, Tab, Args}).
open_dets(Tab, Args) ->
unsafe_call({open_dets, Tab, Args}).
unsafe_open_dets(Tab, Args) ->
unsafe_call({unsafe_open_dets, Tab, Args}).
close_dets(Tab) ->
unsafe_call({close_dets, Tab}).
unsafe_close_dets(Name) ->
unsafe_call({unsafe_close_dets, Name}).
open_log(Args) ->
unsafe_call({open_log, Args}).
reopen_log(Name, Fname, Head) ->
unsafe_call({reopen_log, Name, Fname, Head}).
sync_log(Name) ->
unsafe_call({sync_log, Name}).
close_log(Name) ->
unsafe_call({close_log, Name}).
unsafe_close_log(Name) ->
unsafe_call({unsafe_close_log, Name}).
unsafe_create_external(Tab, Alias, Mod, Cs) ->
unsafe_call({unsafe_create_external, Tab, Alias, Mod, Cs}).
disconnect(Node) ->
cast({disconnect, Node}).
%% Returns GoodNoodes
%% Creates a link to each compatible monitor and
%% protocol_version to agreed version upon success
negotiate_protocol([]) -> [];
negotiate_protocol(Nodes) ->
call({negotiate_protocol, Nodes}).
negotiate_protocol_impl(Nodes, Requester) ->
Version = mnesia:system_info(version),
Protocols = acceptable_protocol_versions(),
MonitorPid = whereis(?MODULE),
Msg = {negotiate_protocol, MonitorPid, Version, Protocols},
{Replies, _BadNodes} = multicall(Nodes, Msg),
Res = check_protocol(Replies, Protocols),
?MODULE ! {protocol_negotiated,Requester,Res},
unlink(whereis(?MODULE)),
ok.
check_protocol([{Node, {accept, Mon, Version, Protocol}} | Tail], Protocols) ->
case lists:member(Protocol, Protocols) of
true ->
case Protocol == protocol_version() of
true ->
set({protocol, Node}, {Protocol, false});
false ->
set({protocol, Node}, {Protocol, true})
end,
[node(Mon) | check_protocol(Tail, Protocols)];
false ->
verbose("Failed to connect with ~p. ~p protocols rejected. "
"expected version = ~p, expected protocol = ~p~n",
[Node, Protocols, Version, Protocol]),
unlink(Mon), % Get rid of unnecessary link
check_protocol(Tail, Protocols)
end;
check_protocol([{Node, {reject, _Mon, Version, Protocol}} | Tail], Protocols) ->
verbose("Failed to connect with ~p. ~p protocols rejected. "
"expected version = ~p, expected protocol = ~p~n",
[Node, Protocols, Version, Protocol]),
check_protocol(Tail, Protocols);
check_protocol([{error, _Reason} | Tail], Protocols) ->
dbg_out("~p connect failed error: ~tp~n", [?MODULE, _Reason]),
check_protocol(Tail, Protocols);
check_protocol([{badrpc, _Reason} | Tail], Protocols) ->
dbg_out("~p connect failed badrpc: ~tp~n", [?MODULE, _Reason]),
check_protocol(Tail, Protocols);
check_protocol([], [Protocol | _Protocols]) ->
set(protocol_version, Protocol),
[].
protocol_version() ->
case ?catch_val(protocol_version) of
{'EXIT', _} -> ?current_protocol_version;
Version -> Version
end.
%% A sorted list of acceptable protocols the
%% preferred protocols are first in the list
acceptable_protocol_versions() ->
[protocol_version(), ?previous_protocol_version].
needs_protocol_conversion(Node) ->
case {?catch_val({protocol, Node}), protocol_version()} of
{{'EXIT', _}, _} ->
false;
{{_, Bool}, ?current_protocol_version} ->
Bool;
{{_, Bool}, _} ->
not Bool
end.
cast(Msg) ->
case whereis(?MODULE) of
undefined -> ok;
Pid -> gen_server:cast(Pid, Msg)
end.
unsafe_call(Msg) ->
case whereis(?MODULE) of
undefined -> {error, {node_not_running, node()}};
Pid -> gen_server:call(Pid, Msg, infinity)
end.
call(Msg) ->
case whereis(?MODULE) of
undefined ->
{error, {node_not_running, node()}};
Pid ->
link(Pid),
Res = gen_server:call(Pid, Msg, infinity),
unlink(Pid),
%% We get an exit signal if server dies
receive
{'EXIT', Pid, _Reason} ->
{error, {node_not_running, node()}}
after 0 ->
Res
end
end.
multicall(Nodes, Msg) ->
rpc:multicall(Nodes, ?MODULE, call, [Msg]).
start_proc(Who, Mod, Fun, Args) ->
Args2 = [Who, Mod, Fun, Args],
proc_lib:start_link(mnesia_sp, init_proc, Args2, infinity).
terminate_proc(Who, R, State) when R /= shutdown, R /= killed ->
fatal("~p crashed: ~p state: ~tp~n", [Who, R, State]);
terminate_proc(Who, Reason, _State) ->
mnesia_lib:verbose("~p terminated: ~tp~n", [Who, Reason]),
ok.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% Callback functions from gen_server
%%----------------------------------------------------------------------
%% Func: init/1
%% Returns: {ok, State} |
%% {ok, State, Timeout} |
%% {stop, Reason}
%%----------------------------------------------------------------------
init([Parent]) ->
process_flag(trap_exit, true),
?ets_new_table(mnesia_gvar, [set, public, named_table]),
?ets_new_table(mnesia_stats, [set, public, named_table]),
set(subscribers, []),
set(activity_subscribers, []),
mnesia_lib:verbose("~p starting: ~p~n", [?MODULE, self()]),
Version = mnesia:system_info(version),
set(version, Version),
dbg_out("Version: ~p~n", [Version]),
try process_config_args(env()) of
ok ->
mnesia_lib:set({'$$$_report', current_pos}, 0),
Level = mnesia_lib:val(debug),
mnesia_lib:verbose("Mnesia debug level set to ~p\n", [Level]),
set(mnesia_status, starting), %% set start status
set({current, db_nodes}, [node()]),
set(use_dir, use_dir()),
mnesia_lib:create_counter(trans_aborts),
mnesia_lib:create_counter(trans_commits),
mnesia_lib:create_counter(trans_log_writes),
Left = get_env(dump_log_write_threshold),
mnesia_lib:set_counter(trans_log_writes_left, Left),
mnesia_lib:create_counter(trans_log_writes_prev),
mnesia_lib:create_counter(trans_restarts),
mnesia_lib:create_counter(trans_failures),
set(checkpoints, []),
set(pending_checkpoints, []),
set(pending_checkpoint_pids, []),
{ok, #state{supervisor = Parent}}
catch _:Reason ->
mnesia_lib:report_fatal("Bad configuration: ~tp~n", [Reason]),
{stop, {bad_config, Reason}}
end.
use_dir() ->
case ?catch_val(use_dir) of
{'EXIT', _} ->
case get_env(schema_location) of
disc -> true;
opt_disc -> non_empty_dir();
ram -> false
end;
Bool ->
Bool
end.
%% Returns true if the Mnesia directory contains
%% important files
non_empty_dir() ->
mnesia_lib:exists(mnesia_bup:fallback_bup()) or
mnesia_lib:exists(mnesia_lib:tab2dmp(schema)) or
mnesia_lib:exists(mnesia_lib:tab2dat(schema)).
%%----------------------------------------------------------------------
%% Func: handle_call/3
%% Returns: {reply, Reply, State} |
%% {reply, Reply, State, Timeout} |
%% {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, Reply, State} | (terminate/2 is called)
%%----------------------------------------------------------------------
handle_call({mktab, Tab, Args}, _From, State) ->
try ?ets_new_table(Tab, Args) of
Reply ->
{reply, Reply, State}
catch error:ExitReason ->
Msg = "Cannot create ets table",
Reason = {system_limit, Msg, Tab, Args, ExitReason},
fatal("~tp~n", [Reason]),
{noreply, State}
end;
handle_call({unsafe_mktab, Tab, Args}, _From, State) ->
try ?ets_new_table(Tab, Args) of
Reply ->
{reply, Reply, State}
catch error:ExitReason ->
{reply, {error, ExitReason}, State}
end;
handle_call({open_dets, Tab, Args}, _From, State) ->
case mnesia_lib:dets_sync_open(Tab, Args) of
{ok, Tab} ->
{reply, {ok, Tab}, State};
{error, Reason} ->
Msg = "Cannot open dets table",
Error = {error, {Msg, Tab, Args, Reason}},
fatal("~tp~n", [Error]),
{noreply, State}
end;
handle_call({unsafe_open_dets, Tab, Args}, _From, State) ->
case mnesia_lib:dets_sync_open(Tab, Args) of
{ok, Tab} ->
{reply, {ok, Tab}, State};
{error, Reason} ->
{reply, {error,Reason}, State}
end;
handle_call({close_dets, Tab}, _From, State) ->
ok = mnesia_lib:dets_sync_close(Tab),
{reply, ok, State};
handle_call({unsafe_close_dets, Tab}, _From, State) ->
mnesia_lib:dets_sync_close(Tab),
{reply, ok, State};
handle_call({open_log, Args}, _From, State) ->
Res = disk_log:open([{notify, true}|Args]),
{reply, Res, State};
handle_call({reopen_log, Name, Fname, Head}, _From, State) ->
case disk_log:reopen(Name, Fname, Head) of
ok ->
{reply, ok, State};
{error, Reason} ->
Msg = "Cannot rename disk_log file",
Error = {error, {Msg, Name, Fname, Head, Reason}},
fatal("~tp~n", [Error]),
{noreply, State}
end;
handle_call({sync_log, Name}, _From, State) ->
{reply, disk_log:sync(Name), State};
handle_call({close_log, Name}, _From, State) ->
case disk_log:close(Name) of
ok ->
{reply, ok, State};
{error, Reason} ->
Msg = "Cannot close disk_log file",
Error = {error, {Msg, Name, Reason}},
fatal("~tp~n", [Error]),
{noreply, State}
end;
handle_call({unsafe_close_log, Name}, _From, State) ->
_ = disk_log:close(Name),
{reply, ok, State};
handle_call({unsafe_create_external, Tab, Alias, Mod, Cs}, _From, State) ->
case catch Mod:create_table(Alias, Tab, mnesia_schema:cs2list(Cs)) of
{'EXIT', ExitReason} ->
{reply, {error, ExitReason}, State};
Reply ->
{reply, Reply, State}
end;
handle_call({negotiate_protocol, Mon, _Version, _Protocols}, _From, State)
when State#state.tm_started == false ->
State2 = State#state{early_connects = [node(Mon) | State#state.early_connects]},
{reply, {node(), {reject, self(), uninitialized, uninitialized}}, State2};
%% From remote monitor..
handle_call({negotiate_protocol, Mon, Version, Protocols}, From, State)
when node(Mon) /= node() ->
Protocol = protocol_version(),
MyVersion = mnesia:system_info(version),
case lists:member(Protocol, Protocols) of
true ->
accept_protocol(Mon, MyVersion, Protocol, From, State);
false ->
%% in this release we should be able to handle the previous
%% protocol
case hd(Protocols) of
?previous_protocol_version ->
accept_protocol(Mon, MyVersion, ?previous_protocol_version, From, State);
_ ->
verbose("Connection with ~p rejected. "
"version = ~p, protocols = ~p, "
"expected version = ~p, expected protocol = ~p~n",
[node(Mon), Version, Protocols, MyVersion, Protocol]),
{reply, {node(), {reject, self(), MyVersion, Protocol}}, State}
end
end;
%% Local request to negotiate with other monitors (nodes).
handle_call({negotiate_protocol, Nodes}, From, State) ->
case mnesia_lib:intersect(State#state.going_down, Nodes) of
[] ->
spawn_link(?MODULE, negotiate_protocol_impl, [Nodes, From]),
{noreply, State#state{connecting={From,Nodes}}};
_ -> %% Cannot connect now, still processing mnesia down
{reply, busy, State}
end;
handle_call(init, _From, State) ->
_ = net_kernel:monitor_nodes(true),
EarlyNodes = State#state.early_connects,
State2 = State#state{tm_started = true},
{reply, EarlyNodes, State2};
handle_call(Msg, _From, State) ->
error("~p got unexpected call: ~tp~n", [?MODULE, Msg]),
{noreply, State}.
accept_protocol(Mon, Version, Protocol, From, State) ->
Reply = {node(), {accept, self(), Version, Protocol}},
Node = node(Mon),
Pending0 = State#state.pending_negotiators,
Pending = lists:keydelete(Node, 1, Pending0),
case lists:member(Node, State#state.going_down) of
true ->
%% Wait for the mnesia_down to be processed,
%% before we reply
P = Pending ++ [{Node, Mon, From, Reply}],
{noreply, State#state{pending_negotiators = P}};
false ->
%% No need for wait
link(Mon), %% link to remote Monitor
case Protocol == protocol_version() of
true ->
set({protocol, Node}, {Protocol, false});
false ->
set({protocol, Node}, {Protocol, true})
end,
{reply, Reply, State#state{pending_negotiators = Pending}}
end.
%%----------------------------------------------------------------------
%% Func: handle_cast/2
%% Returns: {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
%%----------------------------------------------------------------------
handle_cast({mnesia_down, mnesia_controller, Node}, State) ->
mnesia_tm:mnesia_down(Node),
{noreply, State};
handle_cast({mnesia_down, mnesia_tm, Node}, State) ->
Down = {mnesia_down, Node},
mnesia_lib:report_system_event(Down),
GoingDown = lists:delete(Node, State#state.going_down),
State2 = State#state{going_down = GoingDown},
Pending = State#state.pending_negotiators,
State3 = check_raise_conditon_nodeup(Node, State2),
case lists:keysearch(Node, 1, Pending) of
{value, {Node, Mon, ReplyTo, Reply}} ->
%% Late reply to remote monitor
link(Mon), %% link to remote Monitor
gen_server:reply(ReplyTo, Reply),
P2 = lists:keydelete(Node, 1,Pending),
State4 = State3#state{pending_negotiators = P2},
process_q(State4);
false ->
%% No pending remote monitors
process_q(State3)
end;
handle_cast({disconnect, Node}, State) ->
case rpc:call(Node, erlang, whereis, [?MODULE]) of
{badrpc, _} ->
ignore;
undefined ->
ignore;
RemoteMon when is_pid(RemoteMon) ->
unlink(RemoteMon)
end,
{noreply, State};
handle_cast({inconsistent_database, Context, Node}, State) ->
Msg = {inconsistent_database, Context, Node},
mnesia_lib:report_system_event(Msg),
{noreply, State};
handle_cast(Msg, State) ->
error("~p got unexpected cast: ~tp~n", [?MODULE, Msg]),
{noreply, State}.
%%----------------------------------------------------------------------
%% Func: handle_info/2
%% Returns: {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
%%----------------------------------------------------------------------
handle_info({'EXIT', Pid, R}, State) when Pid == State#state.supervisor ->
dbg_out("~p was ~p by supervisor~n",[?MODULE, R]),
{stop, R, State};
handle_info({'EXIT', Pid, fatal}, State) when node(Pid) == node() ->
dbg_out("~p got FATAL ERROR from: ~p~n",[?MODULE, Pid]),
%% This may hang supervisor if a shutdown happens at the same time as an fatal
%% is in progress
%% exit(State#state.supervisor, shutdown),
%% It is better to kill an innocent process
?SAFE(exit(whereis(mnesia_locker), kill)),
{noreply, State};
handle_info(Msg = {'EXIT',Pid,_}, State) ->
Node = node(Pid),
if
Node /= node(), State#state.connecting == undefined ->
%% Remotly linked process died, assume that it was a mnesia_monitor
mnesia_recover:mnesia_down(Node),
mnesia_controller:mnesia_down(Node),
{noreply, State#state{going_down = [Node | State#state.going_down]}};
Node /= node() ->
{noreply, State#state{mq = State#state.mq ++ [{info, Msg}]}};
true ->
%% We have probably got an exit signal from
%% disk_log or dets
Hint = "Hint: check that the disk still is writable",
fatal("~p got unexpected info: ~tp; ~p~n",
[?MODULE, Msg, Hint])
end;
handle_info({protocol_negotiated, From,Res}, State) ->
From = element(1,State#state.connecting),
gen_server:reply(From, Res),
process_q(State#state{connecting = undefined});
handle_info({check_nodeup, Node}, State) ->
State2 = check_mnesia_down(Node, State),
{noreply, State2};
handle_info({nodeup, Node}, State) ->
State2 = remote_node_status(Node, up, State),
State3 = check_mnesia_down(Node, State2),
{noreply, State3};
handle_info({nodedown, Node}, State) ->
State2 = remote_node_status(Node, down, State),
{noreply, State2};
handle_info({disk_log, _Node, Log, Info}, State) ->
case Info of
{truncated, _No} ->
ok;
_ ->
mnesia_lib:important("Warning Log file ~tp error reason ~ts~n",
[Log, disk_log:format_error(Info)])
end,
{noreply, State};
handle_info(Msg, State) ->
error("~p got unexpected info (~tp): ~tp~n", [?MODULE, State, Msg]).
process_q(State = #state{mq=[]}) -> {noreply,State};
process_q(State = #state{mq=[{info,Msg}|R]}) ->
handle_info(Msg, State#state{mq=R});
process_q(State = #state{mq=[{cast,Msg}|R]}) ->
handle_cast(Msg, State#state{mq=R});
process_q(State = #state{mq=[{call,From,Msg}|R]}) ->
handle_call(Msg, From, State#state{mq=R}).
%%----------------------------------------------------------------------
%% Func: terminate/2
%% Purpose: Shutdown the server
%% Returns: any (ignored by gen_server)
%%----------------------------------------------------------------------
terminate(Reason, State) ->
terminate_proc(?MODULE, Reason, State).
%%----------------------------------------------------------------------
%% Func: code_change/3
%% Purpose: Upgrade process when its code is to be changed
%% Returns: {ok, NewState}
%%----------------------------------------------------------------------
code_change(_, {state, SUP, PN, GD, TMS, EC}, _) ->
{ok, #state{supervisor=SUP, pending_negotiators=PN,
going_down = GD, tm_started =TMS, early_connects = EC}};
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------
process_config_args([]) ->
ok;
process_config_args([C|T]) ->
V = get_env(C),
dbg_out("Env ~p: ~p~n", [C, V]),
mnesia_lib:set(C, V),
process_config_args(T).
set_env(E,Val) ->
mnesia_lib:set(E, check_type(E,Val)),
ok.
get_env(E) ->
case ?catch_val(E) of
{'EXIT', _} ->
case application:get_env(mnesia, E) of
{ok, Val} ->
check_type(E, Val);
undefined ->
check_type(E, default_env(E))
end;
Val ->
Val
end.
env() ->
[
access_module,
allow_index_on_key,
auto_repair,
backup_module,
debug,
dir,
dump_disc_copies_at_startup,
dump_log_load_regulation,
dump_log_time_threshold,
dump_log_update_in_place,
dump_log_write_threshold,
event_module,
extra_db_nodes,
ignore_fallback_at_startup,
fallback_error_function,
fold_chunk_size,
max_wait_for_decision,
schema_location,
core_dir,
pid_sort_order,
no_table_loaders,
dc_dump_limit,
send_compressed,
schema
].
default_env(access_module) ->
mnesia;
default_env(auto_repair) ->
true;
default_env(allow_index_on_key) ->
false;
default_env(backup_module) ->
mnesia_backup;
default_env(debug) ->
none;
default_env(dir) ->
Name = lists:concat(["Mnesia.", node()]),
filename:absname(Name);
default_env(dump_disc_copies_at_startup) ->
true;
default_env(dump_log_load_regulation) ->
false;
default_env(dump_log_time_threshold) ->
timer:minutes(3);
default_env(dump_log_update_in_place) ->
true;
default_env(dump_log_write_threshold) ->
1000;
default_env(event_module) ->
mnesia_event;
default_env(extra_db_nodes) ->
[];
default_env(ignore_fallback_at_startup) ->
false;
default_env(fallback_error_function) ->
{mnesia, lkill};
default_env(fold_chunk_size) ->
100;
default_env(max_wait_for_decision) ->
infinity;
default_env(schema_location) ->
opt_disc;
default_env(core_dir) ->
false;
default_env(pid_sort_order) ->
false;
default_env(no_table_loaders) ->
2;
default_env(dc_dump_limit) ->
4;
default_env(send_compressed) ->
0;
default_env(schema) ->
[].
check_type(Env, Val) ->
try do_check_type(Env, Val)
catch error:_ -> exit({bad_config, Env, Val})
end.
do_check_type(access_module, A) when is_atom(A) -> A;
do_check_type(allow_index_on_key, B) -> bool(B);
do_check_type(auto_repair, B) -> bool(B);
do_check_type(backup_module, B) when is_atom(B) -> B;
do_check_type(debug, debug) -> debug;
do_check_type(debug, false) -> none;
do_check_type(debug, none) -> none;
do_check_type(debug, trace) -> trace;
do_check_type(debug, true) -> debug;
do_check_type(debug, verbose) -> verbose;
do_check_type(dir, V) -> filename:absname(V);
do_check_type(dump_disc_copies_at_startup, B) -> bool(B);
do_check_type(dump_log_load_regulation, B) -> bool(B);
do_check_type(dump_log_time_threshold, I) when is_integer(I), I > 0 -> I;
do_check_type(dump_log_update_in_place, B) -> bool(B);
do_check_type(dump_log_write_threshold, I) when is_integer(I), I > 0 -> I;
do_check_type(event_module, A) when is_atom(A) -> A;
do_check_type(ignore_fallback_at_startup, B) -> bool(B);
do_check_type(fallback_error_function, {Mod, Func})
when is_atom(Mod), is_atom(Func) -> {Mod, Func};
do_check_type(extra_db_nodes, L) when is_list(L) ->
Fun = fun(N) when N == node() -> false;
(A) when is_atom(A) -> true
end,
lists:filter(Fun, L);
do_check_type(fold_chunk_size, I) when is_integer(I), I > 0;
I =:= infinity -> I;
do_check_type(max_wait_for_decision, infinity) -> infinity;
do_check_type(max_wait_for_decision, I) when is_integer(I), I > 0 -> I;
do_check_type(schema_location, M) -> media(M);
do_check_type(core_dir, "false") -> false;
do_check_type(core_dir, false) -> false;
do_check_type(core_dir, Dir) when is_list(Dir) -> Dir;
do_check_type(pid_sort_order, r9b_plain) -> r9b_plain;
do_check_type(pid_sort_order, "r9b_plain") -> r9b_plain;
do_check_type(pid_sort_order, standard) -> standard;
do_check_type(pid_sort_order, "standard") -> standard;
do_check_type(pid_sort_order, _) -> false;
do_check_type(no_table_loaders, N) when is_integer(N), N > 0 -> N;
do_check_type(dc_dump_limit,N) when is_number(N), N > 0 -> N;
do_check_type(send_compressed, L) when is_integer(L), L >= 0, L =< 9 -> L;
do_check_type(schema, L) when is_list(L) -> L.
bool(true) -> true;
bool(false) -> false.
media(disc) -> disc;
media(opt_disc) -> opt_disc;
media(ram) -> ram.
patch_env(Env, Val) ->
try do_check_type(Env, Val) of
NewVal ->
application_controller:set_env(mnesia, Env, NewVal),
NewVal
catch error:_ ->
{error, {bad_type, Env, Val}}
end.
detect_partitioned_network(Mon, Node) ->
detect_inconcistency([Node], running_partitioned_network),
unlink(Mon),
exit(normal).
detect_inconcistency([], _Context) ->
ok;
detect_inconcistency(Nodes, Context) ->
Downs = [N || N <- Nodes, mnesia_recover:has_mnesia_down(N)],
{Replies, _BadNodes} =
rpc:multicall(Downs, ?MODULE, has_remote_mnesia_down, [node()]),
report_inconsistency(Replies, Context, ok).
has_remote_mnesia_down(Node) ->
HasDown = mnesia_recover:has_mnesia_down(Node),
Master = mnesia_recover:get_master_nodes(schema),
if
HasDown == true, Master == [] ->
{true, node()};
true ->
{false, node()}
end.
report_inconsistency([{true, Node} | Replies], Context, _Status) ->
%% Oops, Mnesia is already running on the
%% other node AND we both regard each
%% other as down. The database is
%% potentially inconsistent and we has to
%% do tell the applications about it, so
%% they may perform some clever recovery
%% action.
Msg = {inconsistent_database, Context, Node},
mnesia_lib:report_system_event(Msg),
report_inconsistency(Replies, Context, inconsistent_database);
report_inconsistency([{false, _Node} | Replies], Context, Status) ->
report_inconsistency(Replies, Context, Status);
report_inconsistency([{badrpc, _Reason} | Replies], Context, Status) ->
report_inconsistency(Replies, Context, Status);
report_inconsistency([], _Context, Status) ->
Status.
remote_node_status(Node, Status, State) ->
{ok, Nodes} = mnesia_schema:read_nodes(),
case lists:member(Node, Nodes) of
true ->
update_node_status({Node, Status}, State);
_ ->
State
end.
update_node_status({Node, down}, State = #state{remote_node_status = RNodeS}) ->
RNodeS2 = lists:ukeymerge(1, [{Node, down}], RNodeS),
State#state{remote_node_status = RNodeS2};
update_node_status({Node, up}, State = #state{remote_node_status = RNodeS}) ->
case lists:keyfind(Node, 1, RNodeS) of
{Node, down} ->
RNodeS2 = lists:ukeymerge(1, [{Node, up}], RNodeS),
State#state{remote_node_status = RNodeS2};
_ ->
State
end.
check_raise_conditon_nodeup(Node, State = #state{remote_node_status = RNodeS}) ->
case lists:keyfind(Node, 1, RNodeS) of
{Node, up} ->
self() ! {check_nodeup, Node};
_ ->
ignore
end,
State#state{remote_node_status = lists:keydelete(Node, 1, RNodeS)}.
check_mnesia_down(Node, State = #state{remote_node_status = RNodeS}) ->
%% Check if the network has been partitioned
%% due to communication failure.
HasDown = mnesia_recover:has_mnesia_down(Node),
ImRunning = mnesia_lib:is_running(),
if
%% If I'm not running the test will be made later.
HasDown == true, ImRunning == yes ->
spawn_link(?MODULE, detect_partitioned_network, [self(), Node]),
State#state{remote_node_status = lists:keydelete(Node, 1, RNodeS)};
true ->
State
end.