diff options
Diffstat (limited to 'lib/kernel')
-rw-r--r-- | lib/kernel/examples/Makefile | 2 | ||||
-rw-r--r-- | lib/kernel/examples/gen_tcp_dist/Makefile | 20 | ||||
-rw-r--r-- | lib/kernel/examples/gen_tcp_dist/ebin/.gitignore | 0 | ||||
-rw-r--r-- | lib/kernel/examples/gen_tcp_dist/src/gen_tcp_dist.erl | 781 | ||||
-rw-r--r-- | lib/kernel/include/dist.hrl | 30 | ||||
-rw-r--r-- | lib/kernel/include/dist_util.hrl | 12 | ||||
-rw-r--r-- | lib/kernel/src/dist_util.erl | 252 | ||||
-rw-r--r-- | lib/kernel/src/net_kernel.erl | 4 |
8 files changed, 1023 insertions, 78 deletions
diff --git a/lib/kernel/examples/Makefile b/lib/kernel/examples/Makefile index 26ec58f571..f86e662838 100644 --- a/lib/kernel/examples/Makefile +++ b/lib/kernel/examples/Makefile @@ -45,7 +45,7 @@ RELSYSDIR = $(RELEASE_PATH)/lib/kernel-$(KERNEL_VSN)/examples # Pack and install the complete directory structure from # here (CWD) and down, for all examples. -EXAMPLES = uds_dist +EXAMPLES = uds_dist gen_tcp_dist release_spec: $(INSTALL_DIR) "$(RELSYSDIR)" diff --git a/lib/kernel/examples/gen_tcp_dist/Makefile b/lib/kernel/examples/gen_tcp_dist/Makefile new file mode 100644 index 0000000000..65513a1729 --- /dev/null +++ b/lib/kernel/examples/gen_tcp_dist/Makefile @@ -0,0 +1,20 @@ +RM=rm -f +CP=cp +EBIN=ebin +ERLC=erlc +# Works if building in open source source tree +KERNEL_INCLUDE=$(ERL_TOP)/lib/kernel/include +ERLCFLAGS+= -W -I$(KERNEL_INCLUDE) + +MODULES=gen_tcp_dist + +TARGET_FILES=$(MODULES:%=$(EBIN)/%.beam) + +opt: $(TARGET_FILES) + +$(EBIN)/%.beam: src/%.erl + $(ERLC) $(ERLCFLAGS) -o$(EBIN) $< + +clean: + $(RM) $(TARGET_FILES) + diff --git a/lib/kernel/examples/gen_tcp_dist/ebin/.gitignore b/lib/kernel/examples/gen_tcp_dist/ebin/.gitignore new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/lib/kernel/examples/gen_tcp_dist/ebin/.gitignore diff --git a/lib/kernel/examples/gen_tcp_dist/src/gen_tcp_dist.erl b/lib/kernel/examples/gen_tcp_dist/src/gen_tcp_dist.erl new file mode 100644 index 0000000000..98554ed805 --- /dev/null +++ b/lib/kernel/examples/gen_tcp_dist/src/gen_tcp_dist.erl @@ -0,0 +1,781 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2017. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% %CopyrightEnd% +%% +-module(gen_tcp_dist). + +%% +%% This is an example of how to plug in an arbitrary distribution +%% carrier for Erlang using distribution processes. +%% +%% This example uses gen_tcp for transportation of data, but +%% you can use whatever underlying protocol you want as long +%% as your implementation reliably delivers data chunks to the +%% receiving VM in the order they were sent from the sending +%% VM. +%% +%% This code is a rewrite of the lib/kernel/src/inet_tcp_dist.erl +%% distribution impementation for TCP used by default. That +%% implementation use distribution ports instead of distribution +%% processes and is more efficient compared to this implementation. +%% This since this implementation more or less gets the +%% distribution processes in between the VM and the ports without +%% any gain specific gain. +%% + +-export([listen/1, accept/1, accept_connection/5, + setup/5, close/1, select/1, is_node_name/1]). + +%% Optional +-export([setopts/2, getopts/2]). + +%% internal exports + +-export([dist_cntrlr_setup/1, dist_cntrlr_input_setup/3, + dist_cntrlr_tick_handler/1]). + +-export([accept_loop/2,do_accept/6,do_setup/6]). + +-import(error_logger,[error_msg/2]). + +-include("net_address.hrl"). + +-include("dist.hrl"). +-include("dist_util.hrl"). + +%% ------------------------------------------------------------ +%% Select this protocol based on node name +%% select(Node) => Bool +%% ------------------------------------------------------------ + +select(Node) -> + case split_node(atom_to_list(Node), $@, []) of + [_, Host] -> + case inet:getaddr(Host, inet) of + {ok,_} -> true; + _ -> false + end; + _ -> false + end. + +%% ------------------------------------------------------------ +%% Create the listen socket, i.e. the port that this erlang +%% node is accessible through. +%% ------------------------------------------------------------ + +listen(Name) -> + case do_listen([binary, {active, false}, {packet,2}, {reuseaddr, true}]) of + {ok, Socket} -> + TcpAddress = get_tcp_address(Socket), + {_,Port} = TcpAddress#net_address.address, + ErlEpmd = net_kernel:epmd_module(), + case ErlEpmd:register_node(Name, Port) of + {ok, Creation} -> + {ok, {Socket, TcpAddress, Creation}}; + Error -> + Error + end; + Error -> + Error + end. + +do_listen(Options) -> + {First,Last} = case application:get_env(kernel,inet_dist_listen_min) of + {ok,N} when is_integer(N) -> + case application:get_env(kernel, + inet_dist_listen_max) of + {ok,M} when is_integer(M) -> + {N,M}; + _ -> + {N,N} + end; + _ -> + {0,0} + end, + do_listen(First, Last, listen_options([{backlog,128}|Options])). + +do_listen(First,Last,_) when First > Last -> + {error,eaddrinuse}; +do_listen(First,Last,Options) -> + case gen_tcp:listen(First, Options) of + {error, eaddrinuse} -> + do_listen(First+1,Last,Options); + Other -> + Other + end. + +listen_options(Opts0) -> + Opts1 = + case application:get_env(kernel, inet_dist_use_interface) of + {ok, Ip} -> + [{ip, Ip} | Opts0]; + _ -> + Opts0 + end, + case application:get_env(kernel, inet_dist_listen_options) of + {ok,ListenOpts} -> + ListenOpts ++ Opts1; + _ -> + Opts1 + end. + + +%% ------------------------------------------------------------ +%% Accepts new connection attempts from other Erlang nodes. +%% ------------------------------------------------------------ + +accept(Listen) -> + spawn_opt(?MODULE, accept_loop, [self(), Listen], [link, {priority, max}]). + +accept_loop(Kernel, Listen) -> + ?trace("~p~n",[{?MODULE, accept_loop, self()}]), + case gen_tcp:accept(Listen) of + {ok, Socket} -> + DistCtrl = spawn_dist_cntrlr(Socket), + ?trace("~p~n",[{?MODULE, accept_loop, accepted, Socket, DistCtrl, self()}]), + flush_controller(DistCtrl, Socket), + gen_tcp:controlling_process(Socket, DistCtrl), + flush_controller(DistCtrl, Socket), + Kernel ! {accept,self(),DistCtrl,inet,tcp}, + receive + {Kernel, controller, Pid} -> + call_ctrlr(DistCtrl, {supervisor, Pid}), + Pid ! {self(), controller}; + {Kernel, unsupported_protocol} -> + exit(unsupported_protocol) + end, + accept_loop(Kernel, Listen); + Error -> + exit(Error) + end. + +flush_controller(Pid, Socket) -> + receive + {tcp, Socket, Data} -> + Pid ! {tcp, Socket, Data}, + flush_controller(Pid, Socket); + {tcp_closed, Socket} -> + Pid ! {tcp_closed, Socket}, + flush_controller(Pid, Socket) + after 0 -> + ok + end. + +%% ------------------------------------------------------------ +%% Accepts a new connection attempt from another Erlang node. +%% Performs the handshake with the other side. +%% ------------------------------------------------------------ + +accept_connection(AcceptPid, DistCtrl, MyNode, Allowed, SetupTime) -> + spawn_opt(?MODULE, do_accept, + [self(), AcceptPid, DistCtrl, MyNode, Allowed, SetupTime], + [link, {priority, max}]). + +do_accept(Kernel, AcceptPid, DistCtrl, MyNode, Allowed, SetupTime) -> + ?trace("~p~n",[{?MODULE, do_accept, self(), MyNode}]), + receive + {AcceptPid, controller} -> + Timer = dist_util:start_timer(SetupTime), + case check_ip(DistCtrl) of + true -> + HSData0 = hs_data_common(DistCtrl), + HSData = HSData0#hs_data{kernel_pid = Kernel, + this_node = MyNode, + socket = DistCtrl, + timer = Timer, + this_flags = 0, + allowed = Allowed}, + dist_util:handshake_other_started(HSData); + {false,IP} -> + error_msg("** Connection attempt from " + "disallowed IP ~w ** ~n", [IP]), + ?shutdown(no_node) + end + end. + +%% we may not always want the nodelay behaviour +%% for performance reasons + +nodelay() -> + case application:get_env(kernel, dist_nodelay) of + undefined -> + {nodelay, true}; + {ok, true} -> + {nodelay, true}; + {ok, false} -> + {nodelay, false}; + _ -> + {nodelay, true} + end. + +%% ------------------------------------------------------------ +%% Setup a new connection to another Erlang node. +%% Performs the handshake with the other side. +%% ------------------------------------------------------------ + +setup(Node, Type, MyNode, LongOrShortNames,SetupTime) -> + spawn_opt(?MODULE, do_setup, + [self(), Node, Type, MyNode, LongOrShortNames, SetupTime], + [link, {priority, max}]). + +do_setup(Kernel, Node, Type, MyNode, LongOrShortNames, SetupTime) -> + ?trace("~p~n",[{?MODULE, do_setup, self(), Node}]), + [Name, Address] = splitnode(Node, LongOrShortNames), + case inet:getaddr(Address, inet) of + {ok, Ip} -> + Timer = dist_util:start_timer(SetupTime), + ErlEpmd = net_kernel:epmd_module(), + case ErlEpmd:port_please(Name, Ip) of + {port, TcpPort, Version} -> + ?trace("port_please(~p) -> version ~p~n", + [Node,Version]), + dist_util:reset_timer(Timer), + case + gen_tcp:connect( + Ip, TcpPort, + connect_options([binary, {active, false}, {packet, 2}])) + of + {ok, Socket} -> + DistCtrl = spawn_dist_cntrlr(Socket), + call_ctrlr(DistCtrl, {supervisor, self()}), + flush_controller(DistCtrl, Socket), + gen_tcp:controlling_process(Socket, DistCtrl), + flush_controller(DistCtrl, Socket), + HSData0 = hs_data_common(DistCtrl), + HSData = HSData0#hs_data{kernel_pid = Kernel, + other_node = Node, + this_node = MyNode, + socket = DistCtrl, + timer = Timer, + this_flags = 0, + other_version = Version, + request_type = Type}, + dist_util:handshake_we_started(HSData); + _ -> + %% Other Node may have closed since + %% port_please ! + ?trace("other node (~p) " + "closed since port_please.~n", + [Node]), + ?shutdown(Node) + end; + _ -> + ?trace("port_please (~p) " + "failed.~n", [Node]), + ?shutdown(Node) + end; + _Other -> + ?trace("inet_getaddr(~p) " + "failed (~p).~n", [Node,_Other]), + ?shutdown(Node) + end. + +connect_options(Opts) -> + case application:get_env(kernel, inet_dist_connect_options) of + {ok,ConnectOpts} -> + ConnectOpts ++ Opts; + _ -> + Opts + end. + +%% +%% Close a socket. +%% +close(Listen) -> + gen_tcp:close(Listen). + + +%% If Node is illegal terminate the connection setup!! +splitnode(Node, LongOrShortNames) -> + case split_node(atom_to_list(Node), $@, []) of + [Name|Tail] when Tail =/= [] -> + Host = lists:append(Tail), + case split_node(Host, $., []) of + [_] when LongOrShortNames =:= longnames -> + case inet:parse_address(Host) of + {ok, _} -> + [Name, Host]; + _ -> + error_msg("** System running to use " + "fully qualified " + "hostnames **~n" + "** Hostname ~ts is illegal **~n", + [Host]), + ?shutdown(Node) + end; + L when length(L) > 1, LongOrShortNames =:= shortnames -> + error_msg("** System NOT running to use fully qualified " + "hostnames **~n" + "** Hostname ~ts is illegal **~n", + [Host]), + ?shutdown(Node); + _ -> + [Name, Host] + end; + [_] -> + error_msg("** Nodename ~p illegal, no '@' character **~n", + [Node]), + ?shutdown(Node); + _ -> + error_msg("** Nodename ~p illegal **~n", [Node]), + ?shutdown(Node) + end. + +split_node([Chr|T], Chr, Ack) -> [lists:reverse(Ack)|split_node(T, Chr, [])]; +split_node([H|T], Chr, Ack) -> split_node(T, Chr, [H|Ack]); +split_node([], _, Ack) -> [lists:reverse(Ack)]. + +%% ------------------------------------------------------------ +%% Fetch local information about a Socket. +%% ------------------------------------------------------------ +get_tcp_address(Socket) -> + {ok, Address} = inet:sockname(Socket), + {ok, Host} = inet:gethostname(), + #net_address { + address = Address, + host = Host, + protocol = tcp, + family = inet + }. + +%% ------------------------------------------------------------ +%% Do only accept new connection attempts from nodes at our +%% own LAN, if the check_ip environment parameter is true. +%% ------------------------------------------------------------ +check_ip(DistCtrl) -> + case application:get_env(check_ip) of + {ok, true} -> + case get_ifs(DistCtrl) of + {ok, IFs, IP} -> + check_ip(IFs, IP); + _ -> + ?shutdown(no_node) + end; + _ -> + true + end. + +get_ifs(DistCtrl) -> + Socket = call_ctrlr(DistCtrl, socket), + case inet:peername(Socket) of + {ok, {IP, _}} -> + case inet:getif(Socket) of + {ok, IFs} -> {ok, IFs, IP}; + Error -> Error + end; + Error -> + Error + end. + +check_ip([{OwnIP, _, Netmask}|IFs], PeerIP) -> + case {inet_tcp:mask(Netmask, PeerIP), inet_tcp:mask(Netmask, OwnIP)} of + {M, M} -> true; + _ -> check_ip(IFs, PeerIP) + end; +check_ip([], PeerIP) -> + {false, PeerIP}. + +is_node_name(Node) when is_atom(Node) -> + case split_node(atom_to_list(Node), $@, []) of + [_, _Host] -> true; + _ -> false + end; +is_node_name(_Node) -> + false. + +hs_data_common(DistCtrl) -> + TickHandler = call_ctrlr(DistCtrl, tick_handler), + Socket = call_ctrlr(DistCtrl, socket), + #hs_data{f_send = send_fun(), + f_recv = recv_fun(), + f_setopts_pre_nodeup = setopts_pre_nodeup_fun(), + f_setopts_post_nodeup = setopts_post_nodeup_fun(), + f_getll = getll_fun(), + f_handshake_complete = handshake_complete_fun(), + f_address = address_fun(), + mf_setopts = setopts_fun(DistCtrl, Socket), + mf_getopts = getopts_fun(DistCtrl, Socket), + mf_getstat = getstat_fun(DistCtrl, Socket), + mf_tick = tick_fun(DistCtrl, TickHandler)}. + +%%% ------------------------------------------------------------ +%%% Distribution controller processes +%%% ------------------------------------------------------------ + +%% +%% There will be five parties working together when the +%% connection is up: +%% - The gen_tcp socket. Providing a tcp/ip connection +%% to the other node. +%% - The output handler. It will dispatch all outgoing +%% traffic from the VM to the gen_tcp socket. This +%% process is registered as distribution controller +%% for this channel with the VM. +%% - The input handler. It will dispatch all incoming +%% traffic from the gen_tcp socket to the VM. This +%% process is also the socket owner and receives +%% incoming traffic using active-N. +%% - The tick handler. Dispatches asynchronous tick +%% requests to the socket. It executes on max priority +%% since it is important to get ticks through to the +%% other end. +%% - The channel supervisor (provided by dist_util). It +%% monitors traffic. Issue tick requests to the tick +%% handler when no outgoing traffic is seen and bring +%% the connection down if no incoming traffic is seen. +%% This process also executes on max priority. +%% +%% These parties are linked togheter so should one +%% of them fail, all of them are terminated and the +%% connection is taken down. +%% + +%% In order to avoid issues with lingering signal binaries +%% we enable off-heap message queue data as well as fullsweep +%% after 0. The fullsweeps will be cheap since we have more +%% or less no live data. +-define(DIST_CNTRL_COMMON_SPAWN_OPTS, + [{message_queue_data, off_heap}, + {fullsweep_after, 0}]). + +tick_fun(DistCtrl, TickHandler) -> + fun (Ctrl) when Ctrl == DistCtrl -> + TickHandler ! tick + end. + +getstat_fun(DistCtrl, Socket) -> + fun (Ctrl) when Ctrl == DistCtrl -> + case inet:getstat(Socket, [recv_cnt, send_cnt, send_pend]) of + {ok, Stat} -> + split_stat(Stat,0,0,0); + Error -> + Error + end + end. + +split_stat([{recv_cnt, R}|Stat], _, W, P) -> + split_stat(Stat, R, W, P); +split_stat([{send_cnt, W}|Stat], R, _, P) -> + split_stat(Stat, R, W, P); +split_stat([{send_pend, P}|Stat], R, W, _) -> + split_stat(Stat, R, W, P); +split_stat([], R, W, P) -> + {ok, R, W, P}. + +setopts_fun(DistCtrl, Socket) -> + fun (Ctrl, Opts) when Ctrl == DistCtrl -> + setopts(Socket, Opts) + end. + +getopts_fun(DistCtrl, Socket) -> + fun (Ctrl, Opts) when Ctrl == DistCtrl -> + getopts(Socket, Opts) + end. + +setopts(S, Opts) -> + case [Opt || {K,_}=Opt <- Opts, + K =:= active orelse K =:= deliver orelse K =:= packet] of + [] -> inet:setopts(S,Opts); + Opts1 -> {error, {badopts,Opts1}} + end. + +getopts(S, Opts) -> + inet:getopts(S, Opts). + +send_fun() -> + fun (Ctrlr, Packet) -> + call_ctrlr(Ctrlr, {send, Packet}) + end. + +recv_fun() -> + fun (Ctrlr, Length, Timeout) -> + case call_ctrlr(Ctrlr, {recv, Length, Timeout}) of + {ok, Bin} when is_binary(Bin) -> + {ok, binary_to_list(Bin)}; + Other -> + Other + end + end. + +getll_fun() -> + fun (Ctrlr) -> + call_ctrlr(Ctrlr, getll) + end. + +address_fun() -> + fun (Ctrlr, Node) -> + case call_ctrlr(Ctrlr, {address, Node}) of + {error, no_node} -> %% No '@' or more than one '@' in node name. + ?shutdown(no_node); + Res -> + Res + end + end. + +setopts_pre_nodeup_fun() -> + fun (Ctrlr) -> + call_ctrlr(Ctrlr, pre_nodeup) + end. + +setopts_post_nodeup_fun() -> + fun (Ctrlr) -> + call_ctrlr(Ctrlr, post_nodeup) + end. + +handshake_complete_fun() -> + fun (Ctrlr, Node, DHandle) -> + call_ctrlr(Ctrlr, {handshake_complete, Node, DHandle}) + end. + +call_ctrlr(Ctrlr, Msg) -> + Ref = erlang:monitor(process, Ctrlr), + Ctrlr ! {Ref, self(), Msg}, + receive + {Ref, Res} -> + erlang:demonitor(Ref, [flush]), + Res; + {'DOWN', Ref, process, Ctrlr, Reason} -> + exit({dist_controller_exit, Reason}) + end. + +%% +%% The tick handler process writes a tick to the +%% socket when it receives a 'tick' message from +%% the connection supervisor. +%% +%% We are not allowed to block the connection +%% superviser when writing a tick and we also want +%% the tick to go through even during a heavily +%% loaded system. gen_tcp does not have a +%% non-blocking send operation exposed in its API +%% and we don't want to run the distribution +%% controller under high priority. Therefore this +%% sparate process with max prio that dispatches +%% ticks. +%% +dist_cntrlr_tick_handler(Socket) -> + receive + tick -> + %% May block due to busy port... + sock_send(Socket, ""); + _ -> + ok + end, + dist_cntrlr_tick_handler(Socket). + +spawn_dist_cntrlr(Socket) -> + spawn_opt(?MODULE, dist_cntrlr_setup, [Socket], + [{priority, max}] ++ ?DIST_CNTRL_COMMON_SPAWN_OPTS). + +dist_cntrlr_setup(Socket) -> + TickHandler = spawn_opt(?MODULE, dist_cntrlr_tick_handler, + [Socket], + [link, {priority, max}] + ++ ?DIST_CNTRL_COMMON_SPAWN_OPTS), + dist_cntrlr_setup_loop(Socket, TickHandler, undefined). + +%% +%% During the handshake phase we loop in dist_cntrlr_setup(). +%% When the connection is up we spawn an input handler and +%% continue as output handler. +%% +dist_cntrlr_setup_loop(Socket, TickHandler, Sup) -> + receive + {tcp_closed, Socket} -> + exit(connection_closed); + + {Ref, From, {supervisor, Pid}} -> + Res = link(Pid), + From ! {Ref, Res}, + dist_cntrlr_setup_loop(Socket, TickHandler, Pid); + + {Ref, From, tick_handler} -> + From ! {Ref, TickHandler}, + dist_cntrlr_setup_loop(Socket, TickHandler, Sup); + + {Ref, From, socket} -> + From ! {Ref, Socket}, + dist_cntrlr_setup_loop(Socket, TickHandler, Sup); + + {Ref, From, {send, Packet}} -> + Res = gen_tcp:send(Socket, Packet), + From ! {Ref, Res}, + dist_cntrlr_setup_loop(Socket, TickHandler, Sup); + + {Ref, From, {recv, Length, Timeout}} -> + Res = gen_tcp:recv(Socket, Length, Timeout), + From ! {Ref, Res}, + dist_cntrlr_setup_loop(Socket, TickHandler, Sup); + + {Ref, From, getll} -> + From ! {Ref, {ok, self()}}, + dist_cntrlr_setup_loop(Socket, TickHandler, Sup); + + {Ref, From, {address, Node}} -> + Res = case inet:peername(Socket) of + {ok, Address} -> + case split_node(atom_to_list(Node), $@, []) of + [_,Host] -> + #net_address{address=Address,host=Host, + protocol=tcp, family=inet}; + _ -> + {error, no_node} + end + end, + From ! {Ref, Res}, + dist_cntrlr_setup_loop(Socket, TickHandler, Sup); + + {Ref, From, pre_nodeup} -> + Res = inet:setopts(Socket, + [{active, false}, + {packet, 4}, + nodelay()]), + From ! {Ref, Res}, + dist_cntrlr_setup_loop(Socket, TickHandler, Sup); + + {Ref, From, post_nodeup} -> + Res = inet:setopts(Socket, + [{active, false}, + {packet, 4}, + nodelay()]), + From ! {Ref, Res}, + dist_cntrlr_setup_loop(Socket, TickHandler, Sup); + + {Ref, From, {handshake_complete, _Node, DHandle}} -> + From ! {Ref, ok}, + %% Handshake complete! Begin dispatching traffic... + + %% We use separate process for dispatching input. This + %% is not necessary, but it enables parallel execution + %% of independent work loads at the same time as it + %% simplifies the the implementation... + InputHandler = spawn_opt(?MODULE, dist_cntrlr_input_setup, + [DHandle, Socket, Sup], + [link] ++ ?DIST_CNTRL_COMMON_SPAWN_OPTS), + + flush_controller(InputHandler, Socket), + gen_tcp:controlling_process(Socket, InputHandler), + flush_controller(InputHandler, Socket), + + ok = erlang:dist_ctrl_input_handler(DHandle, InputHandler), + + InputHandler ! DHandle, + + %% From now on we execute on normal priority + process_flag(priority, normal), + erlang:dist_ctrl_get_data_notification(DHandle), + dist_cntrlr_output_loop(DHandle, Socket) + end. + +%% We use active 10 for good throughput while still +%% maintaining back-pressure if the input controller +%% isn't able to handle all incoming messages... +-define(ACTIVE_INPUT, 10). + +dist_cntrlr_input_setup(DHandle, Socket, Sup) -> + link(Sup), + %% Ensure we don't try to put data before registerd + %% as input handler... + receive + DHandle -> + dist_cntrlr_input_loop(DHandle, Socket, 0) + end. + +dist_cntrlr_input_loop(DHandle, Socket, N) when N =< ?ACTIVE_INPUT/2 -> + inet:setopts(Socket, [{active, ?ACTIVE_INPUT - N}]), + dist_cntrlr_input_loop(DHandle, Socket, ?ACTIVE_INPUT); +dist_cntrlr_input_loop(DHandle, Socket, N) -> + receive + {tcp_closed, Socket} -> + %% Connection to remote node terminated... + exit(connection_closed); + + {tcp, Socket, Data} -> + %% Incoming data from remote node... + try erlang:dist_ctrl_put_data(DHandle, Data) + catch _ : _ -> death_row() + end, + dist_cntrlr_input_loop(DHandle, Socket, N-1); + + _ -> + %% Ignore... + dist_cntrlr_input_loop(DHandle, Socket, N) + end. + +dist_cntrlr_send_data(DHandle, Socket) -> + case erlang:dist_ctrl_get_data(DHandle) of + none -> + erlang:dist_ctrl_get_data_notification(DHandle); + Data -> + sock_send(Socket, Data), + dist_cntrlr_send_data(DHandle, Socket) + end. + + +dist_cntrlr_output_loop(DHandle, Socket) -> + receive + dist_data -> + %% Outgoing data from this node... + try dist_cntrlr_send_data(DHandle, Socket) + catch _ : _ -> death_row() + end, + dist_cntrlr_output_loop(DHandle, Socket); + + {send, From, Ref, Data} -> + %% This is for testing only! + %% + %% Needed by some OTP distribution + %% test suites... + sock_send(Socket, Data), + From ! {Ref, ok}, + dist_cntrlr_output_loop(DHandle, Socket); + + _ -> + %% Drop garbage message... + dist_cntrlr_output_loop(DHandle, Socket) + + end. + +sock_send(Socket, Data) -> + try gen_tcp:send(Socket, Data) of + ok -> ok; + {error, Reason} -> death_row({send_error, Reason}) + catch + Type : Reason -> death_row({send_error, {Type, Reason}}) + end. + +death_row() -> + death_row(connection_closed). + +death_row(normal) -> + %% We do not want to exit with normal + %% exit reason since it wont bring down + %% linked processes... + death_row(); +death_row(Reason) -> + %% When the connection is on its way down operations + %% begin to fail. We catch the failures and call + %% this function waiting for termination. We should + %% be terminated by one of our links to the other + %% involved parties that began bringing the + %% connection down. By waiting for termination we + %% avoid altering the exit reason for the connection + %% teardown. We however limit the wait to 5 seconds + %% and bring down the connection ourselves if not + %% terminated... + receive after 5000 -> exit(Reason) end. diff --git a/lib/kernel/include/dist.hrl b/lib/kernel/include/dist.hrl index d6bccdf474..db4a5eaebc 100644 --- a/lib/kernel/include/dist.hrl +++ b/lib/kernel/include/dist.hrl @@ -40,3 +40,33 @@ -define(DFLAG_UTF8_ATOMS, 16#10000). -define(DFLAG_MAP_TAG, 16#20000). -define(DFLAG_BIG_CREATION, 16#40000). +-define(DFLAG_SEND_SENDER, 16#80000). + +%% DFLAGs that require strict ordering or:ed together... +-define(DFLAGS_STRICT_ORDER_DELIVERY, + ?DFLAG_DIST_HDR_ATOM_CACHE). + + +%% Also update dflag2str() in ../src/dist_util.erl +%% when adding flags... + +-define(DFLAGS_ALL, + (?DFLAG_PUBLISHED + bor ?DFLAG_ATOM_CACHE + bor ?DFLAG_EXTENDED_REFERENCES + bor ?DFLAG_DIST_MONITOR + bor ?DFLAG_FUN_TAGS + bor ?DFLAG_DIST_MONITOR_NAME + bor ?DFLAG_HIDDEN_ATOM_CACHE + bor ?DFLAG_NEW_FUN_TAGS + bor ?DFLAG_EXTENDED_PIDS_PORTS + bor ?DFLAG_EXPORT_PTR_TAG + bor ?DFLAG_BIT_BINARIES + bor ?DFLAG_NEW_FLOATS + bor ?DFLAG_UNICODE_IO + bor ?DFLAG_DIST_HDR_ATOM_CACHE + bor ?DFLAG_SMALL_ATOM_TAGS + bor ?DFLAG_UTF8_ATOMS + bor ?DFLAG_MAP_TAG + bor ?DFLAG_BIG_CREATION + bor ?DFLAG_SEND_SENDER)). diff --git a/lib/kernel/include/dist_util.hrl b/lib/kernel/include/dist_util.hrl index e3d2fe0eb6..eeb0f8dd43 100644 --- a/lib/kernel/include/dist_util.hrl +++ b/lib/kernel/include/dist_util.hrl @@ -29,9 +29,9 @@ -endif. -ifdef(dist_trace). --define(trace(Fmt,Args), io:format("~p ~p:~s",[erlang:timestamp(),node(),lists:flatten(io_lib:format(Fmt, Args))])). +-define(trace(Fmt,Args), io:format("~p ~p:~s",[erlang:convert_time_unit(erlang:monotonic_time()-erlang:system_info(start_time), native, microsecond),node(),lists:flatten(io_lib:format(Fmt, Args))])). % Use the one below for config-file (early boot) connection tracing -%-define(trace(Fmt,Args), erlang:display([erlang:now(),node(),lists:flatten(io_lib:format(Fmt, Args))])). +%-define(trace(Fmt,Args), erlang:display([erlang:convert_time_unit(erlang:monotonic_time()-erlang:system_info(start_time), native, microsecond),node(),lists:flatten(io_lib:format(Fmt, Args))])). -define(trace_factor,8). -else. -define(trace(Fmt,Args), ok). @@ -78,7 +78,13 @@ %% New in kernel-5.1 (OTP 19.1): mf_setopts, %% netkernel:setopts on active connection - mf_getopts %% netkernel:getopts on active connection + mf_getopts, %% netkernel:getopts on active connection + + %% New in kernel-6.0 (OTP 21.0) + f_handshake_complete, %% Notify handshake complete + add_flags, %% dflags to add + reject_flags, %% dflags not to use (not all can be rejected) + require_flags %% dflags that are required }). diff --git a/lib/kernel/src/dist_util.erl b/lib/kernel/src/dist_util.erl index b3507e5d13..08bd5946cd 100644 --- a/lib/kernel/src/dist_util.erl +++ b/lib/kernel/src/dist_util.erl @@ -74,6 +74,48 @@ ticked = 0 }). +dflag2str(?DFLAG_PUBLISHED) -> + "PUBLISHED"; +dflag2str(?DFLAG_ATOM_CACHE) -> + "ATOM_CACHE"; +dflag2str(?DFLAG_EXTENDED_REFERENCES) -> + "EXTENDED_REFERENCES"; +dflag2str(?DFLAG_DIST_MONITOR) -> + "DIST_MONITOR"; +dflag2str(?DFLAG_FUN_TAGS) -> + "FUN_TAGS"; +dflag2str(?DFLAG_DIST_MONITOR_NAME) -> + "DIST_MONITOR_NAME"; +dflag2str(?DFLAG_HIDDEN_ATOM_CACHE) -> + "HIDDEN_ATOM_CACHE"; +dflag2str(?DFLAG_NEW_FUN_TAGS) -> + "NEW_FUN_TAGS"; +dflag2str(?DFLAG_EXTENDED_PIDS_PORTS) -> + "EXTENDED_PIDS_PORTS"; +dflag2str(?DFLAG_EXPORT_PTR_TAG) -> + "EXPORT_PTR_TAG"; +dflag2str(?DFLAG_BIT_BINARIES) -> + "BIT_BINARIES"; +dflag2str(?DFLAG_NEW_FLOATS) -> + "NEW_FLOATS"; +dflag2str(?DFLAG_UNICODE_IO) -> + "UNICODE_IO"; +dflag2str(?DFLAG_DIST_HDR_ATOM_CACHE) -> + "DIST_HDR_ATOM_CACHE"; +dflag2str(?DFLAG_SMALL_ATOM_TAGS) -> + "SMALL_ATOM_TAGS"; +dflag2str(?DFLAG_UTF8_ATOMS) -> + "UTF8_ATOMS"; +dflag2str(?DFLAG_MAP_TAG) -> + "MAP_TAG"; +dflag2str(?DFLAG_BIG_CREATION) -> + "BIG_CREATION"; +dflag2str(?DFLAG_SEND_SENDER) -> + "SEND_SENDER"; +dflag2str(_) -> + "UNKNOWN". + + remove_flag(Flag, Flags) -> case Flags band Flag of 0 -> @@ -82,13 +124,13 @@ remove_flag(Flag, Flags) -> Flags - Flag end. -adjust_flags(ThisFlags, OtherFlags) -> +adjust_flags(ThisFlags, OtherFlags, RejectFlags) -> case (?DFLAG_PUBLISHED band ThisFlags) band OtherFlags of 0 -> {remove_flag(?DFLAG_PUBLISHED, ThisFlags), remove_flag(?DFLAG_PUBLISHED, OtherFlags)}; _ -> - {ThisFlags, OtherFlags} + {ThisFlags, OtherFlags band (bnot RejectFlags)} end. publish_flag(hidden, _) -> @@ -101,36 +143,71 @@ publish_flag(_, OtherNode) -> 0 end. -make_this_flags(RequestType, OtherNode) -> - publish_flag(RequestType, OtherNode) bor - %% The parenthesis below makes the compiler generate better code. - (?DFLAG_EXPORT_PTR_TAG bor - ?DFLAG_EXTENDED_PIDS_PORTS bor - ?DFLAG_EXTENDED_REFERENCES bor - ?DFLAG_DIST_MONITOR bor - ?DFLAG_FUN_TAGS bor - ?DFLAG_DIST_MONITOR_NAME bor - ?DFLAG_HIDDEN_ATOM_CACHE bor - ?DFLAG_NEW_FUN_TAGS bor - ?DFLAG_BIT_BINARIES bor - ?DFLAG_NEW_FLOATS bor - ?DFLAG_UNICODE_IO bor - ?DFLAG_DIST_HDR_ATOM_CACHE bor - ?DFLAG_SMALL_ATOM_TAGS bor - ?DFLAG_UTF8_ATOMS bor - ?DFLAG_MAP_TAG bor - ?DFLAG_BIG_CREATION). - -handshake_other_started(#hs_data{request_type=ReqType}=HSData0) -> +-define(DFLAGS_REMOVABLE, + (?DFLAG_DIST_HDR_ATOM_CACHE + bor ?DFLAG_HIDDEN_ATOM_CACHE + bor ?DFLAG_ATOM_CACHE)). + +-define(DFLAGS_ADDABLE, + (?DFLAGS_ALL + band (bnot (?DFLAG_PUBLISHED + bor ?DFLAG_HIDDEN_ATOM_CACHE + bor ?DFLAG_ATOM_CACHE)))). + +-define(DFLAGS_THIS_DEFAULT, + (?DFLAG_EXPORT_PTR_TAG + bor ?DFLAG_EXTENDED_PIDS_PORTS + bor ?DFLAG_EXTENDED_REFERENCES + bor ?DFLAG_DIST_MONITOR + bor ?DFLAG_FUN_TAGS + bor ?DFLAG_DIST_MONITOR_NAME + bor ?DFLAG_NEW_FUN_TAGS + bor ?DFLAG_BIT_BINARIES + bor ?DFLAG_NEW_FLOATS + bor ?DFLAG_UNICODE_IO + bor ?DFLAG_DIST_HDR_ATOM_CACHE + bor ?DFLAG_SMALL_ATOM_TAGS + bor ?DFLAG_UTF8_ATOMS + bor ?DFLAG_MAP_TAG + bor ?DFLAG_BIG_CREATION + bor ?DFLAG_SEND_SENDER)). + +make_this_flags(RequestType, AddFlags, RemoveFlags, OtherNode) -> + case RemoveFlags band (bnot ?DFLAGS_REMOVABLE) of + 0 -> ok; + Rerror -> exit({"Rejecting non rejectable flags", Rerror}) + end, + case AddFlags band (bnot ?DFLAGS_ADDABLE) of + 0 -> ok; + Aerror -> exit({"Adding non addable flags", Aerror}) + end, + Flgs0 = ?DFLAGS_THIS_DEFAULT, + Flgs1 = Flgs0 bor publish_flag(RequestType, OtherNode), + Flgs2 = Flgs1 bor AddFlags, + Flgs3 = Flgs2 band (bnot (?DFLAG_HIDDEN_ATOM_CACHE + bor ?DFLAG_ATOM_CACHE)), + Flgs3 band (bnot RemoveFlags). + +handshake_other_started(#hs_data{request_type=ReqType, + add_flags=AddFlgs0, + reject_flags=RejFlgs0, + require_flags=ReqFlgs0}=HSData0) -> + AddFlgs = convert_flags(AddFlgs0), + RejFlgs = convert_flags(RejFlgs0), + ReqFlgs = convert_flags(ReqFlgs0), {PreOtherFlags,Node,Version} = recv_name(HSData0), - PreThisFlags = make_this_flags(ReqType, Node), + PreThisFlags = make_this_flags(ReqType, AddFlgs, RejFlgs, Node), {ThisFlags, OtherFlags} = adjust_flags(PreThisFlags, - PreOtherFlags), + PreOtherFlags, + RejFlgs), HSData = HSData0#hs_data{this_flags=ThisFlags, other_flags=OtherFlags, other_version=Version, other_node=Node, - other_started=true}, + other_started=true, + add_flags=AddFlgs, + reject_flags=RejFlgs, + require_flags=ReqFlgs}, check_dflags(HSData), is_allowed(HSData), ?debug({"MD5 connection from ~p (V~p)~n", @@ -165,23 +242,18 @@ is_allowed(#hs_data{other_node = Node, end. %% -%% Check that both nodes can handle the same types of extended -%% node containers. If they can not, abort the connection. +%% Check mandatory flags... %% check_dflags(#hs_data{other_node = Node, other_flags = OtherFlags, - other_started = OtherStarted} = HSData) -> - - Mandatory = [{?DFLAG_EXTENDED_REFERENCES, "EXTENDED_REFERENCES"}, - {?DFLAG_EXTENDED_PIDS_PORTS, "EXTENDED_PIDS_PORTS"}, - {?DFLAG_UTF8_ATOMS, "UTF8_ATOMS"}], - Missing = lists:filtermap(fun({Bit, Str}) -> - case Bit band OtherFlags of - Bit -> false; - 0 -> {true, Str} - end - end, - Mandatory), + other_started = OtherStarted, + require_flags = RequiredFlags} = HSData) -> + Mandatory = ((?DFLAG_EXTENDED_REFERENCES + bor ?DFLAG_EXTENDED_PIDS_PORTS + bor ?DFLAG_UTF8_ATOMS) + bor RequiredFlags), + Missing = check_mandatory(0, ?DFLAGS_ALL, Mandatory, + OtherFlags, []), case Missing of [] -> ok; @@ -201,6 +273,22 @@ check_dflags(#hs_data{other_node = Node, ?shutdown2(Node, {check_dflags_failed, Missing}) end. +check_mandatory(_Bit, 0, _Mandatory, _OtherFlags, Missing) -> + Missing; +check_mandatory(Bit, Left, Mandatory, OtherFlags, Missing) -> + DFlag = (1 bsl Bit), + NewLeft = Left band (bnot DFlag), + NewMissing = case {DFlag band Mandatory, + DFlag band OtherFlags} of + {DFlag, 0} -> + %% Mandatory and missing... + [dflag2str(DFlag) | Missing]; + _ -> + %% Not mandatory or present... + Missing + end, + check_mandatory(Bit+1, NewLeft, Mandatory, OtherFlags, NewMissing). + %% No nodedown will be sent if we fail before this process has %% succeeded to mark the node as pending. @@ -314,13 +402,24 @@ flush_down() -> end. handshake_we_started(#hs_data{request_type=ReqType, - other_node=Node}=PreHSData) -> - PreThisFlags = make_this_flags(ReqType, Node), - HSData = PreHSData#hs_data{this_flags=PreThisFlags}, + other_node=Node, + add_flags=AddFlgs0, + reject_flags=RejFlgs0, + require_flags=ReqFlgs0}=PreHSData) -> + AddFlgs = convert_flags(AddFlgs0), + RejFlgs = convert_flags(RejFlgs0), + ReqFlgs = convert_flags(ReqFlgs0), + PreThisFlags = make_this_flags(ReqType, AddFlgs, RejFlgs, Node), + HSData = PreHSData#hs_data{this_flags = PreThisFlags, + add_flags = AddFlgs, + reject_flags = RejFlgs, + require_flags = ReqFlgs}, send_name(HSData), recv_status(HSData), {PreOtherFlags,ChallengeA} = recv_challenge(HSData), - {ThisFlags,OtherFlags} = adjust_flags(PreThisFlags, PreOtherFlags), + {ThisFlags,OtherFlags} = adjust_flags(PreThisFlags, + PreOtherFlags, + RejFlgs), NewHSData = HSData#hs_data{this_flags = ThisFlags, other_flags = OtherFlags, other_started = false}, @@ -336,15 +435,16 @@ handshake_we_started(#hs_data{request_type=ReqType, handshake_we_started(OldHsData) when element(1,OldHsData) =:= hs_data -> handshake_we_started(convert_old_hsdata(OldHsData)). -convert_old_hsdata({hs_data, KP, ON, TN, S, T, TF, A, OV, OF, OS, FS, FR, - FS_PRE, FS_POST, FG, FA, MFT, MFG, RT}) -> - #hs_data{ - kernel_pid = KP, other_node = ON, this_node = TN, socket = S, timer = T, - this_flags = TF, allowed = A, other_version = OV, other_flags = OF, - other_started = OS, f_send = FS, f_recv = FR, f_setopts_pre_nodeup = FS_PRE, - f_setopts_post_nodeup = FS_POST, f_getll = FG, f_address = FA, - mf_tick = MFT, mf_getstat = MFG, request_type = RT}. +convert_old_hsdata(OldHsData) -> + OHSDL = tuple_to_list(OldHsData), + NoMissing = tuple_size(#hs_data{}) - tuple_size(OldHsData), + true = NoMissing > 0, + list_to_tuple(OHSDL ++ lists:duplicate(NoMissing, undefined)). +convert_flags(Flags) when is_integer(Flags) -> + Flags; +convert_flags(_Undefined) -> + 0. %% -------------------------------------------------------------- %% The connection has been established. @@ -359,15 +459,20 @@ connection(#hs_data{other_node = Node, PType = publish_type(HSData#hs_data.other_flags), case FPreNodeup(Socket) of ok -> - do_setnode(HSData), % Succeeds or exits the process. + DHandle = do_setnode(HSData), % Succeeds or exits the process. Address = FAddress(Socket,Node), mark_nodeup(HSData,Address), case FPostNodeup(Socket) of ok -> + case HSData#hs_data.f_handshake_complete of + undefined -> ok; + HsComplete -> HsComplete(Socket, Node, DHandle) + end, con_loop({HSData#hs_data.kernel_pid, Node, Socket, PType, + DHandle, HSData#hs_data.mf_tick, HSData#hs_data.mf_getstat, HSData#hs_data.mf_setopts, @@ -425,18 +530,16 @@ do_setnode(#hs_data{other_node = Node, socket = Socket, [Node, Port, {publish_type(Flags), '(', Flags, ')', Version}]), - case (catch - erlang:setnode(Node, Port, - {Flags, Version, '', ''})) of - {'EXIT', {system_limit, _}} -> + try + erlang:setnode(Node, Port, {Flags, Version, '', ''}) + catch + error:system_limit -> error_msg("** Distribution system limit reached, " "no table space left for node ~w ** ~n", [Node]), ?shutdown(Node); - {'EXIT', Other} -> - exit(Other); - _Else -> - ok + error:Other -> + exit({Other, erlang:get_stacktrace()}) end; _ -> error_msg("** Distribution connection error, " @@ -468,7 +571,13 @@ mark_nodeup(#hs_data{kernel_pid = Kernel, ?shutdown(Node) end. -con_loop({Kernel, Node, Socket, Type, MFTick, MFGetstat, MFSetOpts, MFGetOpts}=ConData, +getstat(DHandle, _Socket, undefined) -> + erlang:dist_get_stat(DHandle); +getstat(_DHandle, Socket, MFGetstat) -> + MFGetstat(Socket). + +con_loop({Kernel, Node, Socket, Type, DHandle, MFTick, MFGetstat, + MFSetOpts, MFGetOpts}=ConData, Tick) -> receive {tcp_closed, Socket} -> @@ -476,7 +585,7 @@ con_loop({Kernel, Node, Socket, Type, MFTick, MFGetstat, MFSetOpts, MFGetOpts}=C {Kernel, disconnect} -> ?shutdown2(Node, disconnected); {Kernel, aux_tick} -> - case MFGetstat(Socket) of + case getstat(DHandle, Socket, MFGetstat) of {ok, _, _, PendWrite} -> send_tick(Socket, PendWrite, MFTick); _ -> @@ -484,7 +593,7 @@ con_loop({Kernel, Node, Socket, Type, MFTick, MFGetstat, MFSetOpts, MFGetOpts}=C end, con_loop(ConData, Tick); {Kernel, tick} -> - case send_tick(Socket, Tick, Type, + case send_tick(DHandle, Socket, Tick, Type, MFTick, MFGetstat) of {ok, NewTick} -> con_loop(ConData, NewTick); @@ -497,7 +606,7 @@ con_loop({Kernel, Node, Socket, Type, MFTick, MFGetstat, MFSetOpts, MFGetOpts}=C ?shutdown2(Node, send_net_tick_failed) end; {From, get_status} -> - case MFGetstat(Socket) of + case getstat(DHandle, Socket, MFGetstat) of {ok, Read, Write, _} -> From ! {self(), get_status, {ok, Read, Write}}, con_loop(ConData, Tick); @@ -735,14 +844,14 @@ send_status(#hs_data{socket = Socket, other_node = Node, %% we haven't read anything as a hidden node only ticks when it receives %% a TICK !! -send_tick(Socket, Tick, Type, MFTick, MFGetstat) -> +send_tick(DHandle, Socket, Tick, Type, MFTick, MFGetstat) -> #tick{tick = T0, read = Read, write = Write, ticked = Ticked} = Tick, T = T0 + 1, T1 = T rem 4, - case MFGetstat(Socket) of + case getstat(DHandle, Socket, MFGetstat) of {ok, Read, _, _} when Ticked =:= T -> {error, not_responding}; {ok, Read, W, Pend} when Type =:= hidden -> @@ -771,11 +880,10 @@ send_tick(Socket, Tick, Type, MFTick, MFGetstat) -> Error end. -send_tick(Socket, 0, MFTick) -> - MFTick(Socket); -send_tick(_, _Pend, _) -> - %% Dont send tick if pending write. - ok. +send_tick(_, Pend, _) when Pend /= false, Pend /= 0 -> + ok; %% Dont send tick if pending write. +send_tick(Socket, _Pend, MFTick) -> + MFTick(Socket). %% ------------------------------------------------------------ %% Connection setup timeout timer. diff --git a/lib/kernel/src/net_kernel.erl b/lib/kernel/src/net_kernel.erl index ddda396713..fb4faea420 100644 --- a/lib/kernel/src/net_kernel.erl +++ b/lib/kernel/src/net_kernel.erl @@ -423,8 +423,8 @@ handle_call({connect, Type, Node}, From, State) -> {ok, SetupPid} -> Owners = [{SetupPid, Node} | State#state.conn_owners], {noreply,State#state{conn_owners=Owners}}; - _ -> - ?connect_failure(Node, {setup_call, failed}), + _Error -> + ?connect_failure(Node, {setup_call, failed, _Error}), async_reply({reply, false, State}, From) end end; |