aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src/base/diameter_service.erl
diff options
context:
space:
mode:
Diffstat (limited to 'lib/diameter/src/base/diameter_service.erl')
-rw-r--r--lib/diameter/src/base/diameter_service.erl2903
1 files changed, 2903 insertions, 0 deletions
diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl
new file mode 100644
index 0000000000..421e36ccf5
--- /dev/null
+++ b/lib/diameter/src/base/diameter_service.erl
@@ -0,0 +1,2903 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2010-2011. All Rights Reserved.
+%%
+%% The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved online at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% %CopyrightEnd%
+%%
+
+%%
+%% Implements the process that represents a service.
+%%
+
+-module(diameter_service).
+-behaviour(gen_server).
+
+-export([start/1,
+ stop/1,
+ start_transport/2,
+ stop_transport/2,
+ info/2,
+ call/4]).
+
+%% towards diameter_watchdog
+-export([receive_message/3]).
+
+%% service supervisor
+-export([start_link/1]).
+
+-export([subscribe/1,
+ unsubscribe/1,
+ subscriptions/1,
+ subscriptions/0,
+ services/0,
+ services/1,
+ whois/1,
+ flush_stats/1]).
+
+%% test/debug
+-export([call_module/3,
+ state/1,
+ uptime/1]).
+
+%%% gen_server callbacks
+-export([init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ terminate/2,
+ code_change/3]).
+
+%% Other callbacks.
+-export([send/1]).
+
+-include_lib("diameter/include/diameter.hrl").
+-include("diameter_internal.hrl").
+-include("diameter_types.hrl").
+
+-define(STATE_UP, up).
+-define(STATE_DOWN, down).
+
+-define(DEFAULT_TC, 30000). %% RFC 3588 ch 2.1
+-define(DEFAULT_TIMEOUT, 5000). %% for outgoing requests
+-define(RESTART_TC, 1000). %% if restart was this recent
+
+%% Used to be able to swap this with anything else dict-like but now
+%% rely on the fact that a service's #state{} record does not change
+%% in storing in it ?STATE table and not always going through the
+%% service process. In particular, rely on the fact that operations on
+%% a ?Dict don't change the handle to it.
+-define(Dict, diameter_dict).
+
+%% Table containing outgoing requests for which a reply has yet to be
+%% received.
+-define(REQUEST_TABLE, diameter_request).
+
+%% Maintains state in a table. In contrast to previously, a service's
+%% stat is not constant and is accessed outside of the service
+%% process.
+-define(STATE_TABLE, ?MODULE).
+
+%% Workaround for dialyzer's lack of understanding of match specs.
+-type match(T)
+ :: T | '_' | '$1' | '$2' | '$3' | '$4'.
+
+%% State of service gen_server.
+-record(state,
+ {id = now(),
+ service_name, %% as passed to start_service/2, key in ?STATE_TABLE
+ service :: #diameter_service{},
+ peerT = ets_new(peers) :: ets:tid(), %% #peer{} at start_fsm
+ connT = ets_new(conns) :: ets:tid(), %% #conn{} at connection_up
+ share_peers = false :: boolean(), %% broadcast peers to remote nodes?
+ use_shared_peers = false :: boolean(), %% use broadcasted peers?
+ shared_peers = ?Dict:new(), %% Alias -> [{TPid, Caps}, ...]
+ local_peers = ?Dict:new(), %% Alias -> [{TPid, Caps}, ...]
+ monitor = false :: false | pid()}). %% process to die with
+%% shared_peers reflects the peers broadcast from remote nodes. Note
+%% that the state term itself doesn't change, which is relevant for
+%% the stateless application callbacks since the state is retrieved
+%% from ?STATE_TABLE from outside the service process. The pid in the
+%% service record is used to determine whether or not we need to call
+%% the process for a pick_peer callback.
+
+%% Record representing a watchdog process.
+-record(peer,
+ {pid :: match(pid()),
+ type :: match(connect | accept),
+ ref :: match(reference()), %% key into diameter_config
+ options :: match([transport_opt()]), %% as passed to start_transport
+ op_state = ?STATE_DOWN :: match(?STATE_DOWN | ?STATE_UP),
+ started = now(), %% at process start
+ conn = false :: match(boolean() | pid())}).
+ %% true at accept, pid() at connection_up (connT key)
+
+%% Record representing a peer_fsm process.
+-record(conn,
+ {pid :: pid(),
+ apps :: [{0..16#FFFFFFFF, app_alias()}], %% {Id, Alias}
+ caps :: #diameter_caps{},
+ started = now(), %% at process start
+ peer :: pid()}). %% key into peerT
+
+%% Record stored in diameter_request for each outgoing request.
+-record(request,
+ {from, %% arg 2 of handle_call/3
+ handler :: match(pid()), %% request process
+ transport :: match(pid()), %% peer process
+ caps :: match(#diameter_caps{}),
+ app :: match(app_alias()), %% #diameter_app.alias
+ dictionary :: match(module()), %% #diameter_app.dictionary
+ module :: match(nonempty_improper_list(module(), list())),
+ %% #diameter_app.module
+ filter :: match(peer_filter()),
+ packet :: match(#diameter_packet{})}).
+
+%% Record call/4 options are parsed into.
+-record(options,
+ {filter = none :: peer_filter(),
+ extra = [] :: list(),
+ timeout = ?DEFAULT_TIMEOUT :: 0..16#FFFFFFFF,
+ detach = false :: boolean()}).
+
+%% Since RFC 3588 requires that a Diameter agent not modify End-to-End
+%% Identifiers, the possibility of explicitly setting an End-to-End
+%% Identifier would be needed to be able to implement an agent in
+%% which one side of the communication is not implemented on top of
+%% diameter. For example, Diameter being sent or received encapsulated
+%% in some other protocol, or even another Diameter stack in a
+%% non-Erlang environment. (Not that this is likely to be a normal
+%% case.)
+%%
+%% The implemented solution is not an option but to respect any header
+%% values set in a diameter_header record returned from a
+%% prepare_request callback. A call to diameter:call/4 can communicate
+%% values to the callback using the 'extra' option if so desired.
+
+%%% ---------------------------------------------------------------------------
+%%% # start(SvcName)
+%%% ---------------------------------------------------------------------------
+
+start(SvcName) ->
+ diameter_service_sup:start_child(SvcName).
+
+start_link(SvcName) ->
+ Options = [{spawn_opt, diameter_lib:spawn_opts(server, [])}],
+ gen_server:start_link(?MODULE, [SvcName], Options).
+%% Put the arbitrary term SvcName in a list in case we ever want to
+%% send more than this and need to distinguish old from new.
+
+%%% ---------------------------------------------------------------------------
+%%% # stop(SvcName)
+%%% ---------------------------------------------------------------------------
+
+stop(SvcName) ->
+ case whois(SvcName) of
+ undefined ->
+ {error, not_started};
+ Pid ->
+ stop(call_service(Pid, stop), Pid)
+ end.
+
+stop(ok, Pid) ->
+ MRef = erlang:monitor(process, Pid),
+ receive {'DOWN', MRef, process, _, _} -> ok end;
+stop(No, _) ->
+ No.
+
+%%% ---------------------------------------------------------------------------
+%%% # start_transport(SvcName, {Ref, Type, Opts})
+%%% ---------------------------------------------------------------------------
+
+start_transport(SvcName, {_,_,_} = T) ->
+ call_service_by_name(SvcName, {start, T}).
+
+%%% ---------------------------------------------------------------------------
+%%% # stop_transport(SvcName, Refs)
+%%% ---------------------------------------------------------------------------
+
+stop_transport(_, []) ->
+ ok;
+stop_transport(SvcName, [_|_] = Refs) ->
+ call_service_by_name(SvcName, {stop, Refs}).
+
+%%% ---------------------------------------------------------------------------
+%%% # info(SvcName, Item)
+%%% ---------------------------------------------------------------------------
+
+info(SvcName, Item) ->
+ info_rc(call_service_by_name(SvcName, {info, Item})).
+
+info_rc({error, _}) ->
+ undefined;
+info_rc(Info) ->
+ Info.
+
+%%% ---------------------------------------------------------------------------
+%%% # receive_message(TPid, Pkt, MessageData)
+%%% ---------------------------------------------------------------------------
+
+%% Handle an incoming message in the watchdog process. This used to
+%% come through the service process but this avoids that becoming a
+%% bottleneck.
+
+receive_message(TPid, Pkt, T)
+ when is_pid(TPid) ->
+ #diameter_packet{header = #diameter_header{is_request = R}} = Pkt,
+ recv(R, (not R) andalso lookup_request(Pkt, TPid), TPid, Pkt, T).
+
+%% Incoming request ...
+recv(true, false, TPid, Pkt, T) ->
+ try
+ spawn(fun() -> recv_request(TPid, Pkt, T) end)
+ catch
+ error: system_limit = E -> %% discard
+ ?LOG({error, E}, now())
+ end;
+
+%% ... answer to known request ...
+recv(false, #request{from = {_, Ref}, handler = Pid} = Req, _, Pkt, _) ->
+ Pid ! {answer, Ref, Req, Pkt};
+%% Note that failover could have happened prior to this message being
+%% received and triggering failback. That is, both a failover message
+%% and answer may be on their way to the handler process. In the worst
+%% case the request process gets notification of the failover and
+%% sends to the alternate peer before an answer arrives, so it's
+%% always the case that we can receive more than one answer after
+%% failover. The first answer received by the request process wins,
+%% any others are discarded.
+
+%% ... or not.
+recv(false, false, _, _, _) ->
+ ok.
+
+%%% ---------------------------------------------------------------------------
+%%% # call(SvcName, App, Msg, Options)
+%%% ---------------------------------------------------------------------------
+
+call(SvcName, App, Msg, Options)
+ when is_list(Options) ->
+ Rec = make_options(Options),
+ Ref = make_ref(),
+ Caller = {self(), Ref},
+ Fun = fun() -> exit({Ref, call(SvcName, App, Msg, Rec, Caller)}) end,
+ try spawn_monitor(Fun) of
+ {_, MRef} ->
+ recv(MRef, Ref, Rec#options.detach, false)
+ catch
+ error: system_limit = E ->
+ {error, E}
+ end.
+
+%% Don't rely on gen_server:call/3 for the timeout handling since it
+%% makes no guarantees about not leaving a reply message in the
+%% mailbox if we catch its exit at timeout. It currently *can* do so,
+%% which is also undocumented.
+
+recv(MRef, _, true, true) ->
+ erlang:demonitor(MRef, [flush]),
+ ok;
+
+recv(MRef, Ref, Detach, Sent) ->
+ receive
+ Ref -> %% send has been attempted
+ recv(MRef, Ref, Detach, true);
+ {'DOWN', MRef, process, _, Reason} ->
+ call_rc(Reason, Ref, Sent)
+ end.
+
+%% call/5 has returned ...
+call_rc({Ref, Ans}, Ref, _) ->
+ Ans;
+
+%% ... or not. In this case failure/encode are documented.
+call_rc(_, _, Sent) ->
+ {error, choose(Sent, failure, encode)}.
+
+%% call/5
+%%
+%% In the process spawned for the outgoing request.
+
+call(SvcName, App, Msg, Opts, Caller) ->
+ c(ets:lookup(?STATE_TABLE, SvcName), App, Msg, Opts, Caller).
+
+c([#state{service_name = SvcName} = S], App, Msg, Opts, Caller) ->
+ case find_transport(App, Msg, Opts, S) of
+ {_,_,_} = T ->
+ send_request(T, Msg, Opts, Caller, SvcName);
+ false ->
+ {error, no_connection};
+ {error, _} = No ->
+ No
+ end;
+
+c([], _, _, _, _) ->
+ {error, no_service}.
+
+%% make_options/1
+
+make_options(Options) ->
+ lists:foldl(fun mo/2, #options{}, Options).
+
+mo({timeout, T}, Rec)
+ when is_integer(T), 0 =< T ->
+ Rec#options{timeout = T};
+
+mo({filter, F}, #options{filter = none} = Rec) ->
+ Rec#options{filter = F};
+mo({filter, F}, #options{filter = {all, Fs}} = Rec) ->
+ Rec#options{filter = {all, [F | Fs]}};
+mo({filter, F}, #options{filter = F0} = Rec) ->
+ Rec#options{filter = {all, [F0, F]}};
+
+mo({extra, L}, #options{extra = X} = Rec)
+ when is_list(L) ->
+ Rec#options{extra = X ++ L};
+
+mo(detach, Rec) ->
+ Rec#options{detach = true};
+
+mo(T, _) ->
+ ?ERROR({invalid_option, T}).
+
+%%% ---------------------------------------------------------------------------
+%%% # subscribe(SvcName)
+%%% # unsubscribe(SvcName)
+%%% ---------------------------------------------------------------------------
+
+subscribe(SvcName) ->
+ diameter_reg:add({?MODULE, subscriber, SvcName}).
+
+unsubscribe(SvcName) ->
+ diameter_reg:del({?MODULE, subscriber, SvcName}).
+
+subscriptions(Pat) ->
+ pmap(diameter_reg:match({?MODULE, subscriber, Pat})).
+
+subscriptions() ->
+ subscriptions('_').
+
+pmap(Props) ->
+ lists:map(fun({{?MODULE, _, Name}, Pid}) -> {Name, Pid} end, Props).
+
+%%% ---------------------------------------------------------------------------
+%%% # services(Pattern)
+%%% ---------------------------------------------------------------------------
+
+services(Pat) ->
+ pmap(diameter_reg:match({?MODULE, service, Pat})).
+
+services() ->
+ services('_').
+
+whois(SvcName) ->
+ case diameter_reg:match({?MODULE, service, SvcName}) of
+ [{_, Pid}] ->
+ Pid;
+ [] ->
+ undefined
+ end.
+
+%%% ---------------------------------------------------------------------------
+%%% # flush_stats/1
+%%%
+%%% Output: list of {{SvcName, Alias, Counter}, Value}
+%%% ---------------------------------------------------------------------------
+
+flush_stats(TPid) ->
+ diameter_stats:flush(TPid).
+
+%% ===========================================================================
+%% ===========================================================================
+
+state(Svc) ->
+ call_service(Svc, state).
+
+uptime(Svc) ->
+ call_service(Svc, uptime).
+
+%% call_module/3
+
+call_module(Service, AppMod, Request) ->
+ call_service(Service, {call_module, AppMod, Request}).
+
+%%% ---------------------------------------------------------------------------
+%%% # init([SvcName])
+%%% ---------------------------------------------------------------------------
+
+init([SvcName]) ->
+ process_flag(trap_exit, true), %% ensure terminate(shutdown, _)
+ i(SvcName, diameter_reg:add_new({?MODULE, service, SvcName})).
+
+i(SvcName, true) ->
+ {ok, i(SvcName)};
+i(_, false) ->
+ {stop, {shutdown, already_started}}.
+
+%%% ---------------------------------------------------------------------------
+%%% # handle_call(Req, From, State)
+%%% ---------------------------------------------------------------------------
+
+handle_call(state, _, S) ->
+ {reply, S, S};
+
+handle_call(uptime, _, #state{id = T} = S) ->
+ {reply, diameter_lib:now_diff(T), S};
+
+%% Start a transport.
+handle_call({start, {Ref, Type, Opts}}, _From, S) ->
+ {reply, start(Ref, {Type, Opts}, S), S};
+
+%% Stop transports.
+handle_call({stop, Refs}, _From, S) ->
+ shutdown(Refs, S),
+ {reply, ok, S};
+
+%% pick_peer with mutable state
+handle_call({pick_peer, Local, Remote, App}, _From, S) ->
+ #diameter_app{mutable = true} = App, %% assert
+ {reply, pick_peer(Local, Remote, self(), S#state.service_name, App), S};
+
+handle_call({call_module, AppMod, Req}, From, S) ->
+ call_module(AppMod, Req, From, S);
+
+handle_call({info, Item}, _From, S) ->
+ {reply, service_info(Item, S), S};
+
+handle_call(stop, _From, S) ->
+ shutdown(S),
+ {stop, normal, ok, S};
+%% The server currently isn't guaranteed to be dead when the caller
+%% gets the reply. We deal with this in the call to the server,
+%% stating a monitor that waits for DOWN before returning.
+
+handle_call(Req, From, S) ->
+ unexpected(handle_call, [Req, From], S),
+ {reply, nok, S}.
+
+%%% ---------------------------------------------------------------------------
+%%% # handle_cast(Req, State)
+%%% ---------------------------------------------------------------------------
+
+handle_cast(Req, S) ->
+ unexpected(handle_cast, [Req], S),
+ {noreply, S}.
+
+%%% ---------------------------------------------------------------------------
+%%% # handle_info(Req, State)
+%%% ---------------------------------------------------------------------------
+
+handle_info(T,S) ->
+ case transition(T,S) of
+ ok ->
+ {noreply, S};
+ #state{} = NS ->
+ {noreply, NS};
+ {stop, Reason} ->
+ {stop, {shutdown, Reason}, S}
+ end.
+
+%% transition/2
+
+%% Peer process is telling us to start a new accept process.
+transition({accepted, Pid, TPid}, S) ->
+ accepted(Pid, TPid, S),
+ ok;
+
+%% Peer process has a new open connection.
+transition({connection_up, Pid, T}, S) ->
+ connection_up(Pid, T, S);
+
+%% Peer process has left state open.
+transition({connection_down, Pid}, S) ->
+ connection_down(Pid, S);
+
+%% Peer process has returned to state open.
+transition({connection_up, Pid}, S) ->
+ connection_up(Pid, S);
+
+%% Accepting transport has lost connectivity.
+transition({close, Pid}, S) ->
+ close(Pid, S),
+ ok;
+
+%% Connecting transport is being restarted by watchdog.
+transition({reconnect, Pid}, S) ->
+ reconnect(Pid, S),
+ ok;
+
+%% Monitor process has died. Just die with a reason that tells
+%% diameter_config about the happening. If a cleaner shutdown is
+%% required then someone should stop us.
+transition({'DOWN', MRef, process, _, Reason}, #state{monitor = MRef}) ->
+ {stop, {monitor, Reason}};
+
+%% Local peer process has died.
+transition({'DOWN', _, process, Pid, Reason}, S)
+ when node(Pid) == node() ->
+ peer_down(Pid, Reason, S);
+
+%% Remote service wants to know about shared transports.
+transition({service, Pid}, S) ->
+ share_peers(Pid, S),
+ ok;
+
+%% Remote service is communicating a shared peer.
+transition({peer, TPid, Aliases, Caps}, S) ->
+ remote_peer_up(TPid, Aliases, Caps, S);
+
+%% Remote peer process has died.
+transition({'DOWN', _, process, TPid, _}, S) ->
+ remote_peer_down(TPid, S);
+
+%% Restart after tc expiry.
+transition({tc_timeout, T}, S) ->
+ tc_timeout(T, S),
+ ok;
+
+%% Request process is telling us it may have missed a failover message
+%% after a transport went down and the service process looked up
+%% outstanding requests.
+transition({failover, TRef, Seqs}, S) ->
+ failover(TRef, Seqs, S),
+ ok;
+
+transition(Req, S) ->
+ unexpected(handle_info, [Req], S),
+ ok.
+
+%%% ---------------------------------------------------------------------------
+%%% # terminate(Reason, State)
+%%% ---------------------------------------------------------------------------
+
+terminate(Reason, #state{service_name = Name} = S) ->
+ ets:delete(?STATE_TABLE, Name),
+ shutdown == Reason %% application shutdown
+ andalso shutdown(S).
+
+%%% ---------------------------------------------------------------------------
+%%% # code_change(FromVsn, State, Extra)
+%%% ---------------------------------------------------------------------------
+
+code_change(FromVsn,
+ #state{service_name = SvcName,
+ service = #diameter_service{applications = Apps}}
+ = S,
+ Extra) ->
+ lists:foreach(fun(A) ->
+ code_change(FromVsn, SvcName, Extra, A)
+ end,
+ Apps),
+ {ok, S}.
+
+code_change(FromVsn, SvcName, Extra, #diameter_app{alias = Alias} = A) ->
+ {ok, S} = cb(A, code_change, [FromVsn,
+ mod_state(Alias),
+ Extra,
+ SvcName]),
+ mod_state(Alias, S).
+
+%% ===========================================================================
+%% ===========================================================================
+
+unexpected(F, A, #state{service_name = Name}) ->
+ ?UNEXPECTED(F, A ++ [Name]).
+
+cb([_|_] = M, F, A) ->
+ eval(M, F, A);
+cb(Rec, F, A) ->
+ {_, M} = app(Rec),
+ eval(M, F, A).
+
+app(#request{app = A, module = M}) ->
+ {A,M};
+app(#diameter_app{alias = A, module = M}) ->
+ {A,M}.
+
+eval([M|X], F, A) ->
+ apply(M, F, A ++ X).
+
+%% Callback with state.
+
+state_cb(#diameter_app{mutable = false, init_state = S}, {ModX, F, A}) ->
+ eval(ModX, F, A ++ [S]);
+
+state_cb(#diameter_app{mutable = true, alias = Alias}, {_,_,_} = MFA) ->
+ state_cb(MFA, Alias);
+
+state_cb({ModX,F,A}, Alias)
+ when is_list(ModX) ->
+ eval(ModX, F, A ++ [mod_state(Alias)]).
+
+choose(true, X, _) -> X;
+choose(false, _, X) -> X.
+
+ets_new(Tbl) ->
+ ets:new(Tbl, [{keypos, 2}]).
+
+insert(Tbl, Rec) ->
+ ets:insert(Tbl, Rec),
+ Rec.
+
+monitor(Pid) ->
+ erlang:monitor(process, Pid),
+ Pid.
+
+%% Using the process dictionary for the callback state was initially
+%% just a way to make what was horrendous trace (big state record and
+%% much else everywhere) somewhat more readable. There's not as much
+%% need for it now but it's no worse (except possibly that we don't
+%% see the table identifier being passed around) than an ets table so
+%% keep it.
+
+mod_state(Alias) ->
+ get({?MODULE, mod_state, Alias}).
+
+mod_state(Alias, ModS) ->
+ put({?MODULE, mod_state, Alias}, ModS).
+
+%% have_transport/2
+
+have_transport(SvcName, Ref) ->
+ [] /= diameter_config:have_transport(SvcName, Ref).
+
+%%% ---------------------------------------------------------------------------
+%%% # shutdown/2
+%%% ---------------------------------------------------------------------------
+
+shutdown(Refs, #state{peerT = PeerT}) ->
+ ets:foldl(fun(P,ok) -> s(P, Refs), ok end, ok, PeerT).
+
+s(#peer{ref = Ref, pid = Pid}, Refs) ->
+ s(lists:member(Ref, Refs), Pid);
+
+s(true, Pid) ->
+ Pid ! {shutdown, self()}; %% 'DOWN' will cleanup as usual
+s(false, _) ->
+ ok.
+
+%%% ---------------------------------------------------------------------------
+%%% # shutdown/1
+%%% ---------------------------------------------------------------------------
+
+shutdown(#state{peerT = PeerT}) ->
+ %% A transport might not be alive to receive the shutdown request
+ %% but give those that are a chance to shutdown gracefully.
+ wait(fun st/2, PeerT),
+ %% Kill the watchdogs explicitly in case there was no transport.
+ wait(fun sw/2, PeerT).
+
+wait(Fun, T) ->
+ diameter_lib:wait(ets:foldl(Fun, [], T)).
+
+st(#peer{conn = B}, Acc)
+ when is_boolean(B) ->
+ Acc;
+st(#peer{conn = Pid}, Acc) ->
+ Pid ! shutdown,
+ [Pid | Acc].
+
+sw(#peer{pid = Pid}, Acc) ->
+ exit(Pid, shutdown),
+ [Pid | Acc].
+
+%%% ---------------------------------------------------------------------------
+%%% # call_service/2
+%%% ---------------------------------------------------------------------------
+
+call_service(Pid, Req)
+ when is_pid(Pid) ->
+ cs(Pid, Req);
+call_service(SvcName, Req) ->
+ call_service_by_name(SvcName, Req).
+
+call_service_by_name(SvcName, Req) ->
+ cs(whois(SvcName), Req).
+
+cs(Pid, Req)
+ when is_pid(Pid) ->
+ try
+ gen_server:call(Pid, Req, infinity)
+ catch
+ E: Reason when E == exit ->
+ {error, {E, Reason}}
+ end;
+
+cs(undefined, _) ->
+ {error, no_service}.
+
+%%% ---------------------------------------------------------------------------
+%%% # i/1
+%%%
+%%% Output: #state{}
+%%% ---------------------------------------------------------------------------
+
+%% Intialize the state of a service gen_server.
+
+i(SvcName) ->
+ %% Split the config into a server state and a list of transports.
+ {#state{} = S, CL} = lists:foldl(fun cfg_acc/2,
+ {false, []},
+ diameter_config:lookup(SvcName)),
+
+ %% Publish the state in order to be able to access it outside of
+ %% the service process. Originally table identifiers were only
+ %% known to the service process but we now want to provide the
+ %% option of application callbacks being 'stateless' in order to
+ %% avoid having to go through a common process. (Eg. An agent that
+ %% sends a request for every incoming request.)
+ true = ets:insert_new(?STATE_TABLE, S),
+
+ %% Start fsms for each transport.
+ lists:foreach(fun(T) -> start_fsm(T,S) end, CL),
+
+ init_shared(S),
+ S.
+
+cfg_acc({SvcName, #diameter_service{applications = Apps} = Rec, Opts},
+ {false, Acc}) ->
+ lists:foreach(fun init_mod/1, Apps),
+ S = #state{service_name = SvcName,
+ service = Rec#diameter_service{pid = self()},
+ share_peers = get_value(share_peers, Opts),
+ use_shared_peers = get_value(use_shared_peers, Opts),
+ monitor = mref(get_value(monitor, Opts))},
+ {S, Acc};
+
+cfg_acc({_Ref, Type, _Opts} = T, {S, Acc})
+ when Type == connect;
+ Type == listen ->
+ {S, [T | Acc]}.
+
+mref(false = No) ->
+ No;
+mref(P) ->
+ erlang:monitor(process, P).
+
+init_shared(#state{use_shared_peers = true,
+ service_name = Svc}) ->
+ diameter_peer:notify(Svc, {service, self()});
+init_shared(#state{use_shared_peers = false}) ->
+ ok.
+
+init_mod(#diameter_app{alias = Alias,
+ init_state = S}) ->
+ mod_state(Alias, S).
+
+start_fsm({Ref, Type, Opts}, S) ->
+ start(Ref, {Type, Opts}, S).
+
+get_value(Key, Vs) ->
+ {_, V} = lists:keyfind(Key, 1, Vs),
+ V.
+
+%%% ---------------------------------------------------------------------------
+%%% # start/3
+%%% ---------------------------------------------------------------------------
+
+%% If the initial start/3 at service/transport start succeeds then
+%% subsequent calls to start/4 on the same service will also succeed
+%% since they involve the same call to merge_service/2. We merge here
+%% rather than earlier since the service may not yet be configured
+%% when the transport is configured.
+
+start(Ref, {T, Opts}, S)
+ when T == connect;
+ T == listen ->
+ try
+ {ok, start(Ref, type(T), Opts, S)}
+ catch
+ ?FAILURE(Reason) ->
+ {error, Reason}
+ end.
+%% TODO: don't actually raise any errors yet
+
+%% There used to be a difference here between the handling of
+%% configured listening and connecting transports but now we simply
+%% tell the transport_module to start an accepting or connecting
+%% process respectively, the transport implementation initiating
+%% listening on a port as required.
+type(listen) -> accept;
+type(accept) -> listen;
+type(connect = T) -> T.
+
+%% start/4
+
+start(Ref, Type, Opts, #state{peerT = PeerT,
+ connT = ConnT,
+ service_name = SvcName,
+ service = Svc})
+ when Type == connect;
+ Type == accept ->
+ Pid = monitor(s(Type, Ref, {ConnT,
+ Opts,
+ SvcName,
+ merge_service(Opts, Svc)})),
+ insert(PeerT, #peer{pid = Pid,
+ type = Type,
+ ref = Ref,
+ options = Opts}),
+ Pid.
+
+%% Note that the service record passed into the watchdog is the merged
+%% record so that each watchdog (and peer_fsm) may get a different
+%% record. This record is what is passed back into application
+%% callbacks.
+
+s(Type, Ref, T) ->
+ diameter_watchdog:start({Type, Ref}, T).
+
+%% merge_service/2
+
+merge_service(Opts, Svc) ->
+ lists:foldl(fun ms/2, Svc, Opts).
+
+%% Limit the applications known to the fsm to those in the 'apps'
+%% option. That this might be empty is checked by the fsm. It's not
+%% checked at config-time since there's no requirement that the
+%% service be configured first. (Which could be considered a bit odd.)
+ms({applications, As}, #diameter_service{applications = Apps} = S)
+ when is_list(As) ->
+ S#diameter_service{applications
+ = [A || A <- Apps,
+ lists:member(A#diameter_app.alias, As)]};
+
+%% The fact that all capabilities can be configured on the transports
+%% means that the service doesn't necessarily represent a single
+%% locally implemented Diameter peer as identified by Origin-Host: a
+%% transport can configure its own Origin-Host. This means that the
+%% service little more than a placeholder for default capabilities
+%% plus a list of applications that individual transports can choose
+%% to support (or not).
+ms({capabilities, Opts}, #diameter_service{capabilities = Caps0} = Svc)
+ when is_list(Opts) ->
+ %% make_caps has already succeeded in diameter_config so it will succeed
+ %% again here.
+ {ok, Caps} = diameter_capx:make_caps(Caps0, Opts),
+ Svc#diameter_service{capabilities = Caps};
+
+ms(_, Svc) ->
+ Svc.
+
+%%% ---------------------------------------------------------------------------
+%%% # accepted/3
+%%% ---------------------------------------------------------------------------
+
+accepted(Pid, _TPid, #state{peerT = PeerT} = S) ->
+ #peer{ref = Ref, type = accept = T, conn = false, options = Opts}
+ = P
+ = fetch(PeerT, Pid),
+ insert(PeerT, P#peer{conn = true}), %% mark replacement transport started
+ start(Ref, T, Opts, S). %% start new peer
+
+fetch(Tid, Key) ->
+ [T] = ets:lookup(Tid, Key),
+ T.
+
+%%% ---------------------------------------------------------------------------
+%%% # connection_up/3
+%%%
+%%% Output: #state{}
+%%% ---------------------------------------------------------------------------
+
+%% Peer process has reached the open state.
+
+connection_up(Pid, {TPid, {Caps, SApps, Pkt}}, #state{peerT = PeerT,
+ connT = ConnT}
+ = S) ->
+ P = fetch(PeerT, Pid),
+ C = #conn{pid = TPid,
+ apps = SApps,
+ caps = Caps,
+ peer = Pid},
+
+ insert(ConnT, C),
+ connection_up([Pkt], P#peer{conn = TPid}, C, S).
+
+%%% ---------------------------------------------------------------------------
+%%% # connection_up/2
+%%%
+%%% Output: #state{}
+%%% ---------------------------------------------------------------------------
+
+%% Peer process has transitioned back into the open state. Note that there
+%% has been no new capabilties exchange in this case.
+
+connection_up(Pid, #state{peerT = PeerT,
+ connT = ConnT}
+ = S) ->
+ #peer{conn = TPid} = P = fetch(PeerT, Pid),
+ C = fetch(ConnT, TPid),
+ connection_up([], P, C, S).
+
+%% connection_up/4
+
+connection_up(T, P, C, #state{peerT = PeerT,
+ local_peers = LDict,
+ service_name = SvcName,
+ service
+ = #diameter_service{applications = Apps}}
+ = S) ->
+ #peer{conn = TPid, op_state = ?STATE_DOWN}
+ = P,
+ #conn{apps = SApps, caps = Caps}
+ = C,
+
+ insert(PeerT, P#peer{op_state = ?STATE_UP}),
+
+ request_peer_up(TPid),
+ report_status(up, P, C, S, T),
+ S#state{local_peers = insert_local_peer(SApps,
+ {{TPid, Caps}, {SvcName, Apps}},
+ LDict)}.
+
+insert_local_peer(SApps, T, LDict) ->
+ lists:foldl(fun(A,D) -> ilp(A, T, D) end, LDict, SApps).
+
+ilp({Id, Alias}, {TC, SA}, LDict) ->
+ init_conn(Id, Alias, TC, SA),
+ ?Dict:append(Alias, TC, LDict).
+
+init_conn(Id, Alias, TC, {SvcName, Apps}) ->
+ #diameter_app{module = ModX,
+ id = Id} %% assert
+ = find_app(Alias, Apps),
+
+ peer_cb({ModX, peer_up, [SvcName, TC]}, Alias).
+
+find_app(Alias, Apps) ->
+ lists:keyfind(Alias, #diameter_app.alias, Apps).
+
+%% A failing peer callback brings down the service. In the case of
+%% peer_up we could just kill the transport and emit an error but for
+%% peer_down we have no way to cleanup any state change that peer_up
+%% may have introduced.
+peer_cb(MFA, Alias) ->
+ try state_cb(MFA, Alias) of
+ ModS ->
+ mod_state(Alias, ModS)
+ catch
+ E: Reason ->
+ ?ERROR({E, Reason, MFA, ?STACK})
+ end.
+
+%%% ---------------------------------------------------------------------------
+%%% # connection_down/2
+%%%
+%%% Output: #state{}
+%%% ---------------------------------------------------------------------------
+
+%% Peer process has transitioned out of the open state.
+
+connection_down(Pid, #state{peerT = PeerT,
+ connT = ConnT}
+ = S) ->
+ #peer{conn = TPid}
+ = P
+ = fetch(PeerT, Pid),
+
+ C = fetch(ConnT, TPid),
+ insert(PeerT, P#peer{op_state = ?STATE_DOWN}),
+ connection_down(P,C,S).
+
+%% connection_down/3
+
+connection_down(#peer{conn = TPid,
+ op_state = ?STATE_UP}
+ = P,
+ #conn{caps = Caps,
+ apps = SApps}
+ = C,
+ #state{service_name = SvcName,
+ service = #diameter_service{applications = Apps},
+ local_peers = LDict}
+ = S) ->
+ report_status(down, P, C, S, []),
+ NewS = S#state{local_peers
+ = remove_local_peer(SApps,
+ {{TPid, Caps}, {SvcName, Apps}},
+ LDict)},
+ request_peer_down(TPid, NewS),
+ NewS.
+
+remove_local_peer(SApps, T, LDict) ->
+ lists:foldl(fun(A,D) -> rlp(A, T, D) end, LDict, SApps).
+
+rlp({Id, Alias}, {TC, SA}, LDict) ->
+ L = ?Dict:fetch(Alias, LDict),
+ down_conn(Id, Alias, TC, SA),
+ ?Dict:store(Alias, lists:delete(TC, L), LDict).
+
+down_conn(Id, Alias, TC, {SvcName, Apps}) ->
+ #diameter_app{module = ModX,
+ id = Id} %% assert
+ = find_app(Alias, Apps),
+
+ peer_cb({ModX, peer_down, [SvcName, TC]}, Alias).
+
+%%% ---------------------------------------------------------------------------
+%%% # peer_down/3
+%%%
+%%% Output: #state{}
+%%% ---------------------------------------------------------------------------
+
+%% Peer process has died.
+
+peer_down(Pid, _Reason, #state{peerT = PeerT} = S) ->
+ P = fetch(PeerT, Pid),
+ ets:delete_object(PeerT, P),
+ restart(P,S),
+ peer_down(P,S).
+
+%% peer_down/2
+
+%% The peer has never come up ...
+peer_down(#peer{conn = B}, S)
+ when is_boolean(B) ->
+ S;
+
+%% ... or it has.
+peer_down(#peer{ref = Ref,
+ conn = TPid,
+ type = Type,
+ options = Opts}
+ = P,
+ #state{service_name = SvcName,
+ connT = ConnT}
+ = S) ->
+ #conn{caps = Caps}
+ = C
+ = fetch(ConnT, TPid),
+ ets:delete_object(ConnT, C),
+ try
+ pd(P,C,S)
+ after
+ send_event(SvcName, {closed, Ref, {TPid, Caps}, {type(Type), Opts}})
+ end.
+
+pd(#peer{op_state = ?STATE_DOWN}, _, S) ->
+ S;
+pd(#peer{op_state = ?STATE_UP} = P, C, S) ->
+ connection_down(P,C,S).
+
+%% restart/2
+
+restart(P,S) ->
+ q_restart(restart(P), S).
+
+%% restart/1
+
+%% Always try to reconnect.
+restart(#peer{ref = Ref,
+ type = connect = T,
+ options = Opts,
+ started = Time}) ->
+ {Time, {Ref, T, Opts}};
+
+%% Transport connection hasn't yet been accepted ...
+restart(#peer{ref = Ref,
+ type = accept = T,
+ options = Opts,
+ conn = false,
+ started = Time}) ->
+ {Time, {Ref, T, Opts}};
+
+%% ... or it has: a replacement transport has already been spawned.
+restart(#peer{type = accept}) ->
+ false.
+
+%% q_restart/2
+
+%% Start the reconnect timer.
+q_restart({Time, {_Ref, Type, Opts} = T}, S) ->
+ start_tc(tc(Time, default_tc(Type, Opts)), T, S);
+q_restart(false, _) ->
+ ok.
+
+%% RFC 3588, 2.1:
+%%
+%% When no transport connection exists with a peer, an attempt to
+%% connect SHOULD be periodically made. This behavior is handled via
+%% the Tc timer, whose recommended value is 30 seconds. There are
+%% certain exceptions to this rule, such as when a peer has terminated
+%% the transport connection stating that it does not wish to
+%% communicate.
+
+default_tc(connect, Opts) ->
+ proplists:get_value(reconnect_timer, Opts, ?DEFAULT_TC);
+default_tc(accept, _) ->
+ 0.
+
+%% Bound tc below if the peer was restarted recently to avoid
+%% continuous in case of faulty config or other problems.
+tc(Time, Tc) ->
+ choose(Tc > ?RESTART_TC
+ orelse timer:now_diff(now(), Time) > 1000*?RESTART_TC,
+ Tc,
+ ?RESTART_TC).
+
+start_tc(0, T, S) ->
+ tc_timeout(T, S);
+start_tc(Tc, T, _) ->
+ erlang:send_after(Tc, self(), {tc_timeout, T}).
+
+%% tc_timeout/2
+
+tc_timeout({Ref, _Type, _Opts} = T, #state{service_name = SvcName} = S) ->
+ tc(have_transport(SvcName, Ref), T, S).
+
+tc(true, {Ref, Type, Opts}, #state{service_name = SvcName}
+ = S) ->
+ send_event(SvcName, {reconnect, Ref, Opts}),
+ start(Ref, Type, Opts, S);
+tc(false = No, _, _) -> %% removed
+ No.
+
+%%% ---------------------------------------------------------------------------
+%%% # close/2
+%%% ---------------------------------------------------------------------------
+
+%% The watchdog doesn't start a new fsm in the accept case, it
+%% simply stays alive until someone tells it to die in order for
+%% another watchdog to be able to detect that it should transition
+%% from initial into reopen rather than okay. That someone is either
+%% the accepting watchdog upon reception of a CER from the previously
+%% connected peer, or us after reconnect_timer timeout.
+
+close(Pid, #state{service_name = SvcName,
+ peerT = PeerT}) ->
+ #peer{pid = Pid,
+ type = accept,
+ ref = Ref,
+ options = Opts}
+ = fetch(PeerT, Pid),
+
+ c(Pid, have_transport(SvcName, Ref), Opts).
+
+%% Tell watchdog to (maybe) die later ...
+c(Pid, true, Opts) ->
+ Tc = proplists:get_value(reconnect_timer, Opts, 2*?DEFAULT_TC),
+ erlang:send_after(Tc, Pid, close);
+
+%% ... or now.
+c(Pid, false, _Opts) ->
+ Pid ! close.
+
+%% The RFC's only document the behaviour of Tc, our reconnect_timer,
+%% for the establishment of connections but we also give
+%% reconnect_timer semantics for a listener, being the time within
+%% which a new connection attempt is expected of a connecting peer.
+%% The value should be greater than the peer's Tc + jitter.
+
+%%% ---------------------------------------------------------------------------
+%%% # reconnect/2
+%%% ---------------------------------------------------------------------------
+
+reconnect(Pid, #state{service_name = SvcName,
+ peerT = PeerT}) ->
+ #peer{ref = Ref,
+ type = connect,
+ options = Opts}
+ = fetch(PeerT, Pid),
+ send_event(SvcName, {reconnect, Ref, Opts}).
+
+%%% ---------------------------------------------------------------------------
+%%% # call_module/4
+%%% ---------------------------------------------------------------------------
+
+%% Backwards compatibility and never documented/advertised. May be
+%% removed.
+
+call_module(Mod, Req, From, #state{service
+ = #diameter_service{applications = Apps},
+ service_name = Svc}
+ = S) ->
+ case cm([A || A <- Apps, Mod == hd(A#diameter_app.module)],
+ Req,
+ From,
+ Svc)
+ of
+ {reply = T, RC} ->
+ {T, RC, S};
+ noreply = T ->
+ {T, S};
+ Reason ->
+ {reply, {error, Reason}, S}
+ end.
+
+cm([#diameter_app{module = ModX, alias = Alias}], Req, From, Svc) ->
+ MFA = {ModX, handle_call, [Req, From, Svc]},
+
+ try state_cb(MFA, Alias) of
+ {noreply = T, ModS} ->
+ mod_state(Alias, ModS),
+ T;
+ {reply = T, RC, ModS} ->
+ mod_state(Alias, ModS),
+ {T, RC};
+ T ->
+ diameter_lib:error_report({invalid, T}, MFA),
+ invalid
+ catch
+ E: Reason ->
+ diameter_lib:error_report({failure, {E, Reason, ?STACK}}, MFA),
+ failure
+ end;
+
+cm([], _, _, _) ->
+ unknown;
+
+cm([_,_|_], _, _, _) ->
+ multiple.
+
+%%% ---------------------------------------------------------------------------
+%%% # send_request/5
+%%% ---------------------------------------------------------------------------
+
+%% Send an outgoing request in its dedicated process.
+%%
+%% Note that both encode of the outgoing request and of the received
+%% answer happens in this process. It's also this process that replies
+%% to the caller. The service process only handles the state-retaining
+%% callbacks.
+%%
+%% The mod field of the #diameter_app{} here includes any extra
+%% arguments passed to diameter:call/2.
+
+send_request({TPid, Caps, App}, Msg, Opts, Caller, SvcName) ->
+ #diameter_app{module = ModX}
+ = App,
+
+ Pkt = make_packet(Msg),
+
+ case cb(ModX, prepare_request, [Pkt, SvcName, {TPid, Caps}]) of
+ {send, P} ->
+ send_request(make_packet(P, Pkt),
+ TPid,
+ Caps,
+ App,
+ Opts,
+ Caller,
+ SvcName);
+ {discard, Reason} ->
+ {error, Reason};
+ discard ->
+ {error, discarded};
+ T ->
+ ?ERROR({invalid_return, prepare_request, App, T})
+ end.
+
+%% make_packet/1
+%%
+%% Turn an outgoing request as passed to call/4 into a diameter_packet
+%% record in preparation for a prepare_request callback.
+
+make_packet(Bin)
+ when is_binary(Bin) ->
+ #diameter_packet{header = diameter_codec:decode_header(Bin),
+ bin = Bin};
+
+make_packet(#diameter_packet{msg = [#diameter_header{} = Hdr | Avps]} = Pkt) ->
+ Pkt#diameter_packet{msg = [make_header(Hdr) | Avps]};
+
+make_packet(#diameter_packet{header = Hdr} = Pkt) ->
+ Pkt#diameter_packet{header = make_header(Hdr)};
+
+make_packet(Msg) ->
+ make_packet(#diameter_packet{msg = Msg}).
+
+%% make_header/1
+
+make_header(undefined) ->
+ Seq = diameter_session:sequence(),
+ make_header(#diameter_header{end_to_end_id = Seq,
+ hop_by_hop_id = Seq});
+
+make_header(#diameter_header{version = undefined} = Hdr) ->
+ make_header(Hdr#diameter_header{version = ?DIAMETER_VERSION});
+
+make_header(#diameter_header{end_to_end_id = undefined} = H) ->
+ Seq = diameter_session:sequence(),
+ make_header(H#diameter_header{end_to_end_id = Seq});
+
+make_header(#diameter_header{hop_by_hop_id = undefined} = H) ->
+ Seq = diameter_session:sequence(),
+ make_header(H#diameter_header{hop_by_hop_id = Seq});
+
+make_header(#diameter_header{} = Hdr) ->
+ Hdr;
+
+make_header(T) ->
+ ?ERROR({invalid_header, T}).
+
+%% make_packet/2
+%%
+%% Reconstruct a diameter_packet from the return value of
+%% prepare_request or prepare_retransmit callback.
+
+make_packet(Bin, _)
+ when is_binary(Bin) ->
+ make_packet(Bin);
+
+make_packet(#diameter_packet{msg = [#diameter_header{} | _]} = Pkt, _) ->
+ Pkt;
+
+%% Returning a diameter_packet with no header from a prepare_request
+%% or prepare_retransmit callback retains the header passed into it.
+%% This is primarily so that the end to end and hop by hop identifiers
+%% are retained.
+make_packet(#diameter_packet{header = Hdr} = Pkt,
+ #diameter_packet{header = Hdr0}) ->
+ Pkt#diameter_packet{header = fold_record(Hdr0, Hdr)};
+
+make_packet(Msg, Pkt) ->
+ Pkt#diameter_packet{msg = Msg}.
+
+%% fold_record/2
+
+fold_record(undefined, R) ->
+ R;
+fold_record(Rec, R) ->
+ diameter_lib:fold_tuple(2, Rec, R).
+
+%% send_request/7
+
+send_request(Pkt, TPid, Caps, App, Opts, Caller, SvcName) ->
+ #diameter_app{alias = Alias,
+ dictionary = Dict,
+ module = ModX,
+ answer_errors = AE}
+ = App,
+
+ EPkt = encode(Dict, Pkt),
+
+ #options{filter = Filter,
+ timeout = Timeout}
+ = Opts,
+
+ Req = #request{packet = Pkt,
+ from = Caller,
+ handler = self(),
+ transport = TPid,
+ caps = Caps,
+ app = Alias,
+ filter = Filter,
+ dictionary = Dict,
+ module = ModX},
+
+ try
+ TRef = send_request(TPid, EPkt, Req, Timeout),
+ ack(Caller),
+ handle_answer(SvcName, AE, recv_answer(Timeout, SvcName, {TRef, Req}))
+ after
+ erase_request(EPkt)
+ end.
+
+%% Tell caller a send has been attempted.
+ack({Pid, Ref}) ->
+ Pid ! Ref.
+
+%% recv_answer/3
+
+recv_answer(Timeout,
+ SvcName,
+ {TRef, #request{from = {_, Ref}, packet = RPkt} = Req}
+ = T) ->
+
+ %% Matching on TRef below ensures we ignore messages that pertain
+ %% to a previous transport prior to failover. The answer message
+ %% includes the #request{} since it's not necessarily Req; that
+ %% is, from the last peer to which we've transmitted.
+
+ receive
+ {answer = A, Ref, Rq, Pkt} -> %% Answer from peer
+ {A, Rq, Pkt};
+ {timeout = Reason, TRef, _} -> %% No timely reply
+ {error, Req, Reason};
+ {failover = Reason, TRef, false} -> %% No alternate peer
+ {error, Req, Reason};
+ {failover, TRef, Transport} -> %% Resend to alternate peer
+ try_retransmit(Timeout, SvcName, Req, Transport);
+ {failover, TRef} -> %% May have missed failover notification
+ Seqs = diameter_codec:sequence_numbers(RPkt),
+ Pid = whois(SvcName),
+ is_pid(Pid) andalso (Pid ! {failover, TRef, Seqs}),
+ recv_answer(Timeout, SvcName, T)
+ end.
+%% Note that failover starts a new timer and that expiry of an old
+%% timer value is ignored. This means that an answer could be accepted
+%% from a peer after timeout in the case of failover.
+
+try_retransmit(Timeout, SvcName, Req, Transport) ->
+ try retransmit(Transport, Req, SvcName, Timeout) of
+ T -> recv_answer(Timeout, SvcName, T)
+ catch
+ ?FAILURE(Reason) -> {error, Req, Reason}
+ end.
+
+%% handle_error/3
+
+handle_error(Req, Reason, SvcName) ->
+ #request{module = ModX,
+ packet = Pkt,
+ transport = TPid,
+ caps = Caps}
+ = Req,
+ cb(ModX, handle_error, [Reason, msg(Pkt), SvcName, {TPid, Caps}]).
+
+msg(#diameter_packet{msg = undefined, bin = Bin}) ->
+ Bin;
+msg(#diameter_packet{msg = Msg}) ->
+ Msg.
+
+%% encode/2
+
+%% Note that prepare_request can return a diameter_packet containing
+%% header or transport_data. Even allow the returned record to contain
+%% an encoded binary. This isn't the usual case but could some in
+%% handy, for test at least. (For example, to send garbage.)
+
+%% The normal case: encode the returned message.
+encode(Dict, #diameter_packet{msg = Msg, bin = undefined} = Pkt) ->
+ D = pick_dictionary([Dict, ?BASE], Msg),
+ diameter_codec:encode(D, Pkt);
+
+%% Callback has returned an encoded binary: just send.
+encode(_, #diameter_packet{} = Pkt) ->
+ Pkt.
+
+%% pick_dictionary/2
+
+%% Pick the first dictionary that declares the application id in the
+%% specified header.
+pick_dictionary(Ds, [#diameter_header{application_id = Id} | _]) ->
+ pd(Ds, fun(D) -> Id = D:id() end);
+
+%% Pick the first dictionary that knows the specified message name.
+pick_dictionary(Ds, [MsgName|_]) ->
+ pd(Ds, fun(D) -> D:msg2rec(MsgName) end);
+
+%% Pick the first dictionary that knows the name of the specified
+%% message record.
+pick_dictionary(Ds, Rec) ->
+ Name = element(1, Rec),
+ pd(Ds, fun(D) -> D:rec2msg(Name) end).
+
+pd([D|Ds], F) ->
+ try
+ F(D),
+ D
+ catch
+ error:_ ->
+ pd(Ds, F)
+ end;
+
+pd([], _) ->
+ ?ERROR(no_dictionary).
+
+%% send_request/4
+
+send_request(TPid, #diameter_packet{bin = Bin} = Pkt, Req, Timeout)
+ when node() == node(TPid) ->
+ %% Store the outgoing request before sending to avoid a race with
+ %% reply reception.
+ TRef = store_request(TPid, Bin, Req, Timeout),
+ send(TPid, Pkt),
+ TRef;
+
+%% Send using a remote transport: spawn a process on the remote node
+%% to relay the answer.
+send_request(TPid, #diameter_packet{} = Pkt, Req, Timeout) ->
+ TRef = erlang:start_timer(Timeout, self(), timeout),
+ T = {TPid, Pkt, Req, Timeout, TRef},
+ spawn(node(TPid), ?MODULE, send, [T]),
+ TRef.
+
+%% send/1
+
+send({TPid, Pkt, #request{handler = Pid} = Req, Timeout, TRef}) ->
+ Ref = send_request(TPid, Pkt, Req#request{handler = self()}, Timeout),
+ Pid ! reref(receive T -> T end, Ref, TRef).
+
+reref({T, Ref, R}, Ref, TRef) ->
+ {T, TRef, R};
+reref(T, _, _) ->
+ T.
+
+%% send/2
+
+send(Pid, Pkt) ->
+ Pid ! {send, Pkt}.
+
+%% retransmit/4
+
+retransmit({TPid, Caps, #diameter_app{alias = Alias} = App},
+ #request{app = Alias,
+ packet = Pkt}
+ = Req,
+ SvcName,
+ Timeout) ->
+ have_request(Pkt, TPid) %% Don't failover to a peer we've
+ andalso ?THROW(timeout), %% already sent to.
+
+ case cb(App, prepare_retransmit, [Pkt, SvcName, {TPid, Caps}]) of
+ {send, P} ->
+ retransmit(make_packet(P, Pkt), TPid, Caps, Req, Timeout);
+ {discard, Reason} ->
+ ?THROW(Reason);
+ discard ->
+ ?THROW(discarded);
+ T ->
+ ?ERROR({invalid_return, prepare_retransmit, App, T})
+ end.
+
+%% retransmit/5
+
+retransmit(Pkt, TPid, Caps, #request{dictionary = Dict} = Req, Timeout) ->
+ EPkt = encode(Dict, Pkt),
+
+ NewReq = Req#request{transport = TPid,
+ packet = Pkt,
+ caps = Caps},
+
+ ?LOG(retransmission, NewReq),
+ TRef = send_request(TPid, EPkt, NewReq, Timeout),
+ {TRef, NewReq}.
+
+%% store_request/4
+
+store_request(TPid, Bin, Req, Timeout) ->
+ Seqs = diameter_codec:sequence_numbers(Bin),
+ TRef = erlang:start_timer(Timeout, self(), timeout),
+ ets:insert(?REQUEST_TABLE, {Seqs, Req, TRef}),
+ ets:member(?REQUEST_TABLE, TPid)
+ orelse (self() ! {failover, TRef}), %% possibly missed failover
+ TRef.
+
+%% lookup_request/2
+
+lookup_request(Msg, TPid)
+ when is_pid(TPid) ->
+ lookup(Msg, TPid, '_');
+
+lookup_request(Msg, TRef)
+ when is_reference(TRef) ->
+ lookup(Msg, '_', TRef).
+
+lookup(Msg, TPid, TRef) ->
+ Seqs = diameter_codec:sequence_numbers(Msg),
+ Spec = [{{Seqs, #request{transport = TPid, _ = '_'}, TRef},
+ [],
+ ['$_']}],
+ case ets:select(?REQUEST_TABLE, Spec) of
+ [{_, Req, _}] ->
+ Req;
+ [] ->
+ false
+ end.
+
+%% erase_request/1
+
+erase_request(Pkt) ->
+ ets:delete(?REQUEST_TABLE, diameter_codec:sequence_numbers(Pkt)).
+
+%% match_requests/1
+
+match_requests(TPid) ->
+ Pat = {'_', #request{transport = TPid, _ = '_'}, '_'},
+ ets:select(?REQUEST_TABLE, [{Pat, [], ['$_']}]).
+
+%% have_request/2
+
+have_request(Pkt, TPid) ->
+ Seqs = diameter_codec:sequence_numbers(Pkt),
+ Pat = {Seqs, #request{transport = TPid, _ = '_'}, '_'},
+ '$end_of_table' /= ets:select(?REQUEST_TABLE, [{Pat, [], ['$_']}], 1).
+
+%% request_peer_up/1
+
+request_peer_up(TPid) ->
+ ets:insert(?REQUEST_TABLE, {TPid}).
+
+%% request_peer_down/2
+
+request_peer_down(TPid, S) ->
+ ets:delete(?REQUEST_TABLE, TPid),
+ lists:foreach(fun(T) -> failover(T,S) end, match_requests(TPid)).
+%% Note that a request process can store its request after failover
+%% notifications are sent here: store_request/4 sends the notification
+%% in that case. Note also that we'll send as many notifications to a
+%% given handler as there are peers its sent to. All but one of these
+%% will be ignored.
+
+%%% ---------------------------------------------------------------------------
+%%% recv_request/3
+%%% ---------------------------------------------------------------------------
+
+recv_request(TPid, Pkt, {ConnT, SvcName, Apps}) ->
+ try ets:lookup(ConnT, TPid) of
+ [C] ->
+ recv_request(C, TPid, Pkt, SvcName, Apps);
+ [] -> %% transport has gone down
+ ok
+ catch
+ error: badarg -> %% service has gone down (and taken table with it)
+ ok
+ end.
+
+%% recv_request/5
+
+recv_request(#conn{apps = SApps, caps = Caps}, TPid, Pkt, SvcName, Apps) ->
+ #diameter_caps{origin_host = {OH,_},
+ origin_realm = {OR,_}}
+ = Caps,
+
+ #diameter_packet{header = #diameter_header{application_id = Id}}
+ = Pkt,
+
+ recv_request(find_recv_app(Id, SApps),
+ {SvcName, OH, OR},
+ TPid,
+ Apps,
+ Caps,
+ Pkt).
+
+%% find_recv_app/2
+
+%% No one should be sending the relay identifier.
+find_recv_app(?APP_ID_RELAY, _) ->
+ false;
+
+%% With any other id we either support it locally or as a relay.
+find_recv_app(Id, SApps) ->
+ keyfind([Id, ?APP_ID_RELAY], 1, SApps).
+
+%% keyfind/3
+
+keyfind([], _, _) ->
+ false;
+keyfind([Key | Rest], Pos, L) ->
+ case lists:keyfind(Key, Pos, L) of
+ false ->
+ keyfind(Rest, Pos, L);
+ T ->
+ T
+ end.
+
+%% recv_request/6
+
+recv_request({Id, Alias}, T, TPid, Apps, Caps, Pkt) ->
+ #diameter_app{dictionary = Dict}
+ = A
+ = find_app(Alias, Apps),
+ recv_request(T, {TPid, Caps}, A, diameter_codec:decode(Id, Dict, Pkt));
+%% Note that the decode is different depending on whether or not Id is
+%% ?APP_ID_RELAY.
+
+%% DIAMETER_APPLICATION_UNSUPPORTED 3007
+%% A request was sent for an application that is not supported.
+
+recv_request(false, T, TPid, _, _, Pkt) ->
+ As = collect_avps(Pkt),
+ protocol_error(3007, T, TPid, Pkt#diameter_packet{avps = As}).
+
+collect_avps(Pkt) ->
+ case diameter_codec:collect_avps(Pkt) of
+ {_Bs, As} ->
+ As;
+ As ->
+ As
+ end.
+
+%% recv_request/4
+
+%% Wrong number of bits somewhere in the message: reply.
+%%
+%% DIAMETER_INVALID_AVP_BITS 3009
+%% A request was received that included an AVP whose flag bits are
+%% set to an unrecognized value, or that is inconsistent with the
+%% AVP's definition.
+%%
+recv_request(T, {TPid, _}, _, #diameter_packet{errors = [Bs | _]} = Pkt)
+ when is_bitstring(Bs) ->
+ protocol_error(3009, T, TPid, Pkt);
+
+%% Either we support this application but don't recognize the command
+%% or we're a relay and the command isn't proxiable.
+%%
+%% DIAMETER_COMMAND_UNSUPPORTED 3001
+%% The Request contained a Command-Code that the receiver did not
+%% recognize or support. This MUST be used when a Diameter node
+%% receives an experimental command that it does not understand.
+%%
+recv_request(T,
+ {TPid, _},
+ #diameter_app{id = Id},
+ #diameter_packet{header = #diameter_header{is_proxiable = P},
+ msg = M}
+ = Pkt)
+ when ?APP_ID_RELAY /= Id, undefined == M;
+ ?APP_ID_RELAY == Id, not P ->
+ protocol_error(3001, T, TPid, Pkt);
+
+%% Error bit was set on a request.
+%%
+%% DIAMETER_INVALID_HDR_BITS 3008
+%% A request was received whose bits in the Diameter header were
+%% either set to an invalid combination, or to a value that is
+%% inconsistent with the command code's definition.
+%%
+recv_request(T,
+ {TPid, _},
+ _,
+ #diameter_packet{header = #diameter_header{is_error = true}}
+ = Pkt) ->
+ protocol_error(3008, T, TPid, Pkt);
+
+%% A message in a locally supported application or a proxiable message
+%% in the relay application. Don't distinguish between the two since
+%% each application has its own callback config. That is, the user can
+%% easily distinguish between the two cases.
+recv_request(T, TC, App, Pkt) ->
+ request_cb(T, TC, App, examine(Pkt)).
+
+%% Note that there may still be errors but these aren't protocol
+%% (3xxx) errors that lead to an answer-message.
+
+request_cb({SvcName, _OH, _OR} = T, TC, App, Pkt) ->
+ request_cb(cb(App, handle_request, [Pkt, SvcName, TC]), App, T, TC, Pkt).
+
+%% examine/1
+%%
+%% Look for errors in a decoded message. Length errors result in
+%% decode failure in diameter_codec.
+
+examine(#diameter_packet{header = #diameter_header{version
+ = ?DIAMETER_VERSION}}
+ = Pkt) ->
+ Pkt;
+
+%% DIAMETER_UNSUPPORTED_VERSION 5011
+%% This error is returned when a request was received, whose version
+%% number is unsupported.
+
+examine(#diameter_packet{errors = Es} = Pkt) ->
+ Pkt#diameter_packet{errors = [5011 | Es]}.
+%% It's odd/unfortunate that this isn't a protocol error.
+
+%% request_cb/5
+
+%% A reply may be an answer-message, constructed either here or by
+%% the handle_request callback. The header from the incoming request
+%% is passed into the encode so that it can retrieve the relevant
+%% command code in this case. It will also then ignore Dict and use
+%% the base encoder.
+request_cb({reply, Ans},
+ #diameter_app{dictionary = Dict},
+ _,
+ {TPid, _},
+ Pkt) ->
+ reply(Ans, Dict, TPid, Pkt);
+
+%% An 3xxx result code, for which the E-bit is set in the header.
+request_cb({protocol_error, RC}, _, T, {TPid, _}, Pkt)
+ when 3000 =< RC, RC < 4000 ->
+ protocol_error(RC, T, TPid, Pkt);
+
+%% RFC 3588 says we must reply 3001 to anything unrecognized or
+%% unsupported. 'noreply' is undocumented (and inappropriately named)
+%% backwards compatibility for this, protocol_error the documented
+%% alternative.
+request_cb(noreply, _, T, {TPid, _}, Pkt) ->
+ protocol_error(3001, T, TPid, Pkt);
+
+%% Relay a request to another peer. This is equivalent to doing an
+%% explicit call/4 with the message in question except that (1) a loop
+%% will be detected by examining Route-Record AVP's, (3) a
+%% Route-Record AVP will be added to the outgoing request and (3) the
+%% End-to-End Identifier will default to that in the
+%% #diameter_header{} without the need for an end_to_end_identifier
+%% option.
+%%
+%% relay and proxy are similar in that they require the same handling
+%% with respect to Route-Record and End-to-End identifier. The
+%% difference is that a proxy advertises specific applications, while
+%% a relay advertises the relay application. If a callback doesn't
+%% want to distinguish between the cases in the callback return value
+%% then 'resend' is a neutral alternative.
+%%
+request_cb({A, Opts},
+ #diameter_app{id = Id}
+ = App,
+ T,
+ TC,
+ Pkt)
+ when A == relay, Id == ?APP_ID_RELAY;
+ A == proxy, Id /= ?APP_ID_RELAY;
+ A == resend ->
+ resend(Opts, App, T, TC, Pkt);
+
+request_cb(discard, _, _, _, _) ->
+ ok;
+
+request_cb({eval, RC, F}, App, T, TC, Pkt) ->
+ request_cb(RC, App, T, TC, Pkt),
+ diameter_lib:eval(F).
+
+%% protocol_error/4
+
+protocol_error(RC, {_, OH, OR}, TPid, #diameter_packet{avps = Avps} = Pkt) ->
+ ?LOG({error, RC}, Pkt),
+ reply(answer_message({OH, OR, RC}, Avps), ?BASE, TPid, Pkt).
+
+%% resend/5
+%%
+%% Resend a message as a relay or proxy agent.
+
+resend(Opts,
+ #diameter_app{} = App,
+ {_SvcName, OH, _OR} = T,
+ {_TPid, _Caps} = TC,
+ #diameter_packet{avps = Avps} = Pkt) ->
+ {Code, _Flags, Vid} = ?BASE:avp_header('Route-Record'),
+ resend(is_loop(Code, Vid, OH, Avps), Opts, App, T, TC, Pkt).
+
+%% DIAMETER_LOOP_DETECTED 3005
+%% An agent detected a loop while trying to get the message to the
+%% intended recipient. The message MAY be sent to an alternate peer,
+%% if one is available, but the peer reporting the error has
+%% identified a configuration problem.
+
+resend(true, _, _, T, {TPid, _}, Pkt) -> %% Route-Record loop
+ protocol_error(3005, T, TPid, Pkt);
+
+%% 6.1.8. Relaying and Proxying Requests
+%%
+%% A relay or proxy agent MUST append a Route-Record AVP to all requests
+%% forwarded. The AVP contains the identity of the peer the request was
+%% received from.
+
+resend(false,
+ Opts,
+ App,
+ {SvcName, _, _} = T,
+ {TPid, #diameter_caps{origin_host = {_, OH}}},
+ #diameter_packet{header = Hdr0,
+ avps = Avps}
+ = Pkt) ->
+ Route = #diameter_avp{data = {?BASE, 'Route-Record', OH}},
+ Seq = diameter_session:sequence(),
+ Hdr = Hdr0#diameter_header{hop_by_hop_id = Seq},
+ Msg = [Hdr, Route | Avps],
+ resend(call(SvcName, App, Msg, Opts), T, TPid, Pkt).
+%% The incoming request is relayed with the addition of a
+%% Route-Record. Note the requirement on the return from call/4 below,
+%% which places a requirement on the value returned by the
+%% handle_answer callback of the application module in question.
+%%
+%% Note that there's nothing stopping the request from being relayed
+%% back to the sender. A pick_peer callback may want to avoid this but
+%% a smart peer might recognize the potential loop and choose another
+%% route. A less smart one will probably just relay the request back
+%% again and force us to detect the loop. A pick_peer that wants to
+%% avoid this can specify filter to avoid the possibility.
+%% Eg. {neg, {host, OH} where #diameter_caps{origin_host = {OH, _}}.
+%%
+%% RFC 6.3 says that a relay agent does not modify Origin-Host but
+%% says nothing about a proxy. Assume it should behave the same way.
+
+%% resend/4
+%%
+%% Relay a reply to a relayed request.
+
+%% Answer from the peer: reset the hop by hop identifier and send.
+resend(#diameter_packet{bin = B}
+ = Pkt,
+ _,
+ TPid,
+ #diameter_packet{header = #diameter_header{hop_by_hop_id = Id},
+ transport_data = TD}) ->
+ send(TPid, Pkt#diameter_packet{bin = diameter_codec:hop_by_hop_id(Id, B),
+ transport_data = TD});
+%% TODO: counters
+
+%% Or not: DIAMETER_UNABLE_TO_DELIVER.
+resend(_, T, TPid, Pkt) ->
+ protocol_error(3002, T, TPid, Pkt).
+
+%% is_loop/4
+%%
+%% Is there a Route-Record AVP with our Origin-Host?
+
+is_loop(Code,
+ Vid,
+ Bin,
+ [#diameter_avp{code = Code, vendor_id = Vid, data = Bin} | _]) ->
+ true;
+
+is_loop(_, _, _, []) ->
+ false;
+
+is_loop(Code, Vid, OH, [_ | Avps])
+ when is_binary(OH) ->
+ is_loop(Code, Vid, OH, Avps);
+
+is_loop(Code, Vid, OH, Avps) ->
+ is_loop(Code, Vid, ?BASE:avp(encode, OH, 'Route-Record'), Avps).
+
+%% reply/4
+%%
+%% Send a locally originating reply.
+
+%% No errors or a diameter_header/avp list.
+reply(Msg, Dict, TPid, #diameter_packet{errors = Es,
+ transport_data = TD}
+ = ReqPkt)
+ when [] == Es;
+ is_record(hd(Msg), diameter_header) ->
+ Pkt = diameter_codec:encode(Dict, make_reply_packet(Msg, ReqPkt)),
+ incr(send, Pkt, Dict, TPid), %% count result codes in sent answers
+ send(TPid, Pkt#diameter_packet{transport_data = TD});
+
+%% Or not: set Result-Code and Failed-AVP AVP's.
+reply(Msg, Dict, TPid, #diameter_packet{errors = [H|_] = Es} = Pkt) ->
+ reply(rc(Msg, rc(H), [A || {_,A} <- Es], Dict),
+ Dict,
+ TPid,
+ Pkt#diameter_packet{errors = []}).
+
+%% make_reply_packet/2
+
+%% Binaries and header/avp lists are sent as-is.
+make_reply_packet(Bin, _)
+ when is_binary(Bin) ->
+ #diameter_packet{bin = Bin};
+make_reply_packet([#diameter_header{} | _] = Msg, _) ->
+ #diameter_packet{msg = Msg};
+
+%% Otherwise a reply message clears the R and T flags and retains the
+%% P flag. The E flag will be set at encode.
+make_reply_packet(Msg, #diameter_packet{header = ReqHdr}) ->
+ Hdr = ReqHdr#diameter_header{version = ?DIAMETER_VERSION,
+ is_request = false,
+ is_error = undefined,
+ is_retransmitted = false},
+ #diameter_packet{header = Hdr,
+ msg = Msg}.
+
+%% rc/1
+
+rc({RC, _}) ->
+ RC;
+rc(RC) ->
+ RC.
+
+%% rc/4
+
+rc(Rec, RC, Failed, Dict)
+ when is_integer(RC) ->
+ set(Rec, [{'Result-Code', RC} | failed_avp(Rec, Failed, Dict)], Dict).
+
+%% Reply as name and tuple list ...
+set([_|_] = Ans, Avps, _) ->
+ Ans ++ Avps; %% Values nearer tail take precedence.
+
+%% ... or record.
+set(Rec, Avps, Dict) ->
+ Dict:'#set-'(Avps, Rec).
+
+%% failed_avp/3
+
+failed_avp(_, [] = No, _) ->
+ No;
+
+failed_avp(Rec, Failed, Dict) ->
+ [fa(Rec, [{'AVP', Failed}], Dict)].
+
+%% Reply as name and tuple list ...
+fa([MsgName | Values], FailedAvp, Dict) ->
+ R = Dict:msg2rec(MsgName),
+ try
+ Dict:'#info-'(R, {index, 'Failed-AVP'}),
+ {'Failed-AVP', [FailedAvp]}
+ catch
+ error: _ ->
+ Avps = proplists:get_value('AVP', Values, []),
+ A = #diameter_avp{name = 'Failed-AVP',
+ value = FailedAvp},
+ {'AVP', [A|Avps]}
+ end;
+
+%% ... or record.
+fa(Rec, FailedAvp, Dict) ->
+ try
+ {'Failed-AVP', [FailedAvp]}
+ catch
+ error: _ ->
+ Avps = Dict:'get-'('AVP', Rec),
+ A = #diameter_avp{name = 'Failed-AVP',
+ value = FailedAvp},
+ {'AVP', [A|Avps]}
+ end.
+
+%% 3. Diameter Header
+%%
+%% E(rror) - If set, the message contains a protocol error,
+%% and the message will not conform to the ABNF
+%% described for this command. Messages with the 'E'
+%% bit set are commonly referred to as error
+%% messages. This bit MUST NOT be set in request
+%% messages. See Section 7.2.
+
+%% 3.2. Command Code ABNF specification
+%%
+%% e-bit = ", ERR"
+%% ; If present, the 'E' bit in the Command
+%% ; Flags is set, indicating that the answer
+%% ; message contains a Result-Code AVP in
+%% ; the "protocol error" class.
+
+%% 7.1.3. Protocol Errors
+%%
+%% Errors that fall within the Protocol Error category SHOULD be treated
+%% on a per-hop basis, and Diameter proxies MAY attempt to correct the
+%% error, if it is possible. Note that these and only these errors MUST
+%% only be used in answer messages whose 'E' bit is set.
+
+%% Thus, only construct answers to protocol errors. Other errors
+%% require an message-specific answer and must be handled by the
+%% application.
+
+%% 6.2. Diameter Answer Processing
+%%
+%% When a request is locally processed, the following procedures MUST be
+%% applied to create the associated answer, in addition to any
+%% additional procedures that MAY be discussed in the Diameter
+%% application defining the command:
+%%
+%% - The same Hop-by-Hop identifier in the request is used in the
+%% answer.
+%%
+%% - The local host's identity is encoded in the Origin-Host AVP.
+%%
+%% - The Destination-Host and Destination-Realm AVPs MUST NOT be
+%% present in the answer message.
+%%
+%% - The Result-Code AVP is added with its value indicating success or
+%% failure.
+%%
+%% - If the Session-Id is present in the request, it MUST be included
+%% in the answer.
+%%
+%% - Any Proxy-Info AVPs in the request MUST be added to the answer
+%% message, in the same order they were present in the request.
+%%
+%% - The 'P' bit is set to the same value as the one in the request.
+%%
+%% - The same End-to-End identifier in the request is used in the
+%% answer.
+%%
+%% Note that the error messages (see Section 7.3) are also subjected to
+%% the above processing rules.
+
+%% 7.3. Error-Message AVP
+%%
+%% The Error-Message AVP (AVP Code 281) is of type UTF8String. It MAY
+%% accompany a Result-Code AVP as a human readable error message. The
+%% Error-Message AVP is not intended to be useful in real-time, and
+%% SHOULD NOT be expected to be parsed by network entities.
+
+%% answer_message/2
+
+answer_message({OH, OR, RC}, Avps) ->
+ {Code, _, Vid} = ?BASE:avp_header('Session-Id'),
+ ['answer-message', {'Origin-Host', OH},
+ {'Origin-Realm', OR},
+ {'Result-Code', RC}
+ | session_id(Code, Vid, Avps)].
+
+session_id(Code, Vid, Avps)
+ when is_list(Avps) ->
+ try
+ {value, #diameter_avp{data = D}} = find_avp(Code, Vid, Avps),
+ [{'Session-Id', [?BASE:avp(decode, D, 'Session-Id')]}]
+ catch
+ error: _ ->
+ []
+ end.
+
+%% find_avp/3
+
+find_avp(Code, Vid, Avps)
+ when is_integer(Code), (undefined == Vid orelse is_integer(Vid)) ->
+ find(fun(A) -> is_avp(Code, Vid, A) end, Avps).
+
+%% The final argument here could be a list of AVP's, depending on the case,
+%% but we're only searching at the top level.
+is_avp(Code, Vid, #diameter_avp{code = Code, vendor_id = Vid}) ->
+ true;
+is_avp(_, _, _) ->
+ false.
+
+find(_, []) ->
+ false;
+find(Pred, [H|T]) ->
+ case Pred(H) of
+ true ->
+ {value, H};
+ false ->
+ find(Pred, T)
+ end.
+
+%% 7. Error Handling
+%%
+%% There are certain Result-Code AVP application errors that require
+%% additional AVPs to be present in the answer. In these cases, the
+%% Diameter node that sets the Result-Code AVP to indicate the error
+%% MUST add the AVPs. Examples are:
+%%
+%% - An unrecognized AVP is received with the 'M' bit (Mandatory bit)
+%% set, causes an answer to be sent with the Result-Code AVP set to
+%% DIAMETER_AVP_UNSUPPORTED, and the Failed-AVP AVP containing the
+%% offending AVP.
+%%
+%% - An AVP that is received with an unrecognized value causes an
+%% answer to be returned with the Result-Code AVP set to
+%% DIAMETER_INVALID_AVP_VALUE, with the Failed-AVP AVP containing the
+%% AVP causing the error.
+%%
+%% - A command is received with an AVP that is omitted, yet is
+%% mandatory according to the command's ABNF. The receiver issues an
+%% answer with the Result-Code set to DIAMETER_MISSING_AVP, and
+%% creates an AVP with the AVP Code and other fields set as expected
+%% in the missing AVP. The created AVP is then added to the Failed-
+%% AVP AVP.
+%%
+%% The Result-Code AVP describes the error that the Diameter node
+%% encountered in its processing. In case there are multiple errors,
+%% the Diameter node MUST report only the first error it encountered
+%% (detected possibly in some implementation dependent order). The
+%% specific errors that can be described by this AVP are described in
+%% the following section.
+
+%% 7.5. Failed-AVP AVP
+%%
+%% The Failed-AVP AVP (AVP Code 279) is of type Grouped and provides
+%% debugging information in cases where a request is rejected or not
+%% fully processed due to erroneous information in a specific AVP. The
+%% value of the Result-Code AVP will provide information on the reason
+%% for the Failed-AVP AVP.
+%%
+%% The possible reasons for this AVP are the presence of an improperly
+%% constructed AVP, an unsupported or unrecognized AVP, an invalid AVP
+%% value, the omission of a required AVP, the presence of an explicitly
+%% excluded AVP (see tables in Section 10), or the presence of two or
+%% more occurrences of an AVP which is restricted to 0, 1, or 0-1
+%% occurrences.
+%%
+%% A Diameter message MAY contain one Failed-AVP AVP, containing the
+%% entire AVP that could not be processed successfully. If the failure
+%% reason is omission of a required AVP, an AVP with the missing AVP
+%% code, the missing vendor id, and a zero filled payload of the minimum
+%% required length for the omitted AVP will be added.
+
+%%% ---------------------------------------------------------------------------
+%%% # handle_answer/3
+%%% ---------------------------------------------------------------------------
+
+%% Process an answer message in call-specific process.
+
+handle_answer(SvcName, _, {error, Req, Reason}) ->
+ handle_error(Req, Reason, SvcName);
+
+handle_answer(SvcName,
+ AnswerErrors,
+ {answer, #request{dictionary = Dict} = Req, Pkt}) ->
+ a(examine(diameter_codec:decode(Dict, Pkt)),
+ SvcName,
+ AnswerErrors,
+ Req).
+
+%% We don't really need to do a full decode if we're a relay and will
+%% just resend with a new hop by hop identifier, but might a proxy
+%% want to examine the answer?
+
+a(#diameter_packet{errors = []}
+ = Pkt,
+ SvcName,
+ AE,
+ #request{transport = TPid,
+ dictionary = Dict,
+ caps = Caps,
+ packet = P}
+ = Req) ->
+ try
+ incr(in, Pkt, Dict, TPid)
+ of
+ _ ->
+ cb(Req, handle_answer, [Pkt, msg(P), SvcName, {TPid, Caps}])
+ catch
+ exit: {invalid_error_bit, _} = E ->
+ e(Pkt#diameter_packet{errors = [E]}, SvcName, AE, Req)
+ end;
+
+a(#diameter_packet{} = Pkt, SvcName, AE, Req) ->
+ e(Pkt, SvcName, AE, Req).
+
+e(Pkt, SvcName, callback, #request{transport = TPid,
+ caps = Caps,
+ packet = Pkt}
+ = Req) ->
+ cb(Req, handle_answer, [Pkt, msg(Pkt), SvcName, {TPid, Caps}]);
+e(Pkt, SvcName, report, Req) ->
+ x(errors, handle_answer, [SvcName, Req, Pkt]);
+e(Pkt, SvcName, discard, Req) ->
+ x({errors, handle_answer, [SvcName, Req, Pkt]}).
+
+%% Note that we don't check that the application id in the answer's
+%% header is what we expect. (TODO: Does the rfc says anything about
+%% this?)
+
+%% incr/4
+%%
+%% Increment a stats counter for an incoming or outgoing message.
+
+%% TODO: fix
+incr(_, #diameter_packet{msg = undefined}, _, _) ->
+ ok;
+
+incr(Dir, Pkt, Dict, TPid)
+ when is_pid(TPid) ->
+ #diameter_packet{header = #diameter_header{is_error = E}
+ = Hdr,
+ msg = Rec}
+ = Pkt,
+
+ D = choose(E, ?BASE, Dict),
+ RC = int(get_avp_value(D, 'Result-Code', Rec)),
+ PE = is_protocol_error(RC),
+
+ %% Check that the E bit is set only for 3xxx result codes.
+ (not (E orelse PE))
+ orelse (E andalso PE)
+ orelse x({invalid_error_bit, RC}, answer, [Dir, Pkt]),
+
+ Ctr = rc_counter(D, Rec, RC),
+ is_tuple(Ctr)
+ andalso incr(TPid, {diameter_codec:msg_id(Hdr), Dir, Ctr}).
+
+%% incr/2
+
+incr(TPid, Counter) ->
+ diameter_stats:incr(Counter, TPid, 1).
+
+%% RFC 3588, 7.6:
+%%
+%% All Diameter answer messages defined in vendor-specific
+%% applications MUST include either one Result-Code AVP or one
+%% Experimental-Result AVP.
+%%
+%% Maintain statistics assuming one or the other, not both, which is
+%% surely the intent of the RFC.
+
+rc_counter(_, _, RC)
+ when is_integer(RC) ->
+ {'Result-Code', RC};
+rc_counter(D, Rec, _) ->
+ rcc(get_avp_value(D, 'Experimental-Result', Rec)).
+
+%% Outgoing answers may be in any of the forms messages can be sent
+%% in. Incoming messages will be records. We're assuming here that the
+%% arity of the result code AVP's is 0 or 1.
+
+rcc([{_,_,RC} = T])
+ when is_integer(RC) ->
+ T;
+rcc({_,_,RC} = T)
+ when is_integer(RC) ->
+ T;
+rcc(_) ->
+ undefined.
+
+int([N])
+ when is_integer(N) ->
+ N;
+int(N)
+ when is_integer(N) ->
+ N;
+int(_) ->
+ undefined.
+
+is_protocol_error(RC) ->
+ 3000 =< RC andalso RC < 4000.
+
+-spec x(any(), atom(), list()) -> no_return().
+
+%% Warn and exit request process on errors in an incoming answer.
+x(Reason, F, A) ->
+ diameter_lib:warning_report(Reason, {?MODULE, F, A}),
+ x(Reason).
+
+x(T) ->
+ exit(T).
+
+%%% ---------------------------------------------------------------------------
+%%% # failover/[23]
+%%% ---------------------------------------------------------------------------
+
+%% Failover as a consequence of request_peer_down/2.
+failover({_, #request{handler = Pid} = Req, TRef}, S) ->
+ Pid ! {failover, TRef, rt(Req, S)}.
+
+%% Failover as a consequence of store_request/4.
+failover(TRef, Seqs, S)
+ when is_reference(TRef) ->
+ case lookup_request(Seqs, TRef) of
+ #request{} = Req ->
+ failover({Seqs, Req, TRef}, S);
+ false ->
+ ok
+ end.
+
+%% prepare_request returned a binary ...
+rt(#request{packet = #diameter_packet{msg = undefined}}, _) ->
+ false; %% TODO: Not what we should do.
+
+%% ... or not.
+rt(#request{packet = #diameter_packet{msg = Msg}, dictionary = D} = Req, S) ->
+ find_transport(get_destination(Msg, D), Req, S).
+
+%%% ---------------------------------------------------------------------------
+%%% # report_status/5
+%%% ---------------------------------------------------------------------------
+
+report_status(Status,
+ #peer{ref = Ref,
+ conn = TPid,
+ type = Type,
+ options = Opts},
+ #conn{apps = [_|_] = As,
+ caps = Caps},
+ #state{service_name = SvcName}
+ = S,
+ Extra) ->
+ share_peer(Status, Caps, As, TPid, S),
+ Info = [Status, Ref, {TPid, Caps}, {type(Type), Opts} | Extra],
+ send_event(SvcName, list_to_tuple(Info)).
+
+%% send_event/2
+
+send_event(SvcName, Info) ->
+ send_event(#diameter_event{service = SvcName,
+ info = Info}).
+
+send_event(#diameter_event{service = SvcName} = E) ->
+ lists:foreach(fun({_, Pid}) -> Pid ! E end, subscriptions(SvcName)).
+
+%%% ---------------------------------------------------------------------------
+%%% # share_peer/5
+%%% ---------------------------------------------------------------------------
+
+share_peer(up, Caps, Aliases, TPid, #state{share_peers = true,
+ service_name = Svc}) ->
+ diameter_peer:notify(Svc, {peer, TPid, Aliases, Caps});
+
+share_peer(_, _, _, _, _) ->
+ ok.
+
+%%% ---------------------------------------------------------------------------
+%%% # share_peers/2
+%%% ---------------------------------------------------------------------------
+
+share_peers(Pid, #state{share_peers = true,
+ local_peers = PDict}) ->
+ ?Dict:fold(fun(A,Ps,ok) -> sp(Pid, A, Ps), ok end, ok, PDict);
+
+share_peers(_, #state{share_peers = false}) ->
+ ok.
+
+sp(Pid, Alias, Peers) ->
+ lists:foreach(fun({P,C}) -> Pid ! {peer, P, [Alias], C} end, Peers).
+
+%%% ---------------------------------------------------------------------------
+%%% # remote_peer_up/4
+%%% ---------------------------------------------------------------------------
+
+remote_peer_up(Pid, Aliases, Caps, #state{use_shared_peers = true,
+ service = Svc,
+ shared_peers = PDict}
+ = S) ->
+ #diameter_service{applications = Apps} = Svc,
+ Update = lists:filter(fun(A) ->
+ lists:keymember(A, #diameter_app.alias, Apps)
+ end,
+ Aliases),
+ S#state{shared_peers = rpu(Pid, Caps, PDict, Update)};
+
+remote_peer_up(_, _, _, #state{use_shared_peers = false} = S) ->
+ S.
+
+rpu(_, _, PDict, []) ->
+ PDict;
+rpu(Pid, Caps, PDict, Aliases) ->
+ erlang:monitor(process, Pid),
+ T = {Pid, Caps},
+ lists:foldl(fun(A,D) -> ?Dict:append(A, T, D) end,
+ PDict,
+ Aliases).
+
+%%% ---------------------------------------------------------------------------
+%%% # remote_peer_down/2
+%%% ---------------------------------------------------------------------------
+
+remote_peer_down(Pid, #state{use_shared_peers = true,
+ shared_peers = PDict}
+ = S) ->
+ S#state{shared_peers = lists:foldl(fun(A,D) -> rpd(Pid, A, D) end,
+ PDict,
+ ?Dict:fetch_keys(PDict))}.
+
+rpd(Pid, Alias, PDict) ->
+ ?Dict:update(Alias, fun(Ps) -> lists:keydelete(Pid, 1, Ps) end, PDict).
+
+%%% ---------------------------------------------------------------------------
+%%% find_transport/[34]
+%%%
+%%% Output: {TransportPid, #diameter_caps{}, #diameter_app{}}
+%%% | false
+%%% | {error, Reason}
+%%% ---------------------------------------------------------------------------
+
+%% Initial call, from an arbitrary process.
+find_transport({alias, Alias}, Msg, Opts, #state{service = Svc} = S) ->
+ #diameter_service{applications = Apps} = Svc,
+ ft(find_send_app(Alias, Apps), Msg, Opts, S);
+
+%% Relay or proxy send.
+find_transport(#diameter_app{} = App, Msg, Opts, S) ->
+ ft(App, Msg, Opts, S).
+
+ft(#diameter_app{module = Mod, dictionary = D} = App, Msg, Opts, S) ->
+ #options{filter = Filter,
+ extra = Xtra}
+ = Opts,
+ pick_peer(App#diameter_app{module = Mod ++ Xtra},
+ get_destination(Msg, D),
+ Filter,
+ S);
+ft(false = No, _, _, _) ->
+ No.
+
+%% This can't be used if we're a relay and sending a message
+%% in an application not known locally. (TODO)
+find_send_app(Alias, Apps) ->
+ case lists:keyfind(Alias, #diameter_app.alias, Apps) of
+ #diameter_app{id = ?APP_ID_RELAY} ->
+ false;
+ T ->
+ T
+ end.
+
+%% Retransmission, in the service process.
+find_transport([_,_] = RH,
+ Req,
+ #state{service = #diameter_service{pid = Pid,
+ applications = Apps}}
+ = S)
+ when self() == Pid ->
+ #request{app = Alias,
+ filter = Filter,
+ module = ModX}
+ = Req,
+ #diameter_app{}
+ = App
+ = lists:keyfind(Alias, #diameter_app.alias, Apps),
+
+ pick_peer(App#diameter_app{module = ModX},
+ RH,
+ Filter,
+ S).
+
+%% get_destination/2
+
+get_destination(Msg, Dict) ->
+ [str(get_avp_value(Dict, 'Destination-Realm', Msg)),
+ str(get_avp_value(Dict, 'Destination-Host', Msg))].
+
+%% This is not entirely correct. The avp could have an arity 1, in
+%% which case an empty list is a DiameterIdentity of length 0 rather
+%% than the list of no values we treat it as by mapping to undefined.
+%% This behaviour is documented.
+str([]) ->
+ undefined;
+str(T) ->
+ T.
+
+%% get_avp_value/3
+%%
+%% Find an AVP in a message of one of three forms:
+%%
+%% - a message record (as generated from a .dia spec) or
+%% - a list of an atom message name followed by 2-tuple, avp name/value pairs.
+%% - a list of a #diameter_header{} followed by #diameter_avp{} records,
+%%
+%% In the first two forms a dictionary module is used at encode to
+%% identify the type of the AVP and its arity in the message in
+%% question. The third form allows messages to be sent as is, without
+%% a dictionary, which is needed in the case of relay agents, for one.
+
+get_avp_value(Dict, Name, [#diameter_header{} | Avps]) ->
+ try
+ {Code, _, VId} = Dict:avp_header(Name),
+ [A|_] = lists:dropwhile(fun(#diameter_avp{code = C, vendor_id = V}) ->
+ C /= Code orelse V /= VId
+ end,
+ Avps),
+ avp_decode(Dict, Name, A)
+ catch
+ error: _ ->
+ undefined
+ end;
+
+get_avp_value(_, Name, [_MsgName | Avps]) ->
+ case lists:keyfind(Name, 1, Avps) of
+ {_, V} ->
+ V;
+ _ ->
+ undefined
+ end;
+
+%% Message is typically a record but not necessarily: diameter:call/4
+%% can be passed an arbitrary term.
+get_avp_value(Dict, Name, Rec) ->
+ try
+ Dict:'#get-'(Name, Rec)
+ catch
+ error:_ ->
+ undefined
+ end.
+
+avp_decode(Dict, Name, #diameter_avp{value = undefined,
+ data = Bin}) ->
+ Dict:avp(decode, Bin, Name);
+avp_decode(_, _, #diameter_avp{value = V}) ->
+ V.
+
+%%% ---------------------------------------------------------------------------
+%%% # pick_peer(App, [DestRealm, DestHost], Filter, #state{})
+%%%
+%%% Output: {TransportPid, #diameter_caps{}, App}
+%%% | false
+%%% | {error, Reason}
+%%% ---------------------------------------------------------------------------
+
+%% Find transports to a given realm/host.
+
+pick_peer(#diameter_app{alias = Alias}
+ = App,
+ [_,_] = RH,
+ Filter,
+ #state{local_peers = L,
+ shared_peers = S,
+ service_name = SvcName,
+ service = #diameter_service{pid = Pid}}) ->
+ pick_peer(peers(Alias, RH, Filter, L),
+ peers(Alias, RH, Filter, S),
+ Pid,
+ SvcName,
+ App).
+
+%% pick_peer/5
+
+pick_peer([], [], _, _, _) ->
+ false;
+
+%% App state is mutable but we're not in the service process: go there.
+pick_peer(Local, Remote, Pid, _SvcName, #diameter_app{mutable = true} = App)
+ when self() /= Pid ->
+ call_service(Pid, {pick_peer, Local, Remote, App});
+
+%% App state isn't mutable or it is and we're in the service process:
+%% do the deed.
+pick_peer(Local,
+ Remote,
+ _Pid,
+ SvcName,
+ #diameter_app{module = ModX,
+ alias = Alias,
+ init_state = S,
+ mutable = M}
+ = App) ->
+ MFA = {ModX, pick_peer, [Local, Remote, SvcName]},
+
+ try state_cb(App, MFA) of
+ {ok, {TPid, #diameter_caps{} = Caps}} when is_pid(TPid) ->
+ {TPid, Caps, App};
+ {{TPid, #diameter_caps{} = Caps}, ModS} when is_pid(TPid), M ->
+ mod_state(Alias, ModS),
+ {TPid, Caps, App};
+ {false = No, ModS} when M ->
+ mod_state(Alias, ModS),
+ No;
+ {ok, false = No} ->
+ No;
+ false = No ->
+ No;
+ {{TPid, #diameter_caps{} = Caps}, S} when is_pid(TPid) ->
+ {TPid, Caps, App}; %% Accept returned state in the immutable
+ {false = No, S} -> %% case as long it isn't changed.
+ No;
+ T ->
+ diameter_lib:error_report({invalid, T, App}, MFA)
+ catch
+ E: Reason ->
+ diameter_lib:error_report({failure, {E, Reason, ?STACK}}, MFA)
+ end.
+
+%% peers/4
+
+peers(Alias, RH, Filter, Peers) ->
+ case ?Dict:find(Alias, Peers) of
+ {ok, L} ->
+ ps(L, RH, Filter, {[],[]});
+ error ->
+ []
+ end.
+
+%% Place a peer whose Destination-Host/Realm matches those of the
+%% request at the front of the result list. Could add some sort of
+%% 'sort' option to allow more control.
+
+ps([], _, _, {Ys, Ns}) ->
+ lists:reverse(Ys, Ns);
+ps([{_TPid, #diameter_caps{} = Caps} = TC | Rest], RH, Filter, Acc) ->
+ ps(Rest, RH, Filter, pacc(caps_filter(Caps, RH, Filter),
+ caps_filter(Caps, RH, {all, [host, realm]}),
+ TC,
+ Acc)).
+
+pacc(true, true, Peer, {Ts, Fs}) ->
+ {[Peer|Ts], Fs};
+pacc(true, false, Peer, {Ts, Fs}) ->
+ {Ts, [Peer|Fs]};
+pacc(_, _, _, Acc) ->
+ Acc.
+
+%% caps_filter/3
+
+caps_filter(C, RH, {neg, F}) ->
+ not caps_filter(C, RH, F);
+
+caps_filter(C, RH, {all, L})
+ when is_list(L) ->
+ lists:all(fun(F) -> caps_filter(C, RH, F) end, L);
+
+caps_filter(C, RH, {any, L})
+ when is_list(L) ->
+ lists:any(fun(F) -> caps_filter(C, RH, F) end, L);
+
+caps_filter(#diameter_caps{origin_host = {_,OH}}, [_,DH], host) ->
+ eq(undefined, DH, OH);
+
+caps_filter(#diameter_caps{origin_realm = {_,OR}}, [DR,_], realm) ->
+ eq(undefined, DR, OR);
+
+caps_filter(C, _, Filter) ->
+ caps_filter(C, Filter).
+
+%% caps_filter/2
+
+caps_filter(_, none) ->
+ true;
+
+caps_filter(#diameter_caps{origin_host = {_,OH}}, {host, H}) ->
+ eq(any, H, OH);
+
+caps_filter(#diameter_caps{origin_realm = {_,OR}}, {realm, R}) ->
+ eq(any, R, OR);
+
+%% Anything else is expected to be an eval filter. Filter failure is
+%% documented as being equivalent to a non-matching filter.
+
+caps_filter(C, T) ->
+ try
+ {eval, F} = T,
+ diameter_lib:eval([F,C])
+ catch
+ _:_ -> false
+ end.
+
+eq(Any, Id, PeerId) ->
+ Any == Id orelse try
+ iolist_to_binary(Id) == iolist_to_binary(PeerId)
+ catch
+ _:_ -> false
+ end.
+%% OctetString() can be specified as an iolist() so test for string
+%% rather then term equality.
+
+%% transports/1
+
+transports(#state{peerT = PeerT}) ->
+ ets:select(PeerT, [{#peer{conn = '$1', _ = '_'},
+ [{'is_pid', '$1'}],
+ ['$1']}]).
+
+%%% ---------------------------------------------------------------------------
+%%% # service_info/2
+%%% ---------------------------------------------------------------------------
+
+%% The config passed to diameter:start_service/2.
+-define(CAP_INFO, ['Origin-Host',
+ 'Origin-Realm',
+ 'Vendor-Id',
+ 'Product-Name',
+ 'Origin-State-Id',
+ 'Host-IP-Address',
+ 'Supported-Vendor-Id',
+ 'Auth-Application-Id',
+ 'Inband-Security-Id',
+ 'Acct-Application-Id',
+ 'Vendor-Specific-Application-Id',
+ 'Firmware-Revision']).
+
+-define(ALL_INFO, [capabilities,
+ applications,
+ transport,
+ pending,
+ statistics]).
+
+service_info(Items, S)
+ when is_list(Items) ->
+ [{complete(I), service_info(I,S)} || I <- Items];
+service_info(Item, S)
+ when is_atom(Item) ->
+ service_info(Item, S, true).
+
+service_info(Item, #state{service = Svc} = S, Complete) ->
+ case Item of
+ name ->
+ S#state.service_name;
+ 'Origin-Host' ->
+ (Svc#diameter_service.capabilities)
+ #diameter_caps.origin_host;
+ 'Origin-Realm' ->
+ (Svc#diameter_service.capabilities)
+ #diameter_caps.origin_realm;
+ 'Vendor-Id' ->
+ (Svc#diameter_service.capabilities)
+ #diameter_caps.vendor_id;
+ 'Product-Name' ->
+ (Svc#diameter_service.capabilities)
+ #diameter_caps.product_name;
+ 'Origin-State-Id' ->
+ (Svc#diameter_service.capabilities)
+ #diameter_caps.origin_state_id;
+ 'Host-IP-Address' ->
+ (Svc#diameter_service.capabilities)
+ #diameter_caps.host_ip_address;
+ 'Supported-Vendor-Id' ->
+ (Svc#diameter_service.capabilities)
+ #diameter_caps.supported_vendor_id;
+ 'Auth-Application-Id' ->
+ (Svc#diameter_service.capabilities)
+ #diameter_caps.auth_application_id;
+ 'Inband-Security-Id' ->
+ (Svc#diameter_service.capabilities)
+ #diameter_caps.inband_security_id;
+ 'Acct-Application-Id' ->
+ (Svc#diameter_service.capabilities)
+ #diameter_caps.acct_application_id;
+ 'Vendor-Specific-Application-Id' ->
+ (Svc#diameter_service.capabilities)
+ #diameter_caps.vendor_specific_application_id;
+ 'Firmware-Revision' ->
+ (Svc#diameter_service.capabilities)
+ #diameter_caps.firmware_revision;
+ capabilities -> service_info(?CAP_INFO, S);
+ applications -> info_apps(S);
+ transport -> info_transport(S);
+ pending -> info_pending(S);
+ statistics -> info_stats(S);
+ keys -> ?ALL_INFO ++ ?CAP_INFO; %% mostly for test
+ all -> service_info(?ALL_INFO, S);
+ _ when Complete -> service_info(complete(Item), S, false);
+ _ -> undefined
+ end.
+
+complete(Pre) ->
+ P = atom_to_list(Pre),
+ case [I || I <- [name | ?ALL_INFO] ++ ?CAP_INFO,
+ lists:prefix(P, atom_to_list(I))]
+ of
+ [I] -> I;
+ _ -> Pre
+ end.
+
+info_stats(#state{peerT = PeerT}) ->
+ Peers = ets:select(PeerT, [{#peer{ref = '$1', conn = '$2', _ = '_'},
+ [{'is_pid', '$2'}],
+ [['$1', '$2']]}]),
+ diameter_stats:read(lists:append(Peers)).
+%% TODO: include peer identities in return value
+
+info_transport(#state{peerT = PeerT, connT = ConnT}) ->
+ dict:fold(fun it/3,
+ [],
+ ets:foldl(fun(T,A) -> it_acc(ConnT, A, T) end,
+ dict:new(),
+ PeerT)).
+
+it(Ref, [[{type, connect} | _] = L], Acc) ->
+ [[{ref, Ref} | L] | Acc];
+it(Ref, [[{type, accept}, {options, Opts} | _] | _] = L, Acc) ->
+ [[{ref, Ref},
+ {type, listen},
+ {options, Opts},
+ {accept, [lists:nthtail(2,A) || A <- L]}]
+ | Acc].
+%% Each entry has the same Opts. (TODO)
+
+it_acc(ConnT, Acc, #peer{pid = Pid,
+ type = Type,
+ ref = Ref,
+ options = Opts,
+ op_state = OS,
+ started = T,
+ conn = TPid}) ->
+ dict:append(Ref,
+ [{type, Type},
+ {options, Opts},
+ {watchdog, {Pid, T, OS}}
+ | info_conn(ConnT, TPid)],
+ Acc).
+
+info_conn(ConnT, TPid) ->
+ info_conn(ets:lookup(ConnT, TPid)).
+
+info_conn([#conn{pid = Pid, apps = SApps, caps = Caps, started = T}]) ->
+ [{peer, {Pid, T}},
+ {apps, SApps},
+ {caps, info_caps(Caps)}];
+info_conn([] = No) ->
+ No.
+
+info_caps(#diameter_caps{} = C) ->
+ lists:zip(record_info(fields, diameter_caps), tl(tuple_to_list(C))).
+
+info_apps(#state{service = #diameter_service{applications = Apps}}) ->
+ lists:map(fun mk_app/1, Apps).
+
+mk_app(#diameter_app{alias = Alias,
+ dictionary = Dict,
+ module = ModX,
+ id = Id}) ->
+ [{alias, Alias},
+ {dictionary, Dict},
+ {module, ModX},
+ {id, Id}].
+
+info_pending(#state{} = S) ->
+ MatchSpec = [{{'$1',
+ #request{transport = '$2',
+ from = '$3',
+ app = '$4',
+ _ = '_'},
+ '_'},
+ [?ORCOND([{'==', T, '$2'} || T <- transports(S)])],
+ [{{'$1', [{{app, '$4'}},
+ {{transport, '$2'}},
+ {{from, '$3'}}]}}]}],
+
+ ets:select(?REQUEST_TABLE, MatchSpec).