%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2010-2011. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% %CopyrightEnd%
%%
%%
%% Implements the process that represents a service.
%%
-module(diameter_service).
-behaviour(gen_server).
-export([start/1,
stop/1,
start_transport/2,
stop_transport/2,
info/2,
call/4]).
%% towards diameter_watchdog
-export([receive_message/3]).
%% service supervisor
-export([start_link/1]).
-export([subscribe/1,
unsubscribe/1,
subscriptions/1,
subscriptions/0,
services/0,
services/1,
whois/1,
flush_stats/1]).
%% test/debug
-export([call_module/3,
state/1,
uptime/1]).
%%% gen_server callbacks
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
%% Other callbacks.
-export([send/1]).
-include_lib("diameter/include/diameter.hrl").
-include("diameter_internal.hrl").
-define(STATE_UP, up).
-define(STATE_DOWN, down).
-define(DEFAULT_TC, 30000). %% RFC 3588 ch 2.1
-define(DEFAULT_TIMEOUT, 5000). %% for outgoing requests
-define(RESTART_TC, 1000). %% if restart was this recent
%% Used to be able to swap this with anything else dict-like but now
%% rely on the fact that a service's #state{} record does not change
%% in storing in it ?STATE table and not always going through the
%% service process. In particular, rely on the fact that operations on
%% a ?Dict don't change the handle to it.
-define(Dict, diameter_dict).
%% Table containing outgoing requests for which a reply has yet to be
%% received.
-define(REQUEST_TABLE, diameter_request).
%% Maintains state in a table. In contrast to previously, a service's
%% stat is not constant and is accessed outside of the service
%% process.
-define(STATE_TABLE, ?MODULE).
%% Workaround for dialyzer's lack of understanding of match specs.
-type match(T)
:: T | '_' | '$1' | '$2' | '$3' | '$4'.
%% State of service gen_server.
-record(state,
{id = now(),
service_name, %% as passed to start_service/2, key in ?STATE_TABLE
service :: #diameter_service{},
peerT = ets_new(peers) :: ets:tid(), %% #peer{} at start_fsm
connT = ets_new(conns) :: ets:tid(), %% #conn{} at connection_up
share_peers = false :: boolean(), %% broadcast peers to remote nodes?
use_shared_peers = false :: boolean(), %% use broadcasted peers?
shared_peers = ?Dict:new(), %% Alias -> [{TPid, Caps}, ...]
local_peers = ?Dict:new(), %% Alias -> [{TPid, Caps}, ...]
monitor = false :: false | pid()}). %% process to die with
%% shared_peers reflects the peers broadcast from remote nodes. Note
%% that the state term itself doesn't change, which is relevant for
%% the stateless application callbacks since the state is retrieved
%% from ?STATE_TABLE from outside the service process. The pid in the
%% service record is used to determine whether or not we need to call
%% the process for a pick_peer callback.
%% Record representing a watchdog process.
-record(peer,
{pid :: match(pid()),
type :: match(connect | accept),
ref :: match(reference()), %% key into diameter_config
options :: match([diameter:transport_opt()]),%% from start_transport
op_state = ?STATE_DOWN :: match(?STATE_DOWN | ?STATE_UP),
started = now(), %% at process start
conn = false :: match(boolean() | pid())}).
%% true at accept, pid() at connection_up (connT key)
%% Record representing a peer_fsm process.
-record(conn,
{pid :: pid(),
apps :: [{0..16#FFFFFFFF, diameter:app_alias()}], %% {Id, Alias}
caps :: #diameter_caps{},
started = now(), %% at process start
peer :: pid()}). %% key into peerT
%% Record stored in diameter_request for each outgoing request.
-record(request,
{from, %% arg 2 of handle_call/3
handler :: match(pid()), %% request process
transport :: match(pid()), %% peer process
caps :: match(#diameter_caps{}),
app :: match(diameter:app_alias()), %% #diameter_app.alias
dictionary :: match(module()), %% #diameter_app.dictionary
module :: match([module() | list()]),
%% #diameter_app.module
filter :: match(diameter:peer_filter()),
packet :: match(#diameter_packet{})}).
%% Record call/4 options are parsed into.
-record(options,
{filter = none :: diameter:peer_filter(),
extra = [] :: list(),
timeout = ?DEFAULT_TIMEOUT :: 0..16#FFFFFFFF,
detach = false :: boolean()}).
%% Since RFC 3588 requires that a Diameter agent not modify End-to-End
%% Identifiers, the possibility of explicitly setting an End-to-End
%% Identifier would be needed to be able to implement an agent in
%% which one side of the communication is not implemented on top of
%% diameter. For example, Diameter being sent or received encapsulated
%% in some other protocol, or even another Diameter stack in a
%% non-Erlang environment. (Not that this is likely to be a normal
%% case.)
%%
%% The implemented solution is not an option but to respect any header
%% values set in a diameter_header record returned from a
%% prepare_request callback. A call to diameter:call/4 can communicate
%% values to the callback using the 'extra' option if so desired.
%%% ---------------------------------------------------------------------------
%%% # start(SvcName)
%%% ---------------------------------------------------------------------------
start(SvcName) ->
diameter_service_sup:start_child(SvcName).
start_link(SvcName) ->
Options = [{spawn_opt, diameter_lib:spawn_opts(server, [])}],
gen_server:start_link(?MODULE, [SvcName], Options).
%% Put the arbitrary term SvcName in a list in case we ever want to
%% send more than this and need to distinguish old from new.
%%% ---------------------------------------------------------------------------
%%% # stop(SvcName)
%%% ---------------------------------------------------------------------------
stop(SvcName) ->
case whois(SvcName) of
undefined ->
{error, not_started};
Pid ->
stop(call_service(Pid, stop), Pid)
end.
stop(ok, Pid) ->
MRef = erlang:monitor(process, Pid),
receive {'DOWN', MRef, process, _, _} -> ok end;
stop(No, _) ->
No.
%%% ---------------------------------------------------------------------------
%%% # start_transport(SvcName, {Ref, Type, Opts})
%%% ---------------------------------------------------------------------------
start_transport(SvcName, {_,_,_} = T) ->
call_service_by_name(SvcName, {start, T}).
%%% ---------------------------------------------------------------------------
%%% # stop_transport(SvcName, Refs)
%%% ---------------------------------------------------------------------------
stop_transport(_, []) ->
ok;
stop_transport(SvcName, [_|_] = Refs) ->
call_service_by_name(SvcName, {stop, Refs}).
%%% ---------------------------------------------------------------------------
%%% # info(SvcName, Item)
%%% ---------------------------------------------------------------------------
info(SvcName, Item) ->
info_rc(call_service_by_name(SvcName, {info, Item})).
info_rc({error, _}) ->
undefined;
info_rc(Info) ->
Info.
%%% ---------------------------------------------------------------------------
%%% # receive_message(TPid, Pkt, MessageData)
%%% ---------------------------------------------------------------------------
%% Handle an incoming message in the watchdog process. This used to
%% come through the service process but this avoids that becoming a
%% bottleneck.
receive_message(TPid, Pkt, T)
when is_pid(TPid) ->
#diameter_packet{header = #diameter_header{is_request = R}} = Pkt,
recv(R, (not R) andalso lookup_request(Pkt, TPid), TPid, Pkt, T).
%% Incoming request ...
recv(true, false, TPid, Pkt, T) ->
try
spawn(fun() -> recv_request(TPid, Pkt, T) end)
catch
error: system_limit = E -> %% discard
?LOG({error, E}, now())
end;
%% ... answer to known request ...
recv(false, #request{from = {_, Ref}, handler = Pid} = Req, _, Pkt, _) ->
Pid ! {answer, Ref, Req, Pkt};
%% Note that failover could have happened prior to this message being
%% received and triggering failback. That is, both a failover message
%% and answer may be on their way to the handler process. In the worst
%% case the request process gets notification of the failover and
%% sends to the alternate peer before an answer arrives, so it's
%% always the case that we can receive more than one answer after
%% failover. The first answer received by the request process wins,
%% any others are discarded.
%% ... or not.
recv(false, false, _, _, _) ->
ok.
%%% ---------------------------------------------------------------------------
%%% # call(SvcName, App, Msg, Options)
%%% ---------------------------------------------------------------------------
call(SvcName, App, Msg, Options)
when is_list(Options) ->
Rec = make_options(Options),
Ref = make_ref(),
Caller = {self(), Ref},
Fun = fun() -> exit({Ref, call(SvcName, App, Msg, Rec, Caller)}) end,
try spawn_monitor(Fun) of
{_, MRef} ->
recv(MRef, Ref, Rec#options.detach, false)
catch
error: system_limit = E ->
{error, E}
end.
%% Don't rely on gen_server:call/3 for the timeout handling since it
%% makes no guarantees about not leaving a reply message in the
%% mailbox if we catch its exit at timeout. It currently *can* do so,
%% which is also undocumented.
recv(MRef, _, true, true) ->
erlang:demonitor(MRef, [flush]),
ok;
recv(MRef, Ref, Detach, Sent) ->
receive
Ref -> %% send has been attempted
recv(MRef, Ref, Detach, true);
{'DOWN', MRef, process, _, Reason} ->
call_rc(Reason, Ref, Sent)
end.
%% call/5 has returned ...
call_rc({Ref, Ans}, Ref, _) ->
Ans;
%% ... or not. In this case failure/encode are documented.
call_rc(_, _, Sent) ->
{error, choose(Sent, failure, encode)}.
%% call/5
%%
%% In the process spawned for the outgoing request.
call(SvcName, App, Msg, Opts, Caller) ->
c(ets:lookup(?STATE_TABLE, SvcName), App, Msg, Opts, Caller).
c([#state{service_name = SvcName} = S], App, Msg, Opts, Caller) ->
case find_transport(App, Msg, Opts, S) of
{_,_,_} = T ->
send_request(T, Msg, Opts, Caller, SvcName);
false ->
{error, no_connection};
{error, _} = No ->
No
end;
c([], _, _, _, _) ->
{error, no_service}.
%% make_options/1
make_options(Options) ->
lists:foldl(fun mo/2, #options{}, Options).
mo({timeout, T}, Rec)
when is_integer(T), 0 =< T ->
Rec#options{timeout = T};
mo({filter, F}, #options{filter = none} = Rec) ->
Rec#options{filter = F};
mo({filter, F}, #options{filter = {all, Fs}} = Rec) ->
Rec#options{filter = {all, [F | Fs]}};
mo({filter, F}, #options{filter = F0} = Rec) ->
Rec#options{filter = {all, [F0, F]}};
mo({extra, L}, #options{extra = X} = Rec)
when is_list(L) ->
Rec#options{extra = X ++ L};
mo(detach, Rec) ->
Rec#options{detach = true};
mo(T, _) ->
?ERROR({invalid_option, T}).
%%% ---------------------------------------------------------------------------
%%% # subscribe(SvcName)
%%% # unsubscribe(SvcName)
%%% ---------------------------------------------------------------------------
subscribe(SvcName) ->
diameter_reg:add({?MODULE, subscriber, SvcName}).
unsubscribe(SvcName) ->
diameter_reg:del({?MODULE, subscriber, SvcName}).
subscriptions(Pat) ->
pmap(diameter_reg:match({?MODULE, subscriber, Pat})).
subscriptions() ->
subscriptions('_').
pmap(Props) ->
lists:map(fun({{?MODULE, _, Name}, Pid}) -> {Name, Pid} end, Props).
%%% ---------------------------------------------------------------------------
%%% # services(Pattern)
%%% ---------------------------------------------------------------------------
services(Pat) ->
pmap(diameter_reg:match({?MODULE, service, Pat})).
services() ->
services('_').
whois(SvcName) ->
case diameter_reg:match({?MODULE, service, SvcName}) of
[{_, Pid}] ->
Pid;
[] ->
undefined
end.
%%% ---------------------------------------------------------------------------
%%% # flush_stats/1
%%%
%%% Output: list of {{SvcName, Alias, Counter}, Value}
%%% ---------------------------------------------------------------------------
flush_stats(TPid) ->
diameter_stats:flush(TPid).
%% ===========================================================================
%% ===========================================================================
state(Svc) ->
call_service(Svc, state).
uptime(Svc) ->
call_service(Svc, uptime).
%% call_module/3
call_module(Service, AppMod, Request) ->
call_service(Service, {call_module, AppMod, Request}).
%%% ---------------------------------------------------------------------------
%%% # init([SvcName])
%%% ---------------------------------------------------------------------------
init([SvcName]) ->
process_flag(trap_exit, true), %% ensure terminate(shutdown, _)
i(SvcName, diameter_reg:add_new({?MODULE, service, SvcName})).
i(SvcName, true) ->
{ok, i(SvcName)};
i(_, false) ->
{stop, {shutdown, already_started}}.
%%% ---------------------------------------------------------------------------
%%% # handle_call(Req, From, State)
%%% ---------------------------------------------------------------------------
handle_call(state, _, S) ->
{reply, S, S};
handle_call(uptime, _, #state{id = T} = S) ->
{reply, diameter_lib:now_diff(T), S};
%% Start a transport.
handle_call({start, {Ref, Type, Opts}}, _From, S) ->
{reply, start(Ref, {Type, Opts}, S), S};
%% Stop transports.
handle_call({stop, Refs}, _From, S) ->
shutdown(Refs, S),
{reply, ok, S};
%% pick_peer with mutable state
handle_call({pick_peer, Local, Remote, App}, _From, S) ->
#diameter_app{mutable = true} = App, %% assert
{reply, pick_peer(Local, Remote, self(), S#state.service_name, App), S};
handle_call({call_module, AppMod, Req}, From, S) ->
call_module(AppMod, Req, From, S);
handle_call({info, Item}, _From, S) ->
{reply, service_info(Item, S), S};
handle_call(stop, _From, S) ->
shutdown(S),
{stop, normal, ok, S};
%% The server currently isn't guaranteed to be dead when the caller
%% gets the reply. We deal with this in the call to the server,
%% stating a monitor that waits for DOWN before returning.
handle_call(Req, From, S) ->
unexpected(handle_call, [Req, From], S),
{reply, nok, S}.
%%% ---------------------------------------------------------------------------
%%% # handle_cast(Req, State)
%%% ---------------------------------------------------------------------------
handle_cast(Req, S) ->
unexpected(handle_cast, [Req], S),
{noreply, S}.
%%% ---------------------------------------------------------------------------
%%% # handle_info(Req, State)
%%% ---------------------------------------------------------------------------
handle_info(T,S) ->
case transition(T,S) of
ok ->
{noreply, S};
#state{} = NS ->
{noreply, NS};
{stop, Reason} ->
{stop, {shutdown, Reason}, S}
end.
%% transition/2
%% Peer process is telling us to start a new accept process.
transition({accepted, Pid, TPid}, S) ->
accepted(Pid, TPid, S),
ok;
%% Peer process has a new open connection.
transition({connection_up, Pid, T}, S) ->
connection_up(Pid, T, S);
%% Peer process has left state open.
transition({connection_down, Pid}, S) ->
connection_down(Pid, S);
%% Peer process has returned to state open.
transition({connection_up, Pid}, S) ->
connection_up(Pid, S);
%% Accepting transport has lost connectivity.
transition({close, Pid}, S) ->
close(Pid, S),
ok;
%% Connecting transport is being restarted by watchdog.
transition({reconnect, Pid}, S) ->
reconnect(Pid, S),
ok;
%% Monitor process has died. Just die with a reason that tells
%% diameter_config about the happening. If a cleaner shutdown is
%% required then someone should stop us.
transition({'DOWN', MRef, process, _, Reason}, #state{monitor = MRef}) ->
{stop, {monitor, Reason}};
%% Local peer process has died.
transition({'DOWN', _, process, Pid, Reason}, S)
when node(Pid) == node() ->
peer_down(Pid, Reason, S);
%% Remote service wants to know about shared transports.
transition({service, Pid}, S) ->
share_peers(Pid, S),
ok;
%% Remote service is communicating a shared peer.
transition({peer, TPid, Aliases, Caps}, S) ->
remote_peer_up(TPid, Aliases, Caps, S);
%% Remote peer process has died.
transition({'DOWN', _, process, TPid, _}, S) ->
remote_peer_down(TPid, S);
%% Restart after tc expiry.
transition({tc_timeout, T}, S) ->
tc_timeout(T, S),
ok;
%% Request process is telling us it may have missed a failover message
%% after a transport went down and the service process looked up
%% outstanding requests.
transition({failover, TRef, Seqs}, S) ->
failover(TRef, Seqs, S),
ok;
transition(Req, S) ->
unexpected(handle_info, [Req], S),
ok.
%%% ---------------------------------------------------------------------------
%%% # terminate(Reason, State)
%%% ---------------------------------------------------------------------------
terminate(Reason, #state{service_name = Name} = S) ->
ets:delete(?STATE_TABLE, Name),
shutdown == Reason %% application shutdown
andalso shutdown(S).
%%% ---------------------------------------------------------------------------
%%% # code_change(FromVsn, State, Extra)
%%% ---------------------------------------------------------------------------
code_change(FromVsn,
#state{service_name = SvcName,
service = #diameter_service{applications = Apps}}
= S,
Extra) ->
lists:foreach(fun(A) ->
code_change(FromVsn, SvcName, Extra, A)
end,
Apps),
{ok, S}.
code_change(FromVsn, SvcName, Extra, #diameter_app{alias = Alias} = A) ->
{ok, S} = cb(A, code_change, [FromVsn,
mod_state(Alias),
Extra,
SvcName]),
mod_state(Alias, S).
%% ===========================================================================
%% ===========================================================================
unexpected(F, A, #state{service_name = Name}) ->
?UNEXPECTED(F, A ++ [Name]).
cb([_|_] = M, F, A) ->
eval(M, F, A);
cb(Rec, F, A) ->
{_, M} = app(Rec),
eval(M, F, A).
app(#request{app = A, module = M}) ->
{A,M};
app(#diameter_app{alias = A, module = M}) ->
{A,M}.
eval([M|X], F, A) ->
apply(M, F, A ++ X).
%% Callback with state.
state_cb(#diameter_app{mutable = false, init_state = S}, {ModX, F, A}) ->
eval(ModX, F, A ++ [S]);
state_cb(#diameter_app{mutable = true, alias = Alias}, {_,_,_} = MFA) ->
state_cb(MFA, Alias);
state_cb({ModX,F,A}, Alias)
when is_list(ModX) ->
eval(ModX, F, A ++ [mod_state(Alias)]).
choose(true, X, _) -> X;
choose(false, _, X) -> X.
ets_new(Tbl) ->
ets:new(Tbl, [{keypos, 2}]).
insert(Tbl, Rec) ->
ets:insert(Tbl, Rec),
Rec.
monitor(Pid) ->
erlang:monitor(process, Pid),
Pid.
%% Using the process dictionary for the callback state was initially
%% just a way to make what was horrendous trace (big state record and
%% much else everywhere) somewhat more readable. There's not as much
%% need for it now but it's no worse (except possibly that we don't
%% see the table identifier being passed around) than an ets table so
%% keep it.
mod_state(Alias) ->
get({?MODULE, mod_state, Alias}).
mod_state(Alias, ModS) ->
put({?MODULE, mod_state, Alias}, ModS).
%%% ---------------------------------------------------------------------------
%%% # shutdown/2
%%% ---------------------------------------------------------------------------
shutdown(Refs, #state{peerT = PeerT}) ->
ets:foldl(fun(P,ok) -> s(P, Refs), ok end, ok, PeerT).
s(#peer{ref = Ref, pid = Pid}, Refs) ->
s(lists:member(Ref, Refs), Pid);
s(true, Pid) ->
Pid ! {shutdown, self()}; %% 'DOWN' will cleanup as usual
s(false, _) ->
ok.
%%% ---------------------------------------------------------------------------
%%% # shutdown/1
%%% ---------------------------------------------------------------------------
shutdown(#state{peerT = PeerT}) ->
%% A transport might not be alive to receive the shutdown request
%% but give those that are a chance to shutdown gracefully.
wait(fun st/2, PeerT),
%% Kill the watchdogs explicitly in case there was no transport.
wait(fun sw/2, PeerT).
wait(Fun, T) ->
diameter_lib:wait(ets:foldl(Fun, [], T)).
st(#peer{conn = B}, Acc)
when is_boolean(B) ->
Acc;
st(#peer{conn = Pid}, Acc) ->
Pid ! shutdown,
[Pid | Acc].
sw(#peer{pid = Pid}, Acc) ->
exit(Pid, shutdown),
[Pid | Acc].
%%% ---------------------------------------------------------------------------
%%% # call_service/2
%%% ---------------------------------------------------------------------------
call_service(Pid, Req)
when is_pid(Pid) ->
cs(Pid, Req);
call_service(SvcName, Req) ->
call_service_by_name(SvcName, Req).
call_service_by_name(SvcName, Req) ->
cs(whois(SvcName), Req).
cs(Pid, Req)
when is_pid(Pid) ->
try
gen_server:call(Pid, Req, infinity)
catch
E: Reason when E == exit ->
{error, {E, Reason}}
end;
cs(undefined, _) ->
{error, no_service}.
%%% ---------------------------------------------------------------------------
%%% # i/1
%%%
%%% Output: #state{}
%%% ---------------------------------------------------------------------------
%% Intialize the state of a service gen_server.
i(SvcName) ->
%% Split the config into a server state and a list of transports.
{#state{} = S, CL} = lists:foldl(fun cfg_acc/2,
{false, []},
diameter_config:lookup(SvcName)),
%% Publish the state in order to be able to access it outside of
%% the service process. Originally table identifiers were only
%% known to the service process but we now want to provide the
%% option of application callbacks being 'stateless' in order to
%% avoid having to go through a common process. (Eg. An agent that
%% sends a request for every incoming request.)
true = ets:insert_new(?STATE_TABLE, S),
%% Start fsms for each transport.
lists:foreach(fun(T) -> start_fsm(T,S) end, CL),
init_shared(S),
S.
cfg_acc({SvcName, #diameter_service{applications = Apps} = Rec, Opts},
{false, Acc}) ->
lists:foreach(fun init_mod/1, Apps),
S = #state{service_name = SvcName,
service = Rec#diameter_service{pid = self()},
share_peers = get_value(share_peers, Opts),
use_shared_peers = get_value(use_shared_peers, Opts),
monitor = mref(get_value(monitor, Opts))},
{S, Acc};
cfg_acc({_Ref, Type, _Opts} = T, {S, Acc})
when Type == connect;
Type == listen ->
{S, [T | Acc]}.
mref(false = No) ->
No;
mref(P) ->
erlang:monitor(process, P).
init_shared(#state{use_shared_peers = true,
service_name = Svc}) ->
diameter_peer:notify(Svc, {service, self()});
init_shared(#state{use_shared_peers = false}) ->
ok.
init_mod(#diameter_app{alias = Alias,
init_state = S}) ->
mod_state(Alias, S).
start_fsm({Ref, Type, Opts}, S) ->
start(Ref, {Type, Opts}, S).
get_value(Key, Vs) ->
{_, V} = lists:keyfind(Key, 1, Vs),
V.
%%% ---------------------------------------------------------------------------
%%% # start/3
%%% ---------------------------------------------------------------------------
%% If the initial start/3 at service/transport start succeeds then
%% subsequent calls to start/4 on the same service will also succeed
%% since they involve the same call to merge_service/2. We merge here
%% rather than earlier since the service may not yet be configured
%% when the transport is configured.
start(Ref, {T, Opts}, S)
when T == connect;
T == listen ->
try
{ok, start(Ref, type(T), Opts, S)}
catch
?FAILURE(Reason) ->
{error, Reason}
end.
%% TODO: don't actually raise any errors yet
%% There used to be a difference here between the handling of
%% configured listening and connecting transports but now we simply
%% tell the transport_module to start an accepting or connecting
%% process respectively, the transport implementation initiating
%% listening on a port as required.
type(listen) -> accept;
type(accept) -> listen;
type(connect = T) -> T.
%% start/4
start(Ref, Type, Opts, #state{peerT = PeerT,
connT = ConnT,
service_name = SvcName,
service = Svc})
when Type == connect;
Type == accept ->
Pid = monitor(s(Type, Ref, {ConnT,
Opts,
SvcName,
merge_service(Opts, Svc)})),
insert(PeerT, #peer{pid = Pid,
type = Type,
ref = Ref,
options = Opts}),
Pid.
%% Note that the service record passed into the watchdog is the merged
%% record so that each watchdog (and peer_fsm) may get a different
%% record. This record is what is passed back into application
%% callbacks.
s(Type, Ref, T) ->
diameter_watchdog:start({Type, Ref}, T).
%% merge_service/2
merge_service(Opts, Svc) ->
lists:foldl(fun ms/2, Svc, Opts).
%% Limit the applications known to the fsm to those in the 'apps'
%% option. That this might be empty is checked by the fsm. It's not
%% checked at config-time since there's no requirement that the
%% service be configured first. (Which could be considered a bit odd.)
ms({applications, As}, #diameter_service{applications = Apps} = S)
when is_list(As) ->
S#diameter_service{applications
= [A || A <- Apps,
lists:member(A#diameter_app.alias, As)]};
%% The fact that all capabilities can be configured on the transports
%% means that the service doesn't necessarily represent a single
%% locally implemented Diameter peer as identified by Origin-Host: a
%% transport can configure its own Origin-Host. This means that the
%% service little more than a placeholder for default capabilities
%% plus a list of applications that individual transports can choose
%% to support (or not).
ms({capabilities, Opts}, #diameter_service{capabilities = Caps0} = Svc)
when is_list(Opts) ->
%% make_caps has already succeeded in diameter_config so it will succeed
%% again here.
{ok, Caps} = diameter_capx:make_caps(Caps0, Opts),
Svc#diameter_service{capabilities = Caps};
ms(_, Svc) ->
Svc.
%%% ---------------------------------------------------------------------------
%%% # accepted/3
%%% ---------------------------------------------------------------------------
accepted(Pid, _TPid, #state{peerT = PeerT} = S) ->
#peer{ref = Ref, type = accept = T, conn = false, options = Opts}
= P
= fetch(PeerT, Pid),
insert(PeerT, P#peer{conn = true}), %% mark replacement transport started
start(Ref, T, Opts, S). %% start new peer
fetch(Tid, Key) ->
[T] = ets:lookup(Tid, Key),
T.
%%% ---------------------------------------------------------------------------
%%% # connection_up/3
%%%
%%% Output: #state{}
%%% ---------------------------------------------------------------------------
%% Peer process has reached the open state.
connection_up(Pid, {TPid, {Caps, SApps, Pkt}}, #state{peerT = PeerT,
connT = ConnT}
= S) ->
P = fetch(PeerT, Pid),
C = #conn{pid = TPid,
apps = SApps,
caps = Caps,
peer = Pid},
insert(ConnT, C),
connection_up([Pkt], P#peer{conn = TPid}, C, S).
%%% ---------------------------------------------------------------------------
%%% # connection_up/2
%%%
%%% Output: #state{}
%%% ---------------------------------------------------------------------------
%% Peer process has transitioned back into the open state. Note that there
%% has been no new capabilties exchange in this case.
connection_up(Pid, #state{peerT = PeerT,
connT = ConnT}
= S) ->
#peer{conn = TPid} = P = fetch(PeerT, Pid),
C = fetch(ConnT, TPid),
connection_up([], P, C, S).
%% connection_up/4
connection_up(T, P, C, #state{peerT = PeerT,
local_peers = LDict,
service_name = SvcName,
service
= #diameter_service{applications = Apps}}
= S) ->
#peer{conn = TPid, op_state = ?STATE_DOWN}
= P,
#conn{apps = SApps, caps = Caps}
= C,
insert(PeerT, P#peer{op_state = ?STATE_UP}),
request_peer_up(TPid),
report_status(up, P, C, S, T),
S#state{local_peers = insert_local_peer(SApps,
{{TPid, Caps}, {SvcName, Apps}},
LDict)}.
insert_local_peer(SApps, T, LDict) ->
lists:foldl(fun(A,D) -> ilp(A, T, D) end, LDict, SApps).
ilp({Id, Alias}, {TC, SA}, LDict) ->
init_conn(Id, Alias, TC, SA),
?Dict:append(Alias, TC, LDict).
init_conn(Id, Alias, TC, {SvcName, Apps}) ->
#diameter_app{module = ModX,
id = Id} %% assert
= find_app(Alias, Apps),
peer_cb({ModX, peer_up, [SvcName, TC]}, Alias).
find_app(Alias, Apps) ->
lists:keyfind(Alias, #diameter_app.alias, Apps).
%% A failing peer callback brings down the service. In the case of
%% peer_up we could just kill the transport and emit an error but for
%% peer_down we have no way to cleanup any state change that peer_up
%% may have introduced.
peer_cb(MFA, Alias) ->
try state_cb(MFA, Alias) of
ModS ->
mod_state(Alias, ModS)
catch
E: Reason ->
?ERROR({E, Reason, MFA, ?STACK})
end.
%%% ---------------------------------------------------------------------------
%%% # connection_down/2
%%%
%%% Output: #state{}
%%% ---------------------------------------------------------------------------
%% Peer process has transitioned out of the open state.
connection_down(Pid, #state{peerT = PeerT,
connT = ConnT}
= S) ->
#peer{op_state = ?STATE_UP, %% assert
conn = TPid}
= P
= fetch(PeerT, Pid),
C = fetch(ConnT, TPid),
insert(PeerT, P#peer{op_state = ?STATE_DOWN}),
connection_down(P,C,S).
%% connection_down/3
connection_down(#peer{op_state = ?STATE_DOWN}, _, S) ->
S;
connection_down(#peer{conn = TPid,
op_state = ?STATE_UP}
= P,
#conn{caps = Caps,
apps = SApps}
= C,
#state{service_name = SvcName,
service = #diameter_service{applications = Apps},
local_peers = LDict}
= S) ->
report_status(down, P, C, S, []),
NewS = S#state{local_peers
= remove_local_peer(SApps,
{{TPid, Caps}, {SvcName, Apps}},
LDict)},
request_peer_down(TPid, NewS),
NewS.
remove_local_peer(SApps, T, LDict) ->
lists:foldl(fun(A,D) -> rlp(A, T, D) end, LDict, SApps).
rlp({Id, Alias}, {TC, SA}, LDict) ->
L = ?Dict:fetch(Alias, LDict),
down_conn(Id, Alias, TC, SA),
?Dict:store(Alias, lists:delete(TC, L), LDict).
down_conn(Id, Alias, TC, {SvcName, Apps}) ->
#diameter_app{module = ModX,
id = Id} %% assert
= find_app(Alias, Apps),
peer_cb({ModX, peer_down, [SvcName, TC]}, Alias).
%%% ---------------------------------------------------------------------------
%%% # peer_down/3
%%%
%%% Output: #state{}
%%% ---------------------------------------------------------------------------
%% Peer process has died.
peer_down(Pid, Reason, #state{peerT = PeerT} = S) ->
P = fetch(PeerT, Pid),
ets:delete_object(PeerT, P),
closed(Reason, P, S),
restart(P,S),
peer_down(P,S).
%% Send an event at connection establishment failure.
closed({shutdown, {close, _TPid, Reason}},
#peer{op_state = ?STATE_DOWN,
ref = Ref,
type = Type,
options = Opts},
#state{service_name = SvcName}) ->
send_event(SvcName, {closed, Ref, Reason, {type(Type), Opts}});
closed(_, _, _) ->
ok.
%% The peer has never come up ...
peer_down(#peer{conn = B}, S)
when is_boolean(B) ->
S;
%% ... or it has.
peer_down(#peer{conn = TPid} = P, #state{connT = ConnT} = S) ->
#conn{} = C = fetch(ConnT, TPid),
ets:delete_object(ConnT, C),
connection_down(P,C,S).
%% restart/2
restart(P,S) ->
q_restart(restart(P), S).
%% restart/1
%% Always try to reconnect.
restart(#peer{ref = Ref,
type = connect = T,
options = Opts,
started = Time}) ->
{Time, {Ref, T, Opts}};
%% Transport connection hasn't yet been accepted ...
restart(#peer{ref = Ref,
type = accept = T,
options = Opts,
conn = false,
started = Time}) ->
{Time, {Ref, T, Opts}};
%% ... or it has: a replacement transport has already been spawned.
restart(#peer{type = accept}) ->
false.
%% q_restart/2
%% Start the reconnect timer.
q_restart({Time, {_Ref, Type, Opts} = T}, S) ->
start_tc(tc(Time, default_tc(Type, Opts)), T, S);
q_restart(false, _) ->
ok.
%% RFC 3588, 2.1:
%%
%% When no transport connection exists with a peer, an attempt to
%% connect SHOULD be periodically made. This behavior is handled via
%% the Tc timer, whose recommended value is 30 seconds. There are
%% certain exceptions to this rule, such as when a peer has terminated
%% the transport connection stating that it does not wish to
%% communicate.
default_tc(connect, Opts) ->
proplists:get_value(reconnect_timer, Opts, ?DEFAULT_TC);
default_tc(accept, _) ->
0.
%% Bound tc below if the peer was restarted recently to avoid
%% continuous in case of faulty config or other problems.
tc(Time, Tc) ->
choose(Tc > ?RESTART_TC
orelse timer:now_diff(now(), Time) > 1000*?RESTART_TC,
Tc,
?RESTART_TC).
start_tc(0, T, S) ->
tc_timeout(T, S);
start_tc(Tc, T, _) ->
erlang:send_after(Tc, self(), {tc_timeout, T}).
%% tc_timeout/2
tc_timeout({Ref, _Type, _Opts} = T, #state{service_name = SvcName} = S) ->
tc(diameter_config:have_transport(SvcName, Ref), T, S).
tc(true, {Ref, Type, Opts}, #state{service_name = SvcName}
= S) ->
send_event(SvcName, {reconnect, Ref, Opts}),
start(Ref, Type, Opts, S);
tc(false = No, _, _) -> %% removed
No.
%%% ---------------------------------------------------------------------------
%%% # close/2
%%% ---------------------------------------------------------------------------
%% The watchdog doesn't start a new fsm in the accept case, it
%% simply stays alive until someone tells it to die in order for
%% another watchdog to be able to detect that it should transition
%% from initial into reopen rather than okay. That someone is either
%% the accepting watchdog upon reception of a CER from the previously
%% connected peer, or us after reconnect_timer timeout.
close(Pid, #state{service_name = SvcName,
peerT = PeerT}) ->
#peer{pid = Pid,
type = accept,
ref = Ref,
options = Opts}
= fetch(PeerT, Pid),
c(Pid, diameter_config:have_transport(SvcName, Ref), Opts).
%% Tell watchdog to (maybe) die later ...
c(Pid, true, Opts) ->
Tc = proplists:get_value(reconnect_timer, Opts, 2*?DEFAULT_TC),
erlang:send_after(Tc, Pid, close);
%% ... or now.
c(Pid, false, _Opts) ->
Pid ! close.
%% The RFC's only document the behaviour of Tc, our reconnect_timer,
%% for the establishment of connections but we also give
%% reconnect_timer semantics for a listener, being the time within
%% which a new connection attempt is expected of a connecting peer.
%% The value should be greater than the peer's Tc + jitter.
%%% ---------------------------------------------------------------------------
%%% # reconnect/2
%%% ---------------------------------------------------------------------------
reconnect(Pid, #state{service_name = SvcName,
peerT = PeerT}) ->
#peer{ref = Ref,
type = connect,
options = Opts}
= fetch(PeerT, Pid),
send_event(SvcName, {reconnect, Ref, Opts}).
%%% ---------------------------------------------------------------------------
%%% # call_module/4
%%% ---------------------------------------------------------------------------
%% Backwards compatibility and never documented/advertised. May be
%% removed.
call_module(Mod, Req, From, #state{service
= #diameter_service{applications = Apps},
service_name = Svc}
= S) ->
case cm([A || A <- Apps, Mod == hd(A#diameter_app.module)],
Req,
From,
Svc)
of
{reply = T, RC} ->
{T, RC, S};
noreply = T ->
{T, S};
Reason ->
{reply, {error, Reason}, S}
end.
cm([#diameter_app{module = ModX, alias = Alias}], Req, From, Svc) ->
MFA = {ModX, handle_call, [Req, From, Svc]},
try state_cb(MFA, Alias) of
{noreply = T, ModS} ->
mod_state(Alias, ModS),
T;
{reply = T, RC, ModS} ->
mod_state(Alias, ModS),
{T, RC};
T ->
diameter_lib:error_report({invalid, T}, MFA),
invalid
catch
E: Reason ->
diameter_lib:error_report({failure, {E, Reason, ?STACK}}, MFA),
failure
end;
cm([], _, _, _) ->
unknown;
cm([_,_|_], _, _, _) ->
multiple.
%%% ---------------------------------------------------------------------------
%%% # send_request/5
%%% ---------------------------------------------------------------------------
%% Send an outgoing request in its dedicated process.
%%
%% Note that both encode of the outgoing request and of the received
%% answer happens in this process. It's also this process that replies
%% to the caller. The service process only handles the state-retaining
%% callbacks.
%%
%% The mod field of the #diameter_app{} here includes any extra
%% arguments passed to diameter:call/2.
send_request({TPid, Caps, App}, Msg, Opts, Caller, SvcName) ->
#diameter_app{module = ModX}
= App,
Pkt = make_request_packet(Msg),
case cb(ModX, prepare_request, [Pkt, SvcName, {TPid, Caps}]) of
{send, P} ->
send_request(make_request_packet(P, Pkt),
TPid,
Caps,
App,
Opts,
Caller,
SvcName);
{discard, Reason} ->
{error, Reason};
discard ->
{error, discarded};
T ->
?ERROR({invalid_return, prepare_request, App, T})
end.
%% make_request_packet/1
%%
%% Turn an outgoing request as passed to call/4 into a diameter_packet
%% record in preparation for a prepare_request callback.
make_request_packet(Bin)
when is_binary(Bin) ->
#diameter_packet{header = diameter_codec:decode_header(Bin),
bin = Bin};
make_request_packet(#diameter_packet{msg = [#diameter_header{} = Hdr | Avps]}
= Pkt) ->
Pkt#diameter_packet{msg = [make_request_header(Hdr) | Avps]};
make_request_packet(#diameter_packet{header = Hdr} = Pkt) ->
Pkt#diameter_packet{header = make_request_header(Hdr)};
make_request_packet(Msg) ->
make_request_packet(#diameter_packet{msg = Msg}).
%% make_request_header/1
make_request_header(undefined) ->
Seq = diameter_session:sequence(),
make_request_header(#diameter_header{end_to_end_id = Seq,
hop_by_hop_id = Seq});
make_request_header(#diameter_header{version = undefined} = Hdr) ->
make_request_header(Hdr#diameter_header{version = ?DIAMETER_VERSION});
make_request_header(#diameter_header{end_to_end_id = undefined} = H) ->
Seq = diameter_session:sequence(),
make_request_header(H#diameter_header{end_to_end_id = Seq});
make_request_header(#diameter_header{hop_by_hop_id = undefined} = H) ->
Seq = diameter_session:sequence(),
make_request_header(H#diameter_header{hop_by_hop_id = Seq});
make_request_header(#diameter_header{} = Hdr) ->
Hdr;
make_request_header(T) ->
?ERROR({invalid_header, T}).
%% make_request_packet/2
%%
%% Reconstruct a diameter_packet from the return value of
%% prepare_request or prepare_retransmit callback.
make_request_packet(Bin, _)
when is_binary(Bin) ->
make_request_packet(Bin);
make_request_packet(#diameter_packet{msg = [#diameter_header{} | _]}
= Pkt,
_) ->
Pkt;
%% Returning a diameter_packet with no header from a prepare_request
%% or prepare_retransmit callback retains the header passed into it.
%% This is primarily so that the end to end and hop by hop identifiers
%% are retained.
make_request_packet(#diameter_packet{header = Hdr} = Pkt,
#diameter_packet{header = Hdr0}) ->
Pkt#diameter_packet{header = fold_record(Hdr0, Hdr)};
make_request_packet(Msg, Pkt) ->
Pkt#diameter_packet{msg = Msg}.
%% fold_record/2
fold_record(undefined, R) ->
R;
fold_record(Rec, R) ->
diameter_lib:fold_tuple(2, Rec, R).
%% send_request/7
send_request(Pkt, TPid, Caps, App, Opts, Caller, SvcName) ->
#diameter_app{alias = Alias,
dictionary = Dict,
module = ModX,
answer_errors = AE}
= App,
EPkt = encode(Dict, Pkt),
#options{filter = Filter,
timeout = Timeout}
= Opts,
Req = #request{packet = Pkt,
from = Caller,
handler = self(),
transport = TPid,
caps = Caps,
app = Alias,
filter = Filter,
dictionary = Dict,
module = ModX},
try
TRef = send_request(TPid, EPkt, Req, Timeout),
ack(Caller),
handle_answer(SvcName, AE, recv_answer(Timeout, SvcName, {TRef, Req}))
after
erase_request(EPkt)
end.
%% Tell caller a send has been attempted.
ack({Pid, Ref}) ->
Pid ! Ref.
%% recv_answer/3
recv_answer(Timeout,
SvcName,
{TRef, #request{from = {_, Ref}, packet = RPkt} = Req}
= T) ->
%% Matching on TRef below ensures we ignore messages that pertain
%% to a previous transport prior to failover. The answer message
%% includes the #request{} since it's not necessarily Req; that
%% is, from the last peer to which we've transmitted.
receive
{answer = A, Ref, Rq, Pkt} -> %% Answer from peer
{A, Rq, Pkt};
{timeout = Reason, TRef, _} -> %% No timely reply
{error, Req, Reason};
{failover = Reason, TRef, false} -> %% No alternate peer
{error, Req, Reason};
{failover, TRef, Transport} -> %% Resend to alternate peer
try_retransmit(Timeout, SvcName, Req, Transport);
{failover, TRef} -> %% May have missed failover notification
Seqs = diameter_codec:sequence_numbers(RPkt),
Pid = whois(SvcName),
is_pid(Pid) andalso (Pid ! {failover, TRef, Seqs}),
recv_answer(Timeout, SvcName, T)
end.
%% Note that failover starts a new timer and that expiry of an old
%% timer value is ignored. This means that an answer could be accepted
%% from a peer after timeout in the case of failover.
try_retransmit(Timeout, SvcName, Req, Transport) ->
try retransmit(Transport, Req, SvcName, Timeout) of
T -> recv_answer(Timeout, SvcName, T)
catch
?FAILURE(Reason) -> {error, Req, Reason}
end.
%% handle_error/3
handle_error(Req, Reason, SvcName) ->
#request{module = ModX,
packet = Pkt,
transport = TPid,
caps = Caps}
= Req,
cb(ModX, handle_error, [Reason, msg(Pkt), SvcName, {TPid, Caps}]).
msg(#diameter_packet{msg = undefined, bin = Bin}) ->
Bin;
msg(#diameter_packet{msg = Msg}) ->
Msg.
%% encode/2
%% Note that prepare_request can return a diameter_packet containing
%% header or transport_data. Even allow the returned record to contain
%% an encoded binary. This isn't the usual case but could some in
%% handy, for test at least. (For example, to send garbage.)
%% The normal case: encode the returned message.
encode(Dict, #diameter_packet{msg = Msg, bin = undefined} = Pkt) ->
D = pick_dictionary([Dict, ?BASE], Msg),
diameter_codec:encode(D, Pkt);
%% Callback has returned an encoded binary: just send.
encode(_, #diameter_packet{} = Pkt) ->
Pkt.
%% pick_dictionary/2
%% Pick the first dictionary that declares the application id in the
%% specified header.
pick_dictionary(Ds, [#diameter_header{application_id = Id} | _]) ->
pd(Ds, fun(D) -> Id = D:id() end);
%% Pick the first dictionary that knows the specified message name.
pick_dictionary(Ds, [MsgName|_]) ->
pd(Ds, fun(D) -> D:msg2rec(MsgName) end);
%% Pick the first dictionary that knows the name of the specified
%% message record.
pick_dictionary(Ds, Rec) ->
Name = element(1, Rec),
pd(Ds, fun(D) -> D:rec2msg(Name) end).
pd([D|Ds], F) ->
try
F(D),
D
catch
error:_ ->
pd(Ds, F)
end;
pd([], _) ->
?ERROR(no_dictionary).
%% send_request/4
send_request(TPid, #diameter_packet{bin = Bin} = Pkt, Req, Timeout)
when node() == node(TPid) ->
%% Store the outgoing request before sending to avoid a race with
%% reply reception.
TRef = store_request(TPid, Bin, Req, Timeout),
send(TPid, Pkt),
TRef;
%% Send using a remote transport: spawn a process on the remote node
%% to relay the answer.
send_request(TPid, #diameter_packet{} = Pkt, Req, Timeout) ->
TRef = erlang:start_timer(Timeout, self(), timeout),
T = {TPid, Pkt, Req, Timeout, TRef},
spawn(node(TPid), ?MODULE, send, [T]),
TRef.
%% send/1
send({TPid, Pkt, #request{handler = Pid} = Req, Timeout, TRef}) ->
Ref = send_request(TPid, Pkt, Req#request{handler = self()}, Timeout),
Pid ! reref(receive T -> T end, Ref, TRef).
reref({T, Ref, R}, Ref, TRef) ->
{T, TRef, R};
reref(T, _, _) ->
T.
%% send/2
send(Pid, Pkt) ->
Pid ! {send, Pkt}.
%% retransmit/4
retransmit({TPid, Caps, #diameter_app{alias = Alias} = App},
#request{app = Alias,
packet = Pkt}
= Req,
SvcName,
Timeout) ->
have_request(Pkt, TPid) %% Don't failover to a peer we've
andalso ?THROW(timeout), %% already sent to.
case cb(App, prepare_retransmit, [Pkt, SvcName, {TPid, Caps}]) of
{send, P} ->
retransmit(make_request_packet(P, Pkt), TPid, Caps, Req, Timeout);
{discard, Reason} ->
?THROW(Reason);
discard ->
?THROW(discarded);
T ->
?ERROR({invalid_return, prepare_retransmit, App, T})
end.
%% retransmit/5
retransmit(Pkt, TPid, Caps, #request{dictionary = Dict} = Req, Timeout) ->
EPkt = encode(Dict, Pkt),
NewReq = Req#request{transport = TPid,
packet = Pkt,
caps = Caps},
?LOG(retransmission, NewReq),
TRef = send_request(TPid, EPkt, NewReq, Timeout),
{TRef, NewReq}.
%% store_request/4
store_request(TPid, Bin, Req, Timeout) ->
Seqs = diameter_codec:sequence_numbers(Bin),
TRef = erlang:start_timer(Timeout, self(), timeout),
ets:insert(?REQUEST_TABLE, {Seqs, Req, TRef}),
ets:member(?REQUEST_TABLE, TPid)
orelse (self() ! {failover, TRef}), %% possibly missed failover
TRef.
%% lookup_request/2
lookup_request(Msg, TPid)
when is_pid(TPid) ->
lookup(Msg, TPid, '_');
lookup_request(Msg, TRef)
when is_reference(TRef) ->
lookup(Msg, '_', TRef).
lookup(Msg, TPid, TRef) ->
Seqs = diameter_codec:sequence_numbers(Msg),
Spec = [{{Seqs, #request{transport = TPid, _ = '_'}, TRef},
[],
['$_']}],
case ets:select(?REQUEST_TABLE, Spec) of
[{_, Req, _}] ->
Req;
[] ->
false
end.
%% erase_request/1
erase_request(Pkt) ->
ets:delete(?REQUEST_TABLE, diameter_codec:sequence_numbers(Pkt)).
%% match_requests/1
match_requests(TPid) ->
Pat = {'_', #request{transport = TPid, _ = '_'}, '_'},
ets:select(?REQUEST_TABLE, [{Pat, [], ['$_']}]).
%% have_request/2
have_request(Pkt, TPid) ->
Seqs = diameter_codec:sequence_numbers(Pkt),
Pat = {Seqs, #request{transport = TPid, _ = '_'}, '_'},
'$end_of_table' /= ets:select(?REQUEST_TABLE, [{Pat, [], ['$_']}], 1).
%% request_peer_up/1
request_peer_up(TPid) ->
ets:insert(?REQUEST_TABLE, {TPid}).
%% request_peer_down/2
request_peer_down(TPid, S) ->
ets:delete(?REQUEST_TABLE, TPid),
lists:foreach(fun(T) -> failover(T,S) end, match_requests(TPid)).
%% Note that a request process can store its request after failover
%% notifications are sent here: store_request/4 sends the notification
%% in that case. Note also that we'll send as many notifications to a
%% given handler as there are peers its sent to. All but one of these
%% will be ignored.
%%% ---------------------------------------------------------------------------
%%% recv_request/3
%%% ---------------------------------------------------------------------------
recv_request(TPid, Pkt, {ConnT, SvcName, Apps}) ->
try ets:lookup(ConnT, TPid) of
[C] ->
recv_request(C, TPid, Pkt, SvcName, Apps);
[] -> %% transport has gone down
ok
catch
error: badarg -> %% service has gone down (and taken table with it)
ok
end.
%% recv_request/5
recv_request(#conn{apps = SApps, caps = Caps}, TPid, Pkt, SvcName, Apps) ->
#diameter_caps{origin_host = {OH,_},
origin_realm = {OR,_}}
= Caps,
#diameter_packet{header = #diameter_header{application_id = Id}}
= Pkt,
recv_request(find_recv_app(Id, SApps),
{SvcName, OH, OR},
TPid,
Apps,
Caps,
Pkt).
%% find_recv_app/2
%% No one should be sending the relay identifier.
find_recv_app(?APP_ID_RELAY, _) ->
false;
%% With any other id we either support it locally or as a relay.
find_recv_app(Id, SApps) ->
keyfind([Id, ?APP_ID_RELAY], 1, SApps).
%% keyfind/3
keyfind([], _, _) ->
false;
keyfind([Key | Rest], Pos, L) ->
case lists:keyfind(Key, Pos, L) of
false ->
keyfind(Rest, Pos, L);
T ->
T
end.
%% recv_request/6
recv_request({Id, Alias}, T, TPid, Apps, Caps, Pkt) ->
#diameter_app{dictionary = Dict}
= A
= find_app(Alias, Apps),
recv_request(T, {TPid, Caps}, A, diameter_codec:decode(Id, Dict, Pkt));
%% Note that the decode is different depending on whether or not Id is
%% ?APP_ID_RELAY.
%% DIAMETER_APPLICATION_UNSUPPORTED 3007
%% A request was sent for an application that is not supported.
recv_request(false, T, TPid, _, _, Pkt) ->
As = collect_avps(Pkt),
protocol_error(3007, T, TPid, Pkt#diameter_packet{avps = As}).
collect_avps(Pkt) ->
case diameter_codec:collect_avps(Pkt) of
{_Bs, As} ->
As;
As ->
As
end.
%% recv_request/4
%% Wrong number of bits somewhere in the message: reply.
%%
%% DIAMETER_INVALID_AVP_BITS 3009
%% A request was received that included an AVP whose flag bits are
%% set to an unrecognized value, or that is inconsistent with the
%% AVP's definition.
%%
recv_request(T, {TPid, _}, _, #diameter_packet{errors = [Bs | _]} = Pkt)
when is_bitstring(Bs) ->
protocol_error(3009, T, TPid, Pkt);
%% Either we support this application but don't recognize the command
%% or we're a relay and the command isn't proxiable.
%%
%% DIAMETER_COMMAND_UNSUPPORTED 3001
%% The Request contained a Command-Code that the receiver did not
%% recognize or support. This MUST be used when a Diameter node
%% receives an experimental command that it does not understand.
%%
recv_request(T,
{TPid, _},
#diameter_app{id = Id},
#diameter_packet{header = #diameter_header{is_proxiable = P},
msg = M}
= Pkt)
when ?APP_ID_RELAY /= Id, undefined == M;
?APP_ID_RELAY == Id, not P ->
protocol_error(3001, T, TPid, Pkt);
%% Error bit was set on a request.
%%
%% DIAMETER_INVALID_HDR_BITS 3008
%% A request was received whose bits in the Diameter header were
%% either set to an invalid combination, or to a value that is
%% inconsistent with the command code's definition.
%%
recv_request(T,
{TPid, _},
_,
#diameter_packet{header = #diameter_header{is_error = true}}
= Pkt) ->
protocol_error(3008, T, TPid, Pkt);
%% A message in a locally supported application or a proxiable message
%% in the relay application. Don't distinguish between the two since
%% each application has its own callback config. That is, the user can
%% easily distinguish between the two cases.
recv_request(T, TC, App, Pkt) ->
request_cb(T, TC, App, examine(Pkt)).
%% Note that there may still be errors but these aren't protocol
%% (3xxx) errors that lead to an answer-message.
request_cb({SvcName, _OH, _OR} = T, TC, App, Pkt) ->
request_cb(cb(App, handle_request, [Pkt, SvcName, TC]), App, T, TC, Pkt).
%% examine/1
%%
%% Look for errors in a decoded message. Length errors result in
%% decode failure in diameter_codec.
examine(#diameter_packet{header = #diameter_header{version
= ?DIAMETER_VERSION}}
= Pkt) ->
Pkt;
%% DIAMETER_UNSUPPORTED_VERSION 5011
%% This error is returned when a request was received, whose version
%% number is unsupported.
examine(#diameter_packet{errors = Es} = Pkt) ->
Pkt#diameter_packet{errors = [5011 | Es]}.
%% It's odd/unfortunate that this isn't a protocol error.
%% request_cb/5
%% A reply may be an answer-message, constructed either here or by
%% the handle_request callback. The header from the incoming request
%% is passed into the encode so that it can retrieve the relevant
%% command code in this case. It will also then ignore Dict and use
%% the base encoder.
request_cb({reply, Ans},
#diameter_app{dictionary = Dict},
_,
{TPid, _},
Pkt) ->
reply(Ans, Dict, TPid, Pkt);
%% An 3xxx result code, for which the E-bit is set in the header.
request_cb({protocol_error, RC}, _, T, {TPid, _}, Pkt)
when 3000 =< RC, RC < 4000 ->
protocol_error(RC, T, TPid, Pkt);
%% RFC 3588 says we must reply 3001 to anything unrecognized or
%% unsupported. 'noreply' is undocumented (and inappropriately named)
%% backwards compatibility for this, protocol_error the documented
%% alternative.
request_cb(noreply, _, T, {TPid, _}, Pkt) ->
protocol_error(3001, T, TPid, Pkt);
%% Relay a request to another peer. This is equivalent to doing an
%% explicit call/4 with the message in question except that (1) a loop
%% will be detected by examining Route-Record AVP's, (3) a
%% Route-Record AVP will be added to the outgoing request and (3) the
%% End-to-End Identifier will default to that in the
%% #diameter_header{} without the need for an end_to_end_identifier
%% option.
%%
%% relay and proxy are similar in that they require the same handling
%% with respect to Route-Record and End-to-End identifier. The
%% difference is that a proxy advertises specific applications, while
%% a relay advertises the relay application. If a callback doesn't
%% want to distinguish between the cases in the callback return value
%% then 'resend' is a neutral alternative.
%%
request_cb({A, Opts},
#diameter_app{id = Id}
= App,
T,
TC,
Pkt)
when A == relay, Id == ?APP_ID_RELAY;
A == proxy, Id /= ?APP_ID_RELAY;
A == resend ->
resend(Opts, App, T, TC, Pkt);
request_cb(discard, _, _, _, _) ->
ok;
request_cb({eval, RC, F}, App, T, TC, Pkt) ->
request_cb(RC, App, T, TC, Pkt),
diameter_lib:eval(F).
%% protocol_error/4
protocol_error(RC, {_, OH, OR}, TPid, #diameter_packet{avps = Avps} = Pkt) ->
?LOG({error, RC}, Pkt),
reply(answer_message({OH, OR, RC}, Avps), ?BASE, TPid, Pkt).
%% resend/5
%%
%% Resend a message as a relay or proxy agent.
resend(Opts,
#diameter_app{} = App,
{_SvcName, OH, _OR} = T,
{_TPid, _Caps} = TC,
#diameter_packet{avps = Avps} = Pkt) ->
{Code, _Flags, Vid} = ?BASE:avp_header('Route-Record'),
resend(is_loop(Code, Vid, OH, Avps), Opts, App, T, TC, Pkt).
%% DIAMETER_LOOP_DETECTED 3005
%% An agent detected a loop while trying to get the message to the
%% intended recipient. The message MAY be sent to an alternate peer,
%% if one is available, but the peer reporting the error has
%% identified a configuration problem.
resend(true, _, _, T, {TPid, _}, Pkt) -> %% Route-Record loop
protocol_error(3005, T, TPid, Pkt);
%% 6.1.8. Relaying and Proxying Requests
%%
%% A relay or proxy agent MUST append a Route-Record AVP to all requests
%% forwarded. The AVP contains the identity of the peer the request was
%% received from.
resend(false,
Opts,
App,
{SvcName, _, _} = T,
{TPid, #diameter_caps{origin_host = {_, OH}}},
#diameter_packet{header = Hdr0,
avps = Avps}
= Pkt) ->
Route = #diameter_avp{data = {?BASE, 'Route-Record', OH}},
Seq = diameter_session:sequence(),
Hdr = Hdr0#diameter_header{hop_by_hop_id = Seq},
Msg = [Hdr, Route | Avps],
resend(call(SvcName, App, Msg, Opts), T, TPid, Pkt).
%% The incoming request is relayed with the addition of a
%% Route-Record. Note the requirement on the return from call/4 below,
%% which places a requirement on the value returned by the
%% handle_answer callback of the application module in question.
%%
%% Note that there's nothing stopping the request from being relayed
%% back to the sender. A pick_peer callback may want to avoid this but
%% a smart peer might recognize the potential loop and choose another
%% route. A less smart one will probably just relay the request back
%% again and force us to detect the loop. A pick_peer that wants to
%% avoid this can specify filter to avoid the possibility.
%% Eg. {neg, {host, OH} where #diameter_caps{origin_host = {OH, _}}.
%%
%% RFC 6.3 says that a relay agent does not modify Origin-Host but
%% says nothing about a proxy. Assume it should behave the same way.
%% resend/4
%%
%% Relay a reply to a relayed request.
%% Answer from the peer: reset the hop by hop identifier and send.
resend(#diameter_packet{bin = B}
= Pkt,
_,
TPid,
#diameter_packet{header = #diameter_header{hop_by_hop_id = Id},
transport_data = TD}) ->
send(TPid, Pkt#diameter_packet{bin = diameter_codec:hop_by_hop_id(Id, B),
transport_data = TD});
%% TODO: counters
%% Or not: DIAMETER_UNABLE_TO_DELIVER.
resend(_, T, TPid, Pkt) ->
protocol_error(3002, T, TPid, Pkt).
%% is_loop/4
%%
%% Is there a Route-Record AVP with our Origin-Host?
is_loop(Code,
Vid,
Bin,
[#diameter_avp{code = Code, vendor_id = Vid, data = Bin} | _]) ->
true;
is_loop(_, _, _, []) ->
false;
is_loop(Code, Vid, OH, [_ | Avps])
when is_binary(OH) ->
is_loop(Code, Vid, OH, Avps);
is_loop(Code, Vid, OH, Avps) ->
is_loop(Code, Vid, ?BASE:avp(encode, OH, 'Route-Record'), Avps).
%% reply/4
%%
%% Send a locally originating reply.
%% No errors or a diameter_header/avp list.
reply(Msg, Dict, TPid, #diameter_packet{errors = Es,
transport_data = TD}
= ReqPkt)
when [] == Es;
is_record(hd(Msg), diameter_header) ->
Pkt = diameter_codec:encode(Dict, make_answer_packet(Msg, ReqPkt)),
incr(send, Pkt, TPid), %% count result codes in sent answers
send(TPid, Pkt#diameter_packet{transport_data = TD});
%% Or not: set Result-Code and Failed-AVP AVP's.
reply(Msg, Dict, TPid, #diameter_packet{errors = [H|_] = Es} = Pkt) ->
reply(rc(Msg, rc(H), [A || {_,A} <- Es], Dict),
Dict,
TPid,
Pkt#diameter_packet{errors = []}).
%% make_answer_packet/2
%% Binaries and header/avp lists are sent as-is.
make_answer_packet(Bin, _)
when is_binary(Bin) ->
#diameter_packet{bin = Bin};
make_answer_packet([#diameter_header{} | _] = Msg, _) ->
#diameter_packet{msg = Msg};
%% Otherwise a reply message clears the R and T flags and retains the
%% P flag. The E flag will be set at encode. 6.2 of 3588 requires the
%% same P flag on an answer as on the request.
make_answer_packet(Msg, #diameter_packet{header = ReqHdr}) ->
Hdr = ReqHdr#diameter_header{version = ?DIAMETER_VERSION,
is_request = false,
is_error = undefined,
is_retransmitted = false},
#diameter_packet{header = Hdr,
msg = Msg}.
%% rc/1
rc({RC, _}) ->
RC;
rc(RC) ->
RC.
%% rc/4
rc(Rec, RC, Failed, Dict)
when is_integer(RC) ->
set(Rec, [{'Result-Code', RC} | failed_avp(Rec, Failed, Dict)], Dict).
%% Reply as name and tuple list ...
set([_|_] = Ans, Avps, _) ->
Ans ++ Avps; %% Values nearer tail take precedence.
%% ... or record.
set(Rec, Avps, Dict) ->
Dict:'#set-'(Avps, Rec).
%% failed_avp/3
failed_avp(_, [] = No, _) ->
No;
failed_avp(Rec, Failed, Dict) ->
[fa(Rec, [{'AVP', Failed}], Dict)].
%% Reply as name and tuple list ...
fa([MsgName | Values], FailedAvp, Dict) ->
R = Dict:msg2rec(MsgName),
try
Dict:'#info-'(R, {index, 'Failed-AVP'}),
{'Failed-AVP', [FailedAvp]}
catch
error: _ ->
Avps = proplists:get_value('AVP', Values, []),
A = #diameter_avp{name = 'Failed-AVP',
value = FailedAvp},
{'AVP', [A|Avps]}
end;
%% ... or record.
fa(Rec, FailedAvp, Dict) ->
try
{'Failed-AVP', [FailedAvp]}
catch
error: _ ->
Avps = Dict:'get-'('AVP', Rec),
A = #diameter_avp{name = 'Failed-AVP',
value = FailedAvp},
{'AVP', [A|Avps]}
end.
%% 3. Diameter Header
%%
%% E(rror) - If set, the message contains a protocol error,
%% and the message will not conform to the ABNF
%% described for this command. Messages with the 'E'
%% bit set are commonly referred to as error
%% messages. This bit MUST NOT be set in request
%% messages. See Section 7.2.
%% 3.2. Command Code ABNF specification
%%
%% e-bit = ", ERR"
%% ; If present, the 'E' bit in the Command
%% ; Flags is set, indicating that the answer
%% ; message contains a Result-Code AVP in
%% ; the "protocol error" class.
%% 7.1.3. Protocol Errors
%%
%% Errors that fall within the Protocol Error category SHOULD be treated
%% on a per-hop basis, and Diameter proxies MAY attempt to correct the
%% error, if it is possible. Note that these and only these errors MUST
%% only be used in answer messages whose 'E' bit is set.
%% Thus, only construct answers to protocol errors. Other errors
%% require an message-specific answer and must be handled by the
%% application.
%% 6.2. Diameter Answer Processing
%%
%% When a request is locally processed, the following procedures MUST be
%% applied to create the associated answer, in addition to any
%% additional procedures that MAY be discussed in the Diameter
%% application defining the command:
%%
%% - The same Hop-by-Hop identifier in the request is used in the
%% answer.
%%
%% - The local host's identity is encoded in the Origin-Host AVP.
%%
%% - The Destination-Host and Destination-Realm AVPs MUST NOT be
%% present in the answer message.
%%
%% - The Result-Code AVP is added with its value indicating success or
%% failure.
%%
%% - If the Session-Id is present in the request, it MUST be included
%% in the answer.
%%
%% - Any Proxy-Info AVPs in the request MUST be added to the answer
%% message, in the same order they were present in the request.
%%
%% - The 'P' bit is set to the same value as the one in the request.
%%
%% - The same End-to-End identifier in the request is used in the
%% answer.
%%
%% Note that the error messages (see Section 7.3) are also subjected to
%% the above processing rules.
%% 7.3. Error-Message AVP
%%
%% The Error-Message AVP (AVP Code 281) is of type UTF8String. It MAY
%% accompany a Result-Code AVP as a human readable error message. The
%% Error-Message AVP is not intended to be useful in real-time, and
%% SHOULD NOT be expected to be parsed by network entities.
%% answer_message/2
answer_message({OH, OR, RC}, Avps) ->
{Code, _, Vid} = ?BASE:avp_header('Session-Id'),
['answer-message', {'Origin-Host', OH},
{'Origin-Realm', OR},
{'Result-Code', RC}
| session_id(Code, Vid, Avps)].
session_id(Code, Vid, Avps)
when is_list(Avps) ->
try
{value, #diameter_avp{data = D}} = find_avp(Code, Vid, Avps),
[{'Session-Id', [?BASE:avp(decode, D, 'Session-Id')]}]
catch
error: _ ->
[]
end.
%% find_avp/3
find_avp(Code, Vid, Avps)
when is_integer(Code), (undefined == Vid orelse is_integer(Vid)) ->
find(fun(A) -> is_avp(Code, Vid, A) end, Avps).
%% The final argument here could be a list of AVP's, depending on the case,
%% but we're only searching at the top level.
is_avp(Code, Vid, #diameter_avp{code = Code, vendor_id = Vid}) ->
true;
is_avp(_, _, _) ->
false.
find(_, []) ->
false;
find(Pred, [H|T]) ->
case Pred(H) of
true ->
{value, H};
false ->
find(Pred, T)
end.
%% 7. Error Handling
%%
%% There are certain Result-Code AVP application errors that require
%% additional AVPs to be present in the answer. In these cases, the
%% Diameter node that sets the Result-Code AVP to indicate the error
%% MUST add the AVPs. Examples are:
%%
%% - An unrecognized AVP is received with the 'M' bit (Mandatory bit)
%% set, causes an answer to be sent with the Result-Code AVP set to
%% DIAMETER_AVP_UNSUPPORTED, and the Failed-AVP AVP containing the
%% offending AVP.
%%
%% - An AVP that is received with an unrecognized value causes an
%% answer to be returned with the Result-Code AVP set to
%% DIAMETER_INVALID_AVP_VALUE, with the Failed-AVP AVP containing the
%% AVP causing the error.
%%
%% - A command is received with an AVP that is omitted, yet is
%% mandatory according to the command's ABNF. The receiver issues an
%% answer with the Result-Code set to DIAMETER_MISSING_AVP, and
%% creates an AVP with the AVP Code and other fields set as expected
%% in the missing AVP. The created AVP is then added to the Failed-
%% AVP AVP.
%%
%% The Result-Code AVP describes the error that the Diameter node
%% encountered in its processing. In case there are multiple errors,
%% the Diameter node MUST report only the first error it encountered
%% (detected possibly in some implementation dependent order). The
%% specific errors that can be described by this AVP are described in
%% the following section.
%% 7.5. Failed-AVP AVP
%%
%% The Failed-AVP AVP (AVP Code 279) is of type Grouped and provides
%% debugging information in cases where a request is rejected or not
%% fully processed due to erroneous information in a specific AVP. The
%% value of the Result-Code AVP will provide information on the reason
%% for the Failed-AVP AVP.
%%
%% The possible reasons for this AVP are the presence of an improperly
%% constructed AVP, an unsupported or unrecognized AVP, an invalid AVP
%% value, the omission of a required AVP, the presence of an explicitly
%% excluded AVP (see tables in Section 10), or the presence of two or
%% more occurrences of an AVP which is restricted to 0, 1, or 0-1
%% occurrences.
%%
%% A Diameter message MAY contain one Failed-AVP AVP, containing the
%% entire AVP that could not be processed successfully. If the failure
%% reason is omission of a required AVP, an AVP with the missing AVP
%% code, the missing vendor id, and a zero filled payload of the minimum
%% required length for the omitted AVP will be added.
%%% ---------------------------------------------------------------------------
%%% # handle_answer/3
%%% ---------------------------------------------------------------------------
%% Process an answer message in call-specific process.
handle_answer(SvcName, _, {error, Req, Reason}) ->
handle_error(Req, Reason, SvcName);
handle_answer(SvcName,
AnswerErrors,
{answer, #request{dictionary = Dict} = Req, Pkt}) ->
a(examine(diameter_codec:decode(Dict, Pkt)),
SvcName,
AnswerErrors,
Req).
%% We don't really need to do a full decode if we're a relay and will
%% just resend with a new hop by hop identifier, but might a proxy
%% want to examine the answer?
a(#diameter_packet{errors = []}
= Pkt,
SvcName,
AE,
#request{transport = TPid,
caps = Caps,
packet = P}
= Req) ->
try
incr(in, Pkt, TPid)
of
_ ->
cb(Req, handle_answer, [Pkt, msg(P), SvcName, {TPid, Caps}])
catch
exit: {invalid_error_bit, _} = E ->
e(Pkt#diameter_packet{errors = [E]}, SvcName, AE, Req)
end;
a(#diameter_packet{} = Pkt, SvcName, AE, Req) ->
e(Pkt, SvcName, AE, Req).
e(Pkt, SvcName, callback, #request{transport = TPid,
caps = Caps,
packet = Pkt}
= Req) ->
cb(Req, handle_answer, [Pkt, msg(Pkt), SvcName, {TPid, Caps}]);
e(Pkt, SvcName, report, Req) ->
x(errors, handle_answer, [SvcName, Req, Pkt]);
e(Pkt, SvcName, discard, Req) ->
x({errors, handle_answer, [SvcName, Req, Pkt]}).
%% Note that we don't check that the application id in the answer's
%% header is what we expect. (TODO: Does the rfc says anything about
%% this?)
%% incr/4
%%
%% Increment a stats counter for an incoming or outgoing message.
%% TODO: fix
incr(_, #diameter_packet{msg = undefined}, _) ->
ok;
incr(Dir, Pkt, TPid)
when is_pid(TPid) ->
#diameter_packet{header = #diameter_header{is_error = E}
= Hdr,
msg = Rec}
= Pkt,
RC = int(get_avp_value(?BASE, 'Result-Code', Rec)),
PE = is_protocol_error(RC),
%% Check that the E bit is set only for 3xxx result codes.
(not (E orelse PE))
orelse (E andalso PE)
orelse x({invalid_error_bit, RC}, answer, [Dir, Pkt]),
Ctr = rc_counter(Rec, RC),
is_tuple(Ctr)
andalso incr(TPid, {diameter_codec:msg_id(Hdr), Dir, Ctr}).
%% incr/2
incr(TPid, Counter) ->
diameter_stats:incr(Counter, TPid, 1).
%% RFC 3588, 7.6:
%%
%% All Diameter answer messages defined in vendor-specific
%% applications MUST include either one Result-Code AVP or one
%% Experimental-Result AVP.
%%
%% Maintain statistics assuming one or the other, not both, which is
%% surely the intent of the RFC.
rc_counter(_, RC)
when is_integer(RC) ->
{'Result-Code', RC};
rc_counter(Rec, _) ->
rcc(get_avp_value(?BASE, 'Experimental-Result', Rec)).
%% Outgoing answers may be in any of the forms messages can be sent
%% in. Incoming messages will be records. We're assuming here that the
%% arity of the result code AVP's is 0 or 1.
rcc([{_,_,RC} = T])
when is_integer(RC) ->
T;
rcc({_,_,RC} = T)
when is_integer(RC) ->
T;
rcc(_) ->
undefined.
int([N])
when is_integer(N) ->
N;
int(N)
when is_integer(N) ->
N;
int(_) ->
undefined.
is_protocol_error(RC) ->
3000 =< RC andalso RC < 4000.
-spec x(any(), atom(), list()) -> no_return().
%% Warn and exit request process on errors in an incoming answer.
x(Reason, F, A) ->
diameter_lib:warning_report(Reason, {?MODULE, F, A}),
x(Reason).
x(T) ->
exit(T).
%%% ---------------------------------------------------------------------------
%%% # failover/[23]
%%% ---------------------------------------------------------------------------
%% Failover as a consequence of request_peer_down/2.
failover({_, #request{handler = Pid} = Req, TRef}, S) ->
Pid ! {failover, TRef, rt(Req, S)}.
%% Failover as a consequence of store_request/4.
failover(TRef, Seqs, S)
when is_reference(TRef) ->
case lookup_request(Seqs, TRef) of
#request{} = Req ->
failover({Seqs, Req, TRef}, S);
false ->
ok
end.
%% prepare_request returned a binary ...
rt(#request{packet = #diameter_packet{msg = undefined}}, _) ->
false; %% TODO: Not what we should do.
%% ... or not.
rt(#request{packet = #diameter_packet{msg = Msg}} = Req, S) ->
find_transport(get_destination(Msg), Req, S).
%%% ---------------------------------------------------------------------------
%%% # report_status/5
%%% ---------------------------------------------------------------------------
report_status(Status,
#peer{ref = Ref,
conn = TPid,
type = Type,
options = Opts},
#conn{apps = [_|_] = As,
caps = Caps},
#state{service_name = SvcName}
= S,
Extra) ->
share_peer(Status, Caps, As, TPid, S),
Info = [Status, Ref, {TPid, Caps}, {type(Type), Opts} | Extra],
send_event(SvcName, list_to_tuple(Info)).
%% send_event/2
send_event(SvcName, Info) ->
send_event(#diameter_event{service = SvcName,
info = Info}).
send_event(#diameter_event{service = SvcName} = E) ->
lists:foreach(fun({_, Pid}) -> Pid ! E end, subscriptions(SvcName)).
%%% ---------------------------------------------------------------------------
%%% # share_peer/5
%%% ---------------------------------------------------------------------------
share_peer(up, Caps, Aliases, TPid, #state{share_peers = true,
service_name = Svc}) ->
diameter_peer:notify(Svc, {peer, TPid, Aliases, Caps});
share_peer(_, _, _, _, _) ->
ok.
%%% ---------------------------------------------------------------------------
%%% # share_peers/2
%%% ---------------------------------------------------------------------------
share_peers(Pid, #state{share_peers = true,
local_peers = PDict}) ->
?Dict:fold(fun(A,Ps,ok) -> sp(Pid, A, Ps), ok end, ok, PDict);
share_peers(_, #state{share_peers = false}) ->
ok.
sp(Pid, Alias, Peers) ->
lists:foreach(fun({P,C}) -> Pid ! {peer, P, [Alias], C} end, Peers).
%%% ---------------------------------------------------------------------------
%%% # remote_peer_up/4
%%% ---------------------------------------------------------------------------
remote_peer_up(Pid, Aliases, Caps, #state{use_shared_peers = true,
service = Svc,
shared_peers = PDict}
= S) ->
#diameter_service{applications = Apps} = Svc,
Update = lists:filter(fun(A) ->
lists:keymember(A, #diameter_app.alias, Apps)
end,
Aliases),
S#state{shared_peers = rpu(Pid, Caps, PDict, Update)};
remote_peer_up(_, _, _, #state{use_shared_peers = false} = S) ->
S.
rpu(_, _, PDict, []) ->
PDict;
rpu(Pid, Caps, PDict, Aliases) ->
erlang:monitor(process, Pid),
T = {Pid, Caps},
lists:foldl(fun(A,D) -> ?Dict:append(A, T, D) end,
PDict,
Aliases).
%%% ---------------------------------------------------------------------------
%%% # remote_peer_down/2
%%% ---------------------------------------------------------------------------
remote_peer_down(Pid, #state{use_shared_peers = true,
shared_peers = PDict}
= S) ->
S#state{shared_peers = lists:foldl(fun(A,D) -> rpd(Pid, A, D) end,
PDict,
?Dict:fetch_keys(PDict))}.
rpd(Pid, Alias, PDict) ->
?Dict:update(Alias, fun(Ps) -> lists:keydelete(Pid, 1, Ps) end, PDict).
%%% ---------------------------------------------------------------------------
%%% find_transport/[34]
%%%
%%% Output: {TransportPid, #diameter_caps{}, #diameter_app{}}
%%% | false
%%% | {error, Reason}
%%% ---------------------------------------------------------------------------
%% Initial call, from an arbitrary process.
find_transport({alias, Alias}, Msg, Opts, #state{service = Svc} = S) ->
#diameter_service{applications = Apps} = Svc,
ft(find_send_app(Alias, Apps), Msg, Opts, S);
%% Relay or proxy send.
find_transport(#diameter_app{} = App, Msg, Opts, S) ->
ft(App, Msg, Opts, S).
ft(#diameter_app{module = Mod} = App, Msg, Opts, S) ->
#options{filter = Filter,
extra = Xtra}
= Opts,
pick_peer(App#diameter_app{module = Mod ++ Xtra},
get_destination(Msg),
Filter,
S);
ft(false = No, _, _, _) ->
No.
%% This can't be used if we're a relay and sending a message
%% in an application not known locally. (TODO)
find_send_app(Alias, Apps) ->
case lists:keyfind(Alias, #diameter_app.alias, Apps) of
#diameter_app{id = ?APP_ID_RELAY} ->
false;
T ->
T
end.
%% Retransmission, in the service process.
find_transport([_,_] = RH,
Req,
#state{service = #diameter_service{pid = Pid,
applications = Apps}}
= S)
when self() == Pid ->
#request{app = Alias,
filter = Filter,
module = ModX}
= Req,
#diameter_app{}
= App
= lists:keyfind(Alias, #diameter_app.alias, Apps),
pick_peer(App#diameter_app{module = ModX},
RH,
Filter,
S).
%% get_destination/1
get_destination(Msg) ->
[str(get_avp_value(?BASE, 'Destination-Realm', Msg)),
str(get_avp_value(?BASE, 'Destination-Host', Msg))].
%% This is not entirely correct. The avp could have an arity 1, in
%% which case an empty list is a DiameterIdentity of length 0 rather
%% than the list of no values we treat it as by mapping to undefined.
%% This behaviour is documented.
str([]) ->
undefined;
str(T) ->
T.
%% get_avp_value/3
%%
%% Find an AVP in a message of one of three forms:
%%
%% - a message record (as generated from a .dia spec) or
%% - a list of an atom message name followed by 2-tuple, avp name/value pairs.
%% - a list of a #diameter_header{} followed by #diameter_avp{} records,
%%
%% In the first two forms a dictionary module is used at encode to
%% identify the type of the AVP and its arity in the message in
%% question. The third form allows messages to be sent as is, without
%% a dictionary, which is needed in the case of relay agents, for one.
get_avp_value(Dict, Name, [#diameter_header{} | Avps]) ->
try
{Code, _, VId} = Dict:avp_header(Name),
[A|_] = lists:dropwhile(fun(#diameter_avp{code = C, vendor_id = V}) ->
C /= Code orelse V /= VId
end,
Avps),
avp_decode(Dict, Name, A)
catch
error: _ ->
undefined
end;
get_avp_value(_, Name, [_MsgName | Avps]) ->
case lists:keyfind(Name, 1, Avps) of
{_, V} ->
V;
_ ->
undefined
end;
%% Message is typically a record but not necessarily: diameter:call/4
%% can be passed an arbitrary term.
get_avp_value(Dict, Name, Rec) ->
try
Dict:'#get-'(Name, Rec)
catch
error:_ ->
undefined
end.
avp_decode(Dict, Name, #diameter_avp{value = undefined,
data = Bin}) ->
Dict:avp(decode, Bin, Name);
avp_decode(_, _, #diameter_avp{value = V}) ->
V.
%%% ---------------------------------------------------------------------------
%%% # pick_peer(App, [DestRealm, DestHost], Filter, #state{})
%%%
%%% Output: {TransportPid, #diameter_caps{}, App}
%%% | false
%%% | {error, Reason}
%%% ---------------------------------------------------------------------------
%% Find transports to a given realm/host.
pick_peer(#diameter_app{alias = Alias}
= App,
[_,_] = RH,
Filter,
#state{local_peers = L,
shared_peers = S,
service_name = SvcName,
service = #diameter_service{pid = Pid}}) ->
pick_peer(peers(Alias, RH, Filter, L),
peers(Alias, RH, Filter, S),
Pid,
SvcName,
App).
%% pick_peer/5
pick_peer([], [], _, _, _) ->
false;
%% App state is mutable but we're not in the service process: go there.
pick_peer(Local, Remote, Pid, _SvcName, #diameter_app{mutable = true} = App)
when self() /= Pid ->
call_service(Pid, {pick_peer, Local, Remote, App});
%% App state isn't mutable or it is and we're in the service process:
%% do the deed.
pick_peer(Local,
Remote,
_Pid,
SvcName,
#diameter_app{module = ModX,
alias = Alias,
init_state = S,
mutable = M}
= App) ->
MFA = {ModX, pick_peer, [Local, Remote, SvcName]},
try state_cb(App, MFA) of
{ok, {TPid, #diameter_caps{} = Caps}} when is_pid(TPid) ->
{TPid, Caps, App};
{{TPid, #diameter_caps{} = Caps}, ModS} when is_pid(TPid), M ->
mod_state(Alias, ModS),
{TPid, Caps, App};
{false = No, ModS} when M ->
mod_state(Alias, ModS),
No;
{ok, false = No} ->
No;
false = No ->
No;
{{TPid, #diameter_caps{} = Caps}, S} when is_pid(TPid) ->
{TPid, Caps, App}; %% Accept returned state in the immutable
{false = No, S} -> %% case as long it isn't changed.
No;
T ->
diameter_lib:error_report({invalid, T, App}, MFA)
catch
E: Reason ->
diameter_lib:error_report({failure, {E, Reason, ?STACK}}, MFA)
end.
%% peers/4
peers(Alias, RH, Filter, Peers) ->
case ?Dict:find(Alias, Peers) of
{ok, L} ->
ps(L, RH, Filter, {[],[]});
error ->
[]
end.
%% Place a peer whose Destination-Host/Realm matches those of the
%% request at the front of the result list. Could add some sort of
%% 'sort' option to allow more control.
ps([], _, _, {Ys, Ns}) ->
lists:reverse(Ys, Ns);
ps([{_TPid, #diameter_caps{} = Caps} = TC | Rest], RH, Filter, Acc) ->
ps(Rest, RH, Filter, pacc(caps_filter(Caps, RH, Filter),
caps_filter(Caps, RH, {all, [host, realm]}),
TC,
Acc)).
pacc(true, true, Peer, {Ts, Fs}) ->
{[Peer|Ts], Fs};
pacc(true, false, Peer, {Ts, Fs}) ->
{Ts, [Peer|Fs]};
pacc(_, _, _, Acc) ->
Acc.
%% caps_filter/3
caps_filter(C, RH, {neg, F}) ->
not caps_filter(C, RH, F);
caps_filter(C, RH, {all, L})
when is_list(L) ->
lists:all(fun(F) -> caps_filter(C, RH, F) end, L);
caps_filter(C, RH, {any, L})
when is_list(L) ->
lists:any(fun(F) -> caps_filter(C, RH, F) end, L);
caps_filter(#diameter_caps{origin_host = {_,OH}}, [_,DH], host) ->
eq(undefined, DH, OH);
caps_filter(#diameter_caps{origin_realm = {_,OR}}, [DR,_], realm) ->
eq(undefined, DR, OR);
caps_filter(C, _, Filter) ->
caps_filter(C, Filter).
%% caps_filter/2
caps_filter(_, none) ->
true;
caps_filter(#diameter_caps{origin_host = {_,OH}}, {host, H}) ->
eq(any, H, OH);
caps_filter(#diameter_caps{origin_realm = {_,OR}}, {realm, R}) ->
eq(any, R, OR);
%% Anything else is expected to be an eval filter. Filter failure is
%% documented as being equivalent to a non-matching filter.
caps_filter(C, T) ->
try
{eval, F} = T,
diameter_lib:eval([F,C])
catch
_:_ -> false
end.
eq(Any, Id, PeerId) ->
Any == Id orelse try
iolist_to_binary(Id) == iolist_to_binary(PeerId)
catch
_:_ -> false
end.
%% OctetString() can be specified as an iolist() so test for string
%% rather then term equality.
%% transports/1
transports(#state{peerT = PeerT}) ->
ets:select(PeerT, [{#peer{conn = '$1', _ = '_'},
[{'is_pid', '$1'}],
['$1']}]).
%%% ---------------------------------------------------------------------------
%%% # service_info/2
%%% ---------------------------------------------------------------------------
%% The config passed to diameter:start_service/2.
-define(CAP_INFO, ['Origin-Host',
'Origin-Realm',
'Vendor-Id',
'Product-Name',
'Origin-State-Id',
'Host-IP-Address',
'Supported-Vendor-Id',
'Auth-Application-Id',
'Inband-Security-Id',
'Acct-Application-Id',
'Vendor-Specific-Application-Id',
'Firmware-Revision']).
-define(ALL_INFO, [capabilities,
applications,
transport,
pending,
statistics]).
service_info(Items, S)
when is_list(Items) ->
[{complete(I), service_info(I,S)} || I <- Items];
service_info(Item, S)
when is_atom(Item) ->
service_info(Item, S, true).
service_info(Item, #state{service = Svc} = S, Complete) ->
case Item of
name ->
S#state.service_name;
'Origin-Host' ->
(Svc#diameter_service.capabilities)
#diameter_caps.origin_host;
'Origin-Realm' ->
(Svc#diameter_service.capabilities)
#diameter_caps.origin_realm;
'Vendor-Id' ->
(Svc#diameter_service.capabilities)
#diameter_caps.vendor_id;
'Product-Name' ->
(Svc#diameter_service.capabilities)
#diameter_caps.product_name;
'Origin-State-Id' ->
(Svc#diameter_service.capabilities)
#diameter_caps.origin_state_id;
'Host-IP-Address' ->
(Svc#diameter_service.capabilities)
#diameter_caps.host_ip_address;
'Supported-Vendor-Id' ->
(Svc#diameter_service.capabilities)
#diameter_caps.supported_vendor_id;
'Auth-Application-Id' ->
(Svc#diameter_service.capabilities)
#diameter_caps.auth_application_id;
'Inband-Security-Id' ->
(Svc#diameter_service.capabilities)
#diameter_caps.inband_security_id;
'Acct-Application-Id' ->
(Svc#diameter_service.capabilities)
#diameter_caps.acct_application_id;
'Vendor-Specific-Application-Id' ->
(Svc#diameter_service.capabilities)
#diameter_caps.vendor_specific_application_id;
'Firmware-Revision' ->
(Svc#diameter_service.capabilities)
#diameter_caps.firmware_revision;
capabilities -> service_info(?CAP_INFO, S);
applications -> info_apps(S);
transport -> info_transport(S);
pending -> info_pending(S);
statistics -> info_stats(S);
keys -> ?ALL_INFO ++ ?CAP_INFO; %% mostly for test
all -> service_info(?ALL_INFO, S);
_ when Complete -> service_info(complete(Item), S, false);
_ -> undefined
end.
complete(Pre) ->
P = atom_to_list(Pre),
case [I || I <- [name | ?ALL_INFO] ++ ?CAP_INFO,
lists:prefix(P, atom_to_list(I))]
of
[I] -> I;
_ -> Pre
end.
info_stats(#state{peerT = PeerT}) ->
Peers = ets:select(PeerT, [{#peer{ref = '$1', conn = '$2', _ = '_'},
[{'is_pid', '$2'}],
[['$1', '$2']]}]),
diameter_stats:read(lists:append(Peers)).
%% TODO: include peer identities in return value
info_transport(#state{peerT = PeerT, connT = ConnT}) ->
dict:fold(fun it/3,
[],
ets:foldl(fun(T,A) -> it_acc(ConnT, A, T) end,
dict:new(),
PeerT)).
it(Ref, [[{type, connect} | _] = L], Acc) ->
[[{ref, Ref} | L] | Acc];
it(Ref, [[{type, accept}, {options, Opts} | _] | _] = L, Acc) ->
[[{ref, Ref},
{type, listen},
{options, Opts},
{accept, [lists:nthtail(2,A) || A <- L]}]
| Acc].
%% Each entry has the same Opts. (TODO)
it_acc(ConnT, Acc, #peer{pid = Pid,
type = Type,
ref = Ref,
options = Opts,
op_state = OS,
started = T,
conn = TPid}) ->
dict:append(Ref,
[{type, Type},
{options, Opts},
{watchdog, {Pid, T, OS}}
| info_conn(ConnT, TPid)],
Acc).
info_conn(ConnT, TPid) ->
info_conn(ets:lookup(ConnT, TPid)).
info_conn([#conn{pid = Pid, apps = SApps, caps = Caps, started = T}]) ->
[{peer, {Pid, T}},
{apps, SApps},
{caps, info_caps(Caps)}];
info_conn([] = No) ->
No.
info_caps(#diameter_caps{} = C) ->
lists:zip(record_info(fields, diameter_caps), tl(tuple_to_list(C))).
info_apps(#state{service = #diameter_service{applications = Apps}}) ->
lists:map(fun mk_app/1, Apps).
mk_app(#diameter_app{alias = Alias,
dictionary = Dict,
module = ModX,
id = Id}) ->
[{alias, Alias},
{dictionary, Dict},
{module, ModX},
{id, Id}].
info_pending(#state{} = S) ->
MatchSpec = [{{'$1',
#request{transport = '$2',
from = '$3',
app = '$4',
_ = '_'},
'_'},
[?ORCOND([{'==', T, '$2'} || T <- transports(S)])],
[{{'$1', [{{app, '$4'}},
{{transport, '$2'}},
{{from, '$3'}}]}}]}],
ets:select(?REQUEST_TABLE, MatchSpec).