diff options
Diffstat (limited to 'lib/diameter/src/base')
-rw-r--r-- | lib/diameter/src/base/diameter.erl | 1 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_codec.erl | 31 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_config.erl | 8 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_lib.erl | 62 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_peer.erl | 5 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_peer_fsm.erl | 7 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_reg.erl | 5 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_service.erl | 533 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_stats.erl | 18 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_sync.erl | 5 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_traffic.erl | 88 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_watchdog.erl | 55 |
12 files changed, 528 insertions, 290 deletions
diff --git a/lib/diameter/src/base/diameter.erl b/lib/diameter/src/base/diameter.erl index e82c2c168c..de88f6befd 100644 --- a/lib/diameter/src/base/diameter.erl +++ b/lib/diameter/src/base/diameter.erl @@ -312,6 +312,7 @@ call(SvcName, App, Message) -> | {sequence, sequence() | evaluable()} | {share_peers, remotes()} | {string_decode, boolean()} + | {strict_mbit, boolean()} | {incoming_maxlen, message_length()} | {use_shared_peers, remotes()} | {spawn_opt, list()}. diff --git a/lib/diameter/src/base/diameter_codec.erl b/lib/diameter/src/base/diameter_codec.erl index 34276a1674..1ea5357924 100644 --- a/lib/diameter/src/base/diameter_codec.erl +++ b/lib/diameter/src/base/diameter_codec.erl @@ -77,9 +77,10 @@ setopts(Opts) when is_list(Opts) -> lists:foreach(fun setopt/1, Opts). -%% Decode stringish types to string()? The default true is for -%% backwards compatibility. -setopt({string_decode = K, false = B}) -> +%% The default string_decode true is for backwards compatibility. +setopt({K, false = B}) + when K == string_decode; + K == strict_mbit -> setopt(K, B); %% Regard anything but the generated RFC 3588 dictionary as modern. @@ -97,7 +98,8 @@ setopt(Key, Value) -> getopt(Key) -> case get({diameter, Key}) of - undefined when Key == string_decode -> + undefined when Key == string_decode; + Key == strict_mbit -> true; undefined when Key == rfc -> 6733; @@ -657,16 +659,23 @@ split_data(Bin, Len) -> %% The normal case here is data as an #diameter_avp{} list or an %% iolist, which are the cases that generated codec modules use. The -%% other case is as a convenience in the relay case in which the +%% other cases are a convenience in the relay case in which the %% dictionary doesn't know about specific AVP's. -%% Grouped AVP whose components need packing ... -pack_avp([#diameter_avp{} = A | Avps]) -> - pack_avp(A#diameter_avp{data = Avps}); -pack_avp(#diameter_avp{data = [#diameter_avp{} | _] = Avps} = A) -> - pack_avp(A#diameter_avp{data = encode_avps(Avps)}); +%% Decoded Grouped AVP with decoded components: ignore components +%% since they're already encoded in the Grouped AVP. +pack_avp([#diameter_avp{} = Grouped | _Components]) -> + pack_avp(Grouped); -%% ... data as a type/value tuple ... +%% Grouped AVP whose components need packing. It's intentional that +%% this isn't equivalent to [Grouped | Components]: here the +%% components need to be encoded before wrapping with the Grouped AVP, +%% and the list is flat, nesting being accomplished in the data +%% fields. +pack_avp(#diameter_avp{data = [#diameter_avp{} | _] = Components} = Grouped) -> + pack_avp(Grouped#diameter_avp{data = encode_avps(Components)}); + +%% Data as a type/value tuple ... pack_avp(#diameter_avp{data = {Type, Value}} = A) when is_atom(Type) -> pack_avp(A#diameter_avp{data = diameter_types:Type(encode, Value)}); diff --git a/lib/diameter/src/base/diameter_config.erl b/lib/diameter/src/base/diameter_config.erl index 5ae4ff1f46..702f11593a 100644 --- a/lib/diameter/src/base/diameter_config.erl +++ b/lib/diameter/src/base/diameter_config.erl @@ -38,8 +38,7 @@ -module(diameter_config). -behaviour(gen_server). --compile({no_auto_import, [monitor/2, now/0]}). --import(diameter_lib, [now/0]). +-compile({no_auto_import, [monitor/2]}). -export([start_service/2, stop_service/1, @@ -70,7 +69,7 @@ -include("diameter_internal.hrl"). %% Server state. --record(state, {id = now()}). +-record(state, {id = diameter_lib:now()}). %% Registered name of the server. -define(SERVER, ?MODULE). @@ -648,6 +647,7 @@ make_config(SvcName, Opts) -> {?NOMASK, sequence}, {nodes, restrict_connections}, {16#FFFFFF, incoming_maxlen}, + {true, strict_mbit}, {true, string_decode}, {[], spawn_opt}]), @@ -686,12 +686,14 @@ opt(K, false = B) K == use_shared_peers; K == monitor; K == restrict_connections; + K == strict_mbit; K == string_decode -> B; opt(K, true = B) when K == share_peers; K == use_shared_peers; + K == strict_mbit; K == string_decode -> B; diff --git a/lib/diameter/src/base/diameter_lib.erl b/lib/diameter/src/base/diameter_lib.erl index b5b37dcf62..43b0ca24ab 100644 --- a/lib/diameter/src/base/diameter_lib.erl +++ b/lib/diameter/src/base/diameter_lib.erl @@ -103,32 +103,18 @@ fmt(T) -> %% # now/0 %% --------------------------------------------------------------------------- --type timestamp() :: {non_neg_integer(), 0..999999, 0..999999}. --type now() :: integer() %% monotonic time - | timestamp(). - -spec now() - -> now(). - -%% Use monotonic time if it exists, fall back to erlang:now() -%% otherwise. + -> integer(). now() -> - try - erlang:monotonic_time() - catch - error: undef -> erlang:now() - end. + erlang:monotonic_time(). %% --------------------------------------------------------------------------- %% # timestamp/1 %% --------------------------------------------------------------------------- --spec timestamp(NowT :: now()) - -> timestamp(). - -timestamp({_,_,_} = T) -> %% erlang:now() - T; +-spec timestamp(integer()) + -> erlang:timestamp(). timestamp(MonoT) -> %% monotonic time MicroSecs = monotonic_to_microseconds(MonoT + erlang:time_offset()), @@ -142,30 +128,27 @@ monotonic_to_microseconds(MonoT) -> %% # now_diff/1 %% --------------------------------------------------------------------------- --spec now_diff(NowT :: now()) +-spec now_diff(T0 :: integer()) -> {Hours, Mins, Secs, MicroSecs} when Hours :: non_neg_integer(), Mins :: 0..59, Secs :: 0..59, MicroSecs :: 0..999999. -%% Return timer:now_diff(now(), NowT) as an {H, M, S, MicroS} tuple -%% instead of as integer microseconds. +%% Return time difference as an {H, M, S, MicroS} tuple instead of as +%% integer microseconds. -now_diff(Time) -> - time(micro_diff(Time)). +now_diff(T0) -> + time(micro_diff(T0)). %% --------------------------------------------------------------------------- %% # micro_diff/1 %% --------------------------------------------------------------------------- --spec micro_diff(NowT :: now()) +-spec micro_diff(T0 :: integer()) -> MicroSecs when MicroSecs :: non_neg_integer(). -micro_diff({_,_,_} = T0) -> - timer:now_diff(erlang:now(), T0); - micro_diff(T0) -> %% monotonic time monotonic_to_microseconds(erlang:monotonic_time() - T0). @@ -173,16 +156,12 @@ micro_diff(T0) -> %% monotonic time %% # micro_diff/2 %% --------------------------------------------------------------------------- --spec micro_diff(T1 :: now(), T0 :: now()) +-spec micro_diff(T1 :: integer(), T0 :: integer()) -> MicroSecs when MicroSecs :: non_neg_integer(). -micro_diff(T1, T0) - when is_integer(T1), is_integer(T0) -> %% monotonic time - monotonic_to_microseconds(T1 - T0); - -micro_diff(T1, T0) -> %% at least one erlang:now() - timer:now_diff(timestamp(T1), timestamp(T0)). +micro_diff(T1, T0) -> %% monotonic time + monotonic_to_microseconds(T1 - T0). %% --------------------------------------------------------------------------- %% # time/1 @@ -190,19 +169,13 @@ micro_diff(T1, T0) -> %% at least one erlang:now() %% Return an elapsed time as an {H, M, S, MicroS} tuple. %% --------------------------------------------------------------------------- --spec time(NowT | Diff) +-spec time(Diff :: non_neg_integer()) -> {Hours, Mins, Secs, MicroSecs} - when NowT :: timestamp(), - Diff :: non_neg_integer(), - Hours :: non_neg_integer(), + when Hours :: non_neg_integer(), Mins :: 0..59, Secs :: 0..59, MicroSecs :: 0..999999. -time({_,_,_} = NowT) -> %% time of day - %% 24 hours = 24*60*60*1000000 = 86400000000 microsec - time(timer:now_diff(NowT, {0,0,0}) rem 86400000000); - time(Micro) -> %% elapsed time Seconds = Micro div 1000000, H = Seconds div 3600, @@ -215,7 +188,7 @@ time(Micro) -> %% elapsed time %% --------------------------------------------------------------------------- -spec seed() - -> {timestamp(), {integer(), integer(), integer()}}. + -> {erlang:timestamp(), {integer(), integer(), integer()}}. %% Return an argument for random:seed/1. @@ -225,9 +198,6 @@ seed() -> %% seed/1 -seed({_,_,_} = T) -> - T; - seed(T) -> %% monotonic time {erlang:phash2(node()), T, erlang:unique_integer()}. diff --git a/lib/diameter/src/base/diameter_peer.erl b/lib/diameter/src/base/diameter_peer.erl index acec91c43f..2759f17e64 100644 --- a/lib/diameter/src/base/diameter_peer.erl +++ b/lib/diameter/src/base/diameter_peer.erl @@ -21,9 +21,6 @@ -module(diameter_peer). -behaviour(gen_server). --compile({no_auto_import, [now/0]}). --import(diameter_lib, [now/0]). - %% Interface towards transport modules ... -export([recv/2, up/1, @@ -60,7 +57,7 @@ -define(SERVER, ?MODULE). %% Server state. --record(state, {id = now()}). +-record(state, {id = diameter_lib:now()}). %% Default transport_module/config. -define(DEFAULT_TMOD, diameter_tcp). diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl index f5e04d3eae..2b23183d18 100644 --- a/lib/diameter/src/base/diameter_peer_fsm.erl +++ b/lib/diameter/src/base/diameter_peer_fsm.erl @@ -315,7 +315,7 @@ handle_info(T, #state{} = State) -> ?LOG(stop, Reason), {stop, {shutdown, Reason}, State}; stop -> - ?LOG(stop, T), + ?LOG(stop, truncate(T)), {stop, {shutdown, T}, State} catch exit: {diameter_codec, encode, T} = Reason -> @@ -348,6 +348,11 @@ code_change(_, State, _) -> %% --------------------------------------------------------------------------- %% --------------------------------------------------------------------------- +truncate({'DOWN' = T, _, process, Pid, _}) -> + {T, Pid}; +truncate(T) -> + T. + putr(Key, Val) -> put({?MODULE, Key}, Val). diff --git a/lib/diameter/src/base/diameter_reg.erl b/lib/diameter/src/base/diameter_reg.erl index ce29baae45..7f198080ba 100644 --- a/lib/diameter/src/base/diameter_reg.erl +++ b/lib/diameter/src/base/diameter_reg.erl @@ -25,8 +25,7 @@ -module(diameter_reg). -behaviour(gen_server). --compile({no_auto_import, [monitor/2, now/0]}). --import(diameter_lib, [now/0]). +-compile({no_auto_import, [monitor/2]}). -export([add/1, add_new/1, @@ -68,7 +67,7 @@ %% Table entry containing the Term -> Pid mapping. -define(MAPPING(Term, Pid), {Term, Pid}). --record(state, {id = now(), +-record(state, {id = diameter_lib:now(), q = []}). %% [{From, Pat}] %% =========================================================================== diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl index 5cb7fa5abe..87ef2e522d 100644 --- a/lib/diameter/src/base/diameter_service.erl +++ b/lib/diameter/src/base/diameter_service.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2015. All Rights Reserved. +%% Copyright Ericsson AB 2010-2016. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -25,9 +25,6 @@ -module(diameter_service). -behaviour(gen_server). --compile({no_auto_import, [now/0]}). --import(diameter_lib, [now/0]). - %% towards diameter_service_sup -export([start_link/1]). @@ -86,16 +83,8 @@ -define(DEFAULT_TC, 30000). %% RFC 3588 ch 2.1 -define(RESTART_TC, 1000). %% if restart was this recent -%% Used to be able to swap this with anything else dict-like but now -%% rely on the fact that a service's #state{} record does not change -%% in storing in it ?STATE table and not always going through the -%% service process. In particular, rely on the fact that operations on -%% a ?Dict don't change the handle to it. --define(Dict, diameter_dict). - -%% Maintains state in a table. In contrast to previously, a service's -%% stat is not constant and is accessed outside of the service -%% process. +%% Maintain state in a table since a service's state is accessed +%% outside of the service process. -define(STATE_TABLE, ?MODULE). %% The default sequence mask. @@ -115,23 +104,23 @@ %% to determine whether or not we need to call the process for a %% pick_peer callback in the statefull case. -record(state, - {id = now(), + {id = diameter_lib:now(), service_name :: diameter:service_name(), %% key in ?STATE_TABLE service :: #diameter_service{}, watchdogT = ets_new(watchdogs) %% #watchdog{} at start :: ets:tid(), - peerT = ets_new(peers) %% #peer{pid = TPid} at okay/reopen - :: ets:tid(), - shared_peers = ?Dict:new() %% Alias -> [{TPid, Caps}, ...] - :: ets:tid(), - local_peers = ?Dict:new() %% Alias -> [{TPid, Caps}, ...] - :: ets:tid(), + peerT, %% undefined in new code, but remain for upgrade + shared_peers, %% reasons. Replaced by local/remote. + local_peers, %% + local :: {ets:tid(), ets:tid(), ets:tid()}, + remote :: {ets:tid(), ets:tid(), ets:tid()}, monitor = false :: false | pid(), %% process to die with options :: [{sequence, diameter:sequence()} %% sequence mask | {share_peers, diameter:remotes()} %% broadcast to | {use_shared_peers, diameter:remotes()} %% use from | {restrict_connections, diameter:restriction()} + | {strict_mbit, boolean()} | {string_decode, boolean()} | {incoming_maxlen, diameter:message_length()}]}). %% shared_peers reflects the peers broadcast from remote nodes. @@ -144,18 +133,20 @@ ref :: match(reference()), %% key into diameter_config options :: match([diameter:transport_opt()]),%% from start_transport state = ?WD_INITIAL :: match(wd_state()), - started = now(), %% at process start + started = diameter_lib:now(),%% at process start peer = false :: match(boolean() | pid())}). %% true at accepted, pid() at okay/reopen -%% Record representing an Peer State Machine processes implemented by +%% Record representing a Peer State Machine processes implemented by %% diameter_peer_fsm. -record(peer, - {pid :: pid(), - apps :: [{0..16#FFFFFFFF, diameter:app_alias()}], %% {Id, Alias} - caps :: #diameter_caps{}, - started = now(), %% at process start - watchdog :: pid()}). %% key into watchdogT + {pid :: pid(), + apps :: match([{0..16#FFFFFFFF, diameter:app_alias()}] %% {Id, Alias} + | [diameter:app_alias()]), %% remote + caps :: match(#diameter_caps{}), + started = diameter_lib:now(), %% at process start or sharing + watchdog :: match(pid() %% key into watchdogT + | undefined)}). %% undefined if remote %% --------------------------------------------------------------------------- %% # start/1 @@ -183,7 +174,7 @@ stop(SvcName) -> end. stop(ok, Pid) -> - MRef = erlang:monitor(process, Pid), + MRef = monitor(process, Pid), receive {'DOWN', MRef, process, _, _} -> ok end; stop(No, _) -> No. @@ -210,7 +201,7 @@ stop_transport(SvcName, [_|_] = Refs) -> info(SvcName, Item) -> case lookup_state(SvcName) of - [#state{} = S] -> + [S] -> service_info(Item, S); [] -> undefined @@ -219,7 +210,12 @@ info(SvcName, Item) -> %% lookup_state/1 lookup_state(SvcName) -> - ets:lookup(?STATE_TABLE, SvcName). + case ets:lookup(?STATE_TABLE, SvcName) of + [#state{}] = L -> + L; + _ -> + [] + end. %% --------------------------------------------------------------------------- %% # subscribe/1 @@ -492,6 +488,9 @@ transition({service, Pid}, S) -> transition({peer, TPid, Aliases, Caps}, S) -> remote_peer_up(TPid, Aliases, Caps, S), ok; +transition({peer, TPid}, S) -> + remote_peer_down(TPid, S), + ok; %% Remote peer process has died. transition({'DOWN', _, process, TPid, _}, S) -> @@ -511,7 +510,7 @@ transition(Req, S) -> %% # terminate/2 %% --------------------------------------------------------------------------- -terminate(Reason, #state{service_name = Name, peerT = PeerT} = S) -> +terminate(Reason, #state{service_name = Name, local = {PeerT, _, _}} = S) -> send_event(Name, stop), ets:delete(?STATE_TABLE, Name), @@ -533,23 +532,80 @@ terminate(Reason, #state{service_name = Name, peerT = PeerT} = S) -> %% # code_change/3 %% --------------------------------------------------------------------------- -code_change(FromVsn, - #state{service_name = SvcName, - service = #diameter_service{applications = Apps}} - = S, - Extra) -> - lists:foreach(fun(A) -> - code_change(FromVsn, SvcName, Extra, A) - end, - Apps), +code_change(_FromVsn, #state{} = S, _Extra) -> + {ok, S}; + +%% Don't support downgrade since we won't in appup. +code_change({down = T, _}, _, _Extra) -> + {error, T}; + +%% Upgrade local/shared peers dicts populated in old code. Don't +code_change(_FromVsn, S0, _Extra) -> + {state, Id, SvcName, Svc, WT, PeerT, SDict, LDict, Monitor, Opts} + = S0, + + init_peers(LT = setelement(1, {PT, _, _} = init_peers(), PeerT), + fun({_,A}) -> A end), + init_peers(init_peers(RT = init_peers(), SDict), + fun(A) -> A end), + + S = #state{id = Id, + service_name = SvcName, + service = Svc, + watchdogT = WT, + peerT = PT, %% empty + shared_peers = SDict, + local_peers = LDict, + local = LT, + remote = RT, + monitor = Monitor, + options = Opts}, + + %% Replacing the table entry and deleting the old shared tables + %% can make outgoing requests return {error, no_connection} until + %% everyone is running new code. Don't delete the tables to avoid + %% crashing request processes. + ets:delete_all_objects(SDict), + ets:delete_all_objects(LDict), + ets:insert(?STATE_TABLE, S), {ok, S}. -code_change(FromVsn, SvcName, Extra, #diameter_app{alias = Alias} = A) -> - {ok, S} = cb(A, code_change, [FromVsn, - mod_state(Alias), - Extra, - SvcName]), - mod_state(Alias, S). +%% init_peers/2 + +%% Populate app and identity bags from a new-style #peer{} sets. +init_peers({PeerT, _, _} = T, F) + when is_function(F) -> + ets:foldl(fun(#peer{pid = P, apps = As, caps = C}, N) -> + insert_peer(P, lists:map(F, As), C, T), + N+1 + end, + 0, + PeerT); + +%% Populate #peer{} table given a shared peers dict. +init_peers({PeerT, _, _}, SDict) -> + dict:fold(fun(P, As, N) -> + ets:update_element(PeerT, P, {#peer.apps, As}), + N+1 + end, + 0, + diameter_dict:fold(fun(A, Ps, D) -> + init_peers(A, Ps, PeerT, D) + end, + dict:new(), + SDict)). + +%% init_peers/4 + +init_peers(App, TCs, PeerT, Dict) -> + lists:foldl(fun({P,C}, D) -> + ets:insert(PeerT, #peer{pid = P, + apps = [], + caps = C}), + dict:append(P, App, D) + end, + Dict, + TCs). %% =========================================================================== %% =========================================================================== @@ -557,9 +613,6 @@ code_change(FromVsn, SvcName, Extra, #diameter_app{alias = Alias} = A) -> unexpected(F, A, #state{service_name = Name}) -> ?UNEXPECTED(F, A ++ [Name]). -cb(#diameter_app{module = [_|_] = M}, F, A) -> - eval(M, F, A). - eval([M|X], F, A) -> apply(M, F, A ++ X). @@ -579,10 +632,6 @@ choose(false, _, X) -> X. ets_new(Tbl) -> ets:new(Tbl, [{keypos, 2}]). -insert(Tbl, Rec) -> - ets:insert(Tbl, Rec), - Rec. - %% Using the process dictionary for the callback state was initially %% just a way to make what was horrendous trace (big state record and %% much else everywhere) somewhat more readable. There's not as much @@ -683,6 +732,8 @@ cfg_acc({SvcName, #diameter_service{applications = Apps} = Rec, Opts}, lists:foreach(fun init_mod/1, Apps), S = #state{service_name = SvcName, service = Rec#diameter_service{pid = self()}, + local = init_peers(), + remote = init_peers(), monitor = mref(get_value(monitor, Opts)), options = service_options(Opts)}, {S, Acc}; @@ -692,6 +743,13 @@ cfg_acc({_Ref, Type, _Opts} = T, {S, Acc}) Type == listen -> {S, [T | Acc]}. +init_peers() -> + {ets_new(caps), %% #peer{} + ets:new(apps, [bag]), %% {Alias, TPid} + ets:new(idents, [bag])}. %% {{host, OH} | {realm, OR} | {OR, OH}, + %% Alias, + %% TPid} + service_options(Opts) -> [{sequence, proplists:get_value(sequence, Opts, ?NOMASK)}, {share_peers, get_value(share_peers, Opts)}, @@ -701,13 +759,14 @@ service_options(Opts) -> ?RESTRICT)}, {spawn_opt, proplists:get_value(spawn_opt, Opts, [])}, {string_decode, proplists:get_value(string_decode, Opts, true)}, - {incoming_maxlen, proplists:get_value(incoming_maxlen, Opts, 16#FFFFFF)}]. + {incoming_maxlen, proplists:get_value(incoming_maxlen, Opts, 16#FFFFFF)}, + {strict_mbit, proplists:get_value(strict_mbit, Opts, true)}]. %% The order of options is significant since we match against the list. mref(false = No) -> No; mref(P) -> - erlang:monitor(process, P). + monitor(process, P). init_shared(#state{options = [_, _, {_,T} | _], service_name = Svc}) -> @@ -806,7 +865,7 @@ start(Ref, Type, Opts, State) -> %% start/5 start(Ref, Type, Opts, N, #state{watchdogT = WatchdogT, - peerT = PeerT, + local = {PeerT, _, _}, options = SvcOpts, service_name = SvcName, service = Svc0}) @@ -837,7 +896,7 @@ binary_caps(#diameter_service{capabilities = Caps} = Svc, false) -> wd(Type, Ref, T, WatchdogT, Rec) -> Pid = start_watchdog(Type, Ref, T), - insert(WatchdogT, Rec#watchdog{pid = Pid}), + ets:insert(WatchdogT, Rec#watchdog{pid = Pid}), Pid. %% Note that the service record passed into the watchdog is the merged @@ -894,8 +953,8 @@ accepted(Pid, _TPid, #state{watchdogT = WatchdogT} = S) -> #watchdog{ref = Ref, type = accept = T, peer = false, options = Opts} = Wd = fetch(WatchdogT, Pid), - insert(WatchdogT, Wd#watchdog{peer = true}),%% mark replacement as started - start(Ref, T, Opts, S). %% start new watchdog + ets:insert(WatchdogT, Wd#watchdog{peer = true}),%% mark replacement started + start(Ref, T, Opts, S). %% start new watchdog fetch(Tid, Key) -> [T] = ets:lookup(Tid, Key), @@ -921,13 +980,14 @@ watchdog(TPid, [], ?WD_SUSPECT, ?WD_OKAY, Wd, State) -> #watchdog{peer = TPid} = Wd, %% assert connection_up(Wd, State); -%% Watchdog has an unresponsive connection. +%% Watchdog has an unresponsive connection. Note that the peer table +%% entry isn't removed until DOWN. watchdog(TPid, [], ?WD_OKAY, ?WD_SUSPECT = To, Wd, State) -> #watchdog{peer = TPid} = Wd, %% assert watchdog_down(Wd, To, State); %% Watchdog has lost its connection. -watchdog(TPid, [], _, ?WD_DOWN = To, Wd, #state{peerT = PeerT} = S) -> +watchdog(TPid, [], _, ?WD_DOWN = To, Wd, #state{local = {PeerT, _, _}} = S) -> close(Wd), watchdog_down(Wd, To, S), ets:delete(PeerT, TPid); @@ -936,7 +996,7 @@ watchdog(_, [], _, _, _, _) -> ok. watchdog_down(Wd, To, #state{watchdogT = WatchdogT} = S) -> - insert(WatchdogT, Wd#watchdog{state = To}), + ets:insert(WatchdogT, Wd#watchdog{state = To}), connection_down(Wd, To, S). %% --------------------------------------------------------------------------- @@ -948,14 +1008,14 @@ watchdog_down(Wd, To, #state{watchdogT = WatchdogT} = S) -> connection_up({TPid, {Caps, SupportedApps, Pkt}}, #watchdog{pid = Pid} = Wd, - #state{peerT = PeerT} + #state{local = {PeerT, _, _}} = S) -> - Pr = #peer{pid = TPid, - apps = SupportedApps, - caps = Caps, - watchdog = Pid}, - insert(PeerT, Pr), - connection_up([Pkt], Wd#watchdog{peer = TPid}, Pr, S). + Rec = #peer{pid = TPid, + apps = SupportedApps, + caps = Caps, + watchdog = Pid}, + ets:insert(PeerT, Rec), + connection_up([Pkt], Wd#watchdog{peer = TPid}, Rec, S). %% --------------------------------------------------------------------------- %% # reopen/3 @@ -965,22 +1025,23 @@ reopen({TPid, {Caps, SupportedApps, _Pkt}}, #watchdog{pid = Pid} = Wd, #state{watchdogT = WatchdogT, - peerT = PeerT}) -> - insert(PeerT, #peer{pid = TPid, - apps = SupportedApps, - caps = Caps, - watchdog = Pid}), - insert(WatchdogT, Wd#watchdog{state = ?WD_REOPEN, - peer = TPid}). + local = {PeerT, _, _}}) -> + ets:insert(PeerT, #peer{pid = TPid, + apps = SupportedApps, + caps = Caps, + watchdog = Pid}), + ets:insert(WatchdogT, Wd#watchdog{state = ?WD_REOPEN, + peer = TPid}). %% --------------------------------------------------------------------------- %% # connection_up/2 %% --------------------------------------------------------------------------- -%% Watchdog has recovered as suspect connection. Note that there has +%% Watchdog has recovered a suspect connection. Note that there has %% been no new capabilties exchange in this case. -connection_up(#watchdog{peer = TPid} = Wd, #state{peerT = PeerT} = S) -> +connection_up(#watchdog{peer = TPid} = Wd, #state{local = {PeerT, _, _}} + = S) -> connection_up([], Wd, fetch(PeerT, TPid), S). %% connection_up/4 @@ -991,23 +1052,23 @@ connection_up(Extra, #peer{apps = SApps, caps = Caps} = Pr, #state{watchdogT = WatchdogT, - local_peers = LDict, + local = LT, service_name = SvcName, service = #diameter_service{applications = Apps}} = S) -> - insert(WatchdogT, Wd#watchdog{state = ?WD_OKAY}), + ets:insert(WatchdogT, Wd#watchdog{state = ?WD_OKAY}), diameter_traffic:peer_up(TPid), - insert_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict), + local_peer_up(SApps, {TPid, Caps}, {SvcName, Apps}, LT), report_status(up, Wd, Pr, S, Extra). -insert_local_peer(SApps, T, LDict) -> - lists:foldl(fun(A,D) -> ilp(A, T, D) end, LDict, SApps). +local_peer_up(SApps, {TPid, Caps} = TC, SA, LT) -> + insert_peer(TPid, [A || {_,A} <- SApps], Caps, LT), + lists:foreach(fun(A) -> peer_up(A, TC, SA) end, SApps). -ilp({Id, Alias}, {TC, SA}, LDict) -> - init_conn(Id, Alias, TC, SA), - ?Dict:append(Alias, TC, LDict). +peer_up({Id, Alias}, TC, SA) -> + peer_up(Id, Alias, TC, SA). -init_conn(Id, Alias, {TPid, _} = TC, {SvcName, Apps}) -> +peer_up(Id, Alias, {TPid, _} = TC, {SvcName, Apps}) -> #diameter_app{id = Id} %% assert = App = find_app(Alias, Apps), @@ -1105,17 +1166,17 @@ connection_down(#watchdog{state = ?WD_OKAY, = Pr, #state{service_name = SvcName, service = #diameter_service{applications = Apps}, - local_peers = LDict} + local = LT} = S) -> report_status(down, Wd, Pr, S, []), - remove_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict), + local_peer_down(SApps, {TPid, Caps}, {SvcName, Apps}, LT), diameter_traffic:peer_down(TPid); connection_down(#watchdog{state = ?WD_OKAY, peer = TPid} = Wd, To, - #state{peerT = PeerT} + #state{local = {PeerT, _, _}} = S) when is_atom(To) -> connection_down(Wd, #peer{} = fetch(PeerT, TPid), S); @@ -1123,15 +1184,14 @@ connection_down(#watchdog{state = ?WD_OKAY, connection_down(#watchdog{}, _, _) -> ok. -remove_local_peer(SApps, T, LDict) -> - lists:foldl(fun(A,D) -> rlp(A, T, D) end, LDict, SApps). +local_peer_down(SApps, {TPid, _Caps} = TC, SA, LT) -> + delete_peer(TPid, LT), + lists:foreach(fun(A) -> peer_down(A, TC, SA) end, SApps). -rlp({Id, Alias}, {TC, SA}, LDict) -> - L = ?Dict:fetch(Alias, LDict), - down_conn(Id, Alias, TC, SA), - ?Dict:store(Alias, lists:delete(TC, L), LDict). +peer_down({Id, Alias}, TC, SA) -> + peer_down(Id, Alias, TC, SA). -down_conn(Id, Alias, TC, {SvcName, Apps}) -> +peer_down(Id, Alias, TC, {SvcName, Apps}) -> #diameter_app{id = Id} %% assert = App = find_app(Alias, Apps), @@ -1156,7 +1216,7 @@ wd_down(#watchdog{peer = B}, _) ok; %% ... or maybe it has. -wd_down(#watchdog{peer = TPid} = Wd, #state{peerT = PeerT} = S) -> +wd_down(#watchdog{peer = TPid} = Wd, #state{local = {PeerT, _, _}} = S) -> connection_down(Wd, ?WD_DOWN, S), ets:delete(PeerT, TPid). @@ -1364,19 +1424,37 @@ share_peer(up, Caps, Apps, TPid, #state{options = [_, {_,T} | _], service_name = Svc}) -> notify(T, Svc, {peer, TPid, [A || {_,A} <- Apps], Caps}); -share_peer(_, _, _, _, _) -> - ok. +share_peer(down, _Caps, _Apps, TPid, #state{options = [_, {_,T} | _], + service_name = Svc}) -> + notify(T, Svc, {peer, TPid}). %% --------------------------------------------------------------------------- %% # share_peers/2 %% --------------------------------------------------------------------------- -share_peers(Pid, #state{options = [_, {_,T} | _], local_peers = PDict}) -> - is_remote(Pid, T) - andalso ?Dict:fold(fun(A,Ps,ok) -> sp(Pid, A, Ps), ok end, ok, PDict). - -sp(Pid, Alias, Peers) -> - lists:foreach(fun({P,C}) -> Pid ! {peer, P, [Alias], C} end, Peers). +share_peers(Pid, #state{options = [_, {_,SP} | _], + local = {PeerT, AppT, _}}) -> + is_remote(Pid, SP) + andalso ets:foldl(fun(T, N) -> N + sp(Pid, AppT, T) end, + 0, + PeerT). + +%% An entry in the peer table doesn't mean the watchdog state is OKAY, +%% an entry in the app table does. + +sp(Pid, AppT, #peer{pid = TPid, + apps = [{_, Alias} | _] = Apps, + caps = Caps}) -> + Spec = [{{'$1', TPid}, + [{'==', '$1', {const, Alias}}], + ['$_']}], + case ets:select(AppT, Spec, 1) of + '$end_of_table' -> + 0; + _ -> + Pid ! {peer, TPid, [A || {_,A} <- Apps], Caps}, + 1 + end. is_remote(Pid, T) -> Node = node(Pid), @@ -1386,32 +1464,49 @@ is_remote(Pid, T) -> %% # remote_peer_up/4 %% --------------------------------------------------------------------------- -remote_peer_up(Pid, Aliases, Caps, #state{options = [_, _, {_,T} | _]} = S) -> - is_remote(Pid, T) - andalso rpu(Pid, Aliases, Caps, S). +remote_peer_up(TPid, Aliases, Caps, #state{options = [_, _, {_,T} | _]} = S) -> + is_remote(TPid, T) andalso rpu(TPid, Aliases, Caps, S). -rpu(Pid, Aliases, Caps, #state{service = Svc, shared_peers = PDict}) -> +rpu(TPid, Aliases, Caps, #state{service = Svc, remote = RT}) -> #diameter_service{applications = Apps} = Svc, Key = #diameter_app.alias, F = fun(A) -> lists:keymember(A, Key, Apps) end, - rpu(Pid, lists:filter(F, Aliases), Caps, PDict); + rpu(TPid, lists:filter(F, Aliases), Caps, RT); rpu(_, [] = No, _, _) -> No; -rpu(Pid, Aliases, Caps, PDict) -> - erlang:monitor(process, Pid), - T = {Pid, Caps}, - lists:foreach(fun(A) -> ?Dict:append(A, T, PDict) end, Aliases). +rpu(TPid, Aliases, Caps, {PeerT, _, _} = RT) -> + monitor(process, TPid), + ets:insert(PeerT, #peer{pid = TPid, + apps = Aliases, + caps = Caps}), + insert_peer(TPid, Aliases, Caps, RT). + +%% insert_peer/4 + +insert_peer(TPid, Aliases, Caps, {_PeerT, AppT, IdentT}) -> + #diameter_caps{origin_host = {_, OH}, + origin_realm = {_, OR}} + = Caps, + ets:insert(AppT, [{A, TPid} || A <- Aliases]), + H = iolist_to_binary(OH), + R = iolist_to_binary(OR), + ets:insert(IdentT, [{T, A, TPid} || T <- [{host, H}, {realm, R}, {R, H}], + A <- Aliases]). %% --------------------------------------------------------------------------- %% # remote_peer_down/2 %% --------------------------------------------------------------------------- -remote_peer_down(Pid, #state{shared_peers = PDict}) -> - lists:foreach(fun(A) -> rpd(Pid, A, PDict) end, ?Dict:fetch_keys(PDict)). +remote_peer_down(TPid, #state{remote = {PeerT, _, _} = RT}) -> + ets:delete(PeerT, TPid), + delete_peer(TPid, RT). -rpd(Pid, Alias, PDict) -> - ?Dict:update(Alias, fun(Ps) -> lists:keydelete(Pid, 1, Ps) end, PDict). +%% delete_peer/2 + +delete_peer(TPid, {_PeerT, AppT, IdentT}) -> + ets:select_delete(AppT, [{{'_', TPid}, [], [true]}]), + ets:select_delete(IdentT, [{{'_', '_', TPid}, [], [true]}]). %% --------------------------------------------------------------------------- %% pick_peer/4 @@ -1421,12 +1516,12 @@ pick_peer(#diameter_app{alias = Alias} = App, RealmAndHost, Filter, - #state{local_peers = L, - shared_peers = S, + #state{local = LT, + remote = RT, service_name = SvcName, service = #diameter_service{pid = Pid}}) -> - pick_peer(peers(Alias, RealmAndHost, Filter, L), - peers(Alias, RealmAndHost, Filter, S), + pick_peer(peers(Alias, RealmAndHost, Filter, LT), + peers(Alias, RealmAndHost, Filter, RT), Pid, SvcName, App). @@ -1491,54 +1586,196 @@ pick_peer(Local, %% peers/4 -peers(Alias, RH, Filter, Peers) -> - case ?Dict:find(Alias, Peers) of - {ok, L} -> - filter(L, RH, Filter); - error -> +peers(Alias, RH, Filter, T) -> + filter(Alias, RH, Filter, T, true). + +%% filter/5 +%% +%% Try to limit the peers list by starting with a host/realm lookup. + +filter(Alias, RH, {neg, F}, T, B) -> + filter(Alias, RH, F, T, not B); + +filter(_, _, none, _, false) -> + []; + +filter(Alias, _, none, T, true) -> + all_peers(Alias, T); + +filter(Alias, [DR,DH] = RH, K, T, B) + when K == realm, DR == undefined; + K == host, DH == undefined -> + filter(Alias, RH, none, T, B); + +filter(Alias, [DR,_] = RH, realm = K, T, B) -> + filter(Alias, RH, {K, DR}, T, B); + +filter(Alias, [_,DH] = RH, host = K, T, B) -> + filter(Alias, RH, {K, DH}, T, B); + +filter(Alias, _, {K, D}, {PeerT, _AppT, IdentT}, true) + when K == host; + K == realm -> + try iolist_to_binary(D) of + B -> + caps(PeerT, ets:select(IdentT, [{{{K, B}, '$1', '$2'}, + [{'==', '$1', {const, Alias}}], + ['$2']}])) + catch + error:_ -> [] - end. + end; -%% filter/3 +filter(Alias, RH, {all, Filters}, T, B) + when is_list(Filters) -> + fltr_all(Alias, RH, Filters, T, B); + +filter(Alias, RH, {first, Filters}, T, B) + when is_list(Filters) -> + fltr_first(Alias, RH, Filters, T, B); + +filter(Alias, RH, Filter, T, B) -> + {Ts, Fs} = filter(all_peers(Alias, T), RH, Filter), + choose(B, Ts, Fs). + +%% fltr_all/5 + +fltr_all(Alias, RH, [{K, any} | Filters], T, B) + when K == host; + K == realm -> + fltr_all(Alias, RH, Filters, T, B); + +fltr_all(Alias, RH, [{host, _} = H, {realm, _} = R | Filters], T, B) -> + fltr_all(Alias, RH, [R, H | Filters], T, B); + +fltr_all(Alias, RH, [{realm, _} = R, {host, any} | Filters], T, B) -> + fltr_all(Alias, RH, [R | Filters], T, B); + +fltr_all(Alias, RH, [{realm, OR}, {host, OH} | Filters], T, true) -> + {PeerT, _AppT, IdentT} = T, + try {iolist_to_binary(OR), iolist_to_binary(OH)} of + BT -> + Peers = caps(PeerT, + ets:select(IdentT, [{{BT, '$1', '$2'}, + [{'==', '$1', {const, Alias}}], + ['$2']}])), + {Ts, _} = filter(Peers, RH, {all, Filters}), + Ts + catch + error:_ -> + [] + end; + +fltr_all(Alias, [undefined,_] = RH, [realm | Filters], T, B) -> + fltr_all(Alias, RH, Filters, T, B); + +fltr_all(Alias, [DR,_] = RH, [realm | Filters], T, B) -> + fltr_all(Alias, RH, [{realm, DR} | Filters], T, B); + +fltr_all(Alias, [_,undefined] = RH, [host | Filters], T, B) -> + fltr_all(Alias, RH, Filters, T, B); + +fltr_all(Alias, [_,DH] = RH, [host | Filters], T, B) -> + fltr_all(Alias, RH, [{host, DH} | Filters], T, B); + +fltr_all(Alias, RH, [{K, _} = KT, KA | Filters], T, B) + when K == host, KA == realm; + K == realm, KA == host -> + fltr_all(Alias, RH, [KA, KT | Filters], T, B); + +fltr_all(Alias, RH, [F | Filters], T, B) -> + {Ts, Fs} = filter(filter(Alias, RH, F, T, B), RH, {all, Filters}), + choose(B, Ts, Fs); + +fltr_all(Alias, RH, [], T, B) -> + filter(Alias, RH, none, T, B). + +%% fltr_first/5 %% -%% Return peers in match order. +%% Like any, but stop at the first filter with any matches. -filter(Peers, RH, Filter) -> - {Ts, _} = fltr(Peers, RH, Filter), - Ts. +fltr_first(Alias, RH, [F | Filters], T, B) -> + case filter(Alias, RH, F, T, B) of + [] -> + fltr_first(Alias, RH, Filters, T, B); + [_|_] = Ts -> + Ts + end; -%% fltr/4 +fltr_first(Alias, RH, [], T, B) -> + filter(Alias, RH, none, T, not B). -fltr(Peers, _, none) -> +%% all_peers/2 + +all_peers(Alias, {PeerT, AppT, _}) -> + ets:select(PeerT, [{#peer{pid = P, caps = '$1', _ = '_'}, + [], + [{{P, '$1'}}]} + || {_,P} <- ets:lookup(AppT, Alias)]). + +%% caps/2 + +caps(PeerT, Pids) -> + ets:select(PeerT, [{#peer{pid = P, caps = '$1', _ = '_'}, + [], + [{{P, '$1'}}]} + || P <- Pids]). + +%% filter/3 +%% +%% Return peers in match order. + +filter(Peers, _, none) -> {Peers, []}; -fltr(Peers, RH, {neg, F}) -> - {Ts, Fs} = fltr(Peers, RH, F), +filter(Peers, RH, {neg, F}) -> + {Ts, Fs} = filter(Peers, RH, F), {Fs, Ts}; -fltr(Peers, RH, {all, L}) +filter(Peers, RH, {all, L}) when is_list(L) -> lists:foldl(fun(F,A) -> fltr_all(F, A, RH) end, {Peers, []}, L); -fltr(Peers, RH, {any, L}) +filter(Peers, RH, {any, L}) when is_list(L) -> lists:foldl(fun(F,A) -> fltr_any(F, A, RH) end, {[], Peers}, L); -fltr(Peers, RH, F) -> +filter(Peers, RH, {first, L}) + when is_list(L) -> + fltr_first(Peers, RH, L); + +filter(Peers, RH, F) -> lists:partition(fun({_,C}) -> caps_filter(C, RH, F) end, Peers). +%% fltr_all/3 + fltr_all(F, {Ts0, Fs0}, RH) -> - {Ts1, Fs1} = fltr(Ts0, RH, F), + {Ts1, Fs1} = filter(Ts0, RH, F), {Ts1, Fs0 ++ Fs1}. +%% fltr_any/3 + fltr_any(F, {Ts0, Fs0}, RH) -> - {Ts1, Fs1} = fltr(Fs0, RH, F), + {Ts1, Fs1} = filter(Fs0, RH, F), {Ts0 ++ Ts1, Fs1}. +%% fltr_first/3 + +fltr_first(Peers, _, []) -> + {[], Peers}; + +fltr_first(Peers, RH, [F | Filters]) -> + case filter(Peers, RH, F) of + {[], _} -> + fltr_first(Peers, RH, Filters); + {_, _} = T -> + T + end. + %% caps_filter/3 caps_filter(#diameter_caps{origin_host = {_,OH}}, [_,DH], host) -> @@ -1582,8 +1819,8 @@ eq(Any, Id, PeerId) -> transports(#state{watchdogT = WatchdogT}) -> ets:select(WatchdogT, [{#watchdog{peer = '$1', _ = '_'}, - [{'is_pid', '$1'}], - ['$1']}]). + [{'is_pid', '$1'}], + ['$1']}]). %% --------------------------------------------------------------------------- %% # service_info/2 @@ -1636,7 +1873,7 @@ tagged_info(Item, S) undefined end; -tagged_info(TPid, #state{watchdogT = WatchdogT, peerT = PeerT}) +tagged_info(TPid, #state{watchdogT = WatchdogT, local = {PeerT, _, _}}) when is_pid(TPid) -> try [#peer{watchdog = Pid}] = ets:lookup(PeerT, TPid), @@ -1786,7 +2023,7 @@ keys(connect = T, Opts) -> keys(_, _) -> [{listen, accept}]. -peer_dict(#state{watchdogT = WatchdogT, peerT = PeerT}, Dict0) -> +peer_dict(#state{watchdogT = WatchdogT, local = {PeerT, _, _}}, Dict0) -> try ets:tab2list(WatchdogT) of L -> lists:foldl(fun(T,A) -> peer_acc(PeerT, A, T) end, Dict0, L) catch diff --git a/lib/diameter/src/base/diameter_stats.erl b/lib/diameter/src/base/diameter_stats.erl index 83e562e7fe..8c10464e98 100644 --- a/lib/diameter/src/base/diameter_stats.erl +++ b/lib/diameter/src/base/diameter_stats.erl @@ -25,9 +25,6 @@ -module(diameter_stats). -behaviour(gen_server). --compile({no_auto_import, [now/0]}). --import(diameter_lib, [now/0]). - -export([reg/2, reg/1, incr/3, incr/1, read/1, @@ -61,7 +58,7 @@ -define(SERVER, ?MODULE). %% Server state. --record(state, {id = now()}). +-record(state, {id = diameter_lib:now()}). -type counter() :: any(). -type ref() :: any(). @@ -143,9 +140,14 @@ read(Refs, B) -> L. to_refdict(L) -> - lists:foldl(fun({{C,R}, N}, D) -> orddict:append(R, {C,N}, D) end, - orddict:new(), - L). + lists:foldl(fun append/2, orddict:new(), L). + +%% Order both references and counters in the returned list. +append({{Ctr, Ref}, N}, Dict) -> + orddict:update(Ref, + fun(D) -> orddict:store(Ctr, N, D) end, + [{Ctr, N}], + Dict). %% --------------------------------------------------------------------------- %% # sum(Refs) @@ -221,7 +223,7 @@ uptime() -> %% ---------------------------------------------------------- init([]) -> - ets:new(?TABLE, [named_table, ordered_set, public]), + ets:new(?TABLE, [named_table, set, public, {write_concurrency, true}]), {ok, #state{}}. %% ---------------------------------------------------------- diff --git a/lib/diameter/src/base/diameter_sync.erl b/lib/diameter/src/base/diameter_sync.erl index 3309224399..7fb6888e21 100644 --- a/lib/diameter/src/base/diameter_sync.erl +++ b/lib/diameter/src/base/diameter_sync.erl @@ -28,9 +28,6 @@ -module(diameter_sync). -behaviour(gen_server). --compile({no_auto_import, [now/0]}). --import(diameter_lib, [now/0]). - -export([call/4, call/5, cast/4, cast/5, carp/1, carp/2]). @@ -73,7 +70,7 @@ %% Server state. -record(state, - {time = now(), + {time = diameter_lib:now(), pending = 0 :: non_neg_integer(), %% outstanding requests monitor = new() :: ets:tid(), %% MonitorRef -> {Name, From} queue = new() :: ets:tid()}). %% Name -> queue of {Pid, Ref} diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl index 692a01e651..c169d3fc2c 100644 --- a/lib/diameter/src/base/diameter_traffic.erl +++ b/lib/diameter/src/base/diameter_traffic.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2013-2015. All Rights Reserved. +%% Copyright Ericsson AB 2013-2016. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -41,7 +41,6 @@ -export([make_recvdata/1, peer_up/1, peer_down/1, - failover/1, pending/1]). %% towards ?MODULE @@ -80,6 +79,7 @@ apps :: [#diameter_app{}], sequence :: diameter:sequence(), codec :: [{string_decode, boolean()} + | {strict_mbit, boolean()} | {incoming_maxlen, diameter:message_length()}]}). %% Note that incoming_maxlen is currently handled in diameter_peer_fsm, %% so that any message exceeding the maximum is discarded. Retain the @@ -106,7 +106,8 @@ make_recvdata([SvcName, PeerT, Apps, SvcOpts | _]) -> sequence = Mask, codec = [T || {K,_} = T <- SvcOpts, lists:member(K, [string_decode, - incoming_maxlen])]}. + incoming_maxlen, + strict_mbit])]}. %% --------------------------------------------------------------------------- %% peer_up/1 @@ -167,7 +168,7 @@ incr_error(Dir, Id, TPid, _) -> incr_error(Dir, Id, TPid) -> incr(TPid, {Id, Dir, error}). - + %% --------------------------------------------------------------------------- %% incr_rc/4 %% --------------------------------------------------------------------------- @@ -1483,16 +1484,12 @@ send_R(Pkt0, caps = Caps, packet = Pkt0}, - try - incr(send, Pkt, TPid, AppDict), - TRef = send_request(TPid, Pkt, Req, SvcName, Timeout), - Pid ! Ref, %% tell caller a send has been attempted - handle_answer(SvcName, - App, - recv_A(Timeout, SvcName, App, Opts, {TRef, Req})) - after - erase_requests(Pkt) - end. + incr(send, Pkt, TPid, AppDict), + TRef = send_request(TPid, Pkt, Req, SvcName, Timeout), + Pid ! Ref, %% tell caller a send has been attempted + handle_answer(SvcName, + App, + recv_A(Timeout, SvcName, App, Opts, {TRef, Req})). %% recv_A/5 @@ -1568,6 +1565,8 @@ answer(Pkt, Req) -> a(Pkt, SvcName, ModX, AE, Req). +-spec a(_, _, _) -> no_return(). %% silence dialyzer + a(#diameter_packet{errors = Es} = Pkt, SvcName, @@ -1692,9 +1691,18 @@ encode(_, _, #diameter_packet{} = Pkt) -> send_request(TPid, #diameter_packet{bin = Bin} = Pkt, Req, _SvcName, Timeout) when node() == node(TPid) -> - %% Store the outgoing request before sending to avoid a race with - %% reply reception. - TRef = store_request(TPid, Bin, Req, Timeout), + Seqs = diameter_codec:sequence_numbers(Bin), + TRef = erlang:start_timer(Timeout, self(), TPid), + Entry = {Seqs, Req, TRef}, + + %% Ensure that request table is cleaned even if we receive an exit + %% signal. An alternative would be to simply trap exits, but + %% callbacks are applied in this process, and these could possibly + %% be expecting the prevailing behaviour. + Self = self(), + spawn(fun() -> diameter_lib:wait([Self]), erase_request(Entry) end), + + store_request(Entry, TPid), send(TPid, Pkt), TRef; @@ -1709,31 +1717,21 @@ send_request(TPid, #diameter_packet{} = Pkt, Req, SvcName, Timeout) -> %% send/1 send({TPid, Pkt, #request{handler = Pid} = Req0, SvcName, Timeout, TRef}) -> - Seqs = diameter_codec:sequence_numbers(Pkt), Req = Req0#request{handler = self()}, - Ref = send_request(TPid, Pkt, Req, SvcName, Timeout), - - try - recv(TPid, Pid, TRef, Ref) - after - %% Remove only the entry for this specific send since a resend - %% from the originating node can pick another transport on - %% this one. - ets:delete_object(?REQUEST_TABLE, {Seqs, Req, Ref}) - end. + recv(TPid, Pid, TRef, send_request(TPid, Pkt, Req, SvcName, Timeout)). %% recv/4 %% %% Relay an answer from a remote node. -recv(TPid, Pid, TRef, Ref) -> +recv(TPid, Pid, TRef, LocalTRef) -> receive {answer, _, _, _, _} = A -> Pid ! A; - {failover = T, Ref} -> + {failover = T, LocalTRef} -> Pid ! {T, TRef}; T -> - exit({timeout, Ref, TPid} = T) + exit({timeout, LocalTRef, TPid} = T) end. %% send/2 @@ -1810,17 +1808,21 @@ resend_request(Pkt0, TRef = send_request(TPid, Pkt, Req, SvcName, Tmo), {TRef, Req}. -%% store_request/4 +%% store_request/2 -store_request(TPid, Bin, Req, Timeout) -> - Seqs = diameter_codec:sequence_numbers(Bin), - TRef = erlang:start_timer(Timeout, self(), TPid), - ets:insert(?REQUEST_TABLE, {Seqs, Req, TRef}), +store_request(T, TPid) -> + ets:insert(?REQUEST_TABLE, T), ets:member(?REQUEST_TABLE, TPid) - orelse (self() ! {failover, TRef}), %% failover/1 may have missed - TRef. + orelse begin + {_Seqs, _Req, TRef} = T, + self() ! {failover, TRef} %% failover/1 may have missed + end. %% lookup_request/2 +%% +%% Note the match on both the key and transport pid. The latter is +%% necessary since the same Hop-by-Hop and End-to-End identifiers are +%% reused in the case of retransmission. lookup_request(Msg, TPid) -> Seqs = diameter_codec:sequence_numbers(Msg), @@ -1834,10 +1836,10 @@ lookup_request(Msg, TPid) -> false end. -%% erase_requests/1 +%% erase_request/1 -erase_requests(Pkt) -> - ets:delete(?REQUEST_TABLE, diameter_codec:sequence_numbers(Pkt)). +erase_request(T) -> + ets:delete_object(?REQUEST_TABLE, T). %% match_requests/1 @@ -1860,10 +1862,10 @@ failover(TPid) when is_pid(TPid) -> lists:foreach(fun failover/1, match_requests(TPid)); %% Note that a request process can store its request after failover -%% notifications are sent here: store_request/4 sends the notification +%% notifications are sent here: store_request/2 sends the notification %% in that case. -%% Failover as a consequence of request_peer_down/1: inform the +%% Failover as a consequence of peer_down/1: inform the %% request process. failover({_, Req, TRef}) -> #request{handler = Pid, diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl index c6d0a0b6ed..ea8b2fdb0e 100644 --- a/lib/diameter/src/base/diameter_watchdog.erl +++ b/lib/diameter/src/base/diameter_watchdog.erl @@ -66,7 +66,9 @@ %% end PCB parent = self() :: pid(), %% service process transport :: pid() | undefined, %% peer_fsm process - tref :: reference(), %% reference for current watchdog timer + tref :: reference() %% reference for current watchdog timer + | integer() %% monotonic time + | undefined, dictionary :: module(), %% common dictionary receive_data :: term(), %% term passed into diameter_service with incoming message @@ -94,7 +96,7 @@ start({_,_} = Type, T) -> Ack = make_ref(), {ok, Pid} = diameter_watchdog_sup:start_child({Ack, Type, self(), T}), try - {erlang:monitor(process, Pid), Pid} + {monitor(process, Pid), Pid} after send(Pid, Ack) end. @@ -121,7 +123,7 @@ i({Ack, T, Pid, {RecvData, #diameter_service{applications = Apps, capabilities = Caps} = Svc}}) -> - erlang:monitor(process, Pid), + monitor(process, Pid), wait(Ack, Pid), {_, Seed} = diameter_lib:seed(), random:seed(Seed), @@ -246,11 +248,16 @@ handle_info(T, #watchdog{} = State) -> event(T, State, S), %% before 'watchdog' {noreply, S}; stop -> - ?LOG(stop, T), + ?LOG(stop, truncate(T)), event(T, State, State#watchdog{status = down}), {stop, {shutdown, T}, State} end. +truncate({'DOWN' = T, _, process, Pid, _}) -> + {T, Pid}; +truncate(T) -> + T. + close({'DOWN', _, process, TPid, {shutdown, Reason}}, #watchdog{transport = TPid, parent = Pid}) -> @@ -447,11 +454,12 @@ transition({recv, TPid, Name, Pkt}, #watchdog{transport = TPid} = S) -> %% Current watchdog has timed out. transition({timeout, TRef, tw}, #watchdog{tref = TRef} = S) -> - set_watchdog(timeout(S)); + set_watchdog(0, timeout(S)); -%% Timer was canceled after message was already sent. -transition({timeout, _, tw}, #watchdog{}) -> - ok; +%% Message has arrived since the timer was started: subtract time +%% already elapsed from new timer. +transition({timeout, _, tw}, #watchdog{tref = T0} = S) -> + set_watchdog(diameter_lib:micro_diff(T0) div 1000, S); %% State query. transition({state, Pid}, #watchdog{status = S}) -> @@ -527,18 +535,27 @@ role() -> %% set_watchdog/1 -set_watchdog(#watchdog{tw = TwInit, - tref = TRef} - = S) -> - cancel(TRef), - S#watchdog{tref = erlang:start_timer(tw(TwInit), self(), tw)}; -set_watchdog(stop = No) -> - No. +%% Timer not yet set. +set_watchdog(#watchdog{tref = undefined} = S) -> + set_watchdog(0, S); -cancel(undefined) -> - ok; -cancel(TRef) -> - erlang:cancel_timer(TRef). +%% Timer already set: start at new one only at expiry. +set_watchdog(#watchdog{} = S) -> + S#watchdog{tref = diameter_lib:now()}. + +%% set_watchdog/2 + +set_watchdog(_, stop = No) -> + No; + +set_watchdog(Ms, #watchdog{tw = TwInit} = S) -> + S#watchdog{tref = erlang:start_timer(tw(TwInit, Ms), self(), tw)}. + +%% A callback could return anything, so ensure the result isn't +%% negative. Don't prevent abuse, even though the smallest valid +%% timeout is 4000. +tw(TwInit, Ms) -> + max(tw(TwInit) - Ms, 0). tw(T) when is_integer(T), T >= 6000 -> |