diff options
author | Erlang/OTP <[email protected]> | 2009-11-20 14:54:40 +0000 |
---|---|---|
committer | Erlang/OTP <[email protected]> | 2009-11-20 14:54:40 +0000 |
commit | 84adefa331c4159d432d22840663c38f155cd4c1 (patch) | |
tree | bff9a9c66adda4df2106dfd0e5c053ab182a12bd /lib/kernel/src/global_group.erl | |
download | otp-84adefa331c4159d432d22840663c38f155cd4c1.tar.gz otp-84adefa331c4159d432d22840663c38f155cd4c1.tar.bz2 otp-84adefa331c4159d432d22840663c38f155cd4c1.zip |
The R13B03 release.OTP_R13B03
Diffstat (limited to 'lib/kernel/src/global_group.erl')
-rw-r--r-- | lib/kernel/src/global_group.erl | 1347 |
1 files changed, 1347 insertions, 0 deletions
diff --git a/lib/kernel/src/global_group.erl b/lib/kernel/src/global_group.erl new file mode 100644 index 0000000000..7e141ac5c7 --- /dev/null +++ b/lib/kernel/src/global_group.erl @@ -0,0 +1,1347 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 1998-2009. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% +-module(global_group). + +%% Groups nodes into global groups with an own global name space. + +-behaviour(gen_server). + +%% External exports +-export([start/0, start_link/0, stop/0, init/1]). +-export([handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-export([global_groups/0]). +-export([monitor_nodes/1]). +-export([own_nodes/0]). +-export([registered_names/1]). +-export([send/2]). +-export([send/3]). +-export([whereis_name/1]). +-export([whereis_name/2]). +-export([global_groups_changed/1]). +-export([global_groups_added/1]). +-export([global_groups_removed/1]). +-export([sync/0]). +-export([ng_add_check/2, ng_add_check/3]). + +-export([info/0]). +-export([registered_names_test/1]). +-export([send_test/2]). +-export([whereis_name_test/1]). +-export([get_own_nodes/0, get_own_nodes_with_errors/0]). +-export([publish_on_nodes/0]). + +-export([config_scan/1, config_scan/2]). + +%% Internal exports +-export([sync_init/4]). + + +-define(cc_vsn, 2). + +%%%==================================================================================== + +-type publish_type() :: 'hidden' | 'normal'. +-type sync_state() :: 'no_conf' | 'synced'. + +-type group_name() :: atom(). +-type group_tuple() :: {group_name(), [node()]} + | {group_name(), publish_type(), [node()]}. + + +%%%==================================================================================== +%%% The state of the global_group process +%%% +%%% sync_state = no_conf (global_groups not defined, inital state) | +%%% synced +%%% group_name = Own global group name +%%% nodes = Nodes in the own global group +%%% no_contact = Nodes which we haven't had contact with yet +%%% sync_error = Nodes which we haven't had contact with yet +%%% other_grps = list of other global group names and nodes, [{otherName, [Node]}] +%%% node_name = Own node +%%% monitor = List of Pids requesting nodeup/nodedown +%%%==================================================================================== + +-record(state, {sync_state = no_conf :: sync_state(), + connect_all :: boolean(), + group_name = [] :: group_name() | [], + nodes = [] :: [node()], + no_contact = [] :: [node()], + sync_error = [], + other_grps = [], + node_name = node() :: node(), + monitor = [], + publish_type = normal :: publish_type(), + group_publish_type = normal :: publish_type()}). + + +%%%==================================================================================== +%%% External exported +%%%==================================================================================== + +-spec global_groups() -> {group_name(), [group_name()]} | 'undefined'. +global_groups() -> + request(global_groups). + +-spec monitor_nodes(boolean()) -> 'ok'. +monitor_nodes(Flag) -> + case Flag of + true -> request({monitor_nodes, Flag}); + false -> request({monitor_nodes, Flag}); + _ -> {error, not_boolean} + end. + +-spec own_nodes() -> [node()]. +own_nodes() -> + request(own_nodes). + +-type name() :: atom(). +-type where() :: {'node', node()} | {'group', group_name()}. + +-spec registered_names(where()) -> [name()]. +registered_names(Arg) -> + request({registered_names, Arg}). + +-spec send(name(), term()) -> pid() | {'badarg', {name(), term()}}. +send(Name, Msg) -> + request({send, Name, Msg}). + +-spec send(where(), name(), term()) -> pid() | {'badarg', {name(), term()}}. +send(Group, Name, Msg) -> + request({send, Group, Name, Msg}). + +-spec whereis_name(name()) -> pid() | 'undefined'. +whereis_name(Name) -> + request({whereis_name, Name}). + +-spec whereis_name(where(), name()) -> pid() | 'undefined'. +whereis_name(Group, Name) -> + request({whereis_name, Group, Name}). + +global_groups_changed(NewPara) -> + request({global_groups_changed, NewPara}). + +global_groups_added(NewPara) -> + request({global_groups_added, NewPara}). + +global_groups_removed(NewPara) -> + request({global_groups_removed, NewPara}). + +-spec sync() -> 'ok'. +sync() -> + request(sync). + +ng_add_check(Node, OthersNG) -> + ng_add_check(Node, normal, OthersNG). + +ng_add_check(Node, PubType, OthersNG) -> + request({ng_add_check, Node, PubType, OthersNG}). + +-type info_item() :: {'state', sync_state()} + | {'own_group_name', group_name()} + | {'own_group_nodes', [node()]} + | {'synched_nodes', [node()]} + | {'sync_error', [node()]} + | {'no_contact', [node()]} + | {'other_groups', [group_tuple()]} + | {'monitoring', [pid()]}. + +-spec info() -> [info_item()]. +info() -> + request(info, 3000). + +%% ==== ONLY for test suites ==== +registered_names_test(Arg) -> + request({registered_names_test, Arg}). +send_test(Name, Msg) -> + request({send_test, Name, Msg}). +whereis_name_test(Name) -> + request({whereis_name_test, Name}). +%% ==== ONLY for test suites ==== + + +request(Req) -> + request(Req, infinity). + +request(Req, Time) -> + case whereis(global_group) of + P when is_pid(P) -> + gen_server:call(global_group, Req, Time); + _Other -> + {error, global_group_not_runnig} + end. + +%%%==================================================================================== +%%% gen_server start +%%% +%%% The first thing to happen is to read if the global_groups key is defined in the +%%% .config file. If not defined, the whole system is started as one global_group, +%%% and the services of global_group are superfluous. +%%% Otherwise a sync process is started to check that all nodes in the own global +%%% group have the same configuration. This is done by sending 'conf_check' to all +%%% other nodes and requiring 'conf_check_result' back. +%%% If the nodes are not in agreement of the configuration the global_group process +%%% will remove these nodes from the #state.nodes list. This can be a normal case +%%% at release upgrade when all nodes are not yet upgraded. +%%% +%%% It is possible to manually force a sync of the global_group. This is done for +%%% instance after a release upgrade, after all nodes in the group beeing upgraded. +%%% The nodes are not synced automatically because it would cause the node to be +%%% disconnected from those not yet beeing upgraded. +%%% +%%% The three process dictionary variables (registered_names, send, and whereis_name) +%%% are used to store information needed if the search process crashes. +%%% The search process is a help process to find registered names in the system. +%%%==================================================================================== +start() -> gen_server:start({local, global_group}, global_group, [], []). +start_link() -> gen_server:start_link({local, global_group}, global_group,[],[]). +stop() -> gen_server:call(global_group, stop, infinity). + +init([]) -> + process_flag(priority, max), + ok = net_kernel:monitor_nodes(true), + put(registered_names, [undefined]), + put(send, [undefined]), + put(whereis_name, [undefined]), + process_flag(trap_exit, true), + Ca = case init:get_argument(connect_all) of + {ok, [["false"]]} -> + false; + _ -> + true + end, + PT = publish_arg(), + case application:get_env(kernel, global_groups) of + undefined -> + update_publish_nodes(PT), + {ok, #state{publish_type = PT, + connect_all = Ca}}; + {ok, []} -> + update_publish_nodes(PT), + {ok, #state{publish_type = PT, + connect_all = Ca}}; + {ok, NodeGrps} -> + {DefGroupName, PubTpGrp, DefNodes, DefOther} = + case catch config_scan(NodeGrps, publish_type) of + {error, _Error2} -> + update_publish_nodes(PT), + exit({error, {'invalid global_groups definition', NodeGrps}}); + {DefGroupNameT, PubType, DefNodesT, DefOtherT} -> + update_publish_nodes(PT, {PubType, DefNodesT}), + %% First disconnect any nodes not belonging to our own group + disconnect_nodes(nodes(connected) -- DefNodesT), + lists:foreach(fun(Node) -> + erlang:monitor_node(Node, true) + end, + DefNodesT), + {DefGroupNameT, PubType, lists:delete(node(), DefNodesT), DefOtherT} + end, + {ok, #state{publish_type = PT, group_publish_type = PubTpGrp, + sync_state = synced, group_name = DefGroupName, + no_contact = lists:sort(DefNodes), + other_grps = DefOther}} + end. + + +%%%==================================================================================== +%%% sync() -> ok +%%% +%%% An operator ordered sync of the own global group. This must be done after +%%% a release upgrade. It can also be ordered if somthing has made the nodes +%%% to disagree of the global_groups definition. +%%%==================================================================================== +handle_call(sync, _From, S) -> +% io:format("~p sync ~p~n",[node(), application:get_env(kernel, global_groups)]), + case application:get_env(kernel, global_groups) of + undefined -> + update_publish_nodes(S#state.publish_type), + {reply, ok, S}; + {ok, []} -> + update_publish_nodes(S#state.publish_type), + {reply, ok, S}; + {ok, NodeGrps} -> + {DefGroupName, PubTpGrp, DefNodes, DefOther} = + case catch config_scan(NodeGrps, publish_type) of + {error, _Error2} -> + exit({error, {'invalid global_groups definition', NodeGrps}}); + {DefGroupNameT, PubType, DefNodesT, DefOtherT} -> + update_publish_nodes(S#state.publish_type, {PubType, DefNodesT}), + %% First inform global on all nodes not belonging to our own group + disconnect_nodes(nodes(connected) -- DefNodesT), + %% Sync with the nodes in the own group + kill_global_group_check(), + Pid = spawn_link(?MODULE, sync_init, + [sync, DefGroupNameT, PubType, DefNodesT]), + register(global_group_check, Pid), + {DefGroupNameT, PubType, lists:delete(node(), DefNodesT), DefOtherT} + end, + {reply, ok, S#state{sync_state = synced, group_name = DefGroupName, + no_contact = lists:sort(DefNodes), + other_grps = DefOther, group_publish_type = PubTpGrp}} + end; + + + +%%%==================================================================================== +%%% global_groups() -> {OwnGroupName, [OtherGroupName]} | undefined +%%% +%%% Get the names of the global groups +%%%==================================================================================== +handle_call(global_groups, _From, S) -> + Result = case S#state.sync_state of + no_conf -> + undefined; + synced -> + Other = lists:foldl(fun({N,_L}, Acc) -> Acc ++ [N] + end, + [], S#state.other_grps), + {S#state.group_name, Other} + end, + {reply, Result, S}; + + + +%%%==================================================================================== +%%% monitor_nodes(bool()) -> ok +%%% +%%% Monitor nodes in the own global group. +%%% True => send nodeup/nodedown to the requesting Pid +%%% False => stop sending nodeup/nodedown to the requesting Pid +%%%==================================================================================== +handle_call({monitor_nodes, Flag}, {Pid, _}, StateIn) -> +% io:format("***** handle_call ~p~n",[monitor_nodes]), + {Res, State} = monitor_nodes(Flag, Pid, StateIn), + {reply, Res, State}; + + +%%%==================================================================================== +%%% own_nodes() -> [Node] +%%% +%%% Get a list of nodes in the own global group +%%%==================================================================================== +handle_call(own_nodes, _From, S) -> + Nodes = case S#state.sync_state of + no_conf -> + [node() | nodes()]; + synced -> + get_own_nodes() +% S#state.nodes + end, + {reply, Nodes, S}; + + + +%%%==================================================================================== +%%% registered_names({node, Node}) -> [Name] | {error, ErrorMessage} +%%% registered_names({group, GlobalGroupName}) -> [Name] | {error, ErrorMessage} +%%% +%%% Get the registered names from a specified Node, or GlobalGroupName. +%%%==================================================================================== +handle_call({registered_names, {group, Group}}, _From, S) when Group =:= S#state.group_name -> + Res = global:registered_names(), + {reply, Res, S}; +handle_call({registered_names, {group, Group}}, From, S) -> + case lists:keysearch(Group, 1, S#state.other_grps) of + false -> + {reply, [], S}; + {value, {Group, []}} -> + {reply, [], S}; + {value, {Group, Nodes}} -> + Pid = global_search:start(names, {group, Nodes, From}), + Wait = get(registered_names), + put(registered_names, [{Pid, From} | Wait]), + {noreply, S} + end; +handle_call({registered_names, {node, Node}}, _From, S) when Node =:= node() -> + Res = global:registered_names(), + {reply, Res, S}; +handle_call({registered_names, {node, Node}}, From, S) -> + Pid = global_search:start(names, {node, Node, From}), +% io:format(">>>>> registered_names Pid ~p~n",[Pid]), + Wait = get(registered_names), + put(registered_names, [{Pid, From} | Wait]), + {noreply, S}; + + + +%%%==================================================================================== +%%% send(Name, Msg) -> Pid | {badarg, {Name, Msg}} +%%% send({node, Node}, Name, Msg) -> Pid | {badarg, {Name, Msg}} +%%% send({group, GlobalGroupName}, Name, Msg) -> Pid | {badarg, {Name, Msg}} +%%% +%%% Send the Msg to the specified globally registered Name in own global group, +%%% in specified Node, or GlobalGroupName. +%%% But first the receiver is to be found, the thread is continued at +%%% handle_cast(send_res) +%%%==================================================================================== +%% Search in the whole known world, but check own node first. +handle_call({send, Name, Msg}, From, S) -> + case global:whereis_name(Name) of + undefined -> + Pid = global_search:start(send, {any, S#state.other_grps, Name, Msg, From}), + Wait = get(send), + put(send, [{Pid, From, Name, Msg} | Wait]), + {noreply, S}; + Found -> + Found ! Msg, + {reply, Found, S} + end; +%% Search in the specified global group, which happens to be the own group. +handle_call({send, {group, Grp}, Name, Msg}, _From, S) when Grp =:= S#state.group_name -> + case global:whereis_name(Name) of + undefined -> + {reply, {badarg, {Name, Msg}}, S}; + Pid -> + Pid ! Msg, + {reply, Pid, S} + end; +%% Search in the specified global group. +handle_call({send, {group, Group}, Name, Msg}, From, S) -> + case lists:keysearch(Group, 1, S#state.other_grps) of + false -> + {reply, {badarg, {Name, Msg}}, S}; + {value, {Group, []}} -> + {reply, {badarg, {Name, Msg}}, S}; + {value, {Group, Nodes}} -> + Pid = global_search:start(send, {group, Nodes, Name, Msg, From}), + Wait = get(send), + put(send, [{Pid, From, Name, Msg} | Wait]), + {noreply, S} + end; +%% Search on the specified node. +handle_call({send, {node, Node}, Name, Msg}, From, S) -> + Pid = global_search:start(send, {node, Node, Name, Msg, From}), + Wait = get(send), + put(send, [{Pid, From, Name, Msg} | Wait]), + {noreply, S}; + + + +%%%==================================================================================== +%%% whereis_name(Name) -> Pid | undefined +%%% whereis_name({node, Node}, Name) -> Pid | undefined +%%% whereis_name({group, GlobalGroupName}, Name) -> Pid | undefined +%%% +%%% Get the Pid of a globally registered Name in own global group, +%%% in specified Node, or GlobalGroupName. +%%% But first the process is to be found, +%%% the thread is continued at handle_cast(find_name_res) +%%%==================================================================================== +%% Search in the whole known world, but check own node first. +handle_call({whereis_name, Name}, From, S) -> + case global:whereis_name(Name) of + undefined -> + Pid = global_search:start(whereis, {any, S#state.other_grps, Name, From}), + Wait = get(whereis_name), + put(whereis_name, [{Pid, From} | Wait]), + {noreply, S}; + Found -> + {reply, Found, S} + end; +%% Search in the specified global group, which happens to be the own group. +handle_call({whereis_name, {group, Group}, Name}, _From, S) + when Group =:= S#state.group_name -> + Res = global:whereis_name(Name), + {reply, Res, S}; +%% Search in the specified global group. +handle_call({whereis_name, {group, Group}, Name}, From, S) -> + case lists:keysearch(Group, 1, S#state.other_grps) of + false -> + {reply, undefined, S}; + {value, {Group, []}} -> + {reply, undefined, S}; + {value, {Group, Nodes}} -> + Pid = global_search:start(whereis, {group, Nodes, Name, From}), + Wait = get(whereis_name), + put(whereis_name, [{Pid, From} | Wait]), + {noreply, S} + end; +%% Search on the specified node. +handle_call({whereis_name, {node, Node}, Name}, From, S) -> + Pid = global_search:start(whereis, {node, Node, Name, From}), + Wait = get(whereis_name), + put(whereis_name, [{Pid, From} | Wait]), + {noreply, S}; + + +%%%==================================================================================== +%%% global_groups parameter changed +%%% The node is not resynced automatically because it would cause this node to +%%% be disconnected from those nodes not yet been upgraded. +%%%==================================================================================== +handle_call({global_groups_changed, NewPara}, _From, S) -> + {NewGroupName, PubTpGrp, NewNodes, NewOther} = + case catch config_scan(NewPara, publish_type) of + {error, _Error2} -> + exit({error, {'invalid global_groups definition', NewPara}}); + {DefGroupName, PubType, DefNodes, DefOther} -> + update_publish_nodes(S#state.publish_type, {PubType, DefNodes}), + {DefGroupName, PubType, DefNodes, DefOther} + end, + + %% #state.nodes is the common denominator of previous and new definition + NN = NewNodes -- (NewNodes -- S#state.nodes), + %% rest of the nodes in the new definition are marked as not yet contacted + NNC = (NewNodes -- S#state.nodes) -- S#state.sync_error, + %% remove sync_error nodes not belonging to the new group + NSE = NewNodes -- (NewNodes -- S#state.sync_error), + + %% Disconnect the connection to nodes which are not in our old global group. + %% This is done because if we already are aware of new nodes (to our global + %% group) global is not going to be synced to these nodes. We disconnect instead + %% of connect because upgrades can be done node by node and we cannot really + %% know what nodes these new nodes are synced to. The operator can always + %% manually force a sync of the nodes after all nodes beeing uppgraded. + %% We must disconnect also if some nodes to which we have a connection + %% will not be in any global group at all. + force_nodedown(nodes(connected) -- NewNodes), + + NewS = S#state{group_name = NewGroupName, + nodes = lists:sort(NN), + no_contact = lists:sort(lists:delete(node(), NNC)), + sync_error = lists:sort(NSE), + other_grps = NewOther, + group_publish_type = PubTpGrp}, + {reply, ok, NewS}; + + +%%%==================================================================================== +%%% global_groups parameter added +%%% The node is not resynced automatically because it would cause this node to +%%% be disconnected from those nodes not yet been upgraded. +%%%==================================================================================== +handle_call({global_groups_added, NewPara}, _From, S) -> +% io:format("### global_groups_changed, NewPara ~p ~n",[NewPara]), + {NewGroupName, PubTpGrp, NewNodes, NewOther} = + case catch config_scan(NewPara, publish_type) of + {error, _Error2} -> + exit({error, {'invalid global_groups definition', NewPara}}); + {DefGroupName, PubType, DefNodes, DefOther} -> + update_publish_nodes(S#state.publish_type, {PubType, DefNodes}), + {DefGroupName, PubType, DefNodes, DefOther} + end, + + %% disconnect from those nodes which are not going to be in our global group + force_nodedown(nodes(connected) -- NewNodes), + + %% Check which nodes are already updated + OwnNG = get_own_nodes(), + NGACArgs = case S#state.group_publish_type of + normal -> + [node(), OwnNG]; + _ -> + [node(), S#state.group_publish_type, OwnNG] + end, + {NN, NNC, NSE} = + lists:foldl(fun(Node, {NN_acc, NNC_acc, NSE_acc}) -> + case rpc:call(Node, global_group, ng_add_check, NGACArgs) of + {badrpc, _} -> + {NN_acc, [Node | NNC_acc], NSE_acc}; + agreed -> + {[Node | NN_acc], NNC_acc, NSE_acc}; + not_agreed -> + {NN_acc, NNC_acc, [Node | NSE_acc]} + end + end, + {[], [], []}, lists:delete(node(), NewNodes)), + NewS = S#state{sync_state = synced, group_name = NewGroupName, nodes = lists:sort(NN), + sync_error = lists:sort(NSE), no_contact = lists:sort(NNC), + other_grps = NewOther, group_publish_type = PubTpGrp}, + {reply, ok, NewS}; + + +%%%==================================================================================== +%%% global_groups parameter removed +%%%==================================================================================== +handle_call({global_groups_removed, _NewPara}, _From, S) -> +% io:format("### global_groups_removed, NewPara ~p ~n",[_NewPara]), + update_publish_nodes(S#state.publish_type), + NewS = S#state{sync_state = no_conf, group_name = [], nodes = [], + sync_error = [], no_contact = [], + other_grps = []}, + {reply, ok, NewS}; + + +%%%==================================================================================== +%%% global_groups parameter added to some other node which thinks that we +%%% belong to the same global group. +%%% It could happen that our node is not yet updated with the new node_group parameter +%%%==================================================================================== +handle_call({ng_add_check, Node, PubType, OthersNG}, _From, S) -> + %% Check which nodes are already updated + OwnNG = get_own_nodes(), + case S#state.group_publish_type =:= PubType of + true -> + case OwnNG of + OthersNG -> + NN = [Node | S#state.nodes], + NSE = lists:delete(Node, S#state.sync_error), + NNC = lists:delete(Node, S#state.no_contact), + NewS = S#state{nodes = lists:sort(NN), + sync_error = NSE, + no_contact = NNC}, + {reply, agreed, NewS}; + _ -> + {reply, not_agreed, S} + end; + _ -> + {reply, not_agreed, S} + end; + + + +%%%==================================================================================== +%%% Misceleaneous help function to read some variables +%%%==================================================================================== +handle_call(info, _From, S) -> + Reply = [{state, S#state.sync_state}, + {own_group_name, S#state.group_name}, + {own_group_nodes, get_own_nodes()}, +% {"nodes()", lists:sort(nodes())}, + {synced_nodes, S#state.nodes}, + {sync_error, S#state.sync_error}, + {no_contact, S#state.no_contact}, + {other_groups, S#state.other_grps}, + {monitoring, S#state.monitor}], + + {reply, Reply, S}; + +handle_call(get, _From, S) -> + {reply, get(), S}; + + +%%%==================================================================================== +%%% Only for test suites. These tests when the search process exits. +%%%==================================================================================== +handle_call({registered_names_test, {node, 'test3844zty'}}, From, S) -> + Pid = global_search:start(names_test, {node, 'test3844zty'}), + Wait = get(registered_names), + put(registered_names, [{Pid, From} | Wait]), + {noreply, S}; +handle_call({registered_names_test, {node, _Node}}, _From, S) -> + {reply, {error, illegal_function_call}, S}; +handle_call({send_test, Name, 'test3844zty'}, From, S) -> + Pid = global_search:start(send_test, 'test3844zty'), + Wait = get(send), + put(send, [{Pid, From, Name, 'test3844zty'} | Wait]), + {noreply, S}; +handle_call({send_test, _Name, _Msg }, _From, S) -> + {reply, {error, illegal_function_call}, S}; +handle_call({whereis_name_test, 'test3844zty'}, From, S) -> + Pid = global_search:start(whereis_test, 'test3844zty'), + Wait = get(whereis_name), + put(whereis_name, [{Pid, From} | Wait]), + {noreply, S}; +handle_call({whereis_name_test, _Name}, _From, S) -> + {reply, {error, illegal_function_call}, S}; + +handle_call(Call, _From, S) -> +% io:format("***** handle_call ~p~n",[Call]), + {reply, {illegal_message, Call}, S}. + + + + + +%%%==================================================================================== +%%% registered_names({node, Node}) -> [Name] | {error, ErrorMessage} +%%% registered_names({group, GlobalGroupName}) -> [Name] | {error, ErrorMessage} +%%% +%%% Get a list of nodes in the own global group +%%%==================================================================================== +handle_cast({registered_names, User}, S) -> +% io:format(">>>>> registered_names User ~p~n",[User]), + Res = global:registered_names(), + User ! {registered_names_res, Res}, + {noreply, S}; + +handle_cast({registered_names_res, Result, Pid, From}, S) -> +% io:format(">>>>> registered_names_res Result ~p~n",[Result]), + unlink(Pid), + exit(Pid, normal), + Wait = get(registered_names), + NewWait = lists:delete({Pid, From},Wait), + put(registered_names, NewWait), + gen_server:reply(From, Result), + {noreply, S}; + + + +%%%==================================================================================== +%%% send(Name, Msg) -> Pid | {error, ErrorMessage} +%%% send({node, Node}, Name, Msg) -> Pid | {error, ErrorMessage} +%%% send({group, GlobalGroupName}, Name, Msg) -> Pid | {error, ErrorMessage} +%%% +%%% The registered Name is found; send the message to it, kill the search process, +%%% and return to the requesting process. +%%%==================================================================================== +handle_cast({send_res, Result, Name, Msg, Pid, From}, S) -> +% io:format("~p>>>>> send_res Result ~p~n",[node(), Result]), + case Result of + {badarg,{Name, Msg}} -> + continue; + ToPid -> + ToPid ! Msg + end, + unlink(Pid), + exit(Pid, normal), + Wait = get(send), + NewWait = lists:delete({Pid, From, Name, Msg},Wait), + put(send, NewWait), + gen_server:reply(From, Result), + {noreply, S}; + + + +%%%==================================================================================== +%%% A request from a search process to check if this Name is registered at this node. +%%%==================================================================================== +handle_cast({find_name, User, Name}, S) -> + Res = global:whereis_name(Name), +% io:format(">>>>> find_name Name ~p Res ~p~n",[Name, Res]), + User ! {find_name_res, Res}, + {noreply, S}; + +%%%==================================================================================== +%%% whereis_name(Name) -> Pid | undefined +%%% whereis_name({node, Node}, Name) -> Pid | undefined +%%% whereis_name({group, GlobalGroupName}, Name) -> Pid | undefined +%%% +%%% The registered Name is found; kill the search process +%%% and return to the requesting process. +%%%==================================================================================== +handle_cast({find_name_res, Result, Pid, From}, S) -> +% io:format(">>>>> find_name_res Result ~p~n",[Result]), +% io:format(">>>>> find_name_res get() ~p~n",[get()]), + unlink(Pid), + exit(Pid, normal), + Wait = get(whereis_name), + NewWait = lists:delete({Pid, From},Wait), + put(whereis_name, NewWait), + gen_server:reply(From, Result), + {noreply, S}; + + +%%%==================================================================================== +%%% The node is synced successfully +%%%==================================================================================== +handle_cast({synced, NoContact}, S) -> +% io:format("~p>>>>> synced ~p ~n",[node(), NoContact]), + kill_global_group_check(), + Nodes = get_own_nodes() -- [node() | NoContact], + {noreply, S#state{nodes = lists:sort(Nodes), + sync_error = [], + no_contact = NoContact}}; + + +%%%==================================================================================== +%%% The node could not sync with some other nodes. +%%%==================================================================================== +handle_cast({sync_error, NoContact, ErrorNodes}, S) -> +% io:format("~p>>>>> sync_error ~p ~p ~n",[node(), NoContact, ErrorNodes]), + Txt = io_lib:format("Global group: Could not synchronize with these nodes ~p~n" + "because global_groups were not in agreement. ~n", [ErrorNodes]), + error_logger:error_report(Txt), + kill_global_group_check(), + Nodes = (get_own_nodes() -- [node() | NoContact]) -- ErrorNodes, + {noreply, S#state{nodes = lists:sort(Nodes), + sync_error = ErrorNodes, + no_contact = NoContact}}; + + +%%%==================================================================================== +%%% Another node is checking this node's group configuration +%%%==================================================================================== +handle_cast({conf_check, Vsn, Node, From, sync, CCName, CCNodes}, S) -> + handle_cast({conf_check, Vsn, Node, From, sync, CCName, normal, CCNodes}, S); + +handle_cast({conf_check, Vsn, Node, From, sync, CCName, PubType, CCNodes}, S) -> + CurNodes = S#state.nodes, +% io:format(">>>>> conf_check,sync Node ~p~n",[Node]), + %% Another node is syncing, + %% done for instance after upgrade of global_groups parameter + NS = + case application:get_env(kernel, global_groups) of + undefined -> + %% We didn't have any node_group definition + update_publish_nodes(S#state.publish_type), + disconnect_nodes([Node]), + {global_group_check, Node} ! {config_error, Vsn, From, node()}, + S; + {ok, []} -> + %% Our node_group definition was empty + update_publish_nodes(S#state.publish_type), + disconnect_nodes([Node]), + {global_group_check, Node} ! {config_error, Vsn, From, node()}, + S; + %%--------------------------------- + %% global_groups defined + %%--------------------------------- + {ok, NodeGrps} -> + case catch config_scan(NodeGrps, publish_type) of + {error, _Error2} -> + %% Our node_group definition was erroneous + disconnect_nodes([Node]), + {global_group_check, Node} ! {config_error, Vsn, From, node()}, + S#state{nodes = lists:delete(Node, CurNodes)}; + + {CCName, PubType, CCNodes, _OtherDef} -> + %% OK, add the node to the #state.nodes if it isn't there + update_publish_nodes(S#state.publish_type, {PubType, CCNodes}), + global_name_server ! {nodeup, Node}, + {global_group_check, Node} ! {config_ok, Vsn, From, node()}, + case lists:member(Node, CurNodes) of + false -> + NewNodes = lists:sort([Node | CurNodes]), + NSE = lists:delete(Node, S#state.sync_error), + NNC = lists:delete(Node, S#state.no_contact), + S#state{nodes = NewNodes, + sync_error = NSE, + no_contact = NNC}; + true -> + S + end; + _ -> + %% node_group definitions were not in agreement + disconnect_nodes([Node]), + {global_group_check, Node} ! {config_error, Vsn, From, node()}, + NN = lists:delete(Node, S#state.nodes), + NSE = lists:delete(Node, S#state.sync_error), + NNC = lists:delete(Node, S#state.no_contact), + S#state{nodes = NN, + sync_error = NSE, + no_contact = NNC} + end + end, + {noreply, NS}; + + +handle_cast(_Cast, S) -> +% io:format("***** handle_cast ~p~n",[_Cast]), + {noreply, S}. + + + +%%%==================================================================================== +%%% A node went down. If no global group configuration inform global; +%%% if global group configuration inform global only if the node is one in +%%% the own global group. +%%%==================================================================================== +handle_info({nodeup, Node}, S) when S#state.sync_state =:= no_conf -> +% io:format("~p>>>>> nodeup, Node ~p ~n",[node(), Node]), + send_monitor(S#state.monitor, {nodeup, Node}, S#state.sync_state), + global_name_server ! {nodeup, Node}, + {noreply, S}; +handle_info({nodeup, Node}, S) -> +% io:format("~p>>>>> nodeup, Node ~p ~n",[node(), Node]), + OthersNG = case S#state.sync_state of + synced -> + X = (catch rpc:call(Node, global_group, get_own_nodes, [])), + case X of + X when is_list(X) -> + lists:sort(X); + _ -> + [] + end; + no_conf -> + [] + end, + + NNC = lists:delete(Node, S#state.no_contact), + NSE = lists:delete(Node, S#state.sync_error), + OwnNG = get_own_nodes(), + case OwnNG of + OthersNG -> + send_monitor(S#state.monitor, {nodeup, Node}, S#state.sync_state), + global_name_server ! {nodeup, Node}, + case lists:member(Node, S#state.nodes) of + false -> + NN = lists:sort([Node | S#state.nodes]), + {noreply, S#state{nodes = NN, + no_contact = NNC, + sync_error = NSE}}; + true -> + {noreply, S#state{no_contact = NNC, + sync_error = NSE}} + end; + _ -> + case {lists:member(Node, get_own_nodes()), + lists:member(Node, S#state.sync_error)} of + {true, false} -> + NSE2 = lists:sort([Node | S#state.sync_error]), + {noreply, S#state{no_contact = NNC, + sync_error = NSE2}}; + _ -> + {noreply, S} + end + end; + +%%%==================================================================================== +%%% A node has crashed. +%%% nodedown must always be sent to global; this is a security measurement +%%% because during release upgrade the global_groups parameter is upgraded +%%% before the node is synced. This means that nodedown may arrive from a +%%% node which we are not aware of. +%%%==================================================================================== +handle_info({nodedown, Node}, S) when S#state.sync_state =:= no_conf -> +% io:format("~p>>>>> nodedown, no_conf Node ~p~n",[node(), Node]), + send_monitor(S#state.monitor, {nodedown, Node}, S#state.sync_state), + global_name_server ! {nodedown, Node}, + {noreply, S}; +handle_info({nodedown, Node}, S) -> +% io:format("~p>>>>> nodedown, Node ~p ~n",[node(), Node]), + send_monitor(S#state.monitor, {nodedown, Node}, S#state.sync_state), + global_name_server ! {nodedown, Node}, + NN = lists:delete(Node, S#state.nodes), + NSE = lists:delete(Node, S#state.sync_error), + NNC = case {lists:member(Node, get_own_nodes()), + lists:member(Node, S#state.no_contact)} of + {true, false} -> + [Node | S#state.no_contact]; + _ -> + S#state.no_contact + end, + {noreply, S#state{nodes = NN, no_contact = NNC, sync_error = NSE}}; + + +%%%==================================================================================== +%%% A node has changed its global_groups definition, and is telling us that we are not +%%% included in his group any more. This could happen at release upgrade. +%%%==================================================================================== +handle_info({disconnect_node, Node}, S) -> +% io:format("~p>>>>> disconnect_node Node ~p CN ~p~n",[node(), Node, S#state.nodes]), + case {S#state.sync_state, lists:member(Node, S#state.nodes)} of + {synced, true} -> + send_monitor(S#state.monitor, {nodedown, Node}, S#state.sync_state); + _ -> + cont + end, + global_name_server ! {nodedown, Node}, %% nodedown is used to inform global of the + %% disconnected node + NN = lists:delete(Node, S#state.nodes), + NNC = lists:delete(Node, S#state.no_contact), + NSE = lists:delete(Node, S#state.sync_error), + {noreply, S#state{nodes = NN, no_contact = NNC, sync_error = NSE}}; + + + + +handle_info({'EXIT', ExitPid, Reason}, S) -> + check_exit(ExitPid, Reason), + {noreply, S}; + + +handle_info(_Info, S) -> +% io:format("***** handle_info = ~p~n",[_Info]), + {noreply, S}. + + + +terminate(_Reason, _S) -> + ok. + + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + + + + + +%%%==================================================================================== +%%% Check the global group configuration. +%%%==================================================================================== + +config_scan(NodeGrps) -> + config_scan(NodeGrps, original). + +config_scan(NodeGrps, original) -> + case config_scan(NodeGrps, publish_type) of + {DefGroupName, _, DefNodes, DefOther} -> + {DefGroupName, DefNodes, DefOther}; + Error -> + Error + end; +config_scan(NodeGrps, publish_type) -> + config_scan(node(), normal, NodeGrps, no_name, [], []). + +config_scan(_MyNode, PubType, [], Own_name, OwnNodes, OtherNodeGrps) -> + {Own_name, PubType, lists:sort(OwnNodes), lists:reverse(OtherNodeGrps)}; +config_scan(MyNode, PubType, [GrpTuple|NodeGrps], Own_name, OwnNodes, OtherNodeGrps) -> + {Name, PubTypeGroup, Nodes} = grp_tuple(GrpTuple), + case lists:member(MyNode, Nodes) of + true -> + case Own_name of + no_name -> + config_scan(MyNode, PubTypeGroup, NodeGrps, Name, Nodes, OtherNodeGrps); + _ -> + {error, {'node defined twice', {Own_name, Name}}} + end; + false -> + config_scan(MyNode,PubType,NodeGrps,Own_name,OwnNodes, + [{Name, Nodes}|OtherNodeGrps]) + end. + +grp_tuple({Name, Nodes}) -> + {Name, normal, Nodes}; +grp_tuple({Name, hidden, Nodes}) -> + {Name, hidden, Nodes}; +grp_tuple({Name, normal, Nodes}) -> + {Name, normal, Nodes}. + + +%%%==================================================================================== +%%% The special process which checks that all nodes in the own global group +%%% agrees on the configuration. +%%%==================================================================================== +sync_init(Type, Cname, PubType, Nodes) -> + {Up, Down} = sync_check_node(lists:delete(node(), Nodes), [], []), + sync_check_init(Type, Up, Cname, Nodes, Down, PubType). + +sync_check_node([], Up, Down) -> + {Up, Down}; +sync_check_node([Node|Nodes], Up, Down) -> + case net_adm:ping(Node) of + pang -> + sync_check_node(Nodes, Up, [Node|Down]); + pong -> + sync_check_node(Nodes, [Node|Up], Down) + end. + + + +%%%------------------------------------------------------------- +%%% Check that all nodes are in agreement of the global +%%% group configuration. +%%%------------------------------------------------------------- +sync_check_init(Type, Up, Cname, Nodes, Down, PubType) -> + sync_check_init(Type, Up, Cname, Nodes, 3, [], Down, PubType). + +sync_check_init(_Type, NoContact, _Cname, _Nodes, 0, ErrorNodes, Down, _PubType) -> + case ErrorNodes of + [] -> + gen_server:cast(global_group, {synced, lists:sort(NoContact ++ Down)}); + _ -> + gen_server:cast(global_group, {sync_error, lists:sort(NoContact ++ Down), + ErrorNodes}) + end, + receive + kill -> + exit(normal) + after 5000 -> + exit(normal) + end; + +sync_check_init(Type, Up, Cname, Nodes, N, ErrorNodes, Down, PubType) -> + ConfCheckMsg = case PubType of + normal -> + {conf_check, ?cc_vsn, node(), self(), Type, Cname, Nodes}; + _ -> + {conf_check, ?cc_vsn, node(), self(), Type, Cname, PubType, Nodes} + end, + lists:foreach(fun(Node) -> + gen_server:cast({global_group, Node}, ConfCheckMsg) + end, Up), + case sync_check(Up) of + {ok, synced} -> + sync_check_init(Type, [], Cname, Nodes, 0, ErrorNodes, Down, PubType); + {error, NewErrorNodes} -> + sync_check_init(Type, [], Cname, Nodes, 0, ErrorNodes ++ NewErrorNodes, Down, PubType); + {more, Rem, NewErrorNodes} -> + %% Try again to reach the global_group, + %% obviously the node is up but not the global_group process. + sync_check_init(Type, Rem, Cname, Nodes, N-1, ErrorNodes ++ NewErrorNodes, Down, PubType) + end. + +sync_check(Up) -> + sync_check(Up, Up, []). + +sync_check([], _Up, []) -> + {ok, synced}; +sync_check([], _Up, ErrorNodes) -> + {error, ErrorNodes}; +sync_check(Rem, Up, ErrorNodes) -> + receive + {config_ok, ?cc_vsn, Pid, Node} when Pid =:= self() -> + global_name_server ! {nodeup, Node}, + sync_check(Rem -- [Node], Up, ErrorNodes); + {config_error, ?cc_vsn, Pid, Node} when Pid =:= self() -> + sync_check(Rem -- [Node], Up, [Node | ErrorNodes]); + {no_global_group_configuration, ?cc_vsn, Pid, Node} when Pid =:= self() -> + sync_check(Rem -- [Node], Up, [Node | ErrorNodes]); + %% Ignore, illegal vsn or illegal Pid + _ -> + sync_check(Rem, Up, ErrorNodes) + after 2000 -> + %% Try again, the previous conf_check message + %% apparently disapared in the magic black hole. + {more, Rem, ErrorNodes} + end. + + +%%%==================================================================================== +%%% A process wants to toggle monitoring nodeup/nodedown from nodes. +%%%==================================================================================== +monitor_nodes(true, Pid, State) -> + link(Pid), + Monitor = State#state.monitor, + {ok, State#state{monitor = [Pid|Monitor]}}; +monitor_nodes(false, Pid, State) -> + Monitor = State#state.monitor, + State1 = State#state{monitor = delete_all(Pid,Monitor)}, + do_unlink(Pid, State1), + {ok, State1}; +monitor_nodes(_, _, State) -> + {error, State}. + +delete_all(From, [From |Tail]) -> delete_all(From, Tail); +delete_all(From, [H|Tail]) -> [H|delete_all(From, Tail)]; +delete_all(_, []) -> []. + +%% do unlink if we have no more references to Pid. +do_unlink(Pid, State) -> + case lists:member(Pid, State#state.monitor) of + true -> + false; + _ -> +% io:format("unlink(Pid) ~p~n",[Pid]), + unlink(Pid) + end. + + + +%%%==================================================================================== +%%% Send a nodeup/down messages to monitoring Pids in the own global group. +%%%==================================================================================== +send_monitor([P|T], M, no_conf) -> safesend_nc(P, M), send_monitor(T, M, no_conf); +send_monitor([P|T], M, SyncState) -> safesend(P, M), send_monitor(T, M, SyncState); +send_monitor([], _, _) -> ok. + +safesend(Name, {Msg, Node}) when is_atom(Name) -> + case lists:member(Node, get_own_nodes()) of + true -> + case whereis(Name) of + undefined -> + {Msg, Node}; + P when is_pid(P) -> + P ! {Msg, Node} + end; + false -> + not_own_group + end; +safesend(Pid, {Msg, Node}) -> + case lists:member(Node, get_own_nodes()) of + true -> + Pid ! {Msg, Node}; + false -> + not_own_group + end. + +safesend_nc(Name, {Msg, Node}) when is_atom(Name) -> + case whereis(Name) of + undefined -> + {Msg, Node}; + P when is_pid(P) -> + P ! {Msg, Node} + end; +safesend_nc(Pid, {Msg, Node}) -> + Pid ! {Msg, Node}. + + + + + + +%%%==================================================================================== +%%% Check which user is associated to the crashed process. +%%%==================================================================================== +check_exit(ExitPid, Reason) -> +% io:format("===EXIT=== ~p ~p ~n~p ~n~p ~n~p ~n~n",[ExitPid, Reason, get(registered_names), get(send), get(whereis_name)]), + check_exit_reg(get(registered_names), ExitPid, Reason), + check_exit_send(get(send), ExitPid, Reason), + check_exit_where(get(whereis_name), ExitPid, Reason). + + +check_exit_reg(undefined, _ExitPid, _Reason) -> + ok; +check_exit_reg(Reg, ExitPid, Reason) -> + case lists:keysearch(ExitPid, 1, lists:delete(undefined, Reg)) of + {value, {ExitPid, From}} -> + NewReg = lists:delete({ExitPid, From}, Reg), + put(registered_names, NewReg), + gen_server:reply(From, {error, Reason}); + false -> + not_found_ignored + end. + + +check_exit_send(undefined, _ExitPid, _Reason) -> + ok; +check_exit_send(Send, ExitPid, _Reason) -> + case lists:keysearch(ExitPid, 1, lists:delete(undefined, Send)) of + {value, {ExitPid, From, Name, Msg}} -> + NewSend = lists:delete({ExitPid, From, Name, Msg}, Send), + put(send, NewSend), + gen_server:reply(From, {badarg, {Name, Msg}}); + false -> + not_found_ignored + end. + + +check_exit_where(undefined, _ExitPid, _Reason) -> + ok; +check_exit_where(Where, ExitPid, Reason) -> + case lists:keysearch(ExitPid, 1, lists:delete(undefined, Where)) of + {value, {ExitPid, From}} -> + NewWhere = lists:delete({ExitPid, From}, Where), + put(whereis_name, NewWhere), + gen_server:reply(From, {error, Reason}); + false -> + not_found_ignored + end. + + + +%%%==================================================================================== +%%% Kill any possible global_group_check processes +%%%==================================================================================== +kill_global_group_check() -> + case whereis(global_group_check) of + undefined -> + ok; + Pid -> + unlink(Pid), + global_group_check ! kill, + unregister(global_group_check) + end. + + +%%%==================================================================================== +%%% Disconnect nodes not belonging to own global_groups +%%%==================================================================================== +disconnect_nodes(DisconnectNodes) -> + lists:foreach(fun(Node) -> + {global_group, Node} ! {disconnect_node, node()}, + global:node_disconnected(Node) + end, + DisconnectNodes). + + +%%%==================================================================================== +%%% Disconnect nodes not belonging to own global_groups +%%%==================================================================================== +force_nodedown(DisconnectNodes) -> + lists:foreach(fun(Node) -> + erlang:disconnect_node(Node), + global:node_disconnected(Node) + end, + DisconnectNodes). + + +%%%==================================================================================== +%%% Get the current global_groups definition +%%%==================================================================================== +get_own_nodes_with_errors() -> + case application:get_env(kernel, global_groups) of + undefined -> + {ok, all}; + {ok, []} -> + {ok, all}; + {ok, NodeGrps} -> + case catch config_scan(NodeGrps, publish_type) of + {error, Error} -> + {error, Error}; + {_, _, NodesDef, _} -> + {ok, lists:sort(NodesDef)} + end + end. + +get_own_nodes() -> + case get_own_nodes_with_errors() of + {ok, all} -> + []; + {error, _} -> + []; + {ok, Nodes} -> + Nodes + end. + +%%%==================================================================================== +%%% -hidden command line argument +%%%==================================================================================== +publish_arg() -> + case init:get_argument(hidden) of + {ok,[[]]} -> + hidden; + {ok,[["true"]]} -> + hidden; + _ -> + normal + end. + + +%%%==================================================================================== +%%% Own group publication type and nodes +%%%==================================================================================== +own_group() -> + case application:get_env(kernel, global_groups) of + undefined -> + no_group; + {ok, []} -> + no_group; + {ok, NodeGrps} -> + case catch config_scan(NodeGrps, publish_type) of + {error, _} -> + no_group; + {_, PubTpGrp, NodesDef, _} -> + {PubTpGrp, NodesDef} + end + end. + + +%%%==================================================================================== +%%% Help function which computes publication list +%%%==================================================================================== +publish_on_nodes(normal, no_group) -> + all; +publish_on_nodes(hidden, no_group) -> + []; +publish_on_nodes(normal, {normal, _}) -> + all; +publish_on_nodes(hidden, {_, Nodes}) -> + Nodes; +publish_on_nodes(_, {hidden, Nodes}) -> + Nodes. + +%%%==================================================================================== +%%% Update net_kernels publication list +%%%==================================================================================== +update_publish_nodes(PubArg) -> + update_publish_nodes(PubArg, no_group). +update_publish_nodes(PubArg, MyGroup) -> + net_kernel:update_publish_nodes(publish_on_nodes(PubArg, MyGroup)). + + +%%%==================================================================================== +%%% Fetch publication list +%%%==================================================================================== +publish_on_nodes() -> + publish_on_nodes(publish_arg(), own_group()). |