%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 1998-2009. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% %CopyrightEnd%
%%
-module(global_group).
%% Groups nodes into global groups, each with its own global name space.
-behaviour(gen_server).
%% External exports
-export([start/0, start_link/0, stop/0, init/1]).
-export([handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-export([global_groups/0]).
-export([monitor_nodes/1]).
-export([own_nodes/0]).
-export([registered_names/1]).
-export([send/2]).
-export([send/3]).
-export([whereis_name/1]).
-export([whereis_name/2]).
-export([global_groups_changed/1]).
-export([global_groups_added/1]).
-export([global_groups_removed/1]).
-export([sync/0]).
-export([ng_add_check/2, ng_add_check/3]).
-export([info/0]).
-export([registered_names_test/1]).
-export([send_test/2]).
-export([whereis_name_test/1]).
-export([get_own_nodes/0, get_own_nodes_with_errors/0]).
-export([publish_on_nodes/0]).
-export([config_scan/1, config_scan/2]).
%% Internal exports
-export([sync_init/4]).
-define(cc_vsn, 2).
%%%====================================================================================
-type publish_type() :: 'hidden' | 'normal'.
-type sync_state() :: 'no_conf' | 'synced'.
-type group_name() :: atom().
-type group_tuple() :: {group_name(), [node()]}
| {group_name(), publish_type(), [node()]}.
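%% An illustrative global_groups definition matching group_tuple(), as it could
%% appear in a sys.config (node and group names are hypothetical):
%%
%%   {kernel, [{global_groups,
%%              [{gg1, ['n1@host1', 'n2@host2']},
%%               {gg2, hidden, ['n3@host3']}]}]}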
%%%====================================================================================
%%% The state of the global_group process
%%%
%%% sync_state = no_conf (global_groups not defined, initial state) |
%%% synced
%%% group_name = Own global group name
%%% nodes = Nodes in the own global group
%%% no_contact = Nodes which we haven't had contact with yet
%%% sync_error = Nodes with which we could not agree on the global_groups configuration
%%% other_grps = list of other global group names and nodes, [{otherName, [Node]}]
%%% node_name = Own node
%%% monitor = List of Pids requesting nodeup/nodedown
%%%====================================================================================
-record(state, {sync_state = no_conf :: sync_state(),
connect_all :: boolean(),
group_name = [] :: group_name() | [],
nodes = [] :: [node()],
no_contact = [] :: [node()],
sync_error = [],
other_grps = [],
node_name = node() :: node(),
monitor = [],
publish_type = normal :: publish_type(),
group_publish_type = normal :: publish_type()}).
%%%====================================================================================
%%% External exported
%%%====================================================================================
-spec global_groups() -> {group_name(), [group_name()]} | 'undefined'.
global_groups() ->
request(global_groups).
-spec monitor_nodes(boolean()) -> 'ok'.
monitor_nodes(Flag) when is_boolean(Flag) ->
    request({monitor_nodes, Flag});
monitor_nodes(_Flag) ->
    {error, not_boolean}.
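%% A minimal usage sketch: a subscribed process receives plain {nodeup, Node} /
%% {nodedown, Node} messages for nodes in the own group (see send_monitor/3):
%%
%%   ok = global_group:monitor_nodes(true),
%%   receive
%%       {nodeup, Node1}   -> ... ;  % a node in the own group came up
%%       {nodedown, Node2} -> ...    % a node in the own group went down
%%   end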
-spec own_nodes() -> [node()].
own_nodes() ->
request(own_nodes).
-type name() :: atom().
-type where() :: {'node', node()} | {'group', group_name()}.
-spec registered_names(where()) -> [name()].
registered_names(Arg) ->
request({registered_names, Arg}).
-spec send(name(), term()) -> pid() | {'badarg', {name(), term()}}.
send(Name, Msg) ->
request({send, Name, Msg}).
-spec send(where(), name(), term()) -> pid() | {'badarg', {name(), term()}}.
send(Group, Name, Msg) ->
request({send, Group, Name, Msg}).
-spec whereis_name(name()) -> pid() | 'undefined'.
whereis_name(Name) ->
request({whereis_name, Name}).
-spec whereis_name(where(), name()) -> pid() | 'undefined'.
whereis_name(Group, Name) ->
request({whereis_name, Group, Name}).
global_groups_changed(NewPara) ->
request({global_groups_changed, NewPara}).
global_groups_added(NewPara) ->
request({global_groups_added, NewPara}).
global_groups_removed(NewPara) ->
request({global_groups_removed, NewPara}).
-spec sync() -> 'ok'.
sync() ->
request(sync).
ng_add_check(Node, OthersNG) ->
ng_add_check(Node, normal, OthersNG).
ng_add_check(Node, PubType, OthersNG) ->
request({ng_add_check, Node, PubType, OthersNG}).
-type info_item() :: {'state', sync_state()}
| {'own_group_name', group_name()}
| {'own_group_nodes', [node()]}
| {'synched_nodes', [node()]}
| {'sync_error', [node()]}
| {'no_contact', [node()]}
| {'other_groups', [group_tuple()]}
| {'monitoring', [pid()]}.
-spec info() -> [info_item()].
info() ->
request(info, 3000).
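%% An illustrative reply (all values hypothetical):
%%
%%   > global_group:info().
%%   [{state, synced},
%%    {own_group_name, gg1},
%%    {own_group_nodes, ['n1@host1', 'n2@host2']},
%%    {synced_nodes, ['n2@host2']},
%%    {sync_error, []},
%%    {no_contact, []},
%%    {other_groups, [{gg2, ['n3@host3']}]},
%%    {monitoring, []}]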
%% ==== ONLY for test suites ====
registered_names_test(Arg) ->
request({registered_names_test, Arg}).
send_test(Name, Msg) ->
request({send_test, Name, Msg}).
whereis_name_test(Name) ->
request({whereis_name_test, Name}).
%% ==== ONLY for test suites ====
request(Req) ->
request(Req, infinity).
request(Req, Time) ->
case whereis(global_group) of
P when is_pid(P) ->
gen_server:call(global_group, Req, Time);
_Other ->
            {error, global_group_not_running}
end.
%%%====================================================================================
%%% gen_server start
%%%
%%% The first thing that happens is a check of whether the global_groups key is
%%% defined in the .config file. If it is not defined, the whole system is started
%%% as one global group, and the services of global_group are superfluous.
%%% Otherwise a sync process is started to check that all nodes in the own global
%%% group have the same configuration. This is done by sending 'conf_check' to all
%%% other nodes and awaiting a 'config_ok' or 'config_error' reply.
%%% If the nodes do not agree on the configuration, the global_group process
%%% removes those nodes from the #state.nodes list. This can be a normal situation
%%% during a release upgrade, when not all nodes have been upgraded yet.
%%%
%%% It is possible to manually force a sync of the global group, for instance
%%% after a release upgrade, once all nodes in the group have been upgraded.
%%% The nodes are not synced automatically because that would disconnect the
%%% node from those not yet upgraded.
%%%
%%% The three process dictionary variables (registered_names, send, and whereis_name)
%%% store the information needed if a search process crashes.
%%% A search process is a helper process used to find registered names in the system.
%%%====================================================================================
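%%% A sketch of the conf_check handshake between two nodes A and B in the same
%%% group (message shapes as in sync_check_init/8 and the conf_check clauses of
%%% handle_cast/2 below; node names A and B are hypothetical):
%%%
%%%   global_group_check on A casts to {global_group, B}:
%%%       {conf_check, ?cc_vsn, A, PidA, sync, Name, Nodes}
%%%   global_group on B sends to {global_group_check, A}:
%%%       {config_ok, ?cc_vsn, PidA, B}      (configurations agree)
%%%    or {config_error, ?cc_vsn, PidA, B}   (configurations disagree)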
start() -> gen_server:start({local, global_group}, global_group, [], []).
start_link() -> gen_server:start_link({local, global_group}, global_group,[],[]).
stop() -> gen_server:call(global_group, stop, infinity).
init([]) ->
process_flag(priority, max),
ok = net_kernel:monitor_nodes(true),
put(registered_names, [undefined]),
put(send, [undefined]),
put(whereis_name, [undefined]),
process_flag(trap_exit, true),
Ca = case init:get_argument(connect_all) of
{ok, [["false"]]} ->
false;
_ ->
true
end,
PT = publish_arg(),
case application:get_env(kernel, global_groups) of
undefined ->
update_publish_nodes(PT),
{ok, #state{publish_type = PT,
connect_all = Ca}};
{ok, []} ->
update_publish_nodes(PT),
{ok, #state{publish_type = PT,
connect_all = Ca}};
{ok, NodeGrps} ->
{DefGroupName, PubTpGrp, DefNodes, DefOther} =
case catch config_scan(NodeGrps, publish_type) of
{error, _Error2} ->
update_publish_nodes(PT),
exit({error, {'invalid global_groups definition', NodeGrps}});
{DefGroupNameT, PubType, DefNodesT, DefOtherT} ->
update_publish_nodes(PT, {PubType, DefNodesT}),
%% First disconnect any nodes not belonging to our own group
disconnect_nodes(nodes(connected) -- DefNodesT),
lists:foreach(fun(Node) ->
erlang:monitor_node(Node, true)
end,
DefNodesT),
{DefGroupNameT, PubType, lists:delete(node(), DefNodesT), DefOtherT}
end,
{ok, #state{publish_type = PT, group_publish_type = PubTpGrp,
sync_state = synced, group_name = DefGroupName,
no_contact = lists:sort(DefNodes),
other_grps = DefOther}}
end.
%%%====================================================================================
%%% sync() -> ok
%%%
%%% An operator-ordered sync of the own global group. This must be done after
%%% a release upgrade. It can also be ordered if something has caused the nodes
%%% to disagree on the global_groups definition.
%%%====================================================================================
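%%% E.g., after a release upgrade an operator could order a sync on every node
%%% in the group (a sketch; Nodes is assumed to hold the group members):
%%%
%%%   lists:foreach(fun(N) -> rpc:call(N, global_group, sync, []) end, Nodes)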
handle_call(sync, _From, S) ->
% io:format("~p sync ~p~n",[node(), application:get_env(kernel, global_groups)]),
case application:get_env(kernel, global_groups) of
undefined ->
update_publish_nodes(S#state.publish_type),
{reply, ok, S};
{ok, []} ->
update_publish_nodes(S#state.publish_type),
{reply, ok, S};
{ok, NodeGrps} ->
{DefGroupName, PubTpGrp, DefNodes, DefOther} =
case catch config_scan(NodeGrps, publish_type) of
{error, _Error2} ->
exit({error, {'invalid global_groups definition', NodeGrps}});
{DefGroupNameT, PubType, DefNodesT, DefOtherT} ->
update_publish_nodes(S#state.publish_type, {PubType, DefNodesT}),
%% First inform global on all nodes not belonging to our own group
disconnect_nodes(nodes(connected) -- DefNodesT),
%% Sync with the nodes in the own group
kill_global_group_check(),
Pid = spawn_link(?MODULE, sync_init,
[sync, DefGroupNameT, PubType, DefNodesT]),
register(global_group_check, Pid),
{DefGroupNameT, PubType, lists:delete(node(), DefNodesT), DefOtherT}
end,
{reply, ok, S#state{sync_state = synced, group_name = DefGroupName,
no_contact = lists:sort(DefNodes),
other_grps = DefOther, group_publish_type = PubTpGrp}}
end;
%%%====================================================================================
%%% global_groups() -> {OwnGroupName, [OtherGroupName]} | undefined
%%%
%%% Get the names of the global groups
%%%====================================================================================
handle_call(global_groups, _From, S) ->
Result = case S#state.sync_state of
no_conf ->
undefined;
synced ->
Other = lists:foldl(fun({N,_L}, Acc) -> Acc ++ [N]
end,
[], S#state.other_grps),
{S#state.group_name, Other}
end,
{reply, Result, S};
%%%====================================================================================
%%% monitor_nodes(bool()) -> ok
%%%
%%% Monitor nodes in the own global group.
%%% True => send nodeup/nodedown to the requesting Pid
%%% False => stop sending nodeup/nodedown to the requesting Pid
%%%====================================================================================
handle_call({monitor_nodes, Flag}, {Pid, _}, StateIn) ->
% io:format("***** handle_call ~p~n",[monitor_nodes]),
{Res, State} = monitor_nodes(Flag, Pid, StateIn),
{reply, Res, State};
%%%====================================================================================
%%% own_nodes() -> [Node]
%%%
%%% Get a list of nodes in the own global group
%%%====================================================================================
handle_call(own_nodes, _From, S) ->
Nodes = case S#state.sync_state of
no_conf ->
[node() | nodes()];
synced ->
get_own_nodes()
% S#state.nodes
end,
{reply, Nodes, S};
%%%====================================================================================
%%% registered_names({node, Node}) -> [Name] | {error, ErrorMessage}
%%% registered_names({group, GlobalGroupName}) -> [Name] | {error, ErrorMessage}
%%%
%%% Get the registered names from a specified Node, or GlobalGroupName.
%%%====================================================================================
handle_call({registered_names, {group, Group}}, _From, S) when Group =:= S#state.group_name ->
Res = global:registered_names(),
{reply, Res, S};
handle_call({registered_names, {group, Group}}, From, S) ->
case lists:keysearch(Group, 1, S#state.other_grps) of
false ->
{reply, [], S};
{value, {Group, []}} ->
{reply, [], S};
{value, {Group, Nodes}} ->
Pid = global_search:start(names, {group, Nodes, From}),
Wait = get(registered_names),
put(registered_names, [{Pid, From} | Wait]),
{noreply, S}
end;
handle_call({registered_names, {node, Node}}, _From, S) when Node =:= node() ->
Res = global:registered_names(),
{reply, Res, S};
handle_call({registered_names, {node, Node}}, From, S) ->
Pid = global_search:start(names, {node, Node, From}),
% io:format(">>>>> registered_names Pid ~p~n",[Pid]),
Wait = get(registered_names),
put(registered_names, [{Pid, From} | Wait]),
{noreply, S};
%%%====================================================================================
%%% send(Name, Msg) -> Pid | {badarg, {Name, Msg}}
%%% send({node, Node}, Name, Msg) -> Pid | {badarg, {Name, Msg}}
%%% send({group, GlobalGroupName}, Name, Msg) -> Pid | {badarg, {Name, Msg}}
%%%
%%% Send the Msg to the specified globally registered Name in own global group,
%%% in specified Node, or GlobalGroupName.
%%% The receiver must first be found; the flow continues at
%%% handle_cast(send_res).
%%%====================================================================================
%% Search in the whole known world, but check own node first.
handle_call({send, Name, Msg}, From, S) ->
case global:whereis_name(Name) of
undefined ->
Pid = global_search:start(send, {any, S#state.other_grps, Name, Msg, From}),
Wait = get(send),
put(send, [{Pid, From, Name, Msg} | Wait]),
{noreply, S};
Found ->
Found ! Msg,
{reply, Found, S}
end;
%% Search in the specified global group, which happens to be the own group.
handle_call({send, {group, Grp}, Name, Msg}, _From, S) when Grp =:= S#state.group_name ->
case global:whereis_name(Name) of
undefined ->
{reply, {badarg, {Name, Msg}}, S};
Pid ->
Pid ! Msg,
{reply, Pid, S}
end;
%% Search in the specified global group.
handle_call({send, {group, Group}, Name, Msg}, From, S) ->
case lists:keysearch(Group, 1, S#state.other_grps) of
false ->
{reply, {badarg, {Name, Msg}}, S};
{value, {Group, []}} ->
{reply, {badarg, {Name, Msg}}, S};
{value, {Group, Nodes}} ->
Pid = global_search:start(send, {group, Nodes, Name, Msg, From}),
Wait = get(send),
put(send, [{Pid, From, Name, Msg} | Wait]),
{noreply, S}
end;
%% Search on the specified node.
handle_call({send, {node, Node}, Name, Msg}, From, S) ->
Pid = global_search:start(send, {node, Node, Name, Msg, From}),
Wait = get(send),
put(send, [{Pid, From, Name, Msg} | Wait]),
{noreply, S};
%%%====================================================================================
%%% whereis_name(Name) -> Pid | undefined
%%% whereis_name({node, Node}, Name) -> Pid | undefined
%%% whereis_name({group, GlobalGroupName}, Name) -> Pid | undefined
%%%
%%% Get the Pid of a globally registered Name in own global group,
%%% in specified Node, or GlobalGroupName.
%%% The process must first be found;
%%% the flow continues at handle_cast(find_name_res).
%%%====================================================================================
%% Search in the whole known world, but check own node first.
handle_call({whereis_name, Name}, From, S) ->
case global:whereis_name(Name) of
undefined ->
Pid = global_search:start(whereis, {any, S#state.other_grps, Name, From}),
Wait = get(whereis_name),
put(whereis_name, [{Pid, From} | Wait]),
{noreply, S};
Found ->
{reply, Found, S}
end;
%% Search in the specified global group, which happens to be the own group.
handle_call({whereis_name, {group, Group}, Name}, _From, S)
when Group =:= S#state.group_name ->
Res = global:whereis_name(Name),
{reply, Res, S};
%% Search in the specified global group.
handle_call({whereis_name, {group, Group}, Name}, From, S) ->
case lists:keysearch(Group, 1, S#state.other_grps) of
false ->
{reply, undefined, S};
{value, {Group, []}} ->
{reply, undefined, S};
{value, {Group, Nodes}} ->
Pid = global_search:start(whereis, {group, Nodes, Name, From}),
Wait = get(whereis_name),
put(whereis_name, [{Pid, From} | Wait]),
{noreply, S}
end;
%% Search on the specified node.
handle_call({whereis_name, {node, Node}, Name}, From, S) ->
Pid = global_search:start(whereis, {node, Node, Name, From}),
Wait = get(whereis_name),
put(whereis_name, [{Pid, From} | Wait]),
{noreply, S};
%%%====================================================================================
%%% global_groups parameter changed
%%% The node is not resynced automatically because that would cause this node to
%%% be disconnected from those nodes that have not yet been upgraded.
%%%====================================================================================
handle_call({global_groups_changed, NewPara}, _From, S) ->
{NewGroupName, PubTpGrp, NewNodes, NewOther} =
case catch config_scan(NewPara, publish_type) of
{error, _Error2} ->
exit({error, {'invalid global_groups definition', NewPara}});
{DefGroupName, PubType, DefNodes, DefOther} ->
update_publish_nodes(S#state.publish_type, {PubType, DefNodes}),
{DefGroupName, PubType, DefNodes, DefOther}
end,
    %% #state.nodes is the intersection of the previous and the new definition
NN = NewNodes -- (NewNodes -- S#state.nodes),
%% rest of the nodes in the new definition are marked as not yet contacted
NNC = (NewNodes -- S#state.nodes) -- S#state.sync_error,
%% remove sync_error nodes not belonging to the new group
NSE = NewNodes -- (NewNodes -- S#state.sync_error),
%% Disconnect the connection to nodes which are not in our old global group.
%% This is done because if we already are aware of new nodes (to our global
%% group) global is not going to be synced to these nodes. We disconnect instead
%% of connect because upgrades can be done node by node and we cannot really
    %% know which nodes these new nodes are synced to. The operator can always
    %% manually force a sync of the nodes after all nodes have been upgraded.
%% We must disconnect also if some nodes to which we have a connection
%% will not be in any global group at all.
force_nodedown(nodes(connected) -- NewNodes),
NewS = S#state{group_name = NewGroupName,
nodes = lists:sort(NN),
no_contact = lists:sort(lists:delete(node(), NNC)),
sync_error = lists:sort(NSE),
other_grps = NewOther,
group_publish_type = PubTpGrp},
{reply, ok, NewS};
%%%====================================================================================
%%% global_groups parameter added
%%% The node is not resynced automatically because that would cause this node to
%%% be disconnected from those nodes that have not yet been upgraded.
%%%====================================================================================
handle_call({global_groups_added, NewPara}, _From, S) ->
% io:format("### global_groups_changed, NewPara ~p ~n",[NewPara]),
{NewGroupName, PubTpGrp, NewNodes, NewOther} =
case catch config_scan(NewPara, publish_type) of
{error, _Error2} ->
exit({error, {'invalid global_groups definition', NewPara}});
{DefGroupName, PubType, DefNodes, DefOther} ->
update_publish_nodes(S#state.publish_type, {PubType, DefNodes}),
{DefGroupName, PubType, DefNodes, DefOther}
end,
%% disconnect from those nodes which are not going to be in our global group
force_nodedown(nodes(connected) -- NewNodes),
%% Check which nodes are already updated
OwnNG = get_own_nodes(),
NGACArgs = case S#state.group_publish_type of
normal ->
[node(), OwnNG];
_ ->
[node(), S#state.group_publish_type, OwnNG]
end,
{NN, NNC, NSE} =
lists:foldl(fun(Node, {NN_acc, NNC_acc, NSE_acc}) ->
case rpc:call(Node, global_group, ng_add_check, NGACArgs) of
{badrpc, _} ->
{NN_acc, [Node | NNC_acc], NSE_acc};
agreed ->
{[Node | NN_acc], NNC_acc, NSE_acc};
not_agreed ->
{NN_acc, NNC_acc, [Node | NSE_acc]}
end
end,
{[], [], []}, lists:delete(node(), NewNodes)),
NewS = S#state{sync_state = synced, group_name = NewGroupName, nodes = lists:sort(NN),
sync_error = lists:sort(NSE), no_contact = lists:sort(NNC),
other_grps = NewOther, group_publish_type = PubTpGrp},
{reply, ok, NewS};
%%%====================================================================================
%%% global_groups parameter removed
%%%====================================================================================
handle_call({global_groups_removed, _NewPara}, _From, S) ->
% io:format("### global_groups_removed, NewPara ~p ~n",[_NewPara]),
update_publish_nodes(S#state.publish_type),
NewS = S#state{sync_state = no_conf, group_name = [], nodes = [],
sync_error = [], no_contact = [],
other_grps = []},
{reply, ok, NewS};
%%%====================================================================================
%%% global_groups parameter added to some other node which thinks that we
%%% belong to the same global group.
%%% It can happen that our node is not yet updated with the new global_groups parameter.
%%%====================================================================================
handle_call({ng_add_check, Node, PubType, OthersNG}, _From, S) ->
%% Check which nodes are already updated
OwnNG = get_own_nodes(),
case S#state.group_publish_type =:= PubType of
true ->
case OwnNG of
OthersNG ->
NN = [Node | S#state.nodes],
NSE = lists:delete(Node, S#state.sync_error),
NNC = lists:delete(Node, S#state.no_contact),
NewS = S#state{nodes = lists:sort(NN),
sync_error = NSE,
no_contact = NNC},
{reply, agreed, NewS};
_ ->
{reply, not_agreed, S}
end;
_ ->
{reply, not_agreed, S}
end;
%%%====================================================================================
%%% Miscellaneous help functions to read some variables
%%%====================================================================================
handle_call(info, _From, S) ->
Reply = [{state, S#state.sync_state},
{own_group_name, S#state.group_name},
{own_group_nodes, get_own_nodes()},
% {"nodes()", lists:sort(nodes())},
{synced_nodes, S#state.nodes},
{sync_error, S#state.sync_error},
{no_contact, S#state.no_contact},
{other_groups, S#state.other_grps},
{monitoring, S#state.monitor}],
{reply, Reply, S};
handle_call(get, _From, S) ->
{reply, get(), S};
%%%====================================================================================
%%% Only for test suites. These test what happens when the search process exits.
%%%====================================================================================
handle_call({registered_names_test, {node, 'test3844zty'}}, From, S) ->
Pid = global_search:start(names_test, {node, 'test3844zty'}),
Wait = get(registered_names),
put(registered_names, [{Pid, From} | Wait]),
{noreply, S};
handle_call({registered_names_test, {node, _Node}}, _From, S) ->
{reply, {error, illegal_function_call}, S};
handle_call({send_test, Name, 'test3844zty'}, From, S) ->
Pid = global_search:start(send_test, 'test3844zty'),
Wait = get(send),
put(send, [{Pid, From, Name, 'test3844zty'} | Wait]),
{noreply, S};
handle_call({send_test, _Name, _Msg }, _From, S) ->
{reply, {error, illegal_function_call}, S};
handle_call({whereis_name_test, 'test3844zty'}, From, S) ->
Pid = global_search:start(whereis_test, 'test3844zty'),
Wait = get(whereis_name),
put(whereis_name, [{Pid, From} | Wait]),
{noreply, S};
handle_call({whereis_name_test, _Name}, _From, S) ->
{reply, {error, illegal_function_call}, S};
handle_call(Call, _From, S) ->
% io:format("***** handle_call ~p~n",[Call]),
{reply, {illegal_message, Call}, S}.
%%%====================================================================================
%%% registered_names({node, Node}) -> [Name] | {error, ErrorMessage}
%%% registered_names({group, GlobalGroupName}) -> [Name] | {error, ErrorMessage}
%%%
%%% Serve a remote search process with the local registered names, and relay a
%%% finished search result back to the original caller.
%%%====================================================================================
handle_cast({registered_names, User}, S) ->
% io:format(">>>>> registered_names User ~p~n",[User]),
Res = global:registered_names(),
User ! {registered_names_res, Res},
{noreply, S};
handle_cast({registered_names_res, Result, Pid, From}, S) ->
% io:format(">>>>> registered_names_res Result ~p~n",[Result]),
unlink(Pid),
exit(Pid, normal),
Wait = get(registered_names),
NewWait = lists:delete({Pid, From},Wait),
put(registered_names, NewWait),
gen_server:reply(From, Result),
{noreply, S};
%%%====================================================================================
%%% send(Name, Msg) -> Pid | {error, ErrorMessage}
%%% send({node, Node}, Name, Msg) -> Pid | {error, ErrorMessage}
%%% send({group, GlobalGroupName}, Name, Msg) -> Pid | {error, ErrorMessage}
%%%
%%% The registered Name is found; send the message to it, kill the search process,
%%% and return to the requesting process.
%%%====================================================================================
handle_cast({send_res, Result, Name, Msg, Pid, From}, S) ->
% io:format("~p>>>>> send_res Result ~p~n",[node(), Result]),
case Result of
{badarg,{Name, Msg}} ->
continue;
ToPid ->
ToPid ! Msg
end,
unlink(Pid),
exit(Pid, normal),
Wait = get(send),
NewWait = lists:delete({Pid, From, Name, Msg},Wait),
put(send, NewWait),
gen_server:reply(From, Result),
{noreply, S};
%%%====================================================================================
%%% A request from a search process to check if this Name is registered at this node.
%%%====================================================================================
handle_cast({find_name, User, Name}, S) ->
Res = global:whereis_name(Name),
% io:format(">>>>> find_name Name ~p Res ~p~n",[Name, Res]),
User ! {find_name_res, Res},
{noreply, S};
%%%====================================================================================
%%% whereis_name(Name) -> Pid | undefined
%%% whereis_name({node, Node}, Name) -> Pid | undefined
%%% whereis_name({group, GlobalGroupName}, Name) -> Pid | undefined
%%%
%%% The registered Name is found; kill the search process
%%% and return to the requesting process.
%%%====================================================================================
handle_cast({find_name_res, Result, Pid, From}, S) ->
% io:format(">>>>> find_name_res Result ~p~n",[Result]),
% io:format(">>>>> find_name_res get() ~p~n",[get()]),
unlink(Pid),
exit(Pid, normal),
Wait = get(whereis_name),
NewWait = lists:delete({Pid, From},Wait),
put(whereis_name, NewWait),
gen_server:reply(From, Result),
{noreply, S};
%%%====================================================================================
%%% The node is synced successfully
%%%====================================================================================
handle_cast({synced, NoContact}, S) ->
% io:format("~p>>>>> synced ~p ~n",[node(), NoContact]),
kill_global_group_check(),
Nodes = get_own_nodes() -- [node() | NoContact],
{noreply, S#state{nodes = lists:sort(Nodes),
sync_error = [],
no_contact = NoContact}};
%%%====================================================================================
%%% The node could not sync with some other nodes.
%%%====================================================================================
handle_cast({sync_error, NoContact, ErrorNodes}, S) ->
% io:format("~p>>>>> sync_error ~p ~p ~n",[node(), NoContact, ErrorNodes]),
Txt = io_lib:format("Global group: Could not synchronize with these nodes ~p~n"
"because global_groups were not in agreement. ~n", [ErrorNodes]),
error_logger:error_report(Txt),
kill_global_group_check(),
Nodes = (get_own_nodes() -- [node() | NoContact]) -- ErrorNodes,
{noreply, S#state{nodes = lists:sort(Nodes),
sync_error = ErrorNodes,
no_contact = NoContact}};
%%%====================================================================================
%%% Another node is checking this node's group configuration
%%%====================================================================================
handle_cast({conf_check, Vsn, Node, From, sync, CCName, CCNodes}, S) ->
handle_cast({conf_check, Vsn, Node, From, sync, CCName, normal, CCNodes}, S);
handle_cast({conf_check, Vsn, Node, From, sync, CCName, PubType, CCNodes}, S) ->
CurNodes = S#state.nodes,
% io:format(">>>>> conf_check,sync Node ~p~n",[Node]),
%% Another node is syncing,
%% done for instance after upgrade of global_groups parameter
NS =
case application:get_env(kernel, global_groups) of
undefined ->
%% We didn't have any node_group definition
update_publish_nodes(S#state.publish_type),
disconnect_nodes([Node]),
{global_group_check, Node} ! {config_error, Vsn, From, node()},
S;
{ok, []} ->
%% Our node_group definition was empty
update_publish_nodes(S#state.publish_type),
disconnect_nodes([Node]),
{global_group_check, Node} ! {config_error, Vsn, From, node()},
S;
%%---------------------------------
%% global_groups defined
%%---------------------------------
{ok, NodeGrps} ->
case catch config_scan(NodeGrps, publish_type) of
{error, _Error2} ->
%% Our node_group definition was erroneous
disconnect_nodes([Node]),
{global_group_check, Node} ! {config_error, Vsn, From, node()},
S#state{nodes = lists:delete(Node, CurNodes)};
{CCName, PubType, CCNodes, _OtherDef} ->
%% OK, add the node to the #state.nodes if it isn't there
update_publish_nodes(S#state.publish_type, {PubType, CCNodes}),
global_name_server ! {nodeup, Node},
{global_group_check, Node} ! {config_ok, Vsn, From, node()},
case lists:member(Node, CurNodes) of
false ->
NewNodes = lists:sort([Node | CurNodes]),
NSE = lists:delete(Node, S#state.sync_error),
NNC = lists:delete(Node, S#state.no_contact),
S#state{nodes = NewNodes,
sync_error = NSE,
no_contact = NNC};
true ->
S
end;
_ ->
%% node_group definitions were not in agreement
disconnect_nodes([Node]),
{global_group_check, Node} ! {config_error, Vsn, From, node()},
NN = lists:delete(Node, S#state.nodes),
NSE = lists:delete(Node, S#state.sync_error),
NNC = lists:delete(Node, S#state.no_contact),
S#state{nodes = NN,
sync_error = NSE,
no_contact = NNC}
end
end,
{noreply, NS};
handle_cast(_Cast, S) ->
% io:format("***** handle_cast ~p~n",[_Cast]),
{noreply, S}.
%%%====================================================================================
%%% A node came up. If there is no global group configuration, inform global;
%%% if there is a global group configuration, inform global only if the node
%%% is one in the own global group.
%%%====================================================================================
handle_info({nodeup, Node}, S) when S#state.sync_state =:= no_conf ->
% io:format("~p>>>>> nodeup, Node ~p ~n",[node(), Node]),
send_monitor(S#state.monitor, {nodeup, Node}, S#state.sync_state),
global_name_server ! {nodeup, Node},
{noreply, S};
handle_info({nodeup, Node}, S) ->
% io:format("~p>>>>> nodeup, Node ~p ~n",[node(), Node]),
OthersNG = case S#state.sync_state of
synced ->
X = (catch rpc:call(Node, global_group, get_own_nodes, [])),
case X of
X when is_list(X) ->
lists:sort(X);
_ ->
[]
end;
no_conf ->
[]
end,
NNC = lists:delete(Node, S#state.no_contact),
NSE = lists:delete(Node, S#state.sync_error),
OwnNG = get_own_nodes(),
case OwnNG of
OthersNG ->
send_monitor(S#state.monitor, {nodeup, Node}, S#state.sync_state),
global_name_server ! {nodeup, Node},
case lists:member(Node, S#state.nodes) of
false ->
NN = lists:sort([Node | S#state.nodes]),
{noreply, S#state{nodes = NN,
no_contact = NNC,
sync_error = NSE}};
true ->
{noreply, S#state{no_contact = NNC,
sync_error = NSE}}
end;
_ ->
case {lists:member(Node, get_own_nodes()),
lists:member(Node, S#state.sync_error)} of
{true, false} ->
NSE2 = lists:sort([Node | S#state.sync_error]),
{noreply, S#state{no_contact = NNC,
sync_error = NSE2}};
_ ->
{noreply, S}
end
end;
%%%====================================================================================
%%% A node has crashed.
%%% nodedown must always be sent to global; this is a safety measure
%%% because during release upgrade the global_groups parameter is upgraded
%%% before the node is synced. This means that nodedown may arrive from a
%%% node which we are not aware of.
%%%====================================================================================
handle_info({nodedown, Node}, S) when S#state.sync_state =:= no_conf ->
% io:format("~p>>>>> nodedown, no_conf Node ~p~n",[node(), Node]),
send_monitor(S#state.monitor, {nodedown, Node}, S#state.sync_state),
global_name_server ! {nodedown, Node},
{noreply, S};
handle_info({nodedown, Node}, S) ->
% io:format("~p>>>>> nodedown, Node ~p ~n",[node(), Node]),
send_monitor(S#state.monitor, {nodedown, Node}, S#state.sync_state),
global_name_server ! {nodedown, Node},
NN = lists:delete(Node, S#state.nodes),
NSE = lists:delete(Node, S#state.sync_error),
NNC = case {lists:member(Node, get_own_nodes()),
lists:member(Node, S#state.no_contact)} of
{true, false} ->
[Node | S#state.no_contact];
_ ->
S#state.no_contact
end,
{noreply, S#state{nodes = NN, no_contact = NNC, sync_error = NSE}};
%%%====================================================================================
%%% A node has changed its global_groups definition, and is telling us that we are
%%% not included in its group any more. This could happen at a release upgrade.
%%%====================================================================================
handle_info({disconnect_node, Node}, S) ->
% io:format("~p>>>>> disconnect_node Node ~p CN ~p~n",[node(), Node, S#state.nodes]),
case {S#state.sync_state, lists:member(Node, S#state.nodes)} of
{synced, true} ->
send_monitor(S#state.monitor, {nodedown, Node}, S#state.sync_state);
_ ->
cont
end,
global_name_server ! {nodedown, Node}, %% nodedown is used to inform global of the
%% disconnected node
NN = lists:delete(Node, S#state.nodes),
NNC = lists:delete(Node, S#state.no_contact),
NSE = lists:delete(Node, S#state.sync_error),
{noreply, S#state{nodes = NN, no_contact = NNC, sync_error = NSE}};
handle_info({'EXIT', ExitPid, Reason}, S) ->
check_exit(ExitPid, Reason),
{noreply, S};
handle_info(_Info, S) ->
% io:format("***** handle_info = ~p~n",[_Info]),
{noreply, S}.
terminate(_Reason, _S) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%====================================================================================
%%% Check the global group configuration.
%%%====================================================================================
config_scan(NodeGrps) ->
config_scan(NodeGrps, original).
config_scan(NodeGrps, original) ->
case config_scan(NodeGrps, publish_type) of
{DefGroupName, _, DefNodes, DefOther} ->
{DefGroupName, DefNodes, DefOther};
Error ->
Error
end;
config_scan(NodeGrps, publish_type) ->
config_scan(node(), normal, NodeGrps, no_name, [], []).
config_scan(_MyNode, PubType, [], Own_name, OwnNodes, OtherNodeGrps) ->
{Own_name, PubType, lists:sort(OwnNodes), lists:reverse(OtherNodeGrps)};
config_scan(MyNode, PubType, [GrpTuple|NodeGrps], Own_name, OwnNodes, OtherNodeGrps) ->
{Name, PubTypeGroup, Nodes} = grp_tuple(GrpTuple),
case lists:member(MyNode, Nodes) of
true ->
case Own_name of
no_name ->
config_scan(MyNode, PubTypeGroup, NodeGrps, Name, Nodes, OtherNodeGrps);
_ ->
{error, {'node defined twice', {Own_name, Name}}}
end;
false ->
config_scan(MyNode,PubType,NodeGrps,Own_name,OwnNodes,
[{Name, Nodes}|OtherNodeGrps])
end.
grp_tuple({Name, Nodes}) ->
{Name, normal, Nodes};
grp_tuple({Name, hidden, Nodes}) ->
{Name, hidden, Nodes};
grp_tuple({Name, normal, Nodes}) ->
{Name, normal, Nodes}.
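%% E.g., evaluated on 'n1@host1' with the hypothetical definition from the top
%% of this file:
%%
%%   config_scan([{gg1, ['n1@host1', 'n2@host2']},
%%                {gg2, hidden, ['n3@host3']}], publish_type)
%%   -> {gg1, normal, ['n1@host1', 'n2@host2'], [{gg2, ['n3@host3']}]}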
%%%====================================================================================
%%% The special process which checks that all nodes in the own global group
%%% agrees on the configuration.
%%%====================================================================================
sync_init(Type, Cname, PubType, Nodes) ->
{Up, Down} = sync_check_node(lists:delete(node(), Nodes), [], []),
sync_check_init(Type, Up, Cname, Nodes, Down, PubType).
sync_check_node([], Up, Down) ->
{Up, Down};
sync_check_node([Node|Nodes], Up, Down) ->
case net_adm:ping(Node) of
pang ->
sync_check_node(Nodes, Up, [Node|Down]);
pong ->
sync_check_node(Nodes, [Node|Up], Down)
end.
%%%-------------------------------------------------------------
%%% Check that all nodes agree on the global group configuration.
%%%-------------------------------------------------------------
sync_check_init(Type, Up, Cname, Nodes, Down, PubType) ->
sync_check_init(Type, Up, Cname, Nodes, 3, [], Down, PubType).
sync_check_init(_Type, NoContact, _Cname, _Nodes, 0, ErrorNodes, Down, _PubType) ->
case ErrorNodes of
[] ->
gen_server:cast(global_group, {synced, lists:sort(NoContact ++ Down)});
_ ->
gen_server:cast(global_group, {sync_error, lists:sort(NoContact ++ Down),
ErrorNodes})
end,
receive
kill ->
exit(normal)
after 5000 ->
exit(normal)
end;
sync_check_init(Type, Up, Cname, Nodes, N, ErrorNodes, Down, PubType) ->
ConfCheckMsg = case PubType of
normal ->
{conf_check, ?cc_vsn, node(), self(), Type, Cname, Nodes};
_ ->
{conf_check, ?cc_vsn, node(), self(), Type, Cname, PubType, Nodes}
end,
lists:foreach(fun(Node) ->
gen_server:cast({global_group, Node}, ConfCheckMsg)
end, Up),
case sync_check(Up) of
{ok, synced} ->
sync_check_init(Type, [], Cname, Nodes, 0, ErrorNodes, Down, PubType);
{error, NewErrorNodes} ->
sync_check_init(Type, [], Cname, Nodes, 0, ErrorNodes ++ NewErrorNodes, Down, PubType);
{more, Rem, NewErrorNodes} ->
%% Try again to reach the global_group,
%% obviously the node is up but not the global_group process.
sync_check_init(Type, Rem, Cname, Nodes, N-1, ErrorNodes ++ NewErrorNodes, Down, PubType)
end.
sync_check(Up) ->
sync_check(Up, Up, []).
sync_check([], _Up, []) ->
{ok, synced};
sync_check([], _Up, ErrorNodes) ->
{error, ErrorNodes};
sync_check(Rem, Up, ErrorNodes) ->
receive
{config_ok, ?cc_vsn, Pid, Node} when Pid =:= self() ->
global_name_server ! {nodeup, Node},
sync_check(Rem -- [Node], Up, ErrorNodes);
{config_error, ?cc_vsn, Pid, Node} when Pid =:= self() ->
sync_check(Rem -- [Node], Up, [Node | ErrorNodes]);
{no_global_group_configuration, ?cc_vsn, Pid, Node} when Pid =:= self() ->
sync_check(Rem -- [Node], Up, [Node | ErrorNodes]);
%% Ignore, illegal vsn or illegal Pid
_ ->
sync_check(Rem, Up, ErrorNodes)
after 2000 ->
        %% Try again, the previous conf_check message
        %% apparently disappeared in the magic black hole.
{more, Rem, ErrorNodes}
end.
%%%====================================================================================
%%% A process wants to toggle monitoring nodeup/nodedown from nodes.
%%%====================================================================================
monitor_nodes(true, Pid, State) ->
link(Pid),
Monitor = State#state.monitor,
{ok, State#state{monitor = [Pid|Monitor]}};
monitor_nodes(false, Pid, State) ->
Monitor = State#state.monitor,
State1 = State#state{monitor = delete_all(Pid,Monitor)},
do_unlink(Pid, State1),
{ok, State1};
monitor_nodes(_, _, State) ->
{error, State}.
delete_all(From, [From |Tail]) -> delete_all(From, Tail);
delete_all(From, [H|Tail]) -> [H|delete_all(From, Tail)];
delete_all(_, []) -> [].
%% do unlink if we have no more references to Pid.
do_unlink(Pid, State) ->
case lists:member(Pid, State#state.monitor) of
true ->
false;
_ ->
% io:format("unlink(Pid) ~p~n",[Pid]),
unlink(Pid)
end.
%%%====================================================================================
%%% Send nodeup/nodedown messages to monitoring Pids in the own global group.
%%%====================================================================================
send_monitor([P|T], M, no_conf) -> safesend_nc(P, M), send_monitor(T, M, no_conf);
send_monitor([P|T], M, SyncState) -> safesend(P, M), send_monitor(T, M, SyncState);
send_monitor([], _, _) -> ok.
safesend(Name, {Msg, Node}) when is_atom(Name) ->
case lists:member(Node, get_own_nodes()) of
true ->
case whereis(Name) of
undefined ->
{Msg, Node};
P when is_pid(P) ->
P ! {Msg, Node}
end;
false ->
not_own_group
end;
safesend(Pid, {Msg, Node}) ->
case lists:member(Node, get_own_nodes()) of
true ->
Pid ! {Msg, Node};
false ->
not_own_group
end.
safesend_nc(Name, {Msg, Node}) when is_atom(Name) ->
case whereis(Name) of
undefined ->
{Msg, Node};
P when is_pid(P) ->
P ! {Msg, Node}
end;
safesend_nc(Pid, {Msg, Node}) ->
Pid ! {Msg, Node}.
%%%====================================================================================
%%% Check which user is associated with the crashed process.
%%%====================================================================================
check_exit(ExitPid, Reason) ->
% io:format("===EXIT=== ~p ~p ~n~p ~n~p ~n~p ~n~n",[ExitPid, Reason, get(registered_names), get(send), get(whereis_name)]),
check_exit_reg(get(registered_names), ExitPid, Reason),
check_exit_send(get(send), ExitPid, Reason),
check_exit_where(get(whereis_name), ExitPid, Reason).
check_exit_reg(undefined, _ExitPid, _Reason) ->
ok;
check_exit_reg(Reg, ExitPid, Reason) ->
case lists:keysearch(ExitPid, 1, lists:delete(undefined, Reg)) of
{value, {ExitPid, From}} ->
NewReg = lists:delete({ExitPid, From}, Reg),
put(registered_names, NewReg),
gen_server:reply(From, {error, Reason});
false ->
not_found_ignored
end.
check_exit_send(undefined, _ExitPid, _Reason) ->
ok;
check_exit_send(Send, ExitPid, _Reason) ->
case lists:keysearch(ExitPid, 1, lists:delete(undefined, Send)) of
{value, {ExitPid, From, Name, Msg}} ->
NewSend = lists:delete({ExitPid, From, Name, Msg}, Send),
put(send, NewSend),
gen_server:reply(From, {badarg, {Name, Msg}});
false ->
not_found_ignored
end.
check_exit_where(undefined, _ExitPid, _Reason) ->
ok;
check_exit_where(Where, ExitPid, Reason) ->
case lists:keysearch(ExitPid, 1, lists:delete(undefined, Where)) of
{value, {ExitPid, From}} ->
NewWhere = lists:delete({ExitPid, From}, Where),
put(whereis_name, NewWhere),
gen_server:reply(From, {error, Reason});
false ->
not_found_ignored
end.
%%%====================================================================================
%%% Kill any possible global_group_check processes
%%%====================================================================================
kill_global_group_check() ->
case whereis(global_group_check) of
undefined ->
ok;
Pid ->
unlink(Pid),
global_group_check ! kill,
unregister(global_group_check)
end.
%%%====================================================================================
%%% Disconnect nodes not belonging to own global_groups
%%%====================================================================================
disconnect_nodes(DisconnectNodes) ->
lists:foreach(fun(Node) ->
{global_group, Node} ! {disconnect_node, node()},
global:node_disconnected(Node)
end,
DisconnectNodes).
%%%====================================================================================
%%% Forcibly disconnect nodes not belonging to the own global group
%%%====================================================================================
force_nodedown(DisconnectNodes) ->
lists:foreach(fun(Node) ->
erlang:disconnect_node(Node),
global:node_disconnected(Node)
end,
DisconnectNodes).
%%%====================================================================================
%%% Get the current global_groups definition
%%%====================================================================================
get_own_nodes_with_errors() ->
case application:get_env(kernel, global_groups) of
undefined ->
{ok, all};
{ok, []} ->
{ok, all};
{ok, NodeGrps} ->
case catch config_scan(NodeGrps, publish_type) of
{error, Error} ->
{error, Error};
{_, _, NodesDef, _} ->
{ok, lists:sort(NodesDef)}
end
end.
get_own_nodes() ->
case get_own_nodes_with_errors() of
{ok, all} ->
[];
{error, _} ->
[];
{ok, Nodes} ->
Nodes
end.
%%%====================================================================================
%%% -hidden command line argument
%%%====================================================================================
publish_arg() ->
case init:get_argument(hidden) of
{ok,[[]]} ->
hidden;
{ok,[["true"]]} ->
hidden;
_ ->
normal
end.
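%% E.g., "erl -hidden" gives init:get_argument(hidden) = {ok, [[]]}, and
%% "erl -hidden true" gives {ok, [["true"]]}; both make the node hidden.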
%%%====================================================================================
%%% Own group publication type and nodes
%%%====================================================================================
own_group() ->
case application:get_env(kernel, global_groups) of
undefined ->
no_group;
{ok, []} ->
no_group;
{ok, NodeGrps} ->
case catch config_scan(NodeGrps, publish_type) of
{error, _} ->
no_group;
{_, PubTpGrp, NodesDef, _} ->
{PubTpGrp, NodesDef}
end
end.
%%%====================================================================================
%%% Help function which computes publication list
%%%====================================================================================
publish_on_nodes(normal, no_group) ->
all;
publish_on_nodes(hidden, no_group) ->
[];
publish_on_nodes(normal, {normal, _}) ->
all;
publish_on_nodes(hidden, {_, Nodes}) ->
Nodes;
publish_on_nodes(_, {hidden, Nodes}) ->
Nodes.
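%% E.g., a node started with -hidden that belongs to a group published as
%% normal only publishes itself to the nodes of its own group:
%%
%%   publish_on_nodes(hidden, {normal, ['n1@host1', 'n2@host2']})
%%   -> ['n1@host1', 'n2@host2']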
%%%====================================================================================
%%% Update net_kernels publication list
%%%====================================================================================
update_publish_nodes(PubArg) ->
update_publish_nodes(PubArg, no_group).
update_publish_nodes(PubArg, MyGroup) ->
net_kernel:update_publish_nodes(publish_on_nodes(PubArg, MyGroup)).
%%%====================================================================================
%%% Fetch publication list
%%%====================================================================================
publish_on_nodes() ->
publish_on_nodes(publish_arg(), own_group()).