diff options
author | Anders Svensson <[email protected]> | 2013-02-06 18:24:26 +0100 |
---|---|---|
committer | Anders Svensson <[email protected]> | 2013-02-08 19:28:47 +0100 |
commit | e49e7acc2af99bbd06fa028168f9e1549cc5c658 (patch) | |
tree | ff94bd6cdddeadbe6aa5b0bb1182a5564088b554 /lib/diameter/src/base/diameter_service.erl | |
parent | 1e6f879d41fd023f815609268447e763a9206396 (diff) | |
download | otp-e49e7acc2af99bbd06fa028168f9e1549cc5c658.tar.gz otp-e49e7acc2af99bbd06fa028168f9e1549cc5c658.tar.bz2 otp-e49e7acc2af99bbd06fa028168f9e1549cc5c658.zip |
Split message handling in diameter_service into diameter_traffic
Traffic handling is connected to the service implementation through the
pick_peer callback and failover but diameter_service was getting
unwieldy as home to both the service process and traffic handling.
Diffstat (limited to 'lib/diameter/src/base/diameter_service.erl')
-rw-r--r-- | lib/diameter/src/base/diameter_service.erl | 1948 |
1 files changed, 207 insertions, 1741 deletions
diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl index 5015908582..a4b788af4e 100644 --- a/lib/diameter/src/base/diameter_service.erl +++ b/lib/diameter/src/base/diameter_service.erl @@ -24,29 +24,33 @@ -module(diameter_service). -behaviour(gen_server). +%% towards diameter_service_sup +-export([start_link/1]). + +%% towards diameter +-export([subscribe/1, + unsubscribe/1, + services/0, + info/2]). + +%% towards diameter_config -export([start/1, stop/1, start_transport/2, - stop_transport/2, - info/2, - call/4]). + stop_transport/2]). -%% towards diameter_watchdog --export([receive_message/4]). +%% towards diameter_peer +-export([whois/1]). -%% service supervisor --export([start_link/1]). +%% towards diameter_traffic +-export([find_incoming_app/4, + pick_peer/3]). --export([subscribe/1, - unsubscribe/1, +%% test/debug +-export([services/1, subscriptions/1, subscriptions/0, - services/0, - services/1, - whois/1]). - -%% test/debug --export([call_module/3, + call_module/3, state/1, uptime/1]). @@ -58,9 +62,6 @@ terminate/2, code_change/3]). -%% Other callbacks. --export([send/1]). - -include_lib("diameter/include/diameter.hrl"). -include("diameter_internal.hrl"). @@ -78,12 +79,8 @@ | ?WD_REOPEN. -define(DEFAULT_TC, 30000). %% RFC 3588 ch 2.1 --define(DEFAULT_TIMEOUT, 5000). %% for outgoing requests -define(RESTART_TC, 1000). %% if restart was this recent --define(RELAY, ?DIAMETER_DICT_RELAY). --define(BASE, ?DIAMETER_DICT_COMMON). - %% Used to be able to swap this with anything else dict-like but now %% rely on the fact that a service's #state{} record does not change %% in storing in it ?STATE table and not always going through the @@ -91,10 +88,6 @@ %% a ?Dict don't change the handle to it. -define(Dict, diameter_dict). -%% Table containing outgoing requests for which a reply has yet to be -%% received. --define(REQUEST_TABLE, diameter_request). - %% Maintains state in a table. In contrast to previously, a service's %% stat is not constant and is accessed outside of the service %% process. @@ -108,9 +101,14 @@ %% Workaround for dialyzer's lack of understanding of match specs. -type match(T) - :: T | '_' | '$1' | '$2' | '$3' | '$4'. - -%% State of service gen_server. + :: T | '_' | '$1' | '$2'. + +%% State of service gen_server. Note that the state term itself +%% doesn't change, which is relevant for the stateless application +%% callbacks since the state is retrieved from ?STATE_TABLE from +%% outside the service process. The pid in the service record is used +%% to determine whether or not we need to call the process for a +%% pick_peer callback in the statefull case. -record(state, {id = now(), service_name :: diameter:service_name(), %% key in ?STATE_TABLE @@ -119,20 +117,17 @@ :: ets:tid(), peerT = ets_new(peers) %% #peer{pid = TPid} at okay/reopen :: ets:tid(), - shared_peers = ?Dict:new(), %% Alias -> [{TPid, Caps}, ...] - local_peers = ?Dict:new(), %% Alias -> [{TPid, Caps}, ...] + shared_peers = ?Dict:new() %% Alias -> [{TPid, Caps}, ...] + :: ets:tid(), + local_peers = ?Dict:new() %% Alias -> [{TPid, Caps}, ...] + :: ets:tid(), monitor = false :: false | pid(), %% process to die with options :: [{sequence, diameter:sequence()} %% sequence mask | {restrict_connections, diameter:restriction()} | {share_peers, boolean()} %% broadcast peers to remote nodes? | {use_shared_peers, boolean()}]}).%% use broadcasted peers? -%% shared_peers reflects the peers broadcast from remote nodes. Note -%% that the state term itself doesn't change, which is relevant for -%% the stateless application callbacks since the state is retrieved -%% from ?STATE_TABLE from outside the service process. The pid in the -%% service record is used to determine whether or not we need to call -%% the process for a pick_peer callback. +%% shared_peers reflects the peers broadcast from remote nodes. %% Record representing an RFC 3539 watchdog process implemented by %% diameter_watchdog. @@ -155,29 +150,6 @@ started = now(), %% at process start watchdog :: pid()}). %% key into watchdogT -%% Record stored in diameter_request for each outgoing request. --record(request, - {ref :: match(reference()), %% used to receive answer - caller :: match(pid()), %% calling process - handler :: match(pid()), %% request process - transport :: match(pid()), %% peer process - caps :: match(#diameter_caps{}), %% of connection - packet :: match(#diameter_packet{})}). %% of request - -%% Record call/4 options are parsed into. --record(options, - {filter = none :: diameter:peer_filter(), - extra = [] :: list(), - timeout = ?DEFAULT_TIMEOUT :: 0..16#FFFFFFFF, - detach = false :: boolean()}). - -%% Term passed back to receive_message/4 with every incoming message. --record(recvdata, - {peerT :: ets:tid(), - service_name :: diameter:service_name(), - apps :: [#diameter_app{}], - sequence :: diameter:sequence()}). - %% --------------------------------------------------------------------------- %% # start/1 %% --------------------------------------------------------------------------- @@ -230,159 +202,17 @@ stop_transport(SvcName, [_|_] = Refs) -> %% --------------------------------------------------------------------------- info(SvcName, Item) -> - case find_state(SvcName) of - #state{} = S -> + case lookup_state(SvcName) of + [#state{} = S] -> service_info(Item, S); - false -> + [] -> undefined end. -%% --------------------------------------------------------------------------- -%% # receive_message/4 -%% --------------------------------------------------------------------------- - -%% Handle an incoming Diameter message in the watchdog process. This -%% used to come through the service process but this avoids that -%% becoming a bottleneck. - -receive_message(TPid, Pkt, Dict0, RecvData) - when is_pid(TPid) -> - #diameter_packet{header = #diameter_header{is_request = R}} = Pkt, - recv(R, - (not R) andalso lookup_request(Pkt, TPid), - TPid, - Pkt, - Dict0, - RecvData). - -%% Incoming request ... -recv(true, false, TPid, Pkt, Dict0, RecvData) -> - try - spawn(fun() -> recv_request(TPid, Pkt, Dict0, RecvData) end) - catch - error: system_limit = E -> %% discard - ?LOG({error, E}, now()) - end; - -%% ... answer to known request ... -recv(false, #request{ref = Ref, handler = Pid} = Req, _, Pkt, Dict0, _) -> - Pid ! {answer, Ref, Req, Dict0, Pkt}; -%% Note that failover could have happened prior to this message being -%% received and triggering failback. That is, both a failover message -%% and answer may be on their way to the handler process. In the worst -%% case the request process gets notification of the failover and -%% sends to the alternate peer before an answer arrives, so it's -%% always the case that we can receive more than one answer after -%% failover. The first answer received by the request process wins, -%% any others are discarded. - -%% ... or not. -recv(false, false, _, _, _, _) -> - ok. - -%% --------------------------------------------------------------------------- -%% # call/4 -%% --------------------------------------------------------------------------- - -call(SvcName, App, Msg, Options) - when is_list(Options) -> - Rec = make_options(Options), - Ref = make_ref(), - Caller = {self(), Ref}, - Fun = fun() -> exit({Ref, call(SvcName, App, Msg, Rec, Caller)}) end, - try spawn_monitor(Fun) of - {_, MRef} -> - recv(MRef, Ref, Rec#options.detach, false) - catch - error: system_limit = E -> - {error, E} - end. - -%% Don't rely on gen_server:call/3 for the timeout handling since it -%% makes no guarantees about not leaving a reply message in the -%% mailbox if we catch its exit at timeout. It currently *can* do so, -%% which is also undocumented. - -recv(MRef, _, true, true) -> - erlang:demonitor(MRef, [flush]), - ok; - -recv(MRef, Ref, Detach, Sent) -> - receive - Ref -> %% send has been attempted - recv(MRef, Ref, Detach, true); - {'DOWN', MRef, process, _, Reason} -> - call_rc(Reason, Ref, Sent) - end. - -%% call/5 has returned ... -call_rc({Ref, Ans}, Ref, _) -> - Ans; - -%% ... or not. In this case failure/encode are documented. -call_rc(_, _, Sent) -> - {error, choose(Sent, failure, encode)}. - -%% call/5 -%% -%% In the process spawned for the outgoing request. - -call(SvcName, App, Msg, Opts, Caller) -> - c(find_state(SvcName), App, Msg, Opts, Caller). - -c(#state{service_name = SvcName, options = [{_, Mask} | _]} = S, - App, - Msg, - Opts, - Caller) -> - case find_transport(App, Msg, Opts, S) of - {_,_,_} = T -> - send_request(T, Mask, Msg, Opts, Caller, SvcName); - false -> - {error, no_connection}; - {error, _} = No -> - No - end; - -c(false, _, _, _, _) -> - {error, no_service}. - -%% find_state/1 - -find_state(SvcName) -> - fs(ets:lookup(?STATE_TABLE, SvcName)). - -fs([#state{} = S]) -> - S; - -fs([]) -> - false. - -%% make_options/1 - -make_options(Options) -> - lists:foldl(fun mo/2, #options{}, Options). - -mo({timeout, T}, Rec) - when is_integer(T), 0 =< T -> - Rec#options{timeout = T}; - -mo({filter, F}, #options{filter = none} = Rec) -> - Rec#options{filter = F}; -mo({filter, F}, #options{filter = {all, Fs}} = Rec) -> - Rec#options{filter = {all, [F | Fs]}}; -mo({filter, F}, #options{filter = F0} = Rec) -> - Rec#options{filter = {all, [F0, F]}}; - -mo({extra, L}, #options{extra = X} = Rec) - when is_list(L) -> - Rec#options{extra = X ++ L}; - -mo(detach, Rec) -> - Rec#options{detach = true}; +%% lookup_state/1 -mo(T, _) -> - ?ERROR({invalid_option, T}). +lookup_state(SvcName) -> + ets:lookup(?STATE_TABLE, SvcName). %% --------------------------------------------------------------------------- %% # subscribe/1 @@ -422,6 +252,78 @@ whois(SvcName) -> undefined end. +%% --------------------------------------------------------------------------- +%% # pick_peer/3 +%% --------------------------------------------------------------------------- + +-spec pick_peer(SvcName, AppOrAlias, Opts) + -> {{TPid, Caps, App}, Mask} + | false + | {error, term()} + when SvcName :: diameter:service_name(), + AppOrAlias :: {alias, diameter:app_alias()} | #diameter_app{}, + Opts :: tuple(), + TPid :: pid(), + Caps :: #diameter_caps{}, + App :: #diameter_app{}, + Mask :: diameter:sequence(). + +pick_peer(SvcName, App, Opts) -> + pick(lookup_state(SvcName), App, Opts). + +pick([], _, _) -> + {error, no_service}; + +pick([S], App, Opts) -> + pick(S, App, Opts); + +pick(#state{service = #diameter_service{applications = Apps}} + = S, + {alias, Alias}, + Opts) -> %% initial call from diameter:call/4 + pick(S, find_outgoing_app(Alias, Apps), Opts); + +pick(_, false, _) -> + false; + +pick(#state{options = [{_, Mask} | _]} + = S, + #diameter_app{module = ModX, dictionary = Dict} + = App0, + {DestF, Filter, Xtra}) -> + App = App0#diameter_app{module = ModX ++ Xtra}, + [_,_] = RealmAndHost = diameter_lib:eval([DestF, Dict]), + case pick_peer(App, RealmAndHost, Filter, S) of + {TPid, Caps} -> + {{TPid, Caps, App}, Mask}; + false = No -> + No + end. + +%% --------------------------------------------------------------------------- +%% # find_incoming_app/4 +%% --------------------------------------------------------------------------- + +-spec find_incoming_app(PeerT, TPid, Id, Apps) + -> {#diameter_app{}, #diameter_caps{}} %% connection and suitable app + | #diameter_caps{} %% connection but no suitable app + | false %% no connection + when PeerT :: ets:tid(), + TPid :: pid(), + Id :: non_neg_integer(), + Apps :: [#diameter_app{}]. + +find_incoming_app(PeerT, TPid, Id, Apps) -> + try ets:lookup(PeerT, TPid) of + [#peer{} = P] -> + find_incoming_app(P, Id, Apps); + [] -> %% transport has gone down + false + catch + error: badarg -> %% service has gone down (and taken table with it) + false + end. + %% =========================================================================== %% =========================================================================== @@ -580,13 +482,6 @@ transition({tc_timeout, T}, S) -> tc_timeout(T, S), ok; -%% Request process is telling us it may have missed a failover message -%% after a transport went down and the service process looked up -%% outstanding requests. -transition({failover, TRef, Seqs}, _) -> - failover(TRef, Seqs), - ok; - transition(Req, S) -> unexpected(handle_info, [Req], S), ok. @@ -630,8 +525,6 @@ unexpected(F, A, #state{service_name = Name}) -> ?UNEXPECTED(F, A ++ [Name]). cb(#diameter_app{module = [_|_] = M}, F, A) -> - eval(M, F, A); -cb([_|_] = M, F, A) -> eval(M, F, A). eval([M|X], F, A) -> @@ -639,14 +532,12 @@ eval([M|X], F, A) -> %% Callback with state. -state_cb(#diameter_app{mutable = false, init_state = S}, {ModX, F, A}) -> +state_cb(#diameter_app{module = ModX, mutable = false, init_state = S}, + pick_peer = F, + A) -> eval(ModX, F, A ++ [S]); -state_cb(#diameter_app{mutable = true, alias = Alias}, {_,_,_} = MFA) -> - state_cb(MFA, Alias); - -state_cb({ModX,F,A}, Alias) - when is_list(ModX) -> +state_cb(#diameter_app{module = ModX, alias = Alias}, F, A) -> eval(ModX, F, A ++ [mod_state(Alias)]). choose(true, X, _) -> X; @@ -840,12 +731,11 @@ start(Ref, Type, Opts, #state{watchdogT = WatchdogT, #diameter_service{applications = Apps} = Svc = merge_service(Opts, Svc0), - Pid = s(Type, Ref, {#recvdata{service_name = SvcName, - peerT = PeerT, - apps = Apps, - sequence - = {_,_} - = proplists:get_value(sequence, SvcOpts)}, + {_,_} = Mask = proplists:get_value(sequence, SvcOpts), + Pid = s(Type, Ref, {diameter_traffic:make_recvdata([SvcName, + PeerT, + Apps, + Mask]), Opts, SvcOpts, Svc}), @@ -1001,7 +891,7 @@ connection_up(Extra, service = #diameter_service{applications = Apps}} = S) -> insert(WatchdogT, Wd#watchdog{state = ?WD_OKAY}), - request_peer_up(TPid), + diameter_traffic:peer_up(TPid), insert_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict), report_status(up, Wd, Pr, S, Extra). @@ -1013,13 +903,57 @@ ilp({Id, Alias}, {TC, SA}, LDict) -> ?Dict:append(Alias, TC, LDict). init_conn(Id, Alias, {TPid, _} = TC, {SvcName, Apps}) -> - #diameter_app{module = ModX, - id = Id} %% assert + #diameter_app{id = Id} %% assert + = App = find_app(Alias, Apps), - peer_cb({ModX, peer_up, [SvcName, TC]}, Alias) + peer_cb(App, peer_up, [SvcName, TC]) orelse exit(TPid, kill). %% fake transport failure +%% --------------------------------------------------------------------------- +%% # find_incoming_app/3 +%% --------------------------------------------------------------------------- + +%% No one should be sending the relay identifier. +find_incoming_app(#peer{caps = Caps}, ?APP_ID_RELAY, _) -> + Caps; + +find_incoming_app(Peer, Id, Apps) + when is_integer(Id) -> + find_incoming_app(Peer, [Id, ?APP_ID_RELAY], Apps); + +%% Note that the apps represented in SApps may be a strict subset of +%% those in Apps. +find_incoming_app(#peer{apps = SApps, caps = Caps}, Ids, Apps) -> + case keyfind(Ids, 1, SApps) of + {_Id, Alias} -> + {#diameter_app{} = find_app(Alias, Apps), Caps}; + false -> + Caps + end. + +%% keyfind/3 + +keyfind([], _, _) -> + false; +keyfind([Key | Rest], Pos, L) -> + case lists:keyfind(Key, Pos, L) of + false -> + keyfind(Rest, Pos, L); + T -> + T + end. + +%% find_outgoing_app/2 + +find_outgoing_app(Alias, Apps) -> + case find_app(Alias, Apps) of + #diameter_app{id = ?APP_ID_RELAY} -> + false; + A -> + A + end. + %% find_app/2 find_app(Alias, Apps) -> @@ -1027,14 +961,15 @@ find_app(Alias, Apps) -> %% Don't bring down the service (and all associated connections) %% regardless of what happens. -peer_cb(MFA, Alias) -> - try state_cb(MFA, Alias) of +peer_cb(App, F, A) -> + try state_cb(App, F, A) of ModS -> - mod_state(Alias, ModS), + mod_state(App#diameter_app.alias, ModS), true catch E:R -> - diameter_lib:error_report({failure, {E, R, Alias, ?STACK}}, MFA), + diameter_lib:error_report({failure, {E, R, ?STACK}}, + {App, F, A}), false end. @@ -1054,7 +989,7 @@ connection_down(#watchdog{state = ?WD_OKAY, = S) -> report_status(down, Wd, Pr, S, []), remove_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict), - request_peer_down(TPid); + diameter_traffic:peer_down(TPid); connection_down(#watchdog{}, #peer{}, _) -> ok; @@ -1081,11 +1016,11 @@ rlp({Id, Alias}, {TC, SA}, LDict) -> ?Dict:store(Alias, lists:delete(TC, L), LDict). down_conn(Id, Alias, TC, {SvcName, Apps}) -> - #diameter_app{module = ModX, - id = Id} %% assert + #diameter_app{id = Id} %% assert + = App = find_app(Alias, Apps), - peer_cb({ModX, peer_down, [SvcName, TC]}, Alias). + peer_cb(App, peer_down, [SvcName, TC]). %% --------------------------------------------------------------------------- %% # watchdog_down/2 @@ -1253,10 +1188,10 @@ call_module(Mod, Req, From, #state{service {reply, {error, Reason}, S} end. -cm([#diameter_app{module = ModX, alias = Alias}], Req, From, Svc) -> - MFA = {ModX, handle_call, [Req, From, Svc]}, +cm([#diameter_app{alias = Alias} = App], Req, From, Svc) -> + Args = [Req, From, Svc], - try state_cb(MFA, Alias) of + try state_cb(App, handle_call, Args) of {noreply = T, ModS} -> mod_state(Alias, ModS), T; @@ -1264,11 +1199,13 @@ cm([#diameter_app{module = ModX, alias = Alias}], Req, From, Svc) -> mod_state(Alias, ModS), {T, RC}; T -> - diameter_lib:error_report({invalid, T}, MFA), + diameter_lib:error_report({invalid, T}, + {App, handle_call, Args}), invalid catch E: Reason -> - diameter_lib:error_report({failure, {E, Reason, ?STACK}}, MFA), + diameter_lib:error_report({failure, {E, Reason, ?STACK}}, + {App, handle_call, Args}), failure end; @@ -1279,1335 +1216,6 @@ cm([_,_|_], _, _, _) -> multiple. %% --------------------------------------------------------------------------- -%% # send_request/6 -%% --------------------------------------------------------------------------- - -%% Send an outgoing request in its dedicated process. -%% -%% Note that both encode of the outgoing request and of the received -%% answer happens in this process. It's also this process that replies -%% to the caller. The service process only handles the state-retaining -%% callbacks. -%% -%% The module field of the #diameter_app{} here includes any extra -%% arguments passed to diameter:call/4. - -send_request({TPid, Caps, App} - = Transport, - Mask, - Msg, - Opts, - Caller, - SvcName) -> - Pkt = make_prepare_packet(Mask, Msg), - - send_req(cb(App, prepare_request, [Pkt, SvcName, {TPid, Caps}]), - Pkt, - Transport, - Opts, - Caller, - SvcName, - []). - -send_req({send, Msg}, Pkt, Transport, Opts, Caller, SvcName, Fs) -> - send_req(make_request_packet(Msg, Pkt), - Transport, - Opts, - Caller, - SvcName, - Fs); - -send_req({discard, Reason} , _, _, _, _, _, _) -> - {error, Reason}; - -send_req(discard, _, _, _, _, _, _) -> - {error, discarded}; - -send_req({eval_packet, RC, F}, Pkt, T, Opts, Caller, SvcName, Fs) -> - send_req(RC, Pkt, T, Opts, Caller, SvcName, [F|Fs]); - -send_req(E, _, {_, _, App}, _, _, _, _) -> - ?ERROR({invalid_return, prepare_request, App, E}). - -%% make_prepare_packet/2 -%% -%% Turn an outgoing request as passed to call/4 into a diameter_packet -%% record in preparation for a prepare_request callback. - -make_prepare_packet(_, Bin) - when is_binary(Bin) -> - #diameter_packet{header = diameter_codec:decode_header(Bin), - bin = Bin}; - -make_prepare_packet(Mask, #diameter_packet{msg = [#diameter_header{} = Hdr - | Avps]} - = Pkt) -> - Pkt#diameter_packet{msg = [make_prepare_header(Mask, Hdr) | Avps]}; - -make_prepare_packet(Mask, #diameter_packet{header = Hdr} = Pkt) -> - Pkt#diameter_packet{header = make_prepare_header(Mask, Hdr)}; - -make_prepare_packet(Mask, Msg) -> - make_prepare_packet(Mask, #diameter_packet{msg = Msg}). - -%% make_prepare_header/2 - -make_prepare_header(Mask, undefined) -> - Seq = diameter_session:sequence(Mask), - make_prepare_header(#diameter_header{end_to_end_id = Seq, - hop_by_hop_id = Seq}); - -make_prepare_header(Mask, #diameter_header{end_to_end_id = undefined, - hop_by_hop_id = undefined} - = H) -> - Seq = diameter_session:sequence(Mask), - make_prepare_header(H#diameter_header{end_to_end_id = Seq, - hop_by_hop_id = Seq}); - -make_prepare_header(Mask, #diameter_header{end_to_end_id = undefined} = H) -> - Seq = diameter_session:sequence(Mask), - make_prepare_header(H#diameter_header{end_to_end_id = Seq}); - -make_prepare_header(Mask, #diameter_header{hop_by_hop_id = undefined} = H) -> - Seq = diameter_session:sequence(Mask), - make_prepare_header(H#diameter_header{hop_by_hop_id = Seq}); - -make_prepare_header(_, Hdr) -> - make_prepare_header(Hdr). - -%% make_prepare_header/1 - -make_prepare_header(#diameter_header{version = undefined} = Hdr) -> - make_prepare_header(Hdr#diameter_header{version = ?DIAMETER_VERSION}); - -make_prepare_header(#diameter_header{} = Hdr) -> - Hdr; - -make_prepare_header(T) -> - ?ERROR({invalid_header, T}). - -%% make_request_packet/2 -%% -%% Reconstruct a diameter_packet from the return value of -%% prepare_request or prepare_retransmit callback. - -make_request_packet(Bin, _) - when is_binary(Bin) -> - make_prepare_packet(false, Bin); - -make_request_packet(#diameter_packet{msg = [#diameter_header{} | _]} - = Pkt, - _) -> - Pkt; - -%% Returning a diameter_packet with no header from a prepare_request -%% or prepare_retransmit callback retains the header passed into it. -%% This is primarily so that the end to end and hop by hop identifiers -%% are retained. -make_request_packet(#diameter_packet{header = Hdr} = Pkt, - #diameter_packet{header = Hdr0}) -> - Pkt#diameter_packet{header = fold_record(Hdr0, Hdr)}; - -make_request_packet(Msg, Pkt) -> - Pkt#diameter_packet{msg = Msg}. - -%% fold_record/2 - -fold_record(undefined, R) -> - R; -fold_record(Rec, R) -> - diameter_lib:fold_tuple(2, Rec, R). - -%% send_req/6 - -send_req(Pkt0, - {TPid, Caps, #diameter_app{dictionary = Dict} = App}, - Opts, - {Pid, Ref}, - SvcName, - Fs) -> - Pkt = encode(Dict, Pkt0, Fs), - - #options{timeout = Timeout} - = Opts, - - Req = #request{ref = Ref, - caller = Pid, - handler = self(), - transport = TPid, - caps = Caps, - packet = Pkt0}, - - try - TRef = send_request(TPid, Pkt, Req, SvcName, Timeout), - Pid ! Ref, %% tell caller a send has been attempted - handle_answer(SvcName, - App, - recv_answer(Timeout, SvcName, App, Opts, {TRef, Req})) - after - erase_requests(Pkt) - end. - -%% recv_answer/5 - -recv_answer(Timeout, SvcName, App, Opts, {TRef, #request{ref = Ref} - = Req}) -> - %% Matching on TRef below ensures we ignore messages that pertain - %% to a previous transport prior to failover. The answer message - %% includes the #request{} since it's not necessarily Req; that - %% is, from the last peer to which we've transmitted. - receive - {answer = A, Ref, Rq, Dict0, Pkt} -> %% Answer from peer - {A, Rq, Dict0, Pkt}; - {timeout = Reason, TRef, _} -> %% No timely reply - {error, Req, Reason}; - {failover, TRef} -> %% Service says peer has gone down - retransmit(Req, App, Opts, find_state(SvcName), Timeout) - end. - -%% Note that failover starts a new timer and that expiry of an old -%% timer value is ignored. This means that an answer could be accepted -%% from a peer after timeout in the case of failover. - -retransmit(Req, App, Opts, #state{service_name = SvcName} = S, Timeout) -> - rt(find_transport(Req, App, Opts, S), Req, Opts, SvcName, Timeout); - -retransmit(Req, _, _, false, _) -> %% service has gone down - {error, Req, failover}. - -rt({_,_,App} = Transport, Req, Opts, SvcName, Timeout) -> - try retransmit(Transport, Req, SvcName, Timeout) of - T -> recv_answer(Timeout, SvcName, App, Opts, T) - catch - ?FAILURE(Reason) -> {error, Req, Reason} - end; - -rt(_, Req, _, _, _) -> %% no alternate peer - {error, Req, failover}. - -%% handle_error/4 - -handle_error(App, - #request{packet = Pkt, - transport = TPid, - caps = Caps}, - Reason, - SvcName) -> - cb(App, handle_error, [Reason, msg(Pkt), SvcName, {TPid, Caps}]). - -msg(#diameter_packet{msg = undefined, bin = Bin}) -> - Bin; -msg(#diameter_packet{msg = Msg}) -> - Msg. - -%% encode/3 - -encode(Dict, Pkt, Fs) -> - P = encode(Dict, Pkt), - eval_packet(P, Fs), - P. - -%% encode/2 - -%% Note that prepare_request can return a diameter_packet containing a -%% header or transport_data. Even allow the returned record to contain -%% an encoded binary. This isn't the usual case and doesn't properly -%% support retransmission but is useful for test. - -%% A message to be encoded. -encode(Dict, #diameter_packet{bin = undefined} = Pkt) -> - diameter_codec:encode(Dict, Pkt); - -%% An encoded binary: just send. -encode(_, #diameter_packet{} = Pkt) -> - Pkt. - -%% send_request/5 - -send_request(TPid, #diameter_packet{bin = Bin} = Pkt, Req, SvcName, Timeout) - when node() == node(TPid) -> - %% Store the outgoing request before sending to avoid a race with - %% reply reception. - TRef = store_request(TPid, Bin, Req, Timeout, SvcName), - send(TPid, Pkt), - TRef; - -%% Send using a remote transport: spawn a process on the remote node -%% to relay the answer. -send_request(TPid, #diameter_packet{} = Pkt, Req, SvcName, Timeout) -> - TRef = erlang:start_timer(Timeout, self(), timeout), - T = {TPid, Pkt, Req, SvcName, Timeout, TRef}, - spawn(node(TPid), ?MODULE, send, [T]), - TRef. - -%% send/1 - -send({TPid, Pkt, #request{handler = Pid} = Req, SvcName, Timeout, TRef}) -> - Ref = send_request(TPid, - Pkt, - Req#request{handler = self()}, - SvcName, - Timeout), - Pid ! reref(receive T -> T end, Ref, TRef). - -reref({T, Ref, R}, Ref, TRef) -> - {T, TRef, R}; -reref(T, _, _) -> - T. - -%% send/2 - -send(Pid, Pkt) -> - Pid ! {send, Pkt}. - -%% retransmit/4 - -retransmit({TPid, Caps, App} - = Transport, - #request{packet = Pkt0} - = Req, - SvcName, - Timeout) -> - have_request(Pkt0, TPid) %% Don't failover to a peer we've - andalso ?THROW(timeout), %% already sent to. - - #diameter_packet{header = Hdr0} = Pkt0, - Hdr = Hdr0#diameter_header{is_retransmitted = true}, - Pkt = Pkt0#diameter_packet{header = Hdr}, - - retransmit(cb(App, prepare_retransmit, [Pkt, SvcName, {TPid, Caps}]), - Transport, - Req#request{packet = Pkt}, - SvcName, - Timeout, - []). - -retransmit({send, Msg}, - Transport, - #request{packet = Pkt} - = Req, - SvcName, - Timeout, - Fs) -> - resend_request(make_request_packet(Msg, Pkt), - Transport, - Req, - SvcName, - Timeout, - Fs); - -retransmit({discard, Reason}, _, _, _, _, _) -> - ?THROW(Reason); - -retransmit(discard, _, _, _, _, _) -> - ?THROW(discarded); - -retransmit({eval_packet, RC, F}, Transport, Req, SvcName, Timeout, Fs) -> - retransmit(RC, Transport, Req, SvcName, Timeout, [F|Fs]); - -retransmit(T, {_, _, App}, _, _, _, _) -> - ?ERROR({invalid_return, prepare_retransmit, App, T}). - -resend_request(Pkt0, - {TPid, Caps, #diameter_app{dictionary = Dict}}, - Req0, - SvcName, - Tmo, - Fs) -> - Pkt = encode(Dict, Pkt0, Fs), - - Req = Req0#request{transport = TPid, - packet = Pkt0, - caps = Caps}, - - ?LOG(retransmission, Req), - TRef = send_request(TPid, Pkt, Req, SvcName, Tmo), - {TRef, Req}. - -%% store_request/5 - -store_request(TPid, Bin, Req, Timeout, SvcName) -> - Seqs = diameter_codec:sequence_numbers(Bin), - TRef = erlang:start_timer(Timeout, self(), timeout), - ets:insert(?REQUEST_TABLE, {Seqs, Req, TRef}), - ets:member(?REQUEST_TABLE, TPid) - orelse failover(whois(SvcName), TRef, Seqs), - TRef. - -%% Induce failover for a request that was stored after a transport -%% went down and which the service may have missed. -failover(Pid, TRef, Seqs) -> - is_pid(Pid) andalso (Pid ! {failover, TRef, Seqs}). - -%% lookup_request/2 - -lookup_request(Msg, TPid) - when is_pid(TPid) -> - lookup(Msg, TPid, '_'); - -lookup_request(Msg, TRef) - when is_reference(TRef) -> - lookup(Msg, '_', TRef). - -lookup(Msg, TPid, TRef) -> - Seqs = diameter_codec:sequence_numbers(Msg), - Spec = [{{Seqs, #request{transport = TPid, _ = '_'}, TRef}, - [], - ['$_']}], - case ets:select(?REQUEST_TABLE, Spec) of - [{_, Req, _}] -> - Req; - [] -> - false - end. - -%% erase_requests/1 - -erase_requests(Pkt) -> - ets:delete(?REQUEST_TABLE, diameter_codec:sequence_numbers(Pkt)). - -%% match_requests/1 - -match_requests(TPid) -> - Pat = {'_', #request{transport = TPid, _ = '_'}, '_'}, - ets:select(?REQUEST_TABLE, [{Pat, [], ['$_']}]). - -%% have_request/2 - -have_request(Pkt, TPid) -> - Seqs = diameter_codec:sequence_numbers(Pkt), - Pat = {Seqs, #request{transport = TPid, _ = '_'}, '_'}, - '$end_of_table' /= ets:select(?REQUEST_TABLE, [{Pat, [], ['$_']}], 1). - -%% request_peer_up/1 - -%% Insert an element that is used to detect whether or not there has -%% been a failover when inserting an outgoing request. -request_peer_up(TPid) -> - ets:insert(?REQUEST_TABLE, {TPid}). - -%% request_peer_down/1 - -request_peer_down(TPid) -> - ets:delete(?REQUEST_TABLE, TPid), - lists:foreach(fun failover/1, match_requests(TPid)). -%% Note that a request process can store its request after failover -%% notifications are sent here: store_request/4 sends the notification -%% in that case. - -%% --------------------------------------------------------------------------- -%% recv_request/4 -%% --------------------------------------------------------------------------- - -recv_request(TPid, Pkt, Dict0, #recvdata{peerT = PeerT} = RecvData) -> - try ets:lookup(PeerT, TPid) of - [Pr] -> - recv_request(Pr, TPid, Pkt, Dict0, RecvData); - [] -> %% transport has gone down - ok - catch - error: badarg -> %% service has gone down (and taken table with it) - ok - end. - -%% recv_request/5 - -recv_request(#peer{apps = SApps, caps = Caps}, - TPid, - Pkt, - Dict0, - RecvData) -> - #diameter_packet{header = #diameter_header{application_id = Id}} - = Pkt, - - recv_request(find_recv_app(Id, SApps), - TPid, - Caps, - Pkt, - Dict0, - RecvData). - -%% find_recv_app/2 - -%% No one should be sending the relay identifier. -find_recv_app(?APP_ID_RELAY, _) -> - false; - -%% With any other id, must either support it or be a relay. -find_recv_app(Id, SApps) -> - keyfind([Id, ?APP_ID_RELAY], 1, SApps). - -%% keyfind/3 - -keyfind([], _, _) -> - false; -keyfind([Key | Rest], Pos, L) -> - case lists:keyfind(Key, Pos, L) of - false -> - keyfind(Rest, Pos, L); - T -> - T - end. - -%% recv_request/6 - -recv_request({Id, Alias}, TPid, Caps, Pkt, Dict0, RecvData) -> - #diameter_app{dictionary = Dict} - = App - = find_app(Alias, RecvData#recvdata.apps), - recv_req(App, - TPid, - Caps, - Dict0, - RecvData, - diameter_codec:decode(Id, Dict, Pkt)); -%% Note that the decode is different depending on whether or not Id is -%% ?APP_ID_RELAY. - -%% DIAMETER_APPLICATION_UNSUPPORTED 3007 -%% A request was sent for an application that is not supported. - -recv_request(false, TPid, Caps, Pkt, Dict0, _) -> - As = collect_avps(Pkt), - protocol_error(3007, TPid, Caps, Dict0, Pkt#diameter_packet{avps = As}). - -collect_avps(Pkt) -> - case diameter_codec:collect_avps(Pkt) of - {_Bs, As} -> - As; - As -> - As - end. - -%% recv_req/6 - -%% Wrong number of bits somewhere in the message: reply. -%% -%% DIAMETER_INVALID_AVP_BITS 3009 -%% A request was received that included an AVP whose flag bits are -%% set to an unrecognized value, or that is inconsistent with the -%% AVP's definition. -%% -recv_req(_App, - TPid, - Caps, - Dict0, - _RecvData, - #diameter_packet{errors = [Bs | _]} = Pkt) - when is_bitstring(Bs) -> - protocol_error(3009, TPid, Caps, Dict0, Pkt); - -%% Either we support this application but don't recognize the command -%% or we're a relay and the command isn't proxiable. -%% -%% DIAMETER_COMMAND_UNSUPPORTED 3001 -%% The Request contained a Command-Code that the receiver did not -%% recognize or support. This MUST be used when a Diameter node -%% receives an experimental command that it does not understand. -%% -recv_req(#diameter_app{id = Id}, - TPid, - Caps, - Dict0, - _RecvData, - #diameter_packet{header = #diameter_header{is_proxiable = P}, - msg = M} - = Pkt) - when ?APP_ID_RELAY /= Id, undefined == M; - ?APP_ID_RELAY == Id, not P -> - protocol_error(3001, TPid, Caps, Dict0, Pkt); - -%% Error bit was set on a request. -%% -%% DIAMETER_INVALID_HDR_BITS 3008 -%% A request was received whose bits in the Diameter header were -%% either set to an invalid combination, or to a value that is -%% inconsistent with the command code's definition. -%% -recv_req(_App, - TPid, - Caps, - Dict0, - _RecvData, - #diameter_packet{header = #diameter_header{is_error = true}} - = Pkt) -> - protocol_error(3008, TPid, Caps, Dict0, Pkt); - -%% A message in a locally supported application or a proxiable message -%% in the relay application. Don't distinguish between the two since -%% each application has its own callback config. That is, the user can -%% easily distinguish between the two cases. -recv_req(App, TPid, Caps, Dict0, RecvData, Pkt) -> - request_cb(App, TPid, Caps, Dict0, RecvData, examine(Pkt)). - -%% Note that there may still be errors but these aren't protocol -%% (3xxx) errors that lead to an answer-message. - -request_cb(App, - TPid, - Caps, - Dict0, - #recvdata{service_name = SvcName} - = RecvData, - Pkt) -> - request_cb(cb(App, handle_request, [Pkt, SvcName, {TPid, Caps}]), - App, - TPid, - Caps, - Dict0, - RecvData, - [], - Pkt). - -%% examine/1 -%% -%% Look for errors in a decoded message. Length errors result in -%% decode failure in diameter_codec. - -examine(#diameter_packet{header = #diameter_header{version - = ?DIAMETER_VERSION}} - = Pkt) -> - Pkt; - -%% DIAMETER_UNSUPPORTED_VERSION 5011 -%% This error is returned when a request was received, whose version -%% number is unsupported. - -examine(#diameter_packet{errors = Es} = Pkt) -> - Pkt#diameter_packet{errors = [5011 | Es]}. -%% It's odd/unfortunate that this isn't a protocol error. - -%% request_cb/8 - -%% A reply may be an answer-message, constructed either here or by -%% the handle_request callback. The header from the incoming request -%% is passed into the encode so that it can retrieve the relevant -%% command code in this case. It will also then ignore Dict and use -%% the base encoder. -request_cb({reply, Ans}, - #diameter_app{dictionary = Dict}, - TPid, - _Caps, - Dict0, - _RecvData, - Fs, - Pkt) -> - reply(Ans, dict(Dict, Dict0, Ans), TPid, Fs, Pkt); - -%% An 3xxx result code, for which the E-bit is set in the header. -request_cb({protocol_error, RC}, - _App, - TPid, - Caps, - Dict0, - _RecvData, - Fs, - Pkt) - when 3000 =< RC, RC < 4000 -> - protocol_error(RC, TPid, Caps, Dict0, Fs, Pkt); - -%% RFC 3588 says we must reply 3001 to anything unrecognized or -%% unsupported. 'noreply' is undocumented (and inappropriately named) -%% backwards compatibility for this, protocol_error the documented -%% alternative. -request_cb(noreply, - _App, - TPid, - Caps, - Dict0, - _RecvData, - Fs, - Pkt) -> - protocol_error(3001, TPid, Caps, Dict0, Fs, Pkt); - -%% Relay a request to another peer. This is equivalent to doing an -%% explicit call/4 with the message in question except that (1) a loop -%% will be detected by examining Route-Record AVP's, (3) a -%% Route-Record AVP will be added to the outgoing request and (3) the -%% End-to-End Identifier will default to that in the -%% #diameter_header{} without the need for an end_to_end_identifier -%% option. -%% -%% relay and proxy are similar in that they require the same handling -%% with respect to Route-Record and End-to-End identifier. The -%% difference is that a proxy advertises specific applications, while -%% a relay advertises the relay application. If a callback doesn't -%% want to distinguish between the cases in the callback return value -%% then 'resend' is a neutral alternative. -%% -request_cb({A, Opts}, - #diameter_app{id = Id} - = App, - TPid, - Caps, - Dict0, - RecvData, - Fs, - Pkt) - when A == relay, Id == ?APP_ID_RELAY; - A == proxy, Id /= ?APP_ID_RELAY; - A == resend -> - resend(Opts, App, TPid, Caps, Dict0, RecvData, Fs, Pkt); - -request_cb(discard, _, _, _, _, _, _, _) -> - ok; - -request_cb({eval_packet, RC, F}, App, TPid, Caps, Dict0, RecvData, Fs, Pkt) -> - request_cb(RC, App, TPid, Caps, Dict0, RecvData, [F|Fs], Pkt); - -request_cb({eval, RC, F}, App, TPid, Caps, Dict0, RecvData, Fs, Pkt) -> - request_cb(RC, App, TPid, Caps, Dict0, RecvData, Fs, Pkt), - diameter_lib:eval(F). - -%% dict/3 - -%% An incoming answer, not yet decoded. -dict(Dict, Dict0, #diameter_packet{header - = #diameter_header{is_request = false, - is_error = E}, - msg = undefined}) -> - if E -> Dict0; true -> Dict end; - -dict(Dict, Dict0, [Msg]) -> - dict(Dict, Dict0, Msg); - -dict(Dict, Dict0, #diameter_packet{msg = Msg}) -> - dict(Dict, Dict0, Msg); - -dict(_Dict, Dict0, ['answer-message' | _]) -> - Dict0; - -dict(Dict, Dict0, Rec) -> - try - 'answer-message' = Dict0:rec2msg(element(1,Rec)), - Dict0 - catch - error:_ -> Dict - end. - -%% protocol_error/6 - -protocol_error(RC, TPid, Caps, Dict0, Fs, Pkt) -> - #diameter_caps{origin_host = {OH,_}, - origin_realm = {OR,_}} - = Caps, - #diameter_packet{avps = Avps, errors = Es} - = Pkt, - - ?LOG({error, RC}, Pkt), - reply(answer_message({OH, OR, RC}, Dict0, Avps), - Dict0, - TPid, - Fs, - Pkt#diameter_packet{errors = [RC | Es]}). -%% Note that reply/5 may set the result code once more. It's set in -%% answer_message/3 in case reply/5 doesn't. - -%% protocol_error/5 - -protocol_error(RC, TPid, Caps, Dict0, Pkt) -> - protocol_error(RC, TPid, Caps, Dict0, [], Pkt). - -%% resend/7 -%% -%% Resend a message as a relay or proxy agent. - -resend(Opts, - #diameter_app{} - = App, - TPid, - #diameter_caps{origin_host = {OH,_}} - = Caps, - Dict0, - RecvData, - Fs, - #diameter_packet{avps = Avps} - = Pkt) -> - {Code, _Flags, Vid} = Dict0:avp_header('Route-Record'), - resend(is_loop(Code, Vid, OH, Dict0, Avps), - Opts, - App, - TPid, - Caps, - Dict0, - RecvData, - Fs, - Pkt). - -%% DIAMETER_LOOP_DETECTED 3005 -%% An agent detected a loop while trying to get the message to the -%% intended recipient. The message MAY be sent to an alternate peer, -%% if one is available, but the peer reporting the error has -%% identified a configuration problem. - -resend(true, _Opts, _App, TPid, Caps, Dict0, _RecvData, Fs, Pkt) -> - protocol_error(3005, TPid, Caps, Dict0, Fs, Pkt); - -%% 6.1.8. Relaying and Proxying Requests -%% -%% A relay or proxy agent MUST append a Route-Record AVP to all requests -%% forwarded. The AVP contains the identity of the peer the request was -%% received from. - -resend(false, - Opts, - App, - TPid, - #diameter_caps{origin_host = {_,OH}} - = Caps, - Dict0, - #recvdata{service_name = SvcName, - sequence = Mask}, - Fs, - #diameter_packet{header = Hdr0, - avps = Avps} - = Pkt) -> - Route = #diameter_avp{data = {Dict0, 'Route-Record', OH}}, - Seq = diameter_session:sequence(Mask), - Hdr = Hdr0#diameter_header{hop_by_hop_id = Seq}, - Msg = [Hdr, Route | Avps], - resend(call(SvcName, App, Msg, Opts), TPid, Caps, Dict0, Fs, Pkt). -%% The incoming request is relayed with the addition of a -%% Route-Record. Note the requirement on the return from call/4 below, -%% which places a requirement on the value returned by the -%% handle_answer callback of the application module in question. -%% -%% Note that there's nothing stopping the request from being relayed -%% back to the sender. A pick_peer callback may want to avoid this but -%% a smart peer might recognize the potential loop and choose another -%% route. A less smart one will probably just relay the request back -%% again and force us to detect the loop. A pick_peer that wants to -%% avoid this can specify filter to avoid the possibility. -%% Eg. {neg, {host, OH} where #diameter_caps{origin_host = {OH, _}}. -%% -%% RFC 6.3 says that a relay agent does not modify Origin-Host but -%% says nothing about a proxy. Assume it should behave the same way. - -%% resend/6 -%% -%% Relay a reply to a relayed request. - -%% Answer from the peer: reset the hop by hop identifier and send. -resend(#diameter_packet{bin = B} - = Pkt, - TPid, - _Caps, - _Dict0, - Fs, - #diameter_packet{header = #diameter_header{hop_by_hop_id = Id}, - transport_data = TD}) -> - P = Pkt#diameter_packet{bin = diameter_codec:hop_by_hop_id(Id, B), - transport_data = TD}, - eval_packet(P, Fs), - send(TPid, P); -%% TODO: counters - -%% Or not: DIAMETER_UNABLE_TO_DELIVER. -resend(_, TPid, Caps, Dict0, Fs, Pkt) -> - protocol_error(3002, TPid, Caps, Dict0, Fs, Pkt). - -%% is_loop/5 -%% -%% Is there a Route-Record AVP with our Origin-Host? - -is_loop(Code, - Vid, - Bin, - _Dict0, - [#diameter_avp{code = Code, vendor_id = Vid, data = Bin} | _]) -> - true; - -is_loop(_, _, _, _, []) -> - false; - -is_loop(Code, Vid, OH, Dict0, [_ | Avps]) - when is_binary(OH) -> - is_loop(Code, Vid, OH, Dict0, Avps); - -is_loop(Code, Vid, OH, Dict0, Avps) -> - is_loop(Code, Vid, Dict0:avp(encode, OH, 'Route-Record'), Dict0, Avps). - -%% reply/5 -%% -%% Send a locally originating reply. - -%% Skip the setting of Result-Code and Failed-AVP's below. This is -%% currently undocumented. -reply([Msg], Dict, TPid, Fs, Pkt) - when is_list(Msg); - is_tuple(Msg) -> - reply(Msg, Dict, TPid, Fs, Pkt#diameter_packet{errors = []}); - -%% No errors or a diameter_header/avp list. -reply(Msg, Dict, TPid, Fs, #diameter_packet{errors = Es} = ReqPkt) - when [] == Es; - is_record(hd(Msg), diameter_header) -> - Pkt = encode(Dict, make_answer_packet(Msg, ReqPkt), Fs), - incr(send, Pkt, Dict, TPid), %% count result codes in sent answers - send(TPid, Pkt); - -%% Or not: set Result-Code and Failed-AVP AVP's. -reply(Msg, Dict, TPid, Fs, #diameter_packet{errors = [H|_] = Es} = Pkt) -> - reply(rc(Msg, rc(H), [A || {_,A} <- Es], Dict), - Dict, - TPid, - Fs, - Pkt#diameter_packet{errors = []}). - -eval_packet(Pkt, Fs) -> - lists:foreach(fun(F) -> diameter_lib:eval([F,Pkt]) end, Fs). - -%% make_answer_packet/2 - -%% A reply message clears the R and T flags and retains the P flag. -%% The E flag will be set at encode. 6.2 of 3588 requires the same P -%% flag on an answer as on the request. A #diameter_packet{} returned -%% from a handle_request callback can circumvent this by setting its -%% own header values. -make_answer_packet(#diameter_packet{header = Hdr, - msg = Msg, - transport_data = TD}, - #diameter_packet{header = ReqHdr}) -> - Hdr0 = ReqHdr#diameter_header{version = ?DIAMETER_VERSION, - is_request = false, - is_error = undefined, - is_retransmitted = false}, - #diameter_packet{header = fold_record(Hdr0, Hdr), - msg = Msg, - transport_data = TD}; - -%% Binaries and header/avp lists are sent as-is. -make_answer_packet(Bin, #diameter_packet{transport_data = TD}) - when is_binary(Bin) -> - #diameter_packet{bin = Bin, - transport_data = TD}; -make_answer_packet([#diameter_header{} | _] = Msg, - #diameter_packet{transport_data = TD}) -> - #diameter_packet{msg = Msg, - transport_data = TD}; - -%% Otherwise, preserve transport_data. -make_answer_packet(Msg, #diameter_packet{transport_data = TD} = Pkt) -> - make_answer_packet(#diameter_packet{msg = Msg, transport_data = TD}, Pkt). - -%% rc/1 - -rc({RC, _}) -> - RC; -rc(RC) -> - RC. - -%% rc/4 - -rc(#diameter_packet{msg = Rec} = Pkt, RC, Failed, DictT) -> - Pkt#diameter_packet{msg = rc(Rec, RC, Failed, DictT)}; - -rc(Rec, RC, Failed, DictT) - when is_integer(RC) -> - set(Rec, - lists:append([rc(Rec, {'Result-Code', RC}, DictT), - failed_avp(Rec, Failed, DictT)]), - DictT). - -%% Reply as name and tuple list ... -set([_|_] = Ans, Avps, _) -> - Ans ++ Avps; %% Values nearer tail take precedence. - -%% ... or record. -set(Rec, Avps, Dict) -> - Dict:'#set-'(Avps, Rec). - -%% rc/3 -%% -%% Turn the result code into a list if its optional and only set it if -%% the arity is 1 or {0,1}. In other cases (which probably shouldn't -%% exist in practise) we can't know what's appropriate. - -rc([MsgName | _], {'Result-Code' = K, RC} = T, Dict) -> - case Dict:avp_arity(MsgName, 'Result-Code') of - 1 -> [T]; - {0,1} -> [{K, [RC]}]; - _ -> [] - end; - -rc(Rec, T, Dict) -> - rc([Dict:rec2msg(element(1, Rec))], T, Dict). - -%% failed_avp/3 - -failed_avp(_, [] = No, _) -> - No; - -failed_avp(Rec, Failed, Dict) -> - [fa(Rec, [{'AVP', Failed}], Dict)]. - -%% Reply as name and tuple list ... -fa([MsgName | Values], FailedAvp, Dict) -> - R = Dict:msg2rec(MsgName), - try - Dict:'#info-'(R, {index, 'Failed-AVP'}), - {'Failed-AVP', [FailedAvp]} - catch - error: _ -> - Avps = proplists:get_value('AVP', Values, []), - A = #diameter_avp{name = 'Failed-AVP', - value = FailedAvp}, - {'AVP', [A|Avps]} - end; - -%% ... or record. -fa(Rec, FailedAvp, Dict) -> - try - {'Failed-AVP', [FailedAvp]} - catch - error: _ -> - Avps = Dict:'get-'('AVP', Rec), - A = #diameter_avp{name = 'Failed-AVP', - value = FailedAvp}, - {'AVP', [A|Avps]} - end. - -%% 3. Diameter Header -%% -%% E(rror) - If set, the message contains a protocol error, -%% and the message will not conform to the ABNF -%% described for this command. Messages with the 'E' -%% bit set are commonly referred to as error -%% messages. This bit MUST NOT be set in request -%% messages. See Section 7.2. - -%% 3.2. Command Code ABNF specification -%% -%% e-bit = ", ERR" -%% ; If present, the 'E' bit in the Command -%% ; Flags is set, indicating that the answer -%% ; message contains a Result-Code AVP in -%% ; the "protocol error" class. - -%% 7.1.3. Protocol Errors -%% -%% Errors that fall within the Protocol Error category SHOULD be treated -%% on a per-hop basis, and Diameter proxies MAY attempt to correct the -%% error, if it is possible. Note that these and only these errors MUST -%% only be used in answer messages whose 'E' bit is set. - -%% Thus, only construct answers to protocol errors. Other errors -%% require an message-specific answer and must be handled by the -%% application. - -%% 6.2. Diameter Answer Processing -%% -%% When a request is locally processed, the following procedures MUST be -%% applied to create the associated answer, in addition to any -%% additional procedures that MAY be discussed in the Diameter -%% application defining the command: -%% -%% - The same Hop-by-Hop identifier in the request is used in the -%% answer. -%% -%% - The local host's identity is encoded in the Origin-Host AVP. -%% -%% - The Destination-Host and Destination-Realm AVPs MUST NOT be -%% present in the answer message. -%% -%% - The Result-Code AVP is added with its value indicating success or -%% failure. -%% -%% - If the Session-Id is present in the request, it MUST be included -%% in the answer. -%% -%% - Any Proxy-Info AVPs in the request MUST be added to the answer -%% message, in the same order they were present in the request. -%% -%% - The 'P' bit is set to the same value as the one in the request. -%% -%% - The same End-to-End identifier in the request is used in the -%% answer. -%% -%% Note that the error messages (see Section 7.3) are also subjected to -%% the above processing rules. - -%% 7.3. Error-Message AVP -%% -%% The Error-Message AVP (AVP Code 281) is of type UTF8String. It MAY -%% accompany a Result-Code AVP as a human readable error message. The -%% Error-Message AVP is not intended to be useful in real-time, and -%% SHOULD NOT be expected to be parsed by network entities. - -%% answer_message/3 - -answer_message({OH, OR, RC}, Dict0, Avps) -> - {Code, _, Vid} = Dict0:avp_header('Session-Id'), - ['answer-message', {'Origin-Host', OH}, - {'Origin-Realm', OR}, - {'Result-Code', RC} - | session_id(Code, Vid, Dict0, Avps)]. - -session_id(Code, Vid, Dict0, Avps) - when is_list(Avps) -> - try - {value, #diameter_avp{data = D}} = find_avp(Code, Vid, Avps), - [{'Session-Id', [Dict0:avp(decode, D, 'Session-Id')]}] - catch - error: _ -> - [] - end. - -%% find_avp/3 - -find_avp(Code, Vid, Avps) - when is_integer(Code), (undefined == Vid orelse is_integer(Vid)) -> - find(fun(A) -> is_avp(Code, Vid, A) end, Avps). - -%% The final argument here could be a list of AVP's, depending on the case, -%% but we're only searching at the top level. -is_avp(Code, Vid, #diameter_avp{code = Code, vendor_id = Vid}) -> - true; -is_avp(_, _, _) -> - false. - -find(_, []) -> - false; -find(Pred, [H|T]) -> - case Pred(H) of - true -> - {value, H}; - false -> - find(Pred, T) - end. - -%% 7. Error Handling -%% -%% There are certain Result-Code AVP application errors that require -%% additional AVPs to be present in the answer. In these cases, the -%% Diameter node that sets the Result-Code AVP to indicate the error -%% MUST add the AVPs. Examples are: -%% -%% - An unrecognized AVP is received with the 'M' bit (Mandatory bit) -%% set, causes an answer to be sent with the Result-Code AVP set to -%% DIAMETER_AVP_UNSUPPORTED, and the Failed-AVP AVP containing the -%% offending AVP. -%% -%% - An AVP that is received with an unrecognized value causes an -%% answer to be returned with the Result-Code AVP set to -%% DIAMETER_INVALID_AVP_VALUE, with the Failed-AVP AVP containing the -%% AVP causing the error. -%% -%% - A command is received with an AVP that is omitted, yet is -%% mandatory according to the command's ABNF. The receiver issues an -%% answer with the Result-Code set to DIAMETER_MISSING_AVP, and -%% creates an AVP with the AVP Code and other fields set as expected -%% in the missing AVP. The created AVP is then added to the Failed- -%% AVP AVP. -%% -%% The Result-Code AVP describes the error that the Diameter node -%% encountered in its processing. In case there are multiple errors, -%% the Diameter node MUST report only the first error it encountered -%% (detected possibly in some implementation dependent order). The -%% specific errors that can be described by this AVP are described in -%% the following section. - -%% 7.5. Failed-AVP AVP -%% -%% The Failed-AVP AVP (AVP Code 279) is of type Grouped and provides -%% debugging information in cases where a request is rejected or not -%% fully processed due to erroneous information in a specific AVP. The -%% value of the Result-Code AVP will provide information on the reason -%% for the Failed-AVP AVP. -%% -%% The possible reasons for this AVP are the presence of an improperly -%% constructed AVP, an unsupported or unrecognized AVP, an invalid AVP -%% value, the omission of a required AVP, the presence of an explicitly -%% excluded AVP (see tables in Section 10), or the presence of two or -%% more occurrences of an AVP which is restricted to 0, 1, or 0-1 -%% occurrences. -%% -%% A Diameter message MAY contain one Failed-AVP AVP, containing the -%% entire AVP that could not be processed successfully. If the failure -%% reason is omission of a required AVP, an AVP with the missing AVP -%% code, the missing vendor id, and a zero filled payload of the minimum -%% required length for the omitted AVP will be added. - -%% --------------------------------------------------------------------------- -%% # handle_answer/3 -%% --------------------------------------------------------------------------- - -%% Process an answer message in call-specific process. - -handle_answer(SvcName, App, {error, Req, Reason}) -> - handle_error(App, Req, Reason, SvcName); - -handle_answer(SvcName, - #diameter_app{dictionary = Dict} - = App, - {answer, Req, Dict0, Pkt}) -> - Mod = dict(Dict, Dict0, Pkt), - answer(examine(diameter_codec:decode(Mod, Pkt)), - SvcName, - Mod, - App, - Req). - -%% We don't really need to do a full decode if we're a relay and will -%% just resend with a new hop by hop identifier, but might a proxy -%% want to examine the answer? - -answer(Pkt, SvcName, Dict, App, #request{transport = TPid} = Req) -> - try - incr(recv, Pkt, Dict, TPid) - of - _ -> answer(Pkt, SvcName, App, Req) - catch - exit: {invalid_error_bit, _} = E -> - answer(Pkt#diameter_packet{errors = [E]}, SvcName, App, Req) - end. - -answer(Pkt, - SvcName, - #diameter_app{module = ModX, - options = [{answer_errors, AE} | _]}, - Req) -> - a(Pkt, SvcName, ModX, AE, Req). - -a(#diameter_packet{errors = Es} - = Pkt, - SvcName, - ModX, - AE, - #request{transport = TPid, - caps = Caps, - packet = P}) - when [] == Es; - callback == AE -> - cb(ModX, handle_answer, [Pkt, msg(P), SvcName, {TPid, Caps}]); - -a(Pkt, SvcName, _, report, Req) -> - x(errors, handle_answer, [SvcName, Req, Pkt]); - -a(Pkt, SvcName, _, discard, Req) -> - x({errors, handle_answer, [SvcName, Req, Pkt]}). - -%% Note that we don't check that the application id in the answer's -%% header is what we expect. (TODO: Does the rfc says anything about -%% this?) - -%% incr/4 -%% -%% Increment a stats counter for an incoming or outgoing message. - -%% Outgoing message as binary: don't yet count. (TODO) -incr(_, #diameter_packet{msg = undefined}, _, _) -> - ok; - -incr(recv = D, #diameter_packet{header = H, errors = [_|_]}, _, TPid) -> - incr(TPid, {diameter_codec:msg_id(H), D, error}); - -incr(Dir, Pkt, Dict, TPid) -> - #diameter_packet{header = #diameter_header{is_error = E} - = Hdr, - msg = Rec} - = Pkt, - - RC = int(get_avp_value(Dict, 'Result-Code', Rec)), - PE = is_protocol_error(RC), - - %% Check that the E bit is set only for 3xxx result codes. - (not (E orelse PE)) - orelse (E andalso PE) - orelse x({invalid_error_bit, RC}, answer, [Dir, Pkt]), - - irc(TPid, Hdr, Dir, rc_counter(Dict, Rec, RC)). - -irc(_, _, _, undefined) -> - false; - -irc(TPid, Hdr, Dir, Ctr) -> - incr(TPid, {diameter_codec:msg_id(Hdr), Dir, Ctr}). - -%% incr/2 - -incr(TPid, Counter) -> - diameter_stats:incr(Counter, TPid, 1). - -%% error_counter/2 - -%% RFC 3588, 7.6: -%% -%% All Diameter answer messages defined in vendor-specific -%% applications MUST include either one Result-Code AVP or one -%% Experimental-Result AVP. -%% -%% Maintain statistics assuming one or the other, not both, which is -%% surely the intent of the RFC. - -rc_counter(Dict, Rec, undefined) -> - er(get_avp_value(Dict, 'Experimental-Result', Rec)); -rc_counter(_, _, RC) -> - {'Result-Code', RC}. - -%% Outgoing answers may be in any of the forms messages can be sent -%% in. Incoming messages will be records. We're assuming here that the -%% arity of the result code AVP's is 0 or 1. - -er([{_,_,N} = T | _]) - when is_integer(N) -> - T; -er({_,_,N} = T) - when is_integer(N) -> - T; -er(_) -> - undefined. - -%% Extract the first good looking integer. There's no guarantee -%% that what we're looking for has arity 1. -int([N|_]) - when is_integer(N) -> - N; -int(N) - when is_integer(N) -> - N; -int(_) -> - undefined. - -is_protocol_error(RC) -> - 3000 =< RC andalso RC < 4000. - --spec x(any(), atom(), list()) -> no_return(). - -%% Warn and exit request process on errors in an incoming answer. -x(Reason, F, A) -> - diameter_lib:warning_report(Reason, {?MODULE, F, A}), - x(Reason). - -x(T) -> - exit(T). - -%% --------------------------------------------------------------------------- -%% # failover/1-2 -%% --------------------------------------------------------------------------- - -%% Failover as a consequence of request_peer_down/1: inform the -%% request process. -failover({_, Req, TRef}) -> - #request{handler = Pid, - packet = #diameter_packet{msg = M}} - = Req, - M /= undefined andalso (Pid ! {failover, TRef}). -%% Failover is not performed when msg = binary() since sending -%% pre-encoded binaries is only partially supported. (Mostly for -%% test.) - -%% Failover as a consequence of store_request/4. -failover(TRef, Seqs) - when is_reference(TRef) -> - case lookup_request(Seqs, TRef) of - #request{} = Req -> - failover({Seqs, Req, TRef}); - false -> - ok - end. - -%% --------------------------------------------------------------------------- %% # report_status/5 %% --------------------------------------------------------------------------- @@ -2693,152 +1301,19 @@ rpd(Pid, Alias, PDict) -> ?Dict:update(Alias, fun(Ps) -> lists:keydelete(Pid, 1, Ps) end, PDict). %% --------------------------------------------------------------------------- -%% find_transport/4 -%% -%% Return: {TransportPid, #diameter_caps{}, #diameter_app{}} -%% | false -%% | {error, Reason} -%% --------------------------------------------------------------------------- - -%% Initial call, from an arbitrary process. -find_transport({alias, Alias}, - Msg, - Opts, - #state{service = #diameter_service{applications = Apps}} - = S) -> - find_transport(find_send_app(Alias, Apps), Msg, Opts, S); - -%% Relay or proxy send. -find_transport(#diameter_app{module = ModX, dictionary = Dict} - = App, - Msg, - #options{filter = Filter, - extra = Xtra}, - S) -> - pick_peer(App#diameter_app{module = ModX ++ Xtra}, - get_destination(Dict, Msg), - Filter, - S); - -%% Retransmission after failover. -find_transport(#request{packet = #diameter_packet{msg = Msg}}, - #diameter_app{dictionary = Dict} - = App, - #options{filter = Filter}, - S) - when Msg /= undefined -> %% retransmission of binaries is unsupported - pick_peer(App, - get_destination(Dict, Msg), - Filter, - S); - -find_transport(_, _, _, _) -> - false. - -find_send_app(Alias, Apps) -> - case find_app(Alias, Apps) of - #diameter_app{id = ?APP_ID_RELAY} -> - false; - #diameter_app{} = A -> - A; - false = No -> - No - end. - -%% get_destination/2 - -get_destination(Dict, Msg) -> - [str(get_avp_value(Dict, 'Destination-Realm', Msg)), - str(get_avp_value(Dict, 'Destination-Host', Msg))]. - -%% This is not entirely correct. The avp could have an arity 1, in -%% which case an empty list is a DiameterIdentity of length 0 rather -%% than the list of no values we treat it as by mapping to undefined. -%% This behaviour is documented. -str([]) -> - undefined; -str(T) -> - T. - -%% get_avp_value/3 -%% -%% Find an AVP in a message of one of three forms: -%% -%% - a message record (as generated from a .dia spec) or -%% - a list of an atom message name followed by 2-tuple, avp name/value pairs. -%% - a list of a #diameter_header{} followed by #diameter_avp{} records, -%% -%% In the first two forms a dictionary module is used at encode to -%% identify the type of the AVP and its arity in the message in -%% question. The third form allows messages to be sent as is, without -%% a dictionary, which is needed in the case of relay agents, for one. - -%% Messages will be header/avps list as a relay and the only AVP's we -%% look for are in the common dictionary. This is required since the -%% relay dictionary doesn't inherit the common dictionary (which maybe -%% it should). -get_avp_value(?RELAY, Name, Msg) -> - get_avp_value(?BASE, Name, Msg); - -%% Message sent as a header/avps list, probably a relay case but not -%% necessarily. -get_avp_value(Dict, Name, [#diameter_header{} | Avps]) -> - try - {Code, _, VId} = Dict:avp_header(Name), - [A|_] = lists:dropwhile(fun(#diameter_avp{code = C, vendor_id = V}) -> - C /= Code orelse V /= VId - end, - Avps), - avp_decode(Dict, Name, A) - catch - error: _ -> - undefined - end; - -%% Outgoing message as a name/values list. -get_avp_value(_, Name, [_MsgName | Avps]) -> - case lists:keyfind(Name, 1, Avps) of - {_, V} -> - V; - _ -> - undefined - end; - -%% Message is typically a record but not necessarily. -get_avp_value(Dict, Name, Rec) -> - try - Dict:'#get-'(Name, Rec) - catch - error:_ -> - undefined - end. - -avp_decode(Dict, Name, #diameter_avp{value = undefined, - data = Bin}) -> - Dict:avp(decode, Bin, Name); -avp_decode(_, _, #diameter_avp{value = V}) -> - V. - -%% --------------------------------------------------------------------------- -%% # pick_peer/4 -%% -%% Return: {TransportPid, #diameter_caps{}, App} -%% | false -%% | {error, Reason} +%% pick_peer/4 %% --------------------------------------------------------------------------- -%% Find transports to a given realm/host. - pick_peer(#diameter_app{alias = Alias} = App, - [_Realm ,_Host] = RH, + RealmAndHost, Filter, #state{local_peers = L, shared_peers = S, service_name = SvcName, service = #diameter_service{pid = Pid}}) -> - pick_peer(peers(Alias, RH, Filter, L), - peers(Alias, RH, Filter, S), + pick_peer(peers(Alias, RealmAndHost, Filter, L), + peers(Alias, RealmAndHost, Filter, S), Pid, SvcName, App). @@ -2851,7 +1326,12 @@ pick_peer([], [], _, _, _) -> %% App state is mutable but we're not in the service process: go there. pick_peer(Local, Remote, Pid, _SvcName, #diameter_app{mutable = true} = App) when self() /= Pid -> - call_service(Pid, {pick_peer, Local, Remote, App}); + case call_service(Pid, {pick_peer, Local, Remote, App}) of + {TPid, _} = T when is_pid(TPid) -> + T; + {error, _} -> + false + end; %% App state isn't mutable or it is and we're in the service process: %% do the deed. @@ -2859,19 +1339,18 @@ pick_peer(Local, Remote, _Pid, SvcName, - #diameter_app{module = ModX, - alias = Alias, + #diameter_app{alias = Alias, init_state = S, mutable = M} = App) -> - MFA = {ModX, pick_peer, [Local, Remote, SvcName]}, + Args = [Local, Remote, SvcName], - try state_cb(App, MFA) of - {ok, {TPid, #diameter_caps{} = Caps}} when is_pid(TPid) -> - {TPid, Caps, App}; - {{TPid, #diameter_caps{} = Caps}, ModS} when is_pid(TPid), M -> + try state_cb(App, pick_peer, Args) of + {ok, {TPid, #diameter_caps{}} = T} when is_pid(TPid) -> + T; + {{TPid, #diameter_caps{}} = T, ModS} when is_pid(TPid), M -> mod_state(Alias, ModS), - {TPid, Caps, App}; + T; {false = No, ModS} when M -> mod_state(Alias, ModS), No; @@ -2879,15 +1358,17 @@ pick_peer(Local, No; false = No -> No; - {{TPid, #diameter_caps{} = Caps}, S} when is_pid(TPid) -> - {TPid, Caps, App}; %% Accept returned state in the immutable + {{TPid, #diameter_caps{}} = T, S} when is_pid(TPid) -> + T; %% Accept returned state in the immutable {false = No, S} -> %% case as long it isn't changed. No; T -> - diameter_lib:error_report({invalid, T, App}, MFA) + diameter_lib:error_report({invalid, T, App}, + {App, pick_peer, Args}) catch E: Reason -> - diameter_lib:error_report({failure, {E, Reason, ?STACK}}, MFA) + diameter_lib:error_report({failure, {E, Reason, ?STACK}}, + {App, pick_peer, Args}) end. %% peers/4 @@ -3258,22 +1739,7 @@ mk_app(#diameter_app{} = A) -> %% One entry for each outgoing request whose answer is outstanding. info_pending(#state{} = S) -> - MatchSpec = [{{'$1', - #request{caller = '$2', - handler = '$3', - transport = '$4', - _ = '_'}, - '_'}, - [?ORCOND([{'==', T, '$2'} || T <- transports(S)])], - [{{'$1', [{{caller, '$2'}}, - {{handler, '$3'}}, - {{transport, '$4'}}]}}]}], - - try - ets:select(?REQUEST_TABLE, MatchSpec) - catch - error: badarg -> [] %% service has gone down - end. + diameter_traffic:pending(transports(S)). %% info_connections/1 %% |