%% %% %CopyrightBegin% %% %% Copyright Ericsson AB 2010-2012. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in %% compliance with the License. You should have received a copy of the %% Erlang Public License along with this software. If not, it can be %% retrieved online at http://www.erlang.org/. %% %% Software distributed under the License is distributed on an "AS IS" %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See %% the License for the specific language governing rights and limitations %% under the License. %% %% %CopyrightEnd% %% %% %% This module implements (as a process) the state machine documented %% in Appendix A of RFC 3539. %% -module(diameter_watchdog). -behaviour(gen_server). %% towards diameter_service -export([start/2]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). %% diameter_watchdog_sup callback -export([start_link/1]). -include_lib("diameter/include/diameter.hrl"). -include("diameter_internal.hrl"). -define(DEFAULT_TW_INIT, 30000). %% RFC 3539 ch 3.4.1 -define(NOMASK, {0,32}). %% default sequence mask -record(watchdog, {%% PCB - Peer Control Block; see RFC 3539, Appendix A status = initial :: initial | okay | suspect | down | reopen, pending = false :: boolean(), tw :: 6000..16#FFFFFFFF | {module(), atom(), list()}, %% {M,F,A} -> integer() >= 0 num_dwa = 0 :: -1 | non_neg_integer(), %% number of DWAs received during reopen %% end PCB parent = self() :: pid(), transport :: pid() | undefined, tref :: reference(), %% reference for current watchdog timer message_data, %% term passed into diameter_service with message sequence :: diameter:sequence()}). %% mask %% start/2 %% %% Start a monitor before the watchdog is allowed to proceed to ensure %% that a failed capabilities exchange produces the desired exit %% reason. -spec start(Type, {RecvData, [Opt], SvcName, #diameter_service{}}) -> {reference(), pid()} when Type :: {connect|accept, diameter:transport_ref()}, RecvData :: term(), Opt :: diameter:transport_opt(), SvcName :: diameter:service_name(). start({_,_} = Type, T) -> Ref = make_ref(), {ok, Pid} = diameter_watchdog_sup:start_child({Ref, {Type, self(), T}}), try {erlang:monitor(process, Pid), Pid} after Pid ! Ref end. start_link(T) -> {ok, _} = proc_lib:start_link(?MODULE, init, [T], infinity, diameter_lib:spawn_opts(server, [])). %% =========================================================================== %% =========================================================================== %% init/1 init(T) -> proc_lib:init_ack({ok, self()}), gen_server:enter_loop(?MODULE, [], i(T)). i({Ref, {_, Pid, _} = T}) -> MRef = erlang:monitor(process, Pid), receive Ref -> make_state(T); {'DOWN', MRef, process, _, _} = D -> exit({shutdown, D}) end; i({_, Pid, _} = T) -> %% from old code erlang:monitor(process, Pid), make_state(T). make_state({T, Pid, {RecvData, Opts, SvcName, #diameter_service{applications = Apps, capabilities = Caps} = Svc}}) -> random:seed(now()), putr(restart, {T, Opts, Svc}), %% save seeing it in trace putr(dwr, dwr(Caps)), %% {_,_} = Mask = call(Pid, sequence), #watchdog{parent = Pid, transport = monitor(diameter_peer_fsm:start(T, Opts, {Mask, Svc})), tw = proplists:get_value(watchdog_timer, Opts, ?DEFAULT_TW_INIT), message_data = {RecvData, SvcName, Apps, Mask}, sequence = Mask}. %% Retrieve the sequence mask from the parent from the parent, rather %% than having it passed into init/1, for upgrade reasons: the call to %% diameter_service:receive_message/3 passes back the mask. %% handle_call/3 handle_call(_, _, State) -> {reply, nok, State}. %% handle_cast/2 handle_cast(_, State) -> {noreply, State}. %% handle_info/2 handle_info(T, #watchdog{} = State) -> case transition(T, State) of ok -> {noreply, State}; #watchdog{} = S -> event(State, S), {noreply, S}; stop -> ?LOG(stop, T), event(State, State#watchdog{status = down}), {stop, {shutdown, T}, State} end; handle_info(T, S) -> %% upgrade handle_info(T, #watchdog{} = list_to_tuple(tuple_to_list(S) ++ [?NOMASK])). event(#watchdog{status = T}, #watchdog{status = T}) -> ok; event(#watchdog{transport = undefined}, #watchdog{transport = undefined}) -> ok; event(#watchdog{status = From, transport = F, parent = Pid}, #watchdog{status = To, transport = T}) -> E = {tpid(F,T), From, To}, notify(Pid, E), ?LOG(transition, {self(), E}). tpid(_, Pid) when is_pid(Pid) -> Pid; tpid(Pid, _) -> Pid. notify(Pid, E) -> Pid ! {watchdog, self(), E}. %% terminate/2 terminate(_, _) -> ok. %% code_change/3 code_change(_, State, _) -> {ok, State}. %% =========================================================================== %% =========================================================================== %% transition/2 %% %% The state transitions documented here are extracted from RFC 3539, %% the commentary is ours. %% Service or watchdog is telling the watchdog of an accepting %% transport to die after reconnect_timer expiry or reestablished %% connection (in another transport process) respectively. transition(close, #watchdog{status = down}) -> {{accept, _}, _, _} = getr(restart), %% assert stop; transition(close, #watchdog{}) -> ok; %% Service is asking for the peer to be taken down gracefully. transition({shutdown, Pid}, #watchdog{parent = Pid, transport = undefined, status = S}) -> down = S, %% sanity check stop; transition({shutdown = T, Pid}, #watchdog{parent = Pid, transport = TPid}) -> TPid ! {T, self()}, ok; %% Parent process has died, transition({'DOWN', _, process, Pid, _Reason}, #watchdog{parent = Pid}) -> stop; %% Transport has accepted a connection. transition({accepted = T, TPid}, #watchdog{transport = TPid, parent = Pid}) -> Pid ! {T, self(), TPid}, ok; %% Transport is telling us that its impending death isn't failure. transition({close, TPid, _Reason}, #watchdog{transport = TPid}) -> stop; %% STATE Event Actions New State %% ===== ------ ------- ---------- %% INITIAL Connection up SetWatchdog() OKAY %% By construction, the watchdog timer isn't set until we move into %% state okay as the result of the Peer State Machine reaching the %% Open state. %% %% If we're accepting then we may be resuming a connection that went %% down in another watchdog process, in which case this is the %% transition below, from down to reopen. That is, it's not until we %% know the identity of the peer (ie. now) that we know that we're in %% state down rather than initial. transition({open, TPid, Hosts, T} = Open, #watchdog{transport = TPid, status = initial, parent = Pid} = S) -> case okay(getr(restart), Hosts) of okay -> open(Pid, {TPid, T}), set_watchdog(S#watchdog{status = okay}); reopen -> transition(Open, S#watchdog{status = down}) end; %% DOWN Connection up NumDWA = 0 %% SendWatchdog() %% SetWatchdog() %% Pending = TRUE REOPEN transition({open = P, TPid, _Hosts, T}, #watchdog{transport = TPid, status = down} = S) -> %% Store the info we need to notify the parent to reopen the %% connection after the requisite DWA's are received, at which %% time we eraser(open). putr(P, {TPid, T}), set_watchdog(send_watchdog(S#watchdog{status = reopen, num_dwa = 0})); %% OKAY Connection down CloseConnection() %% Failover() %% SetWatchdog() DOWN %% SUSPECT Connection down CloseConnection() %% SetWatchdog() DOWN %% REOPEN Connection down CloseConnection() %% SetWatchdog() DOWN transition({'DOWN', _, process, TPid, _}, #watchdog{transport = TPid, status = initial}) -> stop; transition({'DOWN', _, process, TPid, _}, #watchdog{transport = TPid} = S) -> failover(S), close(S), set_watchdog(S#watchdog{status = down, pending = false, transport = undefined}); %% Any outstanding pending (or other messages from the transport) will %% have arrived before 'DOWN' since the message comes from the same %% process. Note that we could also get this message in the initial %% state. %% Incoming message. transition({recv, TPid, Name, Pkt}, #watchdog{transport = TPid} = S) -> recv(Name, Pkt, S); %% Current watchdog has timed out. transition({timeout, TRef, tw}, #watchdog{tref = TRef} = S) -> set_watchdog(timeout(S)); %% Timer was canceled after message was already sent. transition({timeout, _, tw}, #watchdog{}) -> ok; %% State query. transition({state, Pid}, #watchdog{status = S}) -> Pid ! {self(), S}, ok. %% =========================================================================== %% Only call "upwards", to the parent service. call(Pid, Req) -> try gen_server:call(Pid, Req, infinity) catch exit: Reason -> exit({shutdown, {Req, Reason}}) end. monitor(Pid) -> erlang:monitor(process, Pid), Pid. putr(Key, Val) -> put({?MODULE, Key}, Val). getr(Key) -> get({?MODULE, Key}). eraser(Key) -> erase({?MODULE, Key}). %% encode/2 encode(Msg, Mask) -> Seq = diameter_session:sequence(Mask), Hdr = #diameter_header{version = ?DIAMETER_VERSION, end_to_end_id = Seq, hop_by_hop_id = Seq}, Pkt = #diameter_packet{header = Hdr, msg = Msg}, #diameter_packet{bin = Bin} = diameter_codec:encode(?BASE, Pkt), Bin. %% okay/2 okay({{accept, Ref}, _, _}, Hosts) -> T = {?MODULE, connection, Ref, Hosts}, diameter_reg:add(T), okay(diameter_reg:match(T)); %% Register before matching so that at least one of two registering %% processes will match the other. (Which can't happen as long as %% diameter_peer_fsm guarantees at most one open connection to the same %% peer.) okay({{connect, _}, _, _}, _) -> okay. %% The peer hasn't been connected recently ... okay([{_,P}]) -> P = self(), %% assert okay; %% ... or it has. okay(C) -> [_|_] = [P ! close || {_,P} <- C, self() /= P], reopen. %% set_watchdog/1 set_watchdog(#watchdog{tw = TwInit, tref = TRef} = S) -> cancel(TRef), S#watchdog{tref = erlang:start_timer(tw(TwInit), self(), tw)}. cancel(undefined) -> ok; cancel(TRef) -> erlang:cancel_timer(TRef). tw(T) when is_integer(T), T >= 6000 -> T - 2000 + (random:uniform(4001) - 1); %% RFC3539 jitter of +/- 2 sec. tw({M,F,A}) -> apply(M,F,A). %% open/2 open(Pid, {_,_} = T) -> Pid ! {connection_up, self(), T}. %% failover/1 failover(#watchdog{status = okay, parent = Pid}) -> Pid ! {connection_down, self()}; failover(_) -> ok. %% close/1 close(#watchdog{status = down}) -> ok; close(#watchdog{parent = Pid}) -> {{T, _}, _, _} = getr(restart), T == accept andalso (Pid ! {close, self()}). %% send_watchdog/1 send_watchdog(#watchdog{pending = false, transport = TPid, sequence = Mask} = S) -> TPid ! {send, encode(getr(dwr), Mask)}, ?LOG(send, 'DWR'), S#watchdog{pending = true}. %% recv/3 recv(Name, Pkt, S) -> try rcv(Name, S) of #watchdog{} = NS -> rcv(Name, Pkt, S), NS catch {?MODULE, throwaway, #watchdog{} = NS} -> NS end. %% rcv/3 rcv(N, _, _) when N == 'CER'; N == 'CEA'; N == 'DWR'; N == 'DWA'; N == 'DPR'; N == 'DPA' -> false; rcv(_, Pkt, #watchdog{transport = TPid, message_data = T}) -> diameter_service:receive_message(TPid, Pkt, T). throwaway(S) -> throw({?MODULE, throwaway, S}). %% rcv/2 %% INITIAL Receive DWA Pending = FALSE %% Throwaway() INITIAL %% INITIAL Receive non-DWA Throwaway() INITIAL rcv('DWA', #watchdog{status = initial} = S) -> throwaway(S#watchdog{pending = false}); rcv(_, #watchdog{status = initial} = S) -> throwaway(S); %% DOWN Receive DWA Pending = FALSE %% Throwaway() DOWN %% DOWN Receive non-DWA Throwaway() DOWN rcv('DWA', #watchdog{status = down} = S) -> throwaway(S#watchdog{pending = false}); rcv(_, #watchdog{status = down} = S) -> throwaway(S); %% OKAY Receive DWA Pending = FALSE %% SetWatchdog() OKAY %% OKAY Receive non-DWA SetWatchdog() OKAY rcv('DWA', #watchdog{status = okay} = S) -> set_watchdog(S#watchdog{pending = false}); rcv(_, #watchdog{status = okay} = S) -> set_watchdog(S); %% SUSPECT Receive DWA Pending = FALSE %% Failback() %% SetWatchdog() OKAY %% SUSPECT Receive non-DWA Failback() %% SetWatchdog() OKAY rcv('DWA', #watchdog{status = suspect} = S) -> failback(S), set_watchdog(S#watchdog{status = okay, pending = false}); rcv(_, #watchdog{status = suspect} = S) -> failback(S), set_watchdog(S#watchdog{status = okay}); %% REOPEN Receive DWA & Pending = FALSE %% NumDWA == 2 NumDWA++ %% Failback() OKAY rcv('DWA', #watchdog{status = reopen, num_dwa = 2 = N, parent = Pid} = S) -> open(Pid, eraser(open)), S#watchdog{status = okay, num_dwa = N+1, pending = false}; %% REOPEN Receive DWA & Pending = FALSE %% NumDWA < 2 NumDWA++ REOPEN rcv('DWA', #watchdog{status = reopen, num_dwa = N} = S) -> S#watchdog{num_dwa = N+1, pending = false}; %% REOPEN Receive non-DWA Throwaway() REOPEN rcv(_, #watchdog{status = reopen} = S) -> throwaway(S). %% failback/1 failback(#watchdog{parent = Pid}) -> Pid ! {connection_up, self()}. %% timeout/1 %% %% The caller sets the watchdog on the return value. %% OKAY Timer expires & SendWatchdog() %% !Pending SetWatchdog() %% Pending = TRUE OKAY %% REOPEN Timer expires & SendWatchdog() %% !Pending SetWatchdog() %% Pending = TRUE REOPEN timeout(#watchdog{status = T, pending = false} = S) when T == okay; T == reopen -> send_watchdog(S); %% OKAY Timer expires & Failover() %% Pending SetWatchdog() SUSPECT timeout(#watchdog{status = okay, pending = true} = S) -> failover(S), S#watchdog{status = suspect}; %% SUSPECT Timer expires CloseConnection() %% SetWatchdog() DOWN %% REOPEN Timer expires & CloseConnection() %% Pending & SetWatchdog() %% NumDWA < 0 DOWN timeout(#watchdog{status = T, pending = P, num_dwa = N, transport = TPid} = S) when T == suspect; T == reopen, P, N < 0 -> exit(TPid, {shutdown, watchdog_timeout}), close(S), S#watchdog{status = down}; %% REOPEN Timer expires & NumDWA = -1 %% Pending & SetWatchdog() %% NumDWA >= 0 REOPEN timeout(#watchdog{status = reopen, pending = true, num_dwa = N} = S) when 0 =< N -> S#watchdog{num_dwa = -1}; %% DOWN Timer expires AttemptOpen() %% SetWatchdog() DOWN %% INITIAL Timer expires AttemptOpen() %% SetWatchdog() INITIAL %% RFC 3539, 3.4.1: %% %% [5] While the connection is in the closed state, the AAA client MUST %% NOT attempt to send further watchdog messages on the connection. %% However, after the connection is closed, the AAA client continues %% to periodically attempt to reopen the connection. %% %% The AAA client SHOULD wait for the transport layer to report %% connection failure before attempting again, but MAY choose to %% bound this wait time by the watchdog interval, Tw. %% Don't bound, restarting the peer process only when the previous %% process has died. We only need to handle state down since we start %% the first watchdog when transitioning out of initial. timeout(#watchdog{status = down} = S) -> restart(S). %% restart/1 restart(#watchdog{transport = undefined} = S) -> restart(getr(restart), S); restart(S) -> S. %% Only restart the transport in the connecting case. For an accepting %% transport, we've registered the peer connection when leaving state %% initial and this is used by a new accepting process to realize that %% it's actually in state down rather then initial when receiving %% notification of an open connection. restart({{connect, _} = T, Opts, Svc}, #watchdog{parent = Pid, sequence = Mask} = S) -> Pid ! {reconnect, self()}, S#watchdog{transport = monitor(diameter_peer_fsm:start(T, Opts, {Mask, Svc}))}; restart({{accept, _}, _, _}, S) -> S. %% Don't currently use Opts/Svc in the accept case but having them in %% the process dictionary is helpful if the process dies unexpectedly. %% dwr/1 dwr(#diameter_caps{origin_host = OH, origin_realm = OR, origin_state_id = OSI}) -> ['DWR', {'Origin-Host', OH}, {'Origin-Realm', OR}, {'Origin-State-Id', OSI}].