%% ``The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved via the world wide web at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
%% AB. All Rights Reserved.''
%%
%% $Id: global.erl,v 1.4 2009/09/17 09:46:19 kostis Exp $
%%
-module(global).
-behaviour(gen_server).
%% A global register that allows the global registration of pids and
%% names, and that dynamically keeps up to date with the entire network.
%% global can operate in two modes; in a fully connected network, or
%% in a non-fully connected network. In the latter case, the name
%% registration mechanism won't work.
%%
%% External exports
-export([start/0, start_link/0, stop/0, sync/0, sync/1,
safe_whereis_name/1, whereis_name/1, register_name/2, register_name/3,
register_name_external/2, register_name_external/3, unregister_name_external/1,
re_register_name/2, re_register_name/3,
unregister_name/1, registered_names/0, send/2, node_disconnected/1,
set_lock/1, set_lock/2, set_lock/3,
del_lock/1, del_lock/2,
trans/2, trans/3, trans/4,
random_exit_name/3, random_notify_name/3, notify_all_name/3, cnode/3]).
%% Internal exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, timer/2, sync_init/2, init_locker/5, resolve_it/4,
init_the_locker/1]).
-export([info/0]).
%% Debug-trace macros. Every active definition below expands to a constant
%% (`true' or `ok') and does not use its argument, so the trace calls
%% scattered through the module compile down to no-ops. The commented-out
%% alternatives route the argument to erlang:display/1 (or to the
%% commented-out format/2 helper further down) when swapped in.
%-define(PRINT(X), erlang:display(X)).
-define(PRINT(X), true).
%-define(P2(X), erlang:display(X)).
%-define(P2(X), erlang:display({cs(),X})).
-define(P2(X), true).
%-define(P1(X), erlang:display(X)).
-define(P1(X), true).
%-define(P(X), erlang:display(X)).
-define(P(X), true).
%-define(FORMAT(S, A), format(S, A)).
-define(FORMAT(S, A), ok).
%%% In certain places in the server, calling io:format hangs everything,
%%% so we'd better use erlang:display/1.
% format(S, A) ->
% erlang:display({format, cs(), S, A}),
% % io:format(S, A),
% ok.
% cs() ->
% {Big, Small, Tiny} = now(),
% (Small rem 100) * 100 + (Tiny div 10000).
%% Some notes on the internal structure:
%% One invariant is that the list of locker processes is keyed; i.e.,
%% there is only one process per neighboring node.
%% When an item has been stored in the process dictionary, it is not
%% necessarily cleared when not in use anymore. In other words, it's
%% not an error if there is already an item there when one is to be
%% stored.
%% This is the protocol version
%% Vsn 1 is the original protocol.
%% Vsn 2 is enhanced with code to take care of registration of names from
%% non erlang nodes, e.g. c-nodes.
%% Vsn 3 is enhanced with a tag in the synch messages to distinguish
%% different synch sessions from each other, see OTP-2766.
%% Note: This requires also that the ticket OTP-2928 is fixed on the nodes
%% running vsn 1 or 2; if such nodes will coexist with vsn 3 nodes.
%% Vsn 4 uses a single, permanent, locker process, but works like vsn 3
%% when communicating with vsn 3 nodes.
%% -define(vsn, 4). %% Now given in options
%%-----------------------------------------------------------------
%% connect_all = boolean() - true if we are supposed to set up a
%% fully connected net
%% known = [Node] - all nodes known to us
%% synced = [Node] - all nodes that have the same names as us
%% lockers = [{Node, MyLockerPid}] - the pid of the locker
%% process for each Node
%% syncers = [pid()] - all current syncers processes
%% node_name = atom() - our node name (can change if distribution
%% is started/stopped dynamically)
%%
%% In addition to these, we keep info about messages arrived in
%% the process dictionary:
%% {pre_connect, Node} = {Vsn, InitMsg} - init_connect msgs that
%% arrived before nodeup
%% {wait_lock, Node} = {exchange, NameList} | lock_is_set
%% - see comment below (handle_cast)
%% {save_ops, Node} = [operation()] - save the ops between
%% exchange and resolved
%% {prot_vsn, Node} = Vsn - the exchange protocol version
%% {sync_tag_my, Node} = My tag, used at synchronization with Node
%% {sync_tag_his, Node} = The Node's tag, used at synchronization
%%-----------------------------------------------------------------
%% Server state. connect_all, known, synced, lockers, syncers and node_name
%% are described in the comment block directly above.
%%   the_locker  - value returned by start_the_locker(self()) in init/1;
%%                 it is sent {nodeup, ...} messages when a node comes up.
%%   the_deleter - value returned by start_the_deleter(self()) in init/1;
%%                 an 'EXIT' from it stops the server (see handle_info/2).
-record(state, {connect_all, known = [], synced = [],
lockers = [], syncers = [], node_name = node(),
the_locker, the_deleter}).
%% Starts the name server as a gen_server registered locally as
%% global_name_server, using this module (global) as the callback module
%% with an empty argument list and no options. The caller is not linked.
start() ->
    ServerName = {local, global_name_server},
    gen_server:start(ServerName, global, [], []).
%% Same as start/0, but the started server is linked to the caller
%% (gen_server:start_link/4), which is the form a supervisor would use.
start_link() ->
    ServerName = {local, global_name_server},
    gen_server:start_link(ServerName, global, [], []).
%% Asks the local name server to stop and waits (without timeout) for the
%% reply. The matching handle_call(stop, ...) clause replies `stopped'.
stop() ->
    Request = stop,
    gen_server:call(global_name_server, Request, infinity).
%% Synchronises with the nodes selected by check_sync_nodes/0.
%% Returns {error, Reason} unchanged when the node check fails; otherwise
%% returns whatever the name server answers to the {sync, Nodes} call,
%% waiting for that answer without a timeout.
sync() ->
    case check_sync_nodes() of
        {error, _Reason} = Failure ->
            Failure;
        NodesToSync ->
            Request = {sync, NodesToSync},
            gen_server:call(global_name_server, Request, infinity)
    end.
%% Synchronises with the given Nodes after they have been vetted by
%% check_sync_nodes/1.
%% Returns {error, Reason} unchanged when the node check fails; otherwise
%% returns whatever the name server answers to the {sync, Nodes} call,
%% waiting for that answer without a timeout.
sync(Nodes) ->
    case check_sync_nodes(Nodes) of
        {error, _Reason} = Failure ->
            Failure;
        NodesToSync ->
            Request = {sync, NodesToSync},
            gen_server:call(global_name_server, Request, infinity)
    end.
%% Sends Msg to the process globally registered as Name.
%% Returns the pid the message was sent to.
%% Exits with {badarg, {Name, Msg}} when whereis_name/1 reports that Name
%% is not registered (`undefined').
send(Name, Msg) ->
    case whereis_name(Name) of
        %% is_pid/1 replaces the obsolete unprefixed pid/1 type test.
        Pid when is_pid(Pid) ->
            Pid ! Msg,
            Pid;
        undefined ->
            exit({badarg, {Name, Msg}})
    end.
%% See OTP-3737. (safe_whereis_name/1 is in fact not used anywhere in OTP.)
%% Looks up Name by delegating directly to where/1 (defined elsewhere in
%% this module). Unlike safe_whereis_name/1, this function itself makes no
%% call to the name server process.
whereis_name(Name) ->
where(Name).
%% Looks up Name by asking the local name server process with a
%% {whereis, Name} call, waiting for the answer without a timeout.
safe_whereis_name(Name) ->
    Request = {whereis, Name},
    gen_server:call(global_name_server, Request, infinity).
%% Tells the local name server that Node is gone by sending it a plain
%% {nodedown, Node} message (handled in handle_info/2).
%% Returns the message that was sent.
node_disconnected(Node) ->
    Notice = {nodedown, Node},
    global_name_server ! Notice.
%%-----------------------------------------------------------------
%% Method = function(Name, Pid1, Pid2) -> Pid1 | Pid2 | none
%% Method is called if a name conflict is detected when two nodes
%% are connecting to each other. It is supposed to return one of
%% the Pids or 'none'. If a pid is returned, that pid is
%% registered as Name on all nodes. If 'none' is returned, the
%% Name is unregistered on all nodes. If anything else is returned,
%% the Name is unregistered as well.
%% Method is called once at one of the nodes where the processes reside
%% only. If different Methods are used for the same name, it is
%% undefined which one of them is used.
%% Method is blocking, i.e. when it is called, no calls to whereis/send
%% are let through until it has returned.
%%-----------------------------------------------------------------
%% Registers Pid globally as Name using the default conflict resolver
%% {global, random_exit_name}. See register_name/3 for the return values.
%% is_pid/1 replaces the obsolete unprefixed pid/1 type test.
register_name(Name, Pid) when is_pid(Pid) ->
    register_name(Name, Pid, {global, random_exit_name}).
%% Registers Pid globally as Name, with Method as the conflict resolver.
%% The work runs inside trans_all_known/1, i.e. under global's own lock on
%% all nodes known to the name server.
%% Returns `yes' when Name was free and the {register, ...} request was
%% multi-called to all those nodes, `no' when where/1 already finds Name,
%% or whatever trans_all_known/1 returns when it gives up (`aborted').
%% The replies of the multi_call are not inspected.
%% is_pid/1 replaces the obsolete unprefixed pid/1 type test.
register_name(Name, Pid, Method) when is_pid(Pid) ->
    trans_all_known(
      fun(Nodes) ->
              case where(Name) of
                  undefined ->
                      gen_server:multi_call(Nodes,
                                            global_name_server,
                                            {register, Name, Pid, Method}),
                      yes;
                  _Pid ->
                      no
              end
      end).
%% Removes the global registration of Name.
%% Returns `ok' immediately when where/1 does not find Name. Otherwise the
%% {unregister, Name} request is multi-called to all known nodes inside
%% trans_all_known/1 and `ok' is returned (or trans_all_known/1's own
%% result if it gives up). Multi-call replies are not inspected.
unregister_name(Name) ->
    case where(Name) of
        undefined ->
            ok;
        _Registered ->
            Unregister =
                fun(Nodes) ->
                        Request = {unregister, Name},
                        gen_server:multi_call(Nodes, global_name_server, Request),
                        ok
                end,
            trans_all_known(Unregister)
    end.
%% Re-registers Name for Pid using the default conflict resolver
%% {global, random_exit_name}. See re_register_name/3.
%% is_pid/1 replaces the obsolete unprefixed pid/1 type test.
re_register_name(Name, Pid) when is_pid(Pid) ->
    re_register_name(Name, Pid, {global, random_exit_name}).
%% Registers Pid as Name without first checking whether Name is taken:
%% the {register, ...} request is multi-called to all known nodes
%% unconditionally, inside trans_all_known/1.
%% Returns `yes' (or trans_all_known/1's own result if it gives up).
%% Multi-call replies are not inspected.
%% is_pid/1 replaces the obsolete unprefixed pid/1 type test.
re_register_name(Name, Pid, Method) when is_pid(Pid) ->
    trans_all_known(
      fun(Nodes) ->
              gen_server:multi_call(Nodes,
                                    global_name_server,
                                    {register, Name, Pid, Method}),
              yes
      end).
%% Returns all globally registered names
%% Reads the whole global_names ets table and keeps only the first element
%% (the name) of each {Name, Pid, Method} entry. An entry of any other
%% shape makes the mapping fun fail with function_clause.
registered_names() ->
    Entries = ets:tab2list(global_names),
    NameOf = fun({Name, _Pid, _Method}) -> Name end,
    lists:map(NameOf, Entries).
%%-----------------------------------------------------------------
%% An external node (i.e not an erlang node) (un)registers a name.
%% If the registered Pid crashes the name is to be removed from global.
%% If the external node crashes the name is to be removed from global.
%% If the erlang node which registers the name crashes the name is also to be
%% removed, because the registered process is not supervised any more,
%% (i.e there is no link to the registered Pid).
%%-----------------------------------------------------------------
%% Registers Name for Pid on behalf of an external (non-Erlang) node using
%% the default conflict resolver {global, random_exit_name}.
%% See register_name_external/3.
%% is_pid/1 replaces the obsolete unprefixed pid/1 type test.
register_name_external(Name, Pid) when is_pid(Pid) ->
    register_name_external(Name, Pid, {global, random_exit_name}).
%% Registers Name for Pid on behalf of an external (non-Erlang) node.
%% Inside trans_all_known/1, when where/1 does not find Name, two requests
%% are multi-called to all known nodes in this order:
%%   1. {register, Name, Pid, Method}
%%   2. {register_ext, Name, Pid, node()}  (records this node as registrar)
%% Returns `yes' in that case, `no' when Name is already registered, or
%% trans_all_known/1's own result if it gives up. Multi-call replies are
%% not inspected.
%% is_pid/1 replaces the obsolete unprefixed pid/1 type test.
register_name_external(Name, Pid, Method) when is_pid(Pid) ->
    trans_all_known(
      fun(Nodes) ->
              case where(Name) of
                  undefined ->
                      gen_server:multi_call(Nodes,
                                            global_name_server,
                                            {register, Name, Pid, Method}),
                      gen_server:multi_call(Nodes,
                                            global_name_server,
                                            {register_ext, Name, Pid, node()}),
                      yes;
                  _Pid ->
                      no
              end
      end).
%% Removes a registration made through register_name_external/2,3.
%% Returns `ok' immediately when where/1 does not find Name. Otherwise,
%% inside trans_all_known/1, {unregister, Name} and then
%% {unregister_ext, Name} are multi-called to all known nodes and `ok' is
%% returned (or trans_all_known/1's own result if it gives up).
unregister_name_external(Name) ->
    case where(Name) of
        undefined ->
            ok;
        _Registered ->
            Unregister =
                fun(Nodes) ->
                        gen_server:multi_call(Nodes, global_name_server,
                                              {unregister, Name}),
                        gen_server:multi_call(Nodes, global_name_server,
                                              {unregister_ext, Name}),
                        ok
                end,
            trans_all_known(Unregister)
    end.
%%-----------------------------------------------------------------
%% Args: Id = id()
%% Nodes = [node()]
%% id() = {ResourceId, LockRequesterId}
%% Retries = infinity | int() > 0
%% Purpose: Sets a lock on the specified nodes (or all nodes if
%% none are specified) on ResourceId for LockRequesterId. If there
%% already exists a lock on ResourceId for another owner
%% than LockRequesterId, false is returned, otherwise true.
%% Returns: boolean()
%%-----------------------------------------------------------------
%% Sets the lock Id on this node and every node in nodes(), retrying
%% without limit (Retries = infinity), starting at attempt number 1.
set_lock(Id) ->
    EveryNode = [node() | nodes()],
    set_lock(Id, EveryNode, infinity, 1).
%% Sets the lock Id on the given Nodes, retrying without limit
%% (Retries = infinity), starting at attempt number 1.
set_lock(Id, Nodes) ->
    Retries = infinity,
    FirstAttempt = 1,
    set_lock(Id, Nodes, Retries, FirstAttempt).
%% set_lock(Id, Nodes, Retries)
%%   Retries = infinity | integer() > 0
%% Starts the lock attempt loop in set_lock/4 at attempt number 1.
%%
%% The guard insists on an integer. In Erlang's term order every atom
%% compares greater than every number, so a bare `Retries > 0' test is also
%% true for `infinity' and for any other non-number; that made the explicit
%% infinity clause unreachable and let malformed retry values through to
%% the retry loop. Anything other than a positive integer or `infinity'
%% now fails with function_clause.
set_lock(Id, Nodes, Retries) when is_integer(Retries), Retries > 0 ->
    set_lock(Id, Nodes, Retries, 1);
set_lock(Id, Nodes, infinity) ->
    set_lock(Id, Nodes, infinity, 1).
%% set_lock(Id, Nodes, Retries, Times) -> true | false
%%   Id      = {ResourceId, LockRequesterId}
%%   Retries = remaining attempts; 0 gives up with `false'
%%   Times   = attempt counter, handed to random_sleep/1 before a retry
%%
%% Multi-calls {set_lock, Id} to the name servers on Nodes and lets
%% check_replies/3 judge the replies (only the good replies are looked at;
%% the bad-node list from multi_call is ignored):
%%   true    - every reply was `true': the lock is held, return true.
%%   false   - refused: sleep via random_sleep(Times), then retry with
%%             dec(Retries) and Times+1.
%%   integer - sleep N*500 ms, then retry with Retries and Times unchanged.
%%   pid     - monitor that process and retry, with Retries and Times
%%             unchanged, once its 'DOWN' message arrives.
%% is_integer/1 and is_pid/1 replace the obsolete unprefixed integer/1 and
%% pid/1 type tests.
set_lock(_Id, _Nodes, 0, _) -> false;
set_lock({ResourceId, LockRequesterId}, Nodes, Retries, Times) ->
    Id = {ResourceId, LockRequesterId},
    Msg = {set_lock, Id},
    {Replies, _} =
        gen_server:multi_call(Nodes, global_name_server, Msg),
    ?P2({set_lock, node(), self(), {ResourceId, LockRequesterId},
         Nodes, Retries, Times, Replies, catch erlang:error(kaka)}),
    ?P({set_lock, node(), ResourceId,
        {LockRequesterId, node(LockRequesterId)}}),
    case check_replies(Replies, Id, Nodes) of
        true ->
            ?P({set_lock_true, node(), ResourceId}),
            true;
        false ->
            random_sleep(Times),
            set_lock(Id, Nodes, dec(Retries), Times+1);
        N when is_integer(N) ->
            ?P({sleeping, N}),
            timer:sleep(N*500),
            set_lock(Id, Nodes, Retries, Times);
        Pid when is_pid(Pid) ->
            ?P({waiting_for, Pid}),
            Ref = erlang:monitor(process, Pid),
            %% No timeout: wait until the monitored process is gone.
            receive
                {'DOWN', Ref, process, Pid, _Reason} ->
                    ?P({waited_for, Pid, _Reason}),
                    set_lock(Id, Nodes, Retries, Times)
            end
    end.
%% Walks the {Node, Status} replies of a {set_lock, Id} multi-call.
%% Returns `true' when every status is `true' (or the list is empty).
%% At the first status that is not `true', a {del_lock, Id} request is
%% multi-called to Nodes to undo the attempt and that status is returned;
%% the remaining replies are not examined.
check_replies([], _Id, _Nodes) ->
    true;
check_replies([{_Node, Verdict} | Remaining], Id, Nodes) ->
    case Verdict of
        true ->
            check_replies(Remaining, Id, Nodes);
        _Refused ->
            gen_server:multi_call(Nodes, global_name_server, {del_lock, Id}),
            Verdict
    end.
%% Releases the lock Id on this node and every node in nodes().
del_lock(Id) ->
    EveryNode = [node() | nodes()],
    del_lock(Id, EveryNode).
%% Releases the lock {ResourceId, LockRequesterId} on Nodes by
%% multi-calling {del_lock, Id} to their name servers.
%% Always returns `true'; the multi-call replies are not inspected.
del_lock({ResourceId, LockRequesterId}, Nodes) ->
    LockId = {ResourceId, LockRequesterId},
    ?P2({del_lock, node(), self(), ResourceId, LockRequesterId, Nodes}),
    Request = {del_lock, LockId},
    gen_server:multi_call(Nodes, global_name_server, Request),
    true.
%%-----------------------------------------------------------------
%% Args: Id = id()
%% Fun = fun() | {M,F}
%% Nodes = [node()]
%% Retries = infinity | int() > 0
%% Purpose: Sets a lock on Id (as set_lock), and evaluates
%% Res = Fun() on success.
%% Returns: Res | aborted (note, if Retries is infinity, the
%% transaction won't abort)
%%-----------------------------------------------------------------
%% Runs Fun under the lock Id taken on this node and every node in
%% nodes(), with unlimited lock retries. See trans/4.
trans(Id, Fun) ->
    EveryNode = [node() | nodes()],
    trans(Id, Fun, EveryNode, infinity).
%% Runs Fun under the lock Id taken on Nodes, with unlimited lock retries.
%% See trans/4.
trans(Id, Fun, Nodes) ->
    Retries = infinity,
    trans(Id, Fun, Nodes, Retries).
%% Runs Fun() while holding the lock Id on Nodes.
%%   Retries = 0 returns `aborted' at once, without touching any lock.
%% When set_lock/3 reports false the result is `aborted'. Otherwise Fun()
%% is evaluated under an old-style `catch', the lock is released with
%% del_lock/2, and then:
%%   - a caught {'EXIT', R} is re-raised as exit(R);
%%   - any other value (including a thrown term, which `catch' turns into
%%     a plain value) is returned as the result.
trans(_Id, _Fun, _Nodes, 0) ->
    aborted;
trans(Id, Fun, Nodes, Retries) ->
    case set_lock(Id, Nodes, Retries) of
        false ->
            aborted;
        true ->
            Outcome = (catch Fun()),
            %% Release the lock before deciding how to leave.
            del_lock(Id, Nodes),
            case Outcome of
                {'EXIT', Reason} -> exit(Reason);
                _ -> Outcome
            end
    end.
%%% Similar to trans(Id, Fun), but always uses global's own lock,
%%% on all nodes known to global, making sure that no new nodes have
%%% become known while we got the list of known nodes.
%% Runs F(Nodes) under global's own lock {global, self()} on this node plus
%% every node the name server reports for get_known.
%% The known list is fetched before and after taking the lock. If the
%% second list contains a node the first one lacked, the lock is released
%% on the first node set and the whole procedure starts over. Otherwise
%% F is applied to the second list under an old-style `catch', the lock is
%% released on that second list, and a caught {'EXIT', R} is re-raised as
%% exit(R) while any other value is returned.
%% Returns `aborted' if set_lock/2 reports false.
trans_all_known(F) ->
    Id = {global, self()},
    Before = [node() | gen_server:call(global_name_server, get_known)],
    case set_lock(Id, Before) of
        false ->
            aborted;
        true ->
            After = [node() | gen_server:call(global_name_server, get_known)],
            case After -- Before of
                [] ->
                    Outcome = (catch F(After)),
                    del_lock(Id, After),
                    case Outcome of
                        {'EXIT', Reason} -> exit(Reason);
                        _ -> Outcome
                    end;
                _NewlyKnown ->
                    %% The known set grew while we were locking: retry.
                    del_lock(Id, Before),
                    trans_all_known(F)
            end
    end.
%% Debugging aid: sends an `info' call to the local name server, whose
%% handle_call(info, ...) clause replies with its #state{} record.
%% Uses gen_server:call/2, i.e. the default call timeout applies.
info() ->
gen_server:call(global_name_server, info).
%%%-----------------------------------------------------------------
%%% Call-back functions from gen_server
%%%-----------------------------------------------------------------
%% gen_server init callback.
%% Traps exits, creates the three named, protected ets tables
%% (global_locks, global_names, global_names_ext), starts the locker and
%% then the deleter helper with this process as argument, and finally
%% decides connect_all: it is false only when the emulator was started
%% with exactly `-connect_all false', and true in every other case.
init([]) ->
    process_flag(trap_exit, true),
    TableOpts = [set, named_table, protected],
    ets:new(global_locks, TableOpts),
    ets:new(global_names, TableOpts),
    ets:new(global_names_ext, TableOpts),
    %% multi
    Server = self(),
    Locker = start_the_locker(Server),
    Deleter = start_the_deleter(Server),
    ConnectAll =
        case init:get_argument(connect_all) of
            {ok, [["false"]]} -> false;
            _ -> true
        end,
    {ok, #state{connect_all = ConnectAll,
                the_locker = Locker,
                the_deleter = Deleter}}.
%%-----------------------------------------------------------------
%% Connection algorithm
%% ====================
%% This alg solves the problem with partitioned nets as well.
%%
%% The main idea in the alg is that when two nodes connect, they
%% try to set a lock in their own partition (i.e. all nodes already
%% known to them). When the lock is set in each partition, these
%% two nodes send each other a list with all registered names in
%% resp partition(*). If no conflict is found, the name tables are
%% just updated. If a conflict is found, a resolve function is
%% called once for each conflict. The result of the resolving
%% is sent to the other node. When the names are exchanged, all
%% other nodes in each partition are informed of the other nodes,
%% and they ping each other to form a fully connected net.
%%
%% Here's the flow:
%% Suppose nodes A and B connect, and C is connected to A.
%%
%% Node A
%% ------
%% << {nodeup, B}
%% [spawn locker]
%% B ! {init_connect, MyLocker}
%% << {init_connect, MyLocker}
%% [The lockers try to set the lock]
%% << {lock_is_set, B}
%% [Now, lock is set in both partitions]
%% B ! {exchange, Names}
%% << {exchange, Names}
%% [solve conflict]
%% B ! {resolved, Resolved}
%% << {resolved, Resolved}
%% C ! {new_nodes, Resolved, [B]}
%%
%% Node C
%% ------
%% << {new_nodes, ResolvedOps, NewNodes}
%% [insert Ops]
%% ping(NewNodes)
%% << {nodeup, B}
%% <ignore this one>
%%
%% Several things can disturb this picture.
%%
%% First, the got_names message may arrive *before* the nodeup
%% message, due to delay in net_kernel and an optimisation in the
%% emulator. We handle this by keeping track of these messages in the
%% pre_connect and lockers variables in our state.
%%
%% The most common situation is when a new node connects to an
%% existing net. In this case there's no need to set the lock on
%% all nodes in the net, as we know that there won't be any conflict.
%% This is optimised by sending {first_contact, Node} instead of got_names.
%% This implies that first_contact may arrive before nodeup as well.
%%
%% Of course we must handle that some node goes down during the
%% connection.
%%
%% (*) When this information is being exchanged, no one is allowed
%% to change the global register table. All calls to register etc
%% are protected by a lock. If a registered process dies
%% during this phase, the deregistration is done as soon as possible
%% on each node (i.e. when the info about the process has arrived).
%%-----------------------------------------------------------------
%% Messages in the protocol
%% ========================
%% 1. Between connecting nodes (gen_server:casts)
%% {init_connect, Vsn, Node, InitMsg}
%% InitMsg = {locker, LockerPid}
%% {exchange, Node, ListOfNames}
%% {resolved, Node, Ops, Known}
%% Known = list of nodes in Node's partition
%% 2. Between lockers on connecting nodes (!s)
%% {his_locker, Pid} (from our global)
%% lockers link to each other
%% {lock, Bool} loop until both lockers have lock = true,
%% then send to global {lock_is_set, Node}
%% 3. From connecting node to other nodes in the partition
%% {new_nodes, Node, Ops, NewNodes}
%% 4. sync protocol
%% {in_sync, Node, IsKnown}
%% - sent by each node to all new nodes
%%-----------------------------------------------------------------
%% gen_server call callback. One clause per request; there is no catch-all,
%% so an unknown request crashes the server with function_clause.
%%
%% {whereis, Name}: hands the lookup and the caller's From to
%% do_whereis/2 and returns noreply, so the answer is presumably sent from
%% there -- confirm in do_whereis/2.
handle_call({whereis, Name}, From, State) ->
    do_whereis(Name, From),
    {noreply, State};
%% {register, ...}: stores the name locally via ins_name/3.
handle_call({register, Name, Pid, Method}, _From, State) ->
    ?P2({register, node(), Name}),
    ins_name(Name, Pid, Method),
    {reply, yes, State};
%% {unregister, Name}: drops the name from global_names, if present, and
%% passes the pid it was registered for to dounlink/1.
handle_call({unregister, Name}, _From, State) ->
    case ets:lookup(global_names, Name) of
        [{_, Pid, _}] ->
            ?P2({unregister, node(), Name, Pid, node(Pid)}),
            ets:delete(global_names, Name),
            dounlink(Pid);
        _ ->
            ok
    end,
    {reply, ok, State};
%% External-node bookkeeping (see register_name_external/3).
handle_call({register_ext, Name, Pid, RegNode}, _F, State) ->
    ins_name_ext(Name, Pid, RegNode),
    {reply, yes, State};
handle_call({unregister_ext, Name}, _From, State) ->
    ets:delete(global_names_ext, Name),
    {reply, ok, State};
%% Lock requests: the calling pid is taken from the From tuple and passed
%% on together with the lock id.
handle_call({set_lock, Lock}, {Pid, _Tag}, State) ->
    Granted = handle_set_lock(Lock, Pid),
    {reply, Granted, State};
handle_call({del_lock, Lock}, {Pid, _Tag}, State) ->
    handle_del_lock(Lock, Pid),
    {reply, true, State};
handle_call(get_known, _From, State) ->
    {reply, State#state.known, State};
%% R7 may call us?
handle_call(get_known_v2, _From, State) ->
    {reply, State#state.known, State};
handle_call({sync, Nodes}, From, State) ->
    %% If we have several global groups, this won't work, since we will
    %% do start_sync on a nonempty list of nodes even if the system
    %% is quiet.
    Unsynced = lists:delete(node(), Nodes) -- State#state.synced,
    Syncer = start_sync(Unsynced, From),
    {noreply, State#state{syncers = [Syncer | State#state.syncers]}};
%% ?vsn is not defined in this file; it is supplied as a compile option.
handle_call(get_protocol_version, _From, State) ->
    {reply, ?vsn, State};
handle_call(get_names_ext, _From, State) ->
    {reply, get_names_ext(), State};
%% info: replies with the complete state record (used by info/0).
handle_call(info, _From, State) ->
    {reply, State, State};
handle_call(stop, _From, State) ->
    {stop, normal, stopped, State}.
%%=======================================================================================
%% init_connect
%%
%% Vsn 1 is the original protocol.
%% Vsn 2 is enhanced with code to take care of registration of names from
%% non erlang nodes, e.g. c-nodes.
%% Vsn 3 is enhanced with a tag in the synch messages to distinguish
%% different synch sessions from each other, see OTP-2766.
%% Note: This requires also that the ticket OTP-2928 is fixed on the nodes
%% running vsn 1 or 2; if such nodes will coexist with vsn 3 nodes.
%%=======================================================================================
%% gen_server cast callback: the node-to-node connection protocol plus the
%% asynchronous deletion requests. There is no catch-all clause, so an
%% unknown cast crashes the server with function_clause.
%%
%% Per-node session data lives in the process dictionary under
%% {sync_tag_my, Node}, {prot_vsn, Node} and {wait_lock, Node}. The
%% lock_is_set, exchange and resolved clauses all follow the same scheme:
%% the tag carried by the message must equal our stored tag for Node;
%%   - tag matches but no protocol version is stored: log and ignore,
%%   - tag matches and a version is stored: act on the message,
%%   - anything else: treat as a stale session and cancel the locker.
%%
%% {init_connect, Vsn, Node, InitMsg}: work out which protocol version to
%% speak with Node and hand over to init_connect/6, whose return value is
%% not used. ?vsn is supplied as a compile option.
handle_cast({init_connect, Vsn, Node, InitMsg}, S) ->
    ?FORMAT("~p #### init_connect Vsn ~p, Node ~p, InitMsg ~p~n",[node(), Vsn, Node, InitMsg]),
    case Vsn of
        %% It is always the responsibility of newer versions to understand
        %% older versions of the protocol.
        {HisVsn, HisTag} when HisVsn > ?vsn ->
            init_connect(?vsn, Node, InitMsg, HisTag, S#state.lockers, S);
        {HisVsn, HisTag} ->
            init_connect(HisVsn, Node, InitMsg, HisTag, S#state.lockers, S);
        %% To be future compatible: a longer tuple whose first two elements
        %% are the version and the tag. (is_tuple/1 replaces the obsolete
        %% unprefixed tuple/1 type test.)
        Tuple when is_tuple(Tuple) ->
            List = tuple_to_list(Tuple),
            [_HisVsn, HisTag | _] = List,
            %% use own version handling if his is newer.
            init_connect(?vsn, Node, InitMsg, HisTag, S#state.lockers, S);
        %% Untagged version numbers below 3.
        _ when Vsn < 3 ->
            init_connect(Vsn, Node, InitMsg, undef, S#state.lockers, S);
        _ ->
            Txt = io_lib:format("Illegal global protocol version ~p Node: ~p",[Vsn, Node]),
            error_logger:info_report(lists:flatten(Txt))
    end,
    {noreply, S};

%%=======================================================================================
%% lock_is_set
%%
%% Ok, the lock is now set on both partitions. Send our names to other node.
%%=======================================================================================
handle_cast({lock_is_set, Node, MyTag}, S) ->
    ?FORMAT("~p #### lock_is_set Node ~p~n",[node(), Node]),
    Sync_tag_my = get({sync_tag_my, Node}),
    PVsn = get({prot_vsn, Node}),
    ?P2({lock_is_set, node(), Node, {MyTag, PVsn}, Sync_tag_my}),
    case {MyTag, PVsn} of
        {Sync_tag_my, undefined} ->
            %% Patch for otp-2728: when the connection to Node flips up and
            %% down, messages from an older sync attempt can disturb the
            %% new one. Therefore all messages are discarded while the
            %% protocol version is not defined.
            Txt = io_lib:format("undefined global protocol version Node: ~p",[Node]),
            error_logger:info_report(lists:flatten(Txt)),
            {noreply, S};
        {Sync_tag_my, _} ->
            %% Check that the Node is still not known
            case lists:member(Node, S#state.known) of
                false ->
                    ?P2({lset, node(), Node, false}),
                    lock_is_set(Node, S#state.known),
                    {noreply, S};
                true ->
                    ?P2({lset, node(), Node, true}),
                    erase({wait_lock, Node}),
                    NewS = cancel_locker(Node, S),
                    {noreply, NewS}
            end;
        _ ->
            ?P2({lset, illegal, node(), Node}),
            %% Illegal tag, delete the locker.
            erase({wait_lock, Node}),
            NewS = cancel_locker(Node, S),
            {noreply, NewS}
    end;

%%=======================================================================================
%% exchange
%%
%% Here the names are checked to detect name clashes.
%%=======================================================================================
%% Vsn 3 of the protocol
handle_cast({exchange, Node, NameList, NameExtList, MyTag}, S) ->
    ?FORMAT("~p #### handle_cast 3 lock_is_set exchange ~p~n",
            [node(),{Node, NameList, NameExtList, MyTag}]),
    Sync_tag_my = get({sync_tag_my, Node}),
    PVsn = get({prot_vsn, Node}),
    case {MyTag, PVsn} of
        {Sync_tag_my, undefined} ->
            %% Patch for otp-2728: stale messages from an older sync attempt
            %% are discarded while the protocol version is not defined.
            Txt = lists:flatten(io_lib:format(
                                  "undefined global protocol version Node: ~p",[Node])),
            error_logger:info_report(Txt),
            {noreply, S};
        {Sync_tag_my, _} ->
            exchange(PVsn, Node, {NameList, NameExtList}, S#state.known),
            {noreply, S};
        _ ->
            %% Illegal tag, delete the locker.
            erase({wait_lock, Node}),
            NewS = cancel_locker(Node, S),
            {noreply, NewS}
    end;

%%=======================================================================================
%% resolved
%%
%% Here the name clashes are resolved.
%%=======================================================================================
%% Vsn 3 of the protocol
handle_cast({resolved, Node, Resolved, HisKnown, _HisKnown_v2, Names_ext, MyTag}, S) ->
    ?FORMAT("~p #### 2 resolved ~p~n",[node(),{Node, Resolved, HisKnown, Names_ext}]),
    Sync_tag_my = get({sync_tag_my, Node}),
    PVsn = get({prot_vsn, Node}),
    case {MyTag, PVsn} of
        {Sync_tag_my, undefined} ->
            %% Patch for otp-2728: stale messages from an older sync attempt
            %% are discarded while the protocol version is not defined.
            Txt = lists:flatten(io_lib:format(
                                  "undefined global protocol version Node: ~p",[Node])),
            error_logger:info_report(Txt),
            {noreply, S};
        {Sync_tag_my, _} ->
            %% The v2 known-list in the message is ignored; HisKnown is
            %% passed in both positions of the pair.
            NewS = resolved(Node, Resolved, {HisKnown, HisKnown}, Names_ext, S),
            {noreply, NewS};
        _ ->
            %% Illegal tag, delete the locker.
            erase({wait_lock, Node}),
            NewS = cancel_locker(Node, S),
            {noreply, NewS}
    end;

%%=======================================================================================
%% new_nodes
%%
%% We get to know the other node's known nodes.
%%=======================================================================================
%% Vsn 2 and 3 of the protocol
handle_cast({new_nodes, _Node, Ops, Names_ext, Nodes, _Nodes_v2}, S) ->
    ?P2({new_nodes, node(), Nodes}),
    ?FORMAT("~p #### 2 new_nodes ~p~n",[node(),{Ops, Names_ext, Nodes}]),
    NewS = new_nodes(Ops, Names_ext, Nodes, S),
    {noreply, NewS};

%%=======================================================================================
%% in_sync
%%
%% We are in sync with this node (from the other node's known world).
%%=======================================================================================
handle_cast({in_sync, Node, IsKnown}, S) ->
    ?FORMAT("~p #### in_sync ~p~n",[node(),{Node, IsKnown}]),
    %% Tell every pending syncer process that Node is now synced.
    lists:foreach(fun(Pid) -> Pid ! {synced, [Node]} end, S#state.syncers),
    %% moved up:
    NewS = cancel_locker(Node, S),
    %% The session with Node is over: drop its per-session entries.
    erase({wait_lock, Node}),
    erase({pre_connect, Node}),
    erase({sync_tag_my, Node}),
    erase({sync_tag_his, Node}),
    %% If Node is new to us and the sender asked for it (IsKnown == true),
    %% answer with our own in_sync (flagged false so it is not echoed back
    %% again) and add Node to the known list.
    NKnown = case lists:member(Node, Known = NewS#state.known) of
                 false when IsKnown == true ->
                     gen_server:cast({global_name_server, Node},
                                     {in_sync, node(), false}),
                     [Node | Known];
                 _ ->
                     Known
             end,
    NSynced = case lists:member(Node, Synced = NewS#state.synced) of
                  true -> Synced;
                  false -> [Node | Synced]
              end,
    {noreply, NewS#state{known = NKnown, synced = NSynced}};

%% Called when Pid on other node crashed
handle_cast({async_del_name, Name, Pid}, S) ->
    ?P2({async_del_name, node(), Name, Pid, node(Pid)}),
    %% Only delete the name if it is still registered for that very Pid.
    case ets:lookup(global_names, Name) of
        [{Name, Pid, _}] ->
            ets:delete(global_names, Name),
            dounlink(Pid);
        _ -> ok
    end,
    ets:delete(global_names_ext, Name),
    {noreply, S};

%% The resource id in the message is ignored: the whole lock table is
%% handed to del_locks2/2 together with Pid.
handle_cast({async_del_lock, _ResourceId, Pid}, S) ->
    del_locks2(ets:tab2list(global_locks), Pid),
    % ets:match_delete(global_locks, {ResourceId, '_', Pid}),
    {noreply, S}.
%% gen_server callback for messages that are neither calls nor casts.
%%
%% Messages handled here:
%%   {'EXIT', Pid, Reason}        - a linked process terminated.
%%   {nodedown, Node}             - a node went down, or (when Node is our
%%                                  own recorded name) distribution stopped.
%%   {nodeup, Node}               - a node came up, or (when Node == node())
%%                                  distribution was started dynamically.
%%   {test_vsn_tag_nodeup, Node}  - test hook duplicating the nodeup logic.
%%   {whereis, Name, From}        - a deferred whereis request.
%%   known                        - debugging aid, prints the known nodes.
%% Any other message is dropped.
handle_info({'EXIT', Deleter, _Reason}=Exit, #state{the_deleter=Deleter}=S) ->
    %% The deleter process exited: stop the server with a descriptive reason.
    {stop, {deleter_died,Exit}, S#state{the_deleter=undefined}};
handle_info({'EXIT', Pid, _Reason}, #state{the_deleter=Deleter}=S)
  when is_pid(Pid) ->
    ?P2({global, exit, node(), Pid, node(Pid)}),
    %% Remove the names and locks that belonged to Pid, then forget it as
    %% a syncer or locker.
    check_exit(Deleter, Pid),
    Syncers = lists:delete(Pid, S#state.syncers),
    Lockers = lists:keydelete(Pid, 2, S#state.lockers),
    ?PRINT({exit, Pid, lockers, node(), S#state.lockers}),
    {noreply, S#state{syncers = Syncers, lockers = Lockers}};
handle_info({nodedown, Node}, S) when Node == S#state.node_name ->
    %% Somebody stopped the distribution dynamically - change
    %% references to old node name (Node) to new node name ('nonode@nohost')
    {noreply, change_our_node_name(node(), S)};
handle_info({nodedown, Node}, S) ->
    ?FORMAT("~p #### nodedown 1 ####### Node ~p",[node(),Node]),
    %% Drop the names and locks owned by Node first, then cancel any
    %% ongoing synchronisation with it and clear its per-node bookkeeping
    %% from the process dictionary.
    do_node_down(Node),
    #state{known = Known, synced = Syncs} = S,
    NewS = cancel_locker(Node, S),
    erase({wait_lock, Node}),
    erase({save_ops, Node}),
    erase({pre_connect, Node}),
    erase({prot_vsn, Node}),
    erase({sync_tag_my, Node}),
    erase({sync_tag_his, Node}),
    {noreply, NewS#state{known = lists:delete(Node, Known),
                         synced = lists:delete(Node, Syncs)}};
handle_info({nodeup, Node}, S) when Node == node() ->
    ?FORMAT("~p #### nodeup S ####### Node ~p~n",[node(), Node]),
    %% Somebody started the distribution dynamically - change
    %% references to old node name ('nonode@nohost') to Node.
    {noreply, change_our_node_name(Node, S)};
handle_info({nodeup, Node}, S) when S#state.connect_all == true ->
    ?FORMAT("~p #### nodeup 1 ####### Node ~p",[node(),Node]),
    IsKnown = lists:member(Node, S#state.known) or
        %% This one is only for double nodeups (shouldn't occur!)
        lists:keymember(Node, 1, S#state.lockers),
    case IsKnown of
        true ->
            {noreply, S};
        false ->
            %% now() is used as a tag to separate different synch sessions
            %% from each other. Global could be confused at bursty nodeups
            %% because it couldn't separate the messages between the
            %% different synch sessions started by a nodeup.
            MyTag = now(),
            resend_pre_connect(Node),
            %% multi
            S#state.the_locker ! {nodeup, Node, S#state.known, MyTag, self()},
            Pid = start_locker(Node, S#state.known, MyTag, self(), S#state.the_locker),
            Ls = S#state.lockers,
            InitC = {init_connect, {?vsn, MyTag}, node(), {locker, Pid, S#state.known}},
            ?P2({putting, MyTag}),
            put({sync_tag_my, Node}, MyTag),
            gen_server:cast({global_name_server, Node}, InitC),
            {noreply, S#state{lockers = [{Node, Pid} | Ls]}}
    end;
%% This message is only to test otp-2766 Global may be confused at bursty
%% nodeup/nodedowns. It's a copy of the complex part of the handling of
%% the 'nodeup' message.
handle_info({test_vsn_tag_nodeup, Node}, S) when S#state.connect_all == true,
                                                 Node == node() ->
    {noreply, S};
handle_info({test_vsn_tag_nodeup, Node}, S) when S#state.connect_all == true ->
    ?FORMAT("~p #### test_nodeup 1 ####### Node ~p~n",[node(), Node]),
    MyTag = now(),
    resend_pre_connect(Node),
    S#state.the_locker ! {nodeup, Node, S#state.known, MyTag, self()},
    Pid = start_locker(Node, S#state.known, MyTag, self(), S#state.the_locker),
    Ls = S#state.lockers,
    InitC = {init_connect, {?vsn, MyTag}, node(), {locker, Pid, S#state.known}},
    put({sync_tag_my, Node}, MyTag),
    gen_server:cast({global_name_server, Node}, InitC),
    ?PRINT({lockers, node(), Ls}),
    {noreply, S#state{lockers = [{Node, Pid} | Ls]}};
handle_info({whereis, Name, From}, S) ->
    %% A whereis request (see do_whereis/2, which may schedule this message
    %% again while the global lock is held).
    do_whereis(Name, From),
    {noreply, S};
handle_info(known, S) ->
    io:format(">>>> ~p~n",[S#state.known]),
    {noreply, S};
handle_info(_, S) ->
    {noreply, S}.
%%=======================================================================================
%%=======================================================================================
%%=============================== Internal Functions ====================================
%%=======================================================================================
%%=======================================================================================
%%=======================================================================================
%% init_connect/6: another node wants to synchronize its registered names
%% with us. Both nodes must hold a lock before they are allowed to continue.
%%
%% The peer's protocol version and sync tag are remembered in the process
%% dictionary. If we already have a locker for Node, the two sides' lockers
%% are introduced to each other (how depends on Vsn and on the shape of
%% InitMsg). Otherwise the request is parked under {pre_connect, Node};
%% resend_pre_connect/1 replays it later.
%%=======================================================================================
init_connect(Vsn, Node, InitMsg, HisTag, Lockers, S) ->
    ?P2({init_connect, node(), Node}),
    ?FORMAT("~p #### init_connect Vsn, Node, InitMsg ~p~n",[node(),{Vsn, Node, InitMsg}]),
    %% It is always the responsibility of newer versions to understand
    %% older versions of the protocol.
    put({prot_vsn, Node}, Vsn),
    put({sync_tag_his, Node}, HisTag),
    OurLocker = lists:keysearch(Node, 1, Lockers),
    if
        Vsn =< 3 ->
            case OurLocker of
                false ->
                    ?PRINT({init_connect11, node(), self(), Node}),
                    put({pre_connect, Node}, {Vsn, InitMsg, HisTag});
                {value, {_Node, MyLocker}} ->
                    %% Both sides have lockers; let them set the lock.
                    case InitMsg of
                        {locker, HisLocker, HisKnown} -> %% current version
                            ?PRINT({init_connect1, node(), self(), Node,
                                    MyLocker, HisLocker}),
                            MyLocker ! {his_locker, HisLocker, HisKnown};
                        {locker, _HisLocker, HisKnown, HisTheLocker} -> %% multi
                            ?PRINT({init_connect1, node(), self(), Node,
                                    MyLocker, _HisLocker}),
                            S#state.the_locker ! {his_the_locker, HisTheLocker,
                                                  HisKnown, S#state.known}
                    end
            end;
        true -> % Vsn > 3
            ?P2(vsn4),
            case OurLocker of
                false ->
                    ?PRINT({init_connect11, node(), self(), Node}),
                    put({pre_connect, Node}, {Vsn, InitMsg, HisTag});
                {value, {_Node, _MyLocker}} ->
                    %% Both sides have lockers; let them set the lock.
                    case InitMsg of
                        {locker, HisLocker, HisKnown} -> %% current version
                            ?PRINT({init_connect1, node(), self(), Node,
                                    _MyLocker, HisLocker}),
                            HisLocker ! {his_locker_new, S#state.the_locker,
                                         {HisKnown, S#state.known}};
                        {locker, _HisLocker, HisKnown, HisTheLocker} -> %% multi
                            ?PRINT({init_connect1, node(), self(), Node,
                                    _MyLocker, _HisLocker}),
                            S#state.the_locker ! {his_the_locker, HisTheLocker,
                                                  HisKnown, S#state.known}
                    end
            end
    end.
%%=======================================================================================
%% lock_is_set/2: our locker reports that the lock towards Node is set.
%%
%% Normally lock_is_set arrives before the peer's exchange message, but the
%% order may be reversed. The process dictionary key {wait_lock, Node}
%% records whichever arrived first:
%%   - undefined                            : nothing yet; store lock_is_set.
%%   - {exchange, NameList, NameExtList}    : the exchange came first; mark
%%                                            the lock as set and run the
%%                                            exchange now.
%% In both cases our own names are first sent to Node as an exchange cast.
%%=======================================================================================
lock_is_set(Node, Known) ->
    ?FORMAT("~p #### lock_is_set ~p~n",[node(),{Node, Node, Known}]),
    PVsn = get({prot_vsn, Node}),
    %% Protocol version 3 and higher: the peer's sync tag goes along.
    gen_server:cast({global_name_server, Node},
                    {exchange, node(), get_names(), get_names_ext(),
                     get({sync_tag_his, Node})}),
    %% If both have the lock, continue with exchange
    WaitKey = {wait_lock, Node},
    case get(WaitKey) of
        undefined ->
            put(WaitKey, lock_is_set);
        {exchange, NameList, NameExtList} ->
            %% vsn 2, 3
            put(WaitKey, lock_is_set),
            exchange(PVsn, Node, {NameList, NameExtList}, Known)
    end.
%%=======================================================================================
%% exchange/4 (protocol version 3 and higher)
%%
%% Handles the name list received from Node. If our lock is already set
%% ({wait_lock, Node} holds lock_is_set) the names are merged with
%% exchange_names/4, the resulting local operations are saved under
%% {save_ops, Node}, and the resolved operations are cast back to Node.
%% If the lock is not yet set, the exchange data is stored under
%% {wait_lock, Node} so that lock_is_set/2 can pick it up.
%%=======================================================================================
exchange(_Vsn, Node, {NameList, NameExtList}, Known) ->
    ?FORMAT("~p #### 3 lock_is_set exchange ~p~n",[node(),{Node, NameList, NameExtList}]),
    WaitKey = {wait_lock, Node},
    case erase(WaitKey) of
        undefined ->
            put(WaitKey, {exchange, NameList, NameExtList});
        lock_is_set ->
            {Ops, Resolved} = exchange_names(NameList, Node, [], []),
            put({save_ops, Node}, Ops),
            ResolvedMsg = {resolved, node(), Resolved, Known,
                           Known, get_names_ext(), get({sync_tag_his, Node})},
            gen_server:cast({global_name_server, Node}, ResolvedMsg)
    end.
%% resolved/5: Node has sent us the operations it resolved during the
%% name exchange.
%%
%% Applies our saved operations followed by Node's, forwards them to the
%% nodes we already know (new_nodes), tells waiting syncers that we are
%% synced with Node, announces ourselves to the nodes Node knew (in_sync),
%% cancels the lockers for Node and its known nodes, and returns the state
%% with `known' and `synced' extended.
resolved(Node, Resolved, {HisKnown, _HisKnown_v2}, Names_ext, S) ->
    ?P2({resolved, node(), Node, S#state.known}),
    ?FORMAT("~p #### 2 resolved ~p~n",[node(),{Node, Resolved, HisKnown, Names_ext}]),
    erase({prot_vsn, Node}),
    Ops = erase({save_ops, Node}) ++ Resolved,
    Known = S#state.known,
    Synced = S#state.synced,
    NewNodes = [Node | HisKnown],
    do_ops(Ops),
    do_ops_ext(Ops,Names_ext),
    NewNodesMsg = {new_nodes, node(), Ops, Names_ext, NewNodes, NewNodes},
    gen_server:abcast(Known, global_name_server, NewNodesMsg),
    %% I am synced with Node, but not with HisKnown yet
    NotifySyncer = fun(Pid) -> Pid ! {synced, [Node]} end,
    lists:foreach(NotifySyncer, S#state.syncers),
    gen_server:abcast(HisKnown, global_name_server, {in_sync, node(), true}),
    CancelOne = fun(Node1, S1) -> cancel_locker(Node1, S1) end,
    NewS = lists:foldl(CancelOne, S, NewNodes),
    %% See the (*) note in new_nodes/4; we are node b in that description.
    NewKnown = Known ++ (NewNodes -- Known),
    NewS#state{known = NewKnown, synced = [Node | Synced]}.
%% new_nodes/4: a node we are synced with has learnt about further nodes.
%%
%% Applies the accompanying operations, sends in_sync to every node in
%% Nodes that we did not know before (ourselves excluded) and returns the
%% state with those nodes appended to `known'.
new_nodes(Ops, Names_ext, Nodes, S) ->
    ?FORMAT("~p #### 2 new_nodes ~p~n",[node(),{Ops, Names_ext, Nodes}]),
    do_ops(Ops),
    do_ops_ext(Ops,Names_ext),
    AlreadyKnown = S#state.known,
    %% (*) This one requires some thought...
    %% We're node a, other nodes b and c:
    %% The problem is that {in_sync, a} may arrive before {resolved, [a]} to
    %% b from c, leading to b sending {new_nodes, [a]} to us (node a).
    %% Therefore, we make sure we never get duplicates in Known.
    Unknown = Nodes -- AlreadyKnown,
    NewNodes = lists:delete(node(), Unknown),
    gen_server:abcast(NewNodes, global_name_server, {in_sync, node(), true}),
    S#state{known = AlreadyKnown ++ NewNodes}.
%% Answer a whereis request for Name on behalf of caller From.
%% While the `global' lock is set the request is not answered; it is
%% scheduled to be delivered to this server again via send_again/1.
do_whereis(Name, From) ->
    case is_lock_set(global) of
        true ->
            send_again({whereis, Name, From});
        false ->
            gen_server:reply(From, where(Name))
    end.
%% gen_server callback: drop the three ETS tables used by this server.
terminate(_Reason, _S) ->
    lists:foreach(fun(Table) -> ets:delete(Table) end,
                  [global_names, global_names_ext]),
    ets:delete(global_locks).
%% gen_server callback: no state conversion is performed on code upgrade.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
%% Resend a parked init_connect to ourselves.
%% If init_connect/6 stored a request under {pre_connect, Node}, remove it
%% from the process dictionary and cast it to this process again;
%% otherwise do nothing.
resend_pre_connect(Node) ->
    Parked = erase({pre_connect, Node}),
    case Parked of
        {Vsn, InitMsg, HisTag} ->
            %% Vsn 3
            ?PRINT({resend_pre_connect3, node(), self(), Node}),
            gen_server:cast(self(), {init_connect, {Vsn, HisTag}, Node, InitMsg});
        _ ->
            ?PRINT({resend_pre_connect0, node(), self(), Node}),
            ok
    end.
%% Register Name -> Pid (with resolve Method) in global_names.
%% A previously registered pid is handed to dounlink/1 first; the new pid
%% is linked via dolink/1 (which only links to local pids).
ins_name(Name, Pid, Method) ->
    case ets:lookup(global_names, Name) of
        [] ->
            ok;
        [{Name, OldPid, _}] ->
            dounlink(OldPid)
    end,
    dolink(Pid),
    ets:insert(global_names, {Name, Pid, Method}).
%% Register an externally registered Name -> Pid in global_names_ext,
%% remembering the node (RegNode) that performed the registration.
%% A previously stored pid is handed to dounlink/1; the new pid is linked
%% via dolink_ext/2 (only when RegNode is this node).
ins_name_ext(Name, Pid, RegNode) ->
    case ets:lookup(global_names_ext, Name) of
        [] ->
            ok;
        [{Name, OldPid, _}] ->
            dounlink(OldPid)
    end,
    dolink_ext(Pid, RegNode),
    ets:insert(global_names_ext, {Name, Pid, RegNode}).
%% Look up Name in global_names.
%% Returns the registered pid, or `undefined' when the name is not present.
where(Name) ->
    case ets:lookup(global_names, Name) of
        [] -> undefined;
        [{_, Pid, _}] -> Pid
    end.
%% Try to set the lock ResourceId for LockRequesterId on behalf of Pid.
%%
%% Returns true when the lock is now held by LockRequesterId: either it was
%% free, or the same requester already held it (Pid is then added to the
%% lock's pid list if it is not there yet). Returns false when a different
%% requester holds the lock.
%%
%% The former special case for the `global' resource only emitted a debug
%% trace (which referred to a variable not bound in this function) and
%% returned false like every other resource, so it has been folded away.
handle_set_lock({ResourceId, LockRequesterId}, Pid) ->
    case ets:lookup(global_locks, ResourceId) of
        [{ResourceId, LockRequesterId, Pids}] ->
            case lists:member(Pid, Pids) of
                true ->
                    true;
                false ->
                    dolink(Pid),
                    ets:insert(global_locks, {ResourceId, LockRequesterId, [Pid | Pids]}),
                    true
            end;
        [{ResourceId, _OtherRequesterId, _OtherPids}] ->
            %% Held by somebody else.
            false;
        [] ->
            dolink(Pid),
            ets:insert(global_locks, {ResourceId, LockRequesterId, [Pid]}),
            true
    end.
%% true if there is an entry for ResourceId in global_locks, else false.
is_lock_set(ResourceId) ->
    case ets:lookup(global_locks, ResourceId) of
        [] -> false;
        [_Lock] -> true
    end.
%% Release Pid's share of the lock ResourceId held by LockRequesterId.
%% When Pid is the only holder the whole lock entry is deleted; otherwise
%% Pid is removed from the entry's pid list. Locks held by another
%% requester (or not held at all) are left untouched.
handle_del_lock({ResourceId, LockRequesterId}, Pid) ->
    case ets:lookup(global_locks, ResourceId) of
        [{ResourceId, LockRequesterId, [Pid]}] ->
            ets:delete(global_locks, ResourceId),
            dounlink(Pid);
        [{ResourceId, LockRequesterId, Pids}] ->
            Remaining = lists:delete(Pid, Pids),
            ets:insert(global_locks, {ResourceId, LockRequesterId, Remaining}),
            dounlink(Pid);
        _ ->
            ok
    end.
%% Apply a list of name operations to global_names.
%%   {insert, Item} - Item is inserted as is.
%%   {delete, Name} - the entry (if any) is deleted and its pid is handed
%%                    to dounlink/1.
do_ops(Ops) ->
    ApplyOp =
        fun({insert, Item}) ->
                ets:insert(global_names, Item);
           ({delete, Name}) ->
                case ets:lookup(global_names, Name) of
                    [] ->
                        ok;
                    [{Name, Pid, _}] ->
                        ?P2({do_ops_delete, node(), Name, Pid, node(Pid)}),
                        ets:delete(global_names, Name),
                        dounlink(Pid)
                end
        end,
    lists:foreach(ApplyOp, Ops).
%% Apply a list of name operations to global_names_ext.
%% An inserted name is only stored here when Names_ext lists that same
%% Name/Pid pair as externally registered; a deleted name is always removed
%% from global_names_ext.
do_ops_ext(Ops, Names_ext) ->
    ApplyOp =
        fun({delete, Name}) ->
                ets:delete(global_names_ext, Name);
           ({insert, {Name, Pid, _Method}}) ->
                case lists:keysearch(Name, 1, Names_ext) of
                    {value, {Name, Pid, RegNode}} ->
                        ets:insert(global_names_ext, {Name, Pid, RegNode});
                    _ ->
                        ok
                end
        end,
    lists:foreach(ApplyOp, Ops).
%%-----------------------------------------------------------------
%% A locker is a process spawned by global_name_server when a
%% nodeup is received from a new node. Its purpose is to try to
%% set a lock in our partition, i.e. on all nodes known to us.
%% When the lock is set, it tells global about it, and keeps
%% the lock set. global sends a cancel message to the locker when
%% the partitions are connected.
%% Versions: at version 2, the messages exchanged between the lockers
%% include the known nodes (see OTP-3576). There is no way of knowing
%% the version number of the other side's locker when sending a message
%% to it, so we send both version 1 and 2, and flush the version 1 if
%% we receive version 2.
%%
%% Due to a mistake, an intermediate version of the new locking protocol
%% (using 3-tuples) went out in R7, which only understands itself. This patch
%% to R7 handles all kinds, which means sending all, and flush the ones we
%% don't want. (It will remain difficult to make a future version of the
%% protocol communicate with this one.)
%%
%%-----------------------------------------------------------------
%% (Version 2 in patched R7. No named version in R6 and older - let's call that
%% version 1.)
%% loop_locker/7 sends this value as the second element of its
%% {lock, ?locker_vsn, IsLockSet, Known} message.
-define(locker_vsn, 2).
%%% multi
%% Loop state of "the locker" process (see loop_the_locker/2):
%%   known  - the known-nodes list last handed to or fetched by the locker.
%%   others - pending peers, a list of {Node, HisTheLocker, HisKnown, MyTag}.
-record(multi, {known, others = []}).
%% Spawn "the locker" process, linked to the caller. Global is passed on
%% to init_the_locker/1. Returns the new pid.
start_the_locker(Global) ->
    Args = [Global],
    spawn_link(?MODULE, init_the_locker, Args).
%% Entry point of "the locker" process: trap exits and enter the main loop
%% with an empty #multi{} state. The loop is not expected to return; if it
%% ever does, the process fails with locker_exited.
init_the_locker(Global) ->
    process_flag(trap_exit, true), %needed?
    InitialState = #multi{},
    loop_the_locker(Global, InitialState),
    erlang:error(locker_exited).
%% Remove the first {Node, HisTheLocker, HisKnown, MyTag} entry for Node
%% from the list; all other elements are kept in their original order.
remove_node(Node, Entries) ->
    IsOther = fun({N, _HisTheLocker, _HisKnown, _MyTag}) -> N =/= Node;
                 (_) -> true
              end,
    case lists:splitwith(IsOther, Entries) of
        {Before, [_Match | After]} ->
            Before ++ After;
        {All, []} ->
            All
    end.
%% Find the tag stored for Node in a list of
%% {Node, HisTheLocker, HisKnown, MyTag} entries.
%% Returns {true, MyTag} for the first matching entry, or false.
find_node_tag(Node, Entries) ->
    Tags = [MyTag || {N, _HisTheLocker, _HisKnown, MyTag} <- Entries, N =:= Node],
    case Tags of
        [FirstTag | _] ->
            {true, FirstTag};
        [] ->
            false
    end.
%% Main loop of "the locker" process.
%%
%% Global is the pid passed to start_the_locker/1; S is a #multi{} record
%% whose `others' field lists the peers we still have to lock against, as
%% {Node, HisTheLocker, HisKnown, MyTag} tuples.
%%
%% While `others' is empty the loop blocks waiting for a message. Otherwise
%% it uses a zero timeout: pending messages are served first and, when the
%% mailbox has nothing matching, the `after' branch tries to take the lock
%% for the first entry in `others'.
loop_the_locker(Global, S) ->
?P2({others, node(), S#multi.others}),
% Known = S#multi.known,
Timeout = case S#multi.others of
[] ->
infinity;
_ ->
0
end,
receive
% {nodeup, Node, Known, Tag, P} ->
% ?P2({the_locker, nodeup, time(), node(), nodeup, Node, Tag}),
% loop_the_locker(Global, S);
%% The peer's locker has been announced. Pair it with the matching
%% {nodeup, ...} message (sent by the server's nodeup handling) to learn
%% our tag for this session, and queue the peer in `others'.
{his_the_locker, HisTheLocker, HisKnown, MyKnown} ->
?P2({his_the_locker, time(), node(), HisTheLocker,
node(HisTheLocker)}),
receive
{nodeup, Node, _Known, MyTag, _P} when node(HisTheLocker) == Node ->
?P2({the_locker, nodeup, node(), Node,
node(HisTheLocker), MyTag,
process_info(self(), messages)}),
Others = S#multi.others,
loop_the_locker(Global,
S#multi{known=MyKnown,
others=[{node(HisTheLocker), HisTheLocker, HisKnown, MyTag} | Others]});
%% The session was cancelled before the nodeup arrived: forget it.
{cancel, Node, _Tag} when node(HisTheLocker) == Node ->
loop_the_locker(Global, S)
%% Give up after 60 seconds and log the fact.
after 60000 ->
?P2({nodeupnevercame, node(), node(HisTheLocker)}),
error_logger:error_msg("global: nodeup never came ~w ~w~n",
[node(), node(HisTheLocker)]),
loop_the_locker(Global, S)
end;
{cancel, Node, undefined} ->
?P2({the_locker, cancel1, undefined, node(), Node}),
%% If we actually cancel something when a cancel message with the tag
%% 'undefined' arrives, we may be acting on an old nodedown, to cancel
%% a new nodeup, so we can't do that.
% receive
% {nodeup, Node, _Known, _MyTag, _P} ->
% ?P2({the_locker, cancelnodeup1, node(), Node}),
% ok
% after 0 ->
% ok
% end,
% Others = remove_node(Node, S#multi.others),
% loop_the_locker(Global, S#multi{others = Others});
loop_the_locker(Global, S);
%% A tagged cancel: discard a queued nodeup carrying the same tag (if one
%% is already in the mailbox) and drop Node from `others'.
{cancel, Node, Tag} ->
?P2({the_locker, cancel1, Tag, node(), Node}),
receive
{nodeup, Node, _Known, Tag, _P} ->
?P2({the_locker, cancelnodeup2, node(), Node}),
ok
after 0 ->
ok
end,
Others = remove_node(Node, S#multi.others),
loop_the_locker(Global, S#multi{others = Others});
%% A negative lock_set outside of lock_set_loop/7 is ignored.
{lock_set, _Pid, false, _} ->
?P2({the_locker, spurious, node(), node(_Pid)}),
loop_the_locker(Global, S);
%% The peer reports that it has set its lock. If we know about the peer
%% (it is in `others') try to set our own lock and report the outcome back;
%% otherwise answer false.
{lock_set, Pid, true, HisKnown} ->
Node = node(Pid),
?P2({the_locker, spontaneous, node(), Node}),
NewKnown = gen_server:call(global_name_server, get_known),
Others =
case find_node_tag(Node, S#multi.others) of
{true, MyTag} ->
%% Nodes known to both sides (intersection of HisKnown and NewKnown).
BothsKnown = HisKnown -- (HisKnown -- NewKnown),
%% Only the side with the smaller node name locks the commonly known
%% nodes as well.
Known1 = if
node() < Node ->
[node() | NewKnown];
true ->
[node() | NewKnown] -- BothsKnown
end,
?P2({lock1, node()}),
LockId = {global, self()},
%% NOTE(review): the third argument of set_lock/3 is presumably a retry
%% count - confirm against set_lock/3.
IsLockSet = set_lock(LockId, Known1, 1),
Pid ! {lock_set, self(), IsLockSet, NewKnown},
?P2({the_locker, spontaneous, node(), Node, IsLockSet}),
case IsLockSet of
true ->
gen_server:cast(global_name_server,
{lock_is_set, Node, MyTag}),
?P1({lock_sync_done, time(), node(),
{Pid, node(Pid)}, self()}),
%% Wait for global to tell us to remove lock.
receive
{cancel, Node, _Tag} ->
%% All conflicts are resolved,
%% remove lock.
?PRINT({node(), self(), locked1}),
del_lock(LockId, Known1);
{'EXIT', Pid, _} ->
?PRINT({node(), self(), locked2}),
%% Other node died;
%% remove lock and ignore him.
del_lock(LockId, Known1),
link(Global)
end,
remove_node(Node, S#multi.others);
false ->
S#multi.others
end;
false ->
?P2({the_locker, spontaneous, node(), Node, not_there}),
Pid ! {lock_set, self(), false, NewKnown},
S#multi.others
end,
loop_the_locker(Global, S#multi{others = Others});
%% Any other tuple whose first element is not `nodeup' is dropped. nodeup
%% messages are deliberately left in the mailbox for the nested receives
%% above. NOTE(review): for a non-tuple message the guard fails, so such a
%% message is not consumed here either.
Other when element(1, Other) /= nodeup ->
?P2({the_locker, other_msg, Other}),
loop_the_locker(Global, S)
%% Nothing to receive: work on the first pending peer in `others'.
after Timeout ->
NewKnown = gen_server:call(global_name_server, get_known),
[{Node, HisTheLocker, HisKnown, MyTag} | Rest] = S#multi.others,
BothsKnown = HisKnown -- (HisKnown -- NewKnown),
Known1 = if
node() < Node ->
[node() | NewKnown];
true ->
[node() | NewKnown] -- BothsKnown
end,
?P2({picking, node(), Node}),
case lists:member(Node, NewKnown) of
%% Node is not yet known to the server: try to lock and, on success, tell
%% the peer's locker and wait for its answer in lock_set_loop/7.
false ->
LockId = {global, self()},
?P2({lock2, node()}),
IsLockSet = set_lock(LockId, Known1, 1),
Others =
case IsLockSet of
true ->
HisTheLocker ! {lock_set, self(),
IsLockSet, NewKnown},
%% OTP-4902
lock_set_loop(Global, S,
Node, MyTag, Rest,
Known1,
LockId);
false ->
?P2({the_locker, not_locked, node(),
Node}),
S#multi.others
end,
loop_the_locker(Global, S#multi{known=NewKnown,
others = Others});
%% Node has become known in the meantime: just drop it from `others'.
true ->
?P2({is_known, node(), Node}),
loop_the_locker(Global, S#multi{known=NewKnown,
others = Rest})
end
end.
%% Wait for the peer locker's answer after we have set our lock and sent it
%% {lock_set, self(), true, Known}. Called from loop_the_locker/2.
%%
%% Arguments:
%%   Global  - pid of the server, re-linked when the peer dies.
%%   S       - the #multi{} state of the locker loop.
%%   Node    - the peer node being handled.
%%   MyTag   - our tag for this sync session.
%%   Rest    - `others' without the entry for Node.
%%   Known1  - the nodes on which LockId is set.
%%   LockId  - the lock id that was set.
%%
%% Returns the new `others' list:
%%   - Rest, once both sides hold the lock and the sync has finished;
%%   - `others' without Node, when the session is cancelled;
%%   - the unchanged `others' when the peer failed to lock or an exit
%%     signal arrived.
%% In every case our lock is deleted before returning.
lock_set_loop(Global, S, Node, MyTag, Rest, Known1, LockId) ->
    receive
        {lock_set, P, true, _} when node(P) == Node ->
            ?P2({the_locker, both_set, node(), Node}),
            %% do sync
            gen_server:cast(global_name_server, {lock_is_set, Node, MyTag}),
            %% The trace used to refer to an unbound variable; P is the
            %% peer locker bound by the pattern above.
            ?P1({lock_sync_done, time(), node(), {P, node(P)}, self()}),
            %% Wait for global to tell us to remove lock.
            receive
                {cancel, Node, _} ->
                    %% All conflicts are resolved, remove lock.
                    ?PRINT({node(), self(), locked1}),
                    del_lock(LockId, Known1);
                {'EXIT', _Pid, _} ->
                    ?PRINT({node(), self(), locked2}),
                    %% Other node died; remove lock and ignore him.
                    del_lock(LockId, Known1),
                    link(Global)
            end,
            Rest;
        {lock_set, P, false, _} when node(P) == Node ->
            ?P2({the_locker, not_both_set, node(), Node}),
            del_lock(LockId, Known1),
            S#multi.others;
        {cancel, Node, _} ->
            ?P2({the_locker, cancel2, node(), Node}),
            del_lock(LockId, Known1),
            remove_node(Node, S#multi.others);
        {'EXIT', _, _} ->
            ?P2({the_locker, exit, node(), Node}),
            del_lock(LockId, Known1),
            S#multi.others
    after
        %% OTP-4902
        %% A cyclic deadlock could occur in rare cases where three or
        %% more nodes waited for a reply from each other.
        %% Therefore, reject lock_set attempts in this state from
        %% nodes < this node (it's enough if at least one node in
        %% the cycle rejects and thus breaks the deadlock)
        5000 ->
            reject_lock_set(),
            lock_set_loop(Global, S, Node, MyTag, Rest, Known1, LockId)
    end.
%% Answer `false' to every {lock_set, P, true, _} request currently in the
%% mailbox whose sender P lives on a node that sorts before this node.
%% Never blocks; returns true when no such message is left.
reject_lock_set() ->
    receive
        {lock_set, Requester, true, _} when node(Requester) < node() ->
            Requester ! {lock_set, self(), false, []},
            reject_lock_set()
    after 0 ->
            true
    end.
%% Spawn a (deliberately unlinked) locker process running init_locker/5
%% and return its pid.
start_locker(Node, Known, MyTag, Global, TheLocker) ->
    %% No link here! The del_lock call would delete the link anyway.
    %% global_name_server has control of these processes anyway...
    %% When the locker process exits due to being sent the 'cancel' message
    %% by the server, the server then removes it from its tables.
    %% When the locker terminates due to other reasons, the server must
    %% be told, so we make a link to it just before exiting.
    LockerArgs = [Node, Known, MyTag, Global, TheLocker],
    spawn(?MODULE, init_locker, LockerArgs).
%% Entry point of a per-node locker process (see start_locker/5).
%% Traps exits and waits for exactly one of:
%%   {his_locker, Pid, HisKnown}            - old style peer locker: link to
%%                                            it and run loop_locker/7.
%%   {his_locker_new, HisTheLocker, {K1,K2}} - new style peer: hand over to
%%                                            the peer's "the locker" and exit.
%%   cancel                                 - exit normally.
init_locker(Node, Known, MyTag, Global, TheLocker) ->
    process_flag(trap_exit, true),
    ?PRINT({init_locker, node(), self(), Node}),
    ?P1({init_locker, time(), node(), self(), Node}),
    receive
        cancel ->
            ?PRINT({init_locker, node(), self(), cancel, Node}),
            exit(normal);
        {his_locker_new, HisTheLocker, {Known1, Known2}} ->
            %% slide into the vsn 4 stuff
            ?P2({his_locker_new, node()}),
            HisTheLocker ! {his_the_locker, TheLocker, Known1, Known2},
            exit(normal);
        {his_locker, Pid, HisKnown} ->
            ?PRINT({init_locker, node(), self(), his_locker, Node}),
            link(Pid),
            %% If two nodes in a group of nodes first disconnect
            %% and then reconnect, this causes global to deadlock,
            %% because both of the reconnecting nodes try to set the
            %% lock on the other nodes in the group. This is solved by
            %% letting only one of the reconnecting nodes set the lock.
            BothsKnown = HisKnown -- (HisKnown -- Known),
            ?P({loop_locker1, node(), {Pid, node(Pid)}}),
            Res = loop_locker(Node, Pid, Known, 1, MyTag, BothsKnown, Global),
            ?P({loop_locker2, node(), {Pid, node(Pid)}}),
            Res
    end.
%% One locking attempt of a per-node locker process.
%%
%% Arguments:
%%   Node       - the peer node.
%%   Pid        - the peer's locker process.
%%   Known0     - the nodes known to us.
%%   Try        - attempt counter, passed on to try_again_locker/6.
%%   MyTag      - our tag for this sync session.
%%   BothsKnown - nodes known to both sides.
%%   Global     - pid of the server, linked to when the peer dies.
%%
%% Tries to set the lock on our side, tells the peer the outcome in all
%% three historical message formats, then acts on the peer's outcome: when
%% both sides hold the lock the server is told (lock_is_set) and the lock is
%% kept until `cancel' or the peer's exit; otherwise the lock is released
%% and try_again_locker/6 is called.
loop_locker(Node, Pid, Known0, Try, MyTag, BothsKnown, Global) ->
%% Only the side with the smaller node name locks the commonly known nodes.
Known = if
node() < Node ->
[node() | Known0];
true ->
[node() | Known0] -- BothsKnown
end,
?PRINT({locking, node(), self(), Known}),
LockId = {global, self()},
?P2({lock3, node()}),
%% NOTE(review): the third argument of set_lock/3 is presumably a retry
%% count - confirm against set_lock/3.
IsLockSet = set_lock(LockId, Known, 1),
?P({loop_locker, IsLockSet,
node(), {Pid, node(Pid)}, self(), Try}),
?P1({loop_locker, time(), IsLockSet,
node(), {Pid, node(Pid)}, self(), Try}),
?PRINT({locking1, node(), self(), Known, IsLockSet}),
%% Tell other node that we managed to get the lock.
%% The same information is sent as a 4-tuple, a 3-tuple and a 2-tuple so
%% that every version of the peer finds a format it understands.
Pid ! {lock, ?locker_vsn, IsLockSet, Known},
Pid ! {lock, IsLockSet, Known},
Pid ! {lock, IsLockSet},
%% Wait for other node's result.
receive
%% R7 patched and later
{lock, _LockerVsn, true, _} when IsLockSet == true ->
%% Flush the 2-tuple and 3-tuple variants of the same answer.
receive
{lock, _} ->
ok
end,
receive
{lock, _, _} ->
ok
end,
?PRINT({node(), self(), locked}),
%% Now we got the lock in both partitions. Tell
%% global, and let him resolve name conflict.
?P1({lock_sync, time(), node(), {Pid, node(Pid)}, self()}),
gen_server:cast(global_name_server, {lock_is_set, Node, MyTag}),
?P1({lock_sync_done, time(), node(), {Pid, node(Pid)}, self()}),
%% Wait for global to tell us to remove lock.
receive
cancel ->
%% All conflicts are resolved, remove lock.
?PRINT({node(), self(), locked1}),
del_lock(LockId, Known);
{'EXIT', Pid, _} ->
?PRINT({node(), self(), locked2}),
%% Other node died; remove lock and ignore him.
del_lock(LockId, Known),
link(Global)
end;
{lock, _LockerVsn, _, HisKnown} ->
%% Flush the 2-tuple and 3-tuple variants of the same answer.
receive
{lock, _} ->
ok
end,
receive
{lock, _, _} ->
ok
end,
%% Some of us failed to get the lock; try again
?PRINT({node(), self(), locked0}),
d_lock(IsLockSet, LockId, Known),
try_again_locker(Node, Pid, Try, MyTag, HisKnown, Global);
%% R7 unpatched
{lock, true, _} when IsLockSet == true ->
?PRINT({node(), self(), locked}),
%% Now we got the lock in both partitions. Tell
%% global, and let him resolve name conflict.
gen_server:cast(global_name_server, {lock_is_set, Node, MyTag}),
%% Wait for global to tell us to remove lock.
receive
cancel ->
%% All conflicts are resolved, remove lock.
?PRINT({node(), self(), locked1}),
del_lock(LockId, Known);
{'EXIT', Pid, _} ->
?PRINT({node(), self(), locked2}),
%% Other node died; remove lock and ignore him.
del_lock(LockId, Known),
link(Global)
end;
{lock, _, HisKnown} ->
%% Some of us failed to get the lock; try again
?PRINT({node(), self(), locked0}),
d_lock(IsLockSet, LockId, Known),
try_again_locker(Node, Pid, Try, MyTag, HisKnown, Global);
%% R6 and earlier
{lock, true} when IsLockSet == true ->
?PRINT({node(), self(), locked}),
%% Now we got the lock in both partitions. Tell
%% global, and let him resolve name conflict.
gen_server:cast(global_name_server, {lock_is_set, Node, MyTag}),
%% Wait for global to tell us to remove lock.
receive
cancel ->
%% All conflicts are resolved, remove lock.
?PRINT({node(), self(), locked1}),
del_lock(LockId, Known);
{'EXIT', Pid, _} ->
?PRINT({node(), self(), locked2}),
%% Other node died; remove lock and ignore him.
del_lock(LockId, Known),
link(Global)
end;
{lock, _} ->
%% Some of us failed to get the lock; try again
%% (this message format carries no known-nodes list, so our own
%% BothsKnown is reused)
?PRINT({node(), self(), locked0}),
d_lock(IsLockSet, LockId, Known),
try_again_locker(Node, Pid, Try, MyTag, BothsKnown, Global);
{'EXIT', Pid, _} ->
%% Other node died; remove lock and ignore him.
?PRINT({node(), self(), locked7}),
d_lock(IsLockSet, LockId, Known),
link(Global);
cancel ->
?PRINT({node(), self(), locked8}),
d_lock(IsLockSet, LockId, Known)
end.
%% Release LockId on the nodes in Known, but only if it was actually set
%% (first argument true); otherwise do nothing and return ok.
d_lock(false, _LockId, _Known) ->
    ok;
d_lock(true, LockId, Known) ->
    del_lock(LockId, Known).
%% Called by loop_locker/7 when at least one side failed to get the lock.
%%
%% Sleeps for a random time that depends on Try, then asks the server for
%% its current known nodes. If Node is still unknown, another locking
%% attempt is made with Try incremented and with the set of commonly known
%% nodes recomputed from HisKnown. If Node has become known in the meantime
%% there is nothing left to do: the process links to Global and returns ok.
%%
%% The debug traces used to mention variables (Known, BothsKnown) that are
%% not bound in this function, which only compiled while the trace macros
%% discard their argument; they now refer to HisKnown instead.
try_again_locker(Node, Pid, Try, MyTag, HisKnown, Global) ->
    ?PRINT({try_again, node(), self(), Node, Pid, HisKnown, Try, MyTag}),
    ?P1({try_again, time(), node(), self(), Node, Pid, HisKnown, Try, MyTag}),
    random_sleep(Try),
    ?P1({try_again2, time(), node(), self(), Node, Pid, HisKnown, Try, MyTag}),
    NewKnown = gen_server:call(global_name_server, get_known),
    case lists:member(Node, NewKnown) of
        false ->
            %% Nodes known to both sides (intersection).
            BothsKnown1 = HisKnown -- (HisKnown -- NewKnown),
            ?PRINT({node(), self(), Node, again, notknown}),
            ?PRINT({bothknown, HisKnown, BothsKnown1}),
            loop_locker(Node, Pid, NewKnown, Try+1, MyTag,
                        BothsKnown1, Global);
        true ->
            ?PRINT({node(), self(), Node, again, known}),
            link(Global),
            %% Node is already handled, we are ready.
            ok
    end.
%% Cancel any locking activity towards Node.
%% "The locker" is always sent {cancel, Node, Tag}, where Tag is our sync
%% tag for Node (undefined if none is stored). If a per-node locker process
%% is registered for Node in the state it is sent `cancel' and removed from
%% `lockers'. Returns the (possibly updated) state.
cancel_locker(Node, S) ->
    %% multi
    ?P2({cancel, node(), Node, get({sync_tag_my, Node})}),
    S#state.the_locker ! {cancel, Node, get({sync_tag_my, Node})},
    Lockers = S#state.lockers,
    case lists:keysearch(Node, 1, Lockers) of
        {value, {_, Pid}} ->
            Pid ! cancel,
            ?PRINT({cancel, Node, lockers, node(), Lockers}),
            Remaining = lists:keydelete(Node, 1, Lockers),
            S#state{lockers = Remaining};
        _ ->
            S
    end.
%% A node sent us his names. When a name clash is found, the resolve
%% function is called from the smaller node => all resolve funcs are called
%% from the same partition.
%%
%% exchange_names(Names, Node, Ops, Res) walks the {Name, Pid, Method} list
%% received from Node, updates global_names accordingly and returns
%% {Ops, Res}:
%%   Ops - the operations performed locally (accumulated in reverse order);
%%   Res - the operations resulting from conflict resolution, to be sent
%%         back to Node.
%%
%% Per name:
%%   - same pid already registered: nothing to do;
%%   - different pid and node() < Node: run our resolve method (Method2) on
%%     the node of our pid and act on its verdict;
%%   - different pid otherwise: the other node resolves the conflict;
%%   - unknown name: insert it.
exchange_names([{Name, Pid, Method} |Tail], Node, Ops, Res) ->
    case ets:lookup(global_names, Name) of
        [{Name, Pid, _}] ->
            exchange_names(Tail, Node, Ops, Res);
        [{Name, Pid2, Method2}] when node() < Node ->
            %% Name clash! Add the result of resolving to Res(olved).
            %% We know that node(Pid) /= node(), so we don't
            %% need to link/unlink to Pid.
            Node2 = node(Pid2), %%&&&&&& check external node???
            case rpc:call(Node2, ?MODULE, resolve_it,
                          [Method2, Name, Pid, Pid2]) of
                Pid ->
                    %% The remote pid won.
                    dounlink(Pid2),
                    ets:insert(global_names, {Name, Pid, Method}),
                    Op = {insert, {Name, Pid, Method}},
                    exchange_names(Tail, Node, [Op | Ops], [Op | Res]);
                Pid2 ->
                    %% Our pid won; only the other side has to change.
                    Op = {insert, {Name, Pid2, Method2}},
                    exchange_names(Tail, Node, Ops, [Op | Res]);
                none ->
                    %% Neither pid keeps the name.
                    dounlink(Pid2),
                    ?P2({unregister, node(), Name, Pid2, node(Pid2)}),
                    ets:delete(global_names, Name),
                    Op = {delete, Name},
                    exchange_names(Tail, Node, [Op | Ops], [Op | Res]);
                {badrpc, Badrpc} ->
                    %% The resolver could not be run; the remote pid is kept.
                    error_logger:info_msg("global: badrpc ~w received when "
                                          "conflicting name ~w was found~n",
                                          [Badrpc, Name]),
                    dounlink(Pid2),
                    ets:insert(global_names, {Name, Pid, Method}),
                    Op = {insert, {Name, Pid, Method}},
                    exchange_names(Tail, Node, [Op | Ops], [Op | Res]);
                Else ->
                    %% Unexpected verdict: unregister the name. The method
                    %% that was actually invoked is Method2, so that is the
                    %% one reported.
                    error_logger:info_msg("global: Resolve method ~w for "
                                          "conflicting name ~w returned ~w~n",
                                          [Method2, Name, Else]),
                    dounlink(Pid2),
                    ets:delete(global_names, Name),
                    Op = {delete, Name},
                    exchange_names(Tail, Node, [Op | Ops], [Op | Res])
            end;
        [{Name, _Pid2, _}] ->
            %% The other node will solve the conflict.
            exchange_names(Tail, Node, Ops, Res);
        _ ->
            %% Entirely new name.
            ets:insert(global_names, {Name, Pid, Method}),
            exchange_names(Tail, Node,
                           [{insert, {Name, Pid, Method}} | Ops], Res)
    end;
exchange_names([], _, Ops, Res) ->
    {Ops, Res}.
%% Run the resolve fun Method on a name clash between Pid1 and Pid2.
%% The call is wrapped in `catch', so an exception in Method is returned
%% as a value instead of crashing the caller.
resolve_it(Method, Name, Pid1, Pid2) ->
    Verdict = (catch Method(Name, Pid1, Pid2)),
    Verdict.
%% Order two pids by the name of the node they live on.
%% Returns {Min, Max}; P1 comes first only when its node sorts strictly
%% before P2's node.
minmax(P1,P2) ->
    case node(P1) < node(P2) of
        true -> {P1, P2};
        false -> {P2, P1}
    end.
%% Resolve method: of the two clashing pids, kill the one minmax/2 ranks as
%% Max (after logging the fact) and return the other one as the winner.
random_exit_name(Name, Pid, Pid2) ->
    {Winner, Loser} = minmax(Pid, Pid2),
    error_logger:info_msg("global: Name conflict terminating ~w~n",
                          [{Name, Loser}]),
    exit(Loser, kill),
    Winner.
%% Resolve method: of the two clashing pids, send
%% {global_name_conflict, Name} to the one minmax/2 ranks as Max and return
%% the other one as the winner.
random_notify_name(Name, Pid, Pid2) ->
    {Winner, Loser} = minmax(Pid, Pid2),
    Loser ! {global_name_conflict, Name},
    Winner.
%% Resolve method: tell both clashing pids about the conflict (each is told
%% the pid of the other) and return `none', i.e. neither keeps the name.
notify_all_name(Name, Pid, Pid2) ->
    Pid ! {global_name_conflict, Name, Pid2},
    Pid2 ! {global_name_conflict, Name, Pid},
    none.
%% Resolve method: log the conflict, send {global_name_conflict, Name} to
%% the pid minmax/2 ranks as Max and return the other one as the winner.
cnode(Name, Pid, Pid2) ->
    {Winner, Loser} = minmax(Pid, Pid2),
    error_logger:info_msg("global: Name conflict terminating ~w~n",
                          [{Name, Loser}]),
    Loser ! {global_name_conflict, Name},
    Winner.
%% Link to Pid, but only when it lives on our own node.
%% For anything else nothing is done and ok is returned.
dolink(Pid) when node(Pid) == node() ->
    link(Pid);
dolink(_NotLocal) ->
    ok.
%% Link to an externally registered Pid, but only when the registration
%% was made from our own node (RegNode == node()); otherwise return ok.
dolink_ext(Pid, RegNode) when RegNode == node() ->
    link(Pid);
dolink_ext(_Pid, _OtherNode) ->
    ok.
%% Unlink from a local Pid once nothing refers to it any more, i.e. when it
%% is neither registered in global_names nor a holder of any lock.
%% Pids on other nodes are never linked, so nothing is done for them.
dounlink(Pid) when node(Pid) == node() ->
    StillRegistered = ets:match(global_names, {'_', Pid, '_'}) =/= [],
    case StillRegistered orelse is_pid_used(Pid) of
        true ->
            ok;
        false ->
            unlink(Pid)
    end;
dounlink(_Pid) ->
    ok.
%% is_pid_used/1: true if Pid is among the holders of any lock in
%% global_locks.
is_pid_used(Pid) ->
    is_pid_used(ets:tab2list(global_locks), Pid).
%% is_pid_used/2: the same check on an explicit list of
%% {ResourceId, LockRequesterId, Pids} lock entries.
is_pid_used(Locks, Pid) ->
    HoldsLock = fun({_ResourceId, _LockReqId, Pids}) ->
                        lists:member(Pid, Pids)
                end,
    lists:any(HoldsLock, Locks).
%% check_exit/2 removes Pid from the affected tables after it has exited.
%% The removal has to be announced to the other nodes as well, since only
%% the local server is linked to the registered process (or the owner of
%% the lock); all the other servers rely on the nodedown mechanism.
%% Names are handled by del_names/3 (via Deleter), locks by del_locks/2.
check_exit(Deleter, Pid) ->
    Names = ets:tab2list(global_names),
    del_names(Deleter, Pid, Names),
    Locks = ets:tab2list(global_locks),
    del_locks(Locks, Pid).
%% Remove every name in Names that is registered to Pid.
%% Each such name is first deleted from the local tables, then the Deleter
%% process is sent {delete_name, self(), Name, Pid}. Returns done.
del_names(Deleter, Pid, Names) ->
    DeleteIfOwned =
        fun({Name, Owner, _Method}) when Owner =:= Pid ->
                %% First, delete the Pid from the local ets; then send to
                %% other nodes
                ets:delete(global_names, Name),
                ets:delete(global_names_ext, Name),
                dounlink(Pid),
                Deleter ! {delete_name,self(),Name,Pid};
           (_) ->
                ok
        end,
    lists:foreach(DeleteIfOwned, Names),
    done.
%% Remove Pid from every lock entry in the given list and tell all
%% connected nodes to do the same (async_del_lock). A lock whose only
%% holder was Pid is deleted altogether. Returns done.
del_locks([{ResourceId, LockReqId, Pids} | Tail], Pid) ->
    case lists:member(Pid, Pids) of
        true ->
            case Pids of
                [Pid] ->
                    ets:delete(global_locks, ResourceId);
                _ ->
                    Remaining = lists:delete(Pid, Pids),
                    ets:insert(global_locks, {ResourceId, LockReqId, Remaining})
            end,
            gen_server:abcast(nodes(), global_name_server,
                              {async_del_lock, ResourceId, Pid});
        false ->
            continue
    end,
    del_locks(Tail, Pid);
del_locks([], _Pid) ->
    done.
%% Remove Pid from every lock entry in the given list, locally only (no
%% message is sent to other nodes). A lock whose only holder was Pid is
%% deleted altogether. Returns done.
del_locks2([{ResourceId, LockReqId, Pids} | Tail], Pid) ->
    case lists:member(Pid, Pids) of
        true ->
            case Pids of
                [Pid] ->
                    ets:delete(global_locks, ResourceId);
                _ ->
                    Remaining = lists:delete(Pid, Pids),
                    ets:insert(global_locks, {ResourceId, LockReqId, Remaining})
            end;
        false ->
            continue
    end,
    del_locks2(Tail, Pid);
del_locks2([], _Pid) ->
    done.
%% Clean up after Node went down: unregister all Name/Pid pairs such that
%% node(Pid) == Node, all external names registered from Node, and remove
%% Node's pids from all locks.
do_node_down(Node) ->
    Names = ets:tab2list(global_names),
    do_node_down_names(Node, Names),
    ExtNames = ets:tab2list(global_names_ext),
    do_node_down_names_ext(Node, ExtNames),
    Locks = ets:tab2list(global_locks),
    do_node_down_locks(Node, Locks).
%% Delete from global_names every entry of the given list whose pid lives
%% on Node. Returns ok.
do_node_down_names(Node, Names) ->
    DeleteIfOnNode =
        fun({Name, Pid, _Method}) when node(Pid) == Node ->
                ets:delete(global_names, Name);
           (_) ->
                ok
        end,
    lists:foreach(DeleteIfOnNode, Names).
%% Remove all external names that were registered from the crashed node:
%% every {Name, Pid, RegNode} entry with RegNode equal to Node is deleted
%% from both global_names and global_names_ext. Returns ok.
do_node_down_names_ext(Node, ExtNames) ->
    DeleteIfFromNode =
        fun({Name, _Pid, RegNode}) when RegNode =:= Node ->
                ets:delete(global_names, Name),
                ets:delete(global_names_ext, Name);
           (_) ->
                ok
        end,
    lists:foreach(DeleteIfFromNode, ExtNames).
%% Remove the pids living on Node from every lock entry in the given list.
%% A lock left without holders is deleted; a lock that had no holder on
%% Node is not touched. Returns done.
do_node_down_locks(Node, Locks) ->
    CleanLock =
        fun({ResourceId, LockReqId, Pids}) ->
                case do_node_down_locks2(Pids, Node) of
                    [] ->
                        continue;
                    RemovePids ->
                        case Pids -- RemovePids of
                            [] ->
                                ets:delete(global_locks, ResourceId);
                            NewPids ->
                                ets:insert(global_locks,
                                           {ResourceId, LockReqId, NewPids})
                        end
                end;
           (_) ->
                ok
        end,
    lists:foreach(CleanLock, Locks),
    done.
%% do_node_down_locks2/2: return the pids in Pids that live on Node
%% (in reverse order of appearance).
do_node_down_locks2(Pids, Node) ->
    do_node_down_locks2(Pids, Node, []).
%% do_node_down_locks2/3: the same, prepending the matches to Res.
do_node_down_locks2(Pids, Node, Res) ->
    Collect = fun(Pid, Acc) when node(Pid) == Node -> [Pid | Acc];
                 (_, Acc) -> Acc
              end,
    lists:foldl(Collect, Res, Pids).
%% Returns the complete contents of the global_names table as a list.
get_names() ->
    Names = ets:tab2list(global_names),
    Names.
%% Returns the complete contents of the global_names_ext table as a list.
get_names_ext() ->
    ExtNames = ets:tab2list(global_names_ext),
    ExtNames.
%% Suspends the calling process for a random number of milliseconds in
%% the range 1..Tmax, where Tmax grows with Times (the retry count).
%% The seed of the random module is kept in the process dictionary; it is
%% discarded on every tenth retry and (re)created whenever it is missing.
%% Returns ok.
random_sleep(Times) ->
    %% Every tenth retry the stored seed is thrown away ...
    case Times rem 10 of
        0 -> erase(random_seed);
        _ -> ok
    end,
    %% ... and a missing seed is rebuilt from the clock, made node specific
    %% by mixing in a hash of the node name.
    case get(random_seed) =:= undefined of
        true ->
            {MegaSecs, Secs, MicroSecs} = now(),
            random:seed(MegaSecs, Secs,
                        MicroSecs + erlang:phash(node(), 100000));
        false ->
            ok
    end,
    %% First time 1/4 seconds, then doubling each time up to 8 seconds max.
    Tmax = case Times > 5 of
               true -> 8000;
               false -> ((1 bsl Times) * 1000) div 8
           end,
    T = random:uniform(Tmax),
    ?P({random_sleep, node(), self(), Times, T}),
    receive
    after T ->
            ok
    end.
%% Decrements a retry counter by one; the atom infinity stays infinity.
dec(Count) ->
    case Count of
        infinity -> infinity;
        _ -> Count - 1
    end.
%% Spawns a linked process running timer/2, which delivers Msg back to
%% the calling process after a random delay. Returns the pid of the
%% spawned process.
send_again(Msg) ->
    Receiver = self(),
    spawn_link(?MODULE, timer, [Receiver, Msg]).
%% Body of the process started by send_again/1: sleeps for a random
%% period (random_sleep/1 with retry count 5) and then sends Msg to Pid.
%% Returns Msg.
timer(Pid, Msg) ->
    random_sleep(5),
    erlang:send(Pid, Msg).
%% Returns the server state with its node_name field replaced by NewNode.
change_our_node_name(NewNode, State) ->
    State#state{node_name = NewNode}.
%%-----------------------------------------------------------------
%% Each sync process corresponds to one call to sync. Each such
%% process asks the global_name_server on all Nodes if it is in sync
%% with Nodes. If not, that (other) node spawns a syncer process that
%% waits for global to get in sync with all Nodes. When it is in
%% sync, the syncer process tells the original sync process about it.
%%-----------------------------------------------------------------
%% Spawns a linked process that runs sync_init/2 with Nodes and From.
%% Returns the pid of that process.
start_sync(Nodes, From) ->
    Args = [Nodes, From],
    spawn_link(?MODULE, sync_init, Args).
%% Entry point of a sync process: starts monitoring every node in Nodes
%% and then enters sync_loop/2.
sync_init(Nodes, From) ->
    Watch = fun(Node) -> monitor_node(Node, true) end,
    lists:foreach(Watch, Nodes),
    sync_loop(Nodes, From).
%% Waits until no node is left in Nodes, then replies ok to From.
%% A node is removed from the list either when it goes down
%% ({nodedown, Node}) or when it is reported as synced
%% ({synced, SNodes}); in both cases the node monitor is switched off.
sync_loop(Nodes, From) ->
    case Nodes of
        [] ->
            gen_server:reply(From, ok);
        _ ->
            Remaining =
                receive
                    {nodedown, Down} ->
                        monitor_node(Down, false),
                        lists:delete(Down, Nodes);
                    {synced, Synced} ->
                        Unwatch = fun(N) -> monitor_node(N, false) end,
                        lists:foreach(Unwatch, Synced),
                        Nodes -- Synced
                end,
            sync_loop(Remaining, From)
    end.
%%%====================================================================================
%%% Get the current global_groups definition
%%%====================================================================================
%% Returns the list of connected nodes this node is allowed to sync with.
%%   - {ok, all} from get_own_nodes/0: every connected node (nodes()).
%%   - {ok, NodesNG}: the connected nodes that are also members of
%%     NodesNG, in the order reported by nodes().
%%   - {error, Error}: the error tuple is returned unchanged.
check_sync_nodes() ->
    case get_own_nodes() of
        {ok, all} ->
            nodes();
        {ok, NodesNG} ->
            %% global_groups parameter is defined, we are not allowed to sync
            %% with nodes not in our own global group.
            %% nodes() is sampled exactly once: with two separate calls a
            %% node outside the group that connects or disconnects between
            %% them would not be subtracted and would leak into the result.
            Connected = nodes(),
            Connected -- (Connected -- NodesNG);
        {error, Error} ->
            {error, Error}
    end.
%% Validates an explicit list of nodes to sync with.
%%   - {ok, all} from get_own_nodes/0: SyncNodes is returned as is.
%%   - {ok, NodesNG}: SyncNodes is returned if every element is either
%%     this node or a connected member of NodesNG; otherwise an
%%     {error, {Text, IllegalSyncNodes}} tuple naming the offending nodes.
%%   - {error, Error}: the error tuple is returned unchanged.
check_sync_nodes(SyncNodes) ->
    case get_own_nodes() of
        {ok, all} ->
            SyncNodes;
        {ok, NodesNG} ->
            %% global_groups parameter is defined, we are not allowed to sync
            %% with nodes not in our own global group.
            %% nodes() is sampled exactly once: with two separate calls a
            %% node outside the group that connects or disconnects between
            %% them would wrongly be counted as part of the own group.
            Connected = nodes(),
            OwnNodeGroup = Connected -- (Connected -- NodesNG),
            case SyncNodes -- [node() | OwnNodeGroup] of
                [] ->
                    SyncNodes;
                IllegalSyncNodes ->
                    {error, {"Trying to sync nodes not defined in the own global group",
                             IllegalSyncNodes}}
            end;
        {error, Error} ->
            {error, Error}
    end.
%% Asks global_group for the nodes of the own global group. An
%% {error, Error} answer is wrapped with an explanatory text; any other
%% answer is returned unchanged.
get_own_nodes() ->
    Answer = global_group:get_own_nodes_with_errors(),
    case Answer of
        {error, Reason} ->
            {error, {"global_groups definition error", Reason}};
        _ ->
            Answer
    end.
%%-----------------------------------------------------------------
%% The deleter process is a satellite process to global_name_server
%% that does background batch deleting of names when a process
%% that had globally registered names dies. It is started by and
%% linked to global_name_server.
%%-----------------------------------------------------------------
%% Spawns the deleter process, linked to the caller, running
%% loop_the_deleter/1 with Global. Returns its pid.
start_the_deleter(Global) ->
    Deleter = fun() -> loop_the_deleter(Global) end,
    spawn_link(Deleter).
%% Main loop of the deleter process. Collects a batch of pending
%% deletions, then, inside a trans/3 transaction on {global, self()} over
%% nodes(), broadcasts an async_del_name request for each collected
%% {Name, Pid} pair to the global_name_server on all connected nodes.
%% Never returns.
loop_the_deleter(Global) ->
    Deletions = collect_deletions(Global, []),
    Broadcast =
        fun({Name, Pid}) ->
                ?P2({delete_name2, Name, Pid, nodes()}),
                gen_server:abcast(nodes(), global_name_server,
                                  {async_del_name, Name, Pid})
        end,
    Batch = fun() -> lists:map(Broadcast, Deletions) end,
    trans({global, self()}, Batch, nodes()),
    loop_the_deleter(Global).
%% Gathers {delete_name, Global, Name, Pid} requests from the mailbox.
%% While nothing has been collected yet the process blocks indefinitely;
%% once at least one request is held, it keeps draining the mailbox and
%% returns as soon as no further message is immediately available.
%% Any other message is reported through error_logger and dropped.
%% Returns the collected {Name, Pid} pairs in arrival order.
collect_deletions(Global, Deletions) ->
    Timeout = case Deletions of
                  [] -> infinity;
                  _ -> 0
              end,
    receive
        {delete_name, Global, Name, Pid} ->
            ?P2({delete_name, node(), self(), Name, Pid, nodes()}),
            collect_deletions(Global, [{Name, Pid} | Deletions]);
        Unexpected ->
            error_logger:error_msg("The global_name_server deleter process "
                                   "received an unexpected message:\n~p\n",
                                   [Unexpected]),
            collect_deletions(Global, Deletions)
    after Timeout ->
            lists:reverse(Deletions)
    end.