aboutsummaryrefslogtreecommitdiffstats
path: root/lib/stdlib/src/pool.erl
diff options
context:
space:
mode:
authorErlang/OTP <[email protected]>2009-11-20 14:54:40 +0000
committerErlang/OTP <[email protected]>2009-11-20 14:54:40 +0000
commit84adefa331c4159d432d22840663c38f155cd4c1 (patch)
treebff9a9c66adda4df2106dfd0e5c053ab182a12bd /lib/stdlib/src/pool.erl
downloadotp-84adefa331c4159d432d22840663c38f155cd4c1.tar.gz
otp-84adefa331c4159d432d22840663c38f155cd4c1.tar.bz2
otp-84adefa331c4159d432d22840663c38f155cd4c1.zip
The R13B03 release.OTP_R13B03
Diffstat (limited to 'lib/stdlib/src/pool.erl')
-rw-r--r--lib/stdlib/src/pool.erl212
1 files changed, 212 insertions, 0 deletions
diff --git a/lib/stdlib/src/pool.erl b/lib/stdlib/src/pool.erl
new file mode 100644
index 0000000000..7f5f23e26d
--- /dev/null
+++ b/lib/stdlib/src/pool.erl
@@ -0,0 +1,212 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 1996-2009. All Rights Reserved.
+%%
+%% The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved online at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% %CopyrightEnd%
+%%
+-module(pool).
+
+%% Supplies a computational pool of processors.
+%% The chief user interface function here is get_node()
+%% Which returns the name of the nodes in the pool
+%% with the least load !!!!
+%% This function is callable from any node including the master
+%% That is part of the pool
+%% nodes are scheduled on a per usgae basis and per load basis,
+%% Whenever we use a node, we put at the end of the queue, and whenever
+%% a node report a change in load, we insert it accordingly
+
+% User interface Exports ...
+-export([start/1,
+ start/2,
+ stop/0,
+ get_nodes/0,
+ get_nodes_and_load/0,
+ get_node/0,
+ pspawn/3,
+ attach/1,
+ pspawn_link/3]).
+
+%% Internal Exports
+-export([statistic_collector/0,
+ do_spawn/4,
+ init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ terminate/2]).
+
+%% User interface
+
+%% Start up using the .hosts.erlang file
+
+-spec start(atom()) -> [node()].
+start(Name) ->
+ start(Name,[]).
+
+-spec start(atom(), string()) -> [node()].
+start(Name, Args) when is_atom(Name) ->
+ gen_server:start({global, pool_master}, pool, [], []),
+ Hosts = net_adm:host_file(),
+ Nodes = start_nodes(Hosts, Name, Args),
+ lists:foreach(fun attach/1, Nodes),
+ Nodes.
+
+%%
+%% Interface functions ...
+%%
+-spec get_nodes() -> [node()].
+get_nodes() ->
+ get_elements(2, get_nodes_and_load()).
+
+-spec attach(node()) -> 'already_attached' | 'attached'.
+attach(Node) ->
+ gen_server:call({global, pool_master}, {attach, Node}).
+
+get_nodes_and_load() ->
+ gen_server:call({global, pool_master}, get_nodes).
+
+-spec get_node() -> node().
+get_node() ->
+ gen_server:call({global, pool_master}, get_node).
+
+-spec pspawn(module(), atom(), [term()]) -> pid().
+pspawn(M, F, A) ->
+ gen_server:call({global, pool_master}, {spawn, group_leader(), M, F, A}).
+
+-spec pspawn_link(module(), atom(), [term()]) -> pid().
+pspawn_link(M, F, A) ->
+ P = pspawn(M, F, A),
+ link(P),
+ P.
+
+start_nodes([], _, _) -> [];
+start_nodes([Host|Tail], Name, Args) ->
+ case slave:start(Host, Name, Args) of
+ {error, R} ->
+ io:format("Can't start node on host ~w due to ~w~n",[Host, R]),
+ start_nodes(Tail, Name, Args);
+ {ok, Node} ->
+ [Node | start_nodes(Tail, Name, Args)]
+ end.
+
+-spec stop() -> 'stopped'.
+stop() ->
+ gen_server:call({global, pool_master}, stop).
+
+get_elements(_Pos,[]) -> [];
+get_elements(Pos,[E|T]) -> [element(Pos,E) | get_elements(Pos,T)].
+
+stop_em([]) -> stopped;
+stop_em([N|Tail]) ->
+ rpc:cast(N, erlang, halt, []),
+ stop_em(Tail).
+
+init([]) ->
+ process_flag(trap_exit, true),
+ spawn_link(pool, statistic_collector, []),
+ {ok,[{0,node()}]}.
+
+handle_call(get_nodes, _From, Nodes)->
+ {reply, Nodes, Nodes};
+handle_call(get_node, _From, [{Load,N}|Tail]) ->
+ {reply, N, Tail++[{Load+1, N}]};
+handle_call({attach, Node}, _From, Nodes) ->
+ case lists:keymember(Node, 2, Nodes) of
+ true ->
+ {reply, already_attached, Nodes};
+ false ->
+ erlang:monitor_node(Node, true),
+ spawn_link(Node, pool, statistic_collector, []),
+ {reply, attached, Nodes++[{999999,Node}]}
+ end;
+handle_call({spawn, Gl, M, F, A}, _From, Nodes) ->
+ [{Load,N}|Tail] = Nodes,
+ Pid = spawn(N, pool, do_spawn, [Gl, M, F, A]),
+ {reply, Pid, Tail++[{Load+1, N}]};
+handle_call(stop, _From, Nodes) ->
+ %% clean up in terminate/2
+ {stop, normal, stopped, Nodes}.
+
+handle_cast(_, Nodes) ->
+ {noreply, Nodes}.
+
+handle_info({Node,load,Load}, Nodes) ->
+ Nodes2 = insert_node({Load,Node}, Nodes),
+ {noreply, Nodes2};
+handle_info({nodedown, Node}, Nodes) ->
+ {noreply, lists:keydelete(Node, 2, Nodes)};
+handle_info(_, Nodes) -> %% The EXIT signals etc.etc
+ {noreply, Nodes}.
+
+terminate(_Reason, Nodes) ->
+ N = lists:delete(node(), get_elements(2, Nodes)),
+ stop_em(N),
+ ok.
+
+-spec do_spawn(pid(), module(), atom(), [term()]) -> term().
+do_spawn(Gl, M, F, A) ->
+ group_leader(Gl, self()),
+ apply(M, F, A).
+
+insert_node({Load,Node},[{L,Node}|Tail]) when Load > L ->
+ %% We have a raised load here
+ pure_insert({Load,Node},Tail);
+insert_node({Load,Node},[{L,N}|Tail]) when Load =< L ->
+ %% Move forward in the list
+ T = lists:keydelete(Node,2,[{L,N}|Tail]),
+ [{Load,Node} | T];
+insert_node(Ln,[H|T]) ->
+ [H | insert_node(Ln,T)];
+insert_node(X,[]) -> % Can't happen
+ error_logger:error_msg("Pool_master: Bad node list X=~w\n", [X]),
+ exit(crash).
+
+pure_insert({Load,Node},[]) ->
+ [{Load,Node}];
+pure_insert({Load,Node},[{L,N}|Tail]) when Load < L ->
+ [{Load,Node}, {L,N} | Tail];
+pure_insert(L,[H|T]) -> [H|pure_insert(L,T)].
+
+%% Really should not measure the contributions from
+%% the back ground processes here .... which we do :-(
+%% We don't have to monitor the master, since we're slaves anyway
+
+statistic_collector() ->
+ statistic_collector(5).
+
+statistic_collector(0) -> exit(normal);
+statistic_collector(I) ->
+ sleep(300),
+ case global:whereis_name(pool_master) of
+ undefined ->
+ statistic_collector(I-1);
+ M ->
+ stat_loop(M, 999999)
+ end.
+
+%% Do not tell the master about our load if it has not changed
+
+stat_loop(M, Old) ->
+ sleep(2000),
+ case statistics(run_queue) of
+ Old ->
+ stat_loop(M, Old);
+ NewLoad ->
+ M ! {node(), load, NewLoad}, %% async
+ stat_loop(M, NewLoad)
+ end.
+
+sleep(I) -> receive after I -> ok end.