diff options
author | Sverker Eriksson <[email protected]> | 2017-07-14 19:34:54 +0200 |
---|---|---|
committer | Sverker Eriksson <[email protected]> | 2017-11-15 20:10:33 +0100 |
commit | f89fb92384280e2939414287a2ecb8f86a199318 (patch) | |
tree | facf1d7b31b56cc25575d6cb9a01a69e2e4317d0 /lib/kernel/src | |
parent | fe720f6b2051c9bf8ff303f857c3db0a84b1c050 (diff) | |
download | otp-f89fb92384280e2939414287a2ecb8f86a199318.tar.gz otp-f89fb92384280e2939414287a2ecb8f86a199318.tar.bz2 otp-f89fb92384280e2939414287a2ecb8f86a199318.zip |
erts: Introduce asynchronous auto-connect
Diffstat (limited to 'lib/kernel/src')
-rw-r--r-- | lib/kernel/src/net_kernel.erl | 210 |
1 files changed, 159 insertions, 51 deletions
diff --git a/lib/kernel/src/net_kernel.erl b/lib/kernel/src/net_kernel.erl index c68036a291..f929e4bf11 100644 --- a/lib/kernel/src/net_kernel.erl +++ b/lib/kernel/src/net_kernel.erl @@ -122,6 +122,7 @@ -record(connection, { node, %% remote node name + conn_id, %% Connection identity state, %% pending | up | up_pending owner, %% owner pid pending_owner, %% possible new owner @@ -362,54 +363,33 @@ init({Name, LongOrShortNames, TickT, CleanHalt}) -> end. -handle_connect([Conn], _, _, From, State) when Conn#connection.state =:= up -> - async_reply({reply, true, State}, From); -handle_connect([Conn], _, _, From, State) when Conn#connection.state =:= pending; - Conn#connection.state =:= up_pending -> - Waiting = Conn#connection.waiting, - ets:insert(sys_dist, Conn#connection{waiting = [From|Waiting]}), - {noreply, State}; -handle_connect(_, Type, Node, From , State) -> - case setup(Node,Type,From,State) of - {ok, SetupPid} -> - Owners = [{SetupPid, Node} | State#state.conn_owners], - {noreply,State#state{conn_owners=Owners}}; - _Error -> - ?connect_failure(Node, {setup_call, failed, _Error}), - async_reply({reply, false, State}, From) - end. - -%% ------------------------------------------------------------ -%% handle_call. -%% ------------------------------------------------------------ - -%% -%% Auto-connect to Node. -%% The response is delayed until the connection is up and -%% running. -%% -handle_call({auto_connect, _, Node, _}, From, State) when Node =:= node() -> - async_reply({reply, true, State}, From); -handle_call({auto_connect, Type, Node, WaitForBarred}, From, State) -> - verbose({auto_connect, Type, Node, WaitForBarred}, 1, State), - +handle_auto_connect(Type, Node, ConnId, WaitForBarred, From, State) -> ConnLookup = ets:lookup(sys_dist, Node), case ConnLookup of [#barred_connection{}] -> case WaitForBarred of false -> - async_reply({reply, false, State}, From); + {reply, false, State}; true -> spawn(?MODULE,passive_connect_monitor,[From,Node]), {noreply, State} end; + [#connection{conn_id=ConnId, state = up}] -> + {reply, true, State}; + [#connection{conn_id=ConnId, waiting=Waiting}=Conn] -> + case From of + noreply -> ok; + _ -> ets:insert(sys_dist, Conn#connection{waiting = [From|Waiting]}) + end, + {noreply, State}; + _ -> case application:get_env(kernel, dist_auto_connect) of {ok, never} -> ?connect_failure(Node,{dist_auto_connect,never}), - async_reply({reply, false, State}, From); + {reply, false, State}; %% This might happen due to connection close %% not beeing propagated to user space yet. @@ -418,11 +398,78 @@ handle_call({auto_connect, Type, Node, WaitForBarred}, From, State) -> (hd(ConnLookup))#connection.state =:= up -> ?connect_failure(Node,{barred_connection, ets:lookup(sys_dist, Node)}), - async_reply({reply, false, State}, From); + {reply, false, State}; _ -> - handle_connect(ConnLookup, Type, Node, From, State) + case setup(ConnLookup, Node,ConnId,Type,From,State) of + {ok, SetupPid} -> + Owners = [{SetupPid, Node} | State#state.conn_owners], + {noreply,State#state{conn_owners=Owners}}; + _Error -> + ?connect_failure(Node, {setup_call, failed, _Error}), + {reply, false, State} + end end - end; + end. + + +handle_connect([#connection{conn_id = ConnId, state = up}], _, _, ConnId, _From, State) -> + {reply, true, State}; +handle_connect([#connection{conn_id = ConnId}=Conn], _, _, ConnId, From, State) + when Conn#connection.state =:= pending; + Conn#connection.state =:= up_pending -> + Waiting = Conn#connection.waiting, + ets:insert(sys_dist, Conn#connection{waiting = [From|Waiting]}), + {noreply, State}; +handle_connect([#barred_connection{}], Type, Node, ConnId, From , State) -> + %% Barred connection only affects auto_connect, ignore it. + handle_connect([], Type, Node, ConnId, From , State); +handle_connect(ConnLookup, Type, Node, ConnId, From , State) -> + case setup(ConnLookup, Node,ConnId,Type,From,State) of + {ok, SetupPid} -> + Owners = [{SetupPid, Node} | State#state.conn_owners], + {noreply,State#state{conn_owners=Owners}}; + _Error -> + ?connect_failure(Node, {setup_call, failed, _Error}), + {reply, false, State} + end. + +-define(ERTS_DIST_CON_ID_MASK, 16#ffffff). % also in external.h + +verify_new_conn_id([], ConnId) + when (ConnId band (bnot ?ERTS_DIST_CON_ID_MASK)) =:= 0 -> + true; +verify_new_conn_id([#connection{conn_id = Old}], New) + when New =:= ((Old+1) band ?ERTS_DIST_CON_ID_MASK) -> + true; +verify_new_conn_id(_, _) -> + false. + + + +%% ------------------------------------------------------------ +%% handle_call. +%% ------------------------------------------------------------ + +%% +%% Auto-connect to Node. +%% The response is delayed until the connection is up and running. +%% +handle_call({auto_connect, _, Node, _}, From, State) when Node =:= node() -> + async_reply({reply, true, State}, From); +handle_call({auto_connect, Type, Node, WaitForBarred}, From, State) -> + verbose({auto_connect, Type, Node, WaitForBarred}, 1, State), + + R = case (catch erlang:new_connection_id(Node)) of + ConnId when is_integer(ConnId) -> + handle_auto_connect(Type, Node, ConnId, WaitForBarred, From, State); + + _Error -> + error_logger:error_msg("~n** Cannot get connection id for node ~w~n", + [Node]), + {reply, false, State} + end, + + return_call(R, From); %% %% Explicit connect @@ -433,7 +480,17 @@ handle_call({connect, _, Node, _, _}, From, State) when Node =:= node() -> handle_call({connect, Type, Node}, From, State) -> verbose({connect, Type, Node}, 1, State), ConnLookup = ets:lookup(sys_dist, Node), - handle_connect(ConnLookup, Type, Node, From, State); + R = case (catch erlang:new_connection_id(Node)) of + ConnId when is_integer(ConnId) -> + handle_connect(ConnLookup, Type, Node, ConnId, From, State); + + _Error -> + error_logger:error_msg("~n** Cannot get connection id for node ~w~n", + [Node]), + {reply, false, State} + end, + return_call(R, From); + %% %% Close the connection to Node. @@ -640,6 +697,25 @@ terminate(_Reason, State) -> %% ------------------------------------------------------------ %% +%% Asynchronous auto connect request +%% +handle_info({auto_connect,Node,ConnId}, State) -> + verbose({auto_connect, Node, ConnId}, 1, State), + NewState = + case handle_auto_connect(normal, Node, ConnId, false, noreply, State) of + {noreply, S} -> %% Pending connection + S; + + {reply, true, S} -> %% Already connected + S; + + {reply, false, S} -> %% Connection refused + erlang:abort_connection_id(Node, ConnId), + S + end, + {noreply, NewState}; + +%% %% accept a new connection. %% handle_info({accept,AcceptPid,Socket,Family,Proto}, State) -> @@ -719,7 +795,12 @@ handle_info({AcceptPid, {accept_pending,MyNode,Node,Address,Type}}, State) -> AcceptPid ! {self(), {accept_pending, already_pending}}, {noreply, State}; _ -> + ConnId = case (catch erlang:new_connection_id(Node)) of + CI when is_integer(CI) -> CI + %% SVERK What to do? + end, ets:insert(sys_dist, #connection{node = Node, + conn_id = ConnId, state = pending, owner = AcceptPid, address = Address, @@ -912,6 +993,7 @@ pending_nodedown(Conn, Node, Type, State) -> % Don't bar connections that have never been alive %mark_sys_dist_nodedown(Node), % - instead just delete the node: + erlang:abort_connection_id(Node, Conn#connection.conn_id), ets:delete(sys_dist, Node), reply_waiting(Node,Conn#connection.waiting, false), case Type of @@ -934,15 +1016,16 @@ up_pending_nodedown(Conn, Node, _Reason, _Type, State) -> State#state{conn_owners = [{AcceptPid,Node}|Owners], pend_owners = Pend}. -up_nodedown(_Conn, Node, _Reason, Type, State) -> - mark_sys_dist_nodedown(Node), +up_nodedown(Conn, Node, _Reason, Type, State) -> + mark_sys_dist_nodedown(Conn, Node), case Type of normal -> ?nodedown(Node, State); _ -> ok end, State. -mark_sys_dist_nodedown(Node) -> +mark_sys_dist_nodedown(Conn, Node) -> + erlang:abort_connection_id(Node, Conn#connection.conn_id), case application:get_env(kernel, dist_auto_connect) of {ok, once} -> ets:insert(sys_dist, #barred_connection{node = Node}); @@ -1185,15 +1268,8 @@ spawn_func(_,{From,Tag},M,F,A,Gleader) -> %% Set up connection to a new node. %% ----------------------------------------------------------- -setup(Node,Type,From,State) -> - Allowed = State#state.allowed, - case lists:member(Node, Allowed) of - false when Allowed =/= [] -> - error_msg("** Connection attempt with " - "disallowed node ~w ** ~n", [Node]), - {error, bad_node}; - _ -> - case select_mod(Node, State#state.listen) of +setup(ConnLookup, Node,ConnId,Type,From,State) -> + case setup_check(ConnLookup, Node, ConnId, State) of {ok, L} -> Mod = L#listen.module, LAddr = L#listen.address, @@ -1206,18 +1282,45 @@ setup(Node,Type,From,State) -> Addr = LAddr#net_address { address = undefined, host = undefined }, + Waiting = case From of + noreply -> []; + _ -> [From] + end, ets:insert(sys_dist, #connection{node = Node, + conn_id = ConnId, state = pending, owner = Pid, - waiting = [From], + waiting = Waiting, address = Addr, type = normal}), {ok, Pid}; Error -> Error - end end. +setup_check(ConnLookup, Node, ConnId, State) -> + Allowed = State#state.allowed, + case lists:member(Node, Allowed) of + false when Allowed =/= [] -> + error_msg("** Connection attempt with " + "disallowed node ~w ** ~n", [Node]), + {error, bad_node}; + _ -> + case verify_new_conn_id(ConnLookup, ConnId) of + false -> + error_msg("** Connection attempt to ~w with " + "bad connection id ~w ** ~n", [Node, ConnId]), + {error, bad_conn_id}; + true -> + case select_mod(Node, State#state.listen) of + {ok, _L}=OK -> OK; + Error -> Error + end + end + end. + + + %% %% Find a module that is willing to handle connection setup to Node %% @@ -1658,6 +1761,11 @@ verbose(_, _, _) -> getnode(P) when is_pid(P) -> node(P); getnode(P) -> P. +return_call({noreply, _State}=R, _From) -> + R; +return_call(R, From) -> + async_reply(R, From). + async_reply({reply, Msg, State}, From) -> async_gen_server_reply(From, Msg), {noreply, State}. |