aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src
diff options
context:
space:
mode:
Diffstat (limited to 'lib/diameter/src')
-rw-r--r--lib/diameter/src/Makefile22
-rw-r--r--lib/diameter/src/base/diameter.erl7
-rw-r--r--lib/diameter/src/base/diameter_capx.erl145
-rw-r--r--lib/diameter/src/base/diameter_codec.erl57
-rw-r--r--lib/diameter/src/base/diameter_config.erl49
-rw-r--r--lib/diameter/src/base/diameter_internal.hrl4
-rw-r--r--lib/diameter/src/base/diameter_peer.erl11
-rw-r--r--lib/diameter/src/base/diameter_peer_fsm.erl311
-rw-r--r--lib/diameter/src/base/diameter_reg.erl9
-rw-r--r--lib/diameter/src/base/diameter_service.erl2661
-rw-r--r--lib/diameter/src/base/diameter_stats.erl4
-rw-r--r--lib/diameter/src/base/diameter_traffic.erl1647
-rw-r--r--lib/diameter/src/base/diameter_watchdog.erl293
-rw-r--r--lib/diameter/src/compiler/diameter_exprecs.erl60
-rw-r--r--lib/diameter/src/compiler/diameter_forms.hrl1
-rw-r--r--lib/diameter/src/dict/acct_rfc6733.dia72
-rw-r--r--lib/diameter/src/dict/base_rfc6733.dia415
-rw-r--r--lib/diameter/src/modules.mk9
-rw-r--r--lib/diameter/src/transport/diameter_sctp.erl9
-rw-r--r--lib/diameter/src/transport/diameter_tcp.erl121
20 files changed, 3306 insertions, 2601 deletions
diff --git a/lib/diameter/src/Makefile b/lib/diameter/src/Makefile
index a08c204a23..0e448c8912 100644
--- a/lib/diameter/src/Makefile
+++ b/lib/diameter/src/Makefile
@@ -1,7 +1,7 @@
#
# %CopyrightBegin%
#
-# Copyright Ericsson AB 2010-2012. All Rights Reserved.
+# Copyright Ericsson AB 2010-2013. All Rights Reserved.
#
# The contents of this file are subject to the Erlang Public License,
# Version 1.1, (the "License"); you may not use this file except in
@@ -119,7 +119,8 @@ ERL_COMPILE_FLAGS += \
# erl/hrl from dictionary file.
gen/diameter_gen_%.erl gen/diameter_gen_%.hrl: dict/%.dia
- $(dia_verbose)../bin/diameterc -o gen -i $(EBIN) $<
+ $(dia_verbose) \
+ ../bin/diameterc -o gen -i $(EBIN) $<
opt: $(TARGET_FILES)
@@ -134,11 +135,13 @@ debug:
# The dictionary parser.
gen/$(DICT_YRL).erl: compiler/$(DICT_YRL).yrl
- $(yecc_verbose)$(ERLC) -Werror -o $(@D) $<
+ $(yecc_verbose) \
+ $(ERLC) -Werror -o $(@D) $<
# Generate the app file.
$(APP_TARGET): $(APP_SRC) ../vsn.mk modules.mk
- $(gen_verbose)M=`echo $(notdir $(APP_MODULES)) | tr ' ' ,`; \
+ $(gen_verbose) \
+ M=`echo $(notdir $(APP_MODULES)) | tr ' ' ,`; \
R=`echo $(REGISTERED) | tr ' ' ,`; \
sed -e 's;%VSN%;$(VSN);' \
-e "s;%MODULES%;$$M;" \
@@ -146,7 +149,8 @@ $(APP_TARGET): $(APP_SRC) ../vsn.mk modules.mk
$< > $@
$(APPUP_TARGET): $(APPUP_SRC) ../vsn.mk
- $(vsn_verbose)sed -e 's;%VSN%;$(VSN);' $< > $@
+ $(vsn_verbose) \
+ sed -e 's;%VSN%;$(VSN);' $< > $@
app: $(APP_TARGET) $(APPUP_TARGET)
dict: $(DICT_ERLS)
@@ -251,7 +255,11 @@ gen/diameter_gen_base_accounting.erl gen/diameter_gen_relay.erl \
gen/diameter_gen_base_accounting.hrl gen/diameter_gen_relay.hrl: \
$(EBIN)/diameter_gen_base_rfc3588.$(EMULATOR)
-gen/diameter_gen_base_rfc3588.erl gen/diameter_gen_base_rfc3588.hrl: \
+gen/diameter_gen_acct_rfc6733.erl gen/diameter_gen_acct_rfc6733.hrl: \
+ $(EBIN)/diameter_gen_base_rfc6733.$(EMULATOR)
+
+gen/diameter_gen_base_rfc3588.erl gen/diameter_gen_base_rfc3588.hrl \
+gen/diameter_gen_base_rfc6733.erl gen/diameter_gen_base_rfc6733.hrl: \
$(COMPILER_MODULES:%=$(EBIN)/%.$(EMULATOR))
$(DICT_MODULES:gen/%=$(EBIN)/%.$(EMULATOR)): \
@@ -262,7 +270,7 @@ depend: depend.mk
# Generate dependencies makefile.
depend.mk: depend.sed $(MODULES:%=%.erl) Makefile
- $(gen_verbose)(for f in $(MODULES); do \
+ (for f in $(MODULES); do \
(echo $$f; cat $$f.erl) | sed -f $<; \
done) \
> $@
diff --git a/lib/diameter/src/base/diameter.erl b/lib/diameter/src/base/diameter.erl
index 8f9901907a..f563d244f6 100644
--- a/lib/diameter/src/base/diameter.erl
+++ b/lib/diameter/src/base/diameter.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2012. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2013. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -213,7 +213,7 @@ origin_state_id() ->
-> any().
call(SvcName, App, Message, Options) ->
- diameter_service:call(SvcName, {alias, App}, Message, Options).
+ diameter_traffic:send_request(SvcName, {alias, App}, Message, Options).
call(SvcName, App, Message) ->
call(SvcName, App, Message, []).
@@ -332,8 +332,9 @@ call(SvcName, App, Message) ->
| {capabilities_cb, evaluable()}
| {capx_timeout, 'Unsigned32'()}
| {disconnect_cb, evaluable()}
- | {watchdog_timer, 'Unsigned32'() | {module(), atom(), list()}}
+ | {length_errors, exit | handle | discard}
| {reconnect_timer, 'Unsigned32'()}
+ | {watchdog_timer, 'Unsigned32'() | {module(), atom(), list()}}
| {private, any()}.
%% Predicate passed to remove_transport/2
diff --git a/lib/diameter/src/base/diameter_capx.erl b/lib/diameter/src/base/diameter_capx.erl
index c6c3d2934d..715b15628c 100644
--- a/lib/diameter/src/base/diameter_capx.erl
+++ b/lib/diameter/src/base/diameter_capx.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2012. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2013. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -47,14 +47,13 @@
-module(diameter_capx).
--export([build_CER/1,
- recv_CER/2,
- recv_CEA/2,
+-export([build_CER/2,
+ recv_CER/3,
+ recv_CEA/3,
make_caps/2]).
-include_lib("diameter/include/diameter.hrl").
-include("diameter_internal.hrl").
--include("diameter_gen_base_rfc3588.hrl").
-define(SUCCESS, 2001). %% DIAMETER_SUCCESS
-define(NOAPP, 5010). %% DIAMETER_NO_COMMON_APPLICATION
@@ -67,27 +66,31 @@
-type tried(T) :: {ok, T} | {error, {term(), list()}}.
--spec build_CER(#diameter_caps{})
- -> tried(#diameter_base_CER{}).
+-spec build_CER(#diameter_caps{}, module())
+ -> tried(CER)
+ when CER :: tuple().
-build_CER(Caps) ->
- try_it([fun bCER/1, Caps]).
+build_CER(Caps, Dict) ->
+ try_it([fun bCER/2, Caps, Dict]).
--spec recv_CER(#diameter_base_CER{}, #diameter_service{})
+-spec recv_CER(CER, #diameter_service{}, module())
-> tried({[diameter:'Unsigned32'()],
#diameter_caps{},
- #diameter_base_CEA{}}).
+ CEA})
+ when CER :: tuple(),
+ CEA :: tuple().
-recv_CER(CER, Svc) ->
- try_it([fun rCER/2, CER, Svc]).
+recv_CER(CER, Svc, Dict) ->
+ try_it([fun rCER/3, CER, Svc, Dict]).
--spec recv_CEA(#diameter_base_CEA{}, #diameter_service{})
+-spec recv_CEA(CEA, #diameter_service{}, module())
-> tried({[diameter:'Unsigned32'()],
[diameter:'Unsigned32'()],
- #diameter_caps{}}).
+ #diameter_caps{}})
+ when CEA :: tuple().
-recv_CEA(CEA, Svc) ->
- try_it([fun rCEA/2, CEA, Svc]).
+recv_CEA(CEA, Svc, Dict) ->
+ try_it([fun rCEA/3, CEA, Svc, Dict]).
make_caps(Caps, Opts) ->
try_it([fun mk_caps/2, Caps, Opts]).
@@ -161,16 +164,17 @@ ipaddr(A) ->
?THROW(T)
end.
-%% bCER/1
+%% bCER/2
%%
%% Build a CER record to send to a remote peer.
%% Use the fact that diameter_caps has the same field names as CER.
-bCER(#diameter_caps{} = Rec) ->
- #diameter_base_CER{}
- = list_to_tuple([diameter_base_CER | tl(tuple_to_list(Rec))]).
+bCER(#diameter_caps{} = Rec, Dict) ->
+ Values = lists:zip(Dict:'#info-'(diameter_base_CER, fields),
+ tl(tuple_to_list(Rec))),
+ Dict:'#new-'(diameter_base_CER, Values).
-%% rCER/2
+%% rCER/3
%%
%% Build a CEA record to send to a remote peer in response to an
%% incoming CER. RFC 3588 gives no guidance on what should be sent
@@ -214,12 +218,9 @@ bCER(#diameter_caps{} = Rec) ->
%% TLS 1
%% This node supports TLS security, as defined by [TLS].
-rCER(CER, #diameter_service{capabilities = LCaps} = Svc) ->
- #diameter_base_CEA{}
- = CEA
- = cea_from_cer(bCER(LCaps)),
-
- RCaps = capx_to_caps(CER),
+rCER(CER, #diameter_service{capabilities = LCaps} = Svc, Dict) ->
+ CEA = cea_from_cer(bCER(LCaps, Dict), Dict),
+ RCaps = capx_to_caps(CER, Dict),
SApps = common_applications(LCaps, RCaps, Svc),
{SApps,
@@ -227,17 +228,18 @@ rCER(CER, #diameter_service{capabilities = LCaps} = Svc) ->
build_CEA(SApps,
LCaps,
RCaps,
- CEA#diameter_base_CEA{'Result-Code' = ?SUCCESS})}.
+ Dict,
+ Dict:'#set-'({'Result-Code', ?SUCCESS}, CEA))}.
-build_CEA([], _, _, CEA) ->
- CEA#diameter_base_CEA{'Result-Code' = ?NOAPP};
+build_CEA([], _, _, Dict, CEA) ->
+ Dict:'#set-'({'Result-Code', ?NOAPP}, CEA);
-build_CEA(_, LCaps, RCaps, CEA) ->
+build_CEA(_, LCaps, RCaps, Dict, CEA) ->
case common_security(LCaps, RCaps) of
[] ->
- CEA#diameter_base_CEA{'Result-Code' = ?NOSECURITY};
+ Dict:'#set-'({'Result-Code', ?NOSECURITY}, CEA);
[_] = IS ->
- CEA#diameter_base_CEA{'Inband-Security-Id' = IS}
+ Dict:'#set-'({'Inband-Security-Id', IS}, CEA)
end.
%% common_security/2
@@ -275,46 +277,41 @@ cs(LS, RS) ->
%% practice something there may be a need for more synchronization
%% than notification by way of an event subscription offers.
-%% cea_from_cer/1
+%% cea_from_cer/2
%% CER is a subset of CEA, the latter adding Result-Code and a few
%% more AVP's.
-cea_from_cer(#diameter_base_CER{} = CER) ->
- lists:foldl(fun(F,A) -> to_cea(CER, F, A) end,
- #diameter_base_CEA{},
- record_info(fields, diameter_base_CER)).
-
-to_cea(CER, Field, CEA) ->
- try ?BASE:'#get-'(Field, CER) of
- V -> ?BASE:'#set-'({Field, V}, CEA)
- catch
- error: _ -> CEA
- end.
-
-%% rCEA/2
+cea_from_cer(CER, Dict) ->
+ [diameter_base_CER | Values] = Dict:'#get-'(CER),
+ Dict:'#set-'(Values, Dict:'#new-'(diameter_base_CEA)).
-rCEA(CEA, #diameter_service{capabilities = LCaps} = Svc) ->
- RCaps = capx_to_caps(CEA),
+%% rCEA/3
+
+rCEA(CEA, #diameter_service{capabilities = LCaps} = Svc, Dict) ->
+ RCaps = capx_to_caps(CEA, Dict),
SApps = common_applications(LCaps, RCaps, Svc),
IS = common_security(LCaps, RCaps),
{SApps, IS, RCaps}.
-%% capx_to_caps/1
-
-capx_to_caps(#diameter_base_CEA{'Origin-Host' = OH,
- 'Origin-Realm' = OR,
- 'Host-IP-Address' = IP,
- 'Vendor-Id' = VId,
- 'Product-Name' = PN,
- 'Origin-State-Id' = OSI,
- 'Supported-Vendor-Id' = SV,
- 'Auth-Application-Id' = Auth,
- 'Inband-Security-Id' = IS,
- 'Acct-Application-Id' = Acct,
- 'Vendor-Specific-Application-Id' = VSA,
- 'Firmware-Revision' = FR,
- 'AVP' = X}) ->
+%% capx_to_caps/2
+
+capx_to_caps(CEX, Dict) ->
+ [OH, OR, IP, VId, PN, OSI, SV, Auth, IS, Acct, VSA, FR, X]
+ = Dict:'#get-'(['Origin-Host',
+ 'Origin-Realm',
+ 'Host-IP-Address',
+ 'Vendor-Id',
+ 'Product-Name',
+ 'Origin-State-Id',
+ 'Supported-Vendor-Id',
+ 'Auth-Application-Id',
+ 'Inband-Security-Id',
+ 'Acct-Application-Id',
+ 'Vendor-Specific-Application-Id',
+ 'Firmware-Revision',
+ 'AVP'],
+ CEX),
#diameter_caps{origin_host = OH,
origin_realm = OR,
vendor_id = VId,
@@ -327,10 +324,7 @@ capx_to_caps(#diameter_base_CEA{'Origin-Host' = OH,
acct_application_id = Acct,
vendor_specific_application_id = VSA,
firmware_revision = FR,
- avp = X};
-
-capx_to_caps(#diameter_base_CER{} = CER) ->
- capx_to_caps(cea_from_cer(CER)).
+ avp = X}.
%% ---------------------------------------------------------------------------
%% ---------------------------------------------------------------------------
@@ -365,13 +359,12 @@ app_union(#diameter_caps{auth_application_id = U,
vendor_specific_application_id = V}) ->
set_list(U ++ C ++ lists:flatmap(fun vsa_apps/1, V)).
-vsa_apps(#'diameter_base_Vendor-Specific-Application-Id'
- {'Auth-Application-Id' = U,
- 'Acct-Application-Id' = C}) ->
- U ++ C;
-vsa_apps(L) ->
- Rec = ?BASE:'#new-'('diameter_base_Vendor-Specific-Application-Id', L),
- vsa_apps(Rec).
+vsa_apps([_ | [_,_] = Ids]) ->
+ lists:append(Ids);
+vsa_apps(Rec)
+ when is_tuple(Rec) ->
+ [_|T] = tuple_to_list(Rec),
+ vsa_apps(T).
%% It's a configuration error for a locally advertised application not
%% to be represented in Apps. Don't just match on lists:keyfind/3 in
diff --git a/lib/diameter/src/base/diameter_codec.erl b/lib/diameter/src/base/diameter_codec.erl
index 0b0bfe3f0a..e446a0209c 100644
--- a/lib/diameter/src/base/diameter_codec.erl
+++ b/lib/diameter/src/base/diameter_codec.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2012. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2013. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -26,7 +26,7 @@
decode_header/1,
sequence_numbers/1,
hop_by_hop_id/2,
- msg_name/1,
+ msg_name/2,
msg_id/1]).
%% Towards generated encoders (from diameter_gen.hrl).
@@ -99,13 +99,13 @@ e(_, #diameter_packet{msg = [#diameter_header{} = Hdr | As]} = Pkt) ->
Eid:32,
Avps/binary>>};
-e(Mod0, #diameter_packet{header = Hdr, msg = Msg} = Pkt) ->
+e(Mod, #diameter_packet{header = Hdr, msg = Msg} = Pkt) ->
#diameter_header{version = Vsn,
hop_by_hop_id = Hid,
end_to_end_id = Eid}
= Hdr,
- {Mod, MsgName} = rec2msg(Mod0, Msg),
+ MsgName = rec2msg(Mod, Msg),
{Code, Flags0, Aid} = msg_header(Mod, MsgName, Hdr),
Flags = make_flags(Flags0, Hdr),
@@ -192,11 +192,11 @@ encode_avps(Avps) ->
%% msg_header/3
msg_header(Mod, 'answer-message' = MsgName, Header) ->
- ?BASE = Mod,
+ 0 = Mod:id(), %% assert
#diameter_header{application_id = Aid,
cmd_code = Code}
= Header,
- {-1, Flags, ?DIAMETER_APP_ID_COMMON} = ?BASE:msg_header(MsgName),
+ {-1, Flags, ?DIAMETER_APP_ID_COMMON} = Mod:msg_header(MsgName),
{Code, Flags, Aid};
msg_header(Mod, MsgName, _) ->
@@ -204,22 +204,12 @@ msg_header(Mod, MsgName, _) ->
%% rec2msg/2
-rec2msg(_, ['answer-message' = M | _]) ->
- {?BASE, M};
-
-rec2msg(Mod, [MsgName|_])
- when is_atom(MsgName) ->
- {Mod, MsgName};
+rec2msg(_, [Name|_])
+ when is_atom(Name) ->
+ Name;
rec2msg(Mod, Rec) ->
- R = element(1, Rec),
- A = 'answer-message',
- case ?BASE:msg2rec(A) of
- R ->
- {?BASE, A};
- _ ->
- {Mod, Mod:rec2msg(R)}
- end.
+ Mod:rec2msg(element(1, Rec)).
%%% ---------------------------------------------------------------------------
%%% # decode/2
@@ -243,20 +233,19 @@ decode(?APP_ID_RELAY, _, #diameter_packet{} = Pkt) ->
end;
%% Otherwise decode using the dictionary.
-decode(_, Mod, #diameter_packet{header = Hdr} = Pkt)
- when is_atom(Mod) ->
+decode(_, Mod, #diameter_packet{header = Hdr} = Pkt) ->
#diameter_header{cmd_code = CmdCode,
is_request = IsRequest,
is_error = IsError}
= Hdr,
- {M, MsgName} = if IsError andalso not IsRequest ->
- {?BASE, 'answer-message'};
- true ->
- {Mod, Mod:msg_name(CmdCode, IsRequest)}
- end,
+ MsgName = if IsError andalso not IsRequest ->
+ 'answer-message';
+ true ->
+ Mod:msg_name(CmdCode, IsRequest)
+ end,
- decode_avps(MsgName, M, Pkt, collect_avps(Pkt));
+ decode_avps(MsgName, Mod, Pkt, collect_avps(Pkt));
decode(Id, Mod, Bin)
when is_bitstring(Bin) ->
@@ -360,15 +349,15 @@ hop_by_hop_id(Id, <<H:12/binary, _:32, T/binary>>) ->
<<H/binary, Id:32, T/binary>>.
%%% ---------------------------------------------------------------------------
-%%% # msg_name/1
+%%% # msg_name/2
%%% ---------------------------------------------------------------------------
-msg_name(#diameter_header{application_id = ?APP_ID_COMMON,
- cmd_code = C,
- is_request = R}) ->
- ?BASE:msg_name(C,R);
+msg_name(Dict0, #diameter_header{application_id = ?APP_ID_COMMON,
+ cmd_code = C,
+ is_request = R}) ->
+ Dict0:msg_name(C,R);
-msg_name(Hdr) ->
+msg_name(_, Hdr) ->
msg_id(Hdr).
%% Note that messages in different applications could have the same
diff --git a/lib/diameter/src/base/diameter_config.erl b/lib/diameter/src/base/diameter_config.erl
index 63d28f25a2..1486071573 100644
--- a/lib/diameter/src/base/diameter_config.erl
+++ b/lib/diameter/src/base/diameter_config.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2012. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2013. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -113,15 +113,22 @@
-define(VALUES(Rec), tl(tuple_to_list(Rec))).
+%% The RFC 3588 common dictionary is used to validate capabilities
+%% configuration. That a given transport may use the RFC 6733
+%% dictionary is of no consequence.
+-define(BASE, diameter_gen_base_rfc3588).
+
%%% The return values below assume the server diameter_config is started.
%%% The functions will exit if it isn't.
%% --------------------------------------------------------------------------
-%% # start_service(SvcName, Opts)
-%%
-%% Output: ok | {error, Reason}
+%% # start_service/2
%% --------------------------------------------------------------------------
+-spec start_service(diameter:service_name(), [diameter:service_opt()])
+ -> ok
+ | {error, term()}.
+
start_service(SvcName, Opts)
when is_list(Opts) ->
start_rc(sync(SvcName, {start_service, SvcName, Opts})).
@@ -134,22 +141,23 @@ start_rc(timeout) ->
{error, application_not_started}.
%% --------------------------------------------------------------------------
-%% # stop_service(SvcName)
-%%
-%% Output: ok
+%% # stop_service/1
%% --------------------------------------------------------------------------
+-spec stop_service(diameter:service_name())
+ -> ok.
+
stop_service(SvcName) ->
sync(SvcName, {stop_service, SvcName}).
%% --------------------------------------------------------------------------
-%% # add_transport(SvcName, {Type, Opts})
-%%
-%% Input: Type = connect | listen
-%%
-%% Output: {ok, Ref} | {error, Reason}
+%% # add_transport/2
%% --------------------------------------------------------------------------
+-spec add_transport(diameter:service_name(), {connect|listen, [diameter:transport_opt()]})
+ -> {ok, diameter:transport_ref()}
+ | {error, term()}.
+
add_transport(SvcName, {T, Opts})
when is_list(Opts), (T == connect orelse T == listen) ->
sync(SvcName, {add, SvcName, T, Opts}).
@@ -171,6 +179,10 @@ add_transport(SvcName, {T, Opts})
%% Output: ok | {error, Reason}
%% --------------------------------------------------------------------------
+-spec remove_transport(diameter:service_name(), diameter:transport_pred())
+ -> ok
+ | {error, term()}.
+
remove_transport(SvcName, Pred) ->
try
sync(SvcName, {remove, SvcName, pred(Pred)})
@@ -473,6 +485,10 @@ stop(SvcName) ->
%% add/3
+%% Can't check for a single common dictionary since a transport may
+%% restrict applications so that that there's one while the service
+%% has many.
+
add(SvcName, Type, Opts) ->
%% Ensure usable capabilities. diameter_service:merge_service/2
%% depends on this.
@@ -545,7 +561,7 @@ make_config(SvcName, Opts) ->
[] == Apps andalso ?THROW(no_apps),
%% Use the fact that diameter_caps has the same field names as CER.
- Fields = diameter_gen_base_rfc3588:'#info-'(diameter_base_CER) -- ['AVP'],
+ Fields = ?BASE:'#info-'(diameter_base_CER) -- ['AVP'],
COpts = [T || {K,_} = T <- Opts, lists:member(K, Fields)],
Caps = make_caps(#diameter_caps{}, COpts),
@@ -608,14 +624,14 @@ opt(sequence = K, F) ->
E:R ->
?THROW({value, {K, E, R, ?STACK}})
end;
-
+
opt(K, _) ->
?THROW({value, K}).
sequence({H,N} = T)
when 0 =< N, N =< 32, 0 =< H, 0 == H bsr N ->
T;
-
+
sequence(_) ->
?THROW({value, sequence}).
@@ -629,7 +645,8 @@ make_caps(Caps, Opts) ->
%% Validate types by encoding a CER.
encode_CER(Opts) ->
- {ok, CER} = diameter_capx:build_CER(make_caps(?EXAMPLE_CAPS, Opts)),
+ {ok, CER} = diameter_capx:build_CER(make_caps(?EXAMPLE_CAPS, Opts),
+ ?BASE),
Hdr = #diameter_header{version = ?DIAMETER_VERSION,
end_to_end_id = 0,
diff --git a/lib/diameter/src/base/diameter_internal.hrl b/lib/diameter/src/base/diameter_internal.hrl
index 63b35550a8..4b672aa071 100644
--- a/lib/diameter/src/base/diameter_internal.hrl
+++ b/lib/diameter/src/base/diameter_internal.hrl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2011. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2013. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -58,8 +58,6 @@
-define(APP_ID_COMMON, 0).
-define(APP_ID_RELAY, 16#FFFFFFFF).
--define(BASE, diameter_gen_base_rfc3588).
-
%%% ---------------------------------------------------------
%%% RFC 3588, ch 2.6 Peer table
diff --git a/lib/diameter/src/base/diameter_peer.erl b/lib/diameter/src/base/diameter_peer.erl
index 25c9eab4cb..130bedda84 100644
--- a/lib/diameter/src/base/diameter_peer.erl
+++ b/lib/diameter/src/base/diameter_peer.erl
@@ -123,7 +123,7 @@ pair([_ | Rest], Mods, Acc) ->
pair(Rest, Mods, Acc);
%% No transport_module or transport_config: defaults.
-pair([], [], []) ->
+pair([], [], []) ->
[{[?DEFAULT_TMOD], ?DEFAULT_TCFG, ?DEFAULT_TTMO}];
%% One transport_module, one transport_config.
@@ -272,7 +272,7 @@ handle_cast(Msg, State) ->
%% Remote service is distributing a message.
handle_info({notify, SvcName, T}, S) ->
- bang(diameter_service:whois(SvcName), T),
+ diameter_service:notify(SvcName, T),
{noreply, S};
handle_info(Info, State) ->
@@ -304,13 +304,6 @@ code_change(_OldVsn, State, _Extra) ->
ifc_send(Pid, T) ->
Pid ! {diameter, T}.
-%% bang/2
-
-bang(undefined = No, _) ->
- No;
-bang(Pid, T) ->
- Pid ! T.
-
%% call/1
call(Request) ->
diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl
index 5dab6214b1..66342f7b62 100644
--- a/lib/diameter/src/base/diameter_peer_fsm.erl
+++ b/lib/diameter/src/base/diameter_peer_fsm.erl
@@ -18,10 +18,10 @@
%%
%%
-%% This module implements (as a process) the RFC 3588 Peer State
+%% This module implements (as a process) the RFC 3588/6733 Peer State
%% Machine modulo the necessity of adapting the peer election to the
-%% fact that we don't know the identity of a peer until we've
-%% received a CER/CEA from it.
+%% fact that we don't know the identity of a peer until we've received
+%% a CER/CEA from it.
%%
-module(diameter_peer_fsm).
@@ -46,16 +46,19 @@
-include_lib("diameter/include/diameter.hrl").
-include("diameter_internal.hrl").
--include("diameter_gen_base_rfc3588.hrl").
%% Values of Disconnect-Cause in DPR.
--define(GOAWAY, ?'DIAMETER_BASE_DISCONNECT-CAUSE_DO_NOT_WANT_TO_TALK_TO_YOU').
--define(REBOOT, ?'DIAMETER_BASE_DISCONNECT-CAUSE_REBOOTING').
--define(BUSY, ?'DIAMETER_BASE_DISCONNECT-CAUSE_BUSY').
+-define(GOAWAY, 2). %% DO_NOT_WANT_TO_TALK_TO_YOU
+-define(BUSY, 1). %% BUSY
+-define(REBOOT, 0). %% REBOOTING
+%% Values of Inband-Security-Id.
-define(NO_INBAND_SECURITY, 0).
-define(TLS, 1).
+%% Note that the a common dictionary hrl is purposely not included
+%% since the common dictionary is an argument to start/3.
+
%% Keys in process dictionary.
-define(CB_KEY, cb). %% capabilities callback
-define(DPR_KEY, dpr). %% disconnect callback
@@ -100,11 +103,13 @@
| {'Wait-CEA', uint32(), uint32()}
| 'Open',
mode :: accept | connect | {connect, reference()},
- parent :: pid(), %% watchdog process
- transport :: pid(), %% transport process
+ parent :: pid(), %% watchdog process
+ transport :: pid(), %% transport process
+ dictionary :: module(), %% common dictionary
service :: #diameter_service{},
- dpr = false :: false | {uint32(), uint32()}}).
+ dpr = false :: false | {uint32(), uint32()},
%% | hop by hop and end to end identifiers
+ length_errors :: exit | handle | discard}).
%% There are non-3588 states possible as a consequence of 5.6.1 of the
%% standard and the corresponding problem for incoming CEA's: we don't
@@ -126,24 +131,18 @@
%% State Machine rather than closer to the transport. This is what we
%% now do below: connect/accept call diameter_watchdog and return the
%% pid of the watchdog process, and the watchdog in turn calls start/3
-%% below to start the process implementing the Peer State Machine. The
-%% former is a "peer" in diameter_service while the latter is a
-%% "conn". In a sense, diameter_service sees the watchdog as
-%% implementing the Peer State Machine and the process implemented
-%% here as being the transport, not being aware of the watchdog at
-%% all.
+%% below to start the process implementing the Peer State Machine.
%%
-%%% ---------------------------------------------------------------------------
-%%% # start({connect|accept, Ref}, Opts, Service)
-%%%
-%%% Output: Pid
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # start/3
+%% ---------------------------------------------------------------------------
-spec start(T, [Opt], {diameter:sequence(),
- diameter:restriction(),
+ [node()],
+ module(),
#diameter_service{}})
- -> pid()
+ -> {reference(), pid()}
when T :: {connect|accept, diameter:transport_ref()},
Opt :: diameter:transport_opt().
@@ -152,9 +151,15 @@
%% specified on the transport in question. Check here that the list is
%% still non-empty.
-start({_,_} = Type, Opts, MS) ->
- {ok, Pid} = diameter_peer_fsm_sup:start_child({self(), Type, Opts, MS}),
- Pid.
+start({_,_} = Type, Opts, S) ->
+ Ack = make_ref(),
+ T = {Ack, self(), Type, Opts, S},
+ {ok, Pid} = diameter_peer_fsm_sup:start_child(T),
+ try
+ {erlang:monitor(process, Pid), Pid}
+ after
+ Pid ! Ack
+ end.
start_link(T) ->
{ok, _} = proc_lib:start_link(?MODULE,
@@ -163,8 +168,8 @@ start_link(T) ->
infinity,
diameter_lib:spawn_opts(server, [])).
-%%% ---------------------------------------------------------------------------
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
%% init/1
@@ -172,12 +177,14 @@ init(T) ->
proc_lib:init_ack({ok, self()}),
gen_server:enter_loop(?MODULE, [], i(T)).
-i({WPid, T, Opts, {Mask, Nodes, #diameter_service{applications = Apps,
- capabilities = LCaps}
- = Svc}}) ->
- [] /= Apps orelse ?ERROR({no_apps, T, Opts}),
+i({Ack, WPid, {M, Ref} = T, Opts, {Mask,
+ Nodes,
+ Dict0,
+ #diameter_service{capabilities = LCaps}
+ = Svc}}) ->
+ erlang:monitor(process, WPid),
+ wait(Ack, WPid),
putr(?DWA_KEY, dwa(LCaps)),
- {M, Ref} = T,
diameter_stats:reg(Ref),
{[Cs,Ds], Rest} = proplists:split(Opts, [capabilities_cb, disconnect_cb]),
putr(?CB_KEY, {Ref, [F || {_,F} <- Cs]}),
@@ -185,23 +192,39 @@ i({WPid, T, Opts, {Mask, Nodes, #diameter_service{applications = Apps,
putr(?REF_KEY, Ref),
putr(?SEQUENCE_KEY, Mask),
putr(?RESTRICT_KEY, Nodes),
- erlang:monitor(process, WPid),
- {TPid, Addrs} = start_transport(T, Rest, Svc),
+
Tmo = proplists:get_value(capx_timeout, Opts, ?EVENT_TIMEOUT),
?IS_TIMEOUT(Tmo) orelse ?ERROR({invalid, {capx_timeout, Tmo}}),
+ OnLengthErr = proplists:get_value(length_errors, Opts, exit),
+ lists:member(OnLengthErr, [exit, handle, discard])
+ orelse ?ERROR({invalid, {length_errors, OnLengthErr}}),
+
+ {TPid, Addrs} = start_transport(T, Rest, Svc),
+
#state{state = {'Wait-Conn-Ack', Tmo},
parent = WPid,
transport = TPid,
+ dictionary = Dict0,
mode = M,
- service = svc(Svc, Addrs)}.
+ service = svc(Svc, Addrs),
+ length_errors = OnLengthErr}.
%% The transport returns its local ip addresses so that different
%% transports on the same service can use different local addresses.
%% The local addresses are put into Host-IP-Address avps here when
%% sending capabilities exchange messages.
%%
%% Invalid transport config may cause us to crash but note that the
-%% watchdog start (start/2) succeeds regardless so as not to crash the
-%% service.
+%% watchdog start (start/2) succeeds regardless.
+
+%% Wait for the caller to have a monitor to avoid a race with our
+%% death. (Since the exit reason is used in diameter_service.)
+wait(Ref, Pid) ->
+ receive
+ Ref ->
+ ok;
+ {'DOWN', _, process, Pid, _} = D ->
+ exit({shutdown, D})
+ end.
start_transport(T, Opts, #diameter_service{capabilities = LCaps} = Svc) ->
Addrs0 = LCaps#diameter_caps.host_ip_address,
@@ -274,13 +297,12 @@ handle_info(T, #state{} = State) ->
{noreply, S};
{stop, Reason} ->
?LOG(stop, Reason),
- x(Reason, State);
+ {stop, {shutdown, Reason}, State};
stop ->
?LOG(stop, T),
- x(T, State)
+ {stop, {shutdown, T}, State}
catch
exit: {diameter_codec, encode, _} = Reason ->
- close_wd(Reason, State#state.parent),
?LOG(stop, Reason),
%% diameter_codec:encode/2 emits an error report. Only
%% indicate the probable reason here.
@@ -300,10 +322,6 @@ handle_info(T, #state{} = State) ->
%% succesfully encoded. It's not checked at diameter:add_transport/2
%% since this can be called before creating the service.
-x(Reason, #state{} = S) ->
- close_wd(Reason, S),
- {stop, {shutdown, Reason}, S}.
-
%% terminate/2
terminate(_, _) ->
@@ -314,8 +332,8 @@ terminate(_, _) ->
code_change(_, State, _) ->
{ok, State}.
-%%% ---------------------------------------------------------------------------
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
putr(Key, Val) ->
put({?MODULE, Key}, Val).
@@ -378,9 +396,8 @@ transition({diameter, {recv, Pkt}}, S) ->
recv(Pkt, S);
%% Timeout when still in the same state ...
-transition({timeout = T, PS}, #state{state = PS} = S) ->
- close({capx(PS), T}, S),
- stop;
+transition({timeout = T, PS}, #state{state = PS}) ->
+ {stop, {capx(PS), T}};
%% ... or not.
transition({timeout, _}, _) ->
@@ -393,8 +410,6 @@ transition({send, Msg}, #state{transport = TPid}) ->
%% Request for graceful shutdown at remove_transport, stop_service of
%% application shutdown.
-transition({shutdown = T, Pid}, S) ->
- transition({T, Pid, transport}, S);
transition({shutdown, Pid, Reason}, #state{parent = Pid, dpr = false} = S) ->
dpr(Reason, S);
transition({shutdown, Pid, _}, #state{parent = Pid}) ->
@@ -454,18 +469,19 @@ start_next(#state{service = Svc0} = S) ->
send_CER(#state{state = {'Wait-Conn-Ack', Tmo},
mode = {connect, Remote},
service = #diameter_service{capabilities = LCaps},
- transport = TPid}
+ transport = TPid,
+ dictionary = Dict}
= S) ->
OH = LCaps#diameter_caps.origin_host,
req_send_CER(OH, Remote)
orelse
- close({already_connected, Remote, LCaps}, S),
+ close({already_connected, Remote, LCaps}),
CER = build_CER(S),
?LOG(send, 'CER'),
#diameter_packet{header = #diameter_header{end_to_end_id = Eid,
hop_by_hop_id = Hid}}
= Pkt
- = encode(CER),
+ = encode(CER, Dict),
send(TPid, Pkt),
start_timer(Tmo, S#state{state = {'Wait-CEA', Hid, Eid}}).
@@ -487,42 +503,29 @@ start_timer(Tmo, #state{state = PS} = S) ->
%% build_CER/1
-build_CER(#state{service = #diameter_service{capabilities = LCaps}}) ->
- {ok, CER} = diameter_capx:build_CER(LCaps),
+build_CER(#state{service = #diameter_service{capabilities = LCaps},
+ dictionary = Dict}) ->
+ {ok, CER} = diameter_capx:build_CER(LCaps, Dict),
CER.
-%% encode/1
+%% encode/2
-encode(Rec) ->
+encode(Rec, Dict) ->
Seq = diameter_session:sequence({_,_} = getr(?SEQUENCE_KEY)),
Hdr = #diameter_header{version = ?DIAMETER_VERSION,
end_to_end_id = Seq,
hop_by_hop_id = Seq},
- diameter_codec:encode(?BASE, #diameter_packet{header = Hdr,
- msg = Rec}).
+ diameter_codec:encode(Dict, #diameter_packet{header = Hdr,
+ msg = Rec}).
%% recv/2
-%% RFC 3588 has result code 5015 for an invalid length but if a
-%% transport is detecting message boundaries using the length header
-%% then a length error will likely lead to further errors.
-
-recv(#diameter_packet{header = #diameter_header{length = Len}
- = Hdr,
- bin = Bin},
- S)
- when Len < 20;
- (0 /= Len rem 4 orelse bit_size(Bin) /= 8*Len) ->
- discard(invalid_message_length, recv, [size(Bin),
- bit_size(Bin) rem 8,
- Hdr,
- S]);
-
recv(#diameter_packet{header = #diameter_header{} = Hdr}
= Pkt,
- #state{parent = Pid}
+ #state{parent = Pid,
+ dictionary = Dict0}
= S) ->
- Name = diameter_codec:msg_name(Hdr),
+ Name = diameter_codec:msg_name(Dict0, Hdr),
Pid ! {recv, self(), Name, Pkt},
diameter_stats:incr({msg_id(Name, Hdr), recv}), %% count received
rcv(Name, Pkt, S);
@@ -531,29 +534,52 @@ recv(#diameter_packet{header = undefined,
bin = Bin}
= Pkt,
S) ->
- recv(Pkt#diameter_packet{header = diameter_codec:decode_header(Bin)}, S);
+ recv(diameter_codec:decode_header(Bin), Pkt, S);
-recv(Bin, S)
- when is_binary(Bin) ->
- recv(#diameter_packet{bin = Bin}, S);
+recv(Bin, S) ->
+ recv(#diameter_packet{bin = Bin}, S).
-recv(#diameter_packet{header = false} = Pkt, S) ->
- discard(truncated_header, recv, [Pkt, S]).
+%% recv/3
-msg_id({_,_,_} = T, _) ->
- T;
-msg_id(_, Hdr) ->
- diameter_codec:msg_id(Hdr).
+recv(#diameter_header{length = Len}
+ = H,
+ #diameter_packet{bin = Bin}
+ = Pkt,
+ #state{length_errors = E}
+ = S)
+ when E == handle;
+ 0 == Len rem 4, bit_size(Bin) == 8*Len ->
+ recv(Pkt#diameter_packet{header = H}, S);
+
+recv(#diameter_header{}
+ = H,
+ #diameter_packet{bin = Bin},
+ #state{length_errors = E}
+ = S) ->
+ invalid(E,
+ invalid_message_length,
+ recv,
+ [size(Bin), bit_size(Bin) rem 8, H, S]);
-%% Treat invalid length as a transport error and die. Especially in
-%% the TCP case, in which there's no telling where the next message
-%% begins in the incoming byte stream, keeping a crippled connection
-%% alive may just make things worse.
+recv(false, Pkt, #state{length_errors = E} = S) ->
+ invalid(E, truncated_header, recv, [Pkt, S]).
-discard(Reason, F, A) ->
+%% Note that counters here only count discarded messages.
+invalid(E, Reason, F, A) ->
diameter_stats:incr(Reason),
+ abort(E, Reason, F, A).
+
+abort(exit, Reason, F, A) ->
diameter_lib:warning_report(Reason, {?MODULE, F, A}),
- throw({?MODULE, abort, Reason}).
+ throw({?MODULE, abort, Reason});
+
+abort(_, _, _, _) ->
+ ok.
+
+msg_id({_,_,_} = T, _) ->
+ T;
+msg_id(_, Hdr) ->
+ {_,_,_} = diameter_codec:msg_id(Hdr).
%% rcv/3
@@ -607,13 +633,13 @@ send(Pid, Msg) ->
%% handle_request/3
-handle_request(Type, #diameter_packet{} = Pkt, S) ->
+handle_request(Type, #diameter_packet{} = Pkt, #state{dictionary = D} = S) ->
?LOG(recv, Type),
- send_answer(Type, diameter_codec:decode(?BASE, Pkt), S).
+ send_answer(Type, diameter_codec:decode(D, Pkt), S).
%% send_answer/3
-send_answer(Type, ReqPkt, #state{transport = TPid} = S) ->
+send_answer(Type, ReqPkt, #state{transport = TPid, dictionary = Dict} = S) ->
#diameter_packet{header = H,
transport_data = TD}
= ReqPkt,
@@ -630,13 +656,15 @@ send_answer(Type, ReqPkt, #state{transport = TPid} = S) ->
msg = Msg,
transport_data = TD},
- send(TPid, diameter_codec:encode(?BASE, Pkt)),
+ send(TPid, diameter_codec:encode(Dict, Pkt)),
eval(PostF, S).
eval([F|A], S) ->
apply(F, A ++ [S]);
eval(ok, S) ->
- S.
+ S;
+eval(T, _) ->
+ close(T).
%% build_answer/3
@@ -647,11 +675,11 @@ build_answer('CER',
is_error = false},
errors = []}
= Pkt,
- S) ->
- {SupportedApps, RCaps, #diameter_base_CEA{'Result-Code' = RC,
- 'Inband-Security-Id' = IS}
- = CEA}
- = recv_CER(CER, S),
+ #state{dictionary = Dict0}
+ = S) ->
+ {SupportedApps, RCaps, CEA} = recv_CER(CER, S),
+
+ [RC, IS] = Dict0:'#get-'(['Result-Code', 'Inband-Security-Id'], CEA),
#diameter_caps{origin_host = {OH, DH}}
= Caps
@@ -664,10 +692,10 @@ build_answer('CER',
orelse ?THROW(4003), %% DIAMETER_ELECTION_LOST
caps_cb(Caps)
of
- N -> {cea(CEA, N), [fun open/5, Pkt,
- SupportedApps,
- Caps,
- {accept, hd([_] = IS)}]}
+ N -> {cea(CEA, N, Dict0), [fun open/5, Pkt,
+ SupportedApps,
+ Caps,
+ {accept, hd([_] = IS)}]}
catch
?FAILURE(Reason) ->
rejected(Reason, {'CER', Reason, Caps, Pkt}, S)
@@ -684,25 +712,25 @@ build_answer(Type,
RC = rc(H, Es),
{answer(Type, RC, Es, S), post(Type, RC, Pkt, S)}.
-cea(CEA, ok) ->
+cea(CEA, ok, _) ->
CEA;
-cea(CEA, 2001) ->
+cea(CEA, 2001, _) ->
CEA;
-cea(CEA, RC) ->
- CEA#diameter_base_CEA{'Result-Code' = RC}.
+cea(CEA, RC, Dict0) ->
+ Dict0:'#set-'({'Result-Code', RC}, CEA).
post('CER' = T, RC, Pkt, S) ->
- [fun close/2, {T, caps(S), {RC, Pkt}}];
+ {T, caps(S), {RC, Pkt}};
post(_, _, _, _) ->
ok.
rejected({capabilities_cb, _F, Reason}, T, S) ->
rejected(Reason, T, S);
-rejected(discard, T, S) ->
- close(T, S);
+rejected(discard, T, _) ->
+ close(T);
rejected({N, Es}, T, S) ->
- {answer('CER', N, Es, S), [fun close/2, T]};
+ {answer('CER', N, Es, S), T};
rejected(N, T, S) ->
rejected({N, []}, T, S).
@@ -728,7 +756,7 @@ is_origin({N, _}) ->
orelse N == 'Origin-State-Id'.
%% failed_avp/1
-
+
failed_avp([] = No) ->
No;
failed_avp(Avps) ->
@@ -817,22 +845,23 @@ a('DPR', #diameter_caps{origin_host = {Host, _},
%% recv_CER/2
-recv_CER(CER, #state{service = Svc}) ->
- {ok, T} = diameter_capx:recv_CER(CER, Svc),
+recv_CER(CER, #state{service = Svc, dictionary = Dict}) ->
+ {ok, T} = diameter_capx:recv_CER(CER, Svc, Dict),
T.
%% handle_CEA/1
handle_CEA(#diameter_packet{bin = Bin}
= Pkt,
- #state{service = #diameter_service{capabilities = LCaps}}
+ #state{dictionary = Dict0,
+ service = #diameter_service{capabilities = LCaps}}
= S)
when is_binary(Bin) ->
?LOG(recv, 'CEA'),
#diameter_packet{msg = CEA}
= DPkt
- = diameter_codec:decode(?BASE, Pkt),
+ = diameter_codec:decode(Dict0, Pkt),
{SApps, IS, RCaps} = recv_CEA(DPkt, S),
@@ -840,8 +869,7 @@ handle_CEA(#diameter_packet{bin = Bin}
= Caps
= capz(LCaps, RCaps),
- #diameter_base_CEA{'Result-Code' = RC}
- = CEA,
+ RC = Dict0:'#get-'('Result-Code', CEA),
%% Ensure that we don't already have a connection to the peer in
%% question. This isn't the peer election of 3588 except in the
@@ -862,7 +890,7 @@ handle_CEA(#diameter_packet{bin = Bin}
of
_ -> open(DPkt, SApps, Caps, {connect, hd([_] = IS)}, S)
catch
- ?FAILURE(Reason) -> close({'CEA', Reason, Caps, DPkt}, S)
+ ?FAILURE(Reason) -> close({'CEA', Reason, Caps, DPkt})
end.
%% Check more than the result code since the peer could send success
%% regardless. If not 2001 then a peer_up callback could do anything
@@ -877,12 +905,13 @@ recv_CEA(#diameter_packet{header = #diameter_header{version
is_error = false},
msg = CEA,
errors = []},
- #state{service = Svc}) ->
- {ok, T} = diameter_capx:recv_CEA(CEA, Svc),
+ #state{service = Svc,
+ dictionary = Dict}) ->
+ {ok, T} = diameter_capx:recv_CEA(CEA, Svc, Dict),
T;
recv_CEA(Pkt, S) ->
- close({'CEA', caps(S), Pkt}, S).
+ close({'CEA', caps(S), Pkt}).
caps(#diameter_service{capabilities = Caps}) ->
Caps;
@@ -935,14 +964,14 @@ open(Pkt, SupportedApps, Caps, {Type, IS}, #state{parent = Pid,
%% We've advertised TLS support: tell the transport the result
%% and expect a reply when the handshake is complete.
-tls_ack(true, Caps, Type, IS, #state{transport = TPid} = S) ->
+tls_ack(true, Caps, Type, IS, #state{transport = TPid}) ->
Ref = make_ref(),
TPid ! {diameter, {tls, Ref, Type, IS == ?TLS}},
receive
{diameter, {tls, Ref}} ->
ok;
{'DOWN', _, process, TPid, Reason} ->
- close({tls_ack, Reason, Caps}, S)
+ close({tls_ack, Reason, Caps})
end;
%% Or not. Don't send anything to the transport so that transports
@@ -955,25 +984,11 @@ capz(#diameter_caps{} = L, #diameter_caps{} = R) ->
= list_to_tuple([diameter_caps | lists:zip(tl(tuple_to_list(L)),
tl(tuple_to_list(R)))]).
-%% close/2
+%% close/1
-%% Tell the watchdog that our death isn't due to transport failure.
-close(Reason, #state{parent = Pid}) ->
- close_wd(Reason, Pid),
+close(Reason) ->
throw({?MODULE, close, Reason}).
-%% close_wd/2
-
-%% Ensure the watchdog dies if DPR has been sent ...
-close_wd(_, #state{dpr = false}) ->
- ok;
-close_wd(Reason, #state{parent = Pid}) ->
- close_wd(Reason, Pid);
-
-%% ... or otherwise
-close_wd(Reason, Pid) ->
- Pid ! {close, self(), Reason}.
-
%% dwa/1
dwa(#diameter_caps{origin_host = OH,
@@ -1035,13 +1050,14 @@ dpr([CB|Rest], [Reason | _] = Args, S) ->
diameter_lib:error_report(failure, No),
{stop, No}
end;
-
+
dpr([], [Reason | _], S) ->
send_dpr(Reason, [], S).
-record(opts, {cause, timeout = ?DPA_TIMEOUT}).
send_dpr(Reason, Opts, #state{transport = TPid,
+ dictionary = Dict,
service = #diameter_service{capabilities = Caps}}
= S) ->
#opts{cause = Cause, timeout = Tmo}
@@ -1061,7 +1077,8 @@ send_dpr(Reason, Opts, #state{transport = TPid,
= Pkt
= encode(['DPR', {'Origin-Host', OH},
{'Origin-Realm', OR},
- {'Disconnect-Cause', Cause}]),
+ {'Disconnect-Cause', Cause}],
+ Dict),
send(TPid, Pkt),
dpa_timer(Tmo),
?LOG(send, 'DPR'),
diff --git a/lib/diameter/src/base/diameter_reg.erl b/lib/diameter/src/base/diameter_reg.erl
index 619b12ecad..ac58e4bf5b 100644
--- a/lib/diameter/src/base/diameter_reg.erl
+++ b/lib/diameter/src/base/diameter_reg.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2012. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2013. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -208,10 +208,6 @@ init(_) ->
%% # handle_call/3
%% ----------------------------------------------------------
-handle_call(Req, From, S)
- when not is_record(S, state) ->
- handle_call(Req, From, upgrade(S));
-
handle_call({add, Fun, Key, Pid}, _, S) ->
B = Fun(?TABLE, {Key, Pid}),
monitor(B andalso no_monitor(Pid), Pid),
@@ -281,9 +277,6 @@ code_change(_OldVsn, State, _Extra) ->
%% ===========================================================================
-upgrade(S) ->
- #state{} = list_to_tuple(tuple_to_list(S) ++ [[]]).
-
monitor(true, Pid) ->
ets:insert(?TABLE, ?MONITOR(Pid, erlang:monitor(process, Pid)));
monitor(false, _) ->
diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl
index d2a416166f..f1342df16c 100644
--- a/lib/diameter/src/base/diameter_service.erl
+++ b/lib/diameter/src/base/diameter_service.erl
@@ -24,33 +24,38 @@
-module(diameter_service).
-behaviour(gen_server).
+%% towards diameter_service_sup
+-export([start_link/1]).
+
+%% towards diameter
+-export([subscribe/1,
+ unsubscribe/1,
+ services/0,
+ info/2]).
+
+%% towards diameter_config
-export([start/1,
stop/1,
start_transport/2,
- stop_transport/2,
- info/2,
- call/4]).
+ stop_transport/2]).
-%% towards diameter_watchdog
--export([receive_message/3]).
+%% towards diameter_peer
+-export([notify/2]).
-%% service supervisor
--export([start_link/1]).
+%% towards diameter_traffic
+-export([find_incoming_app/4,
+ pick_peer/3]).
--export([subscribe/1,
- unsubscribe/1,
+%% test/debug
+-export([services/1,
subscriptions/1,
subscriptions/0,
- services/0,
- services/1,
- whois/1]).
-
-%% test/debug
--export([call_module/3,
+ call_module/3,
+ whois/1,
state/1,
uptime/1]).
-%%% gen_server callbacks
+%% gen_server callbacks
-export([init/1,
handle_call/3,
handle_cast/2,
@@ -58,21 +63,10 @@
terminate/2,
code_change/3]).
-%% Other callbacks.
--export([send/1]).
-
-include_lib("diameter/include/diameter.hrl").
-include("diameter_internal.hrl").
-%% The states mirrored by peer_up/peer_down callbacks.
--define(STATE_UP, up).
--define(STATE_DOWN, down).
-
--type op_state() :: ?STATE_UP
- | ?STATE_DOWN.
-
-%% The RFC 3539 watchdog states that are now maintained, albeit
-%% along with the old up/down. okay = up, else down.
+%% RFC 3539 watchdog states.
-define(WD_INITIAL, initial).
-define(WD_OKAY, okay).
-define(WD_SUSPECT, suspect).
@@ -86,11 +80,8 @@
| ?WD_REOPEN.
-define(DEFAULT_TC, 30000). %% RFC 3588 ch 2.1
--define(DEFAULT_TIMEOUT, 5000). %% for outgoing requests
-define(RESTART_TC, 1000). %% if restart was this recent
--define(RELAY, ?DIAMETER_DICT_RELAY).
-
%% Used to be able to swap this with anything else dict-like but now
%% rely on the fact that a service's #state{} record does not change
%% in storing in it ?STATE table and not always going through the
@@ -98,10 +89,6 @@
%% a ?Dict don't change the handle to it.
-define(Dict, diameter_dict).
-%% Table containing outgoing requests for which a reply has yet to be
-%% received.
--define(REQUEST_TABLE, diameter_request).
-
%% Maintains state in a table. In contrast to previously, a service's
%% stat is not constant and is accessed outside of the service
%% process.
@@ -115,83 +102,58 @@
%% Workaround for dialyzer's lack of understanding of match specs.
-type match(T)
- :: T | '_' | '$1' | '$2' | '$3' | '$4'.
-
-%% State of service gen_server.
+ :: T | '_' | '$1' | '$2'.
+
+%% State of service gen_server. Note that the state term itself
+%% doesn't change, which is relevant for the stateless application
+%% callbacks since the state is retrieved from ?STATE_TABLE from
+%% outside the service process. The pid in the service record is used
+%% to determine whether or not we need to call the process for a
+%% pick_peer callback in the statefull case.
-record(state,
{id = now(),
- service_name, %% as passed to start_service/2, key in ?STATE_TABLE
+ service_name :: diameter:service_name(), %% key in ?STATE_TABLE
service :: #diameter_service{},
- peerT = ets_new(peers) :: ets:tid(),%% #peer{} at start_fsm
- connT = ets_new(conns) :: ets:tid(),%% #conn{} at connection_up/reopen
- shared_peers = ?Dict:new(), %% Alias -> [{TPid, Caps}, ...]
- local_peers = ?Dict:new(), %% Alias -> [{TPid, Caps}, ...]
+ watchdogT = ets_new(watchdogs) %% #watchdog{} at start
+ :: ets:tid(),
+ peerT = ets_new(peers) %% #peer{pid = TPid} at okay/reopen
+ :: ets:tid(),
+ shared_peers = ?Dict:new() %% Alias -> [{TPid, Caps}, ...]
+ :: ets:tid(),
+ local_peers = ?Dict:new() %% Alias -> [{TPid, Caps}, ...]
+ :: ets:tid(),
monitor = false :: false | pid(), %% process to die with
options
:: [{sequence, diameter:sequence()} %% sequence mask
| {restrict_connections, diameter:restriction()}
| {share_peers, boolean()} %% broadcast peers to remote nodes?
| {use_shared_peers, boolean()}]}).%% use broadcasted peers?
-%% shared_peers reflects the peers broadcast from remote nodes. Note
-%% that the state term itself doesn't change, which is relevant for
-%% the stateless application callbacks since the state is retrieved
-%% from ?STATE_TABLE from outside the service process. The pid in the
-%% service record is used to determine whether or not we need to call
-%% the process for a pick_peer callback.
-
-%% Record representing a watchdog process as implemented by
-%% diameter_watchdog. The term "peer" here is historical, made
-%% especially confusing by the fact that a peer_ref() in the
-%% documentation is the key of a #conn{} record, not a #peer{} record.
-%% The name is also unfortunate given the meaning of peer in the
-%% Diameter sense.
--record(peer,
+%% shared_peers reflects the peers broadcast from remote nodes.
+
+%% Record representing an RFC 3539 watchdog process implemented by
+%% diameter_watchdog.
+-record(watchdog,
{pid :: match(pid()),
type :: match(connect | accept),
ref :: match(reference()), %% key into diameter_config
options :: match([diameter:transport_opt()]),%% from start_transport
- op_state = {?STATE_DOWN, ?WD_INITIAL}
- :: match(op_state() | {op_state(), wd_state()}),
+ state = ?WD_INITIAL :: match(wd_state()),
started = now(), %% at process start
- conn = false :: match(boolean() | pid())}).
- %% true at accepted, pid() at connection_up or reopen
-
-%% Record representing a peer process as implemented by
-%% diameter_peer_fsm. The term "conn" is historical. Despite the name
-%% here, comments refer to watchdog and peer processes, that are keys
-%% in #peer{} and #conn{} records respectively. To add to the
-%% confusion, a #request.transport is a peer process = key in a
-%% #conn{} record. The actual transport process (that the peer process
-%% knows about and that has a transport connection) isn't seen here.
--record(conn,
+ peer = false :: match(boolean() | pid())}).
+ %% true at accepted, pid() at okay/reopen
+
+%% Record representing an Peer State Machine processes implemented by
+%% diameter_peer_fsm.
+-record(peer,
{pid :: pid(),
apps :: [{0..16#FFFFFFFF, diameter:app_alias()}], %% {Id, Alias}
caps :: #diameter_caps{},
started = now(), %% at process start
- peer :: pid()}). %% key into peerT
-
-%% Record stored in diameter_request for each outgoing request.
--record(request,
- {from, %% arg 2 of handle_call/3
- handler :: match(pid()), %% request process
- transport :: match(pid()), %% peer process
- caps :: match(#diameter_caps{}),
- app :: match(diameter:app_alias()),%% #diameter_app.alias
- dictionary :: match(module()), %% #diameter_app.dictionary
- module :: match([module() | list()]), %% #diameter_app.module
- filter :: match(diameter:peer_filter()),
- packet :: match(#diameter_packet{})}).
-
-%% Record call/4 options are parsed into.
--record(options,
- {filter = none :: diameter:peer_filter(),
- extra = [] :: list(),
- timeout = ?DEFAULT_TIMEOUT :: 0..16#FFFFFFFF,
- detach = false :: boolean()}).
-
-%%% ---------------------------------------------------------------------------
-%%% # start(SvcName)
-%%% ---------------------------------------------------------------------------
+ watchdog :: pid()}). %% key into watchdogT
+
+%% ---------------------------------------------------------------------------
+%% # start/1
+%% ---------------------------------------------------------------------------
start(SvcName) ->
diameter_service_sup:start_child(SvcName).
@@ -202,9 +164,9 @@ start_link(SvcName) ->
%% Put the arbitrary term SvcName in a list in case we ever want to
%% send more than this and need to distinguish old from new.
-%%% ---------------------------------------------------------------------------
-%%% # stop(SvcName)
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # stop/1
+%% ---------------------------------------------------------------------------
stop(SvcName) ->
case whois(SvcName) of
@@ -220,180 +182,43 @@ stop(ok, Pid) ->
stop(No, _) ->
No.
-%%% ---------------------------------------------------------------------------
-%%% # start_transport(SvcName, {Ref, Type, Opts})
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # start_transport/3
+%% ---------------------------------------------------------------------------
-start_transport(SvcName, {_,_,_} = T) ->
+start_transport(SvcName, {_Ref, _Type, _Opts} = T) ->
call_service_by_name(SvcName, {start, T}).
-%%% ---------------------------------------------------------------------------
-%%% # stop_transport(SvcName, Refs)
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # stop_transport/2
+%% ---------------------------------------------------------------------------
stop_transport(_, []) ->
ok;
stop_transport(SvcName, [_|_] = Refs) ->
call_service_by_name(SvcName, {stop, Refs}).
-%%% ---------------------------------------------------------------------------
-%%% # info(SvcName, Item)
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # info/2
+%% ---------------------------------------------------------------------------
info(SvcName, Item) ->
- case find_state(SvcName) of
- #state{} = S ->
+ case lookup_state(SvcName) of
+ [#state{} = S] ->
service_info(Item, S);
- false ->
+ [] ->
undefined
end.
-%%% ---------------------------------------------------------------------------
-%%% # receive_message(TPid, Pkt, MessageData)
-%%% ---------------------------------------------------------------------------
-
-%% Handle an incoming Diameter message in the watchdog process. This
-%% used to come through the service process but this avoids that
-%% becoming a bottleneck.
-
-receive_message(TPid, Pkt, T)
- when is_pid(TPid) ->
- #diameter_packet{header = #diameter_header{is_request = R}} = Pkt,
- recv(R, (not R) andalso lookup_request(Pkt, TPid), TPid, Pkt, T).
-
-%% Incoming request ...
-recv(true, false, TPid, Pkt, T) ->
- try
- spawn(fun() -> recv_request(TPid, Pkt, T) end)
- catch
- error: system_limit = E -> %% discard
- ?LOG({error, E}, now())
- end;
-
-%% ... answer to known request ...
-recv(false, #request{from = {_, Ref}, handler = Pid} = Req, _, Pkt, _) ->
- Pid ! {answer, Ref, Req, Pkt};
-%% Note that failover could have happened prior to this message being
-%% received and triggering failback. That is, both a failover message
-%% and answer may be on their way to the handler process. In the worst
-%% case the request process gets notification of the failover and
-%% sends to the alternate peer before an answer arrives, so it's
-%% always the case that we can receive more than one answer after
-%% failover. The first answer received by the request process wins,
-%% any others are discarded.
-
-%% ... or not.
-recv(false, false, _, _, _) ->
- ok.
-
-%%% ---------------------------------------------------------------------------
-%%% # call(SvcName, App, Msg, Options)
-%%% ---------------------------------------------------------------------------
-
-call(SvcName, App, Msg, Options)
- when is_list(Options) ->
- Rec = make_options(Options),
- Ref = make_ref(),
- Caller = {self(), Ref},
- Fun = fun() -> exit({Ref, call(SvcName, App, Msg, Rec, Caller)}) end,
- try spawn_monitor(Fun) of
- {_, MRef} ->
- recv(MRef, Ref, Rec#options.detach, false)
- catch
- error: system_limit = E ->
- {error, E}
- end.
-
-%% Don't rely on gen_server:call/3 for the timeout handling since it
-%% makes no guarantees about not leaving a reply message in the
-%% mailbox if we catch its exit at timeout. It currently *can* do so,
-%% which is also undocumented.
-
-recv(MRef, _, true, true) ->
- erlang:demonitor(MRef, [flush]),
- ok;
-
-recv(MRef, Ref, Detach, Sent) ->
- receive
- Ref -> %% send has been attempted
- recv(MRef, Ref, Detach, true);
- {'DOWN', MRef, process, _, Reason} ->
- call_rc(Reason, Ref, Sent)
- end.
-
-%% call/5 has returned ...
-call_rc({Ref, Ans}, Ref, _) ->
- Ans;
-
-%% ... or not. In this case failure/encode are documented.
-call_rc(_, _, Sent) ->
- {error, choose(Sent, failure, encode)}.
-
-%% call/5
-%%
-%% In the process spawned for the outgoing request.
-
-call(SvcName, App, Msg, Opts, Caller) ->
- c(find_state(SvcName), App, Msg, Opts, Caller).
-
-c(#state{service_name = Svc, options = [{_, Mask} | _]} = S,
- App,
- Msg,
- Opts,
- Caller) ->
- case find_transport(App, Msg, Opts, S) of
- {_,_,_} = T ->
- send_request(T, Mask, Msg, Opts, Caller, Svc);
- false ->
- {error, no_connection};
- {error, _} = No ->
- No
- end;
-
-c(false, _, _, _, _) ->
- {error, no_service}.
-
-%% find_state/1
-
-find_state(SvcName) ->
- fs(ets:lookup(?STATE_TABLE, SvcName)).
-
-fs([#state{} = S]) ->
- S;
-
-fs([]) ->
- false.
-
-%% make_options/1
+%% lookup_state/1
-make_options(Options) ->
- lists:foldl(fun mo/2, #options{}, Options).
+lookup_state(SvcName) ->
+ ets:lookup(?STATE_TABLE, SvcName).
-mo({timeout, T}, Rec)
- when is_integer(T), 0 =< T ->
- Rec#options{timeout = T};
-
-mo({filter, F}, #options{filter = none} = Rec) ->
- Rec#options{filter = F};
-mo({filter, F}, #options{filter = {all, Fs}} = Rec) ->
- Rec#options{filter = {all, [F | Fs]}};
-mo({filter, F}, #options{filter = F0} = Rec) ->
- Rec#options{filter = {all, [F0, F]}};
-
-mo({extra, L}, #options{extra = X} = Rec)
- when is_list(L) ->
- Rec#options{extra = X ++ L};
-
-mo(detach, Rec) ->
- Rec#options{detach = true};
-
-mo(T, _) ->
- ?ERROR({invalid_option, T}).
-
-%%% ---------------------------------------------------------------------------
-%%% # subscribe(SvcName)
-%%% # unsubscribe(SvcName)
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # subscribe/1
+%% # unsubscribe/1
+%% ---------------------------------------------------------------------------
subscribe(SvcName) ->
diameter_reg:add({?MODULE, subscriber, SvcName}).
@@ -410,9 +235,9 @@ subscriptions() ->
pmap(Props) ->
lists:map(fun({{?MODULE, _, Name}, Pid}) -> {Name, Pid} end, Props).
-%%% ---------------------------------------------------------------------------
-%%% # services(Pattern)
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # services/1
+%% ---------------------------------------------------------------------------
services(Pat) ->
pmap(diameter_reg:match({?MODULE, service, Pat})).
@@ -428,6 +253,86 @@ whois(SvcName) ->
undefined
end.
+%% ---------------------------------------------------------------------------
+%% # pick_peer/3
+%% ---------------------------------------------------------------------------
+
+-spec pick_peer(SvcName, AppOrAlias, Opts)
+ -> {{TPid, Caps, App}, Mask}
+ | false
+ | {error, term()}
+ when SvcName :: diameter:service_name(),
+ AppOrAlias :: {alias, diameter:app_alias()} | #diameter_app{},
+ Opts :: tuple(),
+ TPid :: pid(),
+ Caps :: #diameter_caps{},
+ App :: #diameter_app{},
+ Mask :: diameter:sequence().
+
+pick_peer(SvcName, App, Opts) ->
+ pick(lookup_state(SvcName), App, Opts).
+
+pick([], _, _) ->
+ {error, no_service};
+
+pick([S], App, Opts) ->
+ pick(S, App, Opts);
+
+pick(#state{service = #diameter_service{applications = Apps}}
+ = S,
+ {alias, Alias},
+ Opts) -> %% initial call from diameter:call/4
+ pick(S, find_outgoing_app(Alias, Apps), Opts);
+
+pick(_, false, _) ->
+ false;
+
+pick(#state{options = [{_, Mask} | _]}
+ = S,
+ #diameter_app{module = ModX, dictionary = Dict}
+ = App0,
+ {DestF, Filter, Xtra}) ->
+ App = App0#diameter_app{module = ModX ++ Xtra},
+ [_,_] = RealmAndHost = diameter_lib:eval([DestF, Dict]),
+ case pick_peer(App, RealmAndHost, Filter, S) of
+ {TPid, Caps} ->
+ {{TPid, Caps, App}, Mask};
+ false = No ->
+ No
+ end.
+
+%% ---------------------------------------------------------------------------
+%% # find_incoming_app/4
+%% ---------------------------------------------------------------------------
+
+-spec find_incoming_app(PeerT, TPid, Id, Apps)
+ -> {#diameter_app{}, #diameter_caps{}} %% connection and suitable app
+ | #diameter_caps{} %% connection but no suitable app
+ | false %% no connection
+ when PeerT :: ets:tid(),
+ TPid :: pid(),
+ Id :: non_neg_integer(),
+ Apps :: [#diameter_app{}].
+
+find_incoming_app(PeerT, TPid, Id, Apps) ->
+ try ets:lookup(PeerT, TPid) of
+ [#peer{} = P] ->
+ find_incoming_app(P, Id, Apps);
+ [] -> %% transport has gone down
+ false
+ catch
+ error: badarg -> %% service has gone down (and taken table with it)
+ false
+ end.
+
+%% ---------------------------------------------------------------------------
+%% # notify/2
+%% ---------------------------------------------------------------------------
+
+notify(SvcName, Msg) ->
+ Pid = whois(SvcName),
+ is_pid(Pid) andalso (Pid ! Msg).
+
%% ===========================================================================
%% ===========================================================================
@@ -442,9 +347,9 @@ uptime(Svc) ->
call_module(Service, AppMod, Request) ->
call_service(Service, {call_module, AppMod, Request}).
-%%% ---------------------------------------------------------------------------
-%%% # init([SvcName])
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # init/1
+%% ---------------------------------------------------------------------------
init([SvcName]) ->
process_flag(trap_exit, true), %% ensure terminate(shutdown, _)
@@ -455,9 +360,9 @@ i(SvcName, true) ->
i(_, false) ->
{stop, {shutdown, already_started}}.
-%%% ---------------------------------------------------------------------------
-%%% # handle_call(Req, From, State)
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # handle_call/3
+%% ---------------------------------------------------------------------------
handle_call(state, _, S) ->
{reply, S, S};
@@ -493,17 +398,17 @@ handle_call(Req, From, S) ->
unexpected(handle_call, [Req, From], S),
{reply, nok, S}.
-%%% ---------------------------------------------------------------------------
-%%% # handle_cast(Req, State)
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # handle_cast/2
+%% ---------------------------------------------------------------------------
handle_cast(Req, S) ->
unexpected(handle_cast, [Req], S),
{noreply, S}.
-%%% ---------------------------------------------------------------------------
-%%% # handle_info(Req, State)
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # handle_info/2
+%% ---------------------------------------------------------------------------
handle_info(T, #state{} = S) ->
case transition(T,S) of
@@ -520,67 +425,39 @@ transition({accepted, Pid, TPid}, S) ->
accepted(Pid, TPid, S),
ok;
-%% Peer process has a new open connection.
-transition({connection_up, Pid, T}, S) ->
- connection_up(Pid, T, S),
- ok;
-
-%% Watchdog has a new connection that will be opened after DW[RA]
-%% exchange. This message was added long after connection_up, to
-%% communicate the information as soon as it's available. Leave
-%% connection_up as is it for now, duplicated information and all.
-transition({reopen, Pid, T}, S) ->
- reopen(Pid, T, S),
- ok;
-
-%% Watchdog has left state OKAY.
-transition({connection_down, Pid}, S) ->
- connection_down(Pid, S),
- ok;
-
-%% Watchdog has returned to state OKAY.
-transition({connection_up, Pid}, S) ->
- connection_up(Pid, S),
- ok;
-
-%% Accepting transport has lost connectivity.
-transition({close, Pid}, S) ->
- close(Pid, S),
- ok;
-
%% Connecting transport is being restarted by watchdog.
transition({reconnect, Pid}, S) ->
reconnect(Pid, S),
ok;
-%% Watchdog is sending notification of a state transition. Note that
-%% the connection_up/down messages pre-date this message and are still
-%% used. A watchdog message will follow these and communicate the same
-%% state as was set in handling connection_up/down.
-transition({watchdog, Pid, {TPid, From, To}}, #state{service_name = SvcName,
- peerT = PeerT}) ->
- #peer{ref = Ref, type = T, options = Opts, op_state = {OS,_}}
- = P
- = fetch(PeerT, Pid),
- insert(PeerT, P#peer{op_state = {OS, To}}),
+%% Watchdog is sending notification of transport death.
+transition({close, Pid, Reason}, #state{service_name = SvcName,
+ watchdogT = WatchdogT}) ->
+ #watchdog{state = WS,
+ ref = Ref,
+ type = Type,
+ options = Opts}
+ = fetch(WatchdogT, Pid),
+ WS /= ?WD_OKAY
+ andalso
+ send_event(SvcName, {closed, Ref, Reason, {type(Type), Opts}}),
+ ok;
+
+%% Watchdog is sending notification of a state transition.
+transition({watchdog, Pid, {[TPid | Data], From, To}},
+ #state{service_name = SvcName,
+ watchdogT = WatchdogT}
+ = S) ->
+ #watchdog{ref = Ref, type = T, options = Opts}
+ = Wd
+ = fetch(WatchdogT, Pid),
+ watchdog(TPid, Data, From, To, Wd, S),
send_event(SvcName, {watchdog, Ref, TPid, {From, To}, {T, Opts}}),
ok;
-%% Death of a watchdog process (#peer.pid) results in the removal of
-%% it's peer and any associated conn record when 'DOWN' is received
-%% (after this) but the states will be {?STATE_UP, ?WD_DOWN} for a
-%% short time. (No real problem since ?WD_* is only used in
-%% service_info.) We set ?WD_OKAY as a consequence of connection_up
-%% since we know a watchdog is coming. We can't set anything at
-%% connection_down since we don't know if the subsequent watchdog
-%% message will be ?WD_DOWN or ?WD_SUSPECT. We don't (yet) set
-%% ?STATE_* as a consequence of a watchdog message since this requires
-%% changing some of the matching on ?STATE_*.
-%%
-%% Death of a peer process process (#conn.pid, #peer.conn) results in
-%% connection_down followed by watchdog ?WD_DOWN. The latter doesn't
-%% result in the conn record being deleted since 'DOWN' from death of
-%% its watchdog doesn't (yet) deal with the record having been
-%% removed.
+%% Death of a watchdog process (#watchdog.pid) results in the removal of
+%% it's peer and any associated conn record when 'DOWN' is received.
+%% Death of a peer process process (#peer.pid, #watchdog.peer) results in
+%% ?WD_DOWN.
%% Monitor process has died. Just die with a reason that tells
%% diameter_config about the happening. If a cleaner shutdown is
@@ -589,9 +466,9 @@ transition({'DOWN', MRef, process, _, Reason}, #state{monitor = MRef}) ->
{stop, {monitor, Reason}};
%% Local watchdog process has died.
-transition({'DOWN', _, process, Pid, Reason}, S)
+transition({'DOWN', _, process, Pid, _Reason}, S)
when node(Pid) == node() ->
- peer_down(Pid, Reason, S),
+ watchdog_down(Pid, S),
ok;
%% Remote service wants to know about shared peers.
@@ -614,20 +491,13 @@ transition({tc_timeout, T}, S) ->
tc_timeout(T, S),
ok;
-%% Request process is telling us it may have missed a failover message
-%% after a transport went down and the service process looked up
-%% outstanding requests.
-transition({failover, TRef, Seqs}, S) ->
- failover(TRef, Seqs, S),
- ok;
-
transition(Req, S) ->
unexpected(handle_info, [Req], S),
ok.
-%%% ---------------------------------------------------------------------------
-%%% # terminate(Reason, State)
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # terminate/2
+%% ---------------------------------------------------------------------------
terminate(Reason, #state{service_name = Name} = S) ->
send_event(Name, stop),
@@ -635,9 +505,9 @@ terminate(Reason, #state{service_name = Name} = S) ->
shutdown == Reason %% application shutdown
andalso shutdown(application, S).
-%%% ---------------------------------------------------------------------------
-%%% # code_change(FromVsn, State, Extra)
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # code_change/3
+%% ---------------------------------------------------------------------------
code_change(FromVsn,
#state{service_name = SvcName,
@@ -663,30 +533,20 @@ code_change(FromVsn, SvcName, Extra, #diameter_app{alias = Alias} = A) ->
unexpected(F, A, #state{service_name = Name}) ->
?UNEXPECTED(F, A ++ [Name]).
-cb([_|_] = M, F, A) ->
- eval(M, F, A);
-cb(Rec, F, A) ->
- {_, M} = app(Rec),
+cb(#diameter_app{module = [_|_] = M}, F, A) ->
eval(M, F, A).
-app(#request{app = A, module = M}) ->
- {A,M};
-app(#diameter_app{alias = A, module = M}) ->
- {A,M}.
-
eval([M|X], F, A) ->
apply(M, F, A ++ X).
%% Callback with state.
-state_cb(#diameter_app{mutable = false, init_state = S}, {ModX, F, A}) ->
+state_cb(#diameter_app{module = ModX, mutable = false, init_state = S},
+ pick_peer = F,
+ A) ->
eval(ModX, F, A ++ [S]);
-state_cb(#diameter_app{mutable = true, alias = Alias}, {_,_,_} = MFA) ->
- state_cb(MFA, Alias);
-
-state_cb({ModX,F,A}, Alias)
- when is_list(ModX) ->
+state_cb(#diameter_app{module = ModX, alias = Alias}, F, A) ->
eval(ModX, F, A ++ [mod_state(Alias)]).
choose(true, X, _) -> X;
@@ -712,57 +572,38 @@ mod_state(Alias) ->
mod_state(Alias, ModS) ->
put({?MODULE, mod_state, Alias}, ModS).
-%%% ---------------------------------------------------------------------------
-%%% # shutdown/2
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # shutdown/2
+%% ---------------------------------------------------------------------------
-%% remove_transport: ask watchdogs to terminate their transport.
-shutdown(Refs, #state{peerT = PeerT})
+%% remove_transport
+shutdown(Refs, #state{watchdogT = WatchdogT})
when is_list(Refs) ->
- ets:foldl(fun(P,ok) -> sp(P, Refs), ok end, ok, PeerT);
-
-%% application/service shutdown: ask transports to terminate themselves.
-shutdown(Reason, #state{peerT = PeerT}) ->
- %% A transport might not be alive to receive the shutdown request
- %% but give those that are a chance to shutdown gracefully.
- shutdown(conn, Reason, PeerT),
- %% Kill the watchdogs explicitly in case there was no transport.
- shutdown(peer, Reason, PeerT).
-
-%% sp/2
-
-sp(#peer{ref = Ref, pid = Pid}, Refs) ->
- lists:member(Ref, Refs)
- andalso (Pid ! {shutdown, self()}). %% 'DOWN' cleans up
+ ets:foldl(fun(P,ok) -> st(P, Refs), ok end, ok, WatchdogT);
-%% shutdown/3
-
-shutdown(Who, Reason, T) ->
- diameter_lib:wait(ets:foldl(fun(X,A) -> shutdown(Who, X, Reason, A) end,
+%% application/service shutdown
+shutdown(Reason, #state{watchdogT = WatchdogT})
+ when Reason == application;
+ Reason == service ->
+ diameter_lib:wait(ets:foldl(fun(P,A) -> st(P, Reason, A) end,
[],
- T)).
+ WatchdogT)).
-shutdown(conn = Who, #peer{op_state = {OS,_}} = P, Reason, Acc) ->
- shutdown(Who, P#peer{op_state = OS}, Reason, Acc);
+%% st/2
-shutdown(conn,
- #peer{pid = Pid, op_state = ?STATE_UP, conn = TPid},
- Reason,
- Acc) ->
- TPid ! {shutdown, Pid, Reason},
- [TPid | Acc];
+st(#watchdog{ref = Ref, pid = Pid}, Refs) ->
+ lists:member(Ref, Refs)
+ andalso (Pid ! {shutdown, self(), transport}). %% 'DOWN' cleans up
-shutdown(peer, #peer{pid = Pid}, _Reason, Acc)
- when is_pid(Pid) ->
- exit(Pid, shutdown),
- [Pid | Acc];
+%% st/3
-shutdown(_, #peer{}, _, Acc) ->
- Acc.
+st(#watchdog{pid = Pid}, Reason, Acc) ->
+ Pid ! {shutdown, self(), Reason},
+ [Pid | Acc].
-%%% ---------------------------------------------------------------------------
-%%% # call_service/2
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # call_service/2
+%% ---------------------------------------------------------------------------
call_service(Pid, Req)
when is_pid(Pid) ->
@@ -785,11 +626,9 @@ cs(Pid, Req)
cs(undefined, _) ->
{error, no_service}.
-%%% ---------------------------------------------------------------------------
-%%% # i/1
-%%%
-%%% Output: #state{}
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # i/1
+%% ---------------------------------------------------------------------------
%% Intialize the state of a service gen_server.
@@ -859,9 +698,9 @@ get_value(Key, Vs) ->
{_, V} = lists:keyfind(Key, 1, Vs),
V.
-%%% ---------------------------------------------------------------------------
-%%% # start/3
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # start/3
+%% ---------------------------------------------------------------------------
%% If the initial start/3 at service/transport start succeeds then
%% subsequent calls to start/4 on the same service will also succeed
@@ -891,22 +730,28 @@ type(connect = T) -> T.
%% start/4
-start(Ref, Type, Opts, #state{peerT = PeerT,
- connT = ConnT,
+start(Ref, Type, Opts, #state{watchdogT = WatchdogT,
+ peerT = PeerT,
options = SvcOpts,
service_name = SvcName,
- service = Svc})
+ service = Svc0})
when Type == connect;
Type == accept ->
- Pid = s(Type, Ref, {ConnT,
+ #diameter_service{applications = Apps}
+ = Svc
+ = merge_service(Opts, Svc0),
+ {_,_} = Mask = proplists:get_value(sequence, SvcOpts),
+ Pid = s(Type, Ref, {diameter_traffic:make_recvdata([SvcName,
+ PeerT,
+ Apps,
+ Mask]),
Opts,
- SvcName,
SvcOpts,
- merge_service(Opts, Svc)}),
- insert(PeerT, #peer{pid = Pid,
- type = Type,
- ref = Ref,
- options = Opts}),
+ Svc}),
+ insert(WatchdogT, #watchdog{pid = Pid,
+ type = Type,
+ ref = Ref,
+ options = Opts}),
Pid.
%% Note that the service record passed into the watchdog is the merged
@@ -949,100 +794,115 @@ ms({capabilities, Opts}, #diameter_service{capabilities = Caps0} = Svc)
ms(_, Svc) ->
Svc.
-%%% ---------------------------------------------------------------------------
-%%% # accepted/3
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # accepted/3
+%% ---------------------------------------------------------------------------
-accepted(Pid, _TPid, #state{peerT = PeerT} = S) ->
- #peer{ref = Ref, type = accept = T, conn = false, options = Opts}
- = P
- = fetch(PeerT, Pid),
- insert(PeerT, P#peer{conn = true}), %% mark replacement as started
- start(Ref, T, Opts, S). %% start new watchdog
+accepted(Pid, _TPid, #state{watchdogT = WatchdogT} = S) ->
+ #watchdog{ref = Ref, type = accept = T, peer = false, options = Opts}
+ = Wd
+ = fetch(WatchdogT, Pid),
+ insert(WatchdogT, Wd#watchdog{peer = true}),%% mark replacement as started
+ start(Ref, T, Opts, S). %% start new watchdog
fetch(Tid, Key) ->
[T] = ets:lookup(Tid, Key),
- case T of
- #peer{op_state = ?STATE_UP} = P ->
- P#peer{op_state = {?STATE_UP, ?WD_OKAY}};
- #peer{op_state = ?STATE_DOWN} = P ->
- P#peer{op_state = {?STATE_DOWN, ?WD_DOWN}};
- _ ->
- T
- end.
+ T.
-%%% ---------------------------------------------------------------------------
-%%% # connection_up/3
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # watchdog/6
+%%
+%% React to a watchdog state transition.
+%% ---------------------------------------------------------------------------
-%% Watchdog process has reached state OKAY.
+%% Watchdog has a new open connection.
+watchdog(TPid, [T], _, ?WD_OKAY, Wd, State) ->
+ connection_up({TPid, T}, Wd, State);
-connection_up(Pid, {TPid, {Caps, SApps, Pkt}}, #state{peerT = PeerT,
- connT = ConnT}
- = S) ->
- P = fetch(PeerT, Pid),
- C = #conn{pid = TPid,
- apps = SApps,
- caps = Caps,
- peer = Pid},
-
- insert(ConnT, C),
- connection_up([Pkt], P#peer{conn = TPid}, C, S).
-
-%%% ---------------------------------------------------------------------------
-%%% # reopen/3
-%%% ---------------------------------------------------------------------------
-
-%% Note that this connection_up/3 rewrites the same #conn{} now
-%% written here. Both do so in case reopen has not happened in old
-%% code.
-
-reopen(Pid, {TPid, {Caps, SApps, _Pkt}}, #state{peerT = PeerT,
- connT = ConnT}) ->
- P = fetch(PeerT, Pid),
- C = #conn{pid = TPid,
- apps = SApps,
- caps = Caps,
- peer = Pid},
-
- insert(ConnT, C),
- #peer{op_state = {?STATE_DOWN, _}}
- = P,
- insert(PeerT, P#peer{op_state = {?STATE_DOWN, ?WD_REOPEN},
- conn = TPid}).
-
-%%% ---------------------------------------------------------------------------
-%%% # connection_up/2
-%%% ---------------------------------------------------------------------------
-
-%% Peer process has transitioned back into the open state. Note that there
-%% has been no new capabilties exchange in this case.
-
-connection_up(Pid, #state{peerT = PeerT,
- connT = ConnT}
- = S) ->
- #peer{conn = TPid} = P = fetch(PeerT, Pid),
- C = fetch(ConnT, TPid),
- connection_up([], P, C, S).
+%% Watchdog has a new connection that will be opened after DW[RA]
+%% exchange.
+watchdog(TPid, [T], _, ?WD_REOPEN, Wd, State) ->
+ reopen({TPid, T}, Wd, State);
+
+%% Watchdog has recovered a suspect connection.
+watchdog(TPid, [], ?WD_SUSPECT, ?WD_OKAY, Wd, State) ->
+ #watchdog{peer = TPid} = Wd, %% assert
+ connection_up(Wd, State);
+
+%% Watchdog has an unresponsive connection.
+watchdog(TPid, [], ?WD_OKAY, ?WD_SUSPECT = To, Wd, State) ->
+ #watchdog{peer = TPid} = Wd, %% assert
+ connection_down(Wd, To, State);
+
+%% Watchdog has lost its connection.
+watchdog(TPid, [], _, ?WD_DOWN = To, Wd, #state{peerT = PeerT} = S) ->
+ close(Wd, S),
+ connection_down(Wd, To, S),
+ ets:delete(PeerT, TPid);
+
+watchdog(_, [], _, _, _, _) ->
+ ok.
-%% connection_up/4
+%% ---------------------------------------------------------------------------
+%% # connection_up/3
+%% ---------------------------------------------------------------------------
-connection_up(T, P, C, #state{peerT = PeerT,
- local_peers = LDict,
- service_name = SvcName,
- service
- = #diameter_service{applications = Apps}}
- = S) ->
- #peer{conn = TPid, op_state = {?STATE_DOWN, _}}
- = P,
- #conn{apps = SApps, caps = Caps}
- = C,
+%% Watchdog process has reached state OKAY.
+
+connection_up({TPid, {Caps, SupportedApps, Pkt}},
+ #watchdog{pid = Pid}
+ = Wd,
+ #state{peerT = PeerT}
+ = S) ->
+ Pr = #peer{pid = TPid,
+ apps = SupportedApps,
+ caps = Caps,
+ watchdog = Pid},
+ insert(PeerT, Pr),
+ connection_up([Pkt], Wd#watchdog{peer = TPid}, Pr, S).
+
+%% ---------------------------------------------------------------------------
+%% # reopen/3
+%% ---------------------------------------------------------------------------
+
+reopen({TPid, {Caps, SupportedApps, _Pkt}},
+ #watchdog{pid = Pid}
+ = Wd,
+ #state{watchdogT = WatchdogT,
+ peerT = PeerT}) ->
+ insert(PeerT, #peer{pid = TPid,
+ apps = SupportedApps,
+ caps = Caps,
+ watchdog = Pid}),
+ insert(WatchdogT, Wd#watchdog{state = ?WD_REOPEN,
+ peer = TPid}).
+
+%% ---------------------------------------------------------------------------
+%% # connection_up/2
+%% ---------------------------------------------------------------------------
+
+%% Watchdog has recovered as suspect connection. Note that there has
+%% been no new capabilties exchange in this case.
+
+connection_up(#watchdog{peer = TPid} = Wd, #state{peerT = PeerT} = S) ->
+ connection_up([], Wd, fetch(PeerT, TPid), S).
- insert(PeerT, P#peer{op_state = {?STATE_UP, ?WD_OKAY}}),
+%% connection_up/4
- request_peer_up(TPid),
+connection_up(Extra,
+ #watchdog{peer = TPid}
+ = Wd,
+ #peer{apps = SApps, caps = Caps}
+ = Pr,
+ #state{watchdogT = WatchdogT,
+ local_peers = LDict,
+ service_name = SvcName,
+ service = #diameter_service{applications = Apps}}
+ = S) ->
+ insert(WatchdogT, Wd#watchdog{state = ?WD_OKAY}),
+ diameter_traffic:peer_up(TPid),
insert_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict),
- report_status(up, P, C, S, T).
+ report_status(up, Wd, Pr, S, Extra).
insert_local_peer(SApps, T, LDict) ->
lists:foldl(fun(A,D) -> ilp(A, T, D) end, LDict, SApps).
@@ -1052,13 +912,57 @@ ilp({Id, Alias}, {TC, SA}, LDict) ->
?Dict:append(Alias, TC, LDict).
init_conn(Id, Alias, {TPid, _} = TC, {SvcName, Apps}) ->
- #diameter_app{module = ModX,
- id = Id} %% assert
+ #diameter_app{id = Id} %% assert
+ = App
= find_app(Alias, Apps),
- peer_cb({ModX, peer_up, [SvcName, TC]}, Alias)
+ peer_cb(App, peer_up, [SvcName, TC])
orelse exit(TPid, kill). %% fake transport failure
+%% ---------------------------------------------------------------------------
+%% # find_incoming_app/3
+%% ---------------------------------------------------------------------------
+
+%% No one should be sending the relay identifier.
+find_incoming_app(#peer{caps = Caps}, ?APP_ID_RELAY, _) ->
+ Caps;
+
+find_incoming_app(Peer, Id, Apps)
+ when is_integer(Id) ->
+ find_incoming_app(Peer, [Id, ?APP_ID_RELAY], Apps);
+
+%% Note that the apps represented in SApps may be a strict subset of
+%% those in Apps.
+find_incoming_app(#peer{apps = SApps, caps = Caps}, Ids, Apps) ->
+ case keyfind(Ids, 1, SApps) of
+ {_Id, Alias} ->
+ {#diameter_app{} = find_app(Alias, Apps), Caps};
+ false ->
+ Caps
+ end.
+
+%% keyfind/3
+
+keyfind([], _, _) ->
+ false;
+keyfind([Key | Rest], Pos, L) ->
+ case lists:keyfind(Key, Pos, L) of
+ false ->
+ keyfind(Rest, Pos, L);
+ T ->
+ T
+ end.
+
+%% find_outgoing_app/2
+
+find_outgoing_app(Alias, Apps) ->
+ case find_app(Alias, Apps) of
+ #diameter_app{id = ?APP_ID_RELAY} ->
+ false;
+ A ->
+ A
+ end.
+
%% find_app/2
find_app(Alias, Apps) ->
@@ -1066,53 +970,51 @@ find_app(Alias, Apps) ->
%% Don't bring down the service (and all associated connections)
%% regardless of what happens.
-peer_cb(MFA, Alias) ->
- try state_cb(MFA, Alias) of
+peer_cb(App, F, A) ->
+ try state_cb(App, F, A) of
ModS ->
- mod_state(Alias, ModS),
+ mod_state(App#diameter_app.alias, ModS),
true
catch
E:R ->
- diameter_lib:error_report({failure, {E, R, Alias, ?STACK}}, MFA),
+ diameter_lib:error_report({failure, {E, R, ?STACK}},
+ {App, F, A}),
false
end.
-%%% ---------------------------------------------------------------------------
-%%% # connection_down/2
-%%% ---------------------------------------------------------------------------
-
-%% Watchdog has transitioned out of state OKAY.
-
-connection_down(Pid, #state{peerT = PeerT,
- connT = ConnT}
- = S) ->
- #peer{op_state = {?STATE_UP, WS}, %% assert
- conn = TPid}
- = P
- = fetch(PeerT, Pid),
+%% ---------------------------------------------------------------------------
+%% # connection_down/3
+%% ---------------------------------------------------------------------------
- C = fetch(ConnT, TPid),
- insert(PeerT, P#peer{op_state = {?STATE_DOWN, WS}}),
- connection_down(P,C,S).
-
-%% connection_down/3
-
-connection_down(#peer{op_state = {?STATE_DOWN, _}}, _, _) ->
- ok;
-
-connection_down(#peer{conn = TPid,
- op_state = {?STATE_UP, _}}
- = P,
- #conn{caps = Caps,
+connection_down(#watchdog{state = ?WD_OKAY,
+ peer = TPid}
+ = Wd,
+ #peer{caps = Caps,
apps = SApps}
- = C,
+ = Pr,
#state{service_name = SvcName,
service = #diameter_service{applications = Apps},
local_peers = LDict}
= S) ->
- report_status(down, P, C, S, []),
+ report_status(down, Wd, Pr, S, []),
remove_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict),
- request_peer_down(TPid, S).
+ diameter_traffic:peer_down(TPid);
+
+connection_down(#watchdog{}, #peer{}, _) ->
+ ok;
+
+connection_down(#watchdog{state = WS,
+ peer = TPid}
+ = Wd,
+ To,
+ #state{watchdogT = WatchdogT,
+ peerT = PeerT}
+ = S)
+ when is_atom(To) ->
+ insert(WatchdogT, Wd#watchdog{state = To}),
+ ?WD_OKAY == WS
+ andalso
+ connection_down(Wd, fetch(PeerT, TPid), S).
remove_local_peer(SApps, T, LDict) ->
lists:foldl(fun(A,D) -> rlp(A, T, D) end, LDict, SApps).
@@ -1123,71 +1025,58 @@ rlp({Id, Alias}, {TC, SA}, LDict) ->
?Dict:store(Alias, lists:delete(TC, L), LDict).
down_conn(Id, Alias, TC, {SvcName, Apps}) ->
- #diameter_app{module = ModX,
- id = Id} %% assert
+ #diameter_app{id = Id} %% assert
+ = App
= find_app(Alias, Apps),
- peer_cb({ModX, peer_down, [SvcName, TC]}, Alias).
+ peer_cb(App, peer_down, [SvcName, TC]).
-%%% ---------------------------------------------------------------------------
-%%% # peer_down/3
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # watchdog_down/2
+%% ---------------------------------------------------------------------------
%% Watchdog process has died.
-peer_down(Pid, Reason, #state{peerT = PeerT} = S) ->
- P = fetch(PeerT, Pid),
- ets:delete_object(PeerT, P),
- closed(Reason, P, S),
- restart(P,S),
- peer_down(P,S).
-
-%% Send an event at connection establishment failure.
-closed({shutdown, {close, _TPid, Reason}},
- #peer{op_state = {?STATE_DOWN, _},
- ref = Ref,
- type = Type,
- options = Opts},
- #state{service_name = SvcName}) ->
- send_event(SvcName, {closed, Ref, Reason, {type(Type), Opts}});
-closed(_, _, _) ->
- ok.
+watchdog_down(Pid, #state{watchdogT = WatchdogT} = S) ->
+ Wd = fetch(WatchdogT, Pid),
+ ets:delete_object(WatchdogT, Wd),
+ restart(Wd,S),
+ wd_down(Wd,S).
-%% The watchdog has never reached OKAY ...
-peer_down(#peer{conn = B}, _)
+%% Watchdog has never reached OKAY ...
+wd_down(#watchdog{peer = B}, _)
when is_boolean(B) ->
ok;
%% ... or maybe it has.
-peer_down(#peer{conn = TPid} = P, #state{connT = ConnT} = S) ->
- #conn{} = C = fetch(ConnT, TPid),
- ets:delete_object(ConnT, C),
- connection_down(P,C,S).
+wd_down(#watchdog{peer = TPid} = Wd, #state{peerT = PeerT} = S) ->
+ connection_down(Wd, ?WD_DOWN, S),
+ ets:delete(PeerT, TPid).
%% restart/2
-restart(P,S) ->
- q_restart(restart(P), S).
+restart(Wd, S) ->
+ q_restart(restart(Wd), S).
%% restart/1
%% Always try to reconnect.
-restart(#peer{ref = Ref,
- type = connect = T,
- options = Opts,
- started = Time}) ->
+restart(#watchdog{ref = Ref,
+ type = connect = T,
+ options = Opts,
+ started = Time}) ->
{Time, {Ref, T, Opts}};
%% Transport connection hasn't yet been accepted ...
-restart(#peer{ref = Ref,
- type = accept = T,
- options = Opts,
- conn = false,
- started = Time}) ->
+restart(#watchdog{ref = Ref,
+ type = accept = T,
+ options = Opts,
+ peer = false,
+ started = Time}) ->
{Time, {Ref, T, Opts}};
%% ... or it has: a replacement has already been spawned.
-restart(#peer{type = accept}) ->
+restart(#watchdog{type = accept}) ->
false.
%% q_restart/2
@@ -1237,9 +1126,9 @@ tc(true, {Ref, Type, Opts}, #state{service_name = SvcName}
tc(false = No, _, _) -> %% removed
No.
-%%% ---------------------------------------------------------------------------
-%%% # close/2
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # close/2
+%% ---------------------------------------------------------------------------
%% The watchdog doesn't start a new fsm in the accept case, it
%% simply stays alive until someone tells it to die in order for
@@ -1248,14 +1137,13 @@ tc(false = No, _, _) -> %% removed
%% the accepting watchdog upon reception of a CER from the previously
%% connected peer, or us after reconnect_timer timeout.
-close(Pid, #state{service_name = SvcName,
- peerT = PeerT}) ->
- #peer{pid = Pid,
- type = accept,
- ref = Ref,
- options = Opts}
- = fetch(PeerT, Pid),
-
+close(#watchdog{type = connect}, _) ->
+ ok;
+close(#watchdog{type = accept,
+ pid = Pid,
+ ref = Ref,
+ options = Opts},
+ #state{service_name = SvcName}) ->
c(Pid, diameter_config:have_transport(SvcName, Ref), Opts).
%% Tell watchdog to (maybe) die later ...
@@ -1273,21 +1161,21 @@ c(Pid, false, _Opts) ->
%% which a new connection attempt is expected of a connecting peer.
%% The value should be greater than the peer's Tc + jitter.
-%%% ---------------------------------------------------------------------------
-%%% # reconnect/2
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # reconnect/2
+%% ---------------------------------------------------------------------------
reconnect(Pid, #state{service_name = SvcName,
- peerT = PeerT}) ->
- #peer{ref = Ref,
- type = connect,
- options = Opts}
- = fetch(PeerT, Pid),
+ watchdogT = WatchdogT}) ->
+ #watchdog{ref = Ref,
+ type = connect,
+ options = Opts}
+ = fetch(WatchdogT, Pid),
send_event(SvcName, {reconnect, Ref, Opts}).
-%%% ---------------------------------------------------------------------------
-%%% # call_module/4
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # call_module/4
+%% ---------------------------------------------------------------------------
%% Backwards compatibility and never documented/advertised. May be
%% removed.
@@ -1309,10 +1197,10 @@ call_module(Mod, Req, From, #state{service
{reply, {error, Reason}, S}
end.
-cm([#diameter_app{module = ModX, alias = Alias}], Req, From, Svc) ->
- MFA = {ModX, handle_call, [Req, From, Svc]},
+cm([#diameter_app{alias = Alias} = App], Req, From, Svc) ->
+ Args = [Req, From, Svc],
- try state_cb(MFA, Alias) of
+ try state_cb(App, handle_call, Args) of
{noreply = T, ModS} ->
mod_state(Alias, ModS),
T;
@@ -1320,11 +1208,13 @@ cm([#diameter_app{module = ModX, alias = Alias}], Req, From, Svc) ->
mod_state(Alias, ModS),
{T, RC};
T ->
- diameter_lib:error_report({invalid, T}, MFA),
+ diameter_lib:error_report({invalid, T},
+ {App, handle_call, Args}),
invalid
catch
E: Reason ->
- diameter_lib:error_report({failure, {E, Reason, ?STACK}}, MFA),
+ diameter_lib:error_report({failure, {E, Reason, ?STACK}},
+ {App, handle_call, Args}),
failure
end;
@@ -1334,1270 +1224,16 @@ cm([], _, _, _) ->
cm([_,_|_], _, _, _) ->
multiple.
-%%% ---------------------------------------------------------------------------
-%%% # send_request/6
-%%% ---------------------------------------------------------------------------
-
-%% Send an outgoing request in its dedicated process.
-%%
-%% Note that both encode of the outgoing request and of the received
-%% answer happens in this process. It's also this process that replies
-%% to the caller. The service process only handles the state-retaining
-%% callbacks.
-%%
-%% The mod field of the #diameter_app{} here includes any extra
-%% arguments passed to diameter:call/2.
-
-send_request({TPid, Caps, App} = T, Mask, Msg, Opts, Caller, SvcName) ->
- #diameter_app{module = ModX}
- = App,
-
- Pkt = make_prepare_packet(Mask, Msg),
-
- send_req(cb(ModX, prepare_request, [Pkt, SvcName, {TPid, Caps}]),
- Pkt,
- T,
- Opts,
- Caller,
- SvcName,
- []).
-
-send_req({send, P}, Pkt, T, Opts, Caller, SvcName, Fs) ->
- send_req(make_request_packet(P, Pkt), T, Opts, Caller, SvcName, Fs);
-
-send_req({discard, Reason} , _, _, _, _, _, _) ->
- {error, Reason};
-
-send_req(discard, _, _, _, _, _, _) ->
- {error, discarded};
-
-send_req({eval_packet, RC, F}, Pkt, T, Opts, Caller, SvcName, Fs) ->
- send_req(RC, Pkt, T, Opts, Caller, SvcName, [F|Fs]);
-
-send_req(E, _, {_, _, App}, _, _, _, _) ->
- ?ERROR({invalid_return, prepare_request, App, E}).
-
-%% make_prepare_packet/2
-%%
-%% Turn an outgoing request as passed to call/4 into a diameter_packet
-%% record in preparation for a prepare_request callback.
-
-make_prepare_packet(_, Bin)
- when is_binary(Bin) ->
- #diameter_packet{header = diameter_codec:decode_header(Bin),
- bin = Bin};
-
-make_prepare_packet(Mask, #diameter_packet{msg = [#diameter_header{} = Hdr
- | Avps]}
- = Pkt) ->
- Pkt#diameter_packet{msg = [make_prepare_header(Mask, Hdr) | Avps]};
-
-make_prepare_packet(Mask, #diameter_packet{header = Hdr} = Pkt) ->
- Pkt#diameter_packet{header = make_prepare_header(Mask, Hdr)};
-
-make_prepare_packet(Mask, Msg) ->
- make_prepare_packet(Mask, #diameter_packet{msg = Msg}).
-
-%% make_prepare_header/2
-
-make_prepare_header(Mask, undefined) ->
- Seq = diameter_session:sequence(Mask),
- make_prepare_header(#diameter_header{end_to_end_id = Seq,
- hop_by_hop_id = Seq});
-
-make_prepare_header(Mask, #diameter_header{end_to_end_id = undefined,
- hop_by_hop_id = undefined}
- = H) ->
- Seq = diameter_session:sequence(Mask),
- make_prepare_header(H#diameter_header{end_to_end_id = Seq,
- hop_by_hop_id = Seq});
-
-make_prepare_header(Mask, #diameter_header{end_to_end_id = undefined} = H) ->
- Seq = diameter_session:sequence(Mask),
- make_prepare_header(H#diameter_header{end_to_end_id = Seq});
-
-make_prepare_header(Mask, #diameter_header{hop_by_hop_id = undefined} = H) ->
- Seq = diameter_session:sequence(Mask),
- make_prepare_header(H#diameter_header{hop_by_hop_id = Seq});
-
-make_prepare_header(_, Hdr) ->
- make_prepare_header(Hdr).
-
-%% make_prepare_header/1
-
-make_prepare_header(#diameter_header{version = undefined} = Hdr) ->
- make_prepare_header(Hdr#diameter_header{version = ?DIAMETER_VERSION});
-
-make_prepare_header(#diameter_header{} = Hdr) ->
- Hdr;
-
-make_prepare_header(T) ->
- ?ERROR({invalid_header, T}).
-
-%% make_request_packet/2
-%%
-%% Reconstruct a diameter_packet from the return value of
-%% prepare_request or prepare_retransmit callback.
-
-make_request_packet(Bin, _)
- when is_binary(Bin) ->
- make_prepare_packet(false, Bin);
-
-make_request_packet(#diameter_packet{msg = [#diameter_header{} | _]}
- = Pkt,
- _) ->
- Pkt;
-
-%% Returning a diameter_packet with no header from a prepare_request
-%% or prepare_retransmit callback retains the header passed into it.
-%% This is primarily so that the end to end and hop by hop identifiers
-%% are retained.
-make_request_packet(#diameter_packet{header = Hdr} = Pkt,
- #diameter_packet{header = Hdr0}) ->
- Pkt#diameter_packet{header = fold_record(Hdr0, Hdr)};
-
-make_request_packet(Msg, Pkt) ->
- Pkt#diameter_packet{msg = Msg}.
-
-%% fold_record/2
-
-fold_record(undefined, R) ->
- R;
-fold_record(Rec, R) ->
- diameter_lib:fold_tuple(2, Rec, R).
-
-%% send_req/6
-
-send_req(Pkt, {TPid, Caps, App}, Opts, Caller, SvcName, Fs) ->
- #diameter_app{alias = Alias,
- dictionary = Dict,
- module = ModX,
- options = [{answer_errors, AE} | _]}
- = App,
-
- EPkt = encode(Dict, Pkt, Fs),
-
- #options{filter = Filter,
- timeout = Timeout}
- = Opts,
-
- Req = #request{packet = Pkt,
- from = Caller,
- handler = self(),
- transport = TPid,
- caps = Caps,
- app = Alias,
- filter = Filter,
- dictionary = Dict,
- module = ModX},
-
- try
- TRef = send_request(TPid, EPkt, Req, Timeout),
- ack(Caller),
- handle_answer(SvcName, AE, recv_answer(Timeout, SvcName, {TRef, Req}))
- after
- erase_request(EPkt)
- end.
-
-%% Tell caller a send has been attempted.
-ack({Pid, Ref}) ->
- Pid ! Ref.
-
-%% recv_answer/3
-
-recv_answer(Timeout,
- SvcName,
- {TRef, #request{from = {_, Ref}, packet = RPkt} = Req}
- = T) ->
-
- %% Matching on TRef below ensures we ignore messages that pertain
- %% to a previous transport prior to failover. The answer message
- %% includes the #request{} since it's not necessarily Req; that
- %% is, from the last peer to which we've transmitted.
-
- receive
- {answer = A, Ref, Rq, Pkt} -> %% Answer from peer
- {A, Rq, Pkt};
- {timeout = Reason, TRef, _} -> %% No timely reply
- {error, Req, Reason};
- {failover = Reason, TRef, false} -> %% No alternate peer
- {error, Req, Reason};
- {failover, TRef, Transport} -> %% Resend to alternate peer
- try_retransmit(Timeout, SvcName, Req, Transport);
- {failover, TRef} -> %% May have missed failover notification
- Seqs = diameter_codec:sequence_numbers(RPkt),
- Pid = whois(SvcName),
- is_pid(Pid) andalso (Pid ! {failover, TRef, Seqs}),
- recv_answer(Timeout, SvcName, T)
- end.
-%% Note that failover starts a new timer and that expiry of an old
-%% timer value is ignored. This means that an answer could be accepted
-%% from a peer after timeout in the case of failover.
-
-try_retransmit(Timeout, SvcName, Req, Transport) ->
- try retransmit(Transport, Req, SvcName, Timeout) of
- T -> recv_answer(Timeout, SvcName, T)
- catch
- ?FAILURE(Reason) -> {error, Req, Reason}
- end.
-
-%% handle_error/3
-
-handle_error(Req, Reason, SvcName) ->
- #request{module = ModX,
- packet = Pkt,
- transport = TPid,
- caps = Caps}
- = Req,
- cb(ModX, handle_error, [Reason, msg(Pkt), SvcName, {TPid, Caps}]).
-
-msg(#diameter_packet{msg = undefined, bin = Bin}) ->
- Bin;
-msg(#diameter_packet{msg = Msg}) ->
- Msg.
-
-%% encode/3
-
-encode(Dict, Pkt, Fs) ->
- P = encode(Dict, Pkt),
- eval_packet(P, Fs),
- P.
-
-%% encode/2
-
-%% Note that prepare_request can return a diameter_packet containing
-%% header or transport_data. Even allow the returned record to contain
-%% an encoded binary. This isn't the usual case but could some in
-%% handy, for test at least. (For example, to send garbage.)
-
-%% The normal case: encode the returned message.
-encode(Dict, #diameter_packet{msg = Msg, bin = undefined} = Pkt) ->
- D = pick_dictionary([Dict, ?BASE], Msg),
- diameter_codec:encode(D, Pkt);
-
-%% Callback has returned an encoded binary: just send.
-encode(_, #diameter_packet{} = Pkt) ->
- Pkt.
-
-%% pick_dictionary/2
-
-%% Pick the first dictionary that declares the application id in the
-%% specified header.
-pick_dictionary(Ds, [#diameter_header{application_id = Id} | _]) ->
- pd(Ds, fun(D) -> Id = D:id() end);
-
-%% Pick the first dictionary that knows the specified message name.
-pick_dictionary(Ds, [MsgName|_]) ->
- pd(Ds, fun(D) -> D:msg2rec(MsgName) end);
-
-%% Pick the first dictionary that knows the name of the specified
-%% message record.
-pick_dictionary(Ds, Rec) ->
- Name = element(1, Rec),
- pd(Ds, fun(D) -> D:rec2msg(Name) end).
-
-pd([D|Ds], F) ->
- try
- F(D),
- D
- catch
- error:_ ->
- pd(Ds, F)
- end;
-
-pd([], _) ->
- ?ERROR(no_dictionary).
-
-%% send_request/4
-
-send_request(TPid, #diameter_packet{bin = Bin} = Pkt, Req, Timeout)
- when node() == node(TPid) ->
- %% Store the outgoing request before sending to avoid a race with
- %% reply reception.
- TRef = store_request(TPid, Bin, Req, Timeout),
- send(TPid, Pkt),
- TRef;
-
-%% Send using a remote transport: spawn a process on the remote node
-%% to relay the answer.
-send_request(TPid, #diameter_packet{} = Pkt, Req, Timeout) ->
- TRef = erlang:start_timer(Timeout, self(), timeout),
- T = {TPid, Pkt, Req, Timeout, TRef},
- spawn(node(TPid), ?MODULE, send, [T]),
- TRef.
-
-%% send/1
-
-send({TPid, Pkt, #request{handler = Pid} = Req, Timeout, TRef}) ->
- Ref = send_request(TPid, Pkt, Req#request{handler = self()}, Timeout),
- Pid ! reref(receive T -> T end, Ref, TRef).
-
-reref({T, Ref, R}, Ref, TRef) ->
- {T, TRef, R};
-reref(T, _, _) ->
- T.
-
-%% send/2
-
-send(Pid, Pkt) ->
- Pid ! {send, Pkt}.
-
-%% retransmit/4
-
-retransmit({TPid, Caps, #diameter_app{alias = Alias} = App} = T,
- #request{app = Alias, packet = Pkt0}
- = Req,
- SvcName,
- Timeout) ->
- have_request(Pkt0, TPid) %% Don't failover to a peer we've
- andalso ?THROW(timeout), %% already sent to.
-
- #diameter_packet{header = Hdr0} = Pkt0,
- Hdr = Hdr0#diameter_header{is_retransmitted = true},
- Pkt = Pkt0#diameter_packet{header = Hdr},
-
- resend_req(cb(App, prepare_retransmit, [Pkt, SvcName, {TPid, Caps}]),
- T,
- Req#request{packet = Pkt},
- Timeout,
- []).
-
-resend_req({send, P}, T, #request{packet = Pkt} = Req, Timeout, Fs) ->
- retransmit(make_request_packet(P, Pkt), T, Req, Timeout, Fs);
-
-resend_req({discard, Reason}, _, _, _, _) ->
- ?THROW(Reason);
-
-resend_req(discard, _, _, _, _) ->
- ?THROW(discarded);
-
-resend_req({eval_packet, RC, F}, T, Req, Timeout, Fs) ->
- resend_req(RC, T, Req, Timeout, [F|Fs]);
-
-resend_req(T, {_, _, App}, _, _, _) ->
- ?ERROR({invalid_return, prepare_retransmit, App, T}).
-
-%% retransmit/6
-
-retransmit(Pkt, {TPid, Caps, _}, #request{dictionary = D} = Req0, Tmo, Fs) ->
- EPkt = encode(D, Pkt, Fs),
-
- Req = Req0#request{transport = TPid,
- packet = Pkt,
- caps = Caps},
-
- ?LOG(retransmission, Req),
- TRef = send_request(TPid, EPkt, Req, Tmo),
- {TRef, Req}.
-
-%% store_request/4
-
-store_request(TPid, Bin, Req, Timeout) ->
- Seqs = diameter_codec:sequence_numbers(Bin),
- TRef = erlang:start_timer(Timeout, self(), timeout),
- ets:insert(?REQUEST_TABLE, {Seqs, Req, TRef}),
- ets:member(?REQUEST_TABLE, TPid)
- orelse (self() ! {failover, TRef}), %% possibly missed failover
- TRef.
-
-%% lookup_request/2
-
-lookup_request(Msg, TPid)
- when is_pid(TPid) ->
- lookup(Msg, TPid, '_');
-
-lookup_request(Msg, TRef)
- when is_reference(TRef) ->
- lookup(Msg, '_', TRef).
-
-lookup(Msg, TPid, TRef) ->
- Seqs = diameter_codec:sequence_numbers(Msg),
- Spec = [{{Seqs, #request{transport = TPid, _ = '_'}, TRef},
- [],
- ['$_']}],
- case ets:select(?REQUEST_TABLE, Spec) of
- [{_, Req, _}] ->
- Req;
- [] ->
- false
- end.
-
-%% erase_request/1
-
-erase_request(Pkt) ->
- ets:delete(?REQUEST_TABLE, diameter_codec:sequence_numbers(Pkt)).
-
-%% match_requests/1
-
-match_requests(TPid) ->
- Pat = {'_', #request{transport = TPid, _ = '_'}, '_'},
- ets:select(?REQUEST_TABLE, [{Pat, [], ['$_']}]).
-
-%% have_request/2
-
-have_request(Pkt, TPid) ->
- Seqs = diameter_codec:sequence_numbers(Pkt),
- Pat = {Seqs, #request{transport = TPid, _ = '_'}, '_'},
- '$end_of_table' /= ets:select(?REQUEST_TABLE, [{Pat, [], ['$_']}], 1).
-
-%% request_peer_up/1
-
-request_peer_up(TPid) ->
- ets:insert(?REQUEST_TABLE, {TPid}).
-
-%% request_peer_down/2
-
-request_peer_down(TPid, S) ->
- ets:delete(?REQUEST_TABLE, TPid),
- lists:foreach(fun(T) -> failover(T,S) end, match_requests(TPid)).
-%% Note that a request process can store its request after failover
-%% notifications are sent here: store_request/4 sends the notification
-%% in that case. Note also that we'll send as many notifications to a
-%% given handler as there are peers its sent to. All but one of these
-%% will be ignored.
-
-%%% ---------------------------------------------------------------------------
-%%% recv_request/3
-%%% ---------------------------------------------------------------------------
-
-recv_request(TPid, Pkt, {ConnT, SvcName, Apps, Mask}) ->
- try ets:lookup(ConnT, TPid) of
- [C] ->
- recv_request(C, TPid, Pkt, SvcName, Apps, Mask);
- [] -> %% transport has gone down
- ok
- catch
- error: badarg -> %% service has gone down (and taken table with it)
- ok
- end.
-
-%% recv_request/5
-
-recv_request(#conn{apps = SApps, caps = Caps},
- TPid,
- Pkt,
- SvcName,
- Apps,
- Mask) ->
- #diameter_caps{origin_host = {OH,_},
- origin_realm = {OR,_}}
- = Caps,
-
- #diameter_packet{header = #diameter_header{application_id = Id}}
- = Pkt,
-
- recv_request(find_recv_app(Id, SApps),
- {SvcName, OH, OR},
- TPid,
- Apps,
- Mask,
- Caps,
- Pkt).
-
-%% find_recv_app/2
-
-%% No one should be sending the relay identifier.
-find_recv_app(?APP_ID_RELAY, _) ->
- false;
-
-%% With any other id we either support it locally or as a relay.
-find_recv_app(Id, SApps) ->
- keyfind([Id, ?APP_ID_RELAY], 1, SApps).
-
-%% keyfind/3
-
-keyfind([], _, _) ->
- false;
-keyfind([Key | Rest], Pos, L) ->
- case lists:keyfind(Key, Pos, L) of
- false ->
- keyfind(Rest, Pos, L);
- T ->
- T
- end.
-
-%% recv_request/7
-
-recv_request({Id, Alias}, T, TPid, Apps, Mask, Caps, Pkt) ->
- #diameter_app{dictionary = Dict}
- = A
- = find_app(Alias, Apps),
- recv_request(T,
- {TPid, Caps},
- A,
- Mask,
- diameter_codec:decode(Id, Dict, Pkt));
-%% Note that the decode is different depending on whether or not Id is
-%% ?APP_ID_RELAY.
-
-%% DIAMETER_APPLICATION_UNSUPPORTED 3007
-%% A request was sent for an application that is not supported.
-
-recv_request(false, T, TPid, _, _, _, Pkt) ->
- As = collect_avps(Pkt),
- protocol_error(3007, T, TPid, Pkt#diameter_packet{avps = As}).
-
-collect_avps(Pkt) ->
- case diameter_codec:collect_avps(Pkt) of
- {_Bs, As} ->
- As;
- As ->
- As
- end.
-
-%% recv_request/5
-
-%% Wrong number of bits somewhere in the message: reply.
-%%
-%% DIAMETER_INVALID_AVP_BITS 3009
-%% A request was received that included an AVP whose flag bits are
-%% set to an unrecognized value, or that is inconsistent with the
-%% AVP's definition.
-%%
-recv_request(T, {TPid, _}, _, _, #diameter_packet{errors = [Bs | _]} = Pkt)
- when is_bitstring(Bs) ->
- protocol_error(3009, T, TPid, Pkt);
-
-%% Either we support this application but don't recognize the command
-%% or we're a relay and the command isn't proxiable.
-%%
-%% DIAMETER_COMMAND_UNSUPPORTED 3001
-%% The Request contained a Command-Code that the receiver did not
-%% recognize or support. This MUST be used when a Diameter node
-%% receives an experimental command that it does not understand.
-%%
-recv_request(T,
- {TPid, _},
- #diameter_app{id = Id},
- _,
- #diameter_packet{header = #diameter_header{is_proxiable = P},
- msg = M}
- = Pkt)
- when ?APP_ID_RELAY /= Id, undefined == M;
- ?APP_ID_RELAY == Id, not P ->
- protocol_error(3001, T, TPid, Pkt);
-
-%% Error bit was set on a request.
-%%
-%% DIAMETER_INVALID_HDR_BITS 3008
-%% A request was received whose bits in the Diameter header were
-%% either set to an invalid combination, or to a value that is
-%% inconsistent with the command code's definition.
-%%
-recv_request(T,
- {TPid, _},
- _,
- _,
- #diameter_packet{header = #diameter_header{is_error = true}}
- = Pkt) ->
- protocol_error(3008, T, TPid, Pkt);
-
-%% A message in a locally supported application or a proxiable message
-%% in the relay application. Don't distinguish between the two since
-%% each application has its own callback config. That is, the user can
-%% easily distinguish between the two cases.
-recv_request(T, TC, App, Mask, Pkt) ->
- request_cb(T, TC, App, Mask, examine(Pkt)).
-
-%% Note that there may still be errors but these aren't protocol
-%% (3xxx) errors that lead to an answer-message.
-
-request_cb({SvcName, _OH, _OR} = T, TC, App, Mask, Pkt) ->
- request_cb(cb(App, handle_request, [Pkt, SvcName, TC]),
- App,
- Mask,
- T,
- TC,
- [],
- Pkt).
-
-%% examine/1
-%%
-%% Look for errors in a decoded message. Length errors result in
-%% decode failure in diameter_codec.
-
-examine(#diameter_packet{header = #diameter_header{version
- = ?DIAMETER_VERSION}}
- = Pkt) ->
- Pkt;
-
-%% DIAMETER_UNSUPPORTED_VERSION 5011
-%% This error is returned when a request was received, whose version
-%% number is unsupported.
-
-examine(#diameter_packet{errors = Es} = Pkt) ->
- Pkt#diameter_packet{errors = [5011 | Es]}.
-%% It's odd/unfortunate that this isn't a protocol error.
-
-%% request_cb/7
-
-%% A reply may be an answer-message, constructed either here or by
-%% the handle_request callback. The header from the incoming request
-%% is passed into the encode so that it can retrieve the relevant
-%% command code in this case. It will also then ignore Dict and use
-%% the base encoder.
-request_cb({reply, Ans},
- #diameter_app{dictionary = Dict},
- _,
- _,
- {TPid, _},
- Fs,
- Pkt) ->
- reply(Ans, Dict, TPid, Fs, Pkt);
-
-%% An 3xxx result code, for which the E-bit is set in the header.
-request_cb({protocol_error, RC}, _, _, T, {TPid, _}, Fs, Pkt)
- when 3000 =< RC, RC < 4000 ->
- protocol_error(RC, T, TPid, Fs, Pkt);
-
-%% RFC 3588 says we must reply 3001 to anything unrecognized or
-%% unsupported. 'noreply' is undocumented (and inappropriately named)
-%% backwards compatibility for this, protocol_error the documented
-%% alternative.
-request_cb(noreply, _, _, T, {TPid, _}, Fs, Pkt) ->
- protocol_error(3001, T, TPid, Fs, Pkt);
-
-%% Relay a request to another peer. This is equivalent to doing an
-%% explicit call/4 with the message in question except that (1) a loop
-%% will be detected by examining Route-Record AVP's, (3) a
-%% Route-Record AVP will be added to the outgoing request and (3) the
-%% End-to-End Identifier will default to that in the
-%% #diameter_header{} without the need for an end_to_end_identifier
-%% option.
-%%
-%% relay and proxy are similar in that they require the same handling
-%% with respect to Route-Record and End-to-End identifier. The
-%% difference is that a proxy advertises specific applications, while
-%% a relay advertises the relay application. If a callback doesn't
-%% want to distinguish between the cases in the callback return value
-%% then 'resend' is a neutral alternative.
-%%
-request_cb({A, Opts},
- #diameter_app{id = Id}
- = App,
- Mask,
- T,
- TC,
- Fs,
- Pkt)
- when A == relay, Id == ?APP_ID_RELAY;
- A == proxy, Id /= ?APP_ID_RELAY;
- A == resend ->
- resend(Opts, App, Mask, T, TC, Fs, Pkt);
-
-request_cb(discard, _, _, _, _, _, _) ->
- ok;
-
-request_cb({eval_packet, RC, F}, App, Mask, T, TC, Fs, Pkt) ->
- request_cb(RC, App, Mask, T, TC, [F|Fs], Pkt);
-
-request_cb({eval, RC, F}, App, Mask, T, TC, Fs, Pkt) ->
- request_cb(RC, App, Mask, T, TC, Fs, Pkt),
- diameter_lib:eval(F).
-
-%% protocol_error/5
-
-protocol_error(RC, {_, OH, OR}, TPid, Fs, Pkt) ->
- #diameter_packet{avps = Avps, errors = Es} = Pkt,
- ?LOG({error, RC}, Pkt),
- reply(answer_message({OH, OR, RC}, Avps),
- ?BASE,
- TPid,
- Fs,
- Pkt#diameter_packet{errors = [RC | Es]}).
-%% Note that reply/5 may set the result code once more. It's set in
-%% answer_message/2 in case reply/5 doesn't.
-
-%% protocol_error/4
-
-protocol_error(RC, T, TPid, Pkt) ->
- protocol_error(RC, T, TPid, [], Pkt).
-
-%% resend/7
-%%
-%% Resend a message as a relay or proxy agent.
-
-resend(Opts,
- #diameter_app{} = App,
- Mask,
- {_SvcName, OH, _OR} = T,
- {_TPid, _Caps} = TC,
- Fs,
- #diameter_packet{avps = Avps} = Pkt) ->
- {Code, _Flags, Vid} = ?BASE:avp_header('Route-Record'),
- resend(is_loop(Code, Vid, OH, Avps), Opts, App, Mask, T, TC, Fs, Pkt).
-
-%% DIAMETER_LOOP_DETECTED 3005
-%% An agent detected a loop while trying to get the message to the
-%% intended recipient. The message MAY be sent to an alternate peer,
-%% if one is available, but the peer reporting the error has
-%% identified a configuration problem.
-
-resend(true, _, _, _, T, {TPid, _}, Fs, Pkt) -> %% Route-Record loop
- protocol_error(3005, T, TPid, Fs, Pkt);
-
-%% 6.1.8. Relaying and Proxying Requests
-%%
-%% A relay or proxy agent MUST append a Route-Record AVP to all requests
-%% forwarded. The AVP contains the identity of the peer the request was
-%% received from.
-
-resend(false,
- Opts,
- App,
- Mask,
- {SvcName, _, _} = T,
- {TPid, #diameter_caps{origin_host = {_, OH}}},
- Fs,
- #diameter_packet{header = Hdr0,
- avps = Avps}
- = Pkt) ->
- Route = #diameter_avp{data = {?BASE, 'Route-Record', OH}},
- Seq = diameter_session:sequence(Mask),
- Hdr = Hdr0#diameter_header{hop_by_hop_id = Seq},
- Msg = [Hdr, Route | Avps],
- resend(call(SvcName, App, Msg, Opts), T, TPid, Fs, Pkt).
-%% The incoming request is relayed with the addition of a
-%% Route-Record. Note the requirement on the return from call/4 below,
-%% which places a requirement on the value returned by the
-%% handle_answer callback of the application module in question.
-%%
-%% Note that there's nothing stopping the request from being relayed
-%% back to the sender. A pick_peer callback may want to avoid this but
-%% a smart peer might recognize the potential loop and choose another
-%% route. A less smart one will probably just relay the request back
-%% again and force us to detect the loop. A pick_peer that wants to
-%% avoid this can specify filter to avoid the possibility.
-%% Eg. {neg, {host, OH} where #diameter_caps{origin_host = {OH, _}}.
-%%
-%% RFC 6.3 says that a relay agent does not modify Origin-Host but
-%% says nothing about a proxy. Assume it should behave the same way.
-
-%% resend/4
-%%
-%% Relay a reply to a relayed request.
-
-%% Answer from the peer: reset the hop by hop identifier and send.
-resend(#diameter_packet{bin = B}
- = Pkt,
- _,
- TPid,
- Fs,
- #diameter_packet{header = #diameter_header{hop_by_hop_id = Id},
- transport_data = TD}) ->
- P = Pkt#diameter_packet{bin = diameter_codec:hop_by_hop_id(Id, B),
- transport_data = TD},
- eval_packet(P, Fs),
- send(TPid, P);
-%% TODO: counters
-
-%% Or not: DIAMETER_UNABLE_TO_DELIVER.
-resend(_, T, TPid, Fs, Pkt) ->
- protocol_error(3002, T, TPid, Fs, Pkt).
-
-%% is_loop/4
-%%
-%% Is there a Route-Record AVP with our Origin-Host?
-
-is_loop(Code,
- Vid,
- Bin,
- [#diameter_avp{code = Code, vendor_id = Vid, data = Bin} | _]) ->
- true;
-
-is_loop(_, _, _, []) ->
- false;
-
-is_loop(Code, Vid, OH, [_ | Avps])
- when is_binary(OH) ->
- is_loop(Code, Vid, OH, Avps);
-
-is_loop(Code, Vid, OH, Avps) ->
- is_loop(Code, Vid, ?BASE:avp(encode, OH, 'Route-Record'), Avps).
-
-%% reply/5
-%%
-%% Send a locally originating reply.
-
-%% Skip the setting of Result-Code and Failed-AVP's below. This is
-%% currently undocumented.
-reply([Msg], Dict, TPid, Fs, Pkt)
- when is_list(Msg);
- is_tuple(Msg) ->
- reply(Msg, Dict, TPid, Fs, Pkt#diameter_packet{errors = []});
-
-%% No errors or a diameter_header/avp list.
-reply(Msg, Dict, TPid, Fs, #diameter_packet{errors = Es} = ReqPkt)
- when [] == Es;
- is_record(hd(Msg), diameter_header) ->
- Pkt = diameter_codec:encode(Dict, make_answer_packet(Msg, ReqPkt)),
- eval_packet(Pkt, Fs),
- incr(send, Pkt, Dict, TPid), %% count result codes in sent answers
- send(TPid, Pkt);
-
-%% Or not: set Result-Code and Failed-AVP AVP's.
-reply(Msg, Dict, TPid, Fs, #diameter_packet{errors = [H|_] = Es} = Pkt) ->
- reply(rc(Msg, rc(H), [A || {_,A} <- Es], Dict),
- Dict,
- TPid,
- Fs,
- Pkt#diameter_packet{errors = []}).
-
-eval_packet(Pkt, Fs) ->
- lists:foreach(fun(F) -> diameter_lib:eval([F,Pkt]) end, Fs).
-
-%% make_answer_packet/2
-
-%% A reply message clears the R and T flags and retains the P flag.
-%% The E flag will be set at encode. 6.2 of 3588 requires the same P
-%% flag on an answer as on the request. A #diameter_packet{} returned
-%% from a handle_request callback can circumvent this by setting its
-%% own header values.
-make_answer_packet(#diameter_packet{header = Hdr,
- msg = Msg,
- transport_data = TD},
- #diameter_packet{header = ReqHdr}) ->
- Hdr0 = ReqHdr#diameter_header{version = ?DIAMETER_VERSION,
- is_request = false,
- is_error = undefined,
- is_retransmitted = false},
- #diameter_packet{header = fold_record(Hdr0, Hdr),
- msg = Msg,
- transport_data = TD};
-
-%% Binaries and header/avp lists are sent as-is.
-make_answer_packet(Bin, #diameter_packet{transport_data = TD})
- when is_binary(Bin) ->
- #diameter_packet{bin = Bin,
- transport_data = TD};
-make_answer_packet([#diameter_header{} | _] = Msg,
- #diameter_packet{transport_data = TD}) ->
- #diameter_packet{msg = Msg,
- transport_data = TD};
-
-%% Otherwise, preserve transport_data.
-make_answer_packet(Msg, #diameter_packet{transport_data = TD} = Pkt) ->
- make_answer_packet(#diameter_packet{msg = Msg, transport_data = TD}, Pkt).
-
-%% rc/1
-
-rc({RC, _}) ->
- RC;
-rc(RC) ->
- RC.
-
-%% rc/4
-
-rc(#diameter_packet{msg = Rec} = Pkt, RC, Failed, Dict) ->
- Pkt#diameter_packet{msg = rc(Rec, RC, Failed, Dict)};
-
-rc(Rec, RC, Failed, Dict)
- when is_integer(RC) ->
- set(Rec,
- lists:append([rc(Rec, {'Result-Code', RC}, Dict),
- failed_avp(Rec, Failed, Dict)]),
- Dict).
-
-%% Reply as name and tuple list ...
-set([_|_] = Ans, Avps, _) ->
- Ans ++ Avps; %% Values nearer tail take precedence.
-
-%% ... or record.
-set(Rec, Avps, Dict) ->
- Dict:'#set-'(Avps, Rec).
-
-%% rc/3
-%%
-%% Turn the result code into a list if its optional and only set it if
-%% the arity is 1 or {0,1}. In other cases (which probably shouldn't
-%% exist in practise) we can't know what's appropriate.
-
-rc([MsgName | _], {'Result-Code' = K, RC} = T, Dict) ->
- case Dict:avp_arity(MsgName, 'Result-Code') of
- 1 -> [T];
- {0,1} -> [{K, [RC]}];
- _ -> []
- end;
-
-rc(Rec, T, Dict) ->
- rc([Dict:rec2msg(element(1, Rec))], T, Dict).
-
-%% failed_avp/3
-
-failed_avp(_, [] = No, _) ->
- No;
-
-failed_avp(Rec, Failed, Dict) ->
- [fa(Rec, [{'AVP', Failed}], Dict)].
-
-%% Reply as name and tuple list ...
-fa([MsgName | Values], FailedAvp, Dict) ->
- R = Dict:msg2rec(MsgName),
- try
- Dict:'#info-'(R, {index, 'Failed-AVP'}),
- {'Failed-AVP', [FailedAvp]}
- catch
- error: _ ->
- Avps = proplists:get_value('AVP', Values, []),
- A = #diameter_avp{name = 'Failed-AVP',
- value = FailedAvp},
- {'AVP', [A|Avps]}
- end;
-
-%% ... or record.
-fa(Rec, FailedAvp, Dict) ->
- try
- {'Failed-AVP', [FailedAvp]}
- catch
- error: _ ->
- Avps = Dict:'get-'('AVP', Rec),
- A = #diameter_avp{name = 'Failed-AVP',
- value = FailedAvp},
- {'AVP', [A|Avps]}
- end.
-
-%% 3. Diameter Header
-%%
-%% E(rror) - If set, the message contains a protocol error,
-%% and the message will not conform to the ABNF
-%% described for this command. Messages with the 'E'
-%% bit set are commonly referred to as error
-%% messages. This bit MUST NOT be set in request
-%% messages. See Section 7.2.
-
-%% 3.2. Command Code ABNF specification
-%%
-%% e-bit = ", ERR"
-%% ; If present, the 'E' bit in the Command
-%% ; Flags is set, indicating that the answer
-%% ; message contains a Result-Code AVP in
-%% ; the "protocol error" class.
-
-%% 7.1.3. Protocol Errors
-%%
-%% Errors that fall within the Protocol Error category SHOULD be treated
-%% on a per-hop basis, and Diameter proxies MAY attempt to correct the
-%% error, if it is possible. Note that these and only these errors MUST
-%% only be used in answer messages whose 'E' bit is set.
-
-%% Thus, only construct answers to protocol errors. Other errors
-%% require an message-specific answer and must be handled by the
-%% application.
-
-%% 6.2. Diameter Answer Processing
-%%
-%% When a request is locally processed, the following procedures MUST be
-%% applied to create the associated answer, in addition to any
-%% additional procedures that MAY be discussed in the Diameter
-%% application defining the command:
-%%
-%% - The same Hop-by-Hop identifier in the request is used in the
-%% answer.
-%%
-%% - The local host's identity is encoded in the Origin-Host AVP.
-%%
-%% - The Destination-Host and Destination-Realm AVPs MUST NOT be
-%% present in the answer message.
-%%
-%% - The Result-Code AVP is added with its value indicating success or
-%% failure.
-%%
-%% - If the Session-Id is present in the request, it MUST be included
-%% in the answer.
-%%
-%% - Any Proxy-Info AVPs in the request MUST be added to the answer
-%% message, in the same order they were present in the request.
-%%
-%% - The 'P' bit is set to the same value as the one in the request.
-%%
-%% - The same End-to-End identifier in the request is used in the
-%% answer.
-%%
-%% Note that the error messages (see Section 7.3) are also subjected to
-%% the above processing rules.
-
-%% 7.3. Error-Message AVP
-%%
-%% The Error-Message AVP (AVP Code 281) is of type UTF8String. It MAY
-%% accompany a Result-Code AVP as a human readable error message. The
-%% Error-Message AVP is not intended to be useful in real-time, and
-%% SHOULD NOT be expected to be parsed by network entities.
-
-%% answer_message/2
-
-answer_message({OH, OR, RC}, Avps) ->
- {Code, _, Vid} = ?BASE:avp_header('Session-Id'),
- ['answer-message', {'Origin-Host', OH},
- {'Origin-Realm', OR},
- {'Result-Code', RC}
- | session_id(Code, Vid, Avps)].
-
-session_id(Code, Vid, Avps)
- when is_list(Avps) ->
- try
- {value, #diameter_avp{data = D}} = find_avp(Code, Vid, Avps),
- [{'Session-Id', [?BASE:avp(decode, D, 'Session-Id')]}]
- catch
- error: _ ->
- []
- end.
-
-%% find_avp/3
-
-find_avp(Code, Vid, Avps)
- when is_integer(Code), (undefined == Vid orelse is_integer(Vid)) ->
- find(fun(A) -> is_avp(Code, Vid, A) end, Avps).
-
-%% The final argument here could be a list of AVP's, depending on the case,
-%% but we're only searching at the top level.
-is_avp(Code, Vid, #diameter_avp{code = Code, vendor_id = Vid}) ->
- true;
-is_avp(_, _, _) ->
- false.
-
-find(_, []) ->
- false;
-find(Pred, [H|T]) ->
- case Pred(H) of
- true ->
- {value, H};
- false ->
- find(Pred, T)
- end.
-
-%% 7. Error Handling
-%%
-%% There are certain Result-Code AVP application errors that require
-%% additional AVPs to be present in the answer. In these cases, the
-%% Diameter node that sets the Result-Code AVP to indicate the error
-%% MUST add the AVPs. Examples are:
-%%
-%% - An unrecognized AVP is received with the 'M' bit (Mandatory bit)
-%% set, causes an answer to be sent with the Result-Code AVP set to
-%% DIAMETER_AVP_UNSUPPORTED, and the Failed-AVP AVP containing the
-%% offending AVP.
-%%
-%% - An AVP that is received with an unrecognized value causes an
-%% answer to be returned with the Result-Code AVP set to
-%% DIAMETER_INVALID_AVP_VALUE, with the Failed-AVP AVP containing the
-%% AVP causing the error.
-%%
-%% - A command is received with an AVP that is omitted, yet is
-%% mandatory according to the command's ABNF. The receiver issues an
-%% answer with the Result-Code set to DIAMETER_MISSING_AVP, and
-%% creates an AVP with the AVP Code and other fields set as expected
-%% in the missing AVP. The created AVP is then added to the Failed-
-%% AVP AVP.
-%%
-%% The Result-Code AVP describes the error that the Diameter node
-%% encountered in its processing. In case there are multiple errors,
-%% the Diameter node MUST report only the first error it encountered
-%% (detected possibly in some implementation dependent order). The
-%% specific errors that can be described by this AVP are described in
-%% the following section.
-
-%% 7.5. Failed-AVP AVP
-%%
-%% The Failed-AVP AVP (AVP Code 279) is of type Grouped and provides
-%% debugging information in cases where a request is rejected or not
-%% fully processed due to erroneous information in a specific AVP. The
-%% value of the Result-Code AVP will provide information on the reason
-%% for the Failed-AVP AVP.
-%%
-%% The possible reasons for this AVP are the presence of an improperly
-%% constructed AVP, an unsupported or unrecognized AVP, an invalid AVP
-%% value, the omission of a required AVP, the presence of an explicitly
-%% excluded AVP (see tables in Section 10), or the presence of two or
-%% more occurrences of an AVP which is restricted to 0, 1, or 0-1
-%% occurrences.
-%%
-%% A Diameter message MAY contain one Failed-AVP AVP, containing the
-%% entire AVP that could not be processed successfully. If the failure
-%% reason is omission of a required AVP, an AVP with the missing AVP
-%% code, the missing vendor id, and a zero filled payload of the minimum
-%% required length for the omitted AVP will be added.
-
-%%% ---------------------------------------------------------------------------
-%%% # handle_answer/3
-%%% ---------------------------------------------------------------------------
-
-%% Process an answer message in call-specific process.
-
-handle_answer(SvcName, _, {error, Req, Reason}) ->
- handle_error(Req, Reason, SvcName);
-
-handle_answer(SvcName,
- AnswerErrors,
- {answer, #request{dictionary = Dict} = Req, Pkt}) ->
- answer(examine(diameter_codec:decode(Dict, Pkt)),
- SvcName,
- AnswerErrors,
- Req).
-
-%% We don't really need to do a full decode if we're a relay and will
-%% just resend with a new hop by hop identifier, but might a proxy
-%% want to examine the answer?
-
-answer(Pkt, SvcName, AE, #request{transport = TPid,
- dictionary = Dict}
- = Req) ->
- try
- incr(recv, Pkt, Dict, TPid)
- of
- _ -> a(Pkt, SvcName, AE, Req)
- catch
- exit: {invalid_error_bit, _} = E ->
- a(Pkt#diameter_packet{errors = [E]}, SvcName, AE, Req)
- end.
-
-a(#diameter_packet{errors = Es} = Pkt, SvcName, AE, #request{transport = TPid,
- caps = Caps,
- packet = P}
- = Req)
- when [] == Es;
- callback == AE ->
- cb(Req, handle_answer, [Pkt, msg(P), SvcName, {TPid, Caps}]);
-
-a(Pkt, SvcName, report, Req) ->
- x(errors, handle_answer, [SvcName, Req, Pkt]);
-
-a(Pkt, SvcName, discard, Req) ->
- x({errors, handle_answer, [SvcName, Req, Pkt]}).
-
-%% Note that we don't check that the application id in the answer's
-%% header is what we expect. (TODO: Does the rfc says anything about
-%% this?)
-
-%% incr/4
-%%
-%% Increment a stats counter for an incoming or outgoing message.
-
-%% TODO: fix
-incr(_, #diameter_packet{msg = undefined}, _, _) ->
- ok;
-
-incr(recv = D, #diameter_packet{header = H, errors = [_|_]}, _, TPid) ->
- incr(TPid, {diameter_codec:msg_id(H), D, error});
-
-incr(Dir, Pkt, Dict, TPid) ->
- #diameter_packet{header = #diameter_header{is_error = E}
- = Hdr,
- msg = Rec}
- = Pkt,
-
- RC = int(get_avp_value(Dict, 'Result-Code', Rec)),
- PE = is_protocol_error(RC),
-
- %% Check that the E bit is set only for 3xxx result codes.
- (not (E orelse PE))
- orelse (E andalso PE)
- orelse x({invalid_error_bit, RC}, answer, [Dir, Pkt]),
-
- irc(TPid, Hdr, Dir, rc_counter(Dict, Rec, RC)).
-
-irc(_, _, _, undefined) ->
- false;
-
-irc(TPid, Hdr, Dir, Ctr) ->
- incr(TPid, {diameter_codec:msg_id(Hdr), Dir, Ctr}).
-
-%% incr/2
-
-incr(TPid, Counter) ->
- diameter_stats:incr(Counter, TPid, 1).
-
-%% error_counter/2
-
-%% RFC 3588, 7.6:
-%%
-%% All Diameter answer messages defined in vendor-specific
-%% applications MUST include either one Result-Code AVP or one
-%% Experimental-Result AVP.
-%%
-%% Maintain statistics assuming one or the other, not both, which is
-%% surely the intent of the RFC.
-
-rc_counter(Dict, Rec, undefined) ->
- er(get_avp_value(Dict, 'Experimental-Result', Rec));
-rc_counter(_, _, RC) ->
- {'Result-Code', RC}.
-
-%% Outgoing answers may be in any of the forms messages can be sent
-%% in. Incoming messages will be records. We're assuming here that the
-%% arity of the result code AVP's is 0 or 1.
-
-er([{_,_,N} = T | _])
- when is_integer(N) ->
- T;
-er({_,_,N} = T)
- when is_integer(N) ->
- T;
-er(_) ->
- undefined.
-
-%% Extract the first good looking integer. There's no guarantee
-%% that what we're looking for has arity 1.
-int([N|_])
- when is_integer(N) ->
- N;
-int(N)
- when is_integer(N) ->
- N;
-int(_) ->
- undefined.
-
-is_protocol_error(RC) ->
- 3000 =< RC andalso RC < 4000.
-
--spec x(any(), atom(), list()) -> no_return().
-
-%% Warn and exit request process on errors in an incoming answer.
-x(Reason, F, A) ->
- diameter_lib:warning_report(Reason, {?MODULE, F, A}),
- x(Reason).
-
-x(T) ->
- exit(T).
-
-%%% ---------------------------------------------------------------------------
-%%% # failover/[23]
-%%% ---------------------------------------------------------------------------
-
-%% Failover as a consequence of request_peer_down/2.
-failover({_, #request{handler = Pid} = Req, TRef}, S) ->
- Pid ! {failover, TRef, rt(Req, S)}.
-
-%% Failover as a consequence of store_request/4.
-failover(TRef, Seqs, S)
- when is_reference(TRef) ->
- case lookup_request(Seqs, TRef) of
- #request{} = Req ->
- failover({Seqs, Req, TRef}, S);
- false ->
- ok
- end.
-
-%% prepare_request returned a binary ...
-rt(#request{packet = #diameter_packet{msg = undefined}}, _) ->
- false; %% TODO: Not what we should do.
-
-%% ... or not.
-rt(#request{packet = #diameter_packet{msg = Msg},
- dictionary = Dict}
- = Req,
- S) ->
- find_transport(get_destination(Dict, Msg), Req, S).
-
-%%% ---------------------------------------------------------------------------
-%%% # report_status/5
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # report_status/5
+%% ---------------------------------------------------------------------------
report_status(Status,
- #peer{ref = Ref,
- conn = TPid,
- type = Type,
- options = Opts},
- #conn{apps = [_|_] = As,
+ #watchdog{ref = Ref,
+ peer = TPid,
+ type = Type,
+ options = Opts},
+ #peer{apps = [_|_] = As,
caps = Caps},
#state{service_name = SvcName}
= S,
@@ -2615,9 +1251,9 @@ send_event(SvcName, Info) ->
send_event(#diameter_event{service = SvcName} = E) ->
lists:foreach(fun({_, Pid}) -> Pid ! E end, subscriptions(SvcName)).
-%%% ---------------------------------------------------------------------------
-%%% # share_peer/5
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # share_peer/5
+%% ---------------------------------------------------------------------------
share_peer(up, Caps, Aliases, TPid, #state{options = [_, {_, true} | _],
service_name = Svc}) ->
@@ -2626,9 +1262,9 @@ share_peer(up, Caps, Aliases, TPid, #state{options = [_, {_, true} | _],
share_peer(_, _, _, _, _) ->
ok.
-%%% ---------------------------------------------------------------------------
-%%% # share_peers/2
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # share_peers/2
+%% ---------------------------------------------------------------------------
share_peers(Pid, #state{options = [_, {_, true} | _],
local_peers = PDict}) ->
@@ -2640,9 +1276,9 @@ share_peers(_, _) ->
sp(Pid, Alias, Peers) ->
lists:foreach(fun({P,C}) -> Pid ! {peer, P, [Alias], C} end, Peers).
-%%% ---------------------------------------------------------------------------
-%%% # remote_peer_up/4
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # remote_peer_up/4
+%% ---------------------------------------------------------------------------
remote_peer_up(Pid, Aliases, Caps, #state{options = [_, _, {_, true} | _],
service = Svc,
@@ -2662,9 +1298,9 @@ rpu(Pid, Caps, PDict, Aliases) ->
T = {Pid, Caps},
lists:foreach(fun(A) -> ?Dict:append(A, T, PDict) end, Aliases).
-%%% ---------------------------------------------------------------------------
-%%% # remote_peer_down/2
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # remote_peer_down/2
+%% ---------------------------------------------------------------------------
remote_peer_down(Pid, #state{options = [_, _, {_, true} | _],
shared_peers = PDict}) ->
@@ -2673,164 +1309,20 @@ remote_peer_down(Pid, #state{options = [_, _, {_, true} | _],
rpd(Pid, Alias, PDict) ->
?Dict:update(Alias, fun(Ps) -> lists:keydelete(Pid, 1, Ps) end, PDict).
-%%% ---------------------------------------------------------------------------
-%%% find_transport/[34]
-%%%
-%%% Output: {TransportPid, #diameter_caps{}, #diameter_app{}}
-%%% | false
-%%% | {error, Reason}
-%%% ---------------------------------------------------------------------------
-
-%% Initial call, from an arbitrary process.
-find_transport({alias, Alias}, Msg, Opts, #state{service = Svc} = S) ->
- #diameter_service{applications = Apps} = Svc,
- ft(find_send_app(Alias, Apps), Msg, Opts, S);
-
-%% Relay or proxy send.
-find_transport(#diameter_app{} = App, Msg, Opts, S) ->
- ft(App, Msg, Opts, S).
-
-ft(#diameter_app{module = Mod, dictionary = Dict} = App, Msg, Opts, S) ->
- #options{filter = Filter,
- extra = Xtra}
- = Opts,
- pick_peer(App#diameter_app{module = Mod ++ Xtra},
- get_destination(Dict, Msg),
- Filter,
- S);
-ft(false = No, _, _, _) ->
- No.
-
-%% This can't be used if we're a relay and sending a message
-%% in an application not known locally. (TODO)
-find_send_app(Alias, Apps) ->
- case lists:keyfind(Alias, #diameter_app.alias, Apps) of
- #diameter_app{id = ?APP_ID_RELAY} ->
- false;
- T ->
- T
- end.
-
-%% Retransmission, in the service process.
-find_transport([_,_] = RH,
- Req,
- #state{service = #diameter_service{pid = Pid,
- applications = Apps}}
- = S)
- when self() == Pid ->
- #request{app = Alias,
- filter = Filter,
- module = ModX}
- = Req,
- #diameter_app{}
- = App
- = lists:keyfind(Alias, #diameter_app.alias, Apps),
-
- pick_peer(App#diameter_app{module = ModX},
- RH,
- Filter,
- S).
-
-%% get_destination/2
-
-get_destination(Dict, Msg) ->
- [str(get_avp_value(Dict, 'Destination-Realm', Msg)),
- str(get_avp_value(Dict, 'Destination-Host', Msg))].
-
-%% This is not entirely correct. The avp could have an arity 1, in
-%% which case an empty list is a DiameterIdentity of length 0 rather
-%% than the list of no values we treat it as by mapping to undefined.
-%% This behaviour is documented.
-str([]) ->
- undefined;
-str(T) ->
- T.
-
-%% get_avp_value/3
-%%
-%% Find an AVP in a message of one of three forms:
-%%
-%% - a message record (as generated from a .dia spec) or
-%% - a list of an atom message name followed by 2-tuple, avp name/value pairs.
-%% - a list of a #diameter_header{} followed by #diameter_avp{} records,
-%%
-%% In the first two forms a dictionary module is used at encode to
-%% identify the type of the AVP and its arity in the message in
-%% question. The third form allows messages to be sent as is, without
-%% a dictionary, which is needed in the case of relay agents, for one.
-
-%% Messages will be header/avps list as a relay and the only AVP's we
-%% look for are in the common dictionary. This is required since the
-%% relay dictionary doesn't inherit the common dictionary (which maybe
-%% it should).
-get_avp_value(?RELAY, Name, Msg) ->
- get_avp_value(?BASE, Name, Msg);
-
-%% Message sent as a header/avps list, probably a relay case but not
-%% necessarily.
-get_avp_value(Dict, Name, [#diameter_header{} | Avps]) ->
- try
- {Code, _, VId} = Dict:avp_header(Name),
- [A|_] = lists:dropwhile(fun(#diameter_avp{code = C, vendor_id = V}) ->
- C /= Code orelse V /= VId
- end,
- Avps),
- avp_decode(Dict, Name, A)
- catch
- error: _ ->
- undefined
- end;
-
-%% Outgoing message as a name/values list.
-get_avp_value(_, Name, [_MsgName | Avps]) ->
- case lists:keyfind(Name, 1, Avps) of
- {_, V} ->
- V;
- _ ->
- undefined
- end;
-
-%% Record might be an answer message in the common dictionary.
-get_avp_value(Dict, Name, Rec)
- when Dict /= ?BASE, element(1, Rec) == 'diameter_base_answer-message' ->
- get_avp_value(?BASE, Name, Rec);
-
-%% Message is typically a record but not necessarily: diameter:call/4
-%% can be passed an arbitrary term.
-get_avp_value(Dict, Name, Rec) ->
- try
- Dict:'#get-'(Name, Rec)
- catch
- error:_ ->
- undefined
- end.
-
-avp_decode(Dict, Name, #diameter_avp{value = undefined,
- data = Bin}) ->
- Dict:avp(decode, Bin, Name);
-avp_decode(_, _, #diameter_avp{value = V}) ->
- V.
-
-%%% ---------------------------------------------------------------------------
-%%% # pick_peer(App, [DestRealm, DestHost], Filter, #state{})
-%%%
-%%% Output: {TransportPid, #diameter_caps{}, App}
-%%% | false
-%%% | {error, Reason}
-%%% ---------------------------------------------------------------------------
-
-%% Find transports to a given realm/host.
+%% ---------------------------------------------------------------------------
+%% pick_peer/4
+%% ---------------------------------------------------------------------------
pick_peer(#diameter_app{alias = Alias}
= App,
- [_,_] = RH,
+ RealmAndHost,
Filter,
#state{local_peers = L,
shared_peers = S,
service_name = SvcName,
service = #diameter_service{pid = Pid}}) ->
- pick_peer(peers(Alias, RH, Filter, L),
- peers(Alias, RH, Filter, S),
+ pick_peer(peers(Alias, RealmAndHost, Filter, L),
+ peers(Alias, RealmAndHost, Filter, S),
Pid,
SvcName,
App).
@@ -2843,7 +1335,12 @@ pick_peer([], [], _, _, _) ->
%% App state is mutable but we're not in the service process: go there.
pick_peer(Local, Remote, Pid, _SvcName, #diameter_app{mutable = true} = App)
when self() /= Pid ->
- call_service(Pid, {pick_peer, Local, Remote, App});
+ case call_service(Pid, {pick_peer, Local, Remote, App}) of
+ {TPid, _} = T when is_pid(TPid) ->
+ T;
+ {error, _} ->
+ false
+ end;
%% App state isn't mutable or it is and we're in the service process:
%% do the deed.
@@ -2851,19 +1348,18 @@ pick_peer(Local,
Remote,
_Pid,
SvcName,
- #diameter_app{module = ModX,
- alias = Alias,
+ #diameter_app{alias = Alias,
init_state = S,
mutable = M}
= App) ->
- MFA = {ModX, pick_peer, [Local, Remote, SvcName]},
+ Args = [Local, Remote, SvcName],
- try state_cb(App, MFA) of
- {ok, {TPid, #diameter_caps{} = Caps}} when is_pid(TPid) ->
- {TPid, Caps, App};
- {{TPid, #diameter_caps{} = Caps}, ModS} when is_pid(TPid), M ->
+ try state_cb(App, pick_peer, Args) of
+ {ok, {TPid, #diameter_caps{}} = T} when is_pid(TPid) ->
+ T;
+ {{TPid, #diameter_caps{}} = T, ModS} when is_pid(TPid), M ->
mod_state(Alias, ModS),
- {TPid, Caps, App};
+ T;
{false = No, ModS} when M ->
mod_state(Alias, ModS),
No;
@@ -2871,15 +1367,17 @@ pick_peer(Local,
No;
false = No ->
No;
- {{TPid, #diameter_caps{} = Caps}, S} when is_pid(TPid) ->
- {TPid, Caps, App}; %% Accept returned state in the immutable
+ {{TPid, #diameter_caps{}} = T, S} when is_pid(TPid) ->
+ T; %% Accept returned state in the immutable
{false = No, S} -> %% case as long it isn't changed.
No;
T ->
- diameter_lib:error_report({invalid, T, App}, MFA)
+ diameter_lib:error_report({invalid, T, App},
+ {App, pick_peer, Args})
catch
E: Reason ->
- diameter_lib:error_report({failure, {E, Reason, ?STACK}}, MFA)
+ diameter_lib:error_report({failure, {E, Reason, ?STACK}},
+ {App, pick_peer, Args})
end.
%% peers/4
@@ -2966,14 +1464,14 @@ eq(Any, Id, PeerId) ->
%% transports/1
-transports(#state{peerT = PeerT}) ->
- ets:select(PeerT, [{#peer{conn = '$1', _ = '_'},
+transports(#state{watchdogT = WatchdogT}) ->
+ ets:select(WatchdogT, [{#watchdog{peer = '$1', _ = '_'},
[{'is_pid', '$1'}],
['$1']}]).
-%%% ---------------------------------------------------------------------------
-%%% # service_info/2
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # service_info/2
+%% ---------------------------------------------------------------------------
%% The config passed to diameter:start_service/2.
-define(CAP_INFO, ['Origin-Host',
@@ -3021,11 +1519,12 @@ tagged_info(Item, S)
undefined
end;
-tagged_info(TPid, #state{peerT = PT, connT = CT})
+tagged_info(TPid, #state{watchdogT = WatchdogT, peerT = PeerT})
when is_pid(TPid) ->
try
- [#conn{peer = Pid}] = ets:lookup(CT, TPid),
- [#peer{ref = Ref, type = Type, options = Opts}] = ets:lookup(PT, Pid),
+ [#peer{watchdog = Pid}] = ets:lookup(PeerT, TPid),
+ [#watchdog{ref = Ref, type = Type, options = Opts}]
+ = ets:lookup(WatchdogT, Pid),
[{ref, Ref},
{type, Type},
{options, Opts}]
@@ -3108,11 +1607,11 @@ complete(Pre) ->
%% info_stats/1
-info_stats(#state{peerT = PeerT}) ->
- MatchSpec = [{#peer{ref = '$1', conn = '$2', _ = '_'},
+info_stats(#state{watchdogT = WatchdogT}) ->
+ MatchSpec = [{#watchdog{ref = '$1', peer = '$2', _ = '_'},
[{'is_pid', '$2'}],
[['$1', '$2']]}],
- try ets:select(PeerT, MatchSpec) of
+ try ets:select(WatchdogT, MatchSpec) of
L ->
diameter_stats:read(lists:append(L))
catch
@@ -3122,7 +1621,8 @@ info_stats(#state{peerT = PeerT}) ->
%% info_transport/1
%%
%% One entry per configured transport. Statistics for each entry are
-%% the accumulated values for the ref and associated peer pids.
+%% the accumulated values for the ref and associated watchdog/peer
+%% pids.
info_transport(S) ->
PeerD = peer_dict(S, config_dict(S)),
@@ -3155,43 +1655,42 @@ transport([[_,_] | L]) ->
%% Possibly many peer entries for a listening transport. Note that all
%% have the same options by construction, which is not terribly space
-%% efficient. (TODO: all entries for the same Ref should share options.)
+%% efficient.
transport([[{type, accept}, {options, Opts} | _] | _] = Ls) ->
[{type, listen},
{options, Opts},
{accept, [lists:nthtail(2,L) || L <- Ls]}].
-peer_dict(#state{peerT = PeerT, connT = ConnT}, Dict0) ->
- try ets:tab2list(PeerT) of
+peer_dict(#state{watchdogT = WatchdogT, peerT = PeerT}, Dict0) ->
+ try ets:tab2list(WatchdogT) of
L ->
- lists:foldl(fun(T,A) -> peer_acc(ConnT, A, T) end, Dict0, L)
+ lists:foldl(fun(T,A) -> peer_acc(PeerT, A, T) end, Dict0, L)
catch
error: badarg -> Dict0 %% service has gone down
end.
-peer_acc(ConnT, Acc, #peer{pid = Pid,
- type = Type,
- ref = Ref,
- options = Opts,
- op_state = OS,
- started = T,
- conn = TPid}) ->
- WS = wd_state(OS),
+peer_acc(PeerT, Acc, #watchdog{pid = Pid,
+ type = Type,
+ ref = Ref,
+ options = Opts,
+ state = WS,
+ started = At,
+ peer = TPid}) ->
dict:append(Ref,
[{type, Type},
{options, Opts},
- {watchdog, {Pid, T, WS}}
- | info_conn(ConnT, TPid, WS /= ?WD_DOWN)],
+ {watchdog, {Pid, At, WS}}
+ | info_peer(PeerT, TPid, WS)],
Acc).
-info_conn(ConnT, TPid, true)
- when is_pid(TPid) ->
- try ets:lookup(ConnT, TPid) of
- T -> info_conn(T)
+info_peer(PeerT, TPid, WS)
+ when is_pid(TPid), WS /= ?WD_DOWN ->
+ try ets:lookup(PeerT, TPid) of
+ T -> info_peer(T)
catch
error: badarg -> [] %% service has gone down
end;
-info_conn(_, _, _) ->
+info_peer(_, _, _) ->
[].
%% The point of extracting the config here is so that 'transport' info
@@ -3210,19 +1709,12 @@ config_acc({Ref, T, Opts}, Dict)
config_acc(_, Dict) ->
Dict.
-wd_state({_,S}) ->
- S;
-wd_state(?STATE_UP) ->
- ?WD_OKAY;
-wd_state(?STATE_DOWN) ->
- ?WD_DOWN.
-
-info_conn([#conn{pid = Pid, apps = SApps, caps = Caps, started = T}]) ->
+info_peer([#peer{pid = Pid, apps = SApps, caps = Caps, started = T}]) ->
[{peer, {Pid, T}},
{apps, SApps},
{caps, info_caps(Caps)}
| try [{port, info_port(Pid)}] catch _:_ -> [] end];
-info_conn([] = No) ->
+info_peer([] = No) ->
No.
%% Extract information that the processes involved are expected to
@@ -3256,22 +1748,7 @@ mk_app(#diameter_app{} = A) ->
%% One entry for each outgoing request whose answer is outstanding.
info_pending(#state{} = S) ->
- MatchSpec = [{{'$1',
- #request{transport = '$2',
- from = '$3',
- app = '$4',
- _ = '_'},
- '_'},
- [?ORCOND([{'==', T, '$2'} || T <- transports(S)])],
- [{{'$1', [{{app, '$4'}},
- {{transport, '$2'}},
- {{from, '$3'}}]}}]}],
-
- try
- ets:select(?REQUEST_TABLE, MatchSpec)
- catch
- error: badarg -> [] %% service has gone down
- end.
+ diameter_traffic:pending(transports(S)).
%% info_connections/1
%%
diff --git a/lib/diameter/src/base/diameter_stats.erl b/lib/diameter/src/base/diameter_stats.erl
index 70727d068e..8fd5ded300 100644
--- a/lib/diameter/src/base/diameter_stats.erl
+++ b/lib/diameter/src/base/diameter_stats.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2012. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2013. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -136,7 +136,7 @@ read(Refs, B) ->
-spec flush([ref()])
-> [{ref(), {counter(), integer()}}].
-
+
flush(Refs) ->
try
call({flush, Refs})
diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl
new file mode 100644
index 0000000000..0de3825943
--- /dev/null
+++ b/lib/diameter/src/base/diameter_traffic.erl
@@ -0,0 +1,1647 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2013. All Rights Reserved.
+%%
+%% The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved online at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% %CopyrightEnd%
+%%
+
+%%
+%% Implements the handling of incoming and outgoing Diameter messages
+%% except CER/CEA, DWR/DWA and DPR/DPA. That is, the messages that a
+%% diameter client sends of receives.
+%%
+
+-module(diameter_traffic).
+
+%% towards diameter
+-export([send_request/4]).
+
+%% towards diameter_watchdog
+-export([receive_message/4]).
+
+%% towards diameter_service
+-export([make_recvdata/1,
+ peer_up/1,
+ peer_down/1,
+ failover/1,
+ pending/1]).
+
+%% Other callbacks.
+-export([send/1]). %% send from remote node
+
+-include_lib("diameter/include/diameter.hrl").
+-include("diameter_internal.hrl").
+
+-define(RELAY, ?DIAMETER_DICT_RELAY).
+-define(BASE, ?DIAMETER_DICT_COMMON). %% Note: the RFC 3588 dictionary
+
+-define(DEFAULT_TIMEOUT, 5000). %% for outgoing requests
+
+%% Table containing outgoing requests for which a reply has yet to be
+%% received.
+-define(REQUEST_TABLE, diameter_request).
+
+%% Workaround for dialyzer's lack of understanding of match specs.
+-type match(T)
+ :: T | '_' | '$1' | '$2' | '$3' | '$4'.
+
+%% Record diameter:call/4 options are parsed into.
+-record(options,
+ {filter = none :: diameter:peer_filter(),
+ extra = [] :: list(),
+ timeout = ?DEFAULT_TIMEOUT :: 0..16#FFFFFFFF,
+ detach = false :: boolean()}).
+
+%% Term passed back to receive_message/4 with every incoming message.
+-record(recvdata,
+ {peerT :: ets:tid(),
+ service_name :: diameter:service_name(),
+ apps :: [#diameter_app{}],
+ sequence :: diameter:sequence()}).
+
+%% Record stored in diameter_request for each outgoing request.
+-record(request,
+ {ref :: match(reference()), %% used to receive answer
+ caller :: match(pid()), %% calling process
+ handler :: match(pid()), %% request process
+ transport :: match(pid()), %% peer process
+ caps :: match(#diameter_caps{}), %% of connection
+ packet :: match(#diameter_packet{})}). %% of request
+
+%% ---------------------------------------------------------------------------
+%% # make_recvdata/1
+%% ---------------------------------------------------------------------------
+
+make_recvdata([SvcName, PeerT, Apps, Mask | _]) ->
+ #recvdata{service_name = SvcName,
+ peerT = PeerT,
+ apps = Apps,
+ sequence = Mask}.
+%% Take a list so that the caller (diameter_service) can be upgraded
+%% first if new members are added. Note that receive_message/4 might
+%% still get an old term from any watchdog started in old code.
+
+%% ---------------------------------------------------------------------------
+%% peer_up/1
+%% ---------------------------------------------------------------------------
+
+%% Insert an element that is used to detect whether or not there has
+%% been a failover when inserting an outgoing request.
+peer_up(TPid) ->
+ ets:insert(?REQUEST_TABLE, {TPid}).
+
+%% ---------------------------------------------------------------------------
+%% peer_down/1
+%% ---------------------------------------------------------------------------
+
+peer_down(TPid) ->
+ ets:delete(?REQUEST_TABLE, TPid),
+ failover(TPid).
+
+%% ---------------------------------------------------------------------------
+%% pending/1
+%% ---------------------------------------------------------------------------
+
+pending(TPids) ->
+ MatchSpec = [{{'$1',
+ #request{caller = '$2',
+ handler = '$3',
+ transport = '$4',
+ _ = '_'},
+ '_'},
+ [?ORCOND([{'==', T, '$4'} || T <- TPids])],
+ [{{'$1', [{{caller, '$2'}},
+ {{handler, '$3'}},
+ {{transport, '$4'}}]}}]}],
+
+ try
+ ets:select(?REQUEST_TABLE, MatchSpec)
+ catch
+ error: badarg -> [] %% service has gone down
+ end.
+
+%% ---------------------------------------------------------------------------
+%% # receive_message/4
+%%
+%% Handle an incoming Diameter message.
+%% ---------------------------------------------------------------------------
+
+%% Handle an incoming Diameter message in the watchdog process. This
+%% used to come through the service process but this avoids that
+%% becoming a bottleneck.
+
+receive_message(TPid, Pkt, Dict0, RecvData)
+ when is_pid(TPid) ->
+ #diameter_packet{header = #diameter_header{is_request = R}} = Pkt,
+ recv(R,
+ (not R) andalso lookup_request(Pkt, TPid),
+ TPid,
+ Pkt,
+ Dict0,
+ RecvData).
+
+%% Incoming request ...
+recv(true, false, TPid, Pkt, Dict0, RecvData) ->
+ try
+ spawn(fun() -> recv_request(TPid, Pkt, Dict0, RecvData) end)
+ catch
+ error: system_limit = E -> %% discard
+ ?LOG({error, E}, now())
+ end;
+
+%% ... answer to known request ...
+recv(false, #request{ref = Ref, handler = Pid} = Req, _, Pkt, Dict0, _) ->
+ Pid ! {answer, Ref, Req, Dict0, Pkt};
+%% Note that failover could have happened prior to this message being
+%% received and triggering failback. That is, both a failover message
+%% and answer may be on their way to the handler process. In the worst
+%% case the request process gets notification of the failover and
+%% sends to the alternate peer before an answer arrives, so it's
+%% always the case that we can receive more than one answer after
+%% failover. The first answer received by the request process wins,
+%% any others are discarded.
+
+%% ... or not.
+recv(false, false, _, _, _, _) ->
+ ok.
+
+%% ---------------------------------------------------------------------------
+%% recv_request/4
+%% ---------------------------------------------------------------------------
+
+recv_request(TPid,
+ #diameter_packet{header = #diameter_header{application_id = Id}}
+ = Pkt,
+ Dict0,
+ #recvdata{peerT = PeerT, apps = Apps}
+ = RecvData) ->
+ recv_request(diameter_service:find_incoming_app(PeerT, TPid, Id, Apps),
+ TPid,
+ Pkt,
+ Dict0,
+ RecvData).
+
+%% recv_request/5
+
+recv_request({#diameter_app{id = Id, dictionary = Dict} = App, Caps},
+ TPid,
+ Pkt,
+ Dict0,
+ RecvData) ->
+ recv_R(App,
+ TPid,
+ Caps,
+ Dict0,
+ RecvData,
+ diameter_codec:decode(Id, Dict, Pkt));
+%% Note that the decode is different depending on whether or not Id is
+%% ?APP_ID_RELAY.
+
+%% DIAMETER_APPLICATION_UNSUPPORTED 3007
+%% A request was sent for an application that is not supported.
+
+recv_request(#diameter_caps{} = Caps, TPid, Pkt, Dict0, _) ->
+ As = collect_avps(Pkt),
+ protocol_error(3007, TPid, Caps, Dict0, Pkt#diameter_packet{avps = As});
+
+recv_request(false, _, _, _, _) -> %% transport has gone down
+ ok.
+
+collect_avps(Pkt) ->
+ case diameter_codec:collect_avps(Pkt) of
+ {_Bs, As} ->
+ As;
+ As ->
+ As
+ end.
+
+%% recv_R/6
+
+%% Wrong number of bits somewhere in the message: reply.
+%%
+%% DIAMETER_INVALID_AVP_BITS 3009
+%% A request was received that included an AVP whose flag bits are
+%% set to an unrecognized value, or that is inconsistent with the
+%% AVP's definition.
+%%
+recv_R(_App,
+ TPid,
+ Caps,
+ Dict0,
+ _RecvData,
+ #diameter_packet{errors = [Bs | _]} = Pkt)
+ when is_bitstring(Bs) ->
+ protocol_error(3009, TPid, Caps, Dict0, Pkt);
+
+%% Either we support this application but don't recognize the command
+%% or we're a relay and the command isn't proxiable.
+%%
+%% DIAMETER_COMMAND_UNSUPPORTED 3001
+%% The Request contained a Command-Code that the receiver did not
+%% recognize or support. This MUST be used when a Diameter node
+%% receives an experimental command that it does not understand.
+%%
+recv_R(#diameter_app{id = Id},
+ TPid,
+ Caps,
+ Dict0,
+ _RecvData,
+ #diameter_packet{header = #diameter_header{is_proxiable = P},
+ msg = M}
+ = Pkt)
+ when ?APP_ID_RELAY /= Id, undefined == M;
+ ?APP_ID_RELAY == Id, not P ->
+ protocol_error(3001, TPid, Caps, Dict0, Pkt);
+
+%% Error bit was set on a request.
+%%
+%% DIAMETER_INVALID_HDR_BITS 3008
+%% A request was received whose bits in the Diameter header were
+%% either set to an invalid combination, or to a value that is
+%% inconsistent with the command code's definition.
+%%
+recv_R(_App,
+ TPid,
+ Caps,
+ Dict0,
+ _RecvData,
+ #diameter_packet{header = #diameter_header{is_error = true}}
+ = Pkt) ->
+ protocol_error(3008, TPid, Caps, Dict0, Pkt);
+
+%% A message in a locally supported application or a proxiable message
+%% in the relay application. Don't distinguish between the two since
+%% each application has its own callback config. That is, the user can
+%% easily distinguish between the two cases.
+recv_R(App, TPid, Caps, Dict0, RecvData, Pkt) ->
+ request_cb(App, TPid, Caps, Dict0, RecvData, examine(Pkt)).
+
+%% Note that there may still be errors but these aren't protocol
+%% (3xxx) errors that lead to an answer-message.
+
+request_cb(App,
+ TPid,
+ Caps,
+ Dict0,
+ #recvdata{service_name = SvcName}
+ = RecvData,
+ Pkt) ->
+ request_cb(cb(App, handle_request, [Pkt, SvcName, {TPid, Caps}]),
+ App,
+ TPid,
+ Caps,
+ Dict0,
+ RecvData,
+ [],
+ Pkt).
+
+%% examine/1
+%%
+%% Look for errors in a decoded message. It's odd/unfortunate that
+%% 501[15] aren't protocol errors.
+
+%% DIAMETER_INVALID_MESSAGE_LENGTH 5015
+%%
+%% This error is returned when a request is received with an invalid
+%% message length.
+
+examine(#diameter_packet{header = #diameter_header{length = Len},
+ bin = Bin,
+ errors = Es}
+ = Pkt)
+ when Len < 20;
+ 0 /= Len rem 4;
+ 8*Len /= bit_size(Bin) ->
+ Pkt#diameter_packet{errors = [5015 | Es]};
+
+%% DIAMETER_UNSUPPORTED_VERSION 5011
+%% This error is returned when a request was received, whose version
+%% number is unsupported.
+
+examine(#diameter_packet{header = #diameter_header{version = V},
+ errors = Es}
+ = Pkt)
+ when V /= ?DIAMETER_VERSION ->
+ Pkt#diameter_packet{errors = [5011 | Es]};
+
+examine(Pkt) ->
+ Pkt.
+
+%% request_cb/8
+
+%% A reply may be an answer-message, constructed either here or by
+%% the handle_request callback. The header from the incoming request
+%% is passed into the encode so that it can retrieve the relevant
+%% command code in this case. It will also then ignore Dict and use
+%% the base encoder.
+request_cb({reply, Ans},
+ #diameter_app{dictionary = Dict},
+ TPid,
+ _Caps,
+ Dict0,
+ _RecvData,
+ Fs,
+ Pkt) ->
+ reply(Ans, dict(Dict, Dict0, Ans), TPid, Fs, Pkt);
+
+%% An 3xxx result code, for which the E-bit is set in the header.
+request_cb({protocol_error, RC},
+ _App,
+ TPid,
+ Caps,
+ Dict0,
+ _RecvData,
+ Fs,
+ Pkt)
+ when 3000 =< RC, RC < 4000 ->
+ protocol_error(RC, TPid, Caps, Dict0, Fs, Pkt);
+
+%% RFC 3588 says we must reply 3001 to anything unrecognized or
+%% unsupported. 'noreply' is undocumented (and inappropriately named)
+%% backwards compatibility for this, protocol_error the documented
+%% alternative.
+request_cb(noreply,
+ _App,
+ TPid,
+ Caps,
+ Dict0,
+ _RecvData,
+ Fs,
+ Pkt) ->
+ protocol_error(3001, TPid, Caps, Dict0, Fs, Pkt);
+
+%% Relay a request to another peer. This is equivalent to doing an
+%% explicit call/4 with the message in question except that (1) a loop
+%% will be detected by examining Route-Record AVP's, (3) a
+%% Route-Record AVP will be added to the outgoing request and (3) the
+%% End-to-End Identifier will default to that in the
+%% #diameter_header{} without the need for an end_to_end_identifier
+%% option.
+%%
+%% relay and proxy are similar in that they require the same handling
+%% with respect to Route-Record and End-to-End identifier. The
+%% difference is that a proxy advertises specific applications, while
+%% a relay advertises the relay application. If a callback doesn't
+%% want to distinguish between the cases in the callback return value
+%% then 'resend' is a neutral alternative.
+%%
+request_cb({A, Opts},
+ #diameter_app{id = Id}
+ = App,
+ TPid,
+ Caps,
+ Dict0,
+ RecvData,
+ Fs,
+ Pkt)
+ when A == relay, Id == ?APP_ID_RELAY;
+ A == proxy, Id /= ?APP_ID_RELAY;
+ A == resend ->
+ resend(Opts, App, TPid, Caps, Dict0, RecvData, Fs, Pkt);
+
+request_cb(discard, _, _, _, _, _, _, _) ->
+ ok;
+
+request_cb({eval_packet, RC, F}, App, TPid, Caps, Dict0, RecvData, Fs, Pkt) ->
+ request_cb(RC, App, TPid, Caps, Dict0, RecvData, [F|Fs], Pkt);
+
+request_cb({eval, RC, F}, App, TPid, Caps, Dict0, RecvData, Fs, Pkt) ->
+ request_cb(RC, App, TPid, Caps, Dict0, RecvData, Fs, Pkt),
+ diameter_lib:eval(F).
+
+%% dict/3
+
+%% An incoming answer, not yet decoded.
+dict(Dict, Dict0, #diameter_packet{header
+ = #diameter_header{is_request = false,
+ is_error = E},
+ msg = undefined}) ->
+ if E -> Dict0; true -> Dict end;
+
+dict(Dict, Dict0, [Msg]) ->
+ dict(Dict, Dict0, Msg);
+
+dict(Dict, Dict0, #diameter_packet{msg = Msg}) ->
+ dict(Dict, Dict0, Msg);
+
+dict(_Dict, Dict0, ['answer-message' | _]) ->
+ Dict0;
+
+dict(Dict, Dict0, Rec) ->
+ try
+ 'answer-message' = Dict0:rec2msg(element(1,Rec)),
+ Dict0
+ catch
+ error:_ -> Dict
+ end.
+
+%% protocol_error/6
+
+protocol_error(RC, TPid, Caps, Dict0, Fs, Pkt) ->
+ #diameter_caps{origin_host = {OH,_},
+ origin_realm = {OR,_}}
+ = Caps,
+ #diameter_packet{avps = Avps, errors = Es}
+ = Pkt,
+
+ ?LOG({error, RC}, Pkt),
+ reply(answer_message({OH, OR, RC}, Dict0, Avps),
+ Dict0,
+ TPid,
+ Fs,
+ Pkt#diameter_packet{errors = [RC | Es]}).
+%% Note that reply/5 may set the result code once more. It's set in
+%% answer_message/3 in case reply/5 doesn't.
+
+%% protocol_error/5
+
+protocol_error(RC, TPid, Caps, Dict0, Pkt) ->
+ protocol_error(RC, TPid, Caps, Dict0, [], Pkt).
+
+%% resend/7
+%%
+%% Resend a message as a relay or proxy agent.
+
+resend(Opts,
+ #diameter_app{}
+ = App,
+ TPid,
+ #diameter_caps{origin_host = {OH,_}}
+ = Caps,
+ Dict0,
+ RecvData,
+ Fs,
+ #diameter_packet{avps = Avps}
+ = Pkt) ->
+ {Code, _Flags, Vid} = Dict0:avp_header('Route-Record'),
+ resend(is_loop(Code, Vid, OH, Dict0, Avps),
+ Opts,
+ App,
+ TPid,
+ Caps,
+ Dict0,
+ RecvData,
+ Fs,
+ Pkt).
+
+%% DIAMETER_LOOP_DETECTED 3005
+%% An agent detected a loop while trying to get the message to the
+%% intended recipient. The message MAY be sent to an alternate peer,
+%% if one is available, but the peer reporting the error has
+%% identified a configuration problem.
+
+resend(true, _Opts, _App, TPid, Caps, Dict0, _RecvData, Fs, Pkt) ->
+ protocol_error(3005, TPid, Caps, Dict0, Fs, Pkt);
+
+%% 6.1.8. Relaying and Proxying Requests
+%%
+%% A relay or proxy agent MUST append a Route-Record AVP to all requests
+%% forwarded. The AVP contains the identity of the peer the request was
+%% received from.
+
+resend(false,
+ Opts,
+ App,
+ TPid,
+ #diameter_caps{origin_host = {_,OH}}
+ = Caps,
+ Dict0,
+ #recvdata{service_name = SvcName,
+ sequence = Mask},
+ Fs,
+ #diameter_packet{header = Hdr0,
+ avps = Avps}
+ = Pkt) ->
+ Route = #diameter_avp{data = {Dict0, 'Route-Record', OH}},
+ Seq = diameter_session:sequence(Mask),
+ Hdr = Hdr0#diameter_header{hop_by_hop_id = Seq},
+ Msg = [Hdr, Route | Avps],
+ resend(send_request(SvcName, App, Msg, Opts), TPid, Caps, Dict0, Fs, Pkt).
+%% The incoming request is relayed with the addition of a
+%% Route-Record. Note the requirement on the return from call/4 below,
+%% which places a requirement on the value returned by the
+%% handle_answer callback of the application module in question.
+%%
+%% Note that there's nothing stopping the request from being relayed
+%% back to the sender. A pick_peer callback may want to avoid this but
+%% a smart peer might recognize the potential loop and choose another
+%% route. A less smart one will probably just relay the request back
+%% again and force us to detect the loop. A pick_peer that wants to
+%% avoid this can specify filter to avoid the possibility.
+%% Eg. {neg, {host, OH} where #diameter_caps{origin_host = {OH, _}}.
+%%
+%% RFC 6.3 says that a relay agent does not modify Origin-Host but
+%% says nothing about a proxy. Assume it should behave the same way.
+
+%% resend/6
+%%
+%% Relay a reply to a relayed request.
+
+%% Answer from the peer: reset the hop by hop identifier and send.
+resend(#diameter_packet{bin = B}
+ = Pkt,
+ TPid,
+ _Caps,
+ _Dict0,
+ Fs,
+ #diameter_packet{header = #diameter_header{hop_by_hop_id = Id},
+ transport_data = TD}) ->
+ P = Pkt#diameter_packet{bin = diameter_codec:hop_by_hop_id(Id, B),
+ transport_data = TD},
+ eval_packet(P, Fs),
+ send(TPid, P);
+%% TODO: counters
+
+%% Or not: DIAMETER_UNABLE_TO_DELIVER.
+resend(_, TPid, Caps, Dict0, Fs, Pkt) ->
+ protocol_error(3002, TPid, Caps, Dict0, Fs, Pkt).
+
+%% is_loop/5
+%%
+%% Is there a Route-Record AVP with our Origin-Host?
+
+is_loop(Code,
+ Vid,
+ Bin,
+ _Dict0,
+ [#diameter_avp{code = Code, vendor_id = Vid, data = Bin} | _]) ->
+ true;
+
+is_loop(_, _, _, _, []) ->
+ false;
+
+is_loop(Code, Vid, OH, Dict0, [_ | Avps])
+ when is_binary(OH) ->
+ is_loop(Code, Vid, OH, Dict0, Avps);
+
+is_loop(Code, Vid, OH, Dict0, Avps) ->
+ is_loop(Code, Vid, Dict0:avp(encode, OH, 'Route-Record'), Dict0, Avps).
+
+%% reply/5
+%%
+%% Send a locally originating reply.
+
+%% Skip the setting of Result-Code and Failed-AVP's below. This is
+%% currently undocumented.
+reply([Msg], Dict, TPid, Fs, Pkt)
+ when is_list(Msg);
+ is_tuple(Msg) ->
+ reply(Msg, Dict, TPid, Fs, Pkt#diameter_packet{errors = []});
+
+%% No errors or a diameter_header/avp list.
+reply(Msg, Dict, TPid, Fs, #diameter_packet{errors = Es} = ReqPkt)
+ when [] == Es;
+ is_record(hd(Msg), diameter_header) ->
+ Pkt = encode(Dict, make_answer_packet(Msg, ReqPkt), Fs),
+ incr(send, Pkt, Dict, TPid), %% count result codes in sent answers
+ send(TPid, Pkt);
+
+%% Or not: set Result-Code and Failed-AVP AVP's.
+reply(Msg, Dict, TPid, Fs, #diameter_packet{errors = [H|_] = Es} = Pkt) ->
+ reply(rc(Msg, rc(H), [A || {_,A} <- Es], Dict),
+ Dict,
+ TPid,
+ Fs,
+ Pkt#diameter_packet{errors = []}).
+
+eval_packet(Pkt, Fs) ->
+ lists:foreach(fun(F) -> diameter_lib:eval([F,Pkt]) end, Fs).
+
+%% make_answer_packet/2
+
+%% A reply message clears the R and T flags and retains the P flag.
+%% The E flag will be set at encode. 6.2 of 3588 requires the same P
+%% flag on an answer as on the request. A #diameter_packet{} returned
+%% from a handle_request callback can circumvent this by setting its
+%% own header values.
+make_answer_packet(#diameter_packet{header = Hdr,
+ msg = Msg,
+ transport_data = TD},
+ #diameter_packet{header = ReqHdr}) ->
+ Hdr0 = ReqHdr#diameter_header{version = ?DIAMETER_VERSION,
+ is_request = false,
+ is_error = undefined,
+ is_retransmitted = false},
+ #diameter_packet{header = fold_record(Hdr0, Hdr),
+ msg = Msg,
+ transport_data = TD};
+
+%% Binaries and header/avp lists are sent as-is.
+make_answer_packet(Bin, #diameter_packet{transport_data = TD})
+ when is_binary(Bin) ->
+ #diameter_packet{bin = Bin,
+ transport_data = TD};
+make_answer_packet([#diameter_header{} | _] = Msg,
+ #diameter_packet{transport_data = TD}) ->
+ #diameter_packet{msg = Msg,
+ transport_data = TD};
+
+%% Otherwise, preserve transport_data.
+make_answer_packet(Msg, #diameter_packet{transport_data = TD} = Pkt) ->
+ make_answer_packet(#diameter_packet{msg = Msg, transport_data = TD}, Pkt).
+
+%% rc/1
+
+rc({RC, _}) ->
+ RC;
+rc(RC) ->
+ RC.
+
+%% rc/4
+
+rc(#diameter_packet{msg = Rec} = Pkt, RC, Failed, DictT) ->
+ Pkt#diameter_packet{msg = rc(Rec, RC, Failed, DictT)};
+
+rc(Rec, RC, Failed, DictT)
+ when is_integer(RC) ->
+ set(Rec,
+ lists:append([rc(Rec, {'Result-Code', RC}, DictT),
+ failed_avp(Rec, Failed, DictT)]),
+ DictT).
+
+%% Reply as name and tuple list ...
+set([_|_] = Ans, Avps, _) ->
+ Ans ++ Avps; %% Values nearer tail take precedence.
+
+%% ... or record.
+set(Rec, Avps, Dict) ->
+ Dict:'#set-'(Avps, Rec).
+
+%% rc/3
+%%
+%% Turn the result code into a list if its optional and only set it if
+%% the arity is 1 or {0,1}. In other cases (which probably shouldn't
+%% exist in practise) we can't know what's appropriate.
+
+rc([MsgName | _], {'Result-Code' = K, RC} = T, Dict) ->
+ case Dict:avp_arity(MsgName, 'Result-Code') of
+ 1 -> [T];
+ {0,1} -> [{K, [RC]}];
+ _ -> []
+ end;
+
+rc(Rec, T, Dict) ->
+ rc([Dict:rec2msg(element(1, Rec))], T, Dict).
+
+%% failed_avp/3
+
+failed_avp(_, [] = No, _) ->
+ No;
+
+failed_avp(Rec, Failed, Dict) ->
+ [fa(Rec, [{'AVP', Failed}], Dict)].
+
+%% Reply as name and tuple list ...
+fa([MsgName | Values], FailedAvp, Dict) ->
+ R = Dict:msg2rec(MsgName),
+ try
+ Dict:'#info-'(R, {index, 'Failed-AVP'}),
+ {'Failed-AVP', [FailedAvp]}
+ catch
+ error: _ ->
+ Avps = proplists:get_value('AVP', Values, []),
+ A = #diameter_avp{name = 'Failed-AVP',
+ value = FailedAvp},
+ {'AVP', [A|Avps]}
+ end;
+
+%% ... or record.
+fa(Rec, FailedAvp, Dict) ->
+ try
+ {'Failed-AVP', [FailedAvp]}
+ catch
+ error: _ ->
+ Avps = Dict:'get-'('AVP', Rec),
+ A = #diameter_avp{name = 'Failed-AVP',
+ value = FailedAvp},
+ {'AVP', [A|Avps]}
+ end.
+
+%% 3. Diameter Header
+%%
+%% E(rror) - If set, the message contains a protocol error,
+%% and the message will not conform to the ABNF
+%% described for this command. Messages with the 'E'
+%% bit set are commonly referred to as error
+%% messages. This bit MUST NOT be set in request
+%% messages. See Section 7.2.
+
+%% 3.2. Command Code ABNF specification
+%%
+%% e-bit = ", ERR"
+%% ; If present, the 'E' bit in the Command
+%% ; Flags is set, indicating that the answer
+%% ; message contains a Result-Code AVP in
+%% ; the "protocol error" class.
+
+%% 7.1.3. Protocol Errors
+%%
+%% Errors that fall within the Protocol Error category SHOULD be treated
+%% on a per-hop basis, and Diameter proxies MAY attempt to correct the
+%% error, if it is possible. Note that these and only these errors MUST
+%% only be used in answer messages whose 'E' bit is set.
+
+%% Thus, only construct answers to protocol errors. Other errors
+%% require an message-specific answer and must be handled by the
+%% application.
+
+%% 6.2. Diameter Answer Processing
+%%
+%% When a request is locally processed, the following procedures MUST be
+%% applied to create the associated answer, in addition to any
+%% additional procedures that MAY be discussed in the Diameter
+%% application defining the command:
+%%
+%% - The same Hop-by-Hop identifier in the request is used in the
+%% answer.
+%%
+%% - The local host's identity is encoded in the Origin-Host AVP.
+%%
+%% - The Destination-Host and Destination-Realm AVPs MUST NOT be
+%% present in the answer message.
+%%
+%% - The Result-Code AVP is added with its value indicating success or
+%% failure.
+%%
+%% - If the Session-Id is present in the request, it MUST be included
+%% in the answer.
+%%
+%% - Any Proxy-Info AVPs in the request MUST be added to the answer
+%% message, in the same order they were present in the request.
+%%
+%% - The 'P' bit is set to the same value as the one in the request.
+%%
+%% - The same End-to-End identifier in the request is used in the
+%% answer.
+%%
+%% Note that the error messages (see Section 7.3) are also subjected to
+%% the above processing rules.
+
+%% 7.3. Error-Message AVP
+%%
+%% The Error-Message AVP (AVP Code 281) is of type UTF8String. It MAY
+%% accompany a Result-Code AVP as a human readable error message. The
+%% Error-Message AVP is not intended to be useful in real-time, and
+%% SHOULD NOT be expected to be parsed by network entities.
+
+%% answer_message/3
+
+answer_message({OH, OR, RC}, Dict0, Avps) ->
+ {Code, _, Vid} = Dict0:avp_header('Session-Id'),
+ ['answer-message', {'Origin-Host', OH},
+ {'Origin-Realm', OR},
+ {'Result-Code', RC}
+ | session_id(Code, Vid, Dict0, Avps)].
+
+session_id(Code, Vid, Dict0, Avps)
+ when is_list(Avps) ->
+ try
+ {value, #diameter_avp{data = D}} = find_avp(Code, Vid, Avps),
+ [{'Session-Id', [Dict0:avp(decode, D, 'Session-Id')]}]
+ catch
+ error: _ ->
+ []
+ end.
+
+%% find_avp/3
+
+find_avp(Code, Vid, Avps)
+ when is_integer(Code), (undefined == Vid orelse is_integer(Vid)) ->
+ find(fun(A) -> is_avp(Code, Vid, A) end, Avps).
+
+%% The final argument here could be a list of AVP's, depending on the case,
+%% but we're only searching at the top level.
+is_avp(Code, Vid, #diameter_avp{code = Code, vendor_id = Vid}) ->
+ true;
+is_avp(_, _, _) ->
+ false.
+
+find(_, []) ->
+ false;
+find(Pred, [H|T]) ->
+ case Pred(H) of
+ true ->
+ {value, H};
+ false ->
+ find(Pred, T)
+ end.
+
+%% 7. Error Handling
+%%
+%% There are certain Result-Code AVP application errors that require
+%% additional AVPs to be present in the answer. In these cases, the
+%% Diameter node that sets the Result-Code AVP to indicate the error
+%% MUST add the AVPs. Examples are:
+%%
+%% - An unrecognized AVP is received with the 'M' bit (Mandatory bit)
+%% set, causes an answer to be sent with the Result-Code AVP set to
+%% DIAMETER_AVP_UNSUPPORTED, and the Failed-AVP AVP containing the
+%% offending AVP.
+%%
+%% - An AVP that is received with an unrecognized value causes an
+%% answer to be returned with the Result-Code AVP set to
+%% DIAMETER_INVALID_AVP_VALUE, with the Failed-AVP AVP containing the
+%% AVP causing the error.
+%%
+%% - A command is received with an AVP that is omitted, yet is
+%% mandatory according to the command's ABNF. The receiver issues an
+%% answer with the Result-Code set to DIAMETER_MISSING_AVP, and
+%% creates an AVP with the AVP Code and other fields set as expected
+%% in the missing AVP. The created AVP is then added to the Failed-
+%% AVP AVP.
+%%
+%% The Result-Code AVP describes the error that the Diameter node
+%% encountered in its processing. In case there are multiple errors,
+%% the Diameter node MUST report only the first error it encountered
+%% (detected possibly in some implementation dependent order). The
+%% specific errors that can be described by this AVP are described in
+%% the following section.
+
+%% 7.5. Failed-AVP AVP
+%%
+%% The Failed-AVP AVP (AVP Code 279) is of type Grouped and provides
+%% debugging information in cases where a request is rejected or not
+%% fully processed due to erroneous information in a specific AVP. The
+%% value of the Result-Code AVP will provide information on the reason
+%% for the Failed-AVP AVP.
+%%
+%% The possible reasons for this AVP are the presence of an improperly
+%% constructed AVP, an unsupported or unrecognized AVP, an invalid AVP
+%% value, the omission of a required AVP, the presence of an explicitly
+%% excluded AVP (see tables in Section 10), or the presence of two or
+%% more occurrences of an AVP which is restricted to 0, 1, or 0-1
+%% occurrences.
+%%
+%% A Diameter message MAY contain one Failed-AVP AVP, containing the
+%% entire AVP that could not be processed successfully. If the failure
+%% reason is omission of a required AVP, an AVP with the missing AVP
+%% code, the missing vendor id, and a zero filled payload of the minimum
+%% required length for the omitted AVP will be added.
+
+%% incr/4
+%%
+%% Increment a stats counter for an incoming or outgoing message.
+
+%% Outgoing message as binary: don't count. (Sending binaries is only
+%% partially supported.)
+incr(_, #diameter_packet{msg = undefined}, _, _) ->
+ ok;
+
+incr(recv = D, #diameter_packet{header = H, errors = [_|_]}, _, TPid) ->
+ incr(TPid, {diameter_codec:msg_id(H), D, error});
+
+incr(Dir, Pkt, Dict, TPid) ->
+ #diameter_packet{header = #diameter_header{is_error = E}
+ = Hdr,
+ msg = Rec}
+ = Pkt,
+
+ RC = int(get_avp_value(Dict, 'Result-Code', Rec)),
+ PE = is_protocol_error(RC),
+
+ %% Check that the E bit is set only for 3xxx result codes.
+ (not (E orelse PE))
+ orelse (E andalso PE)
+ orelse x({invalid_error_bit, RC}, answer, [Dir, Pkt]),
+
+ irc(TPid, Hdr, Dir, rc_counter(Dict, Rec, RC)).
+
+irc(_, _, _, undefined) ->
+ false;
+
+irc(TPid, Hdr, Dir, Ctr) ->
+ incr(TPid, {diameter_codec:msg_id(Hdr), Dir, Ctr}).
+
+%% incr/2
+
+incr(TPid, Counter) ->
+ diameter_stats:incr(Counter, TPid, 1).
+
+%% error_counter/2
+
+%% RFC 3588, 7.6:
+%%
+%% All Diameter answer messages defined in vendor-specific
+%% applications MUST include either one Result-Code AVP or one
+%% Experimental-Result AVP.
+%%
+%% Maintain statistics assuming one or the other, not both, which is
+%% surely the intent of the RFC.
+
+rc_counter(Dict, Rec, undefined) ->
+ rcc(get_avp_value(Dict, 'Experimental-Result', Rec));
+rc_counter(_, _, RC) ->
+ {'Result-Code', RC}.
+
+%% Outgoing answers may be in any of the forms messages can be sent
+%% in. Incoming messages will be records. We're assuming here that the
+%% arity of the result code AVP's is 0 or 1.
+
+rcc([{_,_,N} = T | _])
+ when is_integer(N) ->
+ T;
+rcc({_,_,N} = T)
+ when is_integer(N) ->
+ T;
+rcc(_) ->
+ undefined.
+
+%% Extract the first good looking integer. There's no guarantee
+%% that what we're looking for has arity 1.
+int([N|_])
+ when is_integer(N) ->
+ N;
+int(N)
+ when is_integer(N) ->
+ N;
+int(_) ->
+ undefined.
+
+is_protocol_error(RC) ->
+ 3000 =< RC andalso RC < 4000.
+
+-spec x(any(), atom(), list()) -> no_return().
+
+%% Warn and exit request process on errors in an incoming answer.
+x(Reason, F, A) ->
+ diameter_lib:warning_report(Reason, {?MODULE, F, A}),
+ x(Reason).
+
+x(T) ->
+ exit(T).
+
+%% ---------------------------------------------------------------------------
+%% # send_request/4
+%%
+%% Handle an outgoing Diameter request.
+%% ---------------------------------------------------------------------------
+
+send_request(SvcName, AppOrAlias, Msg, Options)
+ when is_list(Options) ->
+ Rec = make_options(Options),
+ Ref = make_ref(),
+ Caller = {self(), Ref},
+ ReqF = fun() ->
+ exit({Ref, send_R(SvcName, AppOrAlias, Msg, Rec, Caller)})
+ end,
+ try spawn_monitor(ReqF) of
+ {_, MRef} ->
+ recv_A(MRef, Ref, Rec#options.detach, false)
+ catch
+ error: system_limit = E ->
+ {error, E}
+ end.
+%% The R in send_R is because Diameter request are usually given short
+%% names of the form XXR. (eg. CER, DWR, etc.) Similarly, answers have
+%% names of the form XXA.
+
+%% Don't rely on gen_server:call/3 for the timeout handling since it
+%% makes no guarantees about not leaving a reply message in the
+%% mailbox if we catch its exit at timeout. It currently *can* do so,
+%% which is also undocumented.
+
+recv_A(MRef, _, true, true) ->
+ erlang:demonitor(MRef, [flush]),
+ ok;
+
+recv_A(MRef, Ref, Detach, Sent) ->
+ receive
+ Ref -> %% send has been attempted
+ recv_A(MRef, Ref, Detach, true);
+ {'DOWN', MRef, process, _, Reason} ->
+ answer_rc(Reason, Ref, Sent)
+ end.
+
+%% send_R/5 has returned ...
+answer_rc({Ref, Ans}, Ref, _) ->
+ Ans;
+
+%% ... or not. Note that failure/encode are documented return values.
+answer_rc(_, _, Sent) ->
+ {error, choose(Sent, failure, encode)}.
+
+%% send_R/5
+%%
+%% In the process spawned for the outgoing request.
+
+send_R(SvcName, AppOrAlias, Msg, Opts, Caller) ->
+ case pick_peer(SvcName, AppOrAlias, Msg, Opts) of
+ {{_,_,_} = Transport, Mask} ->
+ send_request(Transport, Mask, Msg, Opts, Caller, SvcName);
+ false ->
+ {error, no_connection};
+ {error, _} = No ->
+ No
+ end.
+
+%% make_options/1
+
+make_options(Options) ->
+ lists:foldl(fun mo/2, #options{}, Options).
+
+mo({timeout, T}, Rec)
+ when is_integer(T), 0 =< T ->
+ Rec#options{timeout = T};
+
+mo({filter, F}, #options{filter = none} = Rec) ->
+ Rec#options{filter = F};
+mo({filter, F}, #options{filter = {all, Fs}} = Rec) ->
+ Rec#options{filter = {all, [F | Fs]}};
+mo({filter, F}, #options{filter = F0} = Rec) ->
+ Rec#options{filter = {all, [F0, F]}};
+
+mo({extra, L}, #options{extra = X} = Rec)
+ when is_list(L) ->
+ Rec#options{extra = X ++ L};
+
+mo(detach, Rec) ->
+ Rec#options{detach = true};
+
+mo(T, _) ->
+ ?ERROR({invalid_option, T}).
+
+%% ---------------------------------------------------------------------------
+%% # send_request/6
+%% ---------------------------------------------------------------------------
+
+%% Send an outgoing request in its dedicated process.
+%%
+%% Note that both encode of the outgoing request and of the received
+%% answer happens in this process. It's also this process that replies
+%% to the caller. The service process only handles the state-retaining
+%% callbacks.
+%%
+%% The module field of the #diameter_app{} here includes any extra
+%% arguments passed to diameter:call/4.
+
+send_request({TPid, Caps, App}
+ = Transport,
+ Mask,
+ Msg,
+ Opts,
+ Caller,
+ SvcName) ->
+ Pkt = make_prepare_packet(Mask, Msg),
+
+ send_R(cb(App, prepare_request, [Pkt, SvcName, {TPid, Caps}]),
+ Pkt,
+ Transport,
+ Opts,
+ Caller,
+ SvcName,
+ []).
+
+send_R({send, Msg}, Pkt, Transport, Opts, Caller, SvcName, Fs) ->
+ send_R(make_request_packet(Msg, Pkt),
+ Transport,
+ Opts,
+ Caller,
+ SvcName,
+ Fs);
+
+send_R({discard, Reason} , _, _, _, _, _, _) ->
+ {error, Reason};
+
+send_R(discard, _, _, _, _, _, _) ->
+ {error, discarded};
+
+send_R({eval_packet, RC, F}, Pkt, T, Opts, Caller, SvcName, Fs) ->
+ send_R(RC, Pkt, T, Opts, Caller, SvcName, [F|Fs]);
+
+send_R(E, _, {_, _, App}, _, _, _, _) ->
+ ?ERROR({invalid_return, prepare_request, App, E}).
+
+%% make_prepare_packet/2
+%%
+%% Turn an outgoing request as passed to call/4 into a diameter_packet
+%% record in preparation for a prepare_request callback.
+
+make_prepare_packet(_, Bin)
+ when is_binary(Bin) ->
+ #diameter_packet{header = diameter_codec:decode_header(Bin),
+ bin = Bin};
+
+make_prepare_packet(Mask, #diameter_packet{msg = [#diameter_header{} = Hdr
+ | Avps]}
+ = Pkt) ->
+ Pkt#diameter_packet{msg = [make_prepare_header(Mask, Hdr) | Avps]};
+
+make_prepare_packet(Mask, #diameter_packet{header = Hdr} = Pkt) ->
+ Pkt#diameter_packet{header = make_prepare_header(Mask, Hdr)};
+
+make_prepare_packet(Mask, Msg) ->
+ make_prepare_packet(Mask, #diameter_packet{msg = Msg}).
+
+%% make_prepare_header/2
+
+make_prepare_header(Mask, undefined) ->
+ Seq = diameter_session:sequence(Mask),
+ make_prepare_header(#diameter_header{end_to_end_id = Seq,
+ hop_by_hop_id = Seq});
+
+make_prepare_header(Mask, #diameter_header{end_to_end_id = undefined,
+ hop_by_hop_id = undefined}
+ = H) ->
+ Seq = diameter_session:sequence(Mask),
+ make_prepare_header(H#diameter_header{end_to_end_id = Seq,
+ hop_by_hop_id = Seq});
+
+make_prepare_header(Mask, #diameter_header{end_to_end_id = undefined} = H) ->
+ Seq = diameter_session:sequence(Mask),
+ make_prepare_header(H#diameter_header{end_to_end_id = Seq});
+
+make_prepare_header(Mask, #diameter_header{hop_by_hop_id = undefined} = H) ->
+ Seq = diameter_session:sequence(Mask),
+ make_prepare_header(H#diameter_header{hop_by_hop_id = Seq});
+
+make_prepare_header(_, Hdr) ->
+ make_prepare_header(Hdr).
+
+%% make_prepare_header/1
+
+make_prepare_header(#diameter_header{version = undefined} = Hdr) ->
+ make_prepare_header(Hdr#diameter_header{version = ?DIAMETER_VERSION});
+
+make_prepare_header(#diameter_header{} = Hdr) ->
+ Hdr;
+
+make_prepare_header(T) ->
+ ?ERROR({invalid_header, T}).
+
+%% make_request_packet/2
+%%
+%% Reconstruct a diameter_packet from the return value of
+%% prepare_request or prepare_retransmit callback.
+
+make_request_packet(Bin, _)
+ when is_binary(Bin) ->
+ make_prepare_packet(false, Bin);
+
+make_request_packet(#diameter_packet{msg = [#diameter_header{} | _]}
+ = Pkt,
+ _) ->
+ Pkt;
+
+%% Returning a diameter_packet with no header from a prepare_request
+%% or prepare_retransmit callback retains the header passed into it.
+%% This is primarily so that the end to end and hop by hop identifiers
+%% are retained.
+make_request_packet(#diameter_packet{header = Hdr} = Pkt,
+ #diameter_packet{header = Hdr0}) ->
+ Pkt#diameter_packet{header = fold_record(Hdr0, Hdr)};
+
+make_request_packet(Msg, Pkt) ->
+ Pkt#diameter_packet{msg = Msg}.
+
+%% fold_record/2
+
+fold_record(undefined, R) ->
+ R;
+fold_record(Rec, R) ->
+ diameter_lib:fold_tuple(2, Rec, R).
+
+%% send_R/6
+
+send_R(Pkt0,
+ {TPid, Caps, #diameter_app{dictionary = Dict} = App},
+ Opts,
+ {Pid, Ref},
+ SvcName,
+ Fs) ->
+ Pkt = encode(Dict, Pkt0, Fs),
+
+ #options{timeout = Timeout}
+ = Opts,
+
+ Req = #request{ref = Ref,
+ caller = Pid,
+ handler = self(),
+ transport = TPid,
+ caps = Caps,
+ packet = Pkt0},
+
+ try
+ TRef = send_request(TPid, Pkt, Req, SvcName, Timeout),
+ Pid ! Ref, %% tell caller a send has been attempted
+ handle_answer(SvcName,
+ App,
+ recv_A(Timeout, SvcName, App, Opts, {TRef, Req}))
+ after
+ erase_requests(Pkt)
+ end.
+
+%% recv_A/5
+
+recv_A(Timeout, SvcName, App, Opts, {TRef, #request{ref = Ref} = Req}) ->
+ %% Matching on TRef below ensures we ignore messages that pertain
+ %% to a previous transport prior to failover. The answer message
+ %% includes the #request{} since it's not necessarily Req; that
+ %% is, from the last peer to which we've transmitted.
+ receive
+ {answer = A, Ref, Rq, Dict0, Pkt} -> %% Answer from peer
+ {A, Rq, Dict0, Pkt};
+ {timeout = Reason, TRef, _} -> %% No timely reply
+ {error, Req, Reason};
+ {failover, TRef} -> %% Service says peer has gone down
+ retransmit(pick_peer(SvcName, App, Req, Opts),
+ Req,
+ Opts,
+ SvcName,
+ Timeout)
+ end.
+
+%% handle_answer/3
+
+handle_answer(SvcName, App, {error, Req, Reason}) ->
+ handle_error(App, Req, Reason, SvcName);
+
+handle_answer(SvcName,
+ #diameter_app{dictionary = Dict}
+ = App,
+ {answer, Req, Dict0, Pkt}) ->
+ Mod = dict(Dict, Dict0, Pkt),
+ answer(examine(diameter_codec:decode(Mod, Pkt)),
+ SvcName,
+ Mod,
+ App,
+ Req).
+
+%% We don't really need to do a full decode if we're a relay and will
+%% just resend with a new hop by hop identifier, but might a proxy
+%% want to examine the answer?
+
+answer(Pkt, SvcName, Dict, App, #request{transport = TPid} = Req) ->
+ try
+ incr(recv, Pkt, Dict, TPid)
+ of
+ _ -> answer(Pkt, SvcName, App, Req)
+ catch
+ exit: {invalid_error_bit, _} = E ->
+ answer(Pkt#diameter_packet{errors = [E]}, SvcName, App, Req)
+ end.
+
+answer(Pkt,
+ SvcName,
+ #diameter_app{module = ModX,
+ options = [{answer_errors, AE} | _]},
+ Req) ->
+ a(Pkt, SvcName, ModX, AE, Req).
+
+a(#diameter_packet{errors = Es}
+ = Pkt,
+ SvcName,
+ ModX,
+ AE,
+ #request{transport = TPid,
+ caps = Caps,
+ packet = P})
+ when [] == Es;
+ callback == AE ->
+ cb(ModX, handle_answer, [Pkt, msg(P), SvcName, {TPid, Caps}]);
+
+a(Pkt, SvcName, _, report, Req) ->
+ x(errors, handle_answer, [SvcName, Req, Pkt]);
+
+a(Pkt, SvcName, _, discard, Req) ->
+ x({errors, handle_answer, [SvcName, Req, Pkt]}).
+
+%% Note that we don't check that the application id in the answer's
+%% header is what we expect. (TODO: Does the rfc says anything about
+%% this?)
+
+%% Note that failover starts a new timer and that expiry of an old
+%% timer value is ignored. This means that an answer could be accepted
+%% from a peer after timeout in the case of failover.
+
+retransmit({{_,_,App} = Transport, _Mask}, Req, Opts, SvcName, Timeout) ->
+ try retransmit(Transport, Req, SvcName, Timeout) of
+ T -> recv_A(Timeout, SvcName, App, Opts, T)
+ catch
+ ?FAILURE(Reason) -> {error, Req, Reason}
+ end;
+
+retransmit(_, Req, _, _, _) -> %% no alternate peer
+ {error, Req, failover}.
+
+%% pick_peer/4
+
+%% Retransmission after failover: call-specific arguments have already
+%% been appended in App.
+pick_peer(SvcName,
+ App,
+ #request{packet = #diameter_packet{msg = Msg}},
+ Opts) ->
+ pick_peer(SvcName, App, Msg, Opts#options{extra = []});
+
+pick_peer(_, _, undefined, _) ->
+ false;
+
+pick_peer(SvcName,
+ AppOrAlias,
+ Msg,
+ #options{filter = Filter, extra = Xtra}) ->
+ diameter_service:pick_peer(SvcName,
+ AppOrAlias,
+ {fun(D) -> get_destination(D, Msg) end,
+ Filter,
+ Xtra}).
+
+%% handle_error/4
+
+handle_error(App,
+ #request{packet = Pkt,
+ transport = TPid,
+ caps = Caps},
+ Reason,
+ SvcName) ->
+ cb(App, handle_error, [Reason, msg(Pkt), SvcName, {TPid, Caps}]).
+
+msg(#diameter_packet{msg = undefined, bin = Bin}) ->
+ Bin;
+msg(#diameter_packet{msg = Msg}) ->
+ Msg.
+
+%% encode/3
+
+encode(Dict, Pkt, Fs) ->
+ P = encode(Dict, Pkt),
+ eval_packet(P, Fs),
+ P.
+
+%% encode/2
+
+%% Note that prepare_request can return a diameter_packet containing a
+%% header or transport_data. Even allow the returned record to contain
+%% an encoded binary. This isn't the usual case and doesn't properly
+%% support retransmission but is useful for test.
+
+%% A message to be encoded.
+encode(Dict, #diameter_packet{bin = undefined} = Pkt) ->
+ diameter_codec:encode(Dict, Pkt);
+
+%% An encoded binary: just send.
+encode(_, #diameter_packet{} = Pkt) ->
+ Pkt.
+
+%% send_request/5
+
+send_request(TPid, #diameter_packet{bin = Bin} = Pkt, Req, _SvcName, Timeout)
+ when node() == node(TPid) ->
+ %% Store the outgoing request before sending to avoid a race with
+ %% reply reception.
+ TRef = store_request(TPid, Bin, Req, Timeout),
+ send(TPid, Pkt),
+ TRef;
+
+%% Send using a remote transport: spawn a process on the remote node
+%% to relay the answer.
+send_request(TPid, #diameter_packet{} = Pkt, Req, SvcName, Timeout) ->
+ TRef = erlang:start_timer(Timeout, self(), TPid),
+ T = {TPid, Pkt, Req, SvcName, Timeout, TRef},
+ spawn(node(TPid), ?MODULE, send, [T]),
+ TRef.
+
+%% send/1
+
+send({TPid, Pkt, #request{handler = Pid} = Req, SvcName, Timeout, TRef}) ->
+ Ref = send_request(TPid,
+ Pkt,
+ Req#request{handler = self()},
+ SvcName,
+ Timeout),
+ Pid ! reref(receive T -> T end, Ref, TRef).
+
+reref({T, Ref, R}, Ref, TRef) ->
+ {T, TRef, R};
+reref(T, _, _) ->
+ T.
+
+%% send/2
+
+send(Pid, Pkt) ->
+ Pid ! {send, Pkt}.
+
+%% retransmit/4
+
+retransmit({TPid, Caps, App}
+ = Transport,
+ #request{packet = Pkt0}
+ = Req,
+ SvcName,
+ Timeout) ->
+ have_request(Pkt0, TPid) %% Don't failover to a peer we've
+ andalso ?THROW(timeout), %% already sent to.
+
+ #diameter_packet{header = Hdr0} = Pkt0,
+ Hdr = Hdr0#diameter_header{is_retransmitted = true},
+ Pkt = Pkt0#diameter_packet{header = Hdr},
+
+ retransmit(cb(App, prepare_retransmit, [Pkt, SvcName, {TPid, Caps}]),
+ Transport,
+ Req#request{packet = Pkt},
+ SvcName,
+ Timeout,
+ []).
+
+retransmit({send, Msg},
+ Transport,
+ #request{packet = Pkt}
+ = Req,
+ SvcName,
+ Timeout,
+ Fs) ->
+ resend_request(make_request_packet(Msg, Pkt),
+ Transport,
+ Req,
+ SvcName,
+ Timeout,
+ Fs);
+
+retransmit({discard, Reason}, _, _, _, _, _) ->
+ ?THROW(Reason);
+
+retransmit(discard, _, _, _, _, _) ->
+ ?THROW(discarded);
+
+retransmit({eval_packet, RC, F}, Transport, Req, SvcName, Timeout, Fs) ->
+ retransmit(RC, Transport, Req, SvcName, Timeout, [F|Fs]);
+
+retransmit(T, {_, _, App}, _, _, _, _) ->
+ ?ERROR({invalid_return, prepare_retransmit, App, T}).
+
+resend_request(Pkt0,
+ {TPid, Caps, #diameter_app{dictionary = Dict}},
+ Req0,
+ SvcName,
+ Tmo,
+ Fs) ->
+ Pkt = encode(Dict, Pkt0, Fs),
+
+ Req = Req0#request{transport = TPid,
+ packet = Pkt0,
+ caps = Caps},
+
+ ?LOG(retransmission, Req),
+ TRef = send_request(TPid, Pkt, Req, SvcName, Tmo),
+ {TRef, Req}.
+
+%% store_request/4
+
+store_request(TPid, Bin, Req, Timeout) ->
+ Seqs = diameter_codec:sequence_numbers(Bin),
+ TRef = erlang:start_timer(Timeout, self(), timeout),
+ ets:insert(?REQUEST_TABLE, {Seqs, Req, TRef}),
+ ets:member(?REQUEST_TABLE, TPid)
+ orelse (self() ! {failover, TRef}), %% failover/1 may have missed
+ TRef.
+
+%% lookup_request/2
+
+lookup_request(Msg, TPid) ->
+ Seqs = diameter_codec:sequence_numbers(Msg),
+ Spec = [{{Seqs, #request{transport = TPid, _ = '_'}, '_'},
+ [],
+ ['$_']}],
+ case ets:select(?REQUEST_TABLE, Spec) of
+ [{_, Req, _}] ->
+ Req;
+ [] ->
+ false
+ end.
+
+%% erase_requests/1
+
+erase_requests(Pkt) ->
+ ets:delete(?REQUEST_TABLE, diameter_codec:sequence_numbers(Pkt)).
+
+%% match_requests/1
+
+match_requests(TPid) ->
+ Pat = {'_', #request{transport = TPid, _ = '_'}, '_'},
+ ets:select(?REQUEST_TABLE, [{Pat, [], ['$_']}]).
+
+%% have_request/2
+
+have_request(Pkt, TPid) ->
+ Seqs = diameter_codec:sequence_numbers(Pkt),
+ Pat = {Seqs, #request{transport = TPid, _ = '_'}, '_'},
+ '$end_of_table' /= ets:select(?REQUEST_TABLE, [{Pat, [], ['$_']}], 1).
+
+%% ---------------------------------------------------------------------------
+%% # failover/1-2
+%% ---------------------------------------------------------------------------
+
+failover(TPid)
+ when is_pid(TPid) ->
+ lists:foreach(fun failover/1, match_requests(TPid));
+%% Note that a request process can store its request after failover
+%% notifications are sent here: store_request/4 sends the notification
+%% in that case.
+
+%% Failover as a consequence of request_peer_down/1: inform the
+%% request process.
+failover({_, Req, TRef}) ->
+ #request{handler = Pid,
+ packet = #diameter_packet{msg = M}}
+ = Req,
+ M /= undefined andalso (Pid ! {failover, TRef}).
+%% Failover is not performed when msg = binary() since sending
+%% pre-encoded binaries is only partially supported. (Mostly for
+%% test.)
+
+%% get_destination/2
+
+get_destination(Dict, Msg) ->
+ [str(get_avp_value(Dict, D, Msg)) || D <- ['Destination-Realm',
+ 'Destination-Host']].
+
+%% This is not entirely correct. The avp could have an arity 1, in
+%% which case an empty list is a DiameterIdentity of length 0 rather
+%% than the list of no values we treat it as by mapping to undefined.
+%% This behaviour is documented.
+str([]) ->
+ undefined;
+str(T) ->
+ T.
+
+%% get_avp_value/3
+%%
+%% Find an AVP in a message of one of three forms:
+%%
+%% - a message record (as generated from a .dia spec) or
+%% - a list of an atom message name followed by 2-tuple, avp name/value pairs.
+%% - a list of a #diameter_header{} followed by #diameter_avp{} records,
+%%
+%% In the first two forms a dictionary module is used at encode to
+%% identify the type of the AVP and its arity in the message in
+%% question. The third form allows messages to be sent as is, without
+%% a dictionary, which is needed in the case of relay agents, for one.
+
+%% Messages will be header/avps list as a relay and the only AVP's we
+%% look for are in the common dictionary. This is required since the
+%% relay dictionary doesn't inherit the common dictionary (which maybe
+%% it should).
+get_avp_value(?RELAY, Name, Msg) ->
+ get_avp_value(?BASE, Name, Msg);
+
+%% Message sent as a header/avps list, probably a relay case but not
+%% necessarily.
+get_avp_value(Dict, Name, [#diameter_header{} | Avps]) ->
+ try
+ {Code, _, VId} = Dict:avp_header(Name),
+ [A|_] = lists:dropwhile(fun(#diameter_avp{code = C, vendor_id = V}) ->
+ C /= Code orelse V /= VId
+ end,
+ Avps),
+ avp_decode(Dict, Name, A)
+ catch
+ error: _ ->
+ undefined
+ end;
+
+%% Outgoing message as a name/values list.
+get_avp_value(_, Name, [_MsgName | Avps]) ->
+ case lists:keyfind(Name, 1, Avps) of
+ {_, V} ->
+ V;
+ _ ->
+ undefined
+ end;
+
+%% Message is typically a record but not necessarily.
+get_avp_value(Dict, Name, Rec) ->
+ try
+ Dict:'#get-'(Name, Rec)
+ catch
+ error:_ ->
+ undefined
+ end.
+
+avp_decode(Dict, Name, #diameter_avp{value = undefined,
+ data = Bin}) ->
+ Dict:avp(decode, Bin, Name);
+avp_decode(_, _, #diameter_avp{value = V}) ->
+ V.
+
+cb(#diameter_app{module = [_|_] = M}, F, A) ->
+ eval(M, F, A);
+cb([_|_] = M, F, A) ->
+ eval(M, F, A).
+
+eval([M|X], F, A) ->
+ apply(M, F, A ++ X).
+
+choose(true, X, _) -> X;
+choose(false, _, X) -> X.
diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl
index a5429c967c..073a415d10 100644
--- a/lib/diameter/src/base/diameter_watchdog.erl
+++ b/lib/diameter/src/base/diameter_watchdog.erl
@@ -45,6 +45,8 @@
-define(DEFAULT_TW_INIT, 30000). %% RFC 3539 ch 3.4.1
-define(NOMASK, {0,32}). %% default sequence mask
+-define(BASE, ?DIAMETER_DICT_COMMON).
+
-record(watchdog,
{%% PCB - Peer Control Block; see RFC 3539, Appendix A
status = initial :: initial | okay | suspect | down | reopen,
@@ -56,33 +58,36 @@
%% end PCB
parent = self() :: pid(), %% service process
transport :: pid() | undefined, %% peer_fsm process
- tref :: reference(), %% reference for current watchdog timer
- message_data, %% term passed into diameter_service with message
+ tref :: reference(), %% reference for current watchdog timer
+ dictionary :: module(), %% common dictionary
+ receive_data :: term(),
+ %% term passed into diameter_service with incoming message
sequence :: diameter:sequence(), %% mask
restrict :: {diameter:restriction(), boolean()},
shutdown = false :: boolean()}).
+%% ---------------------------------------------------------------------------
%% start/2
%%
%% Start a monitor before the watchdog is allowed to proceed to ensure
%% that a failed capabilities exchange produces the desired exit
%% reason.
+%% ---------------------------------------------------------------------------
--spec start(Type, {RecvData, [Opt], SvcName, SvcOpts, #diameter_service{}})
+-spec start(Type, {RecvData, [Opt], SvcOpts, #diameter_service{}})
-> {reference(), pid()}
when Type :: {connect|accept, diameter:transport_ref()},
RecvData :: term(),
Opt :: diameter:transport_opt(),
- SvcOpts :: [diameter:service_opt()],
- SvcName :: diameter:service_name().
+ SvcOpts :: [diameter:service_opt()].
start({_,_} = Type, T) ->
- Ref = make_ref(),
- {ok, Pid} = diameter_watchdog_sup:start_child({Ref, {Type, self(), T}}),
+ Ack = make_ref(),
+ {ok, Pid} = diameter_watchdog_sup:start_child({Ack, Type, self(), T}),
try
{erlang:monitor(process, Pid), Pid}
after
- Pid ! Ref
+ send(Pid, Ack)
end.
start_link(T) ->
@@ -101,39 +106,95 @@ init(T) ->
proc_lib:init_ack({ok, self()}),
gen_server:enter_loop(?MODULE, [], i(T)).
-i({Ref, {_, Pid, _} = T}) ->
- MRef = erlang:monitor(process, Pid),
- receive
- Ref ->
- make_state(T);
- {'DOWN', MRef, process, _, _} = D ->
- exit({shutdown, D})
- end.
-
-make_state({T, Pid, {RecvData,
- Opts,
- SvcName,
- SvcOpts,
- #diameter_service{applications = Apps,
- capabilities = Caps}
- = Svc}}) ->
+i({Ack, T, Pid, {RecvData,
+ Opts,
+ SvcOpts,
+ #diameter_service{applications = Apps,
+ capabilities = Caps}
+ = Svc}}) ->
+ erlang:monitor(process, Pid),
+ wait(Ack, Pid),
random:seed(now()),
putr(restart, {T, Opts, Svc}), %% save seeing it in trace
putr(dwr, dwr(Caps)), %%
{_,_} = Mask = proplists:get_value(sequence, SvcOpts),
Restrict = proplists:get_value(restrict_connections, SvcOpts),
Nodes = restrict_nodes(Restrict),
+ Dict0 = common_dictionary(Apps),
#watchdog{parent = Pid,
- transport = monitor(diameter_peer_fsm:start(T,
- Opts,
- {Mask, Nodes, Svc})),
+ transport = start(T, Opts, Mask, Nodes, Dict0, Svc),
tw = proplists:get_value(watchdog_timer,
Opts,
?DEFAULT_TW_INIT),
- message_data = {RecvData, SvcName, Apps, Mask},
+ receive_data = RecvData,
+ dictionary = Dict0,
sequence = Mask,
restrict = {Restrict, lists:member(node(), Nodes)}}.
+wait(Ref, Pid) ->
+ receive
+ Ref ->
+ ok;
+ {'DOWN', _, process, Pid, _} = D ->
+ exit({shutdown, D})
+ end.
+
+%% start/5
+
+start(T, Opts, Mask, Nodes, Dict0, Svc) ->
+ {_MRef, Pid}
+ = diameter_peer_fsm:start(T, Opts, {Mask, Nodes, Dict0, Svc}),
+ Pid.
+
+%% common_dictionary/1
+%%
+%% Determine the dictionary of the Diameter common application with
+%% Application Id 0. Fail on config errors.
+
+common_dictionary(Apps) ->
+ case
+ orddict:fold(fun dict0/3,
+ false,
+ lists:foldl(fun(#diameter_app{dictionary = M}, D) ->
+ orddict:append(M:id(), M, D)
+ end,
+ orddict:new(),
+ Apps))
+ of
+ {value, Mod} ->
+ Mod;
+ false ->
+ %% A transport should configure a common dictionary but
+ %% don't require it. Not configuring a common dictionary
+ %% means a user won't be able either send of receive
+ %% messages in the common dictionary: incoming request
+ %% will be answered with 3007 and outgoing requests cannot
+ %% be sent. The dictionary returned here is oly used for
+ %% messages diameter sends and receives: CER/CEA, DPR/DPA
+ %% and DWR/DWA.
+ ?BASE
+ end.
+
+%% Each application should be represented by a single dictionary.
+dict0(Id, [_,_|_] = Ms, _) ->
+ config_error({multiple_dictionaries, Ms, {application_id, Id}});
+
+%% An explicit common dictionary.
+dict0(?APP_ID_COMMON, [Mod], _) ->
+ {value, Mod};
+
+%% A pure relay, in which case the common application is implicit.
+%% This uses the fact that the common application will already have
+%% been folded.
+dict0(?APP_ID_RELAY, _, false) ->
+ {value, ?BASE};
+
+dict0(_, _, Acc) ->
+ Acc.
+
+config_error(T) ->
+ ?ERROR({configuration_error, T}).
+
%% handle_call/3
handle_call(_, _, State) ->
@@ -151,41 +212,59 @@ handle_info(T, #watchdog{} = State) ->
ok ->
{noreply, State};
#watchdog{} = S ->
- event(State, S),
+ close(T, State), %% service expects 'close' message
+ event(T, State, S), %% before 'watchdog'
{noreply, S};
stop ->
?LOG(stop, T),
- event(State, State#watchdog{status = down}),
+ event(T, State, State#watchdog{status = down}),
{stop, {shutdown, T}, State}
- end;
+ end.
-handle_info(T, S) ->
- handle_info(T, upgrade(S)).
+close({'DOWN', _, process, TPid, {shutdown, Reason}},
+ #watchdog{transport = TPid,
+ parent = Pid}) ->
+ send(Pid, {close, self(), Reason});
-upgrade(S) ->
- #watchdog{} = list_to_tuple(tuple_to_list(S)
- ++ [?NOMASK, {nodes, true}, false]).
+close(_, _) ->
+ ok.
-event(#watchdog{status = T}, #watchdog{status = T}) ->
+event(_, #watchdog{status = T}, #watchdog{status = T}) ->
ok;
-event(#watchdog{transport = undefined}, #watchdog{transport = undefined}) ->
+event(_, #watchdog{transport = undefined}, #watchdog{transport = undefined}) ->
ok;
-event(#watchdog{status = From, transport = F, parent = Pid},
+event(Msg,
+ #watchdog{status = From, transport = F, parent = Pid},
#watchdog{status = To, transport = T}) ->
- E = {tpid(F,T), From, To},
- notify(Pid, E),
+ TPid = tpid(F,T),
+ E = {[TPid | data(Msg, TPid, From, To)], From, To},
+ send(Pid, {watchdog, self(), E}),
?LOG(transition, {self(), E}).
+data(Msg, TPid, reopen, okay) ->
+ {recv, TPid, 'DWA', _Pkt} = Msg, %% assert
+ {TPid, T} = eraser(open),
+ [T];
+
+data({open, TPid, _Hosts, T}, TPid, _From, To)
+ when To == okay;
+ To == reopen ->
+ [T];
+
+data(_, _, _, _) ->
+ [].
+
tpid(_, Pid)
when is_pid(Pid) ->
Pid;
+
tpid(Pid, _) ->
Pid.
-notify(Pid, E) ->
- Pid ! {watchdog, self(), E}.
+send(Pid, T) ->
+ Pid ! T.
%% terminate/2
@@ -215,15 +294,13 @@ transition(close, #watchdog{}) ->
ok;
%% Service is asking for the peer to be taken down gracefully.
-transition({shutdown, Pid}, #watchdog{parent = Pid,
- transport = undefined,
- status = S}) ->
- down = S, %% sanity check
+transition({shutdown, Pid, _}, #watchdog{parent = Pid,
+ transport = undefined}) ->
stop;
-transition({shutdown = T, Pid}, #watchdog{parent = Pid,
- transport = TPid}
- = S) ->
- TPid ! {T, self()},
+transition({shutdown = T, Pid, Reason}, #watchdog{parent = Pid,
+ transport = TPid}
+ = S) ->
+ send(TPid, {T, self(), Reason}),
S#watchdog{shutdown = true};
%% Parent process has died,
@@ -234,13 +311,9 @@ transition({'DOWN', _, process, Pid, _Reason},
%% Transport has accepted a connection.
transition({accepted = T, TPid}, #watchdog{transport = TPid,
parent = Pid}) ->
- Pid ! {T, self(), TPid},
+ send(Pid, {T, self(), TPid}),
ok;
-%% Transport is telling us that its impending death isn't failure.
-transition({close, TPid, _Reason}, #watchdog{transport = TPid}) ->
- stop;
-
%% STATE Event Actions New State
%% ===== ------ ------- ----------
%% INITIAL Connection up SetWatchdog() OKAY
@@ -255,15 +328,13 @@ transition({close, TPid, _Reason}, #watchdog{transport = TPid}) ->
%% know the identity of the peer (ie. now) that we know that we're in
%% state down rather than initial.
-transition({open, TPid, Hosts, T} = Open,
+transition({open, TPid, Hosts, _} = Open,
#watchdog{transport = TPid,
status = initial,
- parent = Pid,
restrict = {_, R}}
= S) ->
case okay(getr(restart), Hosts, R) of
okay ->
- open(Pid, {TPid, T}),
set_watchdog(S#watchdog{status = okay});
reopen ->
transition(Open, S#watchdog{status = down})
@@ -274,17 +345,15 @@ transition({open, TPid, Hosts, T} = Open,
%% SetWatchdog()
%% Pending = TRUE REOPEN
-transition({open = P, TPid, _Hosts, T},
+transition({open = Key, TPid, _Hosts, T},
#watchdog{transport = TPid,
- parent = Pid,
status = down}
= S) ->
%% Store the info we need to notify the parent to reopen the
%% connection after the requisite DWA's are received, at which
%% time we eraser(open). The reopen message is a later addition,
%% to communicate the new capabilities as soon as they're known.
- putr(P, {TPid, T}),
- Pid ! {reopen, self(), {TPid, T}},
+ putr(Key, {TPid, T}),
set_watchdog(send_watchdog(S#watchdog{status = reopen,
num_dwa = 0}));
@@ -296,26 +365,18 @@ transition({open = P, TPid, _Hosts, T},
%% REOPEN Connection down CloseConnection()
%% SetWatchdog() DOWN
-transition({'DOWN', _, process, TPid, _},
+transition({'DOWN', _, process, TPid, _Reason},
#watchdog{transport = TPid,
- status = S,
- shutdown = D})
- when S == initial;
- D ->
+ shutdown = true}) ->
stop;
-transition({'DOWN', _, process, TPid, _},
- #watchdog{transport = TPid}
+transition({'DOWN', _, process, TPid, _Reason},
+ #watchdog{transport = TPid,
+ status = T}
= S) ->
- failover(S),
- close(S),
- set_watchdog(S#watchdog{status = down,
+ set_watchdog(S#watchdog{status = case T of initial -> T; _ -> down end,
pending = false,
transport = undefined});
-%% Any outstanding pending (or other messages from the transport) will
-%% have arrived before 'DOWN' since the message comes from the same
-%% process. Note that we could also get this message in the initial
-%% state.
%% Incoming message.
transition({recv, TPid, Name, Pkt}, #watchdog{transport = TPid} = S) ->
@@ -331,15 +392,11 @@ transition({timeout, _, tw}, #watchdog{}) ->
%% State query.
transition({state, Pid}, #watchdog{status = S}) ->
- Pid ! {self(), S},
+ send(Pid, {self(), S}),
ok.
%% ===========================================================================
-monitor(Pid) ->
- erlang:monitor(process, Pid),
- Pid.
-
putr(Key, Val) ->
put({?MODULE, Key}, Val).
@@ -349,16 +406,16 @@ getr(Key) ->
eraser(Key) ->
erase({?MODULE, Key}).
-%% encode/2
+%% encode/3
-encode(Msg, Mask) ->
+encode(Msg, Mask, Dict) ->
Seq = diameter_session:sequence(Mask),
Hdr = #diameter_header{version = ?DIAMETER_VERSION,
end_to_end_id = Seq,
hop_by_hop_id = Seq},
Pkt = #diameter_packet{header = Hdr,
msg = Msg},
- #diameter_packet{bin = Bin} = diameter_codec:encode(?BASE, Pkt),
+ #diameter_packet{bin = Bin} = diameter_codec:encode(Dict, Pkt),
Bin.
%% okay/3
@@ -386,7 +443,7 @@ okay([{_,P}]) ->
%% ... or it has.
okay(C) ->
- [_|_] = [P ! close || {_,P} <- C, self() /= P],
+ [_|_] = [send(P, close) || {_,P} <- C, self() /= P],
reopen.
%% set_watchdog/1
@@ -408,36 +465,14 @@ tw(T)
tw({M,F,A}) ->
apply(M,F,A).
-%% open/2
-
-open(Pid, {_,_} = T) ->
- Pid ! {connection_up, self(), T}.
-
-%% failover/1
-
-failover(#watchdog{status = okay,
- parent = Pid}) ->
- Pid ! {connection_down, self()};
-
-failover(_) ->
- ok.
-
-%% close/1
-
-close(#watchdog{status = down}) ->
- ok;
-
-close(#watchdog{parent = Pid}) ->
- {{T, _}, _, _} = getr(restart),
- T == accept andalso (Pid ! {close, self()}).
-
%% send_watchdog/1
send_watchdog(#watchdog{pending = false,
transport = TPid,
+ dictionary = Dict0,
sequence = Mask}
= S) ->
- TPid ! {send, encode(getr(dwr), Mask)},
+ send(TPid, {send, encode(getr(dwr), Mask, Dict0)}),
?LOG(send, 'DWR'),
S#watchdog{pending = true}.
@@ -465,8 +500,9 @@ rcv(N, _, _)
false;
rcv(_, Pkt, #watchdog{transport = TPid,
- message_data = T}) ->
- diameter_service:receive_message(TPid, Pkt, T).
+ dictionary = Dict0,
+ receive_data = T}) ->
+ diameter_traffic:receive_message(TPid, Pkt, Dict0, T).
throwaway(S) ->
throw({?MODULE, throwaway, S}).
@@ -518,12 +554,10 @@ rcv(_, #watchdog{status = okay} = S) ->
%% SetWatchdog() OKAY
rcv('DWA', #watchdog{status = suspect} = S) ->
- failback(S),
set_watchdog(S#watchdog{status = okay,
pending = false});
rcv(_, #watchdog{status = suspect} = S) ->
- failback(S),
set_watchdog(S#watchdog{status = okay});
%% REOPEN Receive DWA & Pending = FALSE
@@ -531,10 +565,8 @@ rcv(_, #watchdog{status = suspect} = S) ->
%% Failback() OKAY
rcv('DWA', #watchdog{status = reopen,
- num_dwa = 2 = N,
- parent = Pid}
+ num_dwa = 2 = N}
= S) ->
- open(Pid, eraser(open)),
S#watchdog{status = okay,
num_dwa = N+1,
pending = false};
@@ -553,11 +585,6 @@ rcv('DWA', #watchdog{status = reopen,
rcv(_, #watchdog{status = reopen} = S) ->
throwaway(S).
-%% failback/1
-
-failback(#watchdog{parent = Pid}) ->
- Pid ! {connection_up, self()}.
-
%% timeout/1
%%
%% The caller sets the watchdog on the return value.
@@ -582,7 +609,6 @@ timeout(#watchdog{status = T,
timeout(#watchdog{status = okay,
pending = true}
= S) ->
- failover(S),
S#watchdog{status = suspect};
%% SUSPECT Timer expires CloseConnection()
@@ -599,7 +625,6 @@ timeout(#watchdog{status = T,
when T == suspect;
T == reopen, P, N < 0 ->
exit(TPid, {shutdown, watchdog_timeout}),
- close(S),
S#watchdog{status = down};
%% REOPEN Timer expires & NumDWA = -1
@@ -633,7 +658,9 @@ timeout(#watchdog{status = reopen,
%% process has died. We only need to handle state down since we start
%% the first watchdog when transitioning out of initial.
-timeout(#watchdog{status = down} = S) ->
+timeout(#watchdog{status = T} = S)
+ when T == initial;
+ T == down ->
restart(S).
%% restart/1
@@ -655,15 +682,15 @@ restart(S) ->
%% state down rather then initial when receiving notification of an
%% open connection.
-restart({{connect, _} = T, Opts, Svc}, #watchdog{parent = Pid,
- sequence = Mask,
- restrict = {R,_}}
- = S) ->
- Pid ! {reconnect, self()},
+restart({{connect, _} = T, Opts, Svc},
+ #watchdog{parent = Pid,
+ sequence = Mask,
+ restrict = {R,_},
+ dictionary = Dict0}
+ = S) ->
+ send(Pid, {reconnect, self()}),
Nodes = restrict_nodes(R),
- S#watchdog{transport = monitor(diameter_peer_fsm:start(T,
- Opts,
- {Mask, Nodes, Svc})),
+ S#watchdog{transport = start(T, Opts, Mask, Nodes, Dict0, Svc),
restrict = {R, lists:member(node(), Nodes)}};
%% No restriction on the number of connections to the same peer: just
diff --git a/lib/diameter/src/compiler/diameter_exprecs.erl b/lib/diameter/src/compiler/diameter_exprecs.erl
index 191f53f29d..cc8458efa1 100644
--- a/lib/diameter/src/compiler/diameter_exprecs.erl
+++ b/lib/diameter/src/compiler/diameter_exprecs.erl
@@ -18,7 +18,7 @@
%%
%%
-%% Parse transform for generating record access functions
+%% Parse transform for generating record access functions.
%%
%% This parse transform can be used to reduce compile-time
%% dependencies in large systems.
@@ -39,21 +39,21 @@
%%
%% export_records([RecName, ...])
%%
-%% causes this transform to lay out access functions for the exported
-%% records:
+%% causes this transform to insert functions for the exported records:
%%
%% -module(foo)
%% -compile({parse_transform, diameter_exprecs}).
%%
%% -record(r, {a, b, c}).
-%% -export_records([a]).
+%% -export_records([r]).
%%
%% -export(['#info-'/1, '#info-'/2,
-%% '#new-'/1, '#new-'/2,
-%% '#get-'/2, '#set-'/2,
-%% '#new-a'/0, '#new-a'/1,
-%% '#get-a'/2, '#set-a'/2,
-%% '#info-a'/1]).
+%% '#new-'/1, '#new-'/2,
+%% '#get-'/1', '#get-'/2,
+%% '#set-'/2,
+%% '#new-r'/0, '#new-r'/1,
+%% '#get-r'/2, '#set-r'/2,
+%% '#info-r'/1]).
%%
%% '#info-'(RecName) ->
%% '#info-'(RecName, fields).
@@ -61,15 +61,23 @@
%% '#info-'(r, Info) ->
%% '#info-r'(Info).
%%
+%% '#new-'([r | Vals]) -> '#new-r'(Vals);
%% '#new-'(r) -> #r{}.
-%% '#new-'(r, Vals) -> '#new-r'(Vals)
+%%
+%% '#new-'(r, Vals) -> '#new-r'(Vals).
%%
%% '#new-r'() -> #r{}.
%% '#new-r'(Vals) -> '#set-r'(Vals, #r{}).
%%
+%% '#get-'(#r{} = Rec) ->
+%% [r | '#get-r'(Rec)].
+%%
%% '#get-'(Attrs, #r{} = Rec) ->
%% '#get-r'(Attrs, Rec).
%%
+%% '#get-r'(#r{} = Rec) ->
+%% lists:zip([a,b,c], tl(tuple_to_list(Rec))).
+%%
%% '#get-r'(Attrs, Rec) when is_list(Attrs) ->
%% ['#get-r'(A, Rec) || A <- Attrs];
%% '#get-r'(a, Rec) -> Rec#r.a;
@@ -116,6 +124,7 @@ a_export(Exports) ->
{fname(info), 2},
{fname(new), 1},
{fname(new), 2},
+ {fname(get), 1},
{fname(get), 2},
{fname(set), 2}
| lists:flatmap(fun export/1, Exports)]}.
@@ -124,6 +133,7 @@ export(Rname) ->
New = fname(new, Rname),
[{New, 0},
{New, 1},
+ {fname(get, Rname), 1},
{fname(get, Rname), 2},
{fname(set, Rname), 2},
{fname(info, Rname), 1}].
@@ -135,6 +145,7 @@ f_accessors(Es, Rs) ->
'#info-/2'(Es),
'#new-/1'(Es),
'#new-/2'(Es),
+ '#get-/1'(Es),
'#get-/2'(Es),
'#set-/2'(Es)
| lists:flatmap(fun(N) -> accessors(N, fields(N, Rs)) end, Es)].
@@ -142,6 +153,7 @@ f_accessors(Es, Rs) ->
accessors(Rname, Fields) ->
['#new-X/0'(Rname),
'#new-X/1'(Rname),
+ '#get-X/1'(Rname, Fields),
'#get-X/2'(Rname, Fields),
'#set-X/2'(Rname, Fields),
'#info-X/1'(Rname, Fields)].
@@ -183,12 +195,15 @@ fname(Op, Rname) ->
'#new-/1'(Exports) ->
{?function, fname(new), 1,
- lists:map(fun 'new-'/1, Exports) ++ [?BADARG(1)]}.
+ lists:flatmap(fun 'new-'/1, Exports) ++ [?BADARG(1)]}.
'new-'(R) ->
- {?clause, [?ATOM(R)],
- [],
- [{?record, R, []}]}.
+ [{?clause, [?ATOM(R)],
+ [],
+ [{?record, R, []}]},
+ {?clause, [{?cons, ?ATOM(R), ?VAR('Vals')}],
+ [],
+ [?CALL(fname(new, R), [?VAR('Vals')])]}].
'#new-/2'(Exports) ->
{?function, fname(new), 2,
@@ -199,6 +214,15 @@ fname(Op, Rname) ->
[],
[?CALL(fname(new, R), [?VAR('Vals')])]}.
+'#get-/1'(Exports) ->
+ {?function, fname(get), 1,
+ lists:map(fun 'get--'/1, Exports) ++ [?BADARG(1)]}.
+
+'get--'(R) ->
+ {?clause, [{?match, {?record, R, []}, ?VAR('Rec')}],
+ [],
+ [{?cons, ?ATOM(R), ?CALL(fname(get, R), [?VAR('Rec')])}]}.
+
'#get-/2'(Exports) ->
{?function, fname(get), 2,
lists:map(fun 'get-'/1, Exports) ++ [?BADARG(2)]}.
@@ -245,6 +269,14 @@ fname(Op, Rname) ->
[{?record, ?VAR('Rec'), Rname,
[{?record_field, ?ATOM(Attr), ?VAR('V')}]}]}.
+'#get-X/1'(Rname, Fields) ->
+ FName = fname(get, Rname),
+ Values = ?CALL(tl, [?CALL(tuple_to_list, [?VAR('Rec')])]),
+ {?function, FName, 1,
+ [{?clause, [?VAR('Rec')],
+ [],
+ [?APPLY(lists, zip, [?TERM(Fields), Values])]}]}.
+
'#get-X/2'(Rname, Fields) ->
FName = fname(get, Rname),
{?function, FName, 2,
diff --git a/lib/diameter/src/compiler/diameter_forms.hrl b/lib/diameter/src/compiler/diameter_forms.hrl
index 4cd86c32aa..1a0f3492ea 100644
--- a/lib/diameter/src/compiler/diameter_forms.hrl
+++ b/lib/diameter/src/compiler/diameter_forms.hrl
@@ -36,6 +36,7 @@
-define(clause, ?F(clause)).
-define(function, ?F(function)).
-define(call, ?F(call)).
+-define(cons, ?F(cons)).
-define('fun', ?F('fun')).
-define(generate, ?F(generate)).
-define(lc, ?F(lc)).
diff --git a/lib/diameter/src/dict/acct_rfc6733.dia b/lib/diameter/src/dict/acct_rfc6733.dia
new file mode 100644
index 0000000000..7d6d11a71e
--- /dev/null
+++ b/lib/diameter/src/dict/acct_rfc6733.dia
@@ -0,0 +1,72 @@
+;;
+;; %CopyrightBegin%
+;;
+;; Copyright Ericsson AB 2013. All Rights Reserved.
+;;
+;; The contents of this file are subject to the Erlang Public License,
+;; Version 1.1, (the "License"); you may not use this file except in
+;; compliance with the License. You should have received a copy of the
+;; Erlang Public License along with this software. If not, it can be
+;; retrieved online at http://www.erlang.org/.
+;;
+;; Software distributed under the License is distributed on an "AS IS"
+;; basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+;; the License for the specific language governing rights and limitations
+;; under the License.
+;;
+;; %CopyrightEnd%
+;;
+
+@id 3
+@name diameter_gen_acct_rfc6733
+@prefix diameter_base_accounting
+@vendor 0 IETF
+
+@inherits diameter_gen_base_rfc6733
+
+@messages
+
+ ACR ::= < Diameter Header: 271, REQ, PXY >
+ < Session-Id >
+ { Origin-Host }
+ { Origin-Realm }
+ { Destination-Realm }
+ { Accounting-Record-Type }
+ { Accounting-Record-Number }
+ [ Acct-Application-Id ]
+ [ Vendor-Specific-Application-Id ]
+ [ User-Name ]
+ [ Destination-Host ]
+ [ Accounting-Sub-Session-Id ]
+ [ Acct-Session-Id ]
+ [ Acct-Multi-Session-Id ]
+ [ Acct-Interim-Interval ]
+ [ Accounting-Realtime-Required ]
+ [ Origin-State-Id ]
+ [ Event-Timestamp ]
+ * [ Proxy-Info ]
+ * [ Route-Record ]
+ * [ AVP ]
+
+ ACA ::= < Diameter Header: 271, PXY >
+ < Session-Id >
+ { Result-Code }
+ { Origin-Host }
+ { Origin-Realm }
+ { Accounting-Record-Type }
+ { Accounting-Record-Number }
+ [ Acct-Application-Id ]
+ [ Vendor-Specific-Application-Id ]
+ [ User-Name ]
+ [ Accounting-Sub-Session-Id ]
+ [ Acct-Session-Id ]
+ [ Acct-Multi-Session-Id ]
+ [ Error-Message ]
+ [ Error-Reporting-Host ]
+ [ Failed-AVP ]
+ [ Acct-Interim-Interval ]
+ [ Accounting-Realtime-Required ]
+ [ Origin-State-Id ]
+ [ Event-Timestamp ]
+ * [ Proxy-Info ]
+ * [ AVP ]
diff --git a/lib/diameter/src/dict/base_rfc6733.dia b/lib/diameter/src/dict/base_rfc6733.dia
new file mode 100644
index 0000000000..e1d1f18d86
--- /dev/null
+++ b/lib/diameter/src/dict/base_rfc6733.dia
@@ -0,0 +1,415 @@
+;;
+;; %CopyrightBegin%
+;;
+;; Copyright Ericsson AB 2013. All Rights Reserved.
+;;
+;; The contents of this file are subject to the Erlang Public License,
+;; Version 1.1, (the "License"); you may not use this file except in
+;; compliance with the License. You should have received a copy of the
+;; Erlang Public License along with this software. If not, it can be
+;; retrieved online at http://www.erlang.org/.
+;;
+;; Software distributed under the License is distributed on an "AS IS"
+;; basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+;; the License for the specific language governing rights and limitations
+;; under the License.
+;;
+;; %CopyrightEnd%
+;;
+
+@id 0
+@name diameter_gen_base_rfc6733
+@prefix diameter_base
+@vendor 0 IETF
+
+@avp_types
+
+ Acct-Interim-Interval 85 Unsigned32 M
+ Accounting-Realtime-Required 483 Enumerated M
+ Acct-Multi-Session-Id 50 UTF8String M
+ Accounting-Record-Number 485 Unsigned32 M
+ Accounting-Record-Type 480 Enumerated M
+ Acct-Session-Id 44 OctetString M
+ Accounting-Sub-Session-Id 287 Unsigned64 M
+ Acct-Application-Id 259 Unsigned32 M
+ Auth-Application-Id 258 Unsigned32 M
+ Auth-Request-Type 274 Enumerated M
+ Authorization-Lifetime 291 Unsigned32 M
+ Auth-Grace-Period 276 Unsigned32 M
+ Auth-Session-State 277 Enumerated M
+ Re-Auth-Request-Type 285 Enumerated M
+ Class 25 OctetString M
+ Destination-Host 293 DiamIdent M
+ Destination-Realm 283 DiamIdent M
+ Disconnect-Cause 273 Enumerated M
+ Error-Message 281 UTF8String -
+ Error-Reporting-Host 294 DiamIdent -
+ Event-Timestamp 55 Time M
+ Experimental-Result 297 Grouped M
+ Experimental-Result-Code 298 Unsigned32 M
+ Failed-AVP 279 Grouped M
+ Firmware-Revision 267 Unsigned32 -
+ Host-IP-Address 257 Address M
+ Inband-Security-Id 299 Unsigned32 M
+ Multi-Round-Time-Out 272 Unsigned32 M
+ Origin-Host 264 DiamIdent M
+ Origin-Realm 296 DiamIdent M
+ Origin-State-Id 278 Unsigned32 M
+ Product-Name 269 UTF8String -
+ Proxy-Host 280 DiamIdent M
+ Proxy-Info 284 Grouped M
+ Proxy-State 33 OctetString M
+ Redirect-Host 292 DiamURI M
+ Redirect-Host-Usage 261 Enumerated M
+ Redirect-Max-Cache-Time 262 Unsigned32 M
+ Result-Code 268 Unsigned32 M
+ Route-Record 282 DiamIdent M
+ Session-Id 263 UTF8String M
+ Session-Timeout 27 Unsigned32 M
+ Session-Binding 270 Unsigned32 M
+ Session-Server-Failover 271 Enumerated M
+ Supported-Vendor-Id 265 Unsigned32 M
+ Termination-Cause 295 Enumerated M
+ User-Name 1 UTF8String M
+ Vendor-Id 266 Unsigned32 M
+ Vendor-Specific-Application-Id 260 Grouped M
+
+@messages
+
+ CER ::= < Diameter Header: 257, REQ >
+ { Origin-Host }
+ { Origin-Realm }
+ 1* { Host-IP-Address }
+ { Vendor-Id }
+ { Product-Name }
+ [ Origin-State-Id ]
+ * [ Supported-Vendor-Id ]
+ * [ Auth-Application-Id ]
+ * [ Inband-Security-Id ]
+ * [ Acct-Application-Id ]
+ * [ Vendor-Specific-Application-Id ]
+ [ Firmware-Revision ]
+ * [ AVP ]
+
+ CEA ::= < Diameter Header: 257 >
+ { Result-Code }
+ { Origin-Host }
+ { Origin-Realm }
+ 1* { Host-IP-Address }
+ { Vendor-Id }
+ { Product-Name }
+ [ Origin-State-Id ]
+ [ Error-Message ]
+ [ Failed-AVP ]
+ * [ Supported-Vendor-Id ]
+ * [ Auth-Application-Id ]
+ * [ Inband-Security-Id ]
+ * [ Acct-Application-Id ]
+ * [ Vendor-Specific-Application-Id ]
+ [ Firmware-Revision ]
+ * [ AVP ]
+
+ DPR ::= < Diameter Header: 282, REQ >
+ { Origin-Host }
+ { Origin-Realm }
+ { Disconnect-Cause }
+ * [ AVP ]
+
+ DPA ::= < Diameter Header: 282 >
+ { Result-Code }
+ { Origin-Host }
+ { Origin-Realm }
+ [ Error-Message ]
+ [ Failed-AVP ]
+ * [ AVP ]
+
+ DWR ::= < Diameter Header: 280, REQ >
+ { Origin-Host }
+ { Origin-Realm }
+ [ Origin-State-Id ]
+ * [ AVP ]
+
+ DWA ::= < Diameter Header: 280 >
+ { Result-Code }
+ { Origin-Host }
+ { Origin-Realm }
+ [ Error-Message ]
+ [ Failed-AVP ]
+ [ Origin-State-Id ]
+ * [ AVP ]
+
+ answer-message ::= < Diameter Header: code, ERR [PXY] >
+ 0*1 < Session-Id >
+ { Origin-Host }
+ { Origin-Realm }
+ { Result-Code }
+ [ Origin-State-Id ]
+ [ Error-Message ]
+ [ Error-Reporting-Host ]
+ [ Failed-AVP ]
+ [ Experimental-Result ]
+ * [ Proxy-Info ]
+ * [ AVP ]
+
+ RAR ::= < Diameter Header: 258, REQ, PXY >
+ < Session-Id >
+ { Origin-Host }
+ { Origin-Realm }
+ { Destination-Realm }
+ { Destination-Host }
+ { Auth-Application-Id }
+ { Re-Auth-Request-Type }
+ [ User-Name ]
+ [ Origin-State-Id ]
+ * [ Proxy-Info ]
+ * [ Route-Record ]
+ * [ AVP ]
+
+ RAA ::= < Diameter Header: 258, PXY >
+ < Session-Id >
+ { Result-Code }
+ { Origin-Host }
+ { Origin-Realm }
+ [ User-Name ]
+ [ Origin-State-Id ]
+ [ Error-Message ]
+ [ Error-Reporting-Host ]
+ [ Failed-AVP ]
+ * [ Redirect-Host ]
+ [ Redirect-Host-Usage ]
+ [ Redirect-Max-Cache-Time ]
+ * [ Proxy-Info ]
+ * [ AVP ]
+
+ STR ::= < Diameter Header: 275, REQ, PXY >
+ < Session-Id >
+ { Origin-Host }
+ { Origin-Realm }
+ { Destination-Realm }
+ { Auth-Application-Id }
+ { Termination-Cause }
+ [ User-Name ]
+ [ Destination-Host ]
+ * [ Class ]
+ [ Origin-State-Id ]
+ * [ Proxy-Info ]
+ * [ Route-Record ]
+ * [ AVP ]
+
+ STA ::= < Diameter Header: 275, PXY >
+ < Session-Id >
+ { Result-Code }
+ { Origin-Host }
+ { Origin-Realm }
+ [ User-Name ]
+ * [ Class ]
+ [ Error-Message ]
+ [ Error-Reporting-Host ]
+ [ Failed-AVP ]
+ [ Origin-State-Id ]
+ * [ Redirect-Host ]
+ [ Redirect-Host-Usage ]
+ [ Redirect-Max-Cache-Time ]
+ * [ Proxy-Info ]
+ * [ AVP ]
+
+ ASR ::= < Diameter Header: 274, REQ, PXY >
+ < Session-Id >
+ { Origin-Host }
+ { Origin-Realm }
+ { Destination-Realm }
+ { Destination-Host }
+ { Auth-Application-Id }
+ [ User-Name ]
+ [ Origin-State-Id ]
+ * [ Proxy-Info ]
+ * [ Route-Record ]
+ * [ AVP ]
+
+ ASA ::= < Diameter Header: 274, PXY >
+ < Session-Id >
+ { Result-Code }
+ { Origin-Host }
+ { Origin-Realm }
+ [ User-Name ]
+ [ Origin-State-Id ]
+ [ Error-Message ]
+ [ Error-Reporting-Host ]
+ [ Failed-AVP ]
+ * [ Redirect-Host ]
+ [ Redirect-Host-Usage ]
+ [ Redirect-Max-Cache-Time ]
+ * [ Proxy-Info ]
+ * [ AVP ]
+
+ ACR ::= < Diameter Header: 271, REQ, PXY >
+ < Session-Id >
+ { Origin-Host }
+ { Origin-Realm }
+ { Destination-Realm }
+ { Accounting-Record-Type }
+ { Accounting-Record-Number }
+ [ Acct-Application-Id ]
+ [ Vendor-Specific-Application-Id ]
+ [ User-Name ]
+ [ Destination-Host ]
+ [ Accounting-Sub-Session-Id ]
+ [ Acct-Session-Id ]
+ [ Acct-Multi-Session-Id ]
+ [ Acct-Interim-Interval ]
+ [ Accounting-Realtime-Required ]
+ [ Origin-State-Id ]
+ [ Event-Timestamp ]
+ * [ Proxy-Info ]
+ * [ Route-Record ]
+ * [ AVP ]
+
+ ACA ::= < Diameter Header: 271, PXY >
+ < Session-Id >
+ { Result-Code }
+ { Origin-Host }
+ { Origin-Realm }
+ { Accounting-Record-Type }
+ { Accounting-Record-Number }
+ [ Acct-Application-Id ]
+ [ Vendor-Specific-Application-Id ]
+ [ User-Name ]
+ [ Accounting-Sub-Session-Id ]
+ [ Acct-Session-Id ]
+ [ Acct-Multi-Session-Id ]
+ [ Error-Message ]
+ [ Error-Reporting-Host ]
+ [ Failed-AVP ]
+ [ Acct-Interim-Interval ]
+ [ Accounting-Realtime-Required ]
+ [ Origin-State-Id ]
+ [ Event-Timestamp ]
+ * [ Proxy-Info ]
+ * [ AVP ]
+
+@enum Disconnect-Cause
+
+ REBOOTING 0
+ BUSY 1
+ DO_NOT_WANT_TO_TALK_TO_YOU 2
+
+@enum Redirect-Host-Usage
+
+ DONT_CACHE 0
+ ALL_SESSION 1
+ ALL_REALM 2
+ REALM_AND_APPLICATION 3
+ ALL_APPLICATION 4
+ ALL_HOST 5
+ ALL_USER 6
+
+@enum Auth-Request-Type
+
+ AUTHENTICATE_ONLY 1
+ AUTHORIZE_ONLY 2
+ AUTHORIZE_AUTHENTICATE 3
+
+@enum Auth-Session-State
+
+ STATE_MAINTAINED 0
+ NO_STATE_MAINTAINED 1
+
+@enum Re-Auth-Request-Type
+
+ AUTHORIZE_ONLY 0
+ AUTHORIZE_AUTHENTICATE 1
+
+@enum Termination-Cause
+
+ LOGOUT 1
+ SERVICE_NOT_PROVIDED 2
+ BAD_ANSWER 3
+ ADMINISTRATIVE 4
+ LINK_BROKEN 5
+ AUTH_EXPIRED 6
+ USER_MOVED 7
+ SESSION_TIMEOUT 8
+
+@enum Session-Server-Failover
+
+ REFUSE_SERVICE 0
+ TRY_AGAIN 1
+ ALLOW_SERVICE 2
+ TRY_AGAIN_ALLOW_SERVICE 3
+
+@enum Accounting-Record-Type
+
+ EVENT_RECORD 1
+ START_RECORD 2
+ INTERIM_RECORD 3
+ STOP_RECORD 4
+
+@enum Accounting-Realtime-Required
+
+ DELIVER_AND_GRANT 1
+ GRANT_AND_STORE 2
+ GRANT_AND_LOSE 3
+
+@define Result-Code
+
+ ;; 7.1.1. Informational
+ MULTI_ROUND_AUTH 1001
+
+ ;; 7.1.2. Success
+ SUCCESS 2001
+ LIMITED_SUCCESS 2002
+
+ ;; 7.1.3. Protocol Errors
+ COMMAND_UNSUPPORTED 3001
+ UNABLE_TO_DELIVER 3002
+ REALM_NOT_SERVED 3003
+ TOO_BUSY 3004
+ LOOP_DETECTED 3005
+ REDIRECT_INDICATION 3006
+ APPLICATION_UNSUPPORTED 3007
+ INVALID_HDR_BITS 3008
+ INVALID_AVP_BITS 3009
+ UNKNOWN_PEER 3010
+
+ ;; 7.1.4. Transient Failures
+ AUTHENTICATION_REJECTED 4001
+ OUT_OF_SPACE 4002
+ ELECTION_LOST 4003
+
+ ;; 7.1.5. Permanent Failures
+ AVP_UNSUPPORTED 5001
+ UNKNOWN_SESSION_ID 5002
+ AUTHORIZATION_REJECTED 5003
+ INVALID_AVP_VALUE 5004
+ MISSING_AVP 5005
+ RESOURCES_EXCEEDED 5006
+ CONTRADICTING_AVPS 5007
+ AVP_NOT_ALLOWED 5008
+ AVP_OCCURS_TOO_MANY_TIMES 5009
+ NO_COMMON_APPLICATION 5010
+ UNSUPPORTED_VERSION 5011
+ UNABLE_TO_COMPLY 5012
+ INVALID_BIT_IN_HEADER 5013
+ INVALID_AVP_LENGTH 5014
+ INVALID_MESSAGE_LENGTH 5015
+ INVALID_AVP_BIT_COMBO 5016
+ NO_COMMON_SECURITY 5017
+
+@grouped
+
+ Proxy-Info ::= < AVP Header: 284 >
+ { Proxy-Host }
+ { Proxy-State }
+ * [ AVP ]
+
+ Failed-AVP ::= < AVP Header: 279 >
+ 1* {AVP}
+
+ Experimental-Result ::= < AVP Header: 297 >
+ { Vendor-Id }
+ { Experimental-Result-Code }
+
+ Vendor-Specific-Application-Id ::= < AVP Header: 260 >
+ { Vendor-Id }
+ [ Auth-Application-Id ]
+ [ Acct-Application-Id ]
diff --git a/lib/diameter/src/modules.mk b/lib/diameter/src/modules.mk
index 25207625be..2f3239e1aa 100644
--- a/lib/diameter/src/modules.mk
+++ b/lib/diameter/src/modules.mk
@@ -2,7 +2,7 @@
# %CopyrightBegin%
#
-# Copyright Ericsson AB 2010-2012. All Rights Reserved.
+# Copyright Ericsson AB 2010-2013. All Rights Reserved.
#
# The contents of this file are subject to the Erlang Public License,
# Version 1.1, (the "License"); you may not use this file except in
@@ -17,11 +17,13 @@
#
# %CopyrightEnd%
-# Runtime dictionary files in ./dict. Modules will be generated from
-# these are included in the app file.
+# Runtime dictionary files in ./dict. Modules generated from these are
+# included in the app file.
DICTS = \
base_rfc3588 \
+ base_rfc6733 \
base_accounting \
+ acct_rfc6733 \
relay
# The yecc grammar for the dictionary parser.
@@ -49,6 +51,7 @@ RT_MODULES = \
base/diameter_stats \
base/diameter_sup \
base/diameter_sync \
+ base/diameter_traffic \
base/diameter_types \
base/diameter_watchdog \
base/diameter_watchdog_sup \
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl
index 3cb13d7043..8b8c2a6694 100644
--- a/lib/diameter/src/transport/diameter_sctp.erl
+++ b/lib/diameter/src/transport/diameter_sctp.erl
@@ -289,7 +289,7 @@ ports() ->
Ts = diameter_reg:match({?MODULE, '_', '_'}),
[{type(T), N, Pid} || {{?MODULE, T, {_, {_, S}}}, Pid} <- Ts,
{ok, N} <- [inet:port(S)]].
-
+
ports(Ref) ->
Ts = diameter_reg:match({?MODULE, '_', {Ref, '_'}}),
[{type(T), N, Pid} || {{?MODULE, T, {R, {_, S}}}, Pid} <- Ts,
@@ -484,8 +484,8 @@ transition({diameter, {close, Pid}}, #transport{parent = Pid}) ->
%% TLS over SCTP is described in RFC 3436 but has limitations as
%% described in RFC 6083. The latter describes DTLS over SCTP, which
%% addresses these limitations, DTLS itself being described in RFC
-%% 4347. TLS is primarily used over TCP, which the current RFC 3588
-%% draft acknowledges by equating TLS with TLS/TCP and DTLS/SCTP.
+%% 4347. TLS is primarily used over TCP, which RFC 6733 acknowledges
+%% by equating TLS with TLS/TCP and DTLS/SCTP.
transition({diameter, {tls, _Ref, _Type, _Bool}}, _) ->
stop;
@@ -585,8 +585,7 @@ recv({_, #sctp_assoc_change{state = comm_up,
socket = Sock}
= S) ->
Ref = getr(?REF_KEY),
- is_reference(Ref) %% started in new code
- andalso publish(T, Ref, Id, Sock),
+ publish(T, Ref, Id, Sock),
up(S#transport{assoc_id = Id,
streams = {IS, OS}});
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl
index 7ec7b1c5e7..132088b514 100644
--- a/lib/diameter/src/transport/diameter_tcp.erl
+++ b/lib/diameter/src/transport/diameter_tcp.erl
@@ -52,7 +52,10 @@
-define(DEFAULT_PORT, 3868). %% RFC 3588, ch 2.1
-define(LISTENER_TIMEOUT, 30000).
--define(FRAGMENT_TIMEOUT, 1000).
+-define(DEFAULT_FRAGMENT_TIMEOUT, 1000).
+
+-define(IS_UINT32(N), (is_integer(N) andalso 0 =< N andalso 0 == N bsr 32)).
+-define(IS_TIMEOUT(N), (infinity == N orelse ?IS_UINT32(N))).
%% cb_info passed to ssl.
-define(TCP_CB(Mod), {Mod, tcp, tcp_closed, tcp_error}).
@@ -72,7 +75,6 @@
{parent :: pid(),
transport = self() :: pid()}).
--type tref() :: reference(). %% timer reference
-type length() :: 0..16#FFFFFF. %% message length from Diameter header
-type size() :: non_neg_integer(). %% accumulated binary size
-type frag() :: {length(), size(), binary(), list(binary())}
@@ -83,8 +85,11 @@
{socket :: inet:socket() | ssl:sslsocket(), %% accept/connect socket
parent :: pid(), %% of process that started us
module :: module(), %% gen_tcp-like module
- frag = <<>> :: binary() | {tref(), frag()}, %% message fragment
- ssl :: boolean() | [term()]}). %% ssl options
+ frag = <<>> :: frag(), %% message fragment
+ ssl :: boolean() | [term()], %% ssl options
+ timeout :: infinity | 0..16#FFFFFFFF, %% fragment timeout
+ tref = false :: false | reference(), %% fragment timer reference
+ flush = false :: boolean()}). %% flush fragment at timeout?
%% The usual transport using gen_tcp can be replaced by anything
%% sufficiently gen_tcp-like by passing a 'module' option as the first
%% (for simplicity) transport option. The transport_module diameter_etcp
@@ -161,7 +166,12 @@ i({T, Ref, Mod, Pid, Opts, Addrs})
%% that does nothing but kill us with the parent until call
%% returns.
{ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}),
- {SslOpts, Rest} = ssl(Opts),
+ {SslOpts, Rest0} = ssl(Opts),
+ {OwnOpts, Rest} = own(Rest0),
+ Tmo = proplists:get_value(fragment_timer,
+ OwnOpts,
+ ?DEFAULT_FRAGMENT_TIMEOUT),
+ ?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}),
Sock = i(T, Ref, Mod, Pid, SslOpts, Rest, Addrs),
MPid ! {stop, self()}, %% tell the monitor to die
M = if SslOpts -> ssl; true -> Mod end,
@@ -170,7 +180,8 @@ i({T, Ref, Mod, Pid, Opts, Addrs})
#transport{parent = Pid,
module = M,
socket = Sock,
- ssl = SslOpts};
+ ssl = SslOpts,
+ timeout = Tmo};
%% Put the reference in the process dictionary since we now use it
%% advertise the ssl socket after TLS upgrade.
@@ -196,6 +207,10 @@ i({listen, LRef, APid, {Mod, Opts, Addrs}}) ->
erlang:monitor(process, APid),
start_timer(#listener{socket = LSock}).
+own(Opts) ->
+ {Own, Rest} = proplists:split(Opts, [fragment_timer]),
+ {lists:append(Own), Rest}.
+
ssl(Opts) ->
{[SslOpts], Rest} = proplists:split(Opts, [ssl_options]),
{ssl_opts(SslOpts), Rest}.
@@ -368,8 +383,6 @@ handle_info(T, #monitor{} = S) ->
%% # code_change/3
%% ---------------------------------------------------------------------------
-code_change(_, {transport, _, _, _, _} = S, _) ->
- {ok, #transport{} = list_to_tuple(tuple_to_list(S) ++ [false])};
code_change(_, State, _) ->
{ok, State}.
@@ -452,6 +465,7 @@ t(T,S) ->
%% Initial incoming message when we might need to upgrade to TLS:
%% don't request another message until we know.
+
transition({tcp, Sock, Bin}, #transport{socket = Sock,
parent = Pid,
frag = Head,
@@ -459,13 +473,13 @@ transition({tcp, Sock, Bin}, #transport{socket = Sock,
ssl = Opts}
= S)
when is_list(Opts) ->
- case recv1(Head, Bin) of
+ case rcv(Head, Bin) of
{Msg, B} when is_binary(Msg) ->
diameter_peer:recv(Pid, Msg),
S#transport{frag = B};
Frag ->
setopts(M, Sock),
- S#transport{frag = Frag}
+ start_fragment_timer(S#transport{frag = Frag})
end;
%% Incoming message.
@@ -476,7 +490,7 @@ transition({P, Sock, Bin}, #transport{socket = Sock,
when P == tcp, not B;
P == ssl, B ->
setopts(M, Sock),
- recv(Bin, S);
+ start_fragment_timer(recv(Bin, S));
%% Capabilties exchange has decided on whether or not to run over TLS.
transition({diameter, {tls, Ref, Type, B}}, #transport{parent = Pid}
@@ -487,7 +501,7 @@ transition({diameter, {tls, Ref, Type, B}}, #transport{parent = Pid}
= tls_handshake(Type, B, S),
Pid ! {diameter, {tls, Ref}},
setopts(M, Sock),
- NS#transport{ssl = B};
+ start_fragment_timer(NS#transport{ssl = B});
transition({C, Sock}, #transport{socket = Sock,
ssl = B})
@@ -520,8 +534,8 @@ transition({diameter, {close, Pid}}, #transport{parent = Pid,
stop;
%% Timeout for reception of outstanding packets.
-transition({timeout, TRef, flush}, S) ->
- flush(TRef, S);
+transition({timeout, TRef, flush}, #transport{tref = TRef} = S) ->
+ flush(S#transport{tref = false});
%% Request for the local port number.
transition({resolve_port, Pid}, #transport{socket = Sock,
@@ -559,9 +573,7 @@ tls_handshake(Type, true, #transport{socket = Sock,
= S) ->
{ok, SSock} = tls(Type, Sock, [{cb_info, ?TCP_CB(M)} | Opts]),
Ref = getr(?REF_KEY),
- is_reference(Ref) %% started in new code
- andalso
- (true = diameter_reg:add_new({?MODULE, Type, {Ref, SSock}})),
+ true = diameter_reg:add_new({?MODULE, Type, {Ref, SSock}}),
S#transport{socket = SSock,
module = ssl};
@@ -576,30 +588,25 @@ tls(accept, Sock, Opts) ->
%% recv/2
%%
-%% Reassemble fragmented messages and extract multple message sent
+%% Reassemble fragmented messages and extract multiple message sent
%% using Nagle.
recv(Bin, #transport{parent = Pid, frag = Head} = S) ->
- case recv1(Head, Bin) of
+ case rcv(Head, Bin) of
{Msg, B} when is_binary(Msg) ->
diameter_peer:recv(Pid, Msg),
recv(B, S#transport{frag = <<>>});
Frag ->
- S#transport{frag = Frag}
+ S#transport{frag = Frag,
+ flush = false}
end.
-%% recv1/2
+%% rcv/2
%% No previous fragment.
-recv1(<<>>, Bin) ->
+rcv(<<>>, Bin) ->
rcv(Bin);
-recv1({TRef, Head}, Bin) ->
- erlang:cancel_timer(TRef),
- rcv(Head, Bin).
-
-%% rcv/2
-
%% Not even the first four bytes of the header.
rcv(Head, Bin)
when is_binary(Head) ->
@@ -614,22 +621,22 @@ rcv({Len, N, Head, Acc}, Bin) ->
%% Extract a message for which we have all bytes.
rcv(Len, N, Head, Acc)
when Len =< N ->
- rcv1(Len, bin(Head, Acc));
+ recv1(Len, bin(Head, Acc));
%% Wait for more packets.
rcv(Len, N, Head, Acc) ->
- {start_timer(), {Len, N, Head, Acc}}.
+ {Len, N, Head, Acc}.
-%% rcv/2
+%% rcv/1
%% Nothing left.
rcv(<<>> = Bin) ->
Bin;
-%% Well, this isn't good. Chances are things will go south from here
-%% but if we're lucky then the bytes we have extend to an intended
-%% message boundary and we can recover by simply discarding them,
-%% which is the result of receiving them.
+%% The Message Length isn't even sufficient for a header. Chances are
+%% things will go south from here but if we're lucky then the bytes we
+%% have extend to an intended message boundary and we can recover by
+%% simply receiving them. Make it so.
rcv(<<_:1/binary, Len:24, _/binary>> = Bin)
when Len < 20 ->
{Bin, <<>>};
@@ -637,23 +644,23 @@ rcv(<<_:1/binary, Len:24, _/binary>> = Bin)
%% Enough bytes to extract a message.
rcv(<<_:1/binary, Len:24, _/binary>> = Bin)
when Len =< size(Bin) ->
- rcv1(Len, Bin);
+ recv1(Len, Bin);
%% Or not: wait for more packets.
rcv(<<_:1/binary, Len:24, _/binary>> = Head) ->
- {start_timer(), {Len, size(Head), Head, []}};
+ {Len, size(Head), Head, []};
%% Not even 4 bytes yet.
rcv(Head) ->
- {start_timer(), Head}.
+ Head.
-%% rcv1/2
+%% recv1/2
-rcv1(Len, Bin) ->
+recv1(Len, Bin) ->
<<Msg:Len/binary, Rest/binary>> = Bin,
{Msg, Rest}.
-%% bin/[12]
+%% bin/1-2
bin(Head, Acc) ->
list_to_binary([Head | lists:reverse(Acc)]).
@@ -664,7 +671,7 @@ bin(Bin)
when is_binary(Bin) ->
Bin.
-%% start_timer/0
+%% flush/1
%% An erroneously large message length may leave us with a fragment
%% that lingers if the peer doesn't have anything more to send. Start
@@ -677,14 +684,30 @@ bin(Bin)
%% since all messages with length problems are discarded this should
%% also eventually lead to watchdog failover.
-start_timer() ->
- erlang:start_timer(?FRAGMENT_TIMEOUT, self(), flush).
+%% No fragment to flush.
+flush(#transport{frag = <<>>} = S) ->
+ S;
-flush(TRef, #transport{parent = Pid, frag = {TRef, Head}} = S) ->
- diameter_peer:recv(Pid, bin(Head)),
- S#transport{frag = <<>>};
-flush(_, S) ->
- S.
+%% Messages have been received since last timer expiry.
+flush(#transport{flush = false} = S) ->
+ start_fragment_timer(S#transport{flush = true});
+
+%% No messages since last expiry.
+flush(#transport{frag = Frag, parent = Pid} = S) ->
+ diameter_peer:recv(Pid, bin(Frag)),
+ S#transport{frag = <<>>}.
+
+%% start_fragment_timer/1
+%%
+%% Start a timer only if there's none running and a message to flush.
+
+start_fragment_timer(#transport{frag = B, tref = TRef} = S)
+ when B == <<>>;
+ TRef /= false ->
+ S;
+
+start_fragment_timer(#transport{timeout = Tmo} = S) ->
+ S#transport{tref = erlang:start_timer(Tmo, self(), flush)}.
%% accept/2