%% %% %CopyrightBegin% %% %% Copyright Ericsson AB 2013-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %% %% %CopyrightEnd% %% %% %% Implements the handling of incoming and outgoing Diameter messages %% except CER/CEA, DWR/DWA and DPR/DPA. That is, the messages that a %% diameter client sends and receives. %% -module(diameter_traffic). %% towards diameter -export([send_request/4]). %% towards diameter_watchdog -export([receive_message/5]). %% towards diameter_peer_fsm and diameter_watchdog -export([incr/4, incr_error/4, incr_rc/4]). %% towards diameter_service -export([make_recvdata/1, peer_up/1, peer_down/1]). %% internal -export([send/1, %% send from remote node init/1]). %% monitor process start -include_lib("diameter/include/diameter.hrl"). -include("diameter_internal.hrl"). -define(LOGX(Reason, T), begin ?LOG(Reason, T), x({Reason, T}) end). -define(RELAY, ?DIAMETER_DICT_RELAY). -define(BASE, ?DIAMETER_DICT_COMMON). %% Note: the RFC 3588 dictionary -define(DEFAULT(V, Def), if V == undefined -> Def; true -> V end). %% Table containing outgoing entries that live and die with %% peer_up/down. The name is historic, since the table used to contain %% information about outgoing requests for which an answer has yet to %% be received. -define(REQUEST_TABLE, diameter_request). %% Record diameter:call/4 options are parsed into. 
-record(options,
        {peers = []     :: [diameter:peer_ref()],
         filter = none  :: diameter:peer_filter(),
         extra = []     :: list(),
         timeout = 5000 :: 0..16#FFFFFFFF,  %% for outgoing requests
         detach = false :: boolean()}).

%% Term passed back to receive_message/5 with every incoming message.
%%
%% The codec map holds exactly the service options that
%% make_recvdata/1 copies out of the service options map, so the key
%% list in the type below must be kept in step with the maps:with/2
%% call there.
-record(recvdata,
        {peerT        :: ets:tid(),
         service_name :: diameter:service_name(),
         apps         :: [#diameter_app{}],
         sequence     :: diameter:sequence(),
         codec        :: #{record_decode := boolean() | map | list,
                           string_decode := boolean(),
                           strict_mbit := boolean(),
                           %% NOTE(review): value assumed to be a
                           %% boolean, like the other flags - confirm
                           %% against the service option handling.
                           ordered_encode := boolean(),
                           incoming_maxlen := diameter:message_length()}}).
%% Note that incoming_maxlen is currently handled in diameter_peer_fsm,
%% so that any message exceeding the maximum is discarded. Retain the
%% option in case we want to extend the values and semantics.

%% Record stored in diameter_request for each outgoing request.
-record(request,
        {ref     :: reference(),         %% used to receive answer
         caller  :: pid() | undefined,   %% calling process
         handler :: pid(),               %% request process
         peer    :: undefined | {pid(), #diameter_caps{}},
         packet  :: #diameter_packet{} | undefined}).  %% of request

%% ---------------------------------------------------------------------------
%% make_recvdata/1
%% ---------------------------------------------------------------------------

%% Build the {SpawnOpts, #recvdata{}} pair that is later handed back
%% with every incoming message. The argument is a list whose first
%% four elements are the service name, the peer table, the configured
%% applications and the service options map; any further elements are
%% ignored. The options map must contain sequence (a 2-tuple) and
%% spawn_opt, otherwise the match below fails.

make_recvdata([SvcName, PeerT, Apps, SvcOpts | _]) ->
    #{sequence := {_,_} = Mask, spawn_opt := Opts}
        = SvcOpts,
    {Opts, #recvdata{service_name = SvcName,
                     peerT = PeerT,
                     apps = Apps,
                     sequence = Mask,
                     codec = maps:with([record_decode,
                                        string_decode,
                                        strict_mbit,
                                        ordered_encode,
                                        incoming_maxlen],
                                       SvcOpts)}}.

%% ---------------------------------------------------------------------------
%% peer_up/1
%% ---------------------------------------------------------------------------

%% Start a process that dies with peer_down/1, on which request
%% processes can monitor.
%% There is no other process that dies with
%% peer_down since failover doesn't imply the loss of transport in the
%% case of a watchdog transition into state SUSPECT.

%% Start the monitor process for transport TPid. The return value is
%% whatever init/1 passes to proc_lib:init_ack/1: the pid of the new
%% process.
peer_up(TPid) ->
    proc_lib:start(?MODULE, init, [TPid]).

%% Entry point of the monitor process: register {TPid, self()} in the
%% request table, acknowledge the start and hibernate. Any message
%% wakes the process, which then exits with reason {shutdown, TPid}.
init(TPid) ->
    ets:insert(?REQUEST_TABLE, {TPid, self()}),
    proc_lib:init_ack(self()),
    proc_lib:hibernate(erlang, exit, [{shutdown, TPid}]).

%% ---------------------------------------------------------------------------
%% peer_down/1
%% ---------------------------------------------------------------------------

%% Remove the table entry inserted by init/1 and wake the monitor
%% process so that it exits. Returns the pid of that process. The
%% lookup asserts that exactly one entry exists for TPid.
peer_down(TPid) ->
    [{_, Pid}] = ets:lookup(?REQUEST_TABLE, TPid),
    ets:delete(?REQUEST_TABLE, TPid),
    Pid ! ok,  %% make it die
    Pid.

%% ---------------------------------------------------------------------------
%% incr/4
%% ---------------------------------------------------------------------------

%% Count a sent or received message, given either the packet or its
%% header. The counter key is {msg_id(Header, AppDict), Dir}.
incr(Dir, #diameter_packet{header = H}, TPid, AppDict) ->
    incr(Dir, H, TPid, AppDict);

incr(Dir, #diameter_header{} = H, TPid, AppDict) ->
    incr(TPid, {msg_id(H, AppDict), Dir}).

%% ---------------------------------------------------------------------------
%% incr_error/4
%% ---------------------------------------------------------------------------

%% Identify messages using the application dictionary, not the encode
%% dictionary, which may differ in the case of answer-message.
incr_error(Dir, T, Pid, {_MsgDict, AppDict}) ->
    incr_error(Dir, T, Pid, AppDict);

%% Decoded message without errors.
incr_error(recv, #diameter_packet{errors = []}, _, _) ->
    ok;

incr_error(recv = D, #diameter_packet{header = H}, TPid, AppDict) ->
    incr_error(D, H, TPid, AppDict);

%% Encoded message with errors and an identifiable header ...
incr_error(send = D, {_, _, #diameter_header{} = H}, TPid, AppDict) ->
    incr_error(D, H, TPid, AppDict);

%% ... or not.
incr_error(send = D, {_,_}, TPid, _) ->
    incr_error(D, unknown, TPid);

incr_error(Dir, #diameter_header{} = H, TPid, AppDict) ->
    incr_error(Dir, msg_id(H, AppDict), TPid);

incr_error(Dir, Id, TPid, _) ->
    incr_error(Dir, Id, TPid).

%% incr_error/3
%%
%% Increment the {Id, Dir, error} counter for the transport.
incr_error(Dir, Id, TPid) ->
    incr(TPid, {Id, Dir, error}).

%% ---------------------------------------------------------------------------
%% incr_rc/4
%% ---------------------------------------------------------------------------

%% Count the result code of an answer. The return value is the counter
%% that was incremented (as returned by incr_result/4), or an atom
%% when nothing was counted as a result code: no_result_code or
%% invalid_error_bit when incr_result/4 exits with such a reason (in
%% which case that atom is counted instead), or whatever atom
%% incr_result/4 returns for a message it declines to count.

-spec incr_rc(send|recv, Pkt, TPid, DictT)
   -> Counter
    | Reason
 when Pkt :: #diameter_packet{},
      TPid :: pid(),
      DictT :: module() | {MsgDict :: module(),
                           AppDict :: module(),
                           CommonDict:: module()},
      Counter :: {'Result-Code', integer()}
               | {'Experimental-Result', integer(), integer()},
      Reason :: atom().

incr_rc(Dir, Pkt, TPid, {_, AppDict, _} = DictT) ->
    try
        incr_result(Dir, Pkt, TPid, DictT)
    catch
        exit: {E,_} when E == no_result_code;
                         E == invalid_error_bit ->
            incr(TPid, {msg_id(Pkt#diameter_packet.header, AppDict), Dir, E}),
            E
    end;

%% A single dictionary serves as message, application and common
%% dictionary alike.
incr_rc(Dir, Pkt, TPid, Dict0) ->
    incr_rc(Dir, Pkt, TPid, {Dict0, Dict0, Dict0}).

%% ---------------------------------------------------------------------------
%% receive_message/5
%%
%% Handle an incoming Diameter message in a watchdog process.
%% ---------------------------------------------------------------------------

%% The first element of RecvData is passed to spawn_request/6, which
%% accepts either a spawn options list or an {M,F,A}.

-spec receive_message(pid(), Route, #diameter_packet{}, module(), RecvData)
   -> pid()     %% request handler
    | boolean() %% answer, known request or not
    | discard   %% request discarded by MFA
 when Route :: {Handler, RequestRef, Seqs}
             | Ack,
      RecvData :: {[SpawnOpt] | {module(), atom(), list()}, #recvdata{}},
      SpawnOpt :: term(),
      Handler :: pid(),
      RequestRef :: reference(),
      Seqs :: {0..16#FFFFFFFF, 0..16#FFFFFFFF},
      Ack :: boolean().

receive_message(TPid, Route, Pkt, Dict0, RecvData) ->
    #diameter_packet{header = #diameter_header{is_request = R}} = Pkt,
    recv(R, Route, TPid, Pkt, Dict0, RecvData).

%% recv/6

%% Incoming request ...
%% A request: split the receive data into its spawn specification and
%% #recvdata{} and start a handler for the message.
recv(true, Ack, TPid, Pkt, Dict0, T)
  when is_boolean(Ack) ->
    {SpawnT, RecvData} = T,
    spawn_request(Ack, TPid, Pkt, Dict0, RecvData, SpawnT);

%% An answer to a request we know of: forward it to the process named
%% in the route.
%%
%% Note that failover could have happened prior to this message being
%% received and triggering failback. That is, both a failover message
%% and answer may be on their way to the handler process. In the worst
%% case the request process gets notification of the failover and
%% sends to the alternate peer before an answer arrives, so it's
%% always the case that we can receive more than one answer after
%% failover. The first answer received by the request process wins,
%% any others are discarded.
recv(false, {Handler, Ref, Via}, _, Pkt, Dict0, _) ->
    Handler ! {answer, Ref, Via, Dict0, Pkt},
    true;

%% An answer to nothing we know of: log, count and drop it.
recv(false, false, TPid, Pkt, _, _) ->
    ?LOG(discarded, Pkt#diameter_packet.header),
    incr(TPid, {{unknown, 0}, recv, discarded}),
    false.

%% spawn_request/6

%% With an MFA, the function is applied to a fun that handles the
%% request, prepended to the configured arguments. It should return a
%% pid() or the atom 'discard'. The latter results in an
%% acknowledgment back to the transport process when appropriate, to
%% ensure that send/recv callbacks can count outstanding requests.
%% Acknowledgement is implicit if the handler process dies (in a
%% handle_request callback for example).

spawn_request(Ack, TPid, Pkt, Dict0, RecvData, {Mod, Fun, Args}) ->
    Handle = fun() ->
                     Result = recv_request(Ack, TPid, Pkt, Dict0, RecvData),
                     ack(Ack, TPid, Result)
             end,
    Started = apply(Mod, Fun, [Handle | Args]),
    ack(Ack, TPid, Started);

%% Otherwise the last argument is a spawn options list. A spawned
%% process acks implicitly when it dies, so there's no need to handle
%% 'discard'.
spawn_request(Ack, TPid, Pkt, Dict0, RecvData, SpawnOpts) ->
    Handle = fun() -> recv_request(Ack, TPid, Pkt, Dict0, RecvData) end,
    spawn_opt(Handle, SpawnOpts).

%% ack/3

%% Tell the transport process that a request was discarded, but only
%% if acknowledgements were requested. RC is returned unchanged.

ack(Ack, TPid, RC) ->
    if RC == discard, Ack ->
            TPid ! {send, false};
       true ->
            ok
    end,
    RC.
%% ---------------------------------------------------------------------------
%% recv_request/5
%% ---------------------------------------------------------------------------

%% Handle an incoming request in the calling process: look up the
%% application the request is addressed to, decode and count the
%% message, and send an answer unless the request is discarded.

-spec recv_request(Ack :: boolean(),
                   TPid :: pid(),
                   #diameter_packet{},
                   Dict0 :: module(),
                   #recvdata{})
   -> ok        %% answer was sent
    | discard   %% or not
    | false.    %% no transport

recv_request(Ack,
             TPid,
             #diameter_packet{header = #diameter_header{application_id = Id}}
             = Pkt,
             Dict0,
             #recvdata{peerT = PeerT,
                       apps = Apps}
             = RecvData) ->
    %% Identify ourselves to the transport process if it asked for
    %% acknowledgements.
    Ack andalso (TPid ! {handler, self()}),
    case diameter_service:find_incoming_app(PeerT, TPid, Id, Apps) of
        {#diameter_app{id = Aid, dictionary = AppDict} = App, Caps} ->
            %% Count the message, then any decode errors, before
            %% deciding how to answer.
            incr(recv, Pkt, TPid, AppDict),
            DecPkt = decode(Aid, AppDict, RecvData, Pkt),
            incr_error(recv, DecPkt, TPid, AppDict),
            send_A(recv_R(App, TPid, Dict0, Caps, RecvData, DecPkt),
                   TPid,
                   App,
                   Dict0,
                   RecvData,
                   DecPkt,
                   Caps);
        #diameter_caps{} = Caps ->
            %% DIAMETER_APPLICATION_UNSUPPORTED   3007
            %%    A request was sent for an application that is not
            %%    supported.
            RC = 3007,
            Es = Pkt#diameter_packet.errors,
            DecPkt = Pkt#diameter_packet{avps = collect_avps(Pkt),
                                         errors = [RC | Es]},
            %% The common dictionary stands in for both the message
            %% and application dictionary; [[]] means no eval
            %% functions of either kind.
            send_answer(answer_message(RC, Dict0, Caps, DecPkt),
                        TPid,
                        Dict0,
                        Dict0,
                        Dict0,
                        RecvData,
                        DecPkt,
                        [[]]);
        false = No ->  %% transport has gone down
            No
    end.

%% decode/4
%%
%% Decode with the codec options from the receive data, then look for
%% further errors in the decoded packet with errors/2.

decode(Id, Dict, #recvdata{codec = Opts}, Pkt) ->
    errors(Id, diameter_codec:decode(Id, Dict, Opts, Pkt)).

%% collect_avps/1
%%
%% Return just the AVP list, whether or not diameter_codec paired it
%% with an error.

collect_avps(Pkt) ->
    case diameter_codec:collect_avps(Pkt) of
        {_Error, Avps} ->
            Avps;
        Avps ->
            Avps
    end.

%% send_A/7

%% Split the list returned by recv_R/6 into the action to take and
%% the eval functions, or pass on 'discard' without sending anything.

send_A([T | Fs], TPid, App, Dict0, RecvData, DecPkt, Caps) ->
    send_A(T, TPid, App, Dict0, RecvData, DecPkt, Caps, Fs);

send_A(discard = No, _, _, _, _, _, _) ->
    No.

%% recv_R/6

%% Answer errors ourselves ...
%% The guard only accepts the head of the errors list: with
%% request_errors = answer, any error when the common dictionary
%% isn't ?BASE, otherwise only 3xxx; with answer_3xxx, only 3xxx.
recv_R(#diameter_app{options = [_, {request_errors, E} | _]},
       _TPid,
       Dict0,
       _Caps,
       _RecvData,
       #diameter_packet{errors = [RC|_]})  %% a detected 3xxx is hd
  when E == answer, Dict0 /= ?BASE orelse 3 == RC div 1000;
       E == answer_3xxx, 3 == RC div 1000 ->
    [{answer_message, rc(RC)}, []];

%% ... or make a handle_request callback. Note that
%% Pkt#diameter_packet.msg = undefined in the 3001 case.
recv_R(App,
       TPid,
       _Dict0,
       Caps,
       #recvdata{service_name = SvcName},
       Pkt) ->
    request_cb(cb(App, handle_request, [Pkt, SvcName, {TPid, Caps}]),
               App,
               [],
               []).

%% rc/1
%%
%% The result code of an errors list element, which is either a bare
%% code or a 2-tuple with the code as first element.

rc({N,_}) ->
    N;
rc(N) ->
    N.

%% errors/2
%%
%% Look for additional errors in a decoded message, prepending the
%% errors field with the first detected error. It's odd/unfortunate
%% that 501[15] aren't protocol errors. With RFC 3588 this means that
%% a handle_request callback has to formulate the answer. With RFC
%% 6733 it's acceptable for 5xxx to be sent in an answer-message.

%%   DIAMETER_INVALID_MESSAGE_LENGTH 5015
%%      This error is returned when a request is received with an invalid
%%      message length.

errors(_, #diameter_packet{header = #diameter_header{length = Len} = H,
                           bin = Bin,
                           errors = Es}
          = Pkt)
  when Len < 20;
       0 /= Len rem 4;
       8*Len /= bit_size(Bin) ->
    ?LOG(invalid_message_length, {H, bit_size(Bin)}),
    Pkt#diameter_packet{errors = [5015 | Es]};

%%   DIAMETER_UNSUPPORTED_VERSION       5011
%%      This error is returned when a request was received, whose version
%%      number is unsupported.

errors(_, #diameter_packet{header = #diameter_header{version = V} = H,
                           errors = Es}
          = Pkt)
  when V /= ?DIAMETER_VERSION ->
    ?LOG(unsupported_version, H),
    Pkt#diameter_packet{errors = [5011 | Es]};

%%   DIAMETER_COMMAND_UNSUPPORTED       3001
%%      The Request contained a Command-Code that the receiver did not
%%      recognize or support.  This MUST be used when a Diameter node
%%      receives an experimental command that it does not understand.

errors(Id, #diameter_packet{header = #diameter_header{is_proxiable = P} = H,
                            msg = M,
                            errors = Es}
           = Pkt)
  when ?APP_ID_RELAY /= Id, undefined == M;  %% don't know the command
       ?APP_ID_RELAY == Id, not P ->         %% command isn't proxiable
    ?LOG(command_unsupported, H),
    Pkt#diameter_packet{errors = [3001 | Es]};

%%   DIAMETER_INVALID_HDR_BITS          3008
%%      A request was received whose bits in the Diameter header were
%%      either set to an invalid combination, or to a value that is
%%      inconsistent with the command code's definition.

errors(_, #diameter_packet{header = #diameter_header{is_request = true,
                                                     is_error = true}
                                    = H,
                           errors = Es}
          = Pkt) ->
    ?LOG(invalid_hdr_bits, H),
    Pkt#diameter_packet{errors = [3008 | Es]};

%% Green.
errors(_, Pkt) ->
    Pkt.

%% request_cb/4

%% Translate the return value of a handle_request callback into
%% either 'discard' or a list [Action, EvalPktFs | EvalFs], in which
%% Action is {reply, Ans}, {answer_message, RC} or {call, Opts}. The
%% eval_packet and eval wrappers are peeled off one at a time, their
%% functions accumulating in the last two arguments.

%% A reply may be an answer-message, constructed either here or by
%% the handle_request callback. The header from the incoming request
%% is passed into the encode so that it can retrieve the relevant
%% command code in this case. It will also then ignore Dict and use
%% the base encoder.
request_cb({reply, _Ans} = T, _App, EvalPktFs, EvalFs) ->
    [T, EvalPktFs | EvalFs];

%% A 3xxx result code, for which the E-bit is set in the header.
request_cb({protocol_error, RC}, _App, EvalPktFs, EvalFs)
  when 3 == RC div 1000 ->
    [{answer_message, RC}, EvalPktFs | EvalFs];

request_cb({answer_message, RC} = T, _App, EvalPktFs, EvalFs)
  when 3 == RC div 1000;
       5 == RC div 1000 ->
    [T, EvalPktFs | EvalFs];

%% RFC 3588 says we must reply 3001 to anything unrecognized or
%% unsupported. 'noreply' is undocumented (and inappropriately named)
%% backwards compatibility for this, protocol_error the documented
%% alternative.
request_cb(noreply, _App, EvalPktFs, EvalFs) ->
    [{answer_message, 3001}, EvalPktFs | EvalFs];

%% Relay a request to another peer. This is equivalent to doing an
%% explicit call/4 with the message in question except that (1) a loop
%% will be detected by examining Route-Record AVP's, (2) a
%% Route-Record AVP will be added to the outgoing request and (3) the
%% End-to-End Identifier will default to that in the
%% #diameter_header{} without the need for an end_to_end_identifier
%% option.
%%
%% relay and proxy are similar in that they require the same handling
%% with respect to Route-Record and End-to-End identifier. The
%% difference is that a proxy advertises specific applications, while
%% a relay advertises the relay application. If a callback doesn't
%% want to distinguish between the cases in the callback return value
%% then 'resend' is a neutral alternative.
%%
request_cb({A, Opts}, #diameter_app{id = Id}, EvalPktFs, EvalFs)
  when A == relay, Id == ?APP_ID_RELAY;
       A == proxy, Id /= ?APP_ID_RELAY;
       A == resend ->
    [{call, Opts}, EvalPktFs | EvalFs];

request_cb(discard = No, _, _, _) ->
    No;

request_cb({eval_packet, RC, F}, App, Fs, EvalFs) ->
    request_cb(RC, App, [F|Fs], EvalFs);

request_cb({eval, RC, F}, App, EvalPktFs, Fs) ->
    request_cb(RC, App, EvalPktFs, [F|Fs]);

%% Anything else is an invalid callback return value.
request_cb(T, App, _, _) ->
    ?ERROR({invalid_return, T, handle_request, App}).
%% send_A/8
%%
%% Act on the first element of a handle_request result: send the
%% callback's own reply, resend the request to another peer and pass
%% back what comes of it, or answer with an answer-message.

%% The callback supplied the answer. Its form decides whether it's
%% encoded with the application or the common dictionary.
send_A({reply, Reply}, TPid, App, Dict0, RecvData, ReqPkt, _Caps, Fs) ->
    AppDict = App#diameter_app.dictionary,
    send_answer(Reply,
                TPid,
                msg_dict(AppDict, Dict0, Reply),
                AppDict,
                Dict0,
                RecvData,
                ReqPkt,
                Fs);

%% The request is to be relayed or proxied.
send_A({call, Opts}, TPid, App, Dict0, RecvData, ReqPkt, Caps, Fs) ->
    AppDict = App#diameter_app.dictionary,
    case resend(Opts, Caps, ReqPkt, App, Dict0, RecvData) of
        #diameter_packet{bin = AnsBin} = AnsPkt ->
            %% An answer came back: restore the Hop-by-Hop Identifier
            %% and transport data of the incoming request before
            %% passing the binary on.
            #diameter_packet{header = #diameter_header{hop_by_hop_id = HopId},
                             transport_data = TD}
                = ReqPkt,
            Restored = diameter_codec:hop_by_hop_id(HopId, AnsBin),
            MsgDict = msg_dict(AppDict, Dict0, AnsPkt),
            Outgoing = AnsPkt#diameter_packet{bin = Restored,
                                              transport_data = TD},
            send_answer(Outgoing, TPid, MsgDict, AppDict, Dict0, Fs);
        ResultCode ->
            %% No answer: reply with an answer-message ourselves.
            ErrAns = answer_message(ResultCode, Dict0, Caps, ReqPkt),
            send_answer(ErrAns,
                        TPid,
                        Dict0,
                        AppDict,
                        Dict0,
                        RecvData,
                        ReqPkt,
                        Fs)
    end;

%% RFC 3588 only allows 3xxx errors in an answer-message. RFC 6733
%% added the possibility of setting 5xxx, so anything but 3xxx is an
%% invalid return value when the common dictionary is the RFC 3588
%% one.
send_A({answer_message, RC} = T, TPid, App, Dict0, RecvData, ReqPkt, Caps, Fs) ->
    case Dict0 /= ?BASE orelse 3 == RC div 1000 of
        true ->
            ok;
        false ->
            ?ERROR({invalid_return, T, handle_request, App})
    end,
    ErrAns = answer_message(RC, Dict0, Caps, ReqPkt),
    AppDict = App#diameter_app.dictionary,
    send_answer(ErrAns, TPid, Dict0, AppDict, Dict0, RecvData, ReqPkt, Fs).

%% send_answer/8

%% An answer wrapped in a list is sent without the setting of
%% Result-Code and Failed-AVP's that decode errors would otherwise
%% cause, which is done by emptying the errors list of the request
%% packet. This is undocumented and shouldn't be relied on.
send_answer([Ans], TPid, MsgDict, AppDict, Dict0, RecvData, ReqPkt, Fs) ->
    send_answer(Ans,
                TPid,
                MsgDict,
                AppDict,
                Dict0,
                RecvData,
                ReqPkt#diameter_packet{errors = []},
                Fs);

%% Build the answer packet from the answer and the decoded request,
%% encode it and send the result.
send_answer(Ans, TPid, MsgDict, AppDict, Dict0, RecvData, ReqPkt, Fs) ->
    Codec = RecvData#recvdata.codec,
    AnsPkt = make_answer_packet(Ans, ReqPkt, MsgDict, Dict0),
    Encoded = encode({MsgDict, AppDict}, TPid, Codec, AnsPkt),
    send_answer(Encoded, TPid, MsgDict, AppDict, Dict0, Fs).
%% send_answer/6

%% Send an encoded answer. The last argument is the list of
%% eval_packet functions followed by the eval functions: the former
%% are evaluated on the packet before it's counted and sent, the
%% latter after.

send_answer(Pkt, TPid, MsgDict, AppDict, Dict0, [EvalPktFs | EvalFs]) ->
    eval_packet(Pkt, EvalPktFs),
    incr(send, Pkt, TPid, AppDict),
    incr_rc(send, Pkt, TPid, {MsgDict, AppDict, Dict0}),  %% count outgoing
    send(TPid, z(Pkt), _Route = self()),
    lists:foreach(fun diameter_lib:eval/1, EvalFs).

%% msg_dict/3
%%
%% Return the dictionary defining the message grammar in question: the
%% application dictionary or the common dictionary.

%% An answer wrapped in a list is unwrapped first.
msg_dict(AppDict, Dict0, [Msg]) ->
    msg_dict(AppDict, Dict0, Msg);

msg_dict(AppDict, Dict0, Msg) ->
    choose(is_answer_message(Msg, Dict0), Dict0, AppDict).

%% is_answer_message/2
%%
%% Whether or not a message, in any of its representations, is an
%% answer-message.

%% Incoming, not yet decoded.
is_answer_message(#diameter_packet{header = #diameter_header{} = H,
                                   msg = undefined},
                  Dict0) ->
    is_answer_message([H], Dict0);

is_answer_message(#diameter_packet{msg = Msg}, Dict0) ->
    is_answer_message(Msg, Dict0);

%% Message sent as a header/avps list.
is_answer_message([#diameter_header{is_request = R, is_error = E} | _], _) ->
    E andalso not R;

%% Message sent as a tagged avp/value list.
is_answer_message([Name | _], _) ->
    Name == 'answer-message';

%% Message sent as a map.
is_answer_message(Map, _)
  when is_map(Map) ->
    #{':name' := Name} = Map,
    Name == 'answer-message';

%% Message sent as a record. Any error in mapping the record name to
%% a message name means the answer is no.
is_answer_message(Rec, Dict) ->
    try
        'answer-message' == Dict:rec2msg(element(1,Rec))
    catch
        error:_ -> false
    end.

%% resend/6

%% Resend an incoming request to another peer. Returns the answer
%% packet or an integer result code.

resend(Opts, Caps, Pkt, App, Dict0, RecvData) ->
    resend(is_loop(Dict0, Caps, Pkt), Opts, Caps, Pkt, App, Dict0, RecvData).

%% resend/7

%% DIAMETER_LOOP_DETECTED             3005
%%    An agent detected a loop while trying to get the message to the
%%    intended recipient.  The message MAY be sent to an alternate peer,
%%    if one is available, but the peer reporting the error has
%%    identified a configuration problem.

resend(true, _Opts, _Caps, _Pkt, _App, _Dict0, _RecvData) ->
    3005;

%% 6.1.8.  Relaying and Proxying Requests
%%
%%   A relay or proxy agent MUST append a Route-Record AVP to all requests
%%   forwarded.  The AVP contains the identity of the peer the request was
%%   received from.

resend(false,
       Opts,
       #diameter_caps{origin_host = {_,OH}},
       #diameter_packet{header = Hdr0,
                        avps = Avps},
       App,
       Dict0,
       #recvdata{service_name = SvcName,
                 sequence = Mask}) ->
    Route = #diameter_avp{data = {Dict0, 'Route-Record', OH}},
    %% The outgoing request gets a Hop-by-Hop Identifier of our own.
    Seq = diameter_session:sequence(Mask),
    Hdr = Hdr0#diameter_header{hop_by_hop_id = Seq},
    Msg = [Hdr, Route | Avps],  %% reordered at encode
    case send_request(SvcName, App, Msg, Opts) of
        #diameter_packet{} = Ans ->
            Ans;
        _ ->
            3002  %% DIAMETER_UNABLE_TO_DELIVER.
    end.
%% The incoming request is relayed with the addition of a
%% Route-Record. Note the requirement on the return from call/4 below,
%% which places a requirement on the value returned by the
%% handle_answer callback of the application module in question.
%%
%% Note that there's nothing stopping the request from being relayed
%% back to the sender. A pick_peer callback may want to avoid this but
%% a smart peer might recognize the potential loop and choose another
%% route. A less smart one will probably just relay the request back
%% again and force us to detect the loop. A pick_peer that wants to
%% avoid this can specify filter to avoid the possibility.
%% Eg. {neg, {host, OH}} where #diameter_caps{origin_host = {OH, _}}.
%%
%% RFC 6.3 says that a relay agent does not modify Origin-Host but
%% says nothing about a proxy. Assume it should behave the same way.

%% is_loop/3

%% Look up the code and vendor id of Route-Record in the common
%% dictionary and search the AVPs of the request for our own
%% Origin-Host.

is_loop(Dict0,
        #diameter_caps{origin_host = {OH,_}},
        #diameter_packet{avps = Avps}) ->
    {Code, _Flags, Vid} = Dict0:avp_header('Route-Record'),
    is_loop(Code, Vid, OH, Avps).

%% is_loop/4
%%
%% Is there a Route-Record AVP with our Origin-Host?

is_loop(Code, Vid, Bin, [#diameter_avp{code = Code,
                                       vendor_id = Vid,
                                       data = Bin}
                         | _]) ->
    true;

is_loop(_, _, _, []) ->
    false;

is_loop(Code, Vid, OH, [_ | Avps])
  when is_binary(OH) ->
    is_loop(Code, Vid, OH, Avps);

%% An Origin-Host that isn't already a binary is converted to one
%% before the AVP list is examined again.
is_loop(Code, Vid, OH, Avps) ->
    is_loop(Code, Vid, list_to_binary(OH), Avps).
%% select_error/3
%%
%% Walk an errors list and return a list headed by the first entry,
%% RC or {RC, #diameter_avp{}}, whose result code is appropriate for
%% the E-bit in question, followed by any #diameter_avp{} entries
%% that preceded it in the errors list (most recent first). If no
%% entry is appropriate then only the AVPs are returned.
%%
%% RFC 6733:
%%
%%  7.5.  Failed-AVP AVP
%%
%%   The Failed-AVP AVP (AVP Code 279) is of type Grouped and provides
%%   debugging information in cases where a request is rejected or not
%%   fully processed due to erroneous information in a specific AVP.  The
%%   value of the Result-Code AVP will provide information on the reason
%%   for the Failed-AVP AVP.  A Diameter answer message SHOULD contain an
%%   instance of the Failed-AVP AVP that corresponds to the error
%%   indicated by the Result-Code AVP.  For practical purposes, this
%%   Failed-AVP would typically refer to the first AVP processing error
%%   that a Diameter node encounters.
%%
%% 3xxx can only be set in an answer setting the E-bit. RFC 6733 also
%% allows 5xxx, RFC 3588 doesn't.

select_error(E, Es, Dict0) ->
    select(E, Es, Dict0, []).

%% select/4

%% Errors exhausted: only the collected AVPs remain.
select(_, [], _, Acc) ->
    Acc;

%% A result code paired with something, typically the offending AVP.
select(E, [{RC, _} = Pair | Rest], Dict0, Acc) ->
    select(E, RC, Pair, Rest, Dict0, Acc);

%% A lone AVP: collect it and keep looking.
select(E, [#diameter_avp{} = Avp | Rest], Dict0, Acc) ->
    select(E, Rest, Dict0, [Avp | Acc]);

%% A bare result code.
select(E, [RC | Rest], Dict0, Acc) ->
    select(E, RC, RC, Rest, Dict0, Acc).

%% select/6

%% Accept the candidate if its result code suits the E-bit, or move
%% on to the remaining errors if not.
select(E, RC, Candidate, Rest, Dict0, Acc) ->
    if E, 3000 =< RC, RC < 4000;                 %% E-bit with 3xxx
       E, ?BASE /= Dict0, 5000 =< RC, RC < 6000; %% E-bit with 5xxx
       not E, RC < 3000 orelse 4000 =< RC ->     %% no E-bit
            [Candidate | Acc];
       true ->
            select(E, Rest, Dict0, Acc)
    end.

%% eval_packet/2

%% Evaluate each function in turn with the packet as additional
%% argument.
eval_packet(Pkt, Fs) ->
    Eval = fun(F) -> diameter_lib:eval([F,Pkt]) end,
    lists:foreach(Eval, Fs).

%% make_answer_packet/4

%% Use decode errors to set Result-Code and/or Failed-AVP unless the
%% errors field has been explicitly set. Unfortunately, the default
%% value is the empty list rather than 'undefined' so use the atom
%% 'false' for "set nothing". (This is historical and changing the
%% default value would impact anyone relying on the old default.)
%% The answer is itself a packet: its header fields are folded over
%% those derived from the request header, and its errors take
%% precedence over those of the request unless empty (see reset/5).
make_answer_packet(#diameter_packet{header = Hdr,
                                    msg = Msg,
                                    errors = Es,
                                    transport_data = TD},
                   #diameter_packet{header = Hdr0,
                                    errors = Es0},
                   MsgDict,
                   Dict0) ->
    #diameter_packet{header = make_answer_header(Hdr0, Hdr),
                     msg = reset(Msg, Es0, Es, MsgDict, Dict0),
                     transport_data = TD};

%% Binaries and header/avp lists are sent as-is.
make_answer_packet(Bin, #diameter_packet{transport_data = TD}, _, _)
  when is_binary(Bin) ->
    #diameter_packet{bin = Bin,
                     transport_data = TD};
make_answer_packet([#diameter_header{} | _] = Msg,
                   #diameter_packet{transport_data = TD},
                   _,
                   _) ->
    #diameter_packet{msg = Msg,
                     transport_data = TD};

%% Any other answer is a message to which the errors and transport
%% data of the request apply.
make_answer_packet(Msg,
                   #diameter_packet{header = Hdr,
                                    errors = Es,
                                    transport_data = TD},
                   MsgDict,
                   Dict0) ->
    #diameter_packet{header = make_answer_header(Hdr, undefined),
                     msg = reset(Msg, [], Es, MsgDict, Dict0),
                     transport_data = TD}.

%% make_answer_header/2

%% A reply message clears the R and T flags and retains the P flag.
%% The E flag will be set at encode. 6.2 of 3588 requires the same P
%% flag on an answer as on the request. A #diameter_packet{} returned
%% from a handle_request callback can circumvent this by setting its
%% own header values.
make_answer_header(ReqHdr, Hdr) ->
    Hdr0 = ReqHdr#diameter_header{version = ?DIAMETER_VERSION,
                                  is_request = false,
                                  is_error = undefined,
                                  is_retransmitted = false},
    fold_record(Hdr0, Hdr).

%% reset/5

%% The second argument is the errors list of the request, the third
%% that of the answer. The former is only used if the latter is the
%% empty list, and the atom 'false' as the latter leaves the message
%% untouched.

reset(Msg, [_|_] = Es0, [] = Es, MsgDict, Dict0) ->
    reset(Msg, Es, Es0, MsgDict, Dict0);

reset(Msg, _, Es, _, _)
  when Es == false;
       Es == [] ->
    Msg;

reset(Msg, _, Es, MsgDict, Dict0) ->
    E = is_answer_message(Msg, Dict0),
    reset(Msg, select_error(E, Es, Dict0), choose(E, Dict0, MsgDict)).

%% reset/3
%%
%% Set Result-Code and/or Failed-AVP (maybe). Only RC and {RC, AVP}
%% are the result of decode. AVP or {RC, [AVP]} can be set in an
%% answer for encode, as a convenience for injecting additional AVPs
%% into Failed-AVP; eg. 5001 = DIAMETER_AVP_UNSUPPORTED.
%% Nothing selected: the message is left as is.
reset(Msg, [], _) ->
    Msg;

%% {RC, [AVP]} and {RC, AVP} are flattened into RC followed by AVPs.
reset(Msg, [{RC, As} | Avps], Dict)
  when is_list(As) ->
    reset(Msg, [RC | As ++ Avps], Dict);

reset(Msg, [{RC, Avp} | Avps], Dict) ->
    reset(Msg, [RC, Avp | Avps], Dict);

%% Only AVPs: set Failed-AVP but not Result-Code.
reset(Msg, [#diameter_avp{} | _] = Avps, Dict) ->
    set(Msg, failed_avp(Msg, Avps, Dict), Dict);

reset(Msg, [RC | Avps], Dict) ->
    set(Msg, rc(Msg, RC, Dict) ++ failed_avp(Msg, Avps, Dict), Dict).

%% set/3

%% Set the given name/value pairs in a message.

%% Reply as name and tuple list ...
set([_|_] = Ans, Avps, _) ->
    Ans ++ Avps;  %% Values nearer tail take precedence.

%% ... a map ...
set(Ans, Avps, _)
  when is_map(Ans) ->
    maps:merge(Ans, maps:from_list(Avps));

%% ... or record.
set(Rec, Avps, Dict) ->
    Dict:'#set-'(Avps, Rec).

%% rc/3
%%
%% Turn the result code into a list if it's optional and only set it if
%% the arity is 1 or {0,1}. In other cases (which probably shouldn't
%% exist in practice) we can't know what's appropriate.

rc([MsgName | _], RC, Dict) ->
    K = 'Result-Code',
    case Dict:avp_arity(MsgName, K) of
        1     -> [{K, RC}];
        {0,1} -> [{K, [RC]}];
        _     -> []
    end;

rc(#{':name' := Name}, RC, Dict) ->
    rc([Name], RC, Dict);

rc(Rec, RC, Dict) ->
    rc([Dict:rec2msg(element(1, Rec))], RC, Dict).

%% failed_avp/3

%% Return a list of at most one name/value pair with which to set the
%% given AVPs as failed in the message.

failed_avp(_, [] = No, _) ->
    No;

failed_avp(Msg, [_|_] = Avps, Dict) ->
    [failed(Msg, [{'AVP', Avps}], Dict)].

%% failed/3

%% Set Failed-AVP directly if the message in question has such a
%% field, or add it to the 'AVP' field of the message if not.

failed(Msg, FailedAvp, Dict) ->
    RecName = msg2rec(Msg, Dict),
    try
        Dict:'#info-'(RecName, {index, 'Failed-AVP'}),  %% assert existence
        {'Failed-AVP', [FailedAvp]}
    catch
        error: _ ->
            Avps = values(Msg, 'AVP', Dict),
            A = #diameter_avp{name = 'Failed-AVP',
                              value = FailedAvp},
            {'AVP', [A|Avps]}
    end.

%% msg2rec/2

%% Message as name/values list ...
msg2rec([MsgName | _], Dict) ->
    Dict:msg2rec(MsgName);

%% ... map ...
msg2rec(#{':name' := MsgName}, Dict) ->
    Dict:msg2rec(MsgName);

%% ... or record.
msg2rec(Rec, _) ->
    element(1, Rec).

%% values/3

%% Return the value of field F in a message, or the empty list if a
%% name/values list or map doesn't contain it.

%% Message as name/values list ...
values([_ | Avps], F, _) ->
    proplists:get_value(F, Avps, []);

%% ... map ...
values(Msg, F, _)
  when is_map(Msg) ->
    maps:get(F, Msg, []);

%% ... or record.
values(Rec, F, Dict) ->
    Dict:'#get-'(F, Rec).

%% 3.  Diameter Header
%%
%%       E(rror)     - If set, the message contains a protocol error,
%%                     and the message will not conform to the ABNF
%%                     described for this command.  Messages with the 'E'
%%                     bit set are commonly referred to as error
%%                     messages.  This bit MUST NOT be set in request
%%                     messages.  See Section 7.2.

%% 3.2.  Command Code ABNF specification
%%
%%    e-bit            = ", ERR"
%%                       ; If present, the 'E' bit in the Command
%%                       ; Flags is set, indicating that the answer
%%                       ; message contains a Result-Code AVP in
%%                       ; the "protocol error" class.

%% 7.1.3.  Protocol Errors
%%
%%    Errors that fall within the Protocol Error category SHOULD be treated
%%    on a per-hop basis, and Diameter proxies MAY attempt to correct the
%%    error, if it is possible.  Note that these and only these errors MUST
%%    only be used in answer messages whose 'E' bit is set.

%% Thus, only construct answers to protocol errors. Other errors
%% require a message-specific answer and must be handled by the
%% application.

%% 6.2.  Diameter Answer Processing
%%
%%    When a request is locally processed, the following procedures MUST be
%%    applied to create the associated answer, in addition to any
%%    additional procedures that MAY be discussed in the Diameter
%%    application defining the command:
%%
%%    -  The same Hop-by-Hop identifier in the request is used in the
%%       answer.
%%
%%    -  The local host's identity is encoded in the Origin-Host AVP.
%%
%%    -  The Destination-Host and Destination-Realm AVPs MUST NOT be
%%       present in the answer message.
%%
%%    -  The Result-Code AVP is added with its value indicating success or
%%       failure.
%%
%%    -  If the Session-Id is present in the request, it MUST be included
%%       in the answer.
%%
%%    -  Any Proxy-Info AVPs in the request MUST be added to the answer
%%       message, in the same order they were present in the request.
%%
%%    -  The 'P' bit is set to the same value as the one in the request.
%%
%%    -  The same End-to-End identifier in the request is used in the
%%       answer.
%%
%%    Note that the error messages (see Section 7.3) are also subjected to
%%    the above processing rules.

%% 7.3.  Error-Message AVP
%%
%%    The Error-Message AVP (AVP Code 281) is of type UTF8String.  It MAY
%%    accompany a Result-Code AVP as a human readable error message.  The
%%    Error-Message AVP is not intended to be useful in real-time, and
%%    SHOULD NOT be expected to be parsed by network entities.

%% answer_message/4

%% Construct an answer-message as a name/values list: our own
%% Origin-Host and Origin-Realm and the result code, followed by the
%% Session-Id of the request if one is found among its AVPs, and by
%% a Failed-AVP if the errors of the request pair the result code
%% with an AVP.

answer_message(RC,
               Dict0,
               #diameter_caps{origin_host = {Host,_},
                              origin_realm = {Realm,_}},
               #diameter_packet{avps = ReqAvps,
                                errors = Errors}) ->
    {SidCode, _, SidVendor} = Dict0:avp_header('Session-Id'),
    Fixed = ['answer-message', {'Origin-Host', Host},
                               {'Origin-Realm', Realm},
                               {'Result-Code', RC}],
    Session = session_id(SidCode, SidVendor, ReqAvps),
    Failed = failed_avp(RC, Errors),
    Fixed ++ Session ++ Failed.

%% session_id/3

%% The Session-Id value as a list of at most one name/values pair.
%% Failing to find an AVP with the given code and vendor id, or
%% finding something other than a lone #diameter_avp{}, results in
%% the empty list.

session_id(Code, Vid, Avps)
  when is_list(Avps) ->
    try find_avp(Code, Vid, Avps) of
        #diameter_avp{data = Data} ->
            [{'Session-Id', [Data]}];
        _ ->
            []
    catch
        error: _ ->
            []
    end.

%% failed_avp/2

%% The first AVP that the errors list pairs with exactly the given
%% result code, as a Failed-AVP value. Note that this should only
%% match 5xxx result codes currently but don't bother distinguishing
%% this case.
failed_avp(_, [] = None) ->
    None;
failed_avp(RC, [{RC, Avp} | _]) ->
    [{'Failed-AVP', [{'AVP', [Avp]}]}];
failed_avp(RC, [_ | Rest]) ->
    failed_avp(RC, Rest).

%% find_avp/3

%% Return the first element of an AVP list that has the given code
%% and vendor id. There is no clause for the empty list: not finding
%% the AVP is an error that the caller is expected to deal with.

%% A list element that is itself a list is matched on its head and
%% returned whole ...
find_avp(Code, VId, [[#diameter_avp{code = Code, vendor_id = VId} | _] = Group
                     | _]) ->
    Group;

%% ... while a lone AVP is returned as is.
find_avp(Code, VId, [#diameter_avp{code = Code, vendor_id = VId} = Avp | _]) ->
    Avp;

find_avp(Code, VId, [_ | Rest]) ->
    find_avp(Code, VId, Rest).

%% 7.  Error Handling
%%
%%    There are certain Result-Code AVP application errors that require
%%    additional AVPs to be present in the answer.  In these cases, the
%%    Diameter node that sets the Result-Code AVP to indicate the error
%%    MUST add the AVPs.
%%    Examples are:
%%
%%    -  An unrecognized AVP is received with the 'M' bit (Mandatory bit)
%%       set, causes an answer to be sent with the Result-Code AVP set to
%%       DIAMETER_AVP_UNSUPPORTED, and the Failed-AVP AVP containing the
%%       offending AVP.
%%
%%    -  An AVP that is received with an unrecognized value causes an
%%       answer to be returned with the Result-Code AVP set to
%%       DIAMETER_INVALID_AVP_VALUE, with the Failed-AVP AVP containing the
%%       AVP causing the error.
%%
%%    -  A command is received with an AVP that is omitted, yet is
%%       mandatory according to the command's ABNF.  The receiver issues an
%%       answer with the Result-Code set to DIAMETER_MISSING_AVP, and
%%       creates an AVP with the AVP Code and other fields set as expected
%%       in the missing AVP.  The created AVP is then added to the Failed-
%%       AVP AVP.
%%
%%    The Result-Code AVP describes the error that the Diameter node
%%    encountered in its processing.  In case there are multiple errors,
%%    the Diameter node MUST report only the first error it encountered
%%    (detected possibly in some implementation dependent order).  The
%%    specific errors that can be described by this AVP are described in
%%    the following section.

%% 7.5.  Failed-AVP AVP
%%
%%    The Failed-AVP AVP (AVP Code 279) is of type Grouped and provides
%%    debugging information in cases where a request is rejected or not
%%    fully processed due to erroneous information in a specific AVP.  The
%%    value of the Result-Code AVP will provide information on the reason
%%    for the Failed-AVP AVP.
%%
%%    The possible reasons for this AVP are the presence of an improperly
%%    constructed AVP, an unsupported or unrecognized AVP, an invalid AVP
%%    value, the omission of a required AVP, the presence of an explicitly
%%    excluded AVP (see tables in Section 10), or the presence of two or
%%    more occurrences of an AVP which is restricted to 0, 1, or 0-1
%%    occurrences.
%%
%%   A Diameter message MAY contain one Failed-AVP AVP, containing the
%%   entire AVP that could not be processed successfully.  If the failure
%%   reason is omission of a required AVP, an AVP with the missing AVP
%%   code, the missing vendor id, and a zero filled payload of the minimum
%%   required length for the omitted AVP will be added.

%% incr_result/4
%%
%% Increment a stats counter for result codes in incoming and outgoing
%% answers.
%%
%% Arguments are the direction (send | recv), the packet, the transport
%% pid the counter is keyed on, and a {MsgDict, AppDict, Dict0} tuple
%% of dictionary modules. Returns the counter that was incremented, or
%% undefined for an outgoing binary. Exits with {no_result_code, _} or
%% {invalid_error_bit, _} as described at incr_res/4.

%% Message sent as a header/avps list.
incr_result(send = Dir,
            #diameter_packet{msg = [#diameter_header{} = H | _]}
            = Pkt,
            TPid,
            DictT) ->
    %% The header lives in the message itself: copy it into the packet
    %% so that incr_res/4 can match on it.
    incr_res(Dir, Pkt#diameter_packet{header = H}, TPid, DictT);

%% Outgoing message as binary: don't count. (Sending binaries is only
%% partially supported.)
incr_result(send, #diameter_packet{header = undefined = No}, _, _) ->
    No;

%% Incoming or outgoing. Outgoing with encode errors never gets here
%% since encode fails.
incr_result(Dir, Pkt, TPid, DictT) ->
    incr_res(Dir, Pkt, TPid, DictT).

%% incr_res/4
%%
%% Do the counting for incr_result/4 on a packet with a header. The
%% calling process exits, by way of ?LOGX, if no result code can be
%% extracted from the message or if the result code is inconsistent
%% with the header's E-bit according to is_result/3.

incr_res(Dir,
         #diameter_packet{header = #diameter_header{is_error = E}
                                   = Hdr,
                          errors = Es}
         = Pkt,
         TPid,
         DictT) ->
    {MsgDict, AppDict, Dict0} = DictT,

    Id = msg_id(Hdr, AppDict),
    %% Could be {relay, 0}, in which case the R-bit is redundant since
    %% only answers are being counted. Let it be however, so that the
    %% same tuple is in both send/recv and result code counters.

    %% Count incoming decode errors.
    recv /= Dir orelse [] == Es orelse incr_error(Dir, Id, TPid, AppDict),

    %% Exit on a missing result code.
    T = rc_counter(MsgDict, Dir, Pkt),
    T == false andalso ?LOGX(no_result_code, {MsgDict, Dir, Hdr}),
    {Ctr, RC, Avp} = T,

    %% Or on an inappropriate value.
    is_result(RC, E, Dict0)
        orelse ?LOGX(invalid_error_bit, {MsgDict, Dir, Hdr, Avp}),

    incr(TPid, {Id, Dir, Ctr}),
    Ctr.
%% msg_id/2
%%
%% Map a packet or header to the identifier used as (part of) a
%% counter key.

msg_id(#diameter_packet{header = Hdr}, AppDict) ->
    msg_id(Hdr, AppDict);

%% Only count on known keys so as not to be vulnerable to attack:
%% there are 2^32 (application ids) * 2^24 (command codes) = 2^56
%% pairs for an attacker to choose from.
msg_id(Hdr, AppDict) ->
    Id = diameter_codec:msg_id(Hdr),
    {AppId, CmdCode, RBit} = Id,
    case AppDict:id() of
        ?APP_ID_RELAY ->
            {relay, RBit};
        DictAppId ->
            IsUnknown = (DictAppId /= AppId
                         orelse '' == AppDict:msg_name(CmdCode, 0 == RBit)),
            unknown(IsUnknown, Id)
    end.

%% Replace the identifier of an unknown message by {unknown, R},
%% retaining only its R-bit.
unknown(false, Id) ->
    Id;
unknown(true, {_, _, RBit}) ->
    {unknown, RBit}.

%% is_result/3
%%
%% Whether a result code is acceptable given the E-bit of the header
%% and the common dictionary in use.

%% No E-bit: can't be 3xxx.
is_result(RC, false, _Dict0) ->
    4000 =< RC orelse RC < 3000;

%% E-bit in RFC 3588: only 3xxx.
is_result(RC, true, ?BASE) ->
    RC < 4000 andalso 3000 =< RC;

%% E-bit in RFC 6733: 3xxx or 5xxx.
is_result(RC, true, _) ->
    (3000 =< RC andalso RC < 4000)
        orelse (5000 =< RC andalso RC < 6000).

%% incr/2
%%
%% Bump a counter by one for the given transport process.

incr(TPid, Counter) ->
    diameter_stats:incr(Counter, TPid, 1).

%% rc_counter/3

%% RFC 3588, 7.6:
%%
%%   All Diameter answer messages defined in vendor-specific
%%   applications MUST include either one Result-Code AVP or one
%%   Experimental-Result AVP.

%% Look for the result in the header/avps list for a decoded incoming
%% or relayed outgoing message, in the msg field otherwise.
rc_counter(Dict, Dir, #diameter_packet{header = Hdr,
                                       avps = Avps,
                                       msg = Msg})
  when Dir == recv;        %% decoded incoming
       Msg == undefined -> %% relayed outgoing
    rc_counter(Dict, [Hdr | Avps]);

rc_counter(Dict, _, #diameter_packet{msg = Msg}) ->
    rc_counter(Dict, Msg).

%% rc_counter/2

rc_counter(Dict, Msg) ->
    rcc(get_result(Dict, Msg)).

%% rcc/1
%%
%% Turn a result AVP into {Counter, ResultCode, Avp}, or return false
%% if the argument isn't a Result-Code or Experimental-Result AVP with
%% an integer code. The value can be bare or the head of a list.

rcc(#diameter_avp{name = Name, value = Value} = Avp) ->
    case {Name, Value} of
        {'Result-Code', N} when is_integer(N) ->
            {{Name, N}, N, Avp};
        {'Result-Code', [N | _]} when is_integer(N) ->
            {{Name, N}, N, Avp};
        {'Experimental-Result', {_, _, N} = ER} when is_integer(N) ->
            {ER, N, Avp};
        {'Experimental-Result', [{_, _, N} = ER | _]} when is_integer(N) ->
            {ER, N, Avp};
        _ ->
            false
    end;
rcc(_) ->
    false.
%% get_result/2
%%
%% Return the Result-Code AVP of a message if get_avp/3 finds one, the
%% Experimental-Result AVP if not, and [] if neither is found.

get_result(Dict, Msg) ->
    case get_avp(Dict, 'Result-Code', Msg) of
        #diameter_avp{} = ResultCode ->
            ResultCode;
        _ ->
            case get_avp(Dict, 'Experimental-Result', Msg) of
                #diameter_avp{} = Experimental ->
                    Experimental;
                _ ->
                    []
            end
    end.

%% Terminate the calling process with the given reason.
x(Reason) ->
    exit(Reason).

%% ---------------------------------------------------------------------------
%% send_request/4
%%
%% Handle an outgoing Diameter request.
%% ---------------------------------------------------------------------------

%% The request is handled in a spawned and monitored process that
%% exits with {Ref, Result}: the result is extracted from the 'DOWN'
%% message in recv_A/4.
send_request(SvcName, AppOrAlias, Msg, Options)
  when is_list(Options) ->
    CallOpts = make_options(Options),
    Ref = make_ref(),
    Self = self(),
    Worker = fun() ->
                     Res = send_R(SvcName,
                                  AppOrAlias,
                                  Msg,
                                  CallOpts,
                                  {Self, Ref}),
                     exit({Ref, Res})
             end,
    try spawn_monitor(Worker) of
        {_, MRef} ->
            recv_A(MRef, Ref, CallOpts#options.detach, false)
    catch
        error: system_limit = Reason ->
            {error, Reason}
    end.
%% The R in send_R is because Diameter requests are usually given short
%% names of the form XXR. (eg. CER, DWR, etc.) Similarly, answers have
%% names of the form XXA.

%% Don't rely on gen_server:call/3 for the timeout handling since it
%% makes no guarantees about not leaving a reply message in the
%% mailbox if we catch its exit at timeout. It currently *can* do so,
%% which is also undocumented.

%% recv_A/4
%%
%% Wait for the request process. A detached caller returns ok as soon
%% as a send has been attempted; otherwise wait for the process to
%% terminate, remembering whether or not a send was attempted.

recv_A(MRef, _, true, true) ->
    erlang:demonitor(MRef, [flush]),
    ok;

recv_A(MRef, Ref, Detach, Sent) ->
    receive
        {'DOWN', MRef, process, _, Reason} ->
            answer_rc(Reason, Ref, Sent);
        Ref ->  %% send has been attempted
            recv_A(MRef, Ref, Detach, true)
    end.

%% send_R/5 has returned ...
answer_rc({Ref, Answer}, Ref, _) ->
    Answer;

%% ... or not. Note that failure/encode are documented return values.
answer_rc(_, _, Sent) ->
    Reason = choose(Sent, failure, encode),
    {error, Reason}.

%% send_R/5
%%
%% In the process spawned for the outgoing request.

send_R(SvcName, AppOrAlias, Msg, CallOpts, Caller) ->
    Picked = pick_peer(SvcName, AppOrAlias, Msg, CallOpts),
    case Picked of
        {{_, _} = Transport, SvcOpts} ->
            send_request(Transport, SvcOpts, Msg, CallOpts, Caller, SvcName);
        {error, _} ->
            Picked
    end.
%% make_options/1
%%
%% Parse the options list of an outgoing request into an #options{}
%% record, starting from the defaults given here.

make_options(Options) ->
    make_opts(Options,
              _Peers = [],
              _Detach = false,
              _Extra = [],
              _Filter = none,
              _Timeout = 5000).

%% Do our own recursion since this is faster than a lists:foldl/3
%% setting elements in an #options{} accumulator.

%% make_opts/6
%%
%% Peers are accumulated in reverse and reversed at the end, extra
%% arguments are appended in the order given, and multiple filters are
%% combined into a single {all, Filters}. An unrecognized or malformed
%% option raises an error through ?ERROR.

make_opts([], Ps, Det, Xs, Flt, T) ->
    #options{peers = lists:reverse(Ps),
             detach = Det,
             extra = Xs,
             filter = Flt,
             timeout = T};

make_opts([{timeout, T} | Opts], Ps, Det, Xs, Flt, _)
  when is_integer(T), 0 =< T ->
    make_opts(Opts, Ps, Det, Xs, Flt, T);

make_opts([{filter, F} | Opts], Ps, Det, Xs, Flt0, T) ->
    Flt = case Flt0 of
              none ->
                  F;
              {all, Fs} ->
                  {all, [F | Fs]};
              F0 ->
                  {all, [F0, F]}
          end,
    make_opts(Opts, Ps, Det, Xs, Flt, T);

make_opts([{extra, More} | Opts], Ps, Det, Xs, Flt, T)
  when is_list(More) ->
    make_opts(Opts, Ps, Det, Xs ++ More, Flt, T);

make_opts([detach | Opts], Ps, _, Xs, Flt, T) ->
    make_opts(Opts, Ps, true, Xs, Flt, T);

make_opts([{peer, TPid} | Opts], Ps, Det, Xs, Flt, T)
  when is_pid(TPid) ->
    make_opts(Opts, [TPid | Ps], Det, Xs, Flt, T);

make_opts([Bad | _], _, _, _, _, _) ->
    ?ERROR({invalid_option, Bad}).

%% ---------------------------------------------------------------------------
%% send_request/6
%% ---------------------------------------------------------------------------

%% Send an outgoing request in its dedicated process.
%%
%% Note that both the encode of the outgoing request and the decode of
%% the received answer happen in this process. It's also this process
%% that replies to the caller. The service process only handles the
%% state-retaining callbacks.
%%
%% The module field of the #diameter_app{} here includes any extra
%% arguments passed to diameter:call/4.
send_request({{TPid, _Caps} = TC, App} = Transport,
             #{sequence := Mask} = SvcOpts,
             Msg0,
             CallOpts,
             Caller,
             SvcName) ->
    Pkt0 = make_prepare_packet(Mask, Msg0),
    Prepared = prepare(cb(App, prepare_request, [Pkt0, SvcName, TC]), []),

    case Prepared of
        [Msg | EvalFs] ->
            %% Encode, let any eval_packet funs see the encoded packet,
            %% send, and then wait for and handle the answer.
            ReqPkt = make_request_packet(Msg, Pkt0),
            Dict = App#diameter_app.dictionary,
            EncPkt = encode(Dict, TPid, SvcOpts, ReqPkt),
            eval_packet(EncPkt, EvalFs),
            Sent = send_R(ReqPkt,
                          EncPkt,
                          Transport,
                          CallOpts,
                          Caller,
                          SvcName),
            Answer = recv_answer(SvcName, App, CallOpts, Sent),
            handle_answer(SvcName, SvcOpts, App, Answer);
        {discard, Why} ->
            {error, Why};
        discard ->
            {error, discarded};
        {error, Invalid} ->
            ?ERROR({invalid_return, Invalid, prepare_request, App})
    end.

%% prepare/2
%%
%% Normalize the return value of a prepare_request or
%% prepare_retransmit callback. A message to send is returned as the
%% head of a list whose tail is the accumulated eval_packet funs,
%% innermost first. Discards are returned as is and anything else is
%% tagged as an error.

prepare({send, Msg}, EvalFs) ->
    [Msg | EvalFs];

prepare({eval_packet, Inner, EvalF}, EvalFs) ->
    prepare(Inner, [EvalF | EvalFs]);

prepare({discard, _Reason} = Discard, _) ->
    Discard;

prepare(discard = Discard, _) ->
    Discard;

prepare(Other, _) ->
    {error, Other}.

%% make_prepare_packet/2
%%
%% Turn an outgoing request as passed to call/4 into a diameter_packet
%% record in preparation for a prepare_request callback.

%% An already encoded binary: only decode its header.
make_prepare_packet(_, Bin)
  when is_binary(Bin) ->
    Hdr = diameter_codec:decode_header(Bin),
    #diameter_packet{header = Hdr,
                     bin = Bin};

%% A packet whose message is a header/avps list.
make_prepare_packet(Mask,
                    #diameter_packet{msg = [#diameter_header{} = Hdr0 | Avps]}
                    = Pkt) ->
    Hdr = make_prepare_header(Mask, Hdr0),
    Pkt#diameter_packet{msg = [Hdr | Avps]};

%% Any other packet: prepare the header field.
make_prepare_packet(Mask, #diameter_packet{header = Hdr0} = Pkt) ->
    Hdr = make_prepare_header(Mask, Hdr0),
    Pkt#diameter_packet{header = Hdr};

%% A bare header/avps list.
make_prepare_packet(Mask, [#diameter_header{} = Hdr0 | Avps]) ->
    Hdr = make_prepare_header(Mask, Hdr0),
    #diameter_packet{msg = [Hdr | Avps]};

%% Anything else is the message itself.
make_prepare_packet(Mask, Msg) ->
    Hdr = make_prepare_header(Mask, undefined),
    #diameter_packet{header = Hdr,
                     msg = Msg}.
%% make_prepare_header/2
%%
%% Return a header with version and end-to-end/hop-by-hop identifiers
%% set. Values already present are retained; missing identifiers are
%% taken from a single call to diameter_session:sequence/1.

%% No header: create one.
make_prepare_header(Mask, undefined) ->
    Id = diameter_session:sequence(Mask),
    #diameter_header{version = ?DIAMETER_VERSION,
                     end_to_end_id = Id,
                     hop_by_hop_id = Id};

%% At least one identifier is missing.
make_prepare_header(Mask,
                    #diameter_header{version = Vsn,
                                     end_to_end_id = E2E,
                                     hop_by_hop_id = HBH}
                    = Hdr)
  when E2E == undefined;
       HBH == undefined ->
    Seq = diameter_session:sequence(Mask),
    Hdr#diameter_header{version = ?DEFAULT(Vsn, ?DIAMETER_VERSION),
                        end_to_end_id = ?DEFAULT(E2E, Seq),
                        hop_by_hop_id = ?DEFAULT(HBH, Seq)};

%% Only the version is missing.
make_prepare_header(_, #diameter_header{version = undefined} = Hdr) ->
    Hdr#diameter_header{version = ?DIAMETER_VERSION};

%% Nothing is missing.
make_prepare_header(_, #diameter_header{} = Hdr) ->
    Hdr;

make_prepare_header(_, NotHdr) ->
    ?ERROR({invalid_header, NotHdr}).

%% make_request_packet/2
%%
%% Reconstruct a diameter_packet from the return value of
%% prepare_request or prepare_retransmit callback.

make_request_packet(Bin, _)
  when is_binary(Bin) ->
    make_prepare_packet(false, Bin);

make_request_packet(#diameter_packet{msg = [#diameter_header{} | _]}
                    = NewPkt,
                    _) ->
    NewPkt;

%% Returning a diameter_packet with no header from a prepare_request
%% or prepare_retransmit callback retains the header passed into it.
%% This is primarily so that the end to end and hop by hop identifiers
%% are retained.
make_request_packet(#diameter_packet{header = NewHdr} = NewPkt,
                    #diameter_packet{header = OldHdr}) ->
    NewPkt#diameter_packet{header = fold_record(OldHdr, NewHdr)};

%% Anything else replaces the message of the packet passed to the
%% callback.
make_request_packet(Msg, OldPkt) ->
    OldPkt#diameter_packet{msg = Msg}.

%% make_retransmit_packet/1
%%
%% Flag a packet as a retransmission, in whichever of the message's
%% header or the packet's header field is in use.

make_retransmit_packet(#diameter_packet{msg = [#diameter_header{} = Hdr0
                                               | Avps]}
                       = Pkt) ->
    Hdr = make_retransmit_header(Hdr0),
    Pkt#diameter_packet{msg = [Hdr | Avps]};

make_retransmit_packet(#diameter_packet{header = Hdr0} = Pkt) ->
    Hdr = make_retransmit_header(Hdr0),
    Pkt#diameter_packet{header = Hdr}.

%% make_retransmit_header/1

make_retransmit_header(Header) ->
    Header#diameter_header{is_retransmitted = true}.
%% fold_record/2
%%
%% Replace elements in the first record by those in the second that
%% differ from undefined.

fold_record(Old, undefined) ->
    Old;
fold_record(Old, New) ->
    Merged = fold(tuple_to_list(Old), tuple_to_list(New)),
    list_to_tuple(Merged).

%% Merge two lists of equal length elementwise, taking the element of
%% the second unless it's undefined.
fold([], []) ->
    [];
fold([Old | Olds], [New | News]) ->
    Elem = case New of
               undefined -> Old;
               _         -> New
           end,
    [Elem | fold(Olds, News)].

%% send_R/6
%%
%% Send an encoded request on the given transport and tell the caller
%% that a send has been attempted. Returns the timer reference and
%% monitor from zend_requezt/5 together with the #request{}.

send_R(ReqPkt,
       EncPkt,
       {{TPid, _Caps} = TC, #diameter_app{dictionary = AppDict}},
       #options{timeout = Timeout},
       {CallerPid, Ref},
       SvcName) ->
    Request = #request{ref = Ref,
                       caller = CallerPid,
                       handler = self(),
                       peer = TC,
                       packet = ReqPkt},

    incr(send, EncPkt, TPid, AppDict),
    {TRef, MRef} = zend_requezt(TPid, EncPkt, Request, SvcName, Timeout),
    CallerPid ! Ref,  %% tell caller a send has been attempted
    {TRef, MRef, Request}.

%% recv_answer/4
%%
%% Wait for an answer, a timeout or loss of the peer, the last of
%% which results in failover.

recv_answer(SvcName,
            App,
            CallOpts,
            {TRef, MRef, #request{ref = Ref} = Request}) ->
    %% Matching on TRef below ensures we ignore messages that pertain
    %% to a previous transport prior to failover. The answer message
    %% includes the pid of the transport on which it was received,
    %% which may not be the last peer to which we've transmitted.
    receive
        {answer = Tag, Ref, TPid, Dict0, Pkt} ->  %% Answer from peer
            %% The request as stored at send on this transport.
            #request{} = Stored = erase(TPid),
            {Tag, Stored, Dict0, Pkt};
        {timeout = Why, TRef, _} ->               %% No timely reply
            {error, Request, Why};
        {'DOWN', MRef, process, _, _} when false /= MRef ->
            %% local peer_down
            failover(SvcName, App, Request, CallOpts);
        {failover, TRef} ->                       %% local or remote peer_down
            failover(SvcName, App, Request, CallOpts)
    end.

%% failover/4
%%
%% Pick another peer and resend the request to it.

failover(SvcName, App, Request, CallOpts) ->
    Picked = pick_peer(SvcName, App, Request, CallOpts),
    resend_request(Picked, Request, CallOpts, SvcName).
%% handle_answer/4
%%
%% Handle the result of recv_answer/4: either an error, which is
%% passed to a handle_error callback, or an answer, which is decoded
%% and counted before being passed to handle_answer/5.

%% No answer: invoke handle_error with the reason and original request.
handle_answer(SvcName, _, App, {error, Req, Reason}) ->
    #request{packet = Pkt,
             peer = {_TPid, _Caps} = TC}
        = Req,
    cb(App, handle_error, [Reason, msg(Pkt), SvcName, TC]);

%% An answer: decode, count, and deal with errors according to the
%% application's answer_errors option.
handle_answer(SvcName,
              SvcOpts,
              #diameter_app{id = Id,
                            dictionary = AppDict,
                            options = [{answer_errors, AE} | _]}
              = App,
              {answer, Req, Dict0, Pkt}) ->
    MsgDict = msg_dict(AppDict, Dict0, Pkt),
    DecPkt = errors(Id, diameter_codec:decode({MsgDict, AppDict},
                                              SvcOpts,
                                              Pkt)),
    #request{peer = {TPid, _}}
        = Req,

    incr(recv, DecPkt, TPid, AppDict),

    %% incr_result/4 exits on a missing or inappropriate result code:
    %% catch the exit and map it to the packet to pass on.
    AnsPkt = try
                 incr_result(recv, DecPkt, TPid, {MsgDict, AppDict, Dict0})
             of
                 _ -> DecPkt
             catch
                 exit: {no_result_code, _} ->
                     %% RFC 6733 requires one of Result-Code or
                     %% Experimental-Result, but the decode will have
                     %% detected a missing AVP. If both are optional in
                     %% the dictionary then this isn't a decode error:
                     %% just continue on.
                     DecPkt;
                 exit: {invalid_error_bit, {_, _, _, Avp}} ->
                     %% Prepend a 5004 error for the offending AVP to
                     %% the errors from decode.
                     #diameter_packet{errors = Es}
                         = DecPkt,
                     E = {5004, Avp},
                     DecPkt#diameter_packet{errors = [E|Es]}
             end,

    handle_answer(AnsPkt, SvcName, App, AE, Req).

%% handle_answer/5
%%
%% Invoke the handle_answer callback if there are no errors or if
%% answer_errors is 'callback'. Otherwise the errors are handled by
%% handle_error/3, which doesn't return.

handle_answer(#diameter_packet{errors = Es}
              = Pkt,
              SvcName,
              App,
              AE,
              #request{peer = {_TPid, _Caps} = TC,
                       packet = P})
  when callback == AE;
       [] == Es ->
    cb(App, handle_answer, [Pkt, msg(P), SvcName, TC]);

handle_answer(#diameter_packet{header = H}, SvcName, _, AE, _) ->
    handle_error(H, SvcName, AE).

%% handle_error/3
%%
%% Exit the request process on answer errors, with answer_errors =
%% report first writing a warning report.

-spec handle_error(_, _, _)
   -> no_return().  %% silence dialyzer

handle_error(Hdr, SvcName, report) ->
    MFA = {?MODULE, handle_answer, [SvcName, Hdr]},
    diameter_lib:warning_report(errors, MFA),
    handle_error(Hdr, SvcName, discard);

handle_error(Hdr, SvcName, discard) ->
    x({answer_errors, {SvcName, Hdr}}).

%% Note that we don't check that the application id in the answer's
%% header is what we expect. (TODO: Does the rfc say anything about
%% this?)

%% Note that failover starts a new timer and that expiry of an old
%% timer value is ignored.
%% This means that an answer could be accepted
%% from a peer after timeout in the case of failover.

%% resend_request/4
%%
%% Retransmit a request to the peer chosen at failover and wait for
%% the answer. A peer for which this process already has a request
%% stored (under the transport pid in the process dictionary) is not
%% retransmitted to: the result is a timeout error in this case.

resend_request({{{TPid, _Caps} = TC, App}, SvcOpts},
               OldReq,
               #options{timeout = Timeout} = CallOpts,
               SvcName) ->
    Prepared = (undefined == get(TPid)
                andalso prepare_retransmit(TC, App, OldReq, SvcName)),
    case Prepared of
        [ReqPkt | EvalFs] ->
            AppDict = App#diameter_app.dictionary,
            EncPkt = encode(AppDict, TPid, SvcOpts, ReqPkt),
            eval_packet(EncPkt, EvalFs),
            NewReq = OldReq#request{peer = TC,
                                    packet = ReqPkt},
            ?LOG(retransmission, EncPkt#diameter_packet.header),
            incr(TPid, {msg_id(EncPkt, AppDict), send, retransmission}),
            {TRef, MRef} = zend_requezt(TPid,
                                        EncPkt,
                                        NewReq,
                                        SvcName,
                                        Timeout),
            recv_answer(SvcName, App, CallOpts, {TRef, MRef, NewReq});
        false ->
            {error, OldReq, timeout};
        {discard, Why} ->
            {error, OldReq, Why};
        discard ->
            {error, OldReq, discarded};
        {error, Invalid} ->
            ?ERROR({invalid_return, Invalid, prepare_retransmit, App})
    end;

resend_request(_, OldReq, _, _) ->  %% no alternate peer
    {error, OldReq, failover}.

%% pick_peer/4
%%
%% Ask the service for a transport on which to send a message.

%% Retransmission after failover: call-specific arguments have already
%% been appended in App.
pick_peer(SvcName,
          App,
          #request{packet = #diameter_packet{msg = Msg}},
          CallOpts) ->
    pick_peer(SvcName, App, Msg, CallOpts#options{extra = []});

%% No message from which to extract a destination.
pick_peer(_, _, undefined, _) ->
    {error, no_connection};

pick_peer(SvcName,
          AppOrAlias,
          Msg,
          #options{peers = TPids, filter = Filter, extra = Extra}) ->
    DestF = fun(Dict) -> get_destination(Dict, Msg) end,
    Picked = diameter_service:pick_peer(SvcName,
                                        AppOrAlias,
                                        {DestF, Filter, Extra, TPids}),
    case Picked of
        false ->
            {error, no_connection};
        _ ->
            Picked
    end.

%% The message of a packet, or its binary if there is no message.
msg(#diameter_packet{msg = Msg, bin = Bin}) ->
    case Msg of
        undefined -> Bin;
        _         -> Msg
    end.

%% encode/4

%% Note that prepare_request can return a diameter_packet containing a
%% header or transport_data. Even allow the returned record to contain
%% an encoded binary. This isn't the usual case and doesn't properly
%% support retransmission but is useful for test.
%% A single dictionary is used both for encode and for counting.
encode(Dict, TPid, SvcOpts, Pkt)
  when is_atom(Dict) ->
    encode({Dict, Dict}, TPid, SvcOpts, Pkt);

%% A message to be encoded. An encode failure is counted as a send
%% error before the exit is propagated.
encode(DictT, TPid, SvcOpts, #diameter_packet{bin = undefined} = Pkt) ->
    {MsgDict, AppDict} = DictT,
    try
        diameter_codec:encode(MsgDict, SvcOpts, Pkt)
    catch
        exit: {diameter_codec, encode, Why} = Exit ->
            incr_error(send, Why, TPid, AppDict),
            exit(Exit)
    end;

%% An encoded binary: just send.
encode(_, _, _, #diameter_packet{} = Pkt) ->
    Pkt.

%% zend_requezt/5
%%
%% Strip potentially large record fields that aren't used by the
%% processes the records can be sent to, possibly on a remote node.
%%
%% The unstripped request is first stored in the process dictionary
%% under the transport pid.

zend_requezt(TPid, Pkt, Request, SvcName, Timeout) ->
    put(TPid, Request),
    Stripped = z(Pkt),
    send_request(TPid, Stripped, Request, SvcName, Timeout).

%% send_request/5
%%
%% Start the answer timer and pass the packet to the transport.
%% Returns the timer reference and either a monitor reference or
%% false.

%% Transport on the local node: send directly and monitor.
send_request(TPid, #diameter_packet{bin = Bin} = Pkt, Request, _SvcName, Timeout)
  when node() == node(TPid) ->
    Seqs = diameter_codec:sequence_numbers(Bin),
    TRef = erlang:start_timer(Timeout, self(), TPid),
    Route = {self(), Request#request.ref, Seqs},
    send(TPid, Pkt, Route),
    MRef = peer_monitor(TPid, TRef),
    {TRef, MRef};

%% Send using a remote transport: spawn a process on the remote node
%% to relay the answer.
send_request(TPid, #diameter_packet{} = Pkt, Request, SvcName, Timeout) ->
    TRef = erlang:start_timer(Timeout, self(), TPid),
    Arg = {TPid, Pkt, z(Request), SvcName, Timeout, TRef},
    spawn(node(TPid), ?MODULE, send, [Arg]),
    {TRef, false}.

%% z/1
%%
%% Avoid sending potentially large terms unnecessarily. The records
%% themselves are retained since they're sent between nodes in send/1
%% and changing what's sent causes upgrade issues.

%% Only ref and handler are retained in a request ...
z(#request{ref = Ref, handler = Handler}) ->
    #request{ref = Ref,
             handler = Handler};

%% ... and only header, bin and transport_data in a packet.
z(#diameter_packet{header = Hdr, bin = Bin, transport_data = TData}) ->
    #diameter_packet{header = Hdr,
                     bin = Bin,
                     transport_data = TData}.

%% send/1
%%
%% Entry point of the process spawned by send_request/5 on the node of
%% a remote transport: send from there, with this process as handler,
%% and relay the outcome to the original handler in recv/4.

send({TPid, Pkt, #request{handler = Handler} = Request0, SvcName, Timeout, TRef}) ->
    Request = Request0#request{handler = self()},
    Sent = zend_requezt(TPid, Pkt, Request, SvcName, Timeout),
    recv(TPid, Handler, TRef, Sent).
%% recv/4
%%
%% Relay an answer from a remote node.
%%
%% An answer is forwarded as is and loss of the peer is forwarded as
%% failover on the handler's timer reference. Any other message must
%% be expiry of the local timer, with which this process exits.

recv(TPid, Handler, TRef, {LocalTRef, MRef}) ->
    receive
        {answer, _, _, _, _} = Answer ->
            Handler ! Answer;
        {'DOWN', MRef, process, _, _} ->
            Handler ! {failover, TRef};
        {failover = Tag, LocalTRef} ->
            Handler ! {Tag, TRef};
        Other ->
            exit({timeout, LocalTRef, TPid} = Other)
    end.

%% send/3
%%
%% Pass a packet to a transport process for sending.

send(TPid, Pkt, Route) ->
    TPid ! {send, Pkt, Route}.

%% prepare_retransmit/4
%%
%% Invoke a prepare_retransmit callback on the request's packet,
%% flagged as a retransmission. Returns the request packet followed by
%% any eval_packet funs, or what prepare/2 returns otherwise.

prepare_retransmit({_TPid, _Caps} = TC, App, Request, SvcName) ->
    Pkt = make_retransmit_packet(Request#request.packet),
    Prepared = prepare(cb(App, prepare_retransmit, [Pkt, SvcName, TC]), []),

    case Prepared of
        [Msg | EvalFs] ->
            [make_request_packet(Msg, Pkt) | EvalFs];
        _ ->
            Prepared
    end.

%% When sending a binary, it's up to prepare_retransmit to modify it
%% accordingly.

%% peer_monitor/2
%%
%% Monitor the process stored in the request table for a transport.
%% If there's no entry then send ourselves a failover message and
%% return false instead of a monitor reference.

peer_monitor(TPid, TRef) ->
    case ets:lookup(?REQUEST_TABLE, TPid) of %% at peer_up/1
        [] -> %% transport has gone down
            self() ! {failover, TRef},
            false;
        [{_, MPid}] ->
            monitor(process, MPid)
    end.

%% get_destination/2
%%
%% Return [Realm, Host] as extracted from the Destination-Realm and
%% Destination-Host AVPs of a message.

get_destination(Dict, Msg) ->
    Realm = str(get_avp_value(Dict, 'Destination-Realm', Msg)),
    Host = str(get_avp_value(Dict, 'Destination-Host', Msg)),
    [Realm, Host].

%% This is not entirely correct. The avp could have an arity 1, in
%% which case an empty list is a DiameterIdentity of length 0 rather
%% than the list of no values we treat it as by mapping to undefined.
%% This behaviour is documented.
str([]) ->
    undefined;
str(Value) ->
    Value.

%% get_avp/3
%%
%% Find an AVP in a message of one of the following forms:
%%
%% - a message record (as generated from a .dia spec) or
%% - a list of an atom message name followed by 2-tuple, avp name/value pairs.
%% - a list of a #diameter_header{} followed by #diameter_avp{} records,
%% - a map from avp name to value.
%%
%% In the first two forms a dictionary module is used at encode to
%% identify the type of the AVP and its arity in the message in
%% question. The third form allows messages to be sent as is, without
%% a dictionary, which is needed in the case of relay agents, for one.

%% Messages will be header/avps list as a relay and the only AVP's we
%% look for are in the common dictionary.
%% This is required since the
%% relay dictionary doesn't inherit the common dictionary (which maybe
%% it should).
get_avp(?RELAY, AvpName, Msg) ->
    get_avp(?BASE, AvpName, Msg);

%% Message is a header/avps list. Failure to look up or find the AVP
%% is an error that's mapped to undefined; the decode of a found AVP
%% takes place outside of the try.
get_avp(Dict, AvpName, [#diameter_header{} | Avps]) ->
    try
        {Code, _, VId} = Dict:avp_header(AvpName),
        find_avp(Code, VId, Avps)
    of
        Found ->
            Decoded = avp_decode(Dict, AvpName, ungroup(Found)),
            Decoded#diameter_avp{name = AvpName}
    catch
        error: _ ->
            undefined
    end;

%% Message as name/values list ...
get_avp(_, AvpName, [_MsgName | Avps]) ->
    case lists:keyfind(AvpName, 1, Avps) of
        {_, Value} ->
            #diameter_avp{name = AvpName, value = Value};
        _ ->
            undefined
    end;

%% ... map ...
get_avp(_, AvpName, Map)
  when is_map(Map) ->
    case maps:find(AvpName, Map) of
        error ->
            undefined;
        {ok, Value} ->
            #diameter_avp{name = AvpName, value = Value}
    end;

%% ... or record (but not necessarily).
get_avp(Dict, AvpName, Rec) ->
    try Dict:'#get-'(AvpName, Rec) of
        Value ->
            #diameter_avp{name = AvpName, value = Value}
    catch
        error:_ ->
            undefined
    end.

%% get_avp_value/3
%%
%% The value of the AVP found by get_avp/3, or undefined if there is
%% none.

get_avp_value(Dict, AvpName, Msg) ->
    case get_avp(Dict, AvpName, Msg) of
        undefined = None ->
            None;
        #diameter_avp{value = Value} ->
            Value
    end.

%% ungroup/1
%%
%% The head of a non-empty list, anything else as is.

ungroup([First | _]) ->
    First;
ungroup(NotGrouped) ->
    NotGrouped.

%% avp_decode/3
%%
%% Decode the binary data of an AVP that has no value. The AVP is
%% returned unchanged if the decode raises an error, or if there's
%% nothing to decode.

avp_decode(Dict, AvpName, #diameter_avp{value = undefined,
                                        data = Data}
                          = Avp)
  when is_binary(Data) ->
    try Dict:avp(decode, Data, AvpName, decode_opts(Dict)) of
        Value ->
            Avp#diameter_avp{value = Value}
    catch
        error:_ ->
            Avp
    end;
avp_decode(_, _, #diameter_avp{} = Avp) ->
    Avp.

%% Invoke an application callback. The module field is a list of the
%% callback module followed by extra arguments.
cb(#diameter_app{module = [_|_] = ModX}, Func, Args) ->
    eval(ModX, Func, Args).

%% Apply a function with the extra arguments appended.
eval([Mod | Extra], Func, Args) ->
    apply(Mod, Func, Args ++ Extra).

%% Select the second argument on true, the third on false.
choose(true, OnTrue, _) ->
    OnTrue;
choose(false, _, OnFalse) ->
    OnFalse.

%% Decode options sufficient for AVP extraction.
decode_opts(Dict) ->
    #{dictionary => Dict,
      record_decode => true,
      string_decode => false,
      strict_mbit => false,
      failed_avp => false}.