Diffstat (limited to 'lib/diameter/src/base/diameter_service.erl')
-rw-r--r--  lib/diameter/src/base/diameter_service.erl  955
1 file changed, 685 insertions, 270 deletions
diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl
index 9dd8aafc61..ccf68f4d93 100644
--- a/lib/diameter/src/base/diameter_service.erl
+++ b/lib/diameter/src/base/diameter_service.erl
@@ -1,18 +1,19 @@
 %%
 %% %CopyrightBegin%
 %%
-%% Copyright Ericsson AB 2010-2013. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2016. All Rights Reserved.
 %%
-%% The contents of this file are subject to the Erlang Public License,
-%% Version 1.1, (the "License"); you may not use this file except in
-%% compliance with the License. You should have received a copy of the
-%% Erlang Public License along with this software. If not, it can be
-%% retrieved online at http://www.erlang.org/.
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
 %%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and limitations
-%% under the License.
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
 %%
 %% %CopyrightEnd%
 %%
@@ -31,6 +32,7 @@
 -export([subscribe/1,
          unsubscribe/1,
          services/0,
+         peer_info/1,
          info/2]).
 
 %% towards diameter_config
@@ -82,16 +84,8 @@
 -define(DEFAULT_TC,  30000).  %% RFC 3588 ch 2.1
 -define(RESTART_TC,   1000).  %% if restart was this recent
 
-%% Used to be able to swap this with anything else dict-like but now
-%% rely on the fact that a service's #state{} record does not change
-%% in storing in it ?STATE table and not always going through the
-%% service process. In particular, rely on the fact that operations on
-%% a ?Dict don't change the handle to it.
--define(Dict, diameter_dict).
-
-%% Maintains state in a table. In contrast to previously, a service's
-%% stat is not constant and is accessed outside of the service
-%% process.
+%% Maintain state in a table since a service's state is accessed
+%% outside of the service process.
 -define(STATE_TABLE, ?MODULE).
 
 %% The default sequence mask.
@@ -111,45 +105,49 @@
 %% to determine whether or not we need to call the process for a
 %% pick_peer callback in the stateful case.
 -record(state,
-        {id = now(),
+        {id = diameter_lib:now(),
          service_name :: diameter:service_name(), %% key in ?STATE_TABLE
         service :: #diameter_service{},
         watchdogT = ets_new(watchdogs) %% #watchdog{} at start
                  :: ets:tid(),
-         peerT = ets_new(peers)      %% #peer{pid = TPid} at okay/reopen
-              :: ets:tid(),
-         shared_peers = ?Dict:new()  %% Alias -> [{TPid, Caps}, ...]
-                     :: ets:tid(),
-         local_peers = ?Dict:new()   %% Alias -> [{TPid, Caps}, ...]
-                    :: ets:tid(),
+         peerT,         %% undefined in new code, but remain for upgrade
+         shared_peers,  %% reasons. Replaced by local/remote.
+         local_peers,   %%
+         local  :: {ets:tid(), ets:tid(), ets:tid()},
+         remote :: {ets:tid(), ets:tid(), ets:tid()},
         monitor = false :: false | pid(),   %% process to die with
         options
         :: [{sequence, diameter:sequence()}          %% sequence mask
           | {share_peers, diameter:remotes()}        %% broadcast to
           | {use_shared_peers, diameter:remotes()}   %% use from
-           | {restrict_connections, diameter:restriction()}]}).
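
The replacement of now/0 with diameter_lib:now/0 in the record defaults above is part of the broader move away from the deprecated now/0. A minimal standalone sketch of the idea, not the actual diameter_lib implementation (which this diff doesn't show): combine monotonic time with a unique integer instead of relying on now/0's uniqueness guarantee.

    %% Illustrative only: a unique, ordering-friendly stamp without now/0.
    now_sketch() ->
        {erlang:monotonic_time(), erlang:unique_integer([monotonic])}.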
+           | {restrict_connections, diameter:restriction()}
+           | {strict_mbit, boolean()}
+           | {string_decode, boolean()}
+           | {incoming_maxlen, diameter:message_length()}]}).
 %% shared_peers reflects the peers broadcast from remote nodes.
 
 %% Record representing an RFC 3539 watchdog process implemented by
 %% diameter_watchdog.
 -record(watchdog,
-        {pid  :: match(pid()),
+        {pid  :: match(pid()) | undefined,
         type :: match(connect | accept),
         ref  :: match(reference()),  %% key into diameter_config
         options :: match([diameter:transport_opt()]), %% from start_transport
         state = ?WD_INITIAL :: match(wd_state()),
-         started = now(),              %% at process start
+         started = diameter_lib:now(), %% at process start
         peer = false :: match(boolean() | pid())}).
-                     %% true at accepted, pid() at okay/reopen
+                     %% true at accepted/remove, pid() at okay/reopen
 
-%% Record representing an Peer State Machine processes implemented by
+%% Record representing a Peer State Machine process implemented by
 %% diameter_peer_fsm.
 -record(peer,
-        {pid  :: pid(),
-         apps :: [{0..16#FFFFFFFF, diameter:app_alias()}],  %% {Id, Alias}
-         caps :: #diameter_caps{},
-         started = now(),      %% at process start
-         watchdog :: pid()}).  %% key into watchdogT
+        {pid   :: pid(),
+         apps  :: match([{0..16#FFFFFFFF, diameter:app_alias()}] %% {Id, Alias}
+                        | [diameter:app_alias()]),               %% remote
+         caps  :: match(#diameter_caps{}),
+         started = diameter_lib:now(),     %% at process start or sharing
+         watchdog :: match(pid()           %% key into watchdogT
+                           | undefined)}). %% undefined if remote
 
 %% ---------------------------------------------------------------------------
 %% # start/1
 %% ---------------------------------------------------------------------------
@@ -177,7 +175,7 @@ stop(SvcName) ->
     end.
 
 stop(ok, Pid) ->
-    MRef = erlang:monitor(process, Pid),
+    MRef = monitor(process, Pid),
     receive {'DOWN', MRef, process, _, _} -> ok end;
 stop(No, _) ->
     No.
@@ -204,7 +202,7 @@ stop_transport(SvcName, [_|_] = Refs) ->
 
 info(SvcName, Item) ->
     case lookup_state(SvcName) of
-        [#state{} = S] ->
+        [S] ->
             service_info(Item, S);
         [] ->
             undefined
@@ -213,7 +211,35 @@ info(SvcName, Item) ->
 %% lookup_state/1
 
 lookup_state(SvcName) ->
-    ets:lookup(?STATE_TABLE, SvcName).
+    case ets:lookup(?STATE_TABLE, SvcName) of
+        [#state{}] = L ->
+            L;
+        _ ->
+            []
+    end.
+
+%% ---------------------------------------------------------------------------
+%% # peer_info/1
+%% ---------------------------------------------------------------------------
+
+%% An extended version of info_peer/1 for peer_info/1.
+peer_info(Pid) ->
+    try
+        {_, PD} = process_info(Pid, dictionary),
+        {_, T} = lists:keyfind({diameter_peer_fsm, start}, 1, PD),
+        {TPid, {{Type, Ref}, TMod, Cfg}} = T,
+        {_, TD} = process_info(TPid, dictionary),
+        {_, Data} = lists:keyfind({TMod, info}, 1, TD),
+        [{ref, Ref},
+         {type, Type},
+         {owner, TPid},
+         {module, TMod},
+         {config, Cfg}
+         | try TMod:info(Data) catch _:_ -> [] end]
+    catch
+        error:_ ->
+            []
+    end.
 
 %% ---------------------------------------------------------------------------
 %% # subscribe/1
@@ -224,7 +250,7 @@
 subscribe(SvcName) ->
     diameter_reg:add({?MODULE, subscriber, SvcName}).
 
 unsubscribe(SvcName) ->
-    diameter_reg:del({?MODULE, subscriber, SvcName}).
+    diameter_reg:remove({?MODULE, subscriber, SvcName}).
 
 subscriptions(Pat) ->
     pmap(diameter_reg:match({?MODULE, subscriber, Pat})).
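
A hedged usage sketch of the new peer_info/1 above: given the pid of a diameter_peer_fsm process it returns a property list assembled from the process dictionaries of the peer and transport processes, or [] if anything fails. The keys come straight from the function; the printing wrapper is illustrative only.

    %% Illustrative only: dump the fixed keys returned by peer_info/1.
    print_peer(Pid) ->
        Info = diameter_service:peer_info(Pid),  %% [] on any failure
        [io:format("~p: ~p~n", [K, proplists:get_value(K, Info)])
         || K <- [ref, type, owner, module, config]],
        ok.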
@@ -258,16 +284,22 @@ whois(SvcName) ->
 %% ---------------------------------------------------------------------------
 
 -spec pick_peer(SvcName, AppOrAlias, Opts)
-   -> {{TPid, Caps, App}, Mask}
-    | false
-    | {error, term()}
+   -> {{TPid, Caps, App}, Mask, SvcOpts}
+    | false          %% no selection
+    | {error, no_service}
  when SvcName :: diameter:service_name(),
-      AppOrAlias :: {alias, diameter:app_alias()} | #diameter_app{},
-      Opts :: tuple(),
+      AppOrAlias :: #diameter_app{}
+                  | {alias, diameter:app_alias()},
+      Opts :: {fun((Dict :: module()) -> [term()]),
+               diameter:peer_filter(),
+               Xtra :: list()},
      TPid :: pid(),
      Caps :: #diameter_caps{},
      App  :: #diameter_app{},
-      Mask :: diameter:sequence().
+      Mask :: diameter:sequence(),
+      SvcOpts :: [diameter:service_opt()].
+%% Extract Mask in the returned tuple so that diameter_traffic doesn't
+%% need to know about the ordering of SvcOpts used here.
 
 pick_peer(SvcName, App, Opts) ->
     pick(lookup_state(SvcName), App, Opts).
@@ -284,10 +316,10 @@ pick(#state{service = #diameter_service{applications = Apps}}
      Opts) ->  %% initial call from diameter:call/4
     pick(S, find_outgoing_app(Alias, Apps), Opts);
 
-pick(_, false, _) ->
-    false;
+pick(_, false = No, _) ->
+    No;
 
-pick(#state{options = [{_, Mask} | _]}
+pick(#state{options = [{_, Mask} | SvcOpts]}
     = S,
    #diameter_app{module = ModX, dictionary = Dict}
    = App0,
@@ -296,7 +328,7 @@
    [_,_] = RealmAndHost = diameter_lib:eval([DestF, Dict]),
    case pick_peer(App, RealmAndHost, Filter, S) of
        {TPid, Caps} ->
-            {{TPid, Caps, App}, Mask};
+            {{TPid, Caps, App}, Mask, SvcOpts};
        false = No ->
            No
    end.
@@ -480,6 +512,9 @@ transition({service, Pid}, S) ->
 transition({peer, TPid, Aliases, Caps}, S) ->
     remote_peer_up(TPid, Aliases, Caps, S),
     ok;
+transition({peer, TPid}, S) ->
+    remote_peer_down(TPid, S),
+    ok;
 
 %% Remote peer process has died.
 transition({'DOWN', _, process, TPid, _}, S) ->
@@ -499,9 +534,21 @@ transition(Req, S) ->
 %% # terminate/2
 %% ---------------------------------------------------------------------------
 
-terminate(Reason, #state{service_name = Name} = S) ->
+terminate(Reason, #state{service_name = Name, local = {PeerT, _, _}} = S) ->
     send_event(Name, stop),
     ets:delete(?STATE_TABLE, Name),
+
+    %% Communicate pending loss of any peers that connection_down/3
+    %% won't. This is needed when stopping a service since we don't
+    %% wait for watchdog state changes to take care of it. That this
+    %% takes place after deleting the state entry ensures that the
+    %% resulting failover by request processes accomplishes nothing.
+    ets:foldl(fun(#peer{pid = TPid}, _) ->
+                      diameter_traffic:peer_down(TPid)
+              end,
+              ok,
+              PeerT),
+
     shutdown == Reason  %% application shutdown
         andalso shutdown(application, S).
 
@@ -509,23 +556,80 @@
 %% # code_change/3
 %% ---------------------------------------------------------------------------
 
-code_change(FromVsn,
-            #state{service_name = SvcName,
-                   service = #diameter_service{applications = Apps}}
-            = S,
-            Extra) ->
-    lists:foreach(fun(A) ->
-                          code_change(FromVsn, SvcName, Extra, A)
-                  end,
-                  Apps),
+code_change(_FromVsn, #state{} = S, _Extra) ->
+    {ok, S};
+
+%% Don't support downgrade since we won't in appup.
+code_change({down = T, _}, _, _Extra) ->
+    {error, T};
+
+%% Upgrade local/shared peers dicts populated in old code.
+code_change(_FromVsn, S0, _Extra) ->
+    {state, Id, SvcName, Svc, WT, PeerT, SDict, LDict, Monitor, Opts}
+        = S0,
+
+    init_peers(LT = setelement(1, {PT, _, _} = init_peers(), PeerT),
+               fun({_,A}) -> A end),
+    init_peers(init_peers(RT = init_peers(), SDict),
+               fun(A) -> A end),
+
+    S = #state{id = Id,
+               service_name = SvcName,
+               service = Svc,
+               watchdogT = WT,
+               peerT = PT,  %% empty
+               shared_peers = SDict,
+               local_peers = LDict,
+               local = LT,
+               remote = RT,
+               monitor = Monitor,
+               options = Opts},
+
+    %% Replacing the table entry and deleting the old shared tables
+    %% can make outgoing requests return {error, no_connection} until
+    %% everyone is running new code. Don't delete the tables to avoid
+    %% crashing request processes.
+    ets:delete_all_objects(SDict),
+    ets:delete_all_objects(LDict),
+    ets:insert(?STATE_TABLE, S),
     {ok, S}.
 
-code_change(FromVsn, SvcName, Extra, #diameter_app{alias = Alias} = A) ->
-    {ok, S} = cb(A, code_change, [FromVsn,
-                                  mod_state(Alias),
-                                  Extra,
-                                  SvcName]),
-    mod_state(Alias, S).
+%% init_peers/2
+
+%% Populate app and identity bags from new-style #peer{} records.
+init_peers({PeerT, _, _} = T, F)
+  when is_function(F) ->
+    ets:foldl(fun(#peer{pid = P, apps = As, caps = C}, N) ->
+                      insert_peer(P, lists:map(F, As), C, T),
+                      N+1
+              end,
+              0,
+              PeerT);
+
+%% Populate #peer{} table given a shared peers dict.
+init_peers({PeerT, _, _}, SDict) ->
+    dict:fold(fun(P, As, N) ->
+                      ets:update_element(PeerT, P, {#peer.apps, As}),
+                      N+1
+              end,
+              0,
+              diameter_dict:fold(fun(A, Ps, D) ->
+                                         init_peers(A, Ps, PeerT, D)
+                                 end,
+                                 dict:new(),
+                                 SDict)).
+
+%% init_peers/4
+
+init_peers(App, TCs, PeerT, Dict) ->
+    lists:foldl(fun({P,C}, D) ->
+                        ets:insert(PeerT, #peer{pid = P,
+                                                apps = [],
+                                                caps = C}),
+                        dict:append(P, App, D)
+                end,
+                Dict,
+                TCs).
 
 %% ===========================================================================
 %% ===========================================================================
 
@@ -533,9 +637,6 @@
 unexpected(F, A, #state{service_name = Name}) ->
     ?UNEXPECTED(F, A ++ [Name]).
 
-cb(#diameter_app{module = [_|_] = M}, F, A) ->
-    eval(M, F, A).
-
 eval([M|X], F, A) ->
     apply(M, F, A ++ X).
@@ -555,10 +656,6 @@ choose(false, _, X) -> X.
 
 ets_new(Tbl) ->
     ets:new(Tbl, [{keypos, 2}]).
 
-insert(Tbl, Rec) ->
-    ets:insert(Tbl, Rec),
-    Rec.
-
 %% Using the process dictionary for the callback state was initially
 %% just a way to make what was horrendous trace (big state record and
 %% much else everywhere) somewhat more readable. There's not as much
@@ -579,27 +676,37 @@ mod_state(Alias, ModS) ->
 
 %% remove_transport
 shutdown(Refs, #state{watchdogT = WatchdogT})
   when is_list(Refs) ->
-    ets:foldl(fun(P,ok) -> st(P, Refs), ok end, ok, WatchdogT);
+    ets:insert(WatchdogT, ets:foldl(fun(R,A) -> st(R, Refs, A) end,
+                                    [],
+                                    WatchdogT));
 
 %% application/service shutdown
 shutdown(Reason, #state{watchdogT = WatchdogT})
   when Reason == application;
        Reason == service ->
-    diameter_lib:wait(ets:foldl(fun(P,A) -> st(P, Reason, A) end,
+    diameter_lib:wait(ets:foldl(fun(P,A) -> ss(P, Reason, A) end,
                                 [],
                                 WatchdogT)).
 
-%% st/2
+%% st/3
 
-st(#watchdog{ref = Ref, pid = Pid}, Refs) ->
-    lists:member(Ref, Refs)
-        andalso (Pid ! {shutdown, self(), transport}).  %% 'DOWN' cleans up
+%% Mark replacement as started so that a subsequent accept doesn't
+%% result in a new process that isn't terminated.
+st(#watchdog{ref = Ref, pid = Pid, peer = P} = Rec, Refs, Acc) ->
+    case lists:member(Ref, Refs) of
+        true ->
+            Pid ! {shutdown, self(), transport},  %% 'DOWN' cleans up
+            [Rec#watchdog{peer = true} || P == false] ++ Acc;
+        false ->
+            Acc
+    end.
 
-%% st/3
+%% ss/3
 
-st(#watchdog{pid = Pid}, Reason, Acc) ->
+ss(#watchdog{pid = Pid}, Reason, Acc) ->
+    MRef = monitor(process, Pid),
     Pid ! {shutdown, self(), Reason},
-    [Pid | Acc].
+    [MRef | Acc].
 
 %% ---------------------------------------------------------------------------
 %% # call_service/2
 %% ---------------------------------------------------------------------------
@@ -658,6 +765,8 @@ cfg_acc({SvcName, #diameter_service{applications = Apps} = Rec, Opts},
     lists:foreach(fun init_mod/1, Apps),
     S = #state{service_name = SvcName,
                service = Rec#diameter_service{pid = self()},
+               local = init_peers(),
+               remote = init_peers(),
                monitor = mref(get_value(monitor, Opts)),
                options = service_options(Opts)},
     {S, Acc};
@@ -667,19 +776,30 @@
   when Type == listen ->
     {S, [T | Acc]}.
 
+init_peers() ->
+    {ets_new(caps),            %% #peer{}
+     ets:new(apps, [bag]),     %% {Alias, TPid}
+     ets:new(idents, [bag])}.  %% {{host, OH} | {realm, OR} | {OR, OH},
+                               %%  Alias,
+                               %%  TPid}
+
 service_options(Opts) ->
     [{sequence, proplists:get_value(sequence, Opts, ?NOMASK)},
      {share_peers, get_value(share_peers, Opts)},
     {use_shared_peers, get_value(use_shared_peers, Opts)},
     {restrict_connections, proplists:get_value(restrict_connections,
                                                Opts,
-                                                ?RESTRICT)}].
+                                                ?RESTRICT)},
+     {spawn_opt, proplists:get_value(spawn_opt, Opts, [])},
+     {string_decode, proplists:get_value(string_decode, Opts, true)},
+     {incoming_maxlen, proplists:get_value(incoming_maxlen, Opts, 16#FFFFFF)},
+     {strict_mbit, proplists:get_value(strict_mbit, Opts, true)}].
 %% The order of options is significant since we match against the list.
 
 mref(false = No) ->
     No;
 mref(P) ->
-    erlang:monitor(process, P).
+    monitor(process, P).
 
 init_shared(#state{options = [_, _, {_,T} | _],
                    service_name = Svc}) ->
@@ -700,8 +820,7 @@ notify(Share, SvcName, T) ->
     Nodes = remotes(Share),
     [] /= Nodes andalso diameter_peer:notify(Nodes, SvcName, T).
 %% Test for the empty list for upgrade reasons: there's no
-%% diameter_peer:notify/3 in old code so no call means no load order
-%% requirement.
+%% diameter_peer:notify/3 in old code.
 
 remotes(false) ->
     [];
@@ -719,14 +838,27 @@ remotes(F) ->
        L when is_list(L) ->
            L;
        T ->
-            diameter_lib:error_report({invalid_return, T}, F),
+            ?LOG(invalid_return, {F,T}),
+            error_report(invalid_return, share_peers, F),
            []
    catch
        E:R ->
-            diameter_lib:error_report({failure, {E, R, ?STACK}}, F),
+            ?LOG(failure, {E, R, F, diameter_lib:get_stacktrace()}),
+            error_report(failure, share_peers, F),
            []
    end.
 
+%% error_report/3
+
+error_report(T, What, F) ->
+    Reason = io_lib:format("~s from ~p callback", [reason(T), What]),
+    diameter_lib:error_report(Reason, diameter_lib:eval_name(F)).
+
+reason(invalid_return) ->
+    "invalid return";
+reason(failure) ->
+    "failure".
+
 %% ---------------------------------------------------------------------------
 %% # start/3
 %% ---------------------------------------------------------------------------
@@ -740,8 +872,9 @@ remotes(F) ->
 start(Ref, {T, Opts}, S)
   when T == connect;
        T == listen ->
+    N = proplists:get_value(pool_size, Opts, 1),
     try
-        {ok, start(Ref, type(T), Opts, S)}
+        {ok, start(Ref, type(T), Opts, N, S)}
     catch
         ?FAILURE(Reason) ->
             {error, Reason}
@@ -759,26 +892,44 @@ type(connect = T) -> T.
 
 %% start/4
 
-start(Ref, Type, Opts, #state{watchdogT = WatchdogT,
-                              peerT = PeerT,
-                              options = SvcOpts,
-                              service_name = SvcName,
-                              service = Svc0})
+start(Ref, Type, Opts, State) ->
+    start(Ref, Type, Opts, 1, State).
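
init_peers/0 above creates the three ETS tables that replace the old dicts: a set of #peer{} records keyed on pid, a bag of {Alias, TPid}, and a bag keyed on {host, OH}, {realm, OR} or {OR, OH}. A standalone sketch of the same layout under illustrative names and data (the real rows are written by insert_peer/4 further down):

    %% Illustrative only: the three-table layout and some sample rows.
    peer_tables() ->
        Caps   = ets:new(caps, [{keypos, 2}]),  %% #peer{} records
        Apps   = ets:new(apps, [bag]),          %% {Alias, TPid}
        Idents = ets:new(idents, [bag]),        %% {Key, Alias, TPid}
        ets:insert(Apps, {common, self()}),
        ets:insert(Idents, [{{host, <<"peer.ex.com">>}, common, self()},
                            {{realm, <<"ex.com">>}, common, self()},
                            {{<<"ex.com">>, <<"peer.ex.com">>}, common, self()}]),
        {Caps, Apps, Idents}.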
+
+%% start/5
+
+start(Ref, Type, Opts, N, #state{watchdogT = WatchdogT,
+                                 local = {PeerT, _, _},
+                                 options = SvcOpts,
+                                 service_name = SvcName,
+                                 service = Svc0})
   when Type == connect;
        Type == accept ->
     #diameter_service{applications = Apps}
-        = Svc
+        = Svc1
         = merge_service(Opts, Svc0),
-    {_,_} = Mask = proplists:get_value(sequence, SvcOpts),
-    RecvData = diameter_traffic:make_recvdata([SvcName, PeerT, Apps, Mask]),
-    Pid = s(Type, Ref, {{spawn_opts([Opts, SvcOpts]), RecvData},
-                        Opts,
-                        SvcOpts,
-                        Svc}),
-    insert(WatchdogT, #watchdog{pid = Pid,
-                                type = Type,
-                                ref = Ref,
-                                options = Opts}),
+    Svc = binary_caps(Svc1, proplists:get_value(string_decode, SvcOpts, true)),
+    RecvData = diameter_traffic:make_recvdata([SvcName,
+                                               PeerT,
+                                               Apps,
+                                               SvcOpts]),
+    T = {{spawn_opts([Opts, SvcOpts]), RecvData}, Opts, SvcOpts, Svc},
+    Rec = #watchdog{type = Type,
+                    ref = Ref,
+                    options = Opts},
+    diameter_lib:fold_n(fun(_,A) ->
+                                [wd(Type, Ref, T, WatchdogT, Rec) | A]
+                        end,
+                        [],
+                        N).
+
+binary_caps(Svc, true) ->
+    Svc;
+binary_caps(#diameter_service{capabilities = Caps} = Svc, false) ->
+    Svc#diameter_service{capabilities = diameter_capx:binary_caps(Caps)}.
+
+wd(Type, Ref, T, WatchdogT, Rec) ->
+    Pid = start_watchdog(Type, Ref, T),
+    ets:insert(WatchdogT, Rec#watchdog{pid = Pid}),
     Pid.
 
 %% Note that the service record passed into the watchdog is the merged
@@ -791,7 +942,7 @@ spawn_opts(Optss) ->
            T /= link,
            T /= monitor].
 
-s(Type, Ref, T) ->
+start_watchdog(Type, Ref, T) ->
     {_MRef, Pid} = diameter_watchdog:start({Type, Ref}, T),
     Pid.
 
@@ -812,7 +963,7 @@ ms({applications, As}, #diameter_service{applications = Apps} = S)
 
 %% The fact that all capabilities can be configured on the transports
 %% means that the service doesn't necessarily represent a single
-%% locally implemented Diameter peer as identified by Origin-Host: a
+%% locally implemented Diameter node as identified by Origin-Host: a
 %% transport can configure its own Origin-Host. This means that the
 %% service is little more than a placeholder for default capabilities
 %% plus a list of applications that individual transports can choose
@@ -832,11 +983,22 @@ ms(_, Svc) ->
 %% ---------------------------------------------------------------------------
 
 accepted(Pid, _TPid, #state{watchdogT = WatchdogT} = S) ->
-    #watchdog{ref = Ref, type = accept = T, peer = false, options = Opts}
+    #watchdog{type = accept = T, peer = P}
         = Wd
         = fetch(WatchdogT, Pid),
-    insert(WatchdogT, Wd#watchdog{peer = true}), %% mark replacement as started
-    start(Ref, T, Opts, S).                      %% start new watchdog
+    if not P ->
+            #watchdog{ref = Ref, options = Opts} = Wd,
+            %% Mark replacement started, and start new watchdog.
+            ets:insert(WatchdogT, Wd#watchdog{peer = true}),
+            start(Ref, T, Opts, S);
+       P ->
+            %% Transport removal in progress: true has been set in
+            %% shutdown/2, and the transport will die as a
+            %% consequence.
+            ok
+    end.
+
+%% fetch/2
 
 fetch(Tid, Key) ->
     [T] = ets:lookup(Tid, Key),
@@ -862,14 +1024,15 @@ watchdog(TPid, [], ?WD_SUSPECT, ?WD_OKAY, Wd, State) ->
     #watchdog{peer = TPid} = Wd,  %% assert
     connection_up(Wd, State);
 
-%% Watchdog has an unresponsive connection.
+%% Watchdog has an unresponsive connection. Note that the peer table
+%% entry isn't removed until DOWN.
 watchdog(TPid, [], ?WD_OKAY, ?WD_SUSPECT = To, Wd, State) ->
     #watchdog{peer = TPid} = Wd,  %% assert
     watchdog_down(Wd, To, State);
 
 %% Watchdog has lost its connection.
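
The pool_size handling above makes start/3 spawn N watchdogs instead of one, via diameter_lib:fold_n/3. In configuration terms this corresponds to passing the option when adding a transport; a hedged sketch (service name, address and port are illustrative):

    %% Illustrative only: a connecting transport with a pool of 4 connections.
    add_pooled_transport(SvcName) ->
        diameter:add_transport(SvcName,
                               {connect, [{pool_size, 4},
                                          {transport_module, diameter_tcp},
                                          {transport_config,
                                           [{raddr, {127,0,0,1}},
                                            {rport, 3868}]}]}).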
-watchdog(TPid, [], _, ?WD_DOWN = To, Wd, #state{peerT = PeerT} = S) ->
-    close(Wd, S),
+watchdog(TPid, [], _, ?WD_DOWN = To, Wd, #state{local = {PeerT, _, _}} = S) ->
+    close(Wd),
     watchdog_down(Wd, To, S),
     ets:delete(PeerT, TPid);
 
 watchdog(_, [], _, _, _, _) ->
     ok.
 
 watchdog_down(Wd, To, #state{watchdogT = WatchdogT} = S) ->
-    insert(WatchdogT, Wd#watchdog{state = To}),
+    ets:insert(WatchdogT, Wd#watchdog{state = To}),
     connection_down(Wd, To, S).
 
 %% ---------------------------------------------------------------------------
@@ -889,14 +1052,14 @@ watchdog_down(Wd, To, #state{watchdogT = WatchdogT} = S) ->
 
 connection_up({TPid, {Caps, SupportedApps, Pkt}},
               #watchdog{pid = Pid}
               = Wd,
-              #state{peerT = PeerT}
+              #state{local = {PeerT, _, _}}
               = S) ->
-    Pr = #peer{pid = TPid,
-               apps = SupportedApps,
-               caps = Caps,
-               watchdog = Pid},
-    insert(PeerT, Pr),
-    connection_up([Pkt], Wd#watchdog{peer = TPid}, Pr, S).
+    Rec = #peer{pid = TPid,
+                apps = SupportedApps,
+                caps = Caps,
+                watchdog = Pid},
+    ets:insert(PeerT, Rec),
+    connection_up([Pkt], Wd#watchdog{peer = TPid}, Rec, S).
 
 %% ---------------------------------------------------------------------------
 %% # reopen/3
@@ -906,22 +1069,23 @@
 reopen({TPid, {Caps, SupportedApps, _Pkt}},
        #watchdog{pid = Pid}
        = Wd,
       #state{watchdogT = WatchdogT,
-              peerT = PeerT}) ->
-    insert(PeerT, #peer{pid = TPid,
-                        apps = SupportedApps,
-                        caps = Caps,
-                        watchdog = Pid}),
-    insert(WatchdogT, Wd#watchdog{state = ?WD_REOPEN,
-                                  peer = TPid}).
+              local = {PeerT, _, _}}) ->
+    ets:insert(PeerT, #peer{pid = TPid,
+                            apps = SupportedApps,
+                            caps = Caps,
+                            watchdog = Pid}),
+    ets:insert(WatchdogT, Wd#watchdog{state = ?WD_REOPEN,
+                                      peer = TPid}).
 
 %% ---------------------------------------------------------------------------
 %% # connection_up/2
 %% ---------------------------------------------------------------------------
 
-%% Watchdog has recovered as suspect connection. Note that there has
+%% Watchdog has recovered a suspect connection. Note that there has
 %% been no new capabilities exchange in this case.
 
-connection_up(#watchdog{peer = TPid} = Wd, #state{peerT = PeerT} = S) ->
+connection_up(#watchdog{peer = TPid} = Wd, #state{local = {PeerT, _, _}}
+                                           = S) ->
     connection_up([], Wd, fetch(PeerT, TPid), S).
 
 %% connection_up/4
@@ -932,23 +1096,23 @@
 connection_up(Extra,
              #watchdog{peer = TPid}
              = Wd,
             #peer{apps = SApps, caps = Caps}
             = Pr,
             #state{watchdogT = WatchdogT,
-                    local_peers = LDict,
+                    local = LT,
                    service_name = SvcName,
                    service = #diameter_service{applications = Apps}}
             = S) ->
-    insert(WatchdogT, Wd#watchdog{state = ?WD_OKAY}),
+    ets:insert(WatchdogT, Wd#watchdog{state = ?WD_OKAY}),
     diameter_traffic:peer_up(TPid),
-    insert_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict),
+    local_peer_up(SApps, {TPid, Caps}, {SvcName, Apps}, LT),
     report_status(up, Wd, Pr, S, Extra).
 
-insert_local_peer(SApps, T, LDict) ->
-    lists:foldl(fun(A,D) -> ilp(A, T, D) end, LDict, SApps).
+local_peer_up(SApps, {TPid, Caps} = TC, SA, LT) ->
+    insert_peer(TPid, [A || {_,A} <- SApps], Caps, LT),
+    lists:foreach(fun(A) -> peer_up(A, TC, SA) end, SApps).
 
-ilp({Id, Alias}, {TC, SA}, LDict) ->
-    init_conn(Id, Alias, TC, SA),
-    ?Dict:append(Alias, TC, LDict).
+peer_up({Id, Alias}, TC, SA) ->
+    peer_up(Id, Alias, TC, SA).
-init_conn(Id, Alias, {TPid, _} = TC, {SvcName, Apps}) ->
+peer_up(Id, Alias, {TPid, _} = TC, {SvcName, Apps}) ->
     #diameter_app{id = Id}  %% assert
         = App
         = find_app(Alias, Apps),
 
@@ -1026,8 +1190,11 @@ peer_cb(App, F, A) ->
         true
     catch
         E:R ->
-            diameter_lib:error_report({failure, {E, R, ?STACK}},
-                                      {App, F, A}),
+            %% Don't include arguments since a #diameter_caps{} contains
+            %% strings from the peer, which could be anything (especially,
+            %% large).
+            [Mod|X] = App#diameter_app.module,
+            ?LOG(failure, {E, R, Mod, F, diameter_lib:get_stacktrace()}),
+            error_report(failure, F, {Mod, F, A ++ X}),
             false
     end.
@@ -1043,17 +1210,17 @@ connection_down(#watchdog{state = ?WD_OKAY,
              = Pr,
             #state{service_name = SvcName,
                    service = #diameter_service{applications = Apps},
-                    local_peers = LDict}
+                    local = LT}
             = S) ->
     report_status(down, Wd, Pr, S, []),
-    remove_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict),
+    local_peer_down(SApps, {TPid, Caps}, {SvcName, Apps}, LT),
     diameter_traffic:peer_down(TPid);
 
 connection_down(#watchdog{state = ?WD_OKAY,
                           peer = TPid}
                 = Wd,
                 To,
-                #state{peerT = PeerT}
+                #state{local = {PeerT, _, _}}
                 = S)
   when is_atom(To) ->
     connection_down(Wd, #peer{} = fetch(PeerT, TPid), S);
@@ -1061,15 +1228,14 @@
 connection_down(#watchdog{}, _, _) ->
     ok.
 
-remove_local_peer(SApps, T, LDict) ->
-    lists:foldl(fun(A,D) -> rlp(A, T, D) end, LDict, SApps).
+local_peer_down(SApps, {TPid, _Caps} = TC, SA, LT) ->
+    delete_peer(TPid, LT),
+    lists:foreach(fun(A) -> peer_down(A, TC, SA) end, SApps).
 
-rlp({Id, Alias}, {TC, SA}, LDict) ->
-    L = ?Dict:fetch(Alias, LDict),
-    down_conn(Id, Alias, TC, SA),
-    ?Dict:store(Alias, lists:delete(TC, L), LDict).
+peer_down({Id, Alias}, TC, SA) ->
+    peer_down(Id, Alias, TC, SA).
 
-down_conn(Id, Alias, TC, {SvcName, Apps}) ->
+peer_down(Id, Alias, TC, {SvcName, Apps}) ->
     #diameter_app{id = Id}  %% assert
         = App
         = find_app(Alias, Apps),
 
@@ -1094,7 +1260,7 @@ wd_down(#watchdog{peer = B}, _)
     ok;
 
 %% ... or maybe it has.
-wd_down(#watchdog{peer = TPid} = Wd, #state{peerT = PeerT} = S) ->
+wd_down(#watchdog{peer = TPid} = Wd, #state{local = {PeerT, _, _}} = S) ->
     connection_down(Wd, ?WD_DOWN, S),
     ets:delete(PeerT, TPid).
 
@@ -1142,15 +1308,22 @@ q_restart(false, _) ->
 %% communicate.
 
 default_tc(connect, Opts) ->
-    proplists:get_value(reconnect_timer, Opts, ?DEFAULT_TC);
+    connect_timer(Opts, ?DEFAULT_TC);
 default_tc(accept, _) ->
     0.
 
+%% Accept both connect_timer and the (older) reconnect_timer, the
+%% latter being a remnant from a time in which the timer did apply to
+%% reconnect attempts.
+connect_timer(Opts, Def0) ->
+    Def = proplists:get_value(reconnect_timer, Opts, Def0),
+    proplists:get_value(connect_timer, Opts, Def).
+
 %% Bound tc below if the watchdog was restarted recently to avoid
 %% continuous restarts in case of faulty config or other problems.
 tc(Time, Tc) ->
     choose(Tc > ?RESTART_TC
-             orelse timer:now_diff(now(), Time) > 1000*?RESTART_TC,
+             orelse diameter_lib:micro_diff(Time) > 1000*?RESTART_TC,
           Tc,
           ?RESTART_TC).
@@ -1164,8 +1337,7 @@ start_tc(Tc, T, _) ->
 
 tc_timeout({Ref, _Type, _Opts} = T, #state{service_name = SvcName} = S) ->
     tc(diameter_config:have_transport(SvcName, Ref), T, S).
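
Working through connect_timer/2 above: connect_timer takes precedence over the older reconnect_timer, with the passed-in default used only if neither option is present. Expected results when called within the module (an illustrative check, not part of the commit):

    %% Illustrative only: the precedence implied by connect_timer/2.
    connect_timer_examples() ->
        30000 = connect_timer([], 30000),
        10000 = connect_timer([{reconnect_timer, 10000}], 30000),
        5000  = connect_timer([{connect_timer, 5000},
                               {reconnect_timer, 10000}], 30000),
        ok.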
-tc(true, {Ref, Type, Opts}, #state{service_name = SvcName}
-         = S) ->
+tc(true, {Ref, Type, Opts}, #state{service_name = SvcName} = S) ->
     send_event(SvcName, {reconnect, Ref, Opts}),
     start(Ref, Type, Opts, S);
 tc(false = No, _, _) ->  %% removed
@@ -1180,29 +1352,19 @@ tc(false = No, _, _) ->  %% removed
 %% another watchdog to be able to detect that it should transition
 %% from initial into reopen rather than okay. That someone is either
 %% the accepting watchdog upon reception of a CER from the previously
-%% connected peer, or us after reconnect_timer timeout.
+%% connected peer, or us after connect_timer timeout or immediately.
 
-close(#watchdog{type = connect}, _) ->
+close(#watchdog{type = connect}) ->
     ok;
+
 close(#watchdog{type = accept,
                 pid = Pid,
-                ref = Ref,
-                options = Opts},
-      #state{service_name = SvcName}) ->
-    c(Pid, diameter_config:have_transport(SvcName, Ref), Opts).
-
-%% Tell watchdog to (maybe) die later ...
-c(Pid, true, Opts) ->
-    Tc = proplists:get_value(reconnect_timer, Opts, 2*?DEFAULT_TC),
-    erlang:send_after(Tc, Pid, close);
-
-%% ... or now.
-c(Pid, false, _Opts) ->
-    Pid ! close.
-
-%% The RFC's only document the behaviour of Tc, our reconnect_timer,
+                options = Opts}) ->
+    Tc = connect_timer(Opts, 2*?DEFAULT_TC),
+    erlang:send_after(Tc, Pid, close).
+%% The RFCs only document the behaviour of Tc, our connect_timer,
 %% for the establishment of connections but we also give
-%% reconnect_timer semantics for a listener, being the time within
+%% connect_timer semantics for a listener, being the time within
 %% which a new connection attempt is expected of a connecting peer.
 %% The value should be greater than the peer's Tc + jitter.
@@ -1253,13 +1415,14 @@ cm([#diameter_app{alias = Alias} = App], Req, From, Svc) ->
             mod_state(Alias, ModS),
             {T, RC};
         T ->
-            diameter_lib:error_report({invalid, T},
-                                      {App, handle_call, Args}),
+            ModX = App#diameter_app.module,
+            ?LOG(invalid_return, {ModX, handle_call, Args, T}),
             invalid
     catch
         E: Reason ->
-            diameter_lib:error_report({failure, {E, Reason, ?STACK}},
-                                      {App, handle_call, Args}),
+            ModX = App#diameter_app.module,
+            Stack = diameter_lib:get_stacktrace(),
+            ?LOG(failure, {E, Reason, ModX, handle_call, Stack}),
             failure
     end;
@@ -1304,19 +1467,37 @@ share_peer(up, Caps, Apps, TPid, #state{options = [_, {_,T} | _],
                                         service_name = Svc}) ->
     notify(T, Svc, {peer, TPid, [A || {_,A} <- Apps], Caps});
 
-share_peer(_, _, _, _, _) ->
-    ok.
+share_peer(down, _Caps, _Apps, TPid, #state{options = [_, {_,T} | _],
+                                            service_name = Svc}) ->
+    notify(T, Svc, {peer, TPid}).
 
 %% ---------------------------------------------------------------------------
 %% # share_peers/2
 %% ---------------------------------------------------------------------------
 
-share_peers(Pid, #state{options = [_, {_,T} | _], local_peers = PDict}) ->
-    is_remote(Pid, T)
-        andalso ?Dict:fold(fun(A,Ps,ok) -> sp(Pid, A, Ps), ok end, ok, PDict).
-
-sp(Pid, Alias, Peers) ->
-    lists:foreach(fun({P,C}) -> Pid ! {peer, P, [Alias], C} end, Peers).
+share_peers(Pid, #state{options = [_, {_,SP} | _],
+                        local = {PeerT, AppT, _}}) ->
+    is_remote(Pid, SP)
+        andalso ets:foldl(fun(T, N) -> N + sp(Pid, AppT, T) end,
+                          0,
+                          PeerT).
+
+%% An entry in the peer table doesn't mean the watchdog state is OKAY,
+%% an entry in the app table does.
+
+sp(Pid, AppT, #peer{pid = TPid,
+                    apps = [{_, Alias} | _] = Apps,
+                    caps = Caps}) ->
+    Spec = [{{'$1', TPid},
+             [{'==', '$1', {const, Alias}}],
+             ['$_']}],
+    case ets:select(AppT, Spec, 1) of
+        '$end_of_table' ->
+            0;
+        _ ->
+            Pid ! {peer, TPid, [A || {_,A} <- Apps], Caps},
+            1
+    end.
 
 is_remote(Pid, T) ->
     Node = node(Pid),
@@ -1326,32 +1507,49 @@ is_remote(Pid, T) ->
 %% # remote_peer_up/4
 %% ---------------------------------------------------------------------------
 
-remote_peer_up(Pid, Aliases, Caps, #state{options = [_, _, {_,T} | _]} = S) ->
-    is_remote(Pid, T)
-        andalso rpu(Pid, Aliases, Caps, S).
+remote_peer_up(TPid, Aliases, Caps, #state{options = [_, _, {_,T} | _]} = S) ->
+    is_remote(TPid, T) andalso rpu(TPid, Aliases, Caps, S).
 
-rpu(Pid, Aliases, Caps, #state{service = Svc, shared_peers = PDict}) ->
+rpu(TPid, Aliases, Caps, #state{service = Svc, remote = RT}) ->
     #diameter_service{applications = Apps} = Svc,
     Key = #diameter_app.alias,
     F = fun(A) -> lists:keymember(A, Key, Apps) end,
-    rpu(Pid, lists:filter(F, Aliases), Caps, PDict);
+    rpu(TPid, lists:filter(F, Aliases), Caps, RT);
 
 rpu(_, [] = No, _, _) ->
     No;
-rpu(Pid, Aliases, Caps, PDict) ->
-    erlang:monitor(process, Pid),
-    T = {Pid, Caps},
-    lists:foreach(fun(A) -> ?Dict:append(A, T, PDict) end, Aliases).
+rpu(TPid, Aliases, Caps, {PeerT, _, _} = RT) ->
+    monitor(process, TPid),
+    ets:insert(PeerT, #peer{pid = TPid,
+                            apps = Aliases,
+                            caps = Caps}),
+    insert_peer(TPid, Aliases, Caps, RT).
+
+%% insert_peer/4
+
+insert_peer(TPid, Aliases, Caps, {_PeerT, AppT, IdentT}) ->
+    #diameter_caps{origin_host = {_, OH},
+                   origin_realm = {_, OR}}
+        = Caps,
+    ets:insert(AppT, [{A, TPid} || A <- Aliases]),
+    H = iolist_to_binary(OH),
+    R = iolist_to_binary(OR),
+    ets:insert(IdentT, [{T, A, TPid} || T <- [{host, H}, {realm, R}, {R, H}],
+                                        A <- Aliases]).
 
 %% ---------------------------------------------------------------------------
 %% # remote_peer_down/2
 %% ---------------------------------------------------------------------------
 
-remote_peer_down(Pid, #state{shared_peers = PDict}) ->
-    lists:foreach(fun(A) -> rpd(Pid, A, PDict) end, ?Dict:fetch_keys(PDict)).
+remote_peer_down(TPid, #state{remote = {PeerT, _, _} = RT}) ->
+    ets:delete(PeerT, TPid),
+    delete_peer(TPid, RT).
 
-rpd(Pid, Alias, PDict) ->
-    ?Dict:update(Alias, fun(Ps) -> lists:keydelete(Pid, 1, Ps) end, PDict).
+%% delete_peer/2
+
+delete_peer(TPid, {_PeerT, AppT, IdentT}) ->
+    ets:select_delete(AppT, [{{'_', TPid}, [], [true]}]),
+    ets:select_delete(IdentT, [{{'_', '_', TPid}, [], [true]}]).
 
 %% ---------------------------------------------------------------------------
 %% pick_peer/4
@@ -1361,12 +1559,12 @@
 pick_peer(#diameter_app{alias = Alias}
          = App,
         RealmAndHost,
        Filter,
-          #state{local_peers = L,
-                 shared_peers = S,
+          #state{local = LT,
+                 remote = RT,
                service_name = SvcName,
                service = #diameter_service{pid = Pid}}) ->
-    pick_peer(peers(Alias, RealmAndHost, Filter, L),
-              peers(Alias, RealmAndHost, Filter, S),
+    pick_peer(peers(Alias, RealmAndHost, Filter, LT),
+              peers(Alias, RealmAndHost, Filter, RT),
            Pid,
           SvcName,
          App).
@@ -1382,6 +1580,8 @@ pick_peer(Local, Remote, Pid, _SvcName, #diameter_app{mutable = true} = App)
     case call_service(Pid, {pick_peer, Local, Remote, App}) of
         {TPid, _} = T when is_pid(TPid) ->
            T;
+        false = No ->
+            No;
        {error, _} ->
           false
   end;
@@ -1415,56 +1615,211 @@ pick_peer(Local,
            T;                  %% Accept returned state in the immutable
       {false = No, S} ->       %% case as long as it isn't changed.
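
With the identity bag populated by insert_peer/4 above, a realm lookup for one application reduces to an ets:select/2 with the same match specification used by filter/5 below. A hedged standalone sketch (the function name, Alias and Realm are illustrative):

    %% Illustrative only: pids of peers advertising Realm for Alias.
    %% Rows in IdentT are {{realm, R}, Alias, TPid}.
    pids_for_realm({_PeerT, _AppT, IdentT}, Alias, Realm) ->
        R = iolist_to_binary(Realm),
        ets:select(IdentT, [{{{realm, R}, '$1', '$2'},
                             [{'==', '$1', {const, Alias}}],
                             ['$2']}]).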
            No;
-        T ->
-            diameter_lib:error_report({invalid, T, App},
-                                      {App, pick_peer, Args})
+        T when M ->
+            ModX = App#diameter_app.module,
+            ?LOG(invalid_return, {ModX, pick_peer, T}),
+            false
     catch
-        E: Reason ->
-            diameter_lib:error_report({failure, {E, Reason, ?STACK}},
-                                      {App, pick_peer, Args})
+        E: Reason when M ->
+            ModX = App#diameter_app.module,
+            Stack = diameter_lib:get_stacktrace(),
+            ?LOG(failure, {E, Reason, ModX, pick_peer, Stack}),
+            false
     end.
 
 %% peers/4
 
-peers(Alias, RH, Filter, Peers) ->
-    case ?Dict:find(Alias, Peers) of
-        {ok, L} ->
-            ps(L, RH, Filter, {[],[]});
-        error ->
-            []
-    end.
+peers(Alias, RH, Filter, T) ->
+    filter(Alias, RH, Filter, T, true).
+
+%% filter/5
+%%
+%% Try to limit the peers list by starting with a host/realm lookup.
+
+filter(Alias, RH, {neg, F}, T, B) ->
+    filter(Alias, RH, F, T, not B);
+
+filter(_, _, none, _, false) ->
+    [];
+
+filter(Alias, _, none, T, true) ->
+    all_peers(Alias, T);
+
+filter(Alias, [DR,DH] = RH, K, T, B)
+  when K == realm, DR == undefined;
+       K == host, DH == undefined ->
+    filter(Alias, RH, none, T, B);
+
+filter(Alias, [DR,_] = RH, realm = K, T, B) ->
+    filter(Alias, RH, {K, DR}, T, B);
+
+filter(Alias, [_,DH] = RH, host = K, T, B) ->
+    filter(Alias, RH, {K, DH}, T, B);
+
+filter(Alias, _, {K, D}, {PeerT, _AppT, IdentT}, true)
+  when K == host;
+       K == realm ->
+    try iolist_to_binary(D) of
+        B ->
+            caps(PeerT, ets:select(IdentT, [{{{K, B}, '$1', '$2'},
+                                             [{'==', '$1', {const, Alias}}],
+                                             ['$2']}]))
+    catch
+        error:_ ->
+            []
+    end;
 
-%% Place a peer whose Destination-Host/Realm matches those of the
-%% request at the front of the result list. Could add some sort of
-%% 'sort' option to allow more control.
-
-ps([], _, _, {Ys, Ns}) ->
-    lists:reverse(Ys, Ns);
-ps([{_TPid, #diameter_caps{} = Caps} = TC | Rest], RH, Filter, Acc) ->
-    ps(Rest, RH, Filter, pacc(caps_filter(Caps, RH, Filter),
-                              caps_filter(Caps, RH, {all, [host, realm]}),
-                              TC,
-                              Acc)).
-
-pacc(true, true, Peer, {Ts, Fs}) ->
-    {[Peer|Ts], Fs};
-pacc(true, false, Peer, {Ts, Fs}) ->
-    {Ts, [Peer|Fs]};
-pacc(_, _, _, Acc) ->
-    Acc.
+filter(Alias, RH, {all, Filters}, T, B)
+  when is_list(Filters) ->
+    fltr_all(Alias, RH, Filters, T, B);
+
+filter(Alias, RH, {first, Filters}, T, B)
+  when is_list(Filters) ->
+    fltr_first(Alias, RH, Filters, T, B);
+
+filter(Alias, RH, Filter, T, B) ->
+    {Ts, Fs} = filter(all_peers(Alias, T), RH, Filter),
+    choose(B, Ts, Fs).
+
+%% fltr_all/5
+
+fltr_all(Alias, RH, [{K, any} | Filters], T, B)
+  when K == host;
+       K == realm ->
+    fltr_all(Alias, RH, Filters, T, B);
+
+fltr_all(Alias, RH, [{host, _} = H, {realm, _} = R | Filters], T, B) ->
+    fltr_all(Alias, RH, [R, H | Filters], T, B);
+
+fltr_all(Alias, RH, [{realm, _} = R, {host, any} | Filters], T, B) ->
+    fltr_all(Alias, RH, [R | Filters], T, B);
+
+fltr_all(Alias, RH, [{realm, OR}, {host, OH} | Filters], T, true) ->
+    {PeerT, _AppT, IdentT} = T,
+    try {iolist_to_binary(OR), iolist_to_binary(OH)} of
+        BT ->
+            Peers = caps(PeerT,
+                         ets:select(IdentT, [{{BT, '$1', '$2'},
+                                              [{'==', '$1', {const, Alias}}],
+                                              ['$2']}])),
+            {Ts, _} = filter(Peers, RH, {all, Filters}),
+            Ts
+    catch
+        error:_ ->
+            []
+    end;
 
-%% caps_filter/3
+fltr_all(Alias, [undefined,_] = RH, [realm | Filters], T, B) ->
+    fltr_all(Alias, RH, Filters, T, B);
+
+fltr_all(Alias, [DR,_] = RH, [realm | Filters], T, B) ->
+    fltr_all(Alias, RH, [{realm, DR} | Filters], T, B);
+
+fltr_all(Alias, [_,undefined] = RH, [host | Filters], T, B) ->
+    fltr_all(Alias, RH, Filters, T, B);
+
+fltr_all(Alias, [_,DH] = RH, [host | Filters], T, B) ->
+    fltr_all(Alias, RH, [{host, DH} | Filters], T, B);
+
+fltr_all(Alias, RH, [{K, _} = KT, KA | Filters], T, B)
+  when K == host, KA == realm;
+       K == realm, KA == host ->
+    fltr_all(Alias, RH, [KA, KT | Filters], T, B);
+
+fltr_all(Alias, RH, [F | Filters], T, B) ->
+    {Ts, Fs} = filter(filter(Alias, RH, F, T, B), RH, {all, Filters}),
+    choose(B, Ts, Fs);
+
+fltr_all(Alias, RH, [], T, B) ->
+    filter(Alias, RH, none, T, B).
 
-caps_filter(C, RH, {neg, F}) ->
-    not caps_filter(C, RH, F);
+%% fltr_first/5
+%%
+%% Like any, but stop at the first filter with any matches.
+
+fltr_first(Alias, RH, [F | Filters], T, B) ->
+    case filter(Alias, RH, F, T, B) of
+        [] ->
+            fltr_first(Alias, RH, Filters, T, B);
+        [_|_] = Ts ->
+            Ts
+    end;
+
+fltr_first(Alias, RH, [], T, B) ->
+    filter(Alias, RH, none, T, not B).
+
+%% all_peers/2
+
+all_peers(Alias, {PeerT, AppT, _}) ->
+    ets:select(PeerT, [{#peer{pid = P, caps = '$1', _ = '_'},
+                        [],
+                        [{{P, '$1'}}]}
+                       || {_,P} <- ets:lookup(AppT, Alias)]).
+
+%% caps/2
+
+caps(PeerT, Pids) ->
+    ets:select(PeerT, [{#peer{pid = P, caps = '$1', _ = '_'},
+                        [],
+                        [{{P, '$1'}}]}
+                       || P <- Pids]).
+
+%% filter/3
+%%
+%% Return peers in match order.
+
+filter(Peers, _, none) ->
+    {Peers, []};
+
+filter(Peers, RH, {neg, F}) ->
+    {Ts, Fs} = filter(Peers, RH, F),
+    {Fs, Ts};
 
-caps_filter(C, RH, {all, L})
+filter(Peers, RH, {all, L})
   when is_list(L) ->
-    lists:all(fun(F) -> caps_filter(C, RH, F) end, L);
+    lists:foldl(fun(F,A) -> fltr_all(F, A, RH) end,
+                {Peers, []},
+                L);
 
-caps_filter(C, RH, {any, L})
+filter(Peers, RH, {any, L})
   when is_list(L) ->
-    lists:any(fun(F) -> caps_filter(C, RH, F) end, L);
+    lists:foldl(fun(F,A) -> fltr_any(F, A, RH) end,
+                {[], Peers},
+                L);
+
+filter(Peers, RH, {first, L})
+  when is_list(L) ->
+    fltr_first(Peers, RH, L);
+
+filter(Peers, RH, F) ->
+    lists:partition(fun({_,C}) -> caps_filter(C, RH, F) end, Peers).
+
+%% fltr_all/3
+
+fltr_all(F, {Ts0, Fs0}, RH) ->
+    {Ts1, Fs1} = filter(Ts0, RH, F),
+    {Ts1, Fs0 ++ Fs1}.
+
+%% fltr_any/3
+
+fltr_any(F, {Ts0, Fs0}, RH) ->
+    {Ts1, Fs1} = filter(Fs0, RH, F),
+    {Ts0 ++ Ts1, Fs1}.
+
+%% fltr_first/3
+
+fltr_first(Peers, _, []) ->
+    {[], Peers};
+
+fltr_first(Peers, RH, [F | Filters]) ->
+    case filter(Peers, RH, F) of
+        {[], _} ->
+            fltr_first(Peers, RH, Filters);
+        {_, _} = T ->
+            T
+    end.
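
Because filter/3 above returns a {Matching, NonMatching} pair, the combinators are cheap: {neg, F} swaps the pair, {all, L} refilters the matching list, and {any, L} refilters the non-matching one. A minimal standalone model of that threading, with plain predicates standing in for capability filters (names illustrative):

    %% Illustrative only: the partition-pair threading behind neg/all.
    combine(Xs, {neg, F}) ->
        {Ts, Fs} = combine(Xs, F),
        {Fs, Ts};
    combine(Xs, {all, [F | L]}) ->
        {Ts, Fs} = combine(Xs, F),
        {Ts1, Fs1} = combine(Ts, {all, L}),  %% narrow the matches
        {Ts1, Fs ++ Fs1};
    combine(Xs, {all, []}) ->
        {Xs, []};
    combine(Xs, F) when is_function(F, 1) ->
        lists:partition(F, Xs).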
+
+%% caps_filter/3
 
 caps_filter(#diameter_caps{origin_host = {_,OH}}, [_,DH], host) ->
     eq(undefined, DH, OH);
@@ -1477,9 +1832,6 @@ caps_filter(C, _, Filter) ->
 
 %% caps_filter/2
 
-caps_filter(_, none) ->
-    true;
-
 caps_filter(#diameter_caps{origin_host = {_,OH}}, {host, H}) ->
     eq(any, H, OH);
@@ -1510,8 +1862,8 @@ eq(Any, Id, PeerId) ->
 
 transports(#state{watchdogT = WatchdogT}) ->
     ets:select(WatchdogT, [{#watchdog{peer = '$1', _ = '_'},
-                           [{'is_pid', '$1'}],
-                           ['$1']}]).
+                            [{'is_pid', '$1'}],
+                            ['$1']}]).
 
 %% ---------------------------------------------------------------------------
 %% # service_info/2
 %% ---------------------------------------------------------------------------
@@ -1542,7 +1894,8 @@
 -define(OTHER_INFO, [connections,
                      name,
                      peers,
-                     statistics]).
+                     statistics,
+                     info]).
 
 service_info(Item, S)
   when is_atom(Item) ->
@@ -1563,7 +1916,7 @@ tagged_info(Item, S)
             undefined
     end;
 
-tagged_info(TPid, #state{watchdogT = WatchdogT, peerT = PeerT})
+tagged_info(TPid, #state{watchdogT = WatchdogT, local = {PeerT, _, _}})
   when is_pid(TPid) ->
     try
        [#peer{watchdog = Pid}] = ets:lookup(PeerT, TPid),
@@ -1632,6 +1985,7 @@ complete_info(Item, #state{service = Svc} = S) ->
             keys -> ?ALL_INFO ++ ?CAP_INFO ++ ?OTHER_INFO;
             all -> service_info(?ALL_INFO, S);
             statistics -> info_stats(S);
+            info -> info_info(S);
             connections -> info_connections(S);
             peers -> info_peers(S)
        end.
@@ -1678,31 +2032,43 @@ info_transport(S) ->
              [],
              PeerD).
 
-%% Only a config entry for a listening transport: use it.
-transport([[{type, listen}, _] = L]) ->
-    L ++ [{accept, []}];
-
-%% Only one config or peer entry for a connecting transport: use it.
-transport([[{type, connect} | _] = L]) ->
-    L;
+%% Single config entry. Distinguish between pool_size config or not on
+%% a connecting transport for backwards compatibility: with the option
+%% the form is similar to the listening case, with connections grouped
+%% in a pool tuple (for lack of a better name), without as before.
+transport([[{type, Type}, {options, Opts}] = L])
+  when Type == listen;
+       Type == connect ->
+    L ++ [{K, []} || [{_,K}] <- [keys(Type, Opts)]];
 
 %% Peer entries: discard config. Note that the peer entries have
 %% length at least 3.
 transport([[_,_] | L]) ->
     transport(L);
 
-%% Possibly many peer entries for a listening transport. Note that all
-%% have the same options by construction, which is not terribly space
-%% efficient.
-transport([[{type, accept}, {options, Opts} | _] | _] = Ls) ->
-    [{type, listen},
+%% Multiple transports. Note that all have the same options by
+%% construction, which is not terribly space efficient.
+transport([[{type, Type}, {options, Opts} | _] | _] = Ls) ->
+    transport(keys(Type, Opts), Ls).
+
+%% Group transports in an accept or pool tuple ...
+transport([{Type, Key}], [[{type, _}, {options, Opts} | _] | _] = Ls) ->
+    [{type, Type},
     {options, Opts},
-     {accept, [lists:nthtail(2,L) || L <- Ls]}].
+     {Key, [tl(tl(L)) || L <- Ls]}];
+
+%% ... or not: there can only be one.
+transport([], [L]) ->
+    L.
 
-peer_dict(#state{watchdogT = WatchdogT, peerT = PeerT}, Dict0) ->
+keys(connect = T, Opts) ->
+    [{T, pool} || lists:keymember(pool_size, 1, Opts)];
+keys(_, _) ->
+    [{listen, accept}].
+
+peer_dict(#state{watchdogT = WatchdogT, local = {PeerT, _, _}}, Dict0) ->
     try ets:tab2list(WatchdogT) of
-        L ->
-            lists:foldl(fun(T,A) -> peer_acc(PeerT, A, T) end, Dict0, L)
+        L -> lists:foldl(fun(T,A) -> peer_acc(PeerT, A, T) end, Dict0, L)
    catch
        error: badarg -> Dict0  %% service has gone down
    end.
@@ -1714,12 +2080,11 @@ peer_acc(PeerT, Acc, #watchdog{pid = Pid,
                                state = WS,
                                started = At,
                                peer = TPid}) ->
-    dict:append(Ref,
-                [{type, Type},
-                 {options, Opts},
-                 {watchdog, {Pid, At, WS}}
-                 | info_peer(PeerT, TPid, WS)],
-                Acc).
+    Info = [{type, Type},
+            {options, Opts},
+            {watchdog, {Pid, At, WS}}
+            | info_peer(PeerT, TPid, WS)],
+    dict:append(Ref, Info ++ [{info, info_process_info(Info)}], Acc).
 
 info_peer(PeerT, TPid, WS)
   when is_pid(TPid), WS /= ?WD_DOWN ->
@@ -1731,6 +2096,49 @@
 info_peer(_, _, _) ->
     [].
 
+info_process_info(Info) ->
+    lists:flatmap(fun ipi/1, Info).
+
+ipi({watchdog, {Pid, _, _}}) ->
+    info_pid(Pid);
+
+ipi({peer, {Pid, _}}) ->
+    info_pid(Pid);
+
+ipi({port, [{owner, Pid} | _]}) ->
+    info_pid(Pid);
+
+ipi(_) ->
+    [].
+
+info_pid(Pid) ->
+    case process_info(Pid, [message_queue_len, memory, binary]) of
+        undefined ->
+            [];
+        L ->
+            [{Pid, lists:map(fun({K,V}) -> {K, map_info(K,V)} end, L)}]
+    end.
+
+%% The binary list consists of 3-tuples {Ptr, Size, Count}, where Ptr
+%% is a C pointer value, Size is the size of a referenced binary in
+%% bytes, and Count is a global reference count. The same Ptr can
+%% occur multiple times, once for each reference on the process heap.
+%% In this case, the corresponding tuples will have Size in common but
+%% Count may differ just because no global lock is taken when the
+%% value is retrieved.
+%%
+%% The list can be quite large, and we aren't often interested in the
+%% pointers or counts, so whittle this down to the number of binaries
+%% referenced and their total byte count.
+map_info(binary, L) ->
+    SzD = lists:foldl(fun({P,S,_}, D) -> dict:store(P,S,D) end,
+                      dict:new(),
+                      L),
+    {dict:size(SzD), dict:fold(fun(_,S,N) -> S + N end, 0, SzD)};
+
+map_info(_, T) ->
+    T.
 
 %% The point of extracting the config here is so that 'transport' info
 %% has one entry for each transport ref, the peer table only
 %% containing entries that have a living watchdog.
@@ -1788,6 +2196,13 @@ mk_app(#diameter_app{} = A) ->
 
 info_pending(#state{} = S) ->
     diameter_traffic:pending(transports(S)).
 
+%% info_info/1
+%%
+%% Extract process_info from connections info.
+
+info_info(S) ->
+    [I || L <- conn_list(S), {info, I} <- L].
+
 %% info_connections/1
 %%
 %% One entry per transport connection. Statistics for each entry are
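
The same {Ptr, Size, Count} reduction performed by map_info(binary, L) above can be reproduced for any process; a standalone sketch using maps rather than dict (the function name is illustrative):

    %% Illustrative only: {NumberOfBinaries, TotalBytes} referenced by Pid,
    %% deduplicated on the Ptr element just as map_info/2 does.
    binary_summary(Pid) ->
        case erlang:process_info(Pid, binary) of
            {binary, L} ->
                SzM = lists:foldl(fun({P,S,_}, M) -> M#{P => S} end, #{}, L),
                {maps:size(SzM), lists:sum(maps:values(SzM))};
            undefined ->
                undefined
        end.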