aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--lib/kernel/src/net_kernel.erl200
1 files changed, 110 insertions, 90 deletions
diff --git a/lib/kernel/src/net_kernel.erl b/lib/kernel/src/net_kernel.erl
index 3afaedf274..dec353d6f2 100644
--- a/lib/kernel/src/net_kernel.erl
+++ b/lib/kernel/src/net_kernel.erl
@@ -1,19 +1,19 @@
%%
%% %CopyrightBegin%
-%%
-%% Copyright Ericsson AB 1996-2009. All Rights Reserved.
-%%
+%%
+%% Copyright Ericsson AB 1996-2010. All Rights Reserved.
+%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
-%%
+%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
-%%
+%%
%% %CopyrightEnd%
%%
-module(net_kernel).
@@ -72,7 +72,7 @@
-export([publish_on_node/1, update_publish_nodes/1]).
-%% Internal Exports
+%% Internal Exports
-export([do_spawn/3,
spawn_func/6,
ticker/2,
@@ -94,7 +94,7 @@
connecttime, %% the connection setuptime.
connections, %% table of connections
conn_owners = [], %% List of connection owner pids,
- pend_owners = [], %% List of potential owners
+ pend_owners = [], %% List of potential owners
listen, %% list of #listen
allowed, %% list of allowed nodes in a restricted system
verbose = 0, %% level of verboseness
@@ -232,7 +232,7 @@ do_connect(Node, Type, WaitForBarred) -> %% Type = normal | hidden
%% "connected from other end.~n",[Node]),
true;
{Pid, false} ->
- ?connect_failure(Node,{barred_connection,
+ ?connect_failure(Node,{barred_connection,
ets:lookup(sys_dist, Node)}),
%%io:format("Net Kernel: barred connection (~p) "
%% "- failure.~n",[Node]),
@@ -244,12 +244,12 @@ do_connect(Node, Type, WaitForBarred) -> %% Type = normal | hidden
{ok, never} ->
?connect_failure(Node,{dist_auto_connect,never}),
false;
- % This might happen due to connection close
+ % This might happen due to connection close
% not beeing propagated to user space yet.
- % Save the day by just not connecting...
+ % Save the day by just not connecting...
{ok, once} when Else =/= [],
(hd(Else))#connection.state =:= up ->
- ?connect_failure(Node,{barred_connection,
+ ?connect_failure(Node,{barred_connection,
ets:lookup(sys_dist, Node)}),
false;
_ ->
@@ -276,8 +276,8 @@ passive_connect_monitor(Parent, Node) ->
Parent ! {self(),true}
end
end.
-
-%% If the net_kernel isn't running we ignore all requests to the
+
+%% If the net_kernel isn't running we ignore all requests to the
%% kernel, thus basically accepting them :-)
request(Req) ->
case whereis(net_kernel) of
@@ -302,7 +302,7 @@ start_link([Name, LongOrShortNames]) ->
start_link([Name, LongOrShortNames, 15000]);
start_link([Name, LongOrShortNames, Ticktime]) ->
- case gen_server:start_link({local, net_kernel}, net_kernel,
+ case gen_server:start_link({local, net_kernel}, net_kernel,
{Name, LongOrShortNames, Ticktime}, []) of
{ok, Pid} ->
{ok, Pid};
@@ -313,7 +313,7 @@ start_link([Name, LongOrShortNames, Ticktime]) ->
end.
%% auth:get_cookie should only be able to return an atom
-%% tuple cookies are unknowns
+%% tuple cookies are unknowns
init({Name, LongOrShortNames, TickT}) ->
process_flag(trap_exit,true),
@@ -354,13 +354,13 @@ init({Name, LongOrShortNames, TickT}) ->
%% The response is delayed until the connection is up and
%% running.
%%
-handle_call({connect, _, Node}, _From, State) when Node =:= node() ->
- {reply, true, State};
+handle_call({connect, _, Node}, From, State) when Node =:= node() ->
+ async_reply({reply, true, State}, From);
handle_call({connect, Type, Node}, From, State) ->
verbose({connect, Type, Node}, 1, State),
case ets:lookup(sys_dist, Node) of
[Conn] when Conn#connection.state =:= up ->
- {reply, true, State};
+ async_reply({reply, true, State}, From);
[Conn] when Conn#connection.state =:= pending ->
Waiting = Conn#connection.waiting,
ets:insert(sys_dist, Conn#connection{waiting = [From|Waiting]}),
@@ -376,74 +376,75 @@ handle_call({connect, Type, Node}, From, State) ->
{noreply,State#state{conn_owners=Owners}};
_ ->
?connect_failure(Node, {setup_call, failed}),
- {reply, false, State}
+ async_reply({reply, false, State}, From)
end
end;
%%
%% Close the connection to Node.
%%
-handle_call({disconnect, Node}, _From, State) when Node =:= node() ->
- {reply, false, State};
-handle_call({disconnect, Node}, _From, State) ->
+handle_call({disconnect, Node}, From, State) when Node =:= node() ->
+ async_reply({reply, false, State}, From);
+handle_call({disconnect, Node}, From, State) ->
verbose({disconnect, Node}, 1, State),
{Reply, State1} = do_disconnect(Node, State),
- {reply, Reply, State1};
+ async_reply({reply, Reply, State1}, From);
-%%
+%%
%% The spawn/4 BIF ends up here.
-%%
+%%
handle_call({spawn,M,F,A,Gleader},{From,Tag},State) when is_pid(From) ->
do_spawn([no_link,{From,Tag},M,F,A,Gleader],[],State);
-%%
+%%
%% The spawn_link/4 BIF ends up here.
-%%
+%%
handle_call({spawn_link,M,F,A,Gleader},{From,Tag},State) when is_pid(From) ->
do_spawn([link,{From,Tag},M,F,A,Gleader],[],State);
-%%
+%%
%% The spawn_opt/5 BIF ends up here.
-%%
+%%
handle_call({spawn_opt,M,F,A,O,L,Gleader},{From,Tag},State) when is_pid(From) ->
do_spawn([L,{From,Tag},M,F,A,Gleader],O,State);
-%%
+%%
%% Only allow certain nodes.
-%%
-handle_call({allow, Nodes}, _From, State) ->
+%%
+handle_call({allow, Nodes}, From, State) ->
case all_atoms(Nodes) of
true ->
Allowed = State#state.allowed,
- {reply,ok,State#state{allowed = Allowed ++ Nodes}};
+ async_reply({reply,ok,State#state{allowed = Allowed ++ Nodes}},
+ From);
false ->
- {reply,error,State}
+ async_reply({reply,error,State}, From)
end;
-%%
+%%
%% authentication, used by auth. Simply works as this:
%% if the message comes through, the other node IS authorized.
-%%
-handle_call({is_auth, _Node}, _From, State) ->
- {reply,yes,State};
+%%
+handle_call({is_auth, _Node}, From, State) ->
+ async_reply({reply,yes,State}, From);
-%%
+%%
%% Not applicable any longer !?
-%%
-handle_call({apply,_Mod,_Fun,_Args}, {From,Tag}, State)
+%%
+handle_call({apply,_Mod,_Fun,_Args}, {From,Tag}, State)
when is_pid(From), node(From) =:= node() ->
- gen_server:reply({From,Tag}, not_implemented),
+ async_gen_server_reply({From,Tag}, not_implemented),
% Port = State#state.port,
% catch apply(Mod,Fun,[Port|Args]),
{noreply,State};
-handle_call(longnames, _From, State) ->
- {reply, get(longnames), State};
+handle_call(longnames, From, State) ->
+ async_reply({reply, get(longnames), State}, From);
-handle_call({update_publish_nodes, Ns}, _From, State) ->
- {reply, ok, State#state{publish_on_nodes = Ns}};
+handle_call({update_publish_nodes, Ns}, From, State) ->
+ async_reply({reply, ok, State#state{publish_on_nodes = Ns}}, From);
-handle_call({publish_on_node, Node}, _From, State) ->
+handle_call({publish_on_node, Node}, From, State) ->
NewState = case State#state.publish_on_nodes of
undefined ->
State#state{publish_on_nodes =
@@ -457,11 +458,12 @@ handle_call({publish_on_node, Node}, _From, State) ->
Nodes ->
lists:member(Node, Nodes)
end,
- {reply, Publish, NewState};
+ async_reply({reply, Publish, NewState}, From);
-handle_call({verbose, Level}, _From, State) ->
- {reply, State#state.verbose, State#state{verbose = Level}};
+handle_call({verbose, Level}, From, State) ->
+ async_reply({reply, State#state.verbose, State#state{verbose = Level}},
+ From);
%%
%% Set new ticktime
@@ -471,16 +473,16 @@ handle_call({verbose, Level}, _From, State) ->
%% #tick_change{} record if the ticker process has been upgraded;
%% otherwise, an integer or an atom.
-handle_call(ticktime, _, #state{tick = #tick{time = T}} = State) ->
- {reply, T, State};
-handle_call(ticktime, _, #state{tick = #tick_change{time = T}} = State) ->
- {reply, {ongoing_change_to, T}, State};
+handle_call(ticktime, From, #state{tick = #tick{time = T}} = State) ->
+ async_reply({reply, T, State}, From);
+handle_call(ticktime, From, #state{tick = #tick_change{time = T}} = State) ->
+ async_reply({reply, {ongoing_change_to, T}, State}, From);
-handle_call({new_ticktime,T,_TP}, _, #state{tick = #tick{time = T}} = State) ->
+handle_call({new_ticktime,T,_TP}, From, #state{tick = #tick{time = T}} = State) ->
?tckr_dbg(no_tick_change),
- {reply, unchanged, State};
+ async_reply({reply, unchanged, State}, From);
-handle_call({new_ticktime,T,TP}, _, #state{tick = #tick{ticker = Tckr,
+handle_call({new_ticktime,T,TP}, From, #state{tick = #tick{ticker = Tckr,
time = OT}} = State) ->
?tckr_dbg(initiating_tick_change),
start_aux_ticker(T, OT, TP),
@@ -493,14 +495,15 @@ handle_call({new_ticktime,T,TP}, _, #state{tick = #tick{ticker = Tckr,
?tckr_dbg(shorter_ticktime),
shorter
end,
- {reply, change_initiated, State#state{tick = #tick_change{ticker = Tckr,
- time = T,
- how = How}}};
+ async_reply({reply, change_initiated,
+ State#state{tick = #tick_change{ticker = Tckr,
+ time = T,
+ how = How}}}, From);
-handle_call({new_ticktime,_,_},
+handle_call({new_ticktime,From,_},
_,
#state{tick = #tick_change{time = T}} = State) ->
- {reply, {ongoing_change_to, T}, State}.
+ async_reply({reply, {ongoing_change_to, T}, State}, From).
%% ------------------------------------------------------------
%% handle_cast.
@@ -568,7 +571,7 @@ handle_info({accept,AcceptPid,Socket,Family,Proto}, State) ->
%%
%% A node has successfully been connected.
%%
-handle_info({SetupPid, {nodeup,Node,Address,Type,Immediate}},
+handle_info({SetupPid, {nodeup,Node,Address,Type,Immediate}},
State) ->
case {Immediate, ets:lookup(sys_dist, Node)} of
{true, [Conn]} when Conn#connection.state =:= pending,
@@ -656,7 +659,7 @@ handle_info({From,registered_send,To,Mess},State) ->
send(From,To,Mess),
{noreply,State};
-%% badcookies SHOULD not be sent
+%% badcookies SHOULD not be sent
%% (if someone does erlang:set_cookie(node(),foo) this may be)
handle_info({From,badcookie,_To,_Mess}, State) ->
error_logger:error_msg("~n** Got OLD cookie from ~w~n",
@@ -704,7 +707,7 @@ handle_info(X, State) ->
%% 4. The ticker process.
%% (5. Garbage pid.)
%%
-%% The process type function that handled the process throws
+%% The process type function that handled the process throws
%% the handle_info return value !
%% -----------------------------------------------------------
@@ -994,9 +997,9 @@ ticker(Kernel, Tick) when is_integer(Tick) ->
ticker_loop(Kernel, Tick).
to_integer(T) when is_integer(T) -> T;
-to_integer(T) when is_atom(T) ->
+to_integer(T) when is_atom(T) ->
list_to_integer(atom_to_list(T));
-to_integer(T) when is_list(T) ->
+to_integer(T) when is_list(T) ->
list_to_integer(T).
ticker_loop(Kernel, Tick) ->
@@ -1004,7 +1007,7 @@ ticker_loop(Kernel, Tick) ->
{new_ticktime, NewTick} ->
?tckr_dbg({ticker_changed_time, Tick, NewTick}),
?MODULE:ticker_loop(Kernel, NewTick)
- after Tick ->
+ after Tick ->
Kernel ! tick,
?MODULE:ticker_loop(Kernel, Tick)
end.
@@ -1052,7 +1055,7 @@ send(_From,To,Mess) ->
-ifdef(UNUSED).
safesend(Name,Mess) when is_atom(Name) ->
- case whereis(Name) of
+ case whereis(Name) of
undefined ->
Mess;
P when is_pid(P) ->
@@ -1063,11 +1066,12 @@ safesend(Pid, Mess) -> Pid ! Mess.
-endif.
do_spawn(SpawnFuncArgs, SpawnOpts, State) ->
+ [_,From|_] = SpawnFuncArgs,
case catch spawn_opt(?MODULE, spawn_func, SpawnFuncArgs, SpawnOpts) of
- {'EXIT', {Reason,_}} ->
- {reply, {'EXIT', {Reason,[]}}, State};
- {'EXIT', Reason} ->
- {reply, {'EXIT', {Reason,[]}}, State};
+ {'EXIT', {Reason,_}} ->
+ async_reply({reply, {'EXIT', {Reason,[]}}, State}, From);
+ {'EXIT', Reason} ->
+ async_reply({reply, {'EXIT', {Reason,[]}}, State}, From);
_ ->
{noreply,State}
end.
@@ -1079,11 +1083,11 @@ do_spawn(SpawnFuncArgs, SpawnOpts, State) ->
spawn_func(link,{From,Tag},M,F,A,Gleader) ->
link(From),
- gen_server:reply({From,Tag},self()), %% ahhh
+ async_gen_server_reply({From,Tag},self()), %% ahhh
group_leader(Gleader,self()),
apply(M,F,A);
spawn_func(_,{From,Tag},M,F,A,Gleader) ->
- gen_server:reply({From,Tag},self()), %% ahhh
+ async_gen_server_reply({From,Tag},self()), %% ahhh
group_leader(Gleader,self()),
apply(M,F,A).
@@ -1145,7 +1149,7 @@ get_proto_mod(Family,Protocol,[L|Ls]) ->
true ->
get_proto_mod(Family,Protocol,Ls)
end;
-get_proto_mod(_Family, _Protocol, []) ->
+get_proto_mod(_Family, _Protocol, []) ->
error.
%% -------- Initialisation functions ------------------------
@@ -1156,9 +1160,9 @@ init_node(Name, LongOrShortNames) ->
case create_name(Name, LongOrShortNames, 1) of
{ok,Node} ->
case start_protos(list_to_atom(NameWithoutHost),Node) of
- {ok, Ls} ->
+ {ok, Ls} ->
{ok, Node, Ls};
- Error ->
+ Error ->
Error
end;
Error ->
@@ -1167,9 +1171,9 @@ init_node(Name, LongOrShortNames) ->
%% Create the node name
create_name(Name, LongOrShortNames, Try) ->
- put(longnames, case LongOrShortNames of
- shortnames -> false;
- longnames -> true
+ put(longnames, case LongOrShortNames of
+ shortnames -> false;
+ longnames -> true
end),
{Head,Host1} = create_hostpart(Name, LongOrShortNames),
case Host1 of
@@ -1218,7 +1222,7 @@ create_hostpart(Name, LongOrShortNames) ->
{Head,Host1}.
%%
-%%
+%%
%%
protocol_childspecs() ->
case init:get_argument(proto_dist) of
@@ -1228,7 +1232,7 @@ protocol_childspecs() ->
protocol_childspecs(["inet_tcp"])
end.
-protocol_childspecs([]) ->
+protocol_childspecs([]) ->
[];
protocol_childspecs([H|T]) ->
Mod = list_to_atom(H ++ "_dist"),
@@ -1238,15 +1242,15 @@ protocol_childspecs([H|T]) ->
_ ->
protocol_childspecs(T)
end.
-
-
+
+
%%
%% epmd_module() -> module_name of erl_epmd or similar gen_server_module.
%%
epmd_module() ->
case init:get_argument(epmd_module) of
- {ok,[[Module]]} ->
+ {ok,[[Module]]} ->
Module;
_ ->
erl_epmd
@@ -1293,7 +1297,7 @@ start_protos(Name, [Proto | Ps], Node, Ls) ->
error_logger:info_msg("Protocol: ~p: not supported~n", [Proto]),
start_protos(Name,Ps, Node, Ls);
{'EXIT', Reason} ->
- error_logger:info_msg("Protocol: ~p: register error: ~p~n",
+ error_logger:info_msg("Protocol: ~p: register error: ~p~n",
[Proto, Reason]),
start_protos(Name,Ps, Node, Ls);
{error, duplicate_name} ->
@@ -1303,7 +1307,7 @@ start_protos(Name, [Proto | Ps], Node, Ls) ->
[Proto]),
start_protos(Name,Ps, Node, Ls);
{error, Reason} ->
- error_logger:info_msg("Protocol: ~p: register/listen error: ~p~n",
+ error_logger:info_msg("Protocol: ~p: register/listen error: ~p~n",
[Proto, Reason]),
start_protos(Name,Ps, Node, Ls)
end;
@@ -1409,7 +1413,7 @@ reply_waiting(_Node, Waiting, Rep) ->
reply_waiting1(lists:reverse(Waiting), Rep).
reply_waiting1([From|W], Rep) ->
- gen_server:reply(From, Rep),
+ async_gen_server_reply(From, Rep),
reply_waiting1(W, Rep);
reply_waiting1([], _) ->
ok.
@@ -1455,7 +1459,7 @@ display_info({Node, Info}, {I,O}) ->
integer_to_list(In), integer_to_list(Out), Address),
{I+In,O+Out}.
-fmt_address(undefined) ->
+fmt_address(undefined) ->
"-";
fmt_address(A) ->
case A#net_address.family of
@@ -1511,3 +1515,19 @@ verbose(_, _, _) ->
getnode(P) when is_pid(P) -> node(P);
getnode(P) -> P.
+
+async_reply({reply, Msg, State}, From) ->
+ async_gen_server_reply(From, Msg),
+ {noreply, State}.
+
+async_gen_server_reply(From, Msg) ->
+ {Pid, Tag} = From,
+ M = {Tag, Msg},
+ case catch erlang:send(Pid, M, [nosuspend, noconnect]) of
+ true ->
+ M;
+ false ->
+ spawn(fun() -> gen_server:reply(From, Msg) end);
+ EXIT ->
+ EXIT
+ end.