diff options
Diffstat (limited to 'lib/diameter/src')
-rw-r--r-- | lib/diameter/src/Makefile | 2 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter.appup.src | 11 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter.erl | 13 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_capx.erl | 64 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_codec.erl | 175 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_config.erl | 142 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_lib.erl | 245 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_peer.erl | 16 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_peer_fsm.erl | 90 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_reg.erl | 2 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_service.erl | 126 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_stats.erl | 79 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_traffic.erl | 181 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_watchdog.erl | 118 | ||||
-rw-r--r-- | lib/diameter/src/compiler/diameter_codegen.erl | 6 | ||||
-rw-r--r-- | lib/diameter/src/transport/diameter_tcp.erl | 79 |
16 files changed, 915 insertions, 434 deletions
diff --git a/lib/diameter/src/Makefile b/lib/diameter/src/Makefile index df10c33268..c0cf418f06 100644 --- a/lib/diameter/src/Makefile +++ b/lib/diameter/src/Makefile @@ -230,7 +230,7 @@ include $(ERL_TOP)/make/otp_release_targets.mk # Can't $(INSTALL_DIR) more than one directory at a time on Solaris. release_spec: opt - for d in bin ebin include src/dict; do \ + for d in bin ebin examples include src/dict; do \ $(INSTALL_DIR) "$(RELSYSDIR)/$$d"; \ done $(INSTALL_SCRIPT) $(BINS:%=../bin/%) "$(RELSYSDIR)/bin" diff --git a/lib/diameter/src/base/diameter.appup.src b/lib/diameter/src/base/diameter.appup.src index 2ce89579ff..359f434941 100644 --- a/lib/diameter/src/base/diameter.appup.src +++ b/lib/diameter/src/base/diameter.appup.src @@ -28,7 +28,13 @@ {"1.2.1", [{restart_application, diameter}]}, {"1.3", [{restart_application, diameter}]}, %% R15B03 {"1.3.1", [{restart_application, diameter}]}, - {"1.4", [{restart_application, diameter}]} %% R16A + {"1.4", [{restart_application, diameter}]}, %% R16A + {"1.4.1", [{load_module, diameter_reg}, %% R16B + {load_module, diameter_stats}, + {load_module, diameter_service}, + {load_module, diameter_watchdog}, + {load_module, diameter_capx}, + {load_module, diameter}]} ], [ {"0.9", [{restart_application, diameter}]}, @@ -39,6 +45,7 @@ {"1.2.1", [{restart_application, diameter}]}, {"1.3", [{restart_application, diameter}]}, {"1.3.1", [{restart_application, diameter}]}, - {"1.4", [{restart_application, diameter}]} + {"1.4", [{restart_application, diameter}]}, + {"1.4.1", [{restart_application, diameter}]} ] }. diff --git a/lib/diameter/src/base/diameter.erl b/lib/diameter/src/base/diameter.erl index c67fba5f89..57730cad61 100644 --- a/lib/diameter/src/base/diameter.erl +++ b/lib/diameter/src/base/diameter.erl @@ -45,6 +45,7 @@ -export_type([evaluable/0, restriction/0, + remotes/0, sequence/0, app_alias/0, service_name/0, @@ -292,13 +293,20 @@ call(SvcName, App, Message) -> | [node()] | evaluable(). +-type remotes() + :: boolean() + | [node()] + | evaluable(). + %% Options passed to start_service/2 -type service_opt() :: capability() | {application, [application_opt()]} | {restrict_connections, restriction()} - | {sequence, sequence() | evaluable()}. + | {sequence, sequence() | evaluable()} + | {share_peers, remotes()} + | {use_shared_peers, remotes()}. -type application_opt() :: {alias, app_alias()} @@ -327,7 +335,7 @@ call(SvcName, App, Message) -> -type transport_opt() :: {transport_module, atom()} | {transport_config, any()} - | {transport_config, any(), non_neg_integer() | infinity} + | {transport_config, any(), 'Unsigned32'() | infinity} | {applications, [app_alias()]} | {capabilities, [capability()]} | {capabilities_cb, evaluable()} @@ -336,6 +344,7 @@ call(SvcName, App, Message) -> | {length_errors, exit | handle | discard} | {reconnect_timer, 'Unsigned32'()} | {watchdog_timer, 'Unsigned32'() | {module(), atom(), list()}} + | {watchdog_config, [{okay|suspect, non_neg_integer()}]} | {private, any()}. %% Predicate passed to remove_transport/2 diff --git a/lib/diameter/src/base/diameter_capx.erl b/lib/diameter/src/base/diameter_capx.erl index 715b15628c..4b821f5139 100644 --- a/lib/diameter/src/base/diameter_capx.erl +++ b/lib/diameter/src/base/diameter_capx.erl @@ -172,7 +172,50 @@ ipaddr(A) -> bCER(#diameter_caps{} = Rec, Dict) -> Values = lists:zip(Dict:'#info-'(diameter_base_CER, fields), tl(tuple_to_list(Rec))), - Dict:'#new-'(diameter_base_CER, Values). + Dict:'#new-'(diameter_base_CER, [{K, map(K, V, Dict)} + || {K,V} <- Values]). + +%% map/3 +%% +%% Deal with differerences in common dictionary AVP's to make changes +%% transparent in service/transport config. In particular, one +%% annoying difference between RFC 3588 and RFC 6733. +%% +%% RFC 6773 changes the definition of Vendor-Specific-Application-Id, +%% giving Vendor-Id arity 1 instead of 3588's 1*. This causes woe +%% since the corresponding dictionaries expect different values for a +%% 'Vendor-Id': a list for 3588, an integer for 6733. + +map('Vendor-Specific-Application-Id', L, Dict) -> + Rec = Dict:'#new-'('diameter_base_Vendor-Specific-Application-Id', []), + Def = Dict:'#get-'('Vendor-Id', Rec), + [vsa(V, Def) || V <- L]; +map(_, V, _) -> + V. + +vsa({_, N, _, _} = Rec, []) + when is_integer(N) -> + setelement(2, Rec, [N]); + +vsa({_, [N], _, _} = Rec, undefined) + when is_integer(N) -> + setelement(2, Rec, N); + +vsa([_|_] = L, Def) -> + [vid(T, Def) || T <- L]; + +vsa(T, _) -> + T. + +vid({'Vendor-Id' = K, N}, []) + when is_integer(N) -> + {K, [N]}; + +vid({'Vendor-Id' = K, [N]}, undefined) -> + {K, N}; + +vid(T, _) -> + T. %% rCER/3 %% @@ -239,9 +282,26 @@ build_CEA(_, LCaps, RCaps, Dict, CEA) -> [] -> Dict:'#set-'({'Result-Code', ?NOSECURITY}, CEA); [_] = IS -> - Dict:'#set-'({'Inband-Security-Id', IS}, CEA) + Dict:'#set-'({'Inband-Security-Id', inband_security(IS)}, CEA) end. +%% Only set Inband-Security-Id if different from the default, since +%% RFC 6733 recommends against the AVP: +%% +%% 6.10. Inband-Security-Id AVP +%% +%% The Inband-Security-Id AVP (AVP Code 299) is of type Unsigned32 and +%% is used in order to advertise support of the security portion of the +%% application. The use of this AVP in CER and CEA messages is NOT +%% RECOMMENDED. Instead, discovery of a Diameter entity's security +%% capabilities can be done either through static configuration or via +%% Diameter Peer Discovery as described in Section 5.2. + +inband_security([?NO_INBAND_SECURITY]) -> + []; +inband_security([_] = IS) -> + IS. + %% common_security/2 common_security(#diameter_caps{inband_security_id = LS}, diff --git a/lib/diameter/src/base/diameter_codec.erl b/lib/diameter/src/base/diameter_codec.erl index e446a0209c..6c0e73de36 100644 --- a/lib/diameter/src/base/diameter_codec.erl +++ b/lib/diameter/src/base/diameter_codec.erl @@ -38,6 +38,10 @@ -define(MASK(N,I), ((I) band (1 bsl (N)))). +-type u32() :: 0..16#FFFFFFFF. +-type u24() :: 0..16#FFFFFF. +-type u1() :: 0..1. + %% 0 1 2 3 %% 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 %% +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ @@ -55,9 +59,13 @@ %% +-+-+-+-+-+-+-+-+-+-+-+-+- %%% --------------------------------------------------------------------------- -%%% # encode/[2-4] +%%% # encode/2 %%% --------------------------------------------------------------------------- +-spec encode(module(), Msg :: term()) + -> #diameter_packet{} + | no_return(). + encode(Mod, #diameter_packet{} = Pkt) -> try e(Mod, Pkt) @@ -217,6 +225,9 @@ rec2msg(Mod, Rec) -> %% Unsuccessfully decoded AVPs will be placed in #diameter_packet.errors. +-spec decode(module(), #diameter_packet{} | bitstring()) + -> #diameter_packet{}. + decode(Mod, Pkt) -> decode(Mod:id(), Mod, Pkt). @@ -225,9 +236,9 @@ decode(Mod, Pkt) -> %% question. decode(?APP_ID_RELAY, _, #diameter_packet{} = Pkt) -> case collect_avps(Pkt) of - {Bs, As} -> + {E, As} -> Pkt#diameter_packet{avps = As, - errors = [Bs]}; + errors = [E]}; As -> Pkt#diameter_packet{avps = As} end; @@ -251,12 +262,12 @@ decode(Id, Mod, Bin) when is_bitstring(Bin) -> decode(Id, Mod, #diameter_packet{header = decode_header(Bin), bin = Bin}). -decode_avps(MsgName, Mod, Pkt, {Bs, Avps}) -> %% invalid avp bits ... +decode_avps(MsgName, Mod, Pkt, {E, Avps}) -> ?LOG(invalid, Pkt#diameter_packet.bin), #diameter_packet{errors = Failed} = P = decode_avps(MsgName, Mod, Pkt, Avps), - P#diameter_packet{errors = [Bs | Failed]}; + P#diameter_packet{errors = [E | Failed]}; decode_avps('', Mod, Pkt, Avps) -> %% unknown message ... ?LOG(unknown, {Mod, Pkt#diameter_packet.header}), @@ -275,6 +286,10 @@ decode_avps(MsgName, Mod, Pkt, Avps) -> %% ... or not %%% # decode_header/1 %%% --------------------------------------------------------------------------- +-spec decode_header(bitstring()) + -> #diameter_header{} + | false. + decode_header(<<Version:8, MsgLength:24, CmdFlags:1/binary, @@ -324,6 +339,13 @@ decode_header(_) -> %% wraparound counter. The 8-bit counter is incremented each time the %% system is restarted. +-spec sequence_numbers(#diameter_packet{} + | #diameter_header{} + | binary() + | Seq) + -> Seq + when Seq :: {HopByHopId :: u32(), EndToEndId :: u32()}. + sequence_numbers({_,_} = T) -> T; @@ -345,6 +367,9 @@ sequence_numbers(<<_:12/binary, H:32, E:32, _/binary>>) -> %%% # hop_by_hop_id/2 %%% --------------------------------------------------------------------------- +-spec hop_by_hop_id(u32(), binary()) + -> binary(). + hop_by_hop_id(Id, <<H:12/binary, _:32, T/binary>>) -> <<H/binary, Id:32, T/binary>>. @@ -352,6 +377,10 @@ hop_by_hop_id(Id, <<H:12/binary, _:32, T/binary>>) -> %%% # msg_name/2 %%% --------------------------------------------------------------------------- +-spec msg_name(module(), #diameter_header{}) + -> atom() + | {ApplicationId :: u32(), CommandCode :: u24(), Rbit :: u1()}. + msg_name(Dict0, #diameter_header{application_id = ?APP_ID_COMMON, cmd_code = C, is_request = R}) -> @@ -367,6 +396,9 @@ msg_name(_, Hdr) -> %%% # msg_id/1 %%% --------------------------------------------------------------------------- +-spec msg_id(#diameter_packet{} | #diameter_header{}) + -> {ApplicationId :: u32(), CommandCode :: u24(), Rbit :: u1()}. + msg_id(#diameter_packet{msg = [#diameter_header{} = Hdr | _]}) -> msg_id(Hdr); @@ -389,6 +421,12 @@ msg_id(<<_:32, Rbit:1, _:7, CmdCode:24, ApplId:32, _/bitstring>>) -> %% order in the binary. Note also that grouped avp's aren't unraveled, %% only those at the top level. +-spec collect_avps(#diameter_packet{} | bitstring()) + -> [Avp] + | {Error, [Avp]} + when Avp :: #diameter_avp{}, + Error :: {5014, #diameter_avp{}}. + collect_avps(#diameter_packet{bin = Bin}) -> <<_:20/binary, Avps/bitstring>> = Bin, collect_avps(Avps); @@ -403,8 +441,8 @@ collect_avps(Bin, N, Acc) -> {Rest, AVP} -> collect_avps(Rest, N+1, [AVP#diameter_avp{index = N} | Acc]) catch - ?FAILURE(_) -> - {Bin, Acc} + ?FAILURE(Error) -> + {Error, Acc} end. %% 0 1 2 3 @@ -422,42 +460,87 @@ collect_avps(Bin, N, Acc) -> %% split_avp/1 split_avp(Bin) -> - 8 =< size(Bin) orelse ?THROW(truncated_header), + {Code, V, M, P, Len, HdrLen} = split_head(Bin), + {Data, B} = split_data(Bin, HdrLen, Len - HdrLen), - <<Code:32, Flags:1/binary, Length:24, Rest/bitstring>> - = Bin, + {B, #diameter_avp{code = Code, + vendor_id = V, + is_mandatory = 1 == M, + need_encryption = 1 == P, + data = Data}}. - DataSize = Length - 8, % size(Code+Flags+Length) = 8 octets - PadSize = (4 - (DataSize rem 4)) rem 4, +%% split_head/1 - DataSize + PadSize =< size(Rest) - orelse ?THROW(truncated_data), +split_head(<<Code:32, 1:1, M:1, P:1, _:5, Len:24, V:32, _/bitstring>>) -> + {Code, V, M, P, Len, 12}; - <<Data:DataSize/binary, _:PadSize/binary, R/bitstring>> - = Rest, - <<Vbit:1, Mbit:1, Pbit:1, _Reserved:5>> - = Flags, +split_head(<<Code:32, 0:1, M:1, P:1, _:5, Len:24, _/bitstring>>) -> + {Code, undefined, M, P, Len, 8}; - 0 == Vbit orelse 4 =< size(Data) - orelse ?THROW(truncated_vendor_id), +split_head(Bin) -> + ?THROW({5014, #diameter_avp{data = Bin}}). + +%% 3588: +%% +%% DIAMETER_INVALID_AVP_LENGTH 5014 +%% The request contained an AVP with an invalid length. A Diameter +%% message indicating this error MUST include the offending AVPs +%% within a Failed-AVP AVP. - {Vid, D} = vid(Vbit, Data), - {R, #diameter_avp{code = Code, - vendor_id = Vid, - is_mandatory = 1 == Mbit, - need_encryption = 1 == Pbit, - data = D}}. +%% 6733: +%% +%% DIAMETER_INVALID_AVP_LENGTH 5014 +%% +%% The request contained an AVP with an invalid length. A Diameter +%% message indicating this error MUST include the offending AVPs +%% within a Failed-AVP AVP. In cases where the erroneous AVP length +%% value exceeds the message length or is less than the minimum AVP +%% ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +%% header length, it is sufficient to include the offending AVP +%% ^^^^^^^^^^^^^ +%% header and a zero filled payload of the minimum required length +%% for the payloads data type. If the AVP is a Grouped AVP, the +%% Grouped AVP header with an empty payload would be sufficient to +%% indicate the offending AVP. In the case where the offending AVP +%% header cannot be fully decoded when the AVP length is less than +%% the minimum AVP header length, it is sufficient to include an +%% offending AVP header that is formulated by padding the incomplete +%% AVP header with zero up to the minimum AVP header length. +%% +%% The underlined clause must be in error since (1) a header less than +%% the minimum value mean we don't know the identity of the AVP and +%% (2) the last sentence covers this case. -%% The RFC is a little misleading when stating that OctetString is -%% padded to a 32-bit boundary while other types align naturally. All -%% other types are already multiples of 32 bits so there's no need to -%% distinguish between types here. Any invalid lengths will result in -%% decode error in diameter_types. +%% split_data/3 -vid(1, <<Vid:32, Data/bitstring>>) -> - {Vid, Data}; -vid(0, Data) -> - {undefined, Data}. +split_data(Bin, HdrLen, Len) + when 0 =< Len -> + split_data(Bin, HdrLen, Len, (4 - (Len rem 4)) rem 4); + +split_data(_, _, _) -> + invalid_avp_length(). + +%% split_data/4 + +split_data(Bin, HdrLen, Len, Pad) -> + <<_:HdrLen/binary, T/bitstring>> = Bin, + case T of + <<Data:Len/binary, _:Pad/binary, Rest/bitstring>> -> + {Data, Rest}; + _ -> + invalid_avp_length() + end. + +%% invalid_avp_length/0 +%% +%% AVP Length doesn't mesh with payload. Induce a decode error by +%% returning a payload that no valid Diameter type can have. This is +%% so that a known AVP will result in 5014 error with a zero'd +%% payload. Here we simply don't know how to construct this payload. +%% (Yes, this solution is an afterthought.) + +invalid_avp_length() -> + {<<0:1>>, <<>>}. %%% --------------------------------------------------------------------------- %%% # pack_avp/1 @@ -472,20 +555,35 @@ vid(0, Data) -> pack_avp(#diameter_avp{data = [#diameter_avp{} | _] = Avps} = A) -> pack_avp(A#diameter_avp{data = encode_avps(Avps)}); -%% ... data as a type/value tuple, possibly with header data, ... +%% ... data as a type/value tuple ... pack_avp(#diameter_avp{data = {Type, Value}} = A) when is_atom(Type) -> pack_avp(A#diameter_avp{data = diameter_types:Type(encode, Value)}); + +%% ... with a header in various forms ... pack_avp(#diameter_avp{data = {{_,_,_} = T, {Type, Value}}}) -> pack_avp(T, iolist_to_binary(diameter_types:Type(encode, Value))); + pack_avp(#diameter_avp{data = {{_,_,_} = T, Bin}}) when is_binary(Bin) -> pack_avp(T, Bin); + pack_avp(#diameter_avp{data = {Dict, Name, Value}} = A) -> {Code, _Flags, Vid} = Hdr = Dict:avp_header(Name), {Name, Type} = Dict:avp_name(Code, Vid), pack_avp(A#diameter_avp{data = {Hdr, {Type, Value}}}); +pack_avp(#diameter_avp{code = undefined, data = Bin}) + when is_binary(Bin) -> + %% Reset the AVP Length of an AVP Header resulting from a 5014 + %% error. The RFC doesn't explicitly say to do this but the + %% receiver can't correctly extract this and following AVP's + %% without a correct length. On the downside, the header doesn't + %% reveal if the received header has been padded. + Pad = 8*header_length(Bin) - bit_size(Bin), + Len = size(<<H:5/binary, _:24, T/binary>> = <<Bin/bitstring, 0:Pad>>), + <<H/binary, Len:24, T/binary>>; + %% ... or as an iolist. pack_avp(#diameter_avp{code = Code, vendor_id = V, @@ -497,6 +595,11 @@ pack_avp(#diameter_avp{code = Code, {P, 2#00100000}]), pack_avp({Code, Flags, V}, iolist_to_binary(Data)). +header_length(<<_:32, 1:1, _/bitstring>>) -> + 12; +header_length(_) -> + 8. + flag_avp({true, B}, F) -> F bor B; flag_avp({false, _}, F) -> diff --git a/lib/diameter/src/base/diameter_config.erl b/lib/diameter/src/base/diameter_config.erl index 9f73815756..2a145c874b 100644 --- a/lib/diameter/src/base/diameter_config.erl +++ b/lib/diameter/src/base/diameter_config.erl @@ -103,6 +103,10 @@ %% Time to lay low before restarting a dead service. -define(RESTART_SLEEP, 2000). +%% Test for a valid timeout. +-define(IS_UINT32(N), + is_integer(N) andalso 0 =< N andalso 0 == N bsr 32). + %% A minimal diameter_caps for checking for valid capabilities values. -define(EXAMPLE_CAPS, #diameter_caps{origin_host = "TheHost", @@ -490,13 +494,11 @@ stop(SvcName) -> %% has many. add(SvcName, Type, Opts) -> - %% Ensure usable capabilities. diameter_service:merge_service/2 - %% depends on this. - lists:foreach(fun(Os) -> - is_list(Os) orelse ?THROW({capabilities, Os}), - ok = encode_CER(Os) - end, - [Os || {capabilities, Os} <- Opts, is_list(Os)]), + %% Ensure acceptable transport options. This won't catch all + %% possible errors (faulty callbacks for example) but it catches + %% many. diameter_service:merge_service/2 depends on usable + %% capabilities for example. + ok = transport_opts(Opts), Ref = make_ref(), T = {Ref, Type, Opts}, @@ -514,6 +516,61 @@ add(SvcName, Type, Opts) -> No end. +transport_opts(Opts) -> + lists:foreach(fun(T) -> opt(T) orelse ?THROW({invalid, T}) end, Opts). + +opt({transport_module, M}) -> + is_atom(M); + +opt({transport_config, _, Tmo}) -> + ?IS_UINT32(Tmo) orelse Tmo == infinity; + +opt({applications, As}) -> + is_list(As); + +opt({capabilities, Os}) -> + is_list(Os) andalso ok == encode_CER(Os); + +opt({capx_timeout, Tmo}) -> + ?IS_UINT32(Tmo); + +opt({length_errors, T}) -> + lists:member(T, [exit, handle, discard]); + +opt({reconnect_timer, Tmo}) -> + ?IS_UINT32(Tmo); + +opt({watchdog_timer, {M,F,A}}) + when is_atom(M), is_atom(F), is_list(A) -> + true; +opt({watchdog_timer, Tmo}) -> + ?IS_UINT32(Tmo); + +opt({watchdog_config, L}) -> + is_list(L) andalso lists:all(fun wdopt/1, L); + +%% Options that we can't validate. +opt({K, _}) + when K == transport_config; + K == capabilities_cb; + K == disconnect_cb; + K == private -> + true; + +%% Anything else, which is ignored by us. This makes options sensitive +%% to spelling mistakes but arbitrary options are passed by some users +%% as a way to identify transports. (That is, can't just do away with +%% it.) +opt(_) -> + true. + +wdopt({K,N}) -> + (K == okay orelse K == suspect) andalso is_integer(N) andalso 0 =< N; +wdopt(_) -> + false. + +%% start_transport/2 + start_transport(SvcName, T) -> case diameter_service:start_transport(SvcName, T) of {ok, _Pid} -> @@ -557,54 +614,69 @@ stop_transport(SvcName, Refs) -> %% make_config/2 make_config(SvcName, Opts) -> - Apps = init_apps(Opts), + AppOpts = [T || {application, _} = T <- Opts], + Apps = init_apps(AppOpts), + [] == Apps andalso ?THROW(no_apps), %% Use the fact that diameter_caps has the same field names as CER. Fields = ?BASE:'#info-'(diameter_base_CER) -- ['AVP'], - COpts = [T || {K,_} = T <- Opts, lists:member(K, Fields)], - Caps = make_caps(#diameter_caps{}, COpts), + CapOpts = [T || {K,_} = T <- Opts, lists:member(K, Fields)], + Caps = make_caps(#diameter_caps{}, CapOpts), - ok = encode_CER(COpts), + ok = encode_CER(CapOpts), - Os = split(Opts, fun opt/2, [{false, share_peers}, - {false, use_shared_peers}, - {false, monitor}, - {?NOMASK, sequence}, - {nodes, restrict_connections}]), - %% share_peers and use_shared_peers are currently undocumented. + SvcOpts = make_opts((Opts -- AppOpts) -- CapOpts, + [{false, share_peers}, + {false, use_shared_peers}, + {false, monitor}, + {?NOMASK, sequence}, + {nodes, restrict_connections}]), #service{name = SvcName, rec = #diameter_service{applications = Apps, capabilities = Caps}, - options = Os}. + options = SvcOpts}. + +make_opts(Opts, Defs) -> + Known = [{K, get_opt(K, Opts, D)} || {D,K} <- Defs], + Unknown = Opts -- Known, -split(Opts, F, Defs) -> - [{K, F(K, get_opt(K, Opts, D))} || {D,K} <- Defs]. + [] == Unknown orelse ?THROW({invalid, hd(Unknown)}), + + [{K, opt(K,V)} || {K,V} <- Known]. opt(K, false = B) when K /= sequence -> B; opt(K, true = B) - when K == share_peer; + when K == share_peers; K == use_shared_peers -> B; -opt(monitor, P) - when is_pid(P) -> - P; - opt(restrict_connections, T) when T == node; - T == nodes; - T == []; - is_atom(hd(T)) -> + T == nodes -> + T; + +opt(K, T) + when (K == share_peers + orelse K == use_shared_peers + orelse K == restrict_connections), ([] == T + orelse is_atom(hd(T))) -> T; -opt(restrict_connections = K, F) -> - try diameter_lib:eval(F) of %% no guarantee that it won't fail later +opt(monitor, P) + when is_pid(P) -> + P; + +opt(K, F) + when K == restrict_connections; + K == share_peers; + K == use_shared_peers -> + try diameter_lib:eval(F) of %% but no guarantee that it won't fail later Nodes when is_list(Nodes) -> F; V -> @@ -629,7 +701,7 @@ opt(K, _) -> ?THROW({value, K}). sequence({H,N} = T) - when 0 =< N, N =< 32, 0 =< H, 0 == H bsr N -> + when 0 =< N, N =< 32, 0 =< H, 0 == H bsr (32-N) -> T; sequence(_) -> @@ -664,8 +736,8 @@ encode_CER(Opts) -> init_apps(Opts) -> lists:foldl(fun app_acc/2, [], lists:reverse(Opts)). -app_acc({application, Opts}, Acc) -> - is_list(Opts) orelse ?THROW({application, Opts}), +app_acc({application, Opts} = T, Acc) -> + is_list(Opts) orelse ?THROW(T), [Dict, Mod] = get_opt([dictionary, module], Opts), Alias = get_opt(alias, Opts, Dict), @@ -681,9 +753,7 @@ app_acc({application, Opts}, Acc) -> mutable = M, options = [{answer_errors, A}, {request_errors, P}]} - | Acc]; -app_acc(_, Acc) -> - Acc. + | Acc]. init_mod(#diameter_callback{} = R) -> init_mod([diameter_callback, R]); diff --git a/lib/diameter/src/base/diameter_lib.erl b/lib/diameter/src/base/diameter_lib.erl index 362d593b24..44d81e2778 100644 --- a/lib/diameter/src/base/diameter_lib.erl +++ b/lib/diameter/src/base/diameter_lib.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% Copyright Ericsson AB 2010-2013. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -19,77 +19,86 @@ -module(diameter_lib). --export([report/2, info_report/2, +-export([info_report/2, error_report/2, warning_report/2, now_diff/1, time/1, eval/1, - ip4address/1, - ip6address/1, ipaddr/1, spawn_opts/2, wait/1, fold_tuple/3, log/4]). --include("diameter_internal.hrl"). - %% --------------------------------------------------------------------------- -%% # info_report(Reason, MFA) -%% -%% Input: Reason = Arbitrary term indicating the reason for the report. -%% MFA = {Module, Function, Args} to report. -%% -%% Output: true +%% # info_report/2 %% --------------------------------------------------------------------------- -report(Reason, MFA) -> - info_report(Reason, MFA). +-spec info_report(Reason :: term(), T :: term()) + -> true. -info_report(Reason, MFA) -> - report(fun error_logger:info_report/1, Reason, MFA), +info_report(Reason, T) -> + report(fun error_logger:info_report/1, Reason, T), true. -%%% --------------------------------------------------------------------------- -%%% # error_report(Reason, MFA) -%%% # warning_report(Reason, MFA) -%%% -%%% Output: false -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # error_report/2 +%% # warning_report/2 +%% --------------------------------------------------------------------------- + +-spec error_report(Reason :: term(), T :: term()) + -> false. + +error_report(Reason, T) -> + report(fun error_logger:error_report/1, Reason, T). -error_report(Reason, MFA) -> - report(fun error_logger:error_report/1, Reason, MFA). +-spec warning_report(Reason :: term(), T :: term()) + -> false. -warning_report(Reason, MFA) -> - report(fun error_logger:warning_report/1, Reason, MFA). +warning_report(Reason, T) -> + report(fun error_logger:warning_report/1, Reason, T). -report(Fun, Reason, MFA) -> - Fun([{why, Reason}, {who, self()}, {what, MFA}]), +report(Fun, Reason, T) -> + Fun([{why, Reason}, {who, self()}, {what, T}]), false. -%%% --------------------------------------------------------------------------- -%%% # now_diff(Time) -%%% -%%% Description: Return timer:now_diff(now(), Time) as an {H, M, S, MicroS} -%%% tuple instead of as integer microseconds. -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # now_diff/1 +%% --------------------------------------------------------------------------- + +-spec now_diff(NowT) + -> {Hours, Mins, Secs, MicroSecs} + when NowT :: {non_neg_integer(), 0..999999, 0..999999}, + Hours :: non_neg_integer(), + Mins :: 0..59, + Secs :: 0..59, + MicroSecs :: 0..999999. + +%% Return timer:now_diff(now(), NowT) as an {H, M, S, MicroS} tuple +%% instead of as integer microseconds. now_diff({_,_,_} = Time) -> - time(timer:now_diff(erlang:now(), Time)). - -%%% --------------------------------------------------------------------------- -%%% # time(Time) -%%% -%%% Input: Time = {MegaSec, Sec, MicroSec} -%%% | MicroSec -%%% -%%% Output: {H, M, S, MicroS} -%%% --------------------------------------------------------------------------- - -time({_,_,_} = Time) -> %% time of day + time(timer:now_diff(now(), Time)). + +%% --------------------------------------------------------------------------- +%% # time/1 +%% +%% Return an elapsed time as an {H, M, S, MicroS} tuple. +%% --------------------------------------------------------------------------- + +-spec time(NowT | Diff) + -> {Hours, Mins, Secs, MicroSecs} + when NowT :: {non_neg_integer(), 0..999999, 0..999999}, + Diff :: non_neg_integer(), + Hours :: non_neg_integer(), + Mins :: 0..59, + Secs :: 0..59, + MicroSecs :: 0..999999. + +time({_,_,_} = NowT) -> %% time of day %% 24 hours = 24*60*60*1000000 = 86400000000 microsec - time(timer:now_diff(Time, {0,0,0}) rem 86400000000); + time(timer:now_diff(NowT, {0,0,0}) rem 86400000000); time(Micro) -> %% elapsed time Seconds = Micro div 1000000, @@ -98,9 +107,21 @@ time(Micro) -> %% elapsed time S = Seconds rem 60, {H, M, S, Micro rem 1000000}. -%%% --------------------------------------------------------------------------- -%%% # eval(Func) -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # eval/1 +%% +%% Evaluate a function in various forms. +%% --------------------------------------------------------------------------- + +-type f() :: {module(), atom(), list()} + | nonempty_maybe_improper_list(fun(), list()) + | fun(). + +-spec eval(Fun) + -> term() + when Fun :: f() + | {f()} + | nonempty_maybe_improper_list(f(), list()). eval({M,F,A}) -> apply(M,F,A); @@ -120,66 +141,15 @@ eval({F}) -> eval(F) -> F(). -%%% --------------------------------------------------------------------------- -%%% # ip4address(Addr) -%%% -%%% Input: string() (eg. "10.0.0.1") -%%% | list of integer() -%%% | tuple of integer() -%%% -%%% Output: {_,_,_,_} of integer -%%% -%%% Exceptions: error: {invalid_address, Addr, erlang:get_stacktrace()} -%%% --------------------------------------------------------------------------- - -ip4address([_,_,_,_] = Addr) -> %% Length 4 string can't be an address. - ipaddr(list_to_tuple(Addr)); - -%% Be brutal. -ip4address(Addr) -> - try - {_,_,_,_} = ipaddr(Addr) - catch - error: _ -> - erlang:error({invalid_address, Addr, ?STACK}) - end. - -%%% --------------------------------------------------------------------------- -%%% # ip6address(Addr) -%%% -%%% Input: string() (eg. "1080::8:800:200C:417A") -%%% | list of integer() -%%% | tuple of integer() -%%% -%%% Output: {_,_,_,_,_,_,_,_} of integer -%%% -%%% Exceptions: error: {invalid_address, Addr, erlang:get_stacktrace()} -%%% --------------------------------------------------------------------------- - -ip6address([_,_,_,_,_,_,_,_] = Addr) -> %% Length 8 string can't be an address. - ipaddr(list_to_tuple(Addr)); - -%% Be brutal. -ip6address(Addr) -> - try - {_,_,_,_,_,_,_,_} = ipaddr(Addr) - catch - error: _ -> - erlang:error({invalid_address, Addr, ?STACK}) - end. - -%%% --------------------------------------------------------------------------- -%%% # ipaddr(Addr) -%%% -%%% Input: string() | tuple of integer() -%%% -%%% Output: {_,_,_,_} | {_,_,_,_,_,_,_,_} -%%% -%%% Exceptions: error: {invalid_address, erlang:get_stacktrace()} -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # ipaddr/1 +%% +%% Parse an IP address. +%% --------------------------------------------------------------------------- --spec ipaddr(string() | tuple()) - -> inet:ip_address(). +-spec ipaddr([byte()] | tuple()) + -> inet:ip_address() + | none(). %% Don't convert lists of integers since a length 8 list like %% [$1,$0,$.,$0,$.,$0,$.,$1] is ambiguous: is it "10.0.0.1" or @@ -193,7 +163,7 @@ ipaddr(Addr) -> ip(Addr) catch error: _ -> - erlang:error({invalid_address, ?STACK}) + erlang:error({invalid_address, erlang:get_stacktrace()}) end. %% Already a tuple: ensure non-negative integers of the right size. @@ -210,11 +180,12 @@ ip(Addr) -> {ok, A} = inet_parse:address(Addr), %% documented in inet(3) A. -%%% --------------------------------------------------------------------------- -%%% # spawn_opts(Type, Opts) -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # spawn_opts/2 +%% --------------------------------------------------------------------------- -%% TODO: config variables. +-spec spawn_opts(server|worker, list()) + -> list(). spawn_opts(server, Opts) -> opts(75000, Opts); @@ -224,24 +195,32 @@ spawn_opts(worker, Opts) -> opts(HeapSize, Opts) -> [{min_heap_size, HeapSize} | lists:keydelete(min_heap_size, 1, Opts)]. -%%% --------------------------------------------------------------------------- -%%% # wait(MRefs) -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # wait/1 +%% --------------------------------------------------------------------------- + +-spec wait([pid()]) + -> ok. wait(L) -> - w([erlang:monitor(process, P) || P <- L]). + down([erlang:monitor(process, P) || P <- L]). -w([]) -> +down([]) -> ok; -w(L) -> - receive - {'DOWN', MRef, process, _, _} -> - w(lists:delete(MRef, L)) - end. +down([MRef|T]) -> + receive {'DOWN', MRef, process, _, _} -> ok end, + down(T). -%%% --------------------------------------------------------------------------- -%%% # fold_tuple(N, T0, T) -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # fold_tuple/3 +%% --------------------------------------------------------------------------- + +-spec fold_tuple(N, T0, T) + -> tuple() + when N :: pos_integer(), + T0 :: tuple(), + T :: tuple() + | undefined. %% Replace fields in T0 by those of T starting at index N, unless the %% new value is 'undefined'. @@ -262,11 +241,11 @@ ft(undefined, {_, T}) -> ft(Value, {Idx, T}) -> setelement(Idx, T, Value). -%%% ---------------------------------------------------------- -%%% # log(Slogan, Mod, Line, Details) -%%% -%%% Called to have something to trace on for happenings of interest. -%%% ---------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # log/4 +%% +%% Called to have something to trace on for happenings of interest. +%% --------------------------------------------------------------------------- -log(_, _, _, _) -> +log(_Slogan, _Mod, _Line, _Details) -> ok. diff --git a/lib/diameter/src/base/diameter_peer.erl b/lib/diameter/src/base/diameter_peer.erl index 130bedda84..0d2efd4d1f 100644 --- a/lib/diameter/src/base/diameter_peer.erl +++ b/lib/diameter/src/base/diameter_peer.erl @@ -24,14 +24,15 @@ %% Interface towards transport modules ... -export([recv/2, up/1, - up/2]). + up/2, + up/3]). %% ... and the stack. -export([start/1, send/2, close/1, abort/1, - notify/2]). + notify/3]). %% Server start. -export([start_link/0]). @@ -63,11 +64,11 @@ -define(DEFAULT_TTMO, infinity). %%% --------------------------------------------------------------------------- -%%% # notify/2 +%%% # notify/3 %%% --------------------------------------------------------------------------- -notify(SvcName, T) -> - rpc:abcast(nodes(), ?SERVER, {notify, SvcName, T}). +notify(Nodes, SvcName, T) -> + rpc:abcast(Nodes, ?SERVER, {notify, SvcName, T}). %%% --------------------------------------------------------------------------- %%% # start/1 @@ -180,7 +181,7 @@ start(Mod, Args) -> apply(Mod, start, Args). %%% --------------------------------------------------------------------------- -%%% # up/[12] +%%% # up/1-3 %%% --------------------------------------------------------------------------- up(Pid) -> %% accepting transport @@ -189,6 +190,9 @@ up(Pid) -> %% accepting transport up(Pid, Remote) -> %% connecting transport ifc_send(Pid, {self(), connected, Remote}). +up(Pid, Remote, LAddrs) -> %% connecting transport + ifc_send(Pid, {self(), connected, Remote, LAddrs}). + %%% --------------------------------------------------------------------------- %%% # recv/2 %%% --------------------------------------------------------------------------- diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl index 66342f7b62..4e55864168 100644 --- a/lib/diameter/src/base/diameter_peer_fsm.erl +++ b/lib/diameter/src/base/diameter_peer_fsm.erl @@ -198,6 +198,7 @@ i({Ack, WPid, {M, Ref} = T, Opts, {Mask, OnLengthErr = proplists:get_value(length_errors, Opts, exit), lists:member(OnLengthErr, [exit, handle, discard]) orelse ?ERROR({invalid, {length_errors, OnLengthErr}}), + %% Error checking is for configuration added in old code. {TPid, Addrs} = start_transport(T, Rest, Svc), @@ -212,9 +213,6 @@ i({Ack, WPid, {M, Ref} = T, Opts, {Mask, %% transports on the same service can use different local addresses. %% The local addresses are put into Host-IP-Address avps here when %% sending capabilities exchange messages. -%% -%% Invalid transport config may cause us to crash but note that the -%% watchdog start (start/2) succeeds regardless. %% Wait for the caller to have a monitor to avoid a race with our %% death. (Since the exit reason is used in diameter_service.) @@ -235,20 +233,21 @@ start_transport(Addrs0, T) -> {TPid, Addrs, Tmo, Data} -> erlang:monitor(process, TPid), q_next(TPid, Addrs0, Tmo, Data), - {TPid, addrs(Addrs, Addrs0)}; + {TPid, Addrs}; No -> exit({shutdown, No}) end. -addrs([], Addrs0) -> - Addrs0; -addrs(Addrs, _) -> - Addrs. - -svc(Svc, []) -> - Svc; -svc(Svc, Addrs) -> - readdr(Svc, Addrs). +svc(#diameter_service{capabilities = LCaps0} = Svc, Addrs) -> + #diameter_caps{host_ip_address = Addrs0} + = LCaps0, + case Addrs0 of + [] -> + LCaps = LCaps0#diameter_caps{host_ip_address = Addrs}, + Svc#diameter_service{capabilities = LCaps}; + [_|_] -> + Svc + end. readdr(#diameter_service{capabilities = LCaps0} = Svc, Addrs) -> LCaps = LCaps0#diameter_caps{host_ip_address = Addrs}, @@ -353,10 +352,17 @@ transition({diameter, {TPid, connected, Remote}}, mode = M} = S) -> {'Wait-Conn-Ack', _} = PS, %% assert - connect = M, %% + connect = M, %% keep_transport(TPid), send_CER(S#state{mode = {M, Remote}}); +transition({diameter, {TPid, connected, Remote, LAddrs}}, + #state{transport = TPid, + service = Svc} + = S) -> + transition({diameter, {TPid, connected, Remote}}, + S#state{service = svc(Svc, LAddrs)}); + %% Connection from peer. transition({diameter, {TPid, connected}}, #state{transport = TPid, @@ -365,7 +371,7 @@ transition({diameter, {TPid, connected}}, parent = Pid} = S) -> {'Wait-Conn-Ack', Tmo} = PS, %% assert - accept = M, %% + accept = M, %% keep_transport(TPid), Pid ! {accepted, self()}, start_timer(Tmo, S#state{state = recv_CER}); @@ -378,6 +384,8 @@ transition({diameter, {_, connected}}, _) -> {stop, connection_timeout}; transition({diameter, {_, connected, _}}, _) -> {stop, connection_timeout}; +transition({diameter, {_, connected, _, _}}, _) -> + {stop, connection_timeout}; %% Connection has timed out: start an alternate. transition({connection_timeout = T, TPid}, @@ -695,13 +703,13 @@ build_answer('CER', N -> {cea(CEA, N, Dict0), [fun open/5, Pkt, SupportedApps, Caps, - {accept, hd([_] = IS)}]} + {accept, inband_security(IS)}]} catch ?FAILURE(Reason) -> rejected(Reason, {'CER', Reason, Caps, Pkt}, S) end; -%% The error checks below are similar to those in diameter_service for +%% The error checks below are similar to those in diameter_traffic for %% other messages. Should factor out the commonality. build_answer(Type, @@ -712,6 +720,11 @@ build_answer(Type, RC = rc(H, Es), {answer(Type, RC, Es, S), post(Type, RC, Pkt, S)}. +inband_security([]) -> + ?NO_INBAND_SECURITY; +inband_security([IS]) -> + IS. + cea(CEA, ok, _) -> CEA; cea(CEA, 2001, _) -> @@ -735,7 +748,14 @@ rejected(N, T, S) -> rejected({N, []}, T, S). answer(Type, RC, Es, S) -> - set(answer(Type, RC, S), failed_avp([A || {_,A} <- Es])). + set(answer(Type, RC, S), failed_avp(RC, Es)). + +failed_avp(RC, [{RC, Avp} | _]) -> + [{'Failed-AVP', [{'AVP', [Avp]}]}]; +failed_avp(RC, [_ | Es]) -> + failed_avp(RC, Es); +failed_avp(_, [] = No) -> + No. answer(Type, RC, S) -> answer_message(answer(Type, S), RC). @@ -755,13 +775,6 @@ is_origin({N, _}) -> orelse N == 'Origin-Realm' orelse N == 'Origin-State-Id'. -%% failed_avp/1 - -failed_avp([] = No) -> - No; -failed_avp(Avps) -> - [{'Failed-AVP', [[{'AVP', Avps}]]}]. - %% set/2 set(Ans, []) -> @@ -777,7 +790,7 @@ rc(#diameter_header{is_error = true}, _) -> 3008; %% DIAMETER_INVALID_HDR_BITS rc(_, [Bs|_]) - when is_bitstring(Bs) -> + when is_bitstring(Bs) -> %% from old code 3009; %% DIAMETER_INVALID_HDR_BITS rc(#diameter_header{version = ?DIAMETER_VERSION}, Es) -> @@ -846,8 +859,12 @@ a('DPR', #diameter_caps{origin_host = {Host, _}, %% recv_CER/2 recv_CER(CER, #state{service = Svc, dictionary = Dict}) -> - {ok, T} = diameter_capx:recv_CER(CER, Svc, Dict), - T. + case diameter_capx:recv_CER(CER, Svc, Dict) of + {ok, T} -> + T; + {error, Reason} -> + close({'CER', CER, Svc, Dict, Reason}) + end. %% handle_CEA/1 @@ -907,8 +924,12 @@ recv_CEA(#diameter_packet{header = #diameter_header{version errors = []}, #state{service = Svc, dictionary = Dict}) -> - {ok, T} = diameter_capx:recv_CEA(CEA, Svc, Dict), - T; + case diameter_capx:recv_CEA(CEA, Svc, Dict) of + {ok, T} -> + T; + {error, Reason} -> + close({'CEA', CEA, Svc, Dict, Reason}) + end; recv_CEA(Pkt, S) -> close({'CEA', caps(S), Pkt}). @@ -987,8 +1008,17 @@ capz(#diameter_caps{} = L, #diameter_caps{} = R) -> %% close/1 close(Reason) -> + report(Reason), throw({?MODULE, close, Reason}). +%% Could possibly log more here. +report({M, _, _, _, _} = T) + when M == 'CER'; + M == 'CEA' -> + diameter_lib:error_report(failure, T); +report(_) -> + ok. + %% dwa/1 dwa(#diameter_caps{origin_host = OH, diff --git a/lib/diameter/src/base/diameter_reg.erl b/lib/diameter/src/base/diameter_reg.erl index ac58e4bf5b..3197c1aee1 100644 --- a/lib/diameter/src/base/diameter_reg.erl +++ b/lib/diameter/src/base/diameter_reg.erl @@ -138,7 +138,7 @@ del(T) -> %% associations removed.) %% =========================================================================== --spec match(tuple()) +-spec match(any()) -> [{term(), pid()}]. match(Pat) -> diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl index f1342df16c..112e83476d 100644 --- a/lib/diameter/src/base/diameter_service.erl +++ b/lib/diameter/src/base/diameter_service.erl @@ -125,9 +125,9 @@ monitor = false :: false | pid(), %% process to die with options :: [{sequence, diameter:sequence()} %% sequence mask - | {restrict_connections, diameter:restriction()} - | {share_peers, boolean()} %% broadcast peers to remote nodes? - | {use_shared_peers, boolean()}]}).%% use broadcasted peers? + | {share_peers, diameter:remotes()} %% broadcast to + | {use_shared_peers, diameter:remotes()} %% use from + | {restrict_connections, diameter:restriction()}]}). %% shared_peers reflects the peers broadcast from remote nodes. %% Record representing an RFC 3539 watchdog process implemented by @@ -681,11 +681,9 @@ mref(false = No) -> mref(P) -> erlang:monitor(process, P). -init_shared(#state{options = [_, _, {_, true} | _], +init_shared(#state{options = [_, _, {_,T} | _], service_name = Svc}) -> - diameter_peer:notify(Svc, {service, self()}); -init_shared(#state{options = [_, _, {_, false} | _]}) -> - ok. + notify(T, Svc, {service, self()}). init_mod(#diameter_app{alias = Alias, init_state = S}) -> @@ -698,6 +696,37 @@ get_value(Key, Vs) -> {_, V} = lists:keyfind(Key, 1, Vs), V. +notify(Share, SvcName, T) -> + Nodes = remotes(Share), + [] /= Nodes andalso diameter_peer:notify(Nodes, SvcName, T). +%% Test for the empty list for upgrade reasons: there's no +%% diameter_peer:notify/3 in old code so no call means no load order +%% requirement. + +remotes(false) -> + []; + +remotes(true) -> + nodes(); + +remotes(Nodes) + when is_atom(hd(Nodes)); + Nodes == [] -> + Nodes; + +remotes(F) -> + try diameter_lib:eval(F) of + L when is_list(L) -> + L; + T -> + diameter_lib:error_report({invalid_return, T}, F), + [] + catch + E:R -> + diameter_lib:error_report({failure, {E, R, ?STACK}}, F), + [] + end. + %% --------------------------------------------------------------------------- %% # start/3 %% --------------------------------------------------------------------------- @@ -832,17 +861,21 @@ watchdog(TPid, [], ?WD_SUSPECT, ?WD_OKAY, Wd, State) -> %% Watchdog has an unresponsive connection. watchdog(TPid, [], ?WD_OKAY, ?WD_SUSPECT = To, Wd, State) -> #watchdog{peer = TPid} = Wd, %% assert - connection_down(Wd, To, State); + watchdog_down(Wd, To, State); %% Watchdog has lost its connection. watchdog(TPid, [], _, ?WD_DOWN = To, Wd, #state{peerT = PeerT} = S) -> close(Wd, S), - connection_down(Wd, To, S), + watchdog_down(Wd, To, S), ets:delete(PeerT, TPid); watchdog(_, [], _, _, _, _) -> ok. +watchdog_down(Wd, To, #state{watchdogT = WatchdogT} = S) -> + insert(WatchdogT, Wd#watchdog{state = To}), + connection_down(Wd, To, S). + %% --------------------------------------------------------------------------- %% # connection_up/3 %% --------------------------------------------------------------------------- @@ -1000,21 +1033,17 @@ connection_down(#watchdog{state = ?WD_OKAY, remove_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict), diameter_traffic:peer_down(TPid); -connection_down(#watchdog{}, #peer{}, _) -> - ok; - -connection_down(#watchdog{state = WS, +connection_down(#watchdog{state = ?WD_OKAY, peer = TPid} = Wd, To, - #state{watchdogT = WatchdogT, - peerT = PeerT} + #state{peerT = PeerT} = S) when is_atom(To) -> - insert(WatchdogT, Wd#watchdog{state = To}), - ?WD_OKAY == WS - andalso - connection_down(Wd, fetch(PeerT, TPid), S). + connection_down(Wd, #peer{} = fetch(PeerT, TPid), S); + +connection_down(#watchdog{}, _, _) -> + ok. remove_local_peer(SApps, T, LDict) -> lists:foldl(fun(A,D) -> rlp(A, T, D) end, LDict, SApps). @@ -1233,12 +1262,12 @@ report_status(Status, peer = TPid, type = Type, options = Opts}, - #peer{apps = [_|_] = As, + #peer{apps = [_|_] = Apps, caps = Caps}, #state{service_name = SvcName} = S, Extra) -> - share_peer(Status, Caps, As, TPid, S), + share_peer(Status, Caps, Apps, TPid, S), Info = [Status, Ref, {TPid, Caps}, {type(Type), Opts} | Extra], send_event(SvcName, list_to_tuple(Info)). @@ -1255,9 +1284,9 @@ send_event(#diameter_event{service = SvcName} = E) -> %% # share_peer/5 %% --------------------------------------------------------------------------- -share_peer(up, Caps, Aliases, TPid, #state{options = [_, {_, true} | _], - service_name = Svc}) -> - diameter_peer:notify(Svc, {peer, TPid, Aliases, Caps}); +share_peer(up, Caps, Apps, TPid, #state{options = [_, {_,T} | _], + service_name = Svc}) -> + notify(T, Svc, {peer, TPid, [A || {_,A} <- Apps], Caps}); share_peer(_, _, _, _, _) -> ok. @@ -1266,34 +1295,34 @@ share_peer(_, _, _, _, _) -> %% # share_peers/2 %% --------------------------------------------------------------------------- -share_peers(Pid, #state{options = [_, {_, true} | _], - local_peers = PDict}) -> - ?Dict:fold(fun(A,Ps,ok) -> sp(Pid, A, Ps), ok end, ok, PDict); - -share_peers(_, _) -> - ok. +share_peers(Pid, #state{options = [_, {_,T} | _], local_peers = PDict}) -> + is_remote(Pid, T) + andalso ?Dict:fold(fun(A,Ps,ok) -> sp(Pid, A, Ps), ok end, ok, PDict). sp(Pid, Alias, Peers) -> lists:foreach(fun({P,C}) -> Pid ! {peer, P, [Alias], C} end, Peers). +is_remote(Pid, T) -> + Node = node(Pid), + Node /= node() andalso lists:member(Node, remotes(T)). + %% --------------------------------------------------------------------------- %% # remote_peer_up/4 %% --------------------------------------------------------------------------- -remote_peer_up(Pid, Aliases, Caps, #state{options = [_, _, {_, true} | _], - service = Svc, - shared_peers = PDict}) -> +remote_peer_up(Pid, Aliases, Caps, #state{options = [_, _, {_,T} | _]} = S) -> + is_remote(Pid, T) + andalso rpu(Pid, Aliases, Caps, S). + +rpu(Pid, Aliases, Caps, #state{service = Svc, shared_peers = PDict}) -> #diameter_service{applications = Apps} = Svc, Key = #diameter_app.alias, - As = lists:filter(fun(A) -> lists:keymember(A, Key, Apps) end, Aliases), - rpu(Pid, Caps, PDict, As); - -remote_peer_up(_, _, _, #state{options = [_, _, {_, false} | _]}) -> - ok. + F = fun(A) -> lists:keymember(A, Key, Apps) end, + rpu(Pid, lists:filter(F, Aliases), Caps, PDict); -rpu(_, _, PDict, []) -> - PDict; -rpu(Pid, Caps, PDict, Aliases) -> +rpu(_, [] = No, _, _) -> + No; +rpu(Pid, Aliases, Caps, PDict) -> erlang:monitor(process, Pid), T = {Pid, Caps}, lists:foreach(fun(A) -> ?Dict:append(A, T, PDict) end, Aliases). @@ -1302,8 +1331,7 @@ rpu(Pid, Caps, PDict, Aliases) -> %% # remote_peer_down/2 %% --------------------------------------------------------------------------- -remote_peer_down(Pid, #state{options = [_, _, {_, true} | _], - shared_peers = PDict}) -> +remote_peer_down(Pid, #state{shared_peers = PDict}) -> lists:foreach(fun(A) -> rpd(Pid, A, PDict) end, ?Dict:fetch_keys(PDict)). rpd(Pid, Alias, PDict) -> @@ -1626,16 +1654,10 @@ info_stats(#state{watchdogT = WatchdogT}) -> info_transport(S) -> PeerD = peer_dict(S, config_dict(S)), - RefsD = dict:map(fun(_, Ls) -> [P || L <- Ls, {peer, {P,_}} <- L] end, - PeerD), - Refs = lists:append(dict:fold(fun(R, Ps, A) -> [[R|Ps] | A] end, - [], - RefsD)), - Stats = diameter_stats:read(Refs), + Stats = diameter_stats:sum(dict:fetch_keys(PeerD)), dict:fold(fun(R, Ls, A) -> - Ps = dict:fetch(R, RefsD), - [[{ref, R} | transport(Ls)] ++ [stats([R|Ps], Stats)] - | A] + Cs = proplists:get_value(R, Stats, []), + [[{ref, R} | transport(Ls)] ++ [{statistics, Cs}] | A] end, [], PeerD). diff --git a/lib/diameter/src/base/diameter_stats.erl b/lib/diameter/src/base/diameter_stats.erl index 8fd5ded300..b68d4af11f 100644 --- a/lib/diameter/src/base/diameter_stats.erl +++ b/lib/diameter/src/base/diameter_stats.erl @@ -28,6 +28,7 @@ -export([reg/2, reg/1, incr/3, incr/1, read/1, + sum/1, flush/1]). %% supervisor callback @@ -77,10 +78,14 @@ reg(Pid, Ref) when is_pid(Pid) -> - call({reg, Pid, Ref}). + try + call({reg, Pid, Ref}) + catch + exit: _ -> false + end. -spec reg(ref()) - -> true. + -> boolean(). reg(Ref) -> reg(self(), Ref). @@ -111,11 +116,19 @@ incr(Ctr) -> %% Retrieve counters for the specified contributors. %% --------------------------------------------------------------------------- +%% Read in the server process to ensure that counters for a dying +%% contributor aren't folded concurrently with select. + -spec read([ref()]) -> [{ref(), [{counter(), integer()}]}]. -read(Refs) -> - read(Refs, false). +read(Refs) + when is_list(Refs) -> + try call({read, Refs, false}) of + L -> to_refdict(L) + catch + exit: _ -> [] + end. read(Refs, B) -> MatchSpec = [{{{'_', '$1'}, '_'}, @@ -124,11 +137,52 @@ read(Refs, B) -> ['$_']}], L = ets:select(?TABLE, MatchSpec), B andalso delete(L), + L. + +to_refdict(L) -> lists:foldl(fun({{C,R}, N}, D) -> orddict:append(R, {C,N}, D) end, orddict:new(), L). %% --------------------------------------------------------------------------- +%% # sum(Refs) +%% +%% Retrieve counters summed over all contributors for each term. +%% --------------------------------------------------------------------------- + +-spec sum([ref()]) + -> [{ref(), [{counter(), integer()}]}]. + +sum(Refs) + when is_list(Refs) -> + try call({read, Refs}) of + L -> [{R, to_ctrdict(Cs)} || {R, [_|_] = Cs} <- L] + catch + exit: _ -> [] + end. + +read_refs(Refs) -> + [{R, readr(R)} || R <- Refs]. + +readr(Ref) -> + MatchSpec = [{{{'_', '$1'}, '_'}, + [?ORCOND([{'=:=', '$1', {const, R}} + || R <- [Ref | pids(Ref)]])], + ['$_']}], + ets:select(?TABLE, MatchSpec). + +pids(Ref) -> + MatchSpec = [{{'$1', '$2'}, + [{'=:=', '$2', {const, Ref}}], + ['$1']}], + ets:select(?TABLE, MatchSpec). + +to_ctrdict(L) -> + lists:foldl(fun({{C,_}, N}, D) -> orddict:update_counter(C, N, D) end, + orddict:new(), + L). + +%% --------------------------------------------------------------------------- %% # flush(Refs) %% %% Retrieve and delete statistics for the specified contributors. @@ -138,11 +192,10 @@ read(Refs, B) -> -> [{ref(), {counter(), integer()}}]. flush(Refs) -> - try - call({flush, Refs}) + try call({read, Refs, true}) of + L -> to_refdict(L) catch - exit: _ -> - [] + exit: _ -> [] end. %% =========================================================================== @@ -186,8 +239,14 @@ handle_call({reg, Pid, Ref}, _From, State) -> B andalso erlang:monitor(process, Pid), {reply, B, State}; -handle_call({flush, Refs}, _From, State) -> - {reply, read(Refs, true), State}; +handle_call({read, Refs, Del}, _From, State) -> + {reply, read(Refs, Del), State}; + +handle_call({read, Refs}, _, State) -> + {reply, read_refs(Refs), State}; + +handle_call({flush, Refs}, _From, State) -> %% from old code + {reply, to_refdict(read(Refs, true)), State}; handle_call(Req, From, State) -> ?UNEXPECTED([Req, From]), diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl index f527f7c754..0b15e68ec7 100644 --- a/lib/diameter/src/base/diameter_traffic.erl +++ b/lib/diameter/src/base/diameter_traffic.erl @@ -226,10 +226,10 @@ recv_R(false = No, _, _, _, _) -> %% transport has gone down collect_avps(Pkt) -> case diameter_codec:collect_avps(Pkt) of - {_Bs, As} -> - As; - As -> - As + {_Error, Avps} -> + Avps; + Avps -> + Avps end. %% recv_R/6 @@ -300,7 +300,7 @@ errors(_, #diameter_packet{header = #diameter_header{version = V}, %% AVP's definition. errors(_, #diameter_packet{errors = [Bs | Es]} = Pkt) - when is_bitstring(Bs) -> + when is_bitstring(Bs) -> %% from old code Pkt#diameter_packet{errors = [3009 | Es]}; %% DIAMETER_COMMAND_UNSUPPORTED 3001 @@ -479,10 +479,9 @@ answer_message(RC, #diameter_caps{origin_host = {OH,_}, origin_realm = {OR,_}}, Dict0, - #diameter_packet{avps = Avps} - = Pkt) -> + Pkt) -> ?LOG({error, RC}, Pkt), - {Dict0, answer_message(OH, OR, RC, Dict0, Avps)}. + {Dict0, answer_message(OH, OR, RC, Dict0, Pkt)}. %% resend/7 @@ -595,71 +594,87 @@ reply([Msg], Dict, TPid, Dict0, Fs, ReqPkt) is_tuple(Msg) -> reply(Msg, Dict, TPid, Dict0, Fs, ReqPkt#diameter_packet{errors = []}); -%% No errors or a diameter_header/avp list. reply(Msg, Dict, TPid, Dict0, Fs, ReqPkt) -> - Pkt = encode(Dict, reset(make_answer_packet(Msg, ReqPkt), Dict), Fs), + Pkt = encode(Dict, + reset(make_answer_packet(Msg, ReqPkt), Dict, Dict0), + Fs), incr(send, Pkt, Dict, TPid, Dict0), %% count outgoing result codes send(TPid, Pkt). -%% reset/2 +%% reset/3 %% Header/avps list: send as is. -reset(#diameter_packet{msg = [#diameter_header{} | _]} = Pkt, _) -> +reset(#diameter_packet{msg = [#diameter_header{} | _]} = Pkt, _, _) -> Pkt; %% No errors to set or errors explicitly ignored. -reset(#diameter_packet{errors = Es} = Pkt, _) +reset(#diameter_packet{errors = Es} = Pkt, _, _) when Es == []; Es == false -> Pkt; %% Otherwise possibly set Result-Code and/or Failed-AVP. -reset(#diameter_packet{msg = Msg, errors = Es} = Pkt, Dict) -> - Pkt#diameter_packet{msg = reset(Msg, Dict, Es)}. - -%% reset/3 - -reset(Msg, Dict, Es) - when is_list(Es) -> - {E3, E5, Fs} = partition(Es), - FailedAVP = failed_avp(Msg, lists:reverse(Fs), Dict), - reset(set(Msg, FailedAVP, Dict), - Dict, - choose(is_answer_message(Msg, Dict), E3, E5)); +reset(#diameter_packet{msg = Msg, errors = Es} = Pkt, Dict, Dict0) -> + {RC, Failed} = select_error(Msg, Es, Dict0), + Pkt#diameter_packet{msg = reset(Msg, Dict, RC, Failed)}. -reset(Msg, Dict, N) - when is_integer(N) -> - ResultCode = rc(Msg, {'Result-Code', N}, Dict), - set(Msg, ResultCode, Dict); - -reset(Msg, _, _) -> - Msg. - -partition(Es) -> - lists:foldl(fun pacc/2, {false, false, []}, Es). - -%% Note that the errors list can contain not only integer() and -%% {integer(), #diameter_avp{}} but also #diameter_avp{}. The latter -%% isn't something that's returned by decode but can be set in a reply -%% for encode. +%% select_error/3 +%% +%% Extract the first appropriate RC or {RC, #diameter_avp{}} +%% pair from an errors list, and accumulate all #diameter_avp{}. +%% +%% RFC 6733: +%% +%% 7.5. Failed-AVP AVP +%% +%% The Failed-AVP AVP (AVP Code 279) is of type Grouped and provides +%% debugging information in cases where a request is rejected or not +%% fully processed due to erroneous information in a specific AVP. The +%% value of the Result-Code AVP will provide information on the reason +%% for the Failed-AVP AVP. A Diameter answer message SHOULD contain an +%% instance of the Failed-AVP AVP that corresponds to the error +%% indicated by the Result-Code AVP. For practical purposes, this +%% Failed-AVP would typically refer to the first AVP processing error +%% that a Diameter node encounters. + +select_error(Msg, Es, Dict0) -> + {RC, Avps} = lists:foldl(fun(T,A) -> select(T, A, Dict0) end, + {is_answer_message(Msg, Dict0), []}, + Es), + {RC, lists:reverse(Avps)}. + +%% Only integer() and {integer(), #diameter_avp{}} are the result of +%% decode. #diameter_avp{} can only be set in a reply for encode. + +select(#diameter_avp{} = A, {RC, As}, _) -> + {RC, [A|As]}; + +select(_, {RC, _} = Acc, _) + when is_integer(RC) -> + Acc; -pacc({RC, #diameter_avp{} = A}, {E3, E5, Acc}) +select({RC, #diameter_avp{} = A}, {IsAns, As} = Acc, Dict0) when is_integer(RC) -> - pacc(RC, {E3, E5, [A|Acc]}); + case is_result(RC, IsAns, Dict0) of + true -> {RC, [A|As]}; + false -> Acc + end; -pacc(#diameter_avp{} = A, {E3, E5, Acc}) -> - {E3, E5, [A|Acc]}; +select(RC, {IsAns, As} = Acc, Dict0) + when is_boolean(IsAns), is_integer(RC) -> + case is_result(RC, IsAns, Dict0) of + true -> {RC, As}; + false -> Acc + end. -pacc(RC, {false, E5, Acc}) - when 3 == RC div 1000 -> - {RC, E5, Acc}; +%% reset/4 -pacc(RC, {E3, false, Acc}) - when 5 == RC div 1000 -> - {E3, RC, Acc}; +reset(Msg, Dict, RC, Avps) -> + FailedAVP = failed_avp(Msg, Avps, Dict), + ResultCode = rc(Msg, RC, Dict), + set(set(Msg, FailedAVP, Dict), ResultCode, Dict). -pacc(_, Acc) -> - Acc. +%% eval_packet/2 eval_packet(Pkt, Fs) -> lists:foreach(fun(F) -> diameter_lib:eval([F,Pkt]) end, Fs). @@ -725,29 +740,34 @@ set(Rec, Avps, Dict) -> %% the arity is 1 or {0,1}. In other cases (which probably shouldn't %% exist in practise) we can't know what's appropriate. -rc([MsgName | _], {'Result-Code' = K, RC} = T, Dict) -> - case Dict:avp_arity(MsgName, 'Result-Code') of - 1 -> [T]; +rc(_, B, _) + when is_boolean(B) -> + []; + +rc([MsgName | _], RC, Dict) -> + K = 'Result-Code', + case Dict:avp_arity(MsgName, K) of + 1 -> [{K, RC}]; {0,1} -> [{K, [RC]}]; _ -> [] end; -rc(Rec, T, Dict) -> - rc([Dict:rec2msg(element(1, Rec))], T, Dict). +rc(Rec, RC, Dict) -> + rc([Dict:rec2msg(element(1, Rec))], RC, Dict). %% failed_avp/3 failed_avp(_, [] = No, _) -> No; -failed_avp(Rec, Failed, Dict) -> - [fa(Rec, [{'AVP', Failed}], Dict)]. +failed_avp(Rec, Avps, Dict) -> + [failed(Rec, [{'AVP', Avps}], Dict)]. %% Reply as name and tuple list ... -fa([MsgName | Values], FailedAvp, Dict) -> - R = Dict:msg2rec(MsgName), +failed([MsgName | Values], FailedAvp, Dict) -> + RecName = Dict:msg2rec(MsgName), try - Dict:'#info-'(R, {index, 'Failed-AVP'}), + Dict:'#info-'(RecName, {index, 'Failed-AVP'}), {'Failed-AVP', [FailedAvp]} catch error: _ -> @@ -758,8 +778,10 @@ fa([MsgName | Values], FailedAvp, Dict) -> end; %% ... or record. -fa(Rec, FailedAvp, Dict) -> +failed(Rec, FailedAvp, Dict) -> try + RecName = element(1, Rec), + Dict:'#info-'(RecName, {index, 'Failed-AVP'}), {'Failed-AVP', [FailedAvp]} catch error: _ -> @@ -838,12 +860,14 @@ fa(Rec, FailedAvp, Dict) -> %% answer_message/5 -answer_message(OH, OR, RC, Dict0, Avps) -> +answer_message(OH, OR, RC, Dict0, #diameter_packet{avps = Avps, + errors = Es}) -> {Code, _, Vid} = Dict0:avp_header('Session-Id'), ['answer-message', {'Origin-Host', OH}, {'Origin-Realm', OR}, - {'Result-Code', RC} - | session_id(Code, Vid, Dict0, Avps)]. + {'Result-Code', RC}] + ++ session_id(Code, Vid, Dict0, Avps) + ++ failed_avp(RC, Es). session_id(Code, Vid, Dict0, Avps) when is_list(Avps) -> @@ -855,6 +879,15 @@ session_id(Code, Vid, Dict0, Avps) [] end. +%% Note that this should only match 5xxx result codes currently but +%% don't bother distinguishing this case. +failed_avp(RC, [{RC, Avp} | _]) -> + [{'Failed-AVP', [{'AVP', [Avp]}]}]; +failed_avp(RC, [_ | Es]) -> + failed_avp(RC, Es); +failed_avp(_, [] = No) -> + No. + %% find_avp/3 find_avp(Code, Vid, Avps) @@ -1479,12 +1512,14 @@ send({TPid, Pkt, #request{handler = Pid} = Req, SvcName, Timeout, TRef}) -> Req#request{handler = self()}, SvcName, Timeout), - Pid ! reref(receive T -> T end, Ref, TRef). - -reref({T, Ref, R}, Ref, TRef) -> - {T, TRef, R}; -reref(T, _, _) -> - T. + receive + {answer, _, _, _, _} = A -> + Pid ! A; + {failover = T, Ref} -> + Pid ! {T, TRef}; + T -> + exit({timeout, Ref, TPid} = T) + end. %% send/2 @@ -1559,7 +1594,7 @@ resend_request(Pkt0, store_request(TPid, Bin, Req, Timeout) -> Seqs = diameter_codec:sequence_numbers(Bin), - TRef = erlang:start_timer(Timeout, self(), timeout), + TRef = erlang:start_timer(Timeout, self(), TPid), ets:insert(?REQUEST_TABLE, {Seqs, Req, TRef}), ets:member(?REQUEST_TABLE, TPid) orelse (self() ! {failover, TRef}), %% failover/1 may have missed diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl index 073a415d10..88ccf630e2 100644 --- a/lib/diameter/src/base/diameter_watchdog.erl +++ b/lib/diameter/src/base/diameter_watchdog.erl @@ -47,6 +47,14 @@ -define(BASE, ?DIAMETER_DICT_COMMON). +-define(IS_NATURAL(N), (is_integer(N) andalso 0 =< N)). + +-define(CHOOSE(B,T,F), if (B) -> T; true -> F end). + +-record(config, + {suspect = 1 :: non_neg_integer(), %% OKAY -> SUSPECT + okay = 3 :: non_neg_integer()}). %% REOPEN -> OKAY + -record(watchdog, {%% PCB - Peer Control Block; see RFC 3539, Appendix A status = initial :: initial | okay | suspect | down | reopen, @@ -54,7 +62,8 @@ tw :: 6000..16#FFFFFFFF | {module(), atom(), list()}, %% {M,F,A} -> integer() >= 0 num_dwa = 0 :: -1 | non_neg_integer(), - %% number of DWAs received during reopen + %% number of DWAs received in reopen, + %% or number of timeouts before okay -> suspect %% end PCB parent = self() :: pid(), %% service process transport :: pid() | undefined, %% peer_fsm process @@ -64,7 +73,8 @@ %% term passed into diameter_service with incoming message sequence :: diameter:sequence(), %% mask restrict :: {diameter:restriction(), boolean()}, - shutdown = false :: boolean()}). + shutdown = false :: boolean(), + config :: #config{}}). %% --------------------------------------------------------------------------- %% start/2 @@ -129,7 +139,8 @@ i({Ack, T, Pid, {RecvData, receive_data = RecvData, dictionary = Dict0, sequence = Mask, - restrict = {Restrict, lists:member(node(), Nodes)}}. + restrict = {Restrict, lists:member(node(), Nodes)}, + config = config(Opts)}. wait(Ref, Pid) -> receive @@ -139,6 +150,27 @@ wait(Ref, Pid) -> exit({shutdown, D}) end. +%% config/1 +%% +%% Could also configure counts for SUSPECT to DOWN and REOPEN to DOWN, +%% but don't. + +config(Opts) -> + Config = proplists:get_value(watchdog_config, Opts, []), + is_list(Config) orelse config_error({watchdog_config, Config}), + lists:foldl(fun config/2, #config{}, Config). %% ^ added in old code + +config({suspect, N}, Rec) + when ?IS_NATURAL(N) -> + Rec#config{suspect = N}; + +config({okay, N}, Rec) + when ?IS_NATURAL(N) -> + Rec#config{okay = N}; + +config(T, _) -> %% added in old code + config_error(T). + %% start/5 start(T, Opts, Mask, Nodes, Dict0, Svc) -> @@ -193,7 +225,8 @@ dict0(_, _, Acc) -> Acc. config_error(T) -> - ?ERROR({configuration_error, T}). + diameter_lib:error_report(configuration_error, T), + exit({shutdown, {configuration_error, T}}). %% handle_call/3 @@ -219,6 +252,17 @@ handle_info(T, #watchdog{} = State) -> ?LOG(stop, T), event(T, State, State#watchdog{status = down}), {stop, {shutdown, T}, State} + end; + +handle_info(T, State) -> %% started in old code + handle_info(T, upgrade(State)). + +upgrade(State) -> + case erlang:append_element(State, #config{}) of + #watchdog{status = okay, config = #config{suspect = OS}} = S -> + S#watchdog{num_dwa = OS}; + #watchdog{} = S -> + S end. close({'DOWN', _, process, TPid, {shutdown, Reason}}, @@ -331,11 +375,13 @@ transition({accepted = T, TPid}, #watchdog{transport = TPid, transition({open, TPid, Hosts, _} = Open, #watchdog{transport = TPid, status = initial, - restrict = {_, R}} + restrict = {_,R}, + config = #config{suspect = OS}} = S) -> case okay(getr(restart), Hosts, R) of okay -> - set_watchdog(S#watchdog{status = okay}); + set_watchdog(S#watchdog{status = okay, + num_dwa = OS}); reopen -> transition(Open, S#watchdog{status = down}) end; @@ -347,15 +393,22 @@ transition({open, TPid, Hosts, _} = Open, transition({open = Key, TPid, _Hosts, T}, #watchdog{transport = TPid, - status = down} + status = down, + config = #config{suspect = OS, + okay = RO}} = S) -> - %% Store the info we need to notify the parent to reopen the - %% connection after the requisite DWA's are received, at which - %% time we eraser(open). The reopen message is a later addition, - %% to communicate the new capabilities as soon as they're known. - putr(Key, {TPid, T}), - set_watchdog(send_watchdog(S#watchdog{status = reopen, - num_dwa = 0})); + case RO of + 0 -> %% non-standard: skip REOPEN + set_watchdog(S#watchdog{status = okay, + num_dwa = OS}); + _ -> + %% Store the info we need to notify the parent to reopen + %% the connection after the requisite DWA's are received, + %% at which time we eraser(open). + putr(Key, {TPid, T}), + set_watchdog(send_watchdog(S#watchdog{status = reopen, + num_dwa = 0})) + end; %% OKAY Connection down CloseConnection() %% Failover() @@ -374,7 +427,7 @@ transition({'DOWN', _, process, TPid, _Reason}, #watchdog{transport = TPid, status = T} = S) -> - set_watchdog(S#watchdog{status = case T of initial -> T; _ -> down end, + set_watchdog(S#watchdog{status = ?CHOOSE(initial == T, T, down), pending = false, transport = undefined}); @@ -452,7 +505,9 @@ set_watchdog(#watchdog{tw = TwInit, tref = TRef} = S) -> cancel(TRef), - S#watchdog{tref = erlang:start_timer(tw(TwInit), self(), tw)}. + S#watchdog{tref = erlang:start_timer(tw(TwInit), self(), tw)}; +set_watchdog(stop = No) -> + No. cancel(undefined) -> ok; @@ -553,22 +608,27 @@ rcv(_, #watchdog{status = okay} = S) -> %% SUSPECT Receive non-DWA Failback() %% SetWatchdog() OKAY -rcv('DWA', #watchdog{status = suspect} = S) -> +rcv('DWA', #watchdog{status = suspect, config = #config{suspect = OS}} = S) -> set_watchdog(S#watchdog{status = okay, + num_dwa = OS, pending = false}); -rcv(_, #watchdog{status = suspect} = S) -> - set_watchdog(S#watchdog{status = okay}); +rcv(_, #watchdog{status = suspect, config = #config{suspect = OS}} = S) -> + set_watchdog(S#watchdog{status = okay, + num_dwa = OS}); %% REOPEN Receive DWA & Pending = FALSE %% NumDWA == 2 NumDWA++ %% Failback() OKAY rcv('DWA', #watchdog{status = reopen, - num_dwa = 2 = N} - = S) -> + num_dwa = N, + config = #config{suspect = OS, + okay = RO}} + = S) + when N+1 == RO -> S#watchdog{status = okay, - num_dwa = N+1, + num_dwa = OS, pending = false}; %% REOPEN Receive DWA & Pending = FALSE @@ -607,9 +667,17 @@ timeout(#watchdog{status = T, %% Pending SetWatchdog() SUSPECT timeout(#watchdog{status = okay, - pending = true} - = S) -> - S#watchdog{status = suspect}; + pending = true, + num_dwa = N} + = S) -> + case N of + 1 -> + S#watchdog{status = suspect}; + 0 -> %% non-standard: never move to suspect + S; + N -> %% non-standard: more timeouts before moving + S#watchdog{num_dwa = N-1} + end; %% SUSPECT Timer expires CloseConnection() %% SetWatchdog() DOWN diff --git a/lib/diameter/src/compiler/diameter_codegen.erl b/lib/diameter/src/compiler/diameter_codegen.erl index 80036879ea..e687145263 100644 --- a/lib/diameter/src/compiler/diameter_codegen.erl +++ b/lib/diameter/src/compiler/diameter_codegen.erl @@ -574,12 +574,12 @@ cs_enumerated_avp({AvpName, Values}) -> lists:flatmap(fun(V) -> c_enumerated_avp(AvpName, V) end, Values). c_enumerated_avp(AvpName, {_,I}) -> - [{?clause, [?ATOM(decode), ?Atom(AvpName), ?TERM(<<I:32/integer>>)], + [{?clause, [?ATOM(decode), ?Atom(AvpName), ?TERM(<<I:32>>)], [], [?TERM(I)]}, {?clause, [?ATOM(encode), ?Atom(AvpName), ?INTEGER(I)], [], - [?TERM(<<I:32/integer>>)]}]. + [?TERM(<<I:32>>)]}]. %%% ------------------------------------------------------------------------ %%% msg_header/1 @@ -700,7 +700,7 @@ c_empty_value({Name, _, _, _}) -> c_empty_value({Name, _}) -> {?clause, [?Atom(Name)], [], - [?TERM(<<0:32/integer>>)]}. + [?TERM(<<0:32>>)]}. %%% ------------------------------------------------------------------------ %%% # dict/0 diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index 132088b514..cbbba714ac 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -100,6 +100,18 @@ %% # start/3 %% --------------------------------------------------------------------------- +-spec start({accept, Ref}, Svc, [Opt]) + -> {ok, pid(), [inet:ip_address()]} + when Ref :: diameter:transport_ref(), + Svc :: #diameter_service{}, + Opt :: diameter:transport_opt(); + ({connect, Ref}, Svc, [Opt]) + -> {ok, pid(), [inet:ip_address()]} + | {ok, pid()} + when Ref :: diameter:transport_ref(), + Svc :: #diameter_service{}, + Opt :: diameter:transport_opt(). + start({T, Ref}, #diameter_service{capabilities = Caps}, Opts) -> diameter_tcp_sup:start(), %% start tcp supervisors on demand {Mod, Rest} = split(Opts), @@ -172,7 +184,7 @@ i({T, Ref, Mod, Pid, Opts, Addrs}) OwnOpts, ?DEFAULT_FRAGMENT_TIMEOUT), ?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}), - Sock = i(T, Ref, Mod, Pid, SslOpts, Rest, Addrs), + Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs), MPid ! {stop, self()}, %% tell the monitor to die M = if SslOpts -> ssl; true -> Mod end, setopts(M, Sock), @@ -199,14 +211,21 @@ i(#monitor{parent = Pid, transport = TPid} = S) -> i({listen, LRef, APid, {Mod, Opts, Addrs}}) -> {[LA, LP], Rest} = proplists:split(Opts, [ip, port]), - LAddr = get_addr(LA, Addrs), + LAddrOpt = get_addr(LA, Addrs), LPort = get_port(LP), - {ok, LSock} = Mod:listen(LPort, gen_opts(LAddr, Rest)), + {ok, LSock} = Mod:listen(LPort, gen_opts(LAddrOpt, Rest)), + LAddr = laddr(LAddrOpt, Mod, LSock), true = diameter_reg:add_new({?MODULE, listener, {LRef, {LAddr, LSock}}}), proc_lib:init_ack({ok, self(), {LAddr, LSock}}), erlang:monitor(process, APid), start_timer(#listener{socket = LSock}). +laddr([], Mod, Sock) -> + {ok, {Addr, _Port}} = sockname(Mod, Sock), + Addr; +laddr([{ip, Addr}], _, _) -> + Addr. + own(Opts) -> {Own, Rest} = proplists:split(Opts, [fragment_timer]), {lists:append(Own), Rest}. @@ -225,17 +244,19 @@ ssl_opts([{ssl_options, Opts}]) ssl_opts(L) -> ?ERROR({ssl_options, L}). -%% i/7 +%% init/7 %% Establish a TLS connection before capabilities exchange ... -i(Type, Ref, Mod, Pid, true, Opts, Addrs) -> - i(Type, Ref, ssl, Pid, [{cb_info, ?TCP_CB(Mod)} | Opts], Addrs); +init(Type, Ref, Mod, Pid, true, Opts, Addrs) -> + init(Type, Ref, ssl, Pid, [{cb_info, ?TCP_CB(Mod)} | Opts], Addrs); %% ... or not. -i(Type, Ref, Mod, Pid, _, Opts, Addrs) -> - i(Type, Ref, Mod, Pid, Opts, Addrs). +init(Type, Ref, Mod, Pid, _, Opts, Addrs) -> + init(Type, Ref, Mod, Pid, Opts, Addrs). + +%% init/6 -i(accept = T, Ref, Mod, Pid, Opts, Addrs) -> +init(accept = T, Ref, Mod, Pid, Opts, Addrs) -> {LAddr, LSock} = listener(Ref, {Mod, Opts, Addrs}), proc_lib:init_ack({ok, self(), [LAddr]}), Sock = ok(accept(Mod, LSock)), @@ -243,17 +264,28 @@ i(accept = T, Ref, Mod, Pid, Opts, Addrs) -> diameter_peer:up(Pid), Sock; -i(connect = T, Ref, Mod, Pid, Opts, Addrs) -> +init(connect = T, Ref, Mod, Pid, Opts, Addrs) -> {[LA, RA, RP], Rest} = proplists:split(Opts, [ip, raddr, rport]), - LAddr = get_addr(LA, Addrs), - RAddr = get_addr(RA, []), + LAddrOpt = get_addr(LA, Addrs), + RAddr = get_addr(RA), RPort = get_port(RP), - proc_lib:init_ack({ok, self(), [LAddr]}), - Sock = ok(connect(Mod, RAddr, RPort, gen_opts(LAddr, Rest))), + proc_lib:init_ack(init_rc(LAddrOpt)), + Sock = ok(connect(Mod, RAddr, RPort, gen_opts(LAddrOpt, Rest))), publish(Mod, T, Ref, Sock), - diameter_peer:up(Pid, {RAddr, RPort}), + up(Pid, {RAddr, RPort}, LAddrOpt, Mod, Sock), Sock. +init_rc([{ip, Addr}]) -> + {ok, self(), [Addr]}; +init_rc([]) -> + {ok, self()}. + +up(Pid, Remote, [{ip, _Addr}], _, _) -> + diameter_peer:up(Pid, Remote); +up(Pid, Remote, [], Mod, Sock) -> + {Addr, _Port} = ok(sockname(Mod, Sock)), + diameter_peer:up(Pid, Remote, [Addr]). + publish(Mod, T, Ref, Sock) -> true = diameter_reg:add_new({?MODULE, T, {Ref, Sock}}), putr(?INFO_KEY, {Mod, Sock}). %% for info/1 @@ -281,10 +313,17 @@ l([], LRef, T) -> {ok, _, AS} = diameter_tcp_sup:start_child({listen, LRef, self(), T}), AS. +%% get_addr/1 + +get_addr(As) -> + diameter_lib:ipaddr(addr(As, [])). + %% get_addr/2 +get_addr([], []) -> + []; get_addr(As, Def) -> - diameter_lib:ipaddr(addr(As, Def)). + [{ip, diameter_lib:ipaddr(addr(As, Def))}]. %% Take the first address from the service if several are unspecified. addr([], [Addr | _]) -> @@ -305,14 +344,10 @@ get_port(Ps) -> %% gen_opts/2 -gen_opts(LAddr, Opts) -> +gen_opts(LAddrOpt, Opts) -> {L,_} = proplists:split(Opts, [binary, packet, active]), [[],[],[]] == L orelse ?ERROR({reserved_options, Opts}), - [binary, - {packet, 0}, - {active, once}, - {ip, LAddr} - | Opts]. + [binary, {packet, 0}, {active, once}] ++ LAddrOpt ++ Opts. %% --------------------------------------------------------------------------- %% # ports/1 |