author     Erlang/OTP <[email protected]>  2009-11-20 14:54:40 +0000
committer  Erlang/OTP <[email protected]>  2009-11-20 14:54:40 +0000
commit     84adefa331c4159d432d22840663c38f155cd4c1 (patch)
tree       bff9a9c66adda4df2106dfd0e5c053ab182a12bd /lib/kernel/src/global.erl
The R13B03 release. (tag: OTP_R13B03)
Diffstat (limited to 'lib/kernel/src/global.erl')
-rw-r--r--  lib/kernel/src/global.erl | 2244
1 file changed, 2244 insertions(+), 0 deletions(-)
diff --git a/lib/kernel/src/global.erl b/lib/kernel/src/global.erl
new file mode 100644
index 0000000000..cc0402da73
--- /dev/null
+++ b/lib/kernel/src/global.erl
@@ -0,0 +1,2244 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 1996-2009. All Rights Reserved.
+%%
+%% The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved online at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% %CopyrightEnd%
+%%
+-module(global).
+-behaviour(gen_server).
+
+%% Global provides global registration of process names. The names are
+%% dynamically kept up to date with the entire network. Global can
+%% operate in two modes: in a fully connected network or in a
+%% non-fully connected network. In the latter case, the name
+%% registration mechanism won't work.
+%% As a separate service, Global also provides global locks.
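+%%
+%% A minimal usage sketch (illustrative only; the name 'bank' and the
+%% variable Pid are hypothetical):
+%%
+%%     yes = global:register_name(bank, Pid),
+%%     Pid = global:whereis_name(bank),
+%%     global:send(bank, {deposit, 100}),
+%%     _ = global:unregister_name(bank).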
+
+%% External exports
+-export([start/0, start_link/0, stop/0, sync/0, sync/1,
+ safe_whereis_name/1, whereis_name/1, register_name/2,
+ register_name/3, register_name_external/2, register_name_external/3,
+ unregister_name_external/1,re_register_name/2, re_register_name/3,
+ unregister_name/1, registered_names/0, send/2, node_disconnected/1,
+ set_lock/1, set_lock/2, set_lock/3,
+ del_lock/1, del_lock/2,
+ trans/2, trans/3, trans/4,
+ random_exit_name/3, random_notify_name/3, notify_all_name/3]).
+
+%% Internal exports
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3, resolve_it/4]).
+
+-export([info/0]).
+
+-include_lib("stdlib/include/ms_transform.hrl").
+
+%% Set this variable to 'allow' to allow several names for a process.
+%% This is for backward compatibility only; the functionality is broken.
+-define(WARN_DUPLICATED_NAME, global_multi_name_action).
+
+%% Undocumented Kernel variable. Set this to 0 (zero) to get the old
+%% behaviour.
+-define(N_CONNECT_RETRIES, global_connect_retries).
+-define(DEFAULT_N_CONNECT_RETRIES, 5).
+
+%%% In certain places in the server, calling io:format hangs everything,
+%%% so we'd better use erlang:display/1.
+%%% my_tracer is used in testsuites
+-define(trace(_), ok).
+
+%-define(trace(T), (catch my_tracer ! {node(), {line,?LINE}, T})).
+
+%-define(trace(T), erlang:display({format, node(), cs(), T})).
+%cs() ->
+% {_Big, Small, Tiny} = now(),
+% (Small rem 100) * 100 + (Tiny div 10000).
+
+%% These are the protocol versions:
+%% Vsn 1 is the original protocol.
+%% Vsn 2 is enhanced with code to take care of registration of names from
+%% non-Erlang nodes, e.g. C-nodes.
+%% Vsn 3 is enhanced with a tag in the synch messages to distinguish
+%% different synch sessions from each other, see OTP-2766.
+%% Vsn 4 uses a single, permanent, locker process, but works like vsn 3
+%% when communicating with vsn 3 nodes. (-R10B)
+%% Vsn 5 uses an ordered list of self() and HisTheLocker when locking
+%% nodes in its own partition. (R11B-)
+
+%% Current version of global does not support vsn 4 or earlier.
+
+-define(vsn, 5).
+
+%%-----------------------------------------------------------------
+%% connect_all = boolean() - true if we are supposed to set up a
+%% fully connected net
+%% known = [Node] - all nodes known to us
+%% synced = [Node] - all nodes that have the same names as us
+%% resolvers = [{Node, MyTag, Resolver}] -
+%% the tag separating different synch sessions,
+%% and the pid of the name resolver process
+%% syncers = [pid()] - all current syncer processes
+%% node_name = atom() - our node name (can change if distribution
+%% is started/stopped dynamically)
+%%
+%% In addition to these, we keep info about messages arrived in
+%% the process dictionary:
+%% {pre_connect, Node} = {Vsn, InitMsg} - init_connect msgs that
+%% arrived before nodeup
+%% {wait_lock, Node} = {exchange, NameList} | lock_is_set
+%% - see comment below (handle_cast)
+%% {save_ops, Node} = {resolved, HisKnown, NamesExt, Res} | [operation()]
+%% - save the ops between exchange and resolved
+%% {prot_vsn, Node} = Vsn - the exchange protocol version (not used now)
+%% {sync_tag_my, Node} = My tag, used at synchronization with Node
+%% {sync_tag_his, Node} = The Node's tag, used at synchronization
+%% {lock_id, Node} = The resource locking the partitions
+%%-----------------------------------------------------------------
+-record(state, {connect_all :: boolean(),
+ known = [] :: [node()],
+ synced = [] :: [node()],
+ resolvers = [],
+ syncers = [] :: [pid()],
+ node_name = node() :: node(),
+ the_locker, the_deleter, the_registrar, trace,
+ global_lock_down = false
+ }).
+
+%%% There are also ETS tables used for bookkeeping of locks and names
+%%% (the first position is the key):
+%%%
+%%% global_locks (set): {ResourceId, LockRequesterId, [{Pid,RPid,ref()}]}
+%%% Pid is locking ResourceId, ref() is the monitor ref.
+%%% RPid =/= Pid if there is an extra process calling erlang:monitor().
+%%% global_names (set): {Name, Pid, Method, RPid, ref()}
+%%% Registered names. ref() is the monitor ref.
+%%% RPid =/= Pid if there is an extra process calling erlang:monitor().
+%%% global_names_ext (set): {Name, Pid, RegNode}
+%%% External registered names (C-nodes).
+%%% (The RPids can be removed when/if erlang:monitor() returns before
+%%% trying to connect to the other node.)
+%%%
+%%% Helper tables:
+%%% global_pid_names (bag): {Pid, Name} | {ref(), Name}
+%%% Name(s) registered for Pid.
+%%% There is one {Pid, Name} and one {ref(), Name} for every Pid.
+%%% ref() is the same ref() as in global_names.
+%%% global_pid_ids (bag): {Pid, ResourceId} | {ref(), ResourceId}
+%%% Resources locked by Pid.
+%%% ref() is the same ref() as in global_locks.
+%%%
+%%% global_pid_names is a 'bag' for backward compatibility.
+%%% (Before vsn 5 more than one name could be registered for a process.)
+%%%
+%%% R11B-3 (OTP-6341): The list of pids in the table 'global_locks'
+%%% was replaced by a list of {Pid, Ref}, where Ref is a monitor ref.
+%%% It was necessary to use monitors to fix bugs regarding locks that
+%%% were never removed. The signal {async_del_lock, ...} has been
+%%% kept for backward compatibility. It can be removed later.
+%%%
+%%% R11B-4 (OTP-6428): Monitors are used for registered names.
+%%% The signal {delete_name, ...} has been kept for backward compatibility.
+%%% It can be removed later as can the deleter process.
+%%% An extra process calling erlang:monitor() is sometimes created.
+%%% The new_nodes message has been augmented with the global lock id.
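+%%%
+%%% For illustration, rows in these tables could look as follows (the
+%%% name, pids and refs below are made up):
+%%%
+%%%   global_names: {bank, <0.42.0>, fun random_exit_name/3,
+%%%                  <0.42.0>, #Ref<0.0.0.80>}
+%%%   global_locks: {bank_res, <0.42.0>,
+%%%                  [{<0.42.0>, <0.42.0>, #Ref<0.0.0.81>}]}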
+
+start() ->
+ gen_server:start({local, global_name_server}, ?MODULE, [], []).
+
+start_link() ->
+ gen_server:start_link({local, global_name_server}, ?MODULE, [], []).
+
+stop() ->
+ gen_server:call(global_name_server, stop, infinity).
+
+-spec sync() -> 'ok' | {'error', term()}.
+sync() ->
+ case check_sync_nodes() of
+ {error, _} = Error ->
+ Error;
+ SyncNodes ->
+ gen_server:call(global_name_server, {sync, SyncNodes}, infinity)
+ end.
+
+-spec sync([node()]) -> 'ok' | {'error', term()}.
+sync(Nodes) ->
+ case check_sync_nodes(Nodes) of
+ {error, _} = Error ->
+ Error;
+ SyncNodes ->
+ gen_server:call(global_name_server, {sync, SyncNodes}, infinity)
+ end.
+
+-spec send(term(), term()) -> pid().
+send(Name, Msg) ->
+ case whereis_name(Name) of
+ Pid when is_pid(Pid) ->
+ Pid ! Msg,
+ Pid;
+ undefined ->
+ exit({badarg, {Name, Msg}})
+ end.
+
+%% See OTP-3737.
+-spec whereis_name(term()) -> pid() | 'undefined'.
+whereis_name(Name) ->
+ where(Name).
+
+-spec safe_whereis_name(term()) -> pid() | 'undefined'.
+safe_whereis_name(Name) ->
+ gen_server:call(global_name_server, {whereis, Name}, infinity).
+
+node_disconnected(Node) ->
+ global_name_server ! {nodedown, Node}.
+
+%%-----------------------------------------------------------------
+%% Method = function(Name, Pid1, Pid2) -> Pid1 | Pid2 | none
+%% Method is called if a name conflict is detected when two nodes
+%% are connecting to each other. It is supposed to return one of
+%% the Pids or 'none'. If a pid is returned, that pid is
+%% registered as Name on all nodes. If 'none' is returned, the
+%% Name is unregistered on all nodes. If anything else is returned,
+%% the Name is unregistered as well.
+%% Method is called only once, at one of the nodes where the processes
+%% reside. If different Methods are used for the same name, it is
+%% undefined which one of them is used.
+%% Method blocks the name registration, but does not affect global locking.
+%%-----------------------------------------------------------------
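+%% For illustration, a resolve function that keeps Pid1 and merely logs
+%% the conflict could be written as below and passed to register_name/3
+%% (the function name keep_first is hypothetical, not part of this
+%% module):
+%%
+%%     keep_first(Name, Pid1, Pid2) ->
+%%         error_logger:info_msg("global: name clash for ~w: ~w ~w\n",
+%%                               [Name, Pid1, Pid2]),
+%%         Pid1.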
+-spec register_name(term(), pid()) -> 'yes' | 'no'.
+register_name(Name, Pid) when is_pid(Pid) ->
+ register_name(Name, Pid, fun random_exit_name/3).
+
+-type method() :: fun((term(), pid(), pid()) -> pid() | 'none').
+
+-spec register_name(term(), pid(), method()) -> 'yes' | 'no'.
+register_name(Name, Pid, Method) when is_pid(Pid) ->
+ Fun = fun(Nodes) ->
+ case (where(Name) =:= undefined) andalso check_dupname(Name, Pid) of
+ true ->
+ gen_server:multi_call(Nodes,
+ global_name_server,
+ {register, Name, Pid, Method}),
+ yes;
+ _ ->
+ no
+ end
+ end,
+ ?trace({register_name, self(), Name, Pid, Method}),
+ gen_server:call(global_name_server, {registrar, Fun}, infinity).
+
+check_dupname(Name, Pid) ->
+ case ets:lookup(global_pid_names, Pid) of
+ [] ->
+ true;
+ PidNames ->
+ case application:get_env(kernel, ?WARN_DUPLICATED_NAME) of
+ {ok, allow} ->
+ true;
+ _ ->
+ S = "global: ~w registered under several names: ~w\n",
+ Names = [Name | [Name1 || {_Pid, Name1} <- PidNames]],
+ error_logger:error_msg(S, [Pid, Names]),
+ false
+ end
+ end.
+
+-spec unregister_name(term()) -> _.
+unregister_name(Name) ->
+ case where(Name) of
+ undefined ->
+ ok;
+ _ ->
+ Fun = fun(Nodes) ->
+ gen_server:multi_call(Nodes,
+ global_name_server,
+ {unregister, Name}),
+ ok
+ end,
+ ?trace({unregister_name, self(), Name}),
+ gen_server:call(global_name_server, {registrar, Fun}, infinity)
+ end.
+
+-spec re_register_name(term(), pid()) -> _.
+re_register_name(Name, Pid) when is_pid(Pid) ->
+ re_register_name(Name, Pid, fun random_exit_name/3).
+
+-spec re_register_name(term(), pid(), method()) -> _.
+re_register_name(Name, Pid, Method) when is_pid(Pid) ->
+ Fun = fun(Nodes) ->
+ gen_server:multi_call(Nodes,
+ global_name_server,
+ {register, Name, Pid, Method}),
+ yes
+ end,
+ ?trace({re_register_name, self(), Name, Pid, Method}),
+ gen_server:call(global_name_server, {registrar, Fun}, infinity).
+
+-spec registered_names() -> [term()].
+registered_names() ->
+ MS = ets:fun2ms(fun({Name,_Pid,_M,_RP,_R}) -> Name end),
+ ets:select(global_names, MS).
+
+%%-----------------------------------------------------------------
+%% The external node (e.g. a C-node) registers the name on an Erlang
+%% node which links to the process (an Erlang node has to be used
+%% since there is no global_name_server on the C-node). If the Erlang
+%% node dies the name is to be unregistered on all nodes. Normally
+%% node(Pid) is compared to the node that died, but that does not work
+%% for external nodes (the process does not run on the Erlang node
+%% that died). Therefore a table of all names registered by external
+%% nodes is kept up-to-date on all nodes.
+%%
+%% Note: if the Erlang node dies an EXIT signal is also sent to the
+%% C-node due to the link between the global_name_server and the
+%% registered process. [This is why the link has been kept despite
+%% the fact that monitors do the job now.]
+%%-----------------------------------------------------------------
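+%% For example, a proxy on the Erlang node could register a C-node
+%% process like this (CNodePid, the pid of the external process, is
+%% hypothetical):
+%%
+%%     yes = global:register_name_external(c_service, CNodePid).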
+register_name_external(Name, Pid) when is_pid(Pid) ->
+ register_name_external(Name, Pid, fun random_exit_name/3).
+
+register_name_external(Name, Pid, Method) when is_pid(Pid) ->
+ Fun = fun(Nodes) ->
+ case where(Name) of
+ undefined ->
+ gen_server:multi_call(Nodes,
+ global_name_server,
+ {register_ext, Name, Pid,
+ Method, node()}),
+ yes;
+ _Pid -> no
+ end
+ end,
+ ?trace({register_name_external, self(), Name, Pid, Method}),
+ gen_server:call(global_name_server, {registrar, Fun}, infinity).
+
+unregister_name_external(Name) ->
+ unregister_name(Name).
+
+-type id() :: {term(), term()}.
+
+-spec set_lock(id()) -> boolean().
+set_lock(Id) ->
+ set_lock(Id, [node() | nodes()], infinity, 1).
+
+-type retries() :: non_neg_integer() | 'infinity'.
+
+-spec set_lock(id(), [node()]) -> boolean().
+set_lock(Id, Nodes) ->
+ set_lock(Id, Nodes, infinity, 1).
+
+-spec set_lock(id(), [node()], retries()) -> boolean().
+set_lock(Id, Nodes, Retries) when is_integer(Retries), Retries >= 0 ->
+ set_lock(Id, Nodes, Retries, 1);
+set_lock(Id, Nodes, infinity) ->
+ set_lock(Id, Nodes, infinity, 1).
+
+set_lock({_ResourceId, _LockRequesterId}, [], _Retries, _Times) ->
+ true;
+set_lock({_ResourceId, _LockRequesterId} = Id, Nodes, Retries, Times) ->
+ ?trace({set_lock,{me,self()},Id,{nodes,Nodes},
+ {retries,Retries}, {times,Times}}),
+ case set_lock_on_nodes(Id, Nodes) of
+ true ->
+ ?trace({set_lock_true, Id}),
+ true;
+ false=Reply when Retries =:= 0 ->
+ Reply;
+ false ->
+ random_sleep(Times),
+ set_lock(Id, Nodes, dec(Retries), Times+1)
+ end.
+
+-spec del_lock(id()) -> 'true'.
+del_lock(Id) ->
+ del_lock(Id, [node() | nodes()]).
+
+-spec del_lock(id(), [node()]) -> 'true'.
+del_lock({_ResourceId, _LockRequesterId} = Id, Nodes) ->
+ ?trace({del_lock, {me,self()}, Id, {nodes,Nodes}}),
+ gen_server:multi_call(Nodes, global_name_server, {del_lock, Id}),
+ true.
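+
+%% A lock round-trip, for illustration (the resource name 'bank_res' is
+%% made up; set_lock/1 retries indefinitely, so it always returns true):
+%%
+%%     Id = {bank_res, self()},
+%%     true = global:set_lock(Id),
+%%     %% ... critical section ...
+%%     true = global:del_lock(Id).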
+
+-type trans_fun() :: function() | {module(), atom()}.
+
+-spec trans(id(), trans_fun()) -> term().
+trans(Id, Fun) -> trans(Id, Fun, [node() | nodes()], infinity).
+
+-spec trans(id(), trans_fun(), [node()]) -> term().
+trans(Id, Fun, Nodes) -> trans(Id, Fun, Nodes, infinity).
+
+-spec trans(id(), trans_fun(), [node()], retries()) -> term().
+trans(Id, Fun, Nodes, Retries) ->
+ case set_lock(Id, Nodes, Retries) of
+ true ->
+ try
+ Fun()
+ after
+ del_lock(Id, Nodes)
+ end;
+ false ->
+ aborted
+ end.
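+
+%% For illustration, trans/4 with a finite retry count (do_work/0 is a
+%% hypothetical function; 'aborted' is returned if the lock cannot be
+%% set within 3 retries):
+%%
+%%     case global:trans({bank_res, self()}, fun do_work/0,
+%%                       [node() | nodes()], 3) of
+%%         aborted -> {error, busy};
+%%         Result  -> Result
+%%     end.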
+
+info() ->
+ gen_server:call(global_name_server, info, infinity).
+
+%%%-----------------------------------------------------------------
+%%% Call-back functions from gen_server
+%%%-----------------------------------------------------------------
+init([]) ->
+ process_flag(trap_exit, true),
+ _ = ets:new(global_locks, [set, named_table, protected]),
+ _ = ets:new(global_names, [set, named_table, protected]),
+ _ = ets:new(global_names_ext, [set, named_table, protected]),
+
+ _ = ets:new(global_pid_names, [bag, named_table, protected]),
+ _ = ets:new(global_pid_ids, [bag, named_table, protected]),
+
+ %% This is for troubleshooting only.
+ DoTrace = os:getenv("GLOBAL_HIGH_LEVEL_TRACE") =:= "TRUE",
+ T0 = case DoTrace of
+ true ->
+ send_high_level_trace(),
+ [];
+ false ->
+ no_trace
+ end,
+
+ S = #state{the_locker = start_the_locker(DoTrace),
+ trace = T0,
+ the_deleter = start_the_deleter(self()),
+ the_registrar = start_the_registrar()},
+ S1 = trace_message(S, {init, node()}, []),
+
+ case init:get_argument(connect_all) of
+ {ok, [["false"]]} ->
+ {ok, S1#state{connect_all = false}};
+ _ ->
+ {ok, S1#state{connect_all = true}}
+ end.
+
+%%-----------------------------------------------------------------
+%% Connection algorithm
+%% ====================
+%% This algorithm solves the problem with partitioned nets as well.
+%%
+%% The main idea in the algorithm is that when two nodes connect, they
+%% try to set a lock in their own partition (i.e. all nodes already
+%% known to them; partitions are not necessarily disjoint). When the
+%% lock is set in each partition, these two nodes send each other a
+%% list with all registered names in the respective partition (*). If no conflict
+%% is found, the name tables are just updated. If a conflict is found,
+%% a resolve function is called once for each conflict. The result of
+%% the resolving is sent to the other node. When the names are
+%% exchanged, all other nodes in each partition are informed of the
+%% other nodes, and they ping each other to form a fully connected
+%% net.
+%%
+%% A few remarks:
+%%
+%% (*) When this information is being exchanged, no one is allowed to
+%% change the global register table. All calls to register etc are
+%% protected by a lock. If a registered process dies during this
+%% phase the name is unregistered on the local node immediately,
+%% but the unregistration on other nodes will take place when the
+%% deleter manages to acquire the lock. This is necessary to
+%% prevent names from spreading to nodes where they cannot be
+%% deleted.
+%%
+%% - It is assumed that nodeups and nodedowns arrive in an orderly
+%% fashion: for every node, nodeup is followed by nodedown, and vice
+%% versa. "Double" nodeups and nodedowns must never occur. It is
+%% the responsibility of net_kernel to assure this.
+%%
+%% - There is always a delay between the termination of a registered
+%% process and the removal of the name from Global's tables. This
+%% delay can sometimes be quite substantial. Global guarantees that
+%% the name will eventually be removed, but there is no
+%% synchronization between nodes; the name can be removed from some
+%% node(s) long before it is removed from other nodes. Using
+%% safe_whereis_name is no cure.
+%%
+%% - Global cannot handle problems with the distribution very well.
+%% Depending on the value of the kernel variable 'net_ticktime' long
+%% delays may occur. This does not affect the handling of locks but
+%% will block name registration.
+%%
+%% - Old synch session messages may linger on in the message queue of
+%% global_name_server after the sending node has died. The tags of
+%% such messages do not match the current tag (if there is one),
+%% which makes it possible to discard those messages and cancel the
+%% corresponding lock.
+%%
+%% Suppose nodes A and B connect, and C is connected to A.
+%% Here's the algorithm's flow:
+%%
+%% Node A
+%% ------
+%% << {nodeup, B}
+%% TheLocker ! {nodeup, ..., Node, ...} (there is one locker per node)
+%% B ! {init_connect, ..., {..., TheLockerAtA, ...}}
+%% << {init_connect, TheLockerAtB}
+%% [The lockers try to set the lock]
+%% << {lock_is_set, B, ...}
+%% [Now, lock is set in both partitions]
+%% B ! {exchange, A, Names, ...}
+%% << {exchange, B, Names, ...}
+%% [solve conflict]
+%% B ! {resolved, A, ResolvedA, KnownAtA, ...}
+%% << {resolved, B, ResolvedB, KnownAtB, ...}
+%% C ! {new_nodes, ResolvedAandB, [B]}
+%%
+%% Node C
+%% ------
+%% << {new_nodes, ResolvedOps, NewNodes}
+%% [insert Ops]
+%% ping(NewNodes)
+%% << {nodeup, B}
+%% <ignore this one>
+%%
+%% Several things can disturb this picture.
+%%
+%% First, the init_connect message may arrive _before_ the nodeup
+%% message due to delay in net_kernel. We handle this by keeping track
+%% of these messages in the pre_connect variable in our state.
+%%
+%% Of course we must handle that some node goes down during the
+%% connection.
+%%
+%%-----------------------------------------------------------------
+%% Messages in the protocol
+%% ========================
+%% 1. Between global_name_servers on connecting nodes
+%% {init_connect, Vsn, Node, InitMsg}
+%% InitMsg = {locker, _Unused, HisKnown, HisTheLocker}
+%% {exchange, Node, ListOfNames, _ListOfNamesExt, Tag}
+%% {resolved, Node, HisOps, HisKnown, _Unused, ListOfNamesExt, Tag}
+%% HisKnown = list of known nodes in Node's partition
+%% 2. Between lockers on connecting nodes
+%% {his_locker, Pid} (from our global)
+%% {lock, Bool} loop until both lockers have lock = true,
+%% then send to global_name_server {lock_is_set, Node, Tag}
+%% 3. Connecting node's global_name_server informs other nodes in the same
+%% partition about hitherto unknown nodes in the other partition
+%% {new_nodes, Node, Ops, ListOfNamesExt, NewNodes, ExtraInfo}
+%% 4. Between global_name_server and resolver
+%% {resolve, NameList, Node} to resolver
+%% {exchange_ops, Node, Tag, Ops, Resolved} from resolver
+%% 5. sync protocol, between global_name_servers in different partitions
+%% {in_sync, Node, IsKnown}
+%% sent by each node to all new nodes (Node becomes known to them)
+%%-----------------------------------------------------------------
+
+handle_call({whereis, Name}, From, S) ->
+ do_whereis(Name, From),
+ {noreply, S};
+
+handle_call({registrar, Fun}, From, S) ->
+ S#state.the_registrar ! {trans_all_known, Fun, From},
+ {noreply, S};
+
+%% The pattern {register,'_','_','_'} is traced by the inviso
+%% application. Do not change.
+handle_call({register, Name, Pid, Method}, {FromPid, _Tag}, S0) ->
+ S = ins_name(Name, Pid, Method, FromPid, [], S0),
+ {reply, yes, S};
+
+handle_call({unregister, Name}, _From, S0) ->
+ S = delete_global_name2(Name, S0),
+ {reply, ok, S};
+
+handle_call({register_ext, Name, Pid, Method, RegNode}, {FromPid,_Tag}, S0) ->
+ S = ins_name_ext(Name, Pid, Method, RegNode, FromPid, [], S0),
+ {reply, yes, S};
+
+handle_call({set_lock, Lock}, {Pid, _Tag}, S0) ->
+ {Reply, S} = handle_set_lock(Lock, Pid, S0),
+ {reply, Reply, S};
+
+handle_call({del_lock, Lock}, {Pid, _Tag}, S0) ->
+ S = handle_del_lock(Lock, Pid, S0),
+ {reply, true, S};
+
+handle_call(get_known, _From, S) ->
+ {reply, S#state.known, S};
+
+handle_call(get_synced, _From, S) ->
+ {reply, S#state.synced, S};
+
+handle_call({sync, Nodes}, From, S) ->
+ %% If we have several global groups, this won't work, since we will
+ %% do start_sync on a nonempty list of nodes even if the system
+ %% is quiet.
+ Pid = start_sync(lists:delete(node(), Nodes) -- S#state.synced, From),
+ {noreply, S#state{syncers = [Pid | S#state.syncers]}};
+
+handle_call(get_protocol_version, _From, S) ->
+ {reply, ?vsn, S};
+
+handle_call(get_names_ext, _From, S) ->
+ {reply, get_names_ext(), S};
+
+handle_call(info, _From, S) ->
+ {reply, S, S};
+
+%% "High level trace". For troubleshooting only.
+handle_call(high_level_trace_start, _From, S) ->
+ S#state.the_locker ! {do_trace, true},
+ send_high_level_trace(),
+ {reply, ok, trace_message(S#state{trace = []}, {init, node()}, [])};
+handle_call(high_level_trace_stop, _From, S) ->
+ #state{the_locker = TheLocker, trace = Trace} = S,
+ TheLocker ! {do_trace, false},
+ wait_high_level_trace(),
+ {reply, Trace, S#state{trace = no_trace}};
+handle_call(high_level_trace_get, _From, #state{trace = Trace}=S) ->
+ {reply, Trace, S#state{trace = []}};
+
+handle_call(stop, _From, S) ->
+ {stop, normal, stopped, S};
+
+handle_call(Request, From, S) ->
+ error_logger:warning_msg("The global_name_server "
+ "received an unexpected message:\n"
+ "handle_call(~p, ~p, _)\n",
+ [Request, From]),
+ {noreply, S}.
+
+%%========================================================================
+%% init_connect
+%%
+%%========================================================================
+handle_cast({init_connect, Vsn, Node, InitMsg}, S) ->
+ %% Sent from global_name_server at Node.
+ ?trace({'####', init_connect, {vsn, Vsn}, {node,Node},{initmsg,InitMsg}}),
+ case Vsn of
+ %% It is always the responsibility of newer versions to understand
+ %% older versions of the protocol.
+ {HisVsn, HisTag} when HisVsn > ?vsn ->
+ init_connect(?vsn, Node, InitMsg, HisTag, S#state.resolvers, S);
+ {HisVsn, HisTag} ->
+ init_connect(HisVsn, Node, InitMsg, HisTag, S#state.resolvers, S);
+ %% To be future compatible
+ Tuple when is_tuple(Tuple) ->
+ List = tuple_to_list(Tuple),
+ [_HisVsn, HisTag | _] = List,
+ %% use our own version handling if his is newer.
+ init_connect(?vsn, Node, InitMsg, HisTag, S#state.resolvers, S);
+ _ ->
+ Txt = io_lib:format("Illegal global protocol version ~p Node: ~p\n",
+ [Vsn, Node]),
+ error_logger:info_report(lists:flatten(Txt))
+ end,
+ {noreply, S};
+
+%%=======================================================================
+%% lock_is_set
+%%
+%% Ok, the lock is now set on both partitions. Send our names to other node.
+%%=======================================================================
+handle_cast({lock_is_set, Node, MyTag, LockId}, S) ->
+ %% Sent from the_locker at node().
+ ?trace({'####', lock_is_set , {node,Node}}),
+ case get({sync_tag_my, Node}) of
+ MyTag ->
+ lock_is_set(Node, S#state.resolvers, LockId),
+ {noreply, S};
+ _ -> %% Illegal tag, delete the old sync session.
+ NewS = cancel_locker(Node, S, MyTag),
+ {noreply, NewS}
+ end;
+
+%%========================================================================
+%% exchange
+%%
+%% Here the names are checked to detect name clashes.
+%%========================================================================
+handle_cast({exchange, Node, NameList, _NameExtList, MyTag}, S) ->
+ %% Sent from global_name_server at Node.
+ case get({sync_tag_my, Node}) of
+ MyTag ->
+ exchange(Node, NameList, S#state.resolvers),
+ {noreply, S};
+ _ -> %% Illegal tag, delete the old sync session.
+ NewS = cancel_locker(Node, S, MyTag),
+ {noreply, NewS}
+ end;
+
+%% {exchange_ops, ...} is sent by the resolver process (which then
+%% dies). It could happen that {resolved, ...} has already arrived
+%% from the other node. In that case we can go ahead and run the
+%% resolve operations. Otherwise we have to save the operations and
+%% wait for {resolve, ...}. This is very much like {lock_is_set, ...}
+%% and {exchange, ...}.
+handle_cast({exchange_ops, Node, MyTag, Ops, Resolved}, S0) ->
+ %% Sent from the resolver for Node at node().
+ ?trace({exchange_ops, {node,Node}, {ops,Ops},{resolved,Resolved},
+ {mytag,MyTag}}),
+ S = trace_message(S0, {exit_resolver, Node}, [MyTag]),
+ case get({sync_tag_my, Node}) of
+ MyTag ->
+ Known = S#state.known,
+ gen_server:cast({global_name_server, Node},
+ {resolved, node(), Resolved, Known,
+ Known,get_names_ext(),get({sync_tag_his,Node})}),
+ case get({save_ops, Node}) of
+ {resolved, HisKnown, Names_ext, HisResolved} ->
+ put({save_ops, Node}, Ops),
+ NewS = resolved(Node, HisResolved, HisKnown, Names_ext,S),
+ {noreply, NewS};
+ undefined ->
+ put({save_ops, Node}, Ops),
+ {noreply, S}
+ end;
+ _ -> %% Illegal tag, delete the old sync session.
+ NewS = cancel_locker(Node, S, MyTag),
+ {noreply, NewS}
+ end;
+
+%%========================================================================
+%% resolved
+%%
+%% Here the name clashes are resolved.
+%%========================================================================
+handle_cast({resolved, Node, HisResolved, HisKnown, _HisKnown_v2,
+ Names_ext, MyTag}, S) ->
+ %% Sent from global_name_server at Node.
+ ?trace({'####', resolved, {his_resolved,HisResolved}, {node,Node}}),
+ case get({sync_tag_my, Node}) of
+ MyTag ->
+ %% See the comment at handle_cast({exchange_ops, ...}).
+ case get({save_ops, Node}) of
+ Ops when is_list(Ops) ->
+ NewS = resolved(Node, HisResolved, HisKnown, Names_ext, S),
+ {noreply, NewS};
+ undefined ->
+ Resolved = {resolved, HisKnown, Names_ext, HisResolved},
+ put({save_ops, Node}, Resolved),
+ {noreply, S}
+ end;
+ _ -> %% Illegal tag, delete the old sync session.
+ NewS = cancel_locker(Node, S, MyTag),
+ {noreply, NewS}
+ end;
+
+%%========================================================================
+%% new_nodes
+%%
+%% We get to know the other node's known nodes.
+%%========================================================================
+handle_cast({new_nodes, Node, Ops, Names_ext, Nodes, ExtraInfo}, S) ->
+ %% Sent from global_name_server at Node.
+ ?trace({new_nodes, {node,Node},{ops,Ops},{nodes,Nodes},{x,ExtraInfo}}),
+ NewS = new_nodes(Ops, Node, Names_ext, Nodes, ExtraInfo, S),
+ {noreply, NewS};
+
+%%========================================================================
+%% in_sync
+%%
+%% We are in sync with this node (from the other node's known world).
+%%========================================================================
+handle_cast({in_sync, Node, _IsKnown}, S) ->
+ %% Sent from global_name_server at Node (in the other partition).
+ ?trace({'####', in_sync, {Node, _IsKnown}}),
+ lists:foreach(fun(Pid) -> Pid ! {synced, [Node]} end, S#state.syncers),
+ NewS = cancel_locker(Node, S, get({sync_tag_my, Node})),
+ reset_node_state(Node),
+ NSynced = case lists:member(Node, Synced = NewS#state.synced) of
+ true -> Synced;
+ false -> [Node | Synced]
+ end,
+ {noreply, NewS#state{synced = NSynced}};
+
+%% Called when Pid on other node crashed
+handle_cast({async_del_name, _Name, _Pid}, S) ->
+ %% Sent from the_deleter at some node in the partition other than node().
+ %% The DOWN message deletes the name.
+ {noreply, S};
+
+handle_cast({async_del_lock, _ResourceId, _Pid}, S) ->
+ %% Sent from global_name_server at some node in the partition other
+ %% than node().
+ %% The DOWN message deletes the lock.
+ {noreply, S};
+
+handle_cast(Request, S) ->
+ error_logger:warning_msg("The global_name_server "
+ "received an unexpected message:\n"
+ "handle_cast(~p, _)\n", [Request]),
+ {noreply, S}.
+
+handle_info({'EXIT', Deleter, _Reason}=Exit, #state{the_deleter=Deleter}=S) ->
+ {stop, {deleter_died,Exit}, S#state{the_deleter=undefined}};
+handle_info({'EXIT', Locker, _Reason}=Exit, #state{the_locker=Locker}=S) ->
+ {stop, {locker_died,Exit}, S#state{the_locker=undefined}};
+handle_info({'EXIT', Registrar, _}=Exit, #state{the_registrar=Registrar}=S) ->
+ {stop, {registrar_died,Exit}, S#state{the_registrar=undefined}};
+handle_info({'EXIT', Pid, _Reason}, S) when is_pid(Pid) ->
+ ?trace({global_EXIT,_Reason,Pid}),
+ %% The process that died was a synch process started by start_sync
+ %% or a registered process running on an external node (C-node).
+ %% Links to external names are ignored here (there are also DOWN
+ %% signals).
+ Syncers = lists:delete(Pid, S#state.syncers),
+ {noreply, S#state{syncers = Syncers}};
+
+handle_info({nodedown, Node}, S) when Node =:= S#state.node_name ->
+ %% Somebody stopped the distribution dynamically - change
+ %% references to old node name (Node) to new node name ('nonode@nohost')
+ {noreply, change_our_node_name(node(), S)};
+
+handle_info({nodedown, Node}, S0) ->
+ ?trace({'####', nodedown, {node,Node}}),
+ S1 = trace_message(S0, {nodedown, Node}, []),
+ S = handle_nodedown(Node, S1),
+ {noreply, S};
+
+handle_info({extra_nodedown, Node}, S0) ->
+ ?trace({'####', extra_nodedown, {node,Node}}),
+ S1 = trace_message(S0, {extra_nodedown, Node}, []),
+ S = handle_nodedown(Node, S1),
+ {noreply, S};
+
+handle_info({nodeup, Node}, S) when Node =:= node() ->
+ ?trace({'####', local_nodeup, {node, Node}}),
+ %% Somebody started the distribution dynamically - change
+ %% references to old node name ('nonode@nohost') to Node.
+ {noreply, change_our_node_name(Node, S)};
+
+handle_info({nodeup, _Node}, S) when not S#state.connect_all ->
+ {noreply, S};
+
+handle_info({nodeup, Node}, S0) when S0#state.connect_all ->
+ IsKnown = lists:member(Node, S0#state.known) or
+ %% This one is only for double nodeups (shouldn't occur!)
+ lists:keymember(Node, 1, S0#state.resolvers),
+ ?trace({'####', nodeup, {node,Node}, {isknown,IsKnown}}),
+ S1 = trace_message(S0, {nodeup, Node}, []),
+ case IsKnown of
+ true ->
+ {noreply, S1};
+ false ->
+ resend_pre_connect(Node),
+
+ %% now() is used as a tag to separate different synch sessions
+ %% from each other. Global could be confused by bursty nodeups
+ %% because it couldn't separate the messages between the different
+ %% synch sessions started by a nodeup.
+ MyTag = now(),
+ put({sync_tag_my, Node}, MyTag),
+ ?trace({sending_nodeup_to_locker, {node,Node},{mytag,MyTag}}),
+ S1#state.the_locker ! {nodeup, Node, MyTag},
+
+ %% In order to be compatible with unpatched R7 a locker
+ %% process was spawned. Vsn 5 is no longer compatible with
+ %% vsn 3 nodes, so the locker process is no longer needed.
+ %% The permanent locker takes its place.
+ NotAPid = no_longer_a_pid,
+ Locker = {locker, NotAPid, S1#state.known, S1#state.the_locker},
+ InitC = {init_connect, {?vsn, MyTag}, node(), Locker},
+ Rs = S1#state.resolvers,
+ ?trace({casting_init_connect, {node,Node},{initmessage,InitC},
+ {resolvers,Rs}}),
+ gen_server:cast({global_name_server, Node}, InitC),
+ Resolver = start_resolver(Node, MyTag),
+ S = trace_message(S1, {new_resolver, Node}, [MyTag, Resolver]),
+ {noreply, S#state{resolvers = [{Node, MyTag, Resolver} | Rs]}}
+ end;
+
+handle_info({whereis, Name, From}, S) ->
+ do_whereis(Name, From),
+ {noreply, S};
+
+handle_info(known, S) ->
+ io:format(">>>> ~p\n",[S#state.known]),
+ {noreply, S};
+
+%% "High level trace". For troubleshooting only.
+handle_info(high_level_trace, S) ->
+ case S of
+ #state{trace = [{Node, _Time, _M, Nodes, _X} | _]} ->
+ send_high_level_trace(),
+ CNode = node(),
+ CNodes = nodes(),
+ case {CNode, CNodes} of
+ {Node, Nodes} ->
+ {noreply, S};
+ _ ->
+ {New, _, Old} =
+ sofs:symmetric_partition(sofs:set([CNode|CNodes]),
+ sofs:set([Node|Nodes])),
+ M = {nodes_changed, {sofs:to_external(New),
+ sofs:to_external(Old)}},
+ {noreply, trace_message(S, M, [])}
+ end;
+ _ ->
+ {noreply, S}
+ end;
+handle_info({trace_message, M}, S) ->
+ {noreply, trace_message(S, M, [])};
+handle_info({trace_message, M, X}, S) ->
+ {noreply, trace_message(S, M, X)};
+
+handle_info({'DOWN', MonitorRef, process, _Pid, _Info}, S0) ->
+ S1 = delete_lock(MonitorRef, S0),
+ S = del_name(MonitorRef, S1),
+ {noreply, S};
+
+handle_info(Message, S) ->
+ error_logger:warning_msg("The global_name_server "
+ "received an unexpected message:\n"
+ "handle_info(~p, _)\n", [Message]),
+ {noreply, S}.
+
+
+%%========================================================================
+%%========================================================================
+%%=============================== Internal Functions =====================
+%%========================================================================
+%%========================================================================
+
+-define(HIGH_LEVEL_TRACE_INTERVAL, 500). % ms
+
+wait_high_level_trace() ->
+ receive
+ high_level_trace ->
+ ok
+ after ?HIGH_LEVEL_TRACE_INTERVAL+1 ->
+ ok
+ end.
+
+send_high_level_trace() ->
+ erlang:send_after(?HIGH_LEVEL_TRACE_INTERVAL, self(), high_level_trace).
+
+-define(GLOBAL_RID, global).
+
+%% Similar to trans(Id, Fun), but always uses global's own lock
+%% on all nodes known to global, making sure that no new nodes have
+%% become known while we were getting the list of known nodes.
+trans_all_known(Fun) ->
+ Id = {?GLOBAL_RID, self()},
+ Nodes = set_lock_known(Id, 0),
+ try
+ Fun(Nodes)
+ after
+ delete_global_lock(Id, Nodes)
+ end.
+
+set_lock_known(Id, Times) ->
+ Known = get_known(),
+ Nodes = [node() | Known],
+ Boss = the_boss(Nodes),
+ %% Use the same convention (a boss) as lock_nodes_safely. Optimization.
+ case set_lock_on_nodes(Id, [Boss]) of
+ true ->
+ case lock_on_known_nodes(Id, Known, Nodes) of
+ true ->
+ Nodes;
+ false ->
+ del_lock(Id, [Boss]),
+ random_sleep(Times),
+ set_lock_known(Id, Times+1)
+ end;
+ false ->
+ random_sleep(Times),
+ set_lock_known(Id, Times+1)
+ end.
+
+lock_on_known_nodes(Id, Known, Nodes) ->
+ case set_lock_on_nodes(Id, Nodes) of
+ true ->
+ (get_known() -- Known) =:= [];
+ false ->
+ false
+ end.
+
+set_lock_on_nodes(_Id, []) ->
+ true;
+set_lock_on_nodes(Id, Nodes) ->
+ case local_lock_check(Id, Nodes) of
+ true ->
+ Msg = {set_lock, Id},
+ {Replies, _} =
+ gen_server:multi_call(Nodes, global_name_server, Msg),
+ ?trace({set_lock,{me,self()},Id,{nodes,Nodes},{replies,Replies}}),
+ check_replies(Replies, Id, Replies);
+ false=Reply ->
+ Reply
+ end.
+
+%% Probe lock on local node to see if one should go on trying other nodes.
+local_lock_check(_Id, [_] = _Nodes) ->
+ true;
+local_lock_check(Id, Nodes) ->
+ not lists:member(node(), Nodes) orelse (can_set_lock(Id) =/= false).
+
+check_replies([{_Node, true} | T], Id, Replies) ->
+ check_replies(T, Id, Replies);
+check_replies([{_Node, false=Reply} | _T], _Id, [_]) ->
+ Reply;
+check_replies([{_Node, false=Reply} | _T], Id, Replies) ->
+ TrueReplyNodes = [N || {N, true} <- Replies],
+ ?trace({check_replies, {true_reply_nodes, TrueReplyNodes}}),
+ gen_server:multi_call(TrueReplyNodes, global_name_server, {del_lock, Id}),
+ Reply;
+check_replies([], _Id, _Replies) ->
+ true.
+
+%%========================================================================
+%% Another node wants to synchronize its registered names with us.
+%% Both nodes must have a lock before they are allowed to continue.
+%%========================================================================
+init_connect(Vsn, Node, InitMsg, HisTag, Resolvers, S) ->
+ %% It is always the responsibility of newer versions to understand
+ %% older versions of the protocol.
+ put({prot_vsn, Node}, Vsn),
+ put({sync_tag_his, Node}, HisTag),
+ case lists:keyfind(Node, 1, Resolvers) of
+ {Node, MyTag, _Resolver} ->
+ MyTag = get({sync_tag_my, Node}), % assertion
+ {locker, _NoLongerAPid, _HisKnown0, HisTheLocker} = InitMsg,
+ ?trace({init_connect,{histhelocker,HisTheLocker}}),
+ HisKnown = [],
+ S#state.the_locker ! {his_the_locker, HisTheLocker,
+ {Vsn,HisKnown}, S#state.known};
+ false ->
+ ?trace({init_connect,{pre_connect,Node},{histag,HisTag}}),
+ put({pre_connect, Node}, {Vsn, InitMsg, HisTag})
+ end.
+
+%%========================================================================
+%% In the simple case, we'll get lock_is_set before we get exchange,
+%% but we may get exchange before we get lock_is_set from our locker.
+%% If that's the case, we'll have to remember the exchange info, and
+%% handle it when we get the lock_is_set. We do this by using the
+%% process dictionary: when the lock_is_set msg is received, we store
+%% that fact. When exchange is received, we check the dictionary to see
+%% whether lock_is_set has been received. If not, we store info about
+%% the exchange instead. In lock_is_set we must first check whether
+%% exchange info is stored; in that case we take care of it.
+%%========================================================================
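+%% Schematically, the {wait_lock, Node} dictionary entry evolves as
+%% follows (an illustration of the two possible orderings):
+%%
+%%     lock_is_set first: undefined -> lock_is_set
+%%                        (exchange then erases the entry and resolves)
+%%     exchange first:    undefined -> {exchange, NameList}
+%%                        (lock_is_set then finds and handles it)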
+lock_is_set(Node, Resolvers, LockId) ->
+ gen_server:cast({global_name_server, Node},
+ {exchange, node(), get_names(), _ExtNames = [],
+ get({sync_tag_his, Node})}),
+ put({lock_id, Node}, LockId),
+ %% If both have the lock, continue with exchange.
+ case get({wait_lock, Node}) of
+ {exchange, NameList} ->
+ put({wait_lock, Node}, lock_is_set),
+ exchange(Node, NameList, Resolvers);
+ undefined ->
+ put({wait_lock, Node}, lock_is_set)
+ end.
+
+%%========================================================================
+%% exchange
+%%========================================================================
+exchange(Node, NameList, Resolvers) ->
+ ?trace({'####', exchange, {node,Node}, {namelist,NameList},
+ {resolvers, Resolvers}}),
+ case erase({wait_lock, Node}) of
+ lock_is_set ->
+ {Node, _Tag, Resolver} = lists:keyfind(Node, 1, Resolvers),
+ Resolver ! {resolve, NameList, Node};
+ undefined ->
+ put({wait_lock, Node}, {exchange, NameList})
+ end.
+
+resolved(Node, HisResolved, HisKnown, Names_ext, S0) ->
+ Ops = erase({save_ops, Node}) ++ HisResolved,
+ %% Known may have shrunk since the lock was taken (due to nodedowns).
+ Known = S0#state.known,
+ Synced = S0#state.synced,
+ NewNodes = [Node | HisKnown],
+ sync_others(HisKnown),
+ ExtraInfo = [{vsn,get({prot_vsn, Node})}, {lock, get({lock_id, Node})}],
+ S = do_ops(Ops, node(), Names_ext, ExtraInfo, S0),
+ %% I am synced with Node, but not with HisKnown yet
+ lists:foreach(fun(Pid) -> Pid ! {synced, [Node]} end, S#state.syncers),
+ S3 = lists:foldl(fun(Node1, S1) ->
+ F = fun(Tag) -> cancel_locker(Node1,S1,Tag) end,
+ cancel_resolved_locker(Node1, F)
+ end, S, HisKnown),
+ %% The locker that took the lock is asked to send
+ %% the {new_nodes, ...} message. This ensures that
+ %% {del_lock, ...} is received after {new_nodes, ...}
+ %% (except when abcast spawns process(es)...).
+ NewNodesF = fun() ->
+ gen_server:abcast(Known, global_name_server,
+ {new_nodes, node(), Ops, Names_ext,
+ NewNodes, ExtraInfo})
+ end,
+ F = fun(Tag) -> cancel_locker(Node, S3, Tag, NewNodesF) end,
+ S4 = cancel_resolved_locker(Node, F),
+ %% See (*) below... we're node b in that description
+ AddedNodes = (NewNodes -- Known),
+ NewKnown = Known ++ AddedNodes,
+ S4#state.the_locker ! {add_to_known, AddedNodes},
+ NewS = trace_message(S4, {added, AddedNodes},
+ [{new_nodes, NewNodes}, {abcast, Known}, {ops,Ops}]),
+ NewS#state{known = NewKnown, synced = [Node | Synced]}.
+
+cancel_resolved_locker(Node, CancelFun) ->
+ Tag = get({sync_tag_my, Node}),
+ ?trace({calling_cancel_locker,Tag,get()}),
+ S = CancelFun(Tag),
+ reset_node_state(Node),
+ S.
+
+new_nodes(Ops, ConnNode, Names_ext, Nodes, ExtraInfo, S0) ->
+ Known = S0#state.known,
+ %% (*) This one requires some thought...
+ %% We're node a, other nodes b and c:
+ %% The problem is that {in_sync, a} may arrive before {resolved, [a]} to
+ %% b from c, leading to b sending {new_nodes, [a]} to us (node a).
+ %% Therefore, we make sure we never get duplicates in Known.
+ AddedNodes = lists:delete(node(), Nodes -- Known),
+ sync_others(AddedNodes),
+ S = do_ops(Ops, ConnNode, Names_ext, ExtraInfo, S0),
+ ?trace({added_nodes_in_sync,{added_nodes,AddedNodes}}),
+ S#state.the_locker ! {add_to_known, AddedNodes},
+ S1 = trace_message(S, {added, AddedNodes}, [{ops,Ops}]),
+ S1#state{known = Known ++ AddedNodes}.
+
+do_whereis(Name, From) ->
+ case is_global_lock_set() of
+ false ->
+ gen_server:reply(From, where(Name));
+ true ->
+ send_again({whereis, Name, From})
+ end.
+
+terminate(_Reason, _S) ->
+ true = ets:delete(global_names),
+ true = ets:delete(global_names_ext),
+ true = ets:delete(global_locks),
+ true = ets:delete(global_pid_names),
+ true = ets:delete(global_pid_ids).
+
+code_change(_OldVsn, S, _Extra) ->
+ {ok, S}.
+
+%% The resolver runs exchange_names in a separate process. The effect
+%% is that locks can be used at the same time as name resolution takes
+%% place.
+start_resolver(Node, MyTag) ->
+ spawn(fun() -> resolver(Node, MyTag) end).
+
+resolver(Node, Tag) ->
+ receive
+ {resolve, NameList, Node} ->
+ ?trace({resolver, {me,self()}, {node,Node}, {namelist,NameList}}),
+ {Ops, Resolved} = exchange_names(NameList, Node, [], []),
+ Exchange = {exchange_ops, Node, Tag, Ops, Resolved},
+ gen_server:cast(global_name_server, Exchange),
+ exit(normal);
+ _ -> % Ignore garbage.
+ resolver(Node, Tag)
+ end.
+
+resend_pre_connect(Node) ->
+ case erase({pre_connect, Node}) of
+ {Vsn, InitMsg, HisTag} ->
+ gen_server:cast(self(),
+ {init_connect, {Vsn, HisTag}, Node, InitMsg});
+ _ ->
+ ok
+ end.
+
+ins_name(Name, Pid, Method, FromPidOrNode, ExtraInfo, S0) ->
+ ?trace({ins_name,insert,{name,Name},{pid,Pid}}),
+ S1 = delete_global_name_keep_pid(Name, S0),
+ S = trace_message(S1, {ins_name, node(Pid)}, [Name, Pid]),
+ insert_global_name(Name, Pid, Method, FromPidOrNode, ExtraInfo, S).
+
+ins_name_ext(Name, Pid, Method, RegNode, FromPidOrNode, ExtraInfo, S0) ->
+ ?trace({ins_name_ext, {name,Name}, {pid,Pid}}),
+ S1 = delete_global_name_keep_pid(Name, S0),
+ dolink_ext(Pid, RegNode),
+ S = trace_message(S1, {ins_name_ext, node(Pid)}, [Name, Pid]),
+ true = ets:insert(global_names_ext, {Name, Pid, RegNode}),
+ insert_global_name(Name, Pid, Method, FromPidOrNode, ExtraInfo, S).
+
+where(Name) ->
+ case ets:lookup(global_names, Name) of
+ [{_Name, Pid, _Method, _RPid, _Ref}] -> Pid;
+ [] -> undefined
+ end.
+
+handle_set_lock(Id, Pid, S) ->
+ ?trace({handle_set_lock, Id, Pid}),
+ case can_set_lock(Id) of
+ {true, PidRefs} ->
+ case pid_is_locking(Pid, PidRefs) of
+ true ->
+ {true, S};
+ false ->
+ {true, insert_lock(Id, Pid, PidRefs, S)}
+ end;
+ false=Reply ->
+ {Reply, S}
+ end.
+
+can_set_lock({ResourceId, LockRequesterId}) ->
+ case ets:lookup(global_locks, ResourceId) of
+ [{ResourceId, LockRequesterId, PidRefs}] ->
+ {true, PidRefs};
+ [{ResourceId, _LockRequesterId2, _PidRefs}] ->
+ false;
+ [] ->
+ {true, []}
+ end.
+
+insert_lock({ResourceId, LockRequesterId}=Id, Pid, PidRefs, S) ->
+ {RPid, Ref} = do_monitor(Pid),
+ true = ets:insert(global_pid_ids, {Pid, ResourceId}),
+ true = ets:insert(global_pid_ids, {Ref, ResourceId}),
+ Lock = {ResourceId, LockRequesterId, [{Pid,RPid,Ref} | PidRefs]},
+ true = ets:insert(global_locks, Lock),
+ trace_message(S, {ins_lock, node(Pid)}, [Id, Pid]).
+
+is_global_lock_set() ->
+ is_lock_set(?GLOBAL_RID).
+
+is_lock_set(ResourceId) ->
+ ets:member(global_locks, ResourceId).
+
+handle_del_lock({ResourceId, LockReqId}, Pid, S0) ->
+ ?trace({handle_del_lock, {pid,Pid},{id,{ResourceId, LockReqId}}}),
+ case ets:lookup(global_locks, ResourceId) of
+ [{ResourceId, LockReqId, PidRefs}]->
+ remove_lock(ResourceId, LockReqId, Pid, PidRefs, false, S0);
+ _ -> S0
+ end.
+
+remove_lock(ResourceId, LockRequesterId, Pid, [{Pid,RPid,Ref}], Down, S0) ->
+ ?trace({remove_lock_1, {id,ResourceId},{pid,Pid}}),
+ true = erlang:demonitor(Ref, [flush]),
+ kill_monitor_proc(RPid, Pid),
+ true = ets:delete(global_locks, ResourceId),
+ true = ets:delete_object(global_pid_ids, {Pid, ResourceId}),
+ true = ets:delete_object(global_pid_ids, {Ref, ResourceId}),
+ S = case ResourceId of
+ ?GLOBAL_RID -> S0#state{global_lock_down = Down};
+ _ -> S0
+ end,
+ trace_message(S, {rem_lock, node(Pid)},
+ [{ResourceId, LockRequesterId}, Pid]);
+remove_lock(ResourceId, LockRequesterId, Pid, PidRefs0, _Down, S) ->
+ ?trace({remove_lock_2, {id,ResourceId},{pid,Pid}}),
+ PidRefs = case lists:keyfind(Pid, 1, PidRefs0) of
+ {Pid, RPid, Ref} ->
+ true = erlang:demonitor(Ref, [flush]),
+ kill_monitor_proc(RPid, Pid),
+ true = ets:delete_object(global_pid_ids,
+ {Ref, ResourceId}),
+ lists:keydelete(Pid, 1, PidRefs0);
+ false ->
+ PidRefs0
+ end,
+ Lock = {ResourceId, LockRequesterId, PidRefs},
+ true = ets:insert(global_locks, Lock),
+ true = ets:delete_object(global_pid_ids, {Pid, ResourceId}),
+ trace_message(S, {rem_lock, node(Pid)},
+ [{ResourceId, LockRequesterId}, Pid]).
+
+kill_monitor_proc(Pid, Pid) ->
+ ok;
+kill_monitor_proc(RPid, _Pid) ->
+ exit(RPid, kill).
+
+do_ops(Ops, ConnNode, Names_ext, ExtraInfo, S0) ->
+ ?trace({do_ops, {ops,Ops}}),
+
+ XInserts = [{Name, Pid, RegNode, Method} ||
+ {Name2, Pid2, RegNode} <- Names_ext,
+ {insert, {Name, Pid, Method}} <- Ops,
+ Name =:= Name2, Pid =:= Pid2],
+ S1 = lists:foldl(fun({Name, Pid, RegNode, Method}, S1) ->
+ ins_name_ext(Name, Pid, Method, RegNode,
+ ConnNode, ExtraInfo, S1)
+ end, S0, XInserts),
+
+ XNames = [Name || {Name, _Pid, _RegNode, _Method} <- XInserts],
+ Inserts = [{Name, Pid, node(Pid), Method} ||
+ {insert, {Name, Pid, Method}} <- Ops,
+ not lists:member(Name, XNames)],
+ S2 = lists:foldl(fun({Name, Pid, _RegNode, Method}, S2) ->
+ ins_name(Name, Pid, Method, ConnNode,
+ ExtraInfo, S2)
+ end, S1, Inserts),
+
+ DelNames = [Name || {delete, Name} <- Ops],
+ lists:foldl(fun(Name, S) -> delete_global_name2(Name, S)
+ end, S2, DelNames).
+
+%% It is possible that a node that was up and running when the
+%% operations were assembled has since died. The final {in_sync,...}
+%% messages do not generate nodedown messages for such nodes. To
+%% compensate, "artificial" nodedown messages are created. Since
+%% monitor_node may take some time, processes are spawned to avoid
+%% locking up the global_name_server. Should double nodedown
+%% messages somehow occur (one of them artificial), nothing bad can happen
+%% (the second nodedown is a no-op). It is assumed that there cannot
+%% be a nodeup before the artificial nodedown.
+%%
+%% The extra nodedown messages generated here also take care of the
+%% case that a nodedown message is received _before_ the operations
+%% are run.
+sync_others(Nodes) ->
+ N = case application:get_env(kernel, ?N_CONNECT_RETRIES) of
+ {ok, NRetries} when is_integer(NRetries),
+ NRetries >= 0 -> NRetries;
+ _ -> ?DEFAULT_N_CONNECT_RETRIES
+ end,
+ lists:foreach(fun(Node) ->
+ spawn(fun() -> sync_other(Node, N) end)
+ end, Nodes).
+
+sync_other(Node, N) ->
+ erlang:monitor_node(Node, true, [allow_passive_connect]),
+ receive
+ {nodedown, Node} when N > 0 ->
+ sync_other(Node, N - 1);
+ {nodedown, Node} ->
+ ?trace({missing_nodedown, {node, Node}}),
+ error_logger:warning_msg("global: ~w failed to connect to ~w\n",
+ [node(), Node]),
+ global_name_server ! {extra_nodedown, Node}
+ after 0 ->
+ gen_server:cast({global_name_server,Node}, {in_sync,node(),true})
+ end.
+ % monitor_node(Node, false),
+ % exit(normal).
+
+insert_global_name(Name, Pid, Method, FromPidOrNode, ExtraInfo, S) ->
+ {RPid, Ref} = do_monitor(Pid),
+ true = ets:insert(global_names, {Name, Pid, Method, RPid, Ref}),
+ true = ets:insert(global_pid_names, {Pid, Name}),
+ true = ets:insert(global_pid_names, {Ref, Name}),
+ case lock_still_set(FromPidOrNode, ExtraInfo, S) of
+ true ->
+ S;
+ false ->
+ %% The node that took the lock has gone down and then up
+ %% again. The {register, ...} or {new_nodes, ...} message
+ %% was delayed and arrived after nodeup (maybe it caused
+ %% the nodeup). The DOWN signal from the monitor of the
+ %% lock has removed the lock.
+ %% Note: it is assumed here that the DOWN signal arrives
+ %% _before_ nodeup and any message that caused nodeup.
+ %% This is true of Erlang/OTP.
+ delete_global_name2(Name, S)
+ end.
+
+lock_still_set(PidOrNode, ExtraInfo, S) ->
+ case ets:lookup(global_locks, ?GLOBAL_RID) of
+ [{?GLOBAL_RID, _LockReqId, PidRefs}] when is_pid(PidOrNode) ->
+ %% Name registration.
+ lists:keymember(PidOrNode, 1, PidRefs);
+ [{?GLOBAL_RID, LockReqId, PidRefs}] when is_atom(PidOrNode) ->
+ case extra_info(lock, ExtraInfo) of
+ {?GLOBAL_RID, LockId} -> % R11B-4 or later
+ LockReqId =:= LockId;
+ undefined ->
+ lock_still_set_old(PidOrNode, LockReqId, PidRefs)
+ end;
+ [] ->
+ %% If the global lock was not removed by a DOWN message
+ %% then we have a node that does not monitor locking pids
+ %% (pre R11B-3), or an R11B-3 node (which does not ensure
+ %% that {new_nodes, ...} arrives before {del_lock, ...}).
+ not S#state.global_lock_down
+ end.
+
+%%% The following is probably overkill. It is possible that this node
+%%% has been locked again, but that is a rare occurrence.
+lock_still_set_old(_Node, ReqId, _PidRefs) when is_pid(ReqId) ->
+ %% Cannot do better than return true.
+ true;
+lock_still_set_old(Node, ReqId, PidRefs) when is_list(ReqId) ->
+ %% Connection, version > 4, but before R11B-4.
+ [P || {P, _RPid, _Ref} <- PidRefs, node(P) =:= Node] =/= [].
+
+extra_info(Tag, ExtraInfo) ->
+ %% ExtraInfo used to be a list of nodes (vsn 2).
+ case catch lists:keyfind(Tag, 1, ExtraInfo) of
+ {Tag, Info} ->
+ Info;
+ _ ->
+ undefined
+ end.
+
+del_name(Ref, S) ->
+ NameL = [{Name, Pid} ||
+ {_, Name} <- ets:lookup(global_pid_names, Ref),
+ {_, Pid, _Method, _RPid, Ref1} <-
+ ets:lookup(global_names, Name),
+ Ref1 =:= Ref],
+ ?trace({async_del_name, self(), NameL, Ref}),
+ case NameL of
+ [{Name, Pid}] ->
+ _ = del_names(Name, Pid, S),
+ delete_global_name2(Name, S);
+ [] ->
+ S
+ end.
+
+%% Send {async_del_name, ...} to old nodes (pre R11B-3).
+del_names(Name, Pid, S) ->
+ Send = case ets:lookup(global_names_ext, Name) of
+ [{Name, Pid, RegNode}] ->
+ RegNode =:= node();
+ [] ->
+ node(Pid) =:= node()
+ end,
+ if
+ Send ->
+ ?trace({del_names, {pid,Pid}, {name,Name}}),
+ S#state.the_deleter ! {delete_name, self(), Name, Pid};
+ true ->
+ ok
+ end.
+
+%% Keeps the entry in global_names for whereis_name/1.
+delete_global_name_keep_pid(Name, S) ->
+ case ets:lookup(global_names, Name) of
+ [{Name, Pid, _Method, RPid, Ref}] ->
+ delete_global_name2(Name, Pid, RPid, Ref, S);
+ [] ->
+ S
+ end.
+
+delete_global_name2(Name, S) ->
+ case ets:lookup(global_names, Name) of
+ [{Name, Pid, _Method, RPid, Ref}] ->
+ true = ets:delete(global_names, Name),
+ delete_global_name2(Name, Pid, RPid, Ref, S);
+ [] ->
+ S
+ end.
+
+delete_global_name2(Name, Pid, RPid, Ref, S) ->
+ true = erlang:demonitor(Ref, [flush]),
+ kill_monitor_proc(RPid, Pid),
+ delete_global_name(Name, Pid),
+ ?trace({delete_global_name,{item,Name},{pid,Pid}}),
+ true = ets:delete_object(global_pid_names, {Pid, Name}),
+ true = ets:delete_object(global_pid_names, {Ref, Name}),
+ case ets:lookup(global_names_ext, Name) of
+ [{Name, Pid, RegNode}] ->
+ true = ets:delete(global_names_ext, Name),
+ ?trace({delete_global_name, {name,Name,{pid,Pid},{RegNode,Pid}}}),
+ dounlink_ext(Pid, RegNode);
+ [] ->
+ ?trace({delete_global_name,{name,Name,{pid,Pid},{node(Pid),Pid}}}),
+ ok
+ end,
+ trace_message(S, {del_name, node(Pid)}, [Name, Pid]).
+
+%% delete_global_name/2 is traced by the inviso application.
+%% Do not change.
+delete_global_name(_Name, _Pid) ->
+ ok.
+
+%%-----------------------------------------------------------------
+%% The locker is a satellite process to global_name_server. When a
+%% nodeup is received from a new node the global_name_server sends a
+%% message to the locker. The locker tries to set a lock in our
+%% partition, i.e. on all nodes known to us. When the lock is set, it
+%% tells global_name_server about it, and keeps the lock set.
+%% global_name_server sends a cancel message to the locker when the
+%% partitions are connected.
+
+%% There are two versions of the protocol between lockers on two nodes:
+%% Version 1: used by unpatched R7.
+%% Version 2: the messages exchanged between the lockers include the known
+%% nodes (see OTP-3576).
+%%-----------------------------------------------------------------
+
+-define(locker_vsn, 2).
+
+-record(multi,
+ {local = [], % Requests from nodes on the local host.
+ remote = [], % Other requests.
+ known = [], % Copy of global_name_server's known nodes. It's
+ % faster to keep a copy of known than asking
+ % for it when needed.
+ the_boss, % max([node() | 'known'])
+ just_synced = false, % true if node() synced just a moment ago
+ %% Statistics:
+ do_trace % bool()
+ }).
+
+-record(him, {node, locker, vsn, my_tag}).
+
+start_the_locker(DoTrace) ->
+ spawn_link(fun() -> init_the_locker(DoTrace) end).
+
+init_the_locker(DoTrace) ->
+ process_flag(trap_exit, true), % needed?
+ S0 = #multi{do_trace = DoTrace},
+ S1 = update_locker_known({add, get_known()}, S0),
+ loop_the_locker(S1),
+ erlang:error(locker_exited).
+
+loop_the_locker(S) ->
+ ?trace({loop_the_locker,S}),
+ receive
+ Message when element(1, Message) =/= nodeup ->
+ the_locker_message(Message, S)
+ after 0 ->
+ Timeout =
+ case {S#multi.local, S#multi.remote} of
+ {[],[]} ->
+ infinity;
+ _ ->
+ %% It is important that the timeout is greater
+ %% than zero; otherwise, if this node fails after
+ %% setting the lock, the chance that some other
+ %% node in the partition gets to set the lock is very slim.
+ if
+ S#multi.just_synced ->
+ 0; % no reason to wait after success
+ S#multi.known =:= [] ->
+ 200; % just to get started
+ true ->
+ erlang:min(1000 + 100*length(S#multi.known),
+ 3000)
+ end
+ end,
+ S1 = S#multi{just_synced = false},
+ receive
+ Message when element(1, Message) =/= nodeup ->
+ the_locker_message(Message, S1)
+ after Timeout ->
+ case is_global_lock_set() of
+ true ->
+ loop_the_locker(S1);
+ false ->
+ select_node(S1)
+ end
+ end
+ end.
+
+the_locker_message({his_the_locker, HisTheLocker, HisKnown0, _MyKnown}, S) ->
+ ?trace({his_the_locker, HisTheLocker, {node,node(HisTheLocker)}}),
+ {HisVsn, _HisKnown} = HisKnown0,
+ true = HisVsn > 4,
+ receive
+ {nodeup, Node, MyTag} when node(HisTheLocker) =:= Node ->
+ ?trace({the_locker_nodeup, {node,Node},{mytag,MyTag}}),
+ Him = #him{node = node(HisTheLocker), my_tag = MyTag,
+ locker = HisTheLocker, vsn = HisVsn},
+ loop_the_locker(add_node(Him, S));
+ {cancel, Node, _Tag, no_fun} when node(HisTheLocker) =:= Node ->
+ loop_the_locker(S)
+ after 60000 ->
+ ?trace({nodeupnevercame, node(HisTheLocker)}),
+ error_logger:error_msg("global: nodeup never came ~w ~w\n",
+ [node(), node(HisTheLocker)]),
+ loop_the_locker(S#multi{just_synced = false})
+ end;
+the_locker_message({cancel, _Node, undefined, no_fun}, S) ->
+ ?trace({cancel_the_locker, undefined, {node,_Node}}),
+ %% If we actually cancel something when a cancel message with the
+%% tag 'undefined' arrives, we may be acting on an old nodedown and
+%% cancelling a new nodeup, so we don't do that.
+ loop_the_locker(S);
+the_locker_message({cancel, Node, Tag, no_fun}, S) ->
+ ?trace({the_locker, cancel, {multi,S}, {tag,Tag},{node,Node}}),
+ receive
+ {nodeup, Node, Tag} ->
+ ?trace({cancelnodeup2, {node,Node},{tag,Tag}}),
+ ok
+ after 0 ->
+ ok
+ end,
+ loop_the_locker(remove_node(Node, S));
+the_locker_message({lock_set, _Pid, false, _}, S) ->
+ ?trace({the_locker, spurious, {node,node(_Pid)}}),
+ loop_the_locker(S);
+the_locker_message({lock_set, Pid, true, _HisKnown}, S) ->
+ Node = node(Pid),
+ ?trace({the_locker, self(), spontaneous, {node,Node}}),
+ case find_node_tag(Node, S) of
+ {true, MyTag, HisVsn} ->
+ LockId = locker_lock_id(Pid, HisVsn),
+ {IsLockSet, S1} = lock_nodes_safely(LockId, [], S),
+ Pid ! {lock_set, self(), IsLockSet, S1#multi.known},
+ Known2 = [node() | S1#multi.known],
+ ?trace({the_locker, spontaneous, {known2, Known2},
+ {node,Node}, {is_lock_set,IsLockSet}}),
+ case IsLockSet of
+ true ->
+ gen_server:cast(global_name_server,
+ {lock_is_set, Node, MyTag, LockId}),
+ ?trace({lock_sync_done, {pid,Pid},
+ {node,node(Pid)}, {me,self()}}),
+ %% Wait for global to tell us to remove lock.
+ %% Should the other locker's node die,
+ %% global_name_server will receive nodedown, and
+ %% then send {cancel, Node, Tag, Fun}.
+ receive
+ {cancel, Node, _Tag, Fun} ->
+ ?trace({cancel_the_lock,{node,Node}}),
+ call_fun(Fun),
+ delete_global_lock(LockId, Known2)
+ end,
+ S2 = S1#multi{just_synced = true},
+ loop_the_locker(remove_node(Node, S2));
+ false ->
+ loop_the_locker(S1#multi{just_synced = false})
+ end;
+ false ->
+ ?trace({the_locker, not_there, {node,Node}}),
+ Pid ! {lock_set, self(), false, S#multi.known},
+ loop_the_locker(S)
+ end;
+the_locker_message({add_to_known, Nodes}, S) ->
+ S1 = update_locker_known({add, Nodes}, S),
+ loop_the_locker(S1);
+the_locker_message({remove_from_known, Node}, S) ->
+ S1 = update_locker_known({remove, Node}, S),
+ loop_the_locker(S1);
+the_locker_message({do_trace, DoTrace}, S) ->
+ loop_the_locker(S#multi{do_trace = DoTrace});
+the_locker_message(Other, S) ->
+ unexpected_message(Other, locker),
+ ?trace({the_locker, {other_msg, Other}}),
+ loop_the_locker(S).
+
+%% Requests from nodes on the local host are chosen before requests
+%% from other nodes. This should be a safe optimization.
+select_node(S) ->
+ UseRemote = S#multi.local =:= [],
+ Others1 = if UseRemote -> S#multi.remote; true -> S#multi.local end,
+ Others2 = exclude_known(Others1, S#multi.known),
+ S1 = if
+ UseRemote -> S#multi{remote = Others2};
+ true -> S#multi{local = Others2}
+ end,
+ if
+ Others2 =:= [] ->
+ loop_the_locker(S1);
+ true ->
+ Him = random_element(Others2),
+ #him{locker = HisTheLocker, vsn = HisVsn,
+ node = Node, my_tag = MyTag} = Him,
+ HisNode = [Node],
+ Us = [node() | HisNode],
+ LockId = locker_lock_id(HisTheLocker, HisVsn),
+ ?trace({select_node, self(), {us, Us}}),
+ %% HisNode = [Node] prevents deadlock:
+ {IsLockSet, S2} = lock_nodes_safely(LockId, HisNode, S1),
+ case IsLockSet of
+ true ->
+ Known1 = Us ++ S2#multi.known,
+ ?trace({sending_lock_set, self(), {his,HisTheLocker}}),
+ HisTheLocker ! {lock_set, self(), true, S2#multi.known},
+ S3 = lock_is_set(S2, Him, MyTag, Known1, LockId),
+ loop_the_locker(S3);
+ false ->
+ loop_the_locker(S2)
+ end
+ end.
+
+%% Version 5: Both sides use the same requester id. In this way the
+%% nodes common to both sides are locked by both locker processes,
+%% which means that the lock is still there when the 'new_nodes'
+%% message is received even if the other side has deleted the lock.
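+%% Example (a sketch): if PidA and PidB are the two lockers, both
+%% sides compute the same id, {?GLOBAL_RID, lists:sort([PidA, PidB])}.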
+locker_lock_id(Pid, Vsn) when Vsn > 4 ->
+ {?GLOBAL_RID, lists:sort([self(), Pid])}.
+
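+%% Lock in three steps: (1) the boss alone, (2) node() and Extra,
+%% (3) the remaining known nodes. If a step fails, the locks taken
+%% in the earlier steps are released and {false, S} is returned.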
+lock_nodes_safely(LockId, Extra, S0) ->
+ %% Actually locking node() here could block some other node that
+ %% has already locked the boss, so we only check whether it would
+ %% be possible to lock node().
+ First = delete_nonode([S0#multi.the_boss]),
+ case ([node()] =:= First) orelse (can_set_lock(LockId) =/= false) of
+ true ->
+ %% Locking the boss first is an optimization.
+ case set_lock(LockId, First, 0) of
+ true ->
+ S = update_locker_known(S0),
+ %% The boss may have changed, but don't bother.
+ Second = delete_nonode([node() | Extra] -- First),
+ case set_lock(LockId, Second, 0) of
+ true ->
+ Known = S#multi.known,
+ case set_lock(LockId, Known -- First, 0) of
+ true ->
+ _ = locker_trace(S, ok, {First, Known}),
+ {true, S};
+ false ->
+ %% Since the boss is locked we
+ %% should have gotten the lock, at
+ %% least if no one else is locking
+ %% 'global'. Calling set_lock with
+ %% Retries > 0 does not seem to
+ %% speed things up.
+ SoFar = First ++ Second,
+ del_lock(LockId, SoFar),
+ _ = locker_trace(S, not_ok, {Known,SoFar}),
+ {false, S}
+ end;
+ false ->
+ del_lock(LockId, First),
+ _ = locker_trace(S, not_ok, {Second, First}),
+ {false, S}
+ end;
+ false ->
+ _ = locker_trace(S0, not_ok, {First, []}),
+ {false, S0}
+ end;
+ false ->
+ {false, S0}
+ end.
+
+delete_nonode(L) ->
+ lists:delete(nonode@nohost, L).
+
+%% Let the server add a timestamp.
+locker_trace(#multi{do_trace = false}, _, _Nodes) ->
+ ok;
+locker_trace(#multi{do_trace = true}, ok, Ns) ->
+ global_name_server ! {trace_message, {locker_succeeded, node()}, Ns};
+locker_trace(#multi{do_trace = true}, not_ok, Ns) ->
+ global_name_server ! {trace_message, {locker_failed, node()}, Ns};
+locker_trace(#multi{do_trace = true}, rejected, Ns) ->
+ global_name_server ! {trace_message, {lock_rejected, node()}, Ns}.
+
+update_locker_known(S) ->
+ receive
+ {add_to_known, Nodes} ->
+ S1 = update_locker_known({add, Nodes}, S),
+ update_locker_known(S1);
+ {remove_from_known, Node} ->
+ S1 = update_locker_known({remove, Node}, S),
+ update_locker_known(S1)
+ after 0 ->
+ S
+ end.
+
+update_locker_known(Upd, S) ->
+ Known = case Upd of
+ {add, Nodes} -> Nodes ++ S#multi.known;
+ {remove, Node} -> lists:delete(Node, S#multi.known)
+ end,
+ TheBoss = the_boss([node() | Known]),
+ S#multi{known = Known, the_boss = TheBoss}.
+
+random_element(L) ->
+ {A,B,C} = now(),
+ E = (A+B+C) rem length(L),
+ lists:nth(E+1, L).
+
+exclude_known(Others, Known) ->
+ [N || N <- Others, not lists:member(N#him.node, Known)].
+
+lock_is_set(S, Him, MyTag, Known1, LockId) ->
+ Node = Him#him.node,
+ receive
+ {lock_set, P, true, _} when node(P) =:= Node ->
+ gen_server:cast(global_name_server,
+ {lock_is_set, Node, MyTag, LockId}),
+ ?trace({lock_sync_done, {p,P, node(P)}, {me,self()}}),
+
+ %% Wait for global to tell us to remove lock. Should the
+ %% other locker's node die, global_name_server will
+ %% receive nodedown, and then send {cancel, Node, Tag, Fun}.
+ receive
+ {cancel, Node, _, Fun} ->
+ ?trace({lock_set_loop, {known1,Known1}}),
+ call_fun(Fun),
+ delete_global_lock(LockId, Known1)
+ end,
+ S#multi{just_synced = true,
+ local = lists:delete(Him, S#multi.local),
+ remote = lists:delete(Him, S#multi.remote)};
+ {lock_set, P, false, _} when node(P) =:= Node ->
+ ?trace({not_both_set, {node,Node},{p, P},{known1,Known1}}),
+ _ = locker_trace(S, rejected, Known1),
+ delete_global_lock(LockId, Known1),
+ S;
+ {cancel, Node, _, Fun} ->
+ ?trace({the_locker, cancel2, {node,Node}}),
+ call_fun(Fun),
+ _ = locker_trace(S, rejected, Known1),
+ delete_global_lock(LockId, Known1),
+ remove_node(Node, S);
+ {'EXIT', _, _} ->
+ ?trace({the_locker, exit, {node,Node}}),
+ _ = locker_trace(S, rejected, Known1),
+ delete_global_lock(LockId, Known1),
+ S
+ %% There used to be an 'after' clause (OTP-4902), but it is
+ %% no longer needed:
+ %% OTP-5770. Version 5 of the protocol. Deadlock can no longer
+ %% occur because if a partition is locked, one node in the other
+ %% partition is also locked with the same lock-id, which makes it
+ %% impossible for any node in the other partition to lock its
+ %% partition unless it negotiates with the first partition.
+ end.
+
+%% The locker does the {new_nodes, ...} call before removing the lock.
+call_fun(no_fun) ->
+ ok;
+call_fun(Fun) ->
+ Fun().
+
+%% The lock on the boss is removed last, which reduces the risk that
+%% some other locker, having locked the boss first, then fails to
+%% lock the remaining known nodes. (Assumes the boss occurs only once.)
+delete_global_lock(LockId, Nodes) ->
+ TheBoss = the_boss(Nodes),
+ del_lock(LockId, lists:delete(TheBoss, Nodes)),
+ del_lock(LockId, [TheBoss]).
+
+the_boss(Nodes) ->
+ lists:max(Nodes).
+
+find_node_tag(Node, S) ->
+ case find_node_tag2(Node, S#multi.local) of
+ false ->
+ find_node_tag2(Node, S#multi.remote);
+ Reply ->
+ Reply
+ end.
+
+find_node_tag2(_Node, []) ->
+ false;
+find_node_tag2(Node, [#him{node = Node, my_tag = MyTag, vsn = HisVsn} | _]) ->
+ {true, MyTag, HisVsn};
+find_node_tag2(Node, [_E | Rest]) ->
+ find_node_tag2(Node, Rest).
+
+remove_node(Node, S) ->
+ S#multi{local = remove_node2(Node, S#multi.local),
+ remote = remove_node2(Node, S#multi.remote)}.
+
+remove_node2(_Node, []) ->
+ [];
+remove_node2(Node, [#him{node = Node} | Rest]) ->
+ Rest;
+remove_node2(Node, [E | Rest]) ->
+ [E | remove_node2(Node, Rest)].
+
+add_node(Him, S) ->
+ case is_node_local(Him#him.node) of
+ true ->
+ S#multi{local = [Him | S#multi.local]};
+ false ->
+ S#multi{remote = [Him | S#multi.remote]}
+ end.
+
+is_node_local(Node) ->
+ {ok, Host} = inet:gethostname(),
+ case catch split_node(atom_to_list(Node), $@, []) of
+ [_, Host] ->
+ true;
+ _ ->
+ false
+ end.
+
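+%% Example: split_node("mynode@myhost", $@, []) =:= ["mynode", "myhost"].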
+split_node([Chr|T], Chr, Ack) -> [lists:reverse(Ack)|split_node(T, Chr, [])];
+split_node([H|T], Chr, Ack) -> split_node(T, Chr, [H|Ack]);
+split_node([], _, Ack) -> [lists:reverse(Ack)].
+
+cancel_locker(Node, S, Tag) ->
+ cancel_locker(Node, S, Tag, no_fun).
+
+cancel_locker(Node, S, Tag, ToBeRunOnLockerF) ->
+ S#state.the_locker ! {cancel, Node, Tag, ToBeRunOnLockerF},
+ Resolvers = S#state.resolvers,
+ ?trace({cancel_locker, {node,Node},{tag,Tag},
+ {sync_tag_my, get({sync_tag_my, Node})},{resolvers,Resolvers}}),
+ case lists:keyfind(Node, 1, Resolvers) of
+ {_, Tag, Resolver} ->
+ ?trace({{resolver, Resolver}}),
+ exit(Resolver, kill),
+ S1 = trace_message(S, {kill_resolver, Node}, [Tag, Resolver]),
+ S1#state{resolvers = lists:keydelete(Node, 1, Resolvers)};
+ _ ->
+ S
+ end.
+
+reset_node_state(Node) ->
+ ?trace({{node,Node}, reset_node_state, get()}),
+ erase({wait_lock, Node}),
+ erase({save_ops, Node}),
+ erase({pre_connect, Node}),
+ erase({prot_vsn, Node}),
+ erase({sync_tag_my, Node}),
+ erase({sync_tag_his, Node}),
+ erase({lock_id, Node}).
+
+%% Some node sent us its names. When a name clash is found, the resolve
+%% function is called from the smaller node => all resolve functions are
+%% called from the same partition.
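+%% The resolve function is expected to return the pid to keep (Pid or
+%% Pid2), or 'none' to unregister the name in both partitions; any
+%% other return value is treated like 'none', and a badrpc keeps Pid.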
+exchange_names([{Name, Pid, Method} | Tail], Node, Ops, Res) ->
+ case ets:lookup(global_names, Name) of
+ [{Name, Pid, _Method, _RPid2, _Ref2}] ->
+ exchange_names(Tail, Node, Ops, Res);
+ [{Name, Pid2, Method2, _RPid2, _Ref2}] when node() < Node ->
+ %% Name clash! Add the result of resolving to Res(olved).
+ %% We know that node(Pid) =/= node(), so we don't
+ %% need to link/unlink to Pid.
+ Node2 = node(Pid2), %% Node2 is connected to node().
+ case rpc:call(Node2, ?MODULE, resolve_it,
+ [Method2, Name, Pid, Pid2]) of
+ Pid ->
+ Op = {insert, {Name, Pid, Method}},
+ exchange_names(Tail, Node, [Op | Ops], Res);
+ Pid2 ->
+ Op = {insert, {Name, Pid2, Method2}},
+ exchange_names(Tail, Node, Ops, [Op | Res]);
+ none ->
+ Op = {delete, Name},
+ exchange_names(Tail, Node, [Op | Ops], [Op | Res]);
+ {badrpc, Badrpc} ->
+ error_logger:info_msg("global: badrpc ~w received when "
+ "conflicting name ~w was found\n",
+ [Badrpc, Name]),
+ Op = {insert, {Name, Pid, Method}},
+ exchange_names(Tail, Node, [Op | Ops], Res);
+ Else ->
+ error_logger:info_msg("global: Resolve method ~w for "
+ "conflicting name ~w returned ~w\n",
+ [Method, Name, Else]),
+ Op = {delete, Name},
+ exchange_names(Tail, Node, [Op | Ops], [Op | Res])
+ end;
+ [{Name, _Pid2, _Method, _RPid, _Ref}] ->
+ %% The other node will solve the conflict.
+ exchange_names(Tail, Node, Ops, Res);
+ _ ->
+ %% Entirely new name.
+ exchange_names(Tail, Node,
+ [{insert, {Name, Pid, Method}} | Ops], Res)
+ end;
+exchange_names([], _, Ops, Res) ->
+ ?trace({exchange_names_finish,{ops,Ops},{res,Res}}),
+ {Ops, Res}.
+
+resolve_it(Method, Name, Pid1, Pid2) ->
+ catch Method(Name, Pid1, Pid2).
+
+minmax(P1,P2) ->
+ if node(P1) < node(P2) -> {P1, P2}; true -> {P2, P1} end.
+
+-spec random_exit_name(term(), pid(), pid()) -> pid().
+random_exit_name(Name, Pid, Pid2) ->
+ {Min, Max} = minmax(Pid, Pid2),
+ error_logger:info_msg("global: Name conflict terminating ~w\n",
+ [{Name, Max}]),
+ exit(Max, kill),
+ Min.
+
+random_notify_name(Name, Pid, Pid2) ->
+ {Min, Max} = minmax(Pid, Pid2),
+ Max ! {global_name_conflict, Name},
+ Min.
+
+-spec notify_all_name(term(), pid(), pid()) -> 'none'.
+notify_all_name(Name, Pid, Pid2) ->
+ Pid ! {global_name_conflict, Name, Pid2},
+ Pid2 ! {global_name_conflict, Name, Pid},
+ none.
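+
+%% The three functions above are resolve functions intended as the
+%% third argument of register_name/3 and re_register_name/3. A usage
+%% sketch (the name 'my_name' is only an example):
+%%
+%%     yes = global:register_name(my_name, self(),
+%%                                fun global:notify_all_name/3)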
+
+dolink_ext(Pid, RegNode) when RegNode =:= node() ->
+ link(Pid);
+dolink_ext(_, _) ->
+ ok.
+
+dounlink_ext(Pid, RegNode) when RegNode =:= node() ->
+ unlink_pid(Pid);
+dounlink_ext(_Pid, _RegNode) ->
+ ok.
+
+unlink_pid(Pid) ->
+ case ets:member(global_pid_names, Pid) of
+ false ->
+ case ets:member(global_pid_ids, Pid) of
+ false ->
+ unlink(Pid);
+ true ->
+ ok
+ end;
+ true ->
+ ok
+ end.
+
+pid_is_locking(Pid, PidRefs) ->
+ lists:keyfind(Pid, 1, PidRefs) =/= false.
+
+delete_lock(Ref, S0) ->
+ Locks = pid_locks(Ref),
+ del_locks(Locks, Ref, S0#state.known),
+ F = fun({ResourceId, LockRequesterId, PidRefs}, S) ->
+ {Pid, _RPid, Ref} = lists:keyfind(Ref, 3, PidRefs),
+ remove_lock(ResourceId, LockRequesterId, Pid, PidRefs, true,S)
+ end,
+ lists:foldl(F, S0, Locks).
+
+pid_locks(Ref) ->
+ L = lists:flatmap(fun({_, ResourceId}) ->
+ ets:lookup(global_locks, ResourceId)
+ end, ets:lookup(global_pid_ids, Ref)),
+ [Lock || Lock = {_Id, _Req, PidRefs} <- L,
+ rpid_is_locking(Ref, PidRefs)].
+
+rpid_is_locking(Ref, PidRefs) ->
+ lists:keyfind(Ref, 3, PidRefs) =/= false.
+
+%% Send {async_del_lock, ...} to old nodes (pre R11B-3).
+del_locks([{ResourceId, _LockReqId, PidRefs} | Tail], Ref, KnownNodes) ->
+ {Pid, _RPid, Ref} = lists:keyfind(Ref, 3, PidRefs),
+ case node(Pid) =:= node() of
+ true ->
+ gen_server:abcast(KnownNodes, global_name_server,
+ {async_del_lock, ResourceId, Pid});
+ false ->
+ ok
+ end,
+ del_locks(Tail, Ref, KnownNodes);
+del_locks([], _Ref, _KnownNodes) ->
+ ok.
+
+handle_nodedown(Node, S) ->
+ %% DOWN signals from monitors have removed locks and registered names.
+ #state{known = Known, synced = Syncs} = S,
+ NewS = cancel_locker(Node, S, get({sync_tag_my, Node})),
+ NewS#state.the_locker ! {remove_from_known, Node},
+ reset_node_state(Node),
+ NewS#state{known = lists:delete(Node, Known),
+ synced = lists:delete(Node, Syncs)}.
+
+get_names() ->
+ ets:select(global_names,
+ ets:fun2ms(fun({Name, Pid, Method, _RPid, _Ref}) ->
+ {Name, Pid, Method}
+ end)).
+
+get_names_ext() ->
+ ets:tab2list(global_names_ext).
+
+get_known() ->
+ gen_server:call(global_name_server, get_known, infinity).
+
+random_sleep(Times) ->
+ case (Times rem 10) of
+ 0 -> erase(random_seed);
+ _ -> ok
+ end,
+ case get(random_seed) of
+ undefined ->
+ {A1, A2, A3} = now(),
+ random:seed(A1, A2, A3 + erlang:phash(node(), 100000));
+ _ -> ok
+ end,
+ %% First time 1/4 second, then doubling each time, up to a maximum of 8 seconds.
+ Tmax = if Times > 5 -> 8000;
+ true -> ((1 bsl Times) * 1000) div 8
+ end,
+ T = random:uniform(Tmax),
+ ?trace({random_sleep, {me,self()}, {times,Times}, {t,T}, {tmax,Tmax}}),
+ receive after T -> ok end.
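+
+%% For the backoff above, Tmax is 250, 500, 1000, 2000 and 4000 ms for
+%% Times = 1..5, and 8000 ms for Times > 5.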
+
+dec(infinity) -> infinity;
+dec(N) -> N - 1.
+
+send_again(Msg) ->
+ Me = self(),
+ spawn(fun() -> timer(Me, Msg) end).
+
+timer(Pid, Msg) ->
+ random_sleep(5),
+ Pid ! Msg.
+
+change_our_node_name(NewNode, S) ->
+ S1 = trace_message(S, {new_node_name, NewNode}, []),
+ S1#state{node_name = NewNode}.
+
+trace_message(#state{trace = no_trace}=S, _M, _X) ->
+ S;
+trace_message(S, M, X) ->
+ S#state{trace = [trace_message(M, X) | S#state.trace]}.
+
+trace_message(M, X) ->
+ {node(), now(), M, nodes(), X}.
+
+%%-----------------------------------------------------------------
+%% Each sync process corresponds to one call to sync. Each such
+%% process asks the global_name_server on all Nodes if it is in sync
+%% with Nodes. If not, that (other) node spawns a syncer process that
+%% waits for global to get in sync with all Nodes. When it is in
+%% sync, the syncer process tells the original sync process about it.
+%%-----------------------------------------------------------------
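+%% Usage sketch: ok = global:sync() blocks the caller until global is
+%% in sync with the relevant nodes (nodes that go down while syncing
+%% are simply dropped from the wait list).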
+start_sync(Nodes, From) ->
+ spawn_link(fun() -> sync_init(Nodes, From) end).
+
+sync_init(Nodes, From) ->
+ lists:foreach(fun(Node) -> monitor_node(Node, true) end, Nodes),
+ sync_loop(Nodes, From).
+
+sync_loop([], From) ->
+ gen_server:reply(From, ok);
+sync_loop(Nodes, From) ->
+ receive
+ {nodedown, Node} ->
+ monitor_node(Node, false),
+ sync_loop(lists:delete(Node, Nodes), From);
+ {synced, SNodes} ->
+ lists:foreach(fun(N) -> monitor_node(N, false) end, SNodes),
+ sync_loop(Nodes -- SNodes, From)
+ end.
+
+%%%=======================================================================
+%%% Get the current global_groups definition
+%%%=======================================================================
+check_sync_nodes() ->
+ case get_own_nodes() of
+ {ok, all} ->
+ nodes();
+ {ok, NodesNG} ->
+ %% The global_groups parameter is defined; we are not allowed
+ %% to sync with nodes outside our own global group.
+ intersection(nodes(), NodesNG);
+ {error, _} = Error ->
+ Error
+ end.
+
+check_sync_nodes(SyncNodes) ->
+ case get_own_nodes() of
+ {ok, all} ->
+ SyncNodes;
+ {ok, NodesNG} ->
+ %% The global_groups parameter is defined; we are not allowed
+ %% to sync with nodes outside our own global group.
+ OwnNodeGroup = intersection(nodes(), NodesNG),
+ IllegalSyncNodes = (SyncNodes -- [node() | OwnNodeGroup]),
+ case IllegalSyncNodes of
+ [] -> SyncNodes;
+ _ -> {error, {"Trying to sync nodes not defined in "
+ "our own global group", IllegalSyncNodes}}
+ end;
+ {error, _} = Error ->
+ Error
+ end.
+
+get_own_nodes() ->
+ case global_group:get_own_nodes_with_errors() of
+ {error, Error} ->
+ {error, {"global_groups definition error", Error}};
+ OkTup ->
+ OkTup
+ end.
+
+%%-----------------------------------------------------------------
+%% The deleter process is a satellite process to global_name_server
+%% that does background batch deleting of names when a process
+%% that had globally registered names dies. It is started by and
+%% linked to global_name_server.
+%%-----------------------------------------------------------------
+
+start_the_deleter(Global) ->
+ spawn_link(fun() -> loop_the_deleter(Global) end).
+
+loop_the_deleter(Global) ->
+ Deletions = collect_deletions(Global, []),
+ ?trace({loop_the_deleter, self(), {deletions,Deletions},
+ {names,get_names()}}),
+ %% trans_all_known/1 is called rather than trans/3 with nodes() as
+ %% the third argument, because 'known' is updated by new_nodes
+ %% while the lock is still set, whereas nodes() could be updated
+ %% later (if in_sync is received after the lock is gone). It is
+ %% not likely that in_sync would be received after the lock has
+ %% been taken here, but using trans_all_known makes it even less
+ %% likely.
+ trans_all_known(
+ fun(Known) ->
+ lists:map(
+ fun({Name,Pid}) ->
+ gen_server:abcast(Known, global_name_server,
+ {async_del_name, Name, Pid})
+ end, Deletions)
+ end),
+ loop_the_deleter(Global).
+
+collect_deletions(Global, Deletions) ->
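+ %% Block indefinitely for the first deletion; once the batch is
+ %% non-empty, drain the mailbox (timeout 0) and return the batch.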
+ receive
+ {delete_name, Global, Name, Pid} ->
+ collect_deletions(Global, [{Name,Pid} | Deletions]);
+ Other ->
+ unexpected_message(Other, deleter),
+ collect_deletions(Global, Deletions)
+ after case Deletions of
+ [] -> infinity;
+ _ -> 0
+ end ->
+ lists:reverse(Deletions)
+ end.
+
+%% The registrar is a helper process that registers and unregisters
+%% names. Since it never dies, it ensures that names are registered
+%% and unregistered on all known nodes. It is started by and linked
+%% to global_name_server.
+
+start_the_registrar() ->
+ spawn_link(fun() -> loop_the_registrar() end).
+
+loop_the_registrar() ->
+ receive
+ {trans_all_known, Fun, From} ->
+ ?trace({loop_the_registrar, self(), Fun, From}),
+ gen_server:reply(From, trans_all_known(Fun));
+ Other ->
+ unexpected_message(Other, register)
+ end,
+ loop_the_registrar().
+
+unexpected_message({'EXIT', _Pid, _Reason}, _What) ->
+ %% global_name_server died
+ ok;
+unexpected_message(Message, What) ->
+ error_logger:warning_msg("The global_name_server ~w process "
+ "received an unexpected message:\n~p\n",
+ [What, Message]).
+
+%%% Utilities
+
+%% When/if erlang:monitor/2 returns before trying to connect to the
+%% other node, this function can be removed.
+do_monitor(Pid) ->
+ case (node(Pid) =:= node()) orelse lists:member(node(Pid), nodes()) of
+ true ->
+ %% Assume the node is still up
+ {Pid, erlang:monitor(process, Pid)};
+ false ->
+ F = fun() ->
+ Ref = erlang:monitor(process, Pid),
+ receive
+ {'DOWN', Ref, process, Pid, _Info} ->
+ exit(normal)
+ end
+ end,
+ erlang:spawn_monitor(F)
+ end.
+
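+%% Example: intersection([a,b,c], [c,d]) =:= [c].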
+intersection(_, []) ->
+ [];
+intersection(L1, L2) ->
+ L1 -- (L1 -- L2).