diff options
Diffstat (limited to 'lib/kernel/src/rpc.erl')
-rw-r--r-- | lib/kernel/src/rpc.erl | 93 |
1 files changed, 36 insertions, 57 deletions
diff --git a/lib/kernel/src/rpc.erl b/lib/kernel/src/rpc.erl index d3db8eb80a..0e0b7dffa3 100644 --- a/lib/kernel/src/rpc.erl +++ b/lib/kernel/src/rpc.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 1996-2014. All Rights Reserved. +%% Copyright Ericsson AB 1996-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -52,10 +52,6 @@ parallel_eval/1, pmap/3, pinfo/1, pinfo/2]). -%% Deprecated calls. --deprecated([{safe_multi_server_call,2},{safe_multi_server_call,3}]). --export([safe_multi_server_call/2,safe_multi_server_call/3]). - %% gen_server exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -67,21 +63,31 @@ %%------------------------------------------------------------------------ --type state() :: gb_trees:tree(pid(), {pid(), reference()}). +-type state() :: map(). %%------------------------------------------------------------------------ + +%% The rex server may receive a huge amount of +%% messages. Make sure that they are stored off heap to +%% avoid exessive GCs. + +-define(SPAWN_OPTS, [{spawn_opt,[{message_queue_data,off_heap}]}]). + %% Remote execution and broadcasting facility -spec start() -> {'ok', pid()} | 'ignore' | {'error', term()}. start() -> - gen_server:start({local,?NAME}, ?MODULE, [], []). + gen_server:start({local,?NAME}, ?MODULE, [], ?SPAWN_OPTS). -spec start_link() -> {'ok', pid()} | 'ignore' | {'error', term()}. start_link() -> - gen_server:start_link({local,?NAME}, ?MODULE, [], []). + %% The rex server process may receive a huge amount of + %% messages. Make sure that they are stored off heap to + %% avoid exessive GCs. + gen_server:start_link({local,?NAME}, ?MODULE, [], ?SPAWN_OPTS). -spec stop() -> term(). @@ -95,7 +101,7 @@ stop(Rpc) -> init([]) -> process_flag(trap_exit, true), - {ok, gb_trees:empty()}. + {ok, maps:new()}. -spec handle_call(term(), term(), state()) -> {'noreply', state()} | @@ -134,29 +140,15 @@ handle_cast(_, S) -> -spec handle_info(term(), state()) -> {'noreply', state()}. +handle_info({'DOWN', _, process, Caller, normal}, S) -> + {noreply, maps:remove(Caller, S)}; handle_info({'DOWN', _, process, Caller, Reason}, S) -> - case gb_trees:lookup(Caller, S) of - {value, To} -> - receive - {Caller, {reply, Reply}} -> - gen_server:reply(To, Reply) - after 0 -> - gen_server:reply(To, {badrpc, {'EXIT', Reason}}) - end, - {noreply, gb_trees:delete(Caller, S)}; - none -> - {noreply, S} - end; -handle_info({Caller, {reply, Reply}}, S) -> - case gb_trees:lookup(Caller, S) of - {value, To} -> - receive - {'DOWN', _, process, Caller, _} -> - gen_server:reply(To, Reply), - {noreply, gb_trees:delete(Caller, S)} - end; - none -> - {noreply, S} + case maps:get(Caller, S, undefined) of + undefined -> + {noreply, S}; + {_, _} = To -> + gen_server:reply(To, {badrpc, {'EXIT', Reason}}), + {noreply, maps:remove(Caller, S)} end; handle_info({From, {sbcast, Name, Msg}}, S) -> _ = case catch Name ! Msg of %% use catch to get the printout @@ -194,7 +186,6 @@ code_change(_, S, _) -> %% Auxiliary function to avoid a false dialyzer warning -- do not inline %% handle_call_call(Mod, Fun, Args, Gleader, To, S) -> - RpcServer = self(), %% Spawn not to block the rpc server. {Caller,_} = erlang:spawn_monitor( @@ -209,9 +200,9 @@ handle_call_call(Mod, Fun, Args, Gleader, To, S) -> Result -> Result end, - RpcServer ! {self(), {reply, Reply}} + gen_server:reply(To, Reply) end), - {noreply, gb_trees:insert(Caller, To, S)}. + {noreply, maps:put(Caller, To, S)}. %% RPC aid functions .... @@ -357,8 +348,12 @@ do_call(Node, Request, Timeout) -> rpc_check_t({'EXIT', {timeout,_}}) -> {badrpc, timeout}; rpc_check_t(X) -> rpc_check(X). -rpc_check({'EXIT', {{nodedown,_},_}}) -> {badrpc, nodedown}; -rpc_check({'EXIT', X}) -> exit(X); +rpc_check({'EXIT', {{nodedown,_},_}}) -> + {badrpc, nodedown}; +rpc_check({'EXIT', _}=Exit) -> + %% Should only happen if the rex process on the other node + %% died. + {badrpc, Exit}; rpc_check(X) -> X. @@ -587,27 +582,6 @@ multi_server_call(Nodes, Name, Msg) Monitors = send_nodes(Nodes, Name, Msg, []), rec_nodes(Name, Monitors). -%% Deprecated functions. Were only needed when communicating with R6 nodes. - --spec safe_multi_server_call(Name, Msg) -> {Replies, BadNodes} when - Name :: atom(), - Msg :: term(), - Replies :: [Reply :: term()], - BadNodes :: [node()]. - -safe_multi_server_call(Name, Msg) -> - multi_server_call(Name, Msg). - --spec safe_multi_server_call(Nodes, Name, Msg) -> {Replies, BadNodes} when - Nodes :: [node()], - Name :: atom(), - Msg :: term(), - Replies :: [Reply :: term()], - BadNodes :: [node()]. - -safe_multi_server_call(Nodes, Name, Msg) -> - multi_server_call(Nodes, Name, Msg). - rec_nodes(Name, Nodes) -> rec_nodes(Name, Nodes, [], []). @@ -748,6 +722,11 @@ pinfo(Pid) -> -spec pinfo(Pid, Item) -> {Item, Info} | undefined | [] when Pid :: pid(), Item :: atom(), + Info :: term(); + (Pid, ItemList) -> [{Item, Info}] | undefined | [] when + Pid :: pid(), + Item :: atom(), + ItemList :: [Item], Info :: term(). pinfo(Pid, Item) when node(Pid) =:= node() -> |