diff options
author | Erlang/OTP <[email protected]> | 2009-11-20 14:54:40 +0000 |
---|---|---|
committer | Erlang/OTP <[email protected]> | 2009-11-20 14:54:40 +0000 |
commit | 84adefa331c4159d432d22840663c38f155cd4c1 (patch) | |
tree | bff9a9c66adda4df2106dfd0e5c053ab182a12bd /lib/stdlib/src/pool.erl | |
download | otp-84adefa331c4159d432d22840663c38f155cd4c1.tar.gz otp-84adefa331c4159d432d22840663c38f155cd4c1.tar.bz2 otp-84adefa331c4159d432d22840663c38f155cd4c1.zip |
The R13B03 release.OTP_R13B03
Diffstat (limited to 'lib/stdlib/src/pool.erl')
-rw-r--r-- | lib/stdlib/src/pool.erl | 212 |
1 files changed, 212 insertions, 0 deletions
diff --git a/lib/stdlib/src/pool.erl b/lib/stdlib/src/pool.erl new file mode 100644 index 0000000000..7f5f23e26d --- /dev/null +++ b/lib/stdlib/src/pool.erl @@ -0,0 +1,212 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 1996-2009. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% +-module(pool). + +%% Supplies a computational pool of processors. +%% The chief user interface function here is get_node() +%% Which returns the name of the nodes in the pool +%% with the least load !!!! +%% This function is callable from any node including the master +%% That is part of the pool +%% nodes are scheduled on a per usgae basis and per load basis, +%% Whenever we use a node, we put at the end of the queue, and whenever +%% a node report a change in load, we insert it accordingly + +% User interface Exports ... +-export([start/1, + start/2, + stop/0, + get_nodes/0, + get_nodes_and_load/0, + get_node/0, + pspawn/3, + attach/1, + pspawn_link/3]). + +%% Internal Exports +-export([statistic_collector/0, + do_spawn/4, + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2]). + +%% User interface + +%% Start up using the .hosts.erlang file + +-spec start(atom()) -> [node()]. +start(Name) -> + start(Name,[]). + +-spec start(atom(), string()) -> [node()]. +start(Name, Args) when is_atom(Name) -> + gen_server:start({global, pool_master}, pool, [], []), + Hosts = net_adm:host_file(), + Nodes = start_nodes(Hosts, Name, Args), + lists:foreach(fun attach/1, Nodes), + Nodes. + +%% +%% Interface functions ... +%% +-spec get_nodes() -> [node()]. +get_nodes() -> + get_elements(2, get_nodes_and_load()). + +-spec attach(node()) -> 'already_attached' | 'attached'. +attach(Node) -> + gen_server:call({global, pool_master}, {attach, Node}). + +get_nodes_and_load() -> + gen_server:call({global, pool_master}, get_nodes). + +-spec get_node() -> node(). +get_node() -> + gen_server:call({global, pool_master}, get_node). + +-spec pspawn(module(), atom(), [term()]) -> pid(). +pspawn(M, F, A) -> + gen_server:call({global, pool_master}, {spawn, group_leader(), M, F, A}). + +-spec pspawn_link(module(), atom(), [term()]) -> pid(). +pspawn_link(M, F, A) -> + P = pspawn(M, F, A), + link(P), + P. + +start_nodes([], _, _) -> []; +start_nodes([Host|Tail], Name, Args) -> + case slave:start(Host, Name, Args) of + {error, R} -> + io:format("Can't start node on host ~w due to ~w~n",[Host, R]), + start_nodes(Tail, Name, Args); + {ok, Node} -> + [Node | start_nodes(Tail, Name, Args)] + end. + +-spec stop() -> 'stopped'. +stop() -> + gen_server:call({global, pool_master}, stop). + +get_elements(_Pos,[]) -> []; +get_elements(Pos,[E|T]) -> [element(Pos,E) | get_elements(Pos,T)]. + +stop_em([]) -> stopped; +stop_em([N|Tail]) -> + rpc:cast(N, erlang, halt, []), + stop_em(Tail). + +init([]) -> + process_flag(trap_exit, true), + spawn_link(pool, statistic_collector, []), + {ok,[{0,node()}]}. + +handle_call(get_nodes, _From, Nodes)-> + {reply, Nodes, Nodes}; +handle_call(get_node, _From, [{Load,N}|Tail]) -> + {reply, N, Tail++[{Load+1, N}]}; +handle_call({attach, Node}, _From, Nodes) -> + case lists:keymember(Node, 2, Nodes) of + true -> + {reply, already_attached, Nodes}; + false -> + erlang:monitor_node(Node, true), + spawn_link(Node, pool, statistic_collector, []), + {reply, attached, Nodes++[{999999,Node}]} + end; +handle_call({spawn, Gl, M, F, A}, _From, Nodes) -> + [{Load,N}|Tail] = Nodes, + Pid = spawn(N, pool, do_spawn, [Gl, M, F, A]), + {reply, Pid, Tail++[{Load+1, N}]}; +handle_call(stop, _From, Nodes) -> + %% clean up in terminate/2 + {stop, normal, stopped, Nodes}. + +handle_cast(_, Nodes) -> + {noreply, Nodes}. + +handle_info({Node,load,Load}, Nodes) -> + Nodes2 = insert_node({Load,Node}, Nodes), + {noreply, Nodes2}; +handle_info({nodedown, Node}, Nodes) -> + {noreply, lists:keydelete(Node, 2, Nodes)}; +handle_info(_, Nodes) -> %% The EXIT signals etc.etc + {noreply, Nodes}. + +terminate(_Reason, Nodes) -> + N = lists:delete(node(), get_elements(2, Nodes)), + stop_em(N), + ok. + +-spec do_spawn(pid(), module(), atom(), [term()]) -> term(). +do_spawn(Gl, M, F, A) -> + group_leader(Gl, self()), + apply(M, F, A). + +insert_node({Load,Node},[{L,Node}|Tail]) when Load > L -> + %% We have a raised load here + pure_insert({Load,Node},Tail); +insert_node({Load,Node},[{L,N}|Tail]) when Load =< L -> + %% Move forward in the list + T = lists:keydelete(Node,2,[{L,N}|Tail]), + [{Load,Node} | T]; +insert_node(Ln,[H|T]) -> + [H | insert_node(Ln,T)]; +insert_node(X,[]) -> % Can't happen + error_logger:error_msg("Pool_master: Bad node list X=~w\n", [X]), + exit(crash). + +pure_insert({Load,Node},[]) -> + [{Load,Node}]; +pure_insert({Load,Node},[{L,N}|Tail]) when Load < L -> + [{Load,Node}, {L,N} | Tail]; +pure_insert(L,[H|T]) -> [H|pure_insert(L,T)]. + +%% Really should not measure the contributions from +%% the back ground processes here .... which we do :-( +%% We don't have to monitor the master, since we're slaves anyway + +statistic_collector() -> + statistic_collector(5). + +statistic_collector(0) -> exit(normal); +statistic_collector(I) -> + sleep(300), + case global:whereis_name(pool_master) of + undefined -> + statistic_collector(I-1); + M -> + stat_loop(M, 999999) + end. + +%% Do not tell the master about our load if it has not changed + +stat_loop(M, Old) -> + sleep(2000), + case statistics(run_queue) of + Old -> + stat_loop(M, Old); + NewLoad -> + M ! {node(), load, NewLoad}, %% async + stat_loop(M, NewLoad) + end. + +sleep(I) -> receive after I -> ok end. |