path: root/lib/kernel/src/dist_ac.erl
author		Erlang/OTP <[email protected]>	2009-11-20 14:54:40 +0000
committer	Erlang/OTP <[email protected]>	2009-11-20 14:54:40 +0000
commit		84adefa331c4159d432d22840663c38f155cd4c1 (patch)
tree		bff9a9c66adda4df2106dfd0e5c053ab182a12bd	/lib/kernel/src/dist_ac.erl
The R13B03 release. (tag: OTP_R13B03)
Diffstat (limited to 'lib/kernel/src/dist_ac.erl')
-rw-r--r--	lib/kernel/src/dist_ac.erl	1534
1 files changed, 1534 insertions, 0 deletions
diff --git a/lib/kernel/src/dist_ac.erl b/lib/kernel/src/dist_ac.erl
new file mode 100644
index 0000000000..5c62aa31e9
--- /dev/null
+++ b/lib/kernel/src/dist_ac.erl
@@ -0,0 +1,1534 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 1997-2009. All Rights Reserved.
+%%
+%% The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved online at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% %CopyrightEnd%
+%%
+-module(dist_ac).
+
+-behaviour(gen_server).
+
+%% External exports
+-export([start_link/0,
+ load_application/2,
+ takeover_application/2,
+ permit_application/2,
+ permit_only_loaded_application/2]).
+
+-export([get_known_nodes/0]).
+
+%% Internal exports
+-export([init/1, handle_cast/2, handle_call/3, handle_info/2, terminate/2,
+ code_change/3, send_timeout/3]).
+-export([info/0]).
+
+-import(lists, [zf/2, filter/2, map/2, foreach/2, foldl/3, mapfoldl/3,
+ keysearch/3, keydelete/3, keyreplace/4, member/2]).
+
+-define(AC, application_controller).
+-define(DIST_AC, ?MODULE).
+-define(LOCK_ID, ?MODULE).
+
+%% This is the protocol version for the dist_ac protocol (between nodes)
+-define(vsn, 1).
+
+%%%-----------------------------------------------------------------
+%%% This module implements the default Distributed Applications
+%%% Controller. It is possible to write other controllers, when
+%%% the functionality in this module is not sufficient.
+%%% The process cooperates with the application_controller.
+%%%-----------------------------------------------------------------
+
+%%-----------------------------------------------------------------
+%% Naming conventions:
+%% Appl = #appl
+%% AppName = atom()
+%%-----------------------------------------------------------------
+-record(state, {appls = [], tmp_locals = [], remote_started = [],
+ known = [], started = [], tmp_weights = [],
+ dist_loaded = [], t_reqs = [], s_reqs = [], p_reqs = []}).
+%%-----------------------------------------------------------------
+%% appls = [#appl()] - these are the applications we control
+%% tmp_locals = [{AppName, Weight, node()}] - temporary; weight info
+%% received during startup of some distributed applications,
+%% not yet handled.
+%% remote_started = [{Node, AppName}] - info on apps started before
+%% we were started
+%% known = [Node] - These are the nodes known to us
+%% started = [AppName] - An ordered list of started applications
+%% (reversed start order)
+%% tmp_weights = [{AppName, MyWeight}] - temporary; if we're forced to
+%% send a dist_ac_weight message before we're prepared to,
+%% we remember the weight we sent here, so we can use
+%% it in the dist_ac_weight msgs later.
+%% dist_loaded = [{{Name, Node}, HisNodes, Permission}] - info on
+%% applications loaded on other nodes (and own node)
+%% t_reqs = [{AppName, From}] - processes waiting for takeover
+%% to complete.
+%% s_reqs = [{AppName, From}] - processes waiting for stop
+%% to complete.
+%% p_reqs = [{From, AppName, Bool, [Node]}] - outstanding permit requests.
+%% Nodes is a list of nodes we're still waiting for.
+%%-----------------------------------------------------------------
+
+-record(appl, {name, id, restart_time = 0, nodes = [], run = []}).
+
+%%-----------------------------------------------------------------
+%% id = local | undefined | {distributed, node()} | waiting | run_waiting |
+%% {failover, Node} | {takeover, Node}
+%% local : local application
+%% undefined : not yet started
+%% {distributed, Node} : running on another node, we're standby
+%% {failover, Node} : failover from Node
+%% {takeover, Node} : takeover from Node
+%% waiting : other node went down, we're waiting for a timeout
+%% to take it over. From = pid() | undefined
+%% run_waiting : we have decided to start the app; wait for the
+%% AC result
+%%-----------------------------------------------------------------
+
+start_link() ->
+ case gen_server:start_link({local, ?DIST_AC}, ?MODULE, [], []) of
+ {ok, Pid} ->
+ gen_server:cast(?DIST_AC, init_sync),
+ {ok, Pid};
+ Else ->
+ Else
+ end.
+
+
+%%-----------------------------------------------------------------
+%% Func: load_application(AppName, DistNodes)
+%% Args: AppName = atom()
+%% DistNodes = default | {AppName, Time, [node() | {node()...}]}
+%% Purpose: Notifies the dist_ac about distributed nodes for an
+%% application. DistNodes overrides the kernel 'distributed'
+%% parameter.
+%% Returns: ok | {error, Reason}
+%%-----------------------------------------------------------------
+load_application(AppName, DistNodes) ->
+ gen_server:call(?DIST_AC, {load_application, AppName, DistNodes}, infinity).
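+
+%% Illustrative call (the application and node names are hypothetical):
+%%
+%%   dist_ac:load_application(myapp,
+%%                            {myapp, 5000, [cp1@host, {cp2@host, cp3@host}]})
+%%
+%% tells dist_ac that myapp should preferably run on cp1@host, otherwise on
+%% either cp2@host or cp3@host, and that if the node running myapp goes
+%% down, the remaining nodes wait 5000 ms before restarting it.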
+
+takeover_application(AppName, RestartType) ->
+ case validRestartType(RestartType) of
+ true ->
+ wait_for_sync_dacs(),
+ Nodes = get_nodes(AppName),
+ global:trans(
+ {?LOCK_ID, self()},
+ fun() ->
+ gen_server:call(
+ ?DIST_AC,
+ {takeover_application, AppName, RestartType},
+ infinity)
+ end,
+ Nodes);
+ false ->
+ {error, {invalid_restart_type, RestartType}}
+ end.
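+
+%% Illustrative call (the application name is hypothetical); this function
+%% is normally reached via application:takeover/2, executed on the node
+%% that should take over the running application:
+%%
+%%   dist_ac:takeover_application(myapp, permanent)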
+
+%%-----------------------------------------------------------------
+%% This function controls which applications are permitted to run. If
+%% an application X runs when this function is called as
+%% permit_application(X, false), it is moved to another node where it
+%% is permitted to run (distributed applications only). If there is
+%% no such node, the application is stopped. (I.e. local applications
+%% are always stopped, and distributed applications with no other node
+%% alive are stopped as well.) If later a call to
+%% permit_application(X, true) is made, X is restarted.
+%% For example, suppose applications app1 and app2 are started and
+%% running.
+%% If we evaluate
+%% permit_application(app2, false)
+%% app2 is stopped and only app1 is running.
+%% If we now evaluate
+%% permit_application(app2, true),
+%% permit_application(app3, true)
+%% app2 is restarted, but not app3, since it hasn't been started by a
+%% call to start_application.
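+%%
+%% The same scenario as calls (app1, app2 and app3 are the illustrative
+%% names used above):
+%%
+%%   ok = dist_ac:permit_application(app2, false),  %% app2 is stopped
+%%   ok = dist_ac:permit_application(app2, true),   %% app2 is restarted
+%%   ok = dist_ac:permit_application(app3, true).   %% no effect; app3 was
+%%                                                  %% never started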
+%%-----------------------------------------------------------------
+permit_application(AppName, Bool) ->
+ wait_for_sync_dacs(),
+ LockId = {?LOCK_ID, self()},
+ global:trans(
+ LockId,
+ fun() ->
+ gen_server:call(?DIST_AC,
+ {permit_application, AppName, Bool, LockId, started},
+ infinity)
+ end).
+
+permit_only_loaded_application(AppName, Bool) ->
+ wait_for_sync_dacs(),
+ LockId = {?LOCK_ID, self()},
+ global:trans(
+ LockId,
+ fun() ->
+ gen_server:call(?DIST_AC,
+ {permit_application, AppName, Bool, LockId, only_loaded},
+ infinity)
+ end).
+
+get_nodes(AppName) ->
+ gen_server:call(?DIST_AC, {get_nodes, AppName}, infinity).
+
+get_known_nodes() ->
+ gen_server:call(?DIST_AC, get_known_nodes).
+
+%%%-----------------------------------------------------------------
+%%% call-back functions from gen_server
+%%%-----------------------------------------------------------------
+init([]) ->
+ process_flag(trap_exit, true),
+ {ok, #state{}}.
+
+sync_dacs(Appls) ->
+ Res = global:trans({?LOCK_ID, sync_dacs},
+ fun() ->
+ Nodes = introduce_me(nodes(), Appls),
+ wait_dacs(Nodes, [node()], Appls, [])
+ end),
+ ets:insert(ac_tab, {sync_dacs, ok}),
+ Res.
+
+introduce_me(Nodes, Appls) ->
+ Msg = {dist_ac_new_node, ?vsn, node(), Appls, []},
+ filter(fun(Node) ->
+ %% This handles nodes without DACs
+ case rpc:call(Node, erlang, whereis, [?DIST_AC]) of
+ Pid when is_pid(Pid) ->
+ Pid ! Msg,
+ true;
+ _ ->
+ false
+ end
+ end, Nodes).
+
+wait_dacs([Node | Nodes], KnownNodes, Appls, RStarted) ->
+ monitor_node(Node, true),
+ receive
+ %% HisAppls =/= [] is the case when our node connects to a running system
+ %%
+ %% It is always the responsibility of newer versions to understand
+ %% older versions of the protocol. As we don't have any older
+ %% versions (that are supposed to work with this version), we
+ %% don't handle version mismatch here.
+ {dist_ac_new_node, _Vsn, Node, HisAppls, HisStarted} ->
+ monitor_node(Node, false),
+ NRStarted = RStarted ++ HisStarted,
+ NAppls = dist_merge(Appls, HisAppls, Node),
+ wait_dacs(Nodes, [Node | KnownNodes], NAppls, NRStarted);
+ {nodedown, Node} ->
+ monitor_node(Node, false),
+ wait_dacs(Nodes, KnownNodes, Appls, RStarted)
+ end;
+wait_dacs([], KnownNodes, Appls, RStarted) ->
+ {KnownNodes, Appls, RStarted}.
+
+
+info() ->
+ gen_server:call(?DIST_AC, info).
+
+
+%%-----------------------------------------------------------------
+%% All functions that can affect which applications are running
+%% execute within a global lock, to ensure that they are not
+%% executing at the same time as sync_dacs. However, to avoid a
+%% deadlock situation where e.g. permit_application gets the lock
+%% before sync_dacs, this function is used to ensure that the local
+%% sync_dacs always gets the lock first of all. The lock is still
+%% used to not interfere with sync_dacs on other nodes.
+%%-----------------------------------------------------------------
+wait_for_sync_dacs() ->
+ case catch ets:lookup(ac_tab, sync_dacs) of
+ [{sync_dacs, ok}] -> ok;
+ _ ->
+ receive after 100 -> ok end,
+ wait_for_sync_dacs()
+ end.
+
+handle_cast(init_sync, _S) ->
+ %% When the dist_ac is started, it receives this msg, and gets into
+ %% the receive loop. 'go' is sent from the kernel_config proc when
+ %% all nodes that should be pinged have been pinged. The reason for this
+ %% is that dist_ac syncs with the other nodes at start-up. That is,
+ %% it does _not_ handle partitioned nets! The other nodes try to call
+ %% the local name dist_ac, which means that this name must be registered
+ %% before the distribution. But it can't sync until after the distribution
+ %% is started. Hence this 'go' message.
+ receive
+ {go, KernelConfig} ->
+ Appls = case application:get_env(kernel, distributed) of
+ {ok, D} -> dist_check(D);
+ undefined -> []
+ end,
+
+ dist_take_control(Appls),
+ %% kernel_config waits for dist_ac to take control over its
+ %% applications. By this we can be sure that the kernel
+ %% application hasn't completed its start before dist_ac has
+ %% taken control over its applications. (OTP-3509)
+ KernelConfig ! dist_ac_took_control,
+
+ %% we're really just interested in nodedowns.
+ net_kernel:monitor_nodes(true),
+
+ {Known, NAppls, RStarted} = sync_dacs(Appls),
+
+ {noreply,
+ #state{appls = NAppls, known = Known, remote_started = RStarted}}
+ end.
+
+
+handle_call(info, _From, S) ->
+ {reply, S, S};
+
+
+
+handle_call({load_application, AppName, DistNodes}, _From, S) ->
+ Appls = S#state.appls,
+ case catch dist_replace(DistNodes, AppName, Appls) of
+ {error, Error} ->
+ {reply, {error, Error}, S};
+ {'EXIT', R} ->
+ {stop, R, {error, R}, S};
+ NAppls ->
+ NewS = case dist_find_nodes(NAppls, AppName) of
+ [] -> % No distrib nodes; we ignore it
+ S;
+ _Nodes ->
+ ensure_take_control(AppName, Appls),
+ {ok, S2} = load(AppName, S#state{appls = NAppls}),
+ S2
+ end,
+ {reply, ok, NewS}
+ end;
+
+handle_call({takeover_application, AppName, RestartType}, From, S) ->
+ Appls = S#state.appls,
+ case keysearch(AppName, #appl.name, Appls) of
+ {value, Appl} when element(1, Appl#appl.id) =:= distributed ->
+ {distributed, Node} = Appl#appl.id,
+ ac_takeover(req, AppName, Node, RestartType),
+ NAppl = Appl#appl{id = takeover},
+ NAppls = keyreplace(AppName, #appl.name, Appls, NAppl),
+ TR = S#state.t_reqs,
+ {noreply, S#state{appls = NAppls,
+ t_reqs = [{AppName, From} | TR]}};
+ {value, #appl{id = local}} ->
+ {reply, {error, {already_running_locally, AppName}}, S};
+ _ ->
+ {reply, {error, {not_running_distributed, AppName}}, S}
+ end;
+
+handle_call({permit_application, AppName, Bool, LockId, StartInfo}, From, S) ->
+ case lists:keymember(AppName, #appl.name, S#state.appls) of
+ false ->
+ %% This one covers the case with permit for non-distributed
+ %% applications. This shouldn't be handled like this, or here at all,
+ %% but we have to be backwards compatible.
+ case application_controller:get_loaded(AppName) of
+ {true, _} when not Bool ->
+ ac_stop_it(AppName),
+ {reply, ok, S};
+ {true, _} when Bool ->
+ ac_start_it(req, AppName),
+ {reply, ok, S};
+ false ->
+ {reply, {error, {not_loaded, AppName}}, S}
+ end;
+ true ->
+ NAppls = dist_update_run(S#state.appls, AppName, node(), Bool),
+ NewS = S#state{appls = NAppls},
+ %% Check if the application is running
+ IsRunning = keysearch(AppName, #appl.name, NAppls),
+ IsMyApp = case IsRunning of
+ {value, #appl{id = local}} -> true;
+ _ -> false
+ end,
+ %% Tell everyone about the new permission
+ Nodes = dist_flat_nodes(NAppls, AppName),
+ Msg = {dist_ac_new_permission, node(), AppName, Bool, IsMyApp},
+ send_msg(Msg, Nodes),
+ case StartInfo of
+ only_loaded ->
+ {reply, ok, NewS};
+ started ->
+ permit(Bool, IsRunning, AppName, From, NewS, LockId)
+ end
+ end;
+
+%%-----------------------------------------------------------------
+%% The 'distributed' parameter has changed. Update the parameters,
+%% but the applications are not actually moved to other nodes,
+%% even if they should be.
+%%-----------------------------------------------------------------
+handle_call({distribution_changed, NewDistribution}, _From, S) ->
+ Appls = S#state.appls,
+ NewAppls = dist_change_update(Appls, NewDistribution),
+ NewS = S#state{appls = NewAppls},
+ {reply, ok, NewS};
+
+
+handle_call({get_nodes, AppName}, _From, S) ->
+ Alive = intersection(dist_flat_nodes(S#state.appls, AppName),
+ S#state.known),
+ {reply, Alive, S};
+
+handle_call(get_known_nodes, _From, S) ->
+ {reply, S#state.known, S}.
+
+
+handle_info({ac_load_application_req, AppName}, S) ->
+ {ok, NewS} = load(AppName, S),
+ ?AC ! {ac_load_application_reply, AppName, ok},
+ {noreply, NewS};
+
+handle_info({ac_application_unloaded, AppName}, S) ->
+ {ok, NewS} = unload(AppName, S),
+ {noreply, NewS};
+
+handle_info({ac_start_application_req, AppName}, S) ->
+ %% We must decide if we or another node should start the application
+ Lock = {?LOCK_ID, self()},
+ case global:set_lock(Lock, [node()], 0) of
+ true ->
+ S2 = case catch start_appl(AppName, S, reply) of
+ {ok, NewS, _} ->
+ NewS;
+ {error, R} ->
+ ?AC ! {ac_start_application_reply, AppName, {error,R}},
+ S
+ end,
+ global:del_lock(Lock),
+ {noreply, S2};
+ false ->
+ send_after(100, {ac_start_application_req, AppName}),
+ {noreply, S}
+ end;
+
+handle_info({ac_application_run, AppName, Res}, S) ->
+ %% We ordered a start, and here's the result. Tell all other nodes.
+ Appls = S#state.appls,
+ Nodes = S#state.known,
+ %% Send this to _all_ known nodes, as any node could sync
+ %% on this app (not only nodes that can run it).
+ send_msg({dist_ac_app_started, node(), AppName, Res}, Nodes),
+ NId = case Res of
+ ok -> local;
+ {error, _R} -> undefined
+ end,
+ {value, Appl} = keysearch(AppName, #appl.name, Appls),
+ %% Check if we have somebody waiting for the takeover result
+ NTReqs = del_t_reqs(AppName, S#state.t_reqs, Res),
+ NAppl = Appl#appl{id = NId},
+ NAppls = keyreplace(AppName, #appl.name, Appls, NAppl),
+ {noreply, S#state{appls = NAppls, t_reqs = NTReqs}};
+
+
+handle_info({ac_application_not_run, AppName}, S) ->
+ %% We ordered a stop, and now it has stopped
+ {value, Appl} = keysearch(AppName, #appl.name, Appls = S#state.appls),
+ %% Check if we have somebody waiting for the takeover result;
+ %% if somebody called stop just before takeover was handled,
+ NTReqs = del_t_reqs(AppName, S#state.t_reqs, {error, stopped}),
+ %% Check if we have somebody waiting for stop to return
+ SReqs = filter(fun({Name, From2}) when Name =:= AppName ->
+ gen_server:reply(From2, ok),
+ false;
+ (_) ->
+ true
+ end, S#state.s_reqs),
+ RS = case Appl#appl.id of
+ local ->
+ send_msg({dist_ac_app_stopped, AppName}, S#state.known),
+ S#state.remote_started;
+ {distributed, Node} ->
+ [{Node, AppName} | S#state.remote_started];
+ _ ->
+ S#state.remote_started
+ end,
+ NAppl = Appl#appl{id = undefined},
+ NAppls = keyreplace(AppName, #appl.name, Appls, NAppl),
+ {noreply, S#state{appls = NAppls, t_reqs = NTReqs, s_reqs = SReqs,
+ remote_started = RS}};
+
+handle_info({ac_application_stopped, AppName}, S) ->
+ %% Somebody called application:stop - reset state as it was before
+ %% the application was started.
+ {value, Appl} = keysearch(AppName, #appl.name, Appls = S#state.appls),
+ %% Check if we have somebody waiting for the takeover result;
+ %% if somebody called stop just before takeover was handled,
+ NTReqs = del_t_reqs(AppName, S#state.t_reqs, {error, stopped}),
+ %% Check if we have somebody waiting for stop to return
+ SReqs = filter(fun({Name, From2}) when Name =:= AppName ->
+ gen_server:reply(From2, ok),
+ false;
+ (_) ->
+ true
+ end, S#state.s_reqs),
+ RS = case Appl#appl.id of
+ local ->
+ send_msg({dist_ac_app_stopped, AppName}, S#state.known),
+ S#state.remote_started;
+ {distributed, Node} ->
+ [{Node, AppName} | S#state.remote_started];
+ _ ->
+ S#state.remote_started
+ end,
+ NAppl = Appl#appl{id = undefined},
+ NAppls = keyreplace(AppName, #appl.name, Appls, NAppl),
+ Started = lists:delete(AppName, S#state.started),
+ {noreply, S#state{appls = NAppls, started = Started,
+ t_reqs = NTReqs, s_reqs = SReqs,
+ remote_started = RS}};
+
+
+%%-----------------------------------------------------------------
+%% A new node is up and running.
+%% Send him info about our started distributed applications.
+%%-----------------------------------------------------------------
+handle_info({dist_ac_new_node, _Vsn, Node, HisAppls, []}, S) ->
+ Appls = S#state.appls,
+ MyStarted = zf(fun(Appl) when Appl#appl.id =:= local ->
+ {true, {node(), Appl#appl.name}};
+ (_) ->
+ false
+ end, Appls),
+ {?DIST_AC, Node} ! {dist_ac_new_node, ?vsn, node(), Appls, MyStarted},
+ NAppls = dist_merge(Appls, HisAppls, Node),
+ {noreply, S#state{appls = NAppls, known = [Node | S#state.known]}};
+
+handle_info({dist_ac_app_started, Node, Name, Res}, S) ->
+ case {keysearch(Name, #appl.name, S#state.appls), lists:member(Name, S#state.started)} of
+ {{value, Appl}, true} ->
+ Appls = S#state.appls,
+ NId = case Appl#appl.id of
+ _ when element(1, Res) =:= error ->
+ %% Start of appl on some node failed.
+ %% Set Id to undefined. That node will have
+ %% to take some actions, e.g. reboot
+ undefined;
+ {distributed, _} ->
+ %% Another node took over from some node. Update
+ %% appl list.
+ {distributed, Node};
+ local ->
+ %% Another node took over from me; stop my application
+ %% and update the running list.
+ {distributed, Node};
+ _ ->
+ %% Another node started appl. Update appl list.
+ {distributed, Node}
+ end,
+ ac_started(req, Name, Node),
+ NAppl = Appl#appl{id = NId},
+ NAppls = keyreplace(Name, #appl.name, Appls, NAppl),
+ TmpWeights = keydelete_all(Name, 1, S#state.tmp_weights),
+ NewS = S#state{appls = NAppls, tmp_weights = TmpWeights},
+ NPermitReq = req_del_permit_false(NewS#state.p_reqs, Name),
+ case catch req_start_app(NewS#state{p_reqs = NPermitReq}, Name) of
+ {error, R} ->
+ {stop, R};
+ {ok, NewS2} ->
+ {noreply, NewS2}
+ end;
+ {_, _} ->
+ %% The app has not been started at this node yet; remember this in
+ %% remote started.
+ NRStarted = [{Node, Name} | S#state.remote_started],
+ {noreply, S#state{remote_started = NRStarted}}
+ end;
+
+handle_info({dist_ac_app_stopped, AppName}, S) ->
+ Appls = S#state.appls,
+ case keysearch(AppName, #appl.name, Appls) of
+ false ->
+ RStarted = keydelete(AppName, 2, S#state.remote_started),
+ {noreply, S#state{remote_started = RStarted}};
+ {value, Appl} ->
+ NAppl = Appl#appl{id = undefined},
+ NAppls = keyreplace(AppName, #appl.name, Appls, NAppl),
+ RStarted = keydelete(AppName, 2, S#state.remote_started),
+ {noreply, S#state{appls = NAppls, remote_started = RStarted}}
+ end;
+
+handle_info({dist_ac_weight, Name, Weight, Node}, S) ->
+ %% This means another node starts up, and will eventually take over
+ %% this appl. We have a situation like: {Name, [{Node}, node()]}
+ %% Node sends us this msg, and we must respond. It doesn't really
+ %% matter what we send him; but it must be a dist_ac_weight msg.
+ %% Another situation is {Name, [RNode, {node()}, Node]}.
+ %%
+ %% Yet another situation is that the node where Name was running crashed,
+ %% and Node has got the nodedown message, but we haven't. In this case,
+ %% we must send a correct weight to Node. i.e. the same weight that
+ %% we'll send to him later, when we get the nodedown message.
+ case keysearch(Name, #appl.name, S#state.appls) of
+ {value, Appl} ->
+ Id = Appl#appl.id,
+ case Id of
+ run_waiting ->
+ {?DIST_AC, Node} ! {dist_ac_weight, Name, 0, node()},
+ {noreply, S};
+ undefined ->
+ {noreply,
+ S#state{tmp_locals = [{Name, Weight, Node} |
+ S#state.tmp_locals]}};
+ {takeover, _} ->
+ {noreply,
+ S#state{tmp_locals = [{Name, Weight, Node} |
+ S#state.tmp_locals]}};
+ {failover, _} ->
+ {noreply,
+ S#state{tmp_locals = [{Name, Weight, Node} |
+ S#state.tmp_locals]}};
+ _ ->
+ MyWeight = get_cached_weight(Name, S),
+ {?DIST_AC, Node} ! {dist_ac_weight, Name, MyWeight, node()},
+ NTWs = keyreplaceadd(Name, 1, S#state.tmp_weights,
+ {Name, MyWeight}),
+ {noreply, S#state{tmp_weights = NTWs}}
+ end;
+ _ ->
+ {noreply,
+ S#state{tmp_locals = [{Name, Weight, Node} | S#state.tmp_locals]}}
+ end;
+
+%%-----------------------------------------------------------------
+%% A node died. Check if we should take over some applications.
+%%-----------------------------------------------------------------
+handle_info({nodedown, Node}, S) ->
+ AppNames = dist_get_runnable(S#state.appls),
+ HisAppls = filter(fun(#appl{name = Name, id = {distributed, N}})
+ when Node =:= N -> lists:member(Name, AppNames);
+ (_) -> false
+ end,
+ S#state.appls),
+ Appls2 = zf(fun(Appl) when Appl#appl.id =:= {distributed, Node} ->
+ case lists:member(Appl#appl.name, AppNames) of
+ true ->
+ {true, Appl#appl{id = {failover, Node}}};
+ false ->
+ ac_not_running(Appl#appl.name),
+ {true, Appl#appl{id = undefined}}
+ end;
+ (_) ->
+ true
+ end,
+ S#state.appls),
+ RStarted = filter(fun({Node2, _Name}) when Node2 =:= Node -> false;
+ (_) -> true
+ end,
+ S#state.remote_started),
+ Appls3 = dist_del_node(Appls2, Node),
+ {NPermitReq, Appls4, SReqs} = req_del_node(S, Node, Appls3),
+ NKnown = lists:delete(Node, S#state.known),
+ NewS = S#state{appls = Appls4, p_reqs = NPermitReq, known = NKnown,
+ s_reqs = SReqs,
+ remote_started = RStarted},
+ restart_appls(HisAppls),
+ {noreply, NewS};
+
+handle_info({dist_ac_app_loaded, Node, Name, HisNodes, Permission, HeKnowsMe},
+ S) ->
+ Nodes = dist_find_nodes(Appls = S#state.appls, Name),
+ case is_loaded(Name, S) of
+ true ->
+ case equal_nodes(Nodes, HisNodes) of
+ true ->
+ NAppls = dist_update_run(Appls, Name, Node, Permission),
+ if
+ not HeKnowsMe ->
+ %% We've got it loaded, but he doesn't know -
+ %% he's a new node connecting to us.
+ Msg = {dist_ac_app_loaded, node(), Name,
+ Nodes, dist_is_runnable(Appls, Name), true},
+ {?DIST_AC, Node} ! Msg;
+ true ->
+ ok
+ end,
+ {noreply, S#state{appls = NAppls}};
+ false ->
+ dist_mismatch(Name, Node)
+ end;
+ false ->
+ Load =[{{Name, Node}, HisNodes, Permission} | S#state.dist_loaded],
+ {noreply, S#state{dist_loaded = Load}}
+ end;
+
+handle_info({dist_ac_app_unloaded, Node, Name}, S) ->
+ Appls = dist_update_run(S#state.appls, Name, Node, undefined),
+ Load = keydelete({Name, Node}, 1, S#state.dist_loaded),
+ {noreply, S#state{appls = Appls, dist_loaded = Load}};
+
+
+handle_info({dist_ac_new_permission, Node, AppName, false, IsHisApp}, S) ->
+ Appls = dist_update_run(S#state.appls, AppName, Node, false),
+ NewS = S#state{appls =Appls},
+ case dist_is_runnable(Appls, AppName) of
+ true when IsHisApp ->
+ case catch start_appl(AppName, NewS, req) of
+ {ok, NewS2, _} ->
+ {noreply, NewS2};
+ {error, _R} -> % if app was permanent, AC will shutdown the node
+ {noreply, NewS}
+ end;
+ _ ->
+ {noreply, NewS}
+ end;
+handle_info({dist_ac_new_permission, Node, AppName, true, _IsHisApp}, S) ->
+ Appls = dist_update_run(S#state.appls, AppName, Node, true),
+ {noreply, S#state{appls = Appls}};
+
+handle_info({internal_restart_appl, Name}, S) ->
+ case restart_appl(Name, S) of
+ {error, R} ->
+ {stop, {error, R}, S};
+ NewS ->
+ {noreply, NewS}
+ end;
+
+handle_info(_, S) ->
+ {noreply, S}.
+
+terminate(_Reason, _S) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%%-----------------------------------------------------------------
+%%% Internal functions
+%%%-----------------------------------------------------------------
+load(AppName, S) ->
+ Appls0 = S#state.appls,
+ %% Get the dist specification for the app on other nodes
+ DistLoaded = get_dist_loaded(AppName, Load1 = S#state.dist_loaded),
+ %% Get the local dist specification
+ Nodes = dist_find_nodes(Appls0, AppName),
+ FNodes = flat_nodes(Nodes),
+ %% Update the dist spec with our local permission
+ Permission = get_default_permission(AppName),
+ Appls1 = dist_update_run(Appls0, AppName, node(), Permission),
+ %% Compare the local spec with other nodes' specs
+ %% If equal, update our spec with his current permission
+ {LoadedNodes, Appls2} =
+ mapfoldl(
+ fun({Node, HisNodes, HisPermission}, Appls) ->
+ case equal_nodes(Nodes, HisNodes) of
+ true ->
+ {Node, dist_update_run(Appls, AppName,
+ Node, HisPermission)};
+ _ ->
+ dist_mismatch(AppName, Node)
+ end
+ end, Appls1, DistLoaded),
+ Load2 = del_dist_loaded(AppName, Load1),
+ %% Tell all Nodes about the new appl loaded, and its permission.
+ foreach(fun(Node) when Node =/= node() ->
+ Msg = {dist_ac_app_loaded, node(), AppName,
+ Nodes, Permission, member(Node, LoadedNodes)},
+ {?DIST_AC, Node} ! Msg;
+ (_) -> ok
+ end, FNodes),
+ {ok, S#state{appls = Appls2, dist_loaded = Load2}}.
+
+ensure_take_control(AppName, Appls) ->
+ %% Check if this is a new application that we don't control yet
+ case lists:keymember(AppName, #appl.name, Appls) of
+ true -> % we have control
+ ok;
+ false -> % take control!
+ %% Note: this works because this is executed within a
+ %% synchronous call. I.e. we get the control *before*
+ %% application:load returns. (otherwise application:start
+ %% could be called before we got the chance to take control)
+ %% The only reason we have to bother about this is because
+ %% we have to be backwards compatible in the sense that all
+ %% apps don't have to be specified in the 'distributed' parameter,
+ %% but may be implicitly 'distributed' by a call to
+ %% application:load.
+ application_controller:control_application(AppName)
+ end.
+
+unload(AppName, S) ->
+ Appls = S#state.appls,
+ Nodes = dist_flat_nodes(Appls, AppName),
+ %% Tell all ACs in DistNodes about the unloaded appl
+ Msg = {dist_ac_app_unloaded, node(), AppName},
+ send_msg(Msg, Nodes),
+ {value, Appl} = keysearch(AppName, #appl.name, Appls),
+ NAppl = Appl#appl{id = undefined, run = []},
+ {ok, S#state{appls = keyreplace(AppName, #appl.name, Appls, NAppl)}}.
+
+start_appl(AppName, S, Type) ->
+ %% Get nodes, and check if App is loaded on all involved nodes.
+ %% If it is loaded everywhere, we know that we have the same picture
+ %% of the nodes; otherwise the load wouldn't have succeeded.
+ Appl = case keysearch(AppName, #appl.name, Appls = S#state.appls) of
+ {value, A} -> A;
+ _ -> throw({error, {unknown_application, AppName}})
+ end,
+ case Appl#appl.id of
+ local ->
+ %% UW 990913: we've already started the app;
+ %% this could happen if ac_start_application_req was resent.
+ {ok,S,false};
+ _ ->
+ {Id, IsWaiting} = case dist_get_all_nodes(Appl) of
+ {ok, DistNodes, PermittedNodes} ->
+ start_distributed(Appl, AppName, DistNodes,
+ PermittedNodes, S, Type);
+ Error -> throw(Error)
+ end,
+ NAppl = Appl#appl{id = Id},
+ NAppls = keyreplaceadd(AppName, #appl.name, Appls, NAppl),
+ {ok, NewS} = req_start_app(S#state{appls = NAppls}, AppName),
+ TmpLocals = keydelete_all(AppName, 1, NewS#state.tmp_locals),
+ TmpWeights = keydelete_all(AppName, 1, NewS#state.tmp_weights),
+ RStarted = keydelete(AppName, 2, S#state.remote_started),
+ Started = replaceadd(AppName, NewS#state.started),
+ {ok,
+ NewS#state{started = Started, tmp_locals = TmpLocals,
+ tmp_weights = TmpWeights, remote_started = RStarted},
+ IsWaiting}
+ end.
+
+
+start_distributed(Appl, Name, Nodes, PermittedNodes, S, Type) ->
+ case find_start_node(Nodes, PermittedNodes, Name, S) of
+ {ok, Node} when Node =:= node() ->
+ case Appl#appl.id of
+ {failover, FoNode} when Type =:= req ->
+ ac_failover(Name, FoNode, undefined);
+ {distributed, Node2} when Type =:= req ->
+ ac_takeover(req, Name, Node2, undefined);
+ _ when Type =:= reply ->
+ case lists:keysearch(Name, 2, S#state.remote_started) of
+ {value, {Node3, _}} ->
+ ac_takeover(reply, Name, Node3, undefined);
+ _ ->
+ ac_start_it(Type, Name)
+ end;
+ _ ->
+ ac_start_it(Type, Name)
+ end,
+ {run_waiting, true};
+ {already_started, Node} ->
+ ac_started(Type, Name, Node),
+ {{distributed, Node}, false};
+ {ok, Node} ->
+ case keysearch(Name, #appl.name, S#state.appls) of
+ {value, #appl{id = {distributed, Node}}} ->
+ ac_started(Type, Name, Node),
+ {{distributed, Node}, false};
+ _ ->
+ wait_dist_start(Node, Appl, Name, Nodes,
+ PermittedNodes, S, Type)
+ end;
+ not_started ->
+ wait_dist_start2(Appl, Name, Nodes, PermittedNodes, S, Type);
+ no_permission ->
+ ac_not_started(Type, Name),
+ {undefined, false}
+ end.
+
+wait_dist_start(Node, Appl, Name, Nodes, PermittedNodes, S, Type) ->
+ monitor_node(Node, true),
+ receive
+ {dist_ac_app_started, Node, Name, ok} ->
+ ac_started(Type, Name, Node),
+ monitor_node(Node, false),
+ {{distributed, Node}, false};
+ {dist_ac_app_started, Node, Name, {error, R}} ->
+ ac_error(Type, Name, {Node, R}),
+ monitor_node(Node, false),
+ {Appl#appl.id, false};
+ {dist_ac_weight, Name, _Weight, Node} ->
+ %% This is the situation: {Name, [RNode, {Node}, node()]}
+ %% and permit(false) is called on RNode, and we sent the
+ %% weight first. Node handled it in handle_info, and
+ %% now we must send him a weight msg. We can use any weight;
+ %% he wins anyway.
+ monitor_node(Node, false),
+ {?DIST_AC, Node} !
+ {dist_ac_weight, Name, get_cached_weight(Name, S), node()},
+ wait_dist_start(Node, Appl, Name, Nodes, PermittedNodes, S, Type);
+ {nodedown, Node} ->
+ monitor_node(Node, false),
+ TmpLocals =
+ filter(fun({Name2, _Weight, Node2}) when Node2 =:= Node,
+ Name2 =:= Name -> false;
+ (_) -> true
+ end,
+ S#state.tmp_locals),
+ NewS = S#state{tmp_locals = TmpLocals},
+ start_distributed(Appl, Name, Nodes,
+ lists:delete(Node, PermittedNodes), NewS, Type)
+ end.
+
+wait_dist_start2(Appl, Name, Nodes, PermittedNodes, S, Type) ->
+ receive
+ {dist_ac_app_started, Node, Name, ok} ->
+ ac_started(Type, Name, Node),
+ {{distributed, Node}, false};
+ {dist_ac_app_started, Node, Name, {error, R}} ->
+ ac_error(Type, Name, {Node, R}),
+ {Appl#appl.id, false};
+ {nodedown, Node} ->
+ %% A node went down, try to start the app again - there may not
+ %% be any more nodes to wait for.
+ TmpLocals =
+ filter(fun({Name2, _Weight, Node2}) when Node2 =:= Node,
+ Name2 =:= Name -> false;
+ (_) -> true
+ end,
+ S#state.tmp_locals),
+ NewS = S#state{tmp_locals = TmpLocals},
+ start_distributed(Appl, Name, Nodes,
+ lists:delete(Node, PermittedNodes), NewS, Type)
+ end.
+
+
+ac_start_it(reply, Name) ->
+ ?AC ! {ac_start_application_reply, Name, start_it};
+ac_start_it(req, Name) ->
+ ?AC ! {ac_change_application_req, Name, start_it}.
+
+ac_started(reply, Name, Node) ->
+ ?AC ! {ac_start_application_reply, Name, {started, Node}};
+ac_started(req, Name, Node) ->
+ ?AC ! {ac_change_application_req, Name, {started, Node}}.
+
+ac_error(reply, Name, Error) ->
+ ?AC ! {ac_start_application_reply, Name, {error, Error}};
+ac_error(req, _Name, _Error) ->
+ ok.
+
+ac_not_started(reply, Name) ->
+ ?AC ! {ac_start_application_reply, Name, not_started};
+ac_not_started(req, Name) ->
+ ?AC ! {ac_change_application_req, Name, stop_it}.
+
+ac_stop_it(Name) ->
+ ?AC ! {ac_change_application_req, Name, stop_it}.
+
+ac_takeover(reply, Name, Node, _RestartType) ->
+ ?AC ! {ac_start_application_reply, Name, {takeover, Node}};
+ac_takeover(req, Name, Node, RestartType) ->
+ ?AC ! {ac_change_application_req, Name,
+ {takeover, Node, RestartType}}.
+
+ac_failover(Name, Node, RestartType) ->
+ ?AC ! {ac_change_application_req, Name,
+ {failover, Node, RestartType}}.
+
+ac_not_running(Name) ->
+ ?AC ! {ac_change_application_req, Name, not_running}.
+
+restart_appls(Appls) ->
+ foreach(fun(Appl) ->
+ AppName = Appl#appl.name,
+ send_after(Appl#appl.restart_time,
+ {internal_restart_appl, AppName})
+ end, lists:reverse(Appls)).
+
+restart_appl(AppName, S) ->
+ case keysearch(AppName, #appl.name, S#state.appls) of
+ {value, Appl} when element(1, Appl#appl.id) =:= failover ->
+ case catch start_appl(AppName, S, req) of
+ {ok, NewS, _} ->
+ NewS;
+ {error, R} ->
+ error_msg("Error when restarting application ~p: ~p~n",
+ [AppName, R]),
+ S
+ end;
+ _ ->
+ S
+ end.
+
+%% permit(ShouldBeRunning, IsRunning, ...)
+permit(false, {value, #appl{id = undefined}}, _AppName, _From, S, _LockId) ->
+ {reply, ok, S}; % It's not running
+permit(false, {value, #appl{id = Id}}, _AppName, _From, S, _LockId)
+ when element(1, Id) =:= distributed ->
+ %% It is running at another node already
+ {reply, ok, S};
+permit(false, {value, _}, AppName, From, S, _LockId) ->
+ %% It is a distributed application
+ %% Check if there is any runnable node
+ case dist_get_runnable_nodes(S#state.appls, AppName) of
+ [] ->
+ %% There is no runnable node; stop application
+ ac_stop_it(AppName),
+ SReqs = [{AppName, From} | S#state.s_reqs],
+ {noreply, S#state{s_reqs = SReqs}};
+ Nodes ->
+ %% Delete all outstanding 'permit true' requests.
+ PR = req_del_permit_true(S#state.p_reqs, AppName),
+ NPReqs = [{From, AppName, false, Nodes} | PR],
+ {noreply, S#state{p_reqs = NPReqs}}
+ end;
+permit(true, {value, #appl{id = local}}, _AppName, _From, S, _LockId) ->
+ {reply, ok, S};
+permit(true, _, AppName, From, S, LockId) ->
+ case catch start_appl(AppName, S, req) of
+ {_ErrorTag, {not_running, App}} ->
+ %% Delete all outstanding 'permit false' requests
+ PR = req_del_permit_false(S#state.p_reqs, AppName),
+ NPReqs = [{false, AppName, true, App} | PR],
+ {reply, ok, S#state{p_reqs = NPReqs}};
+ {ok, NewS, true} ->
+ %% We have ordered a start or a takeover; we must not return
+ %% until the app is running.
+ TR = NewS#state.t_reqs,
+ %% Delete the lock, so others may start the app
+ global:del_lock(LockId),
+ {noreply, NewS#state{t_reqs = [{AppName, From} | TR]}};
+ {ok, _S, false} ->
+ %% The application should be started, but at another node.
+ %% The state remains the same.
+ {reply, ok, S};
+ {_ErrorTag, R} ->
+ {stop, R, {error, R}, S}
+ end.
+
+do_start_appls(StartApps, S) ->
+ SortedStartApps = StartApps,
+ Appls = S#state.appls,
+ {ok, foldl(
+ fun(AppName, NewS) ->
+ case catch start_appl(AppName, NewS, req) of
+ {error, R} ->
+ throw({{error, NewS}, R});
+ {ok, NewS2, _} ->
+ NewS2
+ end
+ end, S#state{appls = Appls}, lists:reverse(SortedStartApps))}.
+
+%%-----------------------------------------------------------------
+%% Nodes = [node() | {node(), ..., node()}]
+%% A list in priority order. If it is a tuple, we may pick any of
+%% them. This decision is made by all nodes in the list, and all
+%% nodes choose the same. This is accomplished in the following
+%% way: all Nodes send to all others a msg which tells how many
+%% applications each node has started. The one with the fewest
+%% applications starts this one.
+%%-----------------------------------------------------------------
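+%% E.g. with Nodes = [cp1@host, {cp2@host, cp3@host}] (illustrative node
+%% names), cp1@host is preferred; if it is not available, either cp2@host
+%% or cp3@host is chosen, and the weight exchange above breaks the tie:
+%% the node with the fewest running applications (smallest weight) wins.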
+find_start_node(Nodes, PermittedNodes, Name, S) ->
+ AllNodes = intersection(flat_nodes(Nodes), PermittedNodes),
+ case lists:member(node(), AllNodes) of
+ true ->
+ Weight = get_cached_weight(Name, S),
+ find_start_node(Nodes, Name, S, Weight, AllNodes);
+ false ->
+ case keysearch(Name, 2, S#state.remote_started) of
+ {value, {Node, _Name}} ->
+ {already_started, Node};
+ _ when AllNodes =/= [] ->
+ not_started;
+ _ ->
+ no_permission
+ end
+ end.
+
+find_start_node([AnyNodes | Nodes], Name, S, Weight, AllNodes)
+ when is_tuple(AnyNodes) ->
+ case find_any_node(tuple_to_list(AnyNodes), Name, S, Weight, AllNodes) of
+ false -> find_start_node(Nodes, Name, S, Weight, AllNodes);
+ Res -> Res
+ end;
+find_start_node([Node | Nodes], Name, S, Weight, AllNodes) ->
+ case lists:member(Node, AllNodes) of
+ true ->
+ case keysearch(Name, #appl.name, S#state.appls) of
+ {value, #appl{id = {distributed, Node}}} ->
+ {already_started, Node};
+ _ ->
+ case keysearch(Name, 2, S#state.remote_started) of
+ {value, {Node, _Name}} ->
+ {already_started, Node};
+ _ ->
+ {ok, Node}
+ end
+ end;
+ false -> find_start_node(Nodes, Name, S, Weight, AllNodes)
+ end;
+find_start_node([], _Name, _S, _Weight, _AllNodes) ->
+ not_started.
+
+%%-----------------------------------------------------------------
+%% First of all, check if the application is already running
+%% somewhere in AnyNodes; in that case we shall not move it!
+%%-----------------------------------------------------------------
+find_any_node(AnyNodes, Name, S, Weight, AllNodes) ->
+ case check_running(Name, S, intersection(AnyNodes, AllNodes)) of
+ {already_started, Node} -> {already_started, Node};
+ false ->
+ %% Synchronize with all other nodes.
+ send_nodes(AllNodes, {dist_ac_weight, Name, Weight, node()}),
+ Answers = [{Weight, node()} |
+ collect_answers(AllNodes, Name, S, [])],
+ %% Make a decision (the same at every node) (smallest weight wins)
+ find_alive_node(lists:sort(Answers),
+ intersection(AnyNodes, S#state.known))
+ end.
+
+%%-----------------------------------------------------------------
+%% Check if another node started the appl before we were started.
+%% If so, check if the node is one of AnyNodes.
+%%-----------------------------------------------------------------
+check_running(Name, #state{remote_started = RStarted,
+ appls = Appls}, AnyNodes) ->
+ case keysearch(Name, 2, RStarted) of
+ {value, {Node, _Name}} ->
+ case lists:member(Node, AnyNodes) of
+ true -> {already_started, Node};
+ false -> false
+ end;
+ false ->
+ case keysearch(Name, #appl.name, Appls) of
+ {value, #appl{id = {distributed, Node}}} ->
+ case lists:member(Node, AnyNodes) of
+ true -> {already_started, Node};
+ false -> false
+ end;
+ _ ->
+ false
+ end
+ end.
+
+find_alive_node([{_, Node} | Nodes], AliveNodes) ->
+ case lists:member(Node, AliveNodes) of
+ true -> {ok, Node};
+ false -> find_alive_node(Nodes, AliveNodes)
+ end;
+find_alive_node([], _AliveNodes) ->
+ false.
+
+%%-----------------------------------------------------------------
+%% First, check if the node's msg is buffered (received in our
+%% main loop). Otherwise, wait for msg or nodedown.
+%% We have sent the dist_ac_weight message, and will wait for it
+%% to be received here (or a nodedown). This implies that a
+%% dist_ac must *always* be prepared to get this message, and to
+%% send it to us.
+%%-----------------------------------------------------------------
+collect_answers([Node | Nodes], Name, S, Res) when Node =/= node() ->
+ case keysearch(Node, 3, S#state.tmp_locals) of
+ {value, {Name, Weight, Node}} ->
+ collect_answers(Nodes, Name, S, [{Weight, Node} | Res]);
+ _ ->
+ monitor_node(Node, true),
+ receive
+ {dist_ac_weight, Name, Weight, Node} ->
+ monitor_node(Node, false),
+ collect_answers(Nodes, Name, S, [{Weight, Node} | Res]);
+ {nodedown, Node} ->
+ monitor_node(Node, false),
+ collect_answers(Nodes, Name, S, Res)
+ end
+ end;
+collect_answers([_ThisNode | Nodes], Name, S, Res) ->
+ collect_answers(Nodes, Name, S, Res);
+collect_answers([], _Name, _S, Res) ->
+ Res.
+
+send_nodes(Nodes, Msg) ->
+ FlatNodes = flat_nodes(Nodes),
+ foreach(fun(Node) when Node =/= node() -> {?DIST_AC, Node} ! Msg;
+ (_ThisNode) -> ok
+ end, FlatNodes).
+
+send_after(Time, Msg) when is_integer(Time), Time >= 0 ->
+ spawn_link(?MODULE, send_timeout, [self(), Time, Msg]);
+send_after(_,_) -> % infinity
+ ok.
+
+send_timeout(To, Time, Msg) ->
+ receive
+ after Time -> To ! Msg
+ end.
+
+send_msg(Msg, Nodes) ->
+ foreach(fun(Node) when Node =/= node() -> {?DIST_AC, Node} ! Msg;
+ (_) -> ok
+ end, Nodes).
+
+replaceadd(Item, List) ->
+ case member(Item, List) of
+ true -> List;
+ false -> [Item | List]
+ end.
+
+keyreplaceadd(Key, Pos, List, New) ->
+ case lists:keymember(Key, Pos, List) of
+ true -> lists:keyreplace(Key, Pos, List, New);
+ false -> [New | List]
+ end.
+
+keydelete_all(Key, N, [H|T]) when element(N, H) =:= Key ->
+ keydelete_all(Key, N, T);
+keydelete_all(Key, N, [H|T]) ->
+ [H|keydelete_all(Key, N, T)];
+keydelete_all(_Key, _N, []) -> [].
+
+-ifdef(NOTUSED).
+keysearchdelete(Key, Pos, List) ->
+ ksd(Key, Pos, List, []).
+
+ksd(Key, Pos, [H | T], Rest) when element(Pos, H) =:= Key ->
+ {value, H, Rest ++ T};
+ksd(Key, Pos, [H | T], Rest) ->
+ ksd(Key, Pos, T, [H | Rest]);
+ksd(_Key, _Pos, [], _Rest) ->
+ false.
+
+get_new_appl(Name, [{application, Name, App} | _]) ->
+ {ok, {application, Name, App}};
+get_new_appl(Name, [_ | T]) -> get_new_appl(Name, T);
+get_new_appl(Name, []) -> false.
+-endif.
+
+equal_nodes([H | T1], [H | T2]) when is_atom(H) ->
+ equal_nodes(T1, T2);
+equal_nodes([H1 | T1], [H2 | T2]) when is_tuple(H1), is_tuple(H2) ->
+ case equal(tuple_to_list(H1), tuple_to_list(H2)) of
+ true -> equal_nodes(T1, T2);
+ false -> false
+ end;
+equal_nodes([], []) -> true;
+equal_nodes(_, _) -> false.
+
+equal([H | T] , S) ->
+ case lists:member(H, S) of
+ true -> equal(T, lists:delete(H, S));
+ false -> false
+ end;
+equal([], []) -> true;
+equal(_, _) -> false.
+
+flat_nodes(Nodes) when is_list(Nodes) ->
+ foldl(fun(Node, Res) when is_atom(Node) -> [Node | Res];
+ (Tuple, Res) when is_tuple(Tuple) -> tuple_to_list(Tuple) ++ Res
+ end, [], Nodes);
+flat_nodes(Nodes) ->
+ throw({error, {badarg, Nodes}}).
+
+get_cached_weight(Name, S) ->
+ case lists:keysearch(Name, 1, S#state.tmp_weights) of
+ {value, {_, W}} -> W;
+ _ -> get_weight()
+ end.
+
+%% Simple weight; just count the number of applications running.
+get_weight() ->
+ length(application:which_applications()).
+
+get_dist_loaded(Name, [{{Name, Node}, HisNodes, Permission} | T]) ->
+ [{Node, HisNodes, Permission} | get_dist_loaded(Name, T)];
+get_dist_loaded(Name, [_H | T]) ->
+ get_dist_loaded(Name, T);
+get_dist_loaded(_Name, []) ->
+ [].
+
+del_dist_loaded(Name, [{{Name, _Node}, _HisNodes, _Permission} | T]) ->
+ del_dist_loaded(Name, T);
+del_dist_loaded(Name, [H | T]) ->
+ [H | del_dist_loaded(Name, T)];
+del_dist_loaded(_Name, []) ->
+ [].
+
+req_start_app(State, Name) ->
+ {ok, foldl(
+ fun({false, AppName, true, Name2}, S) when Name =:= Name2 ->
+ PR = keydelete(AppName, 2, S#state.p_reqs),
+ NS = S#state{p_reqs = PR},
+ case catch do_start_appls([AppName], NS) of
+ {_ErrorTag, {not_running, App}} ->
+ NRequests = [{false, AppName, true, App} | PR],
+ S#state{p_reqs = NRequests};
+ {ok, NewS} ->
+ NewS;
+ {_ErrorTag, R} ->
+ throw({error, R})
+ end;
+ (_, S) ->
+ S
+ end, State, State#state.p_reqs)}.
+
+
+req_del_permit_true(Reqs, Name) ->
+ filter(fun({From, Name2, true, _}) when Name2 =:= Name ->
+ gen_server:reply(From, ok),
+ false;
+ (_) ->
+ true
+ end, Reqs).
+
+req_del_permit_false(Reqs, Name) ->
+ filter(fun({From, Name2, false, _Nodes}) when Name2 =:= Name ->
+ gen_server:reply(From, ok),
+ false;
+ (_) ->
+ true
+ end, Reqs).
+
+req_del_node(S, Node, Appls) ->
+ check_waiting(S#state.p_reqs, S, Node, Appls, [], S#state.s_reqs).
+
+del_t_reqs(AppName, TReqs, Res) ->
+ lists:filter(fun({AN, From}) when AppName =:= AN ->
+ gen_server:reply(From, Res),
+ false;
+ (_) ->
+ true
+ end,
+ TReqs).
+
+
+check_waiting([{From, AppName, false, Nodes} | Reqs],
+ S, Node, Appls, Res, SReqs) ->
+ case lists:delete(Node, Nodes) of
+ [] ->
+ ac_stop_it(AppName),
+ NSReqs = [{AppName, From} | SReqs],
+ check_waiting(Reqs, Node, S, Appls, Res, NSReqs);
+ NNodes ->
+ check_waiting(Reqs, Node, S, Appls,
+ [{From, AppName, false, NNodes} | Res], SReqs)
+ end;
+check_waiting([H | Reqs], S, Node, Appls, Res, SReqs) ->
+ check_waiting(Reqs, Node, S, Appls, [H | Res], SReqs);
+check_waiting([], _Node, _S, Appls, Res, SReqs) ->
+ {Res, Appls, SReqs}.
+
+intersection([], _) ->
+ [];
+intersection(_, []) ->
+ [];
+intersection(L1, L2) ->
+ L1 -- (L1 -- L2).
+
+get_default_permission(AppName) ->
+ case application:get_env(kernel, permissions) of
+ {ok, Permissions} ->
+ case keysearch(AppName, 1, Permissions) of
+ {value, {_, true}} -> true;
+ {value, {_, false}} -> false;
+ {value, {_, X}} -> exit({bad_permission, {AppName, X}});
+ false -> true
+ end;
+ undefined -> true
+ end.
+
+%%-----------------------------------------------------------------
+%% ADT dist() - info on how an application is distributed
+%% dist() = [{AppName, Time, DistNodes, [{Node, Runnable}]}]
+%% Time = int() >= 0 | infinity
+%% Nodes = [node() | {node()...}]
+%% Runnable = true | false | undefined
+%% An appl may not be started if any Runnable is undefined;
+%% i.e. the appl must be loaded on all Nodes.
+%%-----------------------------------------------------------------
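+%% For reference, a sys.config entry setting the kernel 'distributed'
+%% parameter (with hypothetical application and node names); the inner
+%% list is what dist_check/1 below receives:
+%%
+%%   {kernel, [{distributed,
+%%              [{myapp, 5000, [cp1@host, {cp2@host, cp3@host}]}]}]}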
+dist_check([{AppName, Nodes} | T]) ->
+ P = get_default_permission(AppName),
+ [#appl{name = AppName, nodes = Nodes, run = [{node(), P}]} | dist_check(T)];
+dist_check([{AppName, Time, Nodes} | T]) when is_integer(Time), Time >= 0 ->
+ P = get_default_permission(AppName),
+ [#appl{name = AppName, restart_time = Time, nodes = Nodes,
+ run = [{node(), P}]} | dist_check(T)];
+dist_check([{AppName, infinity, Nodes} | T]) ->
+ P = get_default_permission(AppName),
+ [#appl{name = AppName, restart_time = infinity,
+ nodes = Nodes, run = [{node(), P}]} |
+ dist_check(T)];
+dist_check([_ | T]) ->
+ dist_check(T);
+dist_check([]) ->
+ [].
+
+dist_take_control(Appls) ->
+ foreach(fun(#appl{name = AppName}) ->
+ application_controller:control_application(AppName)
+ end, Appls).
+
+dist_replace(default, _Name, Appls) -> Appls;
+dist_replace({AppName, Nodes}, AppName, Appls) ->
+ Run = [{Node, undefined} || Node <- flat_nodes(Nodes)],
+ keyreplaceadd(AppName, #appl.name, Appls,
+ #appl{name = AppName, restart_time = 0,
+ nodes = Nodes, run = Run});
+dist_replace({AppName, Time, Nodes}, AppName, Appls)
+ when is_integer(Time), Time >= 0 ->
+ Run = [{Node, undefined} || Node <- flat_nodes(Nodes)],
+ keyreplaceadd(AppName, #appl.name, Appls,
+ #appl{name = AppName, restart_time = Time,
+ nodes = Nodes, run = Run});
+dist_replace(Bad, _Name, _Appls) ->
+ throw({error, {bad_distribution_spec, Bad}}).
+
+dist_update_run(Appls, AppName, Node, Permission) ->
+ map(fun(Appl) when Appl#appl.name =:= AppName ->
+ Run = Appl#appl.run,
+ NRun = keyreplaceadd(Node, 1, Run, {Node, Permission}),
+ Appl#appl{run = NRun};
+ (Appl) ->
+ Appl
+ end, Appls).
+
+
+
+dist_change_update(Appls, []) ->
+ Appls;
+dist_change_update(Appls, [{AppName, NewNodes} | NewDist]) ->
+ NewAppls = do_dist_change_update(Appls, AppName, 0, NewNodes),
+ dist_change_update(NewAppls, NewDist);
+dist_change_update(Appls, [{AppName, NewTime, NewNodes} | NewDist]) ->
+ NewAppls = do_dist_change_update(Appls, AppName, NewTime, NewNodes),
+ dist_change_update(NewAppls, NewDist).
+
+do_dist_change_update(Appls, AppName, NewTime, NewNodes) ->
+ map(fun(Appl) when Appl#appl.name =:= AppName ->
+ Appl#appl{restart_time = NewTime, nodes = NewNodes};
+ (Appl) ->
+ Appl
+ end, Appls).
+
+%% Merge his Permissions with mine.
+dist_merge(MyAppls, HisAppls, HisNode) ->
+ zf(fun(Appl) ->
+ #appl{name = AppName, run = Run} = Appl,
+% #appl{name = AppName, nodes = Nodes, run = Run} = Appl,
+% HeIsMember = lists:member(HisNode, flat_nodes(Nodes)),
+ HeIsMember = true,
+ case keysearch(AppName, #appl.name, HisAppls) of
+ {value, #appl{run = HisRun}} when HeIsMember ->
+ case keysearch(HisNode, 1, HisRun) of
+ {value, Val} -> % He has it loaded
+ NRun = keyreplaceadd(HisNode, 1, Run, Val),
+ {true, Appl#appl{run = NRun}};
+ false -> % He hasn't loaded it yet
+ Val = {HisNode, undefined},
+ {true, Appl#appl{run = [Val | Run]}}
+ end;
+ _ ->
+ true
+ end
+ end, MyAppls).
+
+dist_get_runnable_nodes(Appls, AppName) ->
+ case keysearch(AppName, #appl.name, Appls) of
+ {value, #appl{run = Run}} ->
+ zf(fun({Node, true}) -> {true, Node};
+ (_) -> false
+ end, Run);
+ false ->
+ []
+ end.
+
+dist_is_runnable(Appls, AppName) ->
+ case keysearch(AppName, #appl.name, Appls) of
+ {value, #appl{run = Run}} ->
+ case keysearch(node(), 1, Run) of
+ {value, {_, true}} -> true;
+ _ -> false
+ end;
+ false ->
+ false
+ end.
+
+is_loaded(AppName, #state{appls = Appls}) ->
+ case keysearch(AppName, #appl.name, Appls) of
+ {value, #appl{run = Run}} ->
+ case keysearch(node(), 1, Run) of
+ {value, {_Node, undefined}} -> false;
+ {value, _} -> true;
+ false -> false
+ end;
+ false ->
+ false
+ end.
+
+dist_get_runnable(Appls) ->
+ zf(fun(#appl{name = AppName, run = Run}) ->
+ case keysearch(node(), 1, Run) of
+ {value, {_, true}} -> {true, AppName};
+ _ -> false
+ end
+ end, Appls).
+
+dist_get_all_nodes(#appl{name = AppName, nodes = Nodes, run = Run}) ->
+ {Res, BadNodes} = check_nodes(Run, [], []),
+ case intersection(BadNodes, erlang:nodes(connected)) of
+ [] -> {ok, Nodes, Res};
+ _ -> {error, {app_not_loaded, AppName, BadNodes}}
+ end.
+
+check_nodes([{Node, undefined} | T], Res, BadNodes) ->
+ check_nodes(T, Res, [Node | BadNodes]);
+check_nodes([{Node, true} | T], Res, BadNodes) ->
+ check_nodes(T, [Node | Res], BadNodes);
+check_nodes([{_Node, false} | T], Res, BadNodes) ->
+ check_nodes(T, Res, BadNodes);
+check_nodes([], Res, BadNodes) ->
+ {Res, BadNodes}.
+
+-ifdef(NOTUSED).
+dist_find_time([#appl{name = Name, restart_time = Time} |_], Name) -> Time;
+dist_find_time([_ | T], Name) -> dist_find_time(T, Name);
+dist_find_time([], Name) -> 0.
+-endif.
+
+%% Find all nodes that can run the app (even if they're not permitted
+%% to right now).
+dist_find_nodes([#appl{name = Name, nodes = Nodes} |_], Name) -> Nodes;
+dist_find_nodes([_ | T], Name) -> dist_find_nodes(T, Name);
+dist_find_nodes([], _Name) -> [].
+
+dist_flat_nodes(Appls, Name) ->
+ flat_nodes(dist_find_nodes(Appls, Name)).
+
+dist_del_node(Appls, Node) ->
+ map(fun(Appl) ->
+ NRun = filter(fun({N, _Runnable}) when N =:= Node -> false;
+ (_) -> true
+ end, Appl#appl.run),
+ Appl#appl{run = NRun}
+ end, Appls).
+
+validRestartType(permanent) -> true;
+validRestartType(temporary) -> true;
+validRestartType(transient) -> true;
+validRestartType(_RestartType) -> false.
+
+dist_mismatch(AppName, Node) ->
+ error_msg("Distribution mismatch for application \"~p\" on nodes ~p and ~p~n",
+ [AppName, node(), Node]),
+ exit({distribution_mismatch, AppName, Node}).
+
+%error_msg(Format) when is_list(Format) ->
+% error_msg(Format, []).
+
+error_msg(Format, ArgList) when is_list(Format), is_list(ArgList) ->
+ error_logger:error_msg("dist_ac on node ~p:~n" ++ Format, [node()|ArgList]).
+
+%info_msg(Format) when is_list(Format) ->
+% info_msg(Format, []).
+
+%info_msg(Format, ArgList) when is_list(Format), is_list(ArgList) ->
+% error_logger:info_msg("dist_ac on node ~p:~n" ++ Format, [node()|ArgList]).