%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 1996-2011. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% %CopyrightEnd%
%%
-module(global).
-behaviour(gen_server).
%% Global provides global registration of process names. The names are
%% dynamically kept up to date with the entire network. Global can
%% operate in two modes: in a fully connected network, or in a
%% non-fully connected network. In the latter case, the name
%% registration mechanism won't work.
%% As a separate service Global also provides global locks.
%% External exports
-export([start/0, start_link/0, stop/0, sync/0, sync/1,
whereis_name/1, register_name/2,
register_name/3, register_name_external/2, register_name_external/3,
unregister_name_external/1,re_register_name/2, re_register_name/3,
unregister_name/1, registered_names/0, send/2, node_disconnected/1,
set_lock/1, set_lock/2, set_lock/3,
del_lock/1, del_lock/2,
trans/2, trans/3, trans/4,
random_exit_name/3, random_notify_name/3, notify_all_name/3]).
%% Internal exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, resolve_it/4]).
-export([info/0]).
-include_lib("stdlib/include/ms_transform.hrl").
%% Set this variable to 'allow' to allow several names of a process.
%% This is for backward compatibility only; the functionality is broken.
-define(WARN_DUPLICATED_NAME, global_multi_name_action).
%% Undocumented Kernel variable. Set this to 0 (zero) to get the old
%% behaviour.
-define(N_CONNECT_RETRIES, global_connect_retries).
-define(DEFAULT_N_CONNECT_RETRIES, 5).
%%% In certain places in the server, calling io:format hangs everything,
%%% so we'd better use erlang:display/1.
%%% my_tracer is used in testsuites
-define(trace(_), ok).
%-define(trace(T), (catch my_tracer ! {node(), {line,?LINE}, T})).
%-define(trace(T), erlang:display({format, node(), cs(), T})).
%cs() ->
% {_Big, Small, Tiny} = now(),
% (Small rem 100) * 100 + (Tiny div 10000).
%% These are the protocol versions:
%% Vsn 1 is the original protocol.
%% Vsn 2 is enhanced with code to take care of registration of names from
%% non erlang nodes, e.g. C-nodes.
%% Vsn 3 is enhanced with a tag in the synch messages to distinguish
%% different synch sessions from each other, see OTP-2766.
%% Vsn 4 uses a single, permanent, locker process, but works like vsn 3
%% when communicating with vsn 3 nodes. (-R10B)
%% Vsn 5 uses an ordered list of self() and HisTheLocker when locking
%% nodes in the own partition. (R11B-)
%% Current version of global does not support vsn 4 or earlier.
-define(vsn, 5).
%%-----------------------------------------------------------------
%% connect_all = boolean() - true if we are supposed to set up a
%% fully connected net
%% known = [Node] - all nodes known to us
%% synced = [Node] - all nodes that have the same names as us
%% resolvers = [{Node, MyTag, Resolver}] -
%% the tag separating different synch sessions,
%% and the pid of the name resolver process
%% syncers = [pid()] - all current syncers processes
%% node_name = atom() - our node name (can change if distribution
%% is started/stopped dynamically)
%%
%% In addition to these, we keep info about messages arrived in
%% the process dictionary:
%% {pre_connect, Node} = {Vsn, InitMsg} - init_connect msgs that
%% arrived before nodeup
%% {wait_lock, Node} = {exchange, NameList, _NamelistExt} | lock_is_set
%% - see comment below (handle_cast)
%% {save_ops, Node} = {resolved, HisKnown, NamesExt, Res} | [operation()]
%% - save the ops between exchange and resolved
%% {prot_vsn, Node} = Vsn - the exchange protocol version (not used now)
%% {sync_tag_my, Node} = My tag, used at synchronization with Node
%% {sync_tag_his, Node} = The Node's tag, used at synchronization
%% {lock_id, Node} = The resource locking the partitions
%%-----------------------------------------------------------------
-record(state, {connect_all :: boolean(),
known = [] :: [node()],
synced = [] :: [node()],
resolvers = [],
syncers = [] :: [pid()],
node_name = node() :: node(),
the_locker, the_registrar, trace,
global_lock_down = false :: boolean()
}).
-type state() :: #state{}.
%%% There are also ETS tables used for bookkeeping of locks and names
%%% (the first position is the key):
%%%
%%% global_locks (set): {ResourceId, LockRequesterId, [{Pid,RPid,ref()]}
%%% Pid is locking ResourceId, ref() is the monitor ref.
%%% RPid =/= Pid if there is an extra process calling erlang:monitor().
%%% global_names (set): {Name, Pid, Method, RPid, ref()}
%%% Registered names. ref() is the monitor ref.
%%% RPid =/= Pid if there is an extra process calling erlang:monitor().
%%% global_names_ext (set): {Name, Pid, RegNode}
%%% External registered names (C-nodes).
%%% (The RPid:s can be removed when/if erlang:monitor() returns before
%%% trying to connect to the other node.)
%%%
%%% Helper tables:
%%% global_pid_names (bag): {Pid, Name} | {ref(), Name}
%%% Name(s) registered for Pid.
%%% There is one {Pid, Name} and one {ref(), Name} for every Pid.
%%% ref() is the same ref() as in global_names.
%%% global_pid_ids (bag): {Pid, ResourceId} | {ref(), ResourceId}
%%% Resources locked by Pid.
%%% ref() is the same ref() as in global_locks.
%%%
%%% global_pid_names is a 'bag' for backward compatibility.
%%% (Before vsn 5 more than one name could be registered for a process.)
%%%
%%% R11B-3 (OTP-6341): The list of pids in the table 'global_locks'
%%% was replaced by a list of {Pid, Ref}, where Ref is a monitor ref.
%%% It was necessary to use monitors to fix bugs regarding locks that
%%% were never removed. The signal {async_del_lock, ...} has been
%%% kept for backward compatibility. It can be removed later.
%%%
%%% R11B-4 (OTP-6428): Monitors are used for registered names.
%%% The signal {delete_name, ...} has been kept for backward compatibility.
%%% It can be removed later as can the deleter process.
%%% An extra process calling erlang:monitor() is sometimes created.
%%% The new_nodes messages has been augmented with the global lock id.
%%%
%%% R14A (OTP-8527): The deleter process has been removed.
start() ->
gen_server:start({local, global_name_server}, ?MODULE, [], []).
start_link() ->
gen_server:start_link({local, global_name_server}, ?MODULE, [], []).
stop() ->
gen_server:call(global_name_server, stop, infinity).
-spec sync() -> 'ok' | {'error', Reason :: term()}.
sync() ->
case check_sync_nodes() of
{error, _} = Error ->
Error;
SyncNodes ->
gen_server:call(global_name_server, {sync, SyncNodes}, infinity)
end.
-spec sync([node()]) -> 'ok' | {'error', Reason :: term()}.
sync(Nodes) ->
case check_sync_nodes(Nodes) of
{error, _} = Error ->
Error;
SyncNodes ->
gen_server:call(global_name_server, {sync, SyncNodes}, infinity)
end.
-spec send(Name, Msg) -> Pid when
Name :: term(),
Msg :: term(),
Pid :: pid().
send(Name, Msg) ->
case whereis_name(Name) of
Pid when is_pid(Pid) ->
Pid ! Msg,
Pid;
undefined ->
exit({badarg, {Name, Msg}})
end.
%% See OTP-3737.
-spec whereis_name(Name) -> pid() | 'undefined' when
Name :: term().
whereis_name(Name) ->
where(Name).
node_disconnected(Node) ->
global_name_server ! {nodedown, Node}.
%%-----------------------------------------------------------------
%% Method = function(Name, Pid1, Pid2) -> Pid | Pid2 | none
%% Method is called if a name conflict is detected when two nodes
%% are connecting to each other. It is supposed to return one of
%% the Pids or 'none'. If a pid is returned, that pid is
%% registered as Name on all nodes. If 'none' is returned, the
%% Name is unregistered on all nodes. If anything else is returned,
%% the Name is unregistered as well.
%% Method is called once at one of the nodes where the processes reside
%% only. If different Methods are used for the same name, it is
%% undefined which one of them is used.
%% Method blocks the name registration, but does not affect global locking.
%%-----------------------------------------------------------------
-spec register_name(Name, Pid) -> 'yes' | 'no' when
Name :: term(),
Pid :: pid().
register_name(Name, Pid) when is_pid(Pid) ->
register_name(Name, Pid, fun random_exit_name/3).
-type method() :: fun((Name :: term(), Pid :: pid(), Pid2 :: pid()) ->
pid() | 'none').
-spec register_name(Name, Pid, Resolve) -> 'yes' | 'no' when
Name :: term(),
Pid :: pid(),
Resolve :: method().
register_name(Name, Pid, Method) when is_pid(Pid) ->
Fun = fun(Nodes) ->
case (where(Name) =:= undefined) andalso check_dupname(Name, Pid) of
true ->
gen_server:multi_call(Nodes,
global_name_server,
{register, Name, Pid, Method}),
yes;
_ ->
no
end
end,
?trace({register_name, self(), Name, Pid, Method}),
gen_server:call(global_name_server, {registrar, Fun}, infinity).
check_dupname(Name, Pid) ->
case ets:lookup(global_pid_names, Pid) of
[] ->
true;
PidNames ->
case application:get_env(kernel, ?WARN_DUPLICATED_NAME) of
{ok, allow} ->
true;
_ ->
S = "global: ~w registered under several names: ~w\n",
Names = [Name | [Name1 || {_Pid, Name1} <- PidNames]],
error_logger:error_msg(S, [Pid, Names]),
false
end
end.
-spec unregister_name(Name) -> _ when
Name :: term().
unregister_name(Name) ->
case where(Name) of
undefined ->
ok;
_ ->
Fun = fun(Nodes) ->
gen_server:multi_call(Nodes,
global_name_server,
{unregister, Name}),
ok
end,
?trace({unregister_name, self(), Name}),
gen_server:call(global_name_server, {registrar, Fun}, infinity)
end.
-spec re_register_name(Name, Pid) -> _ when
Name :: term(),
Pid :: pid().
re_register_name(Name, Pid) when is_pid(Pid) ->
re_register_name(Name, Pid, fun random_exit_name/3).
-spec re_register_name(Name, Pid, Resolve) -> _ when
Name :: term(),
Pid :: pid(),
Resolve :: method().
re_register_name(Name, Pid, Method) when is_pid(Pid) ->
Fun = fun(Nodes) ->
gen_server:multi_call(Nodes,
global_name_server,
{register, Name, Pid, Method}),
yes
end,
?trace({re_register_name, self(), Name, Pid, Method}),
gen_server:call(global_name_server, {registrar, Fun}, infinity).
-spec registered_names() -> [Name] when
Name :: term().
registered_names() ->
MS = ets:fun2ms(fun({Name,_Pid,_M,_RP,_R}) -> Name end),
ets:select(global_names, MS).
%%-----------------------------------------------------------------
%% The external node (e.g. a C-node) registers the name on an Erlang
%% node which links to the process (an Erlang node has to be used
%% since there is no global_name_server on the C-node). If the Erlang
%% node dies the name is to be unregistered on all nodes. Normally
%% node(Pid) is compared to the node that died, but that does not work
%% for external nodes (the process does not run on the Erlang node
%% that died). Therefore a table of all names registered by external
%% nodes is kept up-to-date on all nodes.
%%
%% Note: if the Erlang node dies an EXIT signal is also sent to the
%% C-node due to the link between the global_name_server and the
%% registered process. [This is why the link has been kept despite
%% the fact that monitors do the job now.]
%%-----------------------------------------------------------------
register_name_external(Name, Pid) when is_pid(Pid) ->
register_name_external(Name, Pid, fun random_exit_name/3).
register_name_external(Name, Pid, Method) when is_pid(Pid) ->
Fun = fun(Nodes) ->
case where(Name) of
undefined ->
gen_server:multi_call(Nodes,
global_name_server,
{register_ext, Name, Pid,
Method, node()}),
yes;
_Pid -> no
end
end,
?trace({register_name_external, self(), Name, Pid, Method}),
gen_server:call(global_name_server, {registrar, Fun}, infinity).
unregister_name_external(Name) ->
unregister_name(Name).
-type id() :: {ResourceId :: term(), LockRequesterId :: term()}.
-spec set_lock(Id) -> boolean() when
Id :: id().
set_lock(Id) ->
set_lock(Id, [node() | nodes()], infinity, 1).
-type retries() :: non_neg_integer() | 'infinity'.
-spec set_lock(Id, Nodes) -> boolean() when
Id :: id(),
Nodes :: [node()].
set_lock(Id, Nodes) ->
set_lock(Id, Nodes, infinity, 1).
-spec set_lock(Id, Nodes, Retries) -> boolean() when
Id :: id(),
Nodes :: [node()],
Retries :: retries().
set_lock(Id, Nodes, Retries) when is_integer(Retries), Retries >= 0 ->
set_lock(Id, Nodes, Retries, 1);
set_lock(Id, Nodes, infinity) ->
set_lock(Id, Nodes, infinity, 1).
set_lock({_ResourceId, _LockRequesterId}, [], _Retries, _Times) ->
true;
set_lock({_ResourceId, _LockRequesterId} = Id, Nodes, Retries, Times) ->
?trace({set_lock,{me,self()},Id,{nodes,Nodes},
{retries,Retries}, {times,Times}}),
case set_lock_on_nodes(Id, Nodes) of
true ->
?trace({set_lock_true, Id}),
true;
false=Reply when Retries =:= 0 ->
Reply;
false ->
random_sleep(Times),
set_lock(Id, Nodes, dec(Retries), Times+1)
end.
-spec del_lock(Id) -> 'true' when
Id :: id().
del_lock(Id) ->
del_lock(Id, [node() | nodes()]).
-spec del_lock(Id, Nodes) -> 'true' when
Id :: id(),
Nodes :: [node()].
del_lock({_ResourceId, _LockRequesterId} = Id, Nodes) ->
?trace({del_lock, {me,self()}, Id, {nodes,Nodes}}),
gen_server:multi_call(Nodes, global_name_server, {del_lock, Id}),
true.
-type trans_fun() :: function() | {module(), atom()}.
-spec trans(Id, Fun) -> Res | aborted when
Id :: id(),
Fun :: trans_fun(),
Res :: term().
trans(Id, Fun) -> trans(Id, Fun, [node() | nodes()], infinity).
-spec trans(Id, Fun, Nodes) -> Res | aborted when
Id :: id(),
Fun :: trans_fun(),
Nodes :: [node()],
Res :: term().
trans(Id, Fun, Nodes) -> trans(Id, Fun, Nodes, infinity).
-spec trans(Id, Fun, Nodes, Retries) -> Res | aborted when
Id :: id(),
Fun :: trans_fun(),
Nodes :: [node()],
Retries :: retries(),
Res :: term().
trans(Id, Fun, Nodes, Retries) ->
case set_lock(Id, Nodes, Retries) of
true ->
try
Fun()
after
del_lock(Id, Nodes)
end;
false ->
aborted
end.
info() ->
gen_server:call(global_name_server, info, infinity).
%%%-----------------------------------------------------------------
%%% Call-back functions from gen_server
%%%-----------------------------------------------------------------
-spec init([]) -> {'ok', state()}.
init([]) ->
process_flag(trap_exit, true),
_ = ets:new(global_locks, [set, named_table, protected]),
_ = ets:new(global_names, [set, named_table, protected]),
_ = ets:new(global_names_ext, [set, named_table, protected]),
_ = ets:new(global_pid_names, [bag, named_table, protected]),
_ = ets:new(global_pid_ids, [bag, named_table, protected]),
%% This is for troubleshooting only.
DoTrace = os:getenv("GLOBAL_HIGH_LEVEL_TRACE") =:= "TRUE",
T0 = case DoTrace of
true ->
send_high_level_trace(),
[];
false ->
no_trace
end,
S = #state{the_locker = start_the_locker(DoTrace),
trace = T0,
the_registrar = start_the_registrar()},
S1 = trace_message(S, {init, node()}, []),
case init:get_argument(connect_all) of
{ok, [["false"]]} ->
{ok, S1#state{connect_all = false}};
_ ->
{ok, S1#state{connect_all = true}}
end.
%%-----------------------------------------------------------------
%% Connection algorithm
%% ====================
%% This algorithm solves the problem with partitioned nets as well.
%%
%% The main idea in the algorithm is that when two nodes connect, they
%% try to set a lock in their own partition (i.e. all nodes already
%% known to them; partitions are not necessarily disjoint). When the
%% lock is set in each partition, these two nodes send each other a
%% list with all registered names in resp partition (*). If no conflict
%% is found, the name tables are just updated. If a conflict is found,
%% a resolve function is called once for each conflict. The result of
%% the resolving is sent to the other node. When the names are
%% exchanged, all other nodes in each partition are informed of the
%% other nodes, and they ping each other to form a fully connected
%% net.
%%
%% A few remarks:
%%
%% (*) When this information is being exchanged, no one is allowed to
%% change the global register table. All calls to register etc are
%% protected by a lock. If a registered process dies during this
%% phase the name is unregistered on the local node immediately,
%% but the unregistration on other nodes will take place when the
%% deleter manages to acquire the lock. This is necessary to
%% prevent names from spreading to nodes where they cannot be
%% deleted.
%%
%% - It is assumed that nodeups and nodedowns arrive in an orderly
%% fashion: for every node, nodeup is followed by nodedown, and vice
%% versa. "Double" nodeups and nodedowns must never occur. It is
%% the responsibility of net_kernel to assure this.
%%
%% - There is always a delay between the termination of a registered
%% process and the removal of the name from Global's tables. This
%% delay can sometimes be quite substantial. Global guarantees that
%% the name will eventually be removed, but there is no
%% synchronization between nodes; the name can be removed from some
%% node(s) long before it is removed from other nodes.
%%
%% - Global cannot handle problems with the distribution very well.
%% Depending on the value of the kernel variable 'net_ticktime' long
%% delays may occur. This does not affect the handling of locks but
%% will block name registration.
%%
%% - Old synch session messages may linger on in the message queue of
%% global_name_server after the sending node has died. The tags of
%% such messages do not match the current tag (if there is one),
%% which makes it possible to discard those messages and cancel the
%% corresponding lock.
%%
%% Suppose nodes A and B connect, and C is connected to A.
%% Here's the algorithm's flow:
%%
%% Node A
%% ------
%% << {nodeup, B}
%% TheLocker ! {nodeup, ..., Node, ...} (there is one locker per node)
%% B ! {init_connect, ..., {..., TheLockerAtA, ...}}
%% << {init_connect, TheLockerAtB}
%% [The lockers try to set the lock]
%% << {lock_is_set, B, ...}
%% [Now, lock is set in both partitions]
%% B ! {exchange, A, Names, ...}
%% << {exchange, B, Names, ...}
%% [solve conflict]
%% B ! {resolved, A, ResolvedA, KnownAtA, ...}
%% << {resolved, B, ResolvedB, KnownAtB, ...}
%% C ! {new_nodes, ResolvedAandB, [B]}
%%
%% Node C
%% ------
%% << {new_nodes, ResolvedOps, NewNodes}
%% [insert Ops]
%% ping(NewNodes)
%% << {nodeup, B}
%% <ignore this one>
%%
%% Several things can disturb this picture.
%%
%% First, the init_connect message may arrive _before_ the nodeup
%% message due to delay in net_kernel. We handle this by keeping track
%% of these messages in the pre_connect variable in our state.
%%
%% Of course we must handle that some node goes down during the
%% connection.
%%
%%-----------------------------------------------------------------
%% Messages in the protocol
%% ========================
%% 1. Between global_name_servers on connecting nodes
%% {init_connect, Vsn, Node, InitMsg}
%% InitMsg = {locker, _Unused, HisKnown, HisTheLocker}
%% {exchange, Node, ListOfNames, _ListOfNamesExt, Tag}
%% {resolved, Node, HisOps, HisKnown, _Unused, ListOfNamesExt, Tag}
%% HisKnown = list of known nodes in Node's partition
%% 2. Between lockers on connecting nodes
%% {his_locker, Pid} (from our global)
%% {lock, Bool} loop until both lockers have lock = true,
%% then send to global_name_server {lock_is_set, Node, Tag}
%% 3. Connecting node's global_name_server informs other nodes in the same
%% partition about hitherto unknown nodes in the other partition
%% {new_nodes, Node, Ops, ListOfNamesExt, NewNodes, ExtraInfo}
%% 4. Between global_name_server and resolver
%% {resolve, NameList, Node} to resolver
%% {exchange_ops, Node, Tag, Ops, Resolved} from resolver
%% 5. sync protocol, between global_name_servers in different partitions
%% {in_sync, Node, IsKnown}
%% sent by each node to all new nodes (Node becomes known to them)
%%-----------------------------------------------------------------
-spec handle_call(term(), {pid(), term()}, state()) ->
{'noreply', state()} |
{'reply', term(), state()} |
{'stop', 'normal', 'stopped', state()}.
handle_call({registrar, Fun}, From, S) ->
S#state.the_registrar ! {trans_all_known, Fun, From},
{noreply, S};
%% The pattern {register,'_','_','_'} is traced by the inviso
%% application. Do not change.
handle_call({register, Name, Pid, Method}, {FromPid, _Tag}, S0) ->
S = ins_name(Name, Pid, Method, FromPid, [], S0),
{reply, yes, S};
handle_call({unregister, Name}, _From, S0) ->
S = delete_global_name2(Name, S0),
{reply, ok, S};
handle_call({register_ext, Name, Pid, Method, RegNode}, {FromPid,_Tag}, S0) ->
S = ins_name_ext(Name, Pid, Method, RegNode, FromPid, [], S0),
{reply, yes, S};
handle_call({set_lock, Lock}, {Pid, _Tag}, S0) ->
{Reply, S} = handle_set_lock(Lock, Pid, S0),
{reply, Reply, S};
handle_call({del_lock, Lock}, {Pid, _Tag}, S0) ->
S = handle_del_lock(Lock, Pid, S0),
{reply, true, S};
handle_call(get_known, _From, S) ->
{reply, S#state.known, S};
handle_call(get_synced, _From, S) ->
{reply, S#state.synced, S};
handle_call({sync, Nodes}, From, S) ->
%% If we have several global groups, this won't work, since we will
%% do start_sync on a nonempty list of nodes even if the system
%% is quiet.
Pid = start_sync(lists:delete(node(), Nodes) -- S#state.synced, From),
{noreply, S#state{syncers = [Pid | S#state.syncers]}};
handle_call(get_protocol_version, _From, S) ->
{reply, ?vsn, S};
handle_call(get_names_ext, _From, S) ->
{reply, get_names_ext(), S};
handle_call(info, _From, S) ->
{reply, S, S};
%% "High level trace". For troubleshooting only.
handle_call(high_level_trace_start, _From, S) ->
S#state.the_locker ! {do_trace, true},
send_high_level_trace(),
{reply, ok, trace_message(S#state{trace = []}, {init, node()}, [])};
handle_call(high_level_trace_stop, _From, S) ->
#state{the_locker = TheLocker, trace = Trace} = S,
TheLocker ! {do_trace, false},
wait_high_level_trace(),
{reply, Trace, S#state{trace = no_trace}};
handle_call(high_level_trace_get, _From, #state{trace = Trace}=S) ->
{reply, Trace, S#state{trace = []}};
handle_call(stop, _From, S) ->
{stop, normal, stopped, S};
handle_call(Request, From, S) ->
error_logger:warning_msg("The global_name_server "
"received an unexpected message:\n"
"handle_call(~p, ~p, _)\n",
[Request, From]),
{noreply, S}.
%%========================================================================
%% init_connect
%%
%%========================================================================
-spec handle_cast(term(), state()) -> {'noreply', state()}.
handle_cast({init_connect, Vsn, Node, InitMsg}, S) ->
%% Sent from global_name_server at Node.
?trace({'####', init_connect, {vsn, Vsn}, {node,Node},{initmsg,InitMsg}}),
case Vsn of
%% It is always the responsibility of newer versions to understand
%% older versions of the protocol.
{HisVsn, HisTag} when HisVsn > ?vsn ->
init_connect(?vsn, Node, InitMsg, HisTag, S#state.resolvers, S);
{HisVsn, HisTag} ->
init_connect(HisVsn, Node, InitMsg, HisTag, S#state.resolvers, S);
%% To be future compatible
Tuple when is_tuple(Tuple) ->
List = tuple_to_list(Tuple),
[_HisVsn, HisTag | _] = List,
%% use own version handling if his is newer.
init_connect(?vsn, Node, InitMsg, HisTag, S#state.resolvers, S);
_ ->
Txt = io_lib:format("Illegal global protocol version ~p Node: ~p\n",
[Vsn, Node]),
error_logger:info_report(lists:flatten(Txt))
end,
{noreply, S};
%%=======================================================================
%% lock_is_set
%%
%% Ok, the lock is now set on both partitions. Send our names to other node.
%%=======================================================================
handle_cast({lock_is_set, Node, MyTag, LockId}, S) ->
%% Sent from the_locker at node().
?trace({'####', lock_is_set , {node,Node}}),
case get({sync_tag_my, Node}) of
MyTag ->
lock_is_set(Node, S#state.resolvers, LockId),
{noreply, S};
_ -> %% Illegal tag, delete the old sync session.
NewS = cancel_locker(Node, S, MyTag),
{noreply, NewS}
end;
%%========================================================================
%% exchange
%%
%% Here the names are checked to detect name clashes.
%%========================================================================
handle_cast({exchange, Node, NameList, _NameExtList, MyTag}, S) ->
%% Sent from global_name_server at Node.
case get({sync_tag_my, Node}) of
MyTag ->
exchange(Node, NameList, S#state.resolvers),
{noreply, S};
_ -> %% Illegal tag, delete the old sync session.
NewS = cancel_locker(Node, S, MyTag),
{noreply, NewS}
end;
%% {exchange_ops, ...} is sent by the resolver process (which then
%% dies). It could happen that {resolved, ...} has already arrived
%% from the other node. In that case we can go ahead and run the
%% resolve operations. Otherwise we have to save the operations and
%% wait for {resolve, ...}. This is very much like {lock_is_set, ...}
%% and {exchange, ...}.
handle_cast({exchange_ops, Node, MyTag, Ops, Resolved}, S0) ->
%% Sent from the resolver for Node at node().
?trace({exchange_ops, {node,Node}, {ops,Ops},{resolved,Resolved},
{mytag,MyTag}}),
S = trace_message(S0, {exit_resolver, Node}, [MyTag]),
case get({sync_tag_my, Node}) of
MyTag ->
Known = S#state.known,
gen_server:cast({global_name_server, Node},
{resolved, node(), Resolved, Known,
Known,get_names_ext(),get({sync_tag_his,Node})}),
case get({save_ops, Node}) of
{resolved, HisKnown, Names_ext, HisResolved} ->
put({save_ops, Node}, Ops),
NewS = resolved(Node, HisResolved, HisKnown, Names_ext,S),
{noreply, NewS};
undefined ->
put({save_ops, Node}, Ops),
{noreply, S}
end;
_ -> %% Illegal tag, delete the old sync session.
NewS = cancel_locker(Node, S, MyTag),
{noreply, NewS}
end;
%%========================================================================
%% resolved
%%
%% Here the name clashes are resolved.
%%========================================================================
handle_cast({resolved, Node, HisResolved, HisKnown, _HisKnown_v2,
Names_ext, MyTag}, S) ->
%% Sent from global_name_server at Node.
?trace({'####', resolved, {his_resolved,HisResolved}, {node,Node}}),
case get({sync_tag_my, Node}) of
MyTag ->
%% See the comment at handle_case({exchange_ops, ...}).
case get({save_ops, Node}) of
Ops when is_list(Ops) ->
NewS = resolved(Node, HisResolved, HisKnown, Names_ext, S),
{noreply, NewS};
undefined ->
Resolved = {resolved, HisKnown, Names_ext, HisResolved},
put({save_ops, Node}, Resolved),
{noreply, S}
end;
_ -> %% Illegal tag, delete the old sync session.
NewS = cancel_locker(Node, S, MyTag),
{noreply, NewS}
end;
%%========================================================================
%% new_nodes
%%
%% We get to know the other node's known nodes.
%%========================================================================
handle_cast({new_nodes, Node, Ops, Names_ext, Nodes, ExtraInfo}, S) ->
%% Sent from global_name_server at Node.
?trace({new_nodes, {node,Node},{ops,Ops},{nodes,Nodes},{x,ExtraInfo}}),
NewS = new_nodes(Ops, Node, Names_ext, Nodes, ExtraInfo, S),
{noreply, NewS};
%%========================================================================
%% in_sync
%%
%% We are in sync with this node (from the other node's known world).
%%========================================================================
handle_cast({in_sync, Node, _IsKnown}, S) ->
%% Sent from global_name_server at Node (in the other partition).
?trace({'####', in_sync, {Node, _IsKnown}}),
lists:foreach(fun(Pid) -> Pid ! {synced, [Node]} end, S#state.syncers),
NewS = cancel_locker(Node, S, get({sync_tag_my, Node})),
reset_node_state(Node),
NSynced = case lists:member(Node, Synced = NewS#state.synced) of
true -> Synced;
false -> [Node | Synced]
end,
{noreply, NewS#state{synced = NSynced}};
%% Called when Pid on other node crashed
handle_cast({async_del_name, _Name, _Pid}, S) ->
%% Sent from the_deleter at some node in the partition but node() (-R13B)
%% The DOWN message deletes the name.
%% R14A nodes and later do not send async_del_name messages.
{noreply, S};
handle_cast({async_del_lock, _ResourceId, _Pid}, S) ->
%% Sent from global_name_server at some node in the partition but
%% node(). (-R13B)
%% The DOWN message deletes the lock.
%% R14A nodes and later do not send async_del_lock messages.
{noreply, S};
handle_cast(Request, S) ->
error_logger:warning_msg("The global_name_server "
"received an unexpected message:\n"
"handle_cast(~p, _)\n", [Request]),
{noreply, S}.
%%========================================================================
-spec handle_info(term(), state()) ->
{'noreply', state()} | {'stop', term(), state()}.
handle_info({'EXIT', Locker, _Reason}=Exit, #state{the_locker=Locker}=S) ->
{stop, {locker_died,Exit}, S#state{the_locker=undefined}};
handle_info({'EXIT', Registrar, _}=Exit, #state{the_registrar=Registrar}=S) ->
{stop, {registrar_died,Exit}, S#state{the_registrar=undefined}};
handle_info({'EXIT', Pid, _Reason}, S) when is_pid(Pid) ->
?trace({global_EXIT,_Reason,Pid}),
%% The process that died was a synch process started by start_sync
%% or a registered process running on an external node (C-node).
%% Links to external names are ignored here (there are also DOWN
%% signals).
Syncers = lists:delete(Pid, S#state.syncers),
{noreply, S#state{syncers = Syncers}};
handle_info({nodedown, Node}, S) when Node =:= S#state.node_name ->
%% Somebody stopped the distribution dynamically - change
%% references to old node name (Node) to new node name ('nonode@nohost')
{noreply, change_our_node_name(node(), S)};
handle_info({nodedown, Node}, S0) ->
?trace({'####', nodedown, {node,Node}}),
S1 = trace_message(S0, {nodedown, Node}, []),
S = handle_nodedown(Node, S1),
{noreply, S};
handle_info({extra_nodedown, Node}, S0) ->
?trace({'####', extra_nodedown, {node,Node}}),
S1 = trace_message(S0, {extra_nodedown, Node}, []),
S = handle_nodedown(Node, S1),
{noreply, S};
handle_info({nodeup, Node}, S) when Node =:= node() ->
?trace({'####', local_nodeup, {node, Node}}),
%% Somebody started the distribution dynamically - change
%% references to old node name ('nonode@nohost') to Node.
{noreply, change_our_node_name(Node, S)};
handle_info({nodeup, _Node}, S) when not S#state.connect_all ->
{noreply, S};
handle_info({nodeup, Node}, S0) when S0#state.connect_all ->
IsKnown = lists:member(Node, S0#state.known) or
%% This one is only for double nodeups (shouldn't occur!)
lists:keymember(Node, 1, S0#state.resolvers),
?trace({'####', nodeup, {node,Node}, {isknown,IsKnown}}),
S1 = trace_message(S0, {nodeup, Node}, []),
case IsKnown of
true ->
{noreply, S1};
false ->
resend_pre_connect(Node),
%% now() is used as a tag to separate different synch sessions
%% from each others. Global could be confused at bursty nodeups
%% because it couldn't separate the messages between the different
%% synch sessions started by a nodeup.
MyTag = now(),
put({sync_tag_my, Node}, MyTag),
?trace({sending_nodeup_to_locker, {node,Node},{mytag,MyTag}}),
S1#state.the_locker ! {nodeup, Node, MyTag},
%% In order to be compatible with unpatched R7 a locker
%% process was spawned. Vsn 5 is no longer compatible with
%% vsn 3 nodes, so the locker process is no longer needed.
%% The permanent locker takes its place.
NotAPid = no_longer_a_pid,
Locker = {locker, NotAPid, S1#state.known, S1#state.the_locker},
InitC = {init_connect, {?vsn, MyTag}, node(), Locker},
Rs = S1#state.resolvers,
?trace({casting_init_connect, {node,Node},{initmessage,InitC},
{resolvers,Rs}}),
gen_server:cast({global_name_server, Node}, InitC),
Resolver = start_resolver(Node, MyTag),
S = trace_message(S1, {new_resolver, Node}, [MyTag, Resolver]),
{noreply, S#state{resolvers = [{Node, MyTag, Resolver} | Rs]}}
end;
handle_info({whereis, Name, From}, S) ->
do_whereis(Name, From),
{noreply, S};
handle_info(known, S) ->
io:format(">>>> ~p\n",[S#state.known]),
{noreply, S};
%% "High level trace". For troubleshooting only.
handle_info(high_level_trace, S) ->
case S of
#state{trace = [{Node, _Time, _M, Nodes, _X} | _]} ->
send_high_level_trace(),
CNode = node(),
CNodes = nodes(),
case {CNode, CNodes} of
{Node, Nodes} ->
{noreply, S};
_ ->
{New, _, Old} =
sofs:symmetric_partition(sofs:set([CNode|CNodes]),
sofs:set([Node|Nodes])),
M = {nodes_changed, {sofs:to_external(New),
sofs:to_external(Old)}},
{noreply, trace_message(S, M, [])}
end;
_ ->
{noreply, S}
end;
handle_info({trace_message, M}, S) ->
{noreply, trace_message(S, M, [])};
handle_info({trace_message, M, X}, S) ->
{noreply, trace_message(S, M, X)};
handle_info({'DOWN', MonitorRef, process, _Pid, _Info}, S0) ->
S1 = delete_lock(MonitorRef, S0),
S = del_name(MonitorRef, S1),
{noreply, S};
handle_info(Message, S) ->
error_logger:warning_msg("The global_name_server "
"received an unexpected message:\n"
"handle_info(~p, _)\n", [Message]),
{noreply, S}.
%%========================================================================
%%========================================================================
%%=============================== Internal Functions =====================
%%========================================================================
%%========================================================================
-define(HIGH_LEVEL_TRACE_INTERVAL, 500). % ms
wait_high_level_trace() ->
receive
high_level_trace ->
ok
after ?HIGH_LEVEL_TRACE_INTERVAL+1 ->
ok
end.
send_high_level_trace() ->
erlang:send_after(?HIGH_LEVEL_TRACE_INTERVAL, self(), high_level_trace).
-define(GLOBAL_RID, global).
%% Similar to trans(Id, Fun), but always uses global's own lock
%% on all nodes known to global, making sure that no new nodes have
%% become known while we got the list of known nodes.
trans_all_known(Fun) ->
Id = {?GLOBAL_RID, self()},
Nodes = set_lock_known(Id, 0),
try
Fun(Nodes)
after
delete_global_lock(Id, Nodes)
end.
set_lock_known(Id, Times) ->
Known = get_known(),
Nodes = [node() | Known],
Boss = the_boss(Nodes),
%% Use the same convention (a boss) as lock_nodes_safely. Optimization.
case set_lock_on_nodes(Id, [Boss]) of
true ->
case lock_on_known_nodes(Id, Known, Nodes) of
true ->
Nodes;
false ->
del_lock(Id, [Boss]),
random_sleep(Times),
set_lock_known(Id, Times+1)
end;
false ->
random_sleep(Times),
set_lock_known(Id, Times+1)
end.
lock_on_known_nodes(Id, Known, Nodes) ->
case set_lock_on_nodes(Id, Nodes) of
true ->
(get_known() -- Known) =:= [];
false ->
false
end.
set_lock_on_nodes(_Id, []) ->
true;
set_lock_on_nodes(Id, Nodes) ->
case local_lock_check(Id, Nodes) of
true ->
Msg = {set_lock, Id},
{Replies, _} =
gen_server:multi_call(Nodes, global_name_server, Msg),
?trace({set_lock,{me,self()},Id,{nodes,Nodes},{replies,Replies}}),
check_replies(Replies, Id, Replies);
false=Reply ->
Reply
end.
%% Probe lock on local node to see if one should go on trying other nodes.
local_lock_check(_Id, [_] = _Nodes) ->
true;
local_lock_check(Id, Nodes) ->
not lists:member(node(), Nodes) orelse (can_set_lock(Id) =/= false).
check_replies([{_Node, true} | T], Id, Replies) ->
check_replies(T, Id, Replies);
check_replies([{_Node, false=Reply} | _T], _Id, [_]) ->
Reply;
check_replies([{_Node, false=Reply} | _T], Id, Replies) ->
TrueReplyNodes = [N || {N, true} <- Replies],
?trace({check_replies, {true_reply_nodes, TrueReplyNodes}}),
gen_server:multi_call(TrueReplyNodes, global_name_server, {del_lock, Id}),
Reply;
check_replies([], _Id, _Replies) ->
true.
%%========================================================================
%% Another node wants to synchronize its registered names with us.
%% Both nodes must have a lock before they are allowed to continue.
%%========================================================================
init_connect(Vsn, Node, InitMsg, HisTag, Resolvers, S) ->
%% It is always the responsibility of newer versions to understand
%% older versions of the protocol.
put({prot_vsn, Node}, Vsn),
put({sync_tag_his, Node}, HisTag),
case lists:keyfind(Node, 1, Resolvers) of
{Node, MyTag, _Resolver} ->
MyTag = get({sync_tag_my, Node}), % assertion
{locker, _NoLongerAPid, _HisKnown0, HisTheLocker} = InitMsg,
?trace({init_connect,{histhelocker,HisTheLocker}}),
HisKnown = [],
S#state.the_locker ! {his_the_locker, HisTheLocker,
{Vsn,HisKnown}, S#state.known};
false ->
?trace({init_connect,{pre_connect,Node},{histag,HisTag}}),
put({pre_connect, Node}, {Vsn, InitMsg, HisTag})
end.
%%========================================================================
%% In the simple case, we'll get lock_is_set before we get exchange,
%% but we may get exchange before we get lock_is_set from our locker.
%% If that's the case, we'll have to remember the exchange info, and
%% handle it when we get the lock_is_set. We do this by using the
%% process dictionary - when the lock_is_set msg is received, we store
%% this info. When exchange is received, we can check the dictionary
%% if the lock_is_set has been received. If not, we store info about
%% the exchange instead. In the lock_is_set we must first check if
%% exchange info is stored, in that case we take care of it.
%%========================================================================
lock_is_set(Node, Resolvers, LockId) ->
gen_server:cast({global_name_server, Node},
{exchange, node(), get_names(), _ExtNames = [],
get({sync_tag_his, Node})}),
put({lock_id, Node}, LockId),
%% If both have the lock, continue with exchange.
case get({wait_lock, Node}) of
{exchange, NameList} ->
put({wait_lock, Node}, lock_is_set),
exchange(Node, NameList, Resolvers);
undefined ->
put({wait_lock, Node}, lock_is_set)
end.
%%========================================================================
%% exchange
%%========================================================================
exchange(Node, NameList, Resolvers) ->
?trace({'####', exchange, {node,Node}, {namelist,NameList},
{resolvers, Resolvers}}),
case erase({wait_lock, Node}) of
lock_is_set ->
{Node, _Tag, Resolver} = lists:keyfind(Node, 1, Resolvers),
Resolver ! {resolve, NameList, Node};
undefined ->
put({wait_lock, Node}, {exchange, NameList})
end.
resolved(Node, HisResolved, HisKnown, Names_ext, S0) ->
Ops = erase({save_ops, Node}) ++ HisResolved,
%% Known may have shrunk since the lock was taken (due to nodedowns).
Known = S0#state.known,
Synced = S0#state.synced,
NewNodes = [Node | HisKnown],
sync_others(HisKnown),
ExtraInfo = [{vsn,get({prot_vsn, Node})}, {lock, get({lock_id, Node})}],
S = do_ops(Ops, node(), Names_ext, ExtraInfo, S0),
%% I am synced with Node, but not with HisKnown yet
lists:foreach(fun(Pid) -> Pid ! {synced, [Node]} end, S#state.syncers),
S3 = lists:foldl(fun(Node1, S1) ->
F = fun(Tag) -> cancel_locker(Node1,S1,Tag) end,
cancel_resolved_locker(Node1, F)
end, S, HisKnown),
%% The locker that took the lock is asked to send
%% the {new_nodes, ...} message. This ensures that
%% {del_lock, ...} is received after {new_nodes, ...}
%% (except when abcast spawns process(es)...).
NewNodesF = fun() ->
gen_server:abcast(Known, global_name_server,
{new_nodes, node(), Ops, Names_ext,
NewNodes, ExtraInfo})
end,
F = fun(Tag) -> cancel_locker(Node, S3, Tag, NewNodesF) end,
S4 = cancel_resolved_locker(Node, F),
%% See (*) below... we're node b in that description
AddedNodes = (NewNodes -- Known),
NewKnown = Known ++ AddedNodes,
S4#state.the_locker ! {add_to_known, AddedNodes},
NewS = trace_message(S4, {added, AddedNodes},
[{new_nodes, NewNodes}, {abcast, Known}, {ops,Ops}]),
NewS#state{known = NewKnown, synced = [Node | Synced]}.
cancel_resolved_locker(Node, CancelFun) ->
Tag = get({sync_tag_my, Node}),
?trace({calling_cancel_locker,Tag,get()}),
S = CancelFun(Tag),
reset_node_state(Node),
S.
new_nodes(Ops, ConnNode, Names_ext, Nodes, ExtraInfo, S0) ->
Known = S0#state.known,
%% (*) This one requires some thought...
%% We're node a, other nodes b and c:
%% The problem is that {in_sync, a} may arrive before {resolved, [a]} to
%% b from c, leading to b sending {new_nodes, [a]} to us (node a).
%% Therefore, we make sure we never get duplicates in Known.
AddedNodes = lists:delete(node(), Nodes -- Known),
sync_others(AddedNodes),
S = do_ops(Ops, ConnNode, Names_ext, ExtraInfo, S0),
?trace({added_nodes_in_sync,{added_nodes,AddedNodes}}),
S#state.the_locker ! {add_to_known, AddedNodes},
S1 = trace_message(S, {added, AddedNodes}, [{ops,Ops}]),
S1#state{known = Known ++ AddedNodes}.
do_whereis(Name, From) ->
case is_global_lock_set() of
false ->
gen_server:reply(From, where(Name));
true ->
send_again({whereis, Name, From})
end.
-spec terminate(term(), state()) -> 'ok'.
terminate(_Reason, _S) ->
true = ets:delete(global_names),
true = ets:delete(global_names_ext),
true = ets:delete(global_locks),
true = ets:delete(global_pid_names),
true = ets:delete(global_pid_ids),
ok.
-spec code_change(term(), state(), term()) -> {'ok', state()}.
code_change(_OldVsn, S, _Extra) ->
{ok, S}.
%% The resolver runs exchange_names in a separate process. The effect
%% is that locks can be used at the same time as name resolution takes
%% place.
start_resolver(Node, MyTag) ->
spawn(fun() -> resolver(Node, MyTag) end).
resolver(Node, Tag) ->
receive
{resolve, NameList, Node} ->
?trace({resolver, {me,self()}, {node,Node}, {namelist,NameList}}),
{Ops, Resolved} = exchange_names(NameList, Node, [], []),
Exchange = {exchange_ops, Node, Tag, Ops, Resolved},
gen_server:cast(global_name_server, Exchange),
exit(normal);
_ -> % Ignore garbage.
resolver(Node, Tag)
end.
resend_pre_connect(Node) ->
case erase({pre_connect, Node}) of
{Vsn, InitMsg, HisTag} ->
gen_server:cast(self(),
{init_connect, {Vsn, HisTag}, Node, InitMsg});
_ ->
ok
end.
ins_name(Name, Pid, Method, FromPidOrNode, ExtraInfo, S0) ->
?trace({ins_name,insert,{name,Name},{pid,Pid}}),
S1 = delete_global_name_keep_pid(Name, S0),
S = trace_message(S1, {ins_name, node(Pid)}, [Name, Pid]),
insert_global_name(Name, Pid, Method, FromPidOrNode, ExtraInfo, S).
ins_name_ext(Name, Pid, Method, RegNode, FromPidOrNode, ExtraInfo, S0) ->
?trace({ins_name_ext, {name,Name}, {pid,Pid}}),
S1 = delete_global_name_keep_pid(Name, S0),
dolink_ext(Pid, RegNode),
S = trace_message(S1, {ins_name_ext, node(Pid)}, [Name, Pid]),
true = ets:insert(global_names_ext, {Name, Pid, RegNode}),
insert_global_name(Name, Pid, Method, FromPidOrNode, ExtraInfo, S).
where(Name) ->
case ets:lookup(global_names, Name) of
[{_Name, Pid, _Method, _RPid, _Ref}] ->
if node(Pid) == node() ->
case is_process_alive(Pid) of
true -> Pid;
false -> undefined
end;
true ->
Pid
end;
[] -> undefined
end.
handle_set_lock(Id, Pid, S) ->
?trace({handle_set_lock, Id, Pid}),
case can_set_lock(Id) of
{true, PidRefs} ->
case pid_is_locking(Pid, PidRefs) of
true ->
{true, S};
false ->
{true, insert_lock(Id, Pid, PidRefs, S)}
end;
false=Reply ->
{Reply, S}
end.
can_set_lock({ResourceId, LockRequesterId}) ->
case ets:lookup(global_locks, ResourceId) of
[{ResourceId, LockRequesterId, PidRefs}] ->
{true, PidRefs};
[{ResourceId, _LockRequesterId2, _PidRefs}] ->
false;
[] ->
{true, []}
end.
insert_lock({ResourceId, LockRequesterId}=Id, Pid, PidRefs, S) ->
{RPid, Ref} = do_monitor(Pid),
true = ets:insert(global_pid_ids, {Pid, ResourceId}),
true = ets:insert(global_pid_ids, {Ref, ResourceId}),
Lock = {ResourceId, LockRequesterId, [{Pid,RPid,Ref} | PidRefs]},
true = ets:insert(global_locks, Lock),
trace_message(S, {ins_lock, node(Pid)}, [Id, Pid]).
is_global_lock_set() ->
is_lock_set(?GLOBAL_RID).
is_lock_set(ResourceId) ->
ets:member(global_locks, ResourceId).
handle_del_lock({ResourceId, LockReqId}, Pid, S0) ->
?trace({handle_del_lock, {pid,Pid},{id,{ResourceId, LockReqId}}}),
case ets:lookup(global_locks, ResourceId) of
[{ResourceId, LockReqId, PidRefs}]->
remove_lock(ResourceId, LockReqId, Pid, PidRefs, false, S0);
_ -> S0
end.
remove_lock(ResourceId, LockRequesterId, Pid, [{Pid,RPid,Ref}], Down, S0) ->
?trace({remove_lock_1, {id,ResourceId},{pid,Pid}}),
true = erlang:demonitor(Ref, [flush]),
kill_monitor_proc(RPid, Pid),
true = ets:delete(global_locks, ResourceId),
true = ets:delete_object(global_pid_ids, {Pid, ResourceId}),
true = ets:delete_object(global_pid_ids, {Ref, ResourceId}),
S = case ResourceId of
?GLOBAL_RID -> S0#state{global_lock_down = Down};
_ -> S0
end,
trace_message(S, {rem_lock, node(Pid)},
[{ResourceId, LockRequesterId}, Pid]);
remove_lock(ResourceId, LockRequesterId, Pid, PidRefs0, _Down, S) ->
?trace({remove_lock_2, {id,ResourceId},{pid,Pid}}),
PidRefs = case lists:keyfind(Pid, 1, PidRefs0) of
{Pid, RPid, Ref} ->
true = erlang:demonitor(Ref, [flush]),
kill_monitor_proc(RPid, Pid),
true = ets:delete_object(global_pid_ids,
{Ref, ResourceId}),
lists:keydelete(Pid, 1, PidRefs0);
false ->
PidRefs0
end,
Lock = {ResourceId, LockRequesterId, PidRefs},
true = ets:insert(global_locks, Lock),
true = ets:delete_object(global_pid_ids, {Pid, ResourceId}),
trace_message(S, {rem_lock, node(Pid)},
[{ResourceId, LockRequesterId}, Pid]).
kill_monitor_proc(Pid, Pid) ->
ok;
kill_monitor_proc(RPid, _Pid) ->
exit(RPid, kill).
do_ops(Ops, ConnNode, Names_ext, ExtraInfo, S0) ->
?trace({do_ops, {ops,Ops}}),
XInserts = [{Name, Pid, RegNode, Method} ||
{Name2, Pid2, RegNode} <- Names_ext,
{insert, {Name, Pid, Method}} <- Ops,
Name =:= Name2, Pid =:= Pid2],
S1 = lists:foldl(fun({Name, Pid, RegNode, Method}, S1) ->
ins_name_ext(Name, Pid, Method, RegNode,
ConnNode, ExtraInfo, S1)
end, S0, XInserts),
XNames = [Name || {Name, _Pid, _RegNode, _Method} <- XInserts],
Inserts = [{Name, Pid, node(Pid), Method} ||
{insert, {Name, Pid, Method}} <- Ops,
not lists:member(Name, XNames)],
S2 = lists:foldl(fun({Name, Pid, _RegNode, Method}, S2) ->
ins_name(Name, Pid, Method, ConnNode,
ExtraInfo, S2)
end, S1, Inserts),
DelNames = [Name || {delete, Name} <- Ops],
lists:foldl(fun(Name, S) -> delete_global_name2(Name, S)
end, S2, DelNames).
%% It is possible that a node that was up and running when the
%% operations were assembled has since died. The final {in_sync,...}
%% messages do not generate nodedown messages for such nodes. To
%% compensate "artificial" nodedown messages are created. Since
%% monitor_node may take some time processes are spawned to avoid
%% locking up the global_name_server. Should somehow double nodedown
%% messages occur (one of them artificial), nothing bad can happen
%% (the second nodedown is a no-op). It is assumed that there cannot
%% be a nodeup before the artificial nodedown.
%%
%% The extra nodedown messages generated here also take care of the
%% case that a nodedown message is received _before_ the operations
%% are run.
sync_others(Nodes) ->
N = case application:get_env(kernel, ?N_CONNECT_RETRIES) of
{ok, NRetries} when is_integer(NRetries),
NRetries >= 0 -> NRetries;
_ -> ?DEFAULT_N_CONNECT_RETRIES
end,
lists:foreach(fun(Node) ->
spawn(fun() -> sync_other(Node, N) end)
end, Nodes).
sync_other(Node, N) ->
erlang:monitor_node(Node, true, [allow_passive_connect]),
receive
{nodedown, Node} when N > 0 ->
sync_other(Node, N - 1);
{nodedown, Node} ->
?trace({missing_nodedown, {node, Node}}),
error_logger:warning_msg("global: ~w failed to connect to ~w\n",
[node(), Node]),
global_name_server ! {extra_nodedown, Node}
after 0 ->
gen_server:cast({global_name_server,Node}, {in_sync,node(),true})
end.
% monitor_node(Node, false),
% exit(normal).
insert_global_name(Name, Pid, Method, FromPidOrNode, ExtraInfo, S) ->
{RPid, Ref} = do_monitor(Pid),
true = ets:insert(global_names, {Name, Pid, Method, RPid, Ref}),
true = ets:insert(global_pid_names, {Pid, Name}),
true = ets:insert(global_pid_names, {Ref, Name}),
case lock_still_set(FromPidOrNode, ExtraInfo, S) of
true ->
S;
false ->
%% The node that took the lock has gone down and then up
%% again. The {register, ...} or {new_nodes, ...} message
%% was delayed and arrived after nodeup (maybe it caused
%% the nodeup). The DOWN signal from the monitor of the
%% lock has removed the lock.
%% Note: it is assumed here that the DOWN signal arrives
%% _before_ nodeup and any message that caused nodeup.
%% This is true of Erlang/OTP.
delete_global_name2(Name, S)
end.
lock_still_set(PidOrNode, ExtraInfo, S) ->
case ets:lookup(global_locks, ?GLOBAL_RID) of
[{?GLOBAL_RID, _LockReqId, PidRefs}] when is_pid(PidOrNode) ->
%% Name registration.
lists:keymember(PidOrNode, 1, PidRefs);
[{?GLOBAL_RID, LockReqId, _PidRefs}] when is_atom(PidOrNode) ->
{?GLOBAL_RID, LockId} = extra_info(lock, ExtraInfo),
LockReqId =:= LockId;
[] ->
not S#state.global_lock_down
end.
extra_info(Tag, ExtraInfo) ->
%% ExtraInfo used to be a list of nodes (vsn 2).
case catch lists:keyfind(Tag, 1, ExtraInfo) of
{Tag, Info} ->
Info;
_ ->
undefined
end.
del_name(Ref, S) ->
NameL = [Name ||
{_, Name} <- ets:lookup(global_pid_names, Ref),
{_, _Pid, _Method, _RPid, Ref1} <-
ets:lookup(global_names, Name),
Ref1 =:= Ref],
case NameL of
[Name] ->
delete_global_name2(Name, S);
[] ->
S
end.
%% Keeps the entry in global_names for whereis_name/1.
delete_global_name_keep_pid(Name, S) ->
case ets:lookup(global_names, Name) of
[{Name, Pid, _Method, RPid, Ref}] ->
delete_global_name2(Name, Pid, RPid, Ref, S);
[] ->
S
end.
delete_global_name2(Name, S) ->
case ets:lookup(global_names, Name) of
[{Name, Pid, _Method, RPid, Ref}] ->
true = ets:delete(global_names, Name),
delete_global_name2(Name, Pid, RPid, Ref, S);
[] ->
S
end.
delete_global_name2(Name, Pid, RPid, Ref, S) ->
true = erlang:demonitor(Ref, [flush]),
kill_monitor_proc(RPid, Pid),
delete_global_name(Name, Pid),
?trace({delete_global_name,{item,Name},{pid,Pid}}),
true = ets:delete_object(global_pid_names, {Pid, Name}),
true = ets:delete_object(global_pid_names, {Ref, Name}),
case ets:lookup(global_names_ext, Name) of
[{Name, Pid, RegNode}] ->
true = ets:delete(global_names_ext, Name),
?trace({delete_global_name, {name,Name,{pid,Pid},{RegNode,Pid}}}),
dounlink_ext(Pid, RegNode);
[] ->
?trace({delete_global_name,{name,Name,{pid,Pid},{node(Pid),Pid}}}),
ok
end,
trace_message(S, {del_name, node(Pid)}, [Name, Pid]).
%% delete_global_name/2 is traced by the inviso application.
%% Do not change.
delete_global_name(_Name, _Pid) ->
ok.
%%-----------------------------------------------------------------
%% The locker is a satellite process to global_name_server. When a
%% nodeup is received from a new node the global_name_server sends a
%% message to the locker. The locker tries to set a lock in our
%% partition, i.e. on all nodes known to us. When the lock is set, it
%% tells global_name_server about it, and keeps the lock set.
%% global_name_server sends a cancel message to the locker when the
%% partitions are connected.
%% There are two versions of the protocol between lockers on two nodes:
%% Version 1: used by unpatched R7.
%% Version 2: the messages exchanged between the lockers include the known
%% nodes (see OTP-3576).
%%-----------------------------------------------------------------
-define(locker_vsn, 2).
-record(multi,
{local = [], % Requests from nodes on the local host.
remote = [], % Other requests.
known = [], % Copy of global_name_server's known nodes. It's
% faster to keep a copy of known than asking
% for it when needed.
the_boss, % max([node() | 'known'])
just_synced = false, % true if node() synced just a moment ago
%% Statistics:
do_trace % bool()
}).
-record(him, {node, locker, vsn, my_tag}).
start_the_locker(DoTrace) ->
spawn_link(fun() -> init_the_locker(DoTrace) end).
init_the_locker(DoTrace) ->
process_flag(trap_exit, true), % needed?
S0 = #multi{do_trace = DoTrace},
S1 = update_locker_known({add, get_known()}, S0),
loop_the_locker(S1),
erlang:error(locker_exited).
loop_the_locker(S) ->
?trace({loop_the_locker,S}),
receive
Message when element(1, Message) =/= nodeup ->
the_locker_message(Message, S)
after 0 ->
Timeout =
case {S#multi.local, S#multi.remote} of
{[],[]} ->
infinity;
_ ->
%% It is important that the timeout is greater
%% than zero, or the chance that some other node
%% in the partition sets the lock once this node
%% has failed after setting the lock is very slim.
if
S#multi.just_synced ->
0; % no reason to wait after success
S#multi.known =:= [] ->
200; % just to get started
true ->
erlang:min(1000 + 100*length(S#multi.known),
3000)
end
end,
S1 = S#multi{just_synced = false},
receive
Message when element(1, Message) =/= nodeup ->
the_locker_message(Message, S1)
after Timeout ->
case is_global_lock_set() of
true ->
loop_the_locker(S1);
false ->
select_node(S1)
end
end
end.
the_locker_message({his_the_locker, HisTheLocker, HisKnown0, _MyKnown}, S) ->
?trace({his_the_locker, HisTheLocker, {node,node(HisTheLocker)}}),
{HisVsn, _HisKnown} = HisKnown0,
true = HisVsn > 4,
receive
{nodeup, Node, MyTag} when node(HisTheLocker) =:= Node ->
?trace({the_locker_nodeup, {node,Node},{mytag,MyTag}}),
Him = #him{node = node(HisTheLocker), my_tag = MyTag,
locker = HisTheLocker, vsn = HisVsn},
loop_the_locker(add_node(Him, S));
{cancel, Node, _Tag, no_fun} when node(HisTheLocker) =:= Node ->
loop_the_locker(S)
after 60000 ->
?trace({nodeupnevercame, node(HisTheLocker)}),
error_logger:error_msg("global: nodeup never came ~w ~w\n",
[node(), node(HisTheLocker)]),
loop_the_locker(S#multi{just_synced = false})
end;
the_locker_message({cancel, _Node, undefined, no_fun}, S) ->
?trace({cancel_the_locker, undefined, {node,_Node}}),
%% If we actually cancel something when a cancel message with the
%% tag 'undefined' arrives, we may be acting on an old nodedown,
%% to cancel a new nodeup, so we can't do that.
loop_the_locker(S);
the_locker_message({cancel, Node, Tag, no_fun}, S) ->
?trace({the_locker, cancel, {multi,S}, {tag,Tag},{node,Node}}),
receive
{nodeup, Node, Tag} ->
?trace({cancelnodeup2, {node,Node},{tag,Tag}}),
ok
after 0 ->
ok
end,
loop_the_locker(remove_node(Node, S));
the_locker_message({lock_set, _Pid, false, _}, S) ->
?trace({the_locker, spurious, {node,node(_Pid)}}),
loop_the_locker(S);
the_locker_message({lock_set, Pid, true, _HisKnown}, S) ->
Node = node(Pid),
?trace({the_locker, self(), spontaneous, {node,Node}}),
case find_node_tag(Node, S) of
{true, MyTag, HisVsn} ->
LockId = locker_lock_id(Pid, HisVsn),
{IsLockSet, S1} = lock_nodes_safely(LockId, [], S),
Pid ! {lock_set, self(), IsLockSet, S1#multi.known},
Known2 = [node() | S1#multi.known],
?trace({the_locker, spontaneous, {known2, Known2},
{node,Node}, {is_lock_set,IsLockSet}}),
case IsLockSet of
true ->
gen_server:cast(global_name_server,
{lock_is_set, Node, MyTag, LockId}),
?trace({lock_sync_done, {pid,Pid},
{node,node(Pid)}, {me,self()}}),
%% Wait for global to tell us to remove lock.
%% Should the other locker's node die,
%% global_name_server will receive nodedown, and
%% then send {cancel, Node, Tag}.
receive
{cancel, Node, _Tag, Fun} ->
?trace({cancel_the_lock,{node,Node}}),
call_fun(Fun),
delete_global_lock(LockId, Known2)
end,
S2 = S1#multi{just_synced = true},
loop_the_locker(remove_node(Node, S2));
false ->
loop_the_locker(S1#multi{just_synced = false})
end;
false ->
?trace({the_locker, not_there, {node,Node}}),
Pid ! {lock_set, self(), false, S#multi.known},
loop_the_locker(S)
end;
the_locker_message({add_to_known, Nodes}, S) ->
S1 = update_locker_known({add, Nodes}, S),
loop_the_locker(S1);
the_locker_message({remove_from_known, Node}, S) ->
S1 = update_locker_known({remove, Node}, S),
loop_the_locker(S1);
the_locker_message({do_trace, DoTrace}, S) ->
loop_the_locker(S#multi{do_trace = DoTrace});
the_locker_message(Other, S) ->
unexpected_message(Other, locker),
?trace({the_locker, {other_msg, Other}}),
loop_the_locker(S).
%% Requests from nodes on the local host are chosen before requests
%% from other nodes. This should be a safe optimization.
select_node(S) ->
UseRemote = S#multi.local =:= [],
Others1 = if UseRemote -> S#multi.remote; true -> S#multi.local end,
Others2 = exclude_known(Others1, S#multi.known),
S1 = if
UseRemote -> S#multi{remote = Others2};
true -> S#multi{local = Others2}
end,
if
Others2 =:= [] ->
loop_the_locker(S1);
true ->
Him = random_element(Others2),
#him{locker = HisTheLocker, vsn = HisVsn,
node = Node, my_tag = MyTag} = Him,
HisNode = [Node],
Us = [node() | HisNode],
LockId = locker_lock_id(HisTheLocker, HisVsn),
?trace({select_node, self(), {us, Us}}),
%% HisNode = [Node] prevents deadlock:
{IsLockSet, S2} = lock_nodes_safely(LockId, HisNode, S1),
case IsLockSet of
true ->
Known1 = Us ++ S2#multi.known,
?trace({sending_lock_set, self(), {his,HisTheLocker}}),
HisTheLocker ! {lock_set, self(), true, S2#multi.known},
S3 = lock_is_set(S2, Him, MyTag, Known1, LockId),
loop_the_locker(S3);
false ->
loop_the_locker(S2)
end
end.
%% Version 5: Both sides use the same requester id. Thereby the nodes
%% common to both sides are locked by both locker processes. This
%% means that the lock is still there when the 'new_nodes' message is
%% received even if the other side has deleted the lock.
locker_lock_id(Pid, Vsn) when Vsn > 4 ->
{?GLOBAL_RID, lists:sort([self(), Pid])}.
lock_nodes_safely(LockId, Extra, S0) ->
%% Locking node() could stop some node that has already locked the
%% boss, so just check if it is possible to lock node().
First = delete_nonode([S0#multi.the_boss]),
case ([node()] =:= First) orelse (can_set_lock(LockId) =/= false) of
true ->
%% Locking the boss first is an optimization.
case set_lock(LockId, First, 0) of
true ->
S = update_locker_known(S0),
%% The boss may have changed, but don't bother.
Second = delete_nonode([node() | Extra] -- First),
case set_lock(LockId, Second, 0) of
true ->
Known = S#multi.known,
case set_lock(LockId, Known -- First, 0) of
true ->
_ = locker_trace(S, ok, {First, Known}),
{true, S};
false ->
%% Since the boss is locked we
%% should have gotten the lock, at
%% least if no one else is locking
%% 'global'. Calling set_lock with
%% Retries > 0 does not seem to
%% speed things up.
SoFar = First ++ Second,
del_lock(LockId, SoFar),
_ = locker_trace(S, not_ok, {Known,SoFar}),
{false, S}
end;
false ->
del_lock(LockId, First),
_ = locker_trace(S, not_ok, {Second, First}),
{false, S}
end;
false ->
_ = locker_trace(S0, not_ok, {First, []}),
{false, S0}
end;
false ->
{false, S0}
end.
delete_nonode(L) ->
lists:delete(nonode@nohost, L).
%% Let the server add timestamp.
locker_trace(#multi{do_trace = false}, _, _Nodes) ->
ok;
locker_trace(#multi{do_trace = true}, ok, Ns) ->
global_name_server ! {trace_message, {locker_succeeded, node()}, Ns};
locker_trace(#multi{do_trace = true}, not_ok, Ns) ->
global_name_server ! {trace_message, {locker_failed, node()}, Ns};
locker_trace(#multi{do_trace = true}, rejected, Ns) ->
global_name_server ! {trace_message, {lock_rejected, node()}, Ns}.
update_locker_known(S) ->
receive
{add_to_known, Nodes} ->
S1 = update_locker_known({add, Nodes}, S),
update_locker_known(S1);
{remove_from_known, Node} ->
S1 = update_locker_known({remove, Node}, S),
update_locker_known(S1)
after 0 ->
S
end.
update_locker_known(Upd, S) ->
Known = case Upd of
{add, Nodes} -> Nodes ++ S#multi.known;
{remove, Node} -> lists:delete(Node, S#multi.known)
end,
TheBoss = the_boss([node() | Known]),
S#multi{known = Known, the_boss = TheBoss}.
random_element(L) ->
{A,B,C} = now(),
E = (A+B+C) rem length(L),
lists:nth(E+1, L).
exclude_known(Others, Known) ->
[N || N <- Others, not lists:member(N#him.node, Known)].
lock_is_set(S, Him, MyTag, Known1, LockId) ->
Node = Him#him.node,
receive
{lock_set, P, true, _} when node(P) =:= Node ->
gen_server:cast(global_name_server,
{lock_is_set, Node, MyTag, LockId}),
?trace({lock_sync_done, {p,P, node(P)}, {me,self()}}),
%% Wait for global to tell us to remove lock. Should the
%% other locker's node die, global_name_server will
%% receive nodedown, and then send {cancel, Node, Tag, Fun}.
receive
{cancel, Node, _, Fun} ->
?trace({lock_set_loop, {known1,Known1}}),
call_fun(Fun),
delete_global_lock(LockId, Known1)
end,
S#multi{just_synced = true,
local = lists:delete(Him, S#multi.local),
remote = lists:delete(Him, S#multi.remote)};
{lock_set, P, false, _} when node(P) =:= Node ->
?trace({not_both_set, {node,Node},{p, P},{known1,Known1}}),
_ = locker_trace(S, rejected, Known1),
delete_global_lock(LockId, Known1),
S;
{cancel, Node, _, Fun} ->
?trace({the_locker, cancel2, {node,Node}}),
call_fun(Fun),
_ = locker_trace(S, rejected, Known1),
delete_global_lock(LockId, Known1),
remove_node(Node, S);
{'EXIT', _, _} ->
?trace({the_locker, exit, {node,Node}}),
_ = locker_trace(S, rejected, Known1),
delete_global_lock(LockId, Known1),
S
%% There used to be an 'after' clause (OTP-4902), but it is
%% no longer needed:
%% OTP-5770. Version 5 of the protocol. Deadlock can no longer
%% occur due to the fact that if a partition is locked, one
%% node in the other partition is also locked with the same
%% lock-id, which makes it impossible for any node in the
%% other partition to lock its partition unless it negotiates
%% with the first partition.
end.
%% The locker does the {new_nodes, ...} call before removing the lock.
call_fun(no_fun) ->
ok;
call_fun(Fun) ->
Fun().
%% The lock on the boss is removed last. The purpose is to reduce the
%% risk of failing to lock the known nodes after having locked the
%% boss. (Assumes the boss occurs only once.)
delete_global_lock(LockId, Nodes) ->
TheBoss = the_boss(Nodes),
del_lock(LockId, lists:delete(TheBoss, Nodes)),
del_lock(LockId, [TheBoss]).
the_boss(Nodes) ->
lists:max(Nodes).
find_node_tag(Node, S) ->
case find_node_tag2(Node, S#multi.local) of
false ->
find_node_tag2(Node, S#multi.remote);
Reply ->
Reply
end.
find_node_tag2(_Node, []) ->
false;
find_node_tag2(Node, [#him{node = Node, my_tag = MyTag, vsn = HisVsn} | _]) ->
{true, MyTag, HisVsn};
find_node_tag2(Node, [_E | Rest]) ->
find_node_tag2(Node, Rest).
remove_node(Node, S) ->
S#multi{local = remove_node2(Node, S#multi.local),
remote = remove_node2(Node, S#multi.remote)}.
remove_node2(_Node, []) ->
[];
remove_node2(Node, [#him{node = Node} | Rest]) ->
Rest;
remove_node2(Node, [E | Rest]) ->
[E | remove_node2(Node, Rest)].
add_node(Him, S) ->
case is_node_local(Him#him.node) of
true ->
S#multi{local = [Him | S#multi.local]};
false ->
S#multi{remote = [Him | S#multi.remote]}
end.
is_node_local(Node) ->
{ok, Host} = inet:gethostname(),
case catch split_node(atom_to_list(Node), $@, []) of
[_, Host] ->
true;
_ ->
false
end.
split_node([Chr|T], Chr, Ack) -> [lists:reverse(Ack)|split_node(T, Chr, [])];
split_node([H|T], Chr, Ack) -> split_node(T, Chr, [H|Ack]);
split_node([], _, Ack) -> [lists:reverse(Ack)].
cancel_locker(Node, S, Tag) ->
cancel_locker(Node, S, Tag, no_fun).
cancel_locker(Node, S, Tag, ToBeRunOnLockerF) ->
S#state.the_locker ! {cancel, Node, Tag, ToBeRunOnLockerF},
Resolvers = S#state.resolvers,
?trace({cancel_locker, {node,Node},{tag,Tag},
{sync_tag_my, get({sync_tag_my, Node})},{resolvers,Resolvers}}),
case lists:keyfind(Node, 1, Resolvers) of
{_, Tag, Resolver} ->
?trace({{resolver, Resolver}}),
exit(Resolver, kill),
S1 = trace_message(S, {kill_resolver, Node}, [Tag, Resolver]),
S1#state{resolvers = lists:keydelete(Node, 1, Resolvers)};
_ ->
S
end.
reset_node_state(Node) ->
?trace({{node,Node}, reset_node_state, get()}),
erase({wait_lock, Node}),
erase({save_ops, Node}),
erase({pre_connect, Node}),
erase({prot_vsn, Node}),
erase({sync_tag_my, Node}),
erase({sync_tag_his, Node}),
erase({lock_id, Node}).
%% Some node sent us his names. When a name clash is found, the resolve
%% function is called from the smaller node => all resolve funcs are called
%% from the same partition.
exchange_names([{Name, Pid, Method} | Tail], Node, Ops, Res) ->
case ets:lookup(global_names, Name) of
[{Name, Pid, _Method, _RPid2, _Ref2}] ->
exchange_names(Tail, Node, Ops, Res);
[{Name, Pid2, Method2, _RPid2, _Ref2}] when node() < Node ->
%% Name clash! Add the result of resolving to Res(olved).
%% We know that node(Pid) =/= node(), so we don't
%% need to link/unlink to Pid.
Node2 = node(Pid2), %% Node2 is connected to node().
case rpc:call(Node2, ?MODULE, resolve_it,
[Method2, Name, Pid, Pid2]) of
Pid ->
Op = {insert, {Name, Pid, Method}},
exchange_names(Tail, Node, [Op | Ops], Res);
Pid2 ->
Op = {insert, {Name, Pid2, Method2}},
exchange_names(Tail, Node, Ops, [Op | Res]);
none ->
Op = {delete, Name},
exchange_names(Tail, Node, [Op | Ops], [Op | Res]);
{badrpc, Badrpc} ->
error_logger:info_msg("global: badrpc ~w received when "
"conflicting name ~w was found\n",
[Badrpc, Name]),
Op = {insert, {Name, Pid, Method}},
exchange_names(Tail, Node, [Op | Ops], Res);
Else ->
error_logger:info_msg("global: Resolve method ~w for "
"conflicting name ~w returned ~w\n",
[Method, Name, Else]),
Op = {delete, Name},
exchange_names(Tail, Node, [Op | Ops], [Op | Res])
end;
[{Name, _Pid2, _Method, _RPid, _Ref}] ->
%% The other node will solve the conflict.
exchange_names(Tail, Node, Ops, Res);
_ ->
%% Entirely new name.
exchange_names(Tail, Node,
[{insert, {Name, Pid, Method}} | Ops], Res)
end;
exchange_names([], _, Ops, Res) ->
?trace({exchange_names_finish,{ops,Ops},{res,Res}}),
{Ops, Res}.
resolve_it(Method, Name, Pid1, Pid2) ->
catch Method(Name, Pid1, Pid2).
minmax(P1,P2) ->
if node(P1) < node(P2) -> {P1, P2}; true -> {P2, P1} end.
-spec random_exit_name(Name, Pid1, Pid2) -> pid() when
Name :: term(),
Pid1 :: pid(),
Pid2 :: pid().
random_exit_name(Name, Pid, Pid2) ->
{Min, Max} = minmax(Pid, Pid2),
error_logger:info_msg("global: Name conflict terminating ~w\n",
[{Name, Max}]),
exit(Max, kill),
Min.
-spec random_notify_name(Name, Pid1, Pid2) -> pid() when
Name :: term(),
Pid1 :: pid(),
Pid2 :: pid().
random_notify_name(Name, Pid, Pid2) ->
{Min, Max} = minmax(Pid, Pid2),
Max ! {global_name_conflict, Name},
Min.
-spec notify_all_name(Name, Pid1, Pid2) -> 'none' when
Name :: term(),
Pid1 :: pid(),
Pid2 :: pid().
notify_all_name(Name, Pid, Pid2) ->
Pid ! {global_name_conflict, Name, Pid2},
Pid2 ! {global_name_conflict, Name, Pid},
none.
dolink_ext(Pid, RegNode) when RegNode =:= node() ->
link(Pid);
dolink_ext(_, _) ->
ok.
dounlink_ext(Pid, RegNode) when RegNode =:= node() ->
unlink_pid(Pid);
dounlink_ext(_Pid, _RegNode) ->
ok.
unlink_pid(Pid) ->
case ets:member(global_pid_names, Pid) of
false ->
case ets:member(global_pid_ids, Pid) of
false ->
unlink(Pid);
true ->
ok
end;
true ->
ok
end.
pid_is_locking(Pid, PidRefs) ->
lists:keyfind(Pid, 1, PidRefs) =/= false.
delete_lock(Ref, S0) ->
Locks = pid_locks(Ref),
F = fun({ResourceId, LockRequesterId, PidRefs}, S) ->
{Pid, _RPid, Ref} = lists:keyfind(Ref, 3, PidRefs),
remove_lock(ResourceId, LockRequesterId, Pid, PidRefs, true, S)
end,
lists:foldl(F, S0, Locks).
pid_locks(Ref) ->
L = lists:flatmap(fun({_, ResourceId}) ->
ets:lookup(global_locks, ResourceId)
end, ets:lookup(global_pid_ids, Ref)),
[Lock || Lock = {_Id, _Req, PidRefs} <- L,
rpid_is_locking(Ref, PidRefs)].
rpid_is_locking(Ref, PidRefs) ->
lists:keyfind(Ref, 3, PidRefs) =/= false.
handle_nodedown(Node, S) ->
%% DOWN signals from monitors have removed locks and registered names.
#state{known = Known, synced = Syncs} = S,
NewS = cancel_locker(Node, S, get({sync_tag_my, Node})),
NewS#state.the_locker ! {remove_from_known, Node},
reset_node_state(Node),
NewS#state{known = lists:delete(Node, Known),
synced = lists:delete(Node, Syncs)}.
get_names() ->
ets:select(global_names,
ets:fun2ms(fun({Name, Pid, Method, _RPid, _Ref}) ->
{Name, Pid, Method}
end)).
get_names_ext() ->
ets:tab2list(global_names_ext).
get_known() ->
gen_server:call(global_name_server, get_known, infinity).
random_sleep(Times) ->
case (Times rem 10) of
0 -> erase(random_seed);
_ -> ok
end,
case get(random_seed) of
undefined ->
{A1, A2, A3} = now(),
random:seed(A1, A2, A3 + erlang:phash(node(), 100000));
_ -> ok
end,
%% First time 1/4 seconds, then doubling each time up to 8 seconds max.
Tmax = if Times > 5 -> 8000;
true -> ((1 bsl Times) * 1000) div 8
end,
T = random:uniform(Tmax),
?trace({random_sleep, {me,self()}, {times,Times}, {t,T}, {tmax,Tmax}}),
receive after T -> ok end.
dec(infinity) -> infinity;
dec(N) -> N - 1.
send_again(Msg) ->
Me = self(),
spawn(fun() -> timer(Me, Msg) end).
timer(Pid, Msg) ->
random_sleep(5),
Pid ! Msg.
change_our_node_name(NewNode, S) ->
S1 = trace_message(S, {new_node_name, NewNode}, []),
S1#state{node_name = NewNode}.
trace_message(#state{trace = no_trace}=S, _M, _X) ->
S;
trace_message(S, M, X) ->
S#state{trace = [trace_message(M, X) | S#state.trace]}.
trace_message(M, X) ->
{node(), now(), M, nodes(), X}.
%%-----------------------------------------------------------------
%% Each sync process corresponds to one call to sync. Each such
%% process asks the global_name_server on all Nodes if it is in sync
%% with Nodes. If not, that (other) node spawns a syncer process that
%% waits for global to get in sync with all Nodes. When it is in
%% sync, the syncer process tells the original sync process about it.
%%-----------------------------------------------------------------
start_sync(Nodes, From) ->
spawn_link(fun() -> sync_init(Nodes, From) end).
sync_init(Nodes, From) ->
lists:foreach(fun(Node) -> monitor_node(Node, true) end, Nodes),
sync_loop(Nodes, From).
sync_loop([], From) ->
gen_server:reply(From, ok);
sync_loop(Nodes, From) ->
receive
{nodedown, Node} ->
monitor_node(Node, false),
sync_loop(lists:delete(Node, Nodes), From);
{synced, SNodes} ->
lists:foreach(fun(N) -> monitor_node(N, false) end, SNodes),
sync_loop(Nodes -- SNodes, From)
end.
%%%=======================================================================
%%% Get the current global_groups definition
%%%=======================================================================
check_sync_nodes() ->
case get_own_nodes() of
{ok, all} ->
nodes();
{ok, NodesNG} ->
%% global_groups parameter is defined, we are not allowed to sync
%% with nodes not in our own global group.
intersection(nodes(), NodesNG);
{error, _} = Error ->
Error
end.
check_sync_nodes(SyncNodes) ->
case get_own_nodes() of
{ok, all} ->
SyncNodes;
{ok, NodesNG} ->
%% global_groups parameter is defined, we are not allowed to sync
%% with nodes not in our own global group.
OwnNodeGroup = intersection(nodes(), NodesNG),
IllegalSyncNodes = (SyncNodes -- [node() | OwnNodeGroup]),
case IllegalSyncNodes of
[] -> SyncNodes;
_ -> {error, {"Trying to sync nodes not defined in "
"the own global group", IllegalSyncNodes}}
end;
{error, _} = Error ->
Error
end.
get_own_nodes() ->
case global_group:get_own_nodes_with_errors() of
{error, Error} ->
{error, {"global_groups definition error", Error}};
OkTup ->
OkTup
end.
%% The registrar is a helper process that registers and unregisters
%% names. Since it never dies it assures that names are registered and
%% unregistered on all known nodes. It is started by and linked to
%% global_name_server.
start_the_registrar() ->
spawn_link(fun() -> loop_the_registrar() end).
loop_the_registrar() ->
receive
{trans_all_known, Fun, From} ->
?trace({loop_the_registrar, self(), Fun, From}),
gen_server:reply(From, trans_all_known(Fun));
Other ->
unexpected_message(Other, register)
end,
loop_the_registrar().
unexpected_message({'EXIT', _Pid, _Reason}, _What) ->
%% global_name_server died
ok;
unexpected_message(Message, What) ->
error_logger:warning_msg("The global_name_server ~w process "
"received an unexpected message:\n~p\n",
[What, Message]).
%%% Utilities
%% When/if erlang:monitor() returns before trying to connect to the
%% other node this function can be removed.
do_monitor(Pid) ->
case (node(Pid) =:= node()) orelse lists:member(node(Pid), nodes()) of
true ->
%% Assume the node is still up
{Pid, erlang:monitor(process, Pid)};
false ->
F = fun() ->
Ref = erlang:monitor(process, Pid),
receive
{'DOWN', Ref, process, Pid, _Info} ->
exit(normal)
end
end,
erlang:spawn_monitor(F)
end.
intersection(_, []) ->
[];
intersection(L1, L2) ->
L1 -- (L1 -- L2).