diff options
author | Erlang/OTP <[email protected]> | 2009-11-20 14:54:40 +0000 |
---|---|---|
committer | Erlang/OTP <[email protected]> | 2009-11-20 14:54:40 +0000 |
commit | 84adefa331c4159d432d22840663c38f155cd4c1 (patch) | |
tree | bff9a9c66adda4df2106dfd0e5c053ab182a12bd /lib/kernel/src/rpc.erl | |
download | otp-84adefa331c4159d432d22840663c38f155cd4c1.tar.gz otp-84adefa331c4159d432d22840663c38f155cd4c1.tar.bz2 otp-84adefa331c4159d432d22840663c38f155cd4c1.zip |
The R13B03 release.OTP_R13B03
Diffstat (limited to 'lib/kernel/src/rpc.erl')
-rw-r--r-- | lib/kernel/src/rpc.erl | 609 |
1 files changed, 609 insertions, 0 deletions
diff --git a/lib/kernel/src/rpc.erl b/lib/kernel/src/rpc.erl new file mode 100644 index 0000000000..d69f2a12ad --- /dev/null +++ b/lib/kernel/src/rpc.erl @@ -0,0 +1,609 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 1996-2009. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% +-module(rpc). + +%% General rpc, broadcast,multicall, promise and parallel evaluator +%% facility + +%% This code used to reside in net.erl, but has now been moved to +%% a searate module. + +-define(NAME, rex). + +-behaviour(gen_server). + +-export([start/0, start_link/0, stop/0, + call/4, call/5, + block_call/4, block_call/5, + server_call/4, + cast/4, + abcast/2, + abcast/3, + sbcast/2, + sbcast/3, + eval_everywhere/3, + eval_everywhere/4, + multi_server_call/2, + multi_server_call/3, + multicall/3, + multicall/4, + multicall/5, + async_call/4, + yield/1, + nb_yield/2, + nb_yield/1, + parallel_eval/1, + pmap/3, pinfo/1, pinfo/2]). + +%% Deprecated calls. +-deprecated([{safe_multi_server_call,2},{safe_multi_server_call,3}]). +-export([safe_multi_server_call/2,safe_multi_server_call/3]). + +%% gen_server exports +-export([init/1,handle_call/3,handle_cast/2,handle_info/2, + terminate/2, code_change/3]). + +%% Internals +-export([proxy_user_flush/0]). + +%%------------------------------------------------------------------------ + +%% Remote execution and broadcasting facility + +start() -> + gen_server:start({local,?NAME},?MODULE,[],[]). + +start_link() -> + gen_server:start_link({local,?NAME},?MODULE,[],[]). + +stop() -> + stop(?NAME). + +stop(Rpc) -> + gen_server:call(Rpc, stop, infinity). + +-spec init([]) -> {'ok', gb_tree()}. +init([]) -> + process_flag(trap_exit, true), + {ok, gb_trees:empty()}. + +handle_call({call, Mod, Fun, Args, Gleader}, To, S) -> + handle_call_call(Mod, Fun, Args, Gleader, To, S); +handle_call({block_call, Mod, Fun, Args, Gleader}, _To, S) -> + MyGL = group_leader(), + set_group_leader(Gleader), + Reply = + case catch apply(Mod,Fun,Args) of + {'EXIT', _} = Exit -> + {badrpc, Exit}; + Other -> + Other + end, + group_leader(MyGL, self()), % restore + {reply, Reply, S}; +handle_call(stop, _To, S) -> + {stop, normal, stopped, S}; +handle_call(_, _To, S) -> + {noreply, S}. % Ignore ! + + +handle_cast({cast, Mod, Fun, Args, Gleader}, S) -> + spawn( + fun() -> + set_group_leader(Gleader), + apply(Mod, Fun, Args) + end), + {noreply, S}; +handle_cast(_, S) -> + {noreply, S}. % Ignore ! + + +handle_info({'DOWN', _, process, Caller, Reason}, S) -> + case gb_trees:lookup(Caller, S) of + {value, To} -> + receive + {Caller, {reply, Reply}} -> + gen_server:reply(To, Reply) + after 0 -> + gen_server:reply(To, {badrpc, {'EXIT', Reason}}) + end, + {noreply, gb_trees:delete(Caller, S)}; + none -> + {noreply, S} + end; +handle_info({Caller, {reply, Reply}}, S) -> + case gb_trees:lookup(Caller, S) of + {value, To} -> + receive + {'DOWN', _, process, Caller, _} -> + gen_server:reply(To, Reply), + {noreply, gb_trees:delete(Caller, S)} + end; + none -> + {noreply, S} + end; +handle_info({From, {sbcast, Name, Msg}}, S) -> + case catch Name ! Msg of %% use catch to get the printout + {'EXIT', _} -> + From ! {?NAME, node(), {nonexisting_name, Name}}; + _ -> + From ! {?NAME, node(), node()} + end, + {noreply,S}; +handle_info({From, {send, Name, Msg}}, S) -> + case catch Name ! {From, Msg} of %% use catch to get the printout + {'EXIT', _} -> + From ! {?NAME, node(), {nonexisting_name, Name}}; + _ -> + ok %% It's up to Name to respond !!!!! + end, + {noreply,S}; +handle_info({From, {call,Mod,Fun,Args,Gleader}}, S) -> + %% Special for hidden C node's, uugh ... + handle_call_call(Mod, Fun, Args, Gleader, {From,?NAME}, S); +handle_info(_, S) -> + {noreply,S}. + +terminate(_, _S) -> + ok. + +code_change(_, S, _) -> + {ok, S}. + +%% +%% Auxiliary function to avoid a false dialyzer warning -- do not inline +%% +handle_call_call(Mod, Fun, Args, Gleader, To, S) -> + RpcServer = self(), + %% Spawn not to block the rpc server. + {Caller,_} = + erlang:spawn_monitor( + fun () -> + set_group_leader(Gleader), + Reply = + %% in case some sucker rex'es + %% something that throws + case catch apply(Mod, Fun, Args) of + {'EXIT', _} = Exit -> + {badrpc, Exit}; + Result -> + Result + end, + RpcServer ! {self(), {reply, Reply}} + end), + {noreply, gb_trees:insert(Caller, To, S)}. + + +%% RPC aid functions .... + +set_group_leader(Gleader) when is_pid(Gleader) -> + group_leader(Gleader, self()); +set_group_leader(user) -> + %% For example, hidden C nodes doesn't want any I/O. + Gleader = case whereis(user) of + Pid when is_pid(Pid) -> Pid; + undefined -> proxy_user() + end, + group_leader(Gleader, self()). + + +%% The 'rex_proxy_user' process serve as group leader for early rpc's that +%% may do IO before the real group leader 'user' has been started (OTP-7903). +proxy_user() -> + case whereis(rex_proxy_user) of + Pid when is_pid(Pid) -> Pid; + undefined -> + Pid = spawn(fun()-> proxy_user_loop() end), + try register(rex_proxy_user,Pid) of + true -> Pid + catch error:_ -> % spawn race, kill and try again + exit(Pid,kill), + proxy_user() + end + end. + +proxy_user_loop() -> + %% Wait for the real 'user' to start + timer:sleep(200), + case whereis(user) of + Pid when is_pid(Pid) -> proxy_user_flush(); + undefined -> proxy_user_loop() + end. + +proxy_user_flush() -> + %% Forward all received messages to 'user' + receive Msg -> + user ! Msg + after 10*1000 -> + %% Hibernate but live for ever, as it's not easy to know + %% when no more messages will arrive. + erlang:hibernate(?MODULE, proxy_user_flush, []) + end, + proxy_user_flush(). + + +%% THE rpc client interface + +-spec call(node(), atom(), atom(), [term()]) -> term(). + +call(N,M,F,A) when node() =:= N -> %% Optimize local call + local_call(M, F, A); +call(N,M,F,A) -> + do_call(N, {call,M,F,A,group_leader()}, infinity). + +-spec call(node(), atom(), atom(), [term()], timeout()) -> term(). + +call(N,M,F,A,_Timeout) when node() =:= N -> %% Optimize local call + local_call(M,F,A); +call(N,M,F,A,infinity) -> + do_call(N, {call,M,F,A,group_leader()}, infinity); +call(N,M,F,A,Timeout) when is_integer(Timeout), Timeout >= 0 -> + do_call(N, {call,M,F,A,group_leader()}, Timeout). + +-spec block_call(node(), atom(), atom(), [term()]) -> term(). + +block_call(N,M,F,A) when node() =:= N -> %% Optimize local call + local_call(M,F,A); +block_call(N,M,F,A) -> + do_call(N, {block_call,M,F,A,group_leader()}, infinity). + +-spec block_call(node(), atom(), atom(), [term()], timeout()) -> term(). + +block_call(N,M,F,A,_Timeout) when node() =:= N -> %% Optimize local call + local_call(M, F, A); +block_call(N,M,F,A,infinity) -> + do_call(N, {block_call,M,F,A,group_leader()}, infinity); +block_call(N,M,F,A,Timeout) when is_integer(Timeout), Timeout >= 0 -> + do_call(N, {block_call,M,F,A,group_leader()}, Timeout). + +local_call(M, F, A) when is_atom(M), is_atom(F), is_list(A) -> + case catch apply(M, F, A) of + {'EXIT',_}=V -> {badrpc, V}; + Other -> Other + end. + +do_call(Node, Request, infinity) -> + rpc_check(catch gen_server:call({?NAME,Node}, Request, infinity)); +do_call(Node, Request, Timeout) -> + Tag = make_ref(), + {Receiver,Mref} = + erlang:spawn_monitor( + fun() -> + %% Middleman process. Should be unsensitive to regular + %% exit signals. + process_flag(trap_exit, true), + Result = gen_server:call({?NAME,Node}, Request, Timeout), + exit({self(),Tag,Result}) + end), + receive + {'DOWN',Mref,_,_,{Receiver,Tag,Result}} -> + rpc_check(Result); + {'DOWN',Mref,_,_,Reason} -> + %% The middleman code failed. Or someone did + %% exit(_, kill) on the middleman process => Reason==killed + rpc_check_t({'EXIT',Reason}) + end. + +rpc_check_t({'EXIT', {timeout,_}}) -> {badrpc, timeout}; +rpc_check_t(X) -> rpc_check(X). + +rpc_check({'EXIT', {{nodedown,_},_}}) -> {badrpc, nodedown}; +rpc_check({'EXIT', X}) -> exit(X); +rpc_check(X) -> X. + + +%% This is a real handy function to be used when interacting with +%% a server called Name at node Node, It is assumed that the server +%% Receives messages on the form {From, Request} and replies on the +%% form From ! {ReplyWrapper, Node, Reply}. +%% This function makes such a server call and ensures that that +%% The entire call is packed into an atomic transaction which +%% either succeeds or fails, i.e. never hangs (unless the server itself hangs). + +-spec server_call(node(), atom(), term(), term()) -> term() | {'error', 'nodedown'}. + +server_call(Node, Name, ReplyWrapper, Msg) + when is_atom(Node), is_atom(Name) -> + if node() =:= nonode@nohost, Node =/= nonode@nohost -> + {error, nodedown}; + true -> + Ref = erlang:monitor(process, {Name, Node}), + {Name, Node} ! {self(), Msg}, + receive + {'DOWN', Ref, _, _, _} -> + {error, nodedown}; + {ReplyWrapper, Node, Reply} -> + erlang:demonitor(Ref), + receive + {'DOWN', Ref, _, _, _} -> + Reply + after 0 -> + Reply + end + end + end. + +-spec cast(node(), atom(), atom(), [term()]) -> 'true'. + +cast(Node, Mod, Fun, Args) when Node =:= node() -> + catch spawn(Mod, Fun, Args), + true; +cast(Node, Mod, Fun, Args) -> + gen_server:cast({?NAME,Node}, {cast,Mod,Fun,Args,group_leader()}), + true. + + +%% Asynchronous broadcast, returns nothing, it's just send'n prey +-spec abcast(atom(), term()) -> 'abcast'. + +abcast(Name, Mess) -> + abcast([node() | nodes()], Name, Mess). + +-spec abcast([node()], atom(), term()) -> 'abcast'. + +abcast([Node|Tail], Name, Mess) -> + Dest = {Name,Node}, + case catch erlang:send(Dest, Mess, [noconnect]) of + noconnect -> spawn(erlang, send, [Dest,Mess]); + _ -> ok + end, + abcast(Tail, Name, Mess); +abcast([], _,_) -> abcast. + + +%% Syncronous broadcast, returns a list of the nodes which had Name +%% as a registered server. Returns {Goodnodes, Badnodes}. +%% Syncronous in the sense that we know that all servers have received the +%% message when we return from the call, we can't know that they have +%% processed the message though. + +-spec sbcast(atom(), term()) -> {[node()], [node()]}. + +sbcast(Name, Mess) -> + sbcast([node() | nodes()], Name, Mess). + +-spec sbcast([node()], atom(), term()) -> {[node()], [node()]}. + +sbcast(Nodes, Name, Mess) -> + Monitors = send_nodes(Nodes, ?NAME, {sbcast, Name, Mess}, []), + rec_nodes(?NAME, Monitors). + +-spec eval_everywhere(atom(), atom(), [term()]) -> 'abcast'. + +eval_everywhere(Mod, Fun, Args) -> + eval_everywhere([node() | nodes()] , Mod, Fun, Args). + +-spec eval_everywhere([node()], atom(), atom(), [term()]) -> 'abcast'. + +eval_everywhere(Nodes, Mod, Fun, Args) -> + gen_server:abcast(Nodes, ?NAME, {cast,Mod,Fun,Args,group_leader()}). + + +send_nodes([Node|Tail], Name, Msg, Monitors) when is_atom(Node) -> + Monitor = start_monitor(Node, Name), + %% Handle non-existing names in rec_nodes. + catch {Name, Node} ! {self(), Msg}, + send_nodes(Tail, Name, Msg, [Monitor | Monitors]); +send_nodes([_Node|Tail], Name, Msg, Monitors) -> + %% Skip non-atom _Node + send_nodes(Tail, Name, Msg, Monitors); +send_nodes([], _Name, _Req, Monitors) -> + Monitors. + +%% Starts a monitor, either the new way, or the old. +%% Assumes that the arguments are atoms. +start_monitor(Node, Name) -> + if node() =:= nonode@nohost, Node =/= nonode@nohost -> + Ref = make_ref(), + self() ! {'DOWN', Ref, process, {Name, Node}, noconnection}, + {Node, Ref}; + true -> + {Node,erlang:monitor(process, {Name, Node})} + end. + +%% Cancels a monitor started with Ref=erlang:monitor(_, _), +%% i.e return value {Node, Ref} from start_monitor/2 above. +unmonitor(Ref) when is_reference(Ref) -> + erlang:demonitor(Ref), + receive + {'DOWN', Ref, _, _, _} -> + true + after 0 -> + true + end. + + +%% Call apply(M,F,A) on all nodes in parallel +-spec multicall(atom(), atom(), [term()]) -> {[_], [node()]}. + +multicall(M, F, A) -> + multicall(M, F, A, infinity). + +-spec multicall([node()], atom(), atom(), [term()]) -> {[_], [node()]} + ; (atom(), atom(), [term()], timeout()) -> {[_], [node()]}. + +multicall(Nodes, M, F, A) when is_list(Nodes) -> + multicall(Nodes, M, F, A, infinity); +multicall(M, F, A, Timeout) -> + multicall([node() | nodes()], M, F, A, Timeout). + +-spec multicall([node()], atom(), atom(), [term()], timeout()) -> {[_], [node()]}. + +multicall(Nodes, M, F, A, infinity) + when is_list(Nodes), is_atom(M), is_atom(F), is_list(A) -> + do_multicall(Nodes, M, F, A, infinity); +multicall(Nodes, M, F, A, Timeout) + when is_list(Nodes), is_atom(M), is_atom(F), is_list(A), is_integer(Timeout), + Timeout >= 0 -> + do_multicall(Nodes, M, F, A, Timeout). + +do_multicall(Nodes, M, F, A, Timeout) -> + {Rep,Bad} = gen_server:multi_call(Nodes, ?NAME, + {call, M,F,A, group_leader()}, + Timeout), + {lists:map(fun({_,R}) -> R end, Rep), Bad}. + + +%% Send Msg to Name on all nodes, and collect the answers. +%% Return {Replies, Badnodes} where Badnodes is a list of the nodes +%% that failed during the timespan of the call. +%% This function assumes that if we send a request to a server +%% called Name, the server will reply with a reply +%% on the form {Name, Node, Reply}, otherwise this function will +%% hang forever. +%% It also assumes that the server receives messages on the form +%% {From, Msg} and then replies as From ! {Name, node(), Reply}. +%% +%% There is no apparent order among the replies. + +-spec multi_server_call(atom(), term()) -> {[_], [node()]}. + +multi_server_call(Name, Msg) -> + multi_server_call([node() | nodes()], Name, Msg). + +-spec multi_server_call([node()], atom(), term()) -> {[_], [node()]}. + +multi_server_call(Nodes, Name, Msg) + when is_list(Nodes), is_atom(Name) -> + Monitors = send_nodes(Nodes, Name, Msg, []), + rec_nodes(Name, Monitors). + +%% Deprecated functions. Were only needed when communicating with R6 nodes. + +safe_multi_server_call(Name, Msg) -> + multi_server_call(Name, Msg). + +safe_multi_server_call(Nodes, Name, Msg) -> + multi_server_call(Nodes, Name, Msg). + + +rec_nodes(Name, Nodes) -> + rec_nodes(Name, Nodes, [], []). + +rec_nodes(_Name, [], Badnodes, Replies) -> + {Replies, Badnodes}; +rec_nodes(Name, [{N,R} | Tail], Badnodes, Replies) -> + receive + {'DOWN', R, _, _, _} -> + rec_nodes(Name, Tail, [N|Badnodes], Replies); + {?NAME, N, {nonexisting_name, _}} -> + %% used by sbcast() + unmonitor(R), + rec_nodes(Name, Tail, [N|Badnodes], Replies); + {Name, N, Reply} -> %% Name is bound !!! + unmonitor(R), + rec_nodes(Name, Tail, Badnodes, [Reply|Replies]) + end. + +%% Now for an asynchronous rpc. +%% An asyncronous version of rpc that is faster for series of +%% rpc's towards the same node. I.e. it returns immediately and +%% it returns a Key that can be used in a subsequent yield(Key). + +-spec async_call(node(), atom(), atom(), [term()]) -> pid(). + +async_call(Node, Mod, Fun, Args) -> + ReplyTo = self(), + spawn( + fun() -> + R = call(Node, Mod, Fun, Args), %% proper rpc + ReplyTo ! {self(), {promise_reply, R}} %% self() is key + end). + +-spec yield(pid()) -> term(). + +yield(Key) when is_pid(Key) -> + {value,R} = do_yield(Key, infinity), + R. + +-spec nb_yield(pid(), timeout()) -> {'value', _} | 'timeout'. + +nb_yield(Key, infinity=Inf) when is_pid(Key) -> + do_yield(Key, Inf); +nb_yield(Key, Timeout) when is_pid(Key), is_integer(Timeout), Timeout >= 0 -> + do_yield(Key, Timeout). + +-spec nb_yield(pid()) -> {'value', _} | 'timeout'. + +nb_yield(Key) when is_pid(Key) -> + do_yield(Key, 0). + +-spec do_yield(pid(), timeout()) -> {'value', _} | 'timeout'. + +do_yield(Key, Timeout) -> + receive + {Key,{promise_reply,R}} -> + {value,R} + after Timeout -> + timeout + end. + + +%% A parallel network evaluator +%% ArgL === [{M,F,Args},........] +%% Returns a lists of the evaluations in the same order as +%% given to ArgL +-spec parallel_eval([{atom(), atom(), [_]}]) -> [_]. + +parallel_eval(ArgL) -> + Nodes = [node() | nodes()], + Keys = map_nodes(ArgL,Nodes,Nodes), + [yield(K) || K <- Keys]. + +map_nodes([],_,_) -> []; +map_nodes(ArgL,[],Original) -> + map_nodes(ArgL,Original,Original); +map_nodes([{M,F,A}|Tail],[Node|MoreNodes], Original) -> + [?MODULE:async_call(Node,M,F,A) | + map_nodes(Tail,MoreNodes,Original)]. + +%% Parallel version of lists:map/3 with exactly the same +%% arguments and return value as lists:map/3, +%% except that it calls exit/1 if a network error occurs. +-spec pmap({atom(),atom()}, [term()], [term()]) -> [term()]. + +pmap({M,F}, As, List) -> + check(parallel_eval(build_args(M,F,As, List, [])), []). + +%% By using an accumulator twice we get the whole thing right +build_args(M,F, As, [Arg|Tail], Acc) -> + build_args(M,F, As, Tail, [{M,F,[Arg|As]}|Acc]); +build_args(M,F, _, [], Acc) when is_atom(M), is_atom(F) -> Acc. + +%% If one single call fails, we fail the whole computation +check([{badrpc, _}|_], _) -> exit(badrpc); +check([X|T], Ack) -> check(T, [X|Ack]); +check([], Ack) -> Ack. + + +%% location transparent version of process_info +-spec pinfo(pid()) -> [{atom(), _}] | 'undefined'. + +pinfo(Pid) when node(Pid) =:= node() -> + process_info(Pid); +pinfo(Pid) -> + call(node(Pid), erlang, process_info, [Pid]). + +-spec pinfo(pid(), Item) -> {Item, _} | 'undefined' | [] + when is_subtype(Item, atom()). + +pinfo(Pid, Item) when node(Pid) =:= node() -> + process_info(Pid, Item); +pinfo(Pid, Item) -> + block_call(node(Pid), erlang, process_info, [Pid, Item]). |