%%
%% %CopyrightBegin%
%% 
%% Copyright Ericsson AB 1997-2013. All Rights Reserved.
%% 
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
%% 
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%% 
%% %CopyrightEnd%
%%
-module(dist_ac).

-behaviour(gen_server).

%% External exports
-export([start_link/0,
	 load_application/2,
	 takeover_application/2,
	 permit_application/2,
	 permit_only_loaded_application/2]).

-export([get_known_nodes/0]).

%% Internal exports
-export([init/1, handle_cast/2, handle_call/3, handle_info/2, terminate/2,
	 code_change/3, send_timeout/3]).
-export([info/0]).

-import(lists, [zf/2, filter/2, map/2, foreach/2, foldl/3, mapfoldl/3,
		keysearch/3, keydelete/3, keyreplace/4, member/2]).

-define(AC, application_controller).
-define(DIST_AC, ?MODULE).
-define(LOCK_ID, ?MODULE).

%% This is the protocol version for the dist_ac protocol (between nodes)
-define(vsn, 1).

%%%-----------------------------------------------------------------
%%% This module implements the default Distributed Applications
%%% Controller.  It is possible to write other controllers, when
%%% the functionality in this module is not sufficient.
%%% The process cooperates with the application_controller.
%%%-----------------------------------------------------------------

%%-----------------------------------------------------------------
%% Naming conventions:
%%   Appl = #appl
%%   AppName = atom()
%%-----------------------------------------------------------------
%% Server state of the dist_ac gen_server.  All fields default to the
%% empty list; each field is described in the comment block that
%% follows this definition.
-record(state, {appls = [], tmp_locals = [], remote_started = [],
		known = [], started = [], tmp_weights = [],
		dist_loaded = [], t_reqs = [], s_reqs = [], p_reqs = []}).
%%-----------------------------------------------------------------
%% appls           = [#appl()] - these are the applications we control
%% tmp_locals      = [{AppName, Weight, node()}] - tmp, info part of
%%                   application startup for some distrib appls,
%%                   not yet handled.
%% remote_started  = [{Node, AppName}] - info on apps started before
%%                   we were started
%% known           = [Node] - These are the nodes known to us
%% started         = [AppName] - An ordered list of started applications
%%                   (reversed start order)
%% tmp_weights     = [{AppName, MyWeight}] - tmp, if we're forced to
%%                   send a dist_ac_weight message before we're prepared to,
%%                   we remember the weight we sent here, so we can use
%%                   it in the dist_ac_weight msgs later.
%% dist_loaded     = [{{Name, Node}, HisNodes, Permission}] - info on
%%                   application loaded on other nodes (and own node)
%% t_reqs          = [{AppName, From}] - processes waiting for takeover
%%                   to complete.
%% s_reqs          = [{AppName, From}] - processes waiting for stop
%%                   to complete.
%% p_reqs          = [{From, AppName, Bool, [Node]}] - outstanding permit.
%%                   Nodes is a list of nodes we're still waiting for.
%%-----------------------------------------------------------------

%% One entry per application controlled by dist_ac (the elements of
%% #state.appls).  The possible values of the 'id' field are listed in
%% the comment block that follows this definition.
-record(appl, {name, id, restart_time = 0,  nodes = [], run = []}).

%%-----------------------------------------------------------------
%% id = local | undefined | {distributed, node()} | waiting | run_waiting |
%%      {failover, Node} | {takeover, Node}
%%      local : local application
%%      undefined : not yet started
%%      {distributed, Node} : running on another node, we're standby
%%      {failover, Node} : failover from Node
%%      {takeover, Node} : takeover from Node
%%      waiting : other node went down, we're waiting for a timeout
%%                to takeover it.  From = pid() | undefined
%%      run_waiting : we have decided to start the app; wait for the
%%                    AC result
%%-----------------------------------------------------------------

%% Starts the dist_ac server, registered locally as ?DIST_AC.  When the
%% start succeeds an 'init_sync' cast is queued to the new server
%% before {ok, Pid} is returned.  Any other start result is handed back
%% to the caller untouched.
start_link() ->
    StartResult = gen_server:start_link({local, ?DIST_AC}, ?MODULE, [], []),
    case StartResult of
        {ok, _Pid} ->
            gen_server:cast(?DIST_AC, init_sync),
            StartResult;
        _Failure ->
            StartResult
    end.
    

%%-----------------------------------------------------------------
%% Func: load_application(AppName, DistNodes)
%% Args: AppName = atom()
%%       DistNodes = default | {AppName, Time, [node() | {node()...}]}
%% Purpose: Tells the dist_ac server which nodes an application is
%%          distributed over.  DistNodes overrides the kernel
%%          'distributed' parameter.  The call is synchronous and
%%          waits without a timeout for the server's answer.
%% Returns: ok | {error, Reason}
%%-----------------------------------------------------------------
load_application(AppName, DistNodes) ->
    Request = {load_application, AppName, DistNodes},
    gen_server:call(?DIST_AC, Request, infinity).

%% Asks the dist_ac server to take over AppName.  The restart type is
%% checked first; an invalid one gives
%% {error, {invalid_restart_type, RestartType}} without contacting the
%% server.  Otherwise we wait until the local sync_dacs has completed,
%% fetch the nodes for the application and issue the takeover call
%% inside a global transaction that locks on those nodes.
takeover_application(AppName, RestartType) ->
    case validRestartType(RestartType) of
        false ->
            {error, {invalid_restart_type, RestartType}};
        true ->
            wait_for_sync_dacs(),
            LockNodes = get_nodes(AppName),
            Request = {takeover_application, AppName, RestartType},
            Takeover = fun() ->
                               gen_server:call(?DIST_AC, Request, infinity)
                       end,
            global:trans({?LOCK_ID, self()}, Takeover, LockNodes)
    end.

%%-----------------------------------------------------------------
%% This function controls which applications are permitted to run.  If
%% an application X runs when this function is called as
%% permit_application(X, false), it is moved to another node where it
%% is permitted to run (distributed applications only).  If there is
%% no such node, the application is stopped. (I.e. local applications
%% are always stopped, and distributed applications with no other node
%% alive are stopped as well.)  If later a call to
%% permit_application(X, true) is made, X is restarted.
%% For example, suppose applications app1 and app2 are started and
%% running.
%% If we evaluate
%%   permit_application(app2, false)
%% app2 is stopped and app1 only is running.
%% If we now evaluate
%%   permit_application(app2, true),
%%   permit_application(app3, true)
%% app2 is restarted, but not app3, since it hasn't been started by a
%% call to start_application.
%%-----------------------------------------------------------------
permit_application(AppName, Bool) ->
    %% Make sure the local sync_dacs has had the global lock first.
    wait_for_sync_dacs(),
    LockId = {?LOCK_ID, self()},
    %% 'started' tells the server to go on and act on the permission
    %% after it has been recorded and broadcast.
    Request = {permit_application, AppName, Bool, LockId, started},
    Permit = fun() -> gen_server:call(?DIST_AC, Request, infinity) end,
    global:trans(LockId, Permit).

%% Like permit_application/2, but the request is tagged 'only_loaded':
%% the server records and broadcasts the new permission and replies ok
%% without going on to start or stop anything.
permit_only_loaded_application(AppName, Bool) ->
    %% Make sure the local sync_dacs has had the global lock first.
    wait_for_sync_dacs(),
    LockId = {?LOCK_ID, self()},
    Request = {permit_application, AppName, Bool, LockId, only_loaded},
    Permit = fun() -> gen_server:call(?DIST_AC, Request, infinity) end,
    global:trans(LockId, Permit).

%% Asks the dist_ac server for the nodes of AppName that are also in
%% its list of known nodes.  Waits without a timeout.
get_nodes(AppName) ->
    Request = {get_nodes, AppName},
    gen_server:call(?DIST_AC, Request, infinity).

%% Returns the dist_ac server's list of known nodes.  The default
%% gen_server:call/2 timeout applies.
get_known_nodes() ->
    Request = get_known_nodes,
    gen_server:call(?DIST_AC, Request).

%%%-----------------------------------------------------------------
%%% call-back functions from gen_server
%%%-----------------------------------------------------------------
%% gen_server callback.  Turns on exit trapping and starts with an
%% empty state; the state is filled in when the 'init_sync' cast is
%% handled.
init([]) ->
    process_flag(trap_exit, true),
    EmptyState = #state{},
    {ok, EmptyState}.

%% Synchronises with the dist_ac processes on the connected nodes while
%% holding the global {?LOCK_ID, sync_dacs} lock.  Afterwards the fact
%% that the sync has run is recorded in ac_tab, which is what
%% wait_for_sync_dacs/0 polls for.  Returns whatever global:trans/2
%% returned (wait_dacs/4's result when the transaction ran).
sync_dacs(Appls) ->
    Sync = fun() ->
                   Responding = introduce_me(nodes(), Appls),
                   wait_dacs(Responding, [node()], Appls, [])
           end,
    Result = global:trans({?LOCK_ID, sync_dacs}, Sync),
    ets:insert(ac_tab, {sync_dacs, ok}),
    Result.
  
%% Sends a dist_ac_new_node message, carrying our applications and an
%% empty started-list, to the dist_ac process on each node in Nodes.
%% Returns the nodes on which such a process was found.
introduce_me(Nodes, Appls) ->
    Hello = {dist_ac_new_node, ?vsn, node(), Appls, []},
    Introduce =
        fun(Node) ->
                %% A node without a DAC answers with something that is
                %% not a pid; such nodes are left out.
                case rpc:call(Node, erlang, whereis, [?DIST_AC]) of
                    DacPid when is_pid(DacPid) ->
                        DacPid ! Hello,
                        true;
                    _NoDac ->
                        false
                end
        end,
    [Node || Node <- Nodes, Introduce(Node)].

%% Waits, one node at a time, for the dist_ac_new_node answer from
%% every node we introduced ourselves to.  Each answer adds the node to
%% KnownNodes, merges its applications into Appls and appends its
%% started-list to RStarted.  A node that goes down while we wait is
%% skipped.  Returns {KnownNodes, Appls, RStarted}.
wait_dacs([], KnownNodes, Appls, RStarted) ->
    {KnownNodes, Appls, RStarted};
wait_dacs([Node | Remaining], KnownNodes, Appls, RStarted) ->
    monitor_node(Node, true),
    receive
        %% HisAppls =/= [] when our node connects to a running system.
        %%
        %% Newer protocol versions are responsible for understanding
        %% older ones.  There is no older version that is supposed to
        %% work with this one, so the version field is not checked.
        {dist_ac_new_node, _Vsn, Node, HisAppls, HisStarted} ->
            monitor_node(Node, false),
            AllStarted = RStarted ++ HisStarted,
            Merged = dist_merge(Appls, HisAppls, Node),
            wait_dacs(Remaining, [Node | KnownNodes], Merged, AllStarted);
        {nodedown, Node} ->
            monitor_node(Node, false),
            wait_dacs(Remaining, KnownNodes, Appls, RStarted)
    end.


%% Debug aid: returns the dist_ac server's complete state record.
info() ->
    Request = info,
    gen_server:call(?DIST_AC, Request).
    

%%-----------------------------------------------------------------
%% All functions that can affect which applications are running
%% execute within a global lock, to ensure that they are not
%% executing at the same time as sync_dacs.  However, to avoid a
%% deadlock situation where e.g. permit_application gets the lock
%% before sync_dacs, this function is used to ensure that the local
%% sync_dacs always gets the lock first of all.  The lock is still
%% used to not interfere with sync_dacs on other nodes.
%%-----------------------------------------------------------------
wait_for_sync_dacs() ->
    %% sync_dacs/1 stores {sync_dacs, ok} in ac_tab when it is done.
    %% A failing lookup is treated the same as "not done yet".
    Synced = (catch ets:lookup(ac_tab, sync_dacs)) =:= [{sync_dacs, ok}],
    if
        Synced ->
            ok;
        true ->
            %% Not yet; look again in 100 ms.
            receive after 100 -> ok end,
            wait_for_sync_dacs()
    end.

handle_cast(init_sync, _S) ->
    %% This cast is sent by start_link/0.  Handling it means blocking
    %% in the receive below until the kernel_config process sends 'go',
    %% which it does when all nodes that should be pinged have been
    %% pinged.  The reason is that dist_ac syncs with the other nodes
    %% at start-up (it does _not_ handle partitioned nets!).  The other
    %% nodes try to reach us through the local name dist_ac, so the
    %% name must be registered before distribution is started, but the
    %% sync itself cannot be done until after it has started.
    receive
        {go, KernelConfig} ->
            DistEnv = application:get_env(kernel, distributed),
            Appls = case DistEnv of
                        undefined -> [];
                        {ok, D} -> dist_check(D)
                    end,

            dist_take_control(Appls),
            %% kernel_config waits for this message.  That way the
            %% kernel application cannot complete its start before
            %% dist_ac has taken control over its applications.
            %% (OTP-3509)
            KernelConfig ! dist_ac_took_control,

            %% Only the nodedown messages are of interest to us.
            ok = net_kernel:monitor_nodes(true),

            {Known, SyncedAppls, RStarted} = sync_dacs(Appls),
            NewState = #state{appls = SyncedAppls,
                              known = Known,
                              remote_started = RStarted},
            {noreply, NewState}
    end.


%% info/0: reply with the complete server state, leaving it unchanged.
handle_call(info, _From, State) ->
    Reply = State,
    {reply, Reply, State};
    


%% load_application/2: replace the distribution spec for AppName.  An
%% {error, _} from dist_replace/3 is returned to the caller; a crash in
%% it stops the server.  If the application has distributed nodes after
%% the replacement we take control of it (if we had not already) and
%% load it; otherwise the request is ignored and the state is kept.
handle_call({load_application, AppName, DistNodes}, _From, S) ->
    OldAppls = S#state.appls,
    case catch dist_replace(DistNodes, AppName, OldAppls) of
        {error, Reason} ->
            {reply, {error, Reason}, S};
        {'EXIT', Crash} ->
            {stop, Crash, {error, Crash}, S};
        NewAppls ->
            case dist_find_nodes(NewAppls, AppName) of
                [] ->
                    %% No distributed nodes; nothing for us to do.
                    {reply, ok, S};
                _Nodes ->
                    ensure_take_control(AppName, OldAppls),
                    {ok, LoadedS} = load(AppName, S#state{appls = NewAppls}),
                    {reply, ok, LoadedS}
            end
    end;

%% takeover_application/2: only possible when the application is
%% currently registered as running on another node.  In that case the
%% takeover is requested via ac_takeover/4 and the caller is parked in
%% t_reqs; the reply is sent when the result is known.  Otherwise an
%% error is returned at once.
handle_call({takeover_application, AppName, RestartType}, From, S) ->
    Appls = S#state.appls,
    case lists:keysearch(AppName, #appl.name, Appls) of
        {value, Appl} when element(1, Appl#appl.id) =:= distributed ->
            {distributed, RunningOn} = Appl#appl.id,
            _ = ac_takeover(req, AppName, RunningOn, RestartType),
            TakingOver = Appl#appl{id = takeover},
            NewAppls = lists:keyreplace(AppName, #appl.name, Appls, TakingOver),
            Waiting = [{AppName, From} | S#state.t_reqs],
            {noreply, S#state{appls = NewAppls, t_reqs = Waiting}};
        {value, #appl{id = local}} ->
            {reply, {error, {already_running_locally, AppName}}, S};
        _NotDistributed ->
            {reply, {error, {not_running_distributed, AppName}}, S}
    end;

%% permit_application/2 and permit_only_loaded_application/2.
%% For an application we control, the new local permission is stored
%% and broadcast to the application's nodes; with StartInfo = started
%% the request is then passed on to permit/6, which decides the reply.
%% For an application we do not control, the request is forwarded to
%% the application controller as a plain start or stop.
handle_call({permit_application, AppName, Bool, LockId, StartInfo}, From, S) ->
    case lists:keymember(AppName, #appl.name, S#state.appls) of
        true ->
            Appls = dist_update_run(S#state.appls, AppName, node(), Bool),
            PermitS = S#state{appls = Appls},
            %% Find out whether the application is running here.
            Found = lists:keysearch(AppName, #appl.name, Appls),
            RunsHere = case Found of
                           {value, #appl{id = local}} -> true;
                           _ -> false
                       end,
            %% Tell the application's nodes about the new permission.
            Recipients = dist_flat_nodes(Appls, AppName),
            send_msg({dist_ac_new_permission, node(), AppName, Bool, RunsHere},
                     Recipients),
            case StartInfo of
                started ->
                    permit(Bool, Found, AppName, From, PermitS, LockId);
                only_loaded ->
                    {reply, ok, PermitS}
            end;
        false ->
            %% Permit for a non-distributed application.  It should not
            %% really be handled like this, or here, but it is kept for
            %% backwards compatibility.
            case application_controller:get_loaded(AppName) of
                {true, _} when not Bool ->
                    _ = ac_stop_it(AppName),
                    {reply, ok, S};
                {true, _} when Bool ->
                    _ = ac_start_it(req, AppName),
                    {reply, ok, S};
                false ->
                    {reply, {error, {not_loaded, AppName}}, S}
            end
    end;

%%-----------------------------------------------------------------
%% The distributed parameter is changed. Update the parameters
%% but the applications are actually not moved to other nodes
%% even if they should.
%%-----------------------------------------------------------------
handle_call({distribution_changed, NewDistribution}, _From, S) ->
    %% Only the stored specifications are refreshed here.
    Updated = dist_change_update(S#state.appls, NewDistribution),
    {reply, ok, S#state{appls = Updated}};

    
%% get_nodes/1: the application's nodes that are also known to us.
handle_call({get_nodes, AppName}, _From, S) ->
    AppNodes = dist_flat_nodes(S#state.appls, AppName),
    Reply = intersection(AppNodes, S#state.known),
    {reply, Reply, S};

%% get_known_nodes/0: reply with the list of known nodes.
handle_call(get_known_nodes, _From, S) ->
    Known = S#state.known,
    {reply, Known, S}.


%% Load request for AppName: bring our state up to date through load/2
%% and send an 'ok' load reply to ?AC.
handle_info({ac_load_application_req, AppName}, S) ->
    {ok, LoadedS} = load(AppName, S),
    Ack = {ac_load_application_reply, AppName, ok},
    ?AC ! Ack,
    {noreply, LoadedS};
	    
%% AppName has been unloaded: update our state through unload/2.
handle_info({ac_application_unloaded, AppName}, S) ->
    {ok, UnloadedS} = unload(AppName, S),
    {noreply, UnloadedS};
	    
%% Start request for AppName.  We have to decide whether this node or
%% another one should run the application, and that is done while
%% holding the global lock on our own node.  The lock is requested
%% with zero retries; if it is taken, the request is handed to
%% send_after/2 so that it can be tried again.
handle_info({ac_start_application_req, AppName}, S) ->
    Lock = {?LOCK_ID, self()},
    case global:set_lock(Lock, [node()], 0) of
        false ->
            send_after(100, {ac_start_application_req, AppName}),
            {noreply, S};
        true ->
            StartResult = (catch start_appl(AppName, S, reply)),
            NextS = case StartResult of
                        {ok, StartedS, _IsWaiting} ->
                            StartedS;
                        {error, Reason} ->
                            ?AC ! {ac_start_application_reply, AppName,
                                   {error, Reason}},
                            S
                    end,
            global:del_lock(Lock),
            {noreply, NextS}
    end;

%% Result of a start that we ordered.  The result is broadcast to
%% _all_ known nodes, since any node could be syncing on this
%% application, not only the nodes that can run it.  The application
%% is then marked 'local' on success or 'undefined' on failure, and
%% anyone waiting for a takeover of it is handled by del_t_reqs/3.
handle_info({ac_application_run, AppName, Res}, S) ->
    Appls = S#state.appls,
    Known = S#state.known,
    send_msg({dist_ac_app_started, node(), AppName, Res}, Known),
    NewId = case Res of
                {error, _Reason} -> undefined;
                ok -> local
            end,
    {value, Appl} = lists:keysearch(AppName, #appl.name, Appls),
    TReqs = del_t_reqs(AppName, S#state.t_reqs, Res),
    Updated = Appl#appl{id = NewId},
    NewAppls = lists:keyreplace(AppName, #appl.name, Appls, Updated),
    {noreply, S#state{appls = NewAppls, t_reqs = TReqs}};


%% A stop that we ordered has completed.
handle_info({ac_application_not_run, AppName}, S) ->
    Appls = S#state.appls,
    {value, Appl} = lists:keysearch(AppName, #appl.name, Appls),
    %% Somebody may be waiting for a takeover result, if stop was
    %% called just before the takeover was handled.
    TReqs = del_t_reqs(AppName, S#state.t_reqs, {error, stopped}),
    %% Answer everybody waiting for this stop and drop them from the
    %% list; requests for other applications are kept.
    KeepWaiting = fun({Name, Client}) when Name =:= AppName ->
                          gen_server:reply(Client, ok),
                          false;
                     (_Other) ->
                          true
                  end,
    SReqs = lists:filter(KeepWaiting, S#state.s_reqs),
    RStarted = case Appl#appl.id of
                   local ->
                       %% It ran here, so the other nodes must be told.
                       send_msg({dist_ac_app_stopped, AppName}, S#state.known),
                       S#state.remote_started;
                   {distributed, Node} ->
                       [{Node, AppName} | S#state.remote_started];
                   _ ->
                       S#state.remote_started
               end,
    Reset = Appl#appl{id = undefined},
    NewAppls = lists:keyreplace(AppName, #appl.name, Appls, Reset),
    {noreply, S#state{appls = NewAppls, t_reqs = TReqs, s_reqs = SReqs,
                      remote_started = RStarted}};

%% Somebody called application:stop.  The state is reset to what it
%% was before the application was started, which includes removing it
%% from the list of started applications.
handle_info({ac_application_stopped, AppName}, S) ->
    Appls = S#state.appls,
    {value, Appl} = lists:keysearch(AppName, #appl.name, Appls),
    %% Somebody may be waiting for a takeover result, if stop was
    %% called just before the takeover was handled.
    TReqs = del_t_reqs(AppName, S#state.t_reqs, {error, stopped}),
    %% Answer everybody waiting for this stop and drop them from the
    %% list; requests for other applications are kept.
    KeepWaiting = fun({Name, Client}) when Name =:= AppName ->
                          gen_server:reply(Client, ok),
                          false;
                     (_Other) ->
                          true
                  end,
    SReqs = lists:filter(KeepWaiting, S#state.s_reqs),
    RStarted = case Appl#appl.id of
                   local ->
                       %% It ran here, so the other nodes must be told.
                       send_msg({dist_ac_app_stopped, AppName}, S#state.known),
                       S#state.remote_started;
                   {distributed, Node} ->
                       [{Node, AppName} | S#state.remote_started];
                   _ ->
                       S#state.remote_started
               end,
    Reset = Appl#appl{id = undefined},
    NewAppls = lists:keyreplace(AppName, #appl.name, Appls, Reset),
    StillStarted = lists:delete(AppName, S#state.started),
    {noreply, S#state{appls = NewAppls, started = StillStarted,
                      t_reqs = TReqs, s_reqs = SReqs,
                      remote_started = RStarted}};


%%-----------------------------------------------------------------
%% A new node gets running.
%% Send him info about our started distributed applications.
%%-----------------------------------------------------------------
handle_info({dist_ac_new_node, _Vsn, Node, HisAppls, []}, S) ->
    MyAppls = S#state.appls,
    %% The applications that are running locally on this node.
    RunningHere = [{node(), Name} || #appl{name = Name, id = local} <- MyAppls],
    %% Answer with our own applications and what we have started.
    {?DIST_AC, Node} ! {dist_ac_new_node, ?vsn, node(), MyAppls, RunningHere},
    Merged = dist_merge(MyAppls, HisAppls, Node),
    {noreply, S#state{appls = Merged, known = [Node | S#state.known]}};

%% Node reports the result (Res) of starting application Name.
%%
%% If we control the application and it has been started on this node
%% too, the new location is recorded, ?AC is informed through
%% ac_started/3, cached weights for the application are dropped, and
%% any start that was waiting on this application is given a chance to
%% proceed.  Otherwise the report is only remembered in
%% remote_started.
%%
%% An {error, R} from req_start_app/2 stops the server with reason R.
handle_info({dist_ac_app_started, Node, Name, Res}, S) ->
    Appls = S#state.appls,
    Found = lists:keysearch(Name, #appl.name, Appls),
    case {Found, lists:member(Name, S#state.started)} of
        {{value, Appl}, true} ->
            NId = case Appl#appl.id of
                      _ when element(1, Res) =:= error ->
                          %% Start of appl on some node failed.
                          %% Set Id to undefined.  That node will have
                          %% to take some actions, e.g. reboot
                          undefined;
                      _ ->
                          %% The application now runs on Node, whether
                          %% it was taken over from another node, taken
                          %% over from us, or simply started there.
                          {distributed, Node}
                  end,
            _ = ac_started(req, Name, Node),
            NAppl = Appl#appl{id = NId},
            NAppls = lists:keyreplace(Name, #appl.name, Appls, NAppl),
            TmpWeights = keydelete_all(Name, 1, S#state.tmp_weights),
            NewS = S#state{appls = NAppls, tmp_weights = TmpWeights},
            NPermitReq = req_del_permit_false(NewS#state.p_reqs, Name),
            StartS = NewS#state{p_reqs = NPermitReq},
            case catch req_start_app(StartS, Name) of
                {error, R} ->
                    %% handle_info/2 must return the state together
                    %% with the stop reason; a bare {stop, R} would be
                    %% rejected by gen_server as a bad return value.
                    {stop, R, StartS};
                {ok, NewS2} ->
                    {noreply, NewS2}
            end;
        {_, _} ->
            %% The app has not been started at this node yet; remember
            %% this in remote started.
            NRStarted = [{Node, Name} | S#state.remote_started],
            {noreply, S#state{remote_started = NRStarted}}
    end;

%% AppName was stopped on another node.  It is removed from
%% remote_started, and if we control the application its id is reset
%% to undefined.
handle_info({dist_ac_app_stopped, AppName}, S) ->
    Appls = S#state.appls,
    RStarted = lists:keydelete(AppName, 2, S#state.remote_started),
    case lists:keysearch(AppName, #appl.name, Appls) of
        {value, Appl} ->
            Reset = Appl#appl{id = undefined},
            NewAppls = lists:keyreplace(AppName, #appl.name, Appls, Reset),
            {noreply, S#state{appls = NewAppls, remote_started = RStarted}};
        false ->
            {noreply, S#state{remote_started = RStarted}}
    end;

handle_info({dist_ac_weight, Name, Weight, Node}, S) ->
    %% Another node is starting up and will eventually take over this
    %% application, e.g. in the situation {Name, [{Node}, node()]}.
    %% Node has sent us its weight and we must answer.  What we answer
    %% hardly matters, as long as it is a dist_ac_weight message.
    %% Another situation is {Name, [RNode, {node()}, Node]}.
    %%
    %% Yet another situation: the node running Name crashed, and Node
    %% has received the nodedown message but we have not.  Then we must
    %% answer with the correct weight, i.e. the same weight that we
    %% will send later when our own nodedown message arrives.  That is
    %% why the weight we send is stored in tmp_weights.
    %%
    %% When we are not in a position to answer yet, the message is kept
    %% in tmp_locals instead.
    Postponed = S#state{tmp_locals = [{Name, Weight, Node} |
                                      S#state.tmp_locals]},
    case lists:keysearch(Name, #appl.name, S#state.appls) of
        {value, Appl} ->
            case Appl#appl.id of
                run_waiting ->
                    {?DIST_AC, Node} ! {dist_ac_weight, Name, 0, node()},
                    {noreply, S};
                undefined ->
                    {noreply, Postponed};
                {takeover, _} ->
                    {noreply, Postponed};
                {failover, _} ->
                    {noreply, Postponed};
                _Other ->
                    OurWeight = get_cached_weight(Name, S),
                    {?DIST_AC, Node} ! {dist_ac_weight, Name, OurWeight, node()},
                    Weights = keyreplaceadd(Name, 1, S#state.tmp_weights,
                                            {Name, OurWeight}),
                    {noreply, S#state{tmp_weights = Weights}}
            end;
        _NotControlled ->
            {noreply, Postponed}
    end;

%%-----------------------------------------------------------------
%% A node died.  Check if we should takeover some applications.
%%-----------------------------------------------------------------
handle_info({nodedown, Node}, S) ->
    Appls = S#state.appls,
    Runnable = dist_get_runnable(Appls),
    %% Applications that ran on the dead node and that we may run.
    HisAppls = [Appl || #appl{name = Name, id = {distributed, N}} = Appl <- Appls,
                        N =:= Node,
                        lists:member(Name, Runnable)],
    %% Every application that ran on the dead node changes id: to
    %% {failover, Node} if we may run it, otherwise to undefined after
    %% ac_not_running/1 has been called for it.
    MarkDown = fun(#appl{id = {distributed, N}} = Appl) when N =:= Node ->
                       case lists:member(Appl#appl.name, Runnable) of
                           true ->
                               Appl#appl{id = {failover, Node}};
                           false ->
                               _ = ac_not_running(Appl#appl.name),
                               Appl#appl{id = undefined}
                       end;
                  (Other) ->
                       Other
               end,
    Appls2 = lists:map(MarkDown, Appls),
    %% Forget what the dead node had started.
    RStarted = lists:filter(fun({N, _Name}) -> N =/= Node;
                               (_) -> true
                            end,
                            S#state.remote_started),
    Appls3 = dist_del_node(Appls2, Node),
    {PermitReqs, Appls4, SReqs} = req_del_node(S, Node, Appls3),
    Known = lists:delete(Node, S#state.known),
    NewS = S#state{appls = Appls4, p_reqs = PermitReqs, known = Known,
                   s_reqs = SReqs,
                   remote_started = RStarted},
    restart_appls(HisAppls),
    {noreply, NewS};

%% Node has loaded application Name with the node specification
%% HisNodes and the given permission.  If we have not loaded the
%% application ourselves the information is saved in dist_loaded.  If
%% we have, the two specifications must agree; then his permission is
%% recorded, and if he does not know about us yet we answer with our
%% own dist_ac_app_loaded message.
handle_info({dist_ac_app_loaded, Node, Name, HisNodes, Permission, HeKnowsMe},
            S) ->
    Appls = S#state.appls,
    MyNodes = dist_find_nodes(Appls, Name),
    case is_loaded(Name, S) of
        false ->
            Pending = {{Name, Node}, HisNodes, Permission},
            {noreply, S#state{dist_loaded = [Pending | S#state.dist_loaded]}};
        true ->
            case equal_nodes(MyNodes, HisNodes) of
                false ->
                    dist_mismatch(Name, Node);
                true ->
                    Updated = dist_update_run(Appls, Name, Node, Permission),
                    case HeKnowsMe of
                        false ->
                            %% We have it loaded, but he does not know;
                            %% he is a new node connecting to us.
                            Runnable = dist_is_runnable(Appls, Name),
                            {?DIST_AC, Node} ! {dist_ac_app_loaded, node(), Name,
                                                MyNodes, Runnable, true},
                            ok;
                        _ ->
                            ok
                    end,
                    {noreply, S#state{appls = Updated}}
            end
    end;

%% Node has unloaded application Name: its permission for the
%% application is set to undefined and any saved load information from
%% that node is dropped.
handle_info({dist_ac_app_unloaded, Node, Name}, S) ->
    Updated = dist_update_run(S#state.appls, Name, Node, undefined),
    Remaining = lists:keydelete({Name, Node}, 1, S#state.dist_loaded),
    {noreply, S#state{appls = Updated, dist_loaded = Remaining}};


%% Node is no longer permitted to run AppName.  The permission is
%% recorded, and if the application was running on that node and is
%% runnable according to dist_is_runnable/2, a start is attempted
%% through start_appl/3.
handle_info({dist_ac_new_permission, Node, AppName, false, IsHisApp}, S) ->
    Appls = dist_update_run(S#state.appls, AppName, Node, false),
    DeniedS = S#state{appls = Appls},
    case {dist_is_runnable(Appls, AppName), IsHisApp} of
        {true, true} ->
            case catch start_appl(AppName, DeniedS, req) of
                {ok, StartedS, _IsWaiting} ->
                    {noreply, StartedS};
                {error, _Reason} ->
                    %% If the app was permanent, AC will shut down the node.
                    {noreply, DeniedS}
            end;
        _ ->
            {noreply, DeniedS}
    end;
%% Node is now permitted to run AppName; only the stored permission is
%% updated.
handle_info({dist_ac_new_permission, Node, AppName, true, _IsHisApp}, S) ->
    Permitted = dist_update_run(S#state.appls, AppName, Node, true),
    {noreply, S#state{appls = Permitted}};

%% Restart request for application Name.  An error from
%% restart_appl/2 stops the server with reason {error, R}.
handle_info({internal_restart_appl, Name}, S) ->
    case restart_appl(Name, S) of
        {error, Reason} ->
            {stop, {error, Reason}, S};
        RestartedS ->
            {noreply, RestartedS}
    end;
    
%% Anything else is dropped without changing the state.
handle_info(_Unexpected, State) ->
    {noreply, State}.

%% gen_server callback.  Nothing is cleaned up at termination.
terminate(_Why, _State) ->
    ok.

%% gen_server callback.  The state is carried over unchanged on a code
%% upgrade.
code_change(_FromVsn, State, _UpgradeInfo) ->
    Unchanged = State,
    {ok, Unchanged}.

%%%-----------------------------------------------------------------
%%% Internal functions
%%%-----------------------------------------------------------------
%% Registers AppName as loaded on this node.
%% Our own default permission is stored for the application.  The load
%% information saved from other nodes (dist_loaded) is then compared
%% with our node specification: a matching entry has its permission
%% merged into our data, a differing one goes to dist_mismatch/2.
%% Finally every other node in our specification is told that we have
%% loaded the application.
%% Returns {ok, NewState}.
load(AppName, S) ->
    Appls0 = S#state.appls,
    Loaded0 = S#state.dist_loaded,
    %% What the other nodes have told us about this application.
    RemoteSpecs = get_dist_loaded(AppName, Loaded0),
    %% Our own specification, also as a flat list of nodes.
    MyNodes = dist_find_nodes(Appls0, AppName),
    FlatNodes = flat_nodes(MyNodes),
    %% Store our own permission.
    MyPermission = get_default_permission(AppName),
    Appls1 = dist_update_run(Appls0, AppName, node(), MyPermission),
    %% Compare our specification with each remote one.
    Merge = fun({Node, HisNodes, HisPermission}, Acc) ->
                    case equal_nodes(MyNodes, HisNodes) of
                        true ->
                            {Node, dist_update_run(Acc, AppName,
                                                   Node, HisPermission)};
                        _ ->
                            dist_mismatch(AppName, Node)
                    end
            end,
    {LoadedNodes, Appls2} = lists:mapfoldl(Merge, Appls1, RemoteSpecs),
    Loaded1 = del_dist_loaded(AppName, Loaded0),
    %% Tell the other nodes about the load and our permission.  The
    %% last element says whether we had already heard from that node.
    Me = node(),
    Announce = fun(Node) ->
                       {?DIST_AC, Node} !
                           {dist_ac_app_loaded, Me, AppName, MyNodes,
                            MyPermission, lists:member(Node, LoadedNodes)}
               end,
    lists:foreach(Announce, [Node || Node <- FlatNodes, Node =/= Me]),
    {ok, S#state{appls = Appls2, dist_loaded = Loaded1}}.

%% Takes control over AppName unless it is already among the
%% applications we control.
%%
%% This works because it runs inside a synchronous call: we get the
%% control *before* application:load returns.  Otherwise
%% application:start could be called before we had the chance to take
%% control.  It is only needed for backwards compatibility, since an
%% application does not have to be listed in the 'distributed'
%% parameter; it can become distributed implicitly through a call to
%% application:load.
ensure_take_control(AppName, Appls) ->
    Controlled = lists:keymember(AppName, #appl.name, Appls),
    if
        Controlled ->
            ok;
        true ->
            application_controller:control_application(AppName)
    end.
    
%% Registers AppName as unloaded on this node: the application's nodes
%% are told about it, and the application's id and run list are reset.
%% Returns {ok, NewState}.
unload(AppName, S) ->
    Appls = S#state.appls,
    Recipients = dist_flat_nodes(Appls, AppName),
    send_msg({dist_ac_app_unloaded, node(), AppName}, Recipients),
    {value, Appl} = lists:keysearch(AppName, #appl.name, Appls),
    Reset = Appl#appl{id = undefined, run = []},
    NewAppls = lists:keyreplace(AppName, #appl.name, Appls, Reset),
    {ok, S#state{appls = NewAppls}}.

%% Decides where AppName is to run and updates the state accordingly.
%% Type (reply | req) selects how the application controller is told
%% about the outcome.
%% Returns {ok, NewState, IsWaiting}.  Throws
%% {error, {unknown_application, AppName}} if we do not control the
%% application, and throws whatever dist_get_all_nodes/1 returned if
%% that was not an {ok, _, _} tuple.
start_appl(AppName, S, Type) ->
    %% If the application is loaded on all nodes involved we know that
    %% we share the same picture of the nodes; otherwise the load would
    %% not have succeeded.
    Appls = S#state.appls,
    Appl = case lists:keysearch(AppName, #appl.name, Appls) of
               {value, Found} -> Found;
               _ -> throw({error, {unknown_application, AppName}})
           end,
    case Appl#appl.id of
        local ->
            %% Already started here (UW 990913); this can happen if
            %% ac_start_application_req was resent.
            {ok, S, false};
        _NotLocal ->
            {Id, IsWaiting} =
                case dist_get_all_nodes(Appl) of
                    {ok, DistNodes, PermittedNodes} ->
                        start_distributed(Appl, AppName, DistNodes,
                                          PermittedNodes, S, Type);
                    Error ->
                        throw(Error)
                end,
            Placed = Appl#appl{id = Id},
            NewAppls = keyreplaceadd(AppName, #appl.name, Appls, Placed),
            {ok, ReqS} = req_start_app(S#state{appls = NewAppls}, AppName),
            %% Temporary data about this application is no longer needed.
            Locals = keydelete_all(AppName, 1, ReqS#state.tmp_locals),
            Weights = keydelete_all(AppName, 1, ReqS#state.tmp_weights),
            RStarted = lists:keydelete(AppName, 2, S#state.remote_started),
            Started = replaceadd(AppName, ReqS#state.started),
            FinalS = ReqS#state{started = Started, tmp_locals = Locals,
                                tmp_weights = Weights,
                                remote_started = RStarted},
            {ok, FinalS, IsWaiting}
    end.


%% Acts on the outcome of find_start_node/4 for application Name.
%% Returns {NewId, IsWaiting}, where NewId is the new value for the
%% application's id field:
%%   - we are the start node: the application controller is asked to
%%     fail over, take over or plainly start the application, and the
%%     result is {run_waiting, true};
%%   - the application runs, or is found to be running, on another
%%     node: {{distributed, Node}, false};
%%   - another node is to start it, or nobody has started it yet: we
%%     wait in wait_dist_start/7 or wait_dist_start2/6;
%%   - no node has permission: {undefined, false}.
start_distributed(Appl, Name, Nodes, PermittedNodes, S, Type) ->
    Here = node(),
    case find_start_node(Nodes, PermittedNodes, Name, S) of
        {ok, Here} ->
            _ = case Appl#appl.id of
                    {failover, FailedNode} when Type =:= req ->
                        ac_failover(Name, FailedNode, undefined);
                    {distributed, FromNode} when Type =:= req ->
                        ac_takeover(req, Name, FromNode, undefined);
                    _ when Type =:= reply ->
                        %% If some node is recorded as having started
                        %% the application, take it over from there.
                        case lists:keysearch(Name, 2, S#state.remote_started) of
                            {value, {RemoteNode, _}} ->
                                ac_takeover(reply, Name, RemoteNode, undefined);
                            _ ->
                                ac_start_it(Type, Name)
                        end;
                    _ ->
                        ac_start_it(Type, Name)
                end,
            {run_waiting, true};
        {already_started, Node} ->
            _ = ac_started(Type, Name, Node),
            {{distributed, Node}, false};
        {ok, Node} ->
            case lists:keysearch(Name, #appl.name, S#state.appls) of
                {value, #appl{id = {distributed, Node}}} ->
                    _ = ac_started(Type, Name, Node),
                    {{distributed, Node}, false};
                _ ->
                    wait_dist_start(Node, Appl, Name, Nodes,
                                    PermittedNodes, S, Type)
            end;
        not_started ->
            wait_dist_start2(Appl, Name, Nodes, PermittedNodes, S, Type);
        no_permission ->
            _ = ac_not_started(Type, Name),
            {undefined, false}
    end.

%% Waits for Node to report the result of starting application Name.
%% Returns {NewId, false}: {distributed, Node} on success, or the
%% application's unchanged id if the start failed there.  If Node goes
%% down, the whole decision is made again by start_distributed/6
%% without that node.
wait_dist_start(Node, Appl, Name, Nodes, PermittedNodes, S, Type) ->
    monitor_node(Node, true),
    receive
        {dist_ac_app_started, Node, Name, ok} ->
            _ = ac_started(Type, Name, Node),
            monitor_node(Node, false),
            {{distributed, Node}, false};
        {dist_ac_app_started, Node, Name, {error, Reason}} ->
            _ = ac_error(Type, Name, {Node, Reason}),
            monitor_node(Node, false),
            {Appl#appl.id, false};
        {dist_ac_weight, Name, _HisWeight, Node} ->
            %% The situation is {Name, [RNode, {Node}, node()]}:
            %% permit(false) was called on RNode and we sent our weight
            %% first.  Node handled that in handle_info, and now it
            %% expects a weight message from us.  Any weight will do;
            %% he wins anyway.
            monitor_node(Node, false),
            {?DIST_AC, Node} !
                {dist_ac_weight, Name, get_cached_weight(Name, S), node()},
            wait_dist_start(Node, Appl, Name, Nodes, PermittedNodes, S, Type);
        {nodedown, Node} ->
            monitor_node(Node, false),
            %% Drop the weight that the dead node sent for this app.
            NotFromDead = fun({N, _Weight, Nd}) ->
                                  not (Nd =:= Node andalso N =:= Name);
                             (_) ->
                                  true
                          end,
            Locals = lists:filter(NotFromDead, S#state.tmp_locals),
            start_distributed(Appl, Name, Nodes,
                              lists:delete(Node, PermittedNodes),
                              S#state{tmp_locals = Locals}, Type)
    end.

%%-----------------------------------------------------------------
%% Like wait_dist_start/7 but without a designated node: accepts a
%% start result for Name from any node, or any nodedown message.
%% No monitor is set up in this function.
%% NOTE(review): presumably the nodedown messages come from node
%% monitoring enabled elsewhere in this module - confirm.
%% There is no timeout; the call blocks until a message matches.
%%-----------------------------------------------------------------
wait_dist_start2(Appl, Name, Nodes, PermittedNodes, S, Type) ->
    receive
	{dist_ac_app_started, Node, Name, ok} ->
	    _ = ac_started(Type, Name, Node),
	    {{distributed, Node}, false};
	{dist_ac_app_started, Node, Name, {error, R}} ->
	    _ = ac_error(Type, Name, {Node, R}),
	    {Appl#appl.id, false};
	{nodedown, Node} ->
	    %% A node went down, try to start the app again - there may not
	    %% be any more nodes to wait for.
	    %% The weight buffered for Name from that node is dropped first.
	    TmpLocals =
		filter(fun({Name2, _Weight, Node2}) when Node2 =:= Node,
							 Name2 =:= Name -> false;
			  (_) -> true
		       end,
		       S#state.tmp_locals),
	    NewS = S#state{tmp_locals = TmpLocals},
	    start_distributed(Appl, Name, Nodes, 
			      lists:delete(Node, PermittedNodes), NewS, Type)
    end.


%% Sends a start_it order for Name to the application controller.
%% The message tag depends on Type: reply answers a start request,
%% req is a change request.  Returns the message that was sent.
ac_start_it(reply, Name) ->
    Msg = {ac_start_application_reply, Name, start_it},
    ?AC ! Msg;
ac_start_it(req, Name) ->
    Msg = {ac_change_application_req, Name, start_it},
    ?AC ! Msg.

%% Tells the application controller that Name has been started on
%% Node.  Type (reply | req) selects the message tag.
ac_started(reply, Name, Node) ->
    Info = {started, Node},
    ?AC ! {ac_start_application_reply, Name, Info};
ac_started(req, Name, Node) ->
    Info = {started, Node},
    ?AC ! {ac_change_application_req, Name, Info}.

%% Reports a failed start of Name to the application controller.
%% Only a pending start request (Type reply) is answered; for Type
%% req nothing is sent and ok is returned.
ac_error(reply, Name, Error) ->
    Answer = {ac_start_application_reply, Name, {error, Error}},
    ?AC ! Answer;
ac_error(req, _Name, _Error) ->
    ok.

%% Tells the application controller that Name is not started here:
%% not_started as answer to a start request (reply), stop_it as a
%% change request (req).
ac_not_started(reply, Name) ->
    Answer = {ac_start_application_reply, Name, not_started},
    ?AC ! Answer;
ac_not_started(req, Name) ->
    Order = {ac_change_application_req, Name, stop_it},
    ?AC ! Order.

%% Sends a stop_it change request for Name to the application
%% controller.  Returns the message that was sent.
ac_stop_it(Name) ->
    Order = {ac_change_application_req, Name, stop_it},
    ?AC ! Order.

%% Orders the application controller to take over Name from Node.
%% RestartType is only forwarded in the change request (req) form;
%% the start reply form ignores it.
ac_takeover(reply, Name, Node, _RestartType) ->
    Answer = {ac_start_application_reply, Name, {takeover, Node}},
    ?AC ! Answer;
ac_takeover(req, Name, Node, RestartType) ->
    Order = {takeover, Node, RestartType},
    ?AC ! {ac_change_application_req, Name, Order}.

%% Sends a failover change request for Name, naming Node and
%% RestartType, to the application controller.
ac_failover(Name, Node, RestartType) ->
    Order = {failover, Node, RestartType},
    ?AC ! {ac_change_application_req, Name, Order}.

%% Sends a not_running change request for Name to the application
%% controller.
ac_not_running(Name) ->
    Notice = {ac_change_application_req, Name, not_running},
    ?AC ! Notice.

%% Schedules an {internal_restart_appl, AppName} message for every
%% application in Appls, delayed by the application's restart_time.
%% The list is walked in reverse order.  Returns ok.
restart_appls(Appls) ->
    _ = [begin
             AppName = Appl#appl.name,
             send_after(Appl#appl.restart_time,
                        {internal_restart_appl, AppName})
         end || Appl <- lists:reverse(Appls)],
    ok.

%% Tries to start AppName again, but only when its stored id is a
%% tuple tagged failover; any other application leaves the state
%% untouched.  A start error is logged and the old state is kept.
%% Returns the (possibly updated) state.
restart_appl(AppName, S) ->
    case lists:keysearch(AppName, #appl.name, S#state.appls) of
        {value, Appl} when element(1, Appl#appl.id) =:= failover ->
            case catch start_appl(AppName, S, req) of
                {error, Reason} ->
                    error_msg("Error when restarting application ~p: ~p~n",
                              [AppName, Reason]),
                    S;
                {ok, RestartedS, _} ->
                    RestartedS
            end;
        _NotFailover ->
            S
    end.
    
%%-----------------------------------------------------------------
%% permit(ShouldBeRunning, IsRunning, AppName, From, S, LockId)
%%   ShouldBeRunning = true | false, the requested permission.
%%   IsRunning       = {value, #appl{}} in the clauses that inspect
%%                     it; the last clause accepts anything.
%%   From            = caller to answer later when the reply is
%%                     deferred ({noreply, ...}).
%%   LockId          = id passed to global:del_lock/1 once a local
%%                     start or takeover has been ordered.
%% Returns {reply, ok, S}, {noreply, S} or {stop, R, {error, R}, S}.
%%-----------------------------------------------------------------
permit(false, {value, #appl{id = undefined}}, _AppName, _From, S, _LockId) ->
   {reply, ok, S}; % It's not running
permit(false, {value, #appl{id = Id}}, _AppName, _From, S, _LockId)
  when element(1, Id) =:= distributed ->
    %% It is running at another node already
    {reply, ok, S};
permit(false, {value, _}, AppName, From, S, _LockId) ->
    %% It is a distributed application
    %% Check if there is any runnable node
    case dist_get_runnable_nodes(S#state.appls, AppName) of
	[] ->
	    %% There is no runnable node; stop application
	    %% The caller is parked in s_reqs and not answered here.
	    _ = ac_stop_it(AppName),
	    SReqs = [{AppName, From} | S#state.s_reqs],
	    {noreply, S#state{s_reqs = SReqs}};
	Nodes ->
	    %% Delete all outstanding 'permit true' requests.
	    PR = req_del_permit_true(S#state.p_reqs, AppName), 
	    %% Park the caller in p_reqs together with the runnable nodes.
	    NPReqs = [{From, AppName, false, Nodes} | PR],
	    {noreply, S#state{p_reqs = NPReqs}}
    end;
permit(true, {value, #appl{id = local}}, _AppName, _From, S, _LockId) ->
    %% Nothing to do when the id is already local.
    {reply, ok, S};
permit(true, _, AppName, From, S, LockId) ->
    case catch start_appl(AppName, S, req) of
	{_ErrorTag, {not_running, App}} ->
	    %% Delete all outstanding 'permit false' requests
	    PR = req_del_permit_false(S#state.p_reqs, AppName),
	    %% The caller is answered right away, so the stored request
	    %% carries false in the From position.
	    NPReqs = [{false, AppName, true, App} | PR],
	    {reply, ok, S#state{p_reqs = NPReqs}};
	{ok, NewS, true} ->
	    %% We have ordered a start or a takeover; we must not return
	    %% until the app is running.
	    TR = NewS#state.t_reqs,
	    %% Delete the lock, so others may start the app
	    global:del_lock(LockId),
	    {noreply, NewS#state{t_reqs = [{AppName, From} | TR]}};
	{ok, _S, false} ->
	    %% Application should be started, but at another node
	    %% State remains the same
	    {reply, ok, S};
	{_ErrorTag, R} ->
	    %% Any other caught 2-tuple stops the server with reason R.
	    {stop, R, {error, R}, S}
    end.

%% Starts the applications in StartApps one by one, in reverse list
%% order, threading the state through every start_appl/3 call.
%% Returns {ok, NewState}.  When a start returns {error, R} the fold
%% is aborted with throw({{error, StateSoFar}, R}).
do_start_appls(StartApps, S) ->
    StartOne =
        fun(AppName, AccS) ->
                case catch start_appl(AppName, AccS, req) of
                    {ok, NextS, _} ->
                        NextS;
                    {error, R} ->
                        throw({{error, AccS}, R})
                end
        end,
    {ok, lists:foldl(StartOne, S, lists:reverse(StartApps))}.

%%-----------------------------------------------------------------
%% Nodes = [node() | {node(), ..., node()}]
%% A priority ordered list.  A tuple means that any of its nodes may
%% be picked.  That choice is made by all nodes in the list and they
%% all choose the same: every node sends the others a message telling
%% how many applications it has started, and the node with the
%% fewest applications starts this one.
%%
%% Only nodes that are both listed in Nodes and in PermittedNodes
%% are candidates.  When this node is not a candidate the result is
%% {already_started, Node} if remote_started has an entry for Name,
%% otherwise not_started when other candidates exist and
%% no_permission when there are none.
%%-----------------------------------------------------------------
find_start_node(Nodes, PermittedNodes, Name, S) ->
    Candidates = intersection(flat_nodes(Nodes), PermittedNodes),
    case lists:member(node(), Candidates) of
        false ->
            case lists:keysearch(Name, 2, S#state.remote_started) of
                {value, {Node, _}} ->
                    {already_started, Node};
                _ when Candidates =:= [] ->
                    no_permission;
                _ ->
                    not_started
            end;
        true ->
            MyWeight = get_cached_weight(Name, S),
            find_start_node(Nodes, Name, S, MyWeight, Candidates)
    end.

%% Walks the priority list.  A tuple entry is resolved with
%% find_any_node/5; a single node is chosen when it is a member of
%% AllNodes.  A chosen node where Name is already recorded as running
%% (in appls or in remote_started) yields {already_started, Node},
%% otherwise {ok, Node}.  An exhausted list yields not_started.
find_start_node([Group | Rest], Name, S, Weight, AllNodes)
  when is_tuple(Group) ->
    Members = tuple_to_list(Group),
    case find_any_node(Members, Name, S, Weight, AllNodes) of
        false -> find_start_node(Rest, Name, S, Weight, AllNodes);
        Found -> Found
    end;
find_start_node([Node | Rest], Name, S, Weight, AllNodes) ->
    case lists:member(Node, AllNodes) of
        false ->
            find_start_node(Rest, Name, S, Weight, AllNodes);
        true ->
            case lists:keysearch(Name, #appl.name, S#state.appls) of
                {value, #appl{id = {distributed, Node}}} ->
                    {already_started, Node};
                _ ->
                    case lists:keysearch(Name, 2, S#state.remote_started) of
                        {value, {Node, _}} -> {already_started, Node};
                        _ -> {ok, Node}
                    end
            end
    end;
find_start_node([], _Name, _S, _Weight, _AllNodes) ->
    not_started.

%%-----------------------------------------------------------------
%% Picks one node out of AnyNodes.  An application that is already
%% running on one of them is never moved.  Otherwise our weight is
%% sent to all nodes in AllNodes, their answers are collected, and
%% the node with the smallest weight that is also in the known list
%% wins.  Every node sorts the same answers, so all make the same
%% decision.  Returns {already_started, Node}, {ok, Node} or false.
%%-----------------------------------------------------------------
find_any_node(AnyNodes, Name, S, Weight, AllNodes) ->
    Eligible = intersection(AnyNodes, AllNodes),
    case check_running(Name, S, Eligible) of
        {already_started, _Node} = Running ->
            Running;
        false ->
            %% Synchronize with all other nodes.
            send_nodes(AllNodes, {dist_ac_weight, Name, Weight, node()}),
            Others = collect_answers(AllNodes, Name, S, []),
            Ranked = lists:sort([{Weight, node()} | Others]),
            find_alive_node(Ranked, intersection(AnyNodes, S#state.known))
    end.

%%-----------------------------------------------------------------
%% Checks whether another node already runs Name.  remote_started is
%% consulted first; only when it has no entry for Name is the id in
%% appls examined.  The result is {already_started, Node} when the
%% node found is one of AnyNodes, otherwise false.
%%-----------------------------------------------------------------
check_running(Name, #state{remote_started = RStarted,
                           appls = Appls}, AnyNodes) ->
    Verdict = fun(Where) ->
                      case lists:member(Where, AnyNodes) of
                          true -> {already_started, Where};
                          false -> false
                      end
              end,
    case lists:keysearch(Name, 2, RStarted) of
        {value, {RemoteNode, _}} ->
            Verdict(RemoteNode);
        false ->
            case lists:keysearch(Name, #appl.name, Appls) of
                {value, #appl{id = {distributed, DistNode}}} ->
                    Verdict(DistNode);
                _ ->
                    false
            end
    end.

%% Returns {ok, Node} for the first {Weight, Node} answer whose node
%% is a member of AliveNodes, or false when there is none.
find_alive_node(Answers, AliveNodes) ->
    NotAlive = fun({_Weight, Node}) ->
                       not lists:member(Node, AliveNodes)
               end,
    case lists:dropwhile(NotAlive, Answers) of
        [{_, Node} | _] -> {ok, Node};
        [] -> false
    end.

%%-----------------------------------------------------------------
%% Collects one {Weight, Node} answer from every node in the list
%% except this one.
%% First, check if the node's msg is buffered (received in our
%% main loop).  Otherwise, wait for msg or nodedown.
%% We have sent the dist_ac_weight message, and will wait for it
%% to be received here (or a nodedown).  This implies that a
%% dist_ac must *always* be prepared to get this message, and to
%% send it to us.
%% A node that goes down contributes no answer.  There is no
%% timeout on the wait.
%% NOTE(review): keysearch/3 only returns the first tmp_locals entry
%% for Node; if that entry belongs to another application, a weight
%% buffered for Name from the same node is not found here - confirm
%% whether a node can have more than one buffered entry.
%%-----------------------------------------------------------------
collect_answers([Node | Nodes], Name, S, Res) when Node =/= node() ->
    case keysearch(Node, 3, S#state.tmp_locals) of
	{value, {Name, Weight, Node}} ->
	    collect_answers(Nodes, Name, S, [{Weight, Node} | Res]);
	_ ->
	    monitor_node(Node, true),
	    receive
		{dist_ac_weight, Name, Weight, Node} ->
		    monitor_node(Node, false),
		    collect_answers(Nodes, Name, S, [{Weight, Node} | Res]);
		{nodedown, Node} ->
		    monitor_node(Node, false),
		    collect_answers(Nodes, Name, S, Res)
	    end
    end;
collect_answers([_ThisNode | Nodes], Name, S, Res) ->
    %% Our own node is skipped.
    collect_answers(Nodes, Name, S, Res);
collect_answers([], _Name, _S, Res) ->
    Res.

%% Sends Msg to the dist_ac process on every node mentioned in Nodes
%% (tuples are flattened first), skipping the local node.
%% Returns ok.
send_nodes(Nodes, Msg) ->
    _ = [{?DIST_AC, Node} ! Msg || Node <- flat_nodes(Nodes),
                                   Node =/= node()],
    ok.

%% Arranges for Msg to be sent to the calling process after Time
%% milliseconds by spawning a linked helper that runs
%% send_timeout/3.  Any Time that is not a non-negative integer
%% (infinity, for instance) schedules nothing.  Always returns ok.
send_after(Time, Msg) when is_integer(Time) andalso Time >= 0 ->
    Receiver = self(),
    _Helper = spawn_link(?MODULE, send_timeout, [Receiver, Time, Msg]),
    ok;
send_after(_Infinity, _Msg) ->
    ok.

%% Body of the helper process spawned by send_after/2: waits Time
%% milliseconds without consuming any message, then sends Msg to To.
%% Returns Msg.
send_timeout(To, Time, Msg) ->
    receive
    after Time -> ok
    end,
    To ! Msg.

%% Sends Msg to the dist_ac process on every node in the flat list
%% Nodes, except the local node.  Returns ok.
send_msg(Msg, Nodes) ->
    _ = [{?DIST_AC, Node} ! Msg || Node <- Nodes, Node =/= node()],
    ok.

%% Returns List unchanged when Item is already a member, otherwise
%% List with Item added at the front.
replaceadd(Item, List) ->
    case lists:member(Item, List) of
        false -> [Item | List];
        true -> List
    end.

%% Replaces the first tuple in List whose element Pos matches Key
%% with New.  When there is no such tuple, New is added at the front.
keyreplaceadd(Key, Pos, List, New) ->
    Exists = lists:keymember(Key, Pos, List),
    if
        Exists -> lists:keyreplace(Key, Pos, List, New);
        true -> [New | List]
    end.

%% Removes every element of the list whose N:th element is exactly
%% Key.  Elements that have no N:th element are kept.
keydelete_all(Key, N, List) ->
    Keep = fun(Elem) when element(N, Elem) =:= Key -> false;
              (_Other) -> true
           end,
    lists:filter(Keep, List).

-ifdef(NOTUSED).
%% Everything up to the matching -endif is only compiled when the
%% macro NOTUSED is defined.

%% Looks up the first tuple whose element Pos is exactly Key and
%% returns {value, Tuple, ListWithoutTuple}, or false when there is
%% no such tuple.
keysearchdelete(Key, Pos, List) ->
    ksd(Key, Pos, List, []).

%% Worker for keysearchdelete/3.  Rest holds the elements passed so
%% far, most recent first, so they come back in reversed order in
%% front of the untouched tail.
ksd(Key, Pos, [H | T], Rest) when element(Pos, H) =:= Key ->
    {value, H, Rest ++ T};
ksd(Key, Pos, [H | T], Rest) ->
    ksd(Key, Pos, T, [H | Rest]);
ksd(_Key, _Pos, [], _Rest) ->
    false.
    
%% Returns {ok, {application, Name, App}} for the first application
%% tuple in the list that carries Name, or false when none does.
get_new_appl(Name, [{application, Name, App} | _]) ->
    {ok, {application, Name, App}};
get_new_appl(Name, [_ | T]) -> get_new_appl(Name, T);
get_new_appl(Name, []) -> false.
-endif.

%% Compares two node specifications element by element.  Atoms must
%% be identical and in the same position; tuples at the same position
%% must hold the same nodes, in any order (see equal/2).  Anything
%% else, including lists of different length, gives false.
equal_nodes([], []) ->
    true;
equal_nodes([Node | Rest1], [Node | Rest2]) when is_atom(Node) ->
    equal_nodes(Rest1, Rest2);
equal_nodes([Group1 | Rest1], [Group2 | Rest2])
  when is_tuple(Group1), is_tuple(Group2) ->
    equal(tuple_to_list(Group1), tuple_to_list(Group2))
        andalso equal_nodes(Rest1, Rest2);
equal_nodes(_, _) ->
    false.

%% True when the two lists hold the same elements the same number of
%% times, regardless of order.  Every element of the first list is
%% crossed off once in the second.
equal([], []) ->
    true;
equal([Elem | Rest], Other) ->
    lists:member(Elem, Other)
        andalso equal(Rest, lists:delete(Elem, Other));
equal(_, _) ->
    false.

%% Flattens a node specification into a plain list of nodes.  Atoms
%% and the members of tuples are pushed onto the front of the
%% result, so later entries end up first.  A specification that is
%% not a list is rejected with throw({error, {badarg, Nodes}}).
flat_nodes(Nodes) when is_list(Nodes) ->
    Push = fun(Group, Acc) when is_tuple(Group) ->
                   tuple_to_list(Group) ++ Acc;
              (Node, Acc) when is_atom(Node) ->
                   [Node | Acc]
           end,
    lists:foldl(Push, [], Nodes);
flat_nodes(Nodes) ->
    throw({error, {badarg, Nodes}}).

%% Returns the weight stored for Name in tmp_weights, or a freshly
%% computed weight (get_weight/0) when nothing is stored.
get_cached_weight(Name, S) ->
    Cached = lists:keysearch(Name, 1, S#state.tmp_weights),
    case Cached of
        {value, {_, Weight}} -> Weight;
        _ -> get_weight()
    end.

%% The weight of this node is simply the number of applications
%% reported by application:which_applications/0.
get_weight() ->
    Running = application:which_applications(),
    length(Running).

%% Picks the entries keyed {Name, Node} out of the list and returns
%% them as {Node, HisNodes, Permission}, in list order.
get_dist_loaded(Name, DistLoaded) ->
    [{Node, HisNodes, Permission} ||
        {{App, Node}, HisNodes, Permission} <- DistLoaded,
        App =:= Name].

%% Removes every entry keyed {Name, Node}, for any Node, from the
%% list.  All other elements are kept in their original order.
del_dist_loaded(Name, DistLoaded) ->
    Keep = fun({{App, _Node}, _HisNodes, _Permission}) when App =:= Name ->
                   false;
              (_Other) ->
                   true
           end,
    lists:filter(Keep, DistLoaded).

%% Retries every parked 'permit true' request in p_reqs that was
%% waiting for the application Name.  The request is removed and the
%% waiting application is started with do_start_appls/2.  When that
%% start is again refused with {not_running, App}, the request is
%% parked once more, now waiting for App.  Any other failure is
%% re-thrown as {error, R}.  Returns {ok, NewState}.
req_start_app(State, Name) ->
    Retry =
        fun({false, AppName, true, WaitsFor}, S) when Name =:= WaitsFor ->
                Remaining = lists:keydelete(AppName, 2, S#state.p_reqs),
                Trimmed = S#state{p_reqs = Remaining},
                case catch do_start_appls([AppName], Trimmed) of
                    {_ErrorTag, {not_running, App}} ->
                        Parked = [{false, AppName, true, App} | Remaining],
                        S#state{p_reqs = Parked};
                    {ok, StartedS} ->
                        StartedS;
                    {_ErrorTag, R} ->
                        throw({error, R})
                end;
           (_OtherReq, S) ->
                S
        end,
    {ok, lists:foldl(Retry, State, State#state.p_reqs)}.


%% Removes all outstanding 'permit true' requests for Name from Reqs
%% and returns the remaining requests.
%% A removed request whose first element is a caller is answered
%% with ok.  'permit true' requests are also stored with the atom
%% false in that position, meaning that the caller has already been
%% answered; gen_server:reply/2 does not accept false, so such an
%% entry is dropped without a reply.
req_del_permit_true(Reqs, Name) ->
    Keep = fun({From, Name2, true, _}) when Name2 =:= Name ->
                   case From of
                       false -> ok;
                       _ -> gen_server:reply(From, ok)
                   end,
                   false;
              (_) ->
                   true
           end,
    lists:filter(Keep, Reqs).

%% Removes all outstanding 'permit false' requests for Name from
%% Reqs.  The caller stored in each removed request is answered with
%% ok.  Returns the remaining requests.
req_del_permit_false(Reqs, Name) ->
    Keep = fun({From, App, false, _Nodes}) when App =:= Name ->
                   gen_server:reply(From, ok),
                   false;
              (_Other) ->
                   true
           end,
    lists:filter(Keep, Reqs).
    
%% Runs check_waiting/6 over the state's parked permit requests for
%% the node Node, starting from an empty result list and the state's
%% current s_reqs.  Returns whatever check_waiting/6 returns.
req_del_node(S, Node, Appls) ->
    #state{p_reqs = PReqs, s_reqs = SReqs} = S,
    check_waiting(PReqs, S, Node, Appls, [], SReqs).

%% Answers every caller parked in TReqs for AppName with Res and
%% returns the requests that remain.
del_t_reqs(AppName, TReqs, Res) ->
    Keep = fun({App, From}) when App =:= AppName ->
                   gen_server:reply(From, Res),
                   false;
              (_Other) ->
                   true
           end,
    lists:filter(Keep, TReqs).


%% Updates the parked 'permit false' requests after Node has
%% disappeared.
%%   check_waiting(Reqs, S, Node, Appls, Res, SReqs)
%% Node is removed from the node list of every 'permit false'
%% request.  A request that is left without nodes makes the
%% application stop (ac_stop_it/1) and its caller is moved to SReqs;
%% all other requests are collected in Res (in reversed order).
%% Returns {Res, Appls, SReqs}.
%%
%% The argument order (S before Node) is the same in every clause
%% and in every recursive call.  Swapping the two in the recursion
%% would hand the state record to lists:delete/2 for every second
%% request, and Node would then never be removed from that request.
check_waiting([{From, AppName, false, Nodes} | Reqs],
              S, Node, Appls, Res, SReqs) ->
    case lists:delete(Node, Nodes) of
        [] ->
            _ = ac_stop_it(AppName),
            NSReqs = [{AppName, From} | SReqs],
            check_waiting(Reqs, S, Node, Appls, Res, NSReqs);
        NNodes ->
            check_waiting(Reqs, S, Node, Appls,
                          [{From, AppName, false, NNodes} | Res], SReqs)
    end;
check_waiting([H | Reqs], S, Node, Appls, Res, SReqs) ->
    check_waiting(Reqs, S, Node, Appls, [H | Res], SReqs);
check_waiting([], _S, _Node, Appls, Res, SReqs) ->
    {Res, Appls, SReqs}.
	    
%% Returns the elements of L1 that are also found in L2, keeping the
%% order of L1.  An element is kept at most as many times as it
%% occurs in L2.
intersection([], _) ->
    [];
intersection(_, []) ->
    [];
intersection(L1, L2) ->
    OnlyInL1 = L1 -- L2,
    L1 -- OnlyInL1.
    
%% Looks up the permission for AppName in the kernel parameter
%% permissions.  The default is true, both when the parameter is
%% not set and when it has no entry for AppName.  An entry whose
%% value is neither true nor false exits with
%% {bad_permission, {AppName, Value}}.
get_default_permission(AppName) ->
    case application:get_env(kernel, permissions) of
        undefined ->
            true;
        {ok, Permissions} ->
            case lists:keysearch(AppName, 1, Permissions) of
                false ->
                    true;
                {value, {_, Flag}} when is_boolean(Flag) ->
                    Flag;
                {value, {_, Bad}} ->
                    exit({bad_permission, {AppName, Bad}})
            end
    end.
		     
%%-----------------------------------------------------------------
%% ADT dist() - info on how an application is distributed
%% dist() = [{AppName, Time, DistNodes, [{Node, Runnable}]}]
%% Time = int() >= 0 | infinity
%% Nodes = [node() | {node()...}]
%% Runnable = true | false | undefined
%%       An appl may not be started if any Runnable is undefined;
%%       i.e. the appl must be loaded on all Nodes.
%%-----------------------------------------------------------------
%% Turns a list of distribution specifications into #appl records.
%% Accepted forms are {AppName, Nodes} and {AppName, Time, Nodes},
%% where Time is a non-negative integer or infinity.  Each record
%% gets a run list holding only this node with its default
%% permission.  Elements of any other form are silently skipped.
dist_check([]) ->
    [];
dist_check([{AppName, Nodes} | Rest]) ->
    Permission = get_default_permission(AppName),
    Appl = #appl{name = AppName, nodes = Nodes,
                 run = [{node(), Permission}]},
    [Appl | dist_check(Rest)];
dist_check([{AppName, Time, Nodes} | Rest])
  when Time =:= infinity; is_integer(Time), Time >= 0 ->
    Permission = get_default_permission(AppName),
    Appl = #appl{name = AppName, restart_time = Time, nodes = Nodes,
                 run = [{node(), Permission}]},
    [Appl | dist_check(Rest)];
dist_check([_Ignored | Rest]) ->
    dist_check(Rest).

%% Calls application_controller:control_application/1 for the name
%% of every #appl record in Appls.  Returns ok.
dist_take_control(Appls) ->
    TakeOne = fun(#appl{name = AppName}) ->
                      application_controller:control_application(AppName)
              end,
    lists:foreach(TakeOne, Appls).

%% Stores a new distribution specification for AppName in Appls.
%%   default                -> Appls is returned unchanged.
%%   {AppName, Nodes}       -> same as {AppName, 0, Nodes}.
%%   {AppName, Time, Nodes} -> Time must be a non-negative integer.
%% The new record replaces an existing one for AppName, or is added
%% at the front.  Every node in the specification starts out with
%% the run value undefined.  Anything else, including a
%% specification naming another application, is rejected with
%% throw({error, {bad_distribution_spec, Bad}}).
dist_replace(default, _Name, Appls) ->
    Appls;
dist_replace({AppName, Nodes}, AppName, Appls) ->
    dist_replace({AppName, 0, Nodes}, AppName, Appls);
dist_replace({AppName, Time, Nodes}, AppName, Appls)
  when is_integer(Time), Time >= 0 ->
    Run = [{Node, undefined} || Node <- flat_nodes(Nodes)],
    NewAppl = #appl{name = AppName, restart_time = Time,
                    nodes = Nodes, run = Run},
    keyreplaceadd(AppName, #appl.name, Appls, NewAppl);
dist_replace(Bad, _Name, _Appls) ->
    throw({error, {bad_distribution_spec, Bad}}).

%% Records Permission for Node in the run list of the application
%% AppName.  An existing entry for Node is replaced, otherwise a new
%% one is added.  All other applications are returned untouched.
dist_update_run(Appls, AppName, Node, Permission) ->
    Update = fun(#appl{name = Name, run = Run} = Appl)
                   when Name =:= AppName ->
                     NewRun = keyreplaceadd(Node, 1, Run,
                                            {Node, Permission}),
                     Appl#appl{run = NewRun};
                (Other) ->
                     Other
             end,
    lists:map(Update, Appls).



%% Applies a list of new distribution specifications to Appls, one
%% after the other in list order.  {AppName, NewNodes} sets the
%% restart time to 0; {AppName, NewTime, NewNodes} sets it to
%% NewTime.
dist_change_update(Appls, NewDist) ->
    Apply = fun({AppName, NewNodes}, Acc) ->
                    do_dist_change_update(Acc, AppName, 0, NewNodes);
               ({AppName, NewTime, NewNodes}, Acc) ->
                    do_dist_change_update(Acc, AppName, NewTime, NewNodes)
            end,
    lists:foldl(Apply, Appls, NewDist).

%% Sets restart_time and nodes of the application AppName to the
%% new values.  All other elements of Appls are returned untouched.
do_dist_change_update(Appls, AppName, NewTime, NewNodes) ->
    [case Appl of
         #appl{name = AppName} ->
             Appl#appl{restart_time = NewTime, nodes = NewNodes};
         _Other ->
             Appl
     end || Appl <- Appls].

%% Merges the permissions of the node HisNode into our applications.
%% For every application of ours that HisAppls also knows about:
%%   - if the other side's run list has an entry for HisNode, that
%%     entry replaces (or is added to) ours;
%%   - otherwise {HisNode, undefined} is put first in our run list.
%% Applications unknown to HisAppls are returned untouched.  Whether
%% HisNode is listed among the application's nodes is not checked.
dist_merge(MyAppls, HisAppls, HisNode) ->
    Merge =
        fun(Appl) ->
                #appl{name = AppName, run = Run} = Appl,
                case lists:keysearch(AppName, #appl.name, HisAppls) of
                    {value, #appl{run = HisRun}} ->
                        case lists:keysearch(HisNode, 1, HisRun) of
                            {value, Entry} -> % He has it loaded
                                NewRun = keyreplaceadd(HisNode, 1, Run, Entry),
                                Appl#appl{run = NewRun};
                            false -> % He hasn't loaded it yet
                                Appl#appl{run = [{HisNode, undefined} | Run]}
                        end;
                    _ ->
                        Appl
                end
        end,
    lists:map(Merge, MyAppls).

%% Returns the nodes whose run value for AppName is true, or []
%% when AppName is not found in Appls.
dist_get_runnable_nodes(Appls, AppName) ->
    case lists:keysearch(AppName, #appl.name, Appls) of
        false ->
            [];
        {value, #appl{run = Run}} ->
            [Node || {Node, true} <- Run]
    end.

%% True only when AppName is found in Appls and its run list says
%% true for the local node.
dist_is_runnable(Appls, AppName) ->
    case lists:keysearch(AppName, #appl.name, Appls) of
        false ->
            false;
        {value, #appl{run = Run}} ->
            case lists:keysearch(node(), 1, Run) of
                {value, {_, Runnable}} -> Runnable =:= true;
                _ -> false
            end
    end.

%% True when AppName is found in the state's appls and its run list
%% has an entry for the local node whose value is not undefined.
is_loaded(AppName, #state{appls = Appls}) ->
    case lists:keysearch(AppName, #appl.name, Appls) of
        false ->
            false;
        {value, #appl{run = Run}} ->
            case lists:keysearch(node(), 1, Run) of
                false -> false;
                {value, {_Node, undefined}} -> false;
                {value, _} -> true
            end
    end.

%% Returns the names of all applications in Appls whose run list
%% says true for the local node, in list order.
dist_get_runnable(Appls) ->
    Here = node(),
    [Appl#appl.name ||
        Appl <- Appls,
        case lists:keysearch(Here, 1, Appl#appl.run) of
            {value, {_, true}} -> true;
            _ -> false
        end].
	       
%% Returns {ok, Nodes, RunnableNodes} for the application, where
%% RunnableNodes are the nodes with run value true.  When a node
%% with run value undefined is currently connected, the result is
%% {error, {app_not_loaded, AppName, BadNodes}} instead; BadNodes
%% holds all nodes with run value undefined.
dist_get_all_nodes(#appl{name = AppName, nodes = Nodes, run = Run}) ->
    {Runnable, NotLoaded} = check_nodes(Run, [], []),
    Connected = erlang:nodes(connected),
    case intersection(NotLoaded, Connected) of
        [] -> {ok, Nodes, Runnable};
        _ -> {error, {app_not_loaded, AppName, NotLoaded}}
    end.

%% Sorts the nodes of a run list into two accumulators: nodes with
%% value true are pushed onto Res, nodes with value undefined onto
%% BadNodes, and nodes with value false are dropped.  Returns
%% {Res, BadNodes}; each list has the most recently seen node first.
check_nodes(Run, Res, BadNodes) ->
    Classify =
        fun({Node, undefined}, {Good, Bad}) -> {Good, [Node | Bad]};
           ({Node, true}, {Good, Bad}) -> {[Node | Good], Bad};
           ({_Node, false}, Acc) -> Acc
        end,
    lists:foldl(Classify, {Res, BadNodes}, Run).

-ifdef(NOTUSED).
%% Only compiled when the macro NOTUSED is defined.
%% Returns the restart_time of the first #appl record named Name,
%% or 0 when there is no such record.
dist_find_time([#appl{name = Name, restart_time = Time} |_], Name) -> Time;
dist_find_time([_ | T], Name) -> dist_find_time(T, Name);
dist_find_time([], Name) -> 0.
-endif.

%% Find all nodes that can run the app (even if they're not permitted
%% to right now).  The node specification of the first #appl record
%% named Name is returned as it is stored; [] when there is none.
dist_find_nodes(Appls, Name) ->
    Matches = [Nodes || #appl{name = App, nodes = Nodes} <- Appls,
                        App =:= Name],
    case Matches of
        [First | _] -> First;
        [] -> []
    end.

%% Returns the node specification of the application Name as a flat
%% list of nodes (see flat_nodes/1).
dist_flat_nodes(Appls, Name) ->
    NodeSpec = dist_find_nodes(Appls, Name),
    flat_nodes(NodeSpec).

%% Removes every {Node, Runnable} entry for Node from the run list
%% of all applications in Appls.
dist_del_node(Appls, Node) ->
    NotThatNode = fun({N, _Runnable}) when N =:= Node -> false;
                     (_Other) -> true
                  end,
    [Appl#appl{run = lists:filter(NotThatNode, Appl#appl.run)} ||
        Appl <- Appls].

%% True for the restart types permanent, temporary and transient;
%% false for any other term.
validRestartType(RestartType) ->
    lists:member(RestartType, [permanent, temporary, transient]).

%% Logs that this node and Node disagree about the distribution of
%% AppName, then exits the calling process with
%% {distribution_mismatch, AppName, Node}.  Never returns.
dist_mismatch(AppName, Node) ->
    Format = "Distribution mismatch for application \"~p\" on nodes ~p and ~p~n",
    error_msg(Format, [AppName, node(), Node]),
    exit({distribution_mismatch, AppName, Node}).

%error_msg(Format) when is_list(Format) ->
%    error_msg(Format, []).

%% Logs an error through error_logger.  The text is prefixed with a
%% line naming this node.  Both arguments must be lists.
error_msg(Format, ArgList) when is_list(Format), is_list(ArgList) ->
    Prefixed = "dist_ac on node ~p:~n" ++ Format,
    error_logger:error_msg(Prefixed, [node() | ArgList]).

%info_msg(Format) when is_list(Format) ->
%    info_msg(Format, []).

%info_msg(Format, ArgList) when is_list(Format), is_list(ArgList) ->
%    error_logger:info_msg("dist_ac on node ~p:~n" ++ Format, [node()|ArgList]).