diff options
Diffstat (limited to 'lib/kernel/src/net_kernel.erl')
| -rw-r--r-- | lib/kernel/src/net_kernel.erl | 344 | 
1 files changed, 231 insertions, 113 deletions
| diff --git a/lib/kernel/src/net_kernel.erl b/lib/kernel/src/net_kernel.erl index f36b4f1e6a..cdb10a7b12 100644 --- a/lib/kernel/src/net_kernel.erl +++ b/lib/kernel/src/net_kernel.erl @@ -70,8 +70,8 @@  	 protocol_childspecs/0,  	 epmd_module/0]). --export([connect/1, disconnect/1, hidden_connect/1, passive_cnct/1]). --export([hidden_connect_node/1]). %% explicit connect +-export([disconnect/1, passive_cnct/1]). +-export([hidden_connect_node/1]).  -export([set_net_ticktime/1, set_net_ticktime/2, get_net_ticktime/0]).  -export([node_info/1, node_info/2, nodes_info/0, @@ -122,6 +122,7 @@  -record(connection, {  		     node,          %% remote node name +                     conn_id,       %% Connection identity  		     state,         %% pending | up | up_pending  		     owner,         %% owner pid  	             pending_owner, %% possible new owner @@ -247,14 +248,15 @@ ticktime_res(A)      when is_atom(A)             -> A.  %% Called though BIF's -connect(Node) ->               do_connect(Node, normal, false).  %%% Long timeout if blocked (== barred), only affects nodes with  %%% {dist_auto_connect, once} set. -passive_cnct(Node) ->              do_connect(Node, normal, true). -disconnect(Node) ->            request({disconnect, Node}). +passive_cnct(Node) -> +    case request({passive_cnct, Node}) of +        ignored -> false; +        Other -> Other +    end. -%% connect but not seen -hidden_connect(Node) ->        do_connect(Node, hidden, false). +disconnect(Node) ->            request({disconnect, Node}).  %% Should this node publish itself on Node?  publish_on_node(Node) when is_atom(Node) -> @@ -272,67 +274,30 @@ connect_node(Node) when is_atom(Node) ->  hidden_connect_node(Node) when is_atom(Node) ->      request({connect, hidden, Node}). -do_connect(Node, Type, WaitForBarred) -> %% Type = normal | hidden -    case catch ets:lookup(sys_dist, Node) of -	{'EXIT', _} -> -	    ?connect_failure(Node,{table_missing, sys_dist}), -	    false; -	[#barred_connection{}] -> -	    case WaitForBarred of -		false -> -		    false; -		true -> -		    Pid = spawn(?MODULE,passive_connect_monitor,[self(),Node]), -		    receive -			{Pid, true} -> -			    %%io:format("Net Kernel: barred connection (~p) " -			    %%          "connected from other end.~n",[Node]), -			    true; -			{Pid, false} -> -			    ?connect_failure(Node,{barred_connection, -						   ets:lookup(sys_dist, Node)}), -			    %%io:format("Net Kernel: barred connection (~p) " -			    %%      "- failure.~n",[Node]), -			    false -		    end -	    end; -	Else -> -	    case application:get_env(kernel, dist_auto_connect) of -		{ok, never} -> -		    ?connect_failure(Node,{dist_auto_connect,never}), -		    false; -		% This might happen due to connection close -		% not beeing propagated to user space yet. -		% Save the day by just not connecting... -		{ok, once} when Else =/= [], -				(hd(Else))#connection.state =:= up -> -		    ?connect_failure(Node,{barred_connection, -				ets:lookup(sys_dist, Node)}), -		    false; -		_ -> -		    request({connect, Type, Node}) -	    end -    end. -passive_connect_monitor(Parent, Node) -> +passive_connect_monitor(From, Node) ->      ok = monitor_nodes(true,[{node_type,all}]), -    case lists:member(Node,nodes([connected])) of -	true -> -	    ok = monitor_nodes(false,[{node_type,all}]), -	    Parent ! {self(),true}; -	_ -> -	    Ref = make_ref(), -	    Tref = erlang:send_after(connecttime(),self(),Ref), -	    receive -		Ref -> -		    ok = monitor_nodes(false,[{node_type,all}]), -		    Parent ! {self(), false}; -		{nodeup,Node,_} -> -		    ok = monitor_nodes(false,[{node_type,all}]), -		    _ = erlang:cancel_timer(Tref), -		    Parent ! {self(),true} -	    end -    end. +    Reply = case lists:member(Node,nodes([connected])) of +                true -> +                    io:format("~p: passive_connect_monitor ~p\n", [self(), ?LINE]), +                    true; +                _ -> +                    receive +                        {nodeup,Node,_} -> +                            io:format("~p: passive_connect_monitor ~p\n", [self(), ?LINE]), +                            true +                    after connecttime() -> +                            io:format("~p: passive_connect_monitor ~p\n", [self(), ?LINE]), +                            false +                    end +            end, +    ok = monitor_nodes(false,[{node_type,all}]), +    io:format("~p: passive_connect_monitor ~p\n", [self(), ?LINE]), +    {Pid, Tag} = From, +    io:format("~p: passive_connect_monitor ~p\n", [self(), ?LINE]), +    erlang:send(Pid, {Tag, Reply}), +    io:format("~p: passive_connect_monitor ~p\n", [self(), ?LINE]). +  %% If the net_kernel isn't running we ignore all requests to the  %% kernel, thus basically accepting them :-) @@ -394,40 +359,135 @@ init({Name, LongOrShortNames, TickT, CleanHalt}) ->      end. +do_auto_connect(Type, Node, ConnId, WaitForBarred, From, State) -> +    ConnLookup = ets:lookup(sys_dist, Node), + +    case ConnLookup of +        [#barred_connection{}] -> +            case WaitForBarred of +                false -> +                    {reply, false, State}; +                true -> +                    spawn(?MODULE,passive_connect_monitor,[From,Node]), +                    {noreply, State} +            end; + +        [#connection{conn_id=ConnId, state = up}] -> +            {reply, true, State}; +        [#connection{conn_id=ConnId, waiting=Waiting}=Conn] -> +            case From of +                noreply -> ok; +                _ -> ets:insert(sys_dist, Conn#connection{waiting = [From|Waiting]}) +            end, +            {noreply, State}; + +        _ -> +            case application:get_env(kernel, dist_auto_connect) of +                {ok, never} -> +                    ?connect_failure(Node,{dist_auto_connect,never}), +                    {reply, false, State}; + +                %% This might happen due to connection close +                %% not beeing propagated to user space yet. +                %% Save the day by just not connecting... +                {ok, once} when ConnLookup =/= [], +                                (hd(ConnLookup))#connection.state =:= up -> +                    ?connect_failure(Node,{barred_connection, +                                           ets:lookup(sys_dist, Node)}), +                    {reply, false, State}; +                _ -> +                    case setup(ConnLookup, Node,ConnId,Type,From,State) of +                        {ok, SetupPid} -> +                            Owners = [{SetupPid, Node} | State#state.conn_owners], +                            {noreply,State#state{conn_owners=Owners}}; +                        _Error  -> +                            ?connect_failure(Node, {setup_call, failed, _Error}), +                            {reply, false, State} +                    end +            end +    end. + + +do_explicit_connect([#connection{conn_id = ConnId, state = up}], _, _, ConnId, _From, State) -> +    {reply, true, State}; +do_explicit_connect([#connection{conn_id = ConnId}=Conn], _, _, ConnId, From, State) +  when Conn#connection.state =:= pending; +       Conn#connection.state =:= up_pending -> +    Waiting = Conn#connection.waiting, +    ets:insert(sys_dist, Conn#connection{waiting = [From|Waiting]}),     +    {noreply, State}; +do_explicit_connect([#barred_connection{}], Type, Node, ConnId, From , State) -> +    %% Barred connection only affects auto_connect, ignore it. +    do_explicit_connect([], Type, Node, ConnId, From , State); +do_explicit_connect(ConnLookup, Type, Node, ConnId, From , State) -> +    case setup(ConnLookup, Node,ConnId,Type,From,State) of +        {ok, SetupPid} -> +            Owners = [{SetupPid, Node} | State#state.conn_owners], +            {noreply,State#state{conn_owners=Owners}}; +        _Error -> +            ?connect_failure(Node, {setup_call, failed, _Error}), +            {reply, false, State} +    end. + +-define(ERTS_DIST_CON_ID_MASK, 16#ffffff).  % also in external.h + +verify_new_conn_id([], {Nr,_DHandle}) +  when (Nr band (bnot ?ERTS_DIST_CON_ID_MASK)) =:= 0 -> +    true; +verify_new_conn_id([#connection{conn_id = {Old,_}}], {New,_}) +  when New =:= ((Old+1) band ?ERTS_DIST_CON_ID_MASK) -> +    true; +verify_new_conn_id(_, _) -> +    false. +                                                               + +  %% ------------------------------------------------------------  %% handle_call.  %% ------------------------------------------------------------  %% -%% Set up a connection to Node. -%% The response is delayed until the connection is up and -%% running. +%% Passive auto-connect to Node. +%% The response is delayed until the connection is up and running.  %% -handle_call({connect, _, Node}, From, State) when Node =:= node() -> +handle_call({passive_cnct, Node}, From, State) when Node =:= node() -> +    async_reply({reply, true, State}, From); +handle_call({passive_cnct, Node}, From, State) -> +    verbose({passive_cnct, Node}, 1, State), +    Type = normal, +    WaitForBarred = true, +    R = case (catch erts_internal:new_connection(Node)) of +            {Nr,_DHandle}=ConnId when is_integer(Nr) -> +                do_auto_connect(Type, Node, ConnId, WaitForBarred, From, State); + +            _Error -> +                error_logger:error_msg("~n** Cannot get connection id for node ~w~n", +                                       [Node]), +                {reply, false, State} +        end, + +    return_call(R, From); + +%% +%% Explicit connect +%% The response is delayed until the connection is up and running. +%% +handle_call({connect, _, Node, _, _}, From, State) when Node =:= node() ->      async_reply({reply, true, State}, From);  handle_call({connect, Type, Node}, From, State) ->      verbose({connect, Type, Node}, 1, State), -    case ets:lookup(sys_dist, Node) of -	[Conn] when Conn#connection.state =:= up -> -	    async_reply({reply, true, State}, From); -	[Conn] when Conn#connection.state =:= pending -> -	    Waiting = Conn#connection.waiting, -	    ets:insert(sys_dist, Conn#connection{waiting = [From|Waiting]}), -	    {noreply, State}; -	[Conn] when Conn#connection.state =:= up_pending -> -	    Waiting = Conn#connection.waiting, -	    ets:insert(sys_dist, Conn#connection{waiting = [From|Waiting]}), -	    {noreply, State}; -	_ -> -	    case setup(Node,Type,From,State) of -		{ok, SetupPid} -> -		    Owners = [{SetupPid, Node} | State#state.conn_owners], -		    {noreply,State#state{conn_owners=Owners}}; -		_Error  -> -		    ?connect_failure(Node, {setup_call, failed, _Error}), -		    async_reply({reply, false, State}, From) -	    end -    end; +    ConnLookup = ets:lookup(sys_dist, Node), +    R = case (catch erts_internal:new_connection(Node)) of +            {Nr,_DHandle}=ConnId when is_integer(Nr) -> +                do_explicit_connect(ConnLookup, Type, Node, ConnId, From, State); +                     +            _Error -> +                error_logger:error_msg("~n** Cannot get connection id for node ~w~n", +                                       [Node]), +                {reply, false, State}                     +        end, +    return_call(R, From); +              %%  %% Close the connection to Node. @@ -634,6 +694,26 @@ terminate(_Reason, State) ->  %% ------------------------------------------------------------  %% +%% Asynchronous auto connect request +%% +handle_info({auto_connect,Node, Nr, DHandle}, State) -> +    verbose({auto_connect, Node, Nr, DHandle}, 1, State), +    ConnId = {Nr, DHandle}, +    NewState = +        case do_auto_connect(normal, Node, ConnId, false, noreply, State) of +            {noreply, S} ->           %% Pending connection +                S; + +            {reply, true, S} ->  %% Already connected +                S; + +            {reply, false, S} -> %% Connection refused +                erts_internal:abort_connection(Node, ConnId), +                S +        end, +    {noreply, NewState}; + +%%  %% accept a new connection.  %%  handle_info({accept,AcceptPid,Socket,Family,Proto}, State) -> @@ -713,14 +793,23 @@ handle_info({AcceptPid, {accept_pending,MyNode,Node,Address,Type}}, State) ->  	    AcceptPid ! {self(), {accept_pending, already_pending}},  	    {noreply, State};  	_ -> -	    ets:insert(sys_dist, #connection{node = Node, -					     state = pending, -					     owner = AcceptPid, -					     address = Address, -					     type = Type}), -	    AcceptPid ! {self(),{accept_pending,ok}}, -	    Owners = [{AcceptPid,Node} | State#state.conn_owners], -	    {noreply, State#state{conn_owners = Owners}} +            case (catch erts_internal:new_connection(Node)) of +                {Nr,_DHandle}=ConnId when is_integer(Nr) -> +                    ets:insert(sys_dist, #connection{node = Node, +                                                     conn_id = ConnId, +                                                     state = pending, +                                                     owner = AcceptPid, +                                                     address = Address, +                                                     type = Type}), +                    AcceptPid ! {self(),{accept_pending,ok}}, +                    Owners = [{AcceptPid,Node} | State#state.conn_owners], +                    {noreply, State#state{conn_owners = Owners}}; + +                _ -> +                    error_logger:error_msg("~n** Cannot get connection id for node ~w~n", +                                           [Node]), +                    AcceptPid ! {self(),{accept_pending,nok_pending}} +            end      end;  handle_info({SetupPid, {is_pending, Node}}, State) -> @@ -906,6 +995,7 @@ pending_nodedown(Conn, Node, Type, State) ->      % Don't bar connections that have never been alive      %mark_sys_dist_nodedown(Node),      % - instead just delete the node: +    erts_internal:abort_connection(Node, Conn#connection.conn_id),      ets:delete(sys_dist, Node),      reply_waiting(Node,Conn#connection.waiting, false),      case Type of @@ -920,7 +1010,9 @@ up_pending_nodedown(Conn, Node, _Reason, _Type, State) ->      AcceptPid = Conn#connection.pending_owner,      Owners = State#state.conn_owners,      Pend = lists:keydelete(AcceptPid, 1, State#state.pend_owners), +    erts_internal:abort_connection(Node, Conn#connection.conn_id),      Conn1 = Conn#connection { owner = AcceptPid, +                              conn_id = erts_internal:new_connection(Node),  			      pending_owner = undefined,  			      state = pending },      ets:insert(sys_dist, Conn1), @@ -928,15 +1020,16 @@ up_pending_nodedown(Conn, Node, _Reason, _Type, State) ->      State#state{conn_owners = [{AcceptPid,Node}|Owners], pend_owners = Pend}. -up_nodedown(_Conn, Node, _Reason, Type, State) -> -    mark_sys_dist_nodedown(Node), +up_nodedown(Conn, Node, _Reason, Type, State) -> +    mark_sys_dist_nodedown(Conn, Node),      case Type of  	normal -> ?nodedown(Node, State);  	_ -> ok      end,      State. -mark_sys_dist_nodedown(Node) -> +mark_sys_dist_nodedown(Conn, Node) -> +    erts_internal:abort_connection(Node, Conn#connection.conn_id),      case application:get_env(kernel, dist_auto_connect) of  	{ok, once} ->  	    ets:insert(sys_dist, #barred_connection{node = Node}); @@ -1179,15 +1272,8 @@ spawn_func(_,{From,Tag},M,F,A,Gleader) ->  %% Set up connection to a new node.  %% ----------------------------------------------------------- -setup(Node,Type,From,State) -> -    Allowed = State#state.allowed, -    case lists:member(Node, Allowed) of -	false when Allowed =/= [] -> -	    error_msg("** Connection attempt with " -		      "disallowed node ~w ** ~n", [Node]), -	    {error, bad_node}; -	_ -> -	    case select_mod(Node, State#state.listen) of +setup(ConnLookup, Node,ConnId,Type,From,State) -> +    case setup_check(ConnLookup, Node, ConnId, State) of  		{ok, L} ->  		    Mod = L#listen.module,  		    LAddr = L#listen.address, @@ -1200,18 +1286,45 @@ setup(Node,Type,From,State) ->  		    Addr = LAddr#net_address {  					      address = undefined,  					      host = undefined }, +                    Waiting = case From of +                                  noreply -> []; +                                  _ -> [From] +                              end,  		    ets:insert(sys_dist, #connection{node = Node, +                                                     conn_id = ConnId,  						     state = pending,  						     owner = Pid, -						     waiting = [From], +						     waiting = Waiting,  						     address = Addr,  						     type = normal}),  		    {ok, Pid};  		Error ->  		    Error -	    end      end. +setup_check(ConnLookup, Node, ConnId, State) -> +    Allowed = State#state.allowed,     +    case lists:member(Node, Allowed) of +	false when Allowed =/= [] -> +	    error_msg("** Connection attempt with " +		      "disallowed node ~w ** ~n", [Node]), +	    {error, bad_node}; +       _ -> +            case verify_new_conn_id(ConnLookup, ConnId) of +                false -> +                    error_msg("** Connection attempt to ~w with " +                              "bad connection id ~w ** ~n", [Node, ConnId]), +                    {error, bad_conn_id}; +                true -> +                    case select_mod(Node, State#state.listen) of +                        {ok, _L}=OK -> OK; +                        Error -> Error +                    end +            end +    end.                     + +     +  %%  %% Find a module that is willing to handle connection setup to Node  %% @@ -1652,6 +1765,11 @@ verbose(_, _, _) ->  getnode(P) when is_pid(P) -> node(P);  getnode(P) -> P. +return_call({noreply, _State}=R, _From) -> +    R; +return_call(R, From) -> +    async_reply(R, From). +  async_reply({reply, Msg, State}, From) ->      async_gen_server_reply(From, Msg),      {noreply, State}. | 
