diff options
author | Erlang/OTP <otp@erlang.org> | 2009-11-20 14:54:40 +0000 |
---|---|---|
committer | Erlang/OTP <otp@erlang.org> | 2009-11-20 14:54:40 +0000 |
commit | 84adefa331c4159d432d22840663c38f155cd4c1 (patch) | |
tree | bff9a9c66adda4df2106dfd0e5c053ab182a12bd /lib/kernel/src/dist_util.erl | |
download | otp-84adefa331c4159d432d22840663c38f155cd4c1.tar.gz otp-84adefa331c4159d432d22840663c38f155cd4c1.tar.bz2 otp-84adefa331c4159d432d22840663c38f155cd4c1.zip |
The R13B03 release.OTP_R13B03
Diffstat (limited to 'lib/kernel/src/dist_util.erl')
-rw-r--r-- | lib/kernel/src/dist_util.erl | 762 |
1 files changed, 762 insertions, 0 deletions
diff --git a/lib/kernel/src/dist_util.erl b/lib/kernel/src/dist_util.erl new file mode 100644 index 0000000000..a2937d60b8 --- /dev/null +++ b/lib/kernel/src/dist_util.erl @@ -0,0 +1,762 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 1999-2009. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% +%%%---------------------------------------------------------------------- +%%% Purpose : The handshake of a streamed distribution connection +%%% in a separate file to make it usable for other +%%% distribution protocols. +%%%---------------------------------------------------------------------- + +-module(dist_util). + +%%-compile(export_all). +-export([handshake_we_started/1, handshake_other_started/1, + start_timer/1, setup_timer/2, + reset_timer/1, cancel_timer/1, + shutdown/3, shutdown/4]). + +-import(error_logger,[error_msg/2]). + +-include("dist_util.hrl"). +-include("dist.hrl"). + +-ifdef(DEBUG). +-define(shutdown_trace(A,B), io:format(A,B)). +-else. +-define(shutdown_trace(A,B), noop). +-endif. + +-define(to_port(FSend, Socket, Data), + case FSend(Socket, Data) of + {error, closed} -> + self() ! {tcp_closed, Socket}, + {error, closed}; + R -> + R + end). + + +-define(int16(X), [((X) bsr 8) band 16#ff, (X) band 16#ff]). + +-define(int32(X), + [((X) bsr 24) band 16#ff, ((X) bsr 16) band 16#ff, + ((X) bsr 8) band 16#ff, (X) band 16#ff]). + +-define(i16(X1,X0), + (?u16(X1,X0) - + (if (X1) > 127 -> 16#10000; true -> 0 end))). + +-define(u16(X1,X0), + (((X1) bsl 8) bor (X0))). + +-define(u32(X3,X2,X1,X0), + (((X3) bsl 24) bor ((X2) bsl 16) bor ((X1) bsl 8) bor (X0))). + +-record(tick, {read = 0, + write = 0, + tick = 0, + ticked = 0 + }). + +remove_flag(Flag, Flags) -> + case Flags band Flag of + 0 -> + Flags; + _ -> + Flags - Flag + end. + +adjust_flags(ThisFlags, OtherFlags) -> + case (?DFLAG_PUBLISHED band ThisFlags) band OtherFlags of + 0 -> + {remove_flag(?DFLAG_PUBLISHED, ThisFlags), + remove_flag(?DFLAG_PUBLISHED, OtherFlags)}; + _ -> + {ThisFlags, OtherFlags} + end. + +publish_flag(hidden, _) -> + 0; +publish_flag(_, OtherNode) -> + case net_kernel:publish_on_node(OtherNode) of + true -> + ?DFLAG_PUBLISHED; + _ -> + 0 + end. + +make_this_flags(RequestType, OtherNode) -> + publish_flag(RequestType, OtherNode) bor + %% The parenthesis below makes the compiler generate better code. + (?DFLAG_EXPORT_PTR_TAG bor + ?DFLAG_EXTENDED_PIDS_PORTS bor + ?DFLAG_EXTENDED_REFERENCES bor + ?DFLAG_DIST_MONITOR bor + ?DFLAG_FUN_TAGS bor + ?DFLAG_DIST_MONITOR_NAME bor + ?DFLAG_HIDDEN_ATOM_CACHE bor + ?DFLAG_NEW_FUN_TAGS bor + ?DFLAG_BIT_BINARIES bor + ?DFLAG_NEW_FLOATS bor + ?DFLAG_UNICODE_IO bor + ?DFLAG_DIST_HDR_ATOM_CACHE bor + ?DFLAG_SMALL_ATOM_TAGS). + +handshake_other_started(#hs_data{request_type=ReqType}=HSData0) -> + {PreOtherFlags,Node,Version} = recv_name(HSData0), + PreThisFlags = make_this_flags(ReqType, Node), + {ThisFlags, OtherFlags} = adjust_flags(PreThisFlags, + PreOtherFlags), + HSData = HSData0#hs_data{this_flags=ThisFlags, + other_flags=OtherFlags, + other_version=Version, + other_node=Node, + other_started=true}, + check_dflag_xnc(HSData), + is_allowed(HSData), + ?debug({"MD5 connection from ~p (V~p)~n", + [Node, HSData#hs_data.other_version]}), + mark_pending(HSData), + {MyCookie,HisCookie} = get_cookies(Node), + ChallengeA = gen_challenge(), + send_challenge(HSData, ChallengeA), + reset_timer(HSData#hs_data.timer), + ChallengeB = recv_challenge_reply(HSData, ChallengeA, MyCookie), + send_challenge_ack(HSData, gen_digest(ChallengeB, HisCookie)), + ?debug({dist_util, self(), accept_connection, Node}), + connection(HSData). + +%% +%% check if connecting node is allowed to connect +%% with allow-node-scheme +%% +is_allowed(#hs_data{other_node = Node, + allowed = Allowed} = HSData) -> + case lists:member(Node, Allowed) of + false when Allowed =/= [] -> + send_status(HSData, not_allowed), + error_msg("** Connection attempt from " + "disallowed node ~w ** ~n", [Node]), + ?shutdown(Node); + _ -> true + end. + +%% +%% Check that both nodes can handle the same types of extended +%% node containers. If they can not, abort the connection. +%% +check_dflag_xnc(#hs_data{other_node = Node, + other_flags = OtherFlags, + other_started = OtherStarted} = HSData) -> + XRFlg = ?DFLAG_EXTENDED_REFERENCES, + XPPFlg = case erlang:system_info(compat_rel) of + R when R >= 10 -> + ?DFLAG_EXTENDED_PIDS_PORTS; + _ -> + 0 + end, + ReqXncFlags = XRFlg bor XPPFlg, + case OtherFlags band ReqXncFlags =:= ReqXncFlags of + true -> + ok; + false -> + What = case {OtherFlags band XRFlg =:= XRFlg, + OtherFlags band XPPFlg =:= XPPFlg} of + {false, false} -> "references, pids and ports"; + {true, false} -> "pids and ports"; + {false, true} -> "references" + end, + case OtherStarted of + true -> + send_status(HSData, not_allowed), + Dir = "from", + How = "rejected"; + _ -> + Dir = "to", + How = "aborted" + end, + error_msg("** ~w: Connection attempt ~s node ~w ~s " + "since it cannot handle extended ~s. " + "**~n", [node(), Dir, Node, How, What]), + ?shutdown(Node) + end. + + +%% No nodedown will be sent if we fail before this process has +%% succeeded to mark the node as pending. + +mark_pending(#hs_data{kernel_pid=Kernel, + other_node=Node, + this_node=MyNode}=HSData) -> + case do_mark_pending(Kernel, MyNode, Node, + (HSData#hs_data.f_address)(HSData#hs_data.socket, + Node), + HSData#hs_data.other_flags) of + ok -> + send_status(HSData, ok), + reset_timer(HSData#hs_data.timer); + + ok_pending -> + send_status(HSData, ok_simultaneous), + reset_timer(HSData#hs_data.timer); + + nok_pending -> + send_status(HSData, nok), + ?shutdown(Node); + + up_pending -> + %% Check if connection is still alive, no + %% implies that the connection is no longer pending + %% due to simultaneous connect + do_alive(HSData), + + %% This can happen if the other node goes down, + %% and goes up again and contact us before we have + %% detected that the socket was closed. + wait_pending(Kernel), + reset_timer(HSData#hs_data.timer); + + already_pending -> + %% FIXME: is this a case ? + ?debug({dist_util,self(),mark_pending,already_pending,Node}), + ?shutdown(Node) + end. + + +%% +%% Marking pending and negotiating away +%% simultaneous connection problems +%% + +wait_pending(Kernel) -> + receive + {Kernel, pending} -> + ?trace("wait_pending returned for pid ~p.~n", + [self()]), + ok + end. + +do_alive(#hs_data{other_node = Node} = HSData) -> + send_status(HSData, alive), + case recv_status(HSData) of + true -> true; + false -> ?shutdown(Node) + end. + +do_mark_pending(Kernel, MyNode, Node, Address, Flags) -> + Kernel ! {self(), {accept_pending,MyNode,Node,Address, + publish_type(Flags)}}, + receive + {Kernel,{accept_pending,Ret}} -> + ?trace("do_mark_pending(~p,~p,~p,~p) -> ~p~n", + [Kernel,Node,Address,Flags,Ret]), + Ret + end. + +is_pending(Kernel, Node) -> + Kernel ! {self(), {is_pending, Node}}, + receive + {Kernel, {is_pending, Reply}} -> Reply + end. + +%% +%% This will tell the net_kernel about the nodedown as it +%% recognizes the exit signal. +%% The termination of this process does also imply that the Socket +%% is closed in a controlled way by inet_drv. +%% + +-spec shutdown(atom(), non_neg_integer(), term()) -> no_return(). + +shutdown(Module, Line, Data) -> + shutdown(Module, Line, Data, shutdown). + +-spec shutdown(atom(), non_neg_integer(), term(), term()) -> no_return(). + +shutdown(_Module, _Line, _Data, Reason) -> + ?shutdown_trace("Net Kernel 2: shutting down connection " + "~p:~p, data ~p,reason ~p~n", + [_Module,_Line, _Data, Reason]), + flush_down(), + exit(Reason). +%% Use this line to debug connection. +%% Set net_kernel verbose = 1 as well. +%% exit({Reason, ?MODULE, _Line, _Data, erlang:now()}). + + +flush_down() -> + receive + {From, get_status} -> + From ! {self(), get_status, error}, + flush_down() + after 0 -> + ok + end. + +handshake_we_started(#hs_data{request_type=ReqType, + other_node=Node}=PreHSData) -> + PreThisFlags = make_this_flags(ReqType, Node), + HSData = PreHSData#hs_data{this_flags=PreThisFlags}, + send_name(HSData), + recv_status(HSData), + {PreOtherFlags,ChallengeA} = recv_challenge(HSData), + {ThisFlags,OtherFlags} = adjust_flags(PreThisFlags, PreOtherFlags), + NewHSData = HSData#hs_data{this_flags = ThisFlags, + other_flags = OtherFlags, + other_started = false}, + check_dflag_xnc(NewHSData), + MyChallenge = gen_challenge(), + {MyCookie,HisCookie} = get_cookies(Node), + send_challenge_reply(NewHSData,MyChallenge, + gen_digest(ChallengeA,HisCookie)), + reset_timer(NewHSData#hs_data.timer), + recv_challenge_ack(NewHSData, MyChallenge, MyCookie), + connection(NewHSData). + +%% -------------------------------------------------------------- +%% The connection has been established. +%% -------------------------------------------------------------- + +connection(#hs_data{other_node = Node, + socket = Socket, + f_address = FAddress, + f_setopts_pre_nodeup = FPreNodeup, + f_setopts_post_nodeup = FPostNodeup}= HSData) -> + cancel_timer(HSData#hs_data.timer), + PType = publish_type(HSData#hs_data.other_flags), + case FPreNodeup(Socket) of + ok -> + do_setnode(HSData), % Succeeds or exits the process. + Address = FAddress(Socket,Node), + mark_nodeup(HSData,Address), + case FPostNodeup(Socket) of + ok -> + con_loop(HSData#hs_data.kernel_pid, + Node, + Socket, + Address, + HSData#hs_data.this_node, + PType, + #tick{}, + HSData#hs_data.mf_tick, + HSData#hs_data.mf_getstat); + _ -> + ?shutdown2(Node, connection_setup_failed) + end; + _ -> + ?shutdown(Node) + end. + +%% Generate a message digest from Challenge number and Cookie +gen_digest(Challenge, Cookie) when is_integer(Challenge), is_atom(Cookie) -> + erlang:md5([atom_to_list(Cookie)|integer_to_list(Challenge)]). + +%% --------------------------------------------------------------- +%% Challenge code +%% gen_challenge() returns a "random" number +%% --------------------------------------------------------------- +gen_challenge() -> + {A,B,C} = erlang:now(), + {D,_} = erlang:statistics(reductions), + {E,_} = erlang:statistics(runtime), + {F,_} = erlang:statistics(wall_clock), + {G,H,_} = erlang:statistics(garbage_collection), + %% A(8) B(16) C(16) + %% D(16),E(8), F(16) G(8) H(16) + ( ((A bsl 24) + (E bsl 16) + (G bsl 8) + F) bxor + (B + (C bsl 16)) bxor + (D + (H bsl 16)) ) band 16#ffffffff. + +%% +%% Get the cookies for a node from auth +%% +get_cookies(Node) -> + case auth:get_cookie(Node) of + X when is_atom(X) -> + {X,X} +% {Y,Z} when is_atom(Y), is_atom(Z) -> +% {Y,Z}; +% _ -> +% erlang:error("Corrupt cookie database") + end. + +%% No error return; either succeeds or terminates the process. +do_setnode(#hs_data{other_node = Node, socket = Socket, + other_flags = Flags, other_version = Version, + f_getll = GetLL}) -> + case GetLL(Socket) of + {ok,Port} -> + ?trace("setnode(md5,~p ~p ~p)~n", + [Node, Port, {publish_type(Flags), + '(', Flags, ')', + Version}]), + case (catch + erlang:setnode(Node, Port, + {Flags, Version, '', ''})) of + {'EXIT', {system_limit, _}} -> + error_msg("** Distribution system limit reached, " + "no table space left for node ~w ** ~n", + [Node]), + ?shutdown(Node); + {'EXIT', Other} -> + exit(Other); + _Else -> + ok + end; + _ -> + error_msg("** Distribution connection error, " + "could not get low level port for node ~w ** ~n", + [Node]), + ?shutdown(Node) + end. + +mark_nodeup(#hs_data{kernel_pid = Kernel, + other_node = Node, + other_flags = Flags, + other_started = OtherStarted}, + Address) -> + Kernel ! {self(), {nodeup,Node,Address,publish_type(Flags), + true}}, + receive + {Kernel, inserted} -> + ok; + {Kernel, bad_request} -> + TypeT = case OtherStarted of + true -> + "accepting connection"; + _ -> + "initiating connection" + end, + error_msg("Fatal: ~p was not allowed to " + "send {nodeup, ~p} to kernel when ~s~n", + [self(), Node, TypeT]), + ?shutdown(Node) + end. + +con_loop(Kernel, Node, Socket, TcpAddress, + MyNode, Type, Tick, MFTick, MFGetstat) -> + receive + {tcp_closed, Socket} -> + ?shutdown2(Node, connection_closed); + {Kernel, disconnect} -> + ?shutdown2(Node, disconnected); + {Kernel, aux_tick} -> + case MFGetstat(Socket) of + {ok, _, _, PendWrite} -> + send_tick(Socket, PendWrite, MFTick); + _ -> + ignore_it + end, + con_loop(Kernel, Node, Socket, TcpAddress, MyNode, Type, + Tick, MFTick, MFGetstat); + {Kernel, tick} -> + case send_tick(Socket, Tick, Type, + MFTick, MFGetstat) of + {ok, NewTick} -> + con_loop(Kernel, Node, Socket, TcpAddress, + MyNode, Type, NewTick, MFTick, + MFGetstat); + {error, not_responding} -> + error_msg("** Node ~p not responding **~n" + "** Removing (timedout) connection **~n", + [Node]), + ?shutdown2(Node, net_tick_timeout); + _Other -> + ?shutdown2(Node, send_net_tick_failed) + end; + {From, get_status} -> + case MFGetstat(Socket) of + {ok, Read, Write, _} -> + From ! {self(), get_status, {ok, Read, Write}}, + con_loop(Kernel, Node, Socket, TcpAddress, + MyNode, + Type, Tick, + MFTick, MFGetstat); + _ -> + ?shutdown2(Node, get_status_failed) + end + end. + + +%% ------------------------------------------------------------ +%% Misc. functions. +%% ------------------------------------------------------------ + +send_name(#hs_data{socket = Socket, this_node = Node, + f_send = FSend, + this_flags = Flags, + other_version = Version}) -> + ?trace("send_name: node=~w, version=~w\n", + [Node,Version]), + ?to_port(FSend, Socket, + [$n, ?int16(Version), ?int32(Flags), atom_to_list(Node)]). + +send_challenge(#hs_data{socket = Socket, this_node = Node, + other_version = Version, + this_flags = Flags, + f_send = FSend}, + Challenge ) -> + ?trace("send: challenge=~w version=~w\n", + [Challenge,Version]), + ?to_port(FSend, Socket, [$n,?int16(Version), ?int32(Flags), + ?int32(Challenge), + atom_to_list(Node)]). + +send_challenge_reply(#hs_data{socket = Socket, f_send = FSend}, + Challenge, Digest) -> + ?trace("send_reply: challenge=~w digest=~p\n", + [Challenge,Digest]), + ?to_port(FSend, Socket, [$r,?int32(Challenge),Digest]). + +send_challenge_ack(#hs_data{socket = Socket, f_send = FSend}, + Digest) -> + ?trace("send_ack: digest=~p\n", [Digest]), + ?to_port(FSend, Socket, [$a,Digest]). + + +%% +%% Get the name of the other side. +%% Close the connection if invalid data. +%% The IP address sent is not interesting (as in the old +%% tcp_drv.c which used it to detect simultaneous connection +%% attempts). +%% +recv_name(#hs_data{socket = Socket, f_recv = Recv}) -> + case Recv(Socket, 0, infinity) of + {ok,Data} -> + get_name(Data); + _ -> + ?shutdown(no_node) + end. + +get_name([$n,VersionA, VersionB, Flag1, Flag2, Flag3, Flag4 | OtherNode]) -> + {?u32(Flag1, Flag2, Flag3, Flag4), list_to_atom(OtherNode), + ?u16(VersionA,VersionB)}; +get_name(Data) -> + ?shutdown(Data). + +publish_type(Flags) -> + case Flags band ?DFLAG_PUBLISHED of + 0 -> + hidden; + _ -> + normal + end. + +%% wait for challenge after connect +recv_challenge(#hs_data{socket=Socket,other_node=Node, + other_version=Version,f_recv=Recv}) -> + case Recv(Socket, 0, infinity) of + {ok,[$n,V1,V0,Fl1,Fl2,Fl3,Fl4,CA3,CA2,CA1,CA0 | Ns]} -> + Flags = ?u32(Fl1,Fl2,Fl3,Fl4), + case {list_to_existing_atom(Ns),?u16(V1,V0)} of + {Node,Version} -> + Challenge = ?u32(CA3,CA2,CA1,CA0), + ?trace("recv: node=~w, challenge=~w version=~w\n", + [Node, Challenge,Version]), + {Flags,Challenge}; + _ -> + ?shutdown(no_node) + end; + _ -> + ?shutdown(no_node) + end. + + +%% +%% wait for challenge response after send_challenge +%% +recv_challenge_reply(#hs_data{socket = Socket, + other_node = NodeB, + f_recv = FRecv}, + ChallengeA, Cookie) -> + case FRecv(Socket, 0, infinity) of + {ok,[$r,CB3,CB2,CB1,CB0 | SumB]} when length(SumB) =:= 16 -> + SumA = gen_digest(ChallengeA, Cookie), + ChallengeB = ?u32(CB3,CB2,CB1,CB0), + ?trace("recv_reply: challenge=~w digest=~p\n", + [ChallengeB,SumB]), + ?trace("sum = ~p\n", [SumA]), + case list_to_binary(SumB) of + SumA -> + ChallengeB; + _ -> + error_msg("** Connection attempt from " + "disallowed node ~w ** ~n", [NodeB]), + ?shutdown(NodeB) + end; + _ -> + ?shutdown(no_node) + end. + +recv_challenge_ack(#hs_data{socket = Socket, f_recv = FRecv, + other_node = NodeB}, + ChallengeB, CookieA) -> + case FRecv(Socket, 0, infinity) of + {ok,[$a|SumB]} when length(SumB) =:= 16 -> + SumA = gen_digest(ChallengeB, CookieA), + ?trace("recv_ack: digest=~p\n", [SumB]), + ?trace("sum = ~p\n", [SumA]), + case list_to_binary(SumB) of + SumA -> + ok; + _ -> + error_msg("** Connection attempt to " + "disallowed node ~w ** ~n", [NodeB]), + ?shutdown(NodeB) + end; + _ -> + ?shutdown(NodeB) + end. + +recv_status(#hs_data{kernel_pid = Kernel, socket = Socket, + other_node = Node, f_recv = Recv} = HSData) -> + case Recv(Socket, 0, infinity) of + {ok, [$s|StrStat]} -> + Stat = list_to_atom(StrStat), + ?debug({dist_util,self(),recv_status, Node, Stat}), + case Stat of + not_allowed -> ?shutdown(Node); + nok -> + %% wait to be killed by net_kernel + receive + after infinity -> ok + end; + alive -> + Reply = is_pending(Kernel, Node), + ?debug({is_pending,self(),Reply}), + send_status(HSData, Reply), + if not Reply -> + ?shutdown(Node); + Reply -> + Stat + end; + _ -> Stat + end; + _Error -> + ?debug({dist_util,self(),recv_status_error, + Node, _Error}), + ?shutdown(Node) + end. + + +send_status(#hs_data{socket = Socket, other_node = Node, + f_send = FSend}, Stat) -> + ?debug({dist_util,self(),send_status, Node, Stat}), + case FSend(Socket, [$s | atom_to_list(Stat)]) of + {error, _} -> + ?shutdown(Node); + _ -> + true + end. + + + +%% +%% Send a TICK to the other side. +%% +%% This will happen every 15 seconds (by default) +%% The idea here is that every 15 secs, we write a little +%% something on the connection if we haven't written anything for +%% the last 15 secs. +%% This will ensure that nodes that are not responding due to +%% hardware errors (Or being suspended by means of ^Z) will +%% be considered to be down. If we do not want to have this +%% we must start the net_kernel (in erlang) without its +%% ticker process, In that case this code will never run + +%% And then every 60 seconds we also check the connection and +%% close it if we havn't received anything on it for the +%% last 60 secs. If ticked == tick we havn't received anything +%% on the connection the last 60 secs. + +%% The detection time interval is thus, by default, 45s < DT < 75s + +%% A HIDDEN node is always (if not a pending write) ticked if +%% we haven't read anything as a hidden node only ticks when it receives +%% a TICK !! + +send_tick(Socket, Tick, Type, MFTick, MFGetstat) -> + #tick{tick = T0, + read = Read, + write = Write, + ticked = Ticked} = Tick, + T = T0 + 1, + T1 = T rem 4, + case MFGetstat(Socket) of + {ok, Read, _, _} when Ticked =:= T -> + {error, not_responding}; + {ok, Read, W, Pend} when Type =:= hidden -> + send_tick(Socket, Pend, MFTick), + {ok, Tick#tick{write = W + 1, + tick = T1}}; + {ok, Read, Write, Pend} -> + send_tick(Socket, Pend, MFTick), + {ok, Tick#tick{write = Write + 1, + tick = T1}}; + {ok, R, Write, Pend} -> + send_tick(Socket, Pend, MFTick), + {ok, Tick#tick{write = Write + 1, + read = R, + tick = T1, + ticked = T}}; + {ok, Read, W, _} -> + {ok, Tick#tick{write = W, + tick = T1}}; + {ok, R, W, _} -> + {ok, Tick#tick{write = W, + read = R, + tick = T1, + ticked = T}}; + Error -> + Error + end. + +send_tick(Socket, 0, MFTick) -> + MFTick(Socket); +send_tick(_, _Pend, _) -> + %% Dont send tick if pending write. + ok. + +%% ------------------------------------------------------------ +%% Connection setup timeout timer. +%% After Timeout milliseconds this process terminates +%% which implies that the owning setup/accept process terminates. +%% The timer is reset before every network operation during the +%% connection setup ! +%% ------------------------------------------------------------ + +start_timer(Timeout) -> + spawn_link(?MODULE, setup_timer, [self(), Timeout*?trace_factor]). + +setup_timer(Pid, Timeout) -> + receive + {Pid, reset} -> + setup_timer(Pid, Timeout) + after Timeout -> + ?trace("Timer expires ~p, ~p~n",[Pid, Timeout]), + ?shutdown(timer) + end. + +reset_timer(Timer) -> + Timer ! {self(), reset}. + +cancel_timer(Timer) -> + unlink(Timer), + exit(Timer, shutdown). + |