diff options
Diffstat (limited to 'lib/diameter')
-rw-r--r-- | lib/diameter/doc/src/notes.xml | 2 | ||||
-rw-r--r-- | lib/diameter/include/diameter.hrl | 2 | ||||
-rw-r--r-- | lib/diameter/include/diameter_gen.hrl | 92 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_codec.erl | 3 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_peer.erl | 13 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_peer_fsm.erl | 31 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_traffic.erl | 17 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_watchdog.erl | 5 | ||||
-rw-r--r-- | lib/diameter/src/diameter.appup.src | 82 | ||||
-rw-r--r-- | lib/diameter/src/transport/diameter_sctp.erl | 331 | ||||
-rw-r--r-- | lib/diameter/test/diameter_gen_sctp_SUITE.erl | 7 | ||||
-rw-r--r-- | lib/diameter/test/diameter_gen_tcp_SUITE.erl | 2 | ||||
-rw-r--r-- | lib/diameter/test/diameter_traffic_SUITE.erl | 75 | ||||
-rw-r--r-- | lib/diameter/test/diameter_util.erl | 36 | ||||
-rw-r--r-- | lib/diameter/vsn.mk | 2 |
15 files changed, 324 insertions, 376 deletions
diff --git a/lib/diameter/doc/src/notes.xml b/lib/diameter/doc/src/notes.xml index 4ffa09c890..26e6e2ef7a 100644 --- a/lib/diameter/doc/src/notes.xml +++ b/lib/diameter/doc/src/notes.xml @@ -88,7 +88,7 @@ first.</p> </item> <item> <p> - Don't confuse Result-Code and Experimental-Result</p> + Don't confuse Result-Code and Experimental-Result.</p> <p> The errors field of a decoded diameter_packet record was populated with a Result-Code AVP when an diff --git a/lib/diameter/include/diameter.hrl b/lib/diameter/include/diameter.hrl index e9c8b866cf..91ddd56cd6 100644 --- a/lib/diameter/include/diameter.hrl +++ b/lib/diameter/include/diameter.hrl @@ -40,7 +40,7 @@ %% -record(diameter_event, {service, %% name - info}). %% tuple() + info}). %% term() %% diameter_packet records are passed through the encode/decode %% interface supplied by a dictionary module configured on a Diameter diff --git a/lib/diameter/include/diameter_gen.hrl b/lib/diameter/include/diameter_gen.hrl index bc0e79216b..a43ab4edb2 100644 --- a/lib/diameter/include/diameter_gen.hrl +++ b/lib/diameter/include/diameter_gen.hrl @@ -334,16 +334,10 @@ d(Name, Avp, Acc) -> {H, A} = ungroup(V, Avp), {[H | Avps], pack_avp(Name, A, T)} catch - throw: {?TAG, {grouped, RC, ComponentAvps}} -> - {Avps, {Rec, Errors}} = Acc, - A = trim(Avp), - {[[A | trim(ComponentAvps)] | Avps], {Rec, [{RC, A} | Errors]}}; + throw: {?TAG, {grouped, Error, ComponentAvps}} -> + g(is_failed(), Error, Name, trim(Avp), Acc, ComponentAvps); error: Reason -> - d(undefined == Failed orelse is_failed(), - Reason, - Name, - trim(Avp), - Acc) + d(is_failed(), Reason, Name, trim(Avp), Acc) after reset(?STRICT_KEY, Strict), reset(?FAILED_KEY, Failed) @@ -381,6 +375,27 @@ dict(true) -> dict(_) -> ?MODULE. +%% g/5 + +%% Ignore decode errors within Failed-AVP (best-effort) ... +g(true, [_Error | Rec], Name, Avp, Acc, _ComponentAvps) -> + decode_AVP(Name, Avp#diameter_avp{value = Rec}, Acc); +g(true, _Error, Name, Avp, Acc, _ComponentAvps) -> + decode_AVP(Name, Avp, Acc); + +%% ... or not. +g(false, [Error | _Rec], _Name, Avp, Acc, ComponentAvps) -> + g(Error, Avp, Acc, ComponentAvps); +g(false, Error, _Name, Avp, Acc, ComponentAvps) -> + g(Error, Avp, Acc, ComponentAvps). + +%% g/4 + +g({RC, ErrorData}, Avp, Acc, ComponentAvps) -> + {Avps, {Rec, Errors}} = Acc, + E = Avp#diameter_avp{data = [ErrorData]}, + {[[Avp | trim(ComponentAvps)] | Avps], {Rec, [{RC, E} | Errors]}}. + %% d/5 %% Ignore a decode error within Failed-AVP ... @@ -425,14 +440,26 @@ is_strict() -> %% Strictly, this doesn't need to be the case. relax('Failed-AVP') -> - is_failed() orelse putr(?FAILED_KEY, true); + putr(?FAILED_KEY, true); relax(_) -> is_failed(). - + +%% is_failed/0 +%% +%% Is the AVP currently being decoded nested within Failed-AVP? Note +%% that this is only true when Failed-AVP is the parent. In +%% particular, it's not true when Failed-AVP itself is being decoded +%% (unless nested). + is_failed() -> true == getr(?FAILED_KEY). +%% is_failed/1 + +is_failed(Name) -> + 'Failed-AVP' == Name orelse is_failed(). + %% reset/2 reset(Key, undefined) -> @@ -452,8 +479,8 @@ decode_AVP(Name, Avp, {Avps, Acc}) -> %% diameter_types will raise an error of this form to communicate %% DIAMETER_INVALID_AVP_LENGTH (5014). A module specified to a -%% @custom_types tag in a spec file can also raise an error of this -%% form. +%% @custom_types tag in a dictionary file can also raise an error of +%% this form. rc({'DIAMETER', 5014 = RC, _}, #diameter_avp{name = AvpName} = Avp) -> {RC, Avp#diameter_avp{data = empty_value(AvpName)}}; @@ -529,17 +556,16 @@ pack_AVP(Name, #diameter_avp{is_mandatory = M, name = AvpName} = Avp, Acc) -> %% allow for Failed-AVP in an answer-message. pack_arity(Name, AvpName, M) -> - IsFailed = Name == 'Failed-AVP' orelse is_failed(), %% Not testing just Name /= 'Failed-AVP' means we're changing the %% packing of AVPs nested within Failed-AVP, but the point of %% ignoring errors within Failed-AVP is to decode as much as %% possible, and failing because a mandatory AVP couldn't be - %% packed into a dedicated field defeats that point. Note that we - %% can't just test not is_failed() since this will be 'true' when - %% packing an unknown AVP directly within Failed-AVP. + %% packed into a dedicated field defeats that point. Note + %% is_failed/1 since is_failed/0 will return false when packing + %% 'AVP' within Failed-AVP. - pack_arity(IsFailed + pack_arity(is_failed(Name) orelse {Name, AvpName} == {'answer-message', 'Failed-AVP'} orelse not M orelse not is_strict(), @@ -611,9 +637,12 @@ value(_, Avp) -> -> binary() | no_return(). -%% Length error induced by diameter_codec:collect_avps/1. +%% Length error induced by diameter_codec:collect_avps/1: the AVP +%% length in the header was too short (insufficient for the extracted +%% header) or too long (past the end of the message). An empty payload +%% is sufficient according to the RFC text for 5014. grouped_avp(decode, _Name, <<0:1, _/binary>>) -> - throw({?TAG, {grouped, 5014, []}}); + throw({?TAG, {grouped, {5014, []}, []}}); grouped_avp(decode, Name, Data) -> grouped_decode(Name, diameter_codec:collect_avps(Data)); @@ -627,13 +656,28 @@ grouped_avp(encode, Name, Data) -> %% decoded value, also returning the list of component diameter_avp %% records. +%% Length error in trailing component AVP. grouped_decode(_Name, {Error, Acc}) -> - {RC, Avp} = Error, - throw({?TAG, {grouped, RC, [Avp | Acc]}}); - + {5014, Avp} = Error, + throw({?TAG, {grouped, Error, [Avp | Acc]}}); + +%% 7.5. Failed-AVP AVP + +%% In the case where the offending AVP is embedded within a Grouped AVP, +%% the Failed-AVP MAY contain the grouped AVP, which in turn contains +%% the single offending AVP. The same method MAY be employed if the +%% grouped AVP itself is embedded in yet another grouped AVP and so on. +%% In this case, the Failed-AVP MAY contain the grouped AVP hierarchy up +%% to the single offending AVP. This enables the recipient to detect +%% the location of the offending AVP when embedded in a group. + +%% An error in decoding a component AVP throws the first fauly +%% component, which the catch in d/3 wraps in the Grouped AVP in +%% question. A partially decoded record is only used when ignoring +%% errors in Failed-AVP. grouped_decode(Name, ComponentAvps) -> {Rec, Avps, Es} = decode_avps(Name, ComponentAvps), - [] == Es orelse throw({?TAG, {grouped, 5004, Avps}}), %% decode failure + [] == Es orelse throw({?TAG, {grouped, [{_,_} = hd(Es) | Rec], Avps}}), {Rec, Avps}. %% --------------------------------------------------------------------------- diff --git a/lib/diameter/src/base/diameter_codec.erl b/lib/diameter/src/base/diameter_codec.erl index c8bd0ebd15..34276a1674 100644 --- a/lib/diameter/src/base/diameter_codec.erl +++ b/lib/diameter/src/base/diameter_codec.erl @@ -591,6 +591,7 @@ split_head(<<Code:32, 0:1, M:1, P:1, _:5, Len:24, _/binary>>) -> %% Header is truncated. split_head(Bin) -> ?THROW({5014, #diameter_avp{data = Bin}}). +%% Note that pack_avp/1 will pad this at encode if sent in a Failed-AVP. %% 3588: %% @@ -620,7 +621,7 @@ split_head(Bin) -> %% AVP header with zero up to the minimum AVP header length. %% %% The underlined clause must be in error since (1) a header less than -%% the minimum value mean we don't know the identity of the AVP and +%% the minimum value mean we might not know the identity of the AVP and %% (2) the last sentence covers this case. %% split_data/3 diff --git a/lib/diameter/src/base/diameter_peer.erl b/lib/diameter/src/base/diameter_peer.erl index e19f011920..acec91c43f 100644 --- a/lib/diameter/src/base/diameter_peer.erl +++ b/lib/diameter/src/base/diameter_peer.erl @@ -122,7 +122,7 @@ pair([{transport_module, M} | Rest], Mods, Acc) -> pair([{transport_config = T, C} | Rest], Mods, Acc) -> pair([{T, C, ?DEFAULT_TTMO} | Rest], Mods, Acc); pair([{transport_config, C, Tmo} | Rest], Mods, Acc) -> - pair(Rest, [], acc({Mods, C, Tmo}, Acc)); + pair(Rest, [], acc({lists:reverse(Mods), C, Tmo}, Acc)); pair([_ | Rest], Mods, Acc) -> pair(Rest, Mods, Acc); @@ -131,13 +131,16 @@ pair([_ | Rest], Mods, Acc) -> pair([], [], []) -> [{[?DEFAULT_TMOD], ?DEFAULT_TCFG, ?DEFAULT_TTMO}]; -%% One transport_module, one transport_config. -pair([], [M], [{[], Cfg, Tmo}]) -> - [{[M], Cfg, Tmo}]; +%% One transport_module, one transport_config: ignore option order. +%% That is, interpret [{transport_config, _}, {transport_module, _}] +%% as if the order was reversed, not as config with default module and +%% module with default config. +pair([], [_] = Mods, [{[], Cfg, Tmo}]) -> + [{Mods, Cfg, Tmo}]; %% Trailing transport_module: default transport_config. pair([], [_|_] = Mods, Acc) -> - lists:reverse(acc({Mods, ?DEFAULT_TCFG, ?DEFAULT_TTMO}, Acc)); + pair([{transport_config, ?DEFAULT_TCFG}], Mods, Acc); pair([], [], Acc) -> lists:reverse(def(Acc)). diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl index 9e8bf2ffcd..f5e04d3eae 100644 --- a/lib/diameter/src/base/diameter_peer_fsm.erl +++ b/lib/diameter/src/base/diameter_peer_fsm.erl @@ -120,7 +120,6 @@ service :: #diameter_service{}, dpr = false :: false | true %% DPR received, DPA sent - | {uint32(), uint32()} %% set in old code | {boolean(), uint32(), uint32()}, %% hop by hop and end to end identifiers in %% outgoing DPR; boolean says whether or not @@ -156,8 +155,7 @@ %% # start/3 %% --------------------------------------------------------------------------- --spec start(T, [Opt], {[diameter:service_opt()] - | diameter:sequence(), %% from old code +-spec start(T, [Opt], {[diameter:service_opt()], [node()], module(), #diameter_service{}}) @@ -196,9 +194,6 @@ init(T) -> proc_lib:init_ack({ok, self()}), gen_server:enter_loop(?MODULE, [], i(T)). -i({Ack, WPid, T, Opts, {{_,_} = Mask, Nodes, Dict0, Svc}}) -> %% from old code - i({Ack, WPid, T, Opts, {[{sequence, Mask}], Nodes, Dict0, Svc}}); - i({Ack, WPid, {M, Ref} = T, Opts, {SvcOpts, Nodes, Dict0, Svc}}) -> erlang:monitor(process, WPid), wait(Ack, WPid), @@ -330,14 +325,11 @@ handle_info(T, #state{} = State) -> {?MODULE, Tag, Reason} -> ?LOG(stop, Tag), {stop, {shutdown, Reason}, State} - end; + end. %% The form of the throw caught here is historical. It's %% significant that it's not a 2-tuple, as in ?FAILURE(Reason), %% since these are caught elsewhere. -handle_info(T, S) -> %% started in old code - handle_info(T, #state{} = erlang:append_element(S, infinity)). - %% Note that there's no guarantee that the service and transport %% capabilities are good enough to build a CER/CEA that can be %% succesfully encoded. It's not checked at diameter:add_transport/2 @@ -367,9 +359,6 @@ eraser(Key) -> %% transition/2 -transition(T, #state{dpr = {Hid, Eid}} = S) -> %% DPR sent from old code - transition(T, S#state{dpr = {false, Hid, Eid}}); - %% Connection to peer. transition({diameter, {TPid, connected, Remote}}, #state{transport = TPid, @@ -1297,25 +1286,15 @@ dpa_timer(Tmo) -> erlang:send_after(Tmo, self(), dpa_timeout). dpa_timeout() -> - dpa_timeout(getr(?DPA_KEY)). - -dpa_timeout({_, Tmo}) -> - Tmo; -dpa_timeout(undefined) -> %% set in old code - ?DPA_TIMEOUT; -dpa_timeout(Tmo) -> %% ditto + {_, Tmo} = getr(?DPA_KEY), Tmo. dpr_timer() -> dpa_timer(dpr_timeout()). dpr_timeout() -> - dpr_timeout(getr(?DPA_KEY)). - -dpr_timeout({Tmo, _}) -> - Tmo; -dpr_timeout(_) -> %% set in old code - ?DPR_TIMEOUT. + {Tmo, _} = getr(?DPA_KEY), + Tmo. %% register_everywhere/1 %% diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl index 9d6fbc9113..692a01e651 100644 --- a/lib/diameter/src/base/diameter_traffic.erl +++ b/lib/diameter/src/base/diameter_traffic.erl @@ -98,9 +98,6 @@ %% # make_recvdata/1 %% --------------------------------------------------------------------------- -make_recvdata([SvcName, PeerT, Apps, {_,_} = Mask | _]) -> %% from old code - make_recvdata([SvcName, PeerT, Apps, [{sequence, Mask}]]); - make_recvdata([SvcName, PeerT, Apps, SvcOpts | _]) -> {_,_} = Mask = proplists:get_value(sequence, SvcOpts), #recvdata{service_name = SvcName, @@ -262,7 +259,8 @@ recv(false, #request{ref = Ref, handler = Pid} = Req, _, Pkt, Dict0, _) -> %% any others are discarded. %% ... or not. -recv(false, false, TPid, _, _, _) -> +recv(false, false, TPid, Pkt, _, _) -> + ?LOG(discarded, Pkt#diameter_packet.header), incr(TPid, {{unknown, 0}, recv, discarded}), ok. @@ -301,13 +299,7 @@ recv_request(TPid, RecvData), TPid, Dict0, - RecvData); - -recv_request(TPid, Pkt, Dict0, RecvData) -> %% from old code - recv_request(TPid, - Pkt, - Dict0, - #recvdata{} = erlang:append_element(RecvData, [])). + RecvData). %% recv_R/5 @@ -1642,9 +1634,6 @@ pick_peer(SvcName, Filter, Xtra})). -pick({{_,_,_} = Transport, Mask}) -> %% from old code; dialyzer complains - {Transport, Mask, []}; %% about this - pick(false) -> {error, no_connection}; diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl index 5f333684bc..c6d0a0b6ed 100644 --- a/lib/diameter/src/base/diameter_watchdog.erl +++ b/lib/diameter/src/base/diameter_watchdog.erl @@ -811,9 +811,6 @@ restart(S) -> %% reconnect has won race with timeout %% state down rather then initial when receiving notification of an %% open connection. -restart({T, Opts, Svc}, S) -> %% put in old code - restart({T, Opts, Svc, []}, S); - restart({{connect, _} = T, Opts, Svc, SvcOpts}, #watchdog{parent = Pid, restrict = {R,_}, @@ -828,7 +825,7 @@ restart({{connect, _} = T, Opts, Svc, SvcOpts}, %% die. Note that a state machine never enters state REOPEN in this %% case. restart({{accept, _}, _, _, _}, #watchdog{restrict = {_, false}}) -> - stop; %% 'DOWN' was in old code: 'close' was not sent + stop; %% Otherwise hang around until told to die, either by the service or %% by another watchdog. diff --git a/lib/diameter/src/diameter.appup.src b/lib/diameter/src/diameter.appup.src index 8e82eb8014..86d231179c 100644 --- a/lib/diameter/src/diameter.appup.src +++ b/lib/diameter/src/diameter.appup.src @@ -38,42 +38,11 @@ {"1.5", [{restart_application, diameter}]}, %% R16B03 {"1.6", [{restart_application, diameter}]}, %% 17.0 {"1.7", [{restart_application, diameter}]}, %% 17.[12] - {<<"^1\\.(7\\.1|8)$">>, %% 17.[34] - [{load_module, diameter_lib}, - {load_module, diameter_peer}, - {load_module, diameter_reg}, - {load_module, diameter_session}, - {load_module, diameter_stats}, - {load_module, diameter_sync}, - {load_module, diameter_capx}, - {load_module, diameter_codec}, - {load_module, diameter_types}, - {load_module, diameter_traffic}, - {load_module, diameter_service}, - {load_module, diameter_peer_fsm}, - {load_module, diameter_watchdog}, - {load_module, diameter_tcp}, - {load_module, diameter_sctp}, - {load_module, diameter_config}, - {load_module, diameter}, - {load_module, diameter_gen_base_rfc6733}, - {load_module, diameter_gen_acct_rfc6733}, - {load_module, diameter_gen_base_rfc3588}, - {load_module, diameter_gen_base_accounting}, - {load_module, diameter_gen_relay}, - {update, diameter_transport_sup, supervisor}, - {update, diameter_service_sup, supervisor}, - {update, diameter_sup, supervisor}]}, - {"1.9", [{load_module, diameter_codec}, %% 17.5 - {load_module, diameter_traffic}, - {load_module, diameter_sctp}, - {load_module, diameter_gen_base_rfc6733}, - {load_module, diameter_gen_acct_rfc6733}, - {load_module, diameter_gen_base_rfc3588}, - {load_module, diameter_gen_base_accounting}, - {load_module, diameter_gen_relay}]}, - {"1.9.1", [{load_module, diameter_traffic}, %% 17.5.3 - {load_module, diameter_sctp}]} + {"1.7.1", [{restart_application, diameter}]}, %% 17.3 + {"1.8", [{restart_application, diameter}]}, %% 17.4 + {"1.9", [{restart_application, diameter}]}, %% 17.5 + {"1.9.1", [{restart_application, diameter}]}, %% 17.5.3 + {"1.9.2", [{restart_application, diameter}]} %% 17.5.5 ], [ {"0.9", [{restart_application, diameter}]}, @@ -93,41 +62,10 @@ {"1.5", [{restart_application, diameter}]}, {"1.6", [{restart_application, diameter}]}, {"1.7", [{restart_application, diameter}]}, - {<<"^1\\.(7\\.1|8)$">>, - [{update, diameter_sup, supervisor}, - {update, diameter_service_sup, supervisor}, - {update, diameter_transport_sup, supervisor}, - {load_module, diameter_gen_relay}, - {load_module, diameter_gen_base_accounting}, - {load_module, diameter_gen_base_rfc3588}, - {load_module, diameter_gen_acct_rfc6733}, - {load_module, diameter_gen_base_rfc6733}, - {load_module, diameter}, - {load_module, diameter_config}, - {load_module, diameter_sctp}, - {load_module, diameter_tcp}, - {load_module, diameter_watchdog}, - {load_module, diameter_peer_fsm}, - {load_module, diameter_service}, - {load_module, diameter_traffic}, - {load_module, diameter_types}, - {load_module, diameter_codec}, - {load_module, diameter_capx}, - {load_module, diameter_sync}, - {load_module, diameter_stats}, - {load_module, diameter_session}, - {load_module, diameter_reg}, - {load_module, diameter_peer}, - {load_module, diameter_lib}]}, - {"1.9", [{load_module, diameter_gen_relay}, - {load_module, diameter_gen_base_accounting}, - {load_module, diameter_gen_base_rfc3588}, - {load_module, diameter_gen_acct_rfc6733}, - {load_module, diameter_gen_base_rfc6733}, - {load_module, diameter_sctp}, - {load_module, diameter_traffic}, - {load_module, diameter_codec}]}, - {"1.9.1", [{load_module, diameter_sctp}, - {load_module, diameter_traffic}]} + {"1.7.1", [{restart_application, diameter}]}, + {"1.8", [{restart_application, diameter}]}, + {"1.9", [{restart_application, diameter}]}, + {"1.9.1", [{restart_application, diameter}]}, + {"1.9.2", [{restart_application, diameter}]} ] }. diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index 110fa2c6e7..3e08b78ea1 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -104,15 +104,11 @@ {ref :: reference(), socket :: gen_sctp:sctp_socket(), count = 0 :: uint(), %% attached transport processes - tmap = ets:new(?MODULE, []) :: ets:tid(), - %% {MRef, Pid|AssocId}, {AssocId, Pid} - pending = {0, ets:new(?MODULE, [ordered_set])}, + pending = {0, queue:new()}, tref :: reference(), accept :: [match()]}). -%% Field tmap is used to map an incoming message or event to the -%% relevant transport process. Field pending implements two queues: -%% the first of transport-to-be processes to which an association has -%% been assigned (at comm_up and written into tmap) but for which +%% Field pending implements two queues: the first of transport-to-be +%% processes to which an association has been assigned but for which %% diameter hasn't yet spawned a transport process, a short-lived %% state of affairs as a new transport is spawned as a consequence of %% a peer being taken up, transport processes being spawned by the @@ -126,8 +122,7 @@ %% queue or spawned and placed in the first queue. Thus, there are %% only elements in one queue at a time, so share an ets table queue %% and tag it with a positive length if it contains the first queue, a -%% negative length if it contains the second queue. The case -1 is -%% handled differently for backwards compatibility reasons. +%% negative length if it contains the second queue. %% --------------------------------------------------------------------------- %% # start/3 @@ -229,7 +224,7 @@ i({listen, Ref, {Opts, Addrs}}) -> proc_lib:init_ack({ok, self(), LAs}), start_timer(#listener{ref = Ref, socket = Sock, - accept = accept(Matches)}); + accept = [[M] || {accept, M} <- Matches]}); %% A connecting transport. i({connect, Pid, Opts, Addrs, Ref}) -> @@ -244,43 +239,49 @@ i({connect, Pid, Opts, Addrs, Ref}) -> mode = {connect, connect(Sock, RAs, RP, [])}, socket = Sock}; -%% An accepting transport spawned by diameter. -i({accept, Pid, LPid, Sock, Ref}) +%% An accepting transport spawned by diameter, not yet owning an +%% association. +i({accept, Ref, LPid, Pid}) when is_pid(Pid) -> putr(?REF_KEY, Ref), proc_lib:init_ack({ok, self()}), monitor(process, Pid), - monitor(process, LPid), - #transport{parent = Pid, - mode = {accept, LPid}, - socket = Sock}; + MRef = monitor(process, LPid), + wait([{peeloff, MRef}], #transport{parent = Pid, + mode = {accept, LPid}}); -%% An accepting transport spawned at association establishment. -i({accept, Ref, LPid, Sock, Id}) -> +%% An accepting transport spawned at association establishment, whose +%% parent is not yet known. +i({accept, Ref, LPid}) -> putr(?REF_KEY, Ref), proc_lib:init_ack({ok, self()}), + erlang:send_after(?ACCEPT_TIMEOUT, self(), accept_timeout), MRef = monitor(process, LPid), - %% Wait for a signal that the transport has been started before - %% processing other messages. + wait([{parent, Ref}, {peeloff, MRef}], #transport{mode = {accept, LPid}}). + +%% wait/2 +%% +%% Wait for diameter to start the transport process and for the +%% association to be peeled off before processing other messages. + +wait(Keys, S) -> + lists:foldl(fun i/2, S, Keys). + +i({K, Ref}, #transport{mode = {accept, _}} = S) -> receive - {Ref, Pid} -> %% transport started - #transport{parent = Pid, - mode = {accept, LPid}, - socket = Sock}; - {'DOWN', MRef, process, _, _} = T -> %% listener down - close(Sock, Id), + {Ref, Pid} when K == parent -> %% transport process started + S#transport{parent = Pid}; + {K, T, Matches} when K == peeloff -> %% association + {sctp, Sock, _RA, _RP, _Data} = T, + ok = accept_peer(Sock, Matches), + demonitor(Ref, [flush]), + t(T, S#transport{socket = Sock}); + accept_timeout = T -> + x(T); + {'DOWN', _, process, _, _} = T -> x(T) - after ?ACCEPT_TIMEOUT -> - close(Sock, Id), - x(timeout) end. -%% close/2 - -close(Sock, Id) -> - gen_sctp:eof(Sock, #sctp_assoc_change{assoc_id = Id}). -%% Having to pass a record here is hokey. - %% listener/2 %% Accepting processes can be started concurrently: ensure only one @@ -298,7 +299,7 @@ listener({LRef, T}) -> l([{{?MODULE, listener, {_, AS}}, LPid}], _, _) -> {LAs, _Sock} = AS, {LPid, LAs}; - + %% ... or not. l([], LRef, T) -> {ok, LPid, LAs} = diameter_sctp_sup:start_child({listen, LRef, T}), @@ -368,17 +369,11 @@ type(T) -> %% # handle_call/3 %% --------------------------------------------------------------------------- -handle_call(T, From, #listener{pending = L} = S) - when is_list(L) -> - handle_call(T, From, upgrade(S)); - handle_call({{accept, Ref}, Pid}, _, #listener{ref = Ref, - pending = {N,Q}, count = K} = S) -> - TPid = accept(Ref, Pid, S), - {reply, {ok, TPid}, downgrade(S#listener{pending = {N-1,Q}, - count = K+1})}; + {TPid, NewS} = accept(Ref, Pid, S), + {reply, {ok, TPid}, NewS#listener{count = K+1}}; handle_call(_, _, State) -> {reply, nok, State}. @@ -397,18 +392,9 @@ handle_cast(_, State) -> handle_info(T, #transport{} = S) -> {noreply, #transport{} = t(T,S)}; -handle_info(T, #listener{pending = L} = S) - when is_list(L) -> - handle_info(T, upgrade(S)); - handle_info(T, #listener{} = S) -> - {noreply, downgrade(#listener{} = l(T,S))}. - -%% upgrade/1 + {noreply, #listener{} = l(T,S)}. -upgrade(#listener{pending = [TPid | {0,Q}]} = S) -> - ets:insert(Q, {TPid, now()}), - S#listener{pending = {-1,Q}}. %% Prior to the possiblity of setting pool_size on in transport %% configuration, a new accepting transport was only started following %% the death of a predecessor, so that there was only at most one @@ -417,26 +403,6 @@ upgrade(#listener{pending = [TPid | {0,Q}]} = S) -> %% several accepting transports are started concurrently. Deal with %% this by placing the started transports in a new queue of transport %% processes waiting for an association. -%% -%% Since only one of this queue and the existing queue of controlling -%% processes waiting for a transport to be started can be non-empty at -%% any given time, implement both queues in the same ets table. The -%% absolute value of the first element of the 2-tuple is the queue -%% length, the sign says which queue it is. - -%% downgrade/1 -%% -%% Revert to the pre-pool_size representation when possible, for -%% backwards compatibility in the case that the pool_size option -%% hasn't been used. - -downgrade(#listener{pending = {-1,Q}} = S) -> - TPid = ets:first(Q), - ets:delete(Q, TPid), - S#listener{pending = [TPid | {0,Q}]}; - -downgrade(S) -> - S. %% --------------------------------------------------------------------------- %% # code_change/3 @@ -452,16 +418,6 @@ code_change(_, State, _) -> terminate(_, #transport{assoc_id = undefined}) -> ok; -terminate(_, #transport{socket = Sock, - mode = accept, - assoc_id = Id}) -> - close(Sock, Id); - -terminate(_, #transport{socket = Sock, - mode = {accept, _}, - assoc_id = Id}) -> - close(Sock, Id); - terminate(_, #transport{socket = Sock}) -> gen_sctp:close(Sock); @@ -488,21 +444,17 @@ start_timer(S) -> %% Transition listener state. %% Incoming message from SCTP. -l({sctp, Sock, _RA, _RP, Data} = Msg, #listener{socket = Sock} = S) -> +l({sctp, Sock, _RA, _RP, Data} = T, #listener{socket = Sock, + accept = Matches} + = S) -> Id = assoc_id(Data), + {TPid, NewS} = accept(S), + TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Matches}, + setopts(Sock), + NewS; - try find(Id, Data, S) of - {TPid, NewS} -> - TPid ! {peeloff, peeloff(Sock, Id, TPid), Msg, S#listener.accept}, - NewS; - false -> - S - after - setopts(Sock) - end; - -l({'DOWN', MRef, process, TPid, _}, #listener{pending = {_,Q}} = S) -> - down(ets:member(Q, TPid), MRef, TPid, S); +l({'DOWN', _MRef, process, TPid, _}, #listener{pending = {_,Q}} = S) -> + down(queue:member(TPid, Q), TPid, S); %% Timeout after the last accepting process has died. l({timeout, TRef, close = T}, #listener{tref = TRef, @@ -511,36 +463,25 @@ l({timeout, TRef, close = T}, #listener{tref = TRef, l({timeout, _, close}, #listener{} = S) -> S. -%% down/4 +%% down/3 +%% +%% Accepting transport has died. + +%% One that's waiting for transport start in the pending queue ... +down(true, TPid, #listener{pending = {N,Q}, + count = K} + = S) -> + NQ = queue:filter(fun(P) -> P /= TPid end, Q), + if N < 0 -> %% awaiting an association ... + start_timer(S#listener{count = K-1, + pending = {N+1, NQ}}); + true -> %% ... or one has been assigned + S#listener{pending = {N-1, NQ}} + end; -%% Accepting transport has died. One that's awaiting an association ... -down(true, MRef, TPid, #listener{pending = {N,Q}, - tmap = T, - count = K} - = S) - when N < 0 -> - ets:delete(Q, TPid), - ets:delete(T, MRef), - ets:delete(T, TPid), - start_timer(S#listener{count = K-1, - pending = {N+1,Q}}); - -%% ... or one that already has one. -down(B, MRef, TPid, #listener{socket = Sock, - tmap = T, - count = K, - pending = {N,Q}} - = S) -> - [{MRef, Id}] = ets:lookup(T, MRef), %% Id = TPid | AssocId - ets:delete(T, MRef), - ets:delete(T, Id), - Id == TPid orelse close(Sock, Id), - if B -> %% Waiting for attachment in the pending queue ... - ets:delete(Q, TPid), - S#listener{pending = {N-1,Q}}; - true -> %% ... or already attached - start_timer(S#listener{count = K-1}) - end. +%% ... or one that's already attached. +down(false, _TPid, #listener{count = K} = S) -> + start_timer(S#listener{count = K-1}). %% t/2 %% @@ -558,20 +499,10 @@ t(T,S) -> %% transition/2 -%% Listening process is transfering ownership of an association. -transition({peeloff, Sock, {sctp, LSock, _RA, _RP, _Data} = Msg, Matches}, - #transport{mode = {accept, _}, - socket = LSock} - = S) -> - ok = accept_peer(Sock, Matches), - transition(Msg, S#transport{socket = Sock}); - %% Incoming message. -transition({sctp, _Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) -> +transition({sctp, Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) -> setopts(Sock), recv(Data, S); -%% Don't match on Sock since in R15B01 it can be the listening socket -%% in the (peeled-off) accept case, which is likely a bug. %% Outgoing message. transition({diameter, {send, Msg}}, S) -> @@ -593,13 +524,8 @@ transition({diameter, {tls, _Ref, _Type, _Bool}}, _) -> transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) -> stop; -%% Listener process has died. -transition({'DOWN', _, process, Pid, _}, #transport{mode = {accept, Pid}}) -> - stop; - -%% Ditto but we have ownership of the association. It might be that -%% we'll go down anyway though. -transition({'DOWN', _, process, _Pid, _}, #transport{mode = accept}) -> +%% Timeout after transport process has been started. +transition(accept_timeout, _) -> ok; %% Request for the local port number. @@ -626,37 +552,27 @@ accept_peer(Sock, Matches) -> orelse x({accept, RAddrs, Matches}), ok. -%% accept/1 - -accept(Opts) -> - [[M] || {accept, M} <- Opts]. - %% accept/3 %% %% Start a new transport process or use one that's already been -%% started as a consequence of association establishment. +%% started as a consequence of diameter requesting a transport +%% process. -%% No pending associations: spawn a new transport. -accept(Ref, Pid, #listener{socket = Sock, - tmap = T, - pending = {N,Q}}) - when N =< 0 -> - Arg = {accept, Pid, self(), Sock, Ref}, - {ok, TPid} = diameter_sctp_sup:start_child(Arg), - MRef = monitor(process, TPid), - ets:insert(T, [{MRef, TPid}, {TPid, MRef}]), - ets:insert(Q, {TPid, now()}), - TPid; -%% Placing the transport in the second pending table makes it -%% available to the next association. +accept(Ref, Pid, #listener{pending = {N,_}} = S) -> + {TPid, NQ} = q(Ref, Pid, S), + {TPid, S#listener{pending = {N-1, NQ}}}. %% Pending associations: attach to the first in the queue. -accept(_, Pid, #listener{ref = Ref, - pending = {_,Q}}) -> - TPid = ets:first(Q), +q(_, Pid, #listener{ref = Ref, + pending = {N,Q}}) + when 0 < N -> + {TPid, _} = T = dq(Q), TPid ! {Ref, Pid}, - ets:delete(Q, TPid), - TPid. + T; + +%% No pending associations: spawn a new transport. +q(Ref, Pid, #listener{pending = {_,Q}}) -> + nq({accept, Ref, self(), Pid}, Q). %% send/2 @@ -717,7 +633,7 @@ recv({_, #sctp_assoc_change{} = E}, = S) -> S#transport{mode = {C, connect(Sock, RAs, RP, [{RA,E} | Es])}}; -%% Lost association after establishment. +%% Association failure. recv({_, #sctp_assoc_change{}}, _) -> stop; @@ -728,8 +644,10 @@ recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, #transport{parent = Pid}) bin = Bin}), ok; -recv({_, #sctp_shutdown_event{assoc_id = Id}}, - #transport{assoc_id = Id}) -> +recv({_, #sctp_shutdown_event{assoc_id = A}}, + #transport{assoc_id = Id}) + when A == Id; + A == 0 -> stop; %% Note that diameter_sctp(3) documents that sctp_events cannot be @@ -765,52 +683,49 @@ up(#transport{parent = Pid, diameter_peer:up(Pid), S#transport{mode = A}. -%% find/3 - -find(Id, Data, #listener{tmap = T} = S) -> - f(ets:lookup(T, Id), Data, S). - -%% New association ... -f([], - {_, #sctp_assoc_change{state = comm_up, assoc_id = Id}}, - #listener{pending = {N,Q}} - = S) -> - {find(Id, S), S#listener{pending = {N+1,Q}}}; - -%% Known association ... -f([{_, TPid}], _, S) -> - {TPid, S}; - -%% ... or not: discard. -f([], _, _) -> - false. +%% accept/1 +%% +%% Start a new transport process or use one that's already been +%% started as a consequence of an event to a listener process. -%% find/2 +accept(#listener{pending = {N,_}} = S) -> + {TPid, NQ} = q(S), + {TPid, S#listener{pending = {N+1, NQ}}}. %% Transport waiting for an association: use it. -find(Id, #listener{tmap = T, - pending = {N,Q}}) +q(#listener{pending = {N,Q}}) when N < 0 -> - TPid = ets:first(Q), - [{TPid, MRef}] = ets:lookup(T, TPid), - ets:insert(T, [{MRef, Id}, {Id, TPid}]), - ets:delete(T, TPid), - ets:delete(Q, TPid), - TPid; + dq(Q); %% No transport start yet: spawn one and queue. -find(Id, #listener{ref = Ref, - socket = Sock, - tmap = T, - pending = {_,Q}}) -> - Arg = {accept, Ref, self(), Sock, Id}, +q(#listener{ref = Ref, + pending = {_,Q}}) -> + nq({accept, Ref, self()}, Q). + +%% nq/2 +%% +%% Place a transport process in the second pending queue to make it +%% available to the next association. + +nq(Arg, Q) -> {ok, TPid} = diameter_sctp_sup:start_child(Arg), - MRef = monitor(process, TPid), - ets:insert(T, [{MRef, Id}, {Id, TPid}]), - ets:insert(Q, {TPid, now()}), - TPid. + monitor(process, TPid), + {TPid, queue:in(TPid, Q)}. + +%% dq/1 +%% +%% Remove a transport process from the first pending queue to assign +%% it to an existing association. + +dq(Q) -> + {{value, TPid}, NQ} = queue:out(Q), + {TPid, NQ}. %% assoc_id/1 +%% +%% It's unclear if this is needed, or if the first message on an +%% association is always sctp_assoc_change, but don't assume since +%% SCTP behaviour differs between operating systems. assoc_id({[#sctp_sndrcvinfo{assoc_id = Id}], _}) -> Id; diff --git a/lib/diameter/test/diameter_gen_sctp_SUITE.erl b/lib/diameter/test/diameter_gen_sctp_SUITE.erl index 803a93f02f..c739643dbe 100644 --- a/lib/diameter/test/diameter_gen_sctp_SUITE.erl +++ b/lib/diameter/test/diameter_gen_sctp_SUITE.erl @@ -34,7 +34,7 @@ %% testcases -export([send_not_from_controlling_process/1, - send_from_multiple_clients/1, + send_from_multiple_clients/1, send_from_multiple_clients/0, receive_what_was_sent/1]). -include_lib("kernel/include/inet_sctp.hrl"). @@ -59,7 +59,7 @@ %% =========================================================================== suite() -> - [{timetrap, {minutes, 2}}]. + [{timetrap, {seconds, 10}}]. all() -> [send_not_from_controlling_process, @@ -168,6 +168,9 @@ send(Sock, Id) -> %% %% Demonstrates sluggish delivery of messages. +send_from_multiple_clients() -> + [{timetrap, {seconds, 60}}]. + send_from_multiple_clients(_) -> {S, Rs} = T = send_from_multiple_clients(8, 1024), Max = ?FOREVER*1000, diff --git a/lib/diameter/test/diameter_gen_tcp_SUITE.erl b/lib/diameter/test/diameter_gen_tcp_SUITE.erl index c7e5e9279e..2be2cf4b35 100644 --- a/lib/diameter/test/diameter_gen_tcp_SUITE.erl +++ b/lib/diameter/test/diameter_gen_tcp_SUITE.erl @@ -40,7 +40,7 @@ %% =========================================================================== suite() -> - [{timetrap, {minutes, 2}}]. + [{timetrap, {seconds, 10}}]. all() -> [connect, %% Appears to fail only when run first. diff --git a/lib/diameter/test/diameter_traffic_SUITE.erl b/lib/diameter/test/diameter_traffic_SUITE.erl index 70e5f6ffee..7e316c03f1 100644 --- a/lib/diameter/test/diameter_traffic_SUITE.erl +++ b/lib/diameter/test/diameter_traffic_SUITE.erl @@ -49,6 +49,7 @@ send_unknown_mandatory/1, send_unknown_short_mandatory/1, send_noreply/1, + send_grouped_error/1, send_unsupported/1, send_unsupported_app/1, send_error_bit/1, @@ -269,15 +270,15 @@ groups() -> SD <- ?STRING_DECODES, CD <- ?STRING_DECODES] ++ - [{traffic, [parallel], [{group, ?util:name([T,R,D,A,C,SD,CD])} - || T <- ?TRANSPORTS, - T /= sctp orelse Sctp, - R <- ?ENCODINGS, - D <- ?RFCS, - A <- ?ENCODINGS, - C <- ?CONTAINERS, - SD <- ?STRING_DECODES, - CD <- ?STRING_DECODES]}]. + [{traffic, [], [{group, ?util:name([T,R,D,A,C,SD,CD])} + || T <- ?TRANSPORTS, + T /= sctp orelse Sctp, + R <- ?ENCODINGS, + D <- ?RFCS, + A <- ?ENCODINGS, + C <- ?CONTAINERS, + SD <- ?STRING_DECODES, + CD <- ?STRING_DECODES]}]. init_per_group(Name, Config) -> case ?util:name(Name) of @@ -330,6 +331,7 @@ tc() -> send_unknown_mandatory, send_unknown_short_mandatory, send_noreply, + send_grouped_error, send_unsupported, send_unsupported_app, send_error_bit, @@ -574,7 +576,7 @@ send_unknown_mandatory(Config) -> send_unknown_short_mandatory(Config) -> send_unknown_short(Config, true, ?INVALID_AVP_LENGTH). -%% Send an ACR containing an unexpected mandatory Session-Timeout. +%% Send an ASR containing an unexpected mandatory Session-Timeout. %% Expect 5001, and check that the value in Failed-AVP was decoded. send_unexpected_mandatory_decode(Config) -> Req = ['ASR', {'AVP', [#diameter_avp{code = 27, %% Session-Timeout @@ -590,6 +592,25 @@ send_unexpected_mandatory_decode(Config) -> data = <<12:32>>}] = As. +%% Send an containing a faulty Grouped AVP (empty Proxy-Host in +%% Proxy-Info) and expect that only the faulty AVP is sent in +%% Failed-AVP. The encoded values of Proxy-Host and Proxy-State are +%% swapped in prepare_request since an empty Proxy-Host is an encode +%% error. +send_grouped_error(Config) -> + Req = ['ASR', {'Proxy-Info', [[{'Proxy-Host', "abcd"}, + {'Proxy-State', ""}]]}], + ['ASA', {'Session-Id', _}, {'Result-Code', ?INVALID_AVP_LENGTH} | Avps] + = call(Config, Req), + [#'diameter_base_Failed-AVP'{'AVP' = As}] + = proplists:get_value('Failed-AVP', Avps), + [#diameter_avp{name = 'Proxy-Info', + value = #'diameter_base_Proxy-Info' + {'Proxy-Host' = Empty, + 'Proxy-State' = undefined}}] + = As, + <<0>> = iolist_to_binary(Empty). + %% Send an STR that the server ignores. send_noreply(Config) -> Req = ['STR', {'Termination-Cause', ?BAD_ANSWER}], @@ -1070,6 +1091,38 @@ prepare(Pkt, Caps, send_unexpected_mandatory, #group{client_dict0 = Dict0} Avp = <<Code:32, Flags, 8:24>>, E#diameter_packet{bin = <<V, (Len+8):24, T/binary, Avp/binary>>}; +prepare(Pkt, Caps, send_grouped_error, #group{client_dict0 = Dict0} + = Group) -> + Req = prepare(Pkt, Caps, Group), + #diameter_packet{bin = Bin} + = E + = diameter_codec:encode(Dict0, Pkt#diameter_packet{msg = Req}), + {Code, Flags, undefined} = Dict0:avp_header('Proxy-Info'), + %% Find Proxy-Info by looking for its header. + Pattern = <<Code:32, Flags, 28:24>>, + {Offset, 8} = binary:match(Bin, Pattern), + + %% Extract and swap Proxy-Host/State payloads. + + <<H:Offset/binary, + PI:8/binary, + PH:5/binary, + 12:24, + Payload:4/binary, + PS:5/binary, + 8:24, + T/binary>> + = Bin, + + E#diameter_packet{bin = <<H/binary, + PI/binary, + PH/binary, + 8:24, + PS:5/binary, + 12:24, + Payload/binary, + T/binary>>}; + prepare(Pkt, Caps, send_unsupported, #group{client_dict0 = Dict0} = Group) -> Req = prepare(Pkt, Caps, Group), #diameter_packet{bin = <<H:5/binary, _CmdCode:3/binary, T/binary>>} @@ -1176,7 +1229,7 @@ answer(Pkt, Req, _Peer, Name, #group{client_dict0 = Dict0}) -> [R | Vs] = Dict:'#get-'(answer(Ans, Es, Name)), [Dict:rec2msg(R) | Vs]. -%% Missing Result-Codec and inapproriate Experimental-Result-Code. +%% Missing Result-Code and inappropriate Experimental-Result-Code. answer(Rec, Es, send_experimental_result) -> [{5004, #diameter_avp{name = 'Experimental-Result'}}, {5005, #diameter_avp{name = 'Result-Code'}}] diff --git a/lib/diameter/test/diameter_util.erl b/lib/diameter/test/diameter_util.erl index 0df216b9d0..c727d10ddf 100644 --- a/lib/diameter/test/diameter_util.erl +++ b/lib/diameter/test/diameter_util.erl @@ -381,12 +381,38 @@ tmod(any) -> [diameter_sctp, diameter_tcp]. opts(Prot, T) -> - [{transport_module, M} || M <- tmod(Prot)] - ++ [{transport_config, [{ip, ?ADDR}, {port, 0} | opts(T)]}]. - -opts(listen) -> + tmo(T, lists:append([[{transport_module, M}, {transport_config, C}] + || M <- tmod(Prot), + C <- [cfg(M,T) ++ cfg(M) ++ cfg(T)]])). + +tmo(listen, Opts) -> + Opts; +tmo(_, Opts) -> + tmo(Opts). + +%% Timeout on all but the last alternative. +tmo([_,_] = Opts) -> + Opts; +tmo([M, C | Opts]) -> + {transport_config = K, Cfg} = C, + [M, {K, Cfg, 5000} | tmo(Opts)]. + +%% Listening SCTP socket need larger-than-default buffers to avoid +%% resends on some platforms (eg. SLES 11). +cfg(diameter_sctp, listen) -> + [{recbuf, 1 bsl 16}, {sndbuf, 1 bsl 16}]; + +cfg(_, _) -> + []. + +cfg(M) + when M == diameter_tcp; + M == diameter_sctp -> + [{ip, ?ADDR}, {port, 0}]; + +cfg(listen) -> [{accept, M} || M <- [{256,0,0,1}, ["256.0.0.1", ["^.+$"]]]]; -opts(PortNr) -> +cfg(PortNr) -> [{raddr, ?ADDR}, {rport, PortNr}]. %% --------------------------------------------------------------------------- diff --git a/lib/diameter/vsn.mk b/lib/diameter/vsn.mk index 0ddeea9463..1e3135680d 100644 --- a/lib/diameter/vsn.mk +++ b/lib/diameter/vsn.mk @@ -17,5 +17,5 @@ # %CopyrightEnd% APPLICATION = diameter -DIAMETER_VSN = 1.9.2 +DIAMETER_VSN = 1.10 APP_VSN = $(APPLICATION)-$(DIAMETER_VSN)$(PRE_VSN) |