aboutsummaryrefslogtreecommitdiffstats
path: root/lib/kernel/src/net_kernel.erl
diff options
context:
space:
mode:
Diffstat (limited to 'lib/kernel/src/net_kernel.erl')
-rw-r--r--lib/kernel/src/net_kernel.erl355
1 files changed, 233 insertions, 122 deletions
diff --git a/lib/kernel/src/net_kernel.erl b/lib/kernel/src/net_kernel.erl
index 7da89dd7cb..3cf11fd7b1 100644
--- a/lib/kernel/src/net_kernel.erl
+++ b/lib/kernel/src/net_kernel.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 1996-2017. All Rights Reserved.
+%% Copyright Ericsson AB 1996-2018. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -53,7 +53,7 @@
%% Documented API functions.
--export([allow/1,
+-export([allow/1, allowed/0,
connect_node/1,
monitor_nodes/1,
monitor_nodes/2,
@@ -70,8 +70,8 @@
protocol_childspecs/0,
epmd_module/0]).
--export([connect/1, disconnect/1, hidden_connect/1, passive_cnct/1]).
--export([hidden_connect_node/1]). %% explicit connect
+-export([disconnect/1, passive_cnct/1]).
+-export([hidden_connect_node/1]).
-export([set_net_ticktime/1, set_net_ticktime/2, get_net_ticktime/0]).
-export([node_info/1, node_info/2, nodes_info/0,
@@ -122,6 +122,7 @@
-record(connection, {
node, %% remote node name
+ conn_id, %% Connection identity
state, %% pending | up | up_pending
owner, %% owner pid
pending_owner, %% possible new owner
@@ -170,6 +171,8 @@ kernel_apply(M,F,A) -> request({apply,M,F,A}).
Nodes :: [node()].
allow(Nodes) -> request({allow, Nodes}).
+allowed() -> request(allowed).
+
longnames() -> request(longnames).
-spec stop() -> ok | {error, Reason} when
@@ -221,8 +224,7 @@ get_net_ticktime() ->
Error :: error | {error, term()}.
monitor_nodes(Flag) ->
case catch process_flag(monitor_nodes, Flag) of
- true -> ok;
- false -> ok;
+ N when is_integer(N) -> ok;
_ -> mk_monitor_nodes_error(Flag, [])
end.
@@ -235,8 +237,7 @@ monitor_nodes(Flag) ->
Error :: error | {error, term()}.
monitor_nodes(Flag, Opts) ->
case catch process_flag({monitor_nodes, Opts}, Flag) of
- true -> ok;
- false -> ok;
+ N when is_integer(N) -> ok;
_ -> mk_monitor_nodes_error(Flag, Opts)
end.
@@ -247,14 +248,15 @@ ticktime_res(A) when is_atom(A) -> A.
%% Called though BIF's
-connect(Node) -> do_connect(Node, normal, false).
%%% Long timeout if blocked (== barred), only affects nodes with
%%% {dist_auto_connect, once} set.
-passive_cnct(Node) -> do_connect(Node, normal, true).
-disconnect(Node) -> request({disconnect, Node}).
+passive_cnct(Node) ->
+ case request({passive_cnct, Node}) of
+ ignored -> false;
+ Other -> Other
+ end.
-%% connect but not seen
-hidden_connect(Node) -> do_connect(Node, hidden, false).
+disconnect(Node) -> request({disconnect, Node}).
%% Should this node publish itself on Node?
publish_on_node(Node) when is_atom(Node) ->
@@ -272,67 +274,24 @@ connect_node(Node) when is_atom(Node) ->
hidden_connect_node(Node) when is_atom(Node) ->
request({connect, hidden, Node}).
-do_connect(Node, Type, WaitForBarred) -> %% Type = normal | hidden
- case catch ets:lookup(sys_dist, Node) of
- {'EXIT', _} ->
- ?connect_failure(Node,{table_missing, sys_dist}),
- false;
- [#barred_connection{}] ->
- case WaitForBarred of
- false ->
- false;
- true ->
- Pid = spawn(?MODULE,passive_connect_monitor,[self(),Node]),
- receive
- {Pid, true} ->
- %%io:format("Net Kernel: barred connection (~p) "
- %% "connected from other end.~n",[Node]),
- true;
- {Pid, false} ->
- ?connect_failure(Node,{barred_connection,
- ets:lookup(sys_dist, Node)}),
- %%io:format("Net Kernel: barred connection (~p) "
- %% "- failure.~n",[Node]),
- false
- end
- end;
- Else ->
- case application:get_env(kernel, dist_auto_connect) of
- {ok, never} ->
- ?connect_failure(Node,{dist_auto_connect,never}),
- false;
- % This might happen due to connection close
- % not beeing propagated to user space yet.
- % Save the day by just not connecting...
- {ok, once} when Else =/= [],
- (hd(Else))#connection.state =:= up ->
- ?connect_failure(Node,{barred_connection,
- ets:lookup(sys_dist, Node)}),
- false;
- _ ->
- request({connect, Type, Node})
- end
- end.
-passive_connect_monitor(Parent, Node) ->
+passive_connect_monitor(From, Node) ->
ok = monitor_nodes(true,[{node_type,all}]),
- case lists:member(Node,nodes([connected])) of
- true ->
- ok = monitor_nodes(false,[{node_type,all}]),
- Parent ! {self(),true};
- _ ->
- Ref = make_ref(),
- Tref = erlang:send_after(connecttime(),self(),Ref),
- receive
- Ref ->
- ok = monitor_nodes(false,[{node_type,all}]),
- Parent ! {self(), false};
- {nodeup,Node,_} ->
- ok = monitor_nodes(false,[{node_type,all}]),
- _ = erlang:cancel_timer(Tref),
- Parent ! {self(),true}
- end
- end.
+ Reply = case lists:member(Node,nodes([connected])) of
+ true ->
+ true;
+ _ ->
+ receive
+ {nodeup,Node,_} ->
+ true
+ after connecttime() ->
+ false
+ end
+ end,
+ ok = monitor_nodes(false,[{node_type,all}]),
+ {Pid, Tag} = From,
+ erlang:send(Pid, {Tag, Reply}).
+
%% If the net_kernel isn't running we ignore all requests to the
%% kernel, thus basically accepting them :-)
@@ -393,41 +352,140 @@ init({Name, LongOrShortNames, TickT, CleanHalt}) ->
{stop, Error}
end.
+do_auto_connect_1(Node, ConnId, From, State) ->
+ case ets:lookup(sys_dist, Node) of
+ [#barred_connection{}] ->
+ case ConnId of
+ passive_cnct ->
+ spawn(?MODULE,passive_connect_monitor,[From,Node]),
+ {noreply, State};
+ _ ->
+ erts_internal:abort_connection(Node, ConnId),
+ {reply, false, State}
+ end;
+
+ ConnLookup ->
+ do_auto_connect_2(Node, ConnId, From, State, ConnLookup)
+ end.
+
+do_auto_connect_2(Node, passive_cnct, From, State, ConnLookup) ->
+ try erts_internal:new_connection(Node) of
+ ConnId ->
+ do_auto_connect_2(Node, ConnId, From, State, ConnLookup)
+ catch
+ _:_ ->
+ error_logger:error_msg("~n** Cannot get connection id for node ~w~n",
+ [Node]),
+ {reply, false, State}
+ end;
+do_auto_connect_2(Node, ConnId, From, State, ConnLookup) ->
+ case ConnLookup of
+ [#connection{conn_id=ConnId, state = up}] ->
+ {reply, true, State};
+ [#connection{conn_id=ConnId, waiting=Waiting}=Conn] ->
+ case From of
+ noreply -> ok;
+ _ -> ets:insert(sys_dist, Conn#connection{waiting = [From|Waiting]})
+ end,
+ {noreply, State};
+
+ _ ->
+ case application:get_env(kernel, dist_auto_connect) of
+ {ok, never} ->
+ ?connect_failure(Node,{dist_auto_connect,never}),
+ erts_internal:abort_connection(Node, ConnId),
+ {reply, false, State};
+
+ %% This might happen due to connection close
+ %% not beeing propagated to user space yet.
+ %% Save the day by just not connecting...
+ {ok, once} when ConnLookup =/= [],
+ (hd(ConnLookup))#connection.state =:= up ->
+ ?connect_failure(Node,{barred_connection,
+ ets:lookup(sys_dist, Node)}),
+ erts_internal:abort_connection(Node, ConnId),
+ {reply, false, State};
+ _ ->
+ case setup(Node, ConnId, normal, From, State) of
+ {ok, SetupPid} ->
+ Owners = [{SetupPid, Node} | State#state.conn_owners],
+ {noreply,State#state{conn_owners=Owners}};
+ _Error ->
+ ?connect_failure(Node, {setup_call, failed, _Error}),
+ erts_internal:abort_connection(Node, ConnId),
+ {reply, false, State}
+ end
+ end
+ end.
+
+
+do_explicit_connect([#connection{conn_id = ConnId, state = up}], _, _, ConnId, _From, State) ->
+ {reply, true, State};
+do_explicit_connect([#connection{conn_id = ConnId}=Conn], _, _, ConnId, From, State)
+ when Conn#connection.state =:= pending;
+ Conn#connection.state =:= up_pending ->
+ Waiting = Conn#connection.waiting,
+ ets:insert(sys_dist, Conn#connection{waiting = [From|Waiting]}),
+ {noreply, State};
+do_explicit_connect([#barred_connection{}], Type, Node, ConnId, From , State) ->
+ %% Barred connection only affects auto_connect, ignore it.
+ do_explicit_connect([], Type, Node, ConnId, From , State);
+do_explicit_connect(_ConnLookup, Type, Node, ConnId, From , State) ->
+ case setup(Node,ConnId,Type,From,State) of
+ {ok, SetupPid} ->
+ Owners = [{SetupPid, Node} | State#state.conn_owners],
+ {noreply,State#state{conn_owners=Owners}};
+ _Error ->
+ ?connect_failure(Node, {setup_call, failed, _Error}),
+ {reply, false, State}
+ end.
+
%% ------------------------------------------------------------
%% handle_call.
%% ------------------------------------------------------------
%%
-%% Set up a connection to Node.
-%% The response is delayed until the connection is up and
-%% running.
+%% Passive auto-connect to Node.
+%% The response is delayed until the connection is up and running.
+%%
+handle_call({passive_cnct, Node}, From, State) when Node =:= node() ->
+ async_reply({reply, true, State}, From);
+handle_call({passive_cnct, Node}, From, State) ->
+ verbose({passive_cnct, Node}, 1, State),
+ R = do_auto_connect_1(Node, passive_cnct, From, State),
+ return_call(R, From);
+
+%%
+%% Explicit connect
+%% The response is delayed until the connection is up and running.
%%
handle_call({connect, _, Node}, From, State) when Node =:= node() ->
async_reply({reply, true, State}, From);
handle_call({connect, Type, Node}, From, State) ->
verbose({connect, Type, Node}, 1, State),
- case ets:lookup(sys_dist, Node) of
- [Conn] when Conn#connection.state =:= up ->
- async_reply({reply, true, State}, From);
- [Conn] when Conn#connection.state =:= pending ->
- Waiting = Conn#connection.waiting,
- ets:insert(sys_dist, Conn#connection{waiting = [From|Waiting]}),
- {noreply, State};
- [Conn] when Conn#connection.state =:= up_pending ->
- Waiting = Conn#connection.waiting,
- ets:insert(sys_dist, Conn#connection{waiting = [From|Waiting]}),
- {noreply, State};
- _ ->
- case setup(Node,Type,From,State) of
- {ok, SetupPid} ->
- Owners = [{SetupPid, Node} | State#state.conn_owners],
- {noreply,State#state{conn_owners=Owners}};
- _ ->
- ?connect_failure(Node, {setup_call, failed}),
- async_reply({reply, false, State}, From)
- end
- end;
+ ConnLookup = ets:lookup(sys_dist, Node),
+ R = try erts_internal:new_connection(Node) of
+ ConnId ->
+ R1 = do_explicit_connect(ConnLookup, Type, Node, ConnId, From, State),
+ case R1 of
+ {reply, true, _S} -> %% already connected
+ ok;
+ {noreply, _S} -> %% connection pending
+ ok;
+ {reply, false, _S} -> %% connection refused
+ erts_internal:abort_connection(Node, ConnId)
+ end,
+ R1
+
+ catch
+ _:_ ->
+ error_logger:error_msg("~n** Cannot get connection id for node ~w~n",
+ [Node]),
+ {reply, false, State}
+ end,
+ return_call(R, From);
+
%%
%% Close the connection to Node.
@@ -470,6 +528,9 @@ handle_call({allow, Nodes}, From, State) ->
async_reply({reply,error,State}, From)
end;
+handle_call(allowed, From, #state{allowed = Allowed} = State) ->
+ async_reply({reply,{ok,Allowed},State}, From);
+
%%
%% authentication, used by auth. Simply works as this:
%% if the message comes through, the other node IS authorized.
@@ -634,6 +695,25 @@ terminate(_Reason, State) ->
%% ------------------------------------------------------------
%%
+%% Asynchronous auto connect request
+%%
+handle_info({auto_connect,Node, DHandle}, State) ->
+ verbose({auto_connect, Node, DHandle}, 1, State),
+ ConnId = DHandle,
+ NewState =
+ case do_auto_connect_1(Node, ConnId, noreply, State) of
+ {noreply, S} -> %% Pending connection
+ S;
+
+ {reply, true, S} -> %% Already connected
+ S;
+
+ {reply, false, S} -> %% Connection refused
+ S
+ end,
+ {noreply, NewState};
+
+%%
%% accept a new connection.
%%
handle_info({accept,AcceptPid,Socket,Family,Proto}, State) ->
@@ -713,14 +793,23 @@ handle_info({AcceptPid, {accept_pending,MyNode,Node,Address,Type}}, State) ->
AcceptPid ! {self(), {accept_pending, already_pending}},
{noreply, State};
_ ->
- ets:insert(sys_dist, #connection{node = Node,
- state = pending,
- owner = AcceptPid,
- address = Address,
- type = Type}),
- AcceptPid ! {self(),{accept_pending,ok}},
- Owners = [{AcceptPid,Node} | State#state.conn_owners],
- {noreply, State#state{conn_owners = Owners}}
+ try erts_internal:new_connection(Node) of
+ ConnId ->
+ ets:insert(sys_dist, #connection{node = Node,
+ conn_id = ConnId,
+ state = pending,
+ owner = AcceptPid,
+ address = Address,
+ type = Type}),
+ AcceptPid ! {self(),{accept_pending,ok}},
+ Owners = [{AcceptPid,Node} | State#state.conn_owners],
+ {noreply, State#state{conn_owners = Owners}}
+ catch
+ _:_ ->
+ error_logger:error_msg("~n** Cannot get connection id for node ~w~n",
+ [Node]),
+ AcceptPid ! {self(),{accept_pending,nok_pending}}
+ end
end;
handle_info({SetupPid, {is_pending, Node}}, State) ->
@@ -906,6 +995,7 @@ pending_nodedown(Conn, Node, Type, State) ->
% Don't bar connections that have never been alive
%mark_sys_dist_nodedown(Node),
% - instead just delete the node:
+ erts_internal:abort_connection(Node, Conn#connection.conn_id),
ets:delete(sys_dist, Node),
reply_waiting(Node,Conn#connection.waiting, false),
case Type of
@@ -920,7 +1010,9 @@ up_pending_nodedown(Conn, Node, _Reason, _Type, State) ->
AcceptPid = Conn#connection.pending_owner,
Owners = State#state.conn_owners,
Pend = lists:keydelete(AcceptPid, 1, State#state.pend_owners),
+ erts_internal:abort_connection(Node, Conn#connection.conn_id),
Conn1 = Conn#connection { owner = AcceptPid,
+ conn_id = erts_internal:new_connection(Node),
pending_owner = undefined,
state = pending },
ets:insert(sys_dist, Conn1),
@@ -928,15 +1020,16 @@ up_pending_nodedown(Conn, Node, _Reason, _Type, State) ->
State#state{conn_owners = [{AcceptPid,Node}|Owners], pend_owners = Pend}.
-up_nodedown(_Conn, Node, _Reason, Type, State) ->
- mark_sys_dist_nodedown(Node),
+up_nodedown(Conn, Node, _Reason, Type, State) ->
+ mark_sys_dist_nodedown(Conn, Node),
case Type of
normal -> ?nodedown(Node, State);
_ -> ok
end,
State.
-mark_sys_dist_nodedown(Node) ->
+mark_sys_dist_nodedown(Conn, Node) ->
+ erts_internal:abort_connection(Node, Conn#connection.conn_id),
case application:get_env(kernel, dist_auto_connect) of
{ok, once} ->
ets:insert(sys_dist, #barred_connection{node = Node});
@@ -1179,15 +1272,8 @@ spawn_func(_,{From,Tag},M,F,A,Gleader) ->
%% Set up connection to a new node.
%% -----------------------------------------------------------
-setup(Node,Type,From,State) ->
- Allowed = State#state.allowed,
- case lists:member(Node, Allowed) of
- false when Allowed =/= [] ->
- error_msg("** Connection attempt with "
- "disallowed node ~w ** ~n", [Node]),
- {error, bad_node};
- _ ->
- case select_mod(Node, State#state.listen) of
+setup(Node, ConnId, Type, From, State) ->
+ case setup_check(Node, State) of
{ok, L} ->
Mod = L#listen.module,
LAddr = L#listen.address,
@@ -1200,18 +1286,38 @@ setup(Node,Type,From,State) ->
Addr = LAddr#net_address {
address = undefined,
host = undefined },
+ Waiting = case From of
+ noreply -> [];
+ _ -> [From]
+ end,
ets:insert(sys_dist, #connection{node = Node,
+ conn_id = ConnId,
state = pending,
owner = Pid,
- waiting = [From],
+ waiting = Waiting,
address = Addr,
type = normal}),
{ok, Pid};
Error ->
Error
- end
end.
+setup_check(Node, State) ->
+ Allowed = State#state.allowed,
+ case lists:member(Node, Allowed) of
+ false when Allowed =/= [] ->
+ error_msg("** Connection attempt with "
+ "disallowed node ~w ** ~n", [Node]),
+ {error, bad_node};
+ _ ->
+ case select_mod(Node, State#state.listen) of
+ {ok, _L}=OK -> OK;
+ Error -> Error
+ end
+ end.
+
+
+
%%
%% Find a module that is willing to handle connection setup to Node
%%
@@ -1652,6 +1758,11 @@ verbose(_, _, _) ->
getnode(P) when is_pid(P) -> node(P);
getnode(P) -> P.
+return_call({noreply, _State}=R, _From) ->
+ R;
+return_call(R, From) ->
+ async_reply(R, From).
+
async_reply({reply, Msg, State}, From) ->
async_gen_server_reply(From, Msg),
{noreply, State}.
@@ -1659,16 +1770,16 @@ async_reply({reply, Msg, State}, From) ->
async_gen_server_reply(From, Msg) ->
{Pid, Tag} = From,
M = {Tag, Msg},
- case catch erlang:send(Pid, M, [nosuspend, noconnect]) of
+ try erlang:send(Pid, M, [nosuspend, noconnect]) of
ok ->
ok;
nosuspend ->
_ = spawn(fun() -> catch erlang:send(Pid, M, [noconnect]) end),
ok;
noconnect ->
- ok; % The gen module takes care of this case.
- {'EXIT', _} ->
- ok
+ ok % The gen module takes care of this case.
+ catch
+ _:_ -> ok
end.
call_owner(Owner, Msg) ->