diff options
Diffstat (limited to 'lib/diameter')
19 files changed, 359 insertions, 296 deletions
diff --git a/lib/diameter/doc/src/diameter_sctp.xml b/lib/diameter/doc/src/diameter_sctp.xml index 9b6d629f79..c9b74a9ec5 100644 --- a/lib/diameter/doc/src/diameter_sctp.xml +++ b/lib/diameter/doc/src/diameter_sctp.xml @@ -16,7 +16,7 @@ <header> <copyright> <year>2011</year> -<year>2016</year> +<year>2017</year> <holder>Ericsson AB. All Rights Reserved.</holder> </copyright> <legalnotice> @@ -116,7 +116,6 @@ and port respectively.</p> Multiple <c>ip</c> options can be specified for a multihomed peer. If none are specified then the values of <c>Host-IP-Address</c> in the <c>diameter_service</c> record are used. -(In particular, one of these must be specified.) Option <c>port</c> defaults to 3868 for a listening transport and 0 for a connecting transport.</p> diff --git a/lib/diameter/doc/src/diameter_tcp.xml b/lib/diameter/doc/src/diameter_tcp.xml index 6ca280c52b..1d65d14257 100644 --- a/lib/diameter/doc/src/diameter_tcp.xml +++ b/lib/diameter/doc/src/diameter_tcp.xml @@ -170,14 +170,11 @@ that will not be forthcoming, which will eventually cause the RFC 3539 watchdog to take down the connection.</p> <p> -If an <c>ip</c> option is not specified then the first element of a -non-empty <c>Host-IP-Address</c> list in <c>Svc</c> provides the local -IP address. -If neither is specified then the default address selected by &gen_tcp; -is used. -In all cases, the selected address is either returned from -&start; or passed in a <c>connected</c> message over the transport -interface.</p> +The first element of a non-empty <c>Host-IP-Address</c> list in +<c>Svc</c> provides the local IP address if an <c>ip</c> option is not +specified. +The local address is either returned from&start; or passed in a +<c>connected</c> message over the transport interface.</p> </desc> </func> diff --git a/lib/diameter/examples/code/client.erl b/lib/diameter/examples/code/client.erl index 6fb90b1c09..0864919cdd 100644 --- a/lib/diameter/examples/code/client.erl +++ b/lib/diameter/examples/code/client.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2015. All Rights Reserved. +%% Copyright Ericsson AB 2010-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -39,7 +39,6 @@ -module(client). -include_lib("diameter/include/diameter.hrl"). --include_lib("diameter/include/diameter_gen_base_rfc6733.hrl"). -export([start/1, %% start a service start/2, %% @@ -71,6 +70,7 @@ {'Product-Name', "Client"}, {'Auth-Application-Id', [0]}, {string_decode, false}, + {decode_format, map}, {application, [{alias, common}, {dictionary, diameter_gen_base_rfc6733}, {module, client_cb}]}]). @@ -108,9 +108,9 @@ connect(T) -> call(Name) -> SId = diameter:session_id(?L(Name)), - RAR = #diameter_base_RAR{'Session-Id' = SId, - 'Auth-Application-Id' = 0, - 'Re-Auth-Request-Type' = 0}, + RAR = ['RAR' | #{'Session-Id' => SId, + 'Auth-Application-Id' => 0, + 'Re-Auth-Request-Type' => 0}], diameter:call(Name, common, RAR, []). call() -> diff --git a/lib/diameter/examples/code/client_cb.erl b/lib/diameter/examples/code/client_cb.erl index ed1d3b9b7b..af2d4d6da7 100644 --- a/lib/diameter/examples/code/client_cb.erl +++ b/lib/diameter/examples/code/client_cb.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2016. All Rights Reserved. +%% Copyright Ericsson AB 2010-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -55,21 +55,18 @@ prepare_request(#diameter_packet{msg = ['RAR' = T | Avps]}, _, {_, Caps}) -> origin_realm = {OR, DR}} = Caps, - {send, [T, {'Origin-Host', OH}, - {'Origin-Realm', OR}, - {'Destination-Host', DH}, - {'Destination-Realm', DR} - | Avps]}; - -prepare_request(#diameter_packet{msg = Rec}, _, {_, Caps}) -> - #diameter_caps{origin_host = {OH, DH}, - origin_realm = {OR, DR}} - = Caps, - - {send, Rec#diameter_base_RAR{'Origin-Host' = OH, - 'Origin-Realm' = OR, - 'Destination-Host' = DH, - 'Destination-Realm' = DR}}. + {send, [T | if is_map(Avps) -> + Avps#{'Origin-Host' => OH, + 'Origin-Realm' => OR, + 'Destination-Host' => DH, + 'Destination-Realm' => DR}; + is_list(Avps) -> + [{'Origin-Host', OH}, + {'Origin-Realm', OR}, + {'Destination-Host', DH}, + {'Destination-Realm', DR} + | Avps] + end]}. %% prepare_retransmit/3 diff --git a/lib/diameter/examples/code/node.erl b/lib/diameter/examples/code/node.erl index 246be4194b..fc5830f8e2 100644 --- a/lib/diameter/examples/code/node.erl +++ b/lib/diameter/examples/code/node.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2015. All Rights Reserved. +%% Copyright Ericsson AB 2010-2016. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -30,6 +30,8 @@ connect/2, stop/1]). +-export([message/3]). + -type protocol() :: tcp | sctp. @@ -128,6 +130,8 @@ stop(Name) -> server_opts({T, Addr, Port}) -> [{transport_module, tmod(T)}, {transport_config, [{reuseaddr, true}, + {sender, true}, + {message_cb, [fun ?MODULE:message/3, 0]}, {ip, addr(Addr)}, {port, Port}]}]; @@ -173,3 +177,26 @@ addr(loopback) -> {127,0,0,1}; addr(A) -> A. + +%% --------------------------------------------------------------------------- + +%% message/3 +%% +%% Simple message callback that limits the number of concurrent +%% requests on the peer connection in question. + +%% Incoming request. +message(recv, <<_:32, 1:1, _/bits>> = Bin, N) -> + [Bin, N < 32, fun ?MODULE:message/3, N+1]; + +%% Outgoing request. +message(ack, <<_:32, 1:1, _/bits>>, _) -> + []; + +%% Incoming answer or request discarded. +message(ack, _, N) -> + [N =< 32, fun ?MODULE:message/3, N-1]; + +%% Outgoing message or incoming answer. +message(_, Bin, _) -> + [Bin]. diff --git a/lib/diameter/src/base/diameter_lib.erl b/lib/diameter/src/base/diameter_lib.erl index 8792e97621..1c1ea42cb5 100644 --- a/lib/diameter/src/base/diameter_lib.erl +++ b/lib/diameter/src/base/diameter_lib.erl @@ -283,7 +283,7 @@ ip(T) %% Or not: convert from '.'/':'-separated decimal/hex. ip(Addr) -> - {ok, A} = inet_parse:address(Addr), %% documented in inet(3) + {ok, A} = inet:parse_address(Addr), A. %% --------------------------------------------------------------------------- diff --git a/lib/diameter/src/base/diameter_peer.erl b/lib/diameter/src/base/diameter_peer.erl index 2759f17e64..4cb5a57a54 100644 --- a/lib/diameter/src/base/diameter_peer.erl +++ b/lib/diameter/src/base/diameter_peer.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2015. All Rights Reserved. +%% Copyright Ericsson AB 2010-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -202,10 +202,10 @@ match1(Addr, Match) -> match(Addr, {ok, A}, _) -> Addr == A; match(Addr, {error, _}, RE) -> - match == re:run(inet_parse:ntoa(Addr), RE, [{capture, none}]). + match == re:run(inet:ntoa(Addr), RE, [{capture, none}, caseless]). addr([_|_] = A) -> - inet_parse:address(A); + inet:parse_address(A); addr(A) -> {ok, A}. diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl index 18904b8a55..d99f11a697 100644 --- a/lib/diameter/src/base/diameter_peer_fsm.erl +++ b/lib/diameter/src/base/diameter_peer_fsm.erl @@ -544,11 +544,11 @@ put_route(Pid) -> MRef = monitor(process, Pid), put(Pid, MRef). -%% get_route/2 +%% get_route/3 -%% incoming answer -get_route(_, #diameter_packet{header = #diameter_header{is_request = false}} - = Pkt) -> +%% Incoming answer. +get_route(_, _, #diameter_packet{header = #diameter_header{is_request = false}} + = Pkt) -> Seqs = diameter_codec:sequence_numbers(Pkt), case erase(Seqs) of {Pid, Ref, MRef} -> @@ -559,8 +559,14 @@ get_route(_, #diameter_packet{header = #diameter_header{is_request = false}} false end; -%% incoming request -get_route(Ack, _) -> +%% Requests answered here ... +get_route(_, N, _) + when N == 'CER'; + N == 'DPR' -> + false; + +%% ... or not. +get_route(Ack, _, _) -> Ack. %% erase_route/1 @@ -745,7 +751,7 @@ recv1('DPA' = Name, %% Any other message with a header and no length errors. recv1(Name, H, Msg, #state{parent = Pid, ack = Ack} = S) -> Pkt = pkt(H, Msg), - Pid ! {recv, self(), get_route(Ack, Pkt), Name, Pkt}, + Pid ! {recv, self(), get_route(Ack, Name, Pkt), Name, Pkt}, handle(Name, Pkt, S). %% pkt/2 diff --git a/lib/diameter/src/base/diameter_reg.erl b/lib/diameter/src/base/diameter_reg.erl index 97e74657bd..bd5db54a5c 100644 --- a/lib/diameter/src/base/diameter_reg.erl +++ b/lib/diameter/src/base/diameter_reg.erl @@ -19,10 +19,11 @@ %% %% -%% The module implements a simple term -> pid registry. +%% A simple term -> pid registry. %% -module(diameter_reg). + -behaviour(gen_server). -export([add/1, @@ -57,18 +58,18 @@ -type key() :: term(). -type from() :: {pid(), term()}. +-type rcvr() :: [pid() | term()] %% subscribe + | from(). %% wait -type pattern() :: term(). -record(state, {id = diameter_lib:now(), - receivers = dict:new() - :: dict:dict(pattern(), [[pid() | term()]%% subscribe - | from()]), %% wait + notify = #{} :: #{pattern() => [rcvr()]}, monitors = sets:new() :: sets:set(pid())}). %% The ?TABLE bag contains the Key -> Pid mapping, as {Key, Pid} %% tuples. Each pid is stored in the monitors set to ensure only one %% monitor for each pid: more are harmless, but unnecessary. A pattern -%% is added to receivers a result of calls to wait/1 or subscribe/2: +%% is added to notify a result of calls to wait/1 or subscribe/2: %% changes to ?TABLE causes processes to be notified as required. %% =========================================================================== @@ -156,7 +157,7 @@ wait(Pat) -> %% # subscribe(Pat, T) %% %% Like match/1, but additionally receive messages of the form -%% {T, add|remove, {term(), pid()} when associations are added +%% {T, add|remove, {term(), pid()}} when associations are added %% or removed. %% =========================================================================== @@ -186,15 +187,12 @@ uptime() -> -> [{pid(), [key()]}]. pids() -> - to_list(fun swap/1). - -to_list(Fun) -> - ets:foldl(fun(T,D) -> append(Fun(T), D) end, orddict:new(), ?TABLE). + append(ets:select(?TABLE, [{{'$1','$2'}, [], [{{'$2', '$1'}}]}])). -append({K,V}, Dict) -> - orddict:append(K, V, Dict). - -id(T) -> T. +append(Pairs) -> + dict:to_list(lists:foldl(fun({K,V}, D) -> dict:append(K, V, D) end, + dict:new(), + Pairs)). %% terms/0 @@ -202,9 +200,7 @@ id(T) -> T. -> [{key(), [pid()]}]. terms() -> - to_list(fun id/1). - -swap({X,Y}) -> {Y,X}. + append(ets:tab2list(?TABLE)). %% subs/0 @@ -212,31 +208,19 @@ swap({X,Y}) -> {Y,X}. -> [{pattern(), [{pid(), term()}]}]. subs() -> - #state{receivers = RD} = state(), - dict:fold(fun sub/3, orddict:new(), RD). - -sub(Pat, Ps, Dict) -> - lists:foldl(fun([P|T], D) -> orddict:append(Pat, {P,T}, D); - (_, D) -> D - end, - Dict, - Ps). + #state{notify = Dict} = state(), + [{K, Ts} || {K,Ps} <- maps:to_list(Dict), + Ts <- [[{P,T} || [P|T] <- Ps]]]. %% waits/0 -spec waits() - -> [{pattern(), [{from(), term()}]}]. + -> [{pattern(), [from()]}]. waits() -> - #state{receivers = RD} = state(), - dict:fold(fun wait/3, orddict:new(), RD). - -wait(Pat, Ps, Dict) -> - lists:foldl(fun({_,_} = F, D) -> orddict:append(Pat, F, D); - (_, D) -> D - end, - Dict, - Ps). + #state{notify = Dict} = state(), + [{K, Ts} || {K,Ps} <- maps:to_list(Dict), + Ts <- [[T || {_,_} = T <- Ps]]]. %% ---------------------------------------------------------- %% # init/1 @@ -250,33 +234,28 @@ init(_) -> %% # handle_call/3 %% ---------------------------------------------------------- -handle_call({add, Uniq, Key}, {Pid, _}, S0) -> +handle_call({add, Uniq, Key}, {Pid, _}, S) -> Rec = {Key, Pid}, - S1 = flush(Uniq, Rec, S0), + NS = flush(Uniq, Rec, S), %% before insert {Res, New} = insert(Uniq, Rec), - {Recvs, S} = add(New, Rec, S1), - notify(Recvs, Rec), - {reply, Res, S}; + {reply, Res, notify(add, New andalso Rec, NS)}; handle_call({remove, Key}, {Pid, _}, S) -> Rec = {Key, Pid}, - Recvs = delete([Rec], S), ets:delete_object(?TABLE, Rec), - notify(Recvs, remove), - {reply, true, S}; + {reply, true, notify(remove, Rec, S)}; -handle_call({wait, Pat}, {Pid, _} = From, #state{receivers = RD} = S) -> +handle_call({wait, Pat}, {Pid, _} = From, S) -> NS = add_monitor(Pid, S), case match(Pat) of - [_|_] = L -> - {reply, L, NS}; + [_|_] = Recs -> + {reply, Recs, NS}; [] -> - {noreply, NS#state{receivers = dict:append(Pat, From, RD)}} + {noreply, queue(Pat, From, NS)} end; -handle_call({subscribe, Pat, T}, {Pid, _}, #state{receivers = RD} = S) -> - NS = add_monitor(Pid, S), - {reply, match(Pat), NS#state{receivers = dict:append(Pat, [Pid | T], RD)}}; +handle_call({subscribe, Pat, T}, {Pid, _}, S) -> + {reply, match(Pat), queue(Pat, [Pid | T], add_monitor(Pid, S))}; handle_call(state, _, S) -> {reply, S, S}; @@ -332,106 +311,60 @@ insert(true, Rec) -> B = ets:insert_new(?TABLE, Rec), %% entry inserted? {B, B}. -%% add/3 - +%% add_monitor/2 +%% %% Only add a single monitor for any given process, since there's no %% use to more. -add(true, {_Key, Pid} = Rec, S) -> - NS = add_monitor(Pid, S), - {Recvs, RD} = add(Rec, NS), - {Recvs, S#state{receivers = RD}}; - -add(false = No, _, S) -> - {No, S}. - -%% add/2 - -%% Notify processes whose patterns match the inserted key. -add({_Key, Pid} = Rec, #state{receivers = RD}) -> - dict:fold(fun(Pt, Ps, A) -> - add(lists:member(Rec, match(Pt, Pid)), Pt, Ps, Rec, A) - end, - {sets:new(), RD}, - RD). - -%% add/5 - -add(true, Pat, Recvs, {_,_} = Rec, {Set, Dict}) -> - {lists:foldl(fun sets:add_element/2, Set, Recvs), - remove(fun erlang:is_list/1, Pat, Recvs, Dict)}; -add(false, _, _, _, Acc) -> - Acc. +add_monitor(Pid, #state{monitors = Ps} = S) -> + case sets:is_element(Pid, Ps) of + false -> + monitor(process, Pid), + S#state{monitors = sets:add_element(Pid, Ps)}; + true -> + S + end. -%% add_monitor/2 - -add_monitor(Pid, #state{monitors = MS} = S) -> - add_monitor(sets:is_element(Pid, MS), Pid, S). - -%% add_monitor/3 - -add_monitor(false, Pid, #state{monitors = MS} = S) -> - monitor(process, Pid), - S#state{monitors = sets:add_element(Pid, MS)}; - -add_monitor(true, _, S) -> - S. - -%% delete/2 - -delete(Recs, #state{receivers = RD}) -> - lists:foldl(fun(R,S) -> delete(R, RD, S) end, sets:new(), Recs). +%% notify/3 -%% delete/3 +notify(_, false, S) -> + S; -delete({_Key, Pid} = Rec, RD, Set) -> - dict:fold(fun(Pt, Ps, S) -> - delete(lists:member(Rec, match(Pt, Pid)), Rec, Ps, S) - end, - Set, - RD). +notify(Op, {_,_} = Rec, #state{notify = Dict} = S) -> + S#state{notify = maps:fold(fun(P,Rs,D) -> notify(Op, Rec, P, Rs, D) end, + Dict, + Dict)}. -%% delete/4 +%% notify/5 -%% Entry matches a pattern ... -delete(true, Rec, Recvs, Set) -> - lists:foldl(fun(R,S) -> sets:add_element({R, Rec}, S) end, - Set, - Recvs); - -%% ... or not. -delete(false, _, _, Set) -> - Set. - -%% notify/2 - -notify(false = No, _) -> - No; - -notify(Recvs, remove = Op) -> - sets:fold(fun({P,R}, N) -> send(P, R, Op), N+1 end, 0, Recvs); - -notify(Recvs, {_,_} = Rec) -> - sets:fold(fun(P,N) -> send(P, Rec, add), N+1 end, 0, Recvs). +notify(Op, {_, Pid} = Rec, Pat, Rcvrs, Dict) -> + case lists:member(Rec, match(Pat, Pid)) of + true -> + reset(Pat, Dict, [P || P <- Rcvrs, send(P, Op, Rec)]); + false -> + Dict + end. %% send/3 -%% No processes waiting on remove, by construction: they've either -%% received notification at add or aren't waiting. -send([Pid | T], Rec, Op) -> - Pid ! {T, Op, Rec}; +send([Pid | T], Op, Rec) -> + Pid ! {T, Op, Rec}, + true; -send({_,_} = From, Rec, add) -> - gen_server:reply(From, [Rec]). +%% No processes wait on remove: they receive notification immediately +%% or at add, by construction. +send({_,_} = From, add, Rec) -> + gen_server:reply(From, [Rec]), + false. %% down/2 -down(Pid, #state{monitors = MS} = S) -> - NS = flush(Pid, S), - Recvs = delete(match('_', Pid), NS), +down(Pid, #state{monitors = Ps} = S) -> + Recs = match('_', Pid), ets:match_delete(?TABLE, {'_', Pid}), - notify(Recvs, remove), - NS#state{monitors = sets:del_element(Pid, MS)}. + lists:foldl(fun(R,NS) -> notify(remove, R, NS) end, + flush(Pid, S#state{monitors = sets:del_element(Pid, Ps)}), + Recs). %% flush/3 @@ -452,16 +385,15 @@ flush(false, _, S) -> %% flush/2 %% Process has died and should no longer receive messages/replies. -flush(Pid, #state{receivers = RD} = S) - when is_pid(Pid) -> - S#state{receivers = dict:fold(fun(Pt,Ps,D) -> flush(Pid, Pt, Ps, D) end, - RD, - RD)}. +flush(Pid, #state{notify = Dict} = S) -> + S#state{notify = maps:fold(fun(P,Rs,D) -> flush(Pid, P, Rs, D) end, + Dict, + Dict)}. %% flush/4 -flush(Pid, Pat, Recvs, Dict) -> - remove(fun(T) -> Pid /= head(T) end, Pat, Recvs, Dict). +flush(Pid, Pat, Rcvrs, Dict) -> + reset(Pat, Dict, [T || T <- Rcvrs, Pid /= head(T)]). %% head/1 @@ -471,15 +403,18 @@ head([P|_]) -> head({P,_}) -> P. -%% remove/4 +%% reset/3 + +reset(Key, Map, []) -> + maps:remove(Key, Map); + +reset(Key, Map, List) -> + maps:put(Key, List, Map). + +%% queue/3 -remove(Pred, Key, Values, Dict) -> - case lists:filter(Pred, Values) of - [] -> - dict:erase(Key, Dict); - Rest -> - dict:store(Key, Rest, Dict) - end. +queue(Pat, Rcvr, #state{notify = Dict} = S) -> + S#state{notify = maps:put(Pat, [Rcvr | maps:get(Pat, Dict, [])], Dict)}. %% call/1 diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl index 7cd4d23402..31dd92f878 100644 --- a/lib/diameter/src/base/diameter_service.erl +++ b/lib/diameter/src/base/diameter_service.erl @@ -530,6 +530,13 @@ transition({tc_timeout, T}, S) -> tc_timeout(T, S), ok; +transition({nodeup, Node, _}, S) -> + nodeup(Node, S), + ok; + +transition({nodedown, _Node, _}, _) -> + ok; + transition(Req, S) -> unexpected(handle_info, [Req], S), ok. @@ -749,6 +756,8 @@ mref(P) -> init_shared(#state{options = #{use_shared_peers := T}, service_name = Svc}) -> + T == false orelse net_kernel:monitor_nodes(true, [{node_type, visible}, + nodedown_reason]), notify(T, Svc, {service, self()}). init_mod(#diameter_app{alias = Alias, @@ -764,6 +773,11 @@ notify(Share, SvcName, T) -> %% Test for the empty list for upgrade reasons: there's no %% diameter_peer:notify/3 in old code. +nodeup(Node, #state{options = #{share_peers := SP}, + service_name = SvcName}) -> + lists:member(Node, remotes(SP)) + andalso diameter_peer:notify([Node], SvcName, {service, self()}). + remotes(false) -> []; @@ -1440,9 +1454,15 @@ is_remote(Pid, T) -> %% # remote_peer_up/4 %% --------------------------------------------------------------------------- -remote_peer_up(TPid, Aliases, Caps, #state{options = #{use_shared_peers := T}} +remote_peer_up(TPid, Aliases, Caps, #state{options = #{use_shared_peers := T}, + remote = {PeerT, _, _}} = S) -> - is_remote(TPid, T) andalso rpu(TPid, Aliases, Caps, S). + is_remote(TPid, T) + andalso not ets:member(PeerT, TPid) + andalso rpu(TPid, Aliases, Caps, S). + +%% Notification can be duplicate since remote nodes push and the local +%% node pulls. rpu(TPid, Aliases, Caps, #state{service = Svc, remote = RT}) -> #diameter_service{applications = Apps} = Svc, @@ -1452,6 +1472,7 @@ rpu(TPid, Aliases, Caps, #state{service = Svc, remote = RT}) -> rpu(_, [] = No, _, _) -> No; + rpu(TPid, Aliases, Caps, {PeerT, _, _} = RT) -> monitor(process, TPid), ets:insert(PeerT, #peer{pid = TPid, diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl index a10bf78d6e..f510f40a17 100644 --- a/lib/diameter/src/base/diameter_traffic.erl +++ b/lib/diameter/src/base/diameter_traffic.erl @@ -93,6 +93,7 @@ caller :: pid() | undefined, %% calling process handler :: pid(), %% request process peer :: undefined | {pid(), #diameter_caps{}}, + caps :: undefined, %% no longer used packet :: #diameter_packet{} | undefined}). %% of request %% --------------------------------------------------------------------------- diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl index c08e2da672..43623334a9 100644 --- a/lib/diameter/src/base/diameter_watchdog.erl +++ b/lib/diameter/src/base/diameter_watchdog.erl @@ -74,7 +74,6 @@ okay := non_neg_integer()}, %% REOPEN -> OKAY codec :: #{decode_format := none, string_decode := false, - strict_arities => diameter:strict_arities(), strict_mbit := boolean(), rfc := 3588 | 6733, ordered_encode := false}, @@ -152,7 +151,6 @@ i({Ack, T, Pid, {Opts, okay => 3}, Opts)), codec = maps:with([decode_format, - strict_arities, strict_mbit, string_decode, rfc, diff --git a/lib/diameter/src/compiler/diameter_exprecs.erl b/lib/diameter/src/compiler/diameter_exprecs.erl index 9a0cb6baf2..143dede037 100644 --- a/lib/diameter/src/compiler/diameter_exprecs.erl +++ b/lib/diameter/src/compiler/diameter_exprecs.erl @@ -110,9 +110,9 @@ %% parse_transform/2 parse_transform(Forms, _Options) -> - Rs = [R || {attribute, _, record, R} <- Forms], - Es = lists:append([E || {attribute, _, export_records, E} <- Forms]), {H,T} = lists:splitwith(fun is_head/1, Forms), + Rs = [R || {attribute, _, record, R} <- H], + Es = lists:append([E || {attribute, _, export_records, E} <- H]), H ++ [a_export(Es) | f_accessors(Es, Rs)] ++ T. is_head(T) -> diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index 4f56475529..4eb3379d59 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -102,6 +102,7 @@ streams :: {uint(), uint()} %% {InStream, OutStream} counts | undefined, os = 0 :: uint(), %% next output stream + rotate = 1 :: boolean() | 0 | 1, %% rotate os? packet = true :: boolean() %% legacy transport_data? | raw, message_cb = false :: false | diameter:eval(), @@ -112,7 +113,7 @@ {transport :: pid(), ack = false :: boolean(), socket :: gen_sctp:sctp_socket(), - assoc_id :: gen_sctp:assoc_id()}). %% next output stream + assoc_id :: gen_sctp:assoc_id()}). %% Listener process state. -record(listener, @@ -156,12 +157,7 @@ start(T, Svc, Opts) = Svc, diameter_sctp_sup:start(), %% start supervisors on demand Addrs = Caps#diameter_caps.host_ip_address, - s(T, Addrs, Pid, lists:map(fun ip/1, Opts)). - -ip({ifaddr, A}) -> - {ip, A}; -ip(T) -> - T. + s(T, Addrs, Pid, Opts). %% A listener spawns transports either as a consequence of this call %% when there is not yet an association to assign it, or at comm_up on @@ -354,23 +350,35 @@ l([], Ref, T) -> %% open/3 open(Addrs, Opts, PortNr) -> - {LAs, Os} = addrs(Addrs, Opts), - {LAs, case gen_sctp:open(gen_opts(portnr(Os, PortNr))) of - {ok, Sock} -> - Sock; - {error, Reason} -> - x({open, Reason}) - end}. + case gen_sctp:open(gen_opts(portnr(addrs(Addrs, Opts), PortNr))) of + {ok, Sock} -> + {addrs(Sock), Sock}; + {error, Reason} -> + x({open, Reason}) + end. addrs(Addrs, Opts) -> - case proplists:split(Opts, [ip]) of - {[[]], _} -> - {Addrs, Opts ++ [{ip, A} || A <- Addrs]}; - {[As], Os} -> - LAs = [diameter_lib:ipaddr(A) || {ip, A} <- As], - {LAs, Os ++ [{ip, A} || A <- LAs]} + case lists:mapfoldl(fun ipaddr/2, false, Opts) of + {Os, true} -> + Os; + {_, false} -> + Opts ++ [{ip, A} || A <- Addrs] end. +ipaddr({K,A}, _) + when K == ifaddr; + K == ip -> + {{ip, ipaddr(A)}, true}; +ipaddr(T, B) -> + {T, B}. + +ipaddr(A) + when A == loopback; + A == any -> + A; +ipaddr(A) -> + diameter_lib:ipaddr(A). + portnr(Opts, PortNr) -> case proplists:get_value(port, Opts) of undefined -> @@ -379,6 +387,14 @@ portnr(Opts, PortNr) -> Opts end. +addrs(Sock) -> + case inet:socknames(Sock) of + {ok, As} -> + [A || {A,_} <- As]; + {error, Reason} -> + x({socknames, Reason}) + end. + %% x/1 x(Reason) -> @@ -565,7 +581,7 @@ transition(Msg, S) %% Deferred actions from a message_cb. transition({actions, Dir, Acts}, S) -> - actions(Acts, Dir, S); + setopts(ok, actions(Acts, Dir, S)); %% Request to close the transport connection. transition({diameter, {close, Pid}}, #transport{parent = Pid}) -> @@ -677,11 +693,16 @@ send(#diameter_packet{transport_data = {outstream, SId}} = S) -> send(SId rem OS, Msg, S); -%% ... or not: rotate through all streams. -send(Msg, #transport{streams = {_, OS}, +%% ... or not: rotate when sending on multiple streams ... +send(Msg, #transport{rotate = true, + streams = {_, OS}, os = N} = S) -> - send(N, Msg, S#transport{os = (N + 1) rem OS}). + send(N, Msg, S#transport{os = (N + 1) rem OS}); + +%% ... or send on the only stream available. +send(Msg, S) -> + send(0, Msg, S). %% send/3 @@ -749,7 +770,7 @@ recv({[#sctp_sndrcvinfo{assoc_id = Id}], _Bin} %% Inbound Diameter message. recv({[#sctp_sndrcvinfo{}], Bin} = Msg, S) when is_binary(Bin) -> - message(recv, Msg, S); + message(recv, Msg, recv(S)); recv({_, #sctp_shutdown_event{}}, _) -> stop; @@ -769,6 +790,25 @@ recv({_, #sctp_paddr_change{}}, _) -> recv({_, #sctp_pdapi_event{}}, _) -> ok. +%% recv/1 +%% +%% Start sending unordered after the second reception, so that an +%% outgoing CER/CEA will arrive at the peer before another request. + +recv(#transport{rotate = B} = S) + when is_boolean(B) -> + S; + +recv(#transport{rotate = 0, streams = {_,N}, socket = Sock} = S) -> + ok = inet:setopts(Sock, [{sctp_default_send_param, + #sctp_sndrcvinfo{flags = [unordered]}}]), + S#transport{rotate = 1 < N}; + +recv(#transport{rotate = N} = S) -> + S#transport{rotate = N-1}. + +%% publish/4 + publish(T, Ref, Id, Sock) -> true = diameter_reg:add_new({?MODULE, T, {Ref, {Id, Sock}}}), putr(?INFO_KEY, {gen_sctp, Sock}). %% for info/1 diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index ac55d722fa..a8639baa11 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -142,8 +142,7 @@ -> {ok, pid(), [inet:ip_address()]} when Ref :: diameter:transport_ref(); ({connect, Ref}, #diameter_service{}, [connect_option()]) - -> {ok, pid(), [inet:ip_address()]} - | {ok, pid()} + -> {ok, pid()} when Ref :: diameter:transport_ref(). start({T, Ref}, Svc, Opts) -> @@ -258,22 +257,14 @@ i(#monitor{parent = Pid, transport = TPid} = S) -> i({listen, Ref, {Mod, Opts, Addrs}}) -> [_] = diameter_config:subscribe(Ref, transport), %% assert existence - {[LA, LP], Rest} = proplists:split(Opts, [ip, port]), - LAddrOpt = get_addr(LA, Addrs), - LPort = get_port(LP), - {ok, LSock} = Mod:listen(LPort, gen_opts(LAddrOpt, Rest)), - LAddr = laddr(LAddrOpt, Mod, LSock), + {[LP], Rest} = proplists:split(Opts, [port]), + {ok, LSock} = Mod:listen(get_port(LP), gen_opts(Addrs, Rest)), + {ok, {LAddr, _}} = sockname(Mod, LSock), true = diameter_reg:add_new({?MODULE, listener, {Ref, {LAddr, LSock}}}), proc_lib:init_ack({ok, self(), {LAddr, LSock}}), #listener{socket = LSock, module = Mod}. -laddr([], Mod, Sock) -> - {ok, {Addr, _Port}} = sockname(Mod, Sock), - Addr; -laddr([{ip, Addr}], _, _) -> - Addr. - ssl_opts([]) -> false; ssl_opts([{ssl_options, true}]) -> @@ -308,24 +299,16 @@ init(accept = T, Ref, Mod, Pid, Opts, Addrs, SvcPid) -> Sock; init(connect = T, Ref, Mod, Pid, Opts, Addrs, _SvcPid) -> - {[LA, RA, RP], Rest} = proplists:split(Opts, [ip, raddr, rport]), - LAddrOpt = get_addr(LA, Addrs), + {[RA, RP], Rest} = proplists:split(Opts, [raddr, rport]), RAddr = get_addr(RA), RPort = get_port(RP), - proc_lib:init_ack(init_rc(LAddrOpt)), - Sock = ok(connect(Mod, RAddr, RPort, gen_opts(LAddrOpt, Rest))), + proc_lib:init_ack({ok, self()}), + Sock = ok(connect(Mod, RAddr, RPort, gen_opts(Addrs, Rest))), publish(Mod, T, Ref, Sock), - up(Pid, {RAddr, RPort}, LAddrOpt, Mod, Sock), + up(Pid, {RAddr, RPort}, Mod, Sock), Sock. -init_rc([{ip, Addr}]) -> - {ok, self(), [Addr]}; -init_rc([]) -> - {ok, self()}. - -up(Pid, Remote, [{ip, _Addr}], _, _) -> - diameter_peer:up(Pid, Remote); -up(Pid, Remote, [], Mod, Sock) -> +up(Pid, Remote, Mod, Sock) -> {Addr, _Port} = ok(sockname(Mod, Sock)), diameter_peer:up(Pid, Remote, [Addr]). @@ -382,25 +365,41 @@ l([{{?MODULE, listener, {_, AS}}, LPid}], _, _) -> l([], Ref, T) -> diameter_tcp_sup:start_child({listen, Ref, T}). -%% get_addr/1 +%% addrs/2 +%% +%% Take the first address from the service if several are specified +%% and not address is configured. + +addrs(Addrs, Opts) -> + case lists:mapfoldr(fun ipaddr/2, [], Opts) of + {Os, [_]} -> + Os; + {_, []} -> + Opts ++ [{ip, A} || [A|_] <- [Addrs]]; + {_, As} -> + ?ERROR({invalid_addrs, As, Addrs}) + end. -get_addr(As) -> - diameter_lib:ipaddr(addr(As, [])). +ipaddr({K,A}, As) + when K == ifaddr; + K == ip -> + {{ip, ipaddr(A)}, [A | As]}; +ipaddr(T, B) -> + {T, B}. -%% get_addr/2 +ipaddr(A) + when A == loopback; + A == any -> + A; +ipaddr(A) -> + diameter_lib:ipaddr(A). -get_addr([], []) -> - []; -get_addr(As, Def) -> - [{ip, diameter_lib:ipaddr(addr(As, Def))}]. +%% get_addr/1 -%% Take the first address from the service if several are unspecified. -addr([], [Addr | _]) -> - Addr; -addr([{_, Addr}], _) -> - Addr; -addr(As, Addrs) -> - ?ERROR({invalid_addrs, As, Addrs}). +get_addr([{_, Addr}]) -> + diameter_lib:ipaddr(Addr); +get_addr(Addrs) -> + ?ERROR({invalid_addrs, Addrs}). %% get_port/1 @@ -413,10 +412,15 @@ get_port(Ps) -> %% gen_opts/2 -gen_opts(LAddrOpt, Opts) -> +gen_opts(Addrs, Opts) -> + gen_opts(addrs(Addrs, Opts)). + +%% gen_opts/1 + +gen_opts(Opts) -> {L,_} = proplists:split(Opts, [binary, packet, active]), [[],[],[]] == L orelse ?ERROR({reserved_options, Opts}), - [binary, {packet, 0}, {active, false}] ++ LAddrOpt ++ Opts. + [binary, {packet, 0}, {active, false} | Opts]. %% --------------------------------------------------------------------------- %% # ports/1 @@ -640,7 +644,7 @@ transition(Msg, S) %% Deferred actions from a message_cb. transition({actions, Dir, Acts}, S) -> - actions(Acts, Dir, S); + setopts(actions(Acts, Dir, S)); %% Request to close the transport connection. transition({diameter, {close, Pid}}, #transport{parent = Pid, diff --git a/lib/diameter/test/diameter_examples_SUITE.erl b/lib/diameter/test/diameter_examples_SUITE.erl index eb99f10fe6..ee44ed8dc9 100644 --- a/lib/diameter/test/diameter_examples_SUITE.erl +++ b/lib/diameter/test/diameter_examples_SUITE.erl @@ -344,7 +344,7 @@ top(Dir, LibDir) -> start({server, Prot}) -> ok = diameter:start(), ok = server:start(), - {ok, Ref} = server:listen(Prot), + {ok, Ref} = server:listen({Prot, any, 3868}), [_] = ?util:lport(Prot, Ref), ok; @@ -352,7 +352,7 @@ start({client = Svc, Prot}) -> ok = diameter:start(), true = diameter:subscribe(Svc), ok = client:start(), - {ok, Ref} = client:connect(Prot), + {ok, Ref} = client:connect({Prot, loopback, loopback, 3868}), receive #diameter_event{info = {up, Ref, _, _, _}} -> ok end; start(Config) -> diff --git a/lib/diameter/test/diameter_traffic_SUITE.erl b/lib/diameter/test/diameter_traffic_SUITE.erl index f3c2af07e6..ffb4a508cd 100644 --- a/lib/diameter/test/diameter_traffic_SUITE.erl +++ b/lib/diameter/test/diameter_traffic_SUITE.erl @@ -28,8 +28,10 @@ -export([suite/0, all/0, groups/0, + init_per_suite/0, init_per_suite/1, end_per_suite/1, + init_per_group/1, init_per_group/2, end_per_group/2, init_per_testcase/2, @@ -114,6 +116,8 @@ %% diameter_{tcp,sctp} callbacks -export([message/3]). +-include_lib("kernel/include/inet_sctp.hrl"). + -include("diameter.hrl"). -include("diameter_gen_base_rfc3588.hrl"). -include("diameter_gen_base_accounting.hrl"). @@ -129,7 +133,7 @@ %% Positive number of testcases from which to select (randomly) from %% tc(), the list of testcases to run, or [] to run all. The random %% selection is to limit the time it takes for the suite to run. --define(LIMIT, 42). +-define(LIMIT, #{tcp => 42, sctp => 5}). -define(util, diameter_util). @@ -277,12 +281,15 @@ all() -> -define(GROUPS, []). %-define(GROUPS, [[tcp,rfc6733,record,map,false,false,false,false]]). +%% Issues with gen_sctp sporadically cause huge numbers of failed +%% testcases when running testcases in parallel. groups() -> Names = names(), [{P, [P], Ts} || Ts <- [tc()], P <- [shuffle, parallel]] ++ - [{?util:name(N), [], [{group, if S -> shuffle; not S -> parallel end}]} - || [_,_,_,_,S|_] = N <- Names] + [{?util:name(N), [], [{group, if T == sctp; S -> shuffle; + true -> parallel end}]} + || [T,_,_,_,S|_] = N <- Names] ++ [{T, [], [{group, ?util:name(N)} || N <- names(Names, ?GROUPS), T == hd(N)]} @@ -310,6 +317,9 @@ names(_, Names) -> %% -------------------- +init_per_suite() -> + [{timetrap, {seconds, 60}}]. + init_per_suite(Config) -> [{rfc4005, compile_and_load()}, {sctp, ?util:have_sctp()} | Config]. @@ -320,6 +330,9 @@ end_per_suite(_Config) -> %% -------------------- +init_per_group(_) -> + [{timetrap, {seconds, 30}}]. + init_per_group(Name, Config) when Name == shuffle; Name == parallel -> @@ -356,7 +369,7 @@ init_per_group(Name, Config) -> server_decoding = D, server_sender = SS, server_throttle = ST}, - replace([{group, G}, {runlist, select()}], Config); + replace([{group, G}, {runlist, select(T)}], Config); _ -> Config end. @@ -370,9 +383,10 @@ end_per_group(Name, Config) end_per_group(_, _) -> ok. -select() -> - try rand:uniform(?LIMIT) of - N -> lists:sublist(?util:scramble(tc()), max(N,5)) +select(T) -> + try maps:get(T, ?LIMIT) of + N -> + lists:sublist(?util:scramble(tc()), max(5, rand:uniform(N))) catch error:_ -> ?LIMIT end. @@ -527,7 +541,7 @@ add_transports(Config) -> | server_apps()] ++ [{spawn_opt, {erlang, spawn, []}} || CS]), Cs = [?util:connect(CN, - [T, {sender, CS}], + [T, {sender, CS} | client_opts(T)], LRef, [{id, Id} | client_apps(R, [{'Origin-State-Id', origin(Id)}])]) @@ -537,6 +551,14 @@ add_transports(Config) -> Id <- [{D,E}]], ?util:write_priv(Config, "transport", [LRef | Cs]). +client_opts(tcp) -> + []; +client_opts(sctp) -> + [{sctp_initmsg, #sctp_initmsg{num_ostreams = N, + max_instreams = 5}} + || N <- [rand:uniform(8)], + N =< 6]. + server_apps() -> B = have_nas(), [{applications, [diameter_gen_base_rfc3588, diff --git a/lib/diameter/test/diameter_transport_SUITE.erl b/lib/diameter/test/diameter_transport_SUITE.erl index 9d981d0a2b..284d2b9566 100644 --- a/lib/diameter/test/diameter_transport_SUITE.erl +++ b/lib/diameter/test/diameter_transport_SUITE.erl @@ -349,35 +349,40 @@ rand_bytes(N) -> %% start_connect/3 start_connect(Prot, PortNr, Ref) -> - {ok, TPid, [?ADDR]} = start_connect(Prot, - {connect, Ref}, - ?SVC([]), - [{raddr, ?ADDR}, - {rport, PortNr}, - {ip, ?ADDR}, - {port, 0}]), - ?RECV(?TMSG({TPid, connected, _})), + {ok, TPid} = start_connect(Prot, + {connect, Ref}, + ?SVC([]), + [{raddr, ?ADDR}, + {rport, PortNr}, + {ip, ?ADDR}, + {port, 0}]), + connected(Prot, TPid), TPid. +connected(sctp, TPid) -> + ?RECV(?TMSG({TPid, connected, _})); +connected(tcp, TPid) -> + ?RECV(?TMSG({TPid, connected, _, [?ADDR]})). + start_connect(sctp, T, Svc, Opts) -> - diameter_sctp:start(T, Svc, [{sctp_initmsg, ?SCTP_INIT} | Opts]); + {ok, TPid, [?ADDR]} + = diameter_sctp:start(T, Svc, [{sctp_initmsg, ?SCTP_INIT} | Opts]), + {ok, TPid}; start_connect(tcp, T, Svc, Opts) -> diameter_tcp:start(T, Svc, Opts). %% start_accept/2 start_accept(Prot, Ref) -> - {Mod, Opts} = tmod(Prot), - {ok, TPid, [?ADDR]} = Mod:start({accept, Ref}, - ?SVC([?ADDR]), - [{port, 0} | Opts]), + {ok, TPid, [?ADDR]} + = start_accept(Prot, {accept, Ref}, ?SVC([?ADDR]), [{port, 0}]), ?RECV(?TMSG({TPid, connected})), TPid. -tmod(sctp) -> - {diameter_sctp, [{sctp_initmsg, ?SCTP_INIT}]}; -tmod(tcp) -> - {diameter_tcp, []}. +start_accept(sctp, T, Svc, Opts) -> + diameter_sctp:start(T, Svc, [{sctp_initmsg, ?SCTP_INIT} | Opts]); +start_accept(tcp, T, Svc, Opts) -> + diameter_tcp:start(T, Svc, Opts). %% =========================================================================== diff --git a/lib/diameter/test/diameter_util.erl b/lib/diameter/test/diameter_util.erl index 03f79096ac..d249b0e4fa 100644 --- a/lib/diameter/test/diameter_util.erl +++ b/lib/diameter/test/diameter_util.erl @@ -32,7 +32,8 @@ foldl/3, scramble/1, unique_string/0, - have_sctp/0]). + have_sctp/0, + eprof/1]). %% diameter-specific -export([lport/2, @@ -48,6 +49,16 @@ -define(L, atom_to_list). +%% --------------------------------------------------------------------------- + +eprof(start) -> + eprof:start(), + eprof:start_profiling([self()]); + +eprof(stop) -> + eprof:stop_profiling(), + eprof:analyze(), + eprof:stop(). %% --------------------------------------------------------------------------- %% name/2 |