aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter
diff options
context:
space:
mode:
Diffstat (limited to 'lib/diameter')
-rw-r--r--lib/diameter/src/base/diameter_service.erl246
1 files changed, 116 insertions, 130 deletions
diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl
index d5a09338cd..5015908582 100644
--- a/lib/diameter/src/base/diameter_service.erl
+++ b/lib/diameter/src/base/diameter_service.erl
@@ -157,15 +157,12 @@
%% Record stored in diameter_request for each outgoing request.
-record(request,
- {from, %% arg 2 of handle_call/3
- handler :: match(pid()), %% request process
- transport :: match(pid()), %% peer process
- caps :: match(#diameter_caps{}),
- app :: match(diameter:app_alias()),%% #diameter_app.alias
- dictionary :: match(module()), %% #diameter_app.dictionary
- module :: match([module() | list()]), %% #diameter_app.module
- filter :: match(diameter:peer_filter()),
- packet :: match(#diameter_packet{})}).
+ {ref :: match(reference()), %% used to receive answer
+ caller :: match(pid()), %% calling process
+ handler :: match(pid()), %% request process
+ transport :: match(pid()), %% peer process
+ caps :: match(#diameter_caps{}), %% of connection
+ packet :: match(#diameter_packet{})}). %% of request
%% Record call/4 options are parsed into.
-record(options,
@@ -176,10 +173,10 @@
%% Term passed back to receive_message/4 with every incoming message.
-record(recvdata,
- {peerT :: ets:tid(),
+ {peerT :: ets:tid(),
service_name :: diameter:service_name(),
- apps :: [#diameter_app{}],
- sequence :: diameter:sequence()}).
+ apps :: [#diameter_app{}],
+ sequence :: diameter:sequence()}).
%% ---------------------------------------------------------------------------
%% # start/1
@@ -268,8 +265,7 @@ recv(true, false, TPid, Pkt, Dict0, RecvData) ->
end;
%% ... answer to known request ...
-recv(false, #request{from = From, handler = Pid} = Req, _, Pkt, Dict0, _) ->
- {_, Ref} = From,
+recv(false, #request{ref = Ref, handler = Pid} = Req, _, Pkt, Dict0, _) ->
Pid ! {answer, Ref, Req, Dict0, Pkt};
%% Note that failover could have happened prior to this message being
%% received and triggering failback. That is, both a failover message
@@ -633,17 +629,11 @@ code_change(FromVsn, SvcName, Extra, #diameter_app{alias = Alias} = A) ->
unexpected(F, A, #state{service_name = Name}) ->
?UNEXPECTED(F, A ++ [Name]).
-cb([_|_] = M, F, A) ->
+cb(#diameter_app{module = [_|_] = M}, F, A) ->
eval(M, F, A);
-cb(Rec, F, A) ->
- {_, M} = app(Rec),
+cb([_|_] = M, F, A) ->
eval(M, F, A).
-app(#request{app = A, module = M}) ->
- {A,M};
-app(#diameter_app{alias = A, module = M}) ->
- {A,M}.
-
eval([M|X], F, A) ->
apply(M, F, A ++ X).
@@ -1299,8 +1289,8 @@ cm([_,_|_], _, _, _) ->
%% to the caller. The service process only handles the state-retaining
%% callbacks.
%%
-%% The mod field of the #diameter_app{} here includes any extra
-%% arguments passed to diameter:call/2.
+%% The module field of the #diameter_app{} here includes any extra
+%% arguments passed to diameter:call/4.
send_request({TPid, Caps, App}
= Transport,
@@ -1309,12 +1299,9 @@ send_request({TPid, Caps, App}
Opts,
Caller,
SvcName) ->
- #diameter_app{module = ModX}
- = App,
-
Pkt = make_prepare_packet(Mask, Msg),
- send_req(cb(ModX, prepare_request, [Pkt, SvcName, {TPid, Caps}]),
+ send_req(cb(App, prepare_request, [Pkt, SvcName, {TPid, Caps}]),
Pkt,
Transport,
Opts,
@@ -1433,44 +1420,38 @@ fold_record(Rec, R) ->
%% send_req/6
-send_req(Pkt0, {TPid, Caps, App}, Opts, Caller, SvcName, Fs) ->
- #diameter_app{alias = Alias,
- dictionary = Dict,
- module = ModX,
- options = [{answer_errors, AE} | _]}
- = App,
-
+send_req(Pkt0,
+ {TPid, Caps, #diameter_app{dictionary = Dict} = App},
+ Opts,
+ {Pid, Ref},
+ SvcName,
+ Fs) ->
Pkt = encode(Dict, Pkt0, Fs),
- #options{filter = Filter,
- timeout = Timeout}
+ #options{timeout = Timeout}
= Opts,
- Req = #request{packet = Pkt0,
- from = Caller,
+ Req = #request{ref = Ref,
+ caller = Pid,
handler = self(),
transport = TPid,
caps = Caps,
- app = Alias,
- filter = Filter,
- dictionary = Dict,
- module = ModX},
+ packet = Pkt0},
try
TRef = send_request(TPid, Pkt, Req, SvcName, Timeout),
- ack(Caller),
- handle_answer(SvcName, AE, recv_answer(Timeout, SvcName, {TRef, Req}))
+ Pid ! Ref, %% tell caller a send has been attempted
+ handle_answer(SvcName,
+ App,
+ recv_answer(Timeout, SvcName, App, Opts, {TRef, Req}))
after
erase_requests(Pkt)
end.
-%% Tell caller a send has been attempted.
-ack({Pid, Ref}) ->
- Pid ! Ref.
+%% recv_answer/5
-%% recv_answer/3
-
-recv_answer(Timeout, SvcName, {TRef, #request{from = {_, Ref}} = Req}) ->
+recv_answer(Timeout, SvcName, App, Opts, {TRef, #request{ref = Ref}
+ = Req}) ->
%% Matching on TRef below ensures we ignore messages that pertain
%% to a previous transport prior to failover. The answer message
%% includes the #request{} since it's not necessarily Req; that
@@ -1481,38 +1462,38 @@ recv_answer(Timeout, SvcName, {TRef, #request{from = {_, Ref}} = Req}) ->
{timeout = Reason, TRef, _} -> %% No timely reply
{error, Req, Reason};
{failover, TRef} -> %% Service says peer has gone down
- retransmit(Req, find_state(SvcName), Timeout)
+ retransmit(Req, App, Opts, find_state(SvcName), Timeout)
end.
%% Note that failover starts a new timer and that expiry of an old
%% timer value is ignored. This means that an answer could be accepted
%% from a peer after timeout in the case of failover.
-retransmit(Req, #state{service_name = SvcName} = S, Timeout) ->
- rt(find_transport(Req, S), Req, SvcName, Timeout);
+retransmit(Req, App, Opts, #state{service_name = SvcName} = S, Timeout) ->
+ rt(find_transport(Req, App, Opts, S), Req, Opts, SvcName, Timeout);
-retransmit(Req, false, _) -> %% service has gone down
+retransmit(Req, _, _, false, _) -> %% service has gone down
{error, Req, failover}.
-rt({_,_,_} = Transport, Req, SvcName, Timeout) ->
+rt({_,_,App} = Transport, Req, Opts, SvcName, Timeout) ->
try retransmit(Transport, Req, SvcName, Timeout) of
- T -> recv_answer(Timeout, SvcName, T)
+ T -> recv_answer(Timeout, SvcName, App, Opts, T)
catch
?FAILURE(Reason) -> {error, Req, Reason}
end;
-rt(_, Req, _, _) -> %% no alternate peer
+rt(_, Req, _, _, _) -> %% no alternate peer
{error, Req, failover}.
-%% handle_error/3
+%% handle_error/4
-handle_error(Req, Reason, SvcName) ->
- #request{module = ModX,
- packet = Pkt,
- transport = TPid,
- caps = Caps}
- = Req,
- cb(ModX, handle_error, [Reason, msg(Pkt), SvcName, {TPid, Caps}]).
+handle_error(App,
+ #request{packet = Pkt,
+ transport = TPid,
+ caps = Caps},
+ Reason,
+ SvcName) ->
+ cb(App, handle_error, [Reason, msg(Pkt), SvcName, {TPid, Caps}]).
msg(#diameter_packet{msg = undefined, bin = Bin}) ->
Bin;
@@ -1581,9 +1562,9 @@ send(Pid, Pkt) ->
%% retransmit/4
-retransmit({TPid, Caps, #diameter_app{alias = Alias} = App}
+retransmit({TPid, Caps, App}
= Transport,
- #request{app = Alias, packet = Pkt0}
+ #request{packet = Pkt0}
= Req,
SvcName,
Timeout) ->
@@ -1628,9 +1609,8 @@ retransmit(T, {_, _, App}, _, _, _, _) ->
?ERROR({invalid_return, prepare_retransmit, App, T}).
resend_request(Pkt0,
- {TPid, Caps, _},
- #request{dictionary = Dict}
- = Req0,
+ {TPid, Caps, #diameter_app{dictionary = Dict}},
+ Req0,
SvcName,
Tmo,
Fs) ->
@@ -2454,45 +2434,57 @@ find(Pred, [H|T]) ->
%% Process an answer message in call-specific process.
-handle_answer(SvcName, _, {error, Req, Reason}) ->
- handle_error(Req, Reason, SvcName);
+handle_answer(SvcName, App, {error, Req, Reason}) ->
+ handle_error(App, Req, Reason, SvcName);
handle_answer(SvcName,
- AnswerErrors,
- {answer, #request{dictionary = Dict} = Req, Dict0, Pkt}) ->
+ #diameter_app{dictionary = Dict}
+ = App,
+ {answer, Req, Dict0, Pkt}) ->
Mod = dict(Dict, Dict0, Pkt),
answer(examine(diameter_codec:decode(Mod, Pkt)),
SvcName,
Mod,
- AnswerErrors,
+ App,
Req).
%% We don't really need to do a full decode if we're a relay and will
%% just resend with a new hop by hop identifier, but might a proxy
%% want to examine the answer?
-answer(Pkt, SvcName, Dict, AE, #request{transport = TPid} = Req) ->
+answer(Pkt, SvcName, Dict, App, #request{transport = TPid} = Req) ->
try
incr(recv, Pkt, Dict, TPid)
of
- _ -> a(Pkt, SvcName, AE, Req)
+ _ -> answer(Pkt, SvcName, App, Req)
catch
exit: {invalid_error_bit, _} = E ->
- a(Pkt#diameter_packet{errors = [E]}, SvcName, AE, Req)
+ answer(Pkt#diameter_packet{errors = [E]}, SvcName, App, Req)
end.
-a(#diameter_packet{errors = Es} = Pkt, SvcName, AE, #request{transport = TPid,
- caps = Caps,
- packet = P}
- = Req)
+answer(Pkt,
+ SvcName,
+ #diameter_app{module = ModX,
+ options = [{answer_errors, AE} | _]},
+ Req) ->
+ a(Pkt, SvcName, ModX, AE, Req).
+
+a(#diameter_packet{errors = Es}
+ = Pkt,
+ SvcName,
+ ModX,
+ AE,
+ #request{transport = TPid,
+ caps = Caps,
+ packet = P})
when [] == Es;
callback == AE ->
- cb(Req, handle_answer, [Pkt, msg(P), SvcName, {TPid, Caps}]);
+ cb(ModX, handle_answer, [Pkt, msg(P), SvcName, {TPid, Caps}]);
-a(Pkt, SvcName, report, Req) ->
+a(Pkt, SvcName, _, report, Req) ->
x(errors, handle_answer, [SvcName, Req, Pkt]);
-a(Pkt, SvcName, discard, Req) ->
+a(Pkt, SvcName, _, discard, Req) ->
x({errors, handle_answer, [SvcName, Req, Pkt]}).
%% Note that we don't check that the application id in the answer's
@@ -2701,7 +2693,7 @@ rpd(Pid, Alias, PDict) ->
?Dict:update(Alias, fun(Ps) -> lists:keydelete(Pid, 1, Ps) end, PDict).
%% ---------------------------------------------------------------------------
-%% find_transport/2-3
+%% find_transport/4
%%
%% Return: {TransportPid, #diameter_caps{}, #diameter_app{}}
%% | false
@@ -2709,56 +2701,50 @@ rpd(Pid, Alias, PDict) ->
%% ---------------------------------------------------------------------------
%% Initial call, from an arbitrary process.
-find_transport({alias, Alias}, Msg, Opts, #state{service = Svc} = S) ->
- #diameter_service{applications = Apps} = Svc,
- ft(find_send_app(Alias, Apps), Msg, Opts, S);
+find_transport({alias, Alias},
+ Msg,
+ Opts,
+ #state{service = #diameter_service{applications = Apps}}
+ = S) ->
+ find_transport(find_send_app(Alias, Apps), Msg, Opts, S);
%% Relay or proxy send.
-find_transport(#diameter_app{} = App, Msg, Opts, S) ->
- ft(App, Msg, Opts, S).
-
-ft(#diameter_app{module = Mod, dictionary = Dict} = App, Msg, Opts, S) ->
- #options{filter = Filter,
- extra = Xtra}
- = Opts,
- pick_peer(App#diameter_app{module = Mod ++ Xtra},
+find_transport(#diameter_app{module = ModX, dictionary = Dict}
+ = App,
+ Msg,
+ #options{filter = Filter,
+ extra = Xtra},
+ S) ->
+ pick_peer(App#diameter_app{module = ModX ++ Xtra},
get_destination(Dict, Msg),
Filter,
S);
-ft(false = No, _, _, _) ->
- No.
-
-%% This can't be used if we're a relay and sending a message
-%% in an application not known locally. (TODO)
-find_send_app(Alias, Apps) ->
- case lists:keyfind(Alias, #diameter_app.alias, Apps) of
- #diameter_app{id = ?APP_ID_RELAY} ->
- false;
- T ->
- T
- end.
%% Retransmission after failover.
-find_transport(#request{app = Alias,
- filter = Filter,
- module = ModX,
- packet = #diameter_packet{msg = Msg},
- dictionary = Dict},
- #state{service = #diameter_service{applications = Apps}}
- = S)
+find_transport(#request{packet = #diameter_packet{msg = Msg}},
+ #diameter_app{dictionary = Dict}
+ = App,
+ #options{filter = Filter},
+ S)
when Msg /= undefined -> %% retransmission of binaries is unsupported
- #diameter_app{}
- = App
- = lists:keyfind(Alias, #diameter_app.alias, Apps),
-
- pick_peer(App#diameter_app{module = ModX},
+ pick_peer(App,
get_destination(Dict, Msg),
Filter,
S);
-find_transport(_, _) ->
+find_transport(_, _, _, _) ->
false.
-
+
+find_send_app(Alias, Apps) ->
+ case find_app(Alias, Apps) of
+ #diameter_app{id = ?APP_ID_RELAY} ->
+ false;
+ #diameter_app{} = A ->
+ A;
+ false = No ->
+ No
+ end.
+
%% get_destination/2
get_destination(Dict, Msg) ->
@@ -3273,15 +3259,15 @@ mk_app(#diameter_app{} = A) ->
info_pending(#state{} = S) ->
MatchSpec = [{{'$1',
- #request{transport = '$2',
- from = '$3',
- app = '$4',
+ #request{caller = '$2',
+ handler = '$3',
+ transport = '$4',
_ = '_'},
'_'},
[?ORCOND([{'==', T, '$2'} || T <- transports(S)])],
- [{{'$1', [{{app, '$4'}},
- {{transport, '$2'}},
- {{from, '$3'}}]}}]}],
+ [{{'$1', [{{caller, '$2'}},
+ {{handler, '$3'}},
+ {{transport, '$4'}}]}}]}],
try
ets:select(?REQUEST_TABLE, MatchSpec)