diff options
Diffstat (limited to 'lib/diameter/src/base')
24 files changed, 2402 insertions, 1102 deletions
diff --git a/lib/diameter/src/base/diameter.erl b/lib/diameter/src/base/diameter.erl index d74e091e11..de88f6befd 100644 --- a/lib/diameter/src/base/diameter.erl +++ b/lib/diameter/src/base/diameter.erl @@ -1,18 +1,19 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2013. All Rights Reserved. +%% Copyright Ericsson AB 2010-2015. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% @@ -45,6 +46,7 @@ -export_type([evaluable/0, restriction/0, + message_length/0, remotes/0, sequence/0, app_alias/0, @@ -298,6 +300,9 @@ call(SvcName, App, Message) -> | [node()] | evaluable(). +-type message_length() + :: 0..16#FFFFFF. + %% Options passed to start_service/2 -type service_opt() @@ -306,6 +311,9 @@ call(SvcName, App, Message) -> | {restrict_connections, restriction()} | {sequence, sequence() | evaluable()} | {share_peers, remotes()} + | {string_decode, boolean()} + | {strict_mbit, boolean()} + | {incoming_maxlen, message_length()} | {use_shared_peers, remotes()} | {spawn_opt, list()}. @@ -337,11 +345,14 @@ call(SvcName, App, Message) -> :: {transport_module, atom()} | {transport_config, any()} | {transport_config, any(), 'Unsigned32'() | infinity} + | {pool_size, pos_integer()} | {applications, [app_alias()]} | {capabilities, [capability()]} | {capabilities_cb, evaluable()} | {capx_timeout, 'Unsigned32'()} | {disconnect_cb, evaluable()} + | {dpr_timeout, 'Unsigned32'()} + | {dpa_timeout, 'Unsigned32'()} | {length_errors, exit | handle | discard} | {connect_timer, 'Unsigned32'()} | {watchdog_timer, 'Unsigned32'() | {module(), atom(), list()}} diff --git a/lib/diameter/src/base/diameter_app.erl b/lib/diameter/src/base/diameter_app.erl index 600f7ff04d..6f0c78094a 100644 --- a/lib/diameter/src/base/diameter_app.erl +++ b/lib/diameter/src/base/diameter_app.erl @@ -3,16 +3,17 @@ %% %% Copyright Ericsson AB 2010-2011. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% diff --git a/lib/diameter/src/base/diameter_callback.erl b/lib/diameter/src/base/diameter_callback.erl index 90431099b0..70c70fb5bd 100644 --- a/lib/diameter/src/base/diameter_callback.erl +++ b/lib/diameter/src/base/diameter_callback.erl @@ -3,16 +3,17 @@ %% %% Copyright Ericsson AB 2010-2011. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% diff --git a/lib/diameter/src/base/diameter_capx.erl b/lib/diameter/src/base/diameter_capx.erl index 93548ecafd..07a678c617 100644 --- a/lib/diameter/src/base/diameter_capx.erl +++ b/lib/diameter/src/base/diameter_capx.erl @@ -1,18 +1,19 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2013. All Rights Reserved. +%% Copyright Ericsson AB 2010-2015. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% @@ -50,7 +51,8 @@ -export([build_CER/2, recv_CER/3, recv_CEA/3, - make_caps/2]). + make_caps/2, + binary_caps/1]). -include_lib("diameter/include/diameter.hrl"). -include("diameter_internal.hrl"). @@ -115,7 +117,8 @@ mk_caps(Caps0, Opts) -> -define(SC(K,F), set_cap({K, Val}, {Caps, #diameter_caps{F = false} = C}) -> - {Caps#diameter_caps{F = cap(K, Val)}, C#diameter_caps{F = true}}). + {Caps#diameter_caps{F = cap(K, copy(Val))}, + C#diameter_caps{F = true}}). ?SC('Origin-Host', origin_host); ?SC('Origin-Realm', origin_realm); @@ -375,10 +378,10 @@ capx_to_caps(CEX, Dict) -> 'Firmware-Revision', 'AVP'], CEX), - #diameter_caps{origin_host = OH, - origin_realm = OR, + #diameter_caps{origin_host = copy(OH), + origin_realm = copy(OR), vendor_id = VId, - product_name = PN, + product_name = copy(PN), origin_state_id = OSI, host_ip_address = IP, supported_vendor_id = SV, @@ -389,6 +392,32 @@ capx_to_caps(CEX, Dict) -> firmware_revision = FR, avp = X}. +%% Copy binaries to avoid retaining a reference to a large binary +%% containing AVPs we aren't interested in. +copy(B) + when is_binary(B) -> + binary:copy(B); + +copy(T) -> + T. + +%% binary_caps/1 +%% +%% Encode stringish capabilities with {string_decode, false}. + +binary_caps(Caps) -> + lists:foldl(fun bcaps/2, Caps, [#diameter_caps.origin_host, + #diameter_caps.origin_realm, + #diameter_caps.product_name]). + +bcaps(N, Caps) -> + case element(N, Caps) of + undefined -> + Caps; + V -> + setelement(N, Caps, iolist_to_binary(V)) + end. + %% --------------------------------------------------------------------------- %% --------------------------------------------------------------------------- diff --git a/lib/diameter/src/base/diameter_codec.erl b/lib/diameter/src/base/diameter_codec.erl index 0de4d53973..1ea5357924 100644 --- a/lib/diameter/src/base/diameter_codec.erl +++ b/lib/diameter/src/base/diameter_codec.erl @@ -1,18 +1,19 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2013. All Rights Reserved. +%% Copyright Ericsson AB 2010-2015. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% @@ -22,6 +23,8 @@ -export([encode/2, decode/2, decode/3, + setopts/1, + getopt/1, collect_avps/1, decode_header/1, sequence_numbers/1, @@ -59,6 +62,52 @@ %% +-+-+-+-+-+-+-+-+-+-+-+-+- %%% --------------------------------------------------------------------------- +%%% # setopts/1 +%%% # getopt/1 +%%% --------------------------------------------------------------------------- + +%% These functions are a compromise in the same vein as the use of the +%% process dictionary in diameter_gen.hrl in generated codec modules. +%% Instead of rewriting the entire dictionary generation to pass +%% encode/decode options around, the calling process sets them by +%% calling setopts/1. At current, the only option is whether or not to +%% decode binaries as strings, which is used by diameter_types. + +setopts(Opts) + when is_list(Opts) -> + lists:foreach(fun setopt/1, Opts). + +%% The default string_decode true is for backwards compatibility. +setopt({K, false = B}) + when K == string_decode; + K == strict_mbit -> + setopt(K, B); + +%% Regard anything but the generated RFC 3588 dictionary as modern. +%% This affects the interpretation of defaults during the decode +%% of values of type DiameterURI, this having changed from RFC 3588. +%% (So much for backwards compatibility.) +setopt({common_dictionary, diameter_gen_base_rfc3588}) -> + setopt(rfc, 3588); + +setopt(_) -> + ok. + +setopt(Key, Value) -> + put({diameter, Key}, Value). + +getopt(Key) -> + case get({diameter, Key}) of + undefined when Key == string_decode; + Key == strict_mbit -> + true; + undefined when Key == rfc -> + 6733; + V -> + V + end. + +%%% --------------------------------------------------------------------------- %%% # encode/2 %%% --------------------------------------------------------------------------- @@ -70,12 +119,15 @@ encode(Mod, #diameter_packet{} = Pkt) -> try e(Mod, Pkt) catch + exit: {Reason, Stack, #diameter_header{} = H} = T -> + %% Exit with a header in the reason to let the caller + %% count encode errors. + ?LOG(encode_error, {Reason, Stack, H}), + exit({?MODULE, encode, T}); error: Reason -> - %% Be verbose since a crash report may be truncated and - %% encode errors are self-inflicted. - X = {?MODULE, encode, {Reason, ?STACK}}, - diameter_lib:error_report(X, {?MODULE, encode, [Mod, Pkt]}), - exit(X) + T = {Reason, diameter_lib:get_stacktrace()}, + ?LOG(encode_error, T), + exit({?MODULE, encode, T}) end; encode(Mod, Msg) -> @@ -87,53 +139,62 @@ encode(Mod, Msg) -> msg = Msg}). e(_, #diameter_packet{msg = [#diameter_header{} = Hdr | As]} = Pkt) -> - Avps = encode_avps(As), - Length = size(Avps) + 20, - - #diameter_header{version = Vsn, - cmd_code = Code, - application_id = Aid, - hop_by_hop_id = Hid, - end_to_end_id = Eid} - = Hdr, - - Flags = make_flags(0, Hdr), - - Pkt#diameter_packet{header = Hdr, - bin = <<Vsn:8, Length:24, - Flags:8, Code:24, - Aid:32, - Hid:32, - Eid:32, - Avps/binary>>}; + try encode_avps(reorder(As)) of + Avps -> + Length = size(Avps) + 20, + + #diameter_header{version = Vsn, + cmd_code = Code, + application_id = Aid, + hop_by_hop_id = Hid, + end_to_end_id = Eid} + = Hdr, + + Flags = make_flags(0, Hdr), + + Pkt#diameter_packet{header = Hdr, + bin = <<Vsn:8, Length:24, + Flags:8, Code:24, + Aid:32, + Hid:32, + Eid:32, + Avps/binary>>} + catch + error: Reason -> + exit({Reason, diameter_lib:get_stacktrace(), Hdr}) + end; -e(Mod, #diameter_packet{header = Hdr, msg = Msg} = Pkt) -> +e(Mod, #diameter_packet{header = Hdr0, msg = Msg} = Pkt) -> #diameter_header{version = Vsn, hop_by_hop_id = Hid, end_to_end_id = Eid} - = Hdr, + = Hdr0, MsgName = rec2msg(Mod, Msg), - {Code, Flags0, Aid} = msg_header(Mod, MsgName, Hdr), - Flags = make_flags(Flags0, Hdr), - - Avps = encode_avps(Mod, MsgName, values(Msg)), - Length = size(Avps) + 20, - - Pkt#diameter_packet{header = Hdr#diameter_header - {length = Length, - cmd_code = Code, - application_id = Aid, - is_request = 0 /= ?MASK(7, Flags), - is_proxiable = 0 /= ?MASK(6, Flags), - is_error = 0 /= ?MASK(5, Flags), - is_retransmitted = 0 /= ?MASK(4, Flags)}, - bin = <<Vsn:8, Length:24, - Flags:8, Code:24, - Aid:32, - Hid:32, - Eid:32, - Avps/binary>>}. + {Code, Flags0, Aid} = msg_header(Mod, MsgName, Hdr0), + Flags = make_flags(Flags0, Hdr0), + Hdr = Hdr0#diameter_header{cmd_code = Code, + application_id = Aid, + is_request = 0 /= ?MASK(7, Flags), + is_proxiable = 0 /= ?MASK(6, Flags), + is_error = 0 /= ?MASK(5, Flags), + is_retransmitted = 0 /= ?MASK(4, Flags)}, + Values = values(Msg), + + try encode_avps(Mod, MsgName, Values) of + Avps -> + Length = size(Avps) + 20, + Pkt#diameter_packet{header = Hdr#diameter_header{length = Length}, + bin = <<Vsn:8, Length:24, + Flags:8, Code:24, + Aid:32, + Hid:32, + Eid:32, + Avps/binary>>} + catch + error: Reason -> + exit({Reason, diameter_lib:get_stacktrace(), Hdr}) + end. %% make_flags/2 @@ -171,26 +232,50 @@ values(Avps) -> %% Message as a list of #diameter_avp{} ... encode_avps(_, _, [#diameter_avp{} | _] = Avps) -> - encode_avps(reorder(Avps, [], Avps)); + encode_avps(reorder(Avps)); %% ... or as a tuple list or record. encode_avps(Mod, MsgName, Values) -> Mod:encode_avps(MsgName, Values). %% reorder/1 +%% +%% Reorder AVPs for the relay case using the index field of +%% diameter_avp records. Decode populates this field in collect_avps +%% and presents AVPs in reverse order. A relay then sends the reversed +%% list with a Route-Record AVP prepended. The goal here is just to do +%% lists:reverse/1 in Grouped AVPs and the outer list, but only in the +%% case there are indexed AVPs at all, so as not to reverse lists that +%% have been explicilty sent (unindexed, in the desired order) as a +%% diameter_avp list. The effect is the same as lists:keysort/2, but +%% only on the cases we expect, not a general sort. + +reorder(Avps) -> + case reorder(Avps, []) of + false -> + Avps; + Sorted -> + Sorted + end. + +%% reorder/3 -reorder([#diameter_avp{index = 0} | _] = Avps, Acc, _) -> +%% In case someone has reversed the list already. (Not likely.) +reorder([#diameter_avp{index = 0} | _] = Avps, Acc) -> Avps ++ Acc; -reorder([#diameter_avp{index = N} = A | Avps], Acc, _) +%% Assume indexed AVPs are in reverse order. +reorder([#diameter_avp{index = N} = A | Avps], Acc) when is_integer(N) -> lists:reverse(Avps, [A | Acc]); -reorder([H | T], Acc, Avps) -> - reorder(T, [H | Acc], Avps); +%% An unindexed AVP. +reorder([H | T], Acc) -> + reorder(T, [H | Acc]); -reorder([], Acc, _) -> - Acc. +%% No indexed members. +reorder([], _) -> + false. %% encode_avps/1 @@ -225,15 +310,35 @@ rec2msg(Mod, Rec) -> %% Unsuccessfully decoded AVPs will be placed in #diameter_packet.errors. --spec decode(module(), #diameter_packet{} | bitstring()) +-spec decode(module() | {module(), module()}, #diameter_packet{} | binary()) -> #diameter_packet{}. +%% An Answer setting the E-bit. The application dictionary is needed +%% for the best-effort decode of Failed-AVP, and the best way to make +%% this available to the AVP decode in diameter_gen.hrl, without +%% having to rewrite the entire codec generation, is to place it in +%% the process dictionary. It's the code in diameter_gen.hrl (that's +%% included by every generated codec module) that looks for the entry. +%% Not ideal, but it solves the problem relatively simply. +decode({Mod, Mod}, Pkt) -> + decode(Mod, Pkt); +decode({Mod, AppMod}, Pkt) -> + Key = {?MODULE, dictionary}, + put(Key, AppMod), + try + decode(Mod, Pkt) + after + erase(Key) + end; + +%% Or not: a request, or an answer not setting the E-bit. decode(Mod, Pkt) -> decode(Mod:id(), Mod, Pkt). -%% If we're a relay application then just extract the avp's without -%% any decoding of their data since we don't know the application in -%% question. +%% decode/3 + +%% Relay application: just extract the avp's without any decoding of +%% their data since we don't know the application in question. decode(?APP_ID_RELAY, _, #diameter_packet{} = Pkt) -> case collect_avps(Pkt) of {E, As} -> @@ -259,34 +364,36 @@ decode(_, Mod, #diameter_packet{header = Hdr} = Pkt) -> decode_avps(MsgName, Mod, Pkt, collect_avps(Pkt)); decode(Id, Mod, Bin) - when is_bitstring(Bin) -> + when is_binary(Bin) -> decode(Id, Mod, #diameter_packet{header = decode_header(Bin), bin = Bin}). +%% decode_avps/4 + decode_avps(MsgName, Mod, Pkt, {E, Avps}) -> - ?LOG(invalid, Pkt#diameter_packet.bin), + ?LOG(invalid_avp_length, Pkt#diameter_packet.header), #diameter_packet{errors = Failed} = P = decode_avps(MsgName, Mod, Pkt, Avps), P#diameter_packet{errors = [E | Failed]}; -decode_avps('', Mod, Pkt, Avps) -> %% unknown message ... - ?LOG(unknown, {Mod, Pkt#diameter_packet.header}), +decode_avps('', _, Pkt, Avps) -> %% unknown message ... + ?LOG(unknown_message, Pkt#diameter_packet.header), Pkt#diameter_packet{avps = lists:reverse(Avps), errors = [3001]}; %% DIAMETER_COMMAND_UNSUPPORTED %% msg = undefined identifies this case. decode_avps(MsgName, Mod, Pkt, Avps) -> %% ... or not - {Rec, As, Failed} = Mod:decode_avps(MsgName, Avps), - ?LOGC([] /= Failed, failed, {Mod, Failed}), + {Rec, As, Errors} = Mod:decode_avps(MsgName, Avps), + ?LOGC([] /= Errors, decode_errors, Pkt#diameter_packet.header), Pkt#diameter_packet{msg = Rec, - errors = Failed, + errors = Errors, avps = As}. %%% --------------------------------------------------------------------------- %%% # decode_header/1 %%% --------------------------------------------------------------------------- --spec decode_header(bitstring()) +-spec decode_header(binary()) -> #diameter_header{} | false. @@ -297,7 +404,7 @@ decode_header(<<Version:8, ApplicationId:32, HopByHopId:32, EndToEndId:32, - _/bitstring>>) -> + _/binary>>) -> <<R:1, P:1, E:1, T:1, _:4>> = CmdFlags, %% 3588 (ch 3) says that reserved bits MUST be set to 0 and ignored @@ -356,6 +463,9 @@ sequence_numbers(#diameter_packet{bin = Bin}) sequence_numbers(#diameter_packet{header = #diameter_header{} = H}) -> sequence_numbers(H); +sequence_numbers(#diameter_packet{msg = [#diameter_header{} = H | _]}) -> + sequence_numbers(H); + sequence_numbers(#diameter_header{hop_by_hop_id = H, end_to_end_id = E}) -> {H,E}; @@ -410,7 +520,7 @@ msg_id(#diameter_header{application_id = A, is_request = R}) -> {A, C, if R -> 1; true -> 0 end}; -msg_id(<<_:32, Rbit:1, _:7, CmdCode:24, ApplId:32, _/bitstring>>) -> +msg_id(<<_:32, Rbit:1, _:7, CmdCode:24, ApplId:32, _/binary>>) -> {ApplId, CmdCode, Rbit}. %%% --------------------------------------------------------------------------- @@ -421,17 +531,18 @@ msg_id(<<_:32, Rbit:1, _:7, CmdCode:24, ApplId:32, _/bitstring>>) -> %% order in the binary. Note also that grouped avp's aren't unraveled, %% only those at the top level. --spec collect_avps(#diameter_packet{} | bitstring()) +-spec collect_avps(#diameter_packet{} | binary()) -> [Avp] | {Error, [Avp]} when Avp :: #diameter_avp{}, Error :: {5014, #diameter_avp{}}. collect_avps(#diameter_packet{bin = Bin}) -> - <<_:20/binary, Avps/bitstring>> = Bin, + <<_:20/binary, Avps/binary>> = Bin, collect_avps(Avps); -collect_avps(Bin) -> +collect_avps(Bin) + when is_binary(Bin) -> collect_avps(Bin, 0, []). collect_avps(<<>>, _, Acc) -> @@ -461,7 +572,9 @@ collect_avps(Bin, N, Acc) -> split_avp(Bin) -> {Code, V, M, P, Len, HdrLen} = split_head(Bin), - {Data, B} = split_data(Bin, HdrLen, Len - HdrLen), + + <<_:HdrLen/binary, Rest/binary>> = Bin, + {Data, B} = split_data(Rest, Len - HdrLen), {B, #diameter_avp{code = Code, vendor_id = V, @@ -471,17 +584,16 @@ split_avp(Bin) -> %% split_head/1 -split_head(<<Code:32, 1:1, M:1, P:1, _:5, Len:24, V:32, _/bitstring>>) -> +split_head(<<Code:32, 1:1, M:1, P:1, _:5, Len:24, V:32, _/binary>>) -> {Code, V, M, P, Len, 12}; -split_head(<<Code:32, 0:1, M:1, P:1, _:5, Len:24, _/bitstring>>) -> +split_head(<<Code:32, 0:1, M:1, P:1, _:5, Len:24, _/binary>>) -> {Code, undefined, M, P, Len, 8}; -%% Header is truncated: pack_avp/1 will pad to the minimum header -%% length. -split_head(B) - when is_bitstring(B) -> - ?THROW({5014, #diameter_avp{data = B}}). +%% Header is truncated. +split_head(Bin) -> + ?THROW({5014, #diameter_avp{data = Bin}}). +%% Note that pack_avp/1 will pad this at encode if sent in a Failed-AVP. %% 3588: %% @@ -511,53 +623,59 @@ split_head(B) %% AVP header with zero up to the minimum AVP header length. %% %% The underlined clause must be in error since (1) a header less than -%% the minimum value mean we don't know the identity of the AVP and +%% the minimum value mean we might not know the identity of the AVP and %% (2) the last sentence covers this case. %% split_data/3 -split_data(Bin, HdrLen, Len) - when 0 =< Len -> - split_data(Bin, HdrLen, Len, (4 - (Len rem 4)) rem 4); +split_data(Bin, Len) -> + Pad = (4 - (Len rem 4)) rem 4, -split_data(_, _, _) -> - invalid_avp_length(). + %% Len might be negative here, but that ensures the failure of the + %% binary match. -%% split_data/4 - -split_data(Bin, HdrLen, Len, Pad) -> case Bin of - <<_:HdrLen/binary, Data:Len/binary, _:Pad/binary, Rest/bitstring>> -> + <<Data:Len/binary, _:Pad/binary, Rest/binary>> -> {Data, Rest}; _ -> - invalid_avp_length() + %% Header length points past the end of the message, or + %% doesn't span the header. As stated in the 6733 text + %% above, it's sufficient to return a zero-filled minimal + %% payload if this is a request. Do this (in cases that we + %% know the type) by inducing a decode failure and letting + %% the dictionary's decode (in diameter_gen) deal with it. + %% + %% Note that the extra bit can only occur in the trailing + %% AVP of a message or Grouped AVP, since a faulty AVP + %% Length is otherwise indistinguishable from a correct + %% one here, since we don't know the types of the AVPs + %% being extracted. + {<<0:1, Bin/binary>>, <<>>} end. -%% invalid_avp_length/0 -%% -%% AVP Length doesn't mesh with payload. Induce a decode error by -%% returning a payload that no valid Diameter type can have. This is -%% so that a known AVP will result in 5014 error with a zero'd -%% payload. Here we simply don't know how to construct this payload. -%% (Yes, this solution is an afterthought.) - -invalid_avp_length() -> - {<<0:1>>, <<>>}. - %%% --------------------------------------------------------------------------- %%% # pack_avp/1 %%% --------------------------------------------------------------------------- %% The normal case here is data as an #diameter_avp{} list or an %% iolist, which are the cases that generated codec modules use. The -%% other case is as a convenience in the relay case in which the +%% other cases are a convenience in the relay case in which the %% dictionary doesn't know about specific AVP's. -%% Grouped AVP whose components need packing ... -pack_avp(#diameter_avp{data = [#diameter_avp{} | _] = Avps} = A) -> - pack_avp(A#diameter_avp{data = encode_avps(Avps)}); +%% Decoded Grouped AVP with decoded components: ignore components +%% since they're already encoded in the Grouped AVP. +pack_avp([#diameter_avp{} = Grouped | _Components]) -> + pack_avp(Grouped); -%% ... data as a type/value tuple ... +%% Grouped AVP whose components need packing. It's intentional that +%% this isn't equivalent to [Grouped | Components]: here the +%% components need to be encoded before wrapping with the Grouped AVP, +%% and the list is flat, nesting being accomplished in the data +%% fields. +pack_avp(#diameter_avp{data = [#diameter_avp{} | _] = Components} = Grouped) -> + pack_avp(Grouped#diameter_avp{data = encode_avps(Components)}); + +%% Data as a type/value tuple ... pack_avp(#diameter_avp{data = {Type, Value}} = A) when is_atom(Type) -> pack_avp(A#diameter_avp{data = diameter_types:Type(encode, Value)}); @@ -575,17 +693,23 @@ pack_avp(#diameter_avp{data = {Dict, Name, Value}} = A) -> {Name, Type} = Dict:avp_name(Code, Vid), pack_avp(A#diameter_avp{data = {Hdr, {Type, Value}}}); +%% ... with a truncated header ... pack_avp(#diameter_avp{code = undefined, data = B}) - when is_bitstring(B) -> + when is_binary(B) -> %% Reset the AVP Length of an AVP Header resulting from a 5014 %% error. The RFC doesn't explicitly say to do this but the %% receiver can't correctly extract this and following AVP's %% without a correct length. On the downside, the header doesn't %% reveal if the received header has been padded. Pad = 8*header_length(B) - bit_size(B), - Len = size(<<H:5/binary, _:24, T/binary>> = <<B/bitstring, 0:Pad>>), + Len = size(<<H:5/binary, _:24, T/binary>> = <<B/binary, 0:Pad>>), <<H/binary, Len:24, T/binary>>; +%% ... when ignoring errors in Failed-AVP ... +%% ... during a relay encode ... +pack_avp(#diameter_avp{data = <<0:1, B/binary>>} = A) -> + pack_avp(A#diameter_avp{data = B}); + %% ... or as an iolist. pack_avp(#diameter_avp{code = Code, vendor_id = V, diff --git a/lib/diameter/src/base/diameter_config.erl b/lib/diameter/src/base/diameter_config.erl index f5ea459fd0..702f11593a 100644 --- a/lib/diameter/src/base/diameter_config.erl +++ b/lib/diameter/src/base/diameter_config.erl @@ -1,18 +1,19 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2013. All Rights Reserved. +%% Copyright Ericsson AB 2010-2015. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% @@ -35,10 +36,10 @@ %% -module(diameter_config). --compile({no_auto_import, [monitor/2]}). - -behaviour(gen_server). +-compile({no_auto_import, [monitor/2]}). + -export([start_service/2, stop_service/1, add_transport/2, @@ -68,7 +69,7 @@ -include("diameter_internal.hrl"). %% Server state. --record(state, {id = now()}). +-record(state, {id = diameter_lib:now()}). %% Registered name of the server. -define(SERVER, ?MODULE). @@ -158,7 +159,8 @@ stop_service(SvcName) -> %% # add_transport/2 %% -------------------------------------------------------------------------- --spec add_transport(diameter:service_name(), {connect|listen, [diameter:transport_opt()]}) +-spec add_transport(diameter:service_name(), + {connect|listen, [diameter:transport_opt()]}) -> {ok, diameter:transport_ref()} | {error, term()}. @@ -531,7 +533,10 @@ opt({applications, As}) -> opt({capabilities, Os}) -> is_list(Os) andalso ok == encode_CER(Os); -opt({capx_timeout, Tmo}) -> +opt({K, Tmo}) + when K == capx_timeout; + K == dpr_timeout; + K == dpa_timeout -> ?IS_UINT32(Tmo); opt({length_errors, T}) -> @@ -554,6 +559,9 @@ opt({watchdog_config, L}) -> opt({spawn_opt, Opts}) -> is_list(Opts); +opt({pool_size, N}) -> + is_integer(N) andalso 0 < N; + %% Options that we can't validate. opt({K, _}) when K == transport_config; @@ -638,13 +646,25 @@ make_config(SvcName, Opts) -> {false, monitor}, {?NOMASK, sequence}, {nodes, restrict_connections}, + {16#FFFFFF, incoming_maxlen}, + {true, strict_mbit}, + {true, string_decode}, {[], spawn_opt}]), + D = proplists:get_value(string_decode, SvcOpts, true), + #service{name = SvcName, rec = #diameter_service{applications = Apps, - capabilities = Caps}, + capabilities = binary_caps(Caps, D)}, options = SvcOpts}. +binary_caps(Caps, true) -> + Caps; +binary_caps(Caps, false) -> + diameter_capx:binary_caps(Caps). + +%% make_opts/2 + make_opts(Opts, Defs) -> Known = [{K, get_opt(K, Opts, D)} || {D,K} <- Defs], Unknown = Opts -- Known, @@ -653,17 +673,28 @@ make_opts(Opts, Defs) -> [{K, opt(K,V)} || {K,V} <- Known]. +opt(incoming_maxlen, N) + when 0 =< N, N < 1 bsl 24 -> + N; + opt(spawn_opt, L) when is_list(L) -> L; opt(K, false = B) - when K /= sequence -> + when K == share_peers; + K == use_shared_peers; + K == monitor; + K == restrict_connections; + K == strict_mbit; + K == string_decode -> B; opt(K, true = B) when K == share_peers; - K == use_shared_peers -> + K == use_shared_peers; + K == strict_mbit; + K == string_decode -> B; opt(restrict_connections, T) @@ -753,7 +784,7 @@ app_acc({application, Opts} = T, Acc) -> Alias = get_opt(alias, Opts, Dict), ModS = get_opt(state, Opts, Alias), M = get_opt(call_mutates_state, Opts, false, [true]), - A = get_opt(answer_errors, Opts, report, [callback, discard]), + A = get_opt(answer_errors, Opts, discard, [callback, report]), P = get_opt(request_errors, Opts, answer_3xxx, [answer, callback]), [#diameter_app{alias = Alias, dictionary = Dict, diff --git a/lib/diameter/src/base/diameter_dict.erl b/lib/diameter/src/base/diameter_dict.erl index 3b9ba00a3f..1013690a5b 100644 --- a/lib/diameter/src/base/diameter_dict.erl +++ b/lib/diameter/src/base/diameter_dict.erl @@ -3,16 +3,17 @@ %% %% Copyright Ericsson AB 2010-2011. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% diff --git a/lib/diameter/src/base/diameter_internal.hrl b/lib/diameter/src/base/diameter_internal.hrl index 4b672aa071..518c0b9b1f 100644 --- a/lib/diameter/src/base/diameter_internal.hrl +++ b/lib/diameter/src/base/diameter_internal.hrl @@ -3,16 +3,17 @@ %% %% Copyright Ericsson AB 2010-2013. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% diff --git a/lib/diameter/src/base/diameter_lib.erl b/lib/diameter/src/base/diameter_lib.erl index 44d81e2778..43b0ca24ab 100644 --- a/lib/diameter/src/base/diameter_lib.erl +++ b/lib/diameter/src/base/diameter_lib.erl @@ -1,37 +1,65 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2013. All Rights Reserved. +%% Copyright Ericsson AB 2010-2015. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% -module(diameter_lib). +-compile({no_auto_import, [now/0]}). +-compile({nowarn_deprecated_function, [{erlang, now, 0}]}). -export([info_report/2, error_report/2, warning_report/2, + now/0, + timestamp/1, now_diff/1, + micro_diff/1, + micro_diff/2, time/1, + seed/0, eval/1, + eval_name/1, + get_stacktrace/0, ipaddr/1, spawn_opts/2, wait/1, fold_tuple/3, + fold_n/3, + for_n/2, log/4]). %% --------------------------------------------------------------------------- +%% # get_stacktrace/0 +%% --------------------------------------------------------------------------- + +%% Return a stacktrace with a leading, potentially large, argument +%% list replaced by an arity. Trace on stacktrace/0 to see the +%% original. + +get_stacktrace() -> + stacktrace(erlang:get_stacktrace()). + +stacktrace([{M,F,A,L} | T]) when is_list(A) -> + [{M, F, length(A), L} | T]; +stacktrace(L) -> + L. + +%% --------------------------------------------------------------------------- %% # info_report/2 %% --------------------------------------------------------------------------- @@ -60,26 +88,80 @@ warning_report(Reason, T) -> report(fun error_logger:warning_report/1, Reason, T). report(Fun, Reason, T) -> - Fun([{why, Reason}, {who, self()}, {what, T}]), + Fun(io_lib:format("diameter: ~" ++ fmt(Reason) ++ "~n ~p~n", + [Reason, T])), false. +fmt(T) -> + if is_list(T) -> + "s"; + true -> + "p" + end. + +%% --------------------------------------------------------------------------- +%% # now/0 +%% --------------------------------------------------------------------------- + +-spec now() + -> integer(). + +now() -> + erlang:monotonic_time(). + +%% --------------------------------------------------------------------------- +%% # timestamp/1 +%% --------------------------------------------------------------------------- + +-spec timestamp(integer()) + -> erlang:timestamp(). + +timestamp(MonoT) -> %% monotonic time + MicroSecs = monotonic_to_microseconds(MonoT + erlang:time_offset()), + Secs = MicroSecs div 1000000, + {Secs div 1000000, Secs rem 1000000, MicroSecs rem 1000000}. + +monotonic_to_microseconds(MonoT) -> + erlang:convert_time_unit(MonoT, native, micro_seconds). + %% --------------------------------------------------------------------------- %% # now_diff/1 %% --------------------------------------------------------------------------- --spec now_diff(NowT) +-spec now_diff(T0 :: integer()) -> {Hours, Mins, Secs, MicroSecs} - when NowT :: {non_neg_integer(), 0..999999, 0..999999}, - Hours :: non_neg_integer(), + when Hours :: non_neg_integer(), Mins :: 0..59, Secs :: 0..59, MicroSecs :: 0..999999. -%% Return timer:now_diff(now(), NowT) as an {H, M, S, MicroS} tuple -%% instead of as integer microseconds. +%% Return time difference as an {H, M, S, MicroS} tuple instead of as +%% integer microseconds. -now_diff({_,_,_} = Time) -> - time(timer:now_diff(now(), Time)). +now_diff(T0) -> + time(micro_diff(T0)). + +%% --------------------------------------------------------------------------- +%% # micro_diff/1 +%% --------------------------------------------------------------------------- + +-spec micro_diff(T0 :: integer()) + -> MicroSecs + when MicroSecs :: non_neg_integer(). + +micro_diff(T0) -> %% monotonic time + monotonic_to_microseconds(erlang:monotonic_time() - T0). + +%% --------------------------------------------------------------------------- +%% # micro_diff/2 +%% --------------------------------------------------------------------------- + +-spec micro_diff(T1 :: integer(), T0 :: integer()) + -> MicroSecs + when MicroSecs :: non_neg_integer(). + +micro_diff(T1, T0) -> %% monotonic time + monotonic_to_microseconds(T1 - T0). %% --------------------------------------------------------------------------- %% # time/1 @@ -87,19 +169,13 @@ now_diff({_,_,_} = Time) -> %% Return an elapsed time as an {H, M, S, MicroS} tuple. %% --------------------------------------------------------------------------- --spec time(NowT | Diff) +-spec time(Diff :: non_neg_integer()) -> {Hours, Mins, Secs, MicroSecs} - when NowT :: {non_neg_integer(), 0..999999, 0..999999}, - Diff :: non_neg_integer(), - Hours :: non_neg_integer(), + when Hours :: non_neg_integer(), Mins :: 0..59, Secs :: 0..59, MicroSecs :: 0..999999. -time({_,_,_} = NowT) -> %% time of day - %% 24 hours = 24*60*60*1000000 = 86400000000 microsec - time(timer:now_diff(NowT, {0,0,0}) rem 86400000000); - time(Micro) -> %% elapsed time Seconds = Micro div 1000000, H = Seconds div 3600, @@ -108,6 +184,24 @@ time(Micro) -> %% elapsed time {H, M, S, Micro rem 1000000}. %% --------------------------------------------------------------------------- +%% # seed/0 +%% --------------------------------------------------------------------------- + +-spec seed() + -> {erlang:timestamp(), {integer(), integer(), integer()}}. + +%% Return an argument for random:seed/1. + +seed() -> + T = now(), + {timestamp(T), seed(T)}. + +%% seed/1 + +seed(T) -> %% monotonic time + {erlang:phash2(node()), T, erlang:unique_integer()}. + +%% --------------------------------------------------------------------------- %% # eval/1 %% %% Evaluate a function in various forms. @@ -129,8 +223,8 @@ eval({M,F,A}) -> eval([{M,F,A} | X]) -> apply(M, F, X ++ A); -eval([[F|A] | X]) -> - eval([F | X ++ A]); +eval([[F|X] | A]) -> + eval([F | A ++ X]); eval([F|A]) -> apply(F,A); @@ -142,6 +236,28 @@ eval(F) -> F(). %% --------------------------------------------------------------------------- +%% eval_name/1 +%% --------------------------------------------------------------------------- + +eval_name({M,F,A}) -> + {M, F, length(A)}; + +eval_name([{M,F,A} | X]) -> + {M, F, length(A) + length(X)}; + +eval_name([[F|A] | X]) -> + eval_name([F | X ++ A]); + +eval_name([F|_]) -> + F; + +eval_name({F}) -> + eval_name(F); + +eval_name(F) -> + F. + +%% --------------------------------------------------------------------------- %% # ipaddr/1 %% %% Parse an IP address. @@ -199,17 +315,19 @@ opts(HeapSize, Opts) -> %% # wait/1 %% --------------------------------------------------------------------------- --spec wait([pid()]) +-spec wait([pid() | reference()]) -> ok. wait(L) -> - down([erlang:monitor(process, P) || P <- L]). + lists:foreach(fun down/1, L). -down([]) -> - ok; -down([MRef|T]) -> - receive {'DOWN', MRef, process, _, _} -> ok end, - down(T). +down(Pid) + when is_pid(Pid) -> + down(monitor(process, Pid)); + +down(MRef) + when is_reference(MRef) -> + receive {'DOWN', MRef, process, _, _} = T -> T end. %% --------------------------------------------------------------------------- %% # fold_tuple/3 @@ -242,6 +360,35 @@ ft(Value, {Idx, T}) -> setelement(Idx, T, Value). %% --------------------------------------------------------------------------- +%% # fold_n/3 +%% --------------------------------------------------------------------------- + +-spec fold_n(F, Acc0, N) + -> term() + when F :: fun((non_neg_integer(), term()) -> term()), + Acc0 :: term(), + N :: non_neg_integer(). + +fold_n(F, Acc, N) + when is_integer(N), 0 < N -> + fold_n(F, F(N, Acc), N-1); + +fold_n(_, Acc, _) -> + Acc. + +%% --------------------------------------------------------------------------- +%% # for_n/2 +%% --------------------------------------------------------------------------- + +-spec for_n(F, N) + -> non_neg_integer() + when F :: fun((non_neg_integer()) -> term()), + N :: non_neg_integer(). + +for_n(F, N) -> + fold_n(fun(M,A) -> F(M), A+1 end, 0, N). + +%% --------------------------------------------------------------------------- %% # log/4 %% %% Called to have something to trace on for happenings of interest. diff --git a/lib/diameter/src/base/diameter_misc_sup.erl b/lib/diameter/src/base/diameter_misc_sup.erl index 4e40476f14..2054ea7831 100644 --- a/lib/diameter/src/base/diameter_misc_sup.erl +++ b/lib/diameter/src/base/diameter_misc_sup.erl @@ -3,16 +3,17 @@ %% %% Copyright Ericsson AB 2010-2011. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% diff --git a/lib/diameter/src/base/diameter_peer.erl b/lib/diameter/src/base/diameter_peer.erl index e5d4b28766..2759f17e64 100644 --- a/lib/diameter/src/base/diameter_peer.erl +++ b/lib/diameter/src/base/diameter_peer.erl @@ -1,24 +1,24 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2013. All Rights Reserved. +%% Copyright Ericsson AB 2010-2015. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% -module(diameter_peer). - -behaviour(gen_server). %% Interface towards transport modules ... @@ -57,7 +57,7 @@ -define(SERVER, ?MODULE). %% Server state. --record(state, {id = now()}). +-record(state, {id = diameter_lib:now()}). %% Default transport_module/config. -define(DEFAULT_TMOD, diameter_tcp). @@ -119,7 +119,7 @@ pair([{transport_module, M} | Rest], Mods, Acc) -> pair([{transport_config = T, C} | Rest], Mods, Acc) -> pair([{T, C, ?DEFAULT_TTMO} | Rest], Mods, Acc); pair([{transport_config, C, Tmo} | Rest], Mods, Acc) -> - pair(Rest, [], acc({Mods, C, Tmo}, Acc)); + pair(Rest, [], acc({lists:reverse(Mods), C, Tmo}, Acc)); pair([_ | Rest], Mods, Acc) -> pair(Rest, Mods, Acc); @@ -128,13 +128,16 @@ pair([_ | Rest], Mods, Acc) -> pair([], [], []) -> [{[?DEFAULT_TMOD], ?DEFAULT_TCFG, ?DEFAULT_TTMO}]; -%% One transport_module, one transport_config. -pair([], [M], [{[], Cfg, Tmo}]) -> - [{[M], Cfg, Tmo}]; +%% One transport_module, one transport_config: ignore option order. +%% That is, interpret [{transport_config, _}, {transport_module, _}] +%% as if the order was reversed, not as config with default module and +%% module with default config. +pair([], [_] = Mods, [{[], Cfg, Tmo}]) -> + [{Mods, Cfg, Tmo}]; %% Trailing transport_module: default transport_config. pair([], [_|_] = Mods, Acc) -> - lists:reverse(acc({Mods, ?DEFAULT_TCFG, ?DEFAULT_TTMO}, Acc)); + pair([{transport_config, ?DEFAULT_TCFG}], Mods, Acc); pair([], [], Acc) -> lists:reverse(def(Acc)). @@ -230,12 +233,22 @@ recv(Pid, Pkt) -> %% # send/2 %% --------------------------------------------------------------------------- -send(Pid, #diameter_packet{transport_data = undefined, - bin = Bin}) -> - send(Pid, Bin); +send(Pid, Msg) -> + ifc_send(Pid, {send, strip(Msg)}). + +%% Send only binary when possible. +strip(#diameter_packet{transport_data = undefined, + bin = Bin}) -> + Bin; + +%% Strip potentially large message terms. +strip(#diameter_packet{transport_data = T, + bin = Bin}) -> + #diameter_packet{transport_data = T, + bin = Bin}; -send(Pid, Pkt) -> - ifc_send(Pid, {send, Pkt}). +strip(Msg) -> + Msg. %% --------------------------------------------------------------------------- %% # close/1 @@ -324,7 +337,6 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. %% --------------------------------------------------------- -%% INTERNAL FUNCTIONS %% --------------------------------------------------------- %% ifc_send/2 diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl index f76bd96c3c..2b23183d18 100644 --- a/lib/diameter/src/base/diameter_peer_fsm.erl +++ b/lib/diameter/src/base/diameter_peer_fsm.erl @@ -1,18 +1,19 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2014. All Rights Reserved. +%% Copyright Ericsson AB 2010-2015. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% @@ -63,6 +64,8 @@ %% Keys in process dictionary. -define(CB_KEY, cb). %% capabilities callback -define(DPR_KEY, dpr). %% disconnect callback +-define(DPA_KEY, dpa). %% timeout for incoming DPA, or shutdown after + %% outgoing DPA -define(REF_KEY, ref). %% transport_ref() -define(Q_KEY, q). %% transport start queue -define(START_KEY, start). %% start of connected transport @@ -82,18 +85,26 @@ N == ?GOAWAY; N == goaway; N == ?BUSY; N == busy). -%% RFC 3588: +%% RFC 6733: %% %% Timeout An application-defined timer has expired while waiting %% for some event. %% --define(EVENT_TIMEOUT, 10000). + %% Default timeout for reception of CER/CEA. +-define(CAPX_TIMEOUT, 10000). -%% Default timeout for DPA in response to DPR. A bit short but the -%% timeout used to be hardcoded. (So it could be worse.) +%% Default timeout for DPA to be received in response to an outgoing +%% DPR. A bit short but the timeout used to be hardcoded. (So it could +%% be worse.) -define(DPA_TIMEOUT, 1000). +%% Default timeout for the connection to be closed by the peer +%% following an outgoing DPA in response to an incoming DPR. It's the +%% recipient of DPA that should close the connection according to the +%% RFC. +-define(DPR_TIMEOUT, 5000). + -type uint32() :: diameter:'Unsigned32'(). -record(state, @@ -107,9 +118,15 @@ transport :: pid(), %% transport process dictionary :: module(), %% common dictionary service :: #diameter_service{}, - dpr = false :: false | {uint32(), uint32()}, - %% | hop by hop and end to end identifiers - length_errors :: exit | handle | discard}). + dpr = false :: false + | true %% DPR received, DPA sent + | {boolean(), uint32(), uint32()}, + %% hop by hop and end to end identifiers in + %% outgoing DPR; boolean says whether or not + %% the request was sent explicitly with + %% diameter:call/4. + length_errors :: exit | handle | discard, + incoming_maxlen :: integer() | infinity}). %% There are non-3588 states possible as a consequence of 5.6.1 of the %% standard and the corresponding problem for incoming CEA's: we don't @@ -138,7 +155,7 @@ %% # start/3 %% --------------------------------------------------------------------------- --spec start(T, [Opt], {diameter:sequence(), +-spec start(T, [Opt], {[diameter:service_opt()], [node()], module(), #diameter_service{}}) @@ -177,18 +194,23 @@ init(T) -> proc_lib:init_ack({ok, self()}), gen_server:enter_loop(?MODULE, [], i(T)). -i({Ack, WPid, {M, Ref} = T, Opts, {Mask, Nodes, Dict0, Svc}}) -> +i({Ack, WPid, {M, Ref} = T, Opts, {SvcOpts, Nodes, Dict0, Svc}}) -> erlang:monitor(process, WPid), wait(Ack, WPid), diameter_stats:reg(Ref), + diameter_codec:setopts([{common_dictionary, Dict0} | SvcOpts]), + {_,_} = Mask = proplists:get_value(sequence, SvcOpts), + Maxlen = proplists:get_value(incoming_maxlen, SvcOpts, 16#FFFFFF), {[Cs,Ds], Rest} = proplists:split(Opts, [capabilities_cb, disconnect_cb]), putr(?CB_KEY, {Ref, [F || {_,F} <- Cs]}), putr(?DPR_KEY, [F || {_, F} <- Ds]), putr(?REF_KEY, Ref), putr(?SEQUENCE_KEY, Mask), putr(?RESTRICT_KEY, Nodes), + putr(?DPA_KEY, {proplists:get_value(dpr_timeout, Opts, ?DPR_TIMEOUT), + proplists:get_value(dpa_timeout, Opts, ?DPA_TIMEOUT)}), - Tmo = proplists:get_value(capx_timeout, Opts, ?EVENT_TIMEOUT), + Tmo = proplists:get_value(capx_timeout, Opts, ?CAPX_TIMEOUT), OnLengthErr = proplists:get_value(length_errors, Opts, exit), {TPid, Addrs} = start_transport(T, Rest, Svc), @@ -199,7 +221,8 @@ i({Ack, WPid, {M, Ref} = T, Opts, {Mask, Nodes, Dict0, Svc}}) -> dictionary = Dict0, mode = M, service = svc(Svc, Addrs), - length_errors = OnLengthErr}. + length_errors = OnLengthErr, + incoming_maxlen = Maxlen}. %% The transport returns its local ip addresses so that different %% transports on the same service can use different local addresses. %% The local addresses are put into Host-IP-Address avps here when @@ -212,9 +235,12 @@ wait(Ref, Pid) -> Ref -> ok; {'DOWN', _, process, Pid, _} = D -> - exit({shutdown, D}) + x(D) end. +x(T) -> + exit({shutdown, T}). + start_transport(T, Opts, #diameter_service{capabilities = LCaps} = Svc) -> Addrs0 = LCaps#diameter_caps.host_ip_address, start_transport(Addrs0, {T, Opts, Svc}). @@ -225,8 +251,8 @@ start_transport(Addrs0, T) -> erlang:monitor(process, TPid), q_next(TPid, Addrs0, Tmo, Data), {TPid, Addrs}; - No -> - exit({shutdown, No}) + {error, No} -> + x({no_connection, No}) end. svc(#diameter_service{capabilities = LCaps0} = Svc, Addrs) -> @@ -283,24 +309,21 @@ handle_info(T, #state{} = State) -> ok -> {noreply, State}; #state{state = X} = S -> - ?LOGC(X =/= State#state.state, transition, X), + ?LOGC(X /= State#state.state, transition, X), {noreply, S}; {stop, Reason} -> ?LOG(stop, Reason), {stop, {shutdown, Reason}, State}; stop -> - ?LOG(stop, T), + ?LOG(stop, truncate(T)), {stop, {shutdown, T}, State} catch - exit: {diameter_codec, encode, _} = Reason -> + exit: {diameter_codec, encode, T} = Reason -> + incr_error(send, T, State#state.dictionary), ?LOG(stop, Reason), - %% diameter_codec:encode/2 emits an error report. Only - %% indicate the probable reason here. - diameter_lib:info_report(probable_configuration_error, - insufficient_capabilities), {stop, {shutdown, Reason}, State}; {?MODULE, Tag, Reason} -> - ?LOG(Tag, {Reason, T}), + ?LOG(stop, Tag), {stop, {shutdown, Reason}, State} end. %% The form of the throw caught here is historical. It's @@ -325,6 +348,11 @@ code_change(_, State, _) -> %% --------------------------------------------------------------------------- %% --------------------------------------------------------------------------- +truncate({'DOWN' = T, _, process, Pid, _}) -> + {T, Pid}; +truncate(T) -> + T. + putr(Key, Val) -> put({?MODULE, Key}, Val). @@ -371,11 +399,8 @@ transition({diameter, {TPid, connected}}, %% message. This may be followed by an incoming message which arrived %% before the transport was killed and this can't be distinguished %% from one from the transport that's been started to replace it. -transition({diameter, {_, connected}}, _) -> - {stop, connection_timeout}; -transition({diameter, {_, connected, _}}, _) -> - {stop, connection_timeout}; -transition({diameter, {_, connected, _, _}}, _) -> +transition({diameter, T}, _) + when tuple_size(T) < 5, connected == element(2,T) -> {stop, connection_timeout}; %% Connection has timed out: start an alternate. @@ -403,9 +428,8 @@ transition({timeout, _}, _) -> ok; %% Outgoing message. -transition({send, Msg}, #state{transport = TPid}) -> - send(TPid, Msg), - ok; +transition({send, Msg}, S) -> + outgoing(Msg, S); %% Request for graceful shutdown at remove_transport, stop_service of %% application shutdown. @@ -414,7 +438,8 @@ transition({shutdown, Pid, Reason}, #state{parent = Pid, dpr = false} = S) -> transition({shutdown, Pid, _}, #state{parent = Pid}) -> ok; -%% DPA reception has timed out. +%% DPA reception has timed out, or peer has not closed the connection +%% as a result of outgoing DPA. transition(dpa_timeout, _) -> stop; @@ -476,12 +501,13 @@ send_CER(#state{state = {'Wait-Conn-Ack', Tmo}, orelse close({already_connected, Remote, LCaps}), CER = build_CER(S), - ?LOG(send, 'CER'), #diameter_packet{header = #diameter_header{end_to_end_id = Eid, hop_by_hop_id = Hid}} = Pkt = encode(CER, Dict), + incr(send, Pkt, Dict), send(TPid, Pkt), + ?LOG(send, 'CER'), start_timer(Tmo, S#state{state = {'Wait-CEA', Hid, Eid}}). %% Register ourselves as connecting to the remote endpoint in @@ -521,13 +547,9 @@ encode(Rec, Dict) -> recv(#diameter_packet{header = #diameter_header{} = Hdr} = Pkt, - #state{parent = Pid, - dictionary = Dict0} + #state{dictionary = Dict0} = S) -> - Name = diameter_codec:msg_name(Dict0, Hdr), - Pid ! {recv, self(), Name, Pkt}, - diameter_stats:incr({msg_id(Name, Hdr), recv}), %% count received - rcv(Name, Pkt, S); + recv1(diameter_codec:msg_name(Dict0, Hdr), Pkt, S); recv(#diameter_packet{header = undefined, bin = Bin} @@ -538,6 +560,47 @@ recv(#diameter_packet{header = undefined, recv(Bin, S) -> recv(#diameter_packet{bin = Bin}, S). +%% recv1/3 + +recv1(_, + #diameter_packet{header = H, bin = Bin}, + #state{incoming_maxlen = M}) + when M < size(Bin) -> + invalid(false, incoming_maxlen_exceeded, {size(Bin), H}); + +%% Incoming request after outgoing DPR: discard. Don't discard DPR, so +%% both ends don't do so when sending simultaneously. +recv1(Name, + #diameter_packet{header = #diameter_header{is_request = true} = H}, + #state{dpr = {_,_,_}}) + when Name /= 'DPR' -> + invalid(false, recv_after_outgoing_dpr, H); + +%% Incoming request after incoming DPR: discard. +recv1(_, + #diameter_packet{header = #diameter_header{is_request = true} = H}, + #state{dpr = true}) -> + invalid(false, recv_after_incoming_dpr, H); + +%% DPA with identifier mismatch, or in response to a DPR initiated by +%% the service. +recv1('DPA' = N, + #diameter_packet{header = #diameter_header{hop_by_hop_id = Hid, + end_to_end_id = Eid}} + = Pkt, + #state{dpr = {X,H,E}} + = S) + when H /= Hid; + E /= Eid; + not X -> + rcv(N, Pkt, S); + +%% Any other message with a header and no length errors: send to the +%% parent. +recv1(Name, Pkt, #state{parent = Pid} = S) -> + Pid ! {recv, self(), Name, Pkt}, + rcv(Name, Pkt, S). + %% recv/3 recv(#diameter_header{length = Len} @@ -553,42 +616,30 @@ recv(#diameter_header{length = Len} recv(#diameter_header{} = H, #diameter_packet{bin = Bin}, - #state{length_errors = E} - = S) -> - invalid(E, - invalid_message_length, - recv, - [size(Bin), bit_size(Bin) rem 8, H, S]); + #state{length_errors = E}) -> + T = {size(Bin), bit_size(Bin) rem 8, H}, + invalid(E, message_length_mismatch, T); -recv(false, Pkt, #state{length_errors = E} = S) -> - invalid(E, truncated_header, recv, [Pkt, S]). +recv(false, #diameter_packet{bin = Bin}, #state{length_errors = E}) -> + invalid(E, truncated_header, Bin). %% Note that counters here only count discarded messages. -invalid(E, Reason, F, A) -> +invalid(E, Reason, T) -> diameter_stats:incr(Reason), - abort(E, Reason, F, A). - -abort(exit, Reason, F, A) -> - diameter_lib:warning_report(Reason, {?MODULE, F, A}), - throw({?MODULE, abort, Reason}); - -abort(_, _, _, _) -> + E == exit andalso close({Reason, T}), + ?LOG(Reason, T), ok. -msg_id({_,_,_} = T, _) -> - T; -msg_id(_, Hdr) -> - {_,_,_} = diameter_codec:msg_id(Hdr). - %% rcv/3 %% Incoming CEA. -rcv('CEA', +rcv('CEA' = N, #diameter_packet{header = #diameter_header{end_to_end_id = Eid, hop_by_hop_id = Hid}} = Pkt, #state{state = {'Wait-CEA', Hid, Eid}} = S) -> + ?LOG(recv, N), handle_CEA(Pkt, S); %% Incoming CER @@ -606,37 +657,133 @@ rcv(Name, _, #state{state = PS}) rcv('DPR' = N, Pkt, S) -> handle_request(N, Pkt, S); -%% DPA in response to DPR and with the expected identifiers. +%% DPA in response to DPR, with the expected identifiers. rcv('DPA' = N, #diameter_packet{header = #diameter_header{end_to_end_id = Eid, - hop_by_hop_id = Hid}}, - #state{transport = TPid, - dpr = {Hid, Eid}}) -> + hop_by_hop_id = Hid} + = H} + = Pkt, + #state{dictionary = Dict0, + transport = TPid, + dpr = {X, Hid, Eid}}) -> + ?LOG(recv, N), + X orelse begin + %% Only count DPA in response to a DPR sent by the + %% service: explicit DPR is counted in the same way + %% as other explicitly sent requests. + incr(recv, H, Dict0), + incr_rc(recv, diameter_codec:decode(Dict0, Pkt), Dict0) + end, diameter_peer:close(TPid), {stop, N}; -%% Ignore anything else, an unsolicited DPA in particular. +%% Ignore anything else, an unsolicited DPA in particular. Note that +%% dpa_timeout deals with the case in which the peer sends the wrong +%% identifiers in DPA. +rcv(N, #diameter_packet{header = H}, _) + when N == 'CER'; + N == 'CEA'; + N == 'DPR'; + N == 'DPA' -> + ?LOG(ignored, N), + %% Note that these aren't counted in the normal recv counter. + diameter_stats:incr({diameter_codec:msg_id(H), recv, ignored}), + ok; + rcv(_, _, _) -> ok. +%% incr/3 + +incr(Dir, Hdr, Dict0) -> + diameter_traffic:incr(Dir, Hdr, self(), Dict0). + +%% incr_rc/3 + +incr_rc(Dir, Pkt, Dict0) -> + diameter_traffic:incr_rc(Dir, Pkt, self(), Dict0). + +%% incr_error/3 + +incr_error(Dir, Pkt, Dict0) -> + diameter_traffic:incr_error(Dir, Pkt, self(), Dict0). + %% send/2 %% Msg here could be a #diameter_packet or a binary depending on who's %% sending. In particular, the watchdog will send DWR as a binary %% while messages coming from clients will be in a #diameter_packet. + send(Pid, Msg) -> - diameter_stats:incr({diameter_codec:msg_id(Msg), send}), diameter_peer:send(Pid, Msg). +%% outgoing/2 + +%% Explicit DPR. +outgoing(#diameter_packet{header = #diameter_header{application_id = 0, + cmd_code = 282, + is_request = true} + = H} + = Pkt, + #state{dpr = T, + parent = Pid} + = S) -> + if T == false -> + inform_dpr(Pid), + send_dpr(true, Pkt, dpa_timeout(), S); + T == true -> + invalid(false, dpr_after_dpa, H); %% DPA sent: discard + true -> + invalid(false, dpr_after_dpr, H) %% DPR sent: discard + end; + +%% Explict CER or DWR: discard. These are sent by us. +outgoing(#diameter_packet{header = #diameter_header{application_id = 0, + cmd_code = C, + is_request = true} + = H}, + _) + when 257 == C; %% CER + 280 == C -> %% DWR + invalid(false, invalid_request, H); + +%% DPR not sent: send. +outgoing(Msg, #state{transport = TPid, dpr = false}) -> + send(TPid, Msg), + ok; + +%% Outgoing answer: send. +outgoing(#diameter_packet{header = #diameter_header{is_request = false}} + = Pkt, + #state{transport = TPid}) -> + send(TPid, Pkt), + ok; + +%% Outgoing request: discard. +outgoing(Msg, #state{dpr = {_,_,_}}) -> + invalid(false, send_after_dpr, header(Msg)). + +header(#diameter_packet{header = H}) -> + H; +header(Bin) -> %% DWR + diameter_codec:decode_header(Bin). + %% handle_request/3 +%% +%% Incoming CER or DPR. -handle_request(Type, #diameter_packet{} = Pkt, #state{dictionary = D} = S) -> - ?LOG(recv, Type), - send_answer(Type, diameter_codec:decode(D, Pkt), S). +handle_request(Name, + #diameter_packet{header = H} = Pkt, + #state{dictionary = Dict0} = S) -> + ?LOG(recv, Name), + incr(recv, H, Dict0), + send_answer(Name, diameter_codec:decode(Dict0, Pkt), S). %% send_answer/3 send_answer(Type, ReqPkt, #state{transport = TPid, dictionary = Dict} = S) -> + incr_error(recv, ReqPkt, Dict), + #diameter_packet{header = H, transport_data = TD} = ReqPkt, @@ -653,13 +800,19 @@ send_answer(Type, ReqPkt, #state{transport = TPid, dictionary = Dict} = S) -> msg = Msg, transport_data = TD}, - send(TPid, diameter_codec:encode(Dict, Pkt)), + AnsPkt = diameter_codec:encode(Dict, Pkt), + + incr(send, AnsPkt, Dict), + incr_rc(send, AnsPkt, Dict), + send(TPid, AnsPkt), + ?LOG(send, ans(Type)), eval(PostF, S). +ans('CER') -> 'CEA'; +ans('DPR') -> 'DPA'. + eval([F|A], S) -> apply(F, A ++ [S]); -eval(ok, S) -> - S; eval(T, _) -> close(T). @@ -674,6 +827,8 @@ build_answer('CER', = Pkt, #state{dictionary = Dict0} = S) -> + diameter_codec:setopts([{string_decode, false}]), + {SupportedApps, RCaps, CEA} = recv_CER(CER, S), [RC, IS] = Dict0:'#get-'(['Result-Code', 'Inband-Security-Id'], CEA), @@ -706,7 +861,7 @@ build_answer(Type, errors = Es} = Pkt, S) -> - {RC, FailedAVP} = result_code(H, Es), + {RC, FailedAVP} = result_code(Type, H, Es), {answer(Type, RC, FailedAVP, S), post(Type, RC, Pkt, S)}. inband_security([]) -> @@ -723,8 +878,16 @@ cea(CEA, RC, Dict0) -> post('CER' = T, RC, Pkt, S) -> {T, caps(S), {RC, Pkt}}; -post('DPR', _, _, _) -> - ok. +post('DPR', _, _, #state{parent = Pid}) -> + [fun(S) -> dpr_timer(), inform_dpr(Pid), dpr(S) end]. + +dpr(#state{dpr = false} = S) -> %% not awaiting DPA + S#state{dpr = true}; %% DPR received +dpr(S) -> %% DPR already sent or received + S. + +inform_dpr(Pid) -> + Pid ! {'DPR', self()}. %% tell watchdog to die with us rejected({capabilities_cb, _F, Reason}, T, S) -> rejected(Reason, T, S); @@ -734,7 +897,7 @@ rejected(discard, T, _) -> rejected({N, Es}, T, S) -> {answer('CER', N, failed_avp(N, Es), S), T}; rejected(N, T, S) -> - rejected({N, []}, T, S). + {answer('CER', N, [], S), T}. failed_avp(RC, [{RC, Avp} | _]) -> [{'Failed-AVP', [[{'AVP', [Avp]}]]}]; @@ -773,6 +936,19 @@ set(['answer-message' | _] = Ans, FailedAvp) -> set([_|_] = Ans, FailedAvp) -> Ans ++ FailedAvp. +%% result_code/3 + +%% Be lenient with errors in DPR since there's no reason to be +%% otherwise. Rejecting may cause the peer to missinterpret the error +%% as meaning that the connection should not be closed, which may well +%% lead to more problems than any errors in the DPR. + +result_code('DPR', _, _) -> + {2001, []}; + +result_code('CER', H, Es) -> + result_code(H, Es). + %% result_code/2 result_code(#diameter_header{is_error = true}, _) -> @@ -848,28 +1024,29 @@ recv_CER(CER, #state{service = Svc, dictionary = Dict}) -> close({'CER', CER, Svc, Dict, Reason}) end. -%% handle_CEA/1 +%% handle_CEA/2 -handle_CEA(#diameter_packet{bin = Bin} +handle_CEA(#diameter_packet{header = H} = Pkt, #state{dictionary = Dict0, service = #diameter_service{capabilities = LCaps}} - = S) - when is_binary(Bin) -> - ?LOG(recv, 'CEA'), + = S) -> + incr(recv, H, Dict0), - #diameter_packet{msg = CEA} + #diameter_packet{} = DPkt = diameter_codec:decode(Dict0, Pkt), + diameter_codec:setopts([{string_decode, false}]), + + RC = result_code(incr_rc(recv, DPkt, Dict0)), + {SApps, IS, RCaps} = recv_CEA(DPkt, S), #diameter_caps{origin_host = {OH, DH}} = Caps = capz(LCaps, RCaps), - RC = Dict0:'#get-'('Result-Code', CEA), - %% Ensure that we don't already have a connection to the peer in %% question. This isn't the peer election of 3588 except in the %% sense that, since we don't know who we're talking to until we @@ -877,7 +1054,7 @@ handle_CEA(#diameter_packet{bin = Bin} %% connection with the peer. try - ?IS_SUCCESS(RC) + is_integer(RC) andalso ?IS_SUCCESS(RC) orelse ?THROW(RC), [] == SApps andalso ?THROW(no_common_application), @@ -897,6 +1074,11 @@ handle_CEA(#diameter_packet{bin = Bin} %% capabilities exchange could send DIAMETER_LIMITED_SUCCESS = 2002, %% even if this isn't required by RFC 3588. +result_code({'Result-Code', N}) -> + N; +result_code(_) -> + undefined. + %% recv_CEA/2 recv_CEA(#diameter_packet{header = #diameter_header{version @@ -988,22 +1170,16 @@ capz(#diameter_caps{} = L, #diameter_caps{} = R) -> tl(tuple_to_list(R)))]). %% close/1 +%% +%% A good function to trace on in case of problems with capabilities +%% exchange. close(Reason) -> - report(Reason), throw({?MODULE, close, Reason}). -%% Could possibly log more here. -report({M, _, _, _, _} = T) - when M == 'CER'; - M == 'CEA' -> - diameter_lib:error_report(failure, T); -report(_) -> - ok. - %% dpr/2 %% -%% The RFC isn't clear on whether DPR should be send in a non-Open +%% The RFC isn't clear on whether DPR should be sent in a non-Open %% state. The Peer State Machine transitions it documents aren't %% exhaustive (no Stop in Wait-I-CEA for example) so assume it's up to %% the implementation and transition to Closed (ie. die) if we haven't @@ -1019,7 +1195,7 @@ dpr(Reason, #state{state = 'Open', Peer = {self(), Caps}, dpr(CBs, [Reason, Ref, Peer], S); -%% Connection is open, DPR already sent. +%% Connection is open, DPR already sent or received. dpr(_, #state{state = 'Open'}) -> ok; @@ -1034,7 +1210,7 @@ dpr(_Reason, _S) -> %% process and contact it. (eg. diameter:service_info/2) dpr([CB|Rest], [Reason | _] = Args, S) -> - try diameter_lib:eval([CB | Args]) of + case diameter_lib:eval([CB | Args]) of {dpr, Opts} when is_list(Opts) -> send_dpr(Reason, Opts, S); dpr -> @@ -1044,23 +1220,15 @@ dpr([CB|Rest], [Reason | _] = Args, S) -> ignore -> dpr(Rest, Args, S); T -> - No = {disconnect_cb, T}, - diameter_lib:error_report(invalid, No), - {stop, No} - catch - E:R -> - No = {disconnect_cb, E, R, ?STACK}, - diameter_lib:error_report(failure, No), - {stop, No} + ?ERROR({disconnect_cb, CB, Args, T}) end; dpr([], [Reason | _], S) -> send_dpr(Reason, [], S). --record(opts, {cause, timeout = ?DPA_TIMEOUT}). +-record(opts, {cause, timeout}). -send_dpr(Reason, Opts, #state{transport = TPid, - dictionary = Dict, +send_dpr(Reason, Opts, #state{dictionary = Dict, service = #diameter_service{capabilities = Caps}} = S) -> #opts{cause = Cause, timeout = Tmo} @@ -1069,23 +1237,37 @@ send_dpr(Reason, Opts, #state{transport = TPid, transport -> ?GOAWAY; _ -> ?REBOOT end, - timeout = ?DPA_TIMEOUT}, + timeout = dpa_timeout()}, Opts), #diameter_caps{origin_host = {OH, _}, origin_realm = {OR, _}} = Caps, - #diameter_packet{header = #diameter_header{end_to_end_id = Eid, - hop_by_hop_id = Hid}} - = Pkt - = encode(['DPR', {'Origin-Host', OH}, + Pkt = encode(['DPR', {'Origin-Host', OH}, {'Origin-Realm', OR}, {'Disconnect-Cause', Cause}], Dict), + send_dpr(false, Pkt, Tmo, S). + +%% send_dpr/4 + +send_dpr(X, + #diameter_packet{header = #diameter_header{end_to_end_id = Eid, + hop_by_hop_id = Hid}} + = Pkt, + Tmo, + #state{transport = TPid, + dictionary = Dict} + = S) -> + %% Only count DPR sent by the service: explicit DPR is counted in + %% the same way as other explicitly sent requests. + X orelse incr(send, Pkt, Dict), send(TPid, Pkt), dpa_timer(Tmo), ?LOG(send, 'DPR'), - S#state{dpr = {Hid, Eid}}. + S#state{dpr = {X, Hid, Eid}}. + +%% opt/2 opt({timeout, Tmo}, Rec) when ?IS_TIMEOUT(Tmo) -> @@ -1108,6 +1290,17 @@ cause(N) -> dpa_timer(Tmo) -> erlang:send_after(Tmo, self(), dpa_timeout). +dpa_timeout() -> + {_, Tmo} = getr(?DPA_KEY), + Tmo. + +dpr_timer() -> + dpa_timer(dpr_timeout()). + +dpr_timeout() -> + {Tmo, _} = getr(?DPA_KEY), + Tmo. + %% register_everywhere/1 %% %% Register a term and ensure it's not registered elsewhere. Note that diff --git a/lib/diameter/src/base/diameter_peer_fsm_sup.erl b/lib/diameter/src/base/diameter_peer_fsm_sup.erl index 995eaf74d0..54bd06929d 100644 --- a/lib/diameter/src/base/diameter_peer_fsm_sup.erl +++ b/lib/diameter/src/base/diameter_peer_fsm_sup.erl @@ -3,16 +3,17 @@ %% %% Copyright Ericsson AB 2010-2011. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% diff --git a/lib/diameter/src/base/diameter_reg.erl b/lib/diameter/src/base/diameter_reg.erl index 3197c1aee1..7f198080ba 100644 --- a/lib/diameter/src/base/diameter_reg.erl +++ b/lib/diameter/src/base/diameter_reg.erl @@ -1,18 +1,19 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2013. All Rights Reserved. +%% Copyright Ericsson AB 2010-2015. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% @@ -22,10 +23,10 @@ %% -module(diameter_reg). --compile({no_auto_import, [monitor/2]}). - -behaviour(gen_server). +-compile({no_auto_import, [monitor/2]}). + -export([add/1, add_new/1, del/1, @@ -66,7 +67,7 @@ %% Table entry containing the Term -> Pid mapping. -define(MAPPING(Term, Pid), {Term, Pid}). --record(state, {id = now(), +-record(state, {id = diameter_lib:now(), q = []}). %% [{From, Pat}] %% =========================================================================== diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl index 8914992f17..87ef2e522d 100644 --- a/lib/diameter/src/base/diameter_service.erl +++ b/lib/diameter/src/base/diameter_service.erl @@ -1,18 +1,19 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2014. All Rights Reserved. +%% Copyright Ericsson AB 2010-2016. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% @@ -82,16 +83,8 @@ -define(DEFAULT_TC, 30000). %% RFC 3588 ch 2.1 -define(RESTART_TC, 1000). %% if restart was this recent -%% Used to be able to swap this with anything else dict-like but now -%% rely on the fact that a service's #state{} record does not change -%% in storing in it ?STATE table and not always going through the -%% service process. In particular, rely on the fact that operations on -%% a ?Dict don't change the handle to it. --define(Dict, diameter_dict). - -%% Maintains state in a table. In contrast to previously, a service's -%% stat is not constant and is accessed outside of the service -%% process. +%% Maintain state in a table since a service's state is accessed +%% outside of the service process. -define(STATE_TABLE, ?MODULE). %% The default sequence mask. @@ -111,23 +104,25 @@ %% to determine whether or not we need to call the process for a %% pick_peer callback in the statefull case. -record(state, - {id = now(), + {id = diameter_lib:now(), service_name :: diameter:service_name(), %% key in ?STATE_TABLE service :: #diameter_service{}, watchdogT = ets_new(watchdogs) %% #watchdog{} at start :: ets:tid(), - peerT = ets_new(peers) %% #peer{pid = TPid} at okay/reopen - :: ets:tid(), - shared_peers = ?Dict:new() %% Alias -> [{TPid, Caps}, ...] - :: ets:tid(), - local_peers = ?Dict:new() %% Alias -> [{TPid, Caps}, ...] - :: ets:tid(), + peerT, %% undefined in new code, but remain for upgrade + shared_peers, %% reasons. Replaced by local/remote. + local_peers, %% + local :: {ets:tid(), ets:tid(), ets:tid()}, + remote :: {ets:tid(), ets:tid(), ets:tid()}, monitor = false :: false | pid(), %% process to die with options :: [{sequence, diameter:sequence()} %% sequence mask | {share_peers, diameter:remotes()} %% broadcast to | {use_shared_peers, diameter:remotes()} %% use from - | {restrict_connections, diameter:restriction()}]}). + | {restrict_connections, diameter:restriction()} + | {strict_mbit, boolean()} + | {string_decode, boolean()} + | {incoming_maxlen, diameter:message_length()}]}). %% shared_peers reflects the peers broadcast from remote nodes. %% Record representing an RFC 3539 watchdog process implemented by @@ -138,18 +133,20 @@ ref :: match(reference()), %% key into diameter_config options :: match([diameter:transport_opt()]),%% from start_transport state = ?WD_INITIAL :: match(wd_state()), - started = now(), %% at process start + started = diameter_lib:now(),%% at process start peer = false :: match(boolean() | pid())}). %% true at accepted, pid() at okay/reopen -%% Record representing an Peer State Machine processes implemented by +%% Record representing a Peer State Machine processes implemented by %% diameter_peer_fsm. -record(peer, - {pid :: pid(), - apps :: [{0..16#FFFFFFFF, diameter:app_alias()}], %% {Id, Alias} - caps :: #diameter_caps{}, - started = now(), %% at process start - watchdog :: pid()}). %% key into watchdogT + {pid :: pid(), + apps :: match([{0..16#FFFFFFFF, diameter:app_alias()}] %% {Id, Alias} + | [diameter:app_alias()]), %% remote + caps :: match(#diameter_caps{}), + started = diameter_lib:now(), %% at process start or sharing + watchdog :: match(pid() %% key into watchdogT + | undefined)}). %% undefined if remote %% --------------------------------------------------------------------------- %% # start/1 @@ -177,7 +174,7 @@ stop(SvcName) -> end. stop(ok, Pid) -> - MRef = erlang:monitor(process, Pid), + MRef = monitor(process, Pid), receive {'DOWN', MRef, process, _, _} -> ok end; stop(No, _) -> No. @@ -204,7 +201,7 @@ stop_transport(SvcName, [_|_] = Refs) -> info(SvcName, Item) -> case lookup_state(SvcName) of - [#state{} = S] -> + [S] -> service_info(Item, S); [] -> undefined @@ -213,7 +210,12 @@ info(SvcName, Item) -> %% lookup_state/1 lookup_state(SvcName) -> - ets:lookup(?STATE_TABLE, SvcName). + case ets:lookup(?STATE_TABLE, SvcName) of + [#state{}] = L -> + L; + _ -> + [] + end. %% --------------------------------------------------------------------------- %% # subscribe/1 @@ -258,16 +260,22 @@ whois(SvcName) -> %% --------------------------------------------------------------------------- -spec pick_peer(SvcName, AppOrAlias, Opts) - -> {{TPid, Caps, App}, Mask} - | false - | {error, term()} + -> {{TPid, Caps, App}, Mask, SvcOpts} + | false %% no selection + | {error, no_service} when SvcName :: diameter:service_name(), - AppOrAlias :: {alias, diameter:app_alias()} | #diameter_app{}, - Opts :: tuple(), + AppOrAlias :: #diameter_app{} + | {alias, diameter:app_alias()}, + Opts :: {fun((Dict :: module()) -> [term()]), + diameter:peer_filter(), + Xtra :: list()}, TPid :: pid(), Caps :: #diameter_caps{}, App :: #diameter_app{}, - Mask :: diameter:sequence(). + Mask :: diameter:sequence(), + SvcOpts :: [diameter:service_opt()]. +%% Extract Mask in the returned tuple so that diameter_traffic doesn't +%% need to know about the ordering of SvcOpts used here. pick_peer(SvcName, App, Opts) -> pick(lookup_state(SvcName), App, Opts). @@ -284,10 +292,10 @@ pick(#state{service = #diameter_service{applications = Apps}} Opts) -> %% initial call from diameter:call/4 pick(S, find_outgoing_app(Alias, Apps), Opts); -pick(_, false, _) -> - false; +pick(_, false = No, _) -> + No; -pick(#state{options = [{_, Mask} | _]} +pick(#state{options = [{_, Mask} | SvcOpts]} = S, #diameter_app{module = ModX, dictionary = Dict} = App0, @@ -296,7 +304,7 @@ pick(#state{options = [{_, Mask} | _]} [_,_] = RealmAndHost = diameter_lib:eval([DestF, Dict]), case pick_peer(App, RealmAndHost, Filter, S) of {TPid, Caps} -> - {{TPid, Caps, App}, Mask}; + {{TPid, Caps, App}, Mask, SvcOpts}; false = No -> No end. @@ -480,6 +488,9 @@ transition({service, Pid}, S) -> transition({peer, TPid, Aliases, Caps}, S) -> remote_peer_up(TPid, Aliases, Caps, S), ok; +transition({peer, TPid}, S) -> + remote_peer_down(TPid, S), + ok; %% Remote peer process has died. transition({'DOWN', _, process, TPid, _}, S) -> @@ -499,9 +510,21 @@ transition(Req, S) -> %% # terminate/2 %% --------------------------------------------------------------------------- -terminate(Reason, #state{service_name = Name} = S) -> +terminate(Reason, #state{service_name = Name, local = {PeerT, _, _}} = S) -> send_event(Name, stop), ets:delete(?STATE_TABLE, Name), + + %% Communicate pending loss of any peers that connection_down/3 + %% won't. This is needed when stopping a service since we don't + %% wait for watchdog state changes to take care of if. That this + %% takes place after deleting the state entry ensures that the + %% resulting failover by request processes accomplishes nothing. + ets:foldl(fun(#peer{pid = TPid}, _) -> + diameter_traffic:peer_down(TPid) + end, + ok, + PeerT), + shutdown == Reason %% application shutdown andalso shutdown(application, S). @@ -509,23 +532,80 @@ terminate(Reason, #state{service_name = Name} = S) -> %% # code_change/3 %% --------------------------------------------------------------------------- -code_change(FromVsn, - #state{service_name = SvcName, - service = #diameter_service{applications = Apps}} - = S, - Extra) -> - lists:foreach(fun(A) -> - code_change(FromVsn, SvcName, Extra, A) - end, - Apps), +code_change(_FromVsn, #state{} = S, _Extra) -> + {ok, S}; + +%% Don't support downgrade since we won't in appup. +code_change({down = T, _}, _, _Extra) -> + {error, T}; + +%% Upgrade local/shared peers dicts populated in old code. Don't +code_change(_FromVsn, S0, _Extra) -> + {state, Id, SvcName, Svc, WT, PeerT, SDict, LDict, Monitor, Opts} + = S0, + + init_peers(LT = setelement(1, {PT, _, _} = init_peers(), PeerT), + fun({_,A}) -> A end), + init_peers(init_peers(RT = init_peers(), SDict), + fun(A) -> A end), + + S = #state{id = Id, + service_name = SvcName, + service = Svc, + watchdogT = WT, + peerT = PT, %% empty + shared_peers = SDict, + local_peers = LDict, + local = LT, + remote = RT, + monitor = Monitor, + options = Opts}, + + %% Replacing the table entry and deleting the old shared tables + %% can make outgoing requests return {error, no_connection} until + %% everyone is running new code. Don't delete the tables to avoid + %% crashing request processes. + ets:delete_all_objects(SDict), + ets:delete_all_objects(LDict), + ets:insert(?STATE_TABLE, S), {ok, S}. -code_change(FromVsn, SvcName, Extra, #diameter_app{alias = Alias} = A) -> - {ok, S} = cb(A, code_change, [FromVsn, - mod_state(Alias), - Extra, - SvcName]), - mod_state(Alias, S). +%% init_peers/2 + +%% Populate app and identity bags from a new-style #peer{} sets. +init_peers({PeerT, _, _} = T, F) + when is_function(F) -> + ets:foldl(fun(#peer{pid = P, apps = As, caps = C}, N) -> + insert_peer(P, lists:map(F, As), C, T), + N+1 + end, + 0, + PeerT); + +%% Populate #peer{} table given a shared peers dict. +init_peers({PeerT, _, _}, SDict) -> + dict:fold(fun(P, As, N) -> + ets:update_element(PeerT, P, {#peer.apps, As}), + N+1 + end, + 0, + diameter_dict:fold(fun(A, Ps, D) -> + init_peers(A, Ps, PeerT, D) + end, + dict:new(), + SDict)). + +%% init_peers/4 + +init_peers(App, TCs, PeerT, Dict) -> + lists:foldl(fun({P,C}, D) -> + ets:insert(PeerT, #peer{pid = P, + apps = [], + caps = C}), + dict:append(P, App, D) + end, + Dict, + TCs). %% =========================================================================== %% =========================================================================== @@ -533,9 +613,6 @@ code_change(FromVsn, SvcName, Extra, #diameter_app{alias = Alias} = A) -> unexpected(F, A, #state{service_name = Name}) -> ?UNEXPECTED(F, A ++ [Name]). -cb(#diameter_app{module = [_|_] = M}, F, A) -> - eval(M, F, A). - eval([M|X], F, A) -> apply(M, F, A ++ X). @@ -555,10 +632,6 @@ choose(false, _, X) -> X. ets_new(Tbl) -> ets:new(Tbl, [{keypos, 2}]). -insert(Tbl, Rec) -> - ets:insert(Tbl, Rec), - Rec. - %% Using the process dictionary for the callback state was initially %% just a way to make what was horrendous trace (big state record and %% much else everywhere) somewhat more readable. There's not as much @@ -598,8 +671,9 @@ st(#watchdog{ref = Ref, pid = Pid}, Refs) -> %% st/3 st(#watchdog{pid = Pid}, Reason, Acc) -> + MRef = monitor(process, Pid), Pid ! {shutdown, self(), Reason}, - [Pid | Acc]. + [MRef | Acc]. %% --------------------------------------------------------------------------- %% # call_service/2 @@ -658,6 +732,8 @@ cfg_acc({SvcName, #diameter_service{applications = Apps} = Rec, Opts}, lists:foreach(fun init_mod/1, Apps), S = #state{service_name = SvcName, service = Rec#diameter_service{pid = self()}, + local = init_peers(), + remote = init_peers(), monitor = mref(get_value(monitor, Opts)), options = service_options(Opts)}, {S, Acc}; @@ -667,6 +743,13 @@ cfg_acc({_Ref, Type, _Opts} = T, {S, Acc}) Type == listen -> {S, [T | Acc]}. +init_peers() -> + {ets_new(caps), %% #peer{} + ets:new(apps, [bag]), %% {Alias, TPid} + ets:new(idents, [bag])}. %% {{host, OH} | {realm, OR} | {OR, OH}, + %% Alias, + %% TPid} + service_options(Opts) -> [{sequence, proplists:get_value(sequence, Opts, ?NOMASK)}, {share_peers, get_value(share_peers, Opts)}, @@ -674,13 +757,16 @@ service_options(Opts) -> {restrict_connections, proplists:get_value(restrict_connections, Opts, ?RESTRICT)}, - {spawn_opt, proplists:get_value(spawn_opt, Opts, [])}]. + {spawn_opt, proplists:get_value(spawn_opt, Opts, [])}, + {string_decode, proplists:get_value(string_decode, Opts, true)}, + {incoming_maxlen, proplists:get_value(incoming_maxlen, Opts, 16#FFFFFF)}, + {strict_mbit, proplists:get_value(strict_mbit, Opts, true)}]. %% The order of options is significant since we match against the list. mref(false = No) -> No; mref(P) -> - erlang:monitor(process, P). + monitor(process, P). init_shared(#state{options = [_, _, {_,T} | _], service_name = Svc}) -> @@ -719,14 +805,27 @@ remotes(F) -> L when is_list(L) -> L; T -> - diameter_lib:error_report({invalid_return, T}, F), + ?LOG(invalid_return, {F,T}), + error_report(invalid_return, share_peers, F), [] catch E:R -> - diameter_lib:error_report({failure, {E, R, ?STACK}}, F), + ?LOG(failure, {E, R, F, diameter_lib:get_stacktrace()}), + error_report(failure, share_peers, F), [] end. +%% error_report/3 + +error_report(T, What, F) -> + Reason = io_lib:format("~s from ~p callback", [reason(T), What]), + diameter_lib:error_report(Reason, diameter_lib:eval_name(F)). + +reason(invalid_return) -> + "invalid return"; +reason(failure) -> + "failure". + %% --------------------------------------------------------------------------- %% # start/3 %% --------------------------------------------------------------------------- @@ -740,8 +839,9 @@ remotes(F) -> start(Ref, {T, Opts}, S) when T == connect; T == listen -> + N = proplists:get_value(pool_size, Opts, 1), try - {ok, start(Ref, type(T), Opts, S)} + {ok, start(Ref, type(T), Opts, N, S)} catch ?FAILURE(Reason) -> {error, Reason} @@ -759,26 +859,44 @@ type(connect = T) -> T. %% start/4 -start(Ref, Type, Opts, #state{watchdogT = WatchdogT, - peerT = PeerT, - options = SvcOpts, - service_name = SvcName, - service = Svc0}) +start(Ref, Type, Opts, State) -> + start(Ref, Type, Opts, 1, State). + +%% start/5 + +start(Ref, Type, Opts, N, #state{watchdogT = WatchdogT, + local = {PeerT, _, _}, + options = SvcOpts, + service_name = SvcName, + service = Svc0}) when Type == connect; Type == accept -> #diameter_service{applications = Apps} - = Svc + = Svc1 = merge_service(Opts, Svc0), - {_,_} = Mask = proplists:get_value(sequence, SvcOpts), - RecvData = diameter_traffic:make_recvdata([SvcName, PeerT, Apps, Mask]), - Pid = s(Type, Ref, {{spawn_opts([Opts, SvcOpts]), RecvData}, - Opts, - SvcOpts, - Svc}), - insert(WatchdogT, #watchdog{pid = Pid, - type = Type, - ref = Ref, - options = Opts}), + Svc = binary_caps(Svc1, proplists:get_value(string_decode, SvcOpts, true)), + RecvData = diameter_traffic:make_recvdata([SvcName, + PeerT, + Apps, + SvcOpts]), + T = {{spawn_opts([Opts, SvcOpts]), RecvData}, Opts, SvcOpts, Svc}, + Rec = #watchdog{type = Type, + ref = Ref, + options = Opts}, + diameter_lib:fold_n(fun(_,A) -> + [wd(Type, Ref, T, WatchdogT, Rec) | A] + end, + [], + N). + +binary_caps(Svc, true) -> + Svc; +binary_caps(#diameter_service{capabilities = Caps} = Svc, false) -> + Svc#diameter_service{capabilities = diameter_capx:binary_caps(Caps)}. + +wd(Type, Ref, T, WatchdogT, Rec) -> + Pid = start_watchdog(Type, Ref, T), + ets:insert(WatchdogT, Rec#watchdog{pid = Pid}), Pid. %% Note that the service record passed into the watchdog is the merged @@ -791,7 +909,7 @@ spawn_opts(Optss) -> T /= link, T /= monitor]. -s(Type, Ref, T) -> +start_watchdog(Type, Ref, T) -> {_MRef, Pid} = diameter_watchdog:start({Type, Ref}, T), Pid. @@ -812,7 +930,7 @@ ms({applications, As}, #diameter_service{applications = Apps} = S) %% The fact that all capabilities can be configured on the transports %% means that the service doesn't necessarily represent a single -%% locally implemented Diameter peer as identified by Origin-Host: a +%% locally implemented Diameter node as identified by Origin-Host: a %% transport can configure its own Origin-Host. This means that the %% service little more than a placeholder for default capabilities %% plus a list of applications that individual transports can choose @@ -835,8 +953,8 @@ accepted(Pid, _TPid, #state{watchdogT = WatchdogT} = S) -> #watchdog{ref = Ref, type = accept = T, peer = false, options = Opts} = Wd = fetch(WatchdogT, Pid), - insert(WatchdogT, Wd#watchdog{peer = true}),%% mark replacement as started - start(Ref, T, Opts, S). %% start new watchdog + ets:insert(WatchdogT, Wd#watchdog{peer = true}),%% mark replacement started + start(Ref, T, Opts, S). %% start new watchdog fetch(Tid, Key) -> [T] = ets:lookup(Tid, Key), @@ -862,14 +980,15 @@ watchdog(TPid, [], ?WD_SUSPECT, ?WD_OKAY, Wd, State) -> #watchdog{peer = TPid} = Wd, %% assert connection_up(Wd, State); -%% Watchdog has an unresponsive connection. +%% Watchdog has an unresponsive connection. Note that the peer table +%% entry isn't removed until DOWN. watchdog(TPid, [], ?WD_OKAY, ?WD_SUSPECT = To, Wd, State) -> #watchdog{peer = TPid} = Wd, %% assert watchdog_down(Wd, To, State); %% Watchdog has lost its connection. -watchdog(TPid, [], _, ?WD_DOWN = To, Wd, #state{peerT = PeerT} = S) -> - close(Wd, S), +watchdog(TPid, [], _, ?WD_DOWN = To, Wd, #state{local = {PeerT, _, _}} = S) -> + close(Wd), watchdog_down(Wd, To, S), ets:delete(PeerT, TPid); @@ -877,7 +996,7 @@ watchdog(_, [], _, _, _, _) -> ok. watchdog_down(Wd, To, #state{watchdogT = WatchdogT} = S) -> - insert(WatchdogT, Wd#watchdog{state = To}), + ets:insert(WatchdogT, Wd#watchdog{state = To}), connection_down(Wd, To, S). %% --------------------------------------------------------------------------- @@ -889,14 +1008,14 @@ watchdog_down(Wd, To, #state{watchdogT = WatchdogT} = S) -> connection_up({TPid, {Caps, SupportedApps, Pkt}}, #watchdog{pid = Pid} = Wd, - #state{peerT = PeerT} + #state{local = {PeerT, _, _}} = S) -> - Pr = #peer{pid = TPid, - apps = SupportedApps, - caps = Caps, - watchdog = Pid}, - insert(PeerT, Pr), - connection_up([Pkt], Wd#watchdog{peer = TPid}, Pr, S). + Rec = #peer{pid = TPid, + apps = SupportedApps, + caps = Caps, + watchdog = Pid}, + ets:insert(PeerT, Rec), + connection_up([Pkt], Wd#watchdog{peer = TPid}, Rec, S). %% --------------------------------------------------------------------------- %% # reopen/3 @@ -906,22 +1025,23 @@ reopen({TPid, {Caps, SupportedApps, _Pkt}}, #watchdog{pid = Pid} = Wd, #state{watchdogT = WatchdogT, - peerT = PeerT}) -> - insert(PeerT, #peer{pid = TPid, - apps = SupportedApps, - caps = Caps, - watchdog = Pid}), - insert(WatchdogT, Wd#watchdog{state = ?WD_REOPEN, - peer = TPid}). + local = {PeerT, _, _}}) -> + ets:insert(PeerT, #peer{pid = TPid, + apps = SupportedApps, + caps = Caps, + watchdog = Pid}), + ets:insert(WatchdogT, Wd#watchdog{state = ?WD_REOPEN, + peer = TPid}). %% --------------------------------------------------------------------------- %% # connection_up/2 %% --------------------------------------------------------------------------- -%% Watchdog has recovered as suspect connection. Note that there has +%% Watchdog has recovered a suspect connection. Note that there has %% been no new capabilties exchange in this case. -connection_up(#watchdog{peer = TPid} = Wd, #state{peerT = PeerT} = S) -> +connection_up(#watchdog{peer = TPid} = Wd, #state{local = {PeerT, _, _}} + = S) -> connection_up([], Wd, fetch(PeerT, TPid), S). %% connection_up/4 @@ -932,23 +1052,23 @@ connection_up(Extra, #peer{apps = SApps, caps = Caps} = Pr, #state{watchdogT = WatchdogT, - local_peers = LDict, + local = LT, service_name = SvcName, service = #diameter_service{applications = Apps}} = S) -> - insert(WatchdogT, Wd#watchdog{state = ?WD_OKAY}), + ets:insert(WatchdogT, Wd#watchdog{state = ?WD_OKAY}), diameter_traffic:peer_up(TPid), - insert_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict), + local_peer_up(SApps, {TPid, Caps}, {SvcName, Apps}, LT), report_status(up, Wd, Pr, S, Extra). -insert_local_peer(SApps, T, LDict) -> - lists:foldl(fun(A,D) -> ilp(A, T, D) end, LDict, SApps). +local_peer_up(SApps, {TPid, Caps} = TC, SA, LT) -> + insert_peer(TPid, [A || {_,A} <- SApps], Caps, LT), + lists:foreach(fun(A) -> peer_up(A, TC, SA) end, SApps). -ilp({Id, Alias}, {TC, SA}, LDict) -> - init_conn(Id, Alias, TC, SA), - ?Dict:append(Alias, TC, LDict). +peer_up({Id, Alias}, TC, SA) -> + peer_up(Id, Alias, TC, SA). -init_conn(Id, Alias, {TPid, _} = TC, {SvcName, Apps}) -> +peer_up(Id, Alias, {TPid, _} = TC, {SvcName, Apps}) -> #diameter_app{id = Id} %% assert = App = find_app(Alias, Apps), @@ -1026,8 +1146,11 @@ peer_cb(App, F, A) -> true catch E:R -> - diameter_lib:error_report({failure, {E, R, ?STACK}}, - {App, F, A}), + %% Don't include arguments since a #diameter_caps{} strings + %% from the peer, which could be anything (especially, large). + [Mod|X] = App#diameter_app.module, + ?LOG(failure, {E, R, Mod, F, diameter_lib:get_stacktrace()}), + error_report(failure, F, {Mod, F, A ++ X}), false end. @@ -1043,17 +1166,17 @@ connection_down(#watchdog{state = ?WD_OKAY, = Pr, #state{service_name = SvcName, service = #diameter_service{applications = Apps}, - local_peers = LDict} + local = LT} = S) -> report_status(down, Wd, Pr, S, []), - remove_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict), + local_peer_down(SApps, {TPid, Caps}, {SvcName, Apps}, LT), diameter_traffic:peer_down(TPid); connection_down(#watchdog{state = ?WD_OKAY, peer = TPid} = Wd, To, - #state{peerT = PeerT} + #state{local = {PeerT, _, _}} = S) when is_atom(To) -> connection_down(Wd, #peer{} = fetch(PeerT, TPid), S); @@ -1061,15 +1184,14 @@ connection_down(#watchdog{state = ?WD_OKAY, connection_down(#watchdog{}, _, _) -> ok. -remove_local_peer(SApps, T, LDict) -> - lists:foldl(fun(A,D) -> rlp(A, T, D) end, LDict, SApps). +local_peer_down(SApps, {TPid, _Caps} = TC, SA, LT) -> + delete_peer(TPid, LT), + lists:foreach(fun(A) -> peer_down(A, TC, SA) end, SApps). -rlp({Id, Alias}, {TC, SA}, LDict) -> - L = ?Dict:fetch(Alias, LDict), - down_conn(Id, Alias, TC, SA), - ?Dict:store(Alias, lists:delete(TC, L), LDict). +peer_down({Id, Alias}, TC, SA) -> + peer_down(Id, Alias, TC, SA). -down_conn(Id, Alias, TC, {SvcName, Apps}) -> +peer_down(Id, Alias, TC, {SvcName, Apps}) -> #diameter_app{id = Id} %% assert = App = find_app(Alias, Apps), @@ -1094,7 +1216,7 @@ wd_down(#watchdog{peer = B}, _) ok; %% ... or maybe it has. -wd_down(#watchdog{peer = TPid} = Wd, #state{peerT = PeerT} = S) -> +wd_down(#watchdog{peer = TPid} = Wd, #state{local = {PeerT, _, _}} = S) -> connection_down(Wd, ?WD_DOWN, S), ets:delete(PeerT, TPid). @@ -1157,7 +1279,7 @@ connect_timer(Opts, Def0) -> %% continuous restarted in case of faulty config or other problems. tc(Time, Tc) -> choose(Tc > ?RESTART_TC - orelse timer:now_diff(now(), Time) > 1000*?RESTART_TC, + orelse diameter_lib:micro_diff(Time) > 1000*?RESTART_TC, Tc, ?RESTART_TC). @@ -1187,26 +1309,16 @@ tc(false = No, _, _) -> %% removed %% another watchdog to be able to detect that it should transition %% from initial into reopen rather than okay. That someone is either %% the accepting watchdog upon reception of a CER from the previously -%% connected peer, or us after connect_timer timeout. +%% connected peer, or us after connect_timer timeout or immediately. -close(#watchdog{type = connect}, _) -> +close(#watchdog{type = connect}) -> ok; + close(#watchdog{type = accept, pid = Pid, - ref = Ref, - options = Opts}, - #state{service_name = SvcName}) -> - c(Pid, diameter_config:have_transport(SvcName, Ref), Opts). - -%% Tell watchdog to (maybe) die later ... -c(Pid, true, Opts) -> + options = Opts}) -> Tc = connect_timer(Opts, 2*?DEFAULT_TC), - erlang:send_after(Tc, Pid, close); - -%% ... or now. -c(Pid, false, _Opts) -> - Pid ! close. - + erlang:send_after(Tc, Pid, close). %% The RFC's only document the behaviour of Tc, our connect_timer, %% for the establishment of connections but we also give %% connect_timer semantics for a listener, being the time within @@ -1260,13 +1372,14 @@ cm([#diameter_app{alias = Alias} = App], Req, From, Svc) -> mod_state(Alias, ModS), {T, RC}; T -> - diameter_lib:error_report({invalid, T}, - {App, handle_call, Args}), + ModX = App#diameter_app.module, + ?LOG(invalid_return, {ModX, handle_call, Args, T}), invalid catch E: Reason -> - diameter_lib:error_report({failure, {E, Reason, ?STACK}}, - {App, handle_call, Args}), + ModX = App#diameter_app.module, + Stack = diameter_lib:get_stacktrace(), + ?LOG(failure, {E, Reason, ModX, handle_call, Stack}), failure end; @@ -1311,19 +1424,37 @@ share_peer(up, Caps, Apps, TPid, #state{options = [_, {_,T} | _], service_name = Svc}) -> notify(T, Svc, {peer, TPid, [A || {_,A} <- Apps], Caps}); -share_peer(_, _, _, _, _) -> - ok. +share_peer(down, _Caps, _Apps, TPid, #state{options = [_, {_,T} | _], + service_name = Svc}) -> + notify(T, Svc, {peer, TPid}). %% --------------------------------------------------------------------------- %% # share_peers/2 %% --------------------------------------------------------------------------- -share_peers(Pid, #state{options = [_, {_,T} | _], local_peers = PDict}) -> - is_remote(Pid, T) - andalso ?Dict:fold(fun(A,Ps,ok) -> sp(Pid, A, Ps), ok end, ok, PDict). - -sp(Pid, Alias, Peers) -> - lists:foreach(fun({P,C}) -> Pid ! {peer, P, [Alias], C} end, Peers). +share_peers(Pid, #state{options = [_, {_,SP} | _], + local = {PeerT, AppT, _}}) -> + is_remote(Pid, SP) + andalso ets:foldl(fun(T, N) -> N + sp(Pid, AppT, T) end, + 0, + PeerT). + +%% An entry in the peer table doesn't mean the watchdog state is OKAY, +%% an entry in the app table does. + +sp(Pid, AppT, #peer{pid = TPid, + apps = [{_, Alias} | _] = Apps, + caps = Caps}) -> + Spec = [{{'$1', TPid}, + [{'==', '$1', {const, Alias}}], + ['$_']}], + case ets:select(AppT, Spec, 1) of + '$end_of_table' -> + 0; + _ -> + Pid ! {peer, TPid, [A || {_,A} <- Apps], Caps}, + 1 + end. is_remote(Pid, T) -> Node = node(Pid), @@ -1333,32 +1464,49 @@ is_remote(Pid, T) -> %% # remote_peer_up/4 %% --------------------------------------------------------------------------- -remote_peer_up(Pid, Aliases, Caps, #state{options = [_, _, {_,T} | _]} = S) -> - is_remote(Pid, T) - andalso rpu(Pid, Aliases, Caps, S). +remote_peer_up(TPid, Aliases, Caps, #state{options = [_, _, {_,T} | _]} = S) -> + is_remote(TPid, T) andalso rpu(TPid, Aliases, Caps, S). -rpu(Pid, Aliases, Caps, #state{service = Svc, shared_peers = PDict}) -> +rpu(TPid, Aliases, Caps, #state{service = Svc, remote = RT}) -> #diameter_service{applications = Apps} = Svc, Key = #diameter_app.alias, F = fun(A) -> lists:keymember(A, Key, Apps) end, - rpu(Pid, lists:filter(F, Aliases), Caps, PDict); + rpu(TPid, lists:filter(F, Aliases), Caps, RT); rpu(_, [] = No, _, _) -> No; -rpu(Pid, Aliases, Caps, PDict) -> - erlang:monitor(process, Pid), - T = {Pid, Caps}, - lists:foreach(fun(A) -> ?Dict:append(A, T, PDict) end, Aliases). +rpu(TPid, Aliases, Caps, {PeerT, _, _} = RT) -> + monitor(process, TPid), + ets:insert(PeerT, #peer{pid = TPid, + apps = Aliases, + caps = Caps}), + insert_peer(TPid, Aliases, Caps, RT). + +%% insert_peer/4 + +insert_peer(TPid, Aliases, Caps, {_PeerT, AppT, IdentT}) -> + #diameter_caps{origin_host = {_, OH}, + origin_realm = {_, OR}} + = Caps, + ets:insert(AppT, [{A, TPid} || A <- Aliases]), + H = iolist_to_binary(OH), + R = iolist_to_binary(OR), + ets:insert(IdentT, [{T, A, TPid} || T <- [{host, H}, {realm, R}, {R, H}], + A <- Aliases]). %% --------------------------------------------------------------------------- %% # remote_peer_down/2 %% --------------------------------------------------------------------------- -remote_peer_down(Pid, #state{shared_peers = PDict}) -> - lists:foreach(fun(A) -> rpd(Pid, A, PDict) end, ?Dict:fetch_keys(PDict)). +remote_peer_down(TPid, #state{remote = {PeerT, _, _} = RT}) -> + ets:delete(PeerT, TPid), + delete_peer(TPid, RT). + +%% delete_peer/2 -rpd(Pid, Alias, PDict) -> - ?Dict:update(Alias, fun(Ps) -> lists:keydelete(Pid, 1, Ps) end, PDict). +delete_peer(TPid, {_PeerT, AppT, IdentT}) -> + ets:select_delete(AppT, [{{'_', TPid}, [], [true]}]), + ets:select_delete(IdentT, [{{'_', '_', TPid}, [], [true]}]). %% --------------------------------------------------------------------------- %% pick_peer/4 @@ -1368,12 +1516,12 @@ pick_peer(#diameter_app{alias = Alias} = App, RealmAndHost, Filter, - #state{local_peers = L, - shared_peers = S, + #state{local = LT, + remote = RT, service_name = SvcName, service = #diameter_service{pid = Pid}}) -> - pick_peer(peers(Alias, RealmAndHost, Filter, L), - peers(Alias, RealmAndHost, Filter, S), + pick_peer(peers(Alias, RealmAndHost, Filter, LT), + peers(Alias, RealmAndHost, Filter, RT), Pid, SvcName, App). @@ -1424,56 +1572,211 @@ pick_peer(Local, T; %% Accept returned state in the immutable {false = No, S} -> %% case as long it isn't changed. No; - T -> - diameter_lib:error_report({invalid, T, App}, - {App, pick_peer, Args}) + T when M -> + ModX = App#diameter_app.module, + ?LOG(invalid_return, {ModX, pick_peer, T}), + false catch - E: Reason -> - diameter_lib:error_report({failure, {E, Reason, ?STACK}}, - {App, pick_peer, Args}) + E: Reason when M -> + ModX = App#diameter_app.module, + Stack = diameter_lib:get_stacktrace(), + ?LOG(failure, {E, Reason, ModX, pick_peer, Stack}), + false end. %% peers/4 -peers(Alias, RH, Filter, Peers) -> - case ?Dict:find(Alias, Peers) of - {ok, L} -> - ps(L, RH, Filter, {[],[]}); - error -> +peers(Alias, RH, Filter, T) -> + filter(Alias, RH, Filter, T, true). + +%% filter/5 +%% +%% Try to limit the peers list by starting with a host/realm lookup. + +filter(Alias, RH, {neg, F}, T, B) -> + filter(Alias, RH, F, T, not B); + +filter(_, _, none, _, false) -> + []; + +filter(Alias, _, none, T, true) -> + all_peers(Alias, T); + +filter(Alias, [DR,DH] = RH, K, T, B) + when K == realm, DR == undefined; + K == host, DH == undefined -> + filter(Alias, RH, none, T, B); + +filter(Alias, [DR,_] = RH, realm = K, T, B) -> + filter(Alias, RH, {K, DR}, T, B); + +filter(Alias, [_,DH] = RH, host = K, T, B) -> + filter(Alias, RH, {K, DH}, T, B); + +filter(Alias, _, {K, D}, {PeerT, _AppT, IdentT}, true) + when K == host; + K == realm -> + try iolist_to_binary(D) of + B -> + caps(PeerT, ets:select(IdentT, [{{{K, B}, '$1', '$2'}, + [{'==', '$1', {const, Alias}}], + ['$2']}])) + catch + error:_ -> [] - end. + end; -%% Place a peer whose Destination-Host/Realm matches those of the -%% request at the front of the result list. Could add some sort of -%% 'sort' option to allow more control. - -ps([], _, _, {Ys, Ns}) -> - lists:reverse(Ys, Ns); -ps([{_TPid, #diameter_caps{} = Caps} = TC | Rest], RH, Filter, Acc) -> - ps(Rest, RH, Filter, pacc(caps_filter(Caps, RH, Filter), - caps_filter(Caps, RH, {all, [host, realm]}), - TC, - Acc)). - -pacc(true, true, Peer, {Ts, Fs}) -> - {[Peer|Ts], Fs}; -pacc(true, false, Peer, {Ts, Fs}) -> - {Ts, [Peer|Fs]}; -pacc(_, _, _, Acc) -> - Acc. +filter(Alias, RH, {all, Filters}, T, B) + when is_list(Filters) -> + fltr_all(Alias, RH, Filters, T, B); + +filter(Alias, RH, {first, Filters}, T, B) + when is_list(Filters) -> + fltr_first(Alias, RH, Filters, T, B); + +filter(Alias, RH, Filter, T, B) -> + {Ts, Fs} = filter(all_peers(Alias, T), RH, Filter), + choose(B, Ts, Fs). + +%% fltr_all/5 + +fltr_all(Alias, RH, [{K, any} | Filters], T, B) + when K == host; + K == realm -> + fltr_all(Alias, RH, Filters, T, B); + +fltr_all(Alias, RH, [{host, _} = H, {realm, _} = R | Filters], T, B) -> + fltr_all(Alias, RH, [R, H | Filters], T, B); + +fltr_all(Alias, RH, [{realm, _} = R, {host, any} | Filters], T, B) -> + fltr_all(Alias, RH, [R | Filters], T, B); + +fltr_all(Alias, RH, [{realm, OR}, {host, OH} | Filters], T, true) -> + {PeerT, _AppT, IdentT} = T, + try {iolist_to_binary(OR), iolist_to_binary(OH)} of + BT -> + Peers = caps(PeerT, + ets:select(IdentT, [{{BT, '$1', '$2'}, + [{'==', '$1', {const, Alias}}], + ['$2']}])), + {Ts, _} = filter(Peers, RH, {all, Filters}), + Ts + catch + error:_ -> + [] + end; -%% caps_filter/3 +fltr_all(Alias, [undefined,_] = RH, [realm | Filters], T, B) -> + fltr_all(Alias, RH, Filters, T, B); + +fltr_all(Alias, [DR,_] = RH, [realm | Filters], T, B) -> + fltr_all(Alias, RH, [{realm, DR} | Filters], T, B); + +fltr_all(Alias, [_,undefined] = RH, [host | Filters], T, B) -> + fltr_all(Alias, RH, Filters, T, B); + +fltr_all(Alias, [_,DH] = RH, [host | Filters], T, B) -> + fltr_all(Alias, RH, [{host, DH} | Filters], T, B); + +fltr_all(Alias, RH, [{K, _} = KT, KA | Filters], T, B) + when K == host, KA == realm; + K == realm, KA == host -> + fltr_all(Alias, RH, [KA, KT | Filters], T, B); + +fltr_all(Alias, RH, [F | Filters], T, B) -> + {Ts, Fs} = filter(filter(Alias, RH, F, T, B), RH, {all, Filters}), + choose(B, Ts, Fs); + +fltr_all(Alias, RH, [], T, B) -> + filter(Alias, RH, none, T, B). + +%% fltr_first/5 +%% +%% Like any, but stop at the first filter with any matches. + +fltr_first(Alias, RH, [F | Filters], T, B) -> + case filter(Alias, RH, F, T, B) of + [] -> + fltr_first(Alias, RH, Filters, T, B); + [_|_] = Ts -> + Ts + end; + +fltr_first(Alias, RH, [], T, B) -> + filter(Alias, RH, none, T, not B). + +%% all_peers/2 -caps_filter(C, RH, {neg, F}) -> - not caps_filter(C, RH, F); +all_peers(Alias, {PeerT, AppT, _}) -> + ets:select(PeerT, [{#peer{pid = P, caps = '$1', _ = '_'}, + [], + [{{P, '$1'}}]} + || {_,P} <- ets:lookup(AppT, Alias)]). -caps_filter(C, RH, {all, L}) +%% caps/2 + +caps(PeerT, Pids) -> + ets:select(PeerT, [{#peer{pid = P, caps = '$1', _ = '_'}, + [], + [{{P, '$1'}}]} + || P <- Pids]). + +%% filter/3 +%% +%% Return peers in match order. + +filter(Peers, _, none) -> + {Peers, []}; + +filter(Peers, RH, {neg, F}) -> + {Ts, Fs} = filter(Peers, RH, F), + {Fs, Ts}; + +filter(Peers, RH, {all, L}) + when is_list(L) -> + lists:foldl(fun(F,A) -> fltr_all(F, A, RH) end, + {Peers, []}, + L); + +filter(Peers, RH, {any, L}) when is_list(L) -> - lists:all(fun(F) -> caps_filter(C, RH, F) end, L); + lists:foldl(fun(F,A) -> fltr_any(F, A, RH) end, + {[], Peers}, + L); -caps_filter(C, RH, {any, L}) +filter(Peers, RH, {first, L}) when is_list(L) -> - lists:any(fun(F) -> caps_filter(C, RH, F) end, L); + fltr_first(Peers, RH, L); + +filter(Peers, RH, F) -> + lists:partition(fun({_,C}) -> caps_filter(C, RH, F) end, Peers). + +%% fltr_all/3 + +fltr_all(F, {Ts0, Fs0}, RH) -> + {Ts1, Fs1} = filter(Ts0, RH, F), + {Ts1, Fs0 ++ Fs1}. + +%% fltr_any/3 + +fltr_any(F, {Ts0, Fs0}, RH) -> + {Ts1, Fs1} = filter(Fs0, RH, F), + {Ts0 ++ Ts1, Fs1}. + +%% fltr_first/3 + +fltr_first(Peers, _, []) -> + {[], Peers}; + +fltr_first(Peers, RH, [F | Filters]) -> + case filter(Peers, RH, F) of + {[], _} -> + fltr_first(Peers, RH, Filters); + {_, _} = T -> + T + end. + +%% caps_filter/3 caps_filter(#diameter_caps{origin_host = {_,OH}}, [_,DH], host) -> eq(undefined, DH, OH); @@ -1486,9 +1789,6 @@ caps_filter(C, _, Filter) -> %% caps_filter/2 -caps_filter(_, none) -> - true; - caps_filter(#diameter_caps{origin_host = {_,OH}}, {host, H}) -> eq(any, H, OH); @@ -1519,8 +1819,8 @@ eq(Any, Id, PeerId) -> transports(#state{watchdogT = WatchdogT}) -> ets:select(WatchdogT, [{#watchdog{peer = '$1', _ = '_'}, - [{'is_pid', '$1'}], - ['$1']}]). + [{'is_pid', '$1'}], + ['$1']}]). %% --------------------------------------------------------------------------- %% # service_info/2 @@ -1551,7 +1851,8 @@ transports(#state{watchdogT = WatchdogT}) -> -define(OTHER_INFO, [connections, name, peers, - statistics]). + statistics, + info]). service_info(Item, S) when is_atom(Item) -> @@ -1572,7 +1873,7 @@ tagged_info(Item, S) undefined end; -tagged_info(TPid, #state{watchdogT = WatchdogT, peerT = PeerT}) +tagged_info(TPid, #state{watchdogT = WatchdogT, local = {PeerT, _, _}}) when is_pid(TPid) -> try [#peer{watchdog = Pid}] = ets:lookup(PeerT, TPid), @@ -1641,6 +1942,7 @@ complete_info(Item, #state{service = Svc} = S) -> keys -> ?ALL_INFO ++ ?CAP_INFO ++ ?OTHER_INFO; all -> service_info(?ALL_INFO, S); statistics -> info_stats(S); + info -> info_info(S); connections -> info_connections(S); peers -> info_peers(S) end. @@ -1687,31 +1989,43 @@ info_transport(S) -> [], PeerD). -%% Only a config entry for a listening transport: use it. -transport([[{type, listen}, _] = L]) -> - L ++ [{accept, []}]; - -%% Only one config or peer entry for a connecting transport: use it. -transport([[{type, connect} | _] = L]) -> - L; +%% Single config entry. Distinguish between pool_size config or not on +%% a connecting transport for backwards compatibility: with the option +%% the form is similar to the listening case, with connections grouped +%% in a pool tuple (for lack of a better name), without as before. +transport([[{type, Type}, {options, Opts}] = L]) + when Type == listen; + Type == connect -> + L ++ [{K, []} || [{_,K}] <- [keys(Type, Opts)]]; %% Peer entries: discard config. Note that the peer entries have %% length at least 3. transport([[_,_] | L]) -> transport(L); -%% Possibly many peer entries for a listening transport. Note that all -%% have the same options by construction, which is not terribly space -%% efficient. -transport([[{type, accept}, {options, Opts} | _] | _] = Ls) -> - [{type, listen}, +%% Multiple tranports. Note that all have the same options by +%% construction, which is not terribly space efficient. +transport([[{type, Type}, {options, Opts} | _] | _] = Ls) -> + transport(keys(Type, Opts), Ls). + +%% Group transports in an accept or pool tuple ... +transport([{Type, Key}], [[{type, _}, {options, Opts} | _] | _] = Ls) -> + [{type, Type}, {options, Opts}, - {accept, [lists:nthtail(2,L) || L <- Ls]}]. + {Key, [tl(tl(L)) || L <- Ls]}]; + +%% ... or not: there can only be one. +transport([], [L]) -> + L. + +keys(connect = T, Opts) -> + [{T, pool} || lists:keymember(pool_size, 1, Opts)]; +keys(_, _) -> + [{listen, accept}]. -peer_dict(#state{watchdogT = WatchdogT, peerT = PeerT}, Dict0) -> +peer_dict(#state{watchdogT = WatchdogT, local = {PeerT, _, _}}, Dict0) -> try ets:tab2list(WatchdogT) of - L -> - lists:foldl(fun(T,A) -> peer_acc(PeerT, A, T) end, Dict0, L) + L -> lists:foldl(fun(T,A) -> peer_acc(PeerT, A, T) end, Dict0, L) catch error: badarg -> Dict0 %% service has gone down end. @@ -1723,12 +2037,11 @@ peer_acc(PeerT, Acc, #watchdog{pid = Pid, state = WS, started = At, peer = TPid}) -> - dict:append(Ref, - [{type, Type}, - {options, Opts}, - {watchdog, {Pid, At, WS}} - | info_peer(PeerT, TPid, WS)], - Acc). + Info = [{type, Type}, + {options, Opts}, + {watchdog, {Pid, At, WS}} + | info_peer(PeerT, TPid, WS)], + dict:append(Ref, Info ++ [{info, info_process_info(Info)}], Acc). info_peer(PeerT, TPid, WS) when is_pid(TPid), WS /= ?WD_DOWN -> @@ -1740,6 +2053,49 @@ info_peer(PeerT, TPid, WS) info_peer(_, _, _) -> []. +info_process_info(Info) -> + lists:flatmap(fun ipi/1, Info). + +ipi({watchdog, {Pid, _, _}}) -> + info_pid(Pid); + +ipi({peer, {Pid, _}}) -> + info_pid(Pid); + +ipi({port, [{owner, Pid} | _]}) -> + info_pid(Pid); + +ipi(_) -> + []. + +info_pid(Pid) -> + case process_info(Pid, [message_queue_len, memory, binary]) of + undefined -> + []; + L -> + [{Pid, lists:map(fun({K,V}) -> {K, map_info(K,V)} end, L)}] + end. + +%% The binary list consists of 3-tuples {Ptr, Size, Count}, where Ptr +%% is a C pointer value, Size is the size of a referenced binary in +%% bytes, and Count is a global reference count. The same Ptr can +%% occur multiple times, once for each reference on the process heap. +%% In this case, the corresponding tuples will have Size in common but +%% Count may differ just because no global lock is taken when the +%% value is retrieved. +%% +%% The list can be quite large, and we aren't often interested in the +%% pointers or counts, so whittle this down to the number of binaries +%% referenced and their total byte count. +map_info(binary, L) -> + SzD = lists:foldl(fun({P,S,_}, D) -> dict:store(P,S,D) end, + dict:new(), + L), + {dict:size(SzD), dict:fold(fun(_,S,N) -> S + N end, 0, SzD)}; + +map_info(_, T) -> + T. + %% The point of extracting the config here is so that 'transport' info %% has one entry for each transport ref, the peer table only %% containing entries that have a living watchdog. @@ -1797,6 +2153,13 @@ mk_app(#diameter_app{} = A) -> info_pending(#state{} = S) -> diameter_traffic:pending(transports(S)). +%% info_info/1 +%% +%% Extract process_info from connections info. + +info_info(S) -> + [I || L <- conn_list(S), {info, I} <- L]. + %% info_connections/1 %% %% One entry per transport connection. Statistics for each entry are diff --git a/lib/diameter/src/base/diameter_service_sup.erl b/lib/diameter/src/base/diameter_service_sup.erl index 153fff902f..369e403fff 100644 --- a/lib/diameter/src/base/diameter_service_sup.erl +++ b/lib/diameter/src/base/diameter_service_sup.erl @@ -1,18 +1,19 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% Copyright Ericsson AB 2010-2015. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% @@ -58,7 +59,7 @@ init([]) -> ChildSpec = {Mod, {Mod, start_link, []}, temporary, - 1000, + 5000, worker, [Mod]}, {ok, {Flags, [ChildSpec]}}. diff --git a/lib/diameter/src/base/diameter_session.erl b/lib/diameter/src/base/diameter_session.erl index 3b236f109a..4cd76ed1f1 100644 --- a/lib/diameter/src/base/diameter_session.erl +++ b/lib/diameter/src/base/diameter_session.erl @@ -3,16 +3,17 @@ %% %% Copyright Ericsson AB 2010-2012. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% @@ -157,8 +158,8 @@ session_id(Host) -> %% --------------------------------------------------------------------------- init() -> - Now = now(), - random:seed(Now), + {Now, Seed} = diameter_lib:seed(), + random:seed(Seed), Time = time32(Now), Seq = (?INT32 band (Time bsl 20)) bor (random:uniform(1 bsl 20) - 1), ets:insert(diameter_sequence, [{origin_state_id, Time}, diff --git a/lib/diameter/src/base/diameter_stats.erl b/lib/diameter/src/base/diameter_stats.erl index 8353613d32..8c10464e98 100644 --- a/lib/diameter/src/base/diameter_stats.erl +++ b/lib/diameter/src/base/diameter_stats.erl @@ -1,18 +1,19 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2014. All Rights Reserved. +%% Copyright Ericsson AB 2010-2015. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% @@ -22,7 +23,6 @@ %% -module(diameter_stats). - -behaviour(gen_server). -export([reg/2, reg/1, @@ -58,7 +58,7 @@ -define(SERVER, ?MODULE). %% Server state. --record(state, {id = now()}). +-record(state, {id = diameter_lib:now()}). -type counter() :: any(). -type ref() :: any(). @@ -140,9 +140,14 @@ read(Refs, B) -> L. to_refdict(L) -> - lists:foldl(fun({{C,R}, N}, D) -> orddict:append(R, {C,N}, D) end, - orddict:new(), - L). + lists:foldl(fun append/2, orddict:new(), L). + +%% Order both references and counters in the returned list. +append({{Ctr, Ref}, N}, Dict) -> + orddict:update(Ref, + fun(D) -> orddict:store(Ctr, N, D) end, + [{Ctr, N}], + Dict). %% --------------------------------------------------------------------------- %% # sum(Refs) @@ -218,7 +223,7 @@ uptime() -> %% ---------------------------------------------------------- init([]) -> - ets:new(?TABLE, [named_table, ordered_set, public]), + ets:new(?TABLE, [named_table, set, public, {write_concurrency, true}]), {ok, #state{}}. %% ---------------------------------------------------------- diff --git a/lib/diameter/src/base/diameter_sup.erl b/lib/diameter/src/base/diameter_sup.erl index e5afd23dcd..e89ede9843 100644 --- a/lib/diameter/src/base/diameter_sup.erl +++ b/lib/diameter/src/base/diameter_sup.erl @@ -1,18 +1,19 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% Copyright Ericsson AB 2010-2015. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% @@ -64,7 +65,7 @@ spec(Mod) -> {Mod, {Mod, start_link, []}, permanent, - 1000, + infinity, supervisor, [Mod]}. diff --git a/lib/diameter/src/base/diameter_sync.erl b/lib/diameter/src/base/diameter_sync.erl index ce2db4b3a2..7fb6888e21 100644 --- a/lib/diameter/src/base/diameter_sync.erl +++ b/lib/diameter/src/base/diameter_sync.erl @@ -1,18 +1,19 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% Copyright Ericsson AB 2010-2015. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% @@ -69,7 +70,7 @@ %% Server state. -record(state, - {time = now(), + {time = diameter_lib:now(), pending = 0 :: non_neg_integer(), %% outstanding requests monitor = new() :: ets:tid(), %% MonitorRef -> {Name, From} queue = new() :: ets:tid()}). %% Name -> queue of {Pid, Ref} diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl index 7fbb306b02..c169d3fc2c 100644 --- a/lib/diameter/src/base/diameter_traffic.erl +++ b/lib/diameter/src/base/diameter_traffic.erl @@ -1,18 +1,19 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2013-2014. All Rights Reserved. +%% Copyright Ericsson AB 2013-2016. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% @@ -31,11 +32,15 @@ %% towards diameter_watchdog -export([receive_message/4]). +%% towards diameter_peer_fsm and diameter_watchdog +-export([incr/4, + incr_error/4, + incr_rc/4]). + %% towards diameter_service -export([make_recvdata/1, peer_up/1, peer_down/1, - failover/1, pending/1]). %% towards ?MODULE @@ -44,6 +49,8 @@ -include_lib("diameter/include/diameter.hrl"). -include("diameter_internal.hrl"). +-define(LOGX(Reason, T), begin ?LOG(Reason, T), x({Reason, T}) end). + -define(RELAY, ?DIAMETER_DICT_RELAY). -define(BASE, ?DIAMETER_DICT_COMMON). %% Note: the RFC 3588 dictionary @@ -70,7 +77,13 @@ {peerT :: ets:tid(), service_name :: diameter:service_name(), apps :: [#diameter_app{}], - sequence :: diameter:sequence()}). + sequence :: diameter:sequence(), + codec :: [{string_decode, boolean()} + | {strict_mbit, boolean()} + | {incoming_maxlen, diameter:message_length()}]}). +%% Note that incoming_maxlen is currently handled in diameter_peer_fsm, +%% so that any message exceeding the maximum is discarded. Retain the +%% option in case we want to extend the values and semantics. %% Record stored in diameter_request for each outgoing request. -record(request, @@ -85,11 +98,16 @@ %% # make_recvdata/1 %% --------------------------------------------------------------------------- -make_recvdata([SvcName, PeerT, Apps, Mask | _]) -> +make_recvdata([SvcName, PeerT, Apps, SvcOpts | _]) -> + {_,_} = Mask = proplists:get_value(sequence, SvcOpts), #recvdata{service_name = SvcName, peerT = PeerT, apps = Apps, - sequence = Mask}. + sequence = Mask, + codec = [T || {K,_} = T <- SvcOpts, + lists:member(K, [string_decode, + incoming_maxlen, + strict_mbit])]}. %% --------------------------------------------------------------------------- %% peer_up/1 @@ -109,6 +127,78 @@ peer_down(TPid) -> failover(TPid). %% --------------------------------------------------------------------------- +%% incr/4 +%% --------------------------------------------------------------------------- + +incr(Dir, #diameter_packet{header = H}, TPid, AppDict) -> + incr(Dir, H, TPid, AppDict); + +incr(Dir, #diameter_header{} = H, TPid, AppDict) -> + incr(TPid, {msg_id(H, AppDict), Dir}). + +%% --------------------------------------------------------------------------- +%% incr_error/4 +%% --------------------------------------------------------------------------- + +%% Identify messages using the application dictionary, not the encode +%% dictionary, which may differ in the case of answer-message. +incr_error(Dir, T, Pid, {_MsgDict, AppDict}) -> + incr_error(Dir, T, Pid, AppDict); + +%% Decoded message without errors. +incr_error(recv, #diameter_packet{errors = []}, _, _) -> + ok; + +incr_error(recv = D, #diameter_packet{header = H}, TPid, AppDict) -> + incr_error(D, H, TPid, AppDict); + +%% Encoded message with errors and an identifiable header ... +incr_error(send = D, {_, _, #diameter_header{} = H}, TPid, AppDict) -> + incr_error(D, H, TPid, AppDict); + +%% ... or not. +incr_error(send = D, {_,_}, TPid, _) -> + incr_error(D, unknown, TPid); + +incr_error(Dir, #diameter_header{} = H, TPid, AppDict) -> + incr_error(Dir, msg_id(H, AppDict), TPid); + +incr_error(Dir, Id, TPid, _) -> + incr_error(Dir, Id, TPid). + +incr_error(Dir, Id, TPid) -> + incr(TPid, {Id, Dir, error}). + +%% --------------------------------------------------------------------------- +%% incr_rc/4 +%% --------------------------------------------------------------------------- + +-spec incr_rc(send|recv, Pkt, TPid, DictT) + -> {Counter, non_neg_integer()} + | Reason + when Pkt :: #diameter_packet{}, + TPid :: pid(), + DictT :: module() | {MsgDict :: module(), + AppDict :: module(), + CommonDict:: module()}, + Counter :: {'Result-Code', integer()} + | {'Experimental-Result', integer(), integer()}, + Reason :: atom(). + +incr_rc(Dir, Pkt, TPid, {_, AppDict, _} = DictT) -> + try + incr_result(Dir, Pkt, TPid, DictT) + catch + exit: {E,_} when E == no_result_code; + E == invalid_error_bit -> + incr(TPid, {msg_id(Pkt#diameter_packet.header, AppDict), Dir, E}), + E + end; + +incr_rc(Dir, Pkt, TPid, Dict0) -> + incr_rc(Dir, Pkt, TPid, {Dict0, Dict0, Dict0}). + +%% --------------------------------------------------------------------------- %% pending/1 %% --------------------------------------------------------------------------- @@ -150,6 +240,8 @@ receive_message(TPid, Pkt, Dict0, RecvData) Dict0, RecvData). +%% recv/6 + %% Incoming request ... recv(true, false, TPid, Pkt, Dict0, T) -> spawn_request(TPid, Pkt, Dict0, T); @@ -157,6 +249,7 @@ recv(true, false, TPid, Pkt, Dict0, T) -> %% ... answer to known request ... recv(false, #request{ref = Ref, handler = Pid} = Req, _, Pkt, Dict0, _) -> Pid ! {answer, Ref, Req, Dict0, Pkt}; + %% Note that failover could have happened prior to this message being %% received and triggering failback. That is, both a failover message %% and answer may be on their way to the handler process. In the worst @@ -167,7 +260,9 @@ recv(false, #request{ref = Ref, handler = Pid} = Req, _, Pkt, Dict0, _) -> %% any others are discarded. %% ... or not. -recv(false, false, _, _, _, _) -> +recv(false, false, TPid, Pkt, _, _) -> + ?LOG(discarded, Pkt#diameter_packet.header), + incr(TPid, {{unknown, 0}, recv, discarded}), ok. %% spawn_request/4 @@ -182,7 +277,7 @@ spawn_request(TPid, Pkt, Dict0, Opts, RecvData) -> spawn_opt(fun() -> recv_request(TPid, Pkt, Dict0, RecvData) end, Opts) catch error: system_limit = E -> %% discard - ?LOG({error, E}, now()) + ?LOG(error, E) end. %% --------------------------------------------------------------------------- @@ -193,8 +288,11 @@ recv_request(TPid, #diameter_packet{header = #diameter_header{application_id = Id}} = Pkt, Dict0, - #recvdata{peerT = PeerT, apps = Apps} + #recvdata{peerT = PeerT, + apps = Apps, + codec = Opts} = RecvData) -> + diameter_codec:setopts([{common_dictionary, Dict0} | Opts]), send_A(recv_R(diameter_service:find_incoming_app(PeerT, TPid, Id, Apps), TPid, Pkt, @@ -206,12 +304,14 @@ recv_request(TPid, %% recv_R/5 -recv_R({#diameter_app{id = Id, dictionary = Dict} = App, Caps}, +recv_R({#diameter_app{id = Id, dictionary = AppDict} = App, Caps}, TPid, Pkt0, Dict0, RecvData) -> - Pkt = errors(Id, diameter_codec:decode(Id, Dict, Pkt0)), + incr(recv, Pkt0, TPid, AppDict), + Pkt = errors(Id, diameter_codec:decode(Id, AppDict, Pkt0)), + incr_error(recv, Pkt, TPid, AppDict), {Caps, Pkt, App, recv_R(App, TPid, Dict0, Caps, RecvData, Pkt)}; %% Note that the decode is different depending on whether or not Id is %% ?APP_ID_RELAY. @@ -283,23 +383,25 @@ rc(N) -> %% This error is returned when a request is received with an invalid %% message length. -errors(_, #diameter_packet{header = #diameter_header{length = Len}, +errors(_, #diameter_packet{header = #diameter_header{length = Len} = H, bin = Bin, errors = Es} = Pkt) when Len < 20; 0 /= Len rem 4; 8*Len /= bit_size(Bin) -> + ?LOG(invalid_message_length, {H, bit_size(Bin)}), Pkt#diameter_packet{errors = [5015 | Es]}; %% DIAMETER_UNSUPPORTED_VERSION 5011 %% This error is returned when a request was received, whose version %% number is unsupported. -errors(_, #diameter_packet{header = #diameter_header{version = V}, +errors(_, #diameter_packet{header = #diameter_header{version = V} = H, errors = Es} = Pkt) when V /= ?DIAMETER_VERSION -> + ?LOG(unsupported_version, H), Pkt#diameter_packet{errors = [5011 | Es]}; %% DIAMETER_COMMAND_UNSUPPORTED 3001 @@ -307,12 +409,13 @@ errors(_, #diameter_packet{header = #diameter_header{version = V}, %% recognize or support. This MUST be used when a Diameter node %% receives an experimental command that it does not understand. -errors(Id, #diameter_packet{header = #diameter_header{is_proxiable = P}, +errors(Id, #diameter_packet{header = #diameter_header{is_proxiable = P} = H, msg = M, errors = Es} = Pkt) when ?APP_ID_RELAY /= Id, undefined == M; %% don't know the command ?APP_ID_RELAY == Id, not P -> %% command isn't proxiable + ?LOG(command_unsupported, H), Pkt#diameter_packet{errors = [3001 | Es]}; %% DIAMETER_INVALID_HDR_BITS 3008 @@ -321,9 +424,11 @@ errors(Id, #diameter_packet{header = #diameter_header{is_proxiable = P}, %% inconsistent with the command code's definition. errors(_, #diameter_packet{header = #diameter_header{is_request = true, - is_error = true}, + is_error = true} + = H, errors = Es} = Pkt) -> + ?LOG(invalid_hdr_bits, H), Pkt#diameter_packet{errors = [3008 | Es]}; %% Green. @@ -396,7 +501,7 @@ send_A({Caps, Pkt}, TPid, Dict0, _RecvData) -> %% unsupported application #diameter_packet{errors = [RC|_]} = Pkt, send_A(answer_message(RC, Caps, Dict0, Pkt), TPid, - Dict0, + {Dict0, Dict0}, Pkt, [], []); @@ -404,7 +509,7 @@ send_A({Caps, Pkt}, TPid, Dict0, _RecvData) -> %% unsupported application send_A({Caps, Pkt, App, {T, EvalPktFs, EvalFs}}, TPid, Dict0, RecvData) -> send_A(answer(T, Caps, Pkt, App, Dict0, RecvData), TPid, - Dict0, + {App#diameter_app.dictionary, Dict0}, Pkt, EvalPktFs, EvalFs); @@ -414,14 +519,17 @@ send_A(_, _, _, _) -> %% send_A/6 -send_A(T, TPid, Dict0, ReqPkt, EvalPktFs, EvalFs) -> - reply(T, TPid, Dict0, EvalPktFs, ReqPkt), +send_A(T, TPid, {AppDict, Dict0} = DictT0, ReqPkt, EvalPktFs, EvalFs) -> + {MsgDict, Pkt} = reply(T, TPid, DictT0, EvalPktFs, ReqPkt), + incr(send, Pkt, TPid, AppDict), + incr_rc(send, Pkt, TPid, {MsgDict, AppDict, Dict0}), %% count outgoing + send(TPid, Pkt), lists:foreach(fun diameter_lib:eval/1, EvalFs). %% answer/6 answer({reply, Ans}, _Caps, _Pkt, App, Dict0, _RecvData) -> - {dict(App#diameter_app.dictionary, Dict0, Ans), Ans}; + {msg_dict(App#diameter_app.dictionary, Dict0, Ans), Ans}; answer({call, Opts}, Caps, Pkt, App, Dict0, RecvData) -> #diameter_caps{origin_host = {OH,_}} @@ -444,27 +552,37 @@ answer({answer_message, RC} = T, Caps, Pkt, App, Dict0, _RecvData) -> orelse ?ERROR({invalid_return, T, handle_request, App}), answer_message(RC, Caps, Dict0, Pkt). -%% dict/3 +%% msg_dict/3 +%% +%% Return the dictionary defining the message grammar in question: the +%% application dictionary or the common dictionary. -%% An incoming answer, not yet decoded. -dict(Dict, Dict0, #diameter_packet{header - = #diameter_header{is_request = false, - is_error = E}, - msg = undefined}) -> - if E -> Dict0; true -> Dict end; +msg_dict(AppDict, Dict0, [Msg]) + when is_list(Msg); + is_tuple(Msg) -> + msg_dict(AppDict, Dict0, Msg); -dict(Dict, Dict0, [Msg]) -> - dict(Dict, Dict0, Msg); +msg_dict(AppDict, Dict0, Msg) -> + choose(is_answer_message(Msg, Dict0), Dict0, AppDict). -dict(Dict, Dict0, #diameter_packet{msg = Msg}) -> - dict(Dict, Dict0, Msg); +%% Incoming, not yet decoded. +is_answer_message(#diameter_packet{header = #diameter_header{} = H, + msg = undefined}, + Dict0) -> + is_answer_message([H], Dict0); -dict(Dict, Dict0, Msg) -> - choose(is_answer_message(Msg, Dict0), Dict0, Dict). +is_answer_message(#diameter_packet{msg = Msg}, Dict0) -> + is_answer_message(Msg, Dict0); +%% Message sent as a header/avps list. +is_answer_message([#diameter_header{is_request = R, is_error = E} | _], _) -> + E andalso not R; + +%% Message sent as a tagged avp/value list. is_answer_message([Name | _], _) -> Name == 'answer-message'; +%% Message sent as a record. is_answer_message(Rec, Dict) -> try 'answer-message' == Dict:rec2msg(element(1,Rec)) @@ -479,7 +597,6 @@ answer_message(RC, origin_realm = {OR,_}}, Dict0, Pkt) -> - ?LOG({error, RC}, Pkt), {Dict0, answer_message(OH, OR, RC, Dict0, Pkt)}. %% resend/7 @@ -513,7 +630,7 @@ resend(false, Route = #diameter_avp{data = {Dict0, 'Route-Record', OH}}, Seq = diameter_session:sequence(Mask), Hdr = Hdr0#diameter_header{hop_by_hop_id = Seq}, - Msg = [Hdr, Route | Avps], + Msg = [Hdr, Route | Avps], %% reordered at encode resend(send_request(SvcName, App, Msg, Opts), Caps, Dict0, Pkt). %% The incoming request is relayed with the addition of a %% Route-Record. Note the requirement on the return from call/4 below, @@ -535,7 +652,7 @@ resend(false, %% %% Relay a reply to a relayed request. -%% Answer from the peer: reset the hop by hop identifier and send. +%% Answer from the peer: reset the hop by hop identifier. resend(#diameter_packet{bin = B} = Pkt, _Caps, @@ -574,31 +691,31 @@ is_loop(Code, Vid, OH, Dict0, Avps) -> %% reply/5 %% Local answer ... -reply({Dict, Ans}, TPid, Dict0, Fs, ReqPkt) -> - reply(Ans, Dict, TPid, Dict0, Fs, ReqPkt); +reply({MsgDict, Ans}, TPid, {AppDict, Dict0}, Fs, ReqPkt) -> + local(Ans, TPid, {MsgDict, AppDict, Dict0}, Fs, ReqPkt); %% ... or relayed. -reply(#diameter_packet{} = Pkt, TPid, _Dict0, Fs, _ReqPkt) -> +reply(#diameter_packet{} = Pkt, _TPid, {AppDict, Dict0}, Fs, _ReqPkt) -> eval_packet(Pkt, Fs), - send(TPid, Pkt). + {msg_dict(AppDict, Dict0, Pkt), Pkt}. -%% reply/6 +%% local/5 %% %% Send a locally originating reply. %% Skip the setting of Result-Code and Failed-AVP's below. This is %% undocumented and shouldn't be relied on. -reply([Msg], Dict, TPid, Dict0, Fs, ReqPkt) +local([Msg], TPid, DictT, Fs, ReqPkt) when is_list(Msg); is_tuple(Msg) -> - reply(Msg, Dict, TPid, Dict0, Fs, ReqPkt#diameter_packet{errors = []}); + local(Msg, TPid, DictT, Fs, ReqPkt#diameter_packet{errors = []}); -reply(Msg, Dict, TPid, Dict0, Fs, ReqPkt) -> - Pkt = encode(Dict, - reset(make_answer_packet(Msg, ReqPkt), Dict, Dict0), +local(Msg, TPid, {MsgDict, AppDict, Dict0}, Fs, ReqPkt) -> + Pkt = encode({MsgDict, AppDict}, + TPid, + reset(make_answer_packet(Msg, ReqPkt), MsgDict, Dict0), Fs), - incr(send, Pkt, Dict, TPid, Dict0), %% count outgoing result codes - send(TPid, Pkt). + {MsgDict, Pkt}. %% reset/3 @@ -871,8 +988,8 @@ answer_message(OH, OR, RC, Dict0, #diameter_packet{avps = Avps, session_id(Code, Vid, Dict0, Avps) when is_list(Avps) -> try - {value, #diameter_avp{data = D}} = find_avp(Code, Vid, Avps), - [{'Session-Id', [Dict0:avp(decode, D, 'Session-Id')]}] + #diameter_avp{data = Bin} = find_avp(Code, Vid, Avps), + [{'Session-Id', [Dict0:avp(decode, Bin, 'Session-Id')]}] catch error: _ -> [] @@ -889,26 +1006,17 @@ failed_avp(_, [] = No) -> %% find_avp/3 -find_avp(Code, Vid, Avps) - when is_integer(Code), (undefined == Vid orelse is_integer(Vid)) -> - find(fun(A) -> is_avp(Code, Vid, A) end, Avps). +%% Grouped ... +find_avp(Code, VId, [[#diameter_avp{code = Code, vendor_id = VId} | _] = As + | _]) -> + As; -%% The final argument here could be a list of AVP's, depending on the case, -%% but we're only searching at the top level. -is_avp(Code, Vid, #diameter_avp{code = Code, vendor_id = Vid}) -> - true; -is_avp(_, _, _) -> - false. +%% ... or not. +find_avp(Code, VId, [#diameter_avp{code = Code, vendor_id = VId} = A | _]) -> + A; -find(_, []) -> - false; -find(Pred, [H|T]) -> - case Pred(H) of - true -> - {value, H}; - false -> - find(Pred, T) - end. +find_avp(Code, VId, [_ | Avps]) -> + find_avp(Code, VId, Avps). %% 7. Error Handling %% @@ -962,35 +1070,79 @@ find(Pred, [H|T]) -> %% code, the missing vendor id, and a zero filled payload of the minimum %% required length for the omitted AVP will be added. -%% incr/4 +%% incr_result/5 %% %% Increment a stats counter for result codes in incoming and outgoing %% answers. +%% Message sent as a header/avps list. +incr_result(send = Dir, + #diameter_packet{msg = [#diameter_header{} = H | _]} + = Pkt, + TPid, + DictT) -> + incr_res(Dir, Pkt#diameter_packet{header = H}, TPid, DictT); + %% Outgoing message as binary: don't count. (Sending binaries is only %% partially supported.) -incr(_, #diameter_packet{msg = undefined}, _, _, _) -> - ok; +incr_result(send, #diameter_packet{header = undefined = No}, _, _) -> + No; -%% Incoming with decode errors. -incr(recv = D, #diameter_packet{header = H, errors = [_|_]}, _, TPid, _) -> - incr(TPid, {diameter_codec:msg_id(H), D, error}); +%% Incoming or outgoing. Outgoing with encode errors never gets here +%% since encode fails. +incr_result(Dir, Pkt, TPid, DictT) -> + incr_res(Dir, Pkt, TPid, DictT). -%% Incoming without errors or outgoing. Outgoing with encode errors -%% never gets here since encode fails. -incr(Dir, Pkt, Dict, TPid, Dict0) -> - #diameter_packet{header = #diameter_header{is_error = E} - = Hdr, - msg = Rec} - = Pkt, +incr_res(Dir, + #diameter_packet{header = #diameter_header{is_error = E} + = Hdr, + errors = Es} + = Pkt, + TPid, + DictT) -> + {MsgDict, AppDict, Dict0} = DictT, + + Id = msg_id(Hdr, AppDict), + %% Could be {relay, 0}, in which case the R-bit is redundant since + %% only answers are being counted. Let it be however, so that the + %% same tuple is in both send/recv and result code counters. - RC = int(get_avp_value(Dict, 'Result-Code', Rec)), + %% Count incoming decode errors. + recv /= Dir orelse [] == Es orelse incr_error(Dir, Id, TPid, AppDict), - %% Exit on an improper Result-Code. + %% Exit on a missing result code. + T = rc_counter(MsgDict, Dir, Pkt), + T == false andalso ?LOGX(no_result_code, {MsgDict, Dir, Hdr}), + {Ctr, RC, Avp} = T, + + %% Or on an inappropriate value. is_result(RC, E, Dict0) - orelse x({invalid_error_bit, RC}, answer, [Dir, Pkt]), + orelse ?LOGX(invalid_error_bit, {MsgDict, Dir, Hdr, Avp}), + + incr(TPid, {Id, Dir, Ctr}), + Ctr. + +%% msg_id/2 + +msg_id(#diameter_packet{header = H}, AppDict) -> + msg_id(H, AppDict); + +%% Only count on known keys so as not to be vulnerable to attack: +%% there are 2^32 (application ids) * 2^24 (command codes) = 2^56 +%% pairs for an attacker to choose from. +msg_id(Hdr, AppDict) -> + {Aid, Code, R} = Id = diameter_codec:msg_id(Hdr), + case AppDict:id() of + ?APP_ID_RELAY -> + {relay, R}; + A -> + unknown(A /= Aid orelse '' == AppDict:msg_name(Code, 0 == R), Id) + end. - irc(TPid, Hdr, Dir, rc_counter(Dict, Rec, RC)). +unknown(true, {_, _, R}) -> + {unknown, R}; +unknown(false, Id) -> + Id. %% No E-bit: can't be 3xxx. is_result(RC, false, _Dict0) -> @@ -1006,63 +1158,62 @@ is_result(RC, true, _) -> orelse 5000 =< RC andalso RC < 6000. -irc(_, _, _, undefined) -> - false; - -irc(TPid, Hdr, Dir, Ctr) -> - incr(TPid, {diameter_codec:msg_id(Hdr), Dir, Ctr}). - %% incr/2 incr(TPid, Counter) -> diameter_stats:incr(Counter, TPid, 1). -%% rc_counter/2 +%% rc_counter/3 %% RFC 3588, 7.6: %% %% All Diameter answer messages defined in vendor-specific %% applications MUST include either one Result-Code AVP or one %% Experimental-Result AVP. -%% -%% Maintain statistics assuming one or the other, not both, which is -%% surely the intent of the RFC. -rc_counter(Dict, Rec, undefined) -> - rcc(get_avp_value(Dict, 'Experimental-Result', Rec)); -rc_counter(_, _, RC) -> - {'Result-Code', RC}. +rc_counter(Dict, Dir, #diameter_packet{header = H, + avps = As, + msg = Msg}) + when Dir == recv; %% decoded incoming + Msg == undefined -> %% relayed outgoing + rc_counter(Dict, [H|As]); -%% Outgoing answers may be in any of the forms messages can be sent -%% in. Incoming messages will be records. We're assuming here that the -%% arity of the result code AVP's is 0 or 1. +rc_counter(Dict, _, #diameter_packet{msg = Msg}) -> + rc_counter(Dict, Msg). -rcc([{_,_,N} = T | _]) +rc_counter(Dict, Msg) -> + rcc(get_result(Dict, Msg)). + +rcc(#diameter_avp{name = 'Result-Code' = Name, value = N} = A) when is_integer(N) -> - T; -rcc({_,_,N} = T) + {{Name, N}, N, A}; + +rcc(#diameter_avp{name = 'Result-Code' = Name, value = [N|_]} = A) when is_integer(N) -> - T; -rcc(_) -> - undefined. + {{Name, N}, N, A}; -%% Extract the first good looking integer. There's no guarantee -%% that what we're looking for has arity 1. -int([N|_]) +rcc(#diameter_avp{name = 'Experimental-Result', value = {_,_,N} = T} = A) when is_integer(N) -> - N; -int(N) + {T, N, A}; + +rcc(#diameter_avp{name = 'Experimental-Result', value = [{_,_,N} = T|_]} = A) when is_integer(N) -> - N; -int(_) -> - undefined. + {T, N, A}; --spec x(any(), atom(), list()) -> no_return(). +rcc(_) -> + false. -%% Warn and exit request process on errors in an incoming answer. -x(Reason, F, A) -> - diameter_lib:warning_report(Reason, {?MODULE, F, A}), - x(Reason). +%% get_result/2 + +get_result(Dict, Msg) -> + try + [throw(A) || N <- ['Result-Code', 'Experimental-Result'], + #diameter_avp{} = A <- [get_avp(Dict, N, Msg)]] + of + [] -> false + catch + #diameter_avp{} = A -> A + end. x(T) -> exit(T). @@ -1123,10 +1274,9 @@ answer_rc(_, _, Sent) -> send_R(SvcName, AppOrAlias, Msg, Opts, Caller) -> case pick_peer(SvcName, AppOrAlias, Msg, Opts) of - {{_,_,_} = Transport, Mask} -> + {Transport, Mask, SvcOpts} -> + diameter_codec:setopts(SvcOpts), send_request(Transport, Mask, Msg, Opts, Caller, SvcName); - false -> - {error, no_connection}; {error, _} = No -> No end. @@ -1188,6 +1338,8 @@ send_request({TPid, Caps, App} SvcName, []). +%% send_R/7 + send_R({send, Msg}, Pkt, Transport, Opts, Caller, SvcName, Fs) -> send_R(make_request_packet(Msg, Pkt), Transport, @@ -1290,6 +1442,21 @@ make_request_packet(#diameter_packet{header = Hdr} = Pkt, make_request_packet(Msg, Pkt) -> Pkt#diameter_packet{msg = Msg}. +%% make_retransmit_packet/2 + +make_retransmit_packet(#diameter_packet{msg = [#diameter_header{} = Hdr + | Avps]} + = Pkt) -> + Pkt#diameter_packet{msg = [make_retransmit_header(Hdr) | Avps]}; + +make_retransmit_packet(#diameter_packet{header = Hdr} = Pkt) -> + Pkt#diameter_packet{header = make_retransmit_header(Hdr)}. + +%% make_retransmit_header/1 + +make_retransmit_header(Hdr) -> + Hdr#diameter_header{is_retransmitted = true}. + %% fold_record/2 fold_record(undefined, R) -> @@ -1300,12 +1467,12 @@ fold_record(Rec, R) -> %% send_R/6 send_R(Pkt0, - {TPid, Caps, #diameter_app{dictionary = Dict} = App}, + {TPid, Caps, #diameter_app{dictionary = AppDict} = App}, Opts, {Pid, Ref}, SvcName, Fs) -> - Pkt = encode(Dict, Pkt0, Fs), + Pkt = encode(AppDict, TPid, Pkt0, Fs), #options{timeout = Timeout} = Opts, @@ -1317,15 +1484,12 @@ send_R(Pkt0, caps = Caps, packet = Pkt0}, - try - TRef = send_request(TPid, Pkt, Req, SvcName, Timeout), - Pid ! Ref, %% tell caller a send has been attempted - handle_answer(SvcName, - App, - recv_A(Timeout, SvcName, App, Opts, {TRef, Req})) - after - erase_requests(Pkt) - end. + incr(send, Pkt, TPid, AppDict), + TRef = send_request(TPid, Pkt, Req, SvcName, Timeout), + Pid ! Ref, %% tell caller a send has been attempted + handle_answer(SvcName, + App, + recv_A(Timeout, SvcName, App, Opts, {TRef, Req})). %% recv_A/5 @@ -1353,14 +1517,14 @@ handle_answer(SvcName, App, {error, Req, Reason}) -> handle_error(App, Req, Reason, SvcName); handle_answer(SvcName, - #diameter_app{dictionary = Dict, + #diameter_app{dictionary = AppDict, id = Id} = App, {answer, Req, Dict0, Pkt}) -> - Mod = dict(Dict, Dict0, Pkt), - handle_A(errors(Id, diameter_codec:decode(Mod, Pkt)), + MsgDict = msg_dict(AppDict, Dict0, Pkt), + handle_A(errors(Id, diameter_codec:decode({MsgDict, AppDict}, Pkt)), SvcName, - Mod, + MsgDict, Dict0, App, Req). @@ -1370,18 +1534,30 @@ handle_answer(SvcName, %% want to examine the answer? handle_A(Pkt, SvcName, Dict, Dict0, App, #request{transport = TPid} = Req) -> + AppDict = App#diameter_app.dictionary, + + incr(recv, Pkt, TPid, AppDict), + try - incr(recv, Pkt, Dict, TPid, Dict0) %% count incoming result codes + incr_result(recv, Pkt, TPid, {Dict, AppDict, Dict0}) %% count incoming of _ -> answer(Pkt, SvcName, App, Req) catch - exit: {invalid_error_bit, RC} -> + exit: {no_result_code, _} -> + %% RFC 6733 requires one of Result-Code or + %% Experimental-Result, but the decode will have detected + %% a missing AVP. If both are optional in the dictionary + %% then this isn't a decode error: just continue on. + answer(Pkt, SvcName, App, Req); + exit: {invalid_error_bit, {_, _, _, Avp}} -> #diameter_packet{errors = Es} = Pkt, - E = {5004, #diameter_avp{name = 'Result-Code', value = RC}}, + E = {5004, Avp}, answer(Pkt#diameter_packet{errors = [E|Es]}, SvcName, App, Req) end. +%% answer/4 + answer(Pkt, SvcName, #diameter_app{module = ModX, @@ -1389,6 +1565,8 @@ answer(Pkt, Req) -> a(Pkt, SvcName, ModX, AE, Req). +-spec a(_, _, _) -> no_return(). %% silence dialyzer + a(#diameter_packet{errors = Es} = Pkt, SvcName, @@ -1401,11 +1579,16 @@ a(#diameter_packet{errors = Es} callback == AE -> cb(ModX, handle_answer, [Pkt, msg(P), SvcName, {TPid, Caps}]); -a(Pkt, SvcName, _, report, Req) -> - x(errors, handle_answer, [SvcName, Req, Pkt]); +a(Pkt, SvcName, _, AE, _) -> + a(Pkt#diameter_packet.header, SvcName, AE). + +a(Hdr, SvcName, report) -> + MFA = {?MODULE, handle_answer, [SvcName, Hdr]}, + diameter_lib:warning_report(errors, MFA), + a(Hdr, SvcName, discard); -a(Pkt, SvcName, _, discard, Req) -> - x({errors, handle_answer, [SvcName, Req, Pkt]}). +a(Hdr, SvcName, discard) -> + x({answer_errors, {SvcName, Hdr}}). %% Note that we don't check that the application id in the answer's %% header is what we expect. (TODO: Does the rfc says anything about @@ -1415,7 +1598,9 @@ a(Pkt, SvcName, _, discard, Req) -> %% timer value is ignored. This means that an answer could be accepted %% from a peer after timeout in the case of failover. -retransmit({{_,_,App} = Transport, _Mask}, Req, Opts, SvcName, Timeout) -> +%% retransmit/5 + +retransmit({{_,_,App} = Transport, _, _}, Req, Opts, SvcName, Timeout) -> try retransmit(Transport, Req, SvcName, Timeout) of T -> recv_A(Timeout, SvcName, App, Opts, T) catch @@ -1436,17 +1621,23 @@ pick_peer(SvcName, pick_peer(SvcName, App, Msg, Opts#options{extra = []}); pick_peer(_, _, undefined, _) -> - false; + {error, no_connection}; pick_peer(SvcName, AppOrAlias, Msg, #options{filter = Filter, extra = Xtra}) -> - diameter_service:pick_peer(SvcName, - AppOrAlias, - {fun(D) -> get_destination(D, Msg) end, - Filter, - Xtra}). + pick(diameter_service:pick_peer(SvcName, + AppOrAlias, + {fun(D) -> get_destination(D, Msg) end, + Filter, + Xtra})). + +pick(false) -> + {error, no_connection}; + +pick(T) -> + T. %% handle_error/4 @@ -1463,10 +1654,10 @@ msg(#diameter_packet{msg = undefined, bin = Bin}) -> msg(#diameter_packet{msg = Msg}) -> Msg. -%% encode/3 +%% encode/4 -encode(Dict, Pkt, Fs) -> - P = encode(Dict, Pkt), +encode(Dict, TPid, Pkt, Fs) -> + P = encode(Dict, TPid, Pkt), eval_packet(P, Fs), P. @@ -1477,21 +1668,41 @@ encode(Dict, Pkt, Fs) -> %% an encoded binary. This isn't the usual case and doesn't properly %% support retransmission but is useful for test. +encode(Dict, TPid, Pkt) + when is_atom(Dict) -> + encode({Dict, Dict}, TPid, Pkt); + %% A message to be encoded. -encode(Dict, #diameter_packet{bin = undefined} = Pkt) -> - diameter_codec:encode(Dict, Pkt); +encode(DictT, TPid, #diameter_packet{bin = undefined} = Pkt) -> + {Dict, AppDict} = DictT, + try + diameter_codec:encode(Dict, Pkt) + catch + exit: {diameter_codec, encode, T} = Reason -> + incr_error(send, T, TPid, AppDict), + exit(Reason) + end; %% An encoded binary: just send. -encode(_, #diameter_packet{} = Pkt) -> +encode(_, _, #diameter_packet{} = Pkt) -> Pkt. %% send_request/5 send_request(TPid, #diameter_packet{bin = Bin} = Pkt, Req, _SvcName, Timeout) when node() == node(TPid) -> - %% Store the outgoing request before sending to avoid a race with - %% reply reception. - TRef = store_request(TPid, Bin, Req, Timeout), + Seqs = diameter_codec:sequence_numbers(Bin), + TRef = erlang:start_timer(Timeout, self(), TPid), + Entry = {Seqs, Req, TRef}, + + %% Ensure that request table is cleaned even if we receive an exit + %% signal. An alternative would be to simply trap exits, but + %% callbacks are applied in this process, and these could possibly + %% be expecting the prevailing behaviour. + Self = self(), + spawn(fun() -> diameter_lib:wait([Self]), erase_request(Entry) end), + + store_request(Entry, TPid), send(TPid, Pkt), TRef; @@ -1505,25 +1716,34 @@ send_request(TPid, #diameter_packet{} = Pkt, Req, SvcName, Timeout) -> %% send/1 -send({TPid, Pkt, #request{handler = Pid} = Req, SvcName, Timeout, TRef}) -> - Ref = send_request(TPid, - Pkt, - Req#request{handler = self()}, - SvcName, - Timeout), +send({TPid, Pkt, #request{handler = Pid} = Req0, SvcName, Timeout, TRef}) -> + Req = Req0#request{handler = self()}, + recv(TPid, Pid, TRef, send_request(TPid, Pkt, Req, SvcName, Timeout)). + +%% recv/4 +%% +%% Relay an answer from a remote node. + +recv(TPid, Pid, TRef, LocalTRef) -> receive {answer, _, _, _, _} = A -> Pid ! A; - {failover = T, Ref} -> + {failover = T, LocalTRef} -> Pid ! {T, TRef}; T -> - exit({timeout, Ref, TPid} = T) + exit({timeout, LocalTRef, TPid} = T) end. %% send/2 -send(Pid, Pkt) -> - Pid ! {send, Pkt}. +send(Pid, Pkt) -> %% Strip potentially large message terms. + #diameter_packet{header = H, + bin = Bin, + transport_data = T} + = Pkt, + Pid ! {send, #diameter_packet{header = H, + bin = Bin, + transport_data = T}}. %% retransmit/4 @@ -1536,9 +1756,7 @@ retransmit({TPid, Caps, App} have_request(Pkt0, TPid) %% Don't failover to a peer we've andalso ?THROW(timeout), %% already sent to. - #diameter_packet{header = Hdr0} = Pkt0, - Hdr = Hdr0#diameter_header{is_retransmitted = true}, - Pkt = Pkt0#diameter_packet{header = Hdr}, + Pkt = make_retransmit_packet(Pkt0), retransmit(cb(App, prepare_retransmit, [Pkt, SvcName, {TPid, Caps}]), Transport, @@ -1574,32 +1792,37 @@ retransmit(T, {_, _, App}, _, _, _, _) -> ?ERROR({invalid_return, T, prepare_retransmit, App}). resend_request(Pkt0, - {TPid, Caps, #diameter_app{dictionary = Dict}}, + {TPid, Caps, #diameter_app{dictionary = AppDict}}, Req0, SvcName, Tmo, Fs) -> - Pkt = encode(Dict, Pkt0, Fs), + Pkt = encode(AppDict, TPid, Pkt0, Fs), Req = Req0#request{transport = TPid, packet = Pkt0, caps = Caps}, - ?LOG(retransmission, Req), + ?LOG(retransmission, Pkt#diameter_packet.header), + incr(TPid, {msg_id(Pkt, AppDict), send, retransmission}), TRef = send_request(TPid, Pkt, Req, SvcName, Tmo), {TRef, Req}. -%% store_request/4 +%% store_request/2 -store_request(TPid, Bin, Req, Timeout) -> - Seqs = diameter_codec:sequence_numbers(Bin), - TRef = erlang:start_timer(Timeout, self(), TPid), - ets:insert(?REQUEST_TABLE, {Seqs, Req, TRef}), +store_request(T, TPid) -> + ets:insert(?REQUEST_TABLE, T), ets:member(?REQUEST_TABLE, TPid) - orelse (self() ! {failover, TRef}), %% failover/1 may have missed - TRef. + orelse begin + {_Seqs, _Req, TRef} = T, + self() ! {failover, TRef} %% failover/1 may have missed + end. %% lookup_request/2 +%% +%% Note the match on both the key and transport pid. The latter is +%% necessary since the same Hop-by-Hop and End-to-End identifiers are +%% reused in the case of retransmission. lookup_request(Msg, TPid) -> Seqs = diameter_codec:sequence_numbers(Msg), @@ -1613,10 +1836,10 @@ lookup_request(Msg, TPid) -> false end. -%% erase_requests/1 +%% erase_request/1 -erase_requests(Pkt) -> - ets:delete(?REQUEST_TABLE, diameter_codec:sequence_numbers(Pkt)). +erase_request(T) -> + ets:delete_object(?REQUEST_TABLE, T). %% match_requests/1 @@ -1639,10 +1862,10 @@ failover(TPid) when is_pid(TPid) -> lists:foreach(fun failover/1, match_requests(TPid)); %% Note that a request process can store its request after failover -%% notifications are sent here: store_request/4 sends the notification +%% notifications are sent here: store_request/2 sends the notification %% in that case. -%% Failover as a consequence of request_peer_down/1: inform the +%% Failover as a consequence of peer_down/1: inform the %% request process. failover({_, Req, TRef}) -> #request{handler = Pid, @@ -1668,7 +1891,7 @@ str([]) -> str(T) -> T. -%% get_avp_value/3 +%% get_avp/3 %% %% Find an AVP in a message of one of three forms: %% @@ -1685,47 +1908,71 @@ str(T) -> %% look for are in the common dictionary. This is required since the %% relay dictionary doesn't inherit the common dictionary (which maybe %% it should). -get_avp_value(?RELAY, Name, Msg) -> - get_avp_value(?BASE, Name, Msg); +get_avp(?RELAY, Name, Msg) -> + get_avp(?BASE, Name, Msg); -%% Message sent as a header/avps list, probably a relay case but not -%% necessarily. -get_avp_value(Dict, Name, [#diameter_header{} | Avps]) -> +%% Message as a header/avps list. +get_avp(Dict, Name, [#diameter_header{} | Avps]) -> try {Code, _, VId} = Dict:avp_header(Name), - [A|_] = lists:dropwhile(fun(#diameter_avp{code = C, vendor_id = V}) -> - C /= Code orelse V /= VId - end, - Avps), - avp_decode(Dict, Name, A) + find_avp(Code, VId, Avps) + of + A -> + (avp_decode(Dict, Name, ungroup(A)))#diameter_avp{name = Name} catch error: _ -> undefined end; %% Outgoing message as a name/values list. -get_avp_value(_, Name, [_MsgName | Avps]) -> +get_avp(_, Name, [_MsgName | Avps]) -> case lists:keyfind(Name, 1, Avps) of {_, V} -> - V; + #diameter_avp{name = Name, value = V}; _ -> undefined end; %% Message is typically a record but not necessarily. -get_avp_value(Dict, Name, Rec) -> +get_avp(Dict, Name, Rec) -> try - Dict:'#get-'(Name, Rec) + #diameter_avp{name = Name, value = Dict:'#get-'(Name, Rec)} catch error:_ -> undefined end. +%% get_avp_value/3 + +get_avp_value(Dict, Name, Msg) -> + case get_avp(Dict, Name, Msg) of + #diameter_avp{value = V} -> + V; + undefined = No -> + No + end. + +%% ungroup/1 + +ungroup([Avp|_]) -> + Avp; +ungroup(Avp) -> + Avp. + +%% avp_decode/3 + avp_decode(Dict, Name, #diameter_avp{value = undefined, - data = Bin}) -> - Dict:avp(decode, Bin, Name); -avp_decode(_, _, #diameter_avp{value = V}) -> - V. + data = Bin} + = Avp) -> + try Dict:avp(decode, Bin, Name) of + V -> + Avp#diameter_avp{value = V} + catch + error:_ -> + Avp + end; +avp_decode(_, _, #diameter_avp{} = Avp) -> + Avp. cb(#diameter_app{module = [_|_] = M}, F, A) -> eval(M, F, A); diff --git a/lib/diameter/src/base/diameter_types.erl b/lib/diameter/src/base/diameter_types.erl index ca3338be5f..6ecf385239 100644 --- a/lib/diameter/src/base/diameter_types.erl +++ b/lib/diameter/src/base/diameter_types.erl @@ -1,18 +1,19 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2013. All Rights Reserved. +%% Copyright Ericsson AB 2010-2015. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% @@ -75,7 +76,7 @@ %% message indicating this error MUST include the offending AVPs %% within a Failed-AVP AVP. %% --define(INVALID_LENGTH(Bin), erlang:error({'DIAMETER', 5014, Bin})). +-define(INVALID_LENGTH(Bitstr), erlang:error({'DIAMETER', 5014, Bitstr})). %% ------------------------------------------------------------------------- %% 3588, 4.2. Basic AVP Data Formats @@ -90,7 +91,12 @@ 'OctetString'(decode, Bin) when is_binary(Bin) -> - binary_to_list(Bin); + case diameter_codec:getopt(string_decode) of + true -> + binary_to_list(Bin); + false -> + Bin + end; 'OctetString'(decode, B) -> ?INVALID_LENGTH(B); @@ -298,21 +304,29 @@ 'OctetString'(M, lists:duplicate(0,7)); 'DiameterURI'(encode, #diameter_uri{type = Type, - fqdn = D, - port = P, + fqdn = DN, + port = PN, transport = T, - protocol = Prot} - = U) -> - S = lists:append([atom_to_list(Type), "://", D, - ":", integer_to_list(P), + protocol = P}) + when (Type == 'aaa' orelse Type == 'aaas'), + is_integer(PN), + 0 =< PN, + (T == tcp orelse T == sctp orelse T == udp), + (P == diameter orelse P == radius orelse P == 'tacacs+'), + (P /= diameter orelse T /= udp) -> + iolist_to_binary([atom_to_list(Type), "://", DN, + ":", integer_to_list(PN), ";transport=", atom_to_list(T), - ";protocol=", atom_to_list(Prot)]), - U = scan_uri(S), %% assert - list_to_binary(S); + ";protocol=", atom_to_list(P)]); +%% Don't omit defaults since they're dependent on whether RFC 3588 or +%% 6733 is being followed. For one, we don't know this at encode; for +%% two (more importantly), we don't know how the peer will interpret +%% defaults, so it's best to be explicit. Interpret defaults on decode +%% since there's no choice. 'DiameterURI'(encode, Str) -> Bin = iolist_to_binary(Str), - #diameter_uri{} = scan_uri(Bin), %% type check + #diameter_uri{} = scan_uri(Bin), %% assert Bin. %% -------------------- @@ -321,7 +335,6 @@ 'IPFilterRule'(encode = M, zero) -> 'OctetString'(M, lists:duplicate(0,33)); -%% TODO: parse grammar. 'IPFilterRule'(M, X) -> 'OctetString'(M, X). @@ -331,7 +344,6 @@ 'QoSFilterRule'(encode = M, zero = X) -> 'IPFilterRule'(M, X); -%% TODO: parse grammar. 'QoSFilterRule'(M, X) -> 'OctetString'(M, X). @@ -339,7 +351,13 @@ 'UTF8String'(decode, Bin) when is_binary(Bin) -> - tl([0|_] = unicode:characters_to_list([0, Bin])); %% assert list return + case diameter_codec:getopt(string_decode) of + true -> + %% assert list return + tl([0|_] = unicode:characters_to_list([0, Bin])); + false -> + <<_/binary>> = unicode:characters_to_binary(Bin) + end; 'UTF8String'(decode, B) -> ?INVALID_LENGTH(B); @@ -507,55 +525,90 @@ msb(false) -> ?TIME_2036. %% %% aaa-protocol = ( "diameter" / "radius" / "tacacs+" ) -scan_uri(Bin) - when is_binary(Bin) -> - scan_uri(binary_to_list(Bin)); -scan_uri("aaa://" ++ Rest) -> - scan_fqdn(Rest, #diameter_uri{type = aaa}); -scan_uri("aaas://" ++ Rest) -> - scan_fqdn(Rest, #diameter_uri{type = aaas}). - -scan_fqdn(S, U) -> - {[_|_] = F, Rest} = lists:splitwith(fun is_fqdn/1, S), - scan_opt_port(Rest, U#diameter_uri{fqdn = F}). - -scan_opt_port(":" ++ S, U) -> - {[_|_] = P, Rest} = lists:splitwith(fun is_digit/1, S), - scan_opt_transport(Rest, U#diameter_uri{port = list_to_integer(P)}); -scan_opt_port(S, U) -> - scan_opt_transport(S, U). - -scan_opt_transport(";transport=" ++ S, U) -> - {P, Rest} = transport(S), - scan_opt_protocol(Rest, U#diameter_uri{transport = P}); -scan_opt_transport(S, U) -> - scan_opt_protocol(S, U). - -scan_opt_protocol(";protocol=" ++ S, U) -> - {P, ""} = protocol(S), - U#diameter_uri{protocol = P}; -scan_opt_protocol("", U) -> - U. - -transport("tcp" ++ S) -> - {tcp, S}; -transport("sctp" ++ S) -> - {sctp, S}; -transport("udp" ++ S) -> - {udp, S}. - -protocol("diameter" ++ S) -> - {diameter, S}; -protocol("radius" ++ S) -> - {radius, S}; -protocol("tacacs+" ++ S) -> - {'tacacs+', S}. - -is_fqdn(C) -> - is_digit(C) orelse is_alpha(C) orelse C == $. orelse C == $-. - -is_alpha(C) -> - ($a =< C andalso C =< $z) orelse ($A =< C andalso C =< $Z). - -is_digit(C) -> - $0 =< C andalso C =< $9. +%% RFC 6733, 4.3.1, changes the defaults: +%% +%% "aaa://" FQDN [ port ] [ transport ] [ protocol ] +%% +%% ; No transport security +%% +%% "aaas://" FQDN [ port ] [ transport ] [ protocol ] +%% +%% ; Transport security used +%% +%% FQDN = < Fully Qualified Domain Name > +%% +%% port = ":" 1*DIGIT +%% +%% ; One of the ports used to listen for +%% ; incoming connections. +%% ; If absent, the default Diameter port +%% ; (3868) is assumed if no transport +%% ; security is used and port 5658 when +%% ; transport security (TLS/TCP and DTLS/SCTP) +%% ; is used. +%% +%% transport = ";transport=" transport-protocol +%% +%% ; One of the transports used to listen +%% ; for incoming connections. If absent, +%% ; the default protocol is assumed to be TCP. +%% ; UDP MUST NOT be used when the aaa-protocol +%% ; field is set to diameter. +%% +%% transport-protocol = ( "tcp" / "sctp" / "udp" ) +%% +%% protocol = ";protocol=" aaa-protocol +%% +%% ; If absent, the default AAA protocol +%% ; is Diameter. +%% +%% aaa-protocol = ( "diameter" / "radius" / "tacacs+" ) + +scan_uri(Bin) -> + RE = "^(aaas?)://" + "([-a-zA-Z0-9.]{1,255})" + "(:0{0,5}([0-9]{1,5}))?" + "(;transport=(tcp|sctp|udp))?" + "(;protocol=(diameter|radius|tacacs\\+))?$", + %% A port number is 16-bit, so an arbitrary number of digits is + %% just a vulnerability, but provide a little slack with leading + %% zeros in a port number just because the regexp was previously + %% [0-9]+ and it's not inconceivable that a value might be padded. + %% Don't fantasize about this padding being more than the number + %% of digits in the port number proper. + %% + %% Similarly, a FQDN can't be arbitrarily long: at most 255 + %% octets. + {match, [A, DN, PN, T, P]} = re:run(Bin, + RE, + [{capture, [1,2,4,6,8], binary}]), + Type = to_atom(A), + {PN0, T0} = defaults(diameter_codec:getopt(rfc), Type), + PortNr = to_int(PN, PN0), + 0 = PortNr bsr 16, %% assert + #diameter_uri{type = Type, + fqdn = 'OctetString'(decode, DN), + port = PortNr, + transport = to_atom(T, T0), + protocol = to_atom(P, diameter)}. + +%% Choose defaults based on the RFC, since 6733 has changed them. +defaults(3588, _) -> + {3868, sctp}; +defaults(6733, aaa) -> + {3868, tcp}; +defaults(6733, aaas) -> + {5658, tcp}. + +to_int(<<>>, N) -> + N; +to_int(B, _) -> + binary_to_integer(B). + +to_atom(<<>>, A) -> + A; +to_atom(B, _) -> + to_atom(B). + +to_atom(B) -> + binary_to_atom(B, latin1). diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl index 53e659e3f6..ea8b2fdb0e 100644 --- a/lib/diameter/src/base/diameter_watchdog.erl +++ b/lib/diameter/src/base/diameter_watchdog.erl @@ -1,18 +1,19 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2014. All Rights Reserved. +%% Copyright Ericsson AB 2010-2015. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% @@ -49,8 +50,6 @@ -define(IS_NATURAL(N), (is_integer(N) andalso 0 =< N)). --define(CHOOSE(B,T,F), if (B) -> T; true -> F end). - -record(config, {suspect = 1 :: non_neg_integer(), %% OKAY -> SUSPECT okay = 3 :: non_neg_integer()}). %% REOPEN -> OKAY @@ -67,7 +66,9 @@ %% end PCB parent = self() :: pid(), %% service process transport :: pid() | undefined, %% peer_fsm process - tref :: reference(), %% reference for current watchdog timer + tref :: reference() %% reference for current watchdog timer + | integer() %% monotonic time + | undefined, dictionary :: module(), %% common dictionary receive_data :: term(), %% term passed into diameter_service with incoming message @@ -95,7 +96,7 @@ start({_,_} = Type, T) -> Ack = make_ref(), {ok, Pid} = diameter_watchdog_sup:start_child({Ack, Type, self(), T}), try - {erlang:monitor(process, Pid), Pid} + {monitor(process, Pid), Pid} after send(Pid, Ack) end. @@ -122,17 +123,20 @@ i({Ack, T, Pid, {RecvData, #diameter_service{applications = Apps, capabilities = Caps} = Svc}}) -> - erlang:monitor(process, Pid), + monitor(process, Pid), wait(Ack, Pid), - random:seed(now()), - putr(restart, {T, Opts, Svc}), %% save seeing it in trace - putr(dwr, dwr(Caps)), %% + {_, Seed} = diameter_lib:seed(), + random:seed(Seed), + putr(restart, {T, Opts, Svc, SvcOpts}), %% save seeing it in trace + putr(dwr, dwr(Caps)), %% {_,_} = Mask = proplists:get_value(sequence, SvcOpts), Restrict = proplists:get_value(restrict_connections, SvcOpts), Nodes = restrict_nodes(Restrict), Dict0 = common_dictionary(Apps), + diameter_codec:setopts([{common_dictionary, Dict0}, + {string_decode, false}]), #watchdog{parent = Pid, - transport = start(T, Opts, Mask, Nodes, Dict0, Svc), + transport = start(T, Opts, SvcOpts, Nodes, Dict0, Svc), tw = proplists:get_value(watchdog_timer, Opts, ?DEFAULT_TW_INIT), @@ -167,11 +171,11 @@ config({okay, N}, Rec) when ?IS_NATURAL(N) -> Rec#config{okay = N}. -%% start/5 +%% start/6 -start(T, Opts, Mask, Nodes, Dict0, Svc) -> +start(T, Opts, SvcOpts, Nodes, Dict0, Svc) -> {_MRef, Pid} - = diameter_peer_fsm:start(T, Opts, {Mask, Nodes, Dict0, Svc}), + = diameter_peer_fsm:start(T, Opts, {SvcOpts, Nodes, Dict0, Svc}), Pid. %% common_dictionary/1 @@ -221,7 +225,6 @@ dict0(_, _, Acc) -> Acc. config_error(T) -> - diameter_lib:error_report(configuration_error, T), exit({shutdown, {configuration_error, T}}). %% handle_call/3 @@ -245,11 +248,16 @@ handle_info(T, #watchdog{} = State) -> event(T, State, S), %% before 'watchdog' {noreply, S}; stop -> - ?LOG(stop, T), + ?LOG(stop, truncate(T)), event(T, State, State#watchdog{status = down}), {stop, {shutdown, T}, State} end. +truncate({'DOWN' = T, _, process, Pid, _}) -> + {T, Pid}; +truncate(T) -> + T. + close({'DOWN', _, process, TPid, {shutdown, Reason}}, #watchdog{transport = TPid, parent = Pid}) -> @@ -258,11 +266,15 @@ close({'DOWN', _, process, TPid, {shutdown, Reason}}, close(_, _) -> ok. -event(_, #watchdog{status = T}, #watchdog{status = T}) -> - ok; - -event(_, #watchdog{transport = undefined}, #watchdog{transport = undefined}) -> +event(_, + #watchdog{status = From, transport = F}, + #watchdog{status = To, transport = T}) + when F == undefined, T == undefined; %% transport not started + From == initial, To == down; %% never really left INITIAL + From == To -> %% no state transition ok; +%% Note that there is no INITIAL -> DOWN transition in RFC 3539: ours +%% is just a consequence of stop. event(Msg, #watchdog{status = From, transport = F, parent = Pid}, @@ -270,7 +282,7 @@ event(Msg, TPid = tpid(F,T), E = {[TPid | data(Msg, TPid, From, To)], From, To}, send(Pid, {watchdog, self(), E}), - ?LOG(transition, {self(), E}). + ?LOG(transition, {From, To}). data(Msg, TPid, reopen, okay) -> {recv, TPid, 'DWA', _Pkt} = Msg, %% assert @@ -313,14 +325,13 @@ code_change(_, State, _) -> %% The state transitions documented here are extracted from RFC 3539, %% the commentary is ours. -%% Service or watchdog is telling the watchdog of an accepting -%% transport to die after connect_timer expiry or reestablished -%% connection (in another transport process) respectively. -transition(close, #watchdog{status = down}) -> - {{accept, _}, _, _} = getr(restart), %% assert - stop; +%% Service is telling the watchdog of an accepting transport to die +%% following transport death in state INITIAL, or after connect_timer +%% expiry; or another watchdog is saying the same after reestablishing +%% a connection previously had by this one. transition(close, #watchdog{}) -> - ok; + {accept, _} = role(), %% assert + stop; %% Service is asking for the peer to be taken down gracefully. transition({shutdown, Pid, _}, #watchdog{parent = Pid, @@ -332,6 +343,12 @@ transition({shutdown = T, Pid, Reason}, #watchdog{parent = Pid, send(TPid, {T, self(), Reason}), S#watchdog{shutdown = true}; +%% Transport is telling us that DPA has been sent in response to DPR, +%% or that DPR has been explicitly sent: transport death should lead +%% to ours. +transition({'DPR', TPid}, #watchdog{transport = TPid} = S) -> + S#watchdog{shutdown = true}; + %% Parent process has died, transition({'DOWN', _, process, Pid, _Reason}, #watchdog{parent = Pid}) -> @@ -363,7 +380,7 @@ transition({open, TPid, Hosts, _} = Open, restrict = {_,R}, config = #config{suspect = OS}} = S) -> - case okay(getr(restart), Hosts, R) of + case okay(role(), Hosts, R) of okay -> set_watchdog(S#watchdog{status = okay, num_dwa = OS}); @@ -403,18 +420,33 @@ transition({open = Key, TPid, _Hosts, T}, %% REOPEN Connection down CloseConnection() %% SetWatchdog() DOWN +%% Transport has died after DPA or service requested termination ... transition({'DOWN', _, process, TPid, _Reason}, #watchdog{transport = TPid, shutdown = true}) -> stop; -transition({'DOWN', _, process, TPid, _Reason}, +%% ... or not. +transition({'DOWN', _, process, TPid, _Reason} = D, #watchdog{transport = TPid, - status = T} - = S) -> - set_watchdog(S#watchdog{status = ?CHOOSE(initial == T, T, down), - pending = false, - transport = undefined}); + status = T, + restrict = {_,R}} + = S0) -> + S = S0#watchdog{pending = false, + transport = undefined}, + {M,_} = role(), + + %% Close an accepting watchdog immediately if there's no + %% restriction on the number of connections to the same peer: the + %% state machine never enters state REOPEN in this case. + + if T == initial; + M == accept, not R -> + close(D, S0), + stop; + true -> + set_watchdog(S#watchdog{status = down}) + end; %% Incoming message. transition({recv, TPid, Name, Pkt}, #watchdog{transport = TPid} = S) -> @@ -422,11 +454,12 @@ transition({recv, TPid, Name, Pkt}, #watchdog{transport = TPid} = S) -> %% Current watchdog has timed out. transition({timeout, TRef, tw}, #watchdog{tref = TRef} = S) -> - set_watchdog(timeout(S)); + set_watchdog(0, timeout(S)); -%% Timer was canceled after message was already sent. -transition({timeout, _, tw}, #watchdog{}) -> - ok; +%% Message has arrived since the timer was started: subtract time +%% already elapsed from new timer. +transition({timeout, _, tw}, #watchdog{tref = T0} = S) -> + set_watchdog(diameter_lib:micro_diff(T0) div 1000, S); %% State query. transition({state, Pid}, #watchdog{status = S}) -> @@ -454,9 +487,7 @@ encode(dwr = M, Dict0, Mask) -> hop_by_hop_id = Seq}, Pkt = #diameter_packet{header = Hdr, msg = Msg}, - #diameter_packet{bin = Bin} = diameter_codec:encode(Dict0, Pkt), - Bin; - + diameter_codec:encode(Dict0, Pkt); encode(dwa, Dict0, #diameter_packet{header = H, transport_data = TD} = ReqPkt) -> @@ -471,7 +502,7 @@ encode(dwa, Dict0, #diameter_packet{header = H, transport_data = TD} %% okay/3 -okay({{accept, Ref}, _, _}, Hosts, Restrict) -> +okay({accept, Ref}, Hosts, Restrict) -> T = {?MODULE, connection, Ref, Hosts}, diameter_reg:add(T), if Restrict -> @@ -482,7 +513,7 @@ okay({{accept, Ref}, _, _}, Hosts, Restrict) -> %% Register before matching so that at least one of two registering %% processes will match the other. -okay({{connect, _}, _, _}, _, _) -> +okay({connect, _}, _, _) -> okay. %% okay/2 @@ -497,20 +528,34 @@ okay(C) -> [_|_] = [send(P, close) || {_,P} <- C, self() /= P], reopen. +%% role/0 + +role() -> + element(1, getr(restart)). + %% set_watchdog/1 -set_watchdog(#watchdog{tw = TwInit, - tref = TRef} - = S) -> - cancel(TRef), - S#watchdog{tref = erlang:start_timer(tw(TwInit), self(), tw)}; -set_watchdog(stop = No) -> - No. +%% Timer not yet set. +set_watchdog(#watchdog{tref = undefined} = S) -> + set_watchdog(0, S); -cancel(undefined) -> - ok; -cancel(TRef) -> - erlang:cancel_timer(TRef). +%% Timer already set: start at new one only at expiry. +set_watchdog(#watchdog{} = S) -> + S#watchdog{tref = diameter_lib:now()}. + +%% set_watchdog/2 + +set_watchdog(_, stop = No) -> + No; + +set_watchdog(Ms, #watchdog{tw = TwInit} = S) -> + S#watchdog{tref = erlang:start_timer(tw(TwInit, Ms), self(), tw)}. + +%% A callback could return anything, so ensure the result isn't +%% negative. Don't prevent abuse, even though the smallest valid +%% timeout is 4000. +tw(TwInit, Ms) -> + max(tw(TwInit) - Ms, 0). tw(T) when is_integer(T), T >= 6000 -> @@ -525,10 +570,14 @@ send_watchdog(#watchdog{pending = false, dictionary = Dict0, sequence = Mask} = S) -> - send(TPid, {send, encode(dwr, Dict0, Mask)}), + #diameter_packet{bin = Bin} = EPkt = encode(dwr, Dict0, Mask), + diameter_traffic:incr(send, EPkt, TPid, Dict0), + send(TPid, {send, Bin}), ?LOG(send, 'DWR'), S#watchdog{pending = true}. +%% Don't count encode errors since we don't expect any on DWR/DWA. + %% recv/3 recv(Name, Pkt, S) -> @@ -545,16 +594,40 @@ recv(Name, Pkt, S) -> rcv('DWR', Pkt, #watchdog{transport = TPid, dictionary = Dict0}) -> - send(TPid, {send, encode(dwa, Dict0, Pkt)}), + ?LOG(recv, 'DWR'), + DPkt = diameter_codec:decode(Dict0, Pkt), + diameter_traffic:incr(recv, DPkt, TPid, Dict0), + diameter_traffic:incr_error(recv, DPkt, TPid, Dict0), + #diameter_packet{header = H, + transport_data = T, + bin = Bin} + = EPkt + = encode(dwa, Dict0, Pkt), + diameter_traffic:incr(send, EPkt, TPid, Dict0), + diameter_traffic:incr_rc(send, EPkt, TPid, Dict0), + + %% Strip potentially large message terms. + send(TPid, {send, #diameter_packet{header = H, + transport_data = T, + bin = Bin}}), ?LOG(send, 'DWA'); +rcv('DWA', Pkt, #watchdog{transport = TPid, + dictionary = Dict0}) -> + ?LOG(recv, 'DWA'), + diameter_traffic:incr(recv, Pkt, TPid, Dict0), + diameter_traffic:incr_rc(recv, + diameter_codec:decode(Dict0, Pkt), + TPid, + Dict0); + rcv(N, _, _) when N == 'CER'; N == 'CEA'; - N == 'DWA'; - N == 'DPR'; - N == 'DPA' -> + N == 'DPR' -> false; +%% DPR can be sent explicitly with diameter:call/4. Only the +%% corresponding DPAs arrive here. rcv(_, Pkt, #watchdog{transport = TPid, dictionary = Dict0, @@ -740,7 +813,7 @@ timeout(#watchdog{status = T} = S) restart(#watchdog{transport = undefined} = S) -> restart(getr(restart), S); -restart(S) -> +restart(S) -> %% reconnect has won race with timeout S. %% restart/2 @@ -755,25 +828,25 @@ restart(S) -> %% state down rather then initial when receiving notification of an %% open connection. -restart({{connect, _} = T, Opts, Svc}, +restart({{connect, _} = T, Opts, Svc, SvcOpts}, #watchdog{parent = Pid, - sequence = Mask, restrict = {R,_}, dictionary = Dict0} = S) -> send(Pid, {reconnect, self()}), Nodes = restrict_nodes(R), - S#watchdog{transport = start(T, Opts, Mask, Nodes, Dict0, Svc), + S#watchdog{transport = start(T, Opts, SvcOpts, Nodes, Dict0, Svc), restrict = {R, lists:member(node(), Nodes)}}; %% No restriction on the number of connections to the same peer: just %% die. Note that a state machine never enters state REOPEN in this %% case. -restart({{accept, _}, _, _}, #watchdog{restrict = {_, false}}) -> +restart({{accept, _}, _, _, _}, #watchdog{restrict = {_, false}}) -> stop; -%% Otherwise hang around until told to die. -restart({{accept, _}, _, _}, S) -> +%% Otherwise hang around until told to die, either by the service or +%% by another watchdog. +restart({{accept, _}, _, _, _}, S) -> S. %% Don't currently use Opts/Svc in the accept case. diff --git a/lib/diameter/src/base/diameter_watchdog_sup.erl b/lib/diameter/src/base/diameter_watchdog_sup.erl index fc837fe4ef..5d24e12f19 100644 --- a/lib/diameter/src/base/diameter_watchdog_sup.erl +++ b/lib/diameter/src/base/diameter_watchdog_sup.erl @@ -3,16 +3,17 @@ %% %% Copyright Ericsson AB 2010-2011. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% |