diff options
Diffstat (limited to 'lib/diameter/src')
-rw-r--r-- | lib/diameter/src/base/diameter.appup.src | 10 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter.erl | 1 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_reg.erl | 2 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_service.erl | 12 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_stats.erl | 79 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_watchdog.erl | 111 |
6 files changed, 170 insertions, 45 deletions
diff --git a/lib/diameter/src/base/diameter.appup.src b/lib/diameter/src/base/diameter.appup.src index 2ce89579ff..c3c7fe9a38 100644 --- a/lib/diameter/src/base/diameter.appup.src +++ b/lib/diameter/src/base/diameter.appup.src @@ -28,7 +28,12 @@ {"1.2.1", [{restart_application, diameter}]}, {"1.3", [{restart_application, diameter}]}, %% R15B03 {"1.3.1", [{restart_application, diameter}]}, - {"1.4", [{restart_application, diameter}]} %% R16A + {"1.4", [{restart_application, diameter}]}, %% R16A + {"1.4.1", [{load_module, diameter_reg}, %% R16B + {load_module, diameter_stats}, + {load_module, diameter_service}, + {load_module, diameter_watchdog}, + {load_module, diameter}]} ], [ {"0.9", [{restart_application, diameter}]}, @@ -39,6 +44,7 @@ {"1.2.1", [{restart_application, diameter}]}, {"1.3", [{restart_application, diameter}]}, {"1.3.1", [{restart_application, diameter}]}, - {"1.4", [{restart_application, diameter}]} + {"1.4", [{restart_application, diameter}]}, + {"1.4.1", [{restart_application, diameter}]} ] }. diff --git a/lib/diameter/src/base/diameter.erl b/lib/diameter/src/base/diameter.erl index c67fba5f89..5f06cef020 100644 --- a/lib/diameter/src/base/diameter.erl +++ b/lib/diameter/src/base/diameter.erl @@ -336,6 +336,7 @@ call(SvcName, App, Message) -> | {length_errors, exit | handle | discard} | {reconnect_timer, 'Unsigned32'()} | {watchdog_timer, 'Unsigned32'() | {module(), atom(), list()}} + | {watchdog_config, [{okay|suspect, non_neg_integer()}]} | {private, any()}. %% Predicate passed to remove_transport/2 diff --git a/lib/diameter/src/base/diameter_reg.erl b/lib/diameter/src/base/diameter_reg.erl index ac58e4bf5b..3197c1aee1 100644 --- a/lib/diameter/src/base/diameter_reg.erl +++ b/lib/diameter/src/base/diameter_reg.erl @@ -138,7 +138,7 @@ del(T) -> %% associations removed.) %% =========================================================================== --spec match(tuple()) +-spec match(any()) -> [{term(), pid()}]. match(Pat) -> diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl index f1342df16c..c971646861 100644 --- a/lib/diameter/src/base/diameter_service.erl +++ b/lib/diameter/src/base/diameter_service.erl @@ -1626,16 +1626,10 @@ info_stats(#state{watchdogT = WatchdogT}) -> info_transport(S) -> PeerD = peer_dict(S, config_dict(S)), - RefsD = dict:map(fun(_, Ls) -> [P || L <- Ls, {peer, {P,_}} <- L] end, - PeerD), - Refs = lists:append(dict:fold(fun(R, Ps, A) -> [[R|Ps] | A] end, - [], - RefsD)), - Stats = diameter_stats:read(Refs), + Stats = diameter_stats:sum(dict:fetch_keys(PeerD)), dict:fold(fun(R, Ls, A) -> - Ps = dict:fetch(R, RefsD), - [[{ref, R} | transport(Ls)] ++ [stats([R|Ps], Stats)] - | A] + Cs = proplists:get_value(R, Stats, []), + [[{ref, R} | transport(Ls)] ++ [{statistics, Cs}] | A] end, [], PeerD). diff --git a/lib/diameter/src/base/diameter_stats.erl b/lib/diameter/src/base/diameter_stats.erl index 8fd5ded300..b68d4af11f 100644 --- a/lib/diameter/src/base/diameter_stats.erl +++ b/lib/diameter/src/base/diameter_stats.erl @@ -28,6 +28,7 @@ -export([reg/2, reg/1, incr/3, incr/1, read/1, + sum/1, flush/1]). %% supervisor callback @@ -77,10 +78,14 @@ reg(Pid, Ref) when is_pid(Pid) -> - call({reg, Pid, Ref}). + try + call({reg, Pid, Ref}) + catch + exit: _ -> false + end. -spec reg(ref()) - -> true. + -> boolean(). reg(Ref) -> reg(self(), Ref). @@ -111,11 +116,19 @@ incr(Ctr) -> %% Retrieve counters for the specified contributors. %% --------------------------------------------------------------------------- +%% Read in the server process to ensure that counters for a dying +%% contributor aren't folded concurrently with select. + -spec read([ref()]) -> [{ref(), [{counter(), integer()}]}]. -read(Refs) -> - read(Refs, false). +read(Refs) + when is_list(Refs) -> + try call({read, Refs, false}) of + L -> to_refdict(L) + catch + exit: _ -> [] + end. read(Refs, B) -> MatchSpec = [{{{'_', '$1'}, '_'}, @@ -124,11 +137,52 @@ read(Refs, B) -> ['$_']}], L = ets:select(?TABLE, MatchSpec), B andalso delete(L), + L. + +to_refdict(L) -> lists:foldl(fun({{C,R}, N}, D) -> orddict:append(R, {C,N}, D) end, orddict:new(), L). %% --------------------------------------------------------------------------- +%% # sum(Refs) +%% +%% Retrieve counters summed over all contributors for each term. +%% --------------------------------------------------------------------------- + +-spec sum([ref()]) + -> [{ref(), [{counter(), integer()}]}]. + +sum(Refs) + when is_list(Refs) -> + try call({read, Refs}) of + L -> [{R, to_ctrdict(Cs)} || {R, [_|_] = Cs} <- L] + catch + exit: _ -> [] + end. + +read_refs(Refs) -> + [{R, readr(R)} || R <- Refs]. + +readr(Ref) -> + MatchSpec = [{{{'_', '$1'}, '_'}, + [?ORCOND([{'=:=', '$1', {const, R}} + || R <- [Ref | pids(Ref)]])], + ['$_']}], + ets:select(?TABLE, MatchSpec). + +pids(Ref) -> + MatchSpec = [{{'$1', '$2'}, + [{'=:=', '$2', {const, Ref}}], + ['$1']}], + ets:select(?TABLE, MatchSpec). + +to_ctrdict(L) -> + lists:foldl(fun({{C,_}, N}, D) -> orddict:update_counter(C, N, D) end, + orddict:new(), + L). + +%% --------------------------------------------------------------------------- %% # flush(Refs) %% %% Retrieve and delete statistics for the specified contributors. @@ -138,11 +192,10 @@ read(Refs, B) -> -> [{ref(), {counter(), integer()}}]. flush(Refs) -> - try - call({flush, Refs}) + try call({read, Refs, true}) of + L -> to_refdict(L) catch - exit: _ -> - [] + exit: _ -> [] end. %% =========================================================================== @@ -186,8 +239,14 @@ handle_call({reg, Pid, Ref}, _From, State) -> B andalso erlang:monitor(process, Pid), {reply, B, State}; -handle_call({flush, Refs}, _From, State) -> - {reply, read(Refs, true), State}; +handle_call({read, Refs, Del}, _From, State) -> + {reply, read(Refs, Del), State}; + +handle_call({read, Refs}, _, State) -> + {reply, read_refs(Refs), State}; + +handle_call({flush, Refs}, _From, State) -> %% from old code + {reply, to_refdict(read(Refs, true)), State}; handle_call(Req, From, State) -> ?UNEXPECTED([Req, From]), diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl index 073a415d10..82ca603cf3 100644 --- a/lib/diameter/src/base/diameter_watchdog.erl +++ b/lib/diameter/src/base/diameter_watchdog.erl @@ -47,6 +47,14 @@ -define(BASE, ?DIAMETER_DICT_COMMON). +-define(IS_NATURAL(N), (is_integer(N) andalso 0 =< N)). + +-define(CHOOSE(B,T,F), if (B) -> T; true -> F end). + +-record(config, + {suspect = 1 :: non_neg_integer(), %% OKAY -> SUSPECT + okay = 3 :: non_neg_integer()}). %% REOPEN -> OKAY + -record(watchdog, {%% PCB - Peer Control Block; see RFC 3539, Appendix A status = initial :: initial | okay | suspect | down | reopen, @@ -54,7 +62,8 @@ tw :: 6000..16#FFFFFFFF | {module(), atom(), list()}, %% {M,F,A} -> integer() >= 0 num_dwa = 0 :: -1 | non_neg_integer(), - %% number of DWAs received during reopen + %% number of DWAs received in reopen, + %% or number of timeouts before okay -> suspect %% end PCB parent = self() :: pid(), %% service process transport :: pid() | undefined, %% peer_fsm process @@ -64,7 +73,8 @@ %% term passed into diameter_service with incoming message sequence :: diameter:sequence(), %% mask restrict :: {diameter:restriction(), boolean()}, - shutdown = false :: boolean()}). + shutdown = false :: boolean(), + config :: #config{}}). %% --------------------------------------------------------------------------- %% start/2 @@ -129,7 +139,8 @@ i({Ack, T, Pid, {RecvData, receive_data = RecvData, dictionary = Dict0, sequence = Mask, - restrict = {Restrict, lists:member(node(), Nodes)}}. + restrict = {Restrict, lists:member(node(), Nodes)}, + config = config(Opts)}. wait(Ref, Pid) -> receive @@ -139,6 +150,27 @@ wait(Ref, Pid) -> exit({shutdown, D}) end. +%% config/1 +%% +%% Could also configure counts for SUSPECT to DOWN and REOPEN to DOWN, +%% but don't. + +config(Opts) -> + Config = proplists:get_value(watchdog_config, Opts, []), + is_list(Config) orelse config_error({watchdog_config, Config}), + lists:foldl(fun config/2, #config{}, Config). + +config({suspect, N}, Rec) + when ?IS_NATURAL(N) -> + Rec#config{suspect = N}; + +config({okay, N}, Rec) + when ?IS_NATURAL(N) -> + Rec#config{okay = N}; + +config(T, _) -> + config_error(T). + %% start/5 start(T, Opts, Mask, Nodes, Dict0, Svc) -> @@ -219,6 +251,17 @@ handle_info(T, #watchdog{} = State) -> ?LOG(stop, T), event(T, State, State#watchdog{status = down}), {stop, {shutdown, T}, State} + end; + +handle_info(T, State) -> %% started in old code + handle_info(T, upgrade(State)). + +upgrade(State) -> + case erlang:append_element(State, #config{}) of + #watchdog{status = okay, config = #config{suspect = OS}} = S -> + S#watchdog{num_dwa = OS}; + #watchdog{} = S -> + S end. close({'DOWN', _, process, TPid, {shutdown, Reason}}, @@ -331,11 +374,13 @@ transition({accepted = T, TPid}, #watchdog{transport = TPid, transition({open, TPid, Hosts, _} = Open, #watchdog{transport = TPid, status = initial, - restrict = {_, R}} + restrict = {_,R}, + config = #config{suspect = OS}} = S) -> case okay(getr(restart), Hosts, R) of okay -> - set_watchdog(S#watchdog{status = okay}); + set_watchdog(S#watchdog{status = okay, + num_dwa = OS}); reopen -> transition(Open, S#watchdog{status = down}) end; @@ -347,15 +392,22 @@ transition({open, TPid, Hosts, _} = Open, transition({open = Key, TPid, _Hosts, T}, #watchdog{transport = TPid, - status = down} + status = down, + config = #config{suspect = OS, + okay = RO}} = S) -> - %% Store the info we need to notify the parent to reopen the - %% connection after the requisite DWA's are received, at which - %% time we eraser(open). The reopen message is a later addition, - %% to communicate the new capabilities as soon as they're known. - putr(Key, {TPid, T}), - set_watchdog(send_watchdog(S#watchdog{status = reopen, - num_dwa = 0})); + case RO of + 0 -> %% non-standard: skip REOPEN + set_watchdog(S#watchdog{status = okay, + num_dwa = OS}); + _ -> + %% Store the info we need to notify the parent to reopen + %% the connection after the requisite DWA's are received, + %% at which time we eraser(open). + putr(Key, {TPid, T}), + set_watchdog(send_watchdog(S#watchdog{status = reopen, + num_dwa = 0})) + end; %% OKAY Connection down CloseConnection() %% Failover() @@ -374,7 +426,7 @@ transition({'DOWN', _, process, TPid, _Reason}, #watchdog{transport = TPid, status = T} = S) -> - set_watchdog(S#watchdog{status = case T of initial -> T; _ -> down end, + set_watchdog(S#watchdog{status = ?CHOOSE(initial == T, T, down), pending = false, transport = undefined}); @@ -553,22 +605,27 @@ rcv(_, #watchdog{status = okay} = S) -> %% SUSPECT Receive non-DWA Failback() %% SetWatchdog() OKAY -rcv('DWA', #watchdog{status = suspect} = S) -> +rcv('DWA', #watchdog{status = suspect, config = #config{suspect = OS}} = S) -> set_watchdog(S#watchdog{status = okay, + num_dwa = OS, pending = false}); -rcv(_, #watchdog{status = suspect} = S) -> - set_watchdog(S#watchdog{status = okay}); +rcv(_, #watchdog{status = suspect, config = #config{suspect = OS}} = S) -> + set_watchdog(S#watchdog{status = okay, + num_dwa = OS}); %% REOPEN Receive DWA & Pending = FALSE %% NumDWA == 2 NumDWA++ %% Failback() OKAY rcv('DWA', #watchdog{status = reopen, - num_dwa = 2 = N} - = S) -> + num_dwa = N, + config = #config{suspect = OS, + okay = RO}} + = S) + when N+1 == RO -> S#watchdog{status = okay, - num_dwa = N+1, + num_dwa = OS, pending = false}; %% REOPEN Receive DWA & Pending = FALSE @@ -607,9 +664,17 @@ timeout(#watchdog{status = T, %% Pending SetWatchdog() SUSPECT timeout(#watchdog{status = okay, - pending = true} - = S) -> - S#watchdog{status = suspect}; + pending = true, + num_dwa = N} + = S) -> + case N of + 1 -> + S#watchdog{status = suspect}; + 0 -> %% non-standard: never move to suspect + S; + N -> %% non-standard: more timeouts before moving + S#watchdog{num_dwa = N-1} + end; %% SUSPECT Timer expires CloseConnection() %% SetWatchdog() DOWN |