Diffstat (limited to 'lib/diameter/src/base/diameter_service.erl')
-rw-r--r-- lib/diameter/src/base/diameter_service.erl | 955
1 file changed, 685 insertions(+), 270 deletions(-)
diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl
index 9dd8aafc61..ccf68f4d93 100644
--- a/lib/diameter/src/base/diameter_service.erl
+++ b/lib/diameter/src/base/diameter_service.erl
@@ -1,18 +1,19 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2013. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2016. All Rights Reserved.
%%
-%% The contents of this file are subject to the Erlang Public License,
-%% Version 1.1, (the "License"); you may not use this file except in
-%% compliance with the License. You should have received a copy of the
-%% Erlang Public License along with this software. If not, it can be
-%% retrieved online at http://www.erlang.org/.
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and limitations
-%% under the License.
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
%%
%% %CopyrightEnd%
%%
@@ -31,6 +32,7 @@
-export([subscribe/1,
unsubscribe/1,
services/0,
+ peer_info/1,
info/2]).
%% towards diameter_config
@@ -82,16 +84,8 @@
-define(DEFAULT_TC, 30000). %% RFC 3588 ch 2.1
-define(RESTART_TC, 1000). %% if restart was this recent
-%% Used to be able to swap this with anything else dict-like but now
-%% rely on the fact that a service's #state{} record does not change
-%% in storing in it ?STATE table and not always going through the
-%% service process. In particular, rely on the fact that operations on
-%% a ?Dict don't change the handle to it.
--define(Dict, diameter_dict).
-
-%% Maintains state in a table. In contrast to previously, a service's
-%% stat is not constant and is accessed outside of the service
-%% process.
+%% Maintain state in a table since a service's state is accessed
+%% outside of the service process.
-define(STATE_TABLE, ?MODULE).
%% The default sequence mask.
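
Since the state lives in a named ets table keyed on the service name, any
process can read it without calling the service process; lookup_state/1
below does exactly that. A minimal sketch of the pattern, with peek/1 a
hypothetical helper rather than part of the module:

    peek(SvcName) ->
        case ets:lookup(diameter_service, SvcName) of  %% ?STATE_TABLE
            [State] -> {ok, State};                    %% #state{} record
            []      -> undefined                       %% no such service
        end.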
@@ -111,45 +105,49 @@
%% to determine whether or not we need to call the process for a
%% pick_peer callback in the stateful case.
-record(state,
- {id = now(),
+ {id = diameter_lib:now(),
service_name :: diameter:service_name(), %% key in ?STATE_TABLE
service :: #diameter_service{},
watchdogT = ets_new(watchdogs) %% #watchdog{} at start
:: ets:tid(),
- peerT = ets_new(peers) %% #peer{pid = TPid} at okay/reopen
- :: ets:tid(),
- shared_peers = ?Dict:new() %% Alias -> [{TPid, Caps}, ...]
- :: ets:tid(),
- local_peers = ?Dict:new() %% Alias -> [{TPid, Caps}, ...]
- :: ets:tid(),
+ peerT, %% undefined in new code, but remain for upgrade
+ shared_peers, %% reasons. Replaced by local/remote.
+ local_peers, %%
+ local :: {ets:tid(), ets:tid(), ets:tid()},
+ remote :: {ets:tid(), ets:tid(), ets:tid()},
monitor = false :: false | pid(), %% process to die with
options
:: [{sequence, diameter:sequence()} %% sequence mask
| {share_peers, diameter:remotes()} %% broadcast to
| {use_shared_peers, diameter:remotes()} %% use from
- | {restrict_connections, diameter:restriction()}]}).
+ | {restrict_connections, diameter:restriction()}
+ | {strict_mbit, boolean()}
+ | {string_decode, boolean()}
+ | {incoming_maxlen, diameter:message_length()}]}).
%% shared_peers reflects the peers broadcast from remote nodes.
%% Record representing an RFC 3539 watchdog process implemented by
%% diameter_watchdog.
-record(watchdog,
- {pid :: match(pid()),
+ {pid :: match(pid()) | undefined,
type :: match(connect | accept),
ref :: match(reference()), %% key into diameter_config
options :: match([diameter:transport_opt()]),%% from start_transport
state = ?WD_INITIAL :: match(wd_state()),
- started = now(), %% at process start
+ started = diameter_lib:now(),%% at process start
peer = false :: match(boolean() | pid())}).
- %% true at accepted, pid() at okay/reopen
+ %% true at accepted/remove, pid() at okay/reopen
-%% Record representing an Peer State Machine processes implemented by
+%% Record representing a Peer State Machine process implemented by
%% diameter_peer_fsm.
-record(peer,
- {pid :: pid(),
- apps :: [{0..16#FFFFFFFF, diameter:app_alias()}], %% {Id, Alias}
- caps :: #diameter_caps{},
- started = now(), %% at process start
- watchdog :: pid()}). %% key into watchdogT
+ {pid :: pid(),
+ apps :: match([{0..16#FFFFFFFF, diameter:app_alias()}] %% {Id, Alias}
+ | [diameter:app_alias()]), %% remote
+ caps :: match(#diameter_caps{}),
+ started = diameter_lib:now(), %% at process start or sharing
+ watchdog :: match(pid() %% key into watchdogT
+ | undefined)}). %% undefined if remote
%% ---------------------------------------------------------------------------
%% # start/1
@@ -177,7 +175,7 @@ stop(SvcName) ->
end.
stop(ok, Pid) ->
- MRef = erlang:monitor(process, Pid),
+ MRef = monitor(process, Pid),
receive {'DOWN', MRef, process, _, _} -> ok end;
stop(No, _) ->
No.
@@ -204,7 +202,7 @@ stop_transport(SvcName, [_|_] = Refs) ->
info(SvcName, Item) ->
case lookup_state(SvcName) of
- [#state{} = S] ->
+ [S] ->
service_info(Item, S);
[] ->
undefined
@@ -213,7 +211,35 @@ info(SvcName, Item) ->
%% lookup_state/1
lookup_state(SvcName) ->
- ets:lookup(?STATE_TABLE, SvcName).
+ case ets:lookup(?STATE_TABLE, SvcName) of
+ [#state{}] = L ->
+ L;
+ _ ->
+ []
+ end.
+
+%% ---------------------------------------------------------------------------
+%% # peer_info/1
+%% ---------------------------------------------------------------------------
+
+%% An extended version of info_peer/1 for peer_info/1.
+peer_info(Pid) ->
+ try
+ {_, PD} = process_info(Pid, dictionary),
+ {_, T} = lists:keyfind({diameter_peer_fsm, start}, 1, PD),
+ {TPid, {{Type, Ref}, TMod, Cfg}} = T,
+ {_, TD} = process_info(TPid, dictionary),
+ {_, Data} = lists:keyfind({TMod, info}, 1, TD),
+ [{ref, Ref},
+ {type, Type},
+ {owner, TPid},
+ {module, TMod},
+ {config, Cfg}
+ | try TMod:info(Data) catch _:_ -> [] end]
+ catch
+ error:_ ->
+ []
+ end.
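
peer_info/1 works by reading the process dictionaries of the peer fsm and
its transport process, where diameter_peer_fsm and the transport module
stash startup information. The underlying pattern, sketched as a
hypothetical helper (the keys are whatever the target process stored):

    pd_value(Pid, Key) ->
        case process_info(Pid, dictionary) of
            {dictionary, D} -> lists:keyfind(Key, 1, D);
            undefined       -> false                   %% process died
        end.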
%% ---------------------------------------------------------------------------
%% # subscribe/1
@@ -224,7 +250,7 @@ subscribe(SvcName) ->
diameter_reg:add({?MODULE, subscriber, SvcName}).
unsubscribe(SvcName) ->
- diameter_reg:del({?MODULE, subscriber, SvcName}).
+ diameter_reg:remove({?MODULE, subscriber, SvcName}).
subscriptions(Pat) ->
pmap(diameter_reg:match({?MODULE, subscriber, Pat})).
@@ -258,16 +284,22 @@ whois(SvcName) ->
%% ---------------------------------------------------------------------------
-spec pick_peer(SvcName, AppOrAlias, Opts)
- -> {{TPid, Caps, App}, Mask}
- | false
- | {error, term()}
+ -> {{TPid, Caps, App}, Mask, SvcOpts}
+ | false %% no selection
+ | {error, no_service}
when SvcName :: diameter:service_name(),
- AppOrAlias :: {alias, diameter:app_alias()} | #diameter_app{},
- Opts :: tuple(),
+ AppOrAlias :: #diameter_app{}
+ | {alias, diameter:app_alias()},
+ Opts :: {fun((Dict :: module()) -> [term()]),
+ diameter:peer_filter(),
+ Xtra :: list()},
TPid :: pid(),
Caps :: #diameter_caps{},
App :: #diameter_app{},
- Mask :: diameter:sequence().
+ Mask :: diameter:sequence(),
+ SvcOpts :: [diameter:service_opt()].
+%% Extract Mask in the returned tuple so that diameter_traffic doesn't
+%% need to know about the ordering of SvcOpts used here.
pick_peer(SvcName, App, Opts) ->
pick(lookup_state(SvcName), App, Opts).
@@ -284,10 +316,10 @@ pick(#state{service = #diameter_service{applications = Apps}}
Opts) -> %% initial call from diameter:call/4
pick(S, find_outgoing_app(Alias, Apps), Opts);
-pick(_, false, _) ->
- false;
+pick(_, false = No, _) ->
+ No;
-pick(#state{options = [{_, Mask} | _]}
+pick(#state{options = [{_, Mask} | SvcOpts]}
= S,
#diameter_app{module = ModX, dictionary = Dict}
= App0,
@@ -296,7 +328,7 @@ pick(#state{options = [{_, Mask} | _]}
[_,_] = RealmAndHost = diameter_lib:eval([DestF, Dict]),
case pick_peer(App, RealmAndHost, Filter, S) of
{TPid, Caps} ->
- {{TPid, Caps, App}, Mask};
+ {{TPid, Caps, App}, Mask, SvcOpts};
false = No ->
No
end.
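
The head of #state.options is matched positionally here: as a comment
below service_options/1 notes, the order of the options list is
significant, and the {sequence, Mask} pair is always constructed first.
A sketch of the assumption (mask/1 is illustrative, not part of the
module):

    %% options = [{sequence, Mask}, {share_peers, _}, ...] by construction
    mask(#state{options = [{sequence, Mask} | _]}) ->
        Mask.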
@@ -480,6 +512,9 @@ transition({service, Pid}, S) ->
transition({peer, TPid, Aliases, Caps}, S) ->
remote_peer_up(TPid, Aliases, Caps, S),
ok;
+transition({peer, TPid}, S) ->
+ remote_peer_down(TPid, S),
+ ok;
%% Remote peer process has died.
transition({'DOWN', _, process, TPid, _}, S) ->
@@ -499,9 +534,21 @@ transition(Req, S) ->
%% # terminate/2
%% ---------------------------------------------------------------------------
-terminate(Reason, #state{service_name = Name} = S) ->
+terminate(Reason, #state{service_name = Name, local = {PeerT, _, _}} = S) ->
send_event(Name, stop),
ets:delete(?STATE_TABLE, Name),
+
+ %% Communicate pending loss of any peers that connection_down/3
+ %% won't. This is needed when stopping a service since we don't
+ %% wait for watchdog state changes to take care of it. That this
+ %% takes place after deleting the state entry ensures that the
+ %% resulting failover by request processes accomplishes nothing.
+ ets:foldl(fun(#peer{pid = TPid}, _) ->
+ diameter_traffic:peer_down(TPid)
+ end,
+ ok,
+ PeerT),
+
shutdown == Reason %% application shutdown
andalso shutdown(application, S).
@@ -509,23 +556,80 @@ terminate(Reason, #state{service_name = Name} = S) ->
%% # code_change/3
%% ---------------------------------------------------------------------------
-code_change(FromVsn,
- #state{service_name = SvcName,
- service = #diameter_service{applications = Apps}}
- = S,
- Extra) ->
- lists:foreach(fun(A) ->
- code_change(FromVsn, SvcName, Extra, A)
- end,
- Apps),
+code_change(_FromVsn, #state{} = S, _Extra) ->
+ {ok, S};
+
+%% Don't support downgrade since we won't in appup.
+code_change({down = T, _}, _, _Extra) ->
+ {error, T};
+
+%% Upgrade local/shared peers dicts populated in old code.
+code_change(_FromVsn, S0, _Extra) ->
+ {state, Id, SvcName, Svc, WT, PeerT, SDict, LDict, Monitor, Opts}
+ = S0,
+
+ init_peers(LT = setelement(1, {PT, _, _} = init_peers(), PeerT),
+ fun({_,A}) -> A end),
+ init_peers(init_peers(RT = init_peers(), SDict),
+ fun(A) -> A end),
+
+ S = #state{id = Id,
+ service_name = SvcName,
+ service = Svc,
+ watchdogT = WT,
+ peerT = PT, %% empty
+ shared_peers = SDict,
+ local_peers = LDict,
+ local = LT,
+ remote = RT,
+ monitor = Monitor,
+ options = Opts},
+
+ %% Replacing the table entry and deleting the old shared tables
+ %% can make outgoing requests return {error, no_connection} until
+ %% everyone is running new code. Don't delete the tables to avoid
+ %% crashing request processes.
+ ets:delete_all_objects(SDict),
+ ets:delete_all_objects(LDict),
+ ets:insert(?STATE_TABLE, S),
{ok, S}.
-code_change(FromVsn, SvcName, Extra, #diameter_app{alias = Alias} = A) ->
- {ok, S} = cb(A, code_change, [FromVsn,
- mod_state(Alias),
- Extra,
- SvcName]),
- mod_state(Alias, S).
+%% init_peers/2
+
+%% Populate app and identity bags from a new-style #peer{} table.
+init_peers({PeerT, _, _} = T, F)
+ when is_function(F) ->
+ ets:foldl(fun(#peer{pid = P, apps = As, caps = C}, N) ->
+ insert_peer(P, lists:map(F, As), C, T),
+ N+1
+ end,
+ 0,
+ PeerT);
+
+%% Populate #peer{} table given a shared peers dict.
+init_peers({PeerT, _, _}, SDict) ->
+ dict:fold(fun(P, As, N) ->
+ ets:update_element(PeerT, P, {#peer.apps, As}),
+ N+1
+ end,
+ 0,
+ diameter_dict:fold(fun(A, Ps, D) ->
+ init_peers(A, Ps, PeerT, D)
+ end,
+ dict:new(),
+ SDict)).
+
+%% init_peers/4
+
+init_peers(App, TCs, PeerT, Dict) ->
+ lists:foldl(fun({P,C}, D) ->
+ ets:insert(PeerT, #peer{pid = P,
+ apps = [],
+ caps = C}),
+ dict:append(P, App, D)
+ end,
+ Dict,
+ TCs).
%% ===========================================================================
%% ===========================================================================
@@ -533,9 +637,6 @@ code_change(FromVsn, SvcName, Extra, #diameter_app{alias = Alias} = A) ->
unexpected(F, A, #state{service_name = Name}) ->
?UNEXPECTED(F, A ++ [Name]).
-cb(#diameter_app{module = [_|_] = M}, F, A) ->
- eval(M, F, A).
-
eval([M|X], F, A) ->
apply(M, F, A ++ X).
@@ -555,10 +656,6 @@ choose(false, _, X) -> X.
ets_new(Tbl) ->
ets:new(Tbl, [{keypos, 2}]).
-insert(Tbl, Rec) ->
- ets:insert(Tbl, Rec),
- Rec.
-
%% Using the process dictionary for the callback state was initially
%% just a way to make what was horrendous trace (big state record and
%% much else everywhere) somewhat more readable. There's not as much
@@ -579,27 +676,37 @@ mod_state(Alias, ModS) ->
%% remove_transport
shutdown(Refs, #state{watchdogT = WatchdogT})
when is_list(Refs) ->
- ets:foldl(fun(P,ok) -> st(P, Refs), ok end, ok, WatchdogT);
+ ets:insert(WatchdogT, ets:foldl(fun(R,A) -> st(R, Refs, A) end,
+ [],
+ WatchdogT));
%% application/service shutdown
shutdown(Reason, #state{watchdogT = WatchdogT})
when Reason == application;
Reason == service ->
- diameter_lib:wait(ets:foldl(fun(P,A) -> st(P, Reason, A) end,
+ diameter_lib:wait(ets:foldl(fun(P,A) -> ss(P, Reason, A) end,
[],
WatchdogT)).
-%% st/2
+%% st/3
-st(#watchdog{ref = Ref, pid = Pid}, Refs) ->
- lists:member(Ref, Refs)
- andalso (Pid ! {shutdown, self(), transport}). %% 'DOWN' cleans up
+%% Mark replacement as started so that a subsequent accept doesn't
+%% result in a new process that isn't terminated.
+st(#watchdog{ref = Ref, pid = Pid, peer = P} = Rec, Refs, Acc) ->
+ case lists:member(Ref, Refs) of
+ true ->
+ Pid ! {shutdown, self(), transport}, %% 'DOWN' cleans up
+ [Rec#watchdog{peer = true} || P == false] ++ Acc;
+ false ->
+ Acc
+ end.
-%% st/3
+%% ss/3
-st(#watchdog{pid = Pid}, Reason, Acc) ->
+ss(#watchdog{pid = Pid}, Reason, Acc) ->
+ MRef = monitor(process, Pid),
Pid ! {shutdown, self(), Reason},
- [Pid | Acc].
+ [MRef | Acc].
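
ss/3 collects one monitor reference per watchdog so that the caller can
block until every watchdog has actually terminated. Presumably
diameter_lib:wait/1 amounts to something like the following sketch (an
assumption about its implementation, not a quote of it):

    wait(MRefs) ->
        lists:foreach(fun(MRef) ->
                              receive {'DOWN', MRef, process, _, _} -> ok end
                      end,
                      MRefs).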
%% ---------------------------------------------------------------------------
%% # call_service/2
@@ -658,6 +765,8 @@ cfg_acc({SvcName, #diameter_service{applications = Apps} = Rec, Opts},
lists:foreach(fun init_mod/1, Apps),
S = #state{service_name = SvcName,
service = Rec#diameter_service{pid = self()},
+ local = init_peers(),
+ remote = init_peers(),
monitor = mref(get_value(monitor, Opts)),
options = service_options(Opts)},
{S, Acc};
@@ -667,19 +776,30 @@ cfg_acc({_Ref, Type, _Opts} = T, {S, Acc})
Type == listen ->
{S, [T | Acc]}.
+init_peers() ->
+ {ets_new(caps), %% #peer{}
+ ets:new(apps, [bag]), %% {Alias, TPid}
+ ets:new(idents, [bag])}. %% {{host, OH} | {realm, OR} | {OR, OH},
+ %% Alias,
+ %% TPid}
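
For a peer TPid supporting application alias Alias, with Origin-Host OH
and Origin-Realm OR, insert_peer/4 below populates the three tables
roughly as follows (a sketch of the resulting entries):

    %% caps (set):    #peer{pid = TPid, apps = ..., caps = Caps}
    %% apps (bag):    {Alias, TPid}
    %% idents (bag):  {{host, OH}, Alias, TPid}
    %%                {{realm, OR}, Alias, TPid}
    %%                {{OR, OH}, Alias, TPid}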
+
service_options(Opts) ->
[{sequence, proplists:get_value(sequence, Opts, ?NOMASK)},
{share_peers, get_value(share_peers, Opts)},
{use_shared_peers, get_value(use_shared_peers, Opts)},
{restrict_connections, proplists:get_value(restrict_connections,
Opts,
- ?RESTRICT)}].
+ ?RESTRICT)},
+ {spawn_opt, proplists:get_value(spawn_opt, Opts, [])},
+ {string_decode, proplists:get_value(string_decode, Opts, true)},
+ {incoming_maxlen, proplists:get_value(incoming_maxlen, Opts, 16#FFFFFF)},
+ {strict_mbit, proplists:get_value(strict_mbit, Opts, true)}].
%% The order of options is significant since we match against the list.
mref(false = No) ->
No;
mref(P) ->
- erlang:monitor(process, P).
+ monitor(process, P).
init_shared(#state{options = [_, _, {_,T} | _],
service_name = Svc}) ->
@@ -700,8 +820,7 @@ notify(Share, SvcName, T) ->
Nodes = remotes(Share),
[] /= Nodes andalso diameter_peer:notify(Nodes, SvcName, T).
%% Test for the empty list for upgrade reasons: there's no
-%% diameter_peer:notify/3 in old code so no call means no load order
-%% requirement.
+%% diameter_peer:notify/3 in old code.
remotes(false) ->
[];
@@ -719,14 +838,27 @@ remotes(F) ->
L when is_list(L) ->
L;
T ->
- diameter_lib:error_report({invalid_return, T}, F),
+ ?LOG(invalid_return, {F,T}),
+ error_report(invalid_return, share_peers, F),
[]
catch
E:R ->
- diameter_lib:error_report({failure, {E, R, ?STACK}}, F),
+ ?LOG(failure, {E, R, F, diameter_lib:get_stacktrace()}),
+ error_report(failure, share_peers, F),
[]
end.
+%% error_report/3
+
+error_report(T, What, F) ->
+ Reason = io_lib:format("~s from ~p callback", [reason(T), What]),
+ diameter_lib:error_report(Reason, diameter_lib:eval_name(F)).
+
+reason(invalid_return) ->
+ "invalid return";
+reason(failure) ->
+ "failure".
+
%% ---------------------------------------------------------------------------
%% # start/3
%% ---------------------------------------------------------------------------
@@ -740,8 +872,9 @@ remotes(F) ->
start(Ref, {T, Opts}, S)
when T == connect;
T == listen ->
+ N = proplists:get_value(pool_size, Opts, 1),
try
- {ok, start(Ref, type(T), Opts, S)}
+ {ok, start(Ref, type(T), Opts, N, S)}
catch
?FAILURE(Reason) ->
{error, Reason}
@@ -759,26 +892,44 @@ type(connect = T) -> T.
%% start/4
-start(Ref, Type, Opts, #state{watchdogT = WatchdogT,
- peerT = PeerT,
- options = SvcOpts,
- service_name = SvcName,
- service = Svc0})
+start(Ref, Type, Opts, State) ->
+ start(Ref, Type, Opts, 1, State).
+
+%% start/5
+
+start(Ref, Type, Opts, N, #state{watchdogT = WatchdogT,
+ local = {PeerT, _, _},
+ options = SvcOpts,
+ service_name = SvcName,
+ service = Svc0})
when Type == connect;
Type == accept ->
#diameter_service{applications = Apps}
- = Svc
+ = Svc1
= merge_service(Opts, Svc0),
- {_,_} = Mask = proplists:get_value(sequence, SvcOpts),
- RecvData = diameter_traffic:make_recvdata([SvcName, PeerT, Apps, Mask]),
- Pid = s(Type, Ref, {{spawn_opts([Opts, SvcOpts]), RecvData},
- Opts,
- SvcOpts,
- Svc}),
- insert(WatchdogT, #watchdog{pid = Pid,
- type = Type,
- ref = Ref,
- options = Opts}),
+ Svc = binary_caps(Svc1, proplists:get_value(string_decode, SvcOpts, true)),
+ RecvData = diameter_traffic:make_recvdata([SvcName,
+ PeerT,
+ Apps,
+ SvcOpts]),
+ T = {{spawn_opts([Opts, SvcOpts]), RecvData}, Opts, SvcOpts, Svc},
+ Rec = #watchdog{type = Type,
+ ref = Ref,
+ options = Opts},
+ diameter_lib:fold_n(fun(_,A) ->
+ [wd(Type, Ref, T, WatchdogT, Rec) | A]
+ end,
+ [],
+ N).
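
The pool_size transport option is where N comes from: on a connecting
transport it starts N watchdogs, and therefore N connections, instead of
one. A hedged configuration example (transport options abbreviated,
addresses assumed):

    %% e.g. a pool of 4 outgoing connections to the same peer:
    diameter:add_transport(SvcName,
                           {connect, [{pool_size, 4},
                                      {transport_module, diameter_tcp},
                                      {transport_config,
                                       [{raddr, {127,0,0,1}},
                                        {rport, 3868}]}]}).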
+
+binary_caps(Svc, true) ->
+ Svc;
+binary_caps(#diameter_service{capabilities = Caps} = Svc, false) ->
+ Svc#diameter_service{capabilities = diameter_capx:binary_caps(Caps)}.
+
+wd(Type, Ref, T, WatchdogT, Rec) ->
+ Pid = start_watchdog(Type, Ref, T),
+ ets:insert(WatchdogT, Rec#watchdog{pid = Pid}),
Pid.
%% Note that the service record passed into the watchdog is the merged
@@ -791,7 +942,7 @@ spawn_opts(Optss) ->
T /= link,
T /= monitor].
-s(Type, Ref, T) ->
+start_watchdog(Type, Ref, T) ->
{_MRef, Pid} = diameter_watchdog:start({Type, Ref}, T),
Pid.
@@ -812,7 +963,7 @@ ms({applications, As}, #diameter_service{applications = Apps} = S)
%% The fact that all capabilities can be configured on the transports
%% means that the service doesn't necessarily represent a single
-%% locally implemented Diameter peer as identified by Origin-Host: a
+%% locally implemented Diameter node as identified by Origin-Host: a
%% transport can configure its own Origin-Host. This means that the
%% service is little more than a placeholder for default capabilities
%% plus a list of applications that individual transports can choose
@@ -832,11 +983,22 @@ ms(_, Svc) ->
%% ---------------------------------------------------------------------------
accepted(Pid, _TPid, #state{watchdogT = WatchdogT} = S) ->
- #watchdog{ref = Ref, type = accept = T, peer = false, options = Opts}
+ #watchdog{type = accept = T, peer = P}
= Wd
= fetch(WatchdogT, Pid),
- insert(WatchdogT, Wd#watchdog{peer = true}),%% mark replacement as started
- start(Ref, T, Opts, S). %% start new watchdog
+ if not P ->
+ #watchdog{ref = Ref, options = Opts} = Wd,
+ %% Mark replacement started, and start new watchdog.
+ ets:insert(WatchdogT, Wd#watchdog{peer = true}),
+ start(Ref, T, Opts, S);
+ P ->
+ %% Transport removal in progress: true has been set in
+ %% shutdown/2, and the transport will die as a
+ %% consequence.
+ ok
+ end.
+
+%% fetch/2
fetch(Tid, Key) ->
[T] = ets:lookup(Tid, Key),
@@ -862,14 +1024,15 @@ watchdog(TPid, [], ?WD_SUSPECT, ?WD_OKAY, Wd, State) ->
#watchdog{peer = TPid} = Wd, %% assert
connection_up(Wd, State);
-%% Watchdog has an unresponsive connection.
+%% Watchdog has an unresponsive connection. Note that the peer table
+%% entry isn't removed until DOWN.
watchdog(TPid, [], ?WD_OKAY, ?WD_SUSPECT = To, Wd, State) ->
#watchdog{peer = TPid} = Wd, %% assert
watchdog_down(Wd, To, State);
%% Watchdog has lost its connection.
-watchdog(TPid, [], _, ?WD_DOWN = To, Wd, #state{peerT = PeerT} = S) ->
- close(Wd, S),
+watchdog(TPid, [], _, ?WD_DOWN = To, Wd, #state{local = {PeerT, _, _}} = S) ->
+ close(Wd),
watchdog_down(Wd, To, S),
ets:delete(PeerT, TPid);
@@ -877,7 +1040,7 @@ watchdog(_, [], _, _, _, _) ->
ok.
watchdog_down(Wd, To, #state{watchdogT = WatchdogT} = S) ->
- insert(WatchdogT, Wd#watchdog{state = To}),
+ ets:insert(WatchdogT, Wd#watchdog{state = To}),
connection_down(Wd, To, S).
%% ---------------------------------------------------------------------------
@@ -889,14 +1052,14 @@ watchdog_down(Wd, To, #state{watchdogT = WatchdogT} = S) ->
connection_up({TPid, {Caps, SupportedApps, Pkt}},
#watchdog{pid = Pid}
= Wd,
- #state{peerT = PeerT}
+ #state{local = {PeerT, _, _}}
= S) ->
- Pr = #peer{pid = TPid,
- apps = SupportedApps,
- caps = Caps,
- watchdog = Pid},
- insert(PeerT, Pr),
- connection_up([Pkt], Wd#watchdog{peer = TPid}, Pr, S).
+ Rec = #peer{pid = TPid,
+ apps = SupportedApps,
+ caps = Caps,
+ watchdog = Pid},
+ ets:insert(PeerT, Rec),
+ connection_up([Pkt], Wd#watchdog{peer = TPid}, Rec, S).
%% ---------------------------------------------------------------------------
%% # reopen/3
@@ -906,22 +1069,23 @@ reopen({TPid, {Caps, SupportedApps, _Pkt}},
#watchdog{pid = Pid}
= Wd,
#state{watchdogT = WatchdogT,
- peerT = PeerT}) ->
- insert(PeerT, #peer{pid = TPid,
- apps = SupportedApps,
- caps = Caps,
- watchdog = Pid}),
- insert(WatchdogT, Wd#watchdog{state = ?WD_REOPEN,
- peer = TPid}).
+ local = {PeerT, _, _}}) ->
+ ets:insert(PeerT, #peer{pid = TPid,
+ apps = SupportedApps,
+ caps = Caps,
+ watchdog = Pid}),
+ ets:insert(WatchdogT, Wd#watchdog{state = ?WD_REOPEN,
+ peer = TPid}).
%% ---------------------------------------------------------------------------
%% # connection_up/2
%% ---------------------------------------------------------------------------
-%% Watchdog has recovered as suspect connection. Note that there has
+%% Watchdog has recovered a suspect connection. Note that there has
%% been no new capabilties exchange in this case.
-connection_up(#watchdog{peer = TPid} = Wd, #state{peerT = PeerT} = S) ->
+connection_up(#watchdog{peer = TPid} = Wd, #state{local = {PeerT, _, _}}
+ = S) ->
connection_up([], Wd, fetch(PeerT, TPid), S).
%% connection_up/4
@@ -932,23 +1096,23 @@ connection_up(Extra,
#peer{apps = SApps, caps = Caps}
= Pr,
#state{watchdogT = WatchdogT,
- local_peers = LDict,
+ local = LT,
service_name = SvcName,
service = #diameter_service{applications = Apps}}
= S) ->
- insert(WatchdogT, Wd#watchdog{state = ?WD_OKAY}),
+ ets:insert(WatchdogT, Wd#watchdog{state = ?WD_OKAY}),
diameter_traffic:peer_up(TPid),
- insert_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict),
+ local_peer_up(SApps, {TPid, Caps}, {SvcName, Apps}, LT),
report_status(up, Wd, Pr, S, Extra).
-insert_local_peer(SApps, T, LDict) ->
- lists:foldl(fun(A,D) -> ilp(A, T, D) end, LDict, SApps).
+local_peer_up(SApps, {TPid, Caps} = TC, SA, LT) ->
+ insert_peer(TPid, [A || {_,A} <- SApps], Caps, LT),
+ lists:foreach(fun(A) -> peer_up(A, TC, SA) end, SApps).
-ilp({Id, Alias}, {TC, SA}, LDict) ->
- init_conn(Id, Alias, TC, SA),
- ?Dict:append(Alias, TC, LDict).
+peer_up({Id, Alias}, TC, SA) ->
+ peer_up(Id, Alias, TC, SA).
-init_conn(Id, Alias, {TPid, _} = TC, {SvcName, Apps}) ->
+peer_up(Id, Alias, {TPid, _} = TC, {SvcName, Apps}) ->
#diameter_app{id = Id} %% assert
= App
= find_app(Alias, Apps),
@@ -1026,8 +1190,11 @@ peer_cb(App, F, A) ->
true
catch
E:R ->
- diameter_lib:error_report({failure, {E, R, ?STACK}},
- {App, F, A}),
+ %% Don't include arguments since #diameter_caps{} contains strings
+ %% from the peer, which could be anything (especially, large).
+ [Mod|X] = App#diameter_app.module,
+ ?LOG(failure, {E, R, Mod, F, diameter_lib:get_stacktrace()}),
+ error_report(failure, F, {Mod, F, A ++ X}),
false
end.
@@ -1043,17 +1210,17 @@ connection_down(#watchdog{state = ?WD_OKAY,
= Pr,
#state{service_name = SvcName,
service = #diameter_service{applications = Apps},
- local_peers = LDict}
+ local = LT}
= S) ->
report_status(down, Wd, Pr, S, []),
- remove_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict),
+ local_peer_down(SApps, {TPid, Caps}, {SvcName, Apps}, LT),
diameter_traffic:peer_down(TPid);
connection_down(#watchdog{state = ?WD_OKAY,
peer = TPid}
= Wd,
To,
- #state{peerT = PeerT}
+ #state{local = {PeerT, _, _}}
= S)
when is_atom(To) ->
connection_down(Wd, #peer{} = fetch(PeerT, TPid), S);
@@ -1061,15 +1228,14 @@ connection_down(#watchdog{state = ?WD_OKAY,
connection_down(#watchdog{}, _, _) ->
ok.
-remove_local_peer(SApps, T, LDict) ->
- lists:foldl(fun(A,D) -> rlp(A, T, D) end, LDict, SApps).
+local_peer_down(SApps, {TPid, _Caps} = TC, SA, LT) ->
+ delete_peer(TPid, LT),
+ lists:foreach(fun(A) -> peer_down(A, TC, SA) end, SApps).
-rlp({Id, Alias}, {TC, SA}, LDict) ->
- L = ?Dict:fetch(Alias, LDict),
- down_conn(Id, Alias, TC, SA),
- ?Dict:store(Alias, lists:delete(TC, L), LDict).
+peer_down({Id, Alias}, TC, SA) ->
+ peer_down(Id, Alias, TC, SA).
-down_conn(Id, Alias, TC, {SvcName, Apps}) ->
+peer_down(Id, Alias, TC, {SvcName, Apps}) ->
#diameter_app{id = Id} %% assert
= App
= find_app(Alias, Apps),
@@ -1094,7 +1260,7 @@ wd_down(#watchdog{peer = B}, _)
ok;
%% ... or maybe it has.
-wd_down(#watchdog{peer = TPid} = Wd, #state{peerT = PeerT} = S) ->
+wd_down(#watchdog{peer = TPid} = Wd, #state{local = {PeerT, _, _}} = S) ->
connection_down(Wd, ?WD_DOWN, S),
ets:delete(PeerT, TPid).
@@ -1142,15 +1308,22 @@ q_restart(false, _) ->
%% communicate.
default_tc(connect, Opts) ->
- proplists:get_value(reconnect_timer, Opts, ?DEFAULT_TC);
+ connect_timer(Opts, ?DEFAULT_TC);
default_tc(accept, _) ->
0.
+%% Accept both connect_timer and the (older) reconnect_timer, the
+%% latter being a remnant from a time in which the timer did apply to
+%% reconnect attempts.
+connect_timer(Opts, Def0) ->
+ Def = proplists:get_value(reconnect_timer, Opts, Def0),
+ proplists:get_value(connect_timer, Opts, Def).
+
%% Bound tc below if the watchdog was restarted recently to avoid
%% continuous restarts in case of faulty config or other problems.
tc(Time, Tc) ->
choose(Tc > ?RESTART_TC
- orelse timer:now_diff(now(), Time) > 1000*?RESTART_TC,
+ orelse diameter_lib:micro_diff(Time) > 1000*?RESTART_TC,
Tc,
?RESTART_TC).
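
For connect_timer/2 above, the newer connect_timer option takes
precedence over the legacy reconnect_timer, which in turn takes
precedence over the default. Assumed values, to illustrate:

    %% connect_timer(Opts, 30000) with
    %%   Opts = [{reconnect_timer, 10000}]                  -> 10000
    %%   Opts = [{connect_timer, 5000},
    %%           {reconnect_timer, 10000}]                  -> 5000
    %%   Opts = []                                          -> 30000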
@@ -1164,8 +1337,7 @@ start_tc(Tc, T, _) ->
tc_timeout({Ref, _Type, _Opts} = T, #state{service_name = SvcName} = S) ->
tc(diameter_config:have_transport(SvcName, Ref), T, S).
-tc(true, {Ref, Type, Opts}, #state{service_name = SvcName}
- = S) ->
+tc(true, {Ref, Type, Opts}, #state{service_name = SvcName} = S) ->
send_event(SvcName, {reconnect, Ref, Opts}),
start(Ref, Type, Opts, S);
tc(false = No, _, _) -> %% removed
@@ -1180,29 +1352,19 @@ tc(false = No, _, _) -> %% removed
%% another watchdog to be able to detect that it should transition
%% from initial into reopen rather than okay. That someone is either
%% the accepting watchdog upon reception of a CER from the previously
-%% connected peer, or us after reconnect_timer timeout.
+%% connected peer, or us after connect_timer timeout or immediately.
-close(#watchdog{type = connect}, _) ->
+close(#watchdog{type = connect}) ->
ok;
+
close(#watchdog{type = accept,
pid = Pid,
- ref = Ref,
- options = Opts},
- #state{service_name = SvcName}) ->
- c(Pid, diameter_config:have_transport(SvcName, Ref), Opts).
-
-%% Tell watchdog to (maybe) die later ...
-c(Pid, true, Opts) ->
- Tc = proplists:get_value(reconnect_timer, Opts, 2*?DEFAULT_TC),
- erlang:send_after(Tc, Pid, close);
-
-%% ... or now.
-c(Pid, false, _Opts) ->
- Pid ! close.
-
-%% The RFC's only document the behaviour of Tc, our reconnect_timer,
+ options = Opts}) ->
+ Tc = connect_timer(Opts, 2*?DEFAULT_TC),
+ erlang:send_after(Tc, Pid, close).
+%% The RFCs only document the behaviour of Tc, our connect_timer,
%% for the establishment of connections but we also give
-%% reconnect_timer semantics for a listener, being the time within
+%% connect_timer semantics for a listener, being the time within
%% which a new connection attempt is expected of a connecting peer.
%% The value should be greater than the peer's Tc + jitter.
@@ -1253,13 +1415,14 @@ cm([#diameter_app{alias = Alias} = App], Req, From, Svc) ->
mod_state(Alias, ModS),
{T, RC};
T ->
- diameter_lib:error_report({invalid, T},
- {App, handle_call, Args}),
+ ModX = App#diameter_app.module,
+ ?LOG(invalid_return, {ModX, handle_call, Args, T}),
invalid
catch
E: Reason ->
- diameter_lib:error_report({failure, {E, Reason, ?STACK}},
- {App, handle_call, Args}),
+ ModX = App#diameter_app.module,
+ Stack = diameter_lib:get_stacktrace(),
+ ?LOG(failure, {E, Reason, ModX, handle_call, Stack}),
failure
end;
@@ -1304,19 +1467,37 @@ share_peer(up, Caps, Apps, TPid, #state{options = [_, {_,T} | _],
service_name = Svc}) ->
notify(T, Svc, {peer, TPid, [A || {_,A} <- Apps], Caps});
-share_peer(_, _, _, _, _) ->
- ok.
+share_peer(down, _Caps, _Apps, TPid, #state{options = [_, {_,T} | _],
+ service_name = Svc}) ->
+ notify(T, Svc, {peer, TPid}).
%% ---------------------------------------------------------------------------
%% # share_peers/2
%% ---------------------------------------------------------------------------
-share_peers(Pid, #state{options = [_, {_,T} | _], local_peers = PDict}) ->
- is_remote(Pid, T)
- andalso ?Dict:fold(fun(A,Ps,ok) -> sp(Pid, A, Ps), ok end, ok, PDict).
-
-sp(Pid, Alias, Peers) ->
- lists:foreach(fun({P,C}) -> Pid ! {peer, P, [Alias], C} end, Peers).
+share_peers(Pid, #state{options = [_, {_,SP} | _],
+ local = {PeerT, AppT, _}}) ->
+ is_remote(Pid, SP)
+ andalso ets:foldl(fun(T, N) -> N + sp(Pid, AppT, T) end,
+ 0,
+ PeerT).
+
+%% An entry in the peer table doesn't mean the watchdog state is OKAY;
+%% an entry in the app table does.
+
+sp(Pid, AppT, #peer{pid = TPid,
+ apps = [{_, Alias} | _] = Apps,
+ caps = Caps}) ->
+ Spec = [{{'$1', TPid},
+ [{'==', '$1', {const, Alias}}],
+ ['$_']}],
+ case ets:select(AppT, Spec, 1) of
+ '$end_of_table' ->
+ 0;
+ _ ->
+ Pid ! {peer, TPid, [A || {_,A} <- Apps], Caps},
+ 1
+ end.
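
The select here is just a cheap existence test: with limit 1,
ets:select/3 returns '$end_of_table' when the peer has no row in the app
table (so its watchdog isn't OKAY) and {Matches, Continuation} otherwise.
Disregarding the limit-1 optimization, the test is roughly:

    %% equivalent, unoptimized formulation of the existence test:
    %% lists:member({Alias, TPid}, ets:lookup(AppT, Alias))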
is_remote(Pid, T) ->
Node = node(Pid),
@@ -1326,32 +1507,49 @@ is_remote(Pid, T) ->
%% # remote_peer_up/4
%% ---------------------------------------------------------------------------
-remote_peer_up(Pid, Aliases, Caps, #state{options = [_, _, {_,T} | _]} = S) ->
- is_remote(Pid, T)
- andalso rpu(Pid, Aliases, Caps, S).
+remote_peer_up(TPid, Aliases, Caps, #state{options = [_, _, {_,T} | _]} = S) ->
+ is_remote(TPid, T) andalso rpu(TPid, Aliases, Caps, S).
-rpu(Pid, Aliases, Caps, #state{service = Svc, shared_peers = PDict}) ->
+rpu(TPid, Aliases, Caps, #state{service = Svc, remote = RT}) ->
#diameter_service{applications = Apps} = Svc,
Key = #diameter_app.alias,
F = fun(A) -> lists:keymember(A, Key, Apps) end,
- rpu(Pid, lists:filter(F, Aliases), Caps, PDict);
+ rpu(TPid, lists:filter(F, Aliases), Caps, RT);
rpu(_, [] = No, _, _) ->
No;
-rpu(Pid, Aliases, Caps, PDict) ->
- erlang:monitor(process, Pid),
- T = {Pid, Caps},
- lists:foreach(fun(A) -> ?Dict:append(A, T, PDict) end, Aliases).
+rpu(TPid, Aliases, Caps, {PeerT, _, _} = RT) ->
+ monitor(process, TPid),
+ ets:insert(PeerT, #peer{pid = TPid,
+ apps = Aliases,
+ caps = Caps}),
+ insert_peer(TPid, Aliases, Caps, RT).
+
+%% insert_peer/4
+
+insert_peer(TPid, Aliases, Caps, {_PeerT, AppT, IdentT}) ->
+ #diameter_caps{origin_host = {_, OH},
+ origin_realm = {_, OR}}
+ = Caps,
+ ets:insert(AppT, [{A, TPid} || A <- Aliases]),
+ H = iolist_to_binary(OH),
+ R = iolist_to_binary(OR),
+ ets:insert(IdentT, [{T, A, TPid} || T <- [{host, H}, {realm, R}, {R, H}],
+ A <- Aliases]).
%% ---------------------------------------------------------------------------
%% # remote_peer_down/2
%% ---------------------------------------------------------------------------
-remote_peer_down(Pid, #state{shared_peers = PDict}) ->
- lists:foreach(fun(A) -> rpd(Pid, A, PDict) end, ?Dict:fetch_keys(PDict)).
+remote_peer_down(TPid, #state{remote = {PeerT, _, _} = RT}) ->
+ ets:delete(PeerT, TPid),
+ delete_peer(TPid, RT).
-rpd(Pid, Alias, PDict) ->
- ?Dict:update(Alias, fun(Ps) -> lists:keydelete(Pid, 1, Ps) end, PDict).
+%% delete_peer/2
+
+delete_peer(TPid, {_PeerT, AppT, IdentT}) ->
+ ets:select_delete(AppT, [{{'_', TPid}, [], [true]}]),
+ ets:select_delete(IdentT, [{{'_', '_', TPid}, [], [true]}]).
%% ---------------------------------------------------------------------------
%% pick_peer/4
@@ -1361,12 +1559,12 @@ pick_peer(#diameter_app{alias = Alias}
= App,
RealmAndHost,
Filter,
- #state{local_peers = L,
- shared_peers = S,
+ #state{local = LT,
+ remote = RT,
service_name = SvcName,
service = #diameter_service{pid = Pid}}) ->
- pick_peer(peers(Alias, RealmAndHost, Filter, L),
- peers(Alias, RealmAndHost, Filter, S),
+ pick_peer(peers(Alias, RealmAndHost, Filter, LT),
+ peers(Alias, RealmAndHost, Filter, RT),
Pid,
SvcName,
App).
@@ -1382,6 +1580,8 @@ pick_peer(Local, Remote, Pid, _SvcName, #diameter_app{mutable = true} = App)
case call_service(Pid, {pick_peer, Local, Remote, App}) of
{TPid, _} = T when is_pid(TPid) ->
T;
+ false = No ->
+ No;
{error, _} ->
false
end;
@@ -1415,56 +1615,211 @@ pick_peer(Local,
T; %% Accept returned state in the immutable
{false = No, S} -> %% case as long it isn't changed.
No;
- T ->
- diameter_lib:error_report({invalid, T, App},
- {App, pick_peer, Args})
+ T when M ->
+ ModX = App#diameter_app.module,
+ ?LOG(invalid_return, {ModX, pick_peer, T}),
+ false
catch
- E: Reason ->
- diameter_lib:error_report({failure, {E, Reason, ?STACK}},
- {App, pick_peer, Args})
+ E: Reason when M ->
+ ModX = App#diameter_app.module,
+ Stack = diameter_lib:get_stacktrace(),
+ ?LOG(failure, {E, Reason, ModX, pick_peer, Stack}),
+ false
end.
%% peers/4
-peers(Alias, RH, Filter, Peers) ->
- case ?Dict:find(Alias, Peers) of
- {ok, L} ->
- ps(L, RH, Filter, {[],[]});
- error ->
+peers(Alias, RH, Filter, T) ->
+ filter(Alias, RH, Filter, T, true).
+
+%% filter/5
+%%
+%% Try to limit the peers list by starting with a host/realm lookup.
+
+filter(Alias, RH, {neg, F}, T, B) ->
+ filter(Alias, RH, F, T, not B);
+
+filter(_, _, none, _, false) ->
+ [];
+
+filter(Alias, _, none, T, true) ->
+ all_peers(Alias, T);
+
+filter(Alias, [DR,DH] = RH, K, T, B)
+ when K == realm, DR == undefined;
+ K == host, DH == undefined ->
+ filter(Alias, RH, none, T, B);
+
+filter(Alias, [DR,_] = RH, realm = K, T, B) ->
+ filter(Alias, RH, {K, DR}, T, B);
+
+filter(Alias, [_,DH] = RH, host = K, T, B) ->
+ filter(Alias, RH, {K, DH}, T, B);
+
+filter(Alias, _, {K, D}, {PeerT, _AppT, IdentT}, true)
+ when K == host;
+ K == realm ->
+ try iolist_to_binary(D) of
+ B ->
+ caps(PeerT, ets:select(IdentT, [{{{K, B}, '$1', '$2'},
+ [{'==', '$1', {const, Alias}}],
+ ['$2']}]))
+ catch
+ error:_ ->
[]
- end.
+ end;
-%% Place a peer whose Destination-Host/Realm matches those of the
-%% request at the front of the result list. Could add some sort of
-%% 'sort' option to allow more control.
-
-ps([], _, _, {Ys, Ns}) ->
- lists:reverse(Ys, Ns);
-ps([{_TPid, #diameter_caps{} = Caps} = TC | Rest], RH, Filter, Acc) ->
- ps(Rest, RH, Filter, pacc(caps_filter(Caps, RH, Filter),
- caps_filter(Caps, RH, {all, [host, realm]}),
- TC,
- Acc)).
-
-pacc(true, true, Peer, {Ts, Fs}) ->
- {[Peer|Ts], Fs};
-pacc(true, false, Peer, {Ts, Fs}) ->
- {Ts, [Peer|Fs]};
-pacc(_, _, _, Acc) ->
- Acc.
+filter(Alias, RH, {all, Filters}, T, B)
+ when is_list(Filters) ->
+ fltr_all(Alias, RH, Filters, T, B);
+
+filter(Alias, RH, {first, Filters}, T, B)
+ when is_list(Filters) ->
+ fltr_first(Alias, RH, Filters, T, B);
+
+filter(Alias, RH, Filter, T, B) ->
+ {Ts, Fs} = filter(all_peers(Alias, T), RH, Filter),
+ choose(B, Ts, Fs).
+
+%% fltr_all/5
+
+fltr_all(Alias, RH, [{K, any} | Filters], T, B)
+ when K == host;
+ K == realm ->
+ fltr_all(Alias, RH, Filters, T, B);
+
+fltr_all(Alias, RH, [{host, _} = H, {realm, _} = R | Filters], T, B) ->
+ fltr_all(Alias, RH, [R, H | Filters], T, B);
+
+fltr_all(Alias, RH, [{realm, _} = R, {host, any} | Filters], T, B) ->
+ fltr_all(Alias, RH, [R | Filters], T, B);
+
+fltr_all(Alias, RH, [{realm, OR}, {host, OH} | Filters], T, true) ->
+ {PeerT, _AppT, IdentT} = T,
+ try {iolist_to_binary(OR), iolist_to_binary(OH)} of
+ BT ->
+ Peers = caps(PeerT,
+ ets:select(IdentT, [{{BT, '$1', '$2'},
+ [{'==', '$1', {const, Alias}}],
+ ['$2']}])),
+ {Ts, _} = filter(Peers, RH, {all, Filters}),
+ Ts
+ catch
+ error:_ ->
+ []
+ end;
-%% caps_filter/3
+fltr_all(Alias, [undefined,_] = RH, [realm | Filters], T, B) ->
+ fltr_all(Alias, RH, Filters, T, B);
+
+fltr_all(Alias, [DR,_] = RH, [realm | Filters], T, B) ->
+ fltr_all(Alias, RH, [{realm, DR} | Filters], T, B);
+
+fltr_all(Alias, [_,undefined] = RH, [host | Filters], T, B) ->
+ fltr_all(Alias, RH, Filters, T, B);
+
+fltr_all(Alias, [_,DH] = RH, [host | Filters], T, B) ->
+ fltr_all(Alias, RH, [{host, DH} | Filters], T, B);
+
+fltr_all(Alias, RH, [{K, _} = KT, KA | Filters], T, B)
+ when K == host, KA == realm;
+ K == realm, KA == host ->
+ fltr_all(Alias, RH, [KA, KT | Filters], T, B);
+
+fltr_all(Alias, RH, [F | Filters], T, B) ->
+ {Ts, Fs} = filter(filter(Alias, RH, F, T, B), RH, {all, Filters}),
+ choose(B, Ts, Fs);
-caps_filter(C, RH, {neg, F}) ->
- not caps_filter(C, RH, F);
+fltr_all(Alias, RH, [], T, B) ->
+ filter(Alias, RH, none, T, B).
-caps_filter(C, RH, {all, L})
+%% fltr_first/5
+%%
+%% Like any, but stop at the first filter with any matches.
+
+fltr_first(Alias, RH, [F | Filters], T, B) ->
+ case filter(Alias, RH, F, T, B) of
+ [] ->
+ fltr_first(Alias, RH, Filters, T, B);
+ [_|_] = Ts ->
+ Ts
+ end;
+
+fltr_first(Alias, RH, [], T, B) ->
+ filter(Alias, RH, none, T, not B).
+
+%% all_peers/2
+
+all_peers(Alias, {PeerT, AppT, _}) ->
+ ets:select(PeerT, [{#peer{pid = P, caps = '$1', _ = '_'},
+ [],
+ [{{P, '$1'}}]}
+ || {_,P} <- ets:lookup(AppT, Alias)]).
+
+%% caps/2
+
+caps(PeerT, Pids) ->
+ ets:select(PeerT, [{#peer{pid = P, caps = '$1', _ = '_'},
+ [],
+ [{{P, '$1'}}]}
+ || P <- Pids]).
+
+%% filter/3
+%%
+%% Return peers in match order.
+
+filter(Peers, _, none) ->
+ {Peers, []};
+
+filter(Peers, RH, {neg, F}) ->
+ {Ts, Fs} = filter(Peers, RH, F),
+ {Fs, Ts};
+
+filter(Peers, RH, {all, L})
+ when is_list(L) ->
+ lists:foldl(fun(F,A) -> fltr_all(F, A, RH) end,
+ {Peers, []},
+ L);
+
+filter(Peers, RH, {any, L})
when is_list(L) ->
- lists:all(fun(F) -> caps_filter(C, RH, F) end, L);
+ lists:foldl(fun(F,A) -> fltr_any(F, A, RH) end,
+ {[], Peers},
+ L);
-caps_filter(C, RH, {any, L})
+filter(Peers, RH, {first, L})
when is_list(L) ->
- lists:any(fun(F) -> caps_filter(C, RH, F) end, L);
+ fltr_first(Peers, RH, L);
+
+filter(Peers, RH, F) ->
+ lists:partition(fun({_,C}) -> caps_filter(C, RH, F) end, Peers).
+
+%% fltr_all/3
+
+fltr_all(F, {Ts0, Fs0}, RH) ->
+ {Ts1, Fs1} = filter(Ts0, RH, F),
+ {Ts1, Fs0 ++ Fs1}.
+
+%% fltr_any/3
+
+fltr_any(F, {Ts0, Fs0}, RH) ->
+ {Ts1, Fs1} = filter(Fs0, RH, F),
+ {Ts0 ++ Ts1, Fs1}.
+
+%% fltr_first/3
+
+fltr_first(Peers, _, []) ->
+ {[], Peers};
+
+fltr_first(Peers, RH, [F | Filters]) ->
+ case filter(Peers, RH, F) of
+ {[], _} ->
+ fltr_first(Peers, RH, Filters);
+ {_, _} = T ->
+ T
+ end.
+
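The filter terms handled above are diameter:peer_filter() values, as
passed in the filter option of diameter:call/4. Some examples of the
composite forms (hosts and realms assumed):

    %% {any,   [host, realm]}
    %% {all,   [{realm, <<"example.com">>},
    %%          {neg, {host, <<"h1.example.com">>}}]}
    %% {first, [{host, <<"h1.example.com">>}, realm]}
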
+%% caps_filter/3
caps_filter(#diameter_caps{origin_host = {_,OH}}, [_,DH], host) ->
eq(undefined, DH, OH);
@@ -1477,9 +1832,6 @@ caps_filter(C, _, Filter) ->
%% caps_filter/2
-caps_filter(_, none) ->
- true;
-
caps_filter(#diameter_caps{origin_host = {_,OH}}, {host, H}) ->
eq(any, H, OH);
@@ -1510,8 +1862,8 @@ eq(Any, Id, PeerId) ->
transports(#state{watchdogT = WatchdogT}) ->
ets:select(WatchdogT, [{#watchdog{peer = '$1', _ = '_'},
- [{'is_pid', '$1'}],
- ['$1']}]).
+ [{'is_pid', '$1'}],
+ ['$1']}]).
%% ---------------------------------------------------------------------------
%% # service_info/2
@@ -1542,7 +1894,8 @@ transports(#state{watchdogT = WatchdogT}) ->
-define(OTHER_INFO, [connections,
name,
peers,
- statistics]).
+ statistics,
+ info]).
service_info(Item, S)
when is_atom(Item) ->
@@ -1563,7 +1916,7 @@ tagged_info(Item, S)
undefined
end;
-tagged_info(TPid, #state{watchdogT = WatchdogT, peerT = PeerT})
+tagged_info(TPid, #state{watchdogT = WatchdogT, local = {PeerT, _, _}})
when is_pid(TPid) ->
try
[#peer{watchdog = Pid}] = ets:lookup(PeerT, TPid),
@@ -1632,6 +1985,7 @@ complete_info(Item, #state{service = Svc} = S) ->
keys -> ?ALL_INFO ++ ?CAP_INFO ++ ?OTHER_INFO;
all -> service_info(?ALL_INFO, S);
statistics -> info_stats(S);
+ info -> info_info(S);
connections -> info_connections(S);
peers -> info_peers(S)
end.
@@ -1678,31 +2032,43 @@ info_transport(S) ->
[],
PeerD).
-%% Only a config entry for a listening transport: use it.
-transport([[{type, listen}, _] = L]) ->
- L ++ [{accept, []}];
-
-%% Only one config or peer entry for a connecting transport: use it.
-transport([[{type, connect} | _] = L]) ->
- L;
+%% Single config entry. For backwards compatibility, distinguish
+%% between a connecting transport with and without pool_size config:
+%% with the option the form is similar to the listening case, with
+%% connections grouped in a pool tuple (for lack of a better name);
+%% without it, as before.
+transport([[{type, Type}, {options, Opts}] = L])
+ when Type == listen;
+ Type == connect ->
+ L ++ [{K, []} || [{_,K}] <- [keys(Type, Opts)]];
%% Peer entries: discard config. Note that the peer entries have
%% length at least 3.
transport([[_,_] | L]) ->
transport(L);
-%% Possibly many peer entries for a listening transport. Note that all
-%% have the same options by construction, which is not terribly space
-%% efficient.
-transport([[{type, accept}, {options, Opts} | _] | _] = Ls) ->
- [{type, listen},
+%% Multiple transports. Note that all have the same options by
+%% construction, which is not terribly space efficient.
+transport([[{type, Type}, {options, Opts} | _] | _] = Ls) ->
+ transport(keys(Type, Opts), Ls).
+
+%% Group transports in an accept or pool tuple ...
+transport([{Type, Key}], [[{type, _}, {options, Opts} | _] | _] = Ls) ->
+ [{type, Type},
{options, Opts},
- {accept, [lists:nthtail(2,L) || L <- Ls]}].
+ {Key, [tl(tl(L)) || L <- Ls]}];
+
+%% ... or not: there can only be one.
+transport([], [L]) ->
+ L.
-peer_dict(#state{watchdogT = WatchdogT, peerT = PeerT}, Dict0) ->
+keys(connect = T, Opts) ->
+ [{T, pool} || lists:keymember(pool_size, 1, Opts)];
+keys(_, _) ->
+ [{listen, accept}].
+
+peer_dict(#state{watchdogT = WatchdogT, local = {PeerT, _, _}}, Dict0) ->
try ets:tab2list(WatchdogT) of
- L ->
- lists:foldl(fun(T,A) -> peer_acc(PeerT, A, T) end, Dict0, L)
+ L -> lists:foldl(fun(T,A) -> peer_acc(PeerT, A, T) end, Dict0, L)
catch
error: badarg -> Dict0 %% service has gone down
end.
@@ -1714,12 +2080,11 @@ peer_acc(PeerT, Acc, #watchdog{pid = Pid,
state = WS,
started = At,
peer = TPid}) ->
- dict:append(Ref,
- [{type, Type},
- {options, Opts},
- {watchdog, {Pid, At, WS}}
- | info_peer(PeerT, TPid, WS)],
- Acc).
+ Info = [{type, Type},
+ {options, Opts},
+ {watchdog, {Pid, At, WS}}
+ | info_peer(PeerT, TPid, WS)],
+ dict:append(Ref, Info ++ [{info, info_process_info(Info)}], Acc).
info_peer(PeerT, TPid, WS)
when is_pid(TPid), WS /= ?WD_DOWN ->
@@ -1731,6 +2096,49 @@ info_peer(PeerT, TPid, WS)
info_peer(_, _, _) ->
[].
+info_process_info(Info) ->
+ lists:flatmap(fun ipi/1, Info).
+
+ipi({watchdog, {Pid, _, _}}) ->
+ info_pid(Pid);
+
+ipi({peer, {Pid, _}}) ->
+ info_pid(Pid);
+
+ipi({port, [{owner, Pid} | _]}) ->
+ info_pid(Pid);
+
+ipi(_) ->
+ [].
+
+info_pid(Pid) ->
+ case process_info(Pid, [message_queue_len, memory, binary]) of
+ undefined ->
+ [];
+ L ->
+ [{Pid, lists:map(fun({K,V}) -> {K, map_info(K,V)} end, L)}]
+ end.
+
+%% The binary list consists of 3-tuples {Ptr, Size, Count}, where Ptr
+%% is a C pointer value, Size is the size of a referenced binary in
+%% bytes, and Count is a global reference count. The same Ptr can
+%% occur multiple times, once for each reference on the process heap.
+%% In this case, the corresponding tuples will have Size in common but
+%% Count may differ just because no global lock is taken when the
+%% value is retrieved.
+%%
+%% The list can be quite large, and we aren't often interested in the
+%% pointers or counts, so whittle this down to the number of binaries
+%% referenced and their total byte count.
+map_info(binary, L) ->
+ SzD = lists:foldl(fun({P,S,_}, D) -> dict:store(P,S,D) end,
+ dict:new(),
+ L),
+ {dict:size(SzD), dict:fold(fun(_,S,N) -> S + N end, 0, SzD)};
+
+map_info(_, T) ->
+ T.
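
A worked example of the whittling (pointer values assumed): dedup is on
the pointer, so P1's two entries count once and sizes are summed.

    %% map_info(binary, [{P1,20,2}, {P1,20,3}, {P2,100,1}])
    %%   -> {2, 120}   %% two distinct binaries, 120 bytes in total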
+
%% The point of extracting the config here is so that 'transport' info
%% has one entry for each transport ref, the peer table only
%% containing entries that have a living watchdog.
@@ -1788,6 +2196,13 @@ mk_app(#diameter_app{} = A) ->
info_pending(#state{} = S) ->
diameter_traffic:pending(transports(S)).
+%% info_info/1
+%%
+%% Extract process_info from connections info.
+
+info_info(S) ->
+ [I || L <- conn_list(S), {info, I} <- L].
+
%% info_connections/1
%%
%% One entry per transport connection. Statistics for each entry are