aboutsummaryrefslogtreecommitdiffstats
path: root/lib/kernel/src/dist_util.erl
diff options
context:
space:
mode:
authorErlang/OTP <[email protected]>2009-11-20 14:54:40 +0000
committerErlang/OTP <[email protected]>2009-11-20 14:54:40 +0000
commit84adefa331c4159d432d22840663c38f155cd4c1 (patch)
treebff9a9c66adda4df2106dfd0e5c053ab182a12bd /lib/kernel/src/dist_util.erl
downloadotp-84adefa331c4159d432d22840663c38f155cd4c1.tar.gz
otp-84adefa331c4159d432d22840663c38f155cd4c1.tar.bz2
otp-84adefa331c4159d432d22840663c38f155cd4c1.zip
The R13B03 release.OTP_R13B03
Diffstat (limited to 'lib/kernel/src/dist_util.erl')
-rw-r--r--lib/kernel/src/dist_util.erl762
1 files changed, 762 insertions, 0 deletions
diff --git a/lib/kernel/src/dist_util.erl b/lib/kernel/src/dist_util.erl
new file mode 100644
index 0000000000..a2937d60b8
--- /dev/null
+++ b/lib/kernel/src/dist_util.erl
@@ -0,0 +1,762 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 1999-2009. All Rights Reserved.
+%%
+%% The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved online at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% %CopyrightEnd%
+%%
+%%%----------------------------------------------------------------------
+%%% Purpose : The handshake of a streamed distribution connection
+%%% in a separate file to make it usable for other
+%%% distribution protocols.
+%%%----------------------------------------------------------------------
+
+-module(dist_util).
+
+%%-compile(export_all).
+-export([handshake_we_started/1, handshake_other_started/1,
+ start_timer/1, setup_timer/2,
+ reset_timer/1, cancel_timer/1,
+ shutdown/3, shutdown/4]).
+
+-import(error_logger,[error_msg/2]).
+
+-include("dist_util.hrl").
+-include("dist.hrl").
+
+-ifdef(DEBUG).
+-define(shutdown_trace(A,B), io:format(A,B)).
+-else.
+-define(shutdown_trace(A,B), noop).
+-endif.
+
+-define(to_port(FSend, Socket, Data),
+ case FSend(Socket, Data) of
+ {error, closed} ->
+ self() ! {tcp_closed, Socket},
+ {error, closed};
+ R ->
+ R
+ end).
+
+
+-define(int16(X), [((X) bsr 8) band 16#ff, (X) band 16#ff]).
+
+-define(int32(X),
+ [((X) bsr 24) band 16#ff, ((X) bsr 16) band 16#ff,
+ ((X) bsr 8) band 16#ff, (X) band 16#ff]).
+
+-define(i16(X1,X0),
+ (?u16(X1,X0) -
+ (if (X1) > 127 -> 16#10000; true -> 0 end))).
+
+-define(u16(X1,X0),
+ (((X1) bsl 8) bor (X0))).
+
+-define(u32(X3,X2,X1,X0),
+ (((X3) bsl 24) bor ((X2) bsl 16) bor ((X1) bsl 8) bor (X0))).
+
+-record(tick, {read = 0,
+ write = 0,
+ tick = 0,
+ ticked = 0
+ }).
+
+remove_flag(Flag, Flags) ->
+ case Flags band Flag of
+ 0 ->
+ Flags;
+ _ ->
+ Flags - Flag
+ end.
+
+adjust_flags(ThisFlags, OtherFlags) ->
+ case (?DFLAG_PUBLISHED band ThisFlags) band OtherFlags of
+ 0 ->
+ {remove_flag(?DFLAG_PUBLISHED, ThisFlags),
+ remove_flag(?DFLAG_PUBLISHED, OtherFlags)};
+ _ ->
+ {ThisFlags, OtherFlags}
+ end.
+
+publish_flag(hidden, _) ->
+ 0;
+publish_flag(_, OtherNode) ->
+ case net_kernel:publish_on_node(OtherNode) of
+ true ->
+ ?DFLAG_PUBLISHED;
+ _ ->
+ 0
+ end.
+
+make_this_flags(RequestType, OtherNode) ->
+ publish_flag(RequestType, OtherNode) bor
+ %% The parenthesis below makes the compiler generate better code.
+ (?DFLAG_EXPORT_PTR_TAG bor
+ ?DFLAG_EXTENDED_PIDS_PORTS bor
+ ?DFLAG_EXTENDED_REFERENCES bor
+ ?DFLAG_DIST_MONITOR bor
+ ?DFLAG_FUN_TAGS bor
+ ?DFLAG_DIST_MONITOR_NAME bor
+ ?DFLAG_HIDDEN_ATOM_CACHE bor
+ ?DFLAG_NEW_FUN_TAGS bor
+ ?DFLAG_BIT_BINARIES bor
+ ?DFLAG_NEW_FLOATS bor
+ ?DFLAG_UNICODE_IO bor
+ ?DFLAG_DIST_HDR_ATOM_CACHE bor
+ ?DFLAG_SMALL_ATOM_TAGS).
+
+handshake_other_started(#hs_data{request_type=ReqType}=HSData0) ->
+ {PreOtherFlags,Node,Version} = recv_name(HSData0),
+ PreThisFlags = make_this_flags(ReqType, Node),
+ {ThisFlags, OtherFlags} = adjust_flags(PreThisFlags,
+ PreOtherFlags),
+ HSData = HSData0#hs_data{this_flags=ThisFlags,
+ other_flags=OtherFlags,
+ other_version=Version,
+ other_node=Node,
+ other_started=true},
+ check_dflag_xnc(HSData),
+ is_allowed(HSData),
+ ?debug({"MD5 connection from ~p (V~p)~n",
+ [Node, HSData#hs_data.other_version]}),
+ mark_pending(HSData),
+ {MyCookie,HisCookie} = get_cookies(Node),
+ ChallengeA = gen_challenge(),
+ send_challenge(HSData, ChallengeA),
+ reset_timer(HSData#hs_data.timer),
+ ChallengeB = recv_challenge_reply(HSData, ChallengeA, MyCookie),
+ send_challenge_ack(HSData, gen_digest(ChallengeB, HisCookie)),
+ ?debug({dist_util, self(), accept_connection, Node}),
+ connection(HSData).
+
+%%
+%% check if connecting node is allowed to connect
+%% with allow-node-scheme
+%%
+is_allowed(#hs_data{other_node = Node,
+ allowed = Allowed} = HSData) ->
+ case lists:member(Node, Allowed) of
+ false when Allowed =/= [] ->
+ send_status(HSData, not_allowed),
+ error_msg("** Connection attempt from "
+ "disallowed node ~w ** ~n", [Node]),
+ ?shutdown(Node);
+ _ -> true
+ end.
+
+%%
+%% Check that both nodes can handle the same types of extended
+%% node containers. If they can not, abort the connection.
+%%
+check_dflag_xnc(#hs_data{other_node = Node,
+ other_flags = OtherFlags,
+ other_started = OtherStarted} = HSData) ->
+ XRFlg = ?DFLAG_EXTENDED_REFERENCES,
+ XPPFlg = case erlang:system_info(compat_rel) of
+ R when R >= 10 ->
+ ?DFLAG_EXTENDED_PIDS_PORTS;
+ _ ->
+ 0
+ end,
+ ReqXncFlags = XRFlg bor XPPFlg,
+ case OtherFlags band ReqXncFlags =:= ReqXncFlags of
+ true ->
+ ok;
+ false ->
+ What = case {OtherFlags band XRFlg =:= XRFlg,
+ OtherFlags band XPPFlg =:= XPPFlg} of
+ {false, false} -> "references, pids and ports";
+ {true, false} -> "pids and ports";
+ {false, true} -> "references"
+ end,
+ case OtherStarted of
+ true ->
+ send_status(HSData, not_allowed),
+ Dir = "from",
+ How = "rejected";
+ _ ->
+ Dir = "to",
+ How = "aborted"
+ end,
+ error_msg("** ~w: Connection attempt ~s node ~w ~s "
+ "since it cannot handle extended ~s. "
+ "**~n", [node(), Dir, Node, How, What]),
+ ?shutdown(Node)
+ end.
+
+
+%% No nodedown will be sent if we fail before this process has
+%% succeeded to mark the node as pending.
+
+mark_pending(#hs_data{kernel_pid=Kernel,
+ other_node=Node,
+ this_node=MyNode}=HSData) ->
+ case do_mark_pending(Kernel, MyNode, Node,
+ (HSData#hs_data.f_address)(HSData#hs_data.socket,
+ Node),
+ HSData#hs_data.other_flags) of
+ ok ->
+ send_status(HSData, ok),
+ reset_timer(HSData#hs_data.timer);
+
+ ok_pending ->
+ send_status(HSData, ok_simultaneous),
+ reset_timer(HSData#hs_data.timer);
+
+ nok_pending ->
+ send_status(HSData, nok),
+ ?shutdown(Node);
+
+ up_pending ->
+ %% Check if connection is still alive, no
+ %% implies that the connection is no longer pending
+ %% due to simultaneous connect
+ do_alive(HSData),
+
+ %% This can happen if the other node goes down,
+ %% and goes up again and contact us before we have
+ %% detected that the socket was closed.
+ wait_pending(Kernel),
+ reset_timer(HSData#hs_data.timer);
+
+ already_pending ->
+ %% FIXME: is this a case ?
+ ?debug({dist_util,self(),mark_pending,already_pending,Node}),
+ ?shutdown(Node)
+ end.
+
+
+%%
+%% Marking pending and negotiating away
+%% simultaneous connection problems
+%%
+
+wait_pending(Kernel) ->
+ receive
+ {Kernel, pending} ->
+ ?trace("wait_pending returned for pid ~p.~n",
+ [self()]),
+ ok
+ end.
+
+do_alive(#hs_data{other_node = Node} = HSData) ->
+ send_status(HSData, alive),
+ case recv_status(HSData) of
+ true -> true;
+ false -> ?shutdown(Node)
+ end.
+
+do_mark_pending(Kernel, MyNode, Node, Address, Flags) ->
+ Kernel ! {self(), {accept_pending,MyNode,Node,Address,
+ publish_type(Flags)}},
+ receive
+ {Kernel,{accept_pending,Ret}} ->
+ ?trace("do_mark_pending(~p,~p,~p,~p) -> ~p~n",
+ [Kernel,Node,Address,Flags,Ret]),
+ Ret
+ end.
+
+is_pending(Kernel, Node) ->
+ Kernel ! {self(), {is_pending, Node}},
+ receive
+ {Kernel, {is_pending, Reply}} -> Reply
+ end.
+
+%%
+%% This will tell the net_kernel about the nodedown as it
+%% recognizes the exit signal.
+%% The termination of this process does also imply that the Socket
+%% is closed in a controlled way by inet_drv.
+%%
+
+-spec shutdown(atom(), non_neg_integer(), term()) -> no_return().
+
+shutdown(Module, Line, Data) ->
+ shutdown(Module, Line, Data, shutdown).
+
+-spec shutdown(atom(), non_neg_integer(), term(), term()) -> no_return().
+
+shutdown(_Module, _Line, _Data, Reason) ->
+ ?shutdown_trace("Net Kernel 2: shutting down connection "
+ "~p:~p, data ~p,reason ~p~n",
+ [_Module,_Line, _Data, Reason]),
+ flush_down(),
+ exit(Reason).
+%% Use this line to debug connection.
+%% Set net_kernel verbose = 1 as well.
+%% exit({Reason, ?MODULE, _Line, _Data, erlang:now()}).
+
+
+flush_down() ->
+ receive
+ {From, get_status} ->
+ From ! {self(), get_status, error},
+ flush_down()
+ after 0 ->
+ ok
+ end.
+
+handshake_we_started(#hs_data{request_type=ReqType,
+ other_node=Node}=PreHSData) ->
+ PreThisFlags = make_this_flags(ReqType, Node),
+ HSData = PreHSData#hs_data{this_flags=PreThisFlags},
+ send_name(HSData),
+ recv_status(HSData),
+ {PreOtherFlags,ChallengeA} = recv_challenge(HSData),
+ {ThisFlags,OtherFlags} = adjust_flags(PreThisFlags, PreOtherFlags),
+ NewHSData = HSData#hs_data{this_flags = ThisFlags,
+ other_flags = OtherFlags,
+ other_started = false},
+ check_dflag_xnc(NewHSData),
+ MyChallenge = gen_challenge(),
+ {MyCookie,HisCookie} = get_cookies(Node),
+ send_challenge_reply(NewHSData,MyChallenge,
+ gen_digest(ChallengeA,HisCookie)),
+ reset_timer(NewHSData#hs_data.timer),
+ recv_challenge_ack(NewHSData, MyChallenge, MyCookie),
+ connection(NewHSData).
+
+%% --------------------------------------------------------------
+%% The connection has been established.
+%% --------------------------------------------------------------
+
+connection(#hs_data{other_node = Node,
+ socket = Socket,
+ f_address = FAddress,
+ f_setopts_pre_nodeup = FPreNodeup,
+ f_setopts_post_nodeup = FPostNodeup}= HSData) ->
+ cancel_timer(HSData#hs_data.timer),
+ PType = publish_type(HSData#hs_data.other_flags),
+ case FPreNodeup(Socket) of
+ ok ->
+ do_setnode(HSData), % Succeeds or exits the process.
+ Address = FAddress(Socket,Node),
+ mark_nodeup(HSData,Address),
+ case FPostNodeup(Socket) of
+ ok ->
+ con_loop(HSData#hs_data.kernel_pid,
+ Node,
+ Socket,
+ Address,
+ HSData#hs_data.this_node,
+ PType,
+ #tick{},
+ HSData#hs_data.mf_tick,
+ HSData#hs_data.mf_getstat);
+ _ ->
+ ?shutdown2(Node, connection_setup_failed)
+ end;
+ _ ->
+ ?shutdown(Node)
+ end.
+
+%% Generate a message digest from Challenge number and Cookie
+gen_digest(Challenge, Cookie) when is_integer(Challenge), is_atom(Cookie) ->
+ erlang:md5([atom_to_list(Cookie)|integer_to_list(Challenge)]).
+
+%% ---------------------------------------------------------------
+%% Challenge code
+%% gen_challenge() returns a "random" number
+%% ---------------------------------------------------------------
+gen_challenge() ->
+ {A,B,C} = erlang:now(),
+ {D,_} = erlang:statistics(reductions),
+ {E,_} = erlang:statistics(runtime),
+ {F,_} = erlang:statistics(wall_clock),
+ {G,H,_} = erlang:statistics(garbage_collection),
+ %% A(8) B(16) C(16)
+ %% D(16),E(8), F(16) G(8) H(16)
+ ( ((A bsl 24) + (E bsl 16) + (G bsl 8) + F) bxor
+ (B + (C bsl 16)) bxor
+ (D + (H bsl 16)) ) band 16#ffffffff.
+
+%%
+%% Get the cookies for a node from auth
+%%
+get_cookies(Node) ->
+ case auth:get_cookie(Node) of
+ X when is_atom(X) ->
+ {X,X}
+% {Y,Z} when is_atom(Y), is_atom(Z) ->
+% {Y,Z};
+% _ ->
+% erlang:error("Corrupt cookie database")
+ end.
+
+%% No error return; either succeeds or terminates the process.
+do_setnode(#hs_data{other_node = Node, socket = Socket,
+ other_flags = Flags, other_version = Version,
+ f_getll = GetLL}) ->
+ case GetLL(Socket) of
+ {ok,Port} ->
+ ?trace("setnode(md5,~p ~p ~p)~n",
+ [Node, Port, {publish_type(Flags),
+ '(', Flags, ')',
+ Version}]),
+ case (catch
+ erlang:setnode(Node, Port,
+ {Flags, Version, '', ''})) of
+ {'EXIT', {system_limit, _}} ->
+ error_msg("** Distribution system limit reached, "
+ "no table space left for node ~w ** ~n",
+ [Node]),
+ ?shutdown(Node);
+ {'EXIT', Other} ->
+ exit(Other);
+ _Else ->
+ ok
+ end;
+ _ ->
+ error_msg("** Distribution connection error, "
+ "could not get low level port for node ~w ** ~n",
+ [Node]),
+ ?shutdown(Node)
+ end.
+
+mark_nodeup(#hs_data{kernel_pid = Kernel,
+ other_node = Node,
+ other_flags = Flags,
+ other_started = OtherStarted},
+ Address) ->
+ Kernel ! {self(), {nodeup,Node,Address,publish_type(Flags),
+ true}},
+ receive
+ {Kernel, inserted} ->
+ ok;
+ {Kernel, bad_request} ->
+ TypeT = case OtherStarted of
+ true ->
+ "accepting connection";
+ _ ->
+ "initiating connection"
+ end,
+ error_msg("Fatal: ~p was not allowed to "
+ "send {nodeup, ~p} to kernel when ~s~n",
+ [self(), Node, TypeT]),
+ ?shutdown(Node)
+ end.
+
+con_loop(Kernel, Node, Socket, TcpAddress,
+ MyNode, Type, Tick, MFTick, MFGetstat) ->
+ receive
+ {tcp_closed, Socket} ->
+ ?shutdown2(Node, connection_closed);
+ {Kernel, disconnect} ->
+ ?shutdown2(Node, disconnected);
+ {Kernel, aux_tick} ->
+ case MFGetstat(Socket) of
+ {ok, _, _, PendWrite} ->
+ send_tick(Socket, PendWrite, MFTick);
+ _ ->
+ ignore_it
+ end,
+ con_loop(Kernel, Node, Socket, TcpAddress, MyNode, Type,
+ Tick, MFTick, MFGetstat);
+ {Kernel, tick} ->
+ case send_tick(Socket, Tick, Type,
+ MFTick, MFGetstat) of
+ {ok, NewTick} ->
+ con_loop(Kernel, Node, Socket, TcpAddress,
+ MyNode, Type, NewTick, MFTick,
+ MFGetstat);
+ {error, not_responding} ->
+ error_msg("** Node ~p not responding **~n"
+ "** Removing (timedout) connection **~n",
+ [Node]),
+ ?shutdown2(Node, net_tick_timeout);
+ _Other ->
+ ?shutdown2(Node, send_net_tick_failed)
+ end;
+ {From, get_status} ->
+ case MFGetstat(Socket) of
+ {ok, Read, Write, _} ->
+ From ! {self(), get_status, {ok, Read, Write}},
+ con_loop(Kernel, Node, Socket, TcpAddress,
+ MyNode,
+ Type, Tick,
+ MFTick, MFGetstat);
+ _ ->
+ ?shutdown2(Node, get_status_failed)
+ end
+ end.
+
+
+%% ------------------------------------------------------------
+%% Misc. functions.
+%% ------------------------------------------------------------
+
+send_name(#hs_data{socket = Socket, this_node = Node,
+ f_send = FSend,
+ this_flags = Flags,
+ other_version = Version}) ->
+ ?trace("send_name: node=~w, version=~w\n",
+ [Node,Version]),
+ ?to_port(FSend, Socket,
+ [$n, ?int16(Version), ?int32(Flags), atom_to_list(Node)]).
+
+send_challenge(#hs_data{socket = Socket, this_node = Node,
+ other_version = Version,
+ this_flags = Flags,
+ f_send = FSend},
+ Challenge ) ->
+ ?trace("send: challenge=~w version=~w\n",
+ [Challenge,Version]),
+ ?to_port(FSend, Socket, [$n,?int16(Version), ?int32(Flags),
+ ?int32(Challenge),
+ atom_to_list(Node)]).
+
+send_challenge_reply(#hs_data{socket = Socket, f_send = FSend},
+ Challenge, Digest) ->
+ ?trace("send_reply: challenge=~w digest=~p\n",
+ [Challenge,Digest]),
+ ?to_port(FSend, Socket, [$r,?int32(Challenge),Digest]).
+
+send_challenge_ack(#hs_data{socket = Socket, f_send = FSend},
+ Digest) ->
+ ?trace("send_ack: digest=~p\n", [Digest]),
+ ?to_port(FSend, Socket, [$a,Digest]).
+
+
+%%
+%% Get the name of the other side.
+%% Close the connection if invalid data.
+%% The IP address sent is not interesting (as in the old
+%% tcp_drv.c which used it to detect simultaneous connection
+%% attempts).
+%%
+recv_name(#hs_data{socket = Socket, f_recv = Recv}) ->
+ case Recv(Socket, 0, infinity) of
+ {ok,Data} ->
+ get_name(Data);
+ _ ->
+ ?shutdown(no_node)
+ end.
+
+get_name([$n,VersionA, VersionB, Flag1, Flag2, Flag3, Flag4 | OtherNode]) ->
+ {?u32(Flag1, Flag2, Flag3, Flag4), list_to_atom(OtherNode),
+ ?u16(VersionA,VersionB)};
+get_name(Data) ->
+ ?shutdown(Data).
+
+publish_type(Flags) ->
+ case Flags band ?DFLAG_PUBLISHED of
+ 0 ->
+ hidden;
+ _ ->
+ normal
+ end.
+
+%% wait for challenge after connect
+recv_challenge(#hs_data{socket=Socket,other_node=Node,
+ other_version=Version,f_recv=Recv}) ->
+ case Recv(Socket, 0, infinity) of
+ {ok,[$n,V1,V0,Fl1,Fl2,Fl3,Fl4,CA3,CA2,CA1,CA0 | Ns]} ->
+ Flags = ?u32(Fl1,Fl2,Fl3,Fl4),
+ case {list_to_existing_atom(Ns),?u16(V1,V0)} of
+ {Node,Version} ->
+ Challenge = ?u32(CA3,CA2,CA1,CA0),
+ ?trace("recv: node=~w, challenge=~w version=~w\n",
+ [Node, Challenge,Version]),
+ {Flags,Challenge};
+ _ ->
+ ?shutdown(no_node)
+ end;
+ _ ->
+ ?shutdown(no_node)
+ end.
+
+
+%%
+%% wait for challenge response after send_challenge
+%%
+recv_challenge_reply(#hs_data{socket = Socket,
+ other_node = NodeB,
+ f_recv = FRecv},
+ ChallengeA, Cookie) ->
+ case FRecv(Socket, 0, infinity) of
+ {ok,[$r,CB3,CB2,CB1,CB0 | SumB]} when length(SumB) =:= 16 ->
+ SumA = gen_digest(ChallengeA, Cookie),
+ ChallengeB = ?u32(CB3,CB2,CB1,CB0),
+ ?trace("recv_reply: challenge=~w digest=~p\n",
+ [ChallengeB,SumB]),
+ ?trace("sum = ~p\n", [SumA]),
+ case list_to_binary(SumB) of
+ SumA ->
+ ChallengeB;
+ _ ->
+ error_msg("** Connection attempt from "
+ "disallowed node ~w ** ~n", [NodeB]),
+ ?shutdown(NodeB)
+ end;
+ _ ->
+ ?shutdown(no_node)
+ end.
+
+recv_challenge_ack(#hs_data{socket = Socket, f_recv = FRecv,
+ other_node = NodeB},
+ ChallengeB, CookieA) ->
+ case FRecv(Socket, 0, infinity) of
+ {ok,[$a|SumB]} when length(SumB) =:= 16 ->
+ SumA = gen_digest(ChallengeB, CookieA),
+ ?trace("recv_ack: digest=~p\n", [SumB]),
+ ?trace("sum = ~p\n", [SumA]),
+ case list_to_binary(SumB) of
+ SumA ->
+ ok;
+ _ ->
+ error_msg("** Connection attempt to "
+ "disallowed node ~w ** ~n", [NodeB]),
+ ?shutdown(NodeB)
+ end;
+ _ ->
+ ?shutdown(NodeB)
+ end.
+
+recv_status(#hs_data{kernel_pid = Kernel, socket = Socket,
+ other_node = Node, f_recv = Recv} = HSData) ->
+ case Recv(Socket, 0, infinity) of
+ {ok, [$s|StrStat]} ->
+ Stat = list_to_atom(StrStat),
+ ?debug({dist_util,self(),recv_status, Node, Stat}),
+ case Stat of
+ not_allowed -> ?shutdown(Node);
+ nok ->
+ %% wait to be killed by net_kernel
+ receive
+ after infinity -> ok
+ end;
+ alive ->
+ Reply = is_pending(Kernel, Node),
+ ?debug({is_pending,self(),Reply}),
+ send_status(HSData, Reply),
+ if not Reply ->
+ ?shutdown(Node);
+ Reply ->
+ Stat
+ end;
+ _ -> Stat
+ end;
+ _Error ->
+ ?debug({dist_util,self(),recv_status_error,
+ Node, _Error}),
+ ?shutdown(Node)
+ end.
+
+
+send_status(#hs_data{socket = Socket, other_node = Node,
+ f_send = FSend}, Stat) ->
+ ?debug({dist_util,self(),send_status, Node, Stat}),
+ case FSend(Socket, [$s | atom_to_list(Stat)]) of
+ {error, _} ->
+ ?shutdown(Node);
+ _ ->
+ true
+ end.
+
+
+
+%%
+%% Send a TICK to the other side.
+%%
+%% This will happen every 15 seconds (by default)
+%% The idea here is that every 15 secs, we write a little
+%% something on the connection if we haven't written anything for
+%% the last 15 secs.
+%% This will ensure that nodes that are not responding due to
+%% hardware errors (Or being suspended by means of ^Z) will
+%% be considered to be down. If we do not want to have this
+%% we must start the net_kernel (in erlang) without its
+%% ticker process, In that case this code will never run
+
+%% And then every 60 seconds we also check the connection and
+%% close it if we havn't received anything on it for the
+%% last 60 secs. If ticked == tick we havn't received anything
+%% on the connection the last 60 secs.
+
+%% The detection time interval is thus, by default, 45s < DT < 75s
+
+%% A HIDDEN node is always (if not a pending write) ticked if
+%% we haven't read anything as a hidden node only ticks when it receives
+%% a TICK !!
+
+send_tick(Socket, Tick, Type, MFTick, MFGetstat) ->
+ #tick{tick = T0,
+ read = Read,
+ write = Write,
+ ticked = Ticked} = Tick,
+ T = T0 + 1,
+ T1 = T rem 4,
+ case MFGetstat(Socket) of
+ {ok, Read, _, _} when Ticked =:= T ->
+ {error, not_responding};
+ {ok, Read, W, Pend} when Type =:= hidden ->
+ send_tick(Socket, Pend, MFTick),
+ {ok, Tick#tick{write = W + 1,
+ tick = T1}};
+ {ok, Read, Write, Pend} ->
+ send_tick(Socket, Pend, MFTick),
+ {ok, Tick#tick{write = Write + 1,
+ tick = T1}};
+ {ok, R, Write, Pend} ->
+ send_tick(Socket, Pend, MFTick),
+ {ok, Tick#tick{write = Write + 1,
+ read = R,
+ tick = T1,
+ ticked = T}};
+ {ok, Read, W, _} ->
+ {ok, Tick#tick{write = W,
+ tick = T1}};
+ {ok, R, W, _} ->
+ {ok, Tick#tick{write = W,
+ read = R,
+ tick = T1,
+ ticked = T}};
+ Error ->
+ Error
+ end.
+
+send_tick(Socket, 0, MFTick) ->
+ MFTick(Socket);
+send_tick(_, _Pend, _) ->
+ %% Dont send tick if pending write.
+ ok.
+
+%% ------------------------------------------------------------
+%% Connection setup timeout timer.
+%% After Timeout milliseconds this process terminates
+%% which implies that the owning setup/accept process terminates.
+%% The timer is reset before every network operation during the
+%% connection setup !
+%% ------------------------------------------------------------
+
+start_timer(Timeout) ->
+ spawn_link(?MODULE, setup_timer, [self(), Timeout*?trace_factor]).
+
+setup_timer(Pid, Timeout) ->
+ receive
+ {Pid, reset} ->
+ setup_timer(Pid, Timeout)
+ after Timeout ->
+ ?trace("Timer expires ~p, ~p~n",[Pid, Timeout]),
+ ?shutdown(timer)
+ end.
+
+reset_timer(Timer) ->
+ Timer ! {self(), reset}.
+
+cancel_timer(Timer) ->
+ unlink(Timer),
+ exit(Timer, shutdown).
+