aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAnders Svensson <[email protected]>2013-03-19 13:17:30 +0100
committerAnders Svensson <[email protected]>2013-03-19 13:17:30 +0100
commit53375f432c3ba5aa5a03bcbd2cb89caba6754113 (patch)
tree5fdb22d696d04821b296bd4da2aef8019a4dbaf7
parent77c9f844f137deec2d49bd6c28ec03bce97d34df (diff)
parentda15b3c5fdc9ff96b8d02f4620c879418648d45c (diff)
downloadotp-53375f432c3ba5aa5a03bcbd2cb89caba6754113.tar.gz
otp-53375f432c3ba5aa5a03bcbd2cb89caba6754113.tar.bz2
otp-53375f432c3ba5aa5a03bcbd2cb89caba6754113.zip
Merge branch 'anders/diameter/reopen_abuse/OTP-10898' into maint-r16
* anders/diameter/reopen_abuse/OTP-10898: Update appup Tweak okay -> suspect config Add testcases for watchdog_config Rework watchdog suite to remove most timing dependencies Rework stats to avoid concurrent read and write Minor spec fix Add transport_opt() watchdog_config
-rw-r--r--lib/diameter/doc/src/diameter.xml24
-rw-r--r--lib/diameter/src/base/diameter.appup.src10
-rw-r--r--lib/diameter/src/base/diameter.erl1
-rw-r--r--lib/diameter/src/base/diameter_reg.erl2
-rw-r--r--lib/diameter/src/base/diameter_service.erl12
-rw-r--r--lib/diameter/src/base/diameter_stats.erl79
-rw-r--r--lib/diameter/src/base/diameter_watchdog.erl111
-rw-r--r--lib/diameter/test/diameter_stats_SUITE.erl19
-rw-r--r--lib/diameter/test/diameter_watchdog_SUITE.erl453
9 files changed, 514 insertions, 197 deletions
diff --git a/lib/diameter/doc/src/diameter.xml b/lib/diameter/doc/src/diameter.xml
index 379e9f0738..8e9ec06ff9 100644
--- a/lib/diameter/doc/src/diameter.xml
+++ b/lib/diameter/doc/src/diameter.xml
@@ -1125,6 +1125,30 @@ modules in order until one establishes a connection within the
corresponding timeout (see below) or all fail.</p>
</item>
+<marker id="watchdog_config"/>
+<tag><c>{watchdog_config, [{okay|suspect, non_neg_integer()}]}</c></tag>
+<item>
+<p>
+Specifies configuration that alters the behaviour of the watchdog
+state machine.
+On key <c>okay</c>, the non-negative number of answered DWR
+messages before transitioning from REOPEN to OKAY.
+On key <c>suspect</c>, the number of watchdog timeouts before
+transitioning from OKAY to SUSPECT when DWR is unanswered, or 0 to
+not make the transition.</p>
+
+<p>
+Defaults to <c>[{okay, 3}, {suspect, 1}]</c>.
+Not specifying a key is equivalent to specifying
+the default value for that key.</p>
+<warning>
+<p>
+The default value is as required by RFC 3539: changing it results
+in non-standard behaviour that should only be used to simulate
+misbehaving nodes during test.</p>
+</warning>
+</item>
+
<marker id="watchdog_timer"/>
<tag><c>{watchdog_timer, TwInit}</c></tag>
<item>
diff --git a/lib/diameter/src/base/diameter.appup.src b/lib/diameter/src/base/diameter.appup.src
index 2ce89579ff..c3c7fe9a38 100644
--- a/lib/diameter/src/base/diameter.appup.src
+++ b/lib/diameter/src/base/diameter.appup.src
@@ -28,7 +28,12 @@
{"1.2.1", [{restart_application, diameter}]},
{"1.3", [{restart_application, diameter}]}, %% R15B03
{"1.3.1", [{restart_application, diameter}]},
- {"1.4", [{restart_application, diameter}]} %% R16A
+ {"1.4", [{restart_application, diameter}]}, %% R16A
+ {"1.4.1", [{load_module, diameter_reg}, %% R16B
+ {load_module, diameter_stats},
+ {load_module, diameter_service},
+ {load_module, diameter_watchdog},
+ {load_module, diameter}]}
],
[
{"0.9", [{restart_application, diameter}]},
@@ -39,6 +44,7 @@
{"1.2.1", [{restart_application, diameter}]},
{"1.3", [{restart_application, diameter}]},
{"1.3.1", [{restart_application, diameter}]},
- {"1.4", [{restart_application, diameter}]}
+ {"1.4", [{restart_application, diameter}]},
+ {"1.4.1", [{restart_application, diameter}]}
]
}.
diff --git a/lib/diameter/src/base/diameter.erl b/lib/diameter/src/base/diameter.erl
index c67fba5f89..5f06cef020 100644
--- a/lib/diameter/src/base/diameter.erl
+++ b/lib/diameter/src/base/diameter.erl
@@ -336,6 +336,7 @@ call(SvcName, App, Message) ->
| {length_errors, exit | handle | discard}
| {reconnect_timer, 'Unsigned32'()}
| {watchdog_timer, 'Unsigned32'() | {module(), atom(), list()}}
+ | {watchdog_config, [{okay|suspect, non_neg_integer()}]}
| {private, any()}.
%% Predicate passed to remove_transport/2
diff --git a/lib/diameter/src/base/diameter_reg.erl b/lib/diameter/src/base/diameter_reg.erl
index ac58e4bf5b..3197c1aee1 100644
--- a/lib/diameter/src/base/diameter_reg.erl
+++ b/lib/diameter/src/base/diameter_reg.erl
@@ -138,7 +138,7 @@ del(T) ->
%% associations removed.)
%% ===========================================================================
--spec match(tuple())
+-spec match(any())
-> [{term(), pid()}].
match(Pat) ->
diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl
index f1342df16c..c971646861 100644
--- a/lib/diameter/src/base/diameter_service.erl
+++ b/lib/diameter/src/base/diameter_service.erl
@@ -1626,16 +1626,10 @@ info_stats(#state{watchdogT = WatchdogT}) ->
info_transport(S) ->
PeerD = peer_dict(S, config_dict(S)),
- RefsD = dict:map(fun(_, Ls) -> [P || L <- Ls, {peer, {P,_}} <- L] end,
- PeerD),
- Refs = lists:append(dict:fold(fun(R, Ps, A) -> [[R|Ps] | A] end,
- [],
- RefsD)),
- Stats = diameter_stats:read(Refs),
+ Stats = diameter_stats:sum(dict:fetch_keys(PeerD)),
dict:fold(fun(R, Ls, A) ->
- Ps = dict:fetch(R, RefsD),
- [[{ref, R} | transport(Ls)] ++ [stats([R|Ps], Stats)]
- | A]
+ Cs = proplists:get_value(R, Stats, []),
+ [[{ref, R} | transport(Ls)] ++ [{statistics, Cs}] | A]
end,
[],
PeerD).
diff --git a/lib/diameter/src/base/diameter_stats.erl b/lib/diameter/src/base/diameter_stats.erl
index 8fd5ded300..b68d4af11f 100644
--- a/lib/diameter/src/base/diameter_stats.erl
+++ b/lib/diameter/src/base/diameter_stats.erl
@@ -28,6 +28,7 @@
-export([reg/2, reg/1,
incr/3, incr/1,
read/1,
+ sum/1,
flush/1]).
%% supervisor callback
@@ -77,10 +78,14 @@
reg(Pid, Ref)
when is_pid(Pid) ->
- call({reg, Pid, Ref}).
+ try
+ call({reg, Pid, Ref})
+ catch
+ exit: _ -> false
+ end.
-spec reg(ref())
- -> true.
+ -> boolean().
reg(Ref) ->
reg(self(), Ref).
@@ -111,11 +116,19 @@ incr(Ctr) ->
%% Retrieve counters for the specified contributors.
%% ---------------------------------------------------------------------------
+%% Read in the server process to ensure that counters for a dying
+%% contributor aren't folded concurrently with select.
+
-spec read([ref()])
-> [{ref(), [{counter(), integer()}]}].
-read(Refs) ->
- read(Refs, false).
+read(Refs)
+ when is_list(Refs) ->
+ try call({read, Refs, false}) of
+ L -> to_refdict(L)
+ catch
+ exit: _ -> []
+ end.
read(Refs, B) ->
MatchSpec = [{{{'_', '$1'}, '_'},
@@ -124,11 +137,52 @@ read(Refs, B) ->
['$_']}],
L = ets:select(?TABLE, MatchSpec),
B andalso delete(L),
+ L.
+
+to_refdict(L) ->
lists:foldl(fun({{C,R}, N}, D) -> orddict:append(R, {C,N}, D) end,
orddict:new(),
L).
%% ---------------------------------------------------------------------------
+%% # sum(Refs)
+%%
+%% Retrieve counters summed over all contributors for each term.
+%% ---------------------------------------------------------------------------
+
+-spec sum([ref()])
+ -> [{ref(), [{counter(), integer()}]}].
+
+sum(Refs)
+ when is_list(Refs) ->
+ try call({read, Refs}) of
+ L -> [{R, to_ctrdict(Cs)} || {R, [_|_] = Cs} <- L]
+ catch
+ exit: _ -> []
+ end.
+
+read_refs(Refs) ->
+ [{R, readr(R)} || R <- Refs].
+
+readr(Ref) ->
+ MatchSpec = [{{{'_', '$1'}, '_'},
+ [?ORCOND([{'=:=', '$1', {const, R}}
+ || R <- [Ref | pids(Ref)]])],
+ ['$_']}],
+ ets:select(?TABLE, MatchSpec).
+
+pids(Ref) ->
+ MatchSpec = [{{'$1', '$2'},
+ [{'=:=', '$2', {const, Ref}}],
+ ['$1']}],
+ ets:select(?TABLE, MatchSpec).
+
+to_ctrdict(L) ->
+ lists:foldl(fun({{C,_}, N}, D) -> orddict:update_counter(C, N, D) end,
+ orddict:new(),
+ L).
+
+%% ---------------------------------------------------------------------------
%% # flush(Refs)
%%
%% Retrieve and delete statistics for the specified contributors.
@@ -138,11 +192,10 @@ read(Refs, B) ->
-> [{ref(), {counter(), integer()}}].
flush(Refs) ->
- try
- call({flush, Refs})
+ try call({read, Refs, true}) of
+ L -> to_refdict(L)
catch
- exit: _ ->
- []
+ exit: _ -> []
end.
%% ===========================================================================
@@ -186,8 +239,14 @@ handle_call({reg, Pid, Ref}, _From, State) ->
B andalso erlang:monitor(process, Pid),
{reply, B, State};
-handle_call({flush, Refs}, _From, State) ->
- {reply, read(Refs, true), State};
+handle_call({read, Refs, Del}, _From, State) ->
+ {reply, read(Refs, Del), State};
+
+handle_call({read, Refs}, _, State) ->
+ {reply, read_refs(Refs), State};
+
+handle_call({flush, Refs}, _From, State) -> %% from old code
+ {reply, to_refdict(read(Refs, true)), State};
handle_call(Req, From, State) ->
?UNEXPECTED([Req, From]),
diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl
index 073a415d10..82ca603cf3 100644
--- a/lib/diameter/src/base/diameter_watchdog.erl
+++ b/lib/diameter/src/base/diameter_watchdog.erl
@@ -47,6 +47,14 @@
-define(BASE, ?DIAMETER_DICT_COMMON).
+-define(IS_NATURAL(N), (is_integer(N) andalso 0 =< N)).
+
+-define(CHOOSE(B,T,F), if (B) -> T; true -> F end).
+
+-record(config,
+ {suspect = 1 :: non_neg_integer(), %% OKAY -> SUSPECT
+ okay = 3 :: non_neg_integer()}). %% REOPEN -> OKAY
+
-record(watchdog,
{%% PCB - Peer Control Block; see RFC 3539, Appendix A
status = initial :: initial | okay | suspect | down | reopen,
@@ -54,7 +62,8 @@
tw :: 6000..16#FFFFFFFF | {module(), atom(), list()},
%% {M,F,A} -> integer() >= 0
num_dwa = 0 :: -1 | non_neg_integer(),
- %% number of DWAs received during reopen
+ %% number of DWAs received in reopen,
+ %% or number of timeouts before okay -> suspect
%% end PCB
parent = self() :: pid(), %% service process
transport :: pid() | undefined, %% peer_fsm process
@@ -64,7 +73,8 @@
%% term passed into diameter_service with incoming message
sequence :: diameter:sequence(), %% mask
restrict :: {diameter:restriction(), boolean()},
- shutdown = false :: boolean()}).
+ shutdown = false :: boolean(),
+ config :: #config{}}).
%% ---------------------------------------------------------------------------
%% start/2
@@ -129,7 +139,8 @@ i({Ack, T, Pid, {RecvData,
receive_data = RecvData,
dictionary = Dict0,
sequence = Mask,
- restrict = {Restrict, lists:member(node(), Nodes)}}.
+ restrict = {Restrict, lists:member(node(), Nodes)},
+ config = config(Opts)}.
wait(Ref, Pid) ->
receive
@@ -139,6 +150,27 @@ wait(Ref, Pid) ->
exit({shutdown, D})
end.
+%% config/1
+%%
+%% Could also configure counts for SUSPECT to DOWN and REOPEN to DOWN,
+%% but don't.
+
+config(Opts) ->
+ Config = proplists:get_value(watchdog_config, Opts, []),
+ is_list(Config) orelse config_error({watchdog_config, Config}),
+ lists:foldl(fun config/2, #config{}, Config).
+
+config({suspect, N}, Rec)
+ when ?IS_NATURAL(N) ->
+ Rec#config{suspect = N};
+
+config({okay, N}, Rec)
+ when ?IS_NATURAL(N) ->
+ Rec#config{okay = N};
+
+config(T, _) ->
+ config_error(T).
+
%% start/5
start(T, Opts, Mask, Nodes, Dict0, Svc) ->
@@ -219,6 +251,17 @@ handle_info(T, #watchdog{} = State) ->
?LOG(stop, T),
event(T, State, State#watchdog{status = down}),
{stop, {shutdown, T}, State}
+ end;
+
+handle_info(T, State) -> %% started in old code
+ handle_info(T, upgrade(State)).
+
+upgrade(State) ->
+ case erlang:append_element(State, #config{}) of
+ #watchdog{status = okay, config = #config{suspect = OS}} = S ->
+ S#watchdog{num_dwa = OS};
+ #watchdog{} = S ->
+ S
end.
close({'DOWN', _, process, TPid, {shutdown, Reason}},
@@ -331,11 +374,13 @@ transition({accepted = T, TPid}, #watchdog{transport = TPid,
transition({open, TPid, Hosts, _} = Open,
#watchdog{transport = TPid,
status = initial,
- restrict = {_, R}}
+ restrict = {_,R},
+ config = #config{suspect = OS}}
= S) ->
case okay(getr(restart), Hosts, R) of
okay ->
- set_watchdog(S#watchdog{status = okay});
+ set_watchdog(S#watchdog{status = okay,
+ num_dwa = OS});
reopen ->
transition(Open, S#watchdog{status = down})
end;
@@ -347,15 +392,22 @@ transition({open, TPid, Hosts, _} = Open,
transition({open = Key, TPid, _Hosts, T},
#watchdog{transport = TPid,
- status = down}
+ status = down,
+ config = #config{suspect = OS,
+ okay = RO}}
= S) ->
- %% Store the info we need to notify the parent to reopen the
- %% connection after the requisite DWA's are received, at which
- %% time we eraser(open). The reopen message is a later addition,
- %% to communicate the new capabilities as soon as they're known.
- putr(Key, {TPid, T}),
- set_watchdog(send_watchdog(S#watchdog{status = reopen,
- num_dwa = 0}));
+ case RO of
+ 0 -> %% non-standard: skip REOPEN
+ set_watchdog(S#watchdog{status = okay,
+ num_dwa = OS});
+ _ ->
+ %% Store the info we need to notify the parent to reopen
+ %% the connection after the requisite DWA's are received,
+ %% at which time we eraser(open).
+ putr(Key, {TPid, T}),
+ set_watchdog(send_watchdog(S#watchdog{status = reopen,
+ num_dwa = 0}))
+ end;
%% OKAY Connection down CloseConnection()
%% Failover()
@@ -374,7 +426,7 @@ transition({'DOWN', _, process, TPid, _Reason},
#watchdog{transport = TPid,
status = T}
= S) ->
- set_watchdog(S#watchdog{status = case T of initial -> T; _ -> down end,
+ set_watchdog(S#watchdog{status = ?CHOOSE(initial == T, T, down),
pending = false,
transport = undefined});
@@ -553,22 +605,27 @@ rcv(_, #watchdog{status = okay} = S) ->
%% SUSPECT Receive non-DWA Failback()
%% SetWatchdog() OKAY
-rcv('DWA', #watchdog{status = suspect} = S) ->
+rcv('DWA', #watchdog{status = suspect, config = #config{suspect = OS}} = S) ->
set_watchdog(S#watchdog{status = okay,
+ num_dwa = OS,
pending = false});
-rcv(_, #watchdog{status = suspect} = S) ->
- set_watchdog(S#watchdog{status = okay});
+rcv(_, #watchdog{status = suspect, config = #config{suspect = OS}} = S) ->
+ set_watchdog(S#watchdog{status = okay,
+ num_dwa = OS});
%% REOPEN Receive DWA & Pending = FALSE
%% NumDWA == 2 NumDWA++
%% Failback() OKAY
rcv('DWA', #watchdog{status = reopen,
- num_dwa = 2 = N}
- = S) ->
+ num_dwa = N,
+ config = #config{suspect = OS,
+ okay = RO}}
+ = S)
+ when N+1 == RO ->
S#watchdog{status = okay,
- num_dwa = N+1,
+ num_dwa = OS,
pending = false};
%% REOPEN Receive DWA & Pending = FALSE
@@ -607,9 +664,17 @@ timeout(#watchdog{status = T,
%% Pending SetWatchdog() SUSPECT
timeout(#watchdog{status = okay,
- pending = true}
- = S) ->
- S#watchdog{status = suspect};
+ pending = true,
+ num_dwa = N}
+ = S) ->
+ case N of
+ 1 ->
+ S#watchdog{status = suspect};
+ 0 -> %% non-standard: never move to suspect
+ S;
+ N -> %% non-standard: more timeouts before moving
+ S#watchdog{num_dwa = N-1}
+ end;
%% SUSPECT Timer expires CloseConnection()
%% SetWatchdog() DOWN
diff --git a/lib/diameter/test/diameter_stats_SUITE.erl b/lib/diameter/test/diameter_stats_SUITE.erl
index af52afb59c..76ff764671 100644
--- a/lib/diameter/test/diameter_stats_SUITE.erl
+++ b/lib/diameter/test/diameter_stats_SUITE.erl
@@ -33,6 +33,7 @@
-export([reg/1,
incr/1,
read/1,
+ sum/1,
flush/1]).
-define(stat, diameter_stats).
@@ -53,6 +54,7 @@ tc() ->
[reg,
incr,
read,
+ sum,
flush].
init_per_suite(Config) ->
@@ -98,6 +100,23 @@ read(_) ->
[] = ?stat:read([make_ref()]),
?stat:flush([self(), Ref, make_ref()]).
+sum(_) ->
+ Ref = make_ref(),
+ C1 = {a,b},
+ C2 = {b,a},
+ true = ?stat:reg(Ref),
+ 1 = ?stat:incr(C1),
+ 1 = ?stat:incr(C2),
+ 2 = ?stat:incr(C2),
+ 7 = ?stat:incr(C1, Ref, 7),
+ [{Ref, [{C1,8}, {C2,2}]}]
+ = ?stat:sum([Ref, make_ref()]),
+ Self = self(),
+ [{Self, [{C1,1}, {C2,2}]}]
+ = ?stat:sum([self()]),
+ [{Ref, [{C1,7}]}, {Self, [{C1,1}, {C2,2}]}]
+ = lists:sort(?stat:flush([self(), Ref])).
+
flush(_) ->
Ref = make_ref(),
Ctr = '_',
diff --git a/lib/diameter/test/diameter_watchdog_SUITE.erl b/lib/diameter/test/diameter_watchdog_SUITE.erl
index e1e166b834..704bf110c7 100644
--- a/lib/diameter/test/diameter_watchdog_SUITE.erl
+++ b/lib/diameter/test/diameter_watchdog_SUITE.erl
@@ -30,10 +30,14 @@
end_per_suite/1]).
%% testcases
--export([reopen/1, reopen/4, reopen/7]).
+-export([reopen/0, reopen/1, reopen/4, reopen/6,
+ suspect/1, suspect/4,
+ okay/1, okay/4]).
-export([id/1, %% jitter callback
- run1/1]).
+ run1/1,
+ abuse/1,
+ abuse/2]).
%% diameter_app callbacks
-export([peer_up/3,
@@ -64,7 +68,7 @@
{'Host-IP-Address', [?ADDR]},
{'Vendor-Id', 42},
{'Product-Name', "OTP/diameter"},
- {'Auth-Application-Id', [?DIAMETER_APP_ID_COMMON]},
+ {'Auth-Application-Id', [0 = ?BASE:id()]},
{application, [{alias, Name},
{dictionary, ?BASE},
{module, ?MODULE}]}]).
@@ -72,48 +76,51 @@
%% Watchdog timer as a callback.
-define(WD(T), {?MODULE, id, [T]}).
-%% Watchdog timers used by the testcases. Note that the short timeout
-%% with random jitter is excluded since the reopen/1 isn't smart
-%% enough to deal with it: see ONE_WD below.
--define(WD_TIMERS, [?WD(6000)
- | [F_(T_) || T_ <- [10000, 20000, 30000],
- F_ <- [fun(T__) -> T__ end,
- fun(T__) -> ?WD(T__) end]]]).
+%% Watchdog timers used by the testcases.
+-define(WD_TIMERS, [10000, ?WD(10000)]).
-%% Watchdog timer of the misbehaving peer.
+%% Watchdog timer of the misbehaving node.
-define(PEER_WD, 10000).
-%% Receive a watchdog event within a specified time.
--define(EVENT(T, Tmo),
- receive #diameter_event{info = T} -> now()
- after Tmo -> ?ERROR({timeout, Tmo})
- end).
-
-%% Receive an event in a given number of watchdogs, plus or minus
-%% half. Note that the call to now_diff assumes left to right
-%% evaluation order.
--define(EVENT(T, N, WdL, WdH),
- [?ERROR({received, _Elapsed_, _LowerBound_, N, WdL})
- || _UpperBound_ <- [(N)*(WdH) + (WdH) div 2],
- _Elapsed_ <- [now_diff(now(), ?EVENT(T, _UpperBound_))],
- _LowerBound_ <- [(N)*(WdL) - (WdL) div 2],
- _Elapsed_ =< _LowerBound_*1000]).
-
--define(EVENT(T, N, Wd),
- ?EVENT(T, N, Wd, Wd)).
-
-%% A timeout that ensures one watchdog. The ensure only one watchdog
+%% A timeout that ensures one watchdog. To ensure only one watchdog
%% requires (Wd + 2000) + 1000 < 2*(Wd - 2000) ==> 7000 < Wd for the
%% case with random jitter.
-define(ONE_WD(Wd), jitter(Wd,2000) + 1000).
+-define(INFO(T), #diameter_event{info = T}).
+
+%% Receive an event message from diameter.
+-define(EVENT(T), %% apply to not bind T_
+ apply(fun() ->
+ receive ?INFO(T = T_) -> log_event(T_) end
+ end,
+ [])).
+
+%% Receive a watchdog event.
+-define(WD_EVENT(Ref), log_wd(element(4, ?EVENT({watchdog, Ref, _, _, _})))).
+-define(WD_EVENT(Ref, Ms),
+ apply(fun() ->
+ receive ?INFO({watchdog, Ref, _, T_, _}) ->
+ log_wd(T_)
+ after Ms ->
+ false
+ end
+ end,
+ [])).
+
+%% Log to make failures identifiable.
+-define(LOG(T), ?LOG("~p", [T])).
+-define(LOG(F,A), ct:pal("~p: " ++ F, [self() | A])).
+-define(WARN(F,A), ct:pal(error, "~p: " ++ F, [self() | A])).
%% ===========================================================================
suite() ->
- [{timetrap, {minutes, 10}}].%% enough for 17 watchdogs @ 30 sec plus jitter
+ [{timetrap, {seconds, 90}}].
all() ->
- [reopen].
+ [reopen,
+ suspect,
+ okay].
init_per_suite(Config) ->
ok = diameter:start(),
@@ -129,83 +136,46 @@ end_per_suite(_Config) ->
%% Test the watchdog state machine for the required failover, failback
%% and reopen behaviour by examining watchdog events.
+reopen() ->
+ [{timetrap, {minutes, 5}}]. %% 20 watchdogs @ 15 sec
+
reopen(_) ->
- [] = run([[reopen, T, Wd, N, M]
- || Wd <- ?WD_TIMERS, %% watchdog_timer value
- T <- [listen, connect], %% watchdog to test
+ [] = run([[reopen, T, W, N, M]
+ || T <- [listen, connect], %% watchdog to test
+ W <- ?WD_TIMERS, %% watchdog_timer value
N <- [0,1,2], %% DWR's to answer before ignoring
M <- ['DWR', 'DWA', 'RAA']]). %% how to induce failback
-reopen(Type, Wd, N, M) ->
- Server = start_service(),
- Client = start_service(),
-
- %% The peer to the transport whose watchdog is tested is given a
- %% long watchdog timeout so that it doesn't send DWR of its own.
- {Node, Peer} = {{[], Wd}, {[{module, ?MODULE}], ?WD(?PEER_WD)}},
-
- {{LH,LW},{CH,CW}} = case Type of
- listen -> {Node, Peer};
- connect -> {Peer, Node}
- end,
-
- LO = [{transport_module, diameter_tcp},
- {transport_config, LH ++ [{ip, ?ADDR}, {port, 0}]},
- {watchdog_timer, LW}],
-
- {ok, LRef} = diameter:add_transport(Server, {listen, LO}),
-
- [LP] = ?util:lport(tcp, LRef, 20),
-
- CO = [{transport_module, diameter_tcp},
- {transport_config, CH ++ [{ip, ?ADDR}, {port, 0},
- {raddr, ?ADDR}, {rport, LP}]},
- {watchdog_timer, CW}],
-
- %% Use a temporary process to ensure the connecting transport is
- %% added only once events from the listening transport are
- %% subscribed to.
- Pid = spawn(fun() -> receive _ -> ok end end),
+reopen(Test, Wd, N, M) ->
+ %% Publish a ref ensure the connecting transport is added only
+ %% once events from the listening transport are subscribed to.
+ Ref = make_ref(),
+ [] = run([[reopen, T, Test, Ref, Wd, N, M] || T <- [listen, connect]]).
- [] = run([[reopen, Type, T, LRef, Pid, Wd, N, M]
- || T <- [{listen, Server}, {connect, Client, CO}]]).
+%% reopen/6
-%% start_service/1
+reopen(Type, Test, Ref, Wd, N, M) ->
+ {SvcName, TRef} = start(Type, Ref, cfg(Type, Test, Wd)),
+ reopen(Type, Test, SvcName, TRef, Wd, N, M).
-start_service() ->
- Name = hostname(),
- ok = diameter:start_service(Name, [{monitor, self()} | ?SERVICE(Name)]),
- Name.
+cfg(Type, Type, Wd) ->
+ {Wd, [], []};
+cfg(_Type, _Test, _Wd) ->
+ {?WD(?PEER_WD), [{okay, 0}], [{module, ?MODULE}]}.
%% reopen/7
-reopen(Type, {listen = T, SvcName}, Ref, Pid, Wd, N, M) ->
- diameter:subscribe(SvcName),
- Pid ! ok,
- recv(Type, T, SvcName, Ref, Wd, N, M);
-
-reopen(Type, {connect = T, SvcName, Opts}, _, Pid, Wd, N, M) ->
- diameter:subscribe(SvcName),
- MRef = erlang:monitor(process, Pid),
- receive {'DOWN', MRef, process, _, _} -> ok end,
- {ok, Ref} = diameter:add_transport(SvcName, {T, Opts}),
- recv(Type, T, SvcName, Ref, Wd, N, M).
-
-%% recv/7
-
%% The watchdog to be tested.
-recv(Type, Type, _SvcName, Ref, Wd, N, M) ->
+reopen(Type, Type, SvcName, Ref, Wd, N, M) ->
+ ?LOG("node ~p", [[Type, SvcName, Ref, Wd, N, M]]),
+
%% Connection should come up immediately as a consequence of
%% starting the watchdog process. In the accepting case this
%% results in a new watchdog on a transport waiting for a new
%% connection.
- ?EVENT({watchdog, Ref, _, {initial, okay}, _}, 2000),
- ?EVENT({up, Ref, _, _, #diameter_packet{}}, 0),
-
- %% Low/high watchdog timeouts.
- WdL = jitter(Wd, -2000),
- WdH = jitter(Wd, 2000),
+ {initial, okay} = ?WD_EVENT(Ref),
+ ?EVENT({up, Ref, _, _, #diameter_packet{}}),
%% OKAY Timer expires & Failover()
%% Pending SetWatchdog() SUSPECT
@@ -215,8 +185,13 @@ recv(Type, Type, _SvcName, Ref, Wd, N, M) ->
%% the first unanswered DWR. Knowing the min/max watchdog timeout
%% values gives the time interval in which the event is expected.
- ?EVENT({watchdog, Ref, _, {okay, suspect}, _}, N+2, WdL, WdH),
- ?EVENT({down, Ref, _, _}, 0),
+ [0,0,0,0] = wd_counts(SvcName),
+
+ {okay, suspect} = ?WD_EVENT(Ref),
+ ?EVENT({down, Ref, _, _}),
+
+ %% N received DWA's
+ [_,_,_,N] = wd_counts(SvcName),
%% SUSPECT Receive DWA Pending = FALSE
%% Failback()
@@ -228,8 +203,13 @@ recv(Type, Type, _SvcName, Ref, Wd, N, M) ->
%% The peer sends a message before the expiry of another watchdog
%% to induce failback.
- ?EVENT({watchdog, Ref, _, {suspect, okay}, _}, WdH + 2000),
- ?EVENT({up, Ref, _, _}, 0),
+ {suspect, okay} = ?WD_EVENT(Ref),
+ ?EVENT({up, Ref, _, _}),
+
+ %% N+1 sent DWR's, N/N+1 received DWA's
+ R1 = N+1,
+ A1 = choose(M == 'DWA', R1, N),
+ [R1,_,_,A1] = wd_counts(SvcName),
%% OKAY Timer expires & SendWatchdog()
%% !Pending SetWatchdog()
@@ -242,16 +222,19 @@ recv(Type, Type, _SvcName, Ref, Wd, N, M) ->
%% back down after either one or two watchdog expiries, depending
%% on whether or not DWA restored the connection.
- F = choose(M == 'DWA', 2, 1),
- ?EVENT({watchdog, Ref, _, {okay, suspect}, _}, F, WdL, WdH),
- ?EVENT({down, Ref, _, _}, 0),
+ {okay, suspect} = ?WD_EVENT(Ref),
+ ?EVENT({down, Ref, _, _}),
%% SUSPECT Timer expires CloseConnection()
%% SetWatchdog() DOWN
%%
%% Non-response brings the connection down after another timeout.
- ?EVENT({watchdog, Ref, _, {suspect, down}, _}, 1, WdL, WdH),
+ {suspect, down} = ?WD_EVENT(Ref),
+
+ R2 = R1 + choose(M == 'DWA', 1, 0),
+ A2 = A1,
+ [R2,_,_,A2] = wd_counts(SvcName),
%% DOWN Timer expires AttemptOpen()
%% SetWatchdog() DOWN
@@ -263,7 +246,7 @@ recv(Type, Type, _SvcName, Ref, Wd, N, M) ->
%%
%% The connection is reestablished after another timeout.
- recv_reopen(Type, Ref, WdL, WdH),
+ recv_reopen(Type, Ref),
%% REOPEN Receive non-DWA Throwaway() REOPEN
%%
@@ -281,18 +264,27 @@ recv(Type, Type, _SvcName, Ref, Wd, N, M) ->
%% An exchange of 3 watchdogs (the first directly after
%% capabilities exchange) brings the connection back up.
- ?EVENT({watchdog, Ref, _, {reopen, okay}, _}, 2, WdL, WdH),
- ?EVENT({up, Ref, _, _, #diameter_packet{}}, 0),
+ {reopen, okay} = ?WD_EVENT(Ref),
+ ?EVENT({up, Ref, _, _, #diameter_packet{}}),
+
+ %% Three DWR's have been answered.
+ R3 = R2 + 3,
+ A3 = A2 + 3,
+ [R3,_,_,A3] = wd_counts(SvcName),
%% Non-response brings it down again.
- ?EVENT({watchdog, Ref, _, {okay, suspect}, _}, 2, WdL, WdH),
- ?EVENT({down, Ref, _, _}, 0),
- ?EVENT({watchdog, Ref, _, {suspect, down}, _}, 1, WdL, WdH),
+ {okay, suspect} = ?WD_EVENT(Ref),
+ ?EVENT({down, Ref, _, _}),
+ {suspect, down} = ?WD_EVENT(Ref),
+
+ R4 = R3 + 1,
+ A4 = A3,
+ [R4,_,_,A4] = wd_counts(SvcName),
%% Reestablish after another watchdog.
- recv_reopen(Type, Ref, WdL, WdH),
+ recv_reopen(Type, Ref),
%% REOPEN Timer expires & NumDWA = -1
%% Pending & SetWatchdog()
@@ -305,63 +297,76 @@ recv(Type, Type, _SvcName, Ref, Wd, N, M) ->
%% Peer is now ignoring all watchdogs go down again after 2
%% timeouts.
- ?EVENT({watchdog, Ref, _, {reopen, down}, _}, 2, WdL, WdH);
+ {reopen, down} = ?WD_EVENT(Ref);
%% The misbehaving peer.
-recv(_, Type, SvcName, Ref, Wd, N, M) ->
+reopen(Type, _, SvcName, Ref, Wd, N, M) ->
+ ?LOG("peer ~p", [[Type, SvcName, Ref, Wd, N, M]]),
+
%% First transport process.
- ?EVENT({watchdog, Ref, _, {initial, okay}, _}, 1000),
- ?EVENT({up, Ref, _, _, #diameter_packet{}}, 0),
- reg(Type, Ref, SvcName, {SvcName, {Wd,N,M}}),
- ?EVENT({watchdog, Ref, _, {okay, down}, _}, infinity),
+ {initial, okay} = ?WD_EVENT(Ref),
+ ?EVENT({up, Ref, _, _, #diameter_packet{}}),
+
+ reg(Ref, SvcName, {SvcName, {Wd,N,M}}),
+
+ {okay, down} = ?WD_EVENT(Ref),
%% Second transport process.
- ?EVENT({watchdog, Ref, _, {_, reopen}, _}, infinity),
- reg(Type, Ref, SvcName, 3),
- ?EVENT({watchdog, Ref, _, {_, down}, _}, infinity),
+ ?EVENT({watchdog, Ref, _, {_, okay}, _}),
+ reg(Ref, SvcName, 3), %% answer 3 watchdogs then fall silent
+ ?EVENT({watchdog, Ref, _, {_, down}, _}),
%% Third transport process.
- ?EVENT({watchdog, Ref, _, {_, reopen}, _}, infinity),
- reg(Type, Ref, SvcName, 0),
- ?EVENT({watchdog, Ref, _, {_, down}, _}, infinity),
+ ?EVENT({watchdog, Ref, _, {_, okay}, _}),
+ reg(Ref, SvcName, 0), %% disable outgoing DWA
+ ?EVENT({watchdog, Ref, _, {_, down}, _}),
ok.
-%% recv_reopen/4
+log_wd({From, To} = T) ->
+ ?LOG("~p -> ~p", [From, To]),
+ T.
+
+log_event(E) ->
+ T = element(1,E),
+ T == watchdog orelse ?LOG("~p", [T]),
+ E.
-recv_reopen(connect, Ref, WdL, WdH) ->
- ?EVENT({watchdog, Ref, _, {_, reopen}, _}, 1, WdL, WdH),
- ?EVENT({reconnect, Ref, _}, 0);
+%% recv_reopen/2
-recv_reopen(listen, Ref, _, _) ->
- ?EVENT({watchdog, Ref, _, {_, reopen}, _}, 1, ?PEER_WD).
+recv_reopen(connect, Ref) ->
+ {down, reopen} = ?WD_EVENT(Ref),
+ ?EVENT({reconnect, Ref, _});
-%% reg/4
+recv_reopen(listen, Ref) ->
+ {_, reopen} = ?WD_EVENT(Ref).
+
+%% reg/3
%%
%% Lookup the pid of the transport process and publish a term for
%% send/2 to lookup.
-reg(Type, Ref, SvcName, T) ->
- TPid = tpid(Type, Ref, diameter:service_info(SvcName, transport)),
+reg(TRef, SvcName, T) ->
+ TPid = tpid(TRef, diameter:service_info(SvcName, transport)),
true = diameter_reg:add_new({?MODULE, TPid, T}).
-%% tpid/3
-
-tpid(connect, Ref, [[{ref, Ref},
- {type, connect},
- {options, _},
- {watchdog, _},
- {peer, _},
- {apps, _},
- {caps, _},
- {port, [{owner, TPid} | _]}
- | _]]) ->
+%% tpid/2
+
+tpid(Ref, [[{ref, Ref},
+ {type, connect},
+ {options, _},
+ {watchdog, _},
+ {peer, _},
+ {apps, _},
+ {caps, _},
+ {port, [{owner, TPid} | _]}
+ | _]]) ->
TPid;
-tpid(listen, Ref, [[{ref, Ref},
- {type, listen},
- {options, _},
- {accept, As}
- | _]]) ->
+tpid(Ref, [[{ref, Ref},
+ {type, listen},
+ {options, _},
+ {accept, As}
+ | _]]) ->
[[{watchdog, _},
{peer, _},
{apps, _},
@@ -375,6 +380,154 @@ tpid(listen, Ref, [[{ref, Ref},
TPid.
%% ===========================================================================
+%% # suspect/1
+%% ===========================================================================
+
+%% Configure transports to require a set number of watchdog timeouts
+%% before moving from OKAY to SUSPECT.
+
+suspect(_) ->
+ [] = run([[abuse, [suspect, N]] || N <- [0,1,3]]).
+
+suspect(Type, Fake, Ref, N)
+ when is_reference(Ref) ->
+ {SvcName, TRef}
+ = start(Type, Ref, {?WD(10000), [{suspect, N}], mod(Fake)}),
+ {initial, okay} = ?WD_EVENT(TRef),
+ suspect(TRef, Fake, SvcName, N);
+
+suspect(TRef, true, SvcName, _) ->
+ reg(TRef, SvcName, 0), %% disable outgoing DWA
+ {okay, _} = ?WD_EVENT(TRef);
+
+suspect(TRef, false, SvcName, 0) -> %% SUSPECT disabled
+ %% Wait 2+ watchdogs and see that only one watchdog has been sent.
+ false = ?WD_EVENT(TRef, 28000),
+ [1,0,0,0] = wd_counts(SvcName);
+
+suspect(TRef, false, SvcName, N) ->
+ %% Check that no watchdog transition takes place within N+
+ %% watchdogs ...
+ false = ?WD_EVENT(TRef, N*10000+8000),
+ [1,0,0,0] = wd_counts(SvcName),
+ %% ... but that the connection then becomes suspect ...
+ {okay, suspect} = ?WD_EVENT(TRef, 10000),
+ [1,0,0,0] = wd_counts(SvcName),
+ %% ... and goes down.
+ {suspect, down} = ?WD_EVENT(TRef, 18000),
+ [1,0,0,0] = wd_counts(SvcName).
+
+%% abuse/1
+
+abuse(F) ->
+ [] = run([[abuse, F, T] || T <- [listen, connect]]).
+
+abuse(F, [_,_,_|_] = Args) ->
+ ?LOG("~p", [Args]),
+ apply(?MODULE, F, Args);
+
+abuse([F|A], Test) ->
+ Ref = make_ref(),
+ [] = run([[abuse, F, [T, T == Test, Ref] ++ A]
+ || T <- [listen, connect]]);
+
+abuse(F, Test) ->
+ abuse([F], Test).
+
+mod(true) ->
+ [{module, ?MODULE}];
+mod(false) ->
+ [].
+
+%% ===========================================================================
+%% # okay/1
+%% ===========================================================================
+
+%% Configure the number of watchdog exchanges before moving from
+%% REOPEN to OKAY.
+
+okay(_) ->
+ [] = run([[abuse, [okay, N]] || N <- [0,2,3]]).
+
+okay(Type, Fake, Ref, N)
+ when is_reference(Ref) ->
+ {SvcName, TRef}
+ = start(Type, Ref, {?WD(10000),
+ [{okay, choose(Fake, 0, N)}],
+ mod(Fake)}),
+ {initial, okay} = ?WD_EVENT(TRef),
+ okay(TRef,
+ Fake,
+ SvcName,
+ choose(Type == listen, initial, down),
+ N).
+
+okay(TRef, true, SvcName, Down, _) ->
+ reg(TRef, SvcName, 0), %% disable outgoing DWA
+ {okay, down} = ?WD_EVENT(TRef),
+ {Down, okay} = ?WD_EVENT(TRef),
+ reg(TRef, SvcName, -1), %% enable outgoing DWA
+ {okay, down} = ?WD_EVENT(TRef);
+
+okay(TRef, false, SvcName, Down, N) ->
+ {okay, suspect} = ?WD_EVENT(TRef),
+ [1,0,0,0] = wd_counts(SvcName),
+ {suspect, down} = ?WD_EVENT(TRef),
+ ok(TRef, SvcName, Down, N).
+
+ok(TRef, SvcName, Down, 0) ->
+ %% Connection comes up without watchdog exchange.
+ {Down, okay} = ?WD_EVENT(TRef),
+ [1,0,0,0] = wd_counts(SvcName),
+ %% Wait 2+ watchdog timeouts to see that the connection stays up
+ %% and two watchdogs are exchanged.
+ false = ?WD_EVENT(TRef, 28000),
+ [3,0,0,2] = wd_counts(SvcName);
+
+ok(TRef, SvcName, Down, N) ->
+ %% Connection required watchdog exchange before reaching OKAY.
+ {Down, reopen} = ?WD_EVENT(TRef),
+ {reopen, okay} = ?WD_EVENT(TRef),
+ %% One DWR was sent in moving to expect, plus N more to reopen the
+ %% connection.
+ N1 = N+1,
+ [N1,0,0,N] = wd_counts(SvcName).
+
+%% ===========================================================================
+
+%% wd_counts/1
+
+wd_counts(SvcName) ->
+ [Info] = diameter:service_info(SvcName, transport),
+ {_, Counters} = lists:keyfind(statistics, 1, Info),
+ [proplists:get_value({{0,280,R}, D}, Counters, 0) || D <- [send,recv],
+ R <- [1,0]].
+
+%% start/3
+
+start(Type, Ref, T) ->
+ Name = hostname(),
+ true = diameter:subscribe(Name),
+ ok = diameter:start_service(Name, [{monitor, self()} | ?SERVICE(Name)]),
+ {ok, TRef} = diameter:add_transport(Name, {Type, opts(Type, Ref, T)}),
+ true = diameter_reg:add_new({Type, Ref, Name}),
+ {Name, TRef}.
+
+opts(Type, Ref, {Timer, Config, Mod}) ->
+ [{transport_module, diameter_tcp},
+ {transport_config, Mod ++ [{ip, ?ADDR}, {port, 0}] ++ cfg(Type, Ref)},
+ {watchdog_timer, Timer},
+ {watchdog_config, Config}].
+
+cfg(listen, _) ->
+ [];
+cfg(connect, Ref) ->
+ [{{_, _, SvcName}, _Pid}] = diameter_reg:wait({listen, Ref, '_'}),
+ [[{ref, LRef} | _]] = diameter:service_info(SvcName, transport),
+ [LP] = ?util:lport(tcp, LRef, 20),
+ [{raddr, ?ADDR}, {rport, LP}].
+
+%% ===========================================================================
listen(PortNr, Opts) ->
gen_tcp:listen(PortNr, Opts).
@@ -396,6 +549,7 @@ send(Sock, Bin) ->
%% First outgoing message from a new transport process is CER/CEA.
%% Remaining outgoing messages are either DWR or DWA.
send(undefined, Sock, Bin) ->
+ <<_:32, _:8, 257:24, _/binary>> = Bin,
putr(config, init),
gen_tcp:send(Sock, Bin);
@@ -505,15 +659,10 @@ run1([F|A]) ->
catch
E:R ->
S = erlang:get_stacktrace(),
- io:format("~p~n", [{A, E, R, S}]),
+ ?WARN("~p", [{A, E, R, S}]),
S
end.
-%% now_diff/2
-
-now_diff(T1, T2) ->
- timer:now_diff(T2, T1).
-
%% jitter/2
jitter(?WD(T), _) ->