diff options
-rw-r--r-- | lib/kernel/src/net_kernel.erl | 200 |
1 files changed, 110 insertions, 90 deletions
diff --git a/lib/kernel/src/net_kernel.erl b/lib/kernel/src/net_kernel.erl index 3afaedf274..dec353d6f2 100644 --- a/lib/kernel/src/net_kernel.erl +++ b/lib/kernel/src/net_kernel.erl @@ -1,19 +1,19 @@ %% %% %CopyrightBegin% -%% -%% Copyright Ericsson AB 1996-2009. All Rights Reserved. -%% +%% +%% Copyright Ericsson AB 1996-2010. All Rights Reserved. +%% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in %% compliance with the License. You should have received a copy of the %% Erlang Public License along with this software. If not, it can be %% retrieved online at http://www.erlang.org/. -%% +%% %% Software distributed under the License is distributed on an "AS IS" %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See %% the License for the specific language governing rights and limitations %% under the License. -%% +%% %% %CopyrightEnd% %% -module(net_kernel). @@ -72,7 +72,7 @@ -export([publish_on_node/1, update_publish_nodes/1]). -%% Internal Exports +%% Internal Exports -export([do_spawn/3, spawn_func/6, ticker/2, @@ -94,7 +94,7 @@ connecttime, %% the connection setuptime. connections, %% table of connections conn_owners = [], %% List of connection owner pids, - pend_owners = [], %% List of potential owners + pend_owners = [], %% List of potential owners listen, %% list of #listen allowed, %% list of allowed nodes in a restricted system verbose = 0, %% level of verboseness @@ -232,7 +232,7 @@ do_connect(Node, Type, WaitForBarred) -> %% Type = normal | hidden %% "connected from other end.~n",[Node]), true; {Pid, false} -> - ?connect_failure(Node,{barred_connection, + ?connect_failure(Node,{barred_connection, ets:lookup(sys_dist, Node)}), %%io:format("Net Kernel: barred connection (~p) " %% "- failure.~n",[Node]), @@ -244,12 +244,12 @@ do_connect(Node, Type, WaitForBarred) -> %% Type = normal | hidden {ok, never} -> ?connect_failure(Node,{dist_auto_connect,never}), false; - % This might happen due to connection close + % This might happen due to connection close % not beeing propagated to user space yet. - % Save the day by just not connecting... + % Save the day by just not connecting... {ok, once} when Else =/= [], (hd(Else))#connection.state =:= up -> - ?connect_failure(Node,{barred_connection, + ?connect_failure(Node,{barred_connection, ets:lookup(sys_dist, Node)}), false; _ -> @@ -276,8 +276,8 @@ passive_connect_monitor(Parent, Node) -> Parent ! {self(),true} end end. - -%% If the net_kernel isn't running we ignore all requests to the + +%% If the net_kernel isn't running we ignore all requests to the %% kernel, thus basically accepting them :-) request(Req) -> case whereis(net_kernel) of @@ -302,7 +302,7 @@ start_link([Name, LongOrShortNames]) -> start_link([Name, LongOrShortNames, 15000]); start_link([Name, LongOrShortNames, Ticktime]) -> - case gen_server:start_link({local, net_kernel}, net_kernel, + case gen_server:start_link({local, net_kernel}, net_kernel, {Name, LongOrShortNames, Ticktime}, []) of {ok, Pid} -> {ok, Pid}; @@ -313,7 +313,7 @@ start_link([Name, LongOrShortNames, Ticktime]) -> end. %% auth:get_cookie should only be able to return an atom -%% tuple cookies are unknowns +%% tuple cookies are unknowns init({Name, LongOrShortNames, TickT}) -> process_flag(trap_exit,true), @@ -354,13 +354,13 @@ init({Name, LongOrShortNames, TickT}) -> %% The response is delayed until the connection is up and %% running. %% -handle_call({connect, _, Node}, _From, State) when Node =:= node() -> - {reply, true, State}; +handle_call({connect, _, Node}, From, State) when Node =:= node() -> + async_reply({reply, true, State}, From); handle_call({connect, Type, Node}, From, State) -> verbose({connect, Type, Node}, 1, State), case ets:lookup(sys_dist, Node) of [Conn] when Conn#connection.state =:= up -> - {reply, true, State}; + async_reply({reply, true, State}, From); [Conn] when Conn#connection.state =:= pending -> Waiting = Conn#connection.waiting, ets:insert(sys_dist, Conn#connection{waiting = [From|Waiting]}), @@ -376,74 +376,75 @@ handle_call({connect, Type, Node}, From, State) -> {noreply,State#state{conn_owners=Owners}}; _ -> ?connect_failure(Node, {setup_call, failed}), - {reply, false, State} + async_reply({reply, false, State}, From) end end; %% %% Close the connection to Node. %% -handle_call({disconnect, Node}, _From, State) when Node =:= node() -> - {reply, false, State}; -handle_call({disconnect, Node}, _From, State) -> +handle_call({disconnect, Node}, From, State) when Node =:= node() -> + async_reply({reply, false, State}, From); +handle_call({disconnect, Node}, From, State) -> verbose({disconnect, Node}, 1, State), {Reply, State1} = do_disconnect(Node, State), - {reply, Reply, State1}; + async_reply({reply, Reply, State1}, From); -%% +%% %% The spawn/4 BIF ends up here. -%% +%% handle_call({spawn,M,F,A,Gleader},{From,Tag},State) when is_pid(From) -> do_spawn([no_link,{From,Tag},M,F,A,Gleader],[],State); -%% +%% %% The spawn_link/4 BIF ends up here. -%% +%% handle_call({spawn_link,M,F,A,Gleader},{From,Tag},State) when is_pid(From) -> do_spawn([link,{From,Tag},M,F,A,Gleader],[],State); -%% +%% %% The spawn_opt/5 BIF ends up here. -%% +%% handle_call({spawn_opt,M,F,A,O,L,Gleader},{From,Tag},State) when is_pid(From) -> do_spawn([L,{From,Tag},M,F,A,Gleader],O,State); -%% +%% %% Only allow certain nodes. -%% -handle_call({allow, Nodes}, _From, State) -> +%% +handle_call({allow, Nodes}, From, State) -> case all_atoms(Nodes) of true -> Allowed = State#state.allowed, - {reply,ok,State#state{allowed = Allowed ++ Nodes}}; + async_reply({reply,ok,State#state{allowed = Allowed ++ Nodes}}, + From); false -> - {reply,error,State} + async_reply({reply,error,State}, From) end; -%% +%% %% authentication, used by auth. Simply works as this: %% if the message comes through, the other node IS authorized. -%% -handle_call({is_auth, _Node}, _From, State) -> - {reply,yes,State}; +%% +handle_call({is_auth, _Node}, From, State) -> + async_reply({reply,yes,State}, From); -%% +%% %% Not applicable any longer !? -%% -handle_call({apply,_Mod,_Fun,_Args}, {From,Tag}, State) +%% +handle_call({apply,_Mod,_Fun,_Args}, {From,Tag}, State) when is_pid(From), node(From) =:= node() -> - gen_server:reply({From,Tag}, not_implemented), + async_gen_server_reply({From,Tag}, not_implemented), % Port = State#state.port, % catch apply(Mod,Fun,[Port|Args]), {noreply,State}; -handle_call(longnames, _From, State) -> - {reply, get(longnames), State}; +handle_call(longnames, From, State) -> + async_reply({reply, get(longnames), State}, From); -handle_call({update_publish_nodes, Ns}, _From, State) -> - {reply, ok, State#state{publish_on_nodes = Ns}}; +handle_call({update_publish_nodes, Ns}, From, State) -> + async_reply({reply, ok, State#state{publish_on_nodes = Ns}}, From); -handle_call({publish_on_node, Node}, _From, State) -> +handle_call({publish_on_node, Node}, From, State) -> NewState = case State#state.publish_on_nodes of undefined -> State#state{publish_on_nodes = @@ -457,11 +458,12 @@ handle_call({publish_on_node, Node}, _From, State) -> Nodes -> lists:member(Node, Nodes) end, - {reply, Publish, NewState}; + async_reply({reply, Publish, NewState}, From); -handle_call({verbose, Level}, _From, State) -> - {reply, State#state.verbose, State#state{verbose = Level}}; +handle_call({verbose, Level}, From, State) -> + async_reply({reply, State#state.verbose, State#state{verbose = Level}}, + From); %% %% Set new ticktime @@ -471,16 +473,16 @@ handle_call({verbose, Level}, _From, State) -> %% #tick_change{} record if the ticker process has been upgraded; %% otherwise, an integer or an atom. -handle_call(ticktime, _, #state{tick = #tick{time = T}} = State) -> - {reply, T, State}; -handle_call(ticktime, _, #state{tick = #tick_change{time = T}} = State) -> - {reply, {ongoing_change_to, T}, State}; +handle_call(ticktime, From, #state{tick = #tick{time = T}} = State) -> + async_reply({reply, T, State}, From); +handle_call(ticktime, From, #state{tick = #tick_change{time = T}} = State) -> + async_reply({reply, {ongoing_change_to, T}, State}, From); -handle_call({new_ticktime,T,_TP}, _, #state{tick = #tick{time = T}} = State) -> +handle_call({new_ticktime,T,_TP}, From, #state{tick = #tick{time = T}} = State) -> ?tckr_dbg(no_tick_change), - {reply, unchanged, State}; + async_reply({reply, unchanged, State}, From); -handle_call({new_ticktime,T,TP}, _, #state{tick = #tick{ticker = Tckr, +handle_call({new_ticktime,T,TP}, From, #state{tick = #tick{ticker = Tckr, time = OT}} = State) -> ?tckr_dbg(initiating_tick_change), start_aux_ticker(T, OT, TP), @@ -493,14 +495,15 @@ handle_call({new_ticktime,T,TP}, _, #state{tick = #tick{ticker = Tckr, ?tckr_dbg(shorter_ticktime), shorter end, - {reply, change_initiated, State#state{tick = #tick_change{ticker = Tckr, - time = T, - how = How}}}; + async_reply({reply, change_initiated, + State#state{tick = #tick_change{ticker = Tckr, + time = T, + how = How}}}, From); -handle_call({new_ticktime,_,_}, +handle_call({new_ticktime,From,_}, _, #state{tick = #tick_change{time = T}} = State) -> - {reply, {ongoing_change_to, T}, State}. + async_reply({reply, {ongoing_change_to, T}, State}, From). %% ------------------------------------------------------------ %% handle_cast. @@ -568,7 +571,7 @@ handle_info({accept,AcceptPid,Socket,Family,Proto}, State) -> %% %% A node has successfully been connected. %% -handle_info({SetupPid, {nodeup,Node,Address,Type,Immediate}}, +handle_info({SetupPid, {nodeup,Node,Address,Type,Immediate}}, State) -> case {Immediate, ets:lookup(sys_dist, Node)} of {true, [Conn]} when Conn#connection.state =:= pending, @@ -656,7 +659,7 @@ handle_info({From,registered_send,To,Mess},State) -> send(From,To,Mess), {noreply,State}; -%% badcookies SHOULD not be sent +%% badcookies SHOULD not be sent %% (if someone does erlang:set_cookie(node(),foo) this may be) handle_info({From,badcookie,_To,_Mess}, State) -> error_logger:error_msg("~n** Got OLD cookie from ~w~n", @@ -704,7 +707,7 @@ handle_info(X, State) -> %% 4. The ticker process. %% (5. Garbage pid.) %% -%% The process type function that handled the process throws +%% The process type function that handled the process throws %% the handle_info return value ! %% ----------------------------------------------------------- @@ -994,9 +997,9 @@ ticker(Kernel, Tick) when is_integer(Tick) -> ticker_loop(Kernel, Tick). to_integer(T) when is_integer(T) -> T; -to_integer(T) when is_atom(T) -> +to_integer(T) when is_atom(T) -> list_to_integer(atom_to_list(T)); -to_integer(T) when is_list(T) -> +to_integer(T) when is_list(T) -> list_to_integer(T). ticker_loop(Kernel, Tick) -> @@ -1004,7 +1007,7 @@ ticker_loop(Kernel, Tick) -> {new_ticktime, NewTick} -> ?tckr_dbg({ticker_changed_time, Tick, NewTick}), ?MODULE:ticker_loop(Kernel, NewTick) - after Tick -> + after Tick -> Kernel ! tick, ?MODULE:ticker_loop(Kernel, Tick) end. @@ -1052,7 +1055,7 @@ send(_From,To,Mess) -> -ifdef(UNUSED). safesend(Name,Mess) when is_atom(Name) -> - case whereis(Name) of + case whereis(Name) of undefined -> Mess; P when is_pid(P) -> @@ -1063,11 +1066,12 @@ safesend(Pid, Mess) -> Pid ! Mess. -endif. do_spawn(SpawnFuncArgs, SpawnOpts, State) -> + [_,From|_] = SpawnFuncArgs, case catch spawn_opt(?MODULE, spawn_func, SpawnFuncArgs, SpawnOpts) of - {'EXIT', {Reason,_}} -> - {reply, {'EXIT', {Reason,[]}}, State}; - {'EXIT', Reason} -> - {reply, {'EXIT', {Reason,[]}}, State}; + {'EXIT', {Reason,_}} -> + async_reply({reply, {'EXIT', {Reason,[]}}, State}, From); + {'EXIT', Reason} -> + async_reply({reply, {'EXIT', {Reason,[]}}, State}, From); _ -> {noreply,State} end. @@ -1079,11 +1083,11 @@ do_spawn(SpawnFuncArgs, SpawnOpts, State) -> spawn_func(link,{From,Tag},M,F,A,Gleader) -> link(From), - gen_server:reply({From,Tag},self()), %% ahhh + async_gen_server_reply({From,Tag},self()), %% ahhh group_leader(Gleader,self()), apply(M,F,A); spawn_func(_,{From,Tag},M,F,A,Gleader) -> - gen_server:reply({From,Tag},self()), %% ahhh + async_gen_server_reply({From,Tag},self()), %% ahhh group_leader(Gleader,self()), apply(M,F,A). @@ -1145,7 +1149,7 @@ get_proto_mod(Family,Protocol,[L|Ls]) -> true -> get_proto_mod(Family,Protocol,Ls) end; -get_proto_mod(_Family, _Protocol, []) -> +get_proto_mod(_Family, _Protocol, []) -> error. %% -------- Initialisation functions ------------------------ @@ -1156,9 +1160,9 @@ init_node(Name, LongOrShortNames) -> case create_name(Name, LongOrShortNames, 1) of {ok,Node} -> case start_protos(list_to_atom(NameWithoutHost),Node) of - {ok, Ls} -> + {ok, Ls} -> {ok, Node, Ls}; - Error -> + Error -> Error end; Error -> @@ -1167,9 +1171,9 @@ init_node(Name, LongOrShortNames) -> %% Create the node name create_name(Name, LongOrShortNames, Try) -> - put(longnames, case LongOrShortNames of - shortnames -> false; - longnames -> true + put(longnames, case LongOrShortNames of + shortnames -> false; + longnames -> true end), {Head,Host1} = create_hostpart(Name, LongOrShortNames), case Host1 of @@ -1218,7 +1222,7 @@ create_hostpart(Name, LongOrShortNames) -> {Head,Host1}. %% -%% +%% %% protocol_childspecs() -> case init:get_argument(proto_dist) of @@ -1228,7 +1232,7 @@ protocol_childspecs() -> protocol_childspecs(["inet_tcp"]) end. -protocol_childspecs([]) -> +protocol_childspecs([]) -> []; protocol_childspecs([H|T]) -> Mod = list_to_atom(H ++ "_dist"), @@ -1238,15 +1242,15 @@ protocol_childspecs([H|T]) -> _ -> protocol_childspecs(T) end. - - + + %% %% epmd_module() -> module_name of erl_epmd or similar gen_server_module. %% epmd_module() -> case init:get_argument(epmd_module) of - {ok,[[Module]]} -> + {ok,[[Module]]} -> Module; _ -> erl_epmd @@ -1293,7 +1297,7 @@ start_protos(Name, [Proto | Ps], Node, Ls) -> error_logger:info_msg("Protocol: ~p: not supported~n", [Proto]), start_protos(Name,Ps, Node, Ls); {'EXIT', Reason} -> - error_logger:info_msg("Protocol: ~p: register error: ~p~n", + error_logger:info_msg("Protocol: ~p: register error: ~p~n", [Proto, Reason]), start_protos(Name,Ps, Node, Ls); {error, duplicate_name} -> @@ -1303,7 +1307,7 @@ start_protos(Name, [Proto | Ps], Node, Ls) -> [Proto]), start_protos(Name,Ps, Node, Ls); {error, Reason} -> - error_logger:info_msg("Protocol: ~p: register/listen error: ~p~n", + error_logger:info_msg("Protocol: ~p: register/listen error: ~p~n", [Proto, Reason]), start_protos(Name,Ps, Node, Ls) end; @@ -1409,7 +1413,7 @@ reply_waiting(_Node, Waiting, Rep) -> reply_waiting1(lists:reverse(Waiting), Rep). reply_waiting1([From|W], Rep) -> - gen_server:reply(From, Rep), + async_gen_server_reply(From, Rep), reply_waiting1(W, Rep); reply_waiting1([], _) -> ok. @@ -1455,7 +1459,7 @@ display_info({Node, Info}, {I,O}) -> integer_to_list(In), integer_to_list(Out), Address), {I+In,O+Out}. -fmt_address(undefined) -> +fmt_address(undefined) -> "-"; fmt_address(A) -> case A#net_address.family of @@ -1511,3 +1515,19 @@ verbose(_, _, _) -> getnode(P) when is_pid(P) -> node(P); getnode(P) -> P. + +async_reply({reply, Msg, State}, From) -> + async_gen_server_reply(From, Msg), + {noreply, State}. + +async_gen_server_reply(From, Msg) -> + {Pid, Tag} = From, + M = {Tag, Msg}, + case catch erlang:send(Pid, M, [nosuspend, noconnect]) of + true -> + M; + false -> + spawn(fun() -> gen_server:reply(From, Msg) end); + EXIT -> + EXIT + end. |