diff options
author | Anders Svensson <[email protected]> | 2011-05-18 18:58:01 +0200 |
---|---|---|
committer | Anders Svensson <[email protected]> | 2011-05-18 18:58:01 +0200 |
commit | 1756ed583f24ba2206a0f573635a6fa3cdea5c54 (patch) | |
tree | c762a6e700f09fb580f8c9945fd6886ecc2c9923 /lib/diameter/src/app/diameter_service.erl | |
parent | e993da4426a76bb172290a10999267d3023120d5 (diff) | |
parent | 3c15ff32e89e401b4dde2b8acc9699be2614b996 (diff) | |
download | otp-1756ed583f24ba2206a0f573635a6fa3cdea5c54.tar.gz otp-1756ed583f24ba2206a0f573635a6fa3cdea5c54.tar.bz2 otp-1756ed583f24ba2206a0f573635a6fa3cdea5c54.zip |
Merge branch 'anders/diameter_import/OTP-9321' into dev
* anders/diameter_import/OTP-9321:
Initial commit of the diameter application.
Diffstat (limited to 'lib/diameter/src/app/diameter_service.erl')
-rw-r--r-- | lib/diameter/src/app/diameter_service.erl | 2931 |
1 files changed, 2931 insertions, 0 deletions
diff --git a/lib/diameter/src/app/diameter_service.erl b/lib/diameter/src/app/diameter_service.erl new file mode 100644 index 0000000000..82a8d7a994 --- /dev/null +++ b/lib/diameter/src/app/diameter_service.erl @@ -0,0 +1,2931 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% + +%% +%% Implements the process that represents a service. +%% + +-module(diameter_service). +-behaviour(gen_server). + +-export([start/1, + stop/1, + start_transport/2, + stop_transport/2, + info/2, + call/4]). + +%% towards diameter_watchdog +-export([receive_message/3]). + +%% service supervisor +-export([start_link/1]). + +-export([subscribe/1, + unsubscribe/1, + subscriptions/1, + subscriptions/0, + services/0, + services/1, + whois/1, + flush_stats/1]). + +%% test/debug +-export([call_module/3, + state/1, + uptime/1]). + +%%% gen_server callbacks +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +%% Other callbacks. +-export([send/1]). + +-include_lib("diameter/include/diameter.hrl"). +-include("diameter_internal.hrl"). +-include("diameter_types.hrl"). + +-define(STATE_UP, up). +-define(STATE_DOWN, down). + +-define(DEFAULT_TC, 30000). %% RFC 3588 ch 2.1 +-define(DEFAULT_TIMEOUT, 5000). %% for outgoing requests +-define(RESTART_TC, 1000). %% if restart was this recent + +%% Used to be able to swap this with anything else dict-like but now +%% rely on the fact that a service's #state{} record does not change +%% in storing in it ?STATE table and not always going through the +%% service process. In particular, rely on the fact that operations on +%% a ?Dict don't change the handle to it. +-define(Dict, diameter_dict). + +%% Table containing outgoing requests for which a reply has yet to be +%% received. +-define(REQUEST_TABLE, diameter_request). + +%% Maintains state in a table. In contrast to previously, a service's +%% stat is not constant and is accessed outside of the service +%% process. +-define(STATE_TABLE, ?MODULE). + +%% Workaround for dialyzer's lack of understanding of match specs. +-type match(T) + :: T | '_' | '$1' | '$2' | '$3' | '$4'. + +%% State of service gen_server. +-record(state, + {id = now(), + service_name, %% as passed to start_service/2, key in ?STATE_TABLE + service :: #diameter_service{}, + peerT = ets_new(peers) :: ets:tid(), %% #peer{} at start_fsm + connT = ets_new(conns) :: ets:tid(), %% #conn{} at connection_up + share_peers = false :: boolean(), %% broadcast peers to remote nodes? + use_shared_peers = false :: boolean(), %% use broadcasted peers? + shared_peers = ?Dict:new(), %% Alias -> [{TPid, Caps}, ...] + local_peers = ?Dict:new(), %% Alias -> [{TPid, Caps}, ...] + monitor = false :: false | pid()}). %% process to die with +%% shared_peers reflects the peers broadcast from remote nodes. Note +%% that the state term itself doesn't change, which is relevant for +%% the stateless application callbacks since the state is retrieved +%% from ?STATE_TABLE from outside the service process. The pid in the +%% service record is used to determine whether or not we need to call +%% the process for a pick_peer callback. + +%% Record representing a watchdog process. +-record(peer, + {pid :: match(pid()), + type :: match(connect | accept), + ref :: match(reference()), %% key into diameter_config + options :: match([transport_opt()]), %% as passed to start_transport + op_state = ?STATE_DOWN :: match(?STATE_DOWN | ?STATE_UP), + started = now(), %% at process start + conn = false :: match(boolean() | pid())}). + %% true at accept, pid() at connection_up (connT key) + +%% Record representing a peer_fsm process. +-record(conn, + {pid :: pid(), + apps :: [{0..16#FFFFFFFF, app_alias()}], %% {Id, Alias} + caps :: #diameter_caps{}, + started = now(), %% at process start + peer :: pid()}). %% key into peerT + +%% Record stored in diameter_request for each outgoing request. +-record(request, + {from, %% arg 2 of handle_call/3 + handler :: match(pid()), %% request process + transport :: match(pid()), %% peer process + caps :: match(#diameter_caps{}), + app :: match(app_alias()), %% #diameter_app.alias + dictionary :: match(module()), %% #diameter_app.dictionary + module :: match(nonempty_improper_list(module(), list())), + %% #diameter_app.module + filter :: match(peer_filter()), + packet :: match(#diameter_packet{})}). + +%% Record call/4 options are parsed into. +-record(options, + {filter = none :: peer_filter(), + extra = [] :: list(), + timeout = ?DEFAULT_TIMEOUT :: 0..16#FFFFFFFF, + detach = false :: boolean()}). + +%% Since RFC 3588 requires that a Diameter agent not modify End-to-End +%% Identifiers, the possibility of explicitly setting an End-to-End +%% Identifier would be needed to be able to implement an agent in +%% which one side of the communication is not implemented on top of +%% diameter. For example, Diameter being sent or received encapsulated +%% in some other protocol, or even another Diameter stack in a +%% non-Erlang environment. (Not that this is likely to be a normal +%% case.) +%% +%% The implemented solution is not an option but to respect any header +%% values set in a diameter_header record returned from a +%% prepare_request callback. A call to diameter:call/4 can communicate +%% values to the callback using the 'extra' option if so desired. + +%%% --------------------------------------------------------------------------- +%%% # start(SvcName) +%%% --------------------------------------------------------------------------- + +start(SvcName) -> + diameter_service_sup:start_child(SvcName). + +start_link(SvcName) -> + Options = [{spawn_opt, diameter_lib:spawn_opts(server, [])}], + gen_server:start_link(?MODULE, [SvcName], Options). +%% Put the arbitrary term SvcName in a list in case we ever want to +%% send more than this and need to distinguish old from new. + +%%% --------------------------------------------------------------------------- +%%% # stop(SvcName) +%%% --------------------------------------------------------------------------- + +stop(SvcName) -> + case whois(SvcName) of + undefined -> + {error, not_started}; + Pid -> + stop(call_service(Pid, stop), Pid) + end. + +stop(ok, Pid) -> + MRef = erlang:monitor(process, Pid), + receive {'DOWN', MRef, process, _, _} -> ok end; +stop(No, _) -> + No. + +%%% --------------------------------------------------------------------------- +%%% # start_transport(SvcName, {Ref, Type, Opts}) +%%% --------------------------------------------------------------------------- + +start_transport(SvcName, {_,_,_} = T) -> + call_service_by_name(SvcName, {start, T}). + +%%% --------------------------------------------------------------------------- +%%% # stop_transport(SvcName, Refs) +%%% --------------------------------------------------------------------------- + +stop_transport(_, []) -> + ok; +stop_transport(SvcName, [_|_] = Refs) -> + call_service_by_name(SvcName, {stop, Refs}). + +%%% --------------------------------------------------------------------------- +%%% # info(SvcName, Item) +%%% --------------------------------------------------------------------------- + +info(SvcName, Item) -> + info_rc(call_service_by_name(SvcName, {info, Item})). + +info_rc({error, _}) -> + undefined; +info_rc(Info) -> + Info. + +%%% --------------------------------------------------------------------------- +%%% # receive_message(TPid, Pkt, MessageData) +%%% --------------------------------------------------------------------------- + +%% Handle an incoming message in the watchdog process. This used to +%% come through the service process but this avoids that becoming a +%% bottleneck. + +receive_message(TPid, Pkt, T) + when is_pid(TPid) -> + #diameter_packet{header = #diameter_header{is_request = R}} = Pkt, + recv(R, (not R) andalso lookup_request(Pkt, TPid), TPid, Pkt, T). + +%% Incoming request ... +recv(true, false, TPid, Pkt, T) -> + try + spawn(fun() -> recv_request(TPid, Pkt, T) end) + catch + error: system_limit = E -> %% discard + ?LOG({error, E}, now()) + end; + +%% ... answer to known request ... +recv(false, #request{from = {_, Ref}, handler = Pid} = Req, _, Pkt, _) -> + Pid ! {answer, Ref, Req, Pkt}; +%% Note that failover could have happened prior to this message being +%% received and triggering failback. That is, both a failover message +%% and answer may be on their way to the handler process. In the worst +%% case the request process gets notification of the failover and +%% sends to the alternate peer before an answer arrives, so it's +%% always the case that we can receive more than one answer after +%% failover. The first answer received by the request process wins, +%% any others are discarded. + +%% ... or not. +recv(false, false, _, _, _) -> + ok. + +%%% --------------------------------------------------------------------------- +%%% # call(SvcName, App, Msg, Options) +%%% --------------------------------------------------------------------------- + +call(SvcName, App, Msg, Options) + when is_list(Options) -> + Rec = make_options(Options), + Ref = make_ref(), + Caller = {self(), Ref}, + Fun = fun() -> exit({Ref, call(SvcName, App, Msg, Rec, Caller)}) end, + try spawn_monitor(Fun) of + {_, MRef} -> + recv(MRef, Ref, Rec#options.detach, false) + catch + error: system_limit = E -> + {error, E} + end. + +%% Don't rely on gen_server:call/3 for the timeout handling since it +%% makes no guarantees about not leaving a reply message in the +%% mailbox if we catch its exit at timeout. It currently *can* do so, +%% which is also undocumented. + +recv(MRef, _, true, true) -> + erlang:demonitor(MRef, [flush]), + ok; + +recv(MRef, Ref, Detach, Sent) -> + receive + Ref -> %% send has been attempted + recv(MRef, Ref, Detach, true); + {'DOWN', MRef, process, _, Reason} -> + call_rc(Reason, Ref, Sent) + end. + +%% call/5 has returned ... +call_rc({Ref, Ans}, Ref, _) -> + Ans; + +%% ... or not. In this case failure/encode are documented. +call_rc(_, _, Sent) -> + {error, choose(Sent, failure, encode)}. + +%% call/5 +%% +%% In the process spawned for the outgoing request. + +call(SvcName, App, Msg, Opts, Caller) -> + c(ets:lookup(?STATE_TABLE, SvcName), App, Msg, Opts, Caller). + +c([#state{service_name = SvcName} = S], App, Msg, Opts, Caller) -> + case find_transport(App, Msg, Opts, S) of + {_,_,_} = T -> + send_request(T, Msg, Opts, Caller, SvcName); + false -> + {error, no_connection}; + {error, _} = No -> + No + end; + +c([], _, _, _, _) -> + {error, no_service}. + +%% make_options/1 + +make_options(Options) -> + lists:foldl(fun mo/2, #options{}, Options). + +mo({timeout, T}, Rec) + when is_integer(T), 0 =< T -> + Rec#options{timeout = T}; + +mo({filter, F}, #options{filter = none} = Rec) -> + Rec#options{filter = F}; +mo({filter, F}, #options{filter = {all, Fs}} = Rec) -> + Rec#options{filter = {all, [F | Fs]}}; +mo({filter, F}, #options{filter = F0} = Rec) -> + Rec#options{filter = {all, [F0, F]}}; + +mo({extra, L}, #options{extra = X} = Rec) + when is_list(L) -> + Rec#options{extra = X ++ L}; + +mo(detach, Rec) -> + Rec#options{detach = true}; + +mo(T, _) -> + ?ERROR({invalid_option, T}). + +%%% --------------------------------------------------------------------------- +%%% # subscribe(SvcName) +%%% # unsubscribe(SvcName) +%%% --------------------------------------------------------------------------- + +subscribe(SvcName) -> + diameter_reg:add({?MODULE, subscriber, SvcName}). + +unsubscribe(SvcName) -> + diameter_reg:del({?MODULE, subscriber, SvcName}). + +subscriptions(Pat) -> + pmap(diameter_reg:match({?MODULE, subscriber, Pat})). + +subscriptions() -> + subscriptions('_'). + +pmap(Props) -> + lists:map(fun({{?MODULE, _, Name}, Pid}) -> {Name, Pid} end, Props). + +%%% --------------------------------------------------------------------------- +%%% # services(Pattern) +%%% --------------------------------------------------------------------------- + +services(Pat) -> + pmap(diameter_reg:match({?MODULE, service, Pat})). + +services() -> + services('_'). + +whois(SvcName) -> + case diameter_reg:match({?MODULE, service, SvcName}) of + [{_, Pid}] -> + Pid; + [] -> + undefined + end. + +%%% --------------------------------------------------------------------------- +%%% # flush_stats/1 +%%% +%%% Output: list of {{SvcName, Alias, Counter}, Value} +%%% --------------------------------------------------------------------------- + +flush_stats(TPid) -> + diameter_stats:flush(TPid). + +%% =========================================================================== +%% =========================================================================== + +state(Svc) -> + call_service(Svc, state). + +uptime(Svc) -> + call_service(Svc, uptime). + +%% call_module/3 + +call_module(Service, AppMod, Request) -> + call_service(Service, {call_module, AppMod, Request}). + +%%% --------------------------------------------------------------------------- +%%% # init([SvcName]) +%%% --------------------------------------------------------------------------- + +init([SvcName]) -> + process_flag(trap_exit, true), %% ensure terminate(shutdown, _) + i(SvcName, diameter_reg:add_new({?MODULE, service, SvcName})). + +i(SvcName, true) -> + {ok, i(SvcName)}; +i(_, false) -> + {stop, {shutdown, already_started}}. + +%%% --------------------------------------------------------------------------- +%%% # handle_call(Req, From, State) +%%% --------------------------------------------------------------------------- + +handle_call(state, _, S) -> + {reply, S, S}; + +handle_call(uptime, _, #state{id = T} = S) -> + {reply, diameter_lib:now_diff(T), S}; + +%% Start a transport. +handle_call({start, {Ref, Type, Opts}}, _From, S) -> + {reply, start(Ref, {Type, Opts}, S), S}; + +%% Stop transports. +handle_call({stop, Refs}, _From, S) -> + shutdown(Refs, S), + {reply, ok, S}; + +%% pick_peer with mutable state +handle_call({pick_peer, Local, Remote, App}, _From, S) -> + #diameter_app{mutable = true} = App, %% assert + {reply, pick_peer(Local, Remote, self(), S#state.service_name, App), S}; + +handle_call({call_module, AppMod, Req}, From, S) -> + call_module(AppMod, Req, From, S); + +handle_call({info, Item}, _From, S) -> + {reply, service_info(Item, S), S}; + +handle_call(stop, _From, S) -> + shutdown(S), + {stop, normal, ok, S}; +%% The server currently isn't guaranteed to be dead when the caller +%% gets the reply. We deal with this in the call to the server, +%% stating a monitor that waits for DOWN before returning. + +handle_call(Req, From, S) -> + ?REPORT(unknown_request, ?FUNC, [Req, From]), + {reply, nok, S}. + +%%% --------------------------------------------------------------------------- +%%% # handle_cast(Req, State) +%%% --------------------------------------------------------------------------- + +handle_cast(Req, S) -> + ?REPORT(unknown_request, ?FUNC, [Req]), + {noreply, S}. + +%%% --------------------------------------------------------------------------- +%%% # handle_info(Req, State) +%%% --------------------------------------------------------------------------- + +handle_info(T,S) -> + case transition(T,S) of + ok -> + {noreply, S}; + #state{} = NS -> + {noreply, NS}; + {stop, Reason} -> + {stop, {shutdown, Reason}, S} + end. + +%% transition/2 + +%% Peer process is telling us to start a new accept process. +transition({accepted, Pid, TPid}, S) -> + accepted(Pid, TPid, S), + ok; + +%% Peer process has a new open connection. +transition({connection_up, Pid, T}, S) -> + connection_up(Pid, T, S); + +%% Peer process has left state open. +transition({connection_down, Pid}, S) -> + connection_down(Pid, S); + +%% Peer process has returned to state open. +transition({connection_up, Pid}, S) -> + connection_up(Pid, S); + +%% Accepting transport has lost connectivity. +transition({close, Pid}, S) -> + close(Pid, S), + ok; + +%% Connecting transport is being restarted by watchdog. +transition({reconnect, Pid}, S) -> + reconnect(Pid, S), + ok; + +%% Monitor process has died. Just die with a reason that tells +%% diameter_config about the happening. If a cleaner shutdown is +%% required then someone should stop us. +transition({'DOWN', MRef, process, _, Reason}, #state{monitor = MRef}) -> + {stop, {monitor, Reason}}; + +%% Local peer process has died. +transition({'DOWN', _, process, Pid, Reason}, S) + when node(Pid) == node() -> + peer_down(Pid, Reason, S); + +%% Remote service wants to know about shared transports. +transition({service, Pid}, S) -> + share_peers(Pid, S), + ok; + +%% Remote service is communicating a shared peer. +transition({peer, TPid, Aliases, Caps}, S) -> + remote_peer_up(TPid, Aliases, Caps, S); + +%% Remote peer process has died. +transition({'DOWN', _, process, TPid, _}, S) -> + remote_peer_down(TPid, S); + +%% Restart after tc expiry. +transition({tc_timeout, T}, S) -> + tc_timeout(T, S), + ok; + +%% Request process is telling us it may have missed a failover message +%% after a transport went down and the service process looked up +%% outstanding requests. +transition({failover, TRef, Seqs}, S) -> + failover(TRef, Seqs, S), + ok; + +transition(Req, _) -> + ?REPORT(unknown_request, ?FUNC, [Req]), + ok. + +%%% --------------------------------------------------------------------------- +%%% # terminate(Reason, State) +%%% --------------------------------------------------------------------------- + +terminate(Reason, #state{service_name = Name} = S) -> + ets:delete(?STATE_TABLE, Name), + shutdown == Reason %% application shutdown + andalso shutdown(S). + +%%% --------------------------------------------------------------------------- +%%% # code_change(FromVsn, State, Extra) +%%% --------------------------------------------------------------------------- + +code_change(FromVsn, + #state{service_name = SvcName, + service = #diameter_service{applications = Apps}} + = S, + Extra) -> + lists:foreach(fun(A) -> + code_change(FromVsn, SvcName, Extra, A) + end, + Apps), + {ok, S}. + +code_change(FromVsn, SvcName, Extra, #diameter_app{alias = Alias} = A) -> + {ok, S} = cb(A, code_change, [FromVsn, + mod_state(Alias), + Extra, + SvcName]), + mod_state(Alias, S). + +%% =========================================================================== +%% =========================================================================== + +cb([_|_] = M, F, A) -> + eval(M, F, A); +cb(Rec, F, A) -> + {_, M} = app(Rec), + eval(M, F, A). + +app(#request{app = A, module = M}) -> + {A,M}; +app(#diameter_app{alias = A, module = M}) -> + {A,M}. + +eval([M|X], F, A) -> + apply(M, F, A ++ X). + +%% Callback with state. + +state_cb(#diameter_app{mutable = false, init_state = S}, {ModX, F, A}) -> + eval(ModX, F, A ++ [S]); + +state_cb(#diameter_app{mutable = true, alias = Alias}, {_,_,_} = MFA) -> + state_cb(MFA, Alias); + +state_cb({ModX,F,A}, Alias) + when is_list(ModX) -> + eval(ModX, F, A ++ [mod_state(Alias)]). + +choose(true, X, _) -> X; +choose(false, _, X) -> X. + +ets_new(Tbl) -> + ets:new(Tbl, [{keypos, 2}]). + +insert(Tbl, Rec) -> + ets:insert(Tbl, Rec), + Rec. + +monitor(Pid) -> + erlang:monitor(process, Pid), + Pid. + +%% Using the process dictionary for the callback state was initially +%% just a way to make what was horrendous trace (big state record and +%% much else everywhere) somewhat more readable. There's not as much +%% need for it now but it's no worse (except possibly that we don't +%% see the table identifier being passed around) than an ets table so +%% keep it. + +mod_state(Alias) -> + get({?MODULE, mod_state, Alias}). + +mod_state(Alias, ModS) -> + put({?MODULE, mod_state, Alias}, ModS). + +%% have_transport/2 + +have_transport(SvcName, Ref) -> + [] /= diameter_config:have_transport(SvcName, Ref). + +%%% --------------------------------------------------------------------------- +%%% # shutdown/2 +%%% --------------------------------------------------------------------------- + +shutdown(Refs, #state{peerT = PeerT}) -> + ets:foldl(fun(P,ok) -> s(P, Refs), ok end, ok, PeerT). + +s(#peer{ref = Ref, pid = Pid}, Refs) -> + s(lists:member(Ref, Refs), Pid); + +s(true, Pid) -> + Pid ! {shutdown, self()}; %% 'DOWN' will cleanup as usual +s(false, _) -> + ok. + +%%% --------------------------------------------------------------------------- +%%% # shutdown/1 +%%% --------------------------------------------------------------------------- + +shutdown(#state{peerT = PeerT}) -> + %% A transport might not be alive to receive the shutdown request + %% but give those that are a chance to shutdown gracefully. + wait(fun st/2, PeerT), + %% Kill the watchdogs explicitly in case there was no transport. + wait(fun sw/2, PeerT). + +wait(Fun, T) -> + diameter_lib:wait(ets:foldl(Fun, [], T)). + +st(#peer{conn = B}, Acc) + when is_boolean(B) -> + Acc; +st(#peer{conn = Pid}, Acc) -> + Pid ! shutdown, + [Pid | Acc]. + +sw(#peer{pid = Pid}, Acc) -> + exit(Pid, shutdown), + [Pid | Acc]. + +%%% --------------------------------------------------------------------------- +%%% # call_service/2 +%%% --------------------------------------------------------------------------- + +call_service(Pid, Req) + when is_pid(Pid) -> + cs(Pid, Req); +call_service(SvcName, Req) -> + call_service_by_name(SvcName, Req). + +call_service_by_name(SvcName, Req) -> + cs(whois(SvcName), Req). + +cs(Pid, Req) + when is_pid(Pid) -> + try + gen_server:call(Pid, Req, infinity) + catch + E: Reason when E == exit -> + {error, {E, Reason}} + end; + +cs(undefined, _) -> + {error, no_service}. + +%%% --------------------------------------------------------------------------- +%%% # i/1 +%%% +%%% Output: #state{} +%%% --------------------------------------------------------------------------- + +%% Intialize the state of a service gen_server. + +i(SvcName) -> + %% Split the config into a server state and a list of transports. + {#state{} = S, CL} = lists:foldl(fun cfg_acc/2, + {false, []}, + diameter_config:lookup(SvcName)), + + %% Publish the state in order to be able to access it outside of + %% the service process. Originally table identifiers were only + %% known to the service process but we now want to provide the + %% option of application callbacks being 'stateless' in order to + %% avoid having to go through a common process. (Eg. An agent that + %% sends a request for every incoming request.) + true = ets:insert_new(?STATE_TABLE, S), + + %% Start fsms for each transport. + lists:foreach(fun(T) -> start_fsm(T,S) end, CL), + + init_shared(S), + S. + +cfg_acc({SvcName, #diameter_service{applications = Apps} = Rec, Opts}, + {false, Acc}) -> + lists:foreach(fun init_mod/1, Apps), + S = #state{service_name = SvcName, + service = Rec#diameter_service{pid = self()}, + share_peers = get_value(share_peers, Opts), + use_shared_peers = get_value(use_shared_peers, Opts), + monitor = mref(get_value(monitor, Opts))}, + {S, Acc}; + +cfg_acc({_Ref, Type, _Opts} = T, {S, Acc}) + when Type == connect; + Type == listen -> + {S, [T | Acc]}. + +mref(false = No) -> + No; +mref(P) -> + erlang:monitor(process, P). + +init_shared(#state{use_shared_peers = true, + service_name = Svc}) -> + diameter_peer:notify(Svc, {service, self()}); +init_shared(#state{use_shared_peers = false}) -> + ok. + +init_mod(#diameter_app{alias = Alias, + init_state = S}) -> + mod_state(Alias, S). + +start_fsm({Ref, Type, Opts}, S) -> + start(Ref, {Type, Opts}, S). + +get_value(Key, Vs) -> + {_, V} = lists:keyfind(Key, 1, Vs), + V. + +%%% --------------------------------------------------------------------------- +%%% # start/3 +%%% --------------------------------------------------------------------------- + +%% If the initial start/3 at service/transport start succeeds then +%% subsequent calls to start/4 on the same service will also succeed +%% since they involve the same call to merge_service/2. We merge here +%% rather than earlier since the service may not yet be configured +%% when the transport is configured. + +start(Ref, {T, Opts}, S) + when T == connect; + T == listen -> + try + {ok, start(Ref, type(T), Opts, S)} + catch + ?FAILURE(Reason) -> + {error, Reason} + end. +%% TODO: don't actually raise any errors yet + +%% There used to be a difference here between the handling of +%% configured listening and connecting transports but now we simply +%% tell the transport_module to start an accepting or connecting +%% process respectively, the transport implementation initiating +%% listening on a port as required. +type(listen) -> accept; +type(accept) -> listen; +type(connect = T) -> T. + +%% start/4 + +start(Ref, Type, Opts, #state{peerT = PeerT, + connT = ConnT, + service_name = SvcName, + service = Svc}) + when Type == connect; + Type == accept -> + Pid = monitor(s(Type, Ref, {ConnT, + Opts, + SvcName, + merge_service(Opts, Svc)})), + insert(PeerT, #peer{pid = Pid, + type = Type, + ref = Ref, + options = Opts}), + Pid. + +%% Note that the service record passed into the watchdog is the merged +%% record so that each watchdog (and peer_fsm) may get a different +%% record. This record is what is passed back into application +%% callbacks. + +s(Type, Ref, T) -> + diameter_watchdog:start({Type, Ref}, T). + +%% merge_service/2 + +merge_service(Opts, Svc) -> + lists:foldl(fun ms/2, Svc, Opts). + +%% Limit the applications known to the fsm to those in the 'apps' +%% option. That this might be empty is checked by the fsm. It's not +%% checked at config-time since there's no requirement that the +%% service be configured first. (Which could be considered a bit odd.) +ms({applications, As}, #diameter_service{applications = Apps} = S) + when is_list(As) -> + S#diameter_service{applications + = [A || A <- Apps, + lists:member(A#diameter_app.alias, As)]}; + +%% The fact that all capabilities can be configured on the transports +%% means that the service doesn't necessarily represent a single +%% locally implemented Diameter peer as identified by Origin-Host: a +%% transport can configure its own Origin-Host. This means that the +%% service little more than a placeholder for default capabilities +%% plus a list of applications that individual transports can choose +%% to support (or not). +ms({capabilities, Opts}, #diameter_service{capabilities = Caps0} = Svc) + when is_list(Opts) -> + %% make_caps has already succeeded in diameter_config so it will succeed + %% again here. + {ok, Caps} = diameter_capx:make_caps(Caps0, Opts), + Svc#diameter_service{capabilities = Caps}; + +ms(_, Svc) -> + Svc. + +%%% --------------------------------------------------------------------------- +%%% # accepted/3 +%%% --------------------------------------------------------------------------- + +accepted(Pid, _TPid, #state{peerT = PeerT} = S) -> + #peer{ref = Ref, type = accept = T, conn = false, options = Opts} + = P + = fetch(PeerT, Pid), + insert(PeerT, P#peer{conn = true}), %% mark replacement transport started + start(Ref, T, Opts, S). %% start new peer + +fetch(Tid, Key) -> + [T] = ets:lookup(Tid, Key), + T. + +%%% --------------------------------------------------------------------------- +%%% # connection_up/3 +%%% +%%% Output: #state{} +%%% --------------------------------------------------------------------------- + +%% Peer process has reached the open state. + +connection_up(Pid, {TPid, {Caps, SApps, Pkt}}, #state{peerT = PeerT, + connT = ConnT} + = S) -> + P = fetch(PeerT, Pid), + C = #conn{pid = TPid, + apps = SApps, + caps = Caps, + peer = Pid}, + + insert(ConnT, C), + connection_up([Pkt], P#peer{conn = TPid}, C, S). + +%%% --------------------------------------------------------------------------- +%%% # connection_up/2 +%%% +%%% Output: #state{} +%%% --------------------------------------------------------------------------- + +%% Peer process has transitioned back into the open state. Note that there +%% has been no new capabilties exchange in this case. + +connection_up(Pid, #state{peerT = PeerT, + connT = ConnT} + = S) -> + #peer{conn = TPid} = P = fetch(PeerT, Pid), + C = fetch(ConnT, TPid), + connection_up([], P, C, S). + +%% connection_up/4 + +connection_up(T, P, C, #state{peerT = PeerT, + local_peers = LDict, + service_name = SvcName, + service + = #diameter_service{applications = Apps}} + = S) -> + #peer{conn = TPid, op_state = ?STATE_DOWN} + = P, + #conn{apps = SApps, caps = Caps} + = C, + + insert(PeerT, P#peer{op_state = ?STATE_UP}), + + request_peer_up(TPid), + report_status(up, P, C, S, T), + S#state{local_peers = insert_local_peer(SApps, + {{TPid, Caps}, {SvcName, Apps}}, + LDict)}. + +insert_local_peer(SApps, T, LDict) -> + lists:foldl(fun(A,D) -> ilp(A, T, D) end, LDict, SApps). + +ilp({Id, Alias}, {TC, SA}, LDict) -> + init_conn(Id, Alias, TC, SA), + ?Dict:append(Alias, TC, LDict). + +init_conn(Id, Alias, TC, {SvcName, Apps}) -> + #diameter_app{module = ModX, + id = Id} %% assert + = find_app(Alias, Apps), + + peer_cb({ModX, peer_up, [SvcName, TC]}, Alias). + +find_app(Alias, Apps) -> + lists:keyfind(Alias, #diameter_app.alias, Apps). + +%% A failing peer callback brings down the service. In the case of +%% peer_up we could just kill the transport and emit an error but for +%% peer_down we have no way to cleanup any state change that peer_up +%% may have introduced. +peer_cb(MFA, Alias) -> + try state_cb(MFA, Alias) of + ModS -> + mod_state(Alias, ModS) + catch + E: Reason -> + ?ERROR({E, Reason, MFA, ?STACK}) + end. + +%%% --------------------------------------------------------------------------- +%%% # connection_down/2 +%%% +%%% Output: #state{} +%%% --------------------------------------------------------------------------- + +%% Peer process has transitioned out of the open state. + +connection_down(Pid, #state{peerT = PeerT, + connT = ConnT} + = S) -> + #peer{conn = TPid} + = P + = fetch(PeerT, Pid), + + C = fetch(ConnT, TPid), + insert(PeerT, P#peer{op_state = ?STATE_DOWN}), + connection_down(P,C,S). + +%% connection_down/3 + +connection_down(#peer{conn = TPid, + op_state = ?STATE_UP} + = P, + #conn{caps = Caps, + apps = SApps} + = C, + #state{service_name = SvcName, + service = #diameter_service{applications = Apps}, + local_peers = LDict} + = S) -> + report_status(down, P, C, S, []), + NewS = S#state{local_peers + = remove_local_peer(SApps, + {{TPid, Caps}, {SvcName, Apps}}, + LDict)}, + request_peer_down(TPid, NewS), + NewS. + +remove_local_peer(SApps, T, LDict) -> + lists:foldl(fun(A,D) -> rlp(A, T, D) end, LDict, SApps). + +rlp({Id, Alias}, {TC, SA}, LDict) -> + L = ?Dict:fetch(Alias, LDict), + down_conn(Id, Alias, TC, SA), + ?Dict:store(Alias, lists:delete(TC, L), LDict). + +down_conn(Id, Alias, TC, {SvcName, Apps}) -> + #diameter_app{module = ModX, + id = Id} %% assert + = find_app(Alias, Apps), + + peer_cb({ModX, peer_down, [SvcName, TC]}, Alias). + +%%% --------------------------------------------------------------------------- +%%% # peer_down/3 +%%% +%%% Output: #state{} +%%% --------------------------------------------------------------------------- + +%% Peer process has died. + +peer_down(Pid, _Reason, #state{peerT = PeerT} = S) -> + P = fetch(PeerT, Pid), + ets:delete_object(PeerT, P), + restart(P,S), + peer_down(P,S). + +%% peer_down/2 + +%% The peer has never come up ... +peer_down(#peer{conn = B}, S) + when is_boolean(B) -> + S; + +%% ... or it has. +peer_down(#peer{ref = Ref, + conn = TPid, + type = Type, + options = Opts} + = P, + #state{service_name = SvcName, + connT = ConnT} + = S) -> + #conn{caps = Caps} + = C + = fetch(ConnT, TPid), + ets:delete_object(ConnT, C), + try + pd(P,C,S) + after + send_event(SvcName, {closed, Ref, {TPid, Caps}, {type(Type), Opts}}) + end. + +pd(#peer{op_state = ?STATE_DOWN}, _, S) -> + S; +pd(#peer{op_state = ?STATE_UP} = P, C, S) -> + connection_down(P,C,S). + +%% restart/2 + +restart(P,S) -> + q_restart(restart(P), S). + +%% restart/1 + +%% Always try to reconnect. +restart(#peer{ref = Ref, + type = connect = T, + options = Opts, + started = Time}) -> + {Time, {Ref, T, Opts}}; + +%% Transport connection hasn't yet been accepted ... +restart(#peer{ref = Ref, + type = accept = T, + options = Opts, + conn = false, + started = Time}) -> + {Time, {Ref, T, Opts}}; + +%% ... or it has: a replacement transport has already been spawned. +restart(#peer{type = accept}) -> + false. + +%% q_restart/2 + +%% Start the reconnect timer. +q_restart({Time, {_Ref, Type, Opts} = T}, S) -> + start_tc(tc(Time, default_tc(Type, Opts)), T, S); +q_restart(false, _) -> + ok. + +%% RFC 3588, 2.1: +%% +%% When no transport connection exists with a peer, an attempt to +%% connect SHOULD be periodically made. This behavior is handled via +%% the Tc timer, whose recommended value is 30 seconds. There are +%% certain exceptions to this rule, such as when a peer has terminated +%% the transport connection stating that it does not wish to +%% communicate. + +default_tc(connect, Opts) -> + proplists:get_value(reconnect_timer, Opts, ?DEFAULT_TC); +default_tc(accept, _) -> + 0. + +%% Bound tc below if the peer was restarted recently to avoid +%% continuous in case of faulty config or other problems. +tc(Time, Tc) -> + choose(Tc > ?RESTART_TC + orelse timer:now_diff(now(), Time) > 1000*?RESTART_TC, + Tc, + ?RESTART_TC). + +start_tc(0, T, S) -> + tc_timeout(T, S); +start_tc(Tc, T, _) -> + erlang:send_after(Tc, self(), {tc_timeout, T}). + +%% tc_timeout/2 + +tc_timeout({Ref, _Type, _Opts} = T, #state{service_name = SvcName} = S) -> + tc(have_transport(SvcName, Ref), T, S). + +tc(true, {Ref, Type, Opts}, #state{service_name = SvcName} + = S) -> + send_event(SvcName, {reconnect, Ref, Opts}), + start(Ref, Type, Opts, S); +tc(false = No, _, _) -> %% removed + No. + +%%% --------------------------------------------------------------------------- +%%% # close/2 +%%% --------------------------------------------------------------------------- + +%% The watchdog doesn't start a new fsm in the accept case, it +%% simply stays alive until someone tells it to die in order for +%% another watchdog to be able to detect that it should transition +%% from initial into reopen rather than okay. That someone is either +%% the accepting watchdog upon reception of a CER from the previously +%% connected peer, or us after reconnect_timer timeout. + +close(Pid, #state{service_name = SvcName, + peerT = PeerT}) -> + #peer{pid = Pid, + type = accept, + ref = Ref, + options = Opts} + = fetch(PeerT, Pid), + + c(Pid, have_transport(SvcName, Ref), Opts). + +%% Tell watchdog to (maybe) die later ... +c(Pid, true, Opts) -> + Tc = proplists:get_value(reconnect_timer, Opts, 2*?DEFAULT_TC), + erlang:send_after(Tc, Pid, close); + +%% ... or now. +c(Pid, false, _Opts) -> + Pid ! close. + +%% The RFC's only document the behaviour of Tc, our reconnect_timer, +%% for the establishment of connections but we also give +%% reconnect_timer semantics for a listener, being the time within +%% which a new connection attempt is expected of a connecting peer. +%% The value should be greater than the peer's Tc + jitter. + +%%% --------------------------------------------------------------------------- +%%% # reconnect/2 +%%% --------------------------------------------------------------------------- + +reconnect(Pid, #state{service_name = SvcName, + peerT = PeerT}) -> + #peer{ref = Ref, + type = connect, + options = Opts} + = fetch(PeerT, Pid), + send_event(SvcName, {reconnect, Ref, Opts}). + +%%% --------------------------------------------------------------------------- +%%% # call_module/4 +%%% --------------------------------------------------------------------------- + +%% Backwards compatibility and never documented/advertised. May be +%% removed. + +call_module(Mod, Req, From, #state{service + = #diameter_service{applications = Apps}, + service_name = Svc} + = S) -> + case cm([A || A <- Apps, Mod == hd(A#diameter_app.module)], + Req, + From, + Svc) + of + {reply = T, RC} -> + {T, RC, S}; + noreply = T -> + {T, S}; + Reason -> + {reply, {error, Reason}, S} + end. + +cm([#diameter_app{module = ModX, alias = Alias}], Req, From, Svc) -> + MFA = {ModX, handle_call, [Req, From, Svc]}, + + try state_cb(MFA, Alias) of + {noreply = T, ModS} -> + mod_state(Alias, ModS), + T; + {reply = T, RC, ModS} -> + mod_state(Alias, ModS), + {T, RC}; + T -> + diameter_lib:error_report({invalid, T}, MFA), + invalid + catch + E: Reason -> + diameter_lib:error_report({failure, {E, Reason, ?STACK}}, MFA), + failure + end; + +cm([], _, _, _) -> + unknown; + +cm([_,_|_], _, _, _) -> + multiple. + +%%% --------------------------------------------------------------------------- +%%% # send_request/5 +%%% --------------------------------------------------------------------------- + +%% Send an outgoing request in its dedicated process. +%% +%% Note that both encode of the outgoing request and of the received +%% answer happens in this process. It's also this process that replies +%% to the caller. The service process only handles the state-retaining +%% callbacks. +%% +%% The mod field of the #diameter_app{} here includes any extra +%% arguments passed to diameter:call/2. + +send_request({TPid, Caps, App}, Msg, Opts, Caller, SvcName) -> + #diameter_app{module = ModX} + = App, + + Pkt = make_packet(Msg), + + case cb(ModX, prepare_request, [Pkt, SvcName, {TPid, Caps}]) of + {send, P} -> + send_request(make_packet(P, Pkt), + TPid, + Caps, + App, + Opts, + Caller, + SvcName); + {discard, Reason} -> + {error, Reason}; + discard -> + {error, discarded}; + T -> + ?ERROR({invalid_return, prepare_request, App, T}) + end. + +%% make_packet/1 +%% +%% Turn an outgoing request as passed to call/4 into a diameter_packet +%% record in preparation for a prepare_request callback. There are two +%% cases: a diameter_packet as argument when we're calling call/4 +%% ourselves in order to relay a request or a bare message in case the +%% call came by way of diameter:call/4. + +make_packet(Bin) + when is_binary(Bin) -> + #diameter_packet{header = diameter_codec:decode_header(Bin), + bin = Bin}; + +make_packet(#diameter_packet{msg = [#diameter_header{} | _]} = Pkt) -> + Pkt; + +make_packet(#diameter_packet{header = Hdr} = Pkt) -> + Pkt#diameter_packet{header = make_header(Hdr)}; + +make_packet(Msg) -> + make_packet(#diameter_packet{msg = Msg}). + +%% make_header/1 + +make_header(undefined) -> + Seq = diameter_session:sequence(), + make_header(#diameter_header{end_to_end_id = Seq, + hop_by_hop_id = Seq}); + +make_header(#diameter_header{version = undefined} = Hdr) -> + make_header(Hdr#diameter_header{version = ?DIAMETER_VERSION}); + +make_header(#diameter_header{end_to_end_id = undefined} = H) -> + Seq = diameter_session:sequence(), + make_header(H#diameter_header{end_to_end_id = Seq}); + +make_header(#diameter_header{hop_by_hop_id = undefined} = H) -> + Seq = diameter_session:sequence(), + make_header(H#diameter_header{hop_by_hop_id = Seq}); + +make_header(#diameter_header{} = Hdr) -> + Hdr; + +make_header(T) -> + ?ERROR({invalid_header, T}). + +%% make_packet/2 +%% +%% Reconstruct a diameter_packet from the return value of +%% prepare_request or prepare_retransmit callback. + +make_packet(Bin, _) + when is_binary(Bin) -> + make_packet(Bin); + +make_packet(#diameter_packet{msg = [#diameter_header{} | _]} = Pkt, _) -> + Pkt; + +%% Returning a diameter_packet with no header from a prepare_request +%% or prepare_retransmit callback retains the header passed into it. +%% This is primarily so that the end to end and hop by hop identifiers +%% are retained. +make_packet(#diameter_packet{header = Hdr} = Pkt, + #diameter_packet{header = Hdr0}) -> + Pkt#diameter_packet{header = fold_record(Hdr0, Hdr)}; + +make_packet(Msg, Pkt) -> + Pkt#diameter_packet{msg = Msg}. + +%% fold_record/2 + +fold_record(undefined, R) -> + R; +fold_record(Rec, R) -> + diameter_lib:fold_tuple(2, Rec, R). + +%% send_request/7 + +send_request(Pkt, TPid, Caps, App, Opts, Caller, SvcName) -> + #diameter_app{alias = Alias, + dictionary = Dict, + module = ModX, + answer_errors = AE} + = App, + + EPkt = encode(Dict, Pkt), + + #options{filter = Filter, + timeout = Timeout} + = Opts, + + Req = #request{packet = Pkt, + from = Caller, + handler = self(), + transport = TPid, + caps = Caps, + app = Alias, + filter = Filter, + dictionary = Dict, + module = ModX}, + + try + TRef = send_request(TPid, EPkt, Req, Timeout), + ack(Caller), + handle_answer(SvcName, AE, recv_answer(Timeout, SvcName, {TRef, Req})) + after + erase_request(EPkt) + end. + +%% Tell caller a send has been attempted. +ack({Pid, Ref}) -> + Pid ! Ref. + +%% recv_answer/3 + +recv_answer(Timeout, + SvcName, + {TRef, #request{from = {_, Ref}, packet = RPkt} = Req} + = T) -> + + %% Matching on TRef below ensures we ignore messages that pertain + %% to a previous transport prior to failover. The answer message + %% includes the #request{} since it's not necessarily Req; that + %% is, from the last peer to which we've transmitted. + + receive + {answer = A, Ref, Rq, Pkt} -> %% Answer from peer. + {A, Rq, Pkt}; + {timeout = Reason, TRef, _} -> %% No timely reply + {error, Req, Reason}; + {failover = Reason, TRef, false} -> %% No alternative peer. + {error, Req, Reason}; + {failover, TRef, Transport} -> %% Resend to alternate peer. + try_retransmit(Timeout, SvcName, Req, Transport); + {failover, TRef} -> %% May have missed failover notification. + Seqs = diameter_codec:sequence_numbers(RPkt), + Pid = whois(SvcName), + is_pid(Pid) andalso (Pid ! {failover, TRef, Seqs}), + recv_answer(Timeout, SvcName, T) + end. +%% Note that failover starts a new timer and that expiry of an old +%% timer value is ignored. This means that an answer could be accepted +%% from a peer after timeout in the case of failover. + +try_retransmit(Timeout, SvcName, Req, Transport) -> + try retransmit(Transport, Req, SvcName, Timeout) of + T -> recv_answer(Timeout, SvcName, T) + catch + ?FAILURE(Reason) -> {error, Req, Reason} + end. + +%% handle_error/3 + +handle_error(Req, Reason, SvcName) -> + #request{module = ModX, + packet = Pkt, + transport = TPid, + caps = Caps} + = Req, + cb(ModX, handle_error, [Reason, msg(Pkt), SvcName, {TPid, Caps}]). + +msg(#diameter_packet{msg = undefined, bin = Bin}) -> + Bin; +msg(#diameter_packet{msg = Msg}) -> + Msg. + +%% encode/2 + +%% Note that prepare_request can return a diameter_packet containing +%% header or transport_data. Even allow the returned record to contain +%% an encoded binary. This isn't the usual case but could some in +%% handy, for test at least. (For example, to send garbage.) + +%% The normal case: encode the returned message. +encode(Dict, #diameter_packet{msg = Msg, bin = undefined} = Pkt) -> + D = pick_dictionary([Dict, ?BASE], Msg), + diameter_codec:encode(D, Pkt); + +%% Callback has returned an encoded binary: just send. +encode(_, #diameter_packet{} = Pkt) -> + Pkt. + +%% pick_dictionary/2 + +%% Pick the first dictionary that declares the application id in the +%% specified header. +pick_dictionary(Ds, [#diameter_header{application_id = Id} | _]) -> + pd(Ds, fun(D) -> Id = D:id() end); + +%% Pick the first dictionary that knows the specified message name. +pick_dictionary(Ds, [MsgName|_]) -> + pd(Ds, fun(D) -> D:msg2rec(MsgName) end); + +%% Pick the first dictionary that knows the name of the specified +%% message record. +pick_dictionary(Ds, Rec) -> + Name = element(1, Rec), + pd(Ds, fun(D) -> D:rec2msg(Name) end). + +pd([D|Ds], F) -> + try + F(D), + D + catch + error:_ -> + pd(Ds, F) + end; + +pd([], _) -> + ?ERROR(no_dictionary). + +%% send_request/4 + +send_request(TPid, #diameter_packet{bin = Bin} = Pkt, Req, Timeout) + when node() == node(TPid) -> + %% Store the outgoing request before sending to avoid a race with + %% reply reception. + TRef = store_request(TPid, Bin, Req, Timeout), + send(TPid, Pkt), + TRef; + +%% Send using a remote transport: spawn a process on the remote node +%% to relay the answer. +send_request(TPid, #diameter_packet{} = Pkt, Req, Timeout) -> + TRef = erlang:start_timer(Timeout, self(), timeout), + T = {TPid, Pkt, Req, Timeout, TRef}, + spawn(node(TPid), ?MODULE, send, [T]), + TRef. + +%% send/1 + +send({TPid, Pkt, #request{handler = Pid} = Req, Timeout, TRef}) -> + Ref = send_request(TPid, Pkt, Req#request{handler = self()}, Timeout), + Pid ! reref(receive T -> T end, Ref, TRef). + +reref({T, Ref, R}, Ref, TRef) -> + {T, TRef, R}; +reref(T, _, _) -> + T. + +%% send/2 + +send(Pid, Pkt) -> + Pid ! {send, Pkt}. + +%% retransmit/4 + +retransmit({TPid, Caps, #diameter_app{alias = Alias} = App}, + #request{app = Alias, + packet = Pkt} + = Req, + SvcName, + Timeout) -> + have_request(Pkt, TPid) %% Don't failover to a peer we've + andalso ?THROW(timeout), %% already sent to. + + case cb(App, prepare_retransmit, [Pkt, SvcName, {TPid, Caps}]) of + {send, P} -> + retransmit(make_packet(P, Pkt), TPid, Caps, Req, Timeout); + {discard, Reason} -> + ?THROW(Reason); + discard -> + ?THROW(discarded); + T -> + ?ERROR({invalid_return, prepare_retransmit, App, T}) + end. + +%% retransmit/5 + +retransmit(Pkt, TPid, Caps, #request{dictionary = Dict} = Req, Timeout) -> + EPkt = encode(Dict, Pkt), + + NewReq = Req#request{transport = TPid, + packet = Pkt, + caps = Caps}, + + ?LOG(retransmission, NewReq), + TRef = send_request(TPid, EPkt, NewReq, Timeout), + {TRef, NewReq}. + +%% store_request/4 + +store_request(TPid, Bin, Req, Timeout) -> + Seqs = diameter_codec:sequence_numbers(Bin), + TRef = erlang:start_timer(Timeout, self(), timeout), + ets:insert(?REQUEST_TABLE, {Seqs, Req, TRef}), + ets:member(?REQUEST_TABLE, TPid) + orelse (self() ! {failover, TRef}), %% possibly missed failover + TRef. + +%% lookup_request/2 + +lookup_request(Msg, TPid) + when is_pid(TPid) -> + lookup(Msg, TPid, '_'); + +lookup_request(Msg, TRef) + when is_reference(TRef) -> + lookup(Msg, '_', TRef). + +lookup(Msg, TPid, TRef) -> + Seqs = diameter_codec:sequence_numbers(Msg), + Spec = [{{Seqs, #request{transport = TPid, _ = '_'}, TRef}, + [], + ['$_']}], + case ets:select(?REQUEST_TABLE, Spec) of + [{_, Req, _}] -> + Req; + [] -> + false + end. + +%% erase_request/1 + +erase_request(Pkt) -> + ets:delete(?REQUEST_TABLE, diameter_codec:sequence_numbers(Pkt)). + +%% match_requests/1 + +match_requests(TPid) -> + Pat = {'_', #request{transport = TPid, _ = '_'}, '_'}, + ets:select(?REQUEST_TABLE, [{Pat, [], ['$_']}]). + +%% have_request/2 + +have_request(Pkt, TPid) -> + Seqs = diameter_codec:sequence_numbers(Pkt), + Pat = {Seqs, #request{transport = TPid, _ = '_'}, '_'}, + '$end_of_table' /= ets:select(?REQUEST_TABLE, [{Pat, [], ['$_']}], 1). + +%% request_peer_up/1 + +request_peer_up(TPid) -> + ets:insert(?REQUEST_TABLE, {TPid}). + +%% request_peer_down/2 + +request_peer_down(TPid, S) -> + ets:delete(?REQUEST_TABLE, TPid), + lists:foreach(fun(T) -> failover(T,S) end, match_requests(TPid)). +%% Note that a request process can store its request after failover +%% notifications are sent here: store_request/4 sends the notification +%% in that case. Note also that we'll send as many notifications to a +%% given handler as there are peers its sent to. All but one of these +%% will be ignored. + +%%% --------------------------------------------------------------------------- +%%% recv_request/3 +%%% --------------------------------------------------------------------------- + +recv_request(TPid, Pkt, {ConnT, SvcName, Apps}) -> + try ets:lookup(ConnT, TPid) of + [C] -> + recv_request(C, TPid, Pkt, SvcName, Apps); + [] -> %% transport has gone down + ok + catch + error: badarg -> %% service has gone down (and taken table with it) + ok + end. + +%% recv_request/5 + +recv_request(#conn{apps = SApps, caps = Caps}, TPid, Pkt, SvcName, Apps) -> + #diameter_caps{origin_host = {OH,_}, + origin_realm = {OR,_}} + = Caps, + + #diameter_packet{header = #diameter_header{application_id = Id}} + = Pkt, + + recv_request(find_recv_app(Id, SApps), + {SvcName, OH, OR}, + TPid, + Apps, + Caps, + Pkt). + +%% find_recv_app/2 + +%% No one should be sending the relay identifier. +find_recv_app(?APP_ID_RELAY, _) -> + false; + +%% With any other id we either support it locally or as a relay. +find_recv_app(Id, SApps) -> + keyfind([Id, ?APP_ID_RELAY], 1, SApps). + +%% keyfind/3 + +keyfind([], _, _) -> + false; +keyfind([Key | Rest], Pos, L) -> + case lists:keyfind(Key, Pos, L) of + false -> + keyfind(Rest, Pos, L); + T -> + T + end. + +%% recv_request/6 + +recv_request({Id, Alias}, T, TPid, Apps, Caps, Pkt) -> + #diameter_app{dictionary = Dict} + = A + = find_app(Alias, Apps), + recv_request(T, {TPid, Caps}, A, diameter_codec:decode(Id, Dict, Pkt)); +%% Note that the decode is different depending on whether or not Id is +%% ?APP_ID_RELAY. + +%% DIAMETER_APPLICATION_UNSUPPORTED 3007 +%% A request was sent for an application that is not supported. + +recv_request(false, {_, OH, OR}, TPid, _, _, Pkt) -> + ?LOG({error, application}, Pkt), + reply(answer_message({OH, OR, 3007}, collect_avps(Pkt)), ?BASE, TPid, Pkt). + +collect_avps(Pkt) -> + case diameter_codec:collect_avps(Pkt) of + {_Bs, As} -> + As; + As -> + As + end. + +%% recv_request/4 + +%% Wrong number of bits somewhere in the message: reply. +%% +%% DIAMETER_INVALID_AVP_BITS 3009 +%% A request was received that included an AVP whose flag bits are +%% set to an unrecognized value, or that is inconsistent with the +%% AVP's definition. +%% +recv_request({_, OH, OR}, {TPid, _}, _, #diameter_packet{errors = [Bs | _], + bin = Bin, + avps = Avps} + = Pkt) + when is_bitstring(Bs) -> + ?LOG({error, invalid_avp_bits}, Bin), + reply(answer_message({OH, OR, 3009}, Avps), ?BASE, TPid, Pkt); + +%% Either we support this application but don't recognize the command +%% or we're a relay and the command isn't proxiable. +%% +%% DIAMETER_COMMAND_UNSUPPORTED 3001 +%% The Request contained a Command-Code that the receiver did not +%% recognize or support. This MUST be used when a Diameter node +%% receives an experimental command that it does not understand. +%% +recv_request({_, OH, OR}, + {TPid, _}, + #diameter_app{id = Id}, + #diameter_packet{header = #diameter_header{is_proxiable = P}, + msg = M, + avps = Avps, + bin = Bin} + = Pkt) + when ?APP_ID_RELAY /= Id, undefined == M; + ?APP_ID_RELAY == Id, not P -> + ?LOG({error, command_unsupported}, Bin), + reply(answer_message({OH, OR, 3001}, Avps), ?BASE, TPid, Pkt); + +%% Error bit was set on a request. +%% +%% DIAMETER_INVALID_HDR_BITS 3008 +%% A request was received whose bits in the Diameter header were +%% either set to an invalid combination, or to a value that is +%% inconsistent with the command code's definition. +%% +recv_request({_, OH, OR}, + {TPid, _}, + _, + #diameter_packet{header = #diameter_header{is_error = true}, + avps = Avps, + bin = Bin} + = Pkt) -> + ?LOG({error, error_bit}, Bin), + reply(answer_message({OH, OR, 3008}, Avps), ?BASE, TPid, Pkt); + +%% A message in a locally supported application or a proxiable message +%% in the relay application. Don't distinguish between the two since +%% each application has its own callback config. That is, the user can +%% easily distinguish between the two cases. +recv_request(T, TC, App, Pkt) -> + request_cb(T, TC, App, examine(Pkt)). + +%% Note that there may still be errors but these aren't protocol +%% (3xxx) errors that lead to an answer-message. + +request_cb({SvcName, _OH, _OR} = T, TC, App, Pkt) -> + request_cb(cb(App, handle_request, [Pkt, SvcName, TC]), App, T, TC, Pkt). + +%% examine/1 +%% +%% Look for errors in a decoded message. Length errors result in +%% decode failure in diameter_codec. + +examine(#diameter_packet{header = #diameter_header{version + = ?DIAMETER_VERSION}} + = Pkt) -> + Pkt; + +%% DIAMETER_UNSUPPORTED_VERSION 5011 +%% This error is returned when a request was received, whose version +%% number is unsupported. + +examine(#diameter_packet{errors = Es} = Pkt) -> + Pkt#diameter_packet{errors = [5011 | Es]}. +%% It's odd/unfortunate that this isn't a protocol error. + +%% request_cb/5 + +%% A reply may be an answer-message, constructed either here or by +%% the handle_request callback. The header from the incoming request +%% is passed into the encode so that it can retrieve the relevant +%% command code in this case. It will also then ignore Dict and use +%% the base encoder. +request_cb({reply, Ans}, + #diameter_app{dictionary = Dict}, + _, + {TPid, _}, + Pkt) -> + reply(Ans, Dict, TPid, Pkt); + +%% An 3xxx result code, for which the E-bit is set in the header. +request_cb({protocol_error, RC}, _, T, {TPid, _}, Pkt) + when 3000 =< RC, RC < 4000 -> + protocol_error(RC, T, TPid, Pkt); + +%% RFC 3588 says we must reply 3001 to anything unrecognized or +%% unsupported. 'noreply' is undocumented (and inappropriately named) +%% backwards compatibility for this, protocol_error the documented +%% alternative. +request_cb(noreply, _, T, {TPid, _}, Pkt) -> + protocol_error(3001, T, TPid, Pkt); + +%% Relay a request to another peer. This is equivalent to doing an +%% explicit call/4 with the message in question except that (1) a loop +%% will be detected by examining Route-Record AVP's, (3) a +%% Route-Record AVP will be added to the outgoing request and (3) the +%% End-to-End Identifier will default to that in the +%% #diameter_header{} without the need for an end_to_end_identifier +%% option. +%% +%% relay and proxy are similar in that they require the same handling +%% with respect to Route-Record and End-to-End identifier. The +%% difference is that a proxy advertises specific applications, while +%% a relay advertises the relay application. If a callback doesn't +%% want to distinguish between the cases in the callback return value +%% then 'resend' is a neutral alternative. +%% +request_cb({A, Opts}, + #diameter_app{id = Id} + = App, + T, + TC, + Pkt) + when A == relay, Id == ?APP_ID_RELAY; + A == proxy, Id /= ?APP_ID_RELAY; + A == resend -> + resend(Opts, App, T, TC, Pkt); + +request_cb(discard, _, _, _, _) -> + ok; + +request_cb({eval, RC, F}, App, T, TC, Pkt) -> + request_cb(RC, App, T, TC, Pkt), + diameter_lib:eval(F). + +%% protocol_error/4 + +protocol_error(RC, {_, OH, OR}, TPid, #diameter_packet{avps = Avps} = Pkt) -> + ?LOG({error, RC}, Pkt), + reply(answer_message({OH, OR, RC}, Avps), ?BASE, TPid, Pkt). + +%% resend/5 +%% +%% Resend a message as a relay or proxy agent. + +resend(Opts, + #diameter_app{} = App, + {_SvcName, OH, _OR} = T, + {_TPid, _Caps} = TC, + #diameter_packet{avps = Avps} = Pkt) -> + {Code, _Flags, Vid} = ?BASE:avp_header('Route-Record'), + resend(is_loop(Code, Vid, OH, Avps), Opts, App, T, TC, Pkt). + +%% DIAMETER_LOOP_DETECTED 3005 +%% An agent detected a loop while trying to get the message to the +%% intended recipient. The message MAY be sent to an alternate peer, +%% if one is available, but the peer reporting the error has +%% identified a configuration problem. + +resend(true, _, _, T, {TPid, _}, Pkt) -> %% Route-Record loop + protocol_error(3005, T, TPid, Pkt); + +%% 6.1.8. Relaying and Proxying Requests +%% +%% A relay or proxy agent MUST append a Route-Record AVP to all requests +%% forwarded. The AVP contains the identity of the peer the request was +%% received from. + +resend(false, + Opts, + App, + {SvcName, _, _}, + {TPid, #diameter_caps{origin_host = {_, OH}}}, + #diameter_packet{header = Hdr0, + avps = Avps} + = Pkt) -> + Route = #diameter_avp{data = {?BASE, 'Route-Record', OH}}, + Seq = diameter_session:sequence(), + Hdr = Hdr0#diameter_header{hop_by_hop_id = Seq}, + Msg = [Hdr, Route | Avps], + %% Filter sender as ineligible receiver. + reply(call(SvcName, App, Msg, [{filter, {neg, {host, OH}}} | Opts]), + TPid, + Pkt). +%% The incoming request is relayed with the addition of a +%% Route-Record. Note the requirement on the return from call/4. +%% This places a requirement on the values returned by the +%% handle_answer and handle_error callbacks of the application module +%% in question. +%% +%% RFC 6.3 says that a relay agent does not modify Origin-Host but +%% says nothing about a proxy. Assume it should behave the same way. + +%% reply/3 +%% +%% Relay a reply to a relayed request. + +%% Answer from the peer: reset the hop by hop identifier and send. +reply(#diameter_packet{bin = B} + = Pkt, + TPid, + #diameter_packet{header = #diameter_header{hop_by_hop_id = Id}, + transport_data = TD}) -> + send(TPid, Pkt#diameter_packet{bin = diameter_codec:hop_by_hop_id(Id, B), + transport_data = TD}); +%% TODO: counters + +%% Not. Ignoring the error feels harsh but there is no appropriate +%% Result-Code for a protocol error (which this isn't really anyway) +%% and the RFC doesn't provide any guidance how to act. A weakness +%% here is that we don't deal well with a decode error: the request +%% will simply timeout on the peer's end. Better would be to just send +%% the answer (with modified hop by hop identifier) on regardless, at +%% least in the relay case in which there's no examination of the +%% answer. In the proxy case it's not clear that the callback won't +%% examine the answer. Just be quiet here since a decode error causes +%% the request process to crash (or not depending on the error and +%% config and/or handle_answer callback). +reply(_, _, _) -> + ok. + +%% is_loop/4 +%% +%% Is there a Route-Record AVP with our Origin-Host? + +is_loop(Code, + Vid, + Bin, + [#diameter_avp{code = Code, vendor_id = Vid, data = Bin} | _]) -> + true; + +is_loop(_, _, _, []) -> + false; + +is_loop(Code, Vid, OH, [_ | Avps]) + when is_binary(OH) -> + is_loop(Code, Vid, OH, Avps); + +is_loop(Code, Vid, OH, Avps) -> + is_loop(Code, Vid, ?BASE:avp(encode, OH, 'Route-Record'), Avps). + +%% reply/4 +%% +%% Send a locally originating reply. + +reply(Msg, Dict, TPid, #diameter_packet{errors = Es, + transport_data = TD} + = ReqPkt) + when [] == Es; + is_record(hd(Msg), diameter_header) -> + Pkt = diameter_codec:encode(Dict, make_reply_packet(Msg, ReqPkt)), + incr(send, Pkt, Dict, TPid), %% count result codes in sent answers + send(TPid, Pkt#diameter_packet{transport_data = TD}); + +%% Simplify the handling of error cases by accepting a list consisting +%% of an answer record followed by failed AVPs to be packed into a +%% Failed-AVP field, either directly or into an AVP field. Only if +%% the message is a tuple-list or record however, not a list +%% with a list of #diameter_header{} and #diameter_avp{}. +reply(Msg, Dict, TPid, #diameter_packet{errors = [H|_] = Es} = Pkt) -> + reply(rc(Msg, rc(H), [A || {_,A} <- Es], Dict), + Dict, + TPid, + Pkt#diameter_packet{errors = []}). + +%% make_reply_packet/2 + +make_reply_packet(Bin, _) + when is_binary(Bin) -> + #diameter_packet{bin = Bin}; + +make_reply_packet([#diameter_header{} | _] = Msg, _) -> + #diameter_packet{msg = Msg}; + +make_reply_packet(Msg, #diameter_packet{header = ReqHdr}) -> + #diameter_header{end_to_end_id = EId, + hop_by_hop_id = Hid, + is_proxiable = P} + = ReqHdr, + + Hdr = #diameter_header{version = ?DIAMETER_VERSION, + end_to_end_id = EId, + hop_by_hop_id = Hid, + is_proxiable = P, + is_retransmitted = false}, + #diameter_packet{header = Hdr, + msg = Msg}. + +%% rc/1 + +rc({RC, _}) -> + RC; +rc(RC) -> + RC. + +%% rc/4 + +rc(Rec, RC, Failed, Dict) + when is_integer(RC) -> + set(Rec, [{'Result-Code', RC} | failed_avp(Rec, Failed, Dict)], Dict). + +%% Reply as name and tuple list ... +set([_|_] = Ans, Avps, _) -> + Ans ++ Avps; %% Values nearer tail take precedence. + +%% ... or record. +set(Rec, Avps, Dict) -> + Dict:'#set-'(Avps, Rec). + +%% failed_avp/3 + +failed_avp(_, [] = No, _) -> + No; + +failed_avp(Rec, Failed, Dict) -> + [fa(Rec, [{'AVP', Failed}], Dict)]. + +%% Reply as name and tuple list ... +fa([MsgName | Values], FailedAvp, Dict) -> + R = Dict:msg2rec(MsgName), + try + Dict:'#info-'(R, {index, 'Failed-AVP'}), + {'Failed-AVP', [FailedAvp]} + catch + error: _ -> + Avps = proplists:get_value('AVP', Values, []), + A = #diameter_avp{name = 'Failed-AVP', + value = FailedAvp}, + {'AVP', [A|Avps]} + end; + +%% ... or record. +fa(Rec, FailedAvp, Dict) -> + try + {'Failed-AVP', [FailedAvp]} + catch + error: _ -> + Avps = Dict:'get-'('AVP', Rec), + A = #diameter_avp{name = 'Failed-AVP', + value = FailedAvp}, + {'AVP', [A|Avps]} + end. + +%% 3. Diameter Header +%% +%% E(rror) - If set, the message contains a protocol error, +%% and the message will not conform to the ABNF +%% described for this command. Messages with the 'E' +%% bit set are commonly referred to as error +%% messages. This bit MUST NOT be set in request +%% messages. See Section 7.2. + +%% 3.2. Command Code ABNF specification +%% +%% e-bit = ", ERR" +%% ; If present, the 'E' bit in the Command +%% ; Flags is set, indicating that the answer +%% ; message contains a Result-Code AVP in +%% ; the "protocol error" class. + +%% 7.1.3. Protocol Errors +%% +%% Errors that fall within the Protocol Error category SHOULD be treated +%% on a per-hop basis, and Diameter proxies MAY attempt to correct the +%% error, if it is possible. Note that these and only these errors MUST +%% only be used in answer messages whose 'E' bit is set. + +%% Thus, only construct answers to protocol errors. Other errors +%% require an message-specific answer and must be handled by the +%% application. + +%% 6.2. Diameter Answer Processing +%% +%% When a request is locally processed, the following procedures MUST be +%% applied to create the associated answer, in addition to any +%% additional procedures that MAY be discussed in the Diameter +%% application defining the command: +%% +%% - The same Hop-by-Hop identifier in the request is used in the +%% answer. +%% +%% - The local host's identity is encoded in the Origin-Host AVP. +%% +%% - The Destination-Host and Destination-Realm AVPs MUST NOT be +%% present in the answer message. +%% +%% - The Result-Code AVP is added with its value indicating success or +%% failure. +%% +%% - If the Session-Id is present in the request, it MUST be included +%% in the answer. +%% +%% - Any Proxy-Info AVPs in the request MUST be added to the answer +%% message, in the same order they were present in the request. +%% +%% - The 'P' bit is set to the same value as the one in the request. +%% +%% - The same End-to-End identifier in the request is used in the +%% answer. +%% +%% Note that the error messages (see Section 7.3) are also subjected to +%% the above processing rules. + +%% 7.3. Error-Message AVP +%% +%% The Error-Message AVP (AVP Code 281) is of type UTF8String. It MAY +%% accompany a Result-Code AVP as a human readable error message. The +%% Error-Message AVP is not intended to be useful in real-time, and +%% SHOULD NOT be expected to be parsed by network entities. + +%% answer_message/2 + +answer_message({OH, OR, RC}, Avps) -> + {Code, _, Vid} = ?BASE:avp_header('Session-Id'), + ['answer-message', {'Origin-Host', OH}, + {'Origin-Realm', OR}, + {'Result-Code', RC} + | session_id(Code, Vid, Avps)]. + +session_id(Code, Vid, Avps) + when is_list(Avps) -> + try + {value, #diameter_avp{} = Avp} = find_avp(Code, Vid, Avps), + Avp + catch + error: _ -> + [] + end; + +session_id(Code, Vid, Avps) + when is_list(Avps) -> + try + {value, #diameter_avp{data = D}} = find_avp(Code, Vid, Avps), + [{'Session-Id', [?BASE:avp(decode, D, 'Session-Id')]}] + catch + error: _ -> + [] + end. + +%% find_avp/3 + +find_avp(Code, Vid, Avps) + when is_integer(Code), (undefined == Vid orelse is_integer(Vid)) -> + find(fun(A) -> is_avp(Code, Vid, A) end, Avps). + +%% The final argument here could be a list of AVP's, depending on the case, +%% but we're only searching at the top level. +is_avp(Code, Vid, #diameter_avp{code = Code, vendor_id = Vid}) -> + true; +is_avp(_, _, _) -> + false. + +find(_, []) -> + false; +find(Pred, [H|T]) -> + case Pred(H) of + true -> + {value, H}; + false -> + find(Pred, T) + end. + +%% 7. Error Handling +%% +%% There are certain Result-Code AVP application errors that require +%% additional AVPs to be present in the answer. In these cases, the +%% Diameter node that sets the Result-Code AVP to indicate the error +%% MUST add the AVPs. Examples are: +%% +%% - An unrecognized AVP is received with the 'M' bit (Mandatory bit) +%% set, causes an answer to be sent with the Result-Code AVP set to +%% DIAMETER_AVP_UNSUPPORTED, and the Failed-AVP AVP containing the +%% offending AVP. +%% +%% - An AVP that is received with an unrecognized value causes an +%% answer to be returned with the Result-Code AVP set to +%% DIAMETER_INVALID_AVP_VALUE, with the Failed-AVP AVP containing the +%% AVP causing the error. +%% +%% - A command is received with an AVP that is omitted, yet is +%% mandatory according to the command's ABNF. The receiver issues an +%% answer with the Result-Code set to DIAMETER_MISSING_AVP, and +%% creates an AVP with the AVP Code and other fields set as expected +%% in the missing AVP. The created AVP is then added to the Failed- +%% AVP AVP. +%% +%% The Result-Code AVP describes the error that the Diameter node +%% encountered in its processing. In case there are multiple errors, +%% the Diameter node MUST report only the first error it encountered +%% (detected possibly in some implementation dependent order). The +%% specific errors that can be described by this AVP are described in +%% the following section. + +%% 7.5. Failed-AVP AVP +%% +%% The Failed-AVP AVP (AVP Code 279) is of type Grouped and provides +%% debugging information in cases where a request is rejected or not +%% fully processed due to erroneous information in a specific AVP. The +%% value of the Result-Code AVP will provide information on the reason +%% for the Failed-AVP AVP. +%% +%% The possible reasons for this AVP are the presence of an improperly +%% constructed AVP, an unsupported or unrecognized AVP, an invalid AVP +%% value, the omission of a required AVP, the presence of an explicitly +%% excluded AVP (see tables in Section 10), or the presence of two or +%% more occurrences of an AVP which is restricted to 0, 1, or 0-1 +%% occurrences. +%% +%% A Diameter message MAY contain one Failed-AVP AVP, containing the +%% entire AVP that could not be processed successfully. If the failure +%% reason is omission of a required AVP, an AVP with the missing AVP +%% code, the missing vendor id, and a zero filled payload of the minimum +%% required length for the omitted AVP will be added. + +%%% --------------------------------------------------------------------------- +%%% # handle_answer/3 +%%% --------------------------------------------------------------------------- + +%% Process an answer message in call-specific process. + +handle_answer(SvcName, _, {error, Req, Reason}) -> + handle_error(Req, Reason, SvcName); + +handle_answer(SvcName, + AnswerErrors, + {answer, #request{dictionary = Dict} = Req, Pkt}) -> + a(examine(diameter_codec:decode(Dict, Pkt)), + SvcName, + AnswerErrors, + Req). + +%% We don't really need to do a full decode if we're a relay and will +%% just resend with a new hop by hop identifier, but might a proxy +%% want to examine the answer? + +a(#diameter_packet{errors = []} + = Pkt, + SvcName, + AE, + #request{transport = TPid, + dictionary = Dict, + caps = Caps, + packet = P} + = Req) -> + try + incr(in, Pkt, Dict, TPid) + of + _ -> + cb(Req, handle_answer, [Pkt, msg(P), SvcName, {TPid, Caps}]) + catch + exit: {invalid_error_bit, _} = E -> + e(Pkt#diameter_packet{errors = [E]}, SvcName, AE, Req) + end; + +a(#diameter_packet{} = Pkt, SvcName, AE, Req) -> + e(Pkt, SvcName, AE, Req). + +e(Pkt, SvcName, callback, #request{transport = TPid, + caps = Caps, + packet = Pkt} + = Req) -> + cb(Req, handle_answer, [Pkt, msg(Pkt), SvcName, {TPid, Caps}]); +e(Pkt, SvcName, report, Req) -> + x(errors, handle_answer, [SvcName, Req, Pkt]); +e(Pkt, SvcName, discard, Req) -> + x({errors, handle_answer, [SvcName, Req, Pkt]}). + +%% Note that we don't check that the application id in the answer's +%% header is what we expect. (TODO: Does the rfc says anything about +%% this?) + +%% incr/4 +%% +%% Increment a stats counter for an incoming or outgoing message. + +%% TODO: fix +incr(_, #diameter_packet{msg = undefined}, _, _) -> + ok; + +incr(Dir, Pkt, Dict, TPid) + when is_pid(TPid) -> + #diameter_packet{header = #diameter_header{is_error = E} + = Hdr, + msg = Rec} + = Pkt, + + D = choose(E, ?BASE, Dict), + RC = int(get_avp_value(D, 'Result-Code', Rec)), + PE = is_protocol_error(RC), + + %% Check that the E bit is set only for 3xxx result codes. + (not (E orelse PE)) + orelse (E andalso PE) + orelse x({invalid_error_bit, RC}, answer, [Dir, Pkt]), + + Ctr = rc_counter(D, Rec, RC), + is_tuple(Ctr) + andalso incr(TPid, {diameter_codec:msg_id(Hdr), Dir, Ctr}). + +%% incr/2 + +incr(TPid, Counter) -> + diameter_stats:incr(Counter, TPid, 1). + +%% RFC 3588, 7.6: +%% +%% All Diameter answer messages defined in vendor-specific +%% applications MUST include either one Result-Code AVP or one +%% Experimental-Result AVP. +%% +%% Maintain statistics assuming one or the other, not both, which is +%% surely the intent of the RFC. + +rc_counter(_, _, RC) + when is_integer(RC) -> + {'Result-Code', RC}; +rc_counter(D, Rec, _) -> + rcc(get_avp_value(D, 'Experimental-Result', Rec)). + +%% Outgoing answers may be in any of the forms messages can be sent +%% in. Incoming messages will be records. We're assuming here that the +%% arity of the result code AVP's is 0 or 1. + +rcc([{_,_,RC} = T]) + when is_integer(RC) -> + T; +rcc({_,_,RC} = T) + when is_integer(RC) -> + T; +rcc(_) -> + undefined. + +int([N]) + when is_integer(N) -> + N; +int(N) + when is_integer(N) -> + N; +int(_) -> + undefined. + +is_protocol_error(RC) -> + 3000 =< RC andalso RC < 4000. + +-spec x(any(), atom(), list()) -> no_return(). + +%% Warn and exit request process on errors in an incoming answer. +x(Reason, F, A) -> + diameter_lib:warning_report(Reason, {?MODULE, F, A}), + x(Reason). + +x(T) -> + exit(T). + +%%% --------------------------------------------------------------------------- +%%% # failover/[23] +%%% --------------------------------------------------------------------------- + +%% Failover as a consequence of request_peer_down/2. +failover({_, #request{handler = Pid} = Req, TRef}, S) -> + Pid ! {failover, TRef, rt(Req, S)}. + +%% Failover as a consequence of store_request/4. +failover(TRef, Seqs, S) + when is_reference(TRef) -> + case lookup_request(Seqs, TRef) of + #request{} = Req -> + failover({Seqs, Req, TRef}, S); + false -> + ok + end. + +%% prepare_request returned a binary ... +rt(#request{packet = #diameter_packet{msg = undefined}}, _) -> + false; %% TODO: Not what we should do. + +%% ... or not. +rt(#request{packet = #diameter_packet{msg = Msg}, dictionary = D} = Req, S) -> + find_transport(get_destination(Msg, D), Req, S). + +%%% --------------------------------------------------------------------------- +%%% # report_status/5 +%%% --------------------------------------------------------------------------- + +report_status(Status, + #peer{ref = Ref, + conn = TPid, + type = Type, + options = Opts}, + #conn{apps = [_|_] = As, + caps = Caps}, + #state{service_name = SvcName} + = S, + Extra) -> + share_peer(Status, Caps, As, TPid, S), + Info = [Status, Ref, {TPid, Caps}, {type(Type), Opts} | Extra], + send_event(SvcName, list_to_tuple(Info)). + +%% send_event/2 + +send_event(SvcName, Info) -> + send_event(#diameter_event{service = SvcName, + info = Info}). + +send_event(#diameter_event{service = SvcName} = E) -> + lists:foreach(fun({_, Pid}) -> Pid ! E end, subscriptions(SvcName)). + +%%% --------------------------------------------------------------------------- +%%% # share_peer/5 +%%% --------------------------------------------------------------------------- + +share_peer(up, Caps, Aliases, TPid, #state{share_peers = true, + service_name = Svc}) -> + diameter_peer:notify(Svc, {peer, TPid, Aliases, Caps}); + +share_peer(_, _, _, _, _) -> + ok. + +%%% --------------------------------------------------------------------------- +%%% # share_peers/2 +%%% --------------------------------------------------------------------------- + +share_peers(Pid, #state{share_peers = true, + local_peers = PDict}) -> + ?Dict:fold(fun(A,Ps,ok) -> sp(Pid, A, Ps), ok end, ok, PDict); + +share_peers(_, #state{share_peers = false}) -> + ok. + +sp(Pid, Alias, Peers) -> + lists:foreach(fun({P,C}) -> Pid ! {peer, P, [Alias], C} end, Peers). + +%%% --------------------------------------------------------------------------- +%%% # remote_peer_up/4 +%%% --------------------------------------------------------------------------- + +remote_peer_up(Pid, Aliases, Caps, #state{use_shared_peers = true, + service = Svc, + shared_peers = PDict} + = S) -> + #diameter_service{applications = Apps} = Svc, + Update = lists:filter(fun(A) -> + lists:keymember(A, #diameter_app.alias, Apps) + end, + Aliases), + S#state{shared_peers = rpu(Pid, Caps, PDict, Update)}; + +remote_peer_up(_, _, _, #state{use_shared_peers = false} = S) -> + S. + +rpu(_, _, PDict, []) -> + PDict; +rpu(Pid, Caps, PDict, Aliases) -> + erlang:monitor(process, Pid), + T = {Pid, Caps}, + lists:foldl(fun(A,D) -> ?Dict:append(A, T, D) end, + PDict, + Aliases). + +%%% --------------------------------------------------------------------------- +%%% # remote_peer_down/2 +%%% --------------------------------------------------------------------------- + +remote_peer_down(Pid, #state{use_shared_peers = true, + shared_peers = PDict} + = S) -> + S#state{shared_peers = lists:foldl(fun(A,D) -> rpd(Pid, A, D) end, + PDict, + ?Dict:fetch_keys(PDict))}. + +rpd(Pid, Alias, PDict) -> + ?Dict:update(Alias, fun(Ps) -> lists:keydelete(Pid, 1, Ps) end, PDict). + +%%% --------------------------------------------------------------------------- +%%% find_transport/[34] +%%% +%%% Output: {TransportPid, #diameter_caps{}, #diameter_app{}} +%%% | false +%%% --------------------------------------------------------------------------- + +%% Initial call, from an arbitrary process. +find_transport({alias, Alias}, Msg, Opts, #state{service = Svc} = S) -> + #diameter_service{applications = Apps} = Svc, + ft(find_send_app(Alias, Apps), Msg, Opts, S); + +%% Relay or proxy send. +find_transport(#diameter_app{} = App, Msg, Opts, S) -> + ft(App, Msg, Opts, S). + +ft(#diameter_app{module = Mod, dictionary = D} = App, Msg, Opts, S) -> + #options{filter = Filter, + extra = Xtra} + = Opts, + pick_peer(App#diameter_app{module = Mod ++ Xtra}, + get_destination(Msg, D), + Filter, + S); +ft(false = No, _, _, _) -> + No. + +%% This can't be used if we're a relay and sending a message +%% in an application not known locally. (TODO) +find_send_app(Alias, Apps) -> + case lists:keyfind(Alias, #diameter_app.alias, Apps) of + #diameter_app{id = ?APP_ID_RELAY} -> + false; + T -> + T + end. + +%% Retransmission, in the service process. +find_transport([_,_] = RH, + Req, + #state{service = #diameter_service{pid = Pid, + applications = Apps}} + = S) + when self() == Pid -> + #request{app = Alias, + filter = Filter, + module = ModX} + = Req, + #diameter_app{} + = App + = lists:keyfind(Alias, #diameter_app.alias, Apps), + + pick_peer(App#diameter_app{module = ModX}, + RH, + Filter, + S). + +%% get_destination/2 + +get_destination(Msg, Dict) -> + [str(get_avp_value(Dict, 'Destination-Realm', Msg)), + str(get_avp_value(Dict, 'Destination-Host', Msg))]. + +%% TODO: +%% +%% Should add some way of specifying destination directly so that the +%% only requirement is that the prepare_request callback returns +%% something specific. (eg. {host, DH}; that is, let the caller specify.) +%% +%% Also, there is no longer any need to call get_destination at all in +%% the default case. + +str(T) + when T == undefined; + T == [] -> + undefined; +str([X]) + when is_list(X) -> + X; +str(T) -> + T. + +%% get_avp_value/3 +%% +%% Support outgoing messages in one of three forms: +%% +%% - a message record (as generated from a .dia spec) or +%% - a list of an atom message name followed by 2-tuple, avp name/value pairs. +%% - a list of a #diameter_header{} followed by #diameter_avp{} records, +%% +%% In the first two forms a dictionary module is used at encode to +%% identify the type of the AVP and its arity in the message in +%% question. The third form allows messages to be sent as is, without +%% a dictionary, which is needed in the case of relay agents, for one. + +get_avp_value(Dict, Name, [#diameter_header{} | Avps]) -> + try + {Code, _, VId} = Dict:avp_header(Name), + [A|_] = lists:dropwhile(fun(#diameter_avp{code = C, vendor_id = V}) -> + C /= Code orelse V /= VId + end, + Avps), + avp_decode(Dict, Name, A) + catch + error: _ -> + undefined + end; + +get_avp_value(_, Name, [_MsgName | Avps]) -> + case lists:keyfind(Name, 1, Avps) of + {_, V} -> + V; + _ -> + undefined + end; + +get_avp_value(Dict, Name, Rec) + when is_tuple(Rec) -> + try + Dict:'#get-'(Name, Rec) + catch + error:_ -> + undefined + end. + +avp_decode(Dict, Name, #diameter_avp{value = undefined, + data = Bin}) -> + Dict:avp(decode, Bin, Name); +avp_decode(_, _, #diameter_avp{value = V}) -> + V. + +%%% --------------------------------------------------------------------------- +%%% # pick_peer(App, [DestRealm, DestHost], Filter, #state{}) +%%% +%%% Output: {TransportPid, #diameter_caps{}, App} +%%% | false +%%% | {error, Reason} +%%% --------------------------------------------------------------------------- + +%% Find transports to a given realm/host. + +pick_peer(#diameter_app{alias = Alias} + = App, + [_,_] = RH, + Filter, + #state{local_peers = L, + shared_peers = S, + service_name = SvcName, + service = #diameter_service{pid = Pid}}) -> + pick_peer(peers(Alias, RH, Filter, L), + peers(Alias, RH, Filter, S), + Pid, + SvcName, + App). + +%% pick_peer/5 + +pick_peer([], [], _, _, _) -> + false; + +%% App state is mutable but we're not in the service process: go there. +pick_peer(Local, Remote, Pid, _SvcName, #diameter_app{mutable = true} = App) + when self() /= Pid -> + call_service(Pid, {pick_peer, Local, Remote, App}); + +%% App state isn't mutable or it is and we're in the service process: +%% do the deed. +pick_peer(Local, + Remote, + _Pid, + SvcName, + #diameter_app{module = ModX, + alias = Alias, + init_state = S, + mutable = M} + = App) -> + MFA = {ModX, pick_peer, [Local, Remote, SvcName]}, + + try state_cb(App, MFA) of + {ok, {TPid, #diameter_caps{} = Caps}} when is_pid(TPid) -> + {TPid, Caps, App}; + {{TPid, #diameter_caps{} = Caps}, ModS} when is_pid(TPid), M -> + mod_state(Alias, ModS), + {TPid, Caps, App}; + {false = No, ModS} when M -> + mod_state(Alias, ModS), + No; + {ok, false = No} -> + No; + false = No -> + No; + {{TPid, #diameter_caps{} = Caps}, S} when is_pid(TPid) -> + {TPid, Caps, App}; %% Accept returned state in the immutable + {false = No, S} -> %% case as long it isn't changed. + No; + T -> + diameter_lib:error_report({invalid, T, App}, MFA) + catch + E: Reason -> + diameter_lib:error_report({failure, {E, Reason, ?STACK}}, MFA) + end. + +%% peers/4 + +peers(Alias, RH, Filter, Peers) -> + case ?Dict:find(Alias, Peers) of + {ok, L} -> + ps(L, RH, Filter, {[],[]}); + error -> + [] + end. + +%% Place a peer whose Destination-Host/Realm matches those of the +%% request at the front of the result list. + +ps([], _, _, {Ys, Ns}) -> + lists:reverse(Ys, Ns); +ps([{_TPid, #diameter_caps{} = Caps} = TC | Rest], RH, Filter, Acc) -> + ps(Rest, RH, Filter, pacc(caps_filter(Caps, RH, Filter), + caps_filter(Caps, RH, {all, [host, realm]}), + TC, + Acc)). + +pacc(true, true, TC, {Ts, Fs}) -> + {[TC|Ts], Fs}; +pacc(true, false, TC, {Ts, Fs}) -> + {Ts, [TC|Fs]}; +pacc(false, _, _, Acc) -> + Acc. + +%% caps_filter/3 + +caps_filter(C, RH, {neg, F}) -> + not caps_filter(C, RH, F); + +caps_filter(C, RH, {all, L}) -> + lists:all(fun(F) -> caps_filter(C, RH, F) end, L); + +caps_filter(C, RH, {any, L}) -> + lists:any(fun(F) -> caps_filter(C, RH, F) end, L); + +caps_filter(#diameter_caps{origin_host = {_,H}}, [_,DH], host) -> + eq(undefined, DH, H); + +caps_filter(#diameter_caps{origin_realm = {_,R}}, [DR,_], realm) -> + eq(undefined, DR, R); + +caps_filter(C, _, Filter) -> + caps_filter(C, Filter). + +%% caps_filter/2 + +caps_filter(_, none) -> + true; + +caps_filter(#diameter_caps{origin_host = {_,OH}}, {host, H}) -> + eq(any, H, OH); + +caps_filter(#diameter_caps{origin_realm = {_,OR}}, {realm, R}) -> + eq(any, R, OR); + +caps_filter(C, T) -> + try + {eval, F} = T, + diameter_lib:eval([F,C]) + catch + _:_ -> false + end. + +eq(X, A, B) -> + X == A orelse A == B. + +%% transports/1 + +transports(#state{peerT = PeerT}) -> + ets:select(PeerT, [{#peer{conn = '$1', _ = '_'}, + [{'is_pid', '$1'}], + ['$1']}]). + +%%% --------------------------------------------------------------------------- +%%% # service_info/2 +%%% --------------------------------------------------------------------------- + +%% The config passed to diameter:start_service/2. +-define(CAP_INFO, ['Origin-Host', + 'Origin-Realm', + 'Vendor-Id', + 'Product-Name', + 'Origin-State-Id', + 'Host-IP-Address', + 'Supported-Vendor-Id', + 'Auth-Application-Id', + 'Inband-Security-Id', + 'Acct-Application-Id', + 'Vendor-Specific-Application-Id', + 'Firmware-Revision']). + +-define(ALL_INFO, [capabilities, + applications, + transport, + pending, + statistics]). + +service_info(Items, S) + when is_list(Items) -> + [{complete(I), service_info(I,S)} || I <- Items]; +service_info(Item, S) + when is_atom(Item) -> + service_info(Item, S, true). + +service_info(Item, #state{service = Svc} = S, Complete) -> + case Item of + name -> + S#state.service_name; + 'Origin-Host' -> + (Svc#diameter_service.capabilities) + #diameter_caps.origin_host; + 'Origin-Realm' -> + (Svc#diameter_service.capabilities) + #diameter_caps.origin_realm; + 'Vendor-Id' -> + (Svc#diameter_service.capabilities) + #diameter_caps.vendor_id; + 'Product-Name' -> + (Svc#diameter_service.capabilities) + #diameter_caps.product_name; + 'Origin-State-Id' -> + (Svc#diameter_service.capabilities) + #diameter_caps.origin_state_id; + 'Host-IP-Address' -> + (Svc#diameter_service.capabilities) + #diameter_caps.host_ip_address; + 'Supported-Vendor-Id' -> + (Svc#diameter_service.capabilities) + #diameter_caps.supported_vendor_id; + 'Auth-Application-Id' -> + (Svc#diameter_service.capabilities) + #diameter_caps.auth_application_id; + 'Inband-Security-Id' -> + (Svc#diameter_service.capabilities) + #diameter_caps.inband_security_id; + 'Acct-Application-Id' -> + (Svc#diameter_service.capabilities) + #diameter_caps.acct_application_id; + 'Vendor-Specific-Application-Id' -> + (Svc#diameter_service.capabilities) + #diameter_caps.vendor_specific_application_id; + 'Firmware-Revision' -> + (Svc#diameter_service.capabilities) + #diameter_caps.firmware_revision; + capabilities -> service_info(?CAP_INFO, S); + applications -> info_apps(S); + transport -> info_transport(S); + pending -> info_pending(S); + statistics -> info_stats(S); + keys -> ?ALL_INFO ++ ?CAP_INFO; %% mostly for test + all -> service_info(?ALL_INFO, S); + _ when Complete -> service_info(complete(Item), S, false); + _ -> undefined + end. + +complete(Pre) -> + P = atom_to_list(Pre), + case [I || I <- [name | ?ALL_INFO] ++ ?CAP_INFO, + lists:prefix(P, atom_to_list(I))] + of + [I] -> I; + _ -> Pre + end. + +info_stats(#state{peerT = PeerT}) -> + Peers = ets:select(PeerT, [{#peer{ref = '$1', conn = '$2', _ = '_'}, + [{'is_pid', '$2'}], + [['$1', '$2']]}]), + diameter_stats:read(lists:append(Peers)). +%% TODO: include peer identities in return value + +info_transport(#state{peerT = PeerT, connT = ConnT}) -> + dict:fold(fun it/3, + [], + ets:foldl(fun(T,A) -> it_acc(ConnT, A, T) end, + dict:new(), + PeerT)). + +it(Ref, [[{type, connect} | _] = L], Acc) -> + [[{ref, Ref} | L] | Acc]; +it(Ref, [[{type, accept}, {options, Opts} | _] | _] = L, Acc) -> + [[{ref, Ref}, + {type, listen}, + {options, Opts}, + {accept, [lists:nthtail(2,A) || A <- L]}] + | Acc]. +%% Each entry has the same Opts. (TODO) + +it_acc(ConnT, Acc, #peer{pid = Pid, + type = Type, + ref = Ref, + options = Opts, + op_state = OS, + started = T, + conn = TPid}) -> + dict:append(Ref, + [{type, Type}, + {options, Opts}, + {watchdog, {Pid, T, OS}} + | info_conn(ConnT, TPid)], + Acc). + +info_conn(ConnT, TPid) -> + info_conn(ets:lookup(ConnT, TPid)). + +info_conn([#conn{pid = Pid, apps = SApps, caps = Caps, started = T}]) -> + [{peer, {Pid, T}}, + {apps, SApps}, + {caps, info_caps(Caps)}]; +info_conn([] = No) -> + No. + +info_caps(#diameter_caps{} = C) -> + lists:zip(record_info(fields, diameter_caps), tl(tuple_to_list(C))). + +info_apps(#state{service = #diameter_service{applications = Apps}}) -> + lists:map(fun mk_app/1, Apps). + +mk_app(#diameter_app{alias = Alias, + dictionary = Dict, + module = ModX, + id = Id}) -> + [{alias, Alias}, + {dictionary, Dict}, + {module, ModX}, + {id, Id}]. + +info_pending(#state{} = S) -> + MatchSpec = [{{'$1', + #request{transport = '$2', + from = '$3', + app = '$4', + _ = '_'}, + '_'}, + [?ORCOND([{'==', T, '$2'} || T <- transports(S)])], + [{{'$1', [{{app, '$4'}}, + {{transport, '$2'}}, + {{from, '$3'}}]}}]}], + + ets:select(?REQUEST_TABLE, MatchSpec). |