%%
%% %CopyrightBegin%
%% 
%% Copyright Ericsson AB 1996-2011. All Rights Reserved.
%% 
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
%% 
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%% 
%% %CopyrightEnd%
%%
-module(pool).

%% Supplies a computational pool of processors.
%% The chief user interface function here is get_node()
%% Which returns the name of the nodes in the pool
%% with the least load !!!!
%% This function is callable from any node including the master
%% That is part of the pool
%% nodes are scheduled on a per usgae basis and per load basis,
%% Whenever we use a node, we put at the end of the queue, and whenever
%% a node report a change in load, we insert it accordingly

% User interface Exports ...
-export([start/1, 
	 start/2, 
	 stop/0,
	 get_nodes/0, 
	 get_nodes_and_load/0, 
	 get_node/0,
	 pspawn/3, 
	 attach/1,
	 pspawn_link/3]).

%% Internal Exports 
-export([statistic_collector/0,
	 do_spawn/4,
	 init/1,
	 handle_call/3,
	 handle_cast/2,
	 handle_info/2,
	 terminate/2]).
	 
%% User interface 

%% Start up using the .hosts.erlang file

-spec start(Name) -> Nodes when
      Name :: atom(),
      Nodes :: [node()].
start(Name) ->
    start(Name,[]).

-spec start(Name, Args) -> Nodes when
      Name :: atom(),
      Args :: string(),
      Nodes :: [node()].
start(Name, Args) when is_atom(Name) ->
    gen_server:start({global, pool_master}, pool, [], []),
    Hosts = net_adm:host_file(),
    Nodes = start_nodes(Hosts, Name, Args),
    lists:foreach(fun attach/1, Nodes),
    Nodes.

%%
%% Interface functions ...
%%
-spec get_nodes() -> [node()].
get_nodes() ->
    get_elements(2, get_nodes_and_load()).

-spec attach(Node) -> 'already_attached' | 'attached' when
      Node :: node().
attach(Node) ->
    gen_server:call({global, pool_master}, {attach, Node}).

get_nodes_and_load() ->
    gen_server:call({global, pool_master}, get_nodes).

-spec get_node() -> node().
get_node() ->
    gen_server:call({global, pool_master}, get_node).

-spec pspawn(Mod, Fun, Args) -> pid() when
      Mod :: module(),
      Fun :: atom(),
      Args :: [term()].
pspawn(M, F, A) ->
    gen_server:call({global, pool_master}, {spawn, group_leader(), M, F, A}).

-spec pspawn_link(Mod, Fun, Args) -> pid() when
      Mod :: module(),
      Fun :: atom(),
      Args :: [term()].
pspawn_link(M, F, A) ->
    P = pspawn(M, F, A),
    link(P),
    P.

start_nodes([], _, _) -> [];
start_nodes([Host|Tail], Name, Args) -> 
    case slave:start(Host, Name, Args) of 
	{error, {already_running, Node}} ->
	    io:format("Can't start node on host ~w due to ~w~n",[Host, {already_running, Node}]),
	    [Node | start_nodes(Tail, Name, Args)];
	{error, R} ->
	    io:format("Can't start node on host ~w due to ~w~n",[Host, R]),
	    start_nodes(Tail, Name, Args);
	{ok, Node} -> 
	    [Node | start_nodes(Tail, Name, Args)]
    end.

-spec stop() -> 'stopped'.
stop() ->
    gen_server:call({global, pool_master}, stop).

get_elements(_Pos,[]) -> [];
get_elements(Pos,[E|T]) -> [element(Pos,E) | get_elements(Pos,T)].

stop_em([]) -> stopped;
stop_em([N|Tail]) ->
    rpc:cast(N, erlang, halt, []),
    stop_em(Tail).

init([]) ->
    process_flag(trap_exit, true),
    spawn_link(pool, statistic_collector, []),
    {ok,[{0,node()}]}.

handle_call(get_nodes, _From, Nodes)->
    {reply, Nodes, Nodes};
handle_call(get_node, _From, [{Load,N}|Tail]) ->
    {reply, N, Tail++[{Load+1, N}]};
handle_call({attach, Node}, _From, Nodes) ->
    case lists:keymember(Node, 2, Nodes) of
	true ->
	    {reply, already_attached, Nodes};
	false ->
	    erlang:monitor_node(Node, true),
	    spawn_link(Node, pool, statistic_collector, []),
	    {reply, attached, Nodes++[{999999,Node}]}
    end;
handle_call({spawn, Gl, M, F, A}, _From, Nodes) ->
    [{Load,N}|Tail] = Nodes,
    Pid = spawn(N, pool, do_spawn, [Gl, M, F, A]),
    {reply, Pid, Tail++[{Load+1, N}]};
handle_call(stop, _From, Nodes) ->
    %% clean up in terminate/2
    {stop, normal, stopped, Nodes}.

handle_cast(_, Nodes) ->
    {noreply, Nodes}.

handle_info({Node,load,Load}, Nodes) ->
    Nodes2 = insert_node({Load,Node}, Nodes),
    {noreply, Nodes2};
handle_info({nodedown, Node}, Nodes) ->
    {noreply, lists:keydelete(Node, 2, Nodes)};
handle_info(_, Nodes) ->  %% The EXIT signals etc.etc
    {noreply, Nodes}.

terminate(_Reason, Nodes) ->
    N = lists:delete(node(), get_elements(2, Nodes)),
    stop_em(N),
    ok.

-spec do_spawn(pid(), module(), atom(), [term()]) -> term().
do_spawn(Gl, M, F, A) ->
    group_leader(Gl, self()),
    apply(M, F, A).

insert_node({Load,Node},[{L,Node}|Tail]) when Load > L ->
    %% We have a raised load here
    pure_insert({Load,Node},Tail);
insert_node({Load,Node},[{L,N}|Tail]) when Load =< L ->
    %% Move forward in the list
    T = lists:keydelete(Node,2,[{L,N}|Tail]),
    [{Load,Node} | T];
insert_node(Ln,[H|T]) ->
    [H | insert_node(Ln,T)];
insert_node(X,[]) ->          % Can't happen
    error_logger:error_msg("Pool_master: Bad node list X=~w\n", [X]),
    exit(crash).

pure_insert({Load,Node},[]) ->
    [{Load,Node}];
pure_insert({Load,Node},[{L,N}|Tail]) when Load < L ->
    [{Load,Node}, {L,N} | Tail];
pure_insert(L,[H|T]) -> [H|pure_insert(L,T)].

%% Really should not measure the contributions from
%% the back ground processes here .... which we do :-(
%% We don't have to monitor the master, since we're slaves anyway

statistic_collector() ->
    statistic_collector(5).

statistic_collector(0) -> exit(normal);
statistic_collector(I) ->
    sleep(300),  
    case global:whereis_name(pool_master) of
	undefined ->
	    statistic_collector(I-1);
	M ->
	    stat_loop(M, 999999)
    end.

%% Do not tell the master about our load if it has not  changed

stat_loop(M, Old) ->
    sleep(2000),
    case statistics(run_queue) of
	Old ->
	    stat_loop(M, Old);
	NewLoad ->
	    M ! {node(), load, NewLoad}, %% async 
	    stat_loop(M, NewLoad)
    end.

sleep(I) -> receive after I -> ok end.