path: root/lib/diameter/src/base/diameter_watchdog.erl
diff options
author Anders Svensson <[email protected]> 2011-10-14 19:40:29 +0200
committer Anders Svensson <[email protected]> 2011-10-17 12:30:58 +0200
commit 14bf1dc855bbd973bb15578f418e37ab2d4f17fe (patch)
tree ed67388edcead44b9867eb034c84b4394d0cf117 /lib/diameter/src/base/diameter_watchdog.erl
parent f4c38ecd803451280d0021bcfa7f2ad25b96cbcd (diff)
One makefile for src build instead of recursion
Simpler, no duplication of similar makefiles and makes for better dependencies. (Aka, recursive make considered harmful.)
Diffstat (limited to 'lib/diameter/src/base/diameter_watchdog.erl')
1 files changed, 571 insertions, 0 deletions
diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl
new file mode 100644
index 0000000000..b7c1491f4b
--- /dev/null
+++ b/lib/diameter/src/base/diameter_watchdog.erl
@@ -0,0 +1,571 @@
+%% %CopyrightBegin%
+%% Copyright Ericsson AB 2010-2011. All Rights Reserved.
+%% The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved online at http://www.erlang.org/.
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%% %CopyrightEnd%
+%% This module implements (as a process) the state machine documented
+%% in Appendix A of RFC 3539.
+%% towards diameter_service
+%% start/2 is the entry point used to start a watchdog process.
+-export([start/2]).
+%% gen_server callbacks
+%% init/1 is exported since it is the function handed to
+%% proc_lib:start_link/5 in start_link/1.
+-export([init/1,
+         handle_call/3,
+         handle_cast/2,
+         handle_info/2,
+         terminate/2,
+         code_change/3]).
+%% diameter_watchdog_sup callback
+-export([start_link/1]).
+-define(DEFAULT_TW_INIT, 30000). %% RFC 3539 ch 3.4.1
+%% State of a watchdog process. The first group of fields is the Peer
+%% Control Block of RFC 3539; the remainder is bookkeeping for the
+%% processes and timer the watchdog interacts with.
+-record(watchdog,
+        {%% PCB - Peer Control Block; see RFC 3539, Appendix A
+         status = initial :: initial | okay | suspect | down | reopen,
+         pending = false :: boolean(),
+         tw :: 6000..16#FFFFFFFF | {module(), atom(), list()},
+               %% {M,F,A} -> integer() >= 0
+         num_dwa = 0 :: -1 | non_neg_integer(),
+                    %% number of DWAs received during reopen
+         %% end PCB
+         parent = self() :: pid(),
+         transport :: pid() | undefined, %% set to undefined when the transport process dies
+         tref :: reference() | undefined, %% reference for current watchdog timer,
+                                          %% undefined until the timer is first set
+         message_data}). %% term passed into diameter_service with message
+%% start/2
+%%
+%% Start a watchdog process as a child of diameter_watchdog_sup and
+%% return its pid. The calling process is passed along in the start
+%% argument. Anything but {ok, Pid} from the supervisor is a badmatch
+%% in the caller.
+start({_,_} = Type, T) ->
+    Parent = self(),
+    {ok, WatchdogPid} = diameter_watchdog_sup:start_child({Type, Parent, T}),
+    WatchdogPid.
+%% start_link/1
+%%
+%% Spawn and link the watchdog process through proc_lib, running init/1
+%% on the start argument and waiting without timeout for its
+%% acknowledgement. Returns {ok, Pid}; any other result is a badmatch.
+start_link(T) ->
+    SpawnOpts = diameter_lib:spawn_opts(server, []),
+    {ok, _} = proc_lib:start_link(?MODULE, init, [T], infinity, SpawnOpts).
+%% ===========================================================================
+%% ===========================================================================
+%% init/1
+%%
+%% Entry point of the spawned process. The starter is acknowledged
+%% before the state is built, so that start_link/1 returns without
+%% waiting for i/1, and the process then enters the gen_server loop.
+init(T) ->
+    Ack = {ok, self()},
+    proc_lib:init_ack(Ack),
+    State = i(T),
+    gen_server:enter_loop(?MODULE, [], State).
+%% i/1
+%%
+%% Build the initial #watchdog{} state from the start argument: seed
+%% the random number generator, save the restart data and the DWR to
+%% send in the process dictionary, monitor the parent and start and
+%% monitor the transport (diameter_peer_fsm) process.
+%%
+%% The watchdog timer is taken from the watchdog_timer option, falling
+%% back on ?DEFAULT_TW_INIT when the option is absent.
+i({T, Pid, {ConnT, Opts, SvcName, #diameter_service{applications = Apps,
+                                                    capabilities = Caps}
+                                  = Svc}}) ->
+    {M,S,U} = now(),
+    random:seed(M,S,U),            %% per-process seed for the timer jitter
+    putr(restart, {T, Opts, Svc}), %% save seeing it in trace
+    putr(dwr, dwr(Caps)),          %% the DWR message sent on watchdog expiry
+    #watchdog{parent = monitor(Pid),
+              transport = monitor(diameter_peer_fsm:start(T, Opts, Svc)),
+              tw = proplists:get_value(watchdog_timer,
+                                       Opts,
+                                       ?DEFAULT_TW_INIT),
+              message_data = {ConnT, SvcName, Apps}}.
+%% handle_call/3
+%%
+%% No synchronous requests are supported: every call is answered with
+%% nok and leaves the state untouched.
+handle_call(_Request, _From, Watchdog) ->
+    {reply, nok, Watchdog}.
+%% handle_cast/2
+%%
+%% Casts are not part of the protocol with this process and are
+%% silently dropped.
+handle_cast(_Request, Watchdog) ->
+    {noreply, Watchdog}.
+%% handle_info/2
+%%
+%% Every message is an event for the state machine in transition/2,
+%% whose result decides whether the process stops, keeps its state or
+%% continues with a new one. A change of status is logged.
+handle_info(T, State) ->
+    Result = transition(T, State),
+    case Result of
+        stop ->
+            ?LOG(stop, T),
+            {stop, {shutdown, T}, State};
+        ok ->
+            {noreply, State};
+        #watchdog{status = X} = Next ->
+            ?LOGC(X =/= State#watchdog.status, transition, X),
+            {noreply, Next}
+    end.
+%% terminate/2
+%%
+%% Nothing to clean up at termination.
+terminate(_Reason, _Watchdog) ->
+    ok.
+%% code_change/3
+%%
+%% The state is carried over unchanged on code upgrade.
+code_change(_OldVsn, Watchdog, _Extra) ->
+    {ok, Watchdog}.
+%% ===========================================================================
+%% ===========================================================================
+%% transition/2 -- map a received message and the current state to ok (state unchanged), a new #watchdog{} or stop, as dispatched on by handle_info/2.
+%% The state transitions documented here are extracted from RFC 3539,
+%% the commentary is ours.
+%% Service or watchdog is telling the watchdog of an accepting
+%% transport to die after reconnect_timer expiry or reestablished
+%% connection (in another transport process) respectively.
+transition(close, #watchdog{status = down}) ->
+ {{accept, _}, _, _} = getr(restart), %% assert
+ stop;
+transition(close, #watchdog{}) -> %% close is ignored in any state other than down
+ ok;
+%% Service is asking for the peer to be taken down gracefully.
+transition({shutdown, Pid}, #watchdog{parent = Pid,
+ transport = undefined,
+ status = S}) ->
+ down = S, %% sanity check
+ stop;
+transition({shutdown = T, Pid}, #watchdog{parent = Pid,
+ transport = TPid}) ->
+ TPid ! {T, self()}, %% pass the request on to the transport process
+ ok;
+%% Parent process has died,
+transition({'DOWN', _, process, Pid, _Reason},
+ #watchdog{parent = Pid}) ->
+ stop;
+%% Transport has accepted a connection.
+transition({accepted = T, TPid}, #watchdog{transport = TPid,
+ parent = Pid}) ->
+ Pid ! {T, self(), TPid}, %% relay to the parent, tagged with our own pid
+ ok;
+%% Transport is telling us that its impending death isn't failure.
+transition({close, TPid, _Reason}, #watchdog{transport = TPid}) ->
+ stop;
+%% STATE Event Actions New State
+%% ===== ------ ------- ----------
+%% INITIAL Connection up SetWatchdog() OKAY
+%% By construction, the watchdog timer isn't set until we move into
+%% state okay as the result of the Peer State Machine reaching the
+%% Open state.
+%% If we're an acceptor then we may be resuming a connection that went
+%% down in another acceptor process, in which case this is the
+%% transition below, from down into reopen. That is, it's not until
+%% we know the identity of the peer (ie. now) that we know that we're
+%% in state down rather than initial.
+transition({open, TPid, Hosts, T} = Open,
+ #watchdog{transport = TPid,
+ status = initial,
+ parent = Pid}
+ = S) ->
+ case okay(getr(restart), Hosts) of
+ okay ->
+ open(Pid, {TPid, T}),
+ set_watchdog(S#watchdog{status = okay});
+ reopen ->
+ transition(Open, S#watchdog{status = down}) %% re-dispatch: matches the status = down clause that follows
+ end;
+%% DOWN Connection up NumDWA = 0
+%% SendWatchdog()
+%% SetWatchdog()
+%% Pending = TRUE REOPEN
+transition({open = P, TPid, _Hosts, T},
+ #watchdog{transport = TPid,
+ status = down}
+ = S) ->
+ %% Store the info we need to notify the parent to reopen the
+ %% connection after the requisite DWA's are received, at which
+ %% time we eraser(open).
+ putr(P, {TPid, T}),
+ set_watchdog(send_watchdog(S#watchdog{status = reopen,
+ num_dwa = 0}));
+%% OKAY Connection down CloseConnection()
+%% Failover()
+%% SetWatchdog() DOWN
+%% SUSPECT Connection down CloseConnection()
+%% SetWatchdog() DOWN
+%% REOPEN Connection down CloseConnection()
+%% SetWatchdog() DOWN
+transition({'DOWN', _, process, TPid, _},
+ #watchdog{transport = TPid,
+ status = initial}) ->
+ stop; %% transport died while still in state initial
+transition({'DOWN', _, process, Pid, _},
+ #watchdog{transport = Pid}
+ = S) ->
+ failover(S), %% notifies the parent only if status was okay
+ close(S),
+ set_watchdog(S#watchdog{status = down,
+ pending = false,
+ transport = undefined});
+%% Any outstanding pending (or other messages from the transport) will
+%% have arrived before 'DOWN' since the message comes from the same
+%% process. Note that we could also get this message in the initial
+%% state.
+%% Incoming message.
+transition({recv, TPid, Name, Pkt}, #watchdog{transport = TPid} = S) ->
+ recv(Name, Pkt, S);
+%% Current watchdog has timed out.
+transition({timeout, TRef, tw}, #watchdog{tref = TRef} = S) ->
+ set_watchdog(timeout(S));
+%% Timer was canceled after message was already sent.
+transition({timeout, _, tw}, #watchdog{}) ->
+ ok;
+%% State query.
+transition({state, Pid}, #watchdog{status = S}) ->
+ Pid ! {self(), S}, %% answer with the current status
+ ok.
+%% ===========================================================================
+%% monitor/1
+%%
+%% Monitor a process and return its pid rather than the monitor
+%% reference, so that the call can be nested where the pid is wanted.
+monitor(Pid) ->
+    _MRef = erlang:monitor(process, Pid),
+    Pid.
+%% putr/2
+%%
+%% Store a value in the process dictionary under a key tagged with the
+%% module name. Returns whatever put/2 returns, the previous value.
+putr(Key, Val) ->
+    Tagged = {?MODULE, Key},
+    put(Tagged, Val).
+%% getr/1
+%%
+%% Read the value stored under the module-tagged key in the process
+%% dictionary; undefined if there is none.
+getr(Key) ->
+    Tagged = {?MODULE, Key},
+    get(Tagged).
+%% eraser/1
+%%
+%% Remove the module-tagged key from the process dictionary and return
+%% the value that was stored; undefined if there was none.
+eraser(Key) ->
+    Tagged = {?MODULE, Key},
+    erase(Tagged).
+%% encode/1
+%%
+%% Encode a message with the ?BASE dictionary and return only the
+%% binary of the resulting diameter_packet record.
+encode(Msg) ->
+    Pkt = diameter_codec:encode(?BASE, Msg),
+    #diameter_packet{bin = Bin} = Pkt,
+    Bin.
+%% okay/2
+%%
+%% Decide, from the saved restart data and the peer's hosts, whether a
+%% connection that has come up is new (okay) or the resumption of one
+%% that another process has registered (reopen).
+%%
+%% A connecting transport is always okay.
+okay({{connect, _}, _, _}, _) ->
+    okay;
+%% An accepting transport registers the connection and then looks for
+%% other registrations of the same one.
+%%
+%% Register before matching so that at least one of two registering
+%% processes will match the other. (Which can't happen as long as
+%% diameter_peer_fsm guarantees at most one open connection to the same
+%% peer.)
+okay({{accept, Ref}, _, _}, Hosts) ->
+    Key = {?MODULE, connection, Ref, Hosts},
+    diameter_reg:add(Key),
+    Matches = diameter_reg:match(Key),
+    okay(Matches).
+%% okay/1 -- classify the registrations found by okay/2. The peer hasn't been connected recently ...
+okay([{_,P}]) ->
+ P = self(), %% assert: the single registration is our own
+ okay;
+%% ... or it has.
+okay(C) ->
+ [_|_] = [P ! close || {_,P} <- C, self() /= P], %% send close to every other registered process; asserts there is at least one
+ reopen.
+%% set_watchdog/1
+%%
+%% (Re)start the watchdog timer: cancel the current timer, if any, and
+%% start a new one whose expiry is delivered to this process as
+%% {timeout, TRef, tw}. The new reference is stored in the state.
+set_watchdog(#watchdog{tw = TwInit, tref = OldRef} = S) ->
+    cancel(OldRef),
+    Timeout = tw(TwInit),
+    NewRef = erlang:start_timer(Timeout, self(), tw),
+    S#watchdog{tref = NewRef}.
+%% cancel/1
+%%
+%% Cancel a timer given its reference. undefined means that no timer
+%% has been started, in which case there is nothing to do.
+cancel(TRef) ->
+    case TRef of
+        undefined ->
+            ok;
+        _ ->
+            erlang:cancel_timer(TRef)
+    end.
+%% tw/1
+%%
+%% Compute the timeout of the next watchdog timer from the configured
+%% value: either an integer number of milliseconds, to which a random
+%% jitter is applied, or an {M,F,A} that is applied to obtain the
+%% timeout.
+tw(T)
+  when is_integer(T), T >= 6000 ->
+    T - 2000 + (random:uniform(4001) - 1); %% RFC3539 jitter of +/- 2 sec.
+tw({M,F,A}) ->
+    apply(M,F,A).
+%% open/2
+%%
+%% Tell a process that the connection is up, identifying the watchdog
+%% by its pid and passing along the given pair. Returns the message
+%% sent.
+open(Pid, {_,_} = T) ->
+    Msg = {connection_up, self(), T},
+    Pid ! Msg.
+%% failover/1
+%%
+%% Tell the parent that the connection is down, but only when leaving
+%% state okay. In any other state there is nothing to report.
+failover(#watchdog{status = okay, parent = Parent}) ->
+    Msg = {connection_down, self()},
+    Parent ! Msg;
+failover(_NotOkay) ->
+    ok.
+%% close/1 -- on leaving any state but down, send {close, self()} to the parent if the saved restart data says we are an accepting transport.
+close(#watchdog{status = down}) -> %% already down: nothing to send
+ ok;
+close(#watchdog{parent = Pid}) ->
+ {{T, _}, _, _} = getr(restart), %% T is the first element of the transport type saved by i/1
+ T == accept andalso (Pid ! {close, self()}). %% evaluates to false when T is not accept, else to the message sent
+%% send_watchdog/1
+%%
+%% Ask the transport to send the DWR saved in the process dictionary
+%% and mark an answer as pending. Only defined when no answer is
+%% already pending.
+send_watchdog(#watchdog{pending = false, transport = TPid} = S) ->
+    Bin = encode(getr(dwr)),
+    TPid ! {send, Bin},
+    ?LOG(send, 'DWR'),
+    S#watchdog{pending = true}.
+%% recv/3 -- handle an incoming message: rcv/2 computes the new state and rcv/3 passes the message on, unless rcv/2 throws throwaway, in which case only the thrown state is returned.
+recv(Name, Pkt, S) ->
+ try rcv(Name, S) of
+ #watchdog{} = NS ->
+ rcv(Name, Pkt, S), %% outside the protected part of the try: exceptions here are not caught below
+ NS
+ catch
+ throw: {?MODULE, throwaway, #watchdog{} = NS} -> %% as thrown by throwaway/1
+ NS
+ end.
+%% rcv/3
+%%
+%% Pass a received message on to diameter_service, together with the
+%% transport pid and the message data of the state, unless it is one of
+%% the messages CER, CEA, DWR, DWA, DPR or DPA, for which false is
+%% returned and nothing is passed on.
+rcv(Name, _Pkt, _S)
+  when Name == 'CER' orelse Name == 'CEA'
+       orelse Name == 'DWR' orelse Name == 'DWA'
+       orelse Name == 'DPR' orelse Name == 'DPA' ->
+    false;
+rcv(_Name, Pkt, #watchdog{transport = TPid, message_data = Data}) ->
+    diameter_service:receive_message(TPid, Pkt, Data).
+%% throwaway/1
+%%
+%% Non-local return of a state: throws a term tagged with the module
+%% name and throwaway, carrying the state given.
+throwaway(S) ->
+    Thrown = {?MODULE, throwaway, S},
+    throw(Thrown).
+%% rcv/2 -- state change caused by receiving the message named by the first argument: returns the new #watchdog{} or throws it via throwaway/1.
+%% INITIAL Receive DWA Pending = FALSE
+%% Throwaway() INITIAL
+%% INITIAL Receive non-DWA Throwaway() INITIAL
+rcv('DWA', #watchdog{status = initial} = S) ->
+ throwaway(S#watchdog{pending = false});
+rcv(_, #watchdog{status = initial} = S) ->
+ throwaway(S);
+%% DOWN Receive DWA Pending = FALSE
+%% Throwaway() DOWN
+%% DOWN Receive non-DWA Throwaway() DOWN
+rcv('DWA', #watchdog{status = down} = S) ->
+ throwaway(S#watchdog{pending = false});
+rcv(_, #watchdog{status = down} = S) ->
+ throwaway(S);
+%% OKAY Receive DWA Pending = FALSE
+%% SetWatchdog() OKAY
+%% OKAY Receive non-DWA SetWatchdog() OKAY
+rcv('DWA', #watchdog{status = okay} = S) ->
+ set_watchdog(S#watchdog{pending = false});
+rcv(_, #watchdog{status = okay} = S) ->
+ set_watchdog(S);
+%% SUSPECT Receive DWA Pending = FALSE
+%% Failback()
+%% SetWatchdog() OKAY
+%% SUSPECT Receive non-DWA Failback()
+%% SetWatchdog() OKAY
+rcv('DWA', #watchdog{status = suspect} = S) ->
+ failback(S),
+ set_watchdog(S#watchdog{status = okay,
+ pending = false});
+rcv(_, #watchdog{status = suspect} = S) ->
+ failback(S),
+ set_watchdog(S#watchdog{status = okay});
+%% REOPEN Receive DWA & Pending = FALSE
+%% NumDWA == 2 NumDWA++
+%% Failback() OKAY
+rcv('DWA', #watchdog{status = reopen,
+ num_dwa = 2 = N,
+ parent = Pid}
+ = S) ->
+ open(Pid, eraser(open)), %% notify the parent using the value saved under open when entering reopen
+ S#watchdog{status = okay,
+ num_dwa = N+1,
+ pending = false};
+%% REOPEN Receive DWA & Pending = FALSE
+%% NumDWA < 2 NumDWA++ REOPEN
+rcv('DWA', #watchdog{status = reopen,
+ num_dwa = N}
+ = S) ->
+ S#watchdog{num_dwa = N+1,
+ pending = false};
+%% REOPEN Receive non-DWA Throwaway() REOPEN
+rcv(_, #watchdog{status = reopen} = S) ->
+ throwaway(S).
+%% failback/1
+%%
+%% Tell the parent that the connection is up again. Returns the
+%% message sent.
+failback(#watchdog{parent = Parent}) ->
+    Msg = {connection_up, self()},
+    Parent ! Msg.
+%% timeout/1 -- state change on expiry of the current watchdog timer; returns the new #watchdog{}.
+%% The caller sets the watchdog on the return value.
+%% OKAY Timer expires & SendWatchdog()
+%% !Pending SetWatchdog()
+%% Pending = TRUE OKAY
+%% REOPEN Timer expires & SendWatchdog()
+%% !Pending SetWatchdog()
+%% Pending = TRUE REOPEN
+timeout(#watchdog{status = T,
+ pending = false}
+ = S)
+ when T == okay;
+ T == reopen ->
+ send_watchdog(S);
+%% OKAY Timer expires & Failover()
+%% Pending SetWatchdog() SUSPECT
+timeout(#watchdog{status = okay,
+ pending = true}
+ = S) ->
+ failover(S),
+ S#watchdog{status = suspect};
+%% SUSPECT Timer expires CloseConnection()
+%% SetWatchdog() DOWN
+%% REOPEN Timer expires & CloseConnection()
+%% Pending & SetWatchdog()
+%% NumDWA < 0 DOWN
+timeout(#watchdog{status = T,
+ pending = P,
+ num_dwa = N,
+ transport = TPid}
+ = S)
+ when T == suspect;
+ T == reopen, P, N < 0 -> %% ie. suspect, or reopen with an answer pending and NumDWA < 0
+ exit(TPid, shutdown), %% CloseConnection(): send the transport process an exit signal
+ close(S),
+ S#watchdog{status = down};
+%% REOPEN Timer expires & NumDWA = -1
+%% Pending & SetWatchdog()
+%% NumDWA >= 0 REOPEN
+timeout(#watchdog{status = reopen,
+ pending = true,
+ num_dwa = N}
+ = S)
+ when 0 =< N ->
+ S#watchdog{num_dwa = -1};
+%% DOWN Timer expires AttemptOpen()
+%% SetWatchdog() DOWN
+%% INITIAL Timer expires AttemptOpen()
+%% SetWatchdog() INITIAL
+%% RFC 3539, 3.4.1:
+%% [5] While the connection is in the closed state, the AAA client MUST
+%% NOT attempt to send further watchdog messages on the connection.
+%% However, after the connection is closed, the AAA client continues
+%% to periodically attempt to reopen the connection.
+%% The AAA client SHOULD wait for the transport layer to report
+%% connection failure before attempting again, but MAY choose to
+%% bound this wait time by the watchdog interval, Tw.
+%% Don't bound, restarting the peer process only when the previous
+%% process has died. We only need to handle state down since we start
+%% the first watchdog when transitioning out of initial.
+timeout(#watchdog{status = down} = S) ->
+ restart(S). %% AttemptOpen()
+%% restart/1
+%%
+%% Attempt to restart the transport, but only if the previous transport
+%% process is gone (transport = undefined). Otherwise the state is
+%% returned as is.
+restart(#watchdog{transport = undefined} = Watchdog) ->
+    Saved = getr(restart),
+    restart(Saved, Watchdog);
+restart(Watchdog) ->
+    Watchdog.
+%% Only restart the transport in the connecting case. For an accepting
+%% transport, we've registered the peer connection when leaving state
+%% initial and this is used by a new accepting process to realize that
+%% it's actually in state down rather than initial when receiving
+%% notification of an open connection.
+%% restart/2
+%%
+%% Restart the transport given the saved restart data. An accepting
+%% transport is never restarted from here: the state is returned
+%% unchanged.
+%%
+%% Don't currently use Opts/Svc in the accept case but having them in
+%% the process dictionary is helpful if the process dies unexpectedly.
+restart({{accept, _}, _, _}, S) ->
+    S;
+%% A connecting transport is restarted: the parent is told of the
+%% reconnect and a new transport process is started and monitored.
+restart({{connect, _} = T, Opts, Svc}, #watchdog{parent = Pid} = S) ->
+    Pid ! {reconnect, self()},
+    TPid = diameter_peer_fsm:start(T, Opts, Svc),
+    S#watchdog{transport = monitor(TPid)}.
+%% dwr/1
+%%
+%% Build the DWR message from the capabilities record: the message name
+%% followed by the Origin-Host, Origin-Realm and Origin-State-Id AVPs.
+dwr(#diameter_caps{origin_host = OH,
+                   origin_realm = OR,
+                   origin_state_id = OSI}) ->
+    Avps = [{'Origin-Host', OH},
+            {'Origin-Realm', OR},
+            {'Origin-State-Id', OSI}],
+    ['DWR' | Avps].