diff options
Diffstat (limited to 'lib/kernel/src/dist_ac.erl')
-rw-r--r-- | lib/kernel/src/dist_ac.erl | 1534 |
1 files changed, 1534 insertions, 0 deletions
diff --git a/lib/kernel/src/dist_ac.erl b/lib/kernel/src/dist_ac.erl new file mode 100644 index 0000000000..5c62aa31e9 --- /dev/null +++ b/lib/kernel/src/dist_ac.erl @@ -0,0 +1,1534 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 1997-2009. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% +-module(dist_ac). + +-behaviour(gen_server). + +%% External exports +-export([start_link/0, + load_application/2, + takeover_application/2, + permit_application/2, + permit_only_loaded_application/2]). + +-export([get_known_nodes/0]). + +%% Internal exports +-export([init/1, handle_cast/2, handle_call/3, handle_info/2, terminate/2, + code_change/3, send_timeout/3]). +-export([info/0]). + +-import(lists, [zf/2, filter/2, map/2, foreach/2, foldl/3, mapfoldl/3, + keysearch/3, keydelete/3, keyreplace/4, member/2]). + +-define(AC, application_controller). +-define(DIST_AC, ?MODULE). +-define(LOCK_ID, ?MODULE). + +%% This is the protocol version for the dist_ac protcol (between nodes) +-define(vsn, 1). + +%%%----------------------------------------------------------------- +%%% This module implements the default Distributed Applications +%%% Controller. It is possible to write other controllers, when +%%% the functionality in this module are not sufficient. +%%% The process cooperates with the application_controller. +%%%----------------------------------------------------------------- + +%%----------------------------------------------------------------- +%% Naming conventions: +%% Appl = #appl +%% AppName = atom() +%%----------------------------------------------------------------- +-record(state, {appls = [], tmp_locals = [], remote_started = [], + known = [], started = [], tmp_weights = [], + dist_loaded = [], t_reqs = [], s_reqs = [], p_reqs = []}). +%%----------------------------------------------------------------- +%% appls = [#appl()] - these are the applications we control +%% tmp_locals = [{AppName, Weight, node()}] - tmp, info part of +%% application startup for some distrib appls, +%% not yet handled. +%% remote_started = [{Node, AppName}] - info on apps started before +%% we were started +%% known = [Node] - These are the nodes known to us +%% started = [AppName] - An ordered list of started applications +%% (reversed start order) +%% tmp_weight = [{AppName, MyWeight}] - tmp, if we're forced to +%% send a dist_ac_weight message before we're prepared to, +%% we remember the weight we sent here, so we can use +%% it in the dist_ac_weight msgs later. +%% dist_loaded = {{Name, Node}, HisNodes, Permission} - info on +%% application loaded on other nodes (and own node) +%% t_reqs = [{AppName, From}] - processes waiting for takeover +%% to complete. +%% s_reqs = [{AppName, From}] - processes waiting for stop +%% to complete. +%% p_reqs = [{From, AppName, Bool, [Node]] - outstanding permit. +%% Nodes is a list of nodes we're still waiting for. +%%----------------------------------------------------------------- + +-record(appl, {name, id, restart_time = 0, nodes = [], run = []}). + +%%----------------------------------------------------------------- +%% id = local | undefined | {distributed, node()} | waiting | run_waiting | +%% {failover, Node} | {takeover, Node} +%% local : local application +%% undefined : not yet started +%% {distributed, Node} : running on another node, we're standby +%% {failover, Node} : failover from Node +%% {takeover, Node} : takeover from Node +%% waiting : other node went down, we're waiting for a timeout +%% to takeover it. From = pid() | undefined +%% run_waiting : we have decided to start the app; wait for the +%% AC result +%%----------------------------------------------------------------- + +start_link() -> + case gen_server:start_link({local, ?DIST_AC}, ?MODULE, [], []) of + {ok, Pid} -> + gen_server:cast(?DIST_AC, init_sync), + {ok, Pid}; + Else -> + Else + end. + + +%%----------------------------------------------------------------- +%% Func: load_application(AppName, DistNodes) +%% Args: AppName = atom() +%% DistNodes = default | {AppName, Time, [node() | {node()...}]} +%% Purpose: Notifies the dist_ac about distributed nodes for an +%% application. DistNodes overrides the kernel 'distributed' +%% parameter. +%% Returns: ok | {error, Reason} +%%----------------------------------------------------------------- +load_application(AppName, DistNodes) -> + gen_server:call(?DIST_AC, {load_application, AppName, DistNodes}, infinity). + +takeover_application(AppName, RestartType) -> + case validRestartType(RestartType) of + true -> + wait_for_sync_dacs(), + Nodes = get_nodes(AppName), + global:trans( + {?LOCK_ID, self()}, + fun() -> + gen_server:call( + ?DIST_AC, + {takeover_application, AppName, RestartType}, + infinity) + end, + Nodes); + false -> + {error, {invalid_restart_type, RestartType}} + end. + +%%----------------------------------------------------------------- +%% This function controls which applications are permitted to run. If +%% an application X runs when this function is called as +%% permit_application(X, false), it is moved to another node where it +%% is permitted to run (distributed applications only). If there is +%% no such node, the application is stopped. (I.e. local applications +%% are always stopped, and distributed applications with no other node +%% alive are stopped as well.) If later a call to +%% permit_application(X, true) is made, X is restarted. +%% For example, suppose applications app1 and app2 are started and +%% running. +%% If we evaluate +%% permit_application(app2, false) +%% app2 is stopped and app1 only is running. +%% If we now evaluate +%% permit_application(app2, true), +%% permit_application(app3, true) +%% app2 is restarted, but not app3, since it hasn't been started by a +%% call to start_application. +%%----------------------------------------------------------------- +permit_application(AppName, Bool) -> + wait_for_sync_dacs(), + LockId = {?LOCK_ID, self()}, + global:trans( + LockId, + fun() -> + gen_server:call(?DIST_AC, + {permit_application, AppName, Bool, LockId, started}, + infinity) + end). + +permit_only_loaded_application(AppName, Bool) -> + wait_for_sync_dacs(), + LockId = {?LOCK_ID, self()}, + global:trans( + LockId, + fun() -> + gen_server:call(?DIST_AC, + {permit_application, AppName, Bool, LockId, only_loaded}, + infinity) + end). + +get_nodes(AppName) -> + gen_server:call(?DIST_AC, {get_nodes, AppName}, infinity). + +get_known_nodes() -> + gen_server:call(?DIST_AC, get_known_nodes). + +%%%----------------------------------------------------------------- +%%% call-back functions from gen_server +%%%----------------------------------------------------------------- +init([]) -> + process_flag(trap_exit, true), + {ok, #state{}}. + +sync_dacs(Appls) -> + Res = global:trans({?LOCK_ID, sync_dacs}, + fun() -> + Nodes = introduce_me(nodes(), Appls), + wait_dacs(Nodes, [node()], Appls, []) + end), + ets:insert(ac_tab, {sync_dacs, ok}), + Res. + +introduce_me(Nodes, Appls) -> + Msg = {dist_ac_new_node, ?vsn, node(), Appls, []}, + filter(fun(Node) -> + %% This handles nodes without DACs + case rpc:call(Node, erlang, whereis, [?DIST_AC]) of + Pid when is_pid(Pid) -> + Pid ! Msg, + true; + _ -> + false + end + end, Nodes). + +wait_dacs([Node | Nodes], KnownNodes, Appls, RStarted) -> + monitor_node(Node, true), + receive + %% HisAppls =/= [] is the case when our node connects to a running system + %% + %% It is always the responsibility of newer versions to understand + %% older versions of the protocol. As we don't have any older + %% versions (that are supposed to work with this version), we + %% don't handle version mismatch here. + {dist_ac_new_node, _Vsn, Node, HisAppls, HisStarted} -> + monitor_node(Node, false), + NRStarted = RStarted ++ HisStarted, + NAppls = dist_merge(Appls, HisAppls, Node), + wait_dacs(Nodes, [Node | KnownNodes], NAppls, NRStarted); + {nodedown, Node} -> + monitor_node(Node, false), + wait_dacs(Nodes, KnownNodes, Appls, RStarted) + end; +wait_dacs([], KnownNodes, Appls, RStarted) -> + {KnownNodes, Appls, RStarted}. + + +info() -> + gen_server:call(?DIST_AC, info). + + +%%----------------------------------------------------------------- +%% All functions that can affect which applications are running +%% execute within a global lock, to ensure that they are not +%% executing at the same time as sync_dacs. However, to avoid a +%% deadlock situation where e.g. permit_application gets the lock +%% before sync_dacs, this function is used to ensure that the local +%% sync_dacs always gets the lock first of all. The lock is still +%% used to not interfere with sync_dacs on other nodes. +%%----------------------------------------------------------------- +wait_for_sync_dacs() -> + case catch ets:lookup(ac_tab, sync_dacs) of + [{sync_dacs, ok}] -> ok; + _ -> + receive after 100 -> ok end, + wait_for_sync_dacs() + end. + +handle_cast(init_sync, _S) -> + %% When the dist_ac is started, it receives this msg, and gets into + %% the receive loop. 'go' is sent from the kernel_config proc when + %% all nodes that should be pinged has been pinged. The reason for this + %% is that dist_ac syncs with the other nodes at start-up. That is, + %% it does _not_ handle partitioned nets! The other nodes tries to call + %% the local name dist_ac, which means that this name must be registered + %% before the distribution. But it can't sync until after the distribution + %% is started. Therefore, this 'go'-thing. + receive + {go, KernelConfig} -> + Appls = case application:get_env(kernel, distributed) of + {ok, D} -> dist_check(D); + undefined -> [] + end, + + dist_take_control(Appls), + %% kernel_config waits for dist_ac to take control over its + %% applications. By this we can be sure that the kernel + %% application hasn't completed its start before dist_ac has + %% taken control over its applications. (OTP-3509) + KernelConfig ! dist_ac_took_control, + + %% we're really just interested in nodedowns. + net_kernel:monitor_nodes(true), + + {Known, NAppls, RStarted} = sync_dacs(Appls), + + {noreply, + #state{appls = NAppls, known = Known, remote_started = RStarted}} + end. + + +handle_call(info, _From, S) -> + {reply, S, S}; + + + +handle_call({load_application, AppName, DistNodes}, _From, S) -> + Appls = S#state.appls, + case catch dist_replace(DistNodes, AppName, Appls) of + {error, Error} -> + {reply, {error, Error}, S}; + {'EXIT', R} -> + {stop, R, {error, R}, S}; + NAppls -> + NewS = case dist_find_nodes(NAppls, AppName) of + [] -> % No distrib nodes; we ignore it + S; + _Nodes -> + ensure_take_control(AppName, Appls), + {ok, S2} = load(AppName, S#state{appls = NAppls}), + S2 + end, + {reply, ok, NewS} + end; + +handle_call({takeover_application, AppName, RestartType}, From, S) -> + Appls = S#state.appls, + case keysearch(AppName, #appl.name, Appls) of + {value, Appl} when element(1, Appl#appl.id) =:= distributed -> + {distributed, Node} = Appl#appl.id, + ac_takeover(req, AppName, Node, RestartType), + NAppl = Appl#appl{id = takeover}, + NAppls = keyreplace(AppName, #appl.name, Appls, NAppl), + TR = S#state.t_reqs, + {noreply, S#state{appls = NAppls, + t_reqs = [{AppName, From} | TR]}}; + {value, #appl{id = local}} -> + {reply, {error, {already_running_locally, AppName}}, S}; + _ -> + {reply, {error, {not_running_distributed, AppName}}, S} + end; + +handle_call({permit_application, AppName, Bool, LockId, StartInfo}, From, S) -> + case lists:keymember(AppName, #appl.name, S#state.appls) of + false -> + %% This one covers the case with permit for non-distributed + %% applications. This shouldn't be handled like this, and not + %% here, but we have to be backwards-compatible. + case application_controller:get_loaded(AppName) of + {true, _} when not Bool -> + ac_stop_it(AppName), + {reply, ok, S}; + {true, _} when Bool -> + ac_start_it(req, AppName), + {reply, ok, S}; + false -> + {reply, {error, {not_loaded, AppName}}, S} + end; + true -> + NAppls = dist_update_run(S#state.appls, AppName, node(), Bool), + NewS = S#state{appls = NAppls}, + %% Check if the application is running + IsRunning = keysearch(AppName, #appl.name, NAppls), + IsMyApp = case IsRunning of + {value, #appl{id = local}} -> true; + _ -> false + end, + %% Tell everyone about the new permission + Nodes = dist_flat_nodes(NAppls, AppName), + Msg = {dist_ac_new_permission, node(), AppName, Bool, IsMyApp}, + send_msg(Msg, Nodes), + case StartInfo of + only_loaded -> + {reply, ok, NewS}; + started -> + permit(Bool, IsRunning, AppName, From, NewS, LockId) + end + end; + +%%----------------------------------------------------------------- +%% The distributed parameter is changed. Update the parameters +%% but the applications are actually not moved to other nodes +%% even if they should. +%%----------------------------------------------------------------- +handle_call({distribution_changed, NewDistribution}, _From, S) -> + Appls = S#state.appls, + NewAppls = dist_change_update(Appls, NewDistribution), + NewS = S#state{appls = NewAppls}, + {reply, ok, NewS}; + + +handle_call({get_nodes, AppName}, _From, S) -> + Alive = intersection(dist_flat_nodes(S#state.appls, AppName), + S#state.known), + {reply, Alive, S}; + +handle_call(get_known_nodes, _From, S) -> + {reply, S#state.known, S}. + + +handle_info({ac_load_application_req, AppName}, S) -> + {ok, NewS} = load(AppName, S), + ?AC ! {ac_load_application_reply, AppName, ok}, + {noreply, NewS}; + +handle_info({ac_application_unloaded, AppName}, S) -> + {ok, NewS} = unload(AppName, S), + {noreply, NewS}; + +handle_info({ac_start_application_req, AppName}, S) -> + %% We must decide if we or another node should start the application + Lock = {?LOCK_ID, self()}, + case global:set_lock(Lock, [node()], 0) of + true -> + S2 = case catch start_appl(AppName, S, reply) of + {ok, NewS, _} -> + NewS; + {error, R} -> + ?AC ! {ac_start_application_reply, AppName, {error,R}}, + S + end, + global:del_lock(Lock), + {noreply, S2}; + false -> + send_after(100, {ac_start_application_req, AppName}), + {noreply, S} + end; + +handle_info({ac_application_run, AppName, Res}, S) -> + %% We ordered a start, and here's the result. Tell all other nodes. + Appls = S#state.appls, + Nodes = S#state.known, + %% Send this to _all_ known nodes, as any node could sync + %% on this app (not only nodes that can run it). + send_msg({dist_ac_app_started, node(), AppName, Res}, Nodes), + NId = case Res of + ok -> local; + {error, _R} -> undefined + end, + {value, Appl} = keysearch(AppName, #appl.name, Appls), + %% Check if we have somebody waiting for the takeover result + NTReqs = del_t_reqs(AppName, S#state.t_reqs, Res), + NAppl = Appl#appl{id = NId}, + NAppls = keyreplace(AppName, #appl.name, Appls, NAppl), + {noreply, S#state{appls = NAppls, t_reqs = NTReqs}}; + + +handle_info({ac_application_not_run, AppName}, S) -> + %% We ordered a stop, and now it has stopped + {value, Appl} = keysearch(AppName, #appl.name, Appls = S#state.appls), + %% Check if we have somebody waiting for the takeover result; + %% if somebody called stop just before takeover was handled, + NTReqs = del_t_reqs(AppName, S#state.t_reqs, {error, stopped}), + %% Check if we have somebody waiting for stop to return + SReqs = filter(fun({Name, From2}) when Name =:= AppName -> + gen_server:reply(From2, ok), + false; + (_) -> + true + end, S#state.s_reqs), + RS = case Appl#appl.id of + local -> + send_msg({dist_ac_app_stopped, AppName}, S#state.known), + S#state.remote_started; + {distributed, Node} -> + [{Node, AppName} | S#state.remote_started]; + _ -> + S#state.remote_started + end, + NAppl = Appl#appl{id = undefined}, + NAppls = keyreplace(AppName, #appl.name, Appls, NAppl), + {noreply, S#state{appls = NAppls, t_reqs = NTReqs, s_reqs = SReqs, + remote_started = RS}}; + +handle_info({ac_application_stopped, AppName}, S) -> + %% Somebody called application:stop - reset state as it was before + %% the application was started. + {value, Appl} = keysearch(AppName, #appl.name, Appls = S#state.appls), + %% Check if we have somebody waiting for the takeover result; + %% if somebody called stop just before takeover was handled, + NTReqs = del_t_reqs(AppName, S#state.t_reqs, {error, stopped}), + %% Check if we have somebody waiting for stop to return + SReqs = filter(fun({Name, From2}) when Name =:= AppName -> + gen_server:reply(From2, ok), + false; + (_) -> + true + end, S#state.s_reqs), + RS = case Appl#appl.id of + local -> + send_msg({dist_ac_app_stopped, AppName}, S#state.known), + S#state.remote_started; + {distributed, Node} -> + [{Node, AppName} | S#state.remote_started]; + _ -> + S#state.remote_started + end, + NAppl = Appl#appl{id = undefined}, + NAppls = keyreplace(AppName, #appl.name, Appls, NAppl), + Started = lists:delete(AppName, S#state.started), + {noreply, S#state{appls = NAppls, started = Started, + t_reqs = NTReqs, s_reqs = SReqs, + remote_started = RS}}; + + +%%----------------------------------------------------------------- +%% A new node gets running. +%% Send him info about our started distributed applications. +%%----------------------------------------------------------------- +handle_info({dist_ac_new_node, _Vsn, Node, HisAppls, []}, S) -> + Appls = S#state.appls, + MyStarted = zf(fun(Appl) when Appl#appl.id =:= local -> + {true, {node(), Appl#appl.name}}; + (_) -> + false + end, Appls), + {?DIST_AC, Node} ! {dist_ac_new_node, ?vsn, node(), Appls, MyStarted}, + NAppls = dist_merge(Appls, HisAppls, Node), + {noreply, S#state{appls = NAppls, known = [Node | S#state.known]}}; + +handle_info({dist_ac_app_started, Node, Name, Res}, S) -> + case {keysearch(Name, #appl.name, S#state.appls), lists:member(Name, S#state.started)} of + {{value, Appl}, true} -> + Appls = S#state.appls, + NId = case Appl#appl.id of + _ when element(1, Res) =:= error -> + %% Start of appl on some node failed. + %% Set Id to undefined. That node will have + %% to take some actions, e.g. reboot + undefined; + {distributed, _} -> + %% Another node tookover from some node. Update + %% appl list. + {distributed, Node}; + local -> + %% Another node tookover from me; stop my application + %% and update the running list. + {distributed, Node}; + _ -> + %% Another node started appl. Update appl list. + {distributed, Node} + end, + ac_started(req, Name, Node), + NAppl = Appl#appl{id = NId}, + NAppls = keyreplace(Name, #appl.name, Appls, NAppl), + TmpWeights = keydelete_all(Name, 1, S#state.tmp_weights), + NewS = S#state{appls = NAppls, tmp_weights = TmpWeights}, + NPermitReq = req_del_permit_false(NewS#state.p_reqs, Name), + case catch req_start_app(NewS#state{p_reqs = NPermitReq}, Name) of + {error, R} -> + {stop, R}; + {ok, NewS2} -> + {noreply, NewS2} + end; + {_, _} -> + %% The app has not been started at this node yet; remember this in + %% remote started. + NRStarted = [{Node, Name} | S#state.remote_started], + {noreply, S#state{remote_started = NRStarted}} + end; + +handle_info({dist_ac_app_stopped, AppName}, S) -> + Appls = S#state.appls, + case keysearch(AppName, #appl.name, Appls) of + false -> + RStarted = keydelete(AppName, 2, S#state.remote_started), + {noreply, S#state{remote_started = RStarted}}; + {value, Appl} -> + NAppl = Appl#appl{id = undefined}, + NAppls = keyreplace(AppName, #appl.name, Appls, NAppl), + RStarted = keydelete(AppName, 2, S#state.remote_started), + {noreply, S#state{appls = NAppls, remote_started = RStarted}} + end; + +handle_info({dist_ac_weight, Name, Weight, Node}, S) -> + %% This means another node starts up, and will eventually take over + %% this appl. We have a situation like: {Name, [{Node}, node()]} + %% Node sends us this msg, and we must respond. It doesn't really + %% matter what we send him; but it must be a dist_ac_weight msg. + %% Another situation is {Name, [RNode, {node()}, Node]}. + %% + %% Yet another situation is that the node where Name was running crashed, + %% and Node has got the nodedown message, but we haven't. In this case, + %% we must send a correct weight to Node. i.e. the same weight that + %% we'll send to him later, when we get the nodedown message. + case keysearch(Name, #appl.name, S#state.appls) of + {value, Appl} -> + Id = Appl#appl.id, + case Id of + run_waiting -> + {?DIST_AC, Node} ! {dist_ac_weight, Name, 0, node()}, + {noreply, S}; + undefined -> + {noreply, + S#state{tmp_locals = [{Name, Weight, Node} | + S#state.tmp_locals]}}; + {takeover, _} -> + {noreply, + S#state{tmp_locals = [{Name, Weight, Node} | + S#state.tmp_locals]}}; + {failover, _} -> + {noreply, + S#state{tmp_locals = [{Name, Weight, Node} | + S#state.tmp_locals]}}; + _ -> + MyWeight = get_cached_weight(Name, S), + {?DIST_AC, Node} ! {dist_ac_weight, Name, MyWeight, node()}, + NTWs = keyreplaceadd(Name, 1, S#state.tmp_weights, + {Name, MyWeight}), + {noreply, S#state{tmp_weights = NTWs}} + end; + _ -> + {noreply, + S#state{tmp_locals = [{Name, Weight, Node} | S#state.tmp_locals]}} + end; + +%%----------------------------------------------------------------- +%% A node died. Check if we should takeover some applications. +%%----------------------------------------------------------------- +handle_info({nodedown, Node}, S) -> + AppNames = dist_get_runnable(S#state.appls), + HisAppls = filter(fun(#appl{name = Name, id = {distributed, N}}) + when Node =:= N -> lists:member(Name, AppNames); + (_) -> false + end, + S#state.appls), + Appls2 = zf(fun(Appl) when Appl#appl.id =:= {distributed, Node} -> + case lists:member(Appl#appl.name, AppNames) of + true -> + {true, Appl#appl{id = {failover, Node}}}; + false -> + ac_not_running(Appl#appl.name), + {true, Appl#appl{id = undefined}} + end; + (_) -> + true + end, + S#state.appls), + RStarted = filter(fun({Node2, _Name}) when Node2 =:= Node -> false; + (_) -> true + end, + S#state.remote_started), + Appls3 = dist_del_node(Appls2, Node), + {NPermitReq, Appls4, SReqs} = req_del_node(S, Node, Appls3), + NKnown = lists:delete(Node, S#state.known), + NewS = S#state{appls = Appls4, p_reqs = NPermitReq, known = NKnown, + s_reqs = SReqs, + remote_started = RStarted}, + restart_appls(HisAppls), + {noreply, NewS}; + +handle_info({dist_ac_app_loaded, Node, Name, HisNodes, Permission, HeKnowsMe}, + S) -> + Nodes = dist_find_nodes(Appls = S#state.appls, Name), + case is_loaded(Name, S) of + true -> + case equal_nodes(Nodes, HisNodes) of + true -> + NAppls = dist_update_run(Appls, Name, Node, Permission), + if + not HeKnowsMe -> + %% We've got it loaded, but he doesn't know - + %% he's a new node connecting to us. + Msg = {dist_ac_app_loaded, node(), Name, + Nodes, dist_is_runnable(Appls, Name), true}, + {?DIST_AC, Node} ! Msg; + true -> + ok + end, + {noreply, S#state{appls = NAppls}}; + false -> + dist_mismatch(Name, Node) + end; + false -> + Load =[{{Name, Node}, HisNodes, Permission} | S#state.dist_loaded], + {noreply, S#state{dist_loaded = Load}} + end; + +handle_info({dist_ac_app_unloaded, Node, Name}, S) -> + Appls = dist_update_run(S#state.appls, Name, Node, undefined), + Load = keydelete({Name, Node}, 1, S#state.dist_loaded), + {noreply, S#state{appls = Appls, dist_loaded = Load}}; + + +handle_info({dist_ac_new_permission, Node, AppName, false, IsHisApp}, S) -> + Appls = dist_update_run(S#state.appls, AppName, Node, false), + NewS = S#state{appls =Appls}, + case dist_is_runnable(Appls, AppName) of + true when IsHisApp -> + case catch start_appl(AppName, NewS, req) of + {ok, NewS2, _} -> + {noreply, NewS2}; + {error, _R} -> % if app was permanent, AC will shutdown the node + {noreply, NewS} + end; + _ -> + {noreply, NewS} + end; +handle_info({dist_ac_new_permission, Node, AppName, true, _IsHisApp}, S) -> + Appls = dist_update_run(S#state.appls, AppName, Node, true), + {noreply, S#state{appls = Appls}}; + +handle_info({internal_restart_appl, Name}, S) -> + case restart_appl(Name, S) of + {error, R} -> + {stop, {error, R}, S}; + NewS -> + {noreply, NewS} + end; + +handle_info(_, S) -> + {noreply, S}. + +terminate(_Reason, _S) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%----------------------------------------------------------------- +%%% Internal functions +%%%----------------------------------------------------------------- +load(AppName, S) -> + Appls0 = S#state.appls, + %% Get the dist specification for the app on other nodes + DistLoaded = get_dist_loaded(AppName, Load1 = S#state.dist_loaded), + %% Get the local dist specification + Nodes = dist_find_nodes(Appls0, AppName), + FNodes = flat_nodes(Nodes), + %% Update dists spec with our local permission + Permission = get_default_permission(AppName), + Appls1 = dist_update_run(Appls0, AppName, node(), Permission), + %% Compare the local spec with other nodes's specs + %% If equal, update our spec with his current permission + {LoadedNodes, Appls2} = + mapfoldl( + fun({Node, HisNodes, HisPermission}, Appls) -> + case equal_nodes(Nodes, HisNodes) of + true -> + {Node, dist_update_run(Appls, AppName, + Node, HisPermission)}; + _ -> + dist_mismatch(AppName, Node) + end + end, Appls1, DistLoaded), + Load2 = del_dist_loaded(AppName, Load1), + %% Tell all Nodes about the new appl loaded, and its permission. + foreach(fun(Node) when Node =/= node() -> + Msg = {dist_ac_app_loaded, node(), AppName, + Nodes, Permission, member(Node, LoadedNodes)}, + {?DIST_AC, Node} ! Msg; + (_) -> ok + end, FNodes), + {ok, S#state{appls = Appls2, dist_loaded = Load2}}. + +ensure_take_control(AppName, Appls) -> + %% Check if this is a new application that we don't control yet + case lists:keymember(AppName, #appl.name, Appls) of + true -> % we have control + ok; + false -> % take control! + %% Note: this works because this is executed within a + %% synchronous call. I.e. we get the control *before* + %% application:load returns. (otherwise application:start + %% could be called before we got the chance to take control) + %% The only reason we have to bother about this is because + %% we have to be backwards compatible in the sense that all + %% apps don't have to be specified in the 'distributed' parameter, + %% but may be implicitly 'distributed' by a call to + %% application:load. + application_controller:control_application(AppName) + end. + +unload(AppName, S) -> + Appls = S#state.appls, + Nodes = dist_flat_nodes(Appls, AppName), + %% Tell all ACs in DistNodes about the unloaded appl + Msg = {dist_ac_app_unloaded, node(), AppName}, + send_msg(Msg, Nodes), + {value, Appl} = keysearch(AppName, #appl.name, Appls), + NAppl = Appl#appl{id = undefined, run = []}, + {ok, S#state{appls = keyreplace(AppName, #appl.name, Appls, NAppl)}}. + +start_appl(AppName, S, Type) -> + %% Get nodes, and check if App is loaded on all involved nodes. + %% If it is loaded everywhere, we know that we have the same picture + %% of the nodes; otherwise the load wouldn't have succeeded. + Appl = case keysearch(AppName, #appl.name, Appls = S#state.appls) of + {value, A} -> A; + _ -> throw({error, {unknown_application, AppName}}) + end, + case Appl#appl.id of + local -> + %% UW 990913: we've already started the app + %% this could happen if ac_start_application_req was resent. + {ok,S,false}; + _ -> + {Id, IsWaiting} = case dist_get_all_nodes(Appl) of + {ok, DistNodes, PermittedNodes} -> + start_distributed(Appl, AppName, DistNodes, + PermittedNodes, S, Type); + Error -> throw(Error) + end, + NAppl = Appl#appl{id = Id}, + NAppls = keyreplaceadd(AppName, #appl.name, Appls, NAppl), + {ok, NewS} = req_start_app(S#state{appls = NAppls}, AppName), + TmpLocals = keydelete_all(AppName, 1, NewS#state.tmp_locals), + TmpWeights = keydelete_all(AppName, 1, NewS#state.tmp_weights), + RStarted = keydelete(AppName, 2, S#state.remote_started), + Started = replaceadd(AppName, NewS#state.started), + {ok, + NewS#state{started = Started, tmp_locals = TmpLocals, + tmp_weights = TmpWeights, remote_started = RStarted}, + IsWaiting} + end. + + +start_distributed(Appl, Name, Nodes, PermittedNodes, S, Type) -> + case find_start_node(Nodes, PermittedNodes, Name, S) of + {ok, Node} when Node =:= node() -> + case Appl#appl.id of + {failover, FoNode} when Type =:= req -> + ac_failover(Name, FoNode, undefined); + {distributed, Node2} when Type =:= req -> + ac_takeover(req, Name, Node2, undefined); + _ when Type =:= reply -> + case lists:keysearch(Name, 2, S#state.remote_started) of + {value, {Node3, _}} -> + ac_takeover(reply, Name, Node3, undefined); + _ -> + ac_start_it(Type, Name) + end; + _ -> + ac_start_it(Type, Name) + end, + {run_waiting, true}; + {already_started, Node} -> + ac_started(Type, Name, Node), + {{distributed, Node}, false}; + {ok, Node} -> + case keysearch(Name, #appl.name, S#state.appls) of + {value, #appl{id = {distributed, Node}}} -> + ac_started(Type, Name, Node), + {{distributed, Node}, false}; + _ -> + wait_dist_start(Node, Appl, Name, Nodes, + PermittedNodes, S, Type) + end; + not_started -> + wait_dist_start2(Appl, Name, Nodes, PermittedNodes, S, Type); + no_permission -> + ac_not_started(Type, Name), + {undefined, false} + end. + +wait_dist_start(Node, Appl, Name, Nodes, PermittedNodes, S, Type) -> + monitor_node(Node, true), + receive + {dist_ac_app_started, Node, Name, ok} -> + ac_started(Type, Name, Node), + monitor_node(Node, false), + {{distributed, Node}, false}; + {dist_ac_app_started, Node, Name, {error, R}} -> + ac_error(Type, Name, {Node, R}), + monitor_node(Node, false), + {Appl#appl.id, false}; + {dist_ac_weight, Name, _Weigth, Node} -> + %% This is the situation: {Name, [RNode, {Node}, node()]} + %% and permit(false) is called on RNode, and we sent the + %% weigth first. Node handled it in handle_info, and + %% now we must send him a weigth msg. We can use any weigth; + %% he wins anyway. + monitor_node(Node, false), + {?DIST_AC, Node} ! + {dist_ac_weight, Name, get_cached_weight(Name, S), node()}, + wait_dist_start(Node, Appl, Name, Nodes, PermittedNodes, S, Type); + {nodedown, Node} -> + monitor_node(Node, false), + TmpLocals = + filter(fun({Name2, _Weight, Node2}) when Node2 =:= Node, + Name2 =:= Name -> false; + (_) -> true + end, + S#state.tmp_locals), + NewS = S#state{tmp_locals = TmpLocals}, + start_distributed(Appl, Name, Nodes, + lists:delete(Node, PermittedNodes), NewS, Type) + end. + +wait_dist_start2(Appl, Name, Nodes, PermittedNodes, S, Type) -> + receive + {dist_ac_app_started, Node, Name, ok} -> + ac_started(Type, Name, Node), + {{distributed, Node}, false}; + {dist_ac_app_started, Node, Name, {error, R}} -> + ac_error(Type, Name, {Node, R}), + {Appl#appl.id, false}; + {nodedown, Node} -> + %% A node went down, try to start the app again - there may not + %% be any more nodes to wait for. + TmpLocals = + filter(fun({Name2, _Weight, Node2}) when Node2 =:= Node, + Name2 =:= Name -> false; + (_) -> true + end, + S#state.tmp_locals), + NewS = S#state{tmp_locals = TmpLocals}, + start_distributed(Appl, Name, Nodes, + lists:delete(Node, PermittedNodes), NewS, Type) + end. + + +ac_start_it(reply, Name) -> + ?AC ! {ac_start_application_reply, Name, start_it}; +ac_start_it(req, Name) -> + ?AC ! {ac_change_application_req, Name, start_it}. + +ac_started(reply, Name, Node) -> + ?AC ! {ac_start_application_reply, Name, {started, Node}}; +ac_started(req, Name, Node) -> + ?AC ! {ac_change_application_req, Name, {started, Node}}. + +ac_error(reply, Name, Error) -> + ?AC ! {ac_start_application_reply, Name, {error, Error}}; +ac_error(req, _Name, _Error) -> + ok. + +ac_not_started(reply, Name) -> + ?AC ! {ac_start_application_reply, Name, not_started}; +ac_not_started(req, Name) -> + ?AC ! {ac_change_application_req, Name, stop_it}. + +ac_stop_it(Name) -> + ?AC ! {ac_change_application_req, Name, stop_it}. + +ac_takeover(reply, Name, Node, _RestartType) -> + ?AC ! {ac_start_application_reply, Name, {takeover, Node}}; +ac_takeover(req, Name, Node, RestartType) -> + ?AC ! {ac_change_application_req, Name, + {takeover, Node, RestartType}}. + +ac_failover(Name, Node, RestartType) -> + ?AC ! {ac_change_application_req, Name, + {failover, Node, RestartType}}. + +ac_not_running(Name) -> + ?AC ! {ac_change_application_req, Name, not_running}. + +restart_appls(Appls) -> + foreach(fun(Appl) -> + AppName = Appl#appl.name, + send_after(Appl#appl.restart_time, + {internal_restart_appl, AppName}) + end, lists:reverse(Appls)). + +restart_appl(AppName, S) -> + case keysearch(AppName, #appl.name, S#state.appls) of + {value, Appl} when element(1, Appl#appl.id) =:= failover -> + case catch start_appl(AppName, S, req) of + {ok, NewS, _} -> + NewS; + {error, R} -> + error_msg("Error when restarting application ~p: ~p~n", + [AppName, R]), + S + end; + _ -> + S + end. + +%% permit(ShouldBeRunning, IsRunning, ...) +permit(false, {value, #appl{id = undefined}}, _AppName, _From, S, _LockId) -> + {reply, ok, S}; % It's not running +permit(false, {value, #appl{id = Id}}, _AppName, _From, S, _LockId) + when element(1, Id) =:= distributed -> + %% It is running at another node already + {reply, ok, S}; +permit(false, {value, _}, AppName, From, S, _LockId) -> + %% It is a distributed application + %% Check if there is any runnable node + case dist_get_runnable_nodes(S#state.appls, AppName) of + [] -> + %% There is no runnable node; stop application + ac_stop_it(AppName), + SReqs = [{AppName, From} | S#state.s_reqs], + {noreply, S#state{s_reqs = SReqs}}; + Nodes -> + %% Delete all outstanding 'permit true' requests. + PR = req_del_permit_true(S#state.p_reqs, AppName), + NPReqs = [{From, AppName, false, Nodes} | PR], + {noreply, S#state{p_reqs = NPReqs}} + end; +permit(true, {value, #appl{id = local}}, _AppName, _From, S, _LockId) -> + {reply, ok, S}; +permit(true, _, AppName, From, S, LockId) -> + case catch start_appl(AppName, S, req) of + {_ErrorTag, {not_running, App}} -> + %% Delete all outstanding 'permit false' requests + PR = req_del_permit_false(S#state.p_reqs, AppName), + NPReqs = [{false, AppName, true, App} | PR], + {reply, ok, S#state{p_reqs = NPReqs}}; + {ok, NewS, true} -> + %% We have ordered a start or a takeover; we must not return + %% until the app is running. + TR = NewS#state.t_reqs, + %% Delete the lock, so others may start the app + global:del_lock(LockId), + {noreply, NewS#state{t_reqs = [{AppName, From} | TR]}}; + {ok, _S, false} -> + %% Application should be started, but at another node + %% State remains the same + {reply, ok, S}; + {_ErrorTag, R} -> + {stop, R, {error, R}, S} + end. + +do_start_appls(StartApps, S) -> + SortedStartApps = StartApps, + Appls = S#state.appls, + {ok, foldl( + fun(AppName, NewS) -> + case catch start_appl(AppName, NewS, req) of + {error, R} -> + throw({{error, NewS}, R}); + {ok, NewS2, _} -> + NewS2 + end + end, S#state{appls = Appls}, lists:reverse(SortedStartApps))}. + +%%----------------------------------------------------------------- +%% Nodes = [node() | {node(), ..., node()}] +%% A list in priority order. If it is a tuple, we may pick any of +%% them. This decision is made by all nodes in the list, and all +%% nodes choose the same. This is accomplished in the following +%% way: all Nodes send to all others a msg which tells how many +%% applications each node has started. The one with least no of +%% appls starts this one. +%%----------------------------------------------------------------- +find_start_node(Nodes, PermittedNodes, Name, S) -> + AllNodes = intersection(flat_nodes(Nodes), PermittedNodes), + case lists:member(node(), AllNodes) of + true -> + Weight = get_cached_weight(Name, S), + find_start_node(Nodes, Name, S, Weight, AllNodes); + false -> + case keysearch(Name, 2, S#state.remote_started) of + {value, {Node, _Name}} -> + {already_started, Node}; + _ when AllNodes =/= [] -> + not_started; + _ -> + no_permission + end + end. + +find_start_node([AnyNodes | Nodes], Name, S, Weight, AllNodes) + when is_tuple(AnyNodes) -> + case find_any_node(tuple_to_list(AnyNodes), Name, S, Weight, AllNodes) of + false -> find_start_node(Nodes, Name, S, Weight, AllNodes); + Res -> Res + end; +find_start_node([Node | Nodes], Name, S, Weight, AllNodes) -> + case lists:member(Node, AllNodes) of + true -> + case keysearch(Name, #appl.name, S#state.appls) of + {value, #appl{id = {distributed, Node}}} -> + {already_started, Node}; + _ -> + case keysearch(Name, 2, S#state.remote_started) of + {value, {Node, _Name}} -> + {already_started, Node}; + _ -> + {ok, Node} + end + end; + false -> find_start_node(Nodes, Name, S, Weight, AllNodes) + end; +find_start_node([], _Name, _S, _Weight, _AllNodes) -> + not_started. + +%%----------------------------------------------------------------- +%% First of all, check if the application is already running +%% somewhere in AnyNodes; in that case we shall not move it! +%%----------------------------------------------------------------- +find_any_node(AnyNodes, Name, S, Weight, AllNodes) -> + case check_running(Name, S, intersection(AnyNodes, AllNodes)) of + {already_started, Node} -> {already_started, Node}; + false -> + %% Synchronize with all other nodes. + send_nodes(AllNodes, {dist_ac_weight, Name, Weight, node()}), + Answers = [{Weight, node()} | + collect_answers(AllNodes, Name, S, [])], + %% Make a decision (the same at every node) (smallest weight wins) + find_alive_node(lists:sort(Answers), + intersection(AnyNodes, S#state.known)) + end. + +%%----------------------------------------------------------------- +%% Check if another node started the appl before we got alive. +%% If so, check if the node is one of AnyNodes. +%%----------------------------------------------------------------- +check_running(Name, #state{remote_started = RStarted, + appls = Appls}, AnyNodes) -> + case keysearch(Name, 2, RStarted) of + {value, {Node, _Name}} -> + case lists:member(Node, AnyNodes) of + true -> {already_started, Node}; + false -> false + end; + false -> + case keysearch(Name, #appl.name, Appls) of + {value, #appl{id = {distributed, Node}}} -> + case lists:member(Node, AnyNodes) of + true -> {already_started, Node}; + false -> false + end; + _ -> + false + end + end. + +find_alive_node([{_, Node} | Nodes], AliveNodes) -> + case lists:member(Node, AliveNodes) of + true -> {ok, Node}; + false -> find_alive_node(Nodes, AliveNodes) + end; +find_alive_node([], _AliveNodes) -> + false. + +%%----------------------------------------------------------------- +%% First, check if the node's msg is buffered (received in our +%% main loop). Otherwise, wait for msg or nodedown. +%% We have sent the dist_ac_weight message, and will wait for it +%% to be received here (or a nodedown). This implies that a +%% dist_ac must *always* be prepared to get this messages, and to +%% send it to us. +%%----------------------------------------------------------------- +collect_answers([Node | Nodes], Name, S, Res) when Node =/= node() -> + case keysearch(Node, 3, S#state.tmp_locals) of + {value, {Name, Weight, Node}} -> + collect_answers(Nodes, Name, S, [{Weight, Node} | Res]); + _ -> + monitor_node(Node, true), + receive + {dist_ac_weight, Name, Weight, Node} -> + monitor_node(Node, false), + collect_answers(Nodes, Name, S, [{Weight, Node} | Res]); + {nodedown, Node} -> + monitor_node(Node, false), + collect_answers(Nodes, Name, S, Res) + end + end; +collect_answers([_ThisNode | Nodes], Name, S, Res) -> + collect_answers(Nodes, Name, S, Res); +collect_answers([], _Name, _S, Res) -> + Res. + +send_nodes(Nodes, Msg) -> + FlatNodes = flat_nodes(Nodes), + foreach(fun(Node) when Node =/= node() -> {?DIST_AC, Node} ! Msg; + (_ThisNode) -> ok + end, FlatNodes). + +send_after(Time, Msg) when is_integer(Time), Time >= 0 -> + spawn_link(?MODULE, send_timeout, [self(), Time, Msg]); +send_after(_,_) -> % infinity + ok. + +send_timeout(To, Time, Msg) -> + receive + after Time -> To ! Msg + end. + +send_msg(Msg, Nodes) -> + foreach(fun(Node) when Node =/= node() -> {?DIST_AC, Node} ! Msg; + (_) -> ok + end, Nodes). + +replaceadd(Item, List) -> + case member(Item, List) of + true -> List; + false -> [Item | List] + end. + +keyreplaceadd(Key, Pos, List, New) -> + case lists:keymember(Key, Pos, List) of + true -> lists:keyreplace(Key, Pos, List, New); + false -> [New | List] + end. + +keydelete_all(Key, N, [H|T]) when element(N, H) =:= Key -> + keydelete_all(Key, N, T); +keydelete_all(Key, N, [H|T]) -> + [H|keydelete_all(Key, N, T)]; +keydelete_all(_Key, _N, []) -> []. + +-ifdef(NOTUSED). +keysearchdelete(Key, Pos, List) -> + ksd(Key, Pos, List, []). + +ksd(Key, Pos, [H | T], Rest) when element(Pos, H) =:= Key -> + {value, H, Rest ++ T}; +ksd(Key, Pos, [H | T], Rest) -> + ksd(Key, Pos, T, [H | Rest]); +ksd(_Key, _Pos, [], _Rest) -> + false. + +get_new_appl(Name, [{application, Name, App} | _]) -> + {ok, {application, Name, App}}; +get_new_appl(Name, [_ | T]) -> get_new_appl(Name, T); +get_new_appl(Name, []) -> false. +-endif. + +equal_nodes([H | T1], [H | T2]) when is_atom(H) -> + equal_nodes(T1, T2); +equal_nodes([H1 | T1], [H2 | T2]) when is_tuple(H1), is_tuple(H2) -> + case equal(tuple_to_list(H1), tuple_to_list(H2)) of + true -> equal_nodes(T1, T2); + false -> false + end; +equal_nodes([], []) -> true; +equal_nodes(_, _) -> false. + +equal([H | T] , S) -> + case lists:member(H, S) of + true -> equal(T, lists:delete(H, S)); + false -> false + end; +equal([], []) -> true; +equal(_, _) -> false. + +flat_nodes(Nodes) when is_list(Nodes) -> + foldl(fun(Node, Res) when is_atom(Node) -> [Node | Res]; + (Tuple, Res) when is_tuple(Tuple) -> tuple_to_list(Tuple) ++ Res + end, [], Nodes); +flat_nodes(Nodes) -> + throw({error, {badarg, Nodes}}). + +get_cached_weight(Name, S) -> + case lists:keysearch(Name, 1, S#state.tmp_weights) of + {value, {_, W}} -> W; + _ -> get_weight() + end. + +%% Simple weight; just count the number of applications running. +get_weight() -> + length(application:which_applications()). + +get_dist_loaded(Name, [{{Name, Node}, HisNodes, Permission} | T]) -> + [{Node, HisNodes, Permission} | get_dist_loaded(Name, T)]; +get_dist_loaded(Name, [_H | T]) -> + get_dist_loaded(Name, T); +get_dist_loaded(_Name, []) -> + []. + +del_dist_loaded(Name, [{{Name, _Node}, _HisNodes, _Permission} | T]) -> + del_dist_loaded(Name, T); +del_dist_loaded(Name, [H | T]) -> + [H | del_dist_loaded(Name, T)]; +del_dist_loaded(_Name, []) -> + []. + +req_start_app(State, Name) -> + {ok, foldl( + fun({false, AppName, true, Name2}, S) when Name =:= Name2 -> + PR = keydelete(AppName, 2, S#state.p_reqs), + NS = S#state{p_reqs = PR}, + case catch do_start_appls([AppName], NS) of + {_ErrorTag, {not_running, App}} -> + NRequests = [{false, AppName, true, App} | PR], + S#state{p_reqs = NRequests}; + {ok, NewS} -> + NewS; + {_ErrorTag, R} -> + throw({error, R}) + end; + (_, S) -> + S + end, State, State#state.p_reqs)}. + + +req_del_permit_true(Reqs, Name) -> + filter(fun({From, Name2, true, _}) when Name2 =:= Name -> + gen_server:reply(From, ok), + false; + (_) -> + true + end, Reqs). + +req_del_permit_false(Reqs, Name) -> + filter(fun({From, Name2, false, _Nodes}) when Name2 =:= Name -> + gen_server:reply(From, ok), + false; + (_) -> + true + end, Reqs). + +req_del_node(S, Node, Appls) -> + check_waiting(S#state.p_reqs, S, Node, Appls, [], S#state.s_reqs). + +del_t_reqs(AppName, TReqs, Res) -> + lists:filter(fun({AN, From}) when AppName =:= AN -> + gen_server:reply(From, Res), + false; + (_) -> + true + end, + TReqs). + + +check_waiting([{From, AppName, false, Nodes} | Reqs], + S, Node, Appls, Res, SReqs) -> + case lists:delete(Node, Nodes) of + [] -> + ac_stop_it(AppName), + NSReqs = [{AppName, From} | SReqs], + check_waiting(Reqs, Node, S, Appls, Res, NSReqs); + NNodes -> + check_waiting(Reqs, Node, S, Appls, + [{From, AppName, false, NNodes} | Res], SReqs) + end; +check_waiting([H | Reqs], S, Node, Appls, Res, SReqs) -> + check_waiting(Reqs, Node, S, Appls, [H | Res], SReqs); +check_waiting([], _Node, _S, Appls, Res, SReqs) -> + {Res, Appls, SReqs}. + +intersection([], _) -> + []; +intersection(_, []) -> + []; +intersection(L1, L2) -> + L1 -- (L1 -- L2). + +get_default_permission(AppName) -> + case application:get_env(kernel, permissions) of + {ok, Permissions} -> + case keysearch(AppName, 1, Permissions) of + {value, {_, true}} -> true; + {value, {_, false}} -> false; + {value, {_, X}} -> exit({bad_permission, {AppName, X}}); + false -> true + end; + undefined -> true + end. + +%%----------------------------------------------------------------- +%% ADT dist() - info on how an application is distributed +%% dist() = [{AppName, Time, DistNodes, [{Node, Runnable}]}] +%% Time = int() >= 0 | infinity +%% Nodes = [node() | {node()...}] +%% Runnable = true | false | undefined +%% An appl may not be started if any Runnable is undefined; +%% i.e. the appl must be loaded on all Nodes. +%%----------------------------------------------------------------- +dist_check([{AppName, Nodes} | T]) -> + P = get_default_permission(AppName), + [#appl{name = AppName, nodes = Nodes, run = [{node(), P}]} | dist_check(T)]; +dist_check([{AppName, Time, Nodes} | T]) when is_integer(Time), Time >= 0 -> + P = get_default_permission(AppName), + [#appl{name = AppName, restart_time = Time, nodes = Nodes, + run = [{node(), P}]} | dist_check(T)]; +dist_check([{AppName, infinity, Nodes} | T]) -> + P = get_default_permission(AppName), + [#appl{name = AppName, restart_time = infinity, + nodes = Nodes, run = [{node(), P}]} | + dist_check(T)]; +dist_check([_ | T]) -> + dist_check(T); +dist_check([]) -> + []. + +dist_take_control(Appls) -> + foreach(fun(#appl{name = AppName}) -> + application_controller:control_application(AppName) + end, Appls). + +dist_replace(default, _Name, Appls) -> Appls; +dist_replace({AppName, Nodes}, AppName, Appls) -> + Run = [{Node, undefined} || Node <- flat_nodes(Nodes)], + keyreplaceadd(AppName, #appl.name, Appls, + #appl{name = AppName, restart_time = 0, + nodes = Nodes, run = Run}); +dist_replace({AppName, Time, Nodes}, AppName, Appls) + when is_integer(Time), Time >= 0 -> + Run = [{Node, undefined} || Node <- flat_nodes(Nodes)], + keyreplaceadd(AppName, #appl.name, Appls, + #appl{name = AppName, restart_time = Time, + nodes = Nodes, run = Run}); +dist_replace(Bad, _Name, _Appls) -> + throw({error, {bad_distribution_spec, Bad}}). + +dist_update_run(Appls, AppName, Node, Permission) -> + map(fun(Appl) when Appl#appl.name =:= AppName -> + Run = Appl#appl.run, + NRun = keyreplaceadd(Node, 1, Run, {Node, Permission}), + Appl#appl{run = NRun}; + (Appl) -> + Appl + end, Appls). + + + +dist_change_update(Appls, []) -> + Appls; +dist_change_update(Appls, [{AppName, NewNodes} | NewDist]) -> + NewAppls = do_dist_change_update(Appls, AppName, 0, NewNodes), + dist_change_update(NewAppls, NewDist); +dist_change_update(Appls, [{AppName, NewTime, NewNodes} | NewDist]) -> + NewAppls = do_dist_change_update(Appls, AppName, NewTime, NewNodes), + dist_change_update(NewAppls, NewDist). + +do_dist_change_update(Appls, AppName, NewTime, NewNodes) -> + map(fun(Appl) when Appl#appl.name =:= AppName -> + Appl#appl{restart_time = NewTime, nodes = NewNodes}; + (Appl) -> + Appl + end, Appls). + +%% Merge his Permissions with mine. +dist_merge(MyAppls, HisAppls, HisNode) -> + zf(fun(Appl) -> + #appl{name = AppName, run = Run} = Appl, +% #appl{name = AppName, nodes = Nodes, run = Run} = Appl, +% HeIsMember = lists:member(HisNode, flat_nodes(Nodes)), + HeIsMember = true, + case keysearch(AppName, #appl.name, HisAppls) of + {value, #appl{run = HisRun}} when HeIsMember -> + case keysearch(HisNode, 1, HisRun) of + {value, Val} -> % He has it loaded + NRun = keyreplaceadd(HisNode, 1, Run, Val), + {true, Appl#appl{run = NRun}}; + false -> % He hasn't loaded it yet + Val = {HisNode, undefined}, + {true, Appl#appl{run = [Val | Run]}} + end; + _ -> + true + end + end, MyAppls). + +dist_get_runnable_nodes(Appls, AppName) -> + case keysearch(AppName, #appl.name, Appls) of + {value, #appl{run = Run}} -> + zf(fun({Node, true}) -> {true, Node}; + (_) -> false + end, Run); + false -> + [] + end. + +dist_is_runnable(Appls, AppName) -> + case keysearch(AppName, #appl.name, Appls) of + {value, #appl{run = Run}} -> + case keysearch(node(), 1, Run) of + {value, {_, true}} -> true; + _ -> false + end; + false -> + false + end. + +is_loaded(AppName, #state{appls = Appls}) -> + case keysearch(AppName, #appl.name, Appls) of + {value, #appl{run = Run}} -> + case keysearch(node(), 1, Run) of + {value, {_Node, undefined}} -> false; + {value, _} -> true; + false -> false + end; + false -> + false + end. + +dist_get_runnable(Appls) -> + zf(fun(#appl{name = AppName, run = Run}) -> + case keysearch(node(), 1, Run) of + {value, {_, true}} -> {true, AppName}; + _ -> false + end + end, Appls). + +dist_get_all_nodes(#appl{name = AppName, nodes = Nodes, run = Run}) -> + {Res, BadNodes} = check_nodes(Run, [], []), + case intersection(BadNodes, erlang:nodes(connected)) of + [] -> {ok, Nodes, Res}; + _ -> {error, {app_not_loaded, AppName, BadNodes}} + end. + +check_nodes([{Node, undefined} | T], Res, BadNodes) -> + check_nodes(T, Res, [Node | BadNodes]); +check_nodes([{Node, true} | T], Res, BadNodes) -> + check_nodes(T, [Node | Res], BadNodes); +check_nodes([{_Node, false} | T], Res, BadNodes) -> + check_nodes(T, Res, BadNodes); +check_nodes([], Res, BadNodes) -> + {Res, BadNodes}. + +-ifdef(NOTUSED). +dist_find_time([#appl{name = Name, restart_time = Time} |_], Name) -> Time; +dist_find_time([_ | T], Name) -> dist_find_time(T, Name); +dist_find_time([], Name) -> 0. +-endif. + +%% Find all nodes that can run the app (even if they're not permitted +%% to right now). +dist_find_nodes([#appl{name = Name, nodes = Nodes} |_], Name) -> Nodes; +dist_find_nodes([_ | T], Name) -> dist_find_nodes(T, Name); +dist_find_nodes([], _Name) -> []. + +dist_flat_nodes(Appls, Name) -> + flat_nodes(dist_find_nodes(Appls, Name)). + +dist_del_node(Appls, Node) -> + map(fun(Appl) -> + NRun = filter(fun({N, _Runnable}) when N =:= Node -> false; + (_) -> true + end, Appl#appl.run), + Appl#appl{run = NRun} + end, Appls). + +validRestartType(permanent) -> true; +validRestartType(temporary) -> true; +validRestartType(transient) -> true; +validRestartType(_RestartType) -> false. + +dist_mismatch(AppName, Node) -> + error_msg("Distribution mismatch for application \"~p\" on nodes ~p and ~p~n", + [AppName, node(), Node]), + exit({distribution_mismatch, AppName, Node}). + +%error_msg(Format) when is_list(Format) -> +% error_msg(Format, []). + +error_msg(Format, ArgList) when is_list(Format), is_list(ArgList) -> + error_logger:error_msg("dist_ac on node ~p:~n" ++ Format, [node()|ArgList]). + +%info_msg(Format) when is_list(Format) -> +% info_msg(Format, []). + +%info_msg(Format, ArgList) when is_list(Format), is_list(ArgList) -> +% error_logger:info_msg("dist_ac on node ~p:~n" ++ Format, [node()|ArgList]). |