aboutsummaryrefslogtreecommitdiffstats
path: root/lib/kernel/examples/gen_tcp_dist/src/gen_tcp_dist.erl
diff options
context:
space:
mode:
Diffstat (limited to 'lib/kernel/examples/gen_tcp_dist/src/gen_tcp_dist.erl')
-rw-r--r--lib/kernel/examples/gen_tcp_dist/src/gen_tcp_dist.erl781
1 files changed, 781 insertions, 0 deletions
diff --git a/lib/kernel/examples/gen_tcp_dist/src/gen_tcp_dist.erl b/lib/kernel/examples/gen_tcp_dist/src/gen_tcp_dist.erl
new file mode 100644
index 0000000000..98554ed805
--- /dev/null
+++ b/lib/kernel/examples/gen_tcp_dist/src/gen_tcp_dist.erl
@@ -0,0 +1,781 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2017. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% %CopyrightEnd%
+%%
+-module(gen_tcp_dist).
+
+%%
+%% This is an example of how to plug in an arbitrary distribution
+%% carrier for Erlang using distribution processes.
+%%
+%% This example uses gen_tcp for transportation of data, but
+%% you can use whatever underlying protocol you want as long
+%% as your implementation reliably delivers data chunks to the
+%% receiving VM in the order they were sent from the sending
+%% VM.
+%%
+%% This code is a rewrite of the lib/kernel/src/inet_tcp_dist.erl
+%% distribution impementation for TCP used by default. That
+%% implementation use distribution ports instead of distribution
+%% processes and is more efficient compared to this implementation.
+%% This since this implementation more or less gets the
+%% distribution processes in between the VM and the ports without
+%% any gain specific gain.
+%%
+
+-export([listen/1, accept/1, accept_connection/5,
+ setup/5, close/1, select/1, is_node_name/1]).
+
+%% Optional
+-export([setopts/2, getopts/2]).
+
+%% internal exports
+
+-export([dist_cntrlr_setup/1, dist_cntrlr_input_setup/3,
+ dist_cntrlr_tick_handler/1]).
+
+-export([accept_loop/2,do_accept/6,do_setup/6]).
+
+-import(error_logger,[error_msg/2]).
+
+-include("net_address.hrl").
+
+-include("dist.hrl").
+-include("dist_util.hrl").
+
+%% ------------------------------------------------------------
+%% Select this protocol based on node name
+%% select(Node) => Bool
+%% ------------------------------------------------------------
+
+select(Node) ->
+ case split_node(atom_to_list(Node), $@, []) of
+ [_, Host] ->
+ case inet:getaddr(Host, inet) of
+ {ok,_} -> true;
+ _ -> false
+ end;
+ _ -> false
+ end.
+
+%% ------------------------------------------------------------
+%% Create the listen socket, i.e. the port that this erlang
+%% node is accessible through.
+%% ------------------------------------------------------------
+
+listen(Name) ->
+ case do_listen([binary, {active, false}, {packet,2}, {reuseaddr, true}]) of
+ {ok, Socket} ->
+ TcpAddress = get_tcp_address(Socket),
+ {_,Port} = TcpAddress#net_address.address,
+ ErlEpmd = net_kernel:epmd_module(),
+ case ErlEpmd:register_node(Name, Port) of
+ {ok, Creation} ->
+ {ok, {Socket, TcpAddress, Creation}};
+ Error ->
+ Error
+ end;
+ Error ->
+ Error
+ end.
+
+do_listen(Options) ->
+ {First,Last} = case application:get_env(kernel,inet_dist_listen_min) of
+ {ok,N} when is_integer(N) ->
+ case application:get_env(kernel,
+ inet_dist_listen_max) of
+ {ok,M} when is_integer(M) ->
+ {N,M};
+ _ ->
+ {N,N}
+ end;
+ _ ->
+ {0,0}
+ end,
+ do_listen(First, Last, listen_options([{backlog,128}|Options])).
+
+do_listen(First,Last,_) when First > Last ->
+ {error,eaddrinuse};
+do_listen(First,Last,Options) ->
+ case gen_tcp:listen(First, Options) of
+ {error, eaddrinuse} ->
+ do_listen(First+1,Last,Options);
+ Other ->
+ Other
+ end.
+
+listen_options(Opts0) ->
+ Opts1 =
+ case application:get_env(kernel, inet_dist_use_interface) of
+ {ok, Ip} ->
+ [{ip, Ip} | Opts0];
+ _ ->
+ Opts0
+ end,
+ case application:get_env(kernel, inet_dist_listen_options) of
+ {ok,ListenOpts} ->
+ ListenOpts ++ Opts1;
+ _ ->
+ Opts1
+ end.
+
+
+%% ------------------------------------------------------------
+%% Accepts new connection attempts from other Erlang nodes.
+%% ------------------------------------------------------------
+
+accept(Listen) ->
+ spawn_opt(?MODULE, accept_loop, [self(), Listen], [link, {priority, max}]).
+
+accept_loop(Kernel, Listen) ->
+ ?trace("~p~n",[{?MODULE, accept_loop, self()}]),
+ case gen_tcp:accept(Listen) of
+ {ok, Socket} ->
+ DistCtrl = spawn_dist_cntrlr(Socket),
+ ?trace("~p~n",[{?MODULE, accept_loop, accepted, Socket, DistCtrl, self()}]),
+ flush_controller(DistCtrl, Socket),
+ gen_tcp:controlling_process(Socket, DistCtrl),
+ flush_controller(DistCtrl, Socket),
+ Kernel ! {accept,self(),DistCtrl,inet,tcp},
+ receive
+ {Kernel, controller, Pid} ->
+ call_ctrlr(DistCtrl, {supervisor, Pid}),
+ Pid ! {self(), controller};
+ {Kernel, unsupported_protocol} ->
+ exit(unsupported_protocol)
+ end,
+ accept_loop(Kernel, Listen);
+ Error ->
+ exit(Error)
+ end.
+
+flush_controller(Pid, Socket) ->
+ receive
+ {tcp, Socket, Data} ->
+ Pid ! {tcp, Socket, Data},
+ flush_controller(Pid, Socket);
+ {tcp_closed, Socket} ->
+ Pid ! {tcp_closed, Socket},
+ flush_controller(Pid, Socket)
+ after 0 ->
+ ok
+ end.
+
+%% ------------------------------------------------------------
+%% Accepts a new connection attempt from another Erlang node.
+%% Performs the handshake with the other side.
+%% ------------------------------------------------------------
+
+accept_connection(AcceptPid, DistCtrl, MyNode, Allowed, SetupTime) ->
+ spawn_opt(?MODULE, do_accept,
+ [self(), AcceptPid, DistCtrl, MyNode, Allowed, SetupTime],
+ [link, {priority, max}]).
+
+do_accept(Kernel, AcceptPid, DistCtrl, MyNode, Allowed, SetupTime) ->
+ ?trace("~p~n",[{?MODULE, do_accept, self(), MyNode}]),
+ receive
+ {AcceptPid, controller} ->
+ Timer = dist_util:start_timer(SetupTime),
+ case check_ip(DistCtrl) of
+ true ->
+ HSData0 = hs_data_common(DistCtrl),
+ HSData = HSData0#hs_data{kernel_pid = Kernel,
+ this_node = MyNode,
+ socket = DistCtrl,
+ timer = Timer,
+ this_flags = 0,
+ allowed = Allowed},
+ dist_util:handshake_other_started(HSData);
+ {false,IP} ->
+ error_msg("** Connection attempt from "
+ "disallowed IP ~w ** ~n", [IP]),
+ ?shutdown(no_node)
+ end
+ end.
+
+%% we may not always want the nodelay behaviour
+%% for performance reasons
+
+nodelay() ->
+ case application:get_env(kernel, dist_nodelay) of
+ undefined ->
+ {nodelay, true};
+ {ok, true} ->
+ {nodelay, true};
+ {ok, false} ->
+ {nodelay, false};
+ _ ->
+ {nodelay, true}
+ end.
+
+%% ------------------------------------------------------------
+%% Setup a new connection to another Erlang node.
+%% Performs the handshake with the other side.
+%% ------------------------------------------------------------
+
+setup(Node, Type, MyNode, LongOrShortNames,SetupTime) ->
+ spawn_opt(?MODULE, do_setup,
+ [self(), Node, Type, MyNode, LongOrShortNames, SetupTime],
+ [link, {priority, max}]).
+
+do_setup(Kernel, Node, Type, MyNode, LongOrShortNames, SetupTime) ->
+ ?trace("~p~n",[{?MODULE, do_setup, self(), Node}]),
+ [Name, Address] = splitnode(Node, LongOrShortNames),
+ case inet:getaddr(Address, inet) of
+ {ok, Ip} ->
+ Timer = dist_util:start_timer(SetupTime),
+ ErlEpmd = net_kernel:epmd_module(),
+ case ErlEpmd:port_please(Name, Ip) of
+ {port, TcpPort, Version} ->
+ ?trace("port_please(~p) -> version ~p~n",
+ [Node,Version]),
+ dist_util:reset_timer(Timer),
+ case
+ gen_tcp:connect(
+ Ip, TcpPort,
+ connect_options([binary, {active, false}, {packet, 2}]))
+ of
+ {ok, Socket} ->
+ DistCtrl = spawn_dist_cntrlr(Socket),
+ call_ctrlr(DistCtrl, {supervisor, self()}),
+ flush_controller(DistCtrl, Socket),
+ gen_tcp:controlling_process(Socket, DistCtrl),
+ flush_controller(DistCtrl, Socket),
+ HSData0 = hs_data_common(DistCtrl),
+ HSData = HSData0#hs_data{kernel_pid = Kernel,
+ other_node = Node,
+ this_node = MyNode,
+ socket = DistCtrl,
+ timer = Timer,
+ this_flags = 0,
+ other_version = Version,
+ request_type = Type},
+ dist_util:handshake_we_started(HSData);
+ _ ->
+ %% Other Node may have closed since
+ %% port_please !
+ ?trace("other node (~p) "
+ "closed since port_please.~n",
+ [Node]),
+ ?shutdown(Node)
+ end;
+ _ ->
+ ?trace("port_please (~p) "
+ "failed.~n", [Node]),
+ ?shutdown(Node)
+ end;
+ _Other ->
+ ?trace("inet_getaddr(~p) "
+ "failed (~p).~n", [Node,_Other]),
+ ?shutdown(Node)
+ end.
+
+connect_options(Opts) ->
+ case application:get_env(kernel, inet_dist_connect_options) of
+ {ok,ConnectOpts} ->
+ ConnectOpts ++ Opts;
+ _ ->
+ Opts
+ end.
+
+%%
+%% Close a socket.
+%%
+close(Listen) ->
+ gen_tcp:close(Listen).
+
+
+%% If Node is illegal terminate the connection setup!!
+splitnode(Node, LongOrShortNames) ->
+ case split_node(atom_to_list(Node), $@, []) of
+ [Name|Tail] when Tail =/= [] ->
+ Host = lists:append(Tail),
+ case split_node(Host, $., []) of
+ [_] when LongOrShortNames =:= longnames ->
+ case inet:parse_address(Host) of
+ {ok, _} ->
+ [Name, Host];
+ _ ->
+ error_msg("** System running to use "
+ "fully qualified "
+ "hostnames **~n"
+ "** Hostname ~ts is illegal **~n",
+ [Host]),
+ ?shutdown(Node)
+ end;
+ L when length(L) > 1, LongOrShortNames =:= shortnames ->
+ error_msg("** System NOT running to use fully qualified "
+ "hostnames **~n"
+ "** Hostname ~ts is illegal **~n",
+ [Host]),
+ ?shutdown(Node);
+ _ ->
+ [Name, Host]
+ end;
+ [_] ->
+ error_msg("** Nodename ~p illegal, no '@' character **~n",
+ [Node]),
+ ?shutdown(Node);
+ _ ->
+ error_msg("** Nodename ~p illegal **~n", [Node]),
+ ?shutdown(Node)
+ end.
+
+split_node([Chr|T], Chr, Ack) -> [lists:reverse(Ack)|split_node(T, Chr, [])];
+split_node([H|T], Chr, Ack) -> split_node(T, Chr, [H|Ack]);
+split_node([], _, Ack) -> [lists:reverse(Ack)].
+
+%% ------------------------------------------------------------
+%% Fetch local information about a Socket.
+%% ------------------------------------------------------------
+get_tcp_address(Socket) ->
+ {ok, Address} = inet:sockname(Socket),
+ {ok, Host} = inet:gethostname(),
+ #net_address {
+ address = Address,
+ host = Host,
+ protocol = tcp,
+ family = inet
+ }.
+
+%% ------------------------------------------------------------
+%% Do only accept new connection attempts from nodes at our
+%% own LAN, if the check_ip environment parameter is true.
+%% ------------------------------------------------------------
+check_ip(DistCtrl) ->
+ case application:get_env(check_ip) of
+ {ok, true} ->
+ case get_ifs(DistCtrl) of
+ {ok, IFs, IP} ->
+ check_ip(IFs, IP);
+ _ ->
+ ?shutdown(no_node)
+ end;
+ _ ->
+ true
+ end.
+
+get_ifs(DistCtrl) ->
+ Socket = call_ctrlr(DistCtrl, socket),
+ case inet:peername(Socket) of
+ {ok, {IP, _}} ->
+ case inet:getif(Socket) of
+ {ok, IFs} -> {ok, IFs, IP};
+ Error -> Error
+ end;
+ Error ->
+ Error
+ end.
+
+check_ip([{OwnIP, _, Netmask}|IFs], PeerIP) ->
+ case {inet_tcp:mask(Netmask, PeerIP), inet_tcp:mask(Netmask, OwnIP)} of
+ {M, M} -> true;
+ _ -> check_ip(IFs, PeerIP)
+ end;
+check_ip([], PeerIP) ->
+ {false, PeerIP}.
+
+is_node_name(Node) when is_atom(Node) ->
+ case split_node(atom_to_list(Node), $@, []) of
+ [_, _Host] -> true;
+ _ -> false
+ end;
+is_node_name(_Node) ->
+ false.
+
+hs_data_common(DistCtrl) ->
+ TickHandler = call_ctrlr(DistCtrl, tick_handler),
+ Socket = call_ctrlr(DistCtrl, socket),
+ #hs_data{f_send = send_fun(),
+ f_recv = recv_fun(),
+ f_setopts_pre_nodeup = setopts_pre_nodeup_fun(),
+ f_setopts_post_nodeup = setopts_post_nodeup_fun(),
+ f_getll = getll_fun(),
+ f_handshake_complete = handshake_complete_fun(),
+ f_address = address_fun(),
+ mf_setopts = setopts_fun(DistCtrl, Socket),
+ mf_getopts = getopts_fun(DistCtrl, Socket),
+ mf_getstat = getstat_fun(DistCtrl, Socket),
+ mf_tick = tick_fun(DistCtrl, TickHandler)}.
+
+%%% ------------------------------------------------------------
+%%% Distribution controller processes
+%%% ------------------------------------------------------------
+
+%%
+%% There will be five parties working together when the
+%% connection is up:
+%% - The gen_tcp socket. Providing a tcp/ip connection
+%% to the other node.
+%% - The output handler. It will dispatch all outgoing
+%% traffic from the VM to the gen_tcp socket. This
+%% process is registered as distribution controller
+%% for this channel with the VM.
+%% - The input handler. It will dispatch all incoming
+%% traffic from the gen_tcp socket to the VM. This
+%% process is also the socket owner and receives
+%% incoming traffic using active-N.
+%% - The tick handler. Dispatches asynchronous tick
+%% requests to the socket. It executes on max priority
+%% since it is important to get ticks through to the
+%% other end.
+%% - The channel supervisor (provided by dist_util). It
+%% monitors traffic. Issue tick requests to the tick
+%% handler when no outgoing traffic is seen and bring
+%% the connection down if no incoming traffic is seen.
+%% This process also executes on max priority.
+%%
+%% These parties are linked togheter so should one
+%% of them fail, all of them are terminated and the
+%% connection is taken down.
+%%
+
+%% In order to avoid issues with lingering signal binaries
+%% we enable off-heap message queue data as well as fullsweep
+%% after 0. The fullsweeps will be cheap since we have more
+%% or less no live data.
+-define(DIST_CNTRL_COMMON_SPAWN_OPTS,
+ [{message_queue_data, off_heap},
+ {fullsweep_after, 0}]).
+
+tick_fun(DistCtrl, TickHandler) ->
+ fun (Ctrl) when Ctrl == DistCtrl ->
+ TickHandler ! tick
+ end.
+
+getstat_fun(DistCtrl, Socket) ->
+ fun (Ctrl) when Ctrl == DistCtrl ->
+ case inet:getstat(Socket, [recv_cnt, send_cnt, send_pend]) of
+ {ok, Stat} ->
+ split_stat(Stat,0,0,0);
+ Error ->
+ Error
+ end
+ end.
+
+split_stat([{recv_cnt, R}|Stat], _, W, P) ->
+ split_stat(Stat, R, W, P);
+split_stat([{send_cnt, W}|Stat], R, _, P) ->
+ split_stat(Stat, R, W, P);
+split_stat([{send_pend, P}|Stat], R, W, _) ->
+ split_stat(Stat, R, W, P);
+split_stat([], R, W, P) ->
+ {ok, R, W, P}.
+
+setopts_fun(DistCtrl, Socket) ->
+ fun (Ctrl, Opts) when Ctrl == DistCtrl ->
+ setopts(Socket, Opts)
+ end.
+
+getopts_fun(DistCtrl, Socket) ->
+ fun (Ctrl, Opts) when Ctrl == DistCtrl ->
+ getopts(Socket, Opts)
+ end.
+
+setopts(S, Opts) ->
+ case [Opt || {K,_}=Opt <- Opts,
+ K =:= active orelse K =:= deliver orelse K =:= packet] of
+ [] -> inet:setopts(S,Opts);
+ Opts1 -> {error, {badopts,Opts1}}
+ end.
+
+getopts(S, Opts) ->
+ inet:getopts(S, Opts).
+
+send_fun() ->
+ fun (Ctrlr, Packet) ->
+ call_ctrlr(Ctrlr, {send, Packet})
+ end.
+
+recv_fun() ->
+ fun (Ctrlr, Length, Timeout) ->
+ case call_ctrlr(Ctrlr, {recv, Length, Timeout}) of
+ {ok, Bin} when is_binary(Bin) ->
+ {ok, binary_to_list(Bin)};
+ Other ->
+ Other
+ end
+ end.
+
+getll_fun() ->
+ fun (Ctrlr) ->
+ call_ctrlr(Ctrlr, getll)
+ end.
+
+address_fun() ->
+ fun (Ctrlr, Node) ->
+ case call_ctrlr(Ctrlr, {address, Node}) of
+ {error, no_node} -> %% No '@' or more than one '@' in node name.
+ ?shutdown(no_node);
+ Res ->
+ Res
+ end
+ end.
+
+setopts_pre_nodeup_fun() ->
+ fun (Ctrlr) ->
+ call_ctrlr(Ctrlr, pre_nodeup)
+ end.
+
+setopts_post_nodeup_fun() ->
+ fun (Ctrlr) ->
+ call_ctrlr(Ctrlr, post_nodeup)
+ end.
+
+handshake_complete_fun() ->
+ fun (Ctrlr, Node, DHandle) ->
+ call_ctrlr(Ctrlr, {handshake_complete, Node, DHandle})
+ end.
+
+call_ctrlr(Ctrlr, Msg) ->
+ Ref = erlang:monitor(process, Ctrlr),
+ Ctrlr ! {Ref, self(), Msg},
+ receive
+ {Ref, Res} ->
+ erlang:demonitor(Ref, [flush]),
+ Res;
+ {'DOWN', Ref, process, Ctrlr, Reason} ->
+ exit({dist_controller_exit, Reason})
+ end.
+
+%%
+%% The tick handler process writes a tick to the
+%% socket when it receives a 'tick' message from
+%% the connection supervisor.
+%%
+%% We are not allowed to block the connection
+%% superviser when writing a tick and we also want
+%% the tick to go through even during a heavily
+%% loaded system. gen_tcp does not have a
+%% non-blocking send operation exposed in its API
+%% and we don't want to run the distribution
+%% controller under high priority. Therefore this
+%% sparate process with max prio that dispatches
+%% ticks.
+%%
+dist_cntrlr_tick_handler(Socket) ->
+ receive
+ tick ->
+ %% May block due to busy port...
+ sock_send(Socket, "");
+ _ ->
+ ok
+ end,
+ dist_cntrlr_tick_handler(Socket).
+
+spawn_dist_cntrlr(Socket) ->
+ spawn_opt(?MODULE, dist_cntrlr_setup, [Socket],
+ [{priority, max}] ++ ?DIST_CNTRL_COMMON_SPAWN_OPTS).
+
+dist_cntrlr_setup(Socket) ->
+ TickHandler = spawn_opt(?MODULE, dist_cntrlr_tick_handler,
+ [Socket],
+ [link, {priority, max}]
+ ++ ?DIST_CNTRL_COMMON_SPAWN_OPTS),
+ dist_cntrlr_setup_loop(Socket, TickHandler, undefined).
+
+%%
+%% During the handshake phase we loop in dist_cntrlr_setup().
+%% When the connection is up we spawn an input handler and
+%% continue as output handler.
+%%
+dist_cntrlr_setup_loop(Socket, TickHandler, Sup) ->
+ receive
+ {tcp_closed, Socket} ->
+ exit(connection_closed);
+
+ {Ref, From, {supervisor, Pid}} ->
+ Res = link(Pid),
+ From ! {Ref, Res},
+ dist_cntrlr_setup_loop(Socket, TickHandler, Pid);
+
+ {Ref, From, tick_handler} ->
+ From ! {Ref, TickHandler},
+ dist_cntrlr_setup_loop(Socket, TickHandler, Sup);
+
+ {Ref, From, socket} ->
+ From ! {Ref, Socket},
+ dist_cntrlr_setup_loop(Socket, TickHandler, Sup);
+
+ {Ref, From, {send, Packet}} ->
+ Res = gen_tcp:send(Socket, Packet),
+ From ! {Ref, Res},
+ dist_cntrlr_setup_loop(Socket, TickHandler, Sup);
+
+ {Ref, From, {recv, Length, Timeout}} ->
+ Res = gen_tcp:recv(Socket, Length, Timeout),
+ From ! {Ref, Res},
+ dist_cntrlr_setup_loop(Socket, TickHandler, Sup);
+
+ {Ref, From, getll} ->
+ From ! {Ref, {ok, self()}},
+ dist_cntrlr_setup_loop(Socket, TickHandler, Sup);
+
+ {Ref, From, {address, Node}} ->
+ Res = case inet:peername(Socket) of
+ {ok, Address} ->
+ case split_node(atom_to_list(Node), $@, []) of
+ [_,Host] ->
+ #net_address{address=Address,host=Host,
+ protocol=tcp, family=inet};
+ _ ->
+ {error, no_node}
+ end
+ end,
+ From ! {Ref, Res},
+ dist_cntrlr_setup_loop(Socket, TickHandler, Sup);
+
+ {Ref, From, pre_nodeup} ->
+ Res = inet:setopts(Socket,
+ [{active, false},
+ {packet, 4},
+ nodelay()]),
+ From ! {Ref, Res},
+ dist_cntrlr_setup_loop(Socket, TickHandler, Sup);
+
+ {Ref, From, post_nodeup} ->
+ Res = inet:setopts(Socket,
+ [{active, false},
+ {packet, 4},
+ nodelay()]),
+ From ! {Ref, Res},
+ dist_cntrlr_setup_loop(Socket, TickHandler, Sup);
+
+ {Ref, From, {handshake_complete, _Node, DHandle}} ->
+ From ! {Ref, ok},
+ %% Handshake complete! Begin dispatching traffic...
+
+ %% We use separate process for dispatching input. This
+ %% is not necessary, but it enables parallel execution
+ %% of independent work loads at the same time as it
+ %% simplifies the the implementation...
+ InputHandler = spawn_opt(?MODULE, dist_cntrlr_input_setup,
+ [DHandle, Socket, Sup],
+ [link] ++ ?DIST_CNTRL_COMMON_SPAWN_OPTS),
+
+ flush_controller(InputHandler, Socket),
+ gen_tcp:controlling_process(Socket, InputHandler),
+ flush_controller(InputHandler, Socket),
+
+ ok = erlang:dist_ctrl_input_handler(DHandle, InputHandler),
+
+ InputHandler ! DHandle,
+
+ %% From now on we execute on normal priority
+ process_flag(priority, normal),
+ erlang:dist_ctrl_get_data_notification(DHandle),
+ dist_cntrlr_output_loop(DHandle, Socket)
+ end.
+
+%% We use active 10 for good throughput while still
+%% maintaining back-pressure if the input controller
+%% isn't able to handle all incoming messages...
+-define(ACTIVE_INPUT, 10).
+
+dist_cntrlr_input_setup(DHandle, Socket, Sup) ->
+ link(Sup),
+ %% Ensure we don't try to put data before registerd
+ %% as input handler...
+ receive
+ DHandle ->
+ dist_cntrlr_input_loop(DHandle, Socket, 0)
+ end.
+
+dist_cntrlr_input_loop(DHandle, Socket, N) when N =< ?ACTIVE_INPUT/2 ->
+ inet:setopts(Socket, [{active, ?ACTIVE_INPUT - N}]),
+ dist_cntrlr_input_loop(DHandle, Socket, ?ACTIVE_INPUT);
+dist_cntrlr_input_loop(DHandle, Socket, N) ->
+ receive
+ {tcp_closed, Socket} ->
+ %% Connection to remote node terminated...
+ exit(connection_closed);
+
+ {tcp, Socket, Data} ->
+ %% Incoming data from remote node...
+ try erlang:dist_ctrl_put_data(DHandle, Data)
+ catch _ : _ -> death_row()
+ end,
+ dist_cntrlr_input_loop(DHandle, Socket, N-1);
+
+ _ ->
+ %% Ignore...
+ dist_cntrlr_input_loop(DHandle, Socket, N)
+ end.
+
+dist_cntrlr_send_data(DHandle, Socket) ->
+ case erlang:dist_ctrl_get_data(DHandle) of
+ none ->
+ erlang:dist_ctrl_get_data_notification(DHandle);
+ Data ->
+ sock_send(Socket, Data),
+ dist_cntrlr_send_data(DHandle, Socket)
+ end.
+
+
+dist_cntrlr_output_loop(DHandle, Socket) ->
+ receive
+ dist_data ->
+ %% Outgoing data from this node...
+ try dist_cntrlr_send_data(DHandle, Socket)
+ catch _ : _ -> death_row()
+ end,
+ dist_cntrlr_output_loop(DHandle, Socket);
+
+ {send, From, Ref, Data} ->
+ %% This is for testing only!
+ %%
+ %% Needed by some OTP distribution
+ %% test suites...
+ sock_send(Socket, Data),
+ From ! {Ref, ok},
+ dist_cntrlr_output_loop(DHandle, Socket);
+
+ _ ->
+ %% Drop garbage message...
+ dist_cntrlr_output_loop(DHandle, Socket)
+
+ end.
+
+sock_send(Socket, Data) ->
+ try gen_tcp:send(Socket, Data) of
+ ok -> ok;
+ {error, Reason} -> death_row({send_error, Reason})
+ catch
+ Type : Reason -> death_row({send_error, {Type, Reason}})
+ end.
+
+death_row() ->
+ death_row(connection_closed).
+
+death_row(normal) ->
+ %% We do not want to exit with normal
+ %% exit reason since it wont bring down
+ %% linked processes...
+ death_row();
+death_row(Reason) ->
+ %% When the connection is on its way down operations
+ %% begin to fail. We catch the failures and call
+ %% this function waiting for termination. We should
+ %% be terminated by one of our links to the other
+ %% involved parties that began bringing the
+ %% connection down. By waiting for termination we
+ %% avoid altering the exit reason for the connection
+ %% teardown. We however limit the wait to 5 seconds
+ %% and bring down the connection ourselves if not
+ %% terminated...
+ receive after 5000 -> exit(Reason) end.