aboutsummaryrefslogtreecommitdiffstats
path: root/lib/kernel/src/rpc.erl
diff options
context:
space:
mode:
authorErlang/OTP <[email protected]>2009-11-20 14:54:40 +0000
committerErlang/OTP <[email protected]>2009-11-20 14:54:40 +0000
commit84adefa331c4159d432d22840663c38f155cd4c1 (patch)
treebff9a9c66adda4df2106dfd0e5c053ab182a12bd /lib/kernel/src/rpc.erl
downloadotp-84adefa331c4159d432d22840663c38f155cd4c1.tar.gz
otp-84adefa331c4159d432d22840663c38f155cd4c1.tar.bz2
otp-84adefa331c4159d432d22840663c38f155cd4c1.zip
The R13B03 release.OTP_R13B03
Diffstat (limited to 'lib/kernel/src/rpc.erl')
-rw-r--r--lib/kernel/src/rpc.erl609
1 files changed, 609 insertions, 0 deletions
diff --git a/lib/kernel/src/rpc.erl b/lib/kernel/src/rpc.erl
new file mode 100644
index 0000000000..d69f2a12ad
--- /dev/null
+++ b/lib/kernel/src/rpc.erl
@@ -0,0 +1,609 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 1996-2009. All Rights Reserved.
+%%
+%% The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved online at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% %CopyrightEnd%
+%%
+-module(rpc).
+
+%% General rpc, broadcast,multicall, promise and parallel evaluator
+%% facility
+
+%% This code used to reside in net.erl, but has now been moved to
+%% a searate module.
+
+-define(NAME, rex).
+
+-behaviour(gen_server).
+
+-export([start/0, start_link/0, stop/0,
+ call/4, call/5,
+ block_call/4, block_call/5,
+ server_call/4,
+ cast/4,
+ abcast/2,
+ abcast/3,
+ sbcast/2,
+ sbcast/3,
+ eval_everywhere/3,
+ eval_everywhere/4,
+ multi_server_call/2,
+ multi_server_call/3,
+ multicall/3,
+ multicall/4,
+ multicall/5,
+ async_call/4,
+ yield/1,
+ nb_yield/2,
+ nb_yield/1,
+ parallel_eval/1,
+ pmap/3, pinfo/1, pinfo/2]).
+
+%% Deprecated calls.
+-deprecated([{safe_multi_server_call,2},{safe_multi_server_call,3}]).
+-export([safe_multi_server_call/2,safe_multi_server_call/3]).
+
+%% gen_server exports
+-export([init/1,handle_call/3,handle_cast/2,handle_info/2,
+ terminate/2, code_change/3]).
+
+%% Internals
+-export([proxy_user_flush/0]).
+
+%%------------------------------------------------------------------------
+
+%% Remote execution and broadcasting facility
+
+start() ->
+ gen_server:start({local,?NAME},?MODULE,[],[]).
+
+start_link() ->
+ gen_server:start_link({local,?NAME},?MODULE,[],[]).
+
+stop() ->
+ stop(?NAME).
+
+stop(Rpc) ->
+ gen_server:call(Rpc, stop, infinity).
+
+-spec init([]) -> {'ok', gb_tree()}.
+init([]) ->
+ process_flag(trap_exit, true),
+ {ok, gb_trees:empty()}.
+
+handle_call({call, Mod, Fun, Args, Gleader}, To, S) ->
+ handle_call_call(Mod, Fun, Args, Gleader, To, S);
+handle_call({block_call, Mod, Fun, Args, Gleader}, _To, S) ->
+ MyGL = group_leader(),
+ set_group_leader(Gleader),
+ Reply =
+ case catch apply(Mod,Fun,Args) of
+ {'EXIT', _} = Exit ->
+ {badrpc, Exit};
+ Other ->
+ Other
+ end,
+ group_leader(MyGL, self()), % restore
+ {reply, Reply, S};
+handle_call(stop, _To, S) ->
+ {stop, normal, stopped, S};
+handle_call(_, _To, S) ->
+ {noreply, S}. % Ignore !
+
+
+handle_cast({cast, Mod, Fun, Args, Gleader}, S) ->
+ spawn(
+ fun() ->
+ set_group_leader(Gleader),
+ apply(Mod, Fun, Args)
+ end),
+ {noreply, S};
+handle_cast(_, S) ->
+ {noreply, S}. % Ignore !
+
+
+handle_info({'DOWN', _, process, Caller, Reason}, S) ->
+ case gb_trees:lookup(Caller, S) of
+ {value, To} ->
+ receive
+ {Caller, {reply, Reply}} ->
+ gen_server:reply(To, Reply)
+ after 0 ->
+ gen_server:reply(To, {badrpc, {'EXIT', Reason}})
+ end,
+ {noreply, gb_trees:delete(Caller, S)};
+ none ->
+ {noreply, S}
+ end;
+handle_info({Caller, {reply, Reply}}, S) ->
+ case gb_trees:lookup(Caller, S) of
+ {value, To} ->
+ receive
+ {'DOWN', _, process, Caller, _} ->
+ gen_server:reply(To, Reply),
+ {noreply, gb_trees:delete(Caller, S)}
+ end;
+ none ->
+ {noreply, S}
+ end;
+handle_info({From, {sbcast, Name, Msg}}, S) ->
+ case catch Name ! Msg of %% use catch to get the printout
+ {'EXIT', _} ->
+ From ! {?NAME, node(), {nonexisting_name, Name}};
+ _ ->
+ From ! {?NAME, node(), node()}
+ end,
+ {noreply,S};
+handle_info({From, {send, Name, Msg}}, S) ->
+ case catch Name ! {From, Msg} of %% use catch to get the printout
+ {'EXIT', _} ->
+ From ! {?NAME, node(), {nonexisting_name, Name}};
+ _ ->
+ ok %% It's up to Name to respond !!!!!
+ end,
+ {noreply,S};
+handle_info({From, {call,Mod,Fun,Args,Gleader}}, S) ->
+ %% Special for hidden C node's, uugh ...
+ handle_call_call(Mod, Fun, Args, Gleader, {From,?NAME}, S);
+handle_info(_, S) ->
+ {noreply,S}.
+
+terminate(_, _S) ->
+ ok.
+
+code_change(_, S, _) ->
+ {ok, S}.
+
+%%
+%% Auxiliary function to avoid a false dialyzer warning -- do not inline
+%%
+handle_call_call(Mod, Fun, Args, Gleader, To, S) ->
+ RpcServer = self(),
+ %% Spawn not to block the rpc server.
+ {Caller,_} =
+ erlang:spawn_monitor(
+ fun () ->
+ set_group_leader(Gleader),
+ Reply =
+ %% in case some sucker rex'es
+ %% something that throws
+ case catch apply(Mod, Fun, Args) of
+ {'EXIT', _} = Exit ->
+ {badrpc, Exit};
+ Result ->
+ Result
+ end,
+ RpcServer ! {self(), {reply, Reply}}
+ end),
+ {noreply, gb_trees:insert(Caller, To, S)}.
+
+
+%% RPC aid functions ....
+
+set_group_leader(Gleader) when is_pid(Gleader) ->
+ group_leader(Gleader, self());
+set_group_leader(user) ->
+ %% For example, hidden C nodes doesn't want any I/O.
+ Gleader = case whereis(user) of
+ Pid when is_pid(Pid) -> Pid;
+ undefined -> proxy_user()
+ end,
+ group_leader(Gleader, self()).
+
+
+%% The 'rex_proxy_user' process serve as group leader for early rpc's that
+%% may do IO before the real group leader 'user' has been started (OTP-7903).
+proxy_user() ->
+ case whereis(rex_proxy_user) of
+ Pid when is_pid(Pid) -> Pid;
+ undefined ->
+ Pid = spawn(fun()-> proxy_user_loop() end),
+ try register(rex_proxy_user,Pid) of
+ true -> Pid
+ catch error:_ -> % spawn race, kill and try again
+ exit(Pid,kill),
+ proxy_user()
+ end
+ end.
+
+proxy_user_loop() ->
+ %% Wait for the real 'user' to start
+ timer:sleep(200),
+ case whereis(user) of
+ Pid when is_pid(Pid) -> proxy_user_flush();
+ undefined -> proxy_user_loop()
+ end.
+
+proxy_user_flush() ->
+ %% Forward all received messages to 'user'
+ receive Msg ->
+ user ! Msg
+ after 10*1000 ->
+ %% Hibernate but live for ever, as it's not easy to know
+ %% when no more messages will arrive.
+ erlang:hibernate(?MODULE, proxy_user_flush, [])
+ end,
+ proxy_user_flush().
+
+
+%% THE rpc client interface
+
+-spec call(node(), atom(), atom(), [term()]) -> term().
+
+call(N,M,F,A) when node() =:= N -> %% Optimize local call
+ local_call(M, F, A);
+call(N,M,F,A) ->
+ do_call(N, {call,M,F,A,group_leader()}, infinity).
+
+-spec call(node(), atom(), atom(), [term()], timeout()) -> term().
+
+call(N,M,F,A,_Timeout) when node() =:= N -> %% Optimize local call
+ local_call(M,F,A);
+call(N,M,F,A,infinity) ->
+ do_call(N, {call,M,F,A,group_leader()}, infinity);
+call(N,M,F,A,Timeout) when is_integer(Timeout), Timeout >= 0 ->
+ do_call(N, {call,M,F,A,group_leader()}, Timeout).
+
+-spec block_call(node(), atom(), atom(), [term()]) -> term().
+
+block_call(N,M,F,A) when node() =:= N -> %% Optimize local call
+ local_call(M,F,A);
+block_call(N,M,F,A) ->
+ do_call(N, {block_call,M,F,A,group_leader()}, infinity).
+
+-spec block_call(node(), atom(), atom(), [term()], timeout()) -> term().
+
+block_call(N,M,F,A,_Timeout) when node() =:= N -> %% Optimize local call
+ local_call(M, F, A);
+block_call(N,M,F,A,infinity) ->
+ do_call(N, {block_call,M,F,A,group_leader()}, infinity);
+block_call(N,M,F,A,Timeout) when is_integer(Timeout), Timeout >= 0 ->
+ do_call(N, {block_call,M,F,A,group_leader()}, Timeout).
+
+local_call(M, F, A) when is_atom(M), is_atom(F), is_list(A) ->
+ case catch apply(M, F, A) of
+ {'EXIT',_}=V -> {badrpc, V};
+ Other -> Other
+ end.
+
+do_call(Node, Request, infinity) ->
+ rpc_check(catch gen_server:call({?NAME,Node}, Request, infinity));
+do_call(Node, Request, Timeout) ->
+ Tag = make_ref(),
+ {Receiver,Mref} =
+ erlang:spawn_monitor(
+ fun() ->
+ %% Middleman process. Should be unsensitive to regular
+ %% exit signals.
+ process_flag(trap_exit, true),
+ Result = gen_server:call({?NAME,Node}, Request, Timeout),
+ exit({self(),Tag,Result})
+ end),
+ receive
+ {'DOWN',Mref,_,_,{Receiver,Tag,Result}} ->
+ rpc_check(Result);
+ {'DOWN',Mref,_,_,Reason} ->
+ %% The middleman code failed. Or someone did
+ %% exit(_, kill) on the middleman process => Reason==killed
+ rpc_check_t({'EXIT',Reason})
+ end.
+
+rpc_check_t({'EXIT', {timeout,_}}) -> {badrpc, timeout};
+rpc_check_t(X) -> rpc_check(X).
+
+rpc_check({'EXIT', {{nodedown,_},_}}) -> {badrpc, nodedown};
+rpc_check({'EXIT', X}) -> exit(X);
+rpc_check(X) -> X.
+
+
+%% This is a real handy function to be used when interacting with
+%% a server called Name at node Node, It is assumed that the server
+%% Receives messages on the form {From, Request} and replies on the
+%% form From ! {ReplyWrapper, Node, Reply}.
+%% This function makes such a server call and ensures that that
+%% The entire call is packed into an atomic transaction which
+%% either succeeds or fails, i.e. never hangs (unless the server itself hangs).
+
+-spec server_call(node(), atom(), term(), term()) -> term() | {'error', 'nodedown'}.
+
+server_call(Node, Name, ReplyWrapper, Msg)
+ when is_atom(Node), is_atom(Name) ->
+ if node() =:= nonode@nohost, Node =/= nonode@nohost ->
+ {error, nodedown};
+ true ->
+ Ref = erlang:monitor(process, {Name, Node}),
+ {Name, Node} ! {self(), Msg},
+ receive
+ {'DOWN', Ref, _, _, _} ->
+ {error, nodedown};
+ {ReplyWrapper, Node, Reply} ->
+ erlang:demonitor(Ref),
+ receive
+ {'DOWN', Ref, _, _, _} ->
+ Reply
+ after 0 ->
+ Reply
+ end
+ end
+ end.
+
+-spec cast(node(), atom(), atom(), [term()]) -> 'true'.
+
+cast(Node, Mod, Fun, Args) when Node =:= node() ->
+ catch spawn(Mod, Fun, Args),
+ true;
+cast(Node, Mod, Fun, Args) ->
+ gen_server:cast({?NAME,Node}, {cast,Mod,Fun,Args,group_leader()}),
+ true.
+
+
+%% Asynchronous broadcast, returns nothing, it's just send'n prey
+-spec abcast(atom(), term()) -> 'abcast'.
+
+abcast(Name, Mess) ->
+ abcast([node() | nodes()], Name, Mess).
+
+-spec abcast([node()], atom(), term()) -> 'abcast'.
+
+abcast([Node|Tail], Name, Mess) ->
+ Dest = {Name,Node},
+ case catch erlang:send(Dest, Mess, [noconnect]) of
+ noconnect -> spawn(erlang, send, [Dest,Mess]);
+ _ -> ok
+ end,
+ abcast(Tail, Name, Mess);
+abcast([], _,_) -> abcast.
+
+
+%% Syncronous broadcast, returns a list of the nodes which had Name
+%% as a registered server. Returns {Goodnodes, Badnodes}.
+%% Syncronous in the sense that we know that all servers have received the
+%% message when we return from the call, we can't know that they have
+%% processed the message though.
+
+-spec sbcast(atom(), term()) -> {[node()], [node()]}.
+
+sbcast(Name, Mess) ->
+ sbcast([node() | nodes()], Name, Mess).
+
+-spec sbcast([node()], atom(), term()) -> {[node()], [node()]}.
+
+sbcast(Nodes, Name, Mess) ->
+ Monitors = send_nodes(Nodes, ?NAME, {sbcast, Name, Mess}, []),
+ rec_nodes(?NAME, Monitors).
+
+-spec eval_everywhere(atom(), atom(), [term()]) -> 'abcast'.
+
+eval_everywhere(Mod, Fun, Args) ->
+ eval_everywhere([node() | nodes()] , Mod, Fun, Args).
+
+-spec eval_everywhere([node()], atom(), atom(), [term()]) -> 'abcast'.
+
+eval_everywhere(Nodes, Mod, Fun, Args) ->
+ gen_server:abcast(Nodes, ?NAME, {cast,Mod,Fun,Args,group_leader()}).
+
+
+send_nodes([Node|Tail], Name, Msg, Monitors) when is_atom(Node) ->
+ Monitor = start_monitor(Node, Name),
+ %% Handle non-existing names in rec_nodes.
+ catch {Name, Node} ! {self(), Msg},
+ send_nodes(Tail, Name, Msg, [Monitor | Monitors]);
+send_nodes([_Node|Tail], Name, Msg, Monitors) ->
+ %% Skip non-atom _Node
+ send_nodes(Tail, Name, Msg, Monitors);
+send_nodes([], _Name, _Req, Monitors) ->
+ Monitors.
+
+%% Starts a monitor, either the new way, or the old.
+%% Assumes that the arguments are atoms.
+start_monitor(Node, Name) ->
+ if node() =:= nonode@nohost, Node =/= nonode@nohost ->
+ Ref = make_ref(),
+ self() ! {'DOWN', Ref, process, {Name, Node}, noconnection},
+ {Node, Ref};
+ true ->
+ {Node,erlang:monitor(process, {Name, Node})}
+ end.
+
+%% Cancels a monitor started with Ref=erlang:monitor(_, _),
+%% i.e return value {Node, Ref} from start_monitor/2 above.
+unmonitor(Ref) when is_reference(Ref) ->
+ erlang:demonitor(Ref),
+ receive
+ {'DOWN', Ref, _, _, _} ->
+ true
+ after 0 ->
+ true
+ end.
+
+
+%% Call apply(M,F,A) on all nodes in parallel
+-spec multicall(atom(), atom(), [term()]) -> {[_], [node()]}.
+
+multicall(M, F, A) ->
+ multicall(M, F, A, infinity).
+
+-spec multicall([node()], atom(), atom(), [term()]) -> {[_], [node()]}
+ ; (atom(), atom(), [term()], timeout()) -> {[_], [node()]}.
+
+multicall(Nodes, M, F, A) when is_list(Nodes) ->
+ multicall(Nodes, M, F, A, infinity);
+multicall(M, F, A, Timeout) ->
+ multicall([node() | nodes()], M, F, A, Timeout).
+
+-spec multicall([node()], atom(), atom(), [term()], timeout()) -> {[_], [node()]}.
+
+multicall(Nodes, M, F, A, infinity)
+ when is_list(Nodes), is_atom(M), is_atom(F), is_list(A) ->
+ do_multicall(Nodes, M, F, A, infinity);
+multicall(Nodes, M, F, A, Timeout)
+ when is_list(Nodes), is_atom(M), is_atom(F), is_list(A), is_integer(Timeout),
+ Timeout >= 0 ->
+ do_multicall(Nodes, M, F, A, Timeout).
+
+do_multicall(Nodes, M, F, A, Timeout) ->
+ {Rep,Bad} = gen_server:multi_call(Nodes, ?NAME,
+ {call, M,F,A, group_leader()},
+ Timeout),
+ {lists:map(fun({_,R}) -> R end, Rep), Bad}.
+
+
+%% Send Msg to Name on all nodes, and collect the answers.
+%% Return {Replies, Badnodes} where Badnodes is a list of the nodes
+%% that failed during the timespan of the call.
+%% This function assumes that if we send a request to a server
+%% called Name, the server will reply with a reply
+%% on the form {Name, Node, Reply}, otherwise this function will
+%% hang forever.
+%% It also assumes that the server receives messages on the form
+%% {From, Msg} and then replies as From ! {Name, node(), Reply}.
+%%
+%% There is no apparent order among the replies.
+
+-spec multi_server_call(atom(), term()) -> {[_], [node()]}.
+
+multi_server_call(Name, Msg) ->
+ multi_server_call([node() | nodes()], Name, Msg).
+
+-spec multi_server_call([node()], atom(), term()) -> {[_], [node()]}.
+
+multi_server_call(Nodes, Name, Msg)
+ when is_list(Nodes), is_atom(Name) ->
+ Monitors = send_nodes(Nodes, Name, Msg, []),
+ rec_nodes(Name, Monitors).
+
+%% Deprecated functions. Were only needed when communicating with R6 nodes.
+
+safe_multi_server_call(Name, Msg) ->
+ multi_server_call(Name, Msg).
+
+safe_multi_server_call(Nodes, Name, Msg) ->
+ multi_server_call(Nodes, Name, Msg).
+
+
+rec_nodes(Name, Nodes) ->
+ rec_nodes(Name, Nodes, [], []).
+
+rec_nodes(_Name, [], Badnodes, Replies) ->
+ {Replies, Badnodes};
+rec_nodes(Name, [{N,R} | Tail], Badnodes, Replies) ->
+ receive
+ {'DOWN', R, _, _, _} ->
+ rec_nodes(Name, Tail, [N|Badnodes], Replies);
+ {?NAME, N, {nonexisting_name, _}} ->
+ %% used by sbcast()
+ unmonitor(R),
+ rec_nodes(Name, Tail, [N|Badnodes], Replies);
+ {Name, N, Reply} -> %% Name is bound !!!
+ unmonitor(R),
+ rec_nodes(Name, Tail, Badnodes, [Reply|Replies])
+ end.
+
+%% Now for an asynchronous rpc.
+%% An asyncronous version of rpc that is faster for series of
+%% rpc's towards the same node. I.e. it returns immediately and
+%% it returns a Key that can be used in a subsequent yield(Key).
+
+-spec async_call(node(), atom(), atom(), [term()]) -> pid().
+
+async_call(Node, Mod, Fun, Args) ->
+ ReplyTo = self(),
+ spawn(
+ fun() ->
+ R = call(Node, Mod, Fun, Args), %% proper rpc
+ ReplyTo ! {self(), {promise_reply, R}} %% self() is key
+ end).
+
+-spec yield(pid()) -> term().
+
+yield(Key) when is_pid(Key) ->
+ {value,R} = do_yield(Key, infinity),
+ R.
+
+-spec nb_yield(pid(), timeout()) -> {'value', _} | 'timeout'.
+
+nb_yield(Key, infinity=Inf) when is_pid(Key) ->
+ do_yield(Key, Inf);
+nb_yield(Key, Timeout) when is_pid(Key), is_integer(Timeout), Timeout >= 0 ->
+ do_yield(Key, Timeout).
+
+-spec nb_yield(pid()) -> {'value', _} | 'timeout'.
+
+nb_yield(Key) when is_pid(Key) ->
+ do_yield(Key, 0).
+
+-spec do_yield(pid(), timeout()) -> {'value', _} | 'timeout'.
+
+do_yield(Key, Timeout) ->
+ receive
+ {Key,{promise_reply,R}} ->
+ {value,R}
+ after Timeout ->
+ timeout
+ end.
+
+
+%% A parallel network evaluator
+%% ArgL === [{M,F,Args},........]
+%% Returns a lists of the evaluations in the same order as
+%% given to ArgL
+-spec parallel_eval([{atom(), atom(), [_]}]) -> [_].
+
+parallel_eval(ArgL) ->
+ Nodes = [node() | nodes()],
+ Keys = map_nodes(ArgL,Nodes,Nodes),
+ [yield(K) || K <- Keys].
+
+map_nodes([],_,_) -> [];
+map_nodes(ArgL,[],Original) ->
+ map_nodes(ArgL,Original,Original);
+map_nodes([{M,F,A}|Tail],[Node|MoreNodes], Original) ->
+ [?MODULE:async_call(Node,M,F,A) |
+ map_nodes(Tail,MoreNodes,Original)].
+
+%% Parallel version of lists:map/3 with exactly the same
+%% arguments and return value as lists:map/3,
+%% except that it calls exit/1 if a network error occurs.
+-spec pmap({atom(),atom()}, [term()], [term()]) -> [term()].
+
+pmap({M,F}, As, List) ->
+ check(parallel_eval(build_args(M,F,As, List, [])), []).
+
+%% By using an accumulator twice we get the whole thing right
+build_args(M,F, As, [Arg|Tail], Acc) ->
+ build_args(M,F, As, Tail, [{M,F,[Arg|As]}|Acc]);
+build_args(M,F, _, [], Acc) when is_atom(M), is_atom(F) -> Acc.
+
+%% If one single call fails, we fail the whole computation
+check([{badrpc, _}|_], _) -> exit(badrpc);
+check([X|T], Ack) -> check(T, [X|Ack]);
+check([], Ack) -> Ack.
+
+
+%% location transparent version of process_info
+-spec pinfo(pid()) -> [{atom(), _}] | 'undefined'.
+
+pinfo(Pid) when node(Pid) =:= node() ->
+ process_info(Pid);
+pinfo(Pid) ->
+ call(node(Pid), erlang, process_info, [Pid]).
+
+-spec pinfo(pid(), Item) -> {Item, _} | 'undefined' | []
+ when is_subtype(Item, atom()).
+
+pinfo(Pid, Item) when node(Pid) =:= node() ->
+ process_info(Pid, Item);
+pinfo(Pid, Item) ->
+ block_call(node(Pid), erlang, process_info, [Pid, Item]).