aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src/base
diff options
context:
space:
mode:
Diffstat (limited to 'lib/diameter/src/base')
-rw-r--r--lib/diameter/src/base/diameter.erl4
-rw-r--r--lib/diameter/src/base/diameter_capx.erl145
-rw-r--r--lib/diameter/src/base/diameter_codec.erl57
-rw-r--r--lib/diameter/src/base/diameter_config.erl49
-rw-r--r--lib/diameter/src/base/diameter_internal.hrl4
-rw-r--r--lib/diameter/src/base/diameter_peer.erl11
-rw-r--r--lib/diameter/src/base/diameter_peer_fsm.erl114
-rw-r--r--lib/diameter/src/base/diameter_service.erl2121
-rw-r--r--lib/diameter/src/base/diameter_stats.erl4
-rw-r--r--lib/diameter/src/base/diameter_traffic.erl1633
-rw-r--r--lib/diameter/src/base/diameter_watchdog.erl99
11 files changed, 2241 insertions, 2000 deletions
diff --git a/lib/diameter/src/base/diameter.erl b/lib/diameter/src/base/diameter.erl
index 8f9901907a..6be544e950 100644
--- a/lib/diameter/src/base/diameter.erl
+++ b/lib/diameter/src/base/diameter.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2012. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2013. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -213,7 +213,7 @@ origin_state_id() ->
-> any().
call(SvcName, App, Message, Options) ->
- diameter_service:call(SvcName, {alias, App}, Message, Options).
+ diameter_traffic:send_request(SvcName, {alias, App}, Message, Options).
call(SvcName, App, Message) ->
call(SvcName, App, Message, []).
diff --git a/lib/diameter/src/base/diameter_capx.erl b/lib/diameter/src/base/diameter_capx.erl
index c6c3d2934d..715b15628c 100644
--- a/lib/diameter/src/base/diameter_capx.erl
+++ b/lib/diameter/src/base/diameter_capx.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2012. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2013. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -47,14 +47,13 @@
-module(diameter_capx).
--export([build_CER/1,
- recv_CER/2,
- recv_CEA/2,
+-export([build_CER/2,
+ recv_CER/3,
+ recv_CEA/3,
make_caps/2]).
-include_lib("diameter/include/diameter.hrl").
-include("diameter_internal.hrl").
--include("diameter_gen_base_rfc3588.hrl").
-define(SUCCESS, 2001). %% DIAMETER_SUCCESS
-define(NOAPP, 5010). %% DIAMETER_NO_COMMON_APPLICATION
@@ -67,27 +66,31 @@
-type tried(T) :: {ok, T} | {error, {term(), list()}}.
--spec build_CER(#diameter_caps{})
- -> tried(#diameter_base_CER{}).
+-spec build_CER(#diameter_caps{}, module())
+ -> tried(CER)
+ when CER :: tuple().
-build_CER(Caps) ->
- try_it([fun bCER/1, Caps]).
+build_CER(Caps, Dict) ->
+ try_it([fun bCER/2, Caps, Dict]).
--spec recv_CER(#diameter_base_CER{}, #diameter_service{})
+-spec recv_CER(CER, #diameter_service{}, module())
-> tried({[diameter:'Unsigned32'()],
#diameter_caps{},
- #diameter_base_CEA{}}).
+ CEA})
+ when CER :: tuple(),
+ CEA :: tuple().
-recv_CER(CER, Svc) ->
- try_it([fun rCER/2, CER, Svc]).
+recv_CER(CER, Svc, Dict) ->
+ try_it([fun rCER/3, CER, Svc, Dict]).
--spec recv_CEA(#diameter_base_CEA{}, #diameter_service{})
+-spec recv_CEA(CEA, #diameter_service{}, module())
-> tried({[diameter:'Unsigned32'()],
[diameter:'Unsigned32'()],
- #diameter_caps{}}).
+ #diameter_caps{}})
+ when CEA :: tuple().
-recv_CEA(CEA, Svc) ->
- try_it([fun rCEA/2, CEA, Svc]).
+recv_CEA(CEA, Svc, Dict) ->
+ try_it([fun rCEA/3, CEA, Svc, Dict]).
make_caps(Caps, Opts) ->
try_it([fun mk_caps/2, Caps, Opts]).
@@ -161,16 +164,17 @@ ipaddr(A) ->
?THROW(T)
end.
-%% bCER/1
+%% bCER/2
%%
%% Build a CER record to send to a remote peer.
%% Use the fact that diameter_caps has the same field names as CER.
-bCER(#diameter_caps{} = Rec) ->
- #diameter_base_CER{}
- = list_to_tuple([diameter_base_CER | tl(tuple_to_list(Rec))]).
+bCER(#diameter_caps{} = Rec, Dict) ->
+ Values = lists:zip(Dict:'#info-'(diameter_base_CER, fields),
+ tl(tuple_to_list(Rec))),
+ Dict:'#new-'(diameter_base_CER, Values).
-%% rCER/2
+%% rCER/3
%%
%% Build a CEA record to send to a remote peer in response to an
%% incoming CER. RFC 3588 gives no guidance on what should be sent
@@ -214,12 +218,9 @@ bCER(#diameter_caps{} = Rec) ->
%% TLS 1
%% This node supports TLS security, as defined by [TLS].
-rCER(CER, #diameter_service{capabilities = LCaps} = Svc) ->
- #diameter_base_CEA{}
- = CEA
- = cea_from_cer(bCER(LCaps)),
-
- RCaps = capx_to_caps(CER),
+rCER(CER, #diameter_service{capabilities = LCaps} = Svc, Dict) ->
+ CEA = cea_from_cer(bCER(LCaps, Dict), Dict),
+ RCaps = capx_to_caps(CER, Dict),
SApps = common_applications(LCaps, RCaps, Svc),
{SApps,
@@ -227,17 +228,18 @@ rCER(CER, #diameter_service{capabilities = LCaps} = Svc) ->
build_CEA(SApps,
LCaps,
RCaps,
- CEA#diameter_base_CEA{'Result-Code' = ?SUCCESS})}.
+ Dict,
+ Dict:'#set-'({'Result-Code', ?SUCCESS}, CEA))}.
-build_CEA([], _, _, CEA) ->
- CEA#diameter_base_CEA{'Result-Code' = ?NOAPP};
+build_CEA([], _, _, Dict, CEA) ->
+ Dict:'#set-'({'Result-Code', ?NOAPP}, CEA);
-build_CEA(_, LCaps, RCaps, CEA) ->
+build_CEA(_, LCaps, RCaps, Dict, CEA) ->
case common_security(LCaps, RCaps) of
[] ->
- CEA#diameter_base_CEA{'Result-Code' = ?NOSECURITY};
+ Dict:'#set-'({'Result-Code', ?NOSECURITY}, CEA);
[_] = IS ->
- CEA#diameter_base_CEA{'Inband-Security-Id' = IS}
+ Dict:'#set-'({'Inband-Security-Id', IS}, CEA)
end.
%% common_security/2
@@ -275,46 +277,41 @@ cs(LS, RS) ->
%% practice something there may be a need for more synchronization
%% than notification by way of an event subscription offers.
-%% cea_from_cer/1
+%% cea_from_cer/2
%% CER is a subset of CEA, the latter adding Result-Code and a few
%% more AVP's.
-cea_from_cer(#diameter_base_CER{} = CER) ->
- lists:foldl(fun(F,A) -> to_cea(CER, F, A) end,
- #diameter_base_CEA{},
- record_info(fields, diameter_base_CER)).
-
-to_cea(CER, Field, CEA) ->
- try ?BASE:'#get-'(Field, CER) of
- V -> ?BASE:'#set-'({Field, V}, CEA)
- catch
- error: _ -> CEA
- end.
-
-%% rCEA/2
+cea_from_cer(CER, Dict) ->
+ [diameter_base_CER | Values] = Dict:'#get-'(CER),
+ Dict:'#set-'(Values, Dict:'#new-'(diameter_base_CEA)).
-rCEA(CEA, #diameter_service{capabilities = LCaps} = Svc) ->
- RCaps = capx_to_caps(CEA),
+%% rCEA/3
+
+rCEA(CEA, #diameter_service{capabilities = LCaps} = Svc, Dict) ->
+ RCaps = capx_to_caps(CEA, Dict),
SApps = common_applications(LCaps, RCaps, Svc),
IS = common_security(LCaps, RCaps),
{SApps, IS, RCaps}.
-%% capx_to_caps/1
-
-capx_to_caps(#diameter_base_CEA{'Origin-Host' = OH,
- 'Origin-Realm' = OR,
- 'Host-IP-Address' = IP,
- 'Vendor-Id' = VId,
- 'Product-Name' = PN,
- 'Origin-State-Id' = OSI,
- 'Supported-Vendor-Id' = SV,
- 'Auth-Application-Id' = Auth,
- 'Inband-Security-Id' = IS,
- 'Acct-Application-Id' = Acct,
- 'Vendor-Specific-Application-Id' = VSA,
- 'Firmware-Revision' = FR,
- 'AVP' = X}) ->
+%% capx_to_caps/2
+
+capx_to_caps(CEX, Dict) ->
+ [OH, OR, IP, VId, PN, OSI, SV, Auth, IS, Acct, VSA, FR, X]
+ = Dict:'#get-'(['Origin-Host',
+ 'Origin-Realm',
+ 'Host-IP-Address',
+ 'Vendor-Id',
+ 'Product-Name',
+ 'Origin-State-Id',
+ 'Supported-Vendor-Id',
+ 'Auth-Application-Id',
+ 'Inband-Security-Id',
+ 'Acct-Application-Id',
+ 'Vendor-Specific-Application-Id',
+ 'Firmware-Revision',
+ 'AVP'],
+ CEX),
#diameter_caps{origin_host = OH,
origin_realm = OR,
vendor_id = VId,
@@ -327,10 +324,7 @@ capx_to_caps(#diameter_base_CEA{'Origin-Host' = OH,
acct_application_id = Acct,
vendor_specific_application_id = VSA,
firmware_revision = FR,
- avp = X};
-
-capx_to_caps(#diameter_base_CER{} = CER) ->
- capx_to_caps(cea_from_cer(CER)).
+ avp = X}.
%% ---------------------------------------------------------------------------
%% ---------------------------------------------------------------------------
@@ -365,13 +359,12 @@ app_union(#diameter_caps{auth_application_id = U,
vendor_specific_application_id = V}) ->
set_list(U ++ C ++ lists:flatmap(fun vsa_apps/1, V)).
-vsa_apps(#'diameter_base_Vendor-Specific-Application-Id'
- {'Auth-Application-Id' = U,
- 'Acct-Application-Id' = C}) ->
- U ++ C;
-vsa_apps(L) ->
- Rec = ?BASE:'#new-'('diameter_base_Vendor-Specific-Application-Id', L),
- vsa_apps(Rec).
+vsa_apps([_ | [_,_] = Ids]) ->
+ lists:append(Ids);
+vsa_apps(Rec)
+ when is_tuple(Rec) ->
+ [_|T] = tuple_to_list(Rec),
+ vsa_apps(T).
%% It's a configuration error for a locally advertised application not
%% to be represented in Apps. Don't just match on lists:keyfind/3 in
diff --git a/lib/diameter/src/base/diameter_codec.erl b/lib/diameter/src/base/diameter_codec.erl
index 0b0bfe3f0a..e446a0209c 100644
--- a/lib/diameter/src/base/diameter_codec.erl
+++ b/lib/diameter/src/base/diameter_codec.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2012. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2013. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -26,7 +26,7 @@
decode_header/1,
sequence_numbers/1,
hop_by_hop_id/2,
- msg_name/1,
+ msg_name/2,
msg_id/1]).
%% Towards generated encoders (from diameter_gen.hrl).
@@ -99,13 +99,13 @@ e(_, #diameter_packet{msg = [#diameter_header{} = Hdr | As]} = Pkt) ->
Eid:32,
Avps/binary>>};
-e(Mod0, #diameter_packet{header = Hdr, msg = Msg} = Pkt) ->
+e(Mod, #diameter_packet{header = Hdr, msg = Msg} = Pkt) ->
#diameter_header{version = Vsn,
hop_by_hop_id = Hid,
end_to_end_id = Eid}
= Hdr,
- {Mod, MsgName} = rec2msg(Mod0, Msg),
+ MsgName = rec2msg(Mod, Msg),
{Code, Flags0, Aid} = msg_header(Mod, MsgName, Hdr),
Flags = make_flags(Flags0, Hdr),
@@ -192,11 +192,11 @@ encode_avps(Avps) ->
%% msg_header/3
msg_header(Mod, 'answer-message' = MsgName, Header) ->
- ?BASE = Mod,
+ 0 = Mod:id(), %% assert
#diameter_header{application_id = Aid,
cmd_code = Code}
= Header,
- {-1, Flags, ?DIAMETER_APP_ID_COMMON} = ?BASE:msg_header(MsgName),
+ {-1, Flags, ?DIAMETER_APP_ID_COMMON} = Mod:msg_header(MsgName),
{Code, Flags, Aid};
msg_header(Mod, MsgName, _) ->
@@ -204,22 +204,12 @@ msg_header(Mod, MsgName, _) ->
%% rec2msg/2
-rec2msg(_, ['answer-message' = M | _]) ->
- {?BASE, M};
-
-rec2msg(Mod, [MsgName|_])
- when is_atom(MsgName) ->
- {Mod, MsgName};
+rec2msg(_, [Name|_])
+ when is_atom(Name) ->
+ Name;
rec2msg(Mod, Rec) ->
- R = element(1, Rec),
- A = 'answer-message',
- case ?BASE:msg2rec(A) of
- R ->
- {?BASE, A};
- _ ->
- {Mod, Mod:rec2msg(R)}
- end.
+ Mod:rec2msg(element(1, Rec)).
%%% ---------------------------------------------------------------------------
%%% # decode/2
@@ -243,20 +233,19 @@ decode(?APP_ID_RELAY, _, #diameter_packet{} = Pkt) ->
end;
%% Otherwise decode using the dictionary.
-decode(_, Mod, #diameter_packet{header = Hdr} = Pkt)
- when is_atom(Mod) ->
+decode(_, Mod, #diameter_packet{header = Hdr} = Pkt) ->
#diameter_header{cmd_code = CmdCode,
is_request = IsRequest,
is_error = IsError}
= Hdr,
- {M, MsgName} = if IsError andalso not IsRequest ->
- {?BASE, 'answer-message'};
- true ->
- {Mod, Mod:msg_name(CmdCode, IsRequest)}
- end,
+ MsgName = if IsError andalso not IsRequest ->
+ 'answer-message';
+ true ->
+ Mod:msg_name(CmdCode, IsRequest)
+ end,
- decode_avps(MsgName, M, Pkt, collect_avps(Pkt));
+ decode_avps(MsgName, Mod, Pkt, collect_avps(Pkt));
decode(Id, Mod, Bin)
when is_bitstring(Bin) ->
@@ -360,15 +349,15 @@ hop_by_hop_id(Id, <<H:12/binary, _:32, T/binary>>) ->
<<H/binary, Id:32, T/binary>>.
%%% ---------------------------------------------------------------------------
-%%% # msg_name/1
+%%% # msg_name/2
%%% ---------------------------------------------------------------------------
-msg_name(#diameter_header{application_id = ?APP_ID_COMMON,
- cmd_code = C,
- is_request = R}) ->
- ?BASE:msg_name(C,R);
+msg_name(Dict0, #diameter_header{application_id = ?APP_ID_COMMON,
+ cmd_code = C,
+ is_request = R}) ->
+ Dict0:msg_name(C,R);
-msg_name(Hdr) ->
+msg_name(_, Hdr) ->
msg_id(Hdr).
%% Note that messages in different applications could have the same
diff --git a/lib/diameter/src/base/diameter_config.erl b/lib/diameter/src/base/diameter_config.erl
index 63d28f25a2..1486071573 100644
--- a/lib/diameter/src/base/diameter_config.erl
+++ b/lib/diameter/src/base/diameter_config.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2012. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2013. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -113,15 +113,22 @@
-define(VALUES(Rec), tl(tuple_to_list(Rec))).
+%% The RFC 3588 common dictionary is used to validate capabilities
+%% configuration. That a given transport may use the RFC 6733
+%% dictionary is of no consequence.
+-define(BASE, diameter_gen_base_rfc3588).
+
%%% The return values below assume the server diameter_config is started.
%%% The functions will exit if it isn't.
%% --------------------------------------------------------------------------
-%% # start_service(SvcName, Opts)
-%%
-%% Output: ok | {error, Reason}
+%% # start_service/2
%% --------------------------------------------------------------------------
+-spec start_service(diameter:service_name(), [diameter:service_opt()])
+ -> ok
+ | {error, term()}.
+
start_service(SvcName, Opts)
when is_list(Opts) ->
start_rc(sync(SvcName, {start_service, SvcName, Opts})).
@@ -134,22 +141,23 @@ start_rc(timeout) ->
{error, application_not_started}.
%% --------------------------------------------------------------------------
-%% # stop_service(SvcName)
-%%
-%% Output: ok
+%% # stop_service/1
%% --------------------------------------------------------------------------
+-spec stop_service(diameter:service_name())
+ -> ok.
+
stop_service(SvcName) ->
sync(SvcName, {stop_service, SvcName}).
%% --------------------------------------------------------------------------
-%% # add_transport(SvcName, {Type, Opts})
-%%
-%% Input: Type = connect | listen
-%%
-%% Output: {ok, Ref} | {error, Reason}
+%% # add_transport/2
%% --------------------------------------------------------------------------
+-spec add_transport(diameter:service_name(), {connect|listen, [diameter:transport_opt()]})
+ -> {ok, diameter:transport_ref()}
+ | {error, term()}.
+
add_transport(SvcName, {T, Opts})
when is_list(Opts), (T == connect orelse T == listen) ->
sync(SvcName, {add, SvcName, T, Opts}).
@@ -171,6 +179,10 @@ add_transport(SvcName, {T, Opts})
%% Output: ok | {error, Reason}
%% --------------------------------------------------------------------------
+-spec remove_transport(diameter:service_name(), diameter:transport_pred())
+ -> ok
+ | {error, term()}.
+
remove_transport(SvcName, Pred) ->
try
sync(SvcName, {remove, SvcName, pred(Pred)})
@@ -473,6 +485,10 @@ stop(SvcName) ->
%% add/3
+%% Can't check for a single common dictionary since a transport may
+%% restrict applications so that that there's one while the service
+%% has many.
+
add(SvcName, Type, Opts) ->
%% Ensure usable capabilities. diameter_service:merge_service/2
%% depends on this.
@@ -545,7 +561,7 @@ make_config(SvcName, Opts) ->
[] == Apps andalso ?THROW(no_apps),
%% Use the fact that diameter_caps has the same field names as CER.
- Fields = diameter_gen_base_rfc3588:'#info-'(diameter_base_CER) -- ['AVP'],
+ Fields = ?BASE:'#info-'(diameter_base_CER) -- ['AVP'],
COpts = [T || {K,_} = T <- Opts, lists:member(K, Fields)],
Caps = make_caps(#diameter_caps{}, COpts),
@@ -608,14 +624,14 @@ opt(sequence = K, F) ->
E:R ->
?THROW({value, {K, E, R, ?STACK}})
end;
-
+
opt(K, _) ->
?THROW({value, K}).
sequence({H,N} = T)
when 0 =< N, N =< 32, 0 =< H, 0 == H bsr N ->
T;
-
+
sequence(_) ->
?THROW({value, sequence}).
@@ -629,7 +645,8 @@ make_caps(Caps, Opts) ->
%% Validate types by encoding a CER.
encode_CER(Opts) ->
- {ok, CER} = diameter_capx:build_CER(make_caps(?EXAMPLE_CAPS, Opts)),
+ {ok, CER} = diameter_capx:build_CER(make_caps(?EXAMPLE_CAPS, Opts),
+ ?BASE),
Hdr = #diameter_header{version = ?DIAMETER_VERSION,
end_to_end_id = 0,
diff --git a/lib/diameter/src/base/diameter_internal.hrl b/lib/diameter/src/base/diameter_internal.hrl
index 63b35550a8..4b672aa071 100644
--- a/lib/diameter/src/base/diameter_internal.hrl
+++ b/lib/diameter/src/base/diameter_internal.hrl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2011. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2013. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -58,8 +58,6 @@
-define(APP_ID_COMMON, 0).
-define(APP_ID_RELAY, 16#FFFFFFFF).
--define(BASE, diameter_gen_base_rfc3588).
-
%%% ---------------------------------------------------------
%%% RFC 3588, ch 2.6 Peer table
diff --git a/lib/diameter/src/base/diameter_peer.erl b/lib/diameter/src/base/diameter_peer.erl
index 25c9eab4cb..130bedda84 100644
--- a/lib/diameter/src/base/diameter_peer.erl
+++ b/lib/diameter/src/base/diameter_peer.erl
@@ -123,7 +123,7 @@ pair([_ | Rest], Mods, Acc) ->
pair(Rest, Mods, Acc);
%% No transport_module or transport_config: defaults.
-pair([], [], []) ->
+pair([], [], []) ->
[{[?DEFAULT_TMOD], ?DEFAULT_TCFG, ?DEFAULT_TTMO}];
%% One transport_module, one transport_config.
@@ -272,7 +272,7 @@ handle_cast(Msg, State) ->
%% Remote service is distributing a message.
handle_info({notify, SvcName, T}, S) ->
- bang(diameter_service:whois(SvcName), T),
+ diameter_service:notify(SvcName, T),
{noreply, S};
handle_info(Info, State) ->
@@ -304,13 +304,6 @@ code_change(_OldVsn, State, _Extra) ->
ifc_send(Pid, T) ->
Pid ! {diameter, T}.
-%% bang/2
-
-bang(undefined = No, _) ->
- No;
-bang(Pid, T) ->
- Pid ! T.
-
%% call/1
call(Request) ->
diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl
index de341741db..ad26f230ef 100644
--- a/lib/diameter/src/base/diameter_peer_fsm.erl
+++ b/lib/diameter/src/base/diameter_peer_fsm.erl
@@ -46,16 +46,19 @@
-include_lib("diameter/include/diameter.hrl").
-include("diameter_internal.hrl").
--include("diameter_gen_base_rfc3588.hrl").
%% Values of Disconnect-Cause in DPR.
--define(GOAWAY, ?'DIAMETER_BASE_DISCONNECT-CAUSE_DO_NOT_WANT_TO_TALK_TO_YOU').
--define(REBOOT, ?'DIAMETER_BASE_DISCONNECT-CAUSE_REBOOTING').
--define(BUSY, ?'DIAMETER_BASE_DISCONNECT-CAUSE_BUSY').
+-define(GOAWAY, 2). %% DO_NOT_WANT_TO_TALK_TO_YOU
+-define(BUSY, 1). %% BUSY
+-define(REBOOT, 0). %% REBOOTING
+%% Values of Inband-Security-Id.
-define(NO_INBAND_SECURITY, 0).
-define(TLS, 1).
+%% Note that the a common dictionary hrl is purposely not included
+%% since the common dictionary is an argument to start/3.
+
%% Keys in process dictionary.
-define(CB_KEY, cb). %% capabilities callback
-define(DPR_KEY, dpr). %% disconnect callback
@@ -100,8 +103,9 @@
| {'Wait-CEA', uint32(), uint32()}
| 'Open',
mode :: accept | connect | {connect, reference()},
- parent :: pid(), %% watchdog process
- transport :: pid(), %% transport process
+ parent :: pid(), %% watchdog process
+ transport :: pid(), %% transport process
+ dictionary :: module(), %% common dictionary
service :: #diameter_service{},
dpr = false :: false | {uint32(), uint32()}}).
%% | hop by hop and end to end identifiers
@@ -134,7 +138,8 @@
%% ---------------------------------------------------------------------------
-spec start(T, [Opt], {diameter:sequence(),
- diameter:restriction(),
+ [node()],
+ module(),
#diameter_service{}})
-> {reference(), pid()}
when T :: {connect|accept, diameter:transport_ref()},
@@ -173,6 +178,7 @@ init(T) ->
i({Ack, WPid, {M, Ref} = T, Opts, {Mask,
Nodes,
+ Dict0,
#diameter_service{capabilities = LCaps}
= Svc}}) ->
erlang:monitor(process, WPid),
@@ -191,6 +197,7 @@ i({Ack, WPid, {M, Ref} = T, Opts, {Mask,
#state{state = {'Wait-Conn-Ack', Tmo},
parent = WPid,
transport = TPid,
+ dictionary = Dict0,
mode = M,
service = svc(Svc, Addrs)}.
%% The transport returns its local ip addresses so that different
@@ -199,8 +206,7 @@ i({Ack, WPid, {M, Ref} = T, Opts, {Mask,
%% sending capabilities exchange messages.
%%
%% Invalid transport config may cause us to crash but note that the
-%% watchdog start (start/2) succeeds regardless so as not to crash the
-%% service.
+%% watchdog start (start/2) succeeds regardless.
%% Wait for the caller to have a monitor to avoid a race with our
%% death. (Since the exit reason is used in diameter_service.)
@@ -455,7 +461,8 @@ start_next(#state{service = Svc0} = S) ->
send_CER(#state{state = {'Wait-Conn-Ack', Tmo},
mode = {connect, Remote},
service = #diameter_service{capabilities = LCaps},
- transport = TPid}
+ transport = TPid,
+ dictionary = Dict}
= S) ->
OH = LCaps#diameter_caps.origin_host,
req_send_CER(OH, Remote)
@@ -466,7 +473,7 @@ send_CER(#state{state = {'Wait-Conn-Ack', Tmo},
#diameter_packet{header = #diameter_header{end_to_end_id = Eid,
hop_by_hop_id = Hid}}
= Pkt
- = encode(CER),
+ = encode(CER, Dict),
send(TPid, Pkt),
start_timer(Tmo, S#state{state = {'Wait-CEA', Hid, Eid}}).
@@ -488,19 +495,20 @@ start_timer(Tmo, #state{state = PS} = S) ->
%% build_CER/1
-build_CER(#state{service = #diameter_service{capabilities = LCaps}}) ->
- {ok, CER} = diameter_capx:build_CER(LCaps),
+build_CER(#state{service = #diameter_service{capabilities = LCaps},
+ dictionary = Dict}) ->
+ {ok, CER} = diameter_capx:build_CER(LCaps, Dict),
CER.
-%% encode/1
+%% encode/2
-encode(Rec) ->
+encode(Rec, Dict) ->
Seq = diameter_session:sequence({_,_} = getr(?SEQUENCE_KEY)),
Hdr = #diameter_header{version = ?DIAMETER_VERSION,
end_to_end_id = Seq,
hop_by_hop_id = Seq},
- diameter_codec:encode(?BASE, #diameter_packet{header = Hdr,
- msg = Rec}).
+ diameter_codec:encode(Dict, #diameter_packet{header = Hdr,
+ msg = Rec}).
%% recv/2
@@ -521,9 +529,10 @@ recv(#diameter_packet{header = #diameter_header{length = Len}
recv(#diameter_packet{header = #diameter_header{} = Hdr}
= Pkt,
- #state{parent = Pid}
+ #state{parent = Pid,
+ dictionary = Dict0}
= S) ->
- Name = diameter_codec:msg_name(Hdr),
+ Name = diameter_codec:msg_name(Dict0, Hdr),
Pid ! {recv, self(), Name, Pkt},
diameter_stats:incr({msg_id(Name, Hdr), recv}), %% count received
rcv(Name, Pkt, S);
@@ -608,13 +617,13 @@ send(Pid, Msg) ->
%% handle_request/3
-handle_request(Type, #diameter_packet{} = Pkt, S) ->
+handle_request(Type, #diameter_packet{} = Pkt, #state{dictionary = D} = S) ->
?LOG(recv, Type),
- send_answer(Type, diameter_codec:decode(?BASE, Pkt), S).
+ send_answer(Type, diameter_codec:decode(D, Pkt), S).
%% send_answer/3
-send_answer(Type, ReqPkt, #state{transport = TPid} = S) ->
+send_answer(Type, ReqPkt, #state{transport = TPid, dictionary = Dict} = S) ->
#diameter_packet{header = H,
transport_data = TD}
= ReqPkt,
@@ -631,13 +640,15 @@ send_answer(Type, ReqPkt, #state{transport = TPid} = S) ->
msg = Msg,
transport_data = TD},
- send(TPid, diameter_codec:encode(?BASE, Pkt)),
+ send(TPid, diameter_codec:encode(Dict, Pkt)),
eval(PostF, S).
eval([F|A], S) ->
apply(F, A ++ [S]);
eval(ok, S) ->
- S.
+ S;
+eval(T, _) ->
+ close(T).
%% build_answer/3
@@ -648,11 +659,11 @@ build_answer('CER',
is_error = false},
errors = []}
= Pkt,
- S) ->
- {SupportedApps, RCaps, #diameter_base_CEA{'Result-Code' = RC,
- 'Inband-Security-Id' = IS}
- = CEA}
- = recv_CER(CER, S),
+ #state{dictionary = Dict0}
+ = S) ->
+ {SupportedApps, RCaps, CEA} = recv_CER(CER, S),
+
+ [RC, IS] = Dict0:'#get-'(['Result-Code', 'Inband-Security-Id'], CEA),
#diameter_caps{origin_host = {OH, DH}}
= Caps
@@ -665,10 +676,10 @@ build_answer('CER',
orelse ?THROW(4003), %% DIAMETER_ELECTION_LOST
caps_cb(Caps)
of
- N -> {cea(CEA, N), [fun open/5, Pkt,
- SupportedApps,
- Caps,
- {accept, hd([_] = IS)}]}
+ N -> {cea(CEA, N, Dict0), [fun open/5, Pkt,
+ SupportedApps,
+ Caps,
+ {accept, hd([_] = IS)}]}
catch
?FAILURE(Reason) ->
rejected(Reason, {'CER', Reason, Caps, Pkt}, S)
@@ -685,15 +696,15 @@ build_answer(Type,
RC = rc(H, Es),
{answer(Type, RC, Es, S), post(Type, RC, Pkt, S)}.
-cea(CEA, ok) ->
+cea(CEA, ok, _) ->
CEA;
-cea(CEA, 2001) ->
+cea(CEA, 2001, _) ->
CEA;
-cea(CEA, RC) ->
- CEA#diameter_base_CEA{'Result-Code' = RC}.
+cea(CEA, RC, Dict0) ->
+ Dict0:'#set-'({'Result-Code', RC}, CEA).
post('CER' = T, RC, Pkt, S) ->
- [fun(_) -> close({T, caps(S), {RC, Pkt}}) end];
+ {T, caps(S), {RC, Pkt}};
post(_, _, _, _) ->
ok.
@@ -703,7 +714,7 @@ rejected({capabilities_cb, _F, Reason}, T, S) ->
rejected(discard, T, _) ->
close(T);
rejected({N, Es}, T, S) ->
- {answer('CER', N, Es, S), [fun(_) -> close(T) end]};
+ {answer('CER', N, Es, S), T};
rejected(N, T, S) ->
rejected({N, []}, T, S).
@@ -729,7 +740,7 @@ is_origin({N, _}) ->
orelse N == 'Origin-State-Id'.
%% failed_avp/1
-
+
failed_avp([] = No) ->
No;
failed_avp(Avps) ->
@@ -818,22 +829,23 @@ a('DPR', #diameter_caps{origin_host = {Host, _},
%% recv_CER/2
-recv_CER(CER, #state{service = Svc}) ->
- {ok, T} = diameter_capx:recv_CER(CER, Svc),
+recv_CER(CER, #state{service = Svc, dictionary = Dict}) ->
+ {ok, T} = diameter_capx:recv_CER(CER, Svc, Dict),
T.
%% handle_CEA/1
handle_CEA(#diameter_packet{bin = Bin}
= Pkt,
- #state{service = #diameter_service{capabilities = LCaps}}
+ #state{dictionary = Dict0,
+ service = #diameter_service{capabilities = LCaps}}
= S)
when is_binary(Bin) ->
?LOG(recv, 'CEA'),
#diameter_packet{msg = CEA}
= DPkt
- = diameter_codec:decode(?BASE, Pkt),
+ = diameter_codec:decode(Dict0, Pkt),
{SApps, IS, RCaps} = recv_CEA(DPkt, S),
@@ -841,8 +853,7 @@ handle_CEA(#diameter_packet{bin = Bin}
= Caps
= capz(LCaps, RCaps),
- #diameter_base_CEA{'Result-Code' = RC}
- = CEA,
+ RC = Dict0:'#get-'('Result-Code', CEA),
%% Ensure that we don't already have a connection to the peer in
%% question. This isn't the peer election of 3588 except in the
@@ -878,8 +889,9 @@ recv_CEA(#diameter_packet{header = #diameter_header{version
is_error = false},
msg = CEA,
errors = []},
- #state{service = Svc}) ->
- {ok, T} = diameter_capx:recv_CEA(CEA, Svc),
+ #state{service = Svc,
+ dictionary = Dict}) ->
+ {ok, T} = diameter_capx:recv_CEA(CEA, Svc, Dict),
T;
recv_CEA(Pkt, S) ->
@@ -1022,13 +1034,14 @@ dpr([CB|Rest], [Reason | _] = Args, S) ->
diameter_lib:error_report(failure, No),
{stop, No}
end;
-
+
dpr([], [Reason | _], S) ->
send_dpr(Reason, [], S).
-record(opts, {cause, timeout = ?DPA_TIMEOUT}).
send_dpr(Reason, Opts, #state{transport = TPid,
+ dictionary = Dict,
service = #diameter_service{capabilities = Caps}}
= S) ->
#opts{cause = Cause, timeout = Tmo}
@@ -1048,7 +1061,8 @@ send_dpr(Reason, Opts, #state{transport = TPid,
= Pkt
= encode(['DPR', {'Origin-Host', OH},
{'Origin-Realm', OR},
- {'Disconnect-Cause', Cause}]),
+ {'Disconnect-Cause', Cause}],
+ Dict),
send(TPid, Pkt),
dpa_timer(Tmo),
?LOG(send, 'DPR'),
diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl
index 3cab914fdb..f1342df16c 100644
--- a/lib/diameter/src/base/diameter_service.erl
+++ b/lib/diameter/src/base/diameter_service.erl
@@ -24,33 +24,38 @@
-module(diameter_service).
-behaviour(gen_server).
+%% towards diameter_service_sup
+-export([start_link/1]).
+
+%% towards diameter
+-export([subscribe/1,
+ unsubscribe/1,
+ services/0,
+ info/2]).
+
+%% towards diameter_config
-export([start/1,
stop/1,
start_transport/2,
- stop_transport/2,
- info/2,
- call/4]).
+ stop_transport/2]).
-%% towards diameter_watchdog
--export([receive_message/3]).
+%% towards diameter_peer
+-export([notify/2]).
-%% service supervisor
--export([start_link/1]).
+%% towards diameter_traffic
+-export([find_incoming_app/4,
+ pick_peer/3]).
--export([subscribe/1,
- unsubscribe/1,
+%% test/debug
+-export([services/1,
subscriptions/1,
subscriptions/0,
- services/0,
- services/1,
- whois/1]).
-
-%% test/debug
--export([call_module/3,
+ call_module/3,
+ whois/1,
state/1,
uptime/1]).
-%%% gen_server callbacks
+%% gen_server callbacks
-export([init/1,
handle_call/3,
handle_cast/2,
@@ -58,9 +63,6 @@
terminate/2,
code_change/3]).
-%% Other callbacks.
--export([send/1]).
-
-include_lib("diameter/include/diameter.hrl").
-include("diameter_internal.hrl").
@@ -78,11 +80,8 @@
| ?WD_REOPEN.
-define(DEFAULT_TC, 30000). %% RFC 3588 ch 2.1
--define(DEFAULT_TIMEOUT, 5000). %% for outgoing requests
-define(RESTART_TC, 1000). %% if restart was this recent
--define(RELAY, ?DIAMETER_DICT_RELAY).
-
%% Used to be able to swap this with anything else dict-like but now
%% rely on the fact that a service's #state{} record does not change
%% in storing in it ?STATE table and not always going through the
@@ -90,10 +89,6 @@
%% a ?Dict don't change the handle to it.
-define(Dict, diameter_dict).
-%% Table containing outgoing requests for which a reply has yet to be
-%% received.
--define(REQUEST_TABLE, diameter_request).
-
%% Maintains state in a table. In contrast to previously, a service's
%% stat is not constant and is accessed outside of the service
%% process.
@@ -107,31 +102,33 @@
%% Workaround for dialyzer's lack of understanding of match specs.
-type match(T)
- :: T | '_' | '$1' | '$2' | '$3' | '$4'.
-
-%% State of service gen_server.
+ :: T | '_' | '$1' | '$2'.
+
+%% State of service gen_server. Note that the state term itself
+%% doesn't change, which is relevant for the stateless application
+%% callbacks since the state is retrieved from ?STATE_TABLE from
+%% outside the service process. The pid in the service record is used
+%% to determine whether or not we need to call the process for a
+%% pick_peer callback in the statefull case.
-record(state,
{id = now(),
- service_name, %% as passed to start_service/2, key in ?STATE_TABLE
+ service_name :: diameter:service_name(), %% key in ?STATE_TABLE
service :: #diameter_service{},
watchdogT = ets_new(watchdogs) %% #watchdog{} at start
:: ets:tid(),
peerT = ets_new(peers) %% #peer{pid = TPid} at okay/reopen
:: ets:tid(),
- shared_peers = ?Dict:new(), %% Alias -> [{TPid, Caps}, ...]
- local_peers = ?Dict:new(), %% Alias -> [{TPid, Caps}, ...]
+ shared_peers = ?Dict:new() %% Alias -> [{TPid, Caps}, ...]
+ :: ets:tid(),
+ local_peers = ?Dict:new() %% Alias -> [{TPid, Caps}, ...]
+ :: ets:tid(),
monitor = false :: false | pid(), %% process to die with
options
:: [{sequence, diameter:sequence()} %% sequence mask
| {restrict_connections, diameter:restriction()}
| {share_peers, boolean()} %% broadcast peers to remote nodes?
| {use_shared_peers, boolean()}]}).%% use broadcasted peers?
-%% shared_peers reflects the peers broadcast from remote nodes. Note
-%% that the state term itself doesn't change, which is relevant for
-%% the stateless application callbacks since the state is retrieved
-%% from ?STATE_TABLE from outside the service process. The pid in the
-%% service record is used to determine whether or not we need to call
-%% the process for a pick_peer callback.
+%% shared_peers reflects the peers broadcast from remote nodes.
%% Record representing an RFC 3539 watchdog process implemented by
%% diameter_watchdog.
@@ -154,28 +151,9 @@
started = now(), %% at process start
watchdog :: pid()}). %% key into watchdogT
-%% Record stored in diameter_request for each outgoing request.
--record(request,
- {from, %% arg 2 of handle_call/3
- handler :: match(pid()), %% request process
- transport :: match(pid()), %% peer process
- caps :: match(#diameter_caps{}),
- app :: match(diameter:app_alias()),%% #diameter_app.alias
- dictionary :: match(module()), %% #diameter_app.dictionary
- module :: match([module() | list()]), %% #diameter_app.module
- filter :: match(diameter:peer_filter()),
- packet :: match(#diameter_packet{})}).
-
-%% Record call/4 options are parsed into.
--record(options,
- {filter = none :: diameter:peer_filter(),
- extra = [] :: list(),
- timeout = ?DEFAULT_TIMEOUT :: 0..16#FFFFFFFF,
- detach = false :: boolean()}).
-
-%%% ---------------------------------------------------------------------------
-%%% # start(SvcName)
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # start/1
+%% ---------------------------------------------------------------------------
start(SvcName) ->
diameter_service_sup:start_child(SvcName).
@@ -186,9 +164,9 @@ start_link(SvcName) ->
%% Put the arbitrary term SvcName in a list in case we ever want to
%% send more than this and need to distinguish old from new.
-%%% ---------------------------------------------------------------------------
-%%% # stop(SvcName)
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # stop/1
+%% ---------------------------------------------------------------------------
stop(SvcName) ->
case whois(SvcName) of
@@ -204,180 +182,43 @@ stop(ok, Pid) ->
stop(No, _) ->
No.
-%%% ---------------------------------------------------------------------------
-%%% # start_transport(SvcName, {Ref, Type, Opts})
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # start_transport/3
+%% ---------------------------------------------------------------------------
-start_transport(SvcName, {_,_,_} = T) ->
+start_transport(SvcName, {_Ref, _Type, _Opts} = T) ->
call_service_by_name(SvcName, {start, T}).
-%%% ---------------------------------------------------------------------------
-%%% # stop_transport(SvcName, Refs)
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # stop_transport/2
+%% ---------------------------------------------------------------------------
stop_transport(_, []) ->
ok;
stop_transport(SvcName, [_|_] = Refs) ->
call_service_by_name(SvcName, {stop, Refs}).
-%%% ---------------------------------------------------------------------------
-%%% # info(SvcName, Item)
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # info/2
+%% ---------------------------------------------------------------------------
info(SvcName, Item) ->
- case find_state(SvcName) of
- #state{} = S ->
+ case lookup_state(SvcName) of
+ [#state{} = S] ->
service_info(Item, S);
- false ->
+ [] ->
undefined
end.
-%%% ---------------------------------------------------------------------------
-%%% # receive_message(TPid, Pkt, MessageData)
-%%% ---------------------------------------------------------------------------
-
-%% Handle an incoming Diameter message in the watchdog process. This
-%% used to come through the service process but this avoids that
-%% becoming a bottleneck.
-
-receive_message(TPid, Pkt, T)
- when is_pid(TPid) ->
- #diameter_packet{header = #diameter_header{is_request = R}} = Pkt,
- recv(R, (not R) andalso lookup_request(Pkt, TPid), TPid, Pkt, T).
-
-%% Incoming request ...
-recv(true, false, TPid, Pkt, T) ->
- try
- spawn(fun() -> recv_request(TPid, Pkt, T) end)
- catch
- error: system_limit = E -> %% discard
- ?LOG({error, E}, now())
- end;
-
-%% ... answer to known request ...
-recv(false, #request{from = {_, Ref}, handler = Pid} = Req, _, Pkt, _) ->
- Pid ! {answer, Ref, Req, Pkt};
-%% Note that failover could have happened prior to this message being
-%% received and triggering failback. That is, both a failover message
-%% and answer may be on their way to the handler process. In the worst
-%% case the request process gets notification of the failover and
-%% sends to the alternate peer before an answer arrives, so it's
-%% always the case that we can receive more than one answer after
-%% failover. The first answer received by the request process wins,
-%% any others are discarded.
-
-%% ... or not.
-recv(false, false, _, _, _) ->
- ok.
-
-%%% ---------------------------------------------------------------------------
-%%% # call(SvcName, App, Msg, Options)
-%%% ---------------------------------------------------------------------------
-
-call(SvcName, App, Msg, Options)
- when is_list(Options) ->
- Rec = make_options(Options),
- Ref = make_ref(),
- Caller = {self(), Ref},
- Fun = fun() -> exit({Ref, call(SvcName, App, Msg, Rec, Caller)}) end,
- try spawn_monitor(Fun) of
- {_, MRef} ->
- recv(MRef, Ref, Rec#options.detach, false)
- catch
- error: system_limit = E ->
- {error, E}
- end.
-
-%% Don't rely on gen_server:call/3 for the timeout handling since it
-%% makes no guarantees about not leaving a reply message in the
-%% mailbox if we catch its exit at timeout. It currently *can* do so,
-%% which is also undocumented.
-
-recv(MRef, _, true, true) ->
- erlang:demonitor(MRef, [flush]),
- ok;
-
-recv(MRef, Ref, Detach, Sent) ->
- receive
- Ref -> %% send has been attempted
- recv(MRef, Ref, Detach, true);
- {'DOWN', MRef, process, _, Reason} ->
- call_rc(Reason, Ref, Sent)
- end.
-
-%% call/5 has returned ...
-call_rc({Ref, Ans}, Ref, _) ->
- Ans;
-
-%% ... or not. In this case failure/encode are documented.
-call_rc(_, _, Sent) ->
- {error, choose(Sent, failure, encode)}.
-
-%% call/5
-%%
-%% In the process spawned for the outgoing request.
-
-call(SvcName, App, Msg, Opts, Caller) ->
- c(find_state(SvcName), App, Msg, Opts, Caller).
-
-c(#state{service_name = Svc, options = [{_, Mask} | _]} = S,
- App,
- Msg,
- Opts,
- Caller) ->
- case find_transport(App, Msg, Opts, S) of
- {_,_,_} = T ->
- send_request(T, Mask, Msg, Opts, Caller, Svc);
- false ->
- {error, no_connection};
- {error, _} = No ->
- No
- end;
-
-c(false, _, _, _, _) ->
- {error, no_service}.
-
-%% find_state/1
-
-find_state(SvcName) ->
- fs(ets:lookup(?STATE_TABLE, SvcName)).
-
-fs([#state{} = S]) ->
- S;
-
-fs([]) ->
- false.
-
-%% make_options/1
+%% lookup_state/1
-make_options(Options) ->
- lists:foldl(fun mo/2, #options{}, Options).
+lookup_state(SvcName) ->
+ ets:lookup(?STATE_TABLE, SvcName).
-mo({timeout, T}, Rec)
- when is_integer(T), 0 =< T ->
- Rec#options{timeout = T};
-
-mo({filter, F}, #options{filter = none} = Rec) ->
- Rec#options{filter = F};
-mo({filter, F}, #options{filter = {all, Fs}} = Rec) ->
- Rec#options{filter = {all, [F | Fs]}};
-mo({filter, F}, #options{filter = F0} = Rec) ->
- Rec#options{filter = {all, [F0, F]}};
-
-mo({extra, L}, #options{extra = X} = Rec)
- when is_list(L) ->
- Rec#options{extra = X ++ L};
-
-mo(detach, Rec) ->
- Rec#options{detach = true};
-
-mo(T, _) ->
- ?ERROR({invalid_option, T}).
-
-%%% ---------------------------------------------------------------------------
-%%% # subscribe(SvcName)
-%%% # unsubscribe(SvcName)
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # subscribe/1
+%% # unsubscribe/1
+%% ---------------------------------------------------------------------------
subscribe(SvcName) ->
diameter_reg:add({?MODULE, subscriber, SvcName}).
@@ -394,9 +235,9 @@ subscriptions() ->
pmap(Props) ->
lists:map(fun({{?MODULE, _, Name}, Pid}) -> {Name, Pid} end, Props).
-%%% ---------------------------------------------------------------------------
-%%% # services(Pattern)
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # services/1
+%% ---------------------------------------------------------------------------
services(Pat) ->
pmap(diameter_reg:match({?MODULE, service, Pat})).
@@ -412,6 +253,86 @@ whois(SvcName) ->
undefined
end.
+%% ---------------------------------------------------------------------------
+%% # pick_peer/3
+%% ---------------------------------------------------------------------------
+
+-spec pick_peer(SvcName, AppOrAlias, Opts)
+ -> {{TPid, Caps, App}, Mask}
+ | false
+ | {error, term()}
+ when SvcName :: diameter:service_name(),
+ AppOrAlias :: {alias, diameter:app_alias()} | #diameter_app{},
+ Opts :: tuple(),
+ TPid :: pid(),
+ Caps :: #diameter_caps{},
+ App :: #diameter_app{},
+ Mask :: diameter:sequence().
+
+pick_peer(SvcName, App, Opts) ->
+ pick(lookup_state(SvcName), App, Opts).
+
+pick([], _, _) ->
+ {error, no_service};
+
+pick([S], App, Opts) ->
+ pick(S, App, Opts);
+
+pick(#state{service = #diameter_service{applications = Apps}}
+ = S,
+ {alias, Alias},
+ Opts) -> %% initial call from diameter:call/4
+ pick(S, find_outgoing_app(Alias, Apps), Opts);
+
+pick(_, false, _) ->
+ false;
+
+pick(#state{options = [{_, Mask} | _]}
+ = S,
+ #diameter_app{module = ModX, dictionary = Dict}
+ = App0,
+ {DestF, Filter, Xtra}) ->
+ App = App0#diameter_app{module = ModX ++ Xtra},
+ [_,_] = RealmAndHost = diameter_lib:eval([DestF, Dict]),
+ case pick_peer(App, RealmAndHost, Filter, S) of
+ {TPid, Caps} ->
+ {{TPid, Caps, App}, Mask};
+ false = No ->
+ No
+ end.
+
+%% ---------------------------------------------------------------------------
+%% # find_incoming_app/4
+%% ---------------------------------------------------------------------------
+
+-spec find_incoming_app(PeerT, TPid, Id, Apps)
+ -> {#diameter_app{}, #diameter_caps{}} %% connection and suitable app
+ | #diameter_caps{} %% connection but no suitable app
+ | false %% no connection
+ when PeerT :: ets:tid(),
+ TPid :: pid(),
+ Id :: non_neg_integer(),
+ Apps :: [#diameter_app{}].
+
+find_incoming_app(PeerT, TPid, Id, Apps) ->
+ try ets:lookup(PeerT, TPid) of
+ [#peer{} = P] ->
+ find_incoming_app(P, Id, Apps);
+ [] -> %% transport has gone down
+ false
+ catch
+ error: badarg -> %% service has gone down (and taken table with it)
+ false
+ end.
+
+%% ---------------------------------------------------------------------------
+%% # notify/2
+%% ---------------------------------------------------------------------------
+
+notify(SvcName, Msg) ->
+ Pid = whois(SvcName),
+ is_pid(Pid) andalso (Pid ! Msg).
+
%% ===========================================================================
%% ===========================================================================
@@ -426,9 +347,9 @@ uptime(Svc) ->
call_module(Service, AppMod, Request) ->
call_service(Service, {call_module, AppMod, Request}).
-%%% ---------------------------------------------------------------------------
-%%% # init([SvcName])
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # init/1
+%% ---------------------------------------------------------------------------
init([SvcName]) ->
process_flag(trap_exit, true), %% ensure terminate(shutdown, _)
@@ -439,9 +360,9 @@ i(SvcName, true) ->
i(_, false) ->
{stop, {shutdown, already_started}}.
-%%% ---------------------------------------------------------------------------
-%%% # handle_call(Req, From, State)
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # handle_call/3
+%% ---------------------------------------------------------------------------
handle_call(state, _, S) ->
{reply, S, S};
@@ -477,17 +398,17 @@ handle_call(Req, From, S) ->
unexpected(handle_call, [Req, From], S),
{reply, nok, S}.
-%%% ---------------------------------------------------------------------------
-%%% # handle_cast(Req, State)
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # handle_cast/2
+%% ---------------------------------------------------------------------------
handle_cast(Req, S) ->
unexpected(handle_cast, [Req], S),
{noreply, S}.
-%%% ---------------------------------------------------------------------------
-%%% # handle_info(Req, State)
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # handle_info/2
+%% ---------------------------------------------------------------------------
handle_info(T, #state{} = S) ->
case transition(T,S) of
@@ -570,20 +491,13 @@ transition({tc_timeout, T}, S) ->
tc_timeout(T, S),
ok;
-%% Request process is telling us it may have missed a failover message
-%% after a transport went down and the service process looked up
-%% outstanding requests.
-transition({failover, TRef, Seqs}, S) ->
- failover(TRef, Seqs, S),
- ok;
-
transition(Req, S) ->
unexpected(handle_info, [Req], S),
ok.
-%%% ---------------------------------------------------------------------------
-%%% # terminate(Reason, State)
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # terminate/2
+%% ---------------------------------------------------------------------------
terminate(Reason, #state{service_name = Name} = S) ->
send_event(Name, stop),
@@ -591,9 +505,9 @@ terminate(Reason, #state{service_name = Name} = S) ->
shutdown == Reason %% application shutdown
andalso shutdown(application, S).
-%%% ---------------------------------------------------------------------------
-%%% # code_change(FromVsn, State, Extra)
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # code_change/3
+%% ---------------------------------------------------------------------------
code_change(FromVsn,
#state{service_name = SvcName,
@@ -619,30 +533,20 @@ code_change(FromVsn, SvcName, Extra, #diameter_app{alias = Alias} = A) ->
unexpected(F, A, #state{service_name = Name}) ->
?UNEXPECTED(F, A ++ [Name]).
-cb([_|_] = M, F, A) ->
- eval(M, F, A);
-cb(Rec, F, A) ->
- {_, M} = app(Rec),
+cb(#diameter_app{module = [_|_] = M}, F, A) ->
eval(M, F, A).
-app(#request{app = A, module = M}) ->
- {A,M};
-app(#diameter_app{alias = A, module = M}) ->
- {A,M}.
-
eval([M|X], F, A) ->
apply(M, F, A ++ X).
%% Callback with state.
-state_cb(#diameter_app{mutable = false, init_state = S}, {ModX, F, A}) ->
+state_cb(#diameter_app{module = ModX, mutable = false, init_state = S},
+ pick_peer = F,
+ A) ->
eval(ModX, F, A ++ [S]);
-state_cb(#diameter_app{mutable = true, alias = Alias}, {_,_,_} = MFA) ->
- state_cb(MFA, Alias);
-
-state_cb({ModX,F,A}, Alias)
- when is_list(ModX) ->
+state_cb(#diameter_app{module = ModX, alias = Alias}, F, A) ->
eval(ModX, F, A ++ [mod_state(Alias)]).
choose(true, X, _) -> X;
@@ -668,9 +572,9 @@ mod_state(Alias) ->
mod_state(Alias, ModS) ->
put({?MODULE, mod_state, Alias}, ModS).
-%%% ---------------------------------------------------------------------------
-%%% # shutdown/2
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # shutdown/2
+%% ---------------------------------------------------------------------------
%% remove_transport
shutdown(Refs, #state{watchdogT = WatchdogT})
@@ -697,9 +601,9 @@ st(#watchdog{pid = Pid}, Reason, Acc) ->
Pid ! {shutdown, self(), Reason},
[Pid | Acc].
-%%% ---------------------------------------------------------------------------
-%%% # call_service/2
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # call_service/2
+%% ---------------------------------------------------------------------------
call_service(Pid, Req)
when is_pid(Pid) ->
@@ -722,9 +626,9 @@ cs(Pid, Req)
cs(undefined, _) ->
{error, no_service}.
-%%% ---------------------------------------------------------------------------
-%%% # i/1
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # i/1
+%% ---------------------------------------------------------------------------
%% Intialize the state of a service gen_server.
@@ -794,9 +698,9 @@ get_value(Key, Vs) ->
{_, V} = lists:keyfind(Key, 1, Vs),
V.
-%%% ---------------------------------------------------------------------------
-%%% # start/3
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # start/3
+%% ---------------------------------------------------------------------------
%% If the initial start/3 at service/transport start succeeds then
%% subsequent calls to start/4 on the same service will also succeed
@@ -830,14 +734,20 @@ start(Ref, Type, Opts, #state{watchdogT = WatchdogT,
peerT = PeerT,
options = SvcOpts,
service_name = SvcName,
- service = Svc})
+ service = Svc0})
when Type == connect;
Type == accept ->
- Pid = s(Type, Ref, {PeerT,
+ #diameter_service{applications = Apps}
+ = Svc
+ = merge_service(Opts, Svc0),
+ {_,_} = Mask = proplists:get_value(sequence, SvcOpts),
+ Pid = s(Type, Ref, {diameter_traffic:make_recvdata([SvcName,
+ PeerT,
+ Apps,
+ Mask]),
Opts,
- SvcName,
SvcOpts,
- merge_service(Opts, Svc)}),
+ Svc}),
insert(WatchdogT, #watchdog{pid = Pid,
type = Type,
ref = Ref,
@@ -884,9 +794,9 @@ ms({capabilities, Opts}, #diameter_service{capabilities = Caps0} = Svc)
ms(_, Svc) ->
Svc.
-%%% ---------------------------------------------------------------------------
-%%% # accepted/3
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # accepted/3
+%% ---------------------------------------------------------------------------
accepted(Pid, _TPid, #state{watchdogT = WatchdogT} = S) ->
#watchdog{ref = Ref, type = accept = T, peer = false, options = Opts}
@@ -899,11 +809,11 @@ fetch(Tid, Key) ->
[T] = ets:lookup(Tid, Key),
T.
-%%% ---------------------------------------------------------------------------
-%%% # watchdog/6
-%%%
-%%% React to a watchdog state transition.
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # watchdog/6
+%%
+%% React to a watchdog state transition.
+%% ---------------------------------------------------------------------------
%% Watchdog has a new open connection.
watchdog(TPid, [T], _, ?WD_OKAY, Wd, State) ->
@@ -933,43 +843,43 @@ watchdog(TPid, [], _, ?WD_DOWN = To, Wd, #state{peerT = PeerT} = S) ->
watchdog(_, [], _, _, _, _) ->
ok.
-%%% ---------------------------------------------------------------------------
-%%% # connection_up/3
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # connection_up/3
+%% ---------------------------------------------------------------------------
%% Watchdog process has reached state OKAY.
-connection_up({TPid, {Caps, SApps, Pkt}},
+connection_up({TPid, {Caps, SupportedApps, Pkt}},
#watchdog{pid = Pid}
= Wd,
#state{peerT = PeerT}
= S) ->
Pr = #peer{pid = TPid,
- apps = SApps,
+ apps = SupportedApps,
caps = Caps,
watchdog = Pid},
insert(PeerT, Pr),
connection_up([Pkt], Wd#watchdog{peer = TPid}, Pr, S).
-%%% ---------------------------------------------------------------------------
-%%% # reopen/3
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # reopen/3
+%% ---------------------------------------------------------------------------
-reopen({TPid, {Caps, SApps, _Pkt}},
+reopen({TPid, {Caps, SupportedApps, _Pkt}},
#watchdog{pid = Pid}
= Wd,
#state{watchdogT = WatchdogT,
peerT = PeerT}) ->
insert(PeerT, #peer{pid = TPid,
- apps = SApps,
+ apps = SupportedApps,
caps = Caps,
watchdog = Pid}),
insert(WatchdogT, Wd#watchdog{state = ?WD_REOPEN,
peer = TPid}).
-%%% ---------------------------------------------------------------------------
-%%% # connection_up/2
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # connection_up/2
+%% ---------------------------------------------------------------------------
%% Watchdog has recovered as suspect connection. Note that there has
%% been no new capabilties exchange in this case.
@@ -987,11 +897,10 @@ connection_up(Extra,
#state{watchdogT = WatchdogT,
local_peers = LDict,
service_name = SvcName,
- service
- = #diameter_service{applications = Apps}}
+ service = #diameter_service{applications = Apps}}
= S) ->
insert(WatchdogT, Wd#watchdog{state = ?WD_OKAY}),
- request_peer_up(TPid),
+ diameter_traffic:peer_up(TPid),
insert_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict),
report_status(up, Wd, Pr, S, Extra).
@@ -1003,13 +912,57 @@ ilp({Id, Alias}, {TC, SA}, LDict) ->
?Dict:append(Alias, TC, LDict).
init_conn(Id, Alias, {TPid, _} = TC, {SvcName, Apps}) ->
- #diameter_app{module = ModX,
- id = Id} %% assert
+ #diameter_app{id = Id} %% assert
+ = App
= find_app(Alias, Apps),
- peer_cb({ModX, peer_up, [SvcName, TC]}, Alias)
+ peer_cb(App, peer_up, [SvcName, TC])
orelse exit(TPid, kill). %% fake transport failure
+%% ---------------------------------------------------------------------------
+%% # find_incoming_app/3
+%% ---------------------------------------------------------------------------
+
+%% No one should be sending the relay identifier.
+find_incoming_app(#peer{caps = Caps}, ?APP_ID_RELAY, _) ->
+ Caps;
+
+find_incoming_app(Peer, Id, Apps)
+ when is_integer(Id) ->
+ find_incoming_app(Peer, [Id, ?APP_ID_RELAY], Apps);
+
+%% Note that the apps represented in SApps may be a strict subset of
+%% those in Apps.
+find_incoming_app(#peer{apps = SApps, caps = Caps}, Ids, Apps) ->
+ case keyfind(Ids, 1, SApps) of
+ {_Id, Alias} ->
+ {#diameter_app{} = find_app(Alias, Apps), Caps};
+ false ->
+ Caps
+ end.
+
+%% keyfind/3
+
+keyfind([], _, _) ->
+ false;
+keyfind([Key | Rest], Pos, L) ->
+ case lists:keyfind(Key, Pos, L) of
+ false ->
+ keyfind(Rest, Pos, L);
+ T ->
+ T
+ end.
+
+%% find_outgoing_app/2
+
+find_outgoing_app(Alias, Apps) ->
+ case find_app(Alias, Apps) of
+ #diameter_app{id = ?APP_ID_RELAY} ->
+ false;
+ A ->
+ A
+ end.
+
%% find_app/2
find_app(Alias, Apps) ->
@@ -1017,20 +970,21 @@ find_app(Alias, Apps) ->
%% Don't bring down the service (and all associated connections)
%% regardless of what happens.
-peer_cb(MFA, Alias) ->
- try state_cb(MFA, Alias) of
+peer_cb(App, F, A) ->
+ try state_cb(App, F, A) of
ModS ->
- mod_state(Alias, ModS),
+ mod_state(App#diameter_app.alias, ModS),
true
catch
E:R ->
- diameter_lib:error_report({failure, {E, R, Alias, ?STACK}}, MFA),
+ diameter_lib:error_report({failure, {E, R, ?STACK}},
+ {App, F, A}),
false
end.
-%%% ---------------------------------------------------------------------------
-%%% # connection_down/3
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # connection_down/3
+%% ---------------------------------------------------------------------------
connection_down(#watchdog{state = ?WD_OKAY,
peer = TPid}
@@ -1044,7 +998,7 @@ connection_down(#watchdog{state = ?WD_OKAY,
= S) ->
report_status(down, Wd, Pr, S, []),
remove_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict),
- request_peer_down(TPid, S);
+ diameter_traffic:peer_down(TPid);
connection_down(#watchdog{}, #peer{}, _) ->
ok;
@@ -1071,15 +1025,15 @@ rlp({Id, Alias}, {TC, SA}, LDict) ->
?Dict:store(Alias, lists:delete(TC, L), LDict).
down_conn(Id, Alias, TC, {SvcName, Apps}) ->
- #diameter_app{module = ModX,
- id = Id} %% assert
+ #diameter_app{id = Id} %% assert
+ = App
= find_app(Alias, Apps),
- peer_cb({ModX, peer_down, [SvcName, TC]}, Alias).
+ peer_cb(App, peer_down, [SvcName, TC]).
-%%% ---------------------------------------------------------------------------
-%%% # watchdog_down/2
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # watchdog_down/2
+%% ---------------------------------------------------------------------------
%% Watchdog process has died.
@@ -1172,9 +1126,9 @@ tc(true, {Ref, Type, Opts}, #state{service_name = SvcName}
tc(false = No, _, _) -> %% removed
No.
-%%% ---------------------------------------------------------------------------
-%%% # close/2
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # close/2
+%% ---------------------------------------------------------------------------
%% The watchdog doesn't start a new fsm in the accept case, it
%% simply stays alive until someone tells it to die in order for
@@ -1207,9 +1161,9 @@ c(Pid, false, _Opts) ->
%% which a new connection attempt is expected of a connecting peer.
%% The value should be greater than the peer's Tc + jitter.
-%%% ---------------------------------------------------------------------------
-%%% # reconnect/2
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # reconnect/2
+%% ---------------------------------------------------------------------------
reconnect(Pid, #state{service_name = SvcName,
watchdogT = WatchdogT}) ->
@@ -1219,9 +1173,9 @@ reconnect(Pid, #state{service_name = SvcName,
= fetch(WatchdogT, Pid),
send_event(SvcName, {reconnect, Ref, Opts}).
-%%% ---------------------------------------------------------------------------
-%%% # call_module/4
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # call_module/4
+%% ---------------------------------------------------------------------------
%% Backwards compatibility and never documented/advertised. May be
%% removed.
@@ -1243,10 +1197,10 @@ call_module(Mod, Req, From, #state{service
{reply, {error, Reason}, S}
end.
-cm([#diameter_app{module = ModX, alias = Alias}], Req, From, Svc) ->
- MFA = {ModX, handle_call, [Req, From, Svc]},
+cm([#diameter_app{alias = Alias} = App], Req, From, Svc) ->
+ Args = [Req, From, Svc],
- try state_cb(MFA, Alias) of
+ try state_cb(App, handle_call, Args) of
{noreply = T, ModS} ->
mod_state(Alias, ModS),
T;
@@ -1254,11 +1208,13 @@ cm([#diameter_app{module = ModX, alias = Alias}], Req, From, Svc) ->
mod_state(Alias, ModS),
{T, RC};
T ->
- diameter_lib:error_report({invalid, T}, MFA),
+ diameter_lib:error_report({invalid, T},
+ {App, handle_call, Args}),
invalid
catch
E: Reason ->
- diameter_lib:error_report({failure, {E, Reason, ?STACK}}, MFA),
+ diameter_lib:error_report({failure, {E, Reason, ?STACK}},
+ {App, handle_call, Args}),
failure
end;
@@ -1268,1265 +1224,9 @@ cm([], _, _, _) ->
cm([_,_|_], _, _, _) ->
multiple.
-%%% ---------------------------------------------------------------------------
-%%% # send_request/6
-%%% ---------------------------------------------------------------------------
-
-%% Send an outgoing request in its dedicated process.
-%%
-%% Note that both encode of the outgoing request and of the received
-%% answer happens in this process. It's also this process that replies
-%% to the caller. The service process only handles the state-retaining
-%% callbacks.
-%%
-%% The mod field of the #diameter_app{} here includes any extra
-%% arguments passed to diameter:call/2.
-
-send_request({TPid, Caps, App} = T, Mask, Msg, Opts, Caller, SvcName) ->
- #diameter_app{module = ModX}
- = App,
-
- Pkt = make_prepare_packet(Mask, Msg),
-
- send_req(cb(ModX, prepare_request, [Pkt, SvcName, {TPid, Caps}]),
- Pkt,
- T,
- Opts,
- Caller,
- SvcName,
- []).
-
-send_req({send, P}, Pkt, T, Opts, Caller, SvcName, Fs) ->
- send_req(make_request_packet(P, Pkt), T, Opts, Caller, SvcName, Fs);
-
-send_req({discard, Reason} , _, _, _, _, _, _) ->
- {error, Reason};
-
-send_req(discard, _, _, _, _, _, _) ->
- {error, discarded};
-
-send_req({eval_packet, RC, F}, Pkt, T, Opts, Caller, SvcName, Fs) ->
- send_req(RC, Pkt, T, Opts, Caller, SvcName, [F|Fs]);
-
-send_req(E, _, {_, _, App}, _, _, _, _) ->
- ?ERROR({invalid_return, prepare_request, App, E}).
-
-%% make_prepare_packet/2
-%%
-%% Turn an outgoing request as passed to call/4 into a diameter_packet
-%% record in preparation for a prepare_request callback.
-
-make_prepare_packet(_, Bin)
- when is_binary(Bin) ->
- #diameter_packet{header = diameter_codec:decode_header(Bin),
- bin = Bin};
-
-make_prepare_packet(Mask, #diameter_packet{msg = [#diameter_header{} = Hdr
- | Avps]}
- = Pkt) ->
- Pkt#diameter_packet{msg = [make_prepare_header(Mask, Hdr) | Avps]};
-
-make_prepare_packet(Mask, #diameter_packet{header = Hdr} = Pkt) ->
- Pkt#diameter_packet{header = make_prepare_header(Mask, Hdr)};
-
-make_prepare_packet(Mask, Msg) ->
- make_prepare_packet(Mask, #diameter_packet{msg = Msg}).
-
-%% make_prepare_header/2
-
-make_prepare_header(Mask, undefined) ->
- Seq = diameter_session:sequence(Mask),
- make_prepare_header(#diameter_header{end_to_end_id = Seq,
- hop_by_hop_id = Seq});
-
-make_prepare_header(Mask, #diameter_header{end_to_end_id = undefined,
- hop_by_hop_id = undefined}
- = H) ->
- Seq = diameter_session:sequence(Mask),
- make_prepare_header(H#diameter_header{end_to_end_id = Seq,
- hop_by_hop_id = Seq});
-
-make_prepare_header(Mask, #diameter_header{end_to_end_id = undefined} = H) ->
- Seq = diameter_session:sequence(Mask),
- make_prepare_header(H#diameter_header{end_to_end_id = Seq});
-
-make_prepare_header(Mask, #diameter_header{hop_by_hop_id = undefined} = H) ->
- Seq = diameter_session:sequence(Mask),
- make_prepare_header(H#diameter_header{hop_by_hop_id = Seq});
-
-make_prepare_header(_, Hdr) ->
- make_prepare_header(Hdr).
-
-%% make_prepare_header/1
-
-make_prepare_header(#diameter_header{version = undefined} = Hdr) ->
- make_prepare_header(Hdr#diameter_header{version = ?DIAMETER_VERSION});
-
-make_prepare_header(#diameter_header{} = Hdr) ->
- Hdr;
-
-make_prepare_header(T) ->
- ?ERROR({invalid_header, T}).
-
-%% make_request_packet/2
-%%
-%% Reconstruct a diameter_packet from the return value of
-%% prepare_request or prepare_retransmit callback.
-
-make_request_packet(Bin, _)
- when is_binary(Bin) ->
- make_prepare_packet(false, Bin);
-
-make_request_packet(#diameter_packet{msg = [#diameter_header{} | _]}
- = Pkt,
- _) ->
- Pkt;
-
-%% Returning a diameter_packet with no header from a prepare_request
-%% or prepare_retransmit callback retains the header passed into it.
-%% This is primarily so that the end to end and hop by hop identifiers
-%% are retained.
-make_request_packet(#diameter_packet{header = Hdr} = Pkt,
- #diameter_packet{header = Hdr0}) ->
- Pkt#diameter_packet{header = fold_record(Hdr0, Hdr)};
-
-make_request_packet(Msg, Pkt) ->
- Pkt#diameter_packet{msg = Msg}.
-
-%% fold_record/2
-
-fold_record(undefined, R) ->
- R;
-fold_record(Rec, R) ->
- diameter_lib:fold_tuple(2, Rec, R).
-
-%% send_req/6
-
-send_req(Pkt, {TPid, Caps, App}, Opts, Caller, SvcName, Fs) ->
- #diameter_app{alias = Alias,
- dictionary = Dict,
- module = ModX,
- options = [{answer_errors, AE} | _]}
- = App,
-
- EPkt = encode(Dict, Pkt, Fs),
-
- #options{filter = Filter,
- timeout = Timeout}
- = Opts,
-
- Req = #request{packet = Pkt,
- from = Caller,
- handler = self(),
- transport = TPid,
- caps = Caps,
- app = Alias,
- filter = Filter,
- dictionary = Dict,
- module = ModX},
-
- try
- TRef = send_request(TPid, EPkt, Req, Timeout),
- ack(Caller),
- handle_answer(SvcName, AE, recv_answer(Timeout, SvcName, {TRef, Req}))
- after
- erase_request(EPkt)
- end.
-
-%% Tell caller a send has been attempted.
-ack({Pid, Ref}) ->
- Pid ! Ref.
-
-%% recv_answer/3
-
-recv_answer(Timeout,
- SvcName,
- {TRef, #request{from = {_, Ref}, packet = RPkt} = Req}
- = T) ->
-
- %% Matching on TRef below ensures we ignore messages that pertain
- %% to a previous transport prior to failover. The answer message
- %% includes the #request{} since it's not necessarily Req; that
- %% is, from the last peer to which we've transmitted.
-
- receive
- {answer = A, Ref, Rq, Pkt} -> %% Answer from peer
- {A, Rq, Pkt};
- {timeout = Reason, TRef, _} -> %% No timely reply
- {error, Req, Reason};
- {failover = Reason, TRef, false} -> %% No alternate peer
- {error, Req, Reason};
- {failover, TRef, Transport} -> %% Resend to alternate peer
- try_retransmit(Timeout, SvcName, Req, Transport);
- {failover, TRef} -> %% May have missed failover notification
- Seqs = diameter_codec:sequence_numbers(RPkt),
- Pid = whois(SvcName),
- is_pid(Pid) andalso (Pid ! {failover, TRef, Seqs}),
- recv_answer(Timeout, SvcName, T)
- end.
-%% Note that failover starts a new timer and that expiry of an old
-%% timer value is ignored. This means that an answer could be accepted
-%% from a peer after timeout in the case of failover.
-
-try_retransmit(Timeout, SvcName, Req, Transport) ->
- try retransmit(Transport, Req, SvcName, Timeout) of
- T -> recv_answer(Timeout, SvcName, T)
- catch
- ?FAILURE(Reason) -> {error, Req, Reason}
- end.
-
-%% handle_error/3
-
-handle_error(Req, Reason, SvcName) ->
- #request{module = ModX,
- packet = Pkt,
- transport = TPid,
- caps = Caps}
- = Req,
- cb(ModX, handle_error, [Reason, msg(Pkt), SvcName, {TPid, Caps}]).
-
-msg(#diameter_packet{msg = undefined, bin = Bin}) ->
- Bin;
-msg(#diameter_packet{msg = Msg}) ->
- Msg.
-
-%% encode/3
-
-encode(Dict, Pkt, Fs) ->
- P = encode(Dict, Pkt),
- eval_packet(P, Fs),
- P.
-
-%% encode/2
-
-%% Note that prepare_request can return a diameter_packet containing
-%% header or transport_data. Even allow the returned record to contain
-%% an encoded binary. This isn't the usual case but could some in
-%% handy, for test at least. (For example, to send garbage.)
-
-%% The normal case: encode the returned message.
-encode(Dict, #diameter_packet{msg = Msg, bin = undefined} = Pkt) ->
- D = pick_dictionary([Dict, ?BASE], Msg),
- diameter_codec:encode(D, Pkt);
-
-%% Callback has returned an encoded binary: just send.
-encode(_, #diameter_packet{} = Pkt) ->
- Pkt.
-
-%% pick_dictionary/2
-
-%% Pick the first dictionary that declares the application id in the
-%% specified header.
-pick_dictionary(Ds, [#diameter_header{application_id = Id} | _]) ->
- pd(Ds, fun(D) -> Id = D:id() end);
-
-%% Pick the first dictionary that knows the specified message name.
-pick_dictionary(Ds, [MsgName|_]) ->
- pd(Ds, fun(D) -> D:msg2rec(MsgName) end);
-
-%% Pick the first dictionary that knows the name of the specified
-%% message record.
-pick_dictionary(Ds, Rec) ->
- Name = element(1, Rec),
- pd(Ds, fun(D) -> D:rec2msg(Name) end).
-
-pd([D|Ds], F) ->
- try
- F(D),
- D
- catch
- error:_ ->
- pd(Ds, F)
- end;
-
-pd([], _) ->
- ?ERROR(no_dictionary).
-
-%% send_request/4
-
-send_request(TPid, #diameter_packet{bin = Bin} = Pkt, Req, Timeout)
- when node() == node(TPid) ->
- %% Store the outgoing request before sending to avoid a race with
- %% reply reception.
- TRef = store_request(TPid, Bin, Req, Timeout),
- send(TPid, Pkt),
- TRef;
-
-%% Send using a remote transport: spawn a process on the remote node
-%% to relay the answer.
-send_request(TPid, #diameter_packet{} = Pkt, Req, Timeout) ->
- TRef = erlang:start_timer(Timeout, self(), timeout),
- T = {TPid, Pkt, Req, Timeout, TRef},
- spawn(node(TPid), ?MODULE, send, [T]),
- TRef.
-
-%% send/1
-
-send({TPid, Pkt, #request{handler = Pid} = Req, Timeout, TRef}) ->
- Ref = send_request(TPid, Pkt, Req#request{handler = self()}, Timeout),
- Pid ! reref(receive T -> T end, Ref, TRef).
-
-reref({T, Ref, R}, Ref, TRef) ->
- {T, TRef, R};
-reref(T, _, _) ->
- T.
-
-%% send/2
-
-send(Pid, Pkt) ->
- Pid ! {send, Pkt}.
-
-%% retransmit/4
-
-retransmit({TPid, Caps, #diameter_app{alias = Alias} = App} = T,
- #request{app = Alias, packet = Pkt0}
- = Req,
- SvcName,
- Timeout) ->
- have_request(Pkt0, TPid) %% Don't failover to a peer we've
- andalso ?THROW(timeout), %% already sent to.
-
- #diameter_packet{header = Hdr0} = Pkt0,
- Hdr = Hdr0#diameter_header{is_retransmitted = true},
- Pkt = Pkt0#diameter_packet{header = Hdr},
-
- resend_req(cb(App, prepare_retransmit, [Pkt, SvcName, {TPid, Caps}]),
- T,
- Req#request{packet = Pkt},
- Timeout,
- []).
-
-resend_req({send, P}, T, #request{packet = Pkt} = Req, Timeout, Fs) ->
- retransmit(make_request_packet(P, Pkt), T, Req, Timeout, Fs);
-
-resend_req({discard, Reason}, _, _, _, _) ->
- ?THROW(Reason);
-
-resend_req(discard, _, _, _, _) ->
- ?THROW(discarded);
-
-resend_req({eval_packet, RC, F}, T, Req, Timeout, Fs) ->
- resend_req(RC, T, Req, Timeout, [F|Fs]);
-
-resend_req(T, {_, _, App}, _, _, _) ->
- ?ERROR({invalid_return, prepare_retransmit, App, T}).
-
-%% retransmit/6
-
-retransmit(Pkt, {TPid, Caps, _}, #request{dictionary = D} = Req0, Tmo, Fs) ->
- EPkt = encode(D, Pkt, Fs),
-
- Req = Req0#request{transport = TPid,
- packet = Pkt,
- caps = Caps},
-
- ?LOG(retransmission, Req),
- TRef = send_request(TPid, EPkt, Req, Tmo),
- {TRef, Req}.
-
-%% store_request/4
-
-store_request(TPid, Bin, Req, Timeout) ->
- Seqs = diameter_codec:sequence_numbers(Bin),
- TRef = erlang:start_timer(Timeout, self(), timeout),
- ets:insert(?REQUEST_TABLE, {Seqs, Req, TRef}),
- ets:member(?REQUEST_TABLE, TPid)
- orelse (self() ! {failover, TRef}), %% possibly missed failover
- TRef.
-
-%% lookup_request/2
-
-lookup_request(Msg, TPid)
- when is_pid(TPid) ->
- lookup(Msg, TPid, '_');
-
-lookup_request(Msg, TRef)
- when is_reference(TRef) ->
- lookup(Msg, '_', TRef).
-
-lookup(Msg, TPid, TRef) ->
- Seqs = diameter_codec:sequence_numbers(Msg),
- Spec = [{{Seqs, #request{transport = TPid, _ = '_'}, TRef},
- [],
- ['$_']}],
- case ets:select(?REQUEST_TABLE, Spec) of
- [{_, Req, _}] ->
- Req;
- [] ->
- false
- end.
-
-%% erase_request/1
-
-erase_request(Pkt) ->
- ets:delete(?REQUEST_TABLE, diameter_codec:sequence_numbers(Pkt)).
-
-%% match_requests/1
-
-match_requests(TPid) ->
- Pat = {'_', #request{transport = TPid, _ = '_'}, '_'},
- ets:select(?REQUEST_TABLE, [{Pat, [], ['$_']}]).
-
-%% have_request/2
-
-have_request(Pkt, TPid) ->
- Seqs = diameter_codec:sequence_numbers(Pkt),
- Pat = {Seqs, #request{transport = TPid, _ = '_'}, '_'},
- '$end_of_table' /= ets:select(?REQUEST_TABLE, [{Pat, [], ['$_']}], 1).
-
-%% request_peer_up/1
-
-%% Insert an element that is used to detect whether or not there has
-%% been a failover when inserting an outgoing request.
-request_peer_up(TPid) ->
- ets:insert(?REQUEST_TABLE, {TPid}).
-
-%% request_peer_down/2
-
-request_peer_down(TPid, S) ->
- ets:delete(?REQUEST_TABLE, TPid),
- lists:foreach(fun(T) -> failover(T,S) end, match_requests(TPid)).
-%% Note that a request process can store its request after failover
-%% notifications are sent here: store_request/4 sends the notification
-%% in that case. Note also that we'll send as many notifications to a
-%% given handler as there are peers its sent to. All but one of these
-%% will be ignored.
-
-%%% ---------------------------------------------------------------------------
-%%% recv_request/3
-%%% ---------------------------------------------------------------------------
-
-recv_request(TPid, Pkt, {PeerT, SvcName, Apps, Mask}) ->
- try ets:lookup(PeerT, TPid) of
- [C] ->
- recv_request(C, TPid, Pkt, SvcName, Apps, Mask);
- [] -> %% transport has gone down
- ok
- catch
- error: badarg -> %% service has gone down (and taken table with it)
- ok
- end.
-
-%% recv_request/5
-
-recv_request(#peer{apps = SApps, caps = Caps},
- TPid,
- Pkt,
- SvcName,
- Apps,
- Mask) ->
- #diameter_caps{origin_host = {OH,_},
- origin_realm = {OR,_}}
- = Caps,
-
- #diameter_packet{header = #diameter_header{application_id = Id}}
- = Pkt,
-
- recv_request(find_recv_app(Id, SApps),
- {SvcName, OH, OR},
- TPid,
- Apps,
- Mask,
- Caps,
- Pkt).
-
-%% find_recv_app/2
-
-%% No one should be sending the relay identifier.
-find_recv_app(?APP_ID_RELAY, _) ->
- false;
-
-%% With any other id we either support it locally or as a relay.
-find_recv_app(Id, SApps) ->
- keyfind([Id, ?APP_ID_RELAY], 1, SApps).
-
-%% keyfind/3
-
-keyfind([], _, _) ->
- false;
-keyfind([Key | Rest], Pos, L) ->
- case lists:keyfind(Key, Pos, L) of
- false ->
- keyfind(Rest, Pos, L);
- T ->
- T
- end.
-
-%% recv_request/7
-
-recv_request({Id, Alias}, T, TPid, Apps, Mask, Caps, Pkt) ->
- #diameter_app{dictionary = Dict}
- = A
- = find_app(Alias, Apps),
- recv_request(T,
- {TPid, Caps},
- A,
- Mask,
- diameter_codec:decode(Id, Dict, Pkt));
-%% Note that the decode is different depending on whether or not Id is
-%% ?APP_ID_RELAY.
-
-%% DIAMETER_APPLICATION_UNSUPPORTED 3007
-%% A request was sent for an application that is not supported.
-
-recv_request(false, T, TPid, _, _, _, Pkt) ->
- As = collect_avps(Pkt),
- protocol_error(3007, T, TPid, Pkt#diameter_packet{avps = As}).
-
-collect_avps(Pkt) ->
- case diameter_codec:collect_avps(Pkt) of
- {_Bs, As} ->
- As;
- As ->
- As
- end.
-
-%% recv_request/5
-
-%% Wrong number of bits somewhere in the message: reply.
-%%
-%% DIAMETER_INVALID_AVP_BITS 3009
-%% A request was received that included an AVP whose flag bits are
-%% set to an unrecognized value, or that is inconsistent with the
-%% AVP's definition.
-%%
-recv_request(T, {TPid, _}, _, _, #diameter_packet{errors = [Bs | _]} = Pkt)
- when is_bitstring(Bs) ->
- protocol_error(3009, T, TPid, Pkt);
-
-%% Either we support this application but don't recognize the command
-%% or we're a relay and the command isn't proxiable.
-%%
-%% DIAMETER_COMMAND_UNSUPPORTED 3001
-%% The Request contained a Command-Code that the receiver did not
-%% recognize or support. This MUST be used when a Diameter node
-%% receives an experimental command that it does not understand.
-%%
-recv_request(T,
- {TPid, _},
- #diameter_app{id = Id},
- _,
- #diameter_packet{header = #diameter_header{is_proxiable = P},
- msg = M}
- = Pkt)
- when ?APP_ID_RELAY /= Id, undefined == M;
- ?APP_ID_RELAY == Id, not P ->
- protocol_error(3001, T, TPid, Pkt);
-
-%% Error bit was set on a request.
-%%
-%% DIAMETER_INVALID_HDR_BITS 3008
-%% A request was received whose bits in the Diameter header were
-%% either set to an invalid combination, or to a value that is
-%% inconsistent with the command code's definition.
-%%
-recv_request(T,
- {TPid, _},
- _,
- _,
- #diameter_packet{header = #diameter_header{is_error = true}}
- = Pkt) ->
- protocol_error(3008, T, TPid, Pkt);
-
-%% A message in a locally supported application or a proxiable message
-%% in the relay application. Don't distinguish between the two since
-%% each application has its own callback config. That is, the user can
-%% easily distinguish between the two cases.
-recv_request(T, TC, App, Mask, Pkt) ->
- request_cb(T, TC, App, Mask, examine(Pkt)).
-
-%% Note that there may still be errors but these aren't protocol
-%% (3xxx) errors that lead to an answer-message.
-
-request_cb({SvcName, _OH, _OR} = T, TC, App, Mask, Pkt) ->
- request_cb(cb(App, handle_request, [Pkt, SvcName, TC]),
- App,
- Mask,
- T,
- TC,
- [],
- Pkt).
-
-%% examine/1
-%%
-%% Look for errors in a decoded message. Length errors result in
-%% decode failure in diameter_codec.
-
-examine(#diameter_packet{header = #diameter_header{version
- = ?DIAMETER_VERSION}}
- = Pkt) ->
- Pkt;
-
-%% DIAMETER_UNSUPPORTED_VERSION 5011
-%% This error is returned when a request was received, whose version
-%% number is unsupported.
-
-examine(#diameter_packet{errors = Es} = Pkt) ->
- Pkt#diameter_packet{errors = [5011 | Es]}.
-%% It's odd/unfortunate that this isn't a protocol error.
-
-%% request_cb/7
-
-%% A reply may be an answer-message, constructed either here or by
-%% the handle_request callback. The header from the incoming request
-%% is passed into the encode so that it can retrieve the relevant
-%% command code in this case. It will also then ignore Dict and use
-%% the base encoder.
-request_cb({reply, Ans},
- #diameter_app{dictionary = Dict},
- _,
- _,
- {TPid, _},
- Fs,
- Pkt) ->
- reply(Ans, Dict, TPid, Fs, Pkt);
-
-%% An 3xxx result code, for which the E-bit is set in the header.
-request_cb({protocol_error, RC}, _, _, T, {TPid, _}, Fs, Pkt)
- when 3000 =< RC, RC < 4000 ->
- protocol_error(RC, T, TPid, Fs, Pkt);
-
-%% RFC 3588 says we must reply 3001 to anything unrecognized or
-%% unsupported. 'noreply' is undocumented (and inappropriately named)
-%% backwards compatibility for this, protocol_error the documented
-%% alternative.
-request_cb(noreply, _, _, T, {TPid, _}, Fs, Pkt) ->
- protocol_error(3001, T, TPid, Fs, Pkt);
-
-%% Relay a request to another peer. This is equivalent to doing an
-%% explicit call/4 with the message in question except that (1) a loop
-%% will be detected by examining Route-Record AVP's, (3) a
-%% Route-Record AVP will be added to the outgoing request and (3) the
-%% End-to-End Identifier will default to that in the
-%% #diameter_header{} without the need for an end_to_end_identifier
-%% option.
-%%
-%% relay and proxy are similar in that they require the same handling
-%% with respect to Route-Record and End-to-End identifier. The
-%% difference is that a proxy advertises specific applications, while
-%% a relay advertises the relay application. If a callback doesn't
-%% want to distinguish between the cases in the callback return value
-%% then 'resend' is a neutral alternative.
-%%
-request_cb({A, Opts},
- #diameter_app{id = Id}
- = App,
- Mask,
- T,
- TC,
- Fs,
- Pkt)
- when A == relay, Id == ?APP_ID_RELAY;
- A == proxy, Id /= ?APP_ID_RELAY;
- A == resend ->
- resend(Opts, App, Mask, T, TC, Fs, Pkt);
-
-request_cb(discard, _, _, _, _, _, _) ->
- ok;
-
-request_cb({eval_packet, RC, F}, App, Mask, T, TC, Fs, Pkt) ->
- request_cb(RC, App, Mask, T, TC, [F|Fs], Pkt);
-
-request_cb({eval, RC, F}, App, Mask, T, TC, Fs, Pkt) ->
- request_cb(RC, App, Mask, T, TC, Fs, Pkt),
- diameter_lib:eval(F).
-
-%% protocol_error/5
-
-protocol_error(RC, {_, OH, OR}, TPid, Fs, Pkt) ->
- #diameter_packet{avps = Avps, errors = Es} = Pkt,
- ?LOG({error, RC}, Pkt),
- reply(answer_message({OH, OR, RC}, Avps),
- ?BASE,
- TPid,
- Fs,
- Pkt#diameter_packet{errors = [RC | Es]}).
-%% Note that reply/5 may set the result code once more. It's set in
-%% answer_message/2 in case reply/5 doesn't.
-
-%% protocol_error/4
-
-protocol_error(RC, T, TPid, Pkt) ->
- protocol_error(RC, T, TPid, [], Pkt).
-
-%% resend/7
-%%
-%% Resend a message as a relay or proxy agent.
-
-resend(Opts,
- #diameter_app{} = App,
- Mask,
- {_SvcName, OH, _OR} = T,
- {_TPid, _Caps} = TC,
- Fs,
- #diameter_packet{avps = Avps} = Pkt) ->
- {Code, _Flags, Vid} = ?BASE:avp_header('Route-Record'),
- resend(is_loop(Code, Vid, OH, Avps), Opts, App, Mask, T, TC, Fs, Pkt).
-
-%% DIAMETER_LOOP_DETECTED 3005
-%% An agent detected a loop while trying to get the message to the
-%% intended recipient. The message MAY be sent to an alternate peer,
-%% if one is available, but the peer reporting the error has
-%% identified a configuration problem.
-
-resend(true, _, _, _, T, {TPid, _}, Fs, Pkt) -> %% Route-Record loop
- protocol_error(3005, T, TPid, Fs, Pkt);
-
-%% 6.1.8. Relaying and Proxying Requests
-%%
-%% A relay or proxy agent MUST append a Route-Record AVP to all requests
-%% forwarded. The AVP contains the identity of the peer the request was
-%% received from.
-
-resend(false,
- Opts,
- App,
- Mask,
- {SvcName, _, _} = T,
- {TPid, #diameter_caps{origin_host = {_, OH}}},
- Fs,
- #diameter_packet{header = Hdr0,
- avps = Avps}
- = Pkt) ->
- Route = #diameter_avp{data = {?BASE, 'Route-Record', OH}},
- Seq = diameter_session:sequence(Mask),
- Hdr = Hdr0#diameter_header{hop_by_hop_id = Seq},
- Msg = [Hdr, Route | Avps],
- resend(call(SvcName, App, Msg, Opts), T, TPid, Fs, Pkt).
-%% The incoming request is relayed with the addition of a
-%% Route-Record. Note the requirement on the return from call/4 below,
-%% which places a requirement on the value returned by the
-%% handle_answer callback of the application module in question.
-%%
-%% Note that there's nothing stopping the request from being relayed
-%% back to the sender. A pick_peer callback may want to avoid this but
-%% a smart peer might recognize the potential loop and choose another
-%% route. A less smart one will probably just relay the request back
-%% again and force us to detect the loop. A pick_peer that wants to
-%% avoid this can specify filter to avoid the possibility.
-%% Eg. {neg, {host, OH} where #diameter_caps{origin_host = {OH, _}}.
-%%
-%% RFC 6.3 says that a relay agent does not modify Origin-Host but
-%% says nothing about a proxy. Assume it should behave the same way.
-
-%% resend/4
-%%
-%% Relay a reply to a relayed request.
-
-%% Answer from the peer: reset the hop by hop identifier and send.
-resend(#diameter_packet{bin = B}
- = Pkt,
- _,
- TPid,
- Fs,
- #diameter_packet{header = #diameter_header{hop_by_hop_id = Id},
- transport_data = TD}) ->
- P = Pkt#diameter_packet{bin = diameter_codec:hop_by_hop_id(Id, B),
- transport_data = TD},
- eval_packet(P, Fs),
- send(TPid, P);
-%% TODO: counters
-
-%% Or not: DIAMETER_UNABLE_TO_DELIVER.
-resend(_, T, TPid, Fs, Pkt) ->
- protocol_error(3002, T, TPid, Fs, Pkt).
-
-%% is_loop/4
-%%
-%% Is there a Route-Record AVP with our Origin-Host?
-
-is_loop(Code,
- Vid,
- Bin,
- [#diameter_avp{code = Code, vendor_id = Vid, data = Bin} | _]) ->
- true;
-
-is_loop(_, _, _, []) ->
- false;
-
-is_loop(Code, Vid, OH, [_ | Avps])
- when is_binary(OH) ->
- is_loop(Code, Vid, OH, Avps);
-
-is_loop(Code, Vid, OH, Avps) ->
- is_loop(Code, Vid, ?BASE:avp(encode, OH, 'Route-Record'), Avps).
-
-%% reply/5
-%%
-%% Send a locally originating reply.
-
-%% Skip the setting of Result-Code and Failed-AVP's below. This is
-%% currently undocumented.
-reply([Msg], Dict, TPid, Fs, Pkt)
- when is_list(Msg);
- is_tuple(Msg) ->
- reply(Msg, Dict, TPid, Fs, Pkt#diameter_packet{errors = []});
-
-%% No errors or a diameter_header/avp list.
-reply(Msg, Dict, TPid, Fs, #diameter_packet{errors = Es} = ReqPkt)
- when [] == Es;
- is_record(hd(Msg), diameter_header) ->
- Pkt = diameter_codec:encode(Dict, make_answer_packet(Msg, ReqPkt)),
- eval_packet(Pkt, Fs),
- incr(send, Pkt, Dict, TPid), %% count result codes in sent answers
- send(TPid, Pkt);
-
-%% Or not: set Result-Code and Failed-AVP AVP's.
-reply(Msg, Dict, TPid, Fs, #diameter_packet{errors = [H|_] = Es} = Pkt) ->
- reply(rc(Msg, rc(H), [A || {_,A} <- Es], Dict),
- Dict,
- TPid,
- Fs,
- Pkt#diameter_packet{errors = []}).
-
-eval_packet(Pkt, Fs) ->
- lists:foreach(fun(F) -> diameter_lib:eval([F,Pkt]) end, Fs).
-
-%% make_answer_packet/2
-
-%% A reply message clears the R and T flags and retains the P flag.
-%% The E flag will be set at encode. 6.2 of 3588 requires the same P
-%% flag on an answer as on the request. A #diameter_packet{} returned
-%% from a handle_request callback can circumvent this by setting its
-%% own header values.
-make_answer_packet(#diameter_packet{header = Hdr,
- msg = Msg,
- transport_data = TD},
- #diameter_packet{header = ReqHdr}) ->
- Hdr0 = ReqHdr#diameter_header{version = ?DIAMETER_VERSION,
- is_request = false,
- is_error = undefined,
- is_retransmitted = false},
- #diameter_packet{header = fold_record(Hdr0, Hdr),
- msg = Msg,
- transport_data = TD};
-
-%% Binaries and header/avp lists are sent as-is.
-make_answer_packet(Bin, #diameter_packet{transport_data = TD})
- when is_binary(Bin) ->
- #diameter_packet{bin = Bin,
- transport_data = TD};
-make_answer_packet([#diameter_header{} | _] = Msg,
- #diameter_packet{transport_data = TD}) ->
- #diameter_packet{msg = Msg,
- transport_data = TD};
-
-%% Otherwise, preserve transport_data.
-make_answer_packet(Msg, #diameter_packet{transport_data = TD} = Pkt) ->
- make_answer_packet(#diameter_packet{msg = Msg, transport_data = TD}, Pkt).
-
-%% rc/1
-
-rc({RC, _}) ->
- RC;
-rc(RC) ->
- RC.
-
-%% rc/4
-
-rc(#diameter_packet{msg = Rec} = Pkt, RC, Failed, Dict) ->
- Pkt#diameter_packet{msg = rc(Rec, RC, Failed, Dict)};
-
-rc(Rec, RC, Failed, Dict)
- when is_integer(RC) ->
- set(Rec,
- lists:append([rc(Rec, {'Result-Code', RC}, Dict),
- failed_avp(Rec, Failed, Dict)]),
- Dict).
-
-%% Reply as name and tuple list ...
-set([_|_] = Ans, Avps, _) ->
- Ans ++ Avps; %% Values nearer tail take precedence.
-
-%% ... or record.
-set(Rec, Avps, Dict) ->
- Dict:'#set-'(Avps, Rec).
-
-%% rc/3
-%%
-%% Turn the result code into a list if its optional and only set it if
-%% the arity is 1 or {0,1}. In other cases (which probably shouldn't
-%% exist in practise) we can't know what's appropriate.
-
-rc([MsgName | _], {'Result-Code' = K, RC} = T, Dict) ->
- case Dict:avp_arity(MsgName, 'Result-Code') of
- 1 -> [T];
- {0,1} -> [{K, [RC]}];
- _ -> []
- end;
-
-rc(Rec, T, Dict) ->
- rc([Dict:rec2msg(element(1, Rec))], T, Dict).
-
-%% failed_avp/3
-
-failed_avp(_, [] = No, _) ->
- No;
-
-failed_avp(Rec, Failed, Dict) ->
- [fa(Rec, [{'AVP', Failed}], Dict)].
-
-%% Reply as name and tuple list ...
-fa([MsgName | Values], FailedAvp, Dict) ->
- R = Dict:msg2rec(MsgName),
- try
- Dict:'#info-'(R, {index, 'Failed-AVP'}),
- {'Failed-AVP', [FailedAvp]}
- catch
- error: _ ->
- Avps = proplists:get_value('AVP', Values, []),
- A = #diameter_avp{name = 'Failed-AVP',
- value = FailedAvp},
- {'AVP', [A|Avps]}
- end;
-
-%% ... or record.
-fa(Rec, FailedAvp, Dict) ->
- try
- {'Failed-AVP', [FailedAvp]}
- catch
- error: _ ->
- Avps = Dict:'get-'('AVP', Rec),
- A = #diameter_avp{name = 'Failed-AVP',
- value = FailedAvp},
- {'AVP', [A|Avps]}
- end.
-
-%% 3. Diameter Header
-%%
-%% E(rror) - If set, the message contains a protocol error,
-%% and the message will not conform to the ABNF
-%% described for this command. Messages with the 'E'
-%% bit set are commonly referred to as error
-%% messages. This bit MUST NOT be set in request
-%% messages. See Section 7.2.
-
-%% 3.2. Command Code ABNF specification
-%%
-%% e-bit = ", ERR"
-%% ; If present, the 'E' bit in the Command
-%% ; Flags is set, indicating that the answer
-%% ; message contains a Result-Code AVP in
-%% ; the "protocol error" class.
-
-%% 7.1.3. Protocol Errors
-%%
-%% Errors that fall within the Protocol Error category SHOULD be treated
-%% on a per-hop basis, and Diameter proxies MAY attempt to correct the
-%% error, if it is possible. Note that these and only these errors MUST
-%% only be used in answer messages whose 'E' bit is set.
-
-%% Thus, only construct answers to protocol errors. Other errors
-%% require an message-specific answer and must be handled by the
-%% application.
-
-%% 6.2. Diameter Answer Processing
-%%
-%% When a request is locally processed, the following procedures MUST be
-%% applied to create the associated answer, in addition to any
-%% additional procedures that MAY be discussed in the Diameter
-%% application defining the command:
-%%
-%% - The same Hop-by-Hop identifier in the request is used in the
-%% answer.
-%%
-%% - The local host's identity is encoded in the Origin-Host AVP.
-%%
-%% - The Destination-Host and Destination-Realm AVPs MUST NOT be
-%% present in the answer message.
-%%
-%% - The Result-Code AVP is added with its value indicating success or
-%% failure.
-%%
-%% - If the Session-Id is present in the request, it MUST be included
-%% in the answer.
-%%
-%% - Any Proxy-Info AVPs in the request MUST be added to the answer
-%% message, in the same order they were present in the request.
-%%
-%% - The 'P' bit is set to the same value as the one in the request.
-%%
-%% - The same End-to-End identifier in the request is used in the
-%% answer.
-%%
-%% Note that the error messages (see Section 7.3) are also subjected to
-%% the above processing rules.
-
-%% 7.3. Error-Message AVP
-%%
-%% The Error-Message AVP (AVP Code 281) is of type UTF8String. It MAY
-%% accompany a Result-Code AVP as a human readable error message. The
-%% Error-Message AVP is not intended to be useful in real-time, and
-%% SHOULD NOT be expected to be parsed by network entities.
-
-%% answer_message/2
-
-answer_message({OH, OR, RC}, Avps) ->
- {Code, _, Vid} = ?BASE:avp_header('Session-Id'),
- ['answer-message', {'Origin-Host', OH},
- {'Origin-Realm', OR},
- {'Result-Code', RC}
- | session_id(Code, Vid, Avps)].
-
-session_id(Code, Vid, Avps)
- when is_list(Avps) ->
- try
- {value, #diameter_avp{data = D}} = find_avp(Code, Vid, Avps),
- [{'Session-Id', [?BASE:avp(decode, D, 'Session-Id')]}]
- catch
- error: _ ->
- []
- end.
-
-%% find_avp/3
-
-find_avp(Code, Vid, Avps)
- when is_integer(Code), (undefined == Vid orelse is_integer(Vid)) ->
- find(fun(A) -> is_avp(Code, Vid, A) end, Avps).
-
-%% The final argument here could be a list of AVP's, depending on the case,
-%% but we're only searching at the top level.
-is_avp(Code, Vid, #diameter_avp{code = Code, vendor_id = Vid}) ->
- true;
-is_avp(_, _, _) ->
- false.
-
-find(_, []) ->
- false;
-find(Pred, [H|T]) ->
- case Pred(H) of
- true ->
- {value, H};
- false ->
- find(Pred, T)
- end.
-
-%% 7. Error Handling
-%%
-%% There are certain Result-Code AVP application errors that require
-%% additional AVPs to be present in the answer. In these cases, the
-%% Diameter node that sets the Result-Code AVP to indicate the error
-%% MUST add the AVPs. Examples are:
-%%
-%% - An unrecognized AVP is received with the 'M' bit (Mandatory bit)
-%% set, causes an answer to be sent with the Result-Code AVP set to
-%% DIAMETER_AVP_UNSUPPORTED, and the Failed-AVP AVP containing the
-%% offending AVP.
-%%
-%% - An AVP that is received with an unrecognized value causes an
-%% answer to be returned with the Result-Code AVP set to
-%% DIAMETER_INVALID_AVP_VALUE, with the Failed-AVP AVP containing the
-%% AVP causing the error.
-%%
-%% - A command is received with an AVP that is omitted, yet is
-%% mandatory according to the command's ABNF. The receiver issues an
-%% answer with the Result-Code set to DIAMETER_MISSING_AVP, and
-%% creates an AVP with the AVP Code and other fields set as expected
-%% in the missing AVP. The created AVP is then added to the Failed-
-%% AVP AVP.
-%%
-%% The Result-Code AVP describes the error that the Diameter node
-%% encountered in its processing. In case there are multiple errors,
-%% the Diameter node MUST report only the first error it encountered
-%% (detected possibly in some implementation dependent order). The
-%% specific errors that can be described by this AVP are described in
-%% the following section.
-
-%% 7.5. Failed-AVP AVP
-%%
-%% The Failed-AVP AVP (AVP Code 279) is of type Grouped and provides
-%% debugging information in cases where a request is rejected or not
-%% fully processed due to erroneous information in a specific AVP. The
-%% value of the Result-Code AVP will provide information on the reason
-%% for the Failed-AVP AVP.
-%%
-%% The possible reasons for this AVP are the presence of an improperly
-%% constructed AVP, an unsupported or unrecognized AVP, an invalid AVP
-%% value, the omission of a required AVP, the presence of an explicitly
-%% excluded AVP (see tables in Section 10), or the presence of two or
-%% more occurrences of an AVP which is restricted to 0, 1, or 0-1
-%% occurrences.
-%%
-%% A Diameter message MAY contain one Failed-AVP AVP, containing the
-%% entire AVP that could not be processed successfully. If the failure
-%% reason is omission of a required AVP, an AVP with the missing AVP
-%% code, the missing vendor id, and a zero filled payload of the minimum
-%% required length for the omitted AVP will be added.
-
-%%% ---------------------------------------------------------------------------
-%%% # handle_answer/3
-%%% ---------------------------------------------------------------------------
-
-%% Process an answer message in call-specific process.
-
-handle_answer(SvcName, _, {error, Req, Reason}) ->
- handle_error(Req, Reason, SvcName);
-
-handle_answer(SvcName,
- AnswerErrors,
- {answer, #request{dictionary = Dict} = Req, Pkt}) ->
- answer(examine(diameter_codec:decode(Dict, Pkt)),
- SvcName,
- AnswerErrors,
- Req).
-
-%% We don't really need to do a full decode if we're a relay and will
-%% just resend with a new hop by hop identifier, but might a proxy
-%% want to examine the answer?
-
-answer(Pkt, SvcName, AE, #request{transport = TPid,
- dictionary = Dict}
- = Req) ->
- try
- incr(recv, Pkt, Dict, TPid)
- of
- _ -> a(Pkt, SvcName, AE, Req)
- catch
- exit: {invalid_error_bit, _} = E ->
- a(Pkt#diameter_packet{errors = [E]}, SvcName, AE, Req)
- end.
-
-a(#diameter_packet{errors = Es} = Pkt, SvcName, AE, #request{transport = TPid,
- caps = Caps,
- packet = P}
- = Req)
- when [] == Es;
- callback == AE ->
- cb(Req, handle_answer, [Pkt, msg(P), SvcName, {TPid, Caps}]);
-
-a(Pkt, SvcName, report, Req) ->
- x(errors, handle_answer, [SvcName, Req, Pkt]);
-
-a(Pkt, SvcName, discard, Req) ->
- x({errors, handle_answer, [SvcName, Req, Pkt]}).
-
-%% Note that we don't check that the application id in the answer's
-%% header is what we expect. (TODO: Does the rfc says anything about
-%% this?)
-
-%% incr/4
-%%
-%% Increment a stats counter for an incoming or outgoing message.
-
-%% TODO: fix
-incr(_, #diameter_packet{msg = undefined}, _, _) ->
- ok;
-
-incr(recv = D, #diameter_packet{header = H, errors = [_|_]}, _, TPid) ->
- incr(TPid, {diameter_codec:msg_id(H), D, error});
-
-incr(Dir, Pkt, Dict, TPid) ->
- #diameter_packet{header = #diameter_header{is_error = E}
- = Hdr,
- msg = Rec}
- = Pkt,
-
- RC = int(get_avp_value(Dict, 'Result-Code', Rec)),
- PE = is_protocol_error(RC),
-
- %% Check that the E bit is set only for 3xxx result codes.
- (not (E orelse PE))
- orelse (E andalso PE)
- orelse x({invalid_error_bit, RC}, answer, [Dir, Pkt]),
-
- irc(TPid, Hdr, Dir, rc_counter(Dict, Rec, RC)).
-
-irc(_, _, _, undefined) ->
- false;
-
-irc(TPid, Hdr, Dir, Ctr) ->
- incr(TPid, {diameter_codec:msg_id(Hdr), Dir, Ctr}).
-
-%% incr/2
-
-incr(TPid, Counter) ->
- diameter_stats:incr(Counter, TPid, 1).
-
-%% error_counter/2
-
-%% RFC 3588, 7.6:
-%%
-%% All Diameter answer messages defined in vendor-specific
-%% applications MUST include either one Result-Code AVP or one
-%% Experimental-Result AVP.
-%%
-%% Maintain statistics assuming one or the other, not both, which is
-%% surely the intent of the RFC.
-
-rc_counter(Dict, Rec, undefined) ->
- er(get_avp_value(Dict, 'Experimental-Result', Rec));
-rc_counter(_, _, RC) ->
- {'Result-Code', RC}.
-
-%% Outgoing answers may be in any of the forms messages can be sent
-%% in. Incoming messages will be records. We're assuming here that the
-%% arity of the result code AVP's is 0 or 1.
-
-er([{_,_,N} = T | _])
- when is_integer(N) ->
- T;
-er({_,_,N} = T)
- when is_integer(N) ->
- T;
-er(_) ->
- undefined.
-
-%% Extract the first good looking integer. There's no guarantee
-%% that what we're looking for has arity 1.
-int([N|_])
- when is_integer(N) ->
- N;
-int(N)
- when is_integer(N) ->
- N;
-int(_) ->
- undefined.
-
-is_protocol_error(RC) ->
- 3000 =< RC andalso RC < 4000.
-
--spec x(any(), atom(), list()) -> no_return().
-
-%% Warn and exit request process on errors in an incoming answer.
-x(Reason, F, A) ->
- diameter_lib:warning_report(Reason, {?MODULE, F, A}),
- x(Reason).
-
-x(T) ->
- exit(T).
-
-%%% ---------------------------------------------------------------------------
-%%% # failover/[23]
-%%% ---------------------------------------------------------------------------
-
-%% Failover as a consequence of request_peer_down/2.
-failover({_, #request{handler = Pid} = Req, TRef}, S) ->
- Pid ! {failover, TRef, rt(Req, S)}.
-
-%% Failover as a consequence of store_request/4.
-failover(TRef, Seqs, S)
- when is_reference(TRef) ->
- case lookup_request(Seqs, TRef) of
- #request{} = Req ->
- failover({Seqs, Req, TRef}, S);
- false ->
- ok
- end.
-
-%% prepare_request returned a binary ...
-rt(#request{packet = #diameter_packet{msg = undefined}}, _) ->
- false; %% TODO: Not what we should do.
-
-%% ... or not.
-rt(#request{packet = #diameter_packet{msg = Msg},
- dictionary = Dict}
- = Req,
- S) ->
- find_transport(get_destination(Dict, Msg), Req, S).
-
-%%% ---------------------------------------------------------------------------
-%%% # report_status/5
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # report_status/5
+%% ---------------------------------------------------------------------------
report_status(Status,
#watchdog{ref = Ref,
@@ -2551,9 +1251,9 @@ send_event(SvcName, Info) ->
send_event(#diameter_event{service = SvcName} = E) ->
lists:foreach(fun({_, Pid}) -> Pid ! E end, subscriptions(SvcName)).
-%%% ---------------------------------------------------------------------------
-%%% # share_peer/5
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # share_peer/5
+%% ---------------------------------------------------------------------------
share_peer(up, Caps, Aliases, TPid, #state{options = [_, {_, true} | _],
service_name = Svc}) ->
@@ -2562,9 +1262,9 @@ share_peer(up, Caps, Aliases, TPid, #state{options = [_, {_, true} | _],
share_peer(_, _, _, _, _) ->
ok.
-%%% ---------------------------------------------------------------------------
-%%% # share_peers/2
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # share_peers/2
+%% ---------------------------------------------------------------------------
share_peers(Pid, #state{options = [_, {_, true} | _],
local_peers = PDict}) ->
@@ -2576,9 +1276,9 @@ share_peers(_, _) ->
sp(Pid, Alias, Peers) ->
lists:foreach(fun({P,C}) -> Pid ! {peer, P, [Alias], C} end, Peers).
-%%% ---------------------------------------------------------------------------
-%%% # remote_peer_up/4
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # remote_peer_up/4
+%% ---------------------------------------------------------------------------
remote_peer_up(Pid, Aliases, Caps, #state{options = [_, _, {_, true} | _],
service = Svc,
@@ -2598,9 +1298,9 @@ rpu(Pid, Caps, PDict, Aliases) ->
T = {Pid, Caps},
lists:foreach(fun(A) -> ?Dict:append(A, T, PDict) end, Aliases).
-%%% ---------------------------------------------------------------------------
-%%% # remote_peer_down/2
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # remote_peer_down/2
+%% ---------------------------------------------------------------------------
remote_peer_down(Pid, #state{options = [_, _, {_, true} | _],
shared_peers = PDict}) ->
@@ -2609,164 +1309,20 @@ remote_peer_down(Pid, #state{options = [_, _, {_, true} | _],
rpd(Pid, Alias, PDict) ->
?Dict:update(Alias, fun(Ps) -> lists:keydelete(Pid, 1, Ps) end, PDict).
-%%% ---------------------------------------------------------------------------
-%%% find_transport/[34]
-%%%
-%%% Return: {TransportPid, #diameter_caps{}, #diameter_app{}}
-%%% | false
-%%% | {error, Reason}
-%%% ---------------------------------------------------------------------------
-
-%% Initial call, from an arbitrary process.
-find_transport({alias, Alias}, Msg, Opts, #state{service = Svc} = S) ->
- #diameter_service{applications = Apps} = Svc,
- ft(find_send_app(Alias, Apps), Msg, Opts, S);
-
-%% Relay or proxy send.
-find_transport(#diameter_app{} = App, Msg, Opts, S) ->
- ft(App, Msg, Opts, S).
-
-ft(#diameter_app{module = Mod, dictionary = Dict} = App, Msg, Opts, S) ->
- #options{filter = Filter,
- extra = Xtra}
- = Opts,
- pick_peer(App#diameter_app{module = Mod ++ Xtra},
- get_destination(Dict, Msg),
- Filter,
- S);
-ft(false = No, _, _, _) ->
- No.
-
-%% This can't be used if we're a relay and sending a message
-%% in an application not known locally. (TODO)
-find_send_app(Alias, Apps) ->
- case lists:keyfind(Alias, #diameter_app.alias, Apps) of
- #diameter_app{id = ?APP_ID_RELAY} ->
- false;
- T ->
- T
- end.
-
-%% Retransmission, in the service process.
-find_transport([_,_] = RH,
- Req,
- #state{service = #diameter_service{pid = Pid,
- applications = Apps}}
- = S)
- when self() == Pid ->
- #request{app = Alias,
- filter = Filter,
- module = ModX}
- = Req,
- #diameter_app{}
- = App
- = lists:keyfind(Alias, #diameter_app.alias, Apps),
-
- pick_peer(App#diameter_app{module = ModX},
- RH,
- Filter,
- S).
-
-%% get_destination/2
-
-get_destination(Dict, Msg) ->
- [str(get_avp_value(Dict, 'Destination-Realm', Msg)),
- str(get_avp_value(Dict, 'Destination-Host', Msg))].
-
-%% This is not entirely correct. The avp could have an arity 1, in
-%% which case an empty list is a DiameterIdentity of length 0 rather
-%% than the list of no values we treat it as by mapping to undefined.
-%% This behaviour is documented.
-str([]) ->
- undefined;
-str(T) ->
- T.
-
-%% get_avp_value/3
-%%
-%% Find an AVP in a message of one of three forms:
-%%
-%% - a message record (as generated from a .dia spec) or
-%% - a list of an atom message name followed by 2-tuple, avp name/value pairs.
-%% - a list of a #diameter_header{} followed by #diameter_avp{} records,
-%%
-%% In the first two forms a dictionary module is used at encode to
-%% identify the type of the AVP and its arity in the message in
-%% question. The third form allows messages to be sent as is, without
-%% a dictionary, which is needed in the case of relay agents, for one.
-
-%% Messages will be header/avps list as a relay and the only AVP's we
-%% look for are in the common dictionary. This is required since the
-%% relay dictionary doesn't inherit the common dictionary (which maybe
-%% it should).
-get_avp_value(?RELAY, Name, Msg) ->
- get_avp_value(?BASE, Name, Msg);
-
-%% Message sent as a header/avps list, probably a relay case but not
-%% necessarily.
-get_avp_value(Dict, Name, [#diameter_header{} | Avps]) ->
- try
- {Code, _, VId} = Dict:avp_header(Name),
- [A|_] = lists:dropwhile(fun(#diameter_avp{code = C, vendor_id = V}) ->
- C /= Code orelse V /= VId
- end,
- Avps),
- avp_decode(Dict, Name, A)
- catch
- error: _ ->
- undefined
- end;
-
-%% Outgoing message as a name/values list.
-get_avp_value(_, Name, [_MsgName | Avps]) ->
- case lists:keyfind(Name, 1, Avps) of
- {_, V} ->
- V;
- _ ->
- undefined
- end;
-
-%% Record might be an answer message in the common dictionary.
-get_avp_value(Dict, Name, Rec)
- when Dict /= ?BASE, element(1, Rec) == 'diameter_base_answer-message' ->
- get_avp_value(?BASE, Name, Rec);
-
-%% Message is typically a record but not necessarily: diameter:call/4
-%% can be passed an arbitrary term.
-get_avp_value(Dict, Name, Rec) ->
- try
- Dict:'#get-'(Name, Rec)
- catch
- error:_ ->
- undefined
- end.
-
-avp_decode(Dict, Name, #diameter_avp{value = undefined,
- data = Bin}) ->
- Dict:avp(decode, Bin, Name);
-avp_decode(_, _, #diameter_avp{value = V}) ->
- V.
-
-%%% ---------------------------------------------------------------------------
-%%% # pick_peer(App, [DestRealm, DestHost], Filter, #state{})
-%%%
-%%% Return: {TransportPid, #diameter_caps{}, App}
-%%% | false
-%%% | {error, Reason}
-%%% ---------------------------------------------------------------------------
-
-%% Find transports to a given realm/host.
+%% ---------------------------------------------------------------------------
+%% pick_peer/4
+%% ---------------------------------------------------------------------------
pick_peer(#diameter_app{alias = Alias}
= App,
- [_,_] = RH,
+ RealmAndHost,
Filter,
#state{local_peers = L,
shared_peers = S,
service_name = SvcName,
service = #diameter_service{pid = Pid}}) ->
- pick_peer(peers(Alias, RH, Filter, L),
- peers(Alias, RH, Filter, S),
+ pick_peer(peers(Alias, RealmAndHost, Filter, L),
+ peers(Alias, RealmAndHost, Filter, S),
Pid,
SvcName,
App).
@@ -2779,7 +1335,12 @@ pick_peer([], [], _, _, _) ->
%% App state is mutable but we're not in the service process: go there.
pick_peer(Local, Remote, Pid, _SvcName, #diameter_app{mutable = true} = App)
when self() /= Pid ->
- call_service(Pid, {pick_peer, Local, Remote, App});
+ case call_service(Pid, {pick_peer, Local, Remote, App}) of
+ {TPid, _} = T when is_pid(TPid) ->
+ T;
+ {error, _} ->
+ false
+ end;
%% App state isn't mutable or it is and we're in the service process:
%% do the deed.
@@ -2787,19 +1348,18 @@ pick_peer(Local,
Remote,
_Pid,
SvcName,
- #diameter_app{module = ModX,
- alias = Alias,
+ #diameter_app{alias = Alias,
init_state = S,
mutable = M}
= App) ->
- MFA = {ModX, pick_peer, [Local, Remote, SvcName]},
+ Args = [Local, Remote, SvcName],
- try state_cb(App, MFA) of
- {ok, {TPid, #diameter_caps{} = Caps}} when is_pid(TPid) ->
- {TPid, Caps, App};
- {{TPid, #diameter_caps{} = Caps}, ModS} when is_pid(TPid), M ->
+ try state_cb(App, pick_peer, Args) of
+ {ok, {TPid, #diameter_caps{}} = T} when is_pid(TPid) ->
+ T;
+ {{TPid, #diameter_caps{}} = T, ModS} when is_pid(TPid), M ->
mod_state(Alias, ModS),
- {TPid, Caps, App};
+ T;
{false = No, ModS} when M ->
mod_state(Alias, ModS),
No;
@@ -2807,15 +1367,17 @@ pick_peer(Local,
No;
false = No ->
No;
- {{TPid, #diameter_caps{} = Caps}, S} when is_pid(TPid) ->
- {TPid, Caps, App}; %% Accept returned state in the immutable
+ {{TPid, #diameter_caps{}} = T, S} when is_pid(TPid) ->
+ T; %% Accept returned state in the immutable
{false = No, S} -> %% case as long it isn't changed.
No;
T ->
- diameter_lib:error_report({invalid, T, App}, MFA)
+ diameter_lib:error_report({invalid, T, App},
+ {App, pick_peer, Args})
catch
E: Reason ->
- diameter_lib:error_report({failure, {E, Reason, ?STACK}}, MFA)
+ diameter_lib:error_report({failure, {E, Reason, ?STACK}},
+ {App, pick_peer, Args})
end.
%% peers/4
@@ -2907,9 +1469,9 @@ transports(#state{watchdogT = WatchdogT}) ->
[{'is_pid', '$1'}],
['$1']}]).
-%%% ---------------------------------------------------------------------------
-%%% # service_info/2
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # service_info/2
+%% ---------------------------------------------------------------------------
%% The config passed to diameter:start_service/2.
-define(CAP_INFO, ['Origin-Host',
@@ -3186,22 +1748,7 @@ mk_app(#diameter_app{} = A) ->
%% One entry for each outgoing request whose answer is outstanding.
info_pending(#state{} = S) ->
- MatchSpec = [{{'$1',
- #request{transport = '$2',
- from = '$3',
- app = '$4',
- _ = '_'},
- '_'},
- [?ORCOND([{'==', T, '$2'} || T <- transports(S)])],
- [{{'$1', [{{app, '$4'}},
- {{transport, '$2'}},
- {{from, '$3'}}]}}]}],
-
- try
- ets:select(?REQUEST_TABLE, MatchSpec)
- catch
- error: badarg -> [] %% service has gone down
- end.
+ diameter_traffic:pending(transports(S)).
%% info_connections/1
%%
diff --git a/lib/diameter/src/base/diameter_stats.erl b/lib/diameter/src/base/diameter_stats.erl
index 70727d068e..8fd5ded300 100644
--- a/lib/diameter/src/base/diameter_stats.erl
+++ b/lib/diameter/src/base/diameter_stats.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2012. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2013. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -136,7 +136,7 @@ read(Refs, B) ->
-spec flush([ref()])
-> [{ref(), {counter(), integer()}}].
-
+
flush(Refs) ->
try
call({flush, Refs})
diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl
new file mode 100644
index 0000000000..2f486861a2
--- /dev/null
+++ b/lib/diameter/src/base/diameter_traffic.erl
@@ -0,0 +1,1633 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2013. All Rights Reserved.
+%%
+%% The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved online at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% %CopyrightEnd%
+%%
+
+%%
+%% Implements the handling of incoming and outgoing Diameter messages
+%% except CER/CEA, DWR/DWA and DPR/DPA. That is, the messages that a
+%% diameter client sends of receives.
+%%
+
+-module(diameter_traffic).
+
+%% towards diameter
+-export([send_request/4]).
+
+%% towards diameter_watchdog
+-export([receive_message/4]).
+
+%% towards diameter_service
+-export([make_recvdata/1,
+ peer_up/1,
+ peer_down/1,
+ failover/1,
+ pending/1]).
+
+%% Other callbacks.
+-export([send/1]). %% send from remote node
+
+-include_lib("diameter/include/diameter.hrl").
+-include("diameter_internal.hrl").
+
+-define(RELAY, ?DIAMETER_DICT_RELAY).
+-define(BASE, ?DIAMETER_DICT_COMMON). %% Note: the RFC 3588 dictionary
+
+-define(DEFAULT_TIMEOUT, 5000). %% for outgoing requests
+
+%% Table containing outgoing requests for which a reply has yet to be
+%% received.
+-define(REQUEST_TABLE, diameter_request).
+
+%% Workaround for dialyzer's lack of understanding of match specs.
+-type match(T)
+ :: T | '_' | '$1' | '$2' | '$3' | '$4'.
+
+%% Record diameter:call/4 options are parsed into.
+-record(options,
+ {filter = none :: diameter:peer_filter(),
+ extra = [] :: list(),
+ timeout = ?DEFAULT_TIMEOUT :: 0..16#FFFFFFFF,
+ detach = false :: boolean()}).
+
+%% Term passed back to receive_message/4 with every incoming message.
+-record(recvdata,
+ {peerT :: ets:tid(),
+ service_name :: diameter:service_name(),
+ apps :: [#diameter_app{}],
+ sequence :: diameter:sequence()}).
+
+%% Record stored in diameter_request for each outgoing request.
+-record(request,
+ {ref :: match(reference()), %% used to receive answer
+ caller :: match(pid()), %% calling process
+ handler :: match(pid()), %% request process
+ transport :: match(pid()), %% peer process
+ caps :: match(#diameter_caps{}), %% of connection
+ packet :: match(#diameter_packet{})}). %% of request
+
+%% ---------------------------------------------------------------------------
+%% # make_recvdata/1
+%% ---------------------------------------------------------------------------
+
+make_recvdata([SvcName, PeerT, Apps, Mask | _]) ->
+ #recvdata{service_name = SvcName,
+ peerT = PeerT,
+ apps = Apps,
+ sequence = Mask}.
+%% Take a list so that the caller (diameter_service) can be upgraded
+%% first if new members are added. Note that receive_message/4 might
+%% still get an old term from any watchdog started in old code.
+
+%% ---------------------------------------------------------------------------
+%% peer_up/1
+%% ---------------------------------------------------------------------------
+
+%% Insert an element that is used to detect whether or not there has
+%% been a failover when inserting an outgoing request.
+peer_up(TPid) ->
+ ets:insert(?REQUEST_TABLE, {TPid}).
+
+%% ---------------------------------------------------------------------------
+%% peer_down/1
+%% ---------------------------------------------------------------------------
+
+peer_down(TPid) ->
+ ets:delete(?REQUEST_TABLE, TPid),
+ failover(TPid).
+
+%% ---------------------------------------------------------------------------
+%% pending/1
+%% ---------------------------------------------------------------------------
+
+pending(TPids) ->
+ MatchSpec = [{{'$1',
+ #request{caller = '$2',
+ handler = '$3',
+ transport = '$4',
+ _ = '_'},
+ '_'},
+ [?ORCOND([{'==', T, '$4'} || T <- TPids])],
+ [{{'$1', [{{caller, '$2'}},
+ {{handler, '$3'}},
+ {{transport, '$4'}}]}}]}],
+
+ try
+ ets:select(?REQUEST_TABLE, MatchSpec)
+ catch
+ error: badarg -> [] %% service has gone down
+ end.
+
+%% ---------------------------------------------------------------------------
+%% # receive_message/4
+%%
+%% Handle an incoming Diameter message.
+%% ---------------------------------------------------------------------------
+
+%% Handle an incoming Diameter message in the watchdog process. This
+%% used to come through the service process but this avoids that
+%% becoming a bottleneck.
+
+receive_message(TPid, Pkt, Dict0, RecvData)
+ when is_pid(TPid) ->
+ #diameter_packet{header = #diameter_header{is_request = R}} = Pkt,
+ recv(R,
+ (not R) andalso lookup_request(Pkt, TPid),
+ TPid,
+ Pkt,
+ Dict0,
+ RecvData).
+
+%% Incoming request ...
+recv(true, false, TPid, Pkt, Dict0, RecvData) ->
+ try
+ spawn(fun() -> recv_request(TPid, Pkt, Dict0, RecvData) end)
+ catch
+ error: system_limit = E -> %% discard
+ ?LOG({error, E}, now())
+ end;
+
+%% ... answer to known request ...
+recv(false, #request{ref = Ref, handler = Pid} = Req, _, Pkt, Dict0, _) ->
+ Pid ! {answer, Ref, Req, Dict0, Pkt};
+%% Note that failover could have happened prior to this message being
+%% received and triggering failback. That is, both a failover message
+%% and answer may be on their way to the handler process. In the worst
+%% case the request process gets notification of the failover and
+%% sends to the alternate peer before an answer arrives, so it's
+%% always the case that we can receive more than one answer after
+%% failover. The first answer received by the request process wins,
+%% any others are discarded.
+
+%% ... or not.
+recv(false, false, _, _, _, _) ->
+ ok.
+
+%% ---------------------------------------------------------------------------
+%% recv_request/4
+%% ---------------------------------------------------------------------------
+
+recv_request(TPid,
+ #diameter_packet{header = #diameter_header{application_id = Id}}
+ = Pkt,
+ Dict0,
+ #recvdata{peerT = PeerT, apps = Apps}
+ = RecvData) ->
+ recv_request(diameter_service:find_incoming_app(PeerT, TPid, Id, Apps),
+ TPid,
+ Pkt,
+ Dict0,
+ RecvData).
+
+%% recv_request/5
+
+recv_request({#diameter_app{id = Id, dictionary = Dict} = App, Caps},
+ TPid,
+ Pkt,
+ Dict0,
+ RecvData) ->
+ recv_R(App,
+ TPid,
+ Caps,
+ Dict0,
+ RecvData,
+ diameter_codec:decode(Id, Dict, Pkt));
+%% Note that the decode is different depending on whether or not Id is
+%% ?APP_ID_RELAY.
+
+%% DIAMETER_APPLICATION_UNSUPPORTED 3007
+%% A request was sent for an application that is not supported.
+
+recv_request(#diameter_caps{} = Caps, TPid, Pkt, Dict0, _) ->
+ As = collect_avps(Pkt),
+ protocol_error(3007, TPid, Caps, Dict0, Pkt#diameter_packet{avps = As});
+
+recv_request(false, _, _, _, _) -> %% transport has gone down
+ ok.
+
+collect_avps(Pkt) ->
+ case diameter_codec:collect_avps(Pkt) of
+ {_Bs, As} ->
+ As;
+ As ->
+ As
+ end.
+
+%% recv_R/6
+
+%% Wrong number of bits somewhere in the message: reply.
+%%
+%% DIAMETER_INVALID_AVP_BITS 3009
+%% A request was received that included an AVP whose flag bits are
+%% set to an unrecognized value, or that is inconsistent with the
+%% AVP's definition.
+%%
+recv_R(_App,
+ TPid,
+ Caps,
+ Dict0,
+ _RecvData,
+ #diameter_packet{errors = [Bs | _]} = Pkt)
+ when is_bitstring(Bs) ->
+ protocol_error(3009, TPid, Caps, Dict0, Pkt);
+
+%% Either we support this application but don't recognize the command
+%% or we're a relay and the command isn't proxiable.
+%%
+%% DIAMETER_COMMAND_UNSUPPORTED 3001
+%% The Request contained a Command-Code that the receiver did not
+%% recognize or support. This MUST be used when a Diameter node
+%% receives an experimental command that it does not understand.
+%%
+recv_R(#diameter_app{id = Id},
+ TPid,
+ Caps,
+ Dict0,
+ _RecvData,
+ #diameter_packet{header = #diameter_header{is_proxiable = P},
+ msg = M}
+ = Pkt)
+ when ?APP_ID_RELAY /= Id, undefined == M;
+ ?APP_ID_RELAY == Id, not P ->
+ protocol_error(3001, TPid, Caps, Dict0, Pkt);
+
+%% Error bit was set on a request.
+%%
+%% DIAMETER_INVALID_HDR_BITS 3008
+%% A request was received whose bits in the Diameter header were
+%% either set to an invalid combination, or to a value that is
+%% inconsistent with the command code's definition.
+%%
+recv_R(_App,
+ TPid,
+ Caps,
+ Dict0,
+ _RecvData,
+ #diameter_packet{header = #diameter_header{is_error = true}}
+ = Pkt) ->
+ protocol_error(3008, TPid, Caps, Dict0, Pkt);
+
+%% A message in a locally supported application or a proxiable message
+%% in the relay application. Don't distinguish between the two since
+%% each application has its own callback config. That is, the user can
+%% easily distinguish between the two cases.
+recv_R(App, TPid, Caps, Dict0, RecvData, Pkt) ->
+ request_cb(App, TPid, Caps, Dict0, RecvData, examine(Pkt)).
+
+%% Note that there may still be errors but these aren't protocol
+%% (3xxx) errors that lead to an answer-message.
+
+request_cb(App,
+ TPid,
+ Caps,
+ Dict0,
+ #recvdata{service_name = SvcName}
+ = RecvData,
+ Pkt) ->
+ request_cb(cb(App, handle_request, [Pkt, SvcName, {TPid, Caps}]),
+ App,
+ TPid,
+ Caps,
+ Dict0,
+ RecvData,
+ [],
+ Pkt).
+
+%% examine/1
+%%
+%% Look for errors in a decoded message. Length errors result in
+%% decode failure in diameter_codec.
+
+examine(#diameter_packet{header = #diameter_header{version
+ = ?DIAMETER_VERSION}}
+ = Pkt) ->
+ Pkt;
+
+%% DIAMETER_UNSUPPORTED_VERSION 5011
+%% This error is returned when a request was received, whose version
+%% number is unsupported.
+
+examine(#diameter_packet{errors = Es} = Pkt) ->
+ Pkt#diameter_packet{errors = [5011 | Es]}.
+%% It's odd/unfortunate that this isn't a protocol error.
+
+%% request_cb/8
+
+%% A reply may be an answer-message, constructed either here or by
+%% the handle_request callback. The header from the incoming request
+%% is passed into the encode so that it can retrieve the relevant
+%% command code in this case. It will also then ignore Dict and use
+%% the base encoder.
+request_cb({reply, Ans},
+ #diameter_app{dictionary = Dict},
+ TPid,
+ _Caps,
+ Dict0,
+ _RecvData,
+ Fs,
+ Pkt) ->
+ reply(Ans, dict(Dict, Dict0, Ans), TPid, Fs, Pkt);
+
+%% An 3xxx result code, for which the E-bit is set in the header.
+request_cb({protocol_error, RC},
+ _App,
+ TPid,
+ Caps,
+ Dict0,
+ _RecvData,
+ Fs,
+ Pkt)
+ when 3000 =< RC, RC < 4000 ->
+ protocol_error(RC, TPid, Caps, Dict0, Fs, Pkt);
+
+%% RFC 3588 says we must reply 3001 to anything unrecognized or
+%% unsupported. 'noreply' is undocumented (and inappropriately named)
+%% backwards compatibility for this, protocol_error the documented
+%% alternative.
+request_cb(noreply,
+ _App,
+ TPid,
+ Caps,
+ Dict0,
+ _RecvData,
+ Fs,
+ Pkt) ->
+ protocol_error(3001, TPid, Caps, Dict0, Fs, Pkt);
+
+%% Relay a request to another peer. This is equivalent to doing an
+%% explicit call/4 with the message in question except that (1) a loop
+%% will be detected by examining Route-Record AVP's, (3) a
+%% Route-Record AVP will be added to the outgoing request and (3) the
+%% End-to-End Identifier will default to that in the
+%% #diameter_header{} without the need for an end_to_end_identifier
+%% option.
+%%
+%% relay and proxy are similar in that they require the same handling
+%% with respect to Route-Record and End-to-End identifier. The
+%% difference is that a proxy advertises specific applications, while
+%% a relay advertises the relay application. If a callback doesn't
+%% want to distinguish between the cases in the callback return value
+%% then 'resend' is a neutral alternative.
+%%
+request_cb({A, Opts},
+ #diameter_app{id = Id}
+ = App,
+ TPid,
+ Caps,
+ Dict0,
+ RecvData,
+ Fs,
+ Pkt)
+ when A == relay, Id == ?APP_ID_RELAY;
+ A == proxy, Id /= ?APP_ID_RELAY;
+ A == resend ->
+ resend(Opts, App, TPid, Caps, Dict0, RecvData, Fs, Pkt);
+
+request_cb(discard, _, _, _, _, _, _, _) ->
+ ok;
+
+request_cb({eval_packet, RC, F}, App, TPid, Caps, Dict0, RecvData, Fs, Pkt) ->
+ request_cb(RC, App, TPid, Caps, Dict0, RecvData, [F|Fs], Pkt);
+
+request_cb({eval, RC, F}, App, TPid, Caps, Dict0, RecvData, Fs, Pkt) ->
+ request_cb(RC, App, TPid, Caps, Dict0, RecvData, Fs, Pkt),
+ diameter_lib:eval(F).
+
+%% dict/3
+
+%% An incoming answer, not yet decoded.
+dict(Dict, Dict0, #diameter_packet{header
+ = #diameter_header{is_request = false,
+ is_error = E},
+ msg = undefined}) ->
+ if E -> Dict0; true -> Dict end;
+
+dict(Dict, Dict0, [Msg]) ->
+ dict(Dict, Dict0, Msg);
+
+dict(Dict, Dict0, #diameter_packet{msg = Msg}) ->
+ dict(Dict, Dict0, Msg);
+
+dict(_Dict, Dict0, ['answer-message' | _]) ->
+ Dict0;
+
+dict(Dict, Dict0, Rec) ->
+ try
+ 'answer-message' = Dict0:rec2msg(element(1,Rec)),
+ Dict0
+ catch
+ error:_ -> Dict
+ end.
+
+%% protocol_error/6
+
+protocol_error(RC, TPid, Caps, Dict0, Fs, Pkt) ->
+ #diameter_caps{origin_host = {OH,_},
+ origin_realm = {OR,_}}
+ = Caps,
+ #diameter_packet{avps = Avps, errors = Es}
+ = Pkt,
+
+ ?LOG({error, RC}, Pkt),
+ reply(answer_message({OH, OR, RC}, Dict0, Avps),
+ Dict0,
+ TPid,
+ Fs,
+ Pkt#diameter_packet{errors = [RC | Es]}).
+%% Note that reply/5 may set the result code once more. It's set in
+%% answer_message/3 in case reply/5 doesn't.
+
+%% protocol_error/5
+
+protocol_error(RC, TPid, Caps, Dict0, Pkt) ->
+ protocol_error(RC, TPid, Caps, Dict0, [], Pkt).
+
+%% resend/7
+%%
+%% Resend a message as a relay or proxy agent.
+
+resend(Opts,
+ #diameter_app{}
+ = App,
+ TPid,
+ #diameter_caps{origin_host = {OH,_}}
+ = Caps,
+ Dict0,
+ RecvData,
+ Fs,
+ #diameter_packet{avps = Avps}
+ = Pkt) ->
+ {Code, _Flags, Vid} = Dict0:avp_header('Route-Record'),
+ resend(is_loop(Code, Vid, OH, Dict0, Avps),
+ Opts,
+ App,
+ TPid,
+ Caps,
+ Dict0,
+ RecvData,
+ Fs,
+ Pkt).
+
+%% DIAMETER_LOOP_DETECTED 3005
+%% An agent detected a loop while trying to get the message to the
+%% intended recipient. The message MAY be sent to an alternate peer,
+%% if one is available, but the peer reporting the error has
+%% identified a configuration problem.
+
+resend(true, _Opts, _App, TPid, Caps, Dict0, _RecvData, Fs, Pkt) ->
+ protocol_error(3005, TPid, Caps, Dict0, Fs, Pkt);
+
+%% 6.1.8. Relaying and Proxying Requests
+%%
+%% A relay or proxy agent MUST append a Route-Record AVP to all requests
+%% forwarded. The AVP contains the identity of the peer the request was
+%% received from.
+
+resend(false,
+ Opts,
+ App,
+ TPid,
+ #diameter_caps{origin_host = {_,OH}}
+ = Caps,
+ Dict0,
+ #recvdata{service_name = SvcName,
+ sequence = Mask},
+ Fs,
+ #diameter_packet{header = Hdr0,
+ avps = Avps}
+ = Pkt) ->
+ Route = #diameter_avp{data = {Dict0, 'Route-Record', OH}},
+ Seq = diameter_session:sequence(Mask),
+ Hdr = Hdr0#diameter_header{hop_by_hop_id = Seq},
+ Msg = [Hdr, Route | Avps],
+ resend(send_request(SvcName, App, Msg, Opts), TPid, Caps, Dict0, Fs, Pkt).
+%% The incoming request is relayed with the addition of a
+%% Route-Record. Note the requirement on the return from call/4 below,
+%% which places a requirement on the value returned by the
+%% handle_answer callback of the application module in question.
+%%
+%% Note that there's nothing stopping the request from being relayed
+%% back to the sender. A pick_peer callback may want to avoid this but
+%% a smart peer might recognize the potential loop and choose another
+%% route. A less smart one will probably just relay the request back
+%% again and force us to detect the loop. A pick_peer that wants to
+%% avoid this can specify filter to avoid the possibility.
+%% Eg. {neg, {host, OH} where #diameter_caps{origin_host = {OH, _}}.
+%%
+%% RFC 6.3 says that a relay agent does not modify Origin-Host but
+%% says nothing about a proxy. Assume it should behave the same way.
+
+%% resend/6
+%%
+%% Relay a reply to a relayed request.
+
+%% Answer from the peer: reset the hop by hop identifier and send.
+resend(#diameter_packet{bin = B}
+ = Pkt,
+ TPid,
+ _Caps,
+ _Dict0,
+ Fs,
+ #diameter_packet{header = #diameter_header{hop_by_hop_id = Id},
+ transport_data = TD}) ->
+ P = Pkt#diameter_packet{bin = diameter_codec:hop_by_hop_id(Id, B),
+ transport_data = TD},
+ eval_packet(P, Fs),
+ send(TPid, P);
+%% TODO: counters
+
+%% Or not: DIAMETER_UNABLE_TO_DELIVER.
+resend(_, TPid, Caps, Dict0, Fs, Pkt) ->
+ protocol_error(3002, TPid, Caps, Dict0, Fs, Pkt).
+
+%% is_loop/5
+%%
+%% Is there a Route-Record AVP with our Origin-Host?
+
+is_loop(Code,
+ Vid,
+ Bin,
+ _Dict0,
+ [#diameter_avp{code = Code, vendor_id = Vid, data = Bin} | _]) ->
+ true;
+
+is_loop(_, _, _, _, []) ->
+ false;
+
+is_loop(Code, Vid, OH, Dict0, [_ | Avps])
+ when is_binary(OH) ->
+ is_loop(Code, Vid, OH, Dict0, Avps);
+
+is_loop(Code, Vid, OH, Dict0, Avps) ->
+ is_loop(Code, Vid, Dict0:avp(encode, OH, 'Route-Record'), Dict0, Avps).
+
+%% reply/5
+%%
+%% Send a locally originating reply.
+
+%% Skip the setting of Result-Code and Failed-AVP's below. This is
+%% currently undocumented.
+reply([Msg], Dict, TPid, Fs, Pkt)
+ when is_list(Msg);
+ is_tuple(Msg) ->
+ reply(Msg, Dict, TPid, Fs, Pkt#diameter_packet{errors = []});
+
+%% No errors or a diameter_header/avp list.
+reply(Msg, Dict, TPid, Fs, #diameter_packet{errors = Es} = ReqPkt)
+ when [] == Es;
+ is_record(hd(Msg), diameter_header) ->
+ Pkt = encode(Dict, make_answer_packet(Msg, ReqPkt), Fs),
+ incr(send, Pkt, Dict, TPid), %% count result codes in sent answers
+ send(TPid, Pkt);
+
+%% Or not: set Result-Code and Failed-AVP AVP's.
+reply(Msg, Dict, TPid, Fs, #diameter_packet{errors = [H|_] = Es} = Pkt) ->
+ reply(rc(Msg, rc(H), [A || {_,A} <- Es], Dict),
+ Dict,
+ TPid,
+ Fs,
+ Pkt#diameter_packet{errors = []}).
+
+eval_packet(Pkt, Fs) ->
+ lists:foreach(fun(F) -> diameter_lib:eval([F,Pkt]) end, Fs).
+
+%% make_answer_packet/2
+
+%% A reply message clears the R and T flags and retains the P flag.
+%% The E flag will be set at encode. 6.2 of 3588 requires the same P
+%% flag on an answer as on the request. A #diameter_packet{} returned
+%% from a handle_request callback can circumvent this by setting its
+%% own header values.
+make_answer_packet(#diameter_packet{header = Hdr,
+ msg = Msg,
+ transport_data = TD},
+ #diameter_packet{header = ReqHdr}) ->
+ Hdr0 = ReqHdr#diameter_header{version = ?DIAMETER_VERSION,
+ is_request = false,
+ is_error = undefined,
+ is_retransmitted = false},
+ #diameter_packet{header = fold_record(Hdr0, Hdr),
+ msg = Msg,
+ transport_data = TD};
+
+%% Binaries and header/avp lists are sent as-is.
+make_answer_packet(Bin, #diameter_packet{transport_data = TD})
+ when is_binary(Bin) ->
+ #diameter_packet{bin = Bin,
+ transport_data = TD};
+make_answer_packet([#diameter_header{} | _] = Msg,
+ #diameter_packet{transport_data = TD}) ->
+ #diameter_packet{msg = Msg,
+ transport_data = TD};
+
+%% Otherwise, preserve transport_data.
+make_answer_packet(Msg, #diameter_packet{transport_data = TD} = Pkt) ->
+ make_answer_packet(#diameter_packet{msg = Msg, transport_data = TD}, Pkt).
+
+%% rc/1
+
+rc({RC, _}) ->
+ RC;
+rc(RC) ->
+ RC.
+
+%% rc/4
+
+rc(#diameter_packet{msg = Rec} = Pkt, RC, Failed, DictT) ->
+ Pkt#diameter_packet{msg = rc(Rec, RC, Failed, DictT)};
+
+rc(Rec, RC, Failed, DictT)
+ when is_integer(RC) ->
+ set(Rec,
+ lists:append([rc(Rec, {'Result-Code', RC}, DictT),
+ failed_avp(Rec, Failed, DictT)]),
+ DictT).
+
+%% Reply as name and tuple list ...
+set([_|_] = Ans, Avps, _) ->
+ Ans ++ Avps; %% Values nearer tail take precedence.
+
+%% ... or record.
+set(Rec, Avps, Dict) ->
+ Dict:'#set-'(Avps, Rec).
+
+%% rc/3
+%%
+%% Turn the result code into a list if its optional and only set it if
+%% the arity is 1 or {0,1}. In other cases (which probably shouldn't
+%% exist in practise) we can't know what's appropriate.
+
+rc([MsgName | _], {'Result-Code' = K, RC} = T, Dict) ->
+ case Dict:avp_arity(MsgName, 'Result-Code') of
+ 1 -> [T];
+ {0,1} -> [{K, [RC]}];
+ _ -> []
+ end;
+
+rc(Rec, T, Dict) ->
+ rc([Dict:rec2msg(element(1, Rec))], T, Dict).
+
+%% failed_avp/3
+
+failed_avp(_, [] = No, _) ->
+ No;
+
+failed_avp(Rec, Failed, Dict) ->
+ [fa(Rec, [{'AVP', Failed}], Dict)].
+
+%% Reply as name and tuple list ...
+fa([MsgName | Values], FailedAvp, Dict) ->
+ R = Dict:msg2rec(MsgName),
+ try
+ Dict:'#info-'(R, {index, 'Failed-AVP'}),
+ {'Failed-AVP', [FailedAvp]}
+ catch
+ error: _ ->
+ Avps = proplists:get_value('AVP', Values, []),
+ A = #diameter_avp{name = 'Failed-AVP',
+ value = FailedAvp},
+ {'AVP', [A|Avps]}
+ end;
+
+%% ... or record.
+fa(Rec, FailedAvp, Dict) ->
+ try
+ {'Failed-AVP', [FailedAvp]}
+ catch
+ error: _ ->
+ Avps = Dict:'get-'('AVP', Rec),
+ A = #diameter_avp{name = 'Failed-AVP',
+ value = FailedAvp},
+ {'AVP', [A|Avps]}
+ end.
+
+%% 3. Diameter Header
+%%
+%% E(rror) - If set, the message contains a protocol error,
+%% and the message will not conform to the ABNF
+%% described for this command. Messages with the 'E'
+%% bit set are commonly referred to as error
+%% messages. This bit MUST NOT be set in request
+%% messages. See Section 7.2.
+
+%% 3.2. Command Code ABNF specification
+%%
+%% e-bit = ", ERR"
+%% ; If present, the 'E' bit in the Command
+%% ; Flags is set, indicating that the answer
+%% ; message contains a Result-Code AVP in
+%% ; the "protocol error" class.
+
+%% 7.1.3. Protocol Errors
+%%
+%% Errors that fall within the Protocol Error category SHOULD be treated
+%% on a per-hop basis, and Diameter proxies MAY attempt to correct the
+%% error, if it is possible. Note that these and only these errors MUST
+%% only be used in answer messages whose 'E' bit is set.
+
+%% Thus, only construct answers to protocol errors. Other errors
+%% require an message-specific answer and must be handled by the
+%% application.
+
+%% 6.2. Diameter Answer Processing
+%%
+%% When a request is locally processed, the following procedures MUST be
+%% applied to create the associated answer, in addition to any
+%% additional procedures that MAY be discussed in the Diameter
+%% application defining the command:
+%%
+%% - The same Hop-by-Hop identifier in the request is used in the
+%% answer.
+%%
+%% - The local host's identity is encoded in the Origin-Host AVP.
+%%
+%% - The Destination-Host and Destination-Realm AVPs MUST NOT be
+%% present in the answer message.
+%%
+%% - The Result-Code AVP is added with its value indicating success or
+%% failure.
+%%
+%% - If the Session-Id is present in the request, it MUST be included
+%% in the answer.
+%%
+%% - Any Proxy-Info AVPs in the request MUST be added to the answer
+%% message, in the same order they were present in the request.
+%%
+%% - The 'P' bit is set to the same value as the one in the request.
+%%
+%% - The same End-to-End identifier in the request is used in the
+%% answer.
+%%
+%% Note that the error messages (see Section 7.3) are also subjected to
+%% the above processing rules.
+
+%% 7.3. Error-Message AVP
+%%
+%% The Error-Message AVP (AVP Code 281) is of type UTF8String. It MAY
+%% accompany a Result-Code AVP as a human readable error message. The
+%% Error-Message AVP is not intended to be useful in real-time, and
+%% SHOULD NOT be expected to be parsed by network entities.
+
+%% answer_message/3
+
+answer_message({OH, OR, RC}, Dict0, Avps) ->
+ {Code, _, Vid} = Dict0:avp_header('Session-Id'),
+ ['answer-message', {'Origin-Host', OH},
+ {'Origin-Realm', OR},
+ {'Result-Code', RC}
+ | session_id(Code, Vid, Dict0, Avps)].
+
+session_id(Code, Vid, Dict0, Avps)
+ when is_list(Avps) ->
+ try
+ {value, #diameter_avp{data = D}} = find_avp(Code, Vid, Avps),
+ [{'Session-Id', [Dict0:avp(decode, D, 'Session-Id')]}]
+ catch
+ error: _ ->
+ []
+ end.
+
+%% find_avp/3
+
+find_avp(Code, Vid, Avps)
+ when is_integer(Code), (undefined == Vid orelse is_integer(Vid)) ->
+ find(fun(A) -> is_avp(Code, Vid, A) end, Avps).
+
+%% The final argument here could be a list of AVP's, depending on the case,
+%% but we're only searching at the top level.
+is_avp(Code, Vid, #diameter_avp{code = Code, vendor_id = Vid}) ->
+ true;
+is_avp(_, _, _) ->
+ false.
+
+find(_, []) ->
+ false;
+find(Pred, [H|T]) ->
+ case Pred(H) of
+ true ->
+ {value, H};
+ false ->
+ find(Pred, T)
+ end.
+
+%% 7. Error Handling
+%%
+%% There are certain Result-Code AVP application errors that require
+%% additional AVPs to be present in the answer. In these cases, the
+%% Diameter node that sets the Result-Code AVP to indicate the error
+%% MUST add the AVPs. Examples are:
+%%
+%% - An unrecognized AVP is received with the 'M' bit (Mandatory bit)
+%% set, causes an answer to be sent with the Result-Code AVP set to
+%% DIAMETER_AVP_UNSUPPORTED, and the Failed-AVP AVP containing the
+%% offending AVP.
+%%
+%% - An AVP that is received with an unrecognized value causes an
+%% answer to be returned with the Result-Code AVP set to
+%% DIAMETER_INVALID_AVP_VALUE, with the Failed-AVP AVP containing the
+%% AVP causing the error.
+%%
+%% - A command is received with an AVP that is omitted, yet is
+%% mandatory according to the command's ABNF. The receiver issues an
+%% answer with the Result-Code set to DIAMETER_MISSING_AVP, and
+%% creates an AVP with the AVP Code and other fields set as expected
+%% in the missing AVP. The created AVP is then added to the Failed-
+%% AVP AVP.
+%%
+%% The Result-Code AVP describes the error that the Diameter node
+%% encountered in its processing. In case there are multiple errors,
+%% the Diameter node MUST report only the first error it encountered
+%% (detected possibly in some implementation dependent order). The
+%% specific errors that can be described by this AVP are described in
+%% the following section.
+
+%% 7.5. Failed-AVP AVP
+%%
+%% The Failed-AVP AVP (AVP Code 279) is of type Grouped and provides
+%% debugging information in cases where a request is rejected or not
+%% fully processed due to erroneous information in a specific AVP. The
+%% value of the Result-Code AVP will provide information on the reason
+%% for the Failed-AVP AVP.
+%%
+%% The possible reasons for this AVP are the presence of an improperly
+%% constructed AVP, an unsupported or unrecognized AVP, an invalid AVP
+%% value, the omission of a required AVP, the presence of an explicitly
+%% excluded AVP (see tables in Section 10), or the presence of two or
+%% more occurrences of an AVP which is restricted to 0, 1, or 0-1
+%% occurrences.
+%%
+%% A Diameter message MAY contain one Failed-AVP AVP, containing the
+%% entire AVP that could not be processed successfully. If the failure
+%% reason is omission of a required AVP, an AVP with the missing AVP
+%% code, the missing vendor id, and a zero filled payload of the minimum
+%% required length for the omitted AVP will be added.
+
+%% incr/4
+%%
+%% Increment a stats counter for an incoming or outgoing message.
+
+%% Outgoing message as binary: don't count. (Sending binaries is only
+%% partially supported.)
+incr(_, #diameter_packet{msg = undefined}, _, _) ->
+ ok;
+
+incr(recv = D, #diameter_packet{header = H, errors = [_|_]}, _, TPid) ->
+ incr(TPid, {diameter_codec:msg_id(H), D, error});
+
+incr(Dir, Pkt, Dict, TPid) ->
+ #diameter_packet{header = #diameter_header{is_error = E}
+ = Hdr,
+ msg = Rec}
+ = Pkt,
+
+ RC = int(get_avp_value(Dict, 'Result-Code', Rec)),
+ PE = is_protocol_error(RC),
+
+ %% Check that the E bit is set only for 3xxx result codes.
+ (not (E orelse PE))
+ orelse (E andalso PE)
+ orelse x({invalid_error_bit, RC}, answer, [Dir, Pkt]),
+
+ irc(TPid, Hdr, Dir, rc_counter(Dict, Rec, RC)).
+
+irc(_, _, _, undefined) ->
+ false;
+
+irc(TPid, Hdr, Dir, Ctr) ->
+ incr(TPid, {diameter_codec:msg_id(Hdr), Dir, Ctr}).
+
+%% incr/2
+
+incr(TPid, Counter) ->
+ diameter_stats:incr(Counter, TPid, 1).
+
+%% error_counter/2
+
+%% RFC 3588, 7.6:
+%%
+%% All Diameter answer messages defined in vendor-specific
+%% applications MUST include either one Result-Code AVP or one
+%% Experimental-Result AVP.
+%%
+%% Maintain statistics assuming one or the other, not both, which is
+%% surely the intent of the RFC.
+
+rc_counter(Dict, Rec, undefined) ->
+ rcc(get_avp_value(Dict, 'Experimental-Result', Rec));
+rc_counter(_, _, RC) ->
+ {'Result-Code', RC}.
+
+%% Outgoing answers may be in any of the forms messages can be sent
+%% in. Incoming messages will be records. We're assuming here that the
+%% arity of the result code AVP's is 0 or 1.
+
+rcc([{_,_,N} = T | _])
+ when is_integer(N) ->
+ T;
+rcc({_,_,N} = T)
+ when is_integer(N) ->
+ T;
+rcc(_) ->
+ undefined.
+
+%% Extract the first good looking integer. There's no guarantee
+%% that what we're looking for has arity 1.
+int([N|_])
+ when is_integer(N) ->
+ N;
+int(N)
+ when is_integer(N) ->
+ N;
+int(_) ->
+ undefined.
+
+is_protocol_error(RC) ->
+ 3000 =< RC andalso RC < 4000.
+
+-spec x(any(), atom(), list()) -> no_return().
+
+%% Warn and exit request process on errors in an incoming answer.
+x(Reason, F, A) ->
+ diameter_lib:warning_report(Reason, {?MODULE, F, A}),
+ x(Reason).
+
+x(T) ->
+ exit(T).
+
+%% ---------------------------------------------------------------------------
+%% # send_request/4
+%%
+%% Handle an outgoing Diameter request.
+%% ---------------------------------------------------------------------------
+
+send_request(SvcName, AppOrAlias, Msg, Options)
+ when is_list(Options) ->
+ Rec = make_options(Options),
+ Ref = make_ref(),
+ Caller = {self(), Ref},
+ ReqF = fun() ->
+ exit({Ref, send_R(SvcName, AppOrAlias, Msg, Rec, Caller)})
+ end,
+ try spawn_monitor(ReqF) of
+ {_, MRef} ->
+ recv_A(MRef, Ref, Rec#options.detach, false)
+ catch
+ error: system_limit = E ->
+ {error, E}
+ end.
+%% The R in send_R is because Diameter request are usually given short
+%% names of the form XXR. (eg. CER, DWR, etc.) Similarly, answers have
+%% names of the form XXA.
+
+%% Don't rely on gen_server:call/3 for the timeout handling since it
+%% makes no guarantees about not leaving a reply message in the
+%% mailbox if we catch its exit at timeout. It currently *can* do so,
+%% which is also undocumented.
+
+recv_A(MRef, _, true, true) ->
+ erlang:demonitor(MRef, [flush]),
+ ok;
+
+recv_A(MRef, Ref, Detach, Sent) ->
+ receive
+ Ref -> %% send has been attempted
+ recv_A(MRef, Ref, Detach, true);
+ {'DOWN', MRef, process, _, Reason} ->
+ answer_rc(Reason, Ref, Sent)
+ end.
+
+%% send_R/5 has returned ...
+answer_rc({Ref, Ans}, Ref, _) ->
+ Ans;
+
+%% ... or not. Note that failure/encode are documented return values.
+answer_rc(_, _, Sent) ->
+ {error, choose(Sent, failure, encode)}.
+
+%% send_R/5
+%%
+%% In the process spawned for the outgoing request.
+
+send_R(SvcName, AppOrAlias, Msg, Opts, Caller) ->
+ case pick_peer(SvcName, AppOrAlias, Msg, Opts) of
+ {{_,_,_} = Transport, Mask} ->
+ send_request(Transport, Mask, Msg, Opts, Caller, SvcName);
+ false ->
+ {error, no_connection};
+ {error, _} = No ->
+ No
+ end.
+
+%% make_options/1
+
+make_options(Options) ->
+ lists:foldl(fun mo/2, #options{}, Options).
+
+mo({timeout, T}, Rec)
+ when is_integer(T), 0 =< T ->
+ Rec#options{timeout = T};
+
+mo({filter, F}, #options{filter = none} = Rec) ->
+ Rec#options{filter = F};
+mo({filter, F}, #options{filter = {all, Fs}} = Rec) ->
+ Rec#options{filter = {all, [F | Fs]}};
+mo({filter, F}, #options{filter = F0} = Rec) ->
+ Rec#options{filter = {all, [F0, F]}};
+
+mo({extra, L}, #options{extra = X} = Rec)
+ when is_list(L) ->
+ Rec#options{extra = X ++ L};
+
+mo(detach, Rec) ->
+ Rec#options{detach = true};
+
+mo(T, _) ->
+ ?ERROR({invalid_option, T}).
+
+%% ---------------------------------------------------------------------------
+%% # send_request/6
+%% ---------------------------------------------------------------------------
+
+%% Send an outgoing request in its dedicated process.
+%%
+%% Note that both encode of the outgoing request and of the received
+%% answer happens in this process. It's also this process that replies
+%% to the caller. The service process only handles the state-retaining
+%% callbacks.
+%%
+%% The module field of the #diameter_app{} here includes any extra
+%% arguments passed to diameter:call/4.
+
+send_request({TPid, Caps, App}
+ = Transport,
+ Mask,
+ Msg,
+ Opts,
+ Caller,
+ SvcName) ->
+ Pkt = make_prepare_packet(Mask, Msg),
+
+ send_R(cb(App, prepare_request, [Pkt, SvcName, {TPid, Caps}]),
+ Pkt,
+ Transport,
+ Opts,
+ Caller,
+ SvcName,
+ []).
+
+send_R({send, Msg}, Pkt, Transport, Opts, Caller, SvcName, Fs) ->
+ send_R(make_request_packet(Msg, Pkt),
+ Transport,
+ Opts,
+ Caller,
+ SvcName,
+ Fs);
+
+send_R({discard, Reason} , _, _, _, _, _, _) ->
+ {error, Reason};
+
+send_R(discard, _, _, _, _, _, _) ->
+ {error, discarded};
+
+send_R({eval_packet, RC, F}, Pkt, T, Opts, Caller, SvcName, Fs) ->
+ send_R(RC, Pkt, T, Opts, Caller, SvcName, [F|Fs]);
+
+send_R(E, _, {_, _, App}, _, _, _, _) ->
+ ?ERROR({invalid_return, prepare_request, App, E}).
+
+%% make_prepare_packet/2
+%%
+%% Turn an outgoing request as passed to call/4 into a diameter_packet
+%% record in preparation for a prepare_request callback.
+
+make_prepare_packet(_, Bin)
+ when is_binary(Bin) ->
+ #diameter_packet{header = diameter_codec:decode_header(Bin),
+ bin = Bin};
+
+make_prepare_packet(Mask, #diameter_packet{msg = [#diameter_header{} = Hdr
+ | Avps]}
+ = Pkt) ->
+ Pkt#diameter_packet{msg = [make_prepare_header(Mask, Hdr) | Avps]};
+
+make_prepare_packet(Mask, #diameter_packet{header = Hdr} = Pkt) ->
+ Pkt#diameter_packet{header = make_prepare_header(Mask, Hdr)};
+
+make_prepare_packet(Mask, Msg) ->
+ make_prepare_packet(Mask, #diameter_packet{msg = Msg}).
+
+%% make_prepare_header/2
+
+make_prepare_header(Mask, undefined) ->
+ Seq = diameter_session:sequence(Mask),
+ make_prepare_header(#diameter_header{end_to_end_id = Seq,
+ hop_by_hop_id = Seq});
+
+make_prepare_header(Mask, #diameter_header{end_to_end_id = undefined,
+ hop_by_hop_id = undefined}
+ = H) ->
+ Seq = diameter_session:sequence(Mask),
+ make_prepare_header(H#diameter_header{end_to_end_id = Seq,
+ hop_by_hop_id = Seq});
+
+make_prepare_header(Mask, #diameter_header{end_to_end_id = undefined} = H) ->
+ Seq = diameter_session:sequence(Mask),
+ make_prepare_header(H#diameter_header{end_to_end_id = Seq});
+
+make_prepare_header(Mask, #diameter_header{hop_by_hop_id = undefined} = H) ->
+ Seq = diameter_session:sequence(Mask),
+ make_prepare_header(H#diameter_header{hop_by_hop_id = Seq});
+
+make_prepare_header(_, Hdr) ->
+ make_prepare_header(Hdr).
+
+%% make_prepare_header/1
+
+make_prepare_header(#diameter_header{version = undefined} = Hdr) ->
+ make_prepare_header(Hdr#diameter_header{version = ?DIAMETER_VERSION});
+
+make_prepare_header(#diameter_header{} = Hdr) ->
+ Hdr;
+
+make_prepare_header(T) ->
+ ?ERROR({invalid_header, T}).
+
+%% make_request_packet/2
+%%
+%% Reconstruct a diameter_packet from the return value of
+%% prepare_request or prepare_retransmit callback.
+
+make_request_packet(Bin, _)
+ when is_binary(Bin) ->
+ make_prepare_packet(false, Bin);
+
+make_request_packet(#diameter_packet{msg = [#diameter_header{} | _]}
+ = Pkt,
+ _) ->
+ Pkt;
+
+%% Returning a diameter_packet with no header from a prepare_request
+%% or prepare_retransmit callback retains the header passed into it.
+%% This is primarily so that the end to end and hop by hop identifiers
+%% are retained.
+make_request_packet(#diameter_packet{header = Hdr} = Pkt,
+ #diameter_packet{header = Hdr0}) ->
+ Pkt#diameter_packet{header = fold_record(Hdr0, Hdr)};
+
+make_request_packet(Msg, Pkt) ->
+ Pkt#diameter_packet{msg = Msg}.
+
+%% fold_record/2
+
+fold_record(undefined, R) ->
+ R;
+fold_record(Rec, R) ->
+ diameter_lib:fold_tuple(2, Rec, R).
+
+%% send_R/6
+
+send_R(Pkt0,
+ {TPid, Caps, #diameter_app{dictionary = Dict} = App},
+ Opts,
+ {Pid, Ref},
+ SvcName,
+ Fs) ->
+ Pkt = encode(Dict, Pkt0, Fs),
+
+ #options{timeout = Timeout}
+ = Opts,
+
+ Req = #request{ref = Ref,
+ caller = Pid,
+ handler = self(),
+ transport = TPid,
+ caps = Caps,
+ packet = Pkt0},
+
+ try
+ TRef = send_request(TPid, Pkt, Req, SvcName, Timeout),
+ Pid ! Ref, %% tell caller a send has been attempted
+ handle_answer(SvcName,
+ App,
+ recv_A(Timeout, SvcName, App, Opts, {TRef, Req}))
+ after
+ erase_requests(Pkt)
+ end.
+
+%% recv_A/5
+
+recv_A(Timeout, SvcName, App, Opts, {TRef, #request{ref = Ref} = Req}) ->
+ %% Matching on TRef below ensures we ignore messages that pertain
+ %% to a previous transport prior to failover. The answer message
+ %% includes the #request{} since it's not necessarily Req; that
+ %% is, from the last peer to which we've transmitted.
+ receive
+ {answer = A, Ref, Rq, Dict0, Pkt} -> %% Answer from peer
+ {A, Rq, Dict0, Pkt};
+ {timeout = Reason, TRef, _} -> %% No timely reply
+ {error, Req, Reason};
+ {failover, TRef} -> %% Service says peer has gone down
+ retransmit(pick_peer(SvcName, App, Req, Opts),
+ Req,
+ Opts,
+ SvcName,
+ Timeout)
+ end.
+
+%% handle_answer/3
+
+handle_answer(SvcName, App, {error, Req, Reason}) ->
+ handle_error(App, Req, Reason, SvcName);
+
+handle_answer(SvcName,
+ #diameter_app{dictionary = Dict}
+ = App,
+ {answer, Req, Dict0, Pkt}) ->
+ Mod = dict(Dict, Dict0, Pkt),
+ answer(examine(diameter_codec:decode(Mod, Pkt)),
+ SvcName,
+ Mod,
+ App,
+ Req).
+
+%% We don't really need to do a full decode if we're a relay and will
+%% just resend with a new hop by hop identifier, but might a proxy
+%% want to examine the answer?
+
+answer(Pkt, SvcName, Dict, App, #request{transport = TPid} = Req) ->
+ try
+ incr(recv, Pkt, Dict, TPid)
+ of
+ _ -> answer(Pkt, SvcName, App, Req)
+ catch
+ exit: {invalid_error_bit, _} = E ->
+ answer(Pkt#diameter_packet{errors = [E]}, SvcName, App, Req)
+ end.
+
+answer(Pkt,
+ SvcName,
+ #diameter_app{module = ModX,
+ options = [{answer_errors, AE} | _]},
+ Req) ->
+ a(Pkt, SvcName, ModX, AE, Req).
+
+a(#diameter_packet{errors = Es}
+ = Pkt,
+ SvcName,
+ ModX,
+ AE,
+ #request{transport = TPid,
+ caps = Caps,
+ packet = P})
+ when [] == Es;
+ callback == AE ->
+ cb(ModX, handle_answer, [Pkt, msg(P), SvcName, {TPid, Caps}]);
+
+a(Pkt, SvcName, _, report, Req) ->
+ x(errors, handle_answer, [SvcName, Req, Pkt]);
+
+a(Pkt, SvcName, _, discard, Req) ->
+ x({errors, handle_answer, [SvcName, Req, Pkt]}).
+
+%% Note that we don't check that the application id in the answer's
+%% header is what we expect. (TODO: Does the rfc says anything about
+%% this?)
+
+%% Note that failover starts a new timer and that expiry of an old
+%% timer value is ignored. This means that an answer could be accepted
+%% from a peer after timeout in the case of failover.
+
+retransmit({{_,_,App} = Transport, _Mask}, Req, Opts, SvcName, Timeout) ->
+ try retransmit(Transport, Req, SvcName, Timeout) of
+ T -> recv_A(Timeout, SvcName, App, Opts, T)
+ catch
+ ?FAILURE(Reason) -> {error, Req, Reason}
+ end;
+
+retransmit(_, Req, _, _, _) -> %% no alternate peer
+ {error, Req, failover}.
+
+%% pick_peer/4
+
+%% Retransmission after failover: call-specific arguments have already
+%% been appended in App.
+pick_peer(SvcName,
+ App,
+ #request{packet = #diameter_packet{msg = Msg}},
+ Opts) ->
+ pick_peer(SvcName, App, Msg, Opts#options{extra = []});
+
+pick_peer(_, _, undefined, _) ->
+ false;
+
+pick_peer(SvcName,
+ AppOrAlias,
+ Msg,
+ #options{filter = Filter, extra = Xtra}) ->
+ diameter_service:pick_peer(SvcName,
+ AppOrAlias,
+ {fun(D) -> get_destination(D, Msg) end,
+ Filter,
+ Xtra}).
+
+%% handle_error/4
+
+handle_error(App,
+ #request{packet = Pkt,
+ transport = TPid,
+ caps = Caps},
+ Reason,
+ SvcName) ->
+ cb(App, handle_error, [Reason, msg(Pkt), SvcName, {TPid, Caps}]).
+
+msg(#diameter_packet{msg = undefined, bin = Bin}) ->
+ Bin;
+msg(#diameter_packet{msg = Msg}) ->
+ Msg.
+
+%% encode/3
+
+encode(Dict, Pkt, Fs) ->
+ P = encode(Dict, Pkt),
+ eval_packet(P, Fs),
+ P.
+
+%% encode/2
+
+%% Note that prepare_request can return a diameter_packet containing a
+%% header or transport_data. Even allow the returned record to contain
+%% an encoded binary. This isn't the usual case and doesn't properly
+%% support retransmission but is useful for test.
+
+%% A message to be encoded.
+encode(Dict, #diameter_packet{bin = undefined} = Pkt) ->
+ diameter_codec:encode(Dict, Pkt);
+
+%% An encoded binary: just send.
+encode(_, #diameter_packet{} = Pkt) ->
+ Pkt.
+
+%% send_request/5
+
+send_request(TPid, #diameter_packet{bin = Bin} = Pkt, Req, _SvcName, Timeout)
+ when node() == node(TPid) ->
+ %% Store the outgoing request before sending to avoid a race with
+ %% reply reception.
+ TRef = store_request(TPid, Bin, Req, Timeout),
+ send(TPid, Pkt),
+ TRef;
+
+%% Send using a remote transport: spawn a process on the remote node
+%% to relay the answer.
+send_request(TPid, #diameter_packet{} = Pkt, Req, SvcName, Timeout) ->
+ TRef = erlang:start_timer(Timeout, self(), TPid),
+ T = {TPid, Pkt, Req, SvcName, Timeout, TRef},
+ spawn(node(TPid), ?MODULE, send, [T]),
+ TRef.
+
+%% send/1
+
+send({TPid, Pkt, #request{handler = Pid} = Req, SvcName, Timeout, TRef}) ->
+ Ref = send_request(TPid,
+ Pkt,
+ Req#request{handler = self()},
+ SvcName,
+ Timeout),
+ Pid ! reref(receive T -> T end, Ref, TRef).
+
+reref({T, Ref, R}, Ref, TRef) ->
+ {T, TRef, R};
+reref(T, _, _) ->
+ T.
+
+%% send/2
+
+send(Pid, Pkt) ->
+ Pid ! {send, Pkt}.
+
+%% retransmit/4
+
+retransmit({TPid, Caps, App}
+ = Transport,
+ #request{packet = Pkt0}
+ = Req,
+ SvcName,
+ Timeout) ->
+ have_request(Pkt0, TPid) %% Don't failover to a peer we've
+ andalso ?THROW(timeout), %% already sent to.
+
+ #diameter_packet{header = Hdr0} = Pkt0,
+ Hdr = Hdr0#diameter_header{is_retransmitted = true},
+ Pkt = Pkt0#diameter_packet{header = Hdr},
+
+ retransmit(cb(App, prepare_retransmit, [Pkt, SvcName, {TPid, Caps}]),
+ Transport,
+ Req#request{packet = Pkt},
+ SvcName,
+ Timeout,
+ []).
+
+retransmit({send, Msg},
+ Transport,
+ #request{packet = Pkt}
+ = Req,
+ SvcName,
+ Timeout,
+ Fs) ->
+ resend_request(make_request_packet(Msg, Pkt),
+ Transport,
+ Req,
+ SvcName,
+ Timeout,
+ Fs);
+
+retransmit({discard, Reason}, _, _, _, _, _) ->
+ ?THROW(Reason);
+
+retransmit(discard, _, _, _, _, _) ->
+ ?THROW(discarded);
+
+retransmit({eval_packet, RC, F}, Transport, Req, SvcName, Timeout, Fs) ->
+ retransmit(RC, Transport, Req, SvcName, Timeout, [F|Fs]);
+
+retransmit(T, {_, _, App}, _, _, _, _) ->
+ ?ERROR({invalid_return, prepare_retransmit, App, T}).
+
+resend_request(Pkt0,
+ {TPid, Caps, #diameter_app{dictionary = Dict}},
+ Req0,
+ SvcName,
+ Tmo,
+ Fs) ->
+ Pkt = encode(Dict, Pkt0, Fs),
+
+ Req = Req0#request{transport = TPid,
+ packet = Pkt0,
+ caps = Caps},
+
+ ?LOG(retransmission, Req),
+ TRef = send_request(TPid, Pkt, Req, SvcName, Tmo),
+ {TRef, Req}.
+
+%% store_request/4
+
+store_request(TPid, Bin, Req, Timeout) ->
+ Seqs = diameter_codec:sequence_numbers(Bin),
+ TRef = erlang:start_timer(Timeout, self(), timeout),
+ ets:insert(?REQUEST_TABLE, {Seqs, Req, TRef}),
+ ets:member(?REQUEST_TABLE, TPid)
+ orelse (self() ! {failover, TRef}), %% failover/1 may have missed
+ TRef.
+
+%% lookup_request/2
+
+lookup_request(Msg, TPid) ->
+ Seqs = diameter_codec:sequence_numbers(Msg),
+ Spec = [{{Seqs, #request{transport = TPid, _ = '_'}, '_'},
+ [],
+ ['$_']}],
+ case ets:select(?REQUEST_TABLE, Spec) of
+ [{_, Req, _}] ->
+ Req;
+ [] ->
+ false
+ end.
+
+%% erase_requests/1
+
+erase_requests(Pkt) ->
+ ets:delete(?REQUEST_TABLE, diameter_codec:sequence_numbers(Pkt)).
+
+%% match_requests/1
+
+match_requests(TPid) ->
+ Pat = {'_', #request{transport = TPid, _ = '_'}, '_'},
+ ets:select(?REQUEST_TABLE, [{Pat, [], ['$_']}]).
+
+%% have_request/2
+
+have_request(Pkt, TPid) ->
+ Seqs = diameter_codec:sequence_numbers(Pkt),
+ Pat = {Seqs, #request{transport = TPid, _ = '_'}, '_'},
+ '$end_of_table' /= ets:select(?REQUEST_TABLE, [{Pat, [], ['$_']}], 1).
+
+%% ---------------------------------------------------------------------------
+%% # failover/1-2
+%% ---------------------------------------------------------------------------
+
+failover(TPid)
+ when is_pid(TPid) ->
+ lists:foreach(fun failover/1, match_requests(TPid));
+%% Note that a request process can store its request after failover
+%% notifications are sent here: store_request/4 sends the notification
+%% in that case.
+
+%% Failover as a consequence of request_peer_down/1: inform the
+%% request process.
+failover({_, Req, TRef}) ->
+ #request{handler = Pid,
+ packet = #diameter_packet{msg = M}}
+ = Req,
+ M /= undefined andalso (Pid ! {failover, TRef}).
+%% Failover is not performed when msg = binary() since sending
+%% pre-encoded binaries is only partially supported. (Mostly for
+%% test.)
+
+%% get_destination/2
+
+get_destination(Dict, Msg) ->
+ [str(get_avp_value(Dict, D, Msg)) || D <- ['Destination-Realm',
+ 'Destination-Host']].
+
+%% This is not entirely correct. The avp could have an arity 1, in
+%% which case an empty list is a DiameterIdentity of length 0 rather
+%% than the list of no values we treat it as by mapping to undefined.
+%% This behaviour is documented.
+str([]) ->
+ undefined;
+str(T) ->
+ T.
+
+%% get_avp_value/3
+%%
+%% Find an AVP in a message of one of three forms:
+%%
+%% - a message record (as generated from a .dia spec) or
+%% - a list of an atom message name followed by 2-tuple, avp name/value pairs.
+%% - a list of a #diameter_header{} followed by #diameter_avp{} records,
+%%
+%% In the first two forms a dictionary module is used at encode to
+%% identify the type of the AVP and its arity in the message in
+%% question. The third form allows messages to be sent as is, without
+%% a dictionary, which is needed in the case of relay agents, for one.
+
+%% Messages will be header/avps list as a relay and the only AVP's we
+%% look for are in the common dictionary. This is required since the
+%% relay dictionary doesn't inherit the common dictionary (which maybe
+%% it should).
+get_avp_value(?RELAY, Name, Msg) ->
+ get_avp_value(?BASE, Name, Msg);
+
+%% Message sent as a header/avps list, probably a relay case but not
+%% necessarily.
+get_avp_value(Dict, Name, [#diameter_header{} | Avps]) ->
+ try
+ {Code, _, VId} = Dict:avp_header(Name),
+ [A|_] = lists:dropwhile(fun(#diameter_avp{code = C, vendor_id = V}) ->
+ C /= Code orelse V /= VId
+ end,
+ Avps),
+ avp_decode(Dict, Name, A)
+ catch
+ error: _ ->
+ undefined
+ end;
+
+%% Outgoing message as a name/values list.
+get_avp_value(_, Name, [_MsgName | Avps]) ->
+ case lists:keyfind(Name, 1, Avps) of
+ {_, V} ->
+ V;
+ _ ->
+ undefined
+ end;
+
+%% Message is typically a record but not necessarily.
+get_avp_value(Dict, Name, Rec) ->
+ try
+ Dict:'#get-'(Name, Rec)
+ catch
+ error:_ ->
+ undefined
+ end.
+
+avp_decode(Dict, Name, #diameter_avp{value = undefined,
+ data = Bin}) ->
+ Dict:avp(decode, Bin, Name);
+avp_decode(_, _, #diameter_avp{value = V}) ->
+ V.
+
+cb(#diameter_app{module = [_|_] = M}, F, A) ->
+ eval(M, F, A);
+cb([_|_] = M, F, A) ->
+ eval(M, F, A).
+
+eval([M|X], F, A) ->
+ apply(M, F, A ++ X).
+
+choose(true, X, _) -> X;
+choose(false, _, X) -> X.
diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl
index 10ab246b88..073a415d10 100644
--- a/lib/diameter/src/base/diameter_watchdog.erl
+++ b/lib/diameter/src/base/diameter_watchdog.erl
@@ -45,6 +45,8 @@
-define(DEFAULT_TW_INIT, 30000). %% RFC 3539 ch 3.4.1
-define(NOMASK, {0,32}). %% default sequence mask
+-define(BASE, ?DIAMETER_DICT_COMMON).
+
-record(watchdog,
{%% PCB - Peer Control Block; see RFC 3539, Appendix A
status = initial :: initial | okay | suspect | down | reopen,
@@ -56,8 +58,10 @@
%% end PCB
parent = self() :: pid(), %% service process
transport :: pid() | undefined, %% peer_fsm process
- tref :: reference(), %% reference for current watchdog timer
- message_data, %% term passed into diameter_service with message
+ tref :: reference(), %% reference for current watchdog timer
+ dictionary :: module(), %% common dictionary
+ receive_data :: term(),
+ %% term passed into diameter_service with incoming message
sequence :: diameter:sequence(), %% mask
restrict :: {diameter:restriction(), boolean()},
shutdown = false :: boolean()}).
@@ -70,13 +74,12 @@
%% reason.
%% ---------------------------------------------------------------------------
--spec start(Type, {RecvData, [Opt], SvcName, SvcOpts, #diameter_service{}})
+-spec start(Type, {RecvData, [Opt], SvcOpts, #diameter_service{}})
-> {reference(), pid()}
when Type :: {connect|accept, diameter:transport_ref()},
RecvData :: term(),
Opt :: diameter:transport_opt(),
- SvcOpts :: [diameter:service_opt()],
- SvcName :: diameter:service_name().
+ SvcOpts :: [diameter:service_opt()].
start({_,_} = Type, T) ->
Ack = make_ref(),
@@ -105,7 +108,6 @@ init(T) ->
i({Ack, T, Pid, {RecvData,
Opts,
- SvcName,
SvcOpts,
#diameter_service{applications = Apps,
capabilities = Caps}
@@ -118,12 +120,14 @@ i({Ack, T, Pid, {RecvData,
{_,_} = Mask = proplists:get_value(sequence, SvcOpts),
Restrict = proplists:get_value(restrict_connections, SvcOpts),
Nodes = restrict_nodes(Restrict),
+ Dict0 = common_dictionary(Apps),
#watchdog{parent = Pid,
- transport = start(T, Opts, Mask, Nodes, Svc),
+ transport = start(T, Opts, Mask, Nodes, Dict0, Svc),
tw = proplists:get_value(watchdog_timer,
Opts,
?DEFAULT_TW_INIT),
- message_data = {RecvData, SvcName, Apps, Mask},
+ receive_data = RecvData,
+ dictionary = Dict0,
sequence = Mask,
restrict = {Restrict, lists:member(node(), Nodes)}}.
@@ -137,11 +141,60 @@ wait(Ref, Pid) ->
%% start/5
-start(T, Opts, Mask, Nodes, Svc) ->
+start(T, Opts, Mask, Nodes, Dict0, Svc) ->
{_MRef, Pid}
- = diameter_peer_fsm:start(T, Opts, {Mask, Nodes, Svc}),
+ = diameter_peer_fsm:start(T, Opts, {Mask, Nodes, Dict0, Svc}),
Pid.
+%% common_dictionary/1
+%%
+%% Determine the dictionary of the Diameter common application with
+%% Application Id 0. Fail on config errors.
+
+common_dictionary(Apps) ->
+ case
+ orddict:fold(fun dict0/3,
+ false,
+ lists:foldl(fun(#diameter_app{dictionary = M}, D) ->
+ orddict:append(M:id(), M, D)
+ end,
+ orddict:new(),
+ Apps))
+ of
+ {value, Mod} ->
+ Mod;
+ false ->
+ %% A transport should configure a common dictionary but
+ %% don't require it. Not configuring a common dictionary
+ %% means a user won't be able either send of receive
+ %% messages in the common dictionary: incoming request
+ %% will be answered with 3007 and outgoing requests cannot
+ %% be sent. The dictionary returned here is oly used for
+ %% messages diameter sends and receives: CER/CEA, DPR/DPA
+ %% and DWR/DWA.
+ ?BASE
+ end.
+
+%% Each application should be represented by a single dictionary.
+dict0(Id, [_,_|_] = Ms, _) ->
+ config_error({multiple_dictionaries, Ms, {application_id, Id}});
+
+%% An explicit common dictionary.
+dict0(?APP_ID_COMMON, [Mod], _) ->
+ {value, Mod};
+
+%% A pure relay, in which case the common application is implicit.
+%% This uses the fact that the common application will already have
+%% been folded.
+dict0(?APP_ID_RELAY, _, false) ->
+ {value, ?BASE};
+
+dict0(_, _, Acc) ->
+ Acc.
+
+config_error(T) ->
+ ?ERROR({configuration_error, T}).
+
%% handle_call/3
handle_call(_, _, State) ->
@@ -353,16 +406,16 @@ getr(Key) ->
eraser(Key) ->
erase({?MODULE, Key}).
-%% encode/2
+%% encode/3
-encode(Msg, Mask) ->
+encode(Msg, Mask, Dict) ->
Seq = diameter_session:sequence(Mask),
Hdr = #diameter_header{version = ?DIAMETER_VERSION,
end_to_end_id = Seq,
hop_by_hop_id = Seq},
Pkt = #diameter_packet{header = Hdr,
msg = Msg},
- #diameter_packet{bin = Bin} = diameter_codec:encode(?BASE, Pkt),
+ #diameter_packet{bin = Bin} = diameter_codec:encode(Dict, Pkt),
Bin.
%% okay/3
@@ -416,9 +469,10 @@ tw({M,F,A}) ->
send_watchdog(#watchdog{pending = false,
transport = TPid,
+ dictionary = Dict0,
sequence = Mask}
= S) ->
- send(TPid, {send, encode(getr(dwr), Mask)}),
+ send(TPid, {send, encode(getr(dwr), Mask, Dict0)}),
?LOG(send, 'DWR'),
S#watchdog{pending = true}.
@@ -446,8 +500,9 @@ rcv(N, _, _)
false;
rcv(_, Pkt, #watchdog{transport = TPid,
- message_data = T}) ->
- diameter_service:receive_message(TPid, Pkt, T).
+ dictionary = Dict0,
+ receive_data = T}) ->
+ diameter_traffic:receive_message(TPid, Pkt, Dict0, T).
throwaway(S) ->
throw({?MODULE, throwaway, S}).
@@ -627,13 +682,15 @@ restart(S) ->
%% state down rather then initial when receiving notification of an
%% open connection.
-restart({{connect, _} = T, Opts, Svc}, #watchdog{parent = Pid,
- sequence = Mask,
- restrict = {R,_}}
- = S) ->
+restart({{connect, _} = T, Opts, Svc},
+ #watchdog{parent = Pid,
+ sequence = Mask,
+ restrict = {R,_},
+ dictionary = Dict0}
+ = S) ->
send(Pid, {reconnect, self()}),
Nodes = restrict_nodes(R),
- S#watchdog{transport = start(T, Opts, Mask, Nodes, Svc),
+ S#watchdog{transport = start(T, Opts, Mask, Nodes, Dict0, Svc),
restrict = {R, lists:member(node(), Nodes)}};
%% No restriction on the number of connections to the same peer: just