diff options
Diffstat (limited to 'lib/diameter/src/base')
-rw-r--r-- | lib/diameter/src/base/diameter_service.erl | 246 |
1 files changed, 116 insertions, 130 deletions
diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl index d5a09338cd..5015908582 100644 --- a/lib/diameter/src/base/diameter_service.erl +++ b/lib/diameter/src/base/diameter_service.erl @@ -157,15 +157,12 @@ %% Record stored in diameter_request for each outgoing request. -record(request, - {from, %% arg 2 of handle_call/3 - handler :: match(pid()), %% request process - transport :: match(pid()), %% peer process - caps :: match(#diameter_caps{}), - app :: match(diameter:app_alias()),%% #diameter_app.alias - dictionary :: match(module()), %% #diameter_app.dictionary - module :: match([module() | list()]), %% #diameter_app.module - filter :: match(diameter:peer_filter()), - packet :: match(#diameter_packet{})}). + {ref :: match(reference()), %% used to receive answer + caller :: match(pid()), %% calling process + handler :: match(pid()), %% request process + transport :: match(pid()), %% peer process + caps :: match(#diameter_caps{}), %% of connection + packet :: match(#diameter_packet{})}). %% of request %% Record call/4 options are parsed into. -record(options, @@ -176,10 +173,10 @@ %% Term passed back to receive_message/4 with every incoming message. -record(recvdata, - {peerT :: ets:tid(), + {peerT :: ets:tid(), service_name :: diameter:service_name(), - apps :: [#diameter_app{}], - sequence :: diameter:sequence()}). + apps :: [#diameter_app{}], + sequence :: diameter:sequence()}). %% --------------------------------------------------------------------------- %% # start/1 @@ -268,8 +265,7 @@ recv(true, false, TPid, Pkt, Dict0, RecvData) -> end; %% ... answer to known request ... -recv(false, #request{from = From, handler = Pid} = Req, _, Pkt, Dict0, _) -> - {_, Ref} = From, +recv(false, #request{ref = Ref, handler = Pid} = Req, _, Pkt, Dict0, _) -> Pid ! {answer, Ref, Req, Dict0, Pkt}; %% Note that failover could have happened prior to this message being %% received and triggering failback. That is, both a failover message @@ -633,17 +629,11 @@ code_change(FromVsn, SvcName, Extra, #diameter_app{alias = Alias} = A) -> unexpected(F, A, #state{service_name = Name}) -> ?UNEXPECTED(F, A ++ [Name]). -cb([_|_] = M, F, A) -> +cb(#diameter_app{module = [_|_] = M}, F, A) -> eval(M, F, A); -cb(Rec, F, A) -> - {_, M} = app(Rec), +cb([_|_] = M, F, A) -> eval(M, F, A). -app(#request{app = A, module = M}) -> - {A,M}; -app(#diameter_app{alias = A, module = M}) -> - {A,M}. - eval([M|X], F, A) -> apply(M, F, A ++ X). @@ -1299,8 +1289,8 @@ cm([_,_|_], _, _, _) -> %% to the caller. The service process only handles the state-retaining %% callbacks. %% -%% The mod field of the #diameter_app{} here includes any extra -%% arguments passed to diameter:call/2. +%% The module field of the #diameter_app{} here includes any extra +%% arguments passed to diameter:call/4. send_request({TPid, Caps, App} = Transport, @@ -1309,12 +1299,9 @@ send_request({TPid, Caps, App} Opts, Caller, SvcName) -> - #diameter_app{module = ModX} - = App, - Pkt = make_prepare_packet(Mask, Msg), - send_req(cb(ModX, prepare_request, [Pkt, SvcName, {TPid, Caps}]), + send_req(cb(App, prepare_request, [Pkt, SvcName, {TPid, Caps}]), Pkt, Transport, Opts, @@ -1433,44 +1420,38 @@ fold_record(Rec, R) -> %% send_req/6 -send_req(Pkt0, {TPid, Caps, App}, Opts, Caller, SvcName, Fs) -> - #diameter_app{alias = Alias, - dictionary = Dict, - module = ModX, - options = [{answer_errors, AE} | _]} - = App, - +send_req(Pkt0, + {TPid, Caps, #diameter_app{dictionary = Dict} = App}, + Opts, + {Pid, Ref}, + SvcName, + Fs) -> Pkt = encode(Dict, Pkt0, Fs), - #options{filter = Filter, - timeout = Timeout} + #options{timeout = Timeout} = Opts, - Req = #request{packet = Pkt0, - from = Caller, + Req = #request{ref = Ref, + caller = Pid, handler = self(), transport = TPid, caps = Caps, - app = Alias, - filter = Filter, - dictionary = Dict, - module = ModX}, + packet = Pkt0}, try TRef = send_request(TPid, Pkt, Req, SvcName, Timeout), - ack(Caller), - handle_answer(SvcName, AE, recv_answer(Timeout, SvcName, {TRef, Req})) + Pid ! Ref, %% tell caller a send has been attempted + handle_answer(SvcName, + App, + recv_answer(Timeout, SvcName, App, Opts, {TRef, Req})) after erase_requests(Pkt) end. -%% Tell caller a send has been attempted. -ack({Pid, Ref}) -> - Pid ! Ref. +%% recv_answer/5 -%% recv_answer/3 - -recv_answer(Timeout, SvcName, {TRef, #request{from = {_, Ref}} = Req}) -> +recv_answer(Timeout, SvcName, App, Opts, {TRef, #request{ref = Ref} + = Req}) -> %% Matching on TRef below ensures we ignore messages that pertain %% to a previous transport prior to failover. The answer message %% includes the #request{} since it's not necessarily Req; that @@ -1481,38 +1462,38 @@ recv_answer(Timeout, SvcName, {TRef, #request{from = {_, Ref}} = Req}) -> {timeout = Reason, TRef, _} -> %% No timely reply {error, Req, Reason}; {failover, TRef} -> %% Service says peer has gone down - retransmit(Req, find_state(SvcName), Timeout) + retransmit(Req, App, Opts, find_state(SvcName), Timeout) end. %% Note that failover starts a new timer and that expiry of an old %% timer value is ignored. This means that an answer could be accepted %% from a peer after timeout in the case of failover. -retransmit(Req, #state{service_name = SvcName} = S, Timeout) -> - rt(find_transport(Req, S), Req, SvcName, Timeout); +retransmit(Req, App, Opts, #state{service_name = SvcName} = S, Timeout) -> + rt(find_transport(Req, App, Opts, S), Req, Opts, SvcName, Timeout); -retransmit(Req, false, _) -> %% service has gone down +retransmit(Req, _, _, false, _) -> %% service has gone down {error, Req, failover}. -rt({_,_,_} = Transport, Req, SvcName, Timeout) -> +rt({_,_,App} = Transport, Req, Opts, SvcName, Timeout) -> try retransmit(Transport, Req, SvcName, Timeout) of - T -> recv_answer(Timeout, SvcName, T) + T -> recv_answer(Timeout, SvcName, App, Opts, T) catch ?FAILURE(Reason) -> {error, Req, Reason} end; -rt(_, Req, _, _) -> %% no alternate peer +rt(_, Req, _, _, _) -> %% no alternate peer {error, Req, failover}. -%% handle_error/3 +%% handle_error/4 -handle_error(Req, Reason, SvcName) -> - #request{module = ModX, - packet = Pkt, - transport = TPid, - caps = Caps} - = Req, - cb(ModX, handle_error, [Reason, msg(Pkt), SvcName, {TPid, Caps}]). +handle_error(App, + #request{packet = Pkt, + transport = TPid, + caps = Caps}, + Reason, + SvcName) -> + cb(App, handle_error, [Reason, msg(Pkt), SvcName, {TPid, Caps}]). msg(#diameter_packet{msg = undefined, bin = Bin}) -> Bin; @@ -1581,9 +1562,9 @@ send(Pid, Pkt) -> %% retransmit/4 -retransmit({TPid, Caps, #diameter_app{alias = Alias} = App} +retransmit({TPid, Caps, App} = Transport, - #request{app = Alias, packet = Pkt0} + #request{packet = Pkt0} = Req, SvcName, Timeout) -> @@ -1628,9 +1609,8 @@ retransmit(T, {_, _, App}, _, _, _, _) -> ?ERROR({invalid_return, prepare_retransmit, App, T}). resend_request(Pkt0, - {TPid, Caps, _}, - #request{dictionary = Dict} - = Req0, + {TPid, Caps, #diameter_app{dictionary = Dict}}, + Req0, SvcName, Tmo, Fs) -> @@ -2454,45 +2434,57 @@ find(Pred, [H|T]) -> %% Process an answer message in call-specific process. -handle_answer(SvcName, _, {error, Req, Reason}) -> - handle_error(Req, Reason, SvcName); +handle_answer(SvcName, App, {error, Req, Reason}) -> + handle_error(App, Req, Reason, SvcName); handle_answer(SvcName, - AnswerErrors, - {answer, #request{dictionary = Dict} = Req, Dict0, Pkt}) -> + #diameter_app{dictionary = Dict} + = App, + {answer, Req, Dict0, Pkt}) -> Mod = dict(Dict, Dict0, Pkt), answer(examine(diameter_codec:decode(Mod, Pkt)), SvcName, Mod, - AnswerErrors, + App, Req). %% We don't really need to do a full decode if we're a relay and will %% just resend with a new hop by hop identifier, but might a proxy %% want to examine the answer? -answer(Pkt, SvcName, Dict, AE, #request{transport = TPid} = Req) -> +answer(Pkt, SvcName, Dict, App, #request{transport = TPid} = Req) -> try incr(recv, Pkt, Dict, TPid) of - _ -> a(Pkt, SvcName, AE, Req) + _ -> answer(Pkt, SvcName, App, Req) catch exit: {invalid_error_bit, _} = E -> - a(Pkt#diameter_packet{errors = [E]}, SvcName, AE, Req) + answer(Pkt#diameter_packet{errors = [E]}, SvcName, App, Req) end. -a(#diameter_packet{errors = Es} = Pkt, SvcName, AE, #request{transport = TPid, - caps = Caps, - packet = P} - = Req) +answer(Pkt, + SvcName, + #diameter_app{module = ModX, + options = [{answer_errors, AE} | _]}, + Req) -> + a(Pkt, SvcName, ModX, AE, Req). + +a(#diameter_packet{errors = Es} + = Pkt, + SvcName, + ModX, + AE, + #request{transport = TPid, + caps = Caps, + packet = P}) when [] == Es; callback == AE -> - cb(Req, handle_answer, [Pkt, msg(P), SvcName, {TPid, Caps}]); + cb(ModX, handle_answer, [Pkt, msg(P), SvcName, {TPid, Caps}]); -a(Pkt, SvcName, report, Req) -> +a(Pkt, SvcName, _, report, Req) -> x(errors, handle_answer, [SvcName, Req, Pkt]); -a(Pkt, SvcName, discard, Req) -> +a(Pkt, SvcName, _, discard, Req) -> x({errors, handle_answer, [SvcName, Req, Pkt]}). %% Note that we don't check that the application id in the answer's @@ -2701,7 +2693,7 @@ rpd(Pid, Alias, PDict) -> ?Dict:update(Alias, fun(Ps) -> lists:keydelete(Pid, 1, Ps) end, PDict). %% --------------------------------------------------------------------------- -%% find_transport/2-3 +%% find_transport/4 %% %% Return: {TransportPid, #diameter_caps{}, #diameter_app{}} %% | false @@ -2709,56 +2701,50 @@ rpd(Pid, Alias, PDict) -> %% --------------------------------------------------------------------------- %% Initial call, from an arbitrary process. -find_transport({alias, Alias}, Msg, Opts, #state{service = Svc} = S) -> - #diameter_service{applications = Apps} = Svc, - ft(find_send_app(Alias, Apps), Msg, Opts, S); +find_transport({alias, Alias}, + Msg, + Opts, + #state{service = #diameter_service{applications = Apps}} + = S) -> + find_transport(find_send_app(Alias, Apps), Msg, Opts, S); %% Relay or proxy send. -find_transport(#diameter_app{} = App, Msg, Opts, S) -> - ft(App, Msg, Opts, S). - -ft(#diameter_app{module = Mod, dictionary = Dict} = App, Msg, Opts, S) -> - #options{filter = Filter, - extra = Xtra} - = Opts, - pick_peer(App#diameter_app{module = Mod ++ Xtra}, +find_transport(#diameter_app{module = ModX, dictionary = Dict} + = App, + Msg, + #options{filter = Filter, + extra = Xtra}, + S) -> + pick_peer(App#diameter_app{module = ModX ++ Xtra}, get_destination(Dict, Msg), Filter, S); -ft(false = No, _, _, _) -> - No. - -%% This can't be used if we're a relay and sending a message -%% in an application not known locally. (TODO) -find_send_app(Alias, Apps) -> - case lists:keyfind(Alias, #diameter_app.alias, Apps) of - #diameter_app{id = ?APP_ID_RELAY} -> - false; - T -> - T - end. %% Retransmission after failover. -find_transport(#request{app = Alias, - filter = Filter, - module = ModX, - packet = #diameter_packet{msg = Msg}, - dictionary = Dict}, - #state{service = #diameter_service{applications = Apps}} - = S) +find_transport(#request{packet = #diameter_packet{msg = Msg}}, + #diameter_app{dictionary = Dict} + = App, + #options{filter = Filter}, + S) when Msg /= undefined -> %% retransmission of binaries is unsupported - #diameter_app{} - = App - = lists:keyfind(Alias, #diameter_app.alias, Apps), - - pick_peer(App#diameter_app{module = ModX}, + pick_peer(App, get_destination(Dict, Msg), Filter, S); -find_transport(_, _) -> +find_transport(_, _, _, _) -> false. - + +find_send_app(Alias, Apps) -> + case find_app(Alias, Apps) of + #diameter_app{id = ?APP_ID_RELAY} -> + false; + #diameter_app{} = A -> + A; + false = No -> + No + end. + %% get_destination/2 get_destination(Dict, Msg) -> @@ -3273,15 +3259,15 @@ mk_app(#diameter_app{} = A) -> info_pending(#state{} = S) -> MatchSpec = [{{'$1', - #request{transport = '$2', - from = '$3', - app = '$4', + #request{caller = '$2', + handler = '$3', + transport = '$4', _ = '_'}, '_'}, [?ORCOND([{'==', T, '$2'} || T <- transports(S)])], - [{{'$1', [{{app, '$4'}}, - {{transport, '$2'}}, - {{from, '$3'}}]}}]}], + [{{'$1', [{{caller, '$2'}}, + {{handler, '$3'}}, + {{transport, '$4'}}]}}]}], try ets:select(?REQUEST_TABLE, MatchSpec) |