aboutsummaryrefslogtreecommitdiffstats
path: root/lib/kernel/src
diff options
context:
space:
mode:
authorSverker Eriksson <[email protected]>2017-07-14 19:34:54 +0200
committerSverker Eriksson <[email protected]>2017-11-15 20:10:33 +0100
commitf89fb92384280e2939414287a2ecb8f86a199318 (patch)
treefacf1d7b31b56cc25575d6cb9a01a69e2e4317d0 /lib/kernel/src
parentfe720f6b2051c9bf8ff303f857c3db0a84b1c050 (diff)
downloadotp-f89fb92384280e2939414287a2ecb8f86a199318.tar.gz
otp-f89fb92384280e2939414287a2ecb8f86a199318.tar.bz2
otp-f89fb92384280e2939414287a2ecb8f86a199318.zip
erts: Introduce asynchronous auto-connect
Diffstat (limited to 'lib/kernel/src')
-rw-r--r--lib/kernel/src/net_kernel.erl210
1 files changed, 159 insertions, 51 deletions
diff --git a/lib/kernel/src/net_kernel.erl b/lib/kernel/src/net_kernel.erl
index c68036a291..f929e4bf11 100644
--- a/lib/kernel/src/net_kernel.erl
+++ b/lib/kernel/src/net_kernel.erl
@@ -122,6 +122,7 @@
-record(connection, {
node, %% remote node name
+ conn_id, %% Connection identity
state, %% pending | up | up_pending
owner, %% owner pid
pending_owner, %% possible new owner
@@ -362,54 +363,33 @@ init({Name, LongOrShortNames, TickT, CleanHalt}) ->
end.
-handle_connect([Conn], _, _, From, State) when Conn#connection.state =:= up ->
- async_reply({reply, true, State}, From);
-handle_connect([Conn], _, _, From, State) when Conn#connection.state =:= pending;
- Conn#connection.state =:= up_pending ->
- Waiting = Conn#connection.waiting,
- ets:insert(sys_dist, Conn#connection{waiting = [From|Waiting]}),
- {noreply, State};
-handle_connect(_, Type, Node, From , State) ->
- case setup(Node,Type,From,State) of
- {ok, SetupPid} ->
- Owners = [{SetupPid, Node} | State#state.conn_owners],
- {noreply,State#state{conn_owners=Owners}};
- _Error ->
- ?connect_failure(Node, {setup_call, failed, _Error}),
- async_reply({reply, false, State}, From)
- end.
-
-%% ------------------------------------------------------------
-%% handle_call.
-%% ------------------------------------------------------------
-
-%%
-%% Auto-connect to Node.
-%% The response is delayed until the connection is up and
-%% running.
-%%
-handle_call({auto_connect, _, Node, _}, From, State) when Node =:= node() ->
- async_reply({reply, true, State}, From);
-handle_call({auto_connect, Type, Node, WaitForBarred}, From, State) ->
- verbose({auto_connect, Type, Node, WaitForBarred}, 1, State),
-
+handle_auto_connect(Type, Node, ConnId, WaitForBarred, From, State) ->
ConnLookup = ets:lookup(sys_dist, Node),
case ConnLookup of
[#barred_connection{}] ->
case WaitForBarred of
false ->
- async_reply({reply, false, State}, From);
+ {reply, false, State};
true ->
spawn(?MODULE,passive_connect_monitor,[From,Node]),
{noreply, State}
end;
+ [#connection{conn_id=ConnId, state = up}] ->
+ {reply, true, State};
+ [#connection{conn_id=ConnId, waiting=Waiting}=Conn] ->
+ case From of
+ noreply -> ok;
+ _ -> ets:insert(sys_dist, Conn#connection{waiting = [From|Waiting]})
+ end,
+ {noreply, State};
+
_ ->
case application:get_env(kernel, dist_auto_connect) of
{ok, never} ->
?connect_failure(Node,{dist_auto_connect,never}),
- async_reply({reply, false, State}, From);
+ {reply, false, State};
%% This might happen due to connection close
%% not beeing propagated to user space yet.
@@ -418,11 +398,78 @@ handle_call({auto_connect, Type, Node, WaitForBarred}, From, State) ->
(hd(ConnLookup))#connection.state =:= up ->
?connect_failure(Node,{barred_connection,
ets:lookup(sys_dist, Node)}),
- async_reply({reply, false, State}, From);
+ {reply, false, State};
_ ->
- handle_connect(ConnLookup, Type, Node, From, State)
+ case setup(ConnLookup, Node,ConnId,Type,From,State) of
+ {ok, SetupPid} ->
+ Owners = [{SetupPid, Node} | State#state.conn_owners],
+ {noreply,State#state{conn_owners=Owners}};
+ _Error ->
+ ?connect_failure(Node, {setup_call, failed, _Error}),
+ {reply, false, State}
+ end
end
- end;
+ end.
+
+
+handle_connect([#connection{conn_id = ConnId, state = up}], _, _, ConnId, _From, State) ->
+ {reply, true, State};
+handle_connect([#connection{conn_id = ConnId}=Conn], _, _, ConnId, From, State)
+ when Conn#connection.state =:= pending;
+ Conn#connection.state =:= up_pending ->
+ Waiting = Conn#connection.waiting,
+ ets:insert(sys_dist, Conn#connection{waiting = [From|Waiting]}),
+ {noreply, State};
+handle_connect([#barred_connection{}], Type, Node, ConnId, From , State) ->
+ %% Barred connection only affects auto_connect, ignore it.
+ handle_connect([], Type, Node, ConnId, From , State);
+handle_connect(ConnLookup, Type, Node, ConnId, From , State) ->
+ case setup(ConnLookup, Node,ConnId,Type,From,State) of
+ {ok, SetupPid} ->
+ Owners = [{SetupPid, Node} | State#state.conn_owners],
+ {noreply,State#state{conn_owners=Owners}};
+ _Error ->
+ ?connect_failure(Node, {setup_call, failed, _Error}),
+ {reply, false, State}
+ end.
+
+-define(ERTS_DIST_CON_ID_MASK, 16#ffffff). % also in external.h
+
+verify_new_conn_id([], ConnId)
+ when (ConnId band (bnot ?ERTS_DIST_CON_ID_MASK)) =:= 0 ->
+ true;
+verify_new_conn_id([#connection{conn_id = Old}], New)
+ when New =:= ((Old+1) band ?ERTS_DIST_CON_ID_MASK) ->
+ true;
+verify_new_conn_id(_, _) ->
+ false.
+
+
+
+%% ------------------------------------------------------------
+%% handle_call.
+%% ------------------------------------------------------------
+
+%%
+%% Auto-connect to Node.
+%% The response is delayed until the connection is up and running.
+%%
+handle_call({auto_connect, _, Node, _}, From, State) when Node =:= node() ->
+ async_reply({reply, true, State}, From);
+handle_call({auto_connect, Type, Node, WaitForBarred}, From, State) ->
+ verbose({auto_connect, Type, Node, WaitForBarred}, 1, State),
+
+ R = case (catch erlang:new_connection_id(Node)) of
+ ConnId when is_integer(ConnId) ->
+ handle_auto_connect(Type, Node, ConnId, WaitForBarred, From, State);
+
+ _Error ->
+ error_logger:error_msg("~n** Cannot get connection id for node ~w~n",
+ [Node]),
+ {reply, false, State}
+ end,
+
+ return_call(R, From);
%%
%% Explicit connect
@@ -433,7 +480,17 @@ handle_call({connect, _, Node, _, _}, From, State) when Node =:= node() ->
handle_call({connect, Type, Node}, From, State) ->
verbose({connect, Type, Node}, 1, State),
ConnLookup = ets:lookup(sys_dist, Node),
- handle_connect(ConnLookup, Type, Node, From, State);
+ R = case (catch erlang:new_connection_id(Node)) of
+ ConnId when is_integer(ConnId) ->
+ handle_connect(ConnLookup, Type, Node, ConnId, From, State);
+
+ _Error ->
+ error_logger:error_msg("~n** Cannot get connection id for node ~w~n",
+ [Node]),
+ {reply, false, State}
+ end,
+ return_call(R, From);
+
%%
%% Close the connection to Node.
@@ -640,6 +697,25 @@ terminate(_Reason, State) ->
%% ------------------------------------------------------------
%%
+%% Asynchronous auto connect request
+%%
+handle_info({auto_connect,Node,ConnId}, State) ->
+ verbose({auto_connect, Node, ConnId}, 1, State),
+ NewState =
+ case handle_auto_connect(normal, Node, ConnId, false, noreply, State) of
+ {noreply, S} -> %% Pending connection
+ S;
+
+ {reply, true, S} -> %% Already connected
+ S;
+
+ {reply, false, S} -> %% Connection refused
+ erlang:abort_connection_id(Node, ConnId),
+ S
+ end,
+ {noreply, NewState};
+
+%%
%% accept a new connection.
%%
handle_info({accept,AcceptPid,Socket,Family,Proto}, State) ->
@@ -719,7 +795,12 @@ handle_info({AcceptPid, {accept_pending,MyNode,Node,Address,Type}}, State) ->
AcceptPid ! {self(), {accept_pending, already_pending}},
{noreply, State};
_ ->
+ ConnId = case (catch erlang:new_connection_id(Node)) of
+ CI when is_integer(CI) -> CI
+ %% SVERK What to do?
+ end,
ets:insert(sys_dist, #connection{node = Node,
+ conn_id = ConnId,
state = pending,
owner = AcceptPid,
address = Address,
@@ -912,6 +993,7 @@ pending_nodedown(Conn, Node, Type, State) ->
% Don't bar connections that have never been alive
%mark_sys_dist_nodedown(Node),
% - instead just delete the node:
+ erlang:abort_connection_id(Node, Conn#connection.conn_id),
ets:delete(sys_dist, Node),
reply_waiting(Node,Conn#connection.waiting, false),
case Type of
@@ -934,15 +1016,16 @@ up_pending_nodedown(Conn, Node, _Reason, _Type, State) ->
State#state{conn_owners = [{AcceptPid,Node}|Owners], pend_owners = Pend}.
-up_nodedown(_Conn, Node, _Reason, Type, State) ->
- mark_sys_dist_nodedown(Node),
+up_nodedown(Conn, Node, _Reason, Type, State) ->
+ mark_sys_dist_nodedown(Conn, Node),
case Type of
normal -> ?nodedown(Node, State);
_ -> ok
end,
State.
-mark_sys_dist_nodedown(Node) ->
+mark_sys_dist_nodedown(Conn, Node) ->
+ erlang:abort_connection_id(Node, Conn#connection.conn_id),
case application:get_env(kernel, dist_auto_connect) of
{ok, once} ->
ets:insert(sys_dist, #barred_connection{node = Node});
@@ -1185,15 +1268,8 @@ spawn_func(_,{From,Tag},M,F,A,Gleader) ->
%% Set up connection to a new node.
%% -----------------------------------------------------------
-setup(Node,Type,From,State) ->
- Allowed = State#state.allowed,
- case lists:member(Node, Allowed) of
- false when Allowed =/= [] ->
- error_msg("** Connection attempt with "
- "disallowed node ~w ** ~n", [Node]),
- {error, bad_node};
- _ ->
- case select_mod(Node, State#state.listen) of
+setup(ConnLookup, Node,ConnId,Type,From,State) ->
+ case setup_check(ConnLookup, Node, ConnId, State) of
{ok, L} ->
Mod = L#listen.module,
LAddr = L#listen.address,
@@ -1206,18 +1282,45 @@ setup(Node,Type,From,State) ->
Addr = LAddr#net_address {
address = undefined,
host = undefined },
+ Waiting = case From of
+ noreply -> [];
+ _ -> [From]
+ end,
ets:insert(sys_dist, #connection{node = Node,
+ conn_id = ConnId,
state = pending,
owner = Pid,
- waiting = [From],
+ waiting = Waiting,
address = Addr,
type = normal}),
{ok, Pid};
Error ->
Error
- end
end.
+setup_check(ConnLookup, Node, ConnId, State) ->
+ Allowed = State#state.allowed,
+ case lists:member(Node, Allowed) of
+ false when Allowed =/= [] ->
+ error_msg("** Connection attempt with "
+ "disallowed node ~w ** ~n", [Node]),
+ {error, bad_node};
+ _ ->
+ case verify_new_conn_id(ConnLookup, ConnId) of
+ false ->
+ error_msg("** Connection attempt to ~w with "
+ "bad connection id ~w ** ~n", [Node, ConnId]),
+ {error, bad_conn_id};
+ true ->
+ case select_mod(Node, State#state.listen) of
+ {ok, _L}=OK -> OK;
+ Error -> Error
+ end
+ end
+ end.
+
+
+
%%
%% Find a module that is willing to handle connection setup to Node
%%
@@ -1658,6 +1761,11 @@ verbose(_, _, _) ->
getnode(P) when is_pid(P) -> node(P);
getnode(P) -> P.
+return_call({noreply, _State}=R, _From) ->
+ R;
+return_call(R, From) ->
+ async_reply(R, From).
+
async_reply({reply, Msg, State}, From) ->
async_gen_server_reply(From, Msg),
{noreply, State}.