%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 1996-2010. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% %CopyrightEnd%
%%
-module(rpc).

%% General rpc, broadcast,multicall, promise and parallel evaluator
%% facility

%% This code used to reside in net.erl, but has now been moved to
%% a searate module.

-define(NAME, rex).

-behaviour(gen_server).

-export([start/0, start_link/0, stop/0,
	 call/4, call/5,
	 block_call/4, block_call/5,
	 server_call/4,
	 cast/4,
	 abcast/2,
	 abcast/3,
	 sbcast/2,
	 sbcast/3,
	 eval_everywhere/3,
	 eval_everywhere/4,
	 multi_server_call/2,
	 multi_server_call/3,
	 multicall/3,
	 multicall/4,
	 multicall/5,
	 async_call/4,
	 yield/1,
	 nb_yield/2,
	 nb_yield/1,
	 parallel_eval/1,
	 pmap/3, pinfo/1, pinfo/2]).

%% Deprecated calls.
-deprecated([{safe_multi_server_call,2},{safe_multi_server_call,3}]).
-export([safe_multi_server_call/2,safe_multi_server_call/3]).

%% gen_server exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
	 terminate/2, code_change/3]).

%% Internals
-export([proxy_user_flush/0]).

%%------------------------------------------------------------------------

-type state() :: gb_tree().

%%------------------------------------------------------------------------

%% Remote execution and broadcasting facility

-spec start() -> {'ok', pid()} | 'ignore' | {'error', term()}.

start() ->
    gen_server:start({local,?NAME}, ?MODULE, [], []).

-spec start_link() -> {'ok', pid()} | 'ignore' | {'error', term()}.

start_link() ->
    gen_server:start_link({local,?NAME}, ?MODULE, [], []).

-spec stop() -> term().

stop() ->
    stop(?NAME).

stop(Rpc) ->
    gen_server:call(Rpc, stop, infinity).

-spec init([]) -> {'ok', state()}.

init([]) ->
    process_flag(trap_exit, true),
    {ok, gb_trees:empty()}.

-spec handle_call(term(), term(), state()) ->
        {'noreply', state()} |
	{'reply', term(), state()} |
	{'stop', 'normal', 'stopped', state()}.

handle_call({call, Mod, Fun, Args, Gleader}, To, S) ->
    handle_call_call(Mod, Fun, Args, Gleader, To, S);
handle_call({block_call, Mod, Fun, Args, Gleader}, _To, S) ->
    MyGL = group_leader(),
    set_group_leader(Gleader),
    Reply = 
	case catch apply(Mod,Fun,Args) of
	    {'EXIT', _} = Exit ->
		{badrpc, Exit};
	    Other ->
		Other
	end,
    group_leader(MyGL, self()), % restore
    {reply, Reply, S};
handle_call(stop, _To, S) ->
    {stop, normal, stopped, S};
handle_call(_, _To, S) ->
    {noreply, S}.  % Ignore !

-spec handle_cast(term(), state()) -> {'noreply', state()}.

handle_cast({cast, Mod, Fun, Args, Gleader}, S) ->
    spawn(fun() ->
		  set_group_leader(Gleader),
		  apply(Mod, Fun, Args)
	  end),
    {noreply, S};
handle_cast(_, S) ->
    {noreply, S}.  % Ignore !

-spec handle_info(term(), state()) -> {'noreply', state()}.

handle_info({'DOWN', _, process, Caller, Reason}, S) ->
    case gb_trees:lookup(Caller, S) of
	{value, To} ->
	    receive
		{Caller, {reply, Reply}} ->
		    gen_server:reply(To, Reply)
	    after 0 ->
		    gen_server:reply(To, {badrpc, {'EXIT', Reason}})
	    end,
	    {noreply, gb_trees:delete(Caller, S)};
	none ->
	    {noreply, S}
    end;
handle_info({Caller, {reply, Reply}}, S) ->
    case gb_trees:lookup(Caller, S) of
	{value, To} ->
	    receive
		{'DOWN', _, process, Caller, _} -> 
		    gen_server:reply(To, Reply),
		    {noreply, gb_trees:delete(Caller, S)}
	    end;
	none ->
	    {noreply, S}
    end;
handle_info({From, {sbcast, Name, Msg}}, S) ->
    case catch Name ! Msg of  %% use catch to get the printout
	{'EXIT', _} ->
	    From ! {?NAME, node(), {nonexisting_name, Name}};
	_ -> 
	    From ! {?NAME, node(), node()}
    end,
    {noreply, S};
handle_info({From, {send, Name, Msg}}, S) ->
    case catch Name ! {From, Msg} of %% use catch to get the printout
	{'EXIT', _} ->
	    From ! {?NAME, node(), {nonexisting_name, Name}};
	_ ->
	    ok    %% It's up to Name to respond !!!!!
    end,
    {noreply, S};
handle_info({From, {call,Mod,Fun,Args,Gleader}}, S) ->
    %% Special for hidden C node's, uugh ...
    handle_call_call(Mod, Fun, Args, Gleader, {From,?NAME}, S);
handle_info(_, S) ->
    {noreply, S}.

-spec terminate(term(), state()) -> 'ok'.

terminate(_, _S) ->
    ok.

-spec code_change(term(), state(), term()) -> {'ok', state()}.

code_change(_, S, _) ->
    {ok, S}.

%%
%% Auxiliary function to avoid a false dialyzer warning -- do not inline
%%
handle_call_call(Mod, Fun, Args, Gleader, To, S) ->
    RpcServer = self(),
    %% Spawn not to block the rpc server.
    {Caller,_} =
	erlang:spawn_monitor(
	  fun () ->
		  set_group_leader(Gleader),
		  Reply = 
		      %% in case some sucker rex'es 
		      %% something that throws
		      case catch apply(Mod, Fun, Args) of
			  {'EXIT', _} = Exit ->
			      {badrpc, Exit};
			  Result ->
			      Result
		      end,
		  RpcServer ! {self(), {reply, Reply}}
	  end),
    {noreply, gb_trees:insert(Caller, To, S)}.


%% RPC aid functions ....

set_group_leader(Gleader) when is_pid(Gleader) -> 
    group_leader(Gleader, self());
set_group_leader(user) -> 
    %% For example, hidden C nodes doesn't want any I/O.
    Gleader = case whereis(user) of
		  Pid when is_pid(Pid) -> Pid;
		  undefined -> proxy_user()
	      end,
    group_leader(Gleader, self()).


%% The 'rex_proxy_user' process serve as group leader for early rpc's that
%% may do IO before the real group leader 'user' has been started (OTP-7903).
proxy_user() ->
    case whereis(rex_proxy_user) of
	Pid when is_pid(Pid) -> Pid;
	undefined ->
	    Pid = spawn(fun() -> proxy_user_loop() end),
	    try register(rex_proxy_user,Pid) of
		true -> Pid
	    catch error:_ -> % spawn race, kill and try again
		exit(Pid,kill),
		proxy_user()
	    end
    end.

proxy_user_loop() ->
    %% Wait for the real 'user' to start
    timer:sleep(200),
    case whereis(user) of
	Pid when is_pid(Pid) -> proxy_user_flush();
	undefined -> proxy_user_loop()
    end.

-spec proxy_user_flush() -> no_return().

proxy_user_flush() ->
    %% Forward all received messages to 'user'
    receive Msg ->
	    user ! Msg
    after 10*1000 ->
	    %% Hibernate but live for ever, as it's not easy to know
	    %% when no more messages will arrive.
	    erlang:hibernate(?MODULE, proxy_user_flush, [])
    end,
    proxy_user_flush().


%% THE rpc client interface

-spec call(node(), atom(), atom(), [term()]) -> term().

call(N,M,F,A) when node() =:= N ->  %% Optimize local call
    local_call(M, F, A);
call(N,M,F,A) ->
    do_call(N, {call,M,F,A,group_leader()}, infinity).

-spec call(node(), atom(), atom(), [term()], timeout()) -> term().

call(N,M,F,A,_Timeout) when node() =:= N ->  %% Optimize local call
    local_call(M,F,A);
call(N,M,F,A,infinity) ->
    do_call(N, {call,M,F,A,group_leader()}, infinity);
call(N,M,F,A,Timeout) when is_integer(Timeout), Timeout >= 0 ->
    do_call(N, {call,M,F,A,group_leader()}, Timeout).

-spec block_call(node(), atom(), atom(), [term()]) -> term().

block_call(N,M,F,A) when node() =:= N -> %% Optimize local call
    local_call(M,F,A);
block_call(N,M,F,A) ->
    do_call(N, {block_call,M,F,A,group_leader()}, infinity).

-spec block_call(node(), atom(), atom(), [term()], timeout()) -> term().

block_call(N,M,F,A,_Timeout) when node() =:= N ->  %% Optimize local call
    local_call(M, F, A);
block_call(N,M,F,A,infinity) ->
    do_call(N, {block_call,M,F,A,group_leader()}, infinity);
block_call(N,M,F,A,Timeout) when is_integer(Timeout), Timeout >= 0 ->
    do_call(N, {block_call,M,F,A,group_leader()}, Timeout).

local_call(M, F, A) when is_atom(M), is_atom(F), is_list(A) ->
    case catch apply(M, F, A) of
	{'EXIT',_}=V -> {badrpc, V};
	Other -> Other
    end.

do_call(Node, Request, infinity) ->
    rpc_check(catch gen_server:call({?NAME,Node}, Request, infinity));
do_call(Node, Request, Timeout) ->
    Tag = make_ref(),
    {Receiver,Mref} =
	erlang:spawn_monitor(
	  fun() ->
		  %% Middleman process. Should be unsensitive to regular
		  %% exit signals.
		  process_flag(trap_exit, true),
		  Result = gen_server:call({?NAME,Node}, Request, Timeout),
		  exit({self(),Tag,Result})
	  end),
    receive
	{'DOWN',Mref,_,_,{Receiver,Tag,Result}} ->
	    rpc_check(Result);
	{'DOWN',Mref,_,_,Reason} ->
	    %% The middleman code failed. Or someone did 
	    %% exit(_, kill) on the middleman process => Reason==killed
	    rpc_check_t({'EXIT',Reason})
    end.

rpc_check_t({'EXIT', {timeout,_}}) -> {badrpc, timeout};
rpc_check_t(X) -> rpc_check(X).
	    
rpc_check({'EXIT', {{nodedown,_},_}}) -> {badrpc, nodedown};
rpc_check({'EXIT', X}) -> exit(X);
rpc_check(X) -> X.


%% This is a real handy function to be used when interacting with
%% a server called Name at node Node, It is assumed that the server
%% Receives messages on the form {From, Request} and replies on the
%% form From ! {ReplyWrapper, Node, Reply}.
%% This function makes such a server call and ensures that that
%% The entire call is packed into an atomic transaction which 
%% either succeeds or fails, i.e. never hangs (unless the server itself hangs).

-spec server_call(node(), atom(), term(), term()) -> term() | {'error', 'nodedown'}.

server_call(Node, Name, ReplyWrapper, Msg) 
  when is_atom(Node), is_atom(Name) ->
    if node() =:= nonode@nohost, Node =/= nonode@nohost ->
	    {error, nodedown};
       true ->
	    Ref = erlang:monitor(process, {Name, Node}),
	    {Name, Node} ! {self(), Msg},
	    receive
		{'DOWN', Ref, _, _, _} ->
		    {error, nodedown};
		{ReplyWrapper, Node, Reply} ->
		    erlang:demonitor(Ref),
		    receive
			{'DOWN', Ref, _, _, _} ->
			    Reply
		    after 0 ->
			    Reply
		    end
	    end
    end.

-spec cast(node(), atom(), atom(), [term()]) -> 'true'.

cast(Node, Mod, Fun, Args) when Node =:= node() ->
    catch spawn(Mod, Fun, Args),
    true;
cast(Node, Mod, Fun, Args) ->
    gen_server:cast({?NAME,Node}, {cast,Mod,Fun,Args,group_leader()}),
    true.


%% Asynchronous broadcast, returns nothing, it's just send'n prey
-spec abcast(atom(), term()) -> 'abcast'.

abcast(Name, Mess) ->
    abcast([node() | nodes()], Name, Mess).

-spec abcast([node()], atom(), term()) -> 'abcast'.

abcast([Node|Tail], Name, Mess) ->
    Dest = {Name,Node},
    case catch erlang:send(Dest, Mess, [noconnect]) of
	noconnect -> spawn(erlang, send, [Dest,Mess]);
	_ -> ok
    end,
    abcast(Tail, Name, Mess);
abcast([], _,_) -> abcast.


%% Syncronous broadcast, returns a list of the nodes which had Name
%% as a registered server. Returns {Goodnodes, Badnodes}.
%% Syncronous in the sense that we know that all servers have received the
%% message when we return from the call, we can't know that they have
%% processed the message though.

-spec sbcast(atom(), term()) -> {[node()], [node()]}.

sbcast(Name, Mess) ->
    sbcast([node() | nodes()], Name, Mess).

-spec sbcast([node()], atom(), term()) -> {[node()], [node()]}.

sbcast(Nodes, Name, Mess) ->
    Monitors = send_nodes(Nodes, ?NAME, {sbcast, Name, Mess}, []),
    rec_nodes(?NAME, Monitors).

-spec eval_everywhere(atom(), atom(), [term()]) -> 'abcast'.

eval_everywhere(Mod, Fun, Args) ->
    eval_everywhere([node() | nodes()] , Mod, Fun, Args).

-spec eval_everywhere([node()], atom(), atom(), [term()]) -> 'abcast'.

eval_everywhere(Nodes, Mod, Fun, Args) ->
    gen_server:abcast(Nodes, ?NAME, {cast,Mod,Fun,Args,group_leader()}).


send_nodes([Node|Tail], Name, Msg, Monitors) when is_atom(Node) ->
    Monitor = start_monitor(Node, Name),
    %% Handle non-existing names in rec_nodes.
    catch {Name, Node} ! {self(), Msg},
    send_nodes(Tail, Name, Msg, [Monitor | Monitors]);
send_nodes([_Node|Tail], Name, Msg, Monitors) ->
    %% Skip non-atom _Node
    send_nodes(Tail, Name, Msg, Monitors);
send_nodes([], _Name,  _Req, Monitors) -> 
    Monitors.

%% Starts a monitor, either the new way, or the old.
%% Assumes that the arguments are atoms.
start_monitor(Node, Name) ->
    if node() =:= nonode@nohost, Node =/= nonode@nohost ->
	    Ref = make_ref(),
	    self() ! {'DOWN', Ref, process, {Name, Node}, noconnection},
	    {Node, Ref};
       true ->
	    {Node,erlang:monitor(process, {Name, Node})}
    end.

%% Cancels a monitor started with Ref=erlang:monitor(_, _),
%% i.e return value {Node, Ref} from start_monitor/2 above.
unmonitor(Ref) when is_reference(Ref) ->
    erlang:demonitor(Ref),
    receive
	{'DOWN', Ref, _, _, _} ->
	    true
    after 0 ->
	    true
    end.


%% Call apply(M,F,A) on all nodes in parallel
-spec multicall(atom(), atom(), [term()]) -> {[_], [node()]}.

multicall(M, F, A) -> 
    multicall(M, F, A, infinity).

-spec multicall([node()], atom(), atom(), [term()])  -> {[_], [node()]}
	     ; (atom(), atom(), [term()], timeout()) -> {[_], [node()]}.

multicall(Nodes, M, F, A) when is_list(Nodes) ->
    multicall(Nodes, M, F, A, infinity);
multicall(M, F, A, Timeout) ->
    multicall([node() | nodes()], M, F, A, Timeout).

-spec multicall([node()], atom(), atom(), [term()], timeout()) -> {[_], [node()]}.

multicall(Nodes, M, F, A, infinity)
  when is_list(Nodes), is_atom(M), is_atom(F), is_list(A) ->
    do_multicall(Nodes, M, F, A, infinity);
multicall(Nodes, M, F, A, Timeout) 
  when is_list(Nodes), is_atom(M), is_atom(F), is_list(A), is_integer(Timeout), 
       Timeout >= 0 ->
    do_multicall(Nodes, M, F, A, Timeout).

do_multicall(Nodes, M, F, A, Timeout) ->
    {Rep,Bad} = gen_server:multi_call(Nodes, ?NAME, 
				      {call, M,F,A, group_leader()}, 
				      Timeout),
    {lists:map(fun({_,R}) -> R end, Rep), Bad}.


%% Send Msg to Name on all nodes, and collect the answers.
%% Return {Replies, Badnodes} where Badnodes is a list of the nodes
%% that failed during the timespan of the call.
%% This function assumes that if we send a request to a server 
%% called Name, the server will reply with a reply
%% on the form {Name, Node, Reply}, otherwise this function will
%% hang forever. 
%% It also assumes that the server receives messages on the form
%% {From, Msg} and then replies as From ! {Name, node(), Reply}.
%%
%% There is no apparent order among the replies.

-spec multi_server_call(atom(), term()) -> {[_], [node()]}.

multi_server_call(Name, Msg) ->
    multi_server_call([node() | nodes()], Name, Msg).

-spec multi_server_call([node()], atom(), term()) -> {[_], [node()]}.

multi_server_call(Nodes, Name, Msg) 
  when is_list(Nodes), is_atom(Name) ->
    Monitors = send_nodes(Nodes, Name, Msg, []),
    rec_nodes(Name, Monitors).

%% Deprecated functions. Were only needed when communicating with R6 nodes.

safe_multi_server_call(Name, Msg) ->
    multi_server_call(Name, Msg).

safe_multi_server_call(Nodes, Name, Msg) ->
    multi_server_call(Nodes, Name, Msg).


rec_nodes(Name, Nodes) -> 
    rec_nodes(Name, Nodes, [], []).

rec_nodes(_Name, [],  Badnodes, Replies) ->
    {Replies, Badnodes};
rec_nodes(Name, [{N,R} | Tail], Badnodes, Replies) ->
    receive
	{'DOWN', R, _, _, _} ->
	    rec_nodes(Name, Tail, [N|Badnodes], Replies);
	{?NAME, N, {nonexisting_name, _}} ->  
	    %% used by sbcast()
	    unmonitor(R),
	    rec_nodes(Name, Tail, [N|Badnodes], Replies);
	{Name, N, Reply} ->  %% Name is bound !!!
	    unmonitor(R),
	    rec_nodes(Name, Tail, Badnodes, [Reply|Replies])
    end.

%% Now for an asynchronous rpc.
%% An asyncronous version of rpc that is faster for series of
%% rpc's towards the same node. I.e. it returns immediately and 
%% it returns a Key that can be used in a subsequent yield(Key).

-spec async_call(node(), atom(), atom(), [term()]) -> pid().

async_call(Node, Mod, Fun, Args) ->
    ReplyTo = self(),
    spawn(
      fun() ->
	      R = call(Node, Mod, Fun, Args),         %% proper rpc
	      ReplyTo ! {self(), {promise_reply, R}}  %% self() is key
      end).

-spec yield(pid()) -> term().

yield(Key) when is_pid(Key) ->
    {value,R} = do_yield(Key, infinity),
    R.

-spec nb_yield(pid(), timeout()) -> {'value', _} | 'timeout'.

nb_yield(Key, infinity=Inf) when is_pid(Key) ->
    do_yield(Key, Inf);
nb_yield(Key, Timeout) when is_pid(Key), is_integer(Timeout), Timeout >= 0 ->
    do_yield(Key, Timeout).

-spec nb_yield(pid()) -> {'value', _} | 'timeout'.

nb_yield(Key) when is_pid(Key) ->
    do_yield(Key, 0).

-spec do_yield(pid(), timeout()) -> {'value', _} | 'timeout'.

do_yield(Key, Timeout) ->
    receive
        {Key,{promise_reply,R}} ->
            {value,R}
        after Timeout ->
            timeout
    end.


%% A parallel network evaluator
%% ArgL === [{M,F,Args},........]
%% Returns a lists of the evaluations in the same order as 
%% given to ArgL
-spec parallel_eval([{atom(), atom(), [_]}]) -> [_].

parallel_eval(ArgL) ->
    Nodes = [node() | nodes()],
    Keys = map_nodes(ArgL,Nodes,Nodes),
    [yield(K) || K <- Keys].

map_nodes([],_,_) -> [];
map_nodes(ArgL,[],Original) ->
    map_nodes(ArgL,Original,Original); 
map_nodes([{M,F,A}|Tail],[Node|MoreNodes], Original) ->
    [?MODULE:async_call(Node,M,F,A) | 
     map_nodes(Tail,MoreNodes,Original)].

%% Parallel version of lists:map/3 with exactly the same 
%% arguments and return value as lists:map/3,
%% except that it calls exit/1 if a network error occurs.
-spec pmap({atom(),atom()}, [term()], [term()]) -> [term()].

pmap({M,F}, As, List) ->
    check(parallel_eval(build_args(M,F,As, List, [])), []).

%% By using an accumulator twice we get the whole thing right
build_args(M,F, As, [Arg|Tail], Acc) ->
    build_args(M,F, As, Tail, [{M,F,[Arg|As]}|Acc]);
build_args(M,F, _, [], Acc) when is_atom(M), is_atom(F) -> Acc.

%% If one single call fails, we fail the whole computation
check([{badrpc, _}|_], _) -> exit(badrpc);
check([X|T], Ack) -> check(T, [X|Ack]);
check([], Ack) -> Ack.


%% location transparent version of process_info
-spec pinfo(pid()) -> [{atom(), _}] | 'undefined'.

pinfo(Pid) when node(Pid) =:= node() ->
    process_info(Pid);
pinfo(Pid) ->
    call(node(Pid), erlang, process_info, [Pid]).

-spec pinfo(pid(), Item) -> {Item, _} | 'undefined' | []
				when is_subtype(Item, atom()).

pinfo(Pid, Item) when node(Pid) =:= node() ->
    process_info(Pid, Item);
pinfo(Pid, Item) ->
    block_call(node(Pid), erlang, process_info, [Pid, Item]).