From 9589d10c491f2660995a8424c21d6d72a198c315 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Tue, 15 Jan 2019 12:21:23 +0100 Subject: Fix comment typo --- lib/diameter/src/base/diameter_callback.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/diameter/src/base/diameter_callback.erl b/lib/diameter/src/base/diameter_callback.erl index d04a416bef..3bcf550cd8 100644 --- a/lib/diameter/src/base/diameter_callback.erl +++ b/lib/diameter/src/base/diameter_callback.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2017. All Rights Reserved. +%% Copyright Ericsson AB 2010-2019. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -70,7 +70,7 @@ -module(diameter_callback). -%% Default callbacks when no aleternate is specified. +%% Default callbacks when no alternate is specified. -export([peer_up/3, peer_down/3, pick_peer/4, -- cgit v1.2.3 From 212ff19f5d9950f680be3b52cb53994b2df734d4 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Fri, 15 Feb 2019 15:02:09 +0100 Subject: Document acknowledgements in transport interface This is the {diameter, ack} message from a transport process to its parent that requests that every Diameter request passed to the parent be matched by a incoming send message, the atom false replacing a message() in the case that the request is unanswered, allowing the transport to keep track of the number of outstanding requests. These were added in commit ca09cf7b, and are used to implement the message_cb config in diameter_tcp/sctp, documented in commit cefcaa5c. This commit documents the interface, to make it available to other transport implementations. --- lib/diameter/doc/src/diameter_transport.xml | 31 ++++++++++++++++++++++++++--- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/lib/diameter/doc/src/diameter_transport.xml b/lib/diameter/doc/src/diameter_transport.xml index 294e8a8864..099c600bf3 100644 --- a/lib/diameter/doc/src/diameter_transport.xml +++ b/lib/diameter/doc/src/diameter_transport.xml @@ -14,7 +14,8 @@
-20112016 +2011 +2019 Ericsson AB. All Rights Reserved. @@ -174,10 +175,13 @@ its parent.

-{diameter, {send, &message;}} +{diameter, {send, &message; | false}}

-An outbound Diameter message.

+An outbound Diameter message. +The atom false can only be received when request +acknowledgements have been requests: see the ack message +below.

{diameter, {close, Pid}} @@ -246,6 +250,27 @@ A LocalAddr list has the same semantics as one returned from &start;.

+{diameter, ack} + +

+Request acknowledgements of unanswered requests. +A transport process should send this once before passing incoming +Diameter messages into diameter. +As a result, every Diameter request passed into diameter with a +recv message (below) will be answered with a +send message (above), either a &message; for the transport +process to send or the atom false if the request has been +discarded or otherwise not answered.

+ +

+This is to allow a transport process to keep count of the number +of incoming request messages that have not yet been answered or +discarded, to allow it to regulate the amount of incoming traffic. +Both diameter_tcp and diameter_sctp request acknowledgements when a +message_cb is configured, turning send/recv message into +callbacks that can be used to regulate traffic.

+
+ {diameter, {recv, &message;}}

-- cgit v1.2.3 From f1cdd72110184460f76630db79ce6fc0ead44ba6 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Fri, 18 Jan 2019 13:22:31 +0100 Subject: Tweak/document request handler callback The possibility of configuring an MFA as spawn_opt was added in commit fd285079, the callback being passed an arity-0 fun to be applied in an appropriate handler process. Replace the fun by a tuple to be passed to diameter_traffic:request/1, to avoid passing funs between nodes when handler processes are remote. A list-valued spawn_opt is now equivalent to the following configured as {spawn_opt, {Mod, spawn_local, [Opts]}}. spawn_local(ReqT, Opts) -> spawn_opt(diameter_traffic, request, [ReqT], Opts). ReqT is passed by diameter and contains information that the callback may want to decide where to handle the request in question (which wasn't accessible with a fun), but this information isn't exposed in a documented way. The intention is instead to add an own callback implementation to make use of the information. Note that application lookup now takes place in the watchdog process in both the list-valued (or no configuration) and mfa-valued cases. Whether this is good, bad, or (probably) inconsequential remains to be seen. --- lib/diameter/doc/src/diameter.xml | 24 ++++++-- lib/diameter/src/base/diameter.erl | 4 +- lib/diameter/src/base/diameter_traffic.erl | 86 +++++++++++++++++----------- lib/diameter/test/diameter_traffic_SUITE.erl | 5 +- 4 files changed, 74 insertions(+), 45 deletions(-) diff --git a/lib/diameter/doc/src/diameter.xml b/lib/diameter/doc/src/diameter.xml index dfa4c803ed..b98e55d2bf 100644 --- a/lib/diameter/doc/src/diameter.xml +++ b/lib/diameter/doc/src/diameter.xml @@ -1,7 +1,9 @@ erlang:spawn_opt/2'> + erlang:spawn_opt/5'> erlang:nodes/0'> 2011 -2017 +2019 Ericsson AB. All Rights Reserved. @@ -1384,12 +1386,22 @@ the same peer.

-{spawn_opt, [term()]} +{spawn_opt, [term()] | {M,F,A}}

-Options passed to &spawn_opt; when spawning a process for an -incoming Diameter request. -Options monitor and link are ignored.

+An options list passed to &spawn_opt2; to spawn a handler process for an +incoming Diameter request on the local node, or an MFA that returns +the pid of a handler process.

+ +

+Options monitor and link are ignored in the list-valued +case. +An MFA is applied with an additional term prepended to its argument +list, and should return either the pid of the handler process that +invokes diameter_traffic:request/1 on the term in order to +process the request, or the atom discard. +The handler process need not be local, but diameter must be started on +the remote node.

Defaults to the empty list.

diff --git a/lib/diameter/src/base/diameter.erl b/lib/diameter/src/base/diameter.erl index b90b794611..7f172e1fa1 100644 --- a/lib/diameter/src/base/diameter.erl +++ b/lib/diameter/src/base/diameter.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2017. All Rights Reserved. +%% Copyright Ericsson AB 2010-2019. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -365,7 +365,7 @@ call(SvcName, App, Message) -> | {connect_timer, 'Unsigned32'()} | {watchdog_timer, 'Unsigned32'() | {module(), atom(), list()}} | {watchdog_config, [{okay|suspect, non_neg_integer()}]} - | {spawn_opt, list()}. + | {spawn_opt, list() | mfa()}. %% Options passed to start_service/2 diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl index d2856ae530..e9acb5c0e8 100644 --- a/lib/diameter/src/base/diameter_traffic.erl +++ b/lib/diameter/src/base/diameter_traffic.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2013-2017. All Rights Reserved. +%% Copyright Ericsson AB 2013-2019. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -44,6 +44,7 @@ %% internal -export([send/1, %% send from remote node + request/1, %% process request in handler process init/1]). %% monitor process start -include_lib("diameter/include/diameter.hrl"). @@ -232,7 +233,7 @@ incr_rc(Dir, Pkt, TPid, MsgDict, AppDict, Dict0) -> -spec receive_message(pid(), Route, #diameter_packet{}, module(), RecvData) -> pid() %% request handler | boolean() %% answer, known request or not - | discard %% request discarded by MFA + | discard %% request discarded when Route :: {Handler, RequestRef, TPid} | Ack, RecvData :: {[SpawnOpt], #recvdata{}}, @@ -252,7 +253,8 @@ receive_message(TPid, Route, Pkt, Dict0, RecvData) -> recv(true, Ack, TPid, Pkt, Dict0, T) when is_boolean(Ack) -> {Opts, RecvData} = T, - spawn_request(Ack, TPid, Pkt, Dict0, RecvData, Opts); + AppT = find_app(TPid, Pkt, RecvData), + ack(Ack, TPid, spawn_request(AppT, Opts, Ack, TPid, Pkt, Dict0, RecvData)); %% ... answer to known request ... recv(false, {Pid, Ref, TPid}, _, Pkt, Dict0, _) -> @@ -274,58 +276,65 @@ recv(false, false, TPid, Pkt, _, _) -> incr(TPid, {{unknown, 0}, recv, discarded}), false. -%% spawn_request/6 +%% spawn_request/7 -%% An MFA should return a pid() or the atom 'discard'. The latter -%% results in an acknowledgment back to the transport process when -%% appropriate, to ensure that send/recv callbacks can count -%% outstanding requests. Acknowledgement is implicit if the +spawn_request(false, _, _, _, _, _, _) -> %% no transport + discard; + +%% An MFA should return the pid() of a process in which the argument +%% fun in applied, or the atom 'discard' if the fun is not applied. +%% The latter results in an acknowledgment back to the transport +%% process when appropriate, to ensure that send/recv callbacks can +%% count outstanding requests. Acknowledgement is implicit if the %% handler process dies (in a handle_request callback for example). -spawn_request(Ack, TPid, Pkt, Dict0, RecvData, {M,F,A}) -> - ReqF = fun() -> - ack(Ack, TPid, recv_request(Ack, TPid, Pkt, Dict0, RecvData)) - end, - ack(Ack, TPid, apply(M, F, [ReqF | A])); +spawn_request(AppT, {M,F,A}, Ack, TPid, Pkt, Dict0, RecvData) -> + %% Term to pass to request/1 in an appropriate process. + ReqT = {Pkt, AppT, Ack, TPid, Dict0, RecvData}, + apply(M, F, [ReqT | A]); %% A spawned process acks implicitly when it dies, so there's no need %% to handle 'discard'. -spawn_request(Ack, TPid, Pkt, Dict0, RecvData, Opts) -> +spawn_request(AppT, Opts, Ack, TPid, Pkt, Dict0, RecvData) -> spawn_opt(fun() -> - recv_request(Ack, TPid, Pkt, Dict0, RecvData) + recv_request(Ack, TPid, Pkt, Dict0, RecvData, AppT) end, Opts). +%% request/1 +%% +%% Called from a handler process chosen by a transport spawn_opt MFA +%% to process an incoming request. + +request({Pkt, AppT, Ack, TPid, Dict0, RecvData} = _ReqT) -> + ack(Ack, TPid, recv_request(Ack, TPid, Pkt, Dict0, RecvData, AppT)). + %% ack/3 ack(Ack, TPid, RC) -> - RC == discard andalso Ack andalso (TPid ! {send, false}), + RC == discard + andalso Ack + andalso (TPid ! {send, false}), RC. %% --------------------------------------------------------------------------- -%% recv_request/5 +%% recv_request/6 %% --------------------------------------------------------------------------- -spec recv_request(Ack :: boolean(), TPid :: pid(), #diameter_packet{}, Dict0 :: module(), - #recvdata{}) + #recvdata{}, + AppT :: {#diameter_app{}, #diameter_caps{}} + | #diameter_caps{}) %% no suitable app -> ok %% answer was sent - | discard %% or not - | false. %% no transport - -recv_request(Ack, - TPid, - #diameter_packet{header = #diameter_header{application_id = Id}} - = Pkt, - Dict0, - #recvdata{peerT = PeerT, - apps = Apps, - counters = Count} - = RecvData) -> + | discard. %% or not + +recv_request(Ack, TPid, Pkt, Dict0, RecvData, AppT) -> Ack andalso (TPid ! {handler, self()}), - case diameter_service:find_incoming_app(PeerT, TPid, Id, Apps) of + case AppT of {#diameter_app{id = Aid, dictionary = AppDict} = App, Caps} -> + Count = RecvData#recvdata.counters, Count andalso incr(recv, Pkt, TPid, AppDict), DecPkt = decode(Aid, AppDict, RecvData, Pkt), Count andalso incr_error(recv, DecPkt, TPid, AppDict), @@ -349,11 +358,20 @@ recv_request(Ack, Dict0, RecvData, DecPkt, - [[]]); - false = No -> %% transport has gone down - No + [[]]) end. +%% find_app/3 +%% +%% Lookup the application of a received Diameter request on the node +%% on which it's received. + +find_app(TPid, + #diameter_packet{header = #diameter_header{application_id = Id}}, + #recvdata{peerT = PeerT, + apps = Apps}) -> + diameter_service:find_incoming_app(PeerT, TPid, Id, Apps). + %% decode/4 decode(Id, Dict, #recvdata{codec = Opts}, Pkt) -> diff --git a/lib/diameter/test/diameter_traffic_SUITE.erl b/lib/diameter/test/diameter_traffic_SUITE.erl index 434aef01dd..47b00c25a2 100644 --- a/lib/diameter/test/diameter_traffic_SUITE.erl +++ b/lib/diameter/test/diameter_traffic_SUITE.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2018. All Rights Reserved. +%% Copyright Ericsson AB 2010-2019. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -539,8 +539,7 @@ add_transports(Config) -> ++ [{unordered, unordered()} || T == sctp], [{capabilities_cb, fun capx/2}, {pool_size, 8} - | server_apps()] - ++ [{spawn_opt, {erlang, spawn, []}} || CS]), + | server_apps()]), Cs = [?util:connect(CN, [T, {sender, CS} | client_opts(T)], LRef, -- cgit v1.2.3 From d9d918b2e31daca8b3d904ffbd26a9e4207b166f Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Wed, 20 Feb 2019 01:42:17 +0100 Subject: Add diameter_dist for ready spawn_opt callbacks That is, of functions that can be configured as spawn_opt MFAs in transport configuration. This commits adds the spawn_local described in the parent commit, and a route_session that assumes that the local node initiates all sessions with Session-Id returned by diameter:session_id/1, and handles incoming requests on the node on which the id in question was returned, diameter:session_id/1 using node() as optional value in the Session-Id format. --- lib/diameter/src/base/diameter_dist.erl | 218 ++++++++++++++++++++++ lib/diameter/src/base/diameter_misc_sup.erl | 3 +- lib/diameter/src/base/diameter_traffic.erl | 4 +- lib/diameter/src/modules.mk | 3 +- lib/diameter/test/diameter_distribution_SUITE.erl | 3 +- 5 files changed, 227 insertions(+), 4 deletions(-) create mode 100644 lib/diameter/src/base/diameter_dist.erl diff --git a/lib/diameter/src/base/diameter_dist.erl b/lib/diameter/src/base/diameter_dist.erl new file mode 100644 index 0000000000..cef9522c9d --- /dev/null +++ b/lib/diameter/src/base/diameter_dist.erl @@ -0,0 +1,218 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2019. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% %CopyrightEnd% +%% + +-module(diameter_dist). + +-behaviour(gen_server). + +%% +%% Implements callbacks that can be configured as a spawn_opt +%% transport configuration, to be able to distribute incoming Diameter +%% requests to handler processes (local or remote) in various ways. +%% + +%% spawn_opt callbacks; initial argument constructed in diameter_traffic +-export([spawn_local/2, + spawn_local/1, + route_session/2, + route_session/1]). + +-include_lib("diameter/include/diameter.hrl"). + +%% server start +-export([start_link/0]). + +%% gen_server callbacks +-export([init/1, + handle_info/2, + handle_cast/2, + handle_call/3, + code_change/3, + terminate/2]). + +-define(SERVER, ?MODULE). %% server monitoring node connections +-define(TABLE, ?MODULE). %% node() binary -> node() atom + +-define(B(A), atom_to_binary(A, utf8)). + +%% spawn_local/2 +%% +%% Callback that is equivalent to an options list. That is, the +%% following are equivalent when passed as options to +%% diameter:add_transport/2. +%% +%% {spawn_opt, Opts} +%% {spawn_opt, {diameter_dist, spawn_local, [Opts]}} + +spawn_local(ReqT, Opts) -> + spawn_opt(diameter_traffic, request, [ReqT], Opts). + +%% spawn_local/1 + +spawn_local(ReqT) -> + spawn_local(ReqT, []). + +%% route_session/2 +%% +%% Callback that routes requests containing Session-Id AVPs as +%% returned by diameter:session_id/0 back to the node on which the +%% function was called. This is only appropriate when sessions are +%% initiated by the own (typically client) node, and ids have been +%% returned from diameter:session_id/0. + +route_session(ReqT, Opts) -> + #diameter_packet{bin = Bin} = element(1, ReqT), + Node = node_of_session_id(Bin), + spawn_opt(Node, diameter_traffic, request, [ReqT], Opts). + +%% route_session/1 + +route_session(ReqT) -> + route_session(ReqT, []). + +%% node_of_session_id/1 +%% +%% Return the node name encoded as optional value in a Session-Id, +%% assuming the id has been created with diameter:session_id/0. +%% +%% node() is returned if a node name can't be extracted for any +%% reason. + +node_of_session_id(<<_Head:20/binary, Avps/binary>>) -> + sid_node(Avps); + +node_of_session_id(_) -> + node(). + +%% sid_node/1 + +%% Session-Id = Command Code 263, V-bit = 0. +sid_node(<<263:32, 0:1, _:7, Len:24, _/binary>> = Bin) -> + case Bin of + <> -> + <<_:8/binary, Sid/binary>> = Avp, + sid_node(Sid, pattern(), 2); %% look for the optional value + _ -> + node() + end; + +%% Jump to the next AVP. This is potentially costly for a message with +%% many AVPs and no Session-Id, which an attacker is prone to send. +%% 8.8 or RFC 6733 says that Session-Id SHOULD (but not MUST) appear +%% immediately following the Diameter Header, so there is no +%% guarantee. +sid_node(<<_:40, Len:24, _/binary>> = Bin) -> + Pad = (4 - (Len rem 4)) rem 4, + case Bin of + <<_:Len/binary, _:Pad/binary, Rest/binary>> -> + sid_node(Rest); + _ -> + node() + end. + +%% sid_node/2 + +%% Lookup the node name to ensure we don't convert arbitrary binaries +%% to atom. +sid_node(Bin, _, 0) -> + case ets:lookup(?TABLE, Bin) of + [{_, Node}] -> + Node; + [] -> + node() + end; + +%% The optional value (if any) of a Session-Id follows the third +%% semicolon. Searching with binary:match/2 does better than matching, +%% especially when the pattern is compiled. +sid_node(Bin, CP, N) -> + case binary:match(Bin, CP) of + {Offset, 1} -> + <<_:Offset/binary, _, Rest/binary>> = Bin, + sid_node(Rest, CP, N-1); + nomatch -> + node() + end. + +%% pattern/0 +%% +%% Since this is being called in a watchdog process, compile the +%% pattern once and maintain it in the process dictionary. + +pattern() -> + case get(?MODULE) of + undefined -> + CP = binary:compile_pattern(<<$;>>), %% tuple + put(?MODULE, CP), + CP; + CP -> + CP + end. + +%% =========================================================================== + +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, _Args = [], _Opts = []). + +%% init/1 +%% +%% Maintain [node() | nodes()] in a table that maps from binary-valued +%% names, so we can lookup the corresponding atoms rather than convert +%% binaries that aren't necessarily node names. + +init([]) -> + ets:new(?TABLE, [set, named_table]), + ok = net_kernel:monitor_nodes(true, [{node_type, all}, nodedown_reason]), + ets:insert(?TABLE, [{B,N} || N <- [node() | nodes()], + B <- [?B(N)]]), + {ok, erlang:monotonic_time()}. + +%% handle_call/3 + +handle_call(_, _From, S) -> + {reply, nok, S}. + +%% handle_cast/2 + +handle_cast(_, S) -> + {noreply, S}. + +%% handle_info/2 + +handle_info({nodeup, Node, _}, S) -> + ets:insert(?TABLE, {?B(Node), Node}), + {noreply, S}; + +handle_info({nodedown, Node, _}, S) -> + ets:delete(?TABLE, ?B(Node)), + {noreply, S}; + +handle_info(_, S) -> + {noreply, S}. + +%% terminate/2 + +terminate(_, _) -> + ok. + +%% code_change/3 + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/lib/diameter/src/base/diameter_misc_sup.erl b/lib/diameter/src/base/diameter_misc_sup.erl index 343688be23..fec5a41b5c 100644 --- a/lib/diameter/src/base/diameter_misc_sup.erl +++ b/lib/diameter/src/base/diameter_misc_sup.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2016. All Rights Reserved. +%% Copyright Ericsson AB 2010-2019. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -35,6 +35,7 @@ diameter_stats, %% statistics counter management diameter_reg, %% service/property publishing diameter_peer, %% remote peer manager + diameter_dist, %% request distribution diameter_config]). %% configuration/restart %% start_link/0 diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl index e9acb5c0e8..b1b797aad8 100644 --- a/lib/diameter/src/base/diameter_traffic.erl +++ b/lib/diameter/src/base/diameter_traffic.erl @@ -288,7 +288,9 @@ spawn_request(false, _, _, _, _, _, _) -> %% no transport %% count outstanding requests. Acknowledgement is implicit if the %% handler process dies (in a handle_request callback for example). spawn_request(AppT, {M,F,A}, Ack, TPid, Pkt, Dict0, RecvData) -> - %% Term to pass to request/1 in an appropriate process. + %% Term to pass to request/1 in an appropriate process. Module + %% diameter_dist implements callbacks, and uses the form of the + %% argument tuple constructed below. ReqT = {Pkt, AppT, Ack, TPid, Dict0, RecvData}, apply(M, F, [ReqT | A]); diff --git a/lib/diameter/src/modules.mk b/lib/diameter/src/modules.mk index bb86de016a..d16292bb88 100644 --- a/lib/diameter/src/modules.mk +++ b/lib/diameter/src/modules.mk @@ -1,7 +1,7 @@ # %CopyrightBegin% # -# Copyright Ericsson AB 2010-2017. All Rights Reserved. +# Copyright Ericsson AB 2010-2019. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -40,6 +40,7 @@ RT_MODULES = \ base/diameter_config \ base/diameter_config_sup \ base/diameter_codec \ + base/diameter_dist \ base/diameter_gen \ base/diameter_lib \ base/diameter_misc_sup \ diff --git a/lib/diameter/test/diameter_distribution_SUITE.erl b/lib/diameter/test/diameter_distribution_SUITE.erl index 5146f68ff1..92d5c59797 100644 --- a/lib/diameter/test/diameter_distribution_SUITE.erl +++ b/lib/diameter/test/diameter_distribution_SUITE.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2013-2016. All Rights Reserved. +%% Copyright Ericsson AB 2013-2019. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -76,6 +76,7 @@ {share_peers, peers()}, {use_shared_peers, peers()}, {restrict_connections, false}, + {spawn_opt, {diameter_dist, spawn_local, []}}, {sequence, fun sequence/0}, {application, [{dictionary, ?DICT}, {module, ?MODULE}, -- cgit v1.2.3 From 734a7daf2e556d684850a3cb278684ba522a29de Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Mon, 4 Mar 2019 17:31:13 +0100 Subject: Add options to diameter_dist:route_session/2 node selection To be able to restrict how many AVPs will be examined (from the front of a message) when looking for Session-Id, and to decide what to do with if the AVP isn't found. Options are specified as a map of the following form. #{search => non_neg_integer(), default => discard | mfa(), dispatch => list() | mfa()} The search member says how many AVPs to examine at most, from the front of the message. If the optional value of a Session-Id is not the name of a connected node then the default member determines what to do with the request, handle it locally (the default), discard it, or invoke an MFA on the Session-Id | false (if none was found) and diameter_packet record to return a node() | false; if the latter then the request is discarded. If a node is identified then the dispatch MFA is invoked on the node and the request MFA (as three arguments), a list Opts being equivalent to the MFA {erlang, spawn_opt, [Opts]}, and the default being the empty list. Integer- or list-valued options are equivalent to the corresponding map with a single value. Limiting the search is to avoid searching messages containing many AVPs for a Session-Id that is known to occur near the header, since section 8.8 of RFC 6733 says this: When present, the Session-Id SHOULD appear immediately following the Diameter header (see Section 3). There's no guarantee, but in practice it may well be known that peers are respecting the RFC, and in that case limiting the search is a defense against searching messages from a malicious peer unnecessarily. The search is unlimited by default. A default is only used when a search fails to locate a Session-Id, and can be to discard the message, or have a node() or false be returned from an MFA applied to the diameter_packet in question. The local node is chosen by default. --- lib/diameter/src/base/diameter_dist.erl | 174 +++++++++++++++++++++++------- lib/diameter/test/diameter_pool_SUITE.erl | 3 +- 2 files changed, 136 insertions(+), 41 deletions(-) diff --git a/lib/diameter/src/base/diameter_dist.erl b/lib/diameter/src/base/diameter_dist.erl index cef9522c9d..ed2859e914 100644 --- a/lib/diameter/src/base/diameter_dist.erl +++ b/lib/diameter/src/base/diameter_dist.erl @@ -47,6 +47,8 @@ code_change/3, terminate/2]). +-type request() :: tuple(). %% callback argument from diameter_traffic + -define(SERVER, ?MODULE). %% server monitoring node connections -define(TABLE, ?MODULE). %% node() binary -> node() atom @@ -61,6 +63,9 @@ %% {spawn_opt, Opts} %% {spawn_opt, {diameter_dist, spawn_local, [Opts]}} +-spec spawn_local(ReqT :: request(), Opts :: list()) + -> pid(). + spawn_local(ReqT, Opts) -> spawn_opt(diameter_traffic, request, [ReqT], Opts). @@ -74,12 +79,48 @@ spawn_local(ReqT) -> %% Callback that routes requests containing Session-Id AVPs as %% returned by diameter:session_id/0 back to the node on which the %% function was called. This is only appropriate when sessions are -%% initiated by the own (typically client) node, and ids have been -%% returned from diameter:session_id/0. +%% only initiated by the own (typically client) node, and ids have +%% been returned from diameter:session_id/0. +%% +%% This can be used with #{search => 0} to route on something other +%% than Session-Id since default can be an MFA returning a node() +%% (applied to the incoming diameter_packet record) and dispatch can +%% be an MFA returning a pid() (applied to Node and the request MFA), +%% but this is no simpler than just implementing an own spawn_opt +%% callback. (Except with the default dispatch possibly.) + +-spec route_session(ReqT :: request(), Opts) + -> discard + | pid() + when Opts :: pos_integer() %% aka #{search => N} + | list() %% aka #{dispatch => Opts} + | #{search => non_neg_integer(), %% limit number of examined AVPs + default => discard | mfa(), %% return node() | false + dispatch => list() | mfa()}. %% spawn options or return pid() route_session(ReqT, Opts) -> - #diameter_packet{bin = Bin} = element(1, ReqT), - Node = node_of_session_id(Bin), + #diameter_packet{bin = Bin} = Pkt = element(1, ReqT), + Sid = session_id(avps(Bin), search(Opts)), + Node = default(node_of_session_id(Sid), Sid, Opts, Pkt), + dispatch(Node, ReqT, dispatch(Opts)). + +%% avps/1 + +avps(<<_:20/binary, Bin/binary>>) -> + Bin; + +avps(_) -> + false. + +%% dispatch/3 + +dispatch(false, _, _) -> + discard; + +dispatch(Node, ReqT, {M,F,A}) -> + apply(M, F, [Node, diameter_traffic, request, [ReqT] | A]); + +dispatch(Node, ReqT, Opts) -> spawn_opt(Node, diameter_traffic, request, [ReqT], Opts). %% route_session/1 @@ -90,27 +131,34 @@ route_session(ReqT) -> %% node_of_session_id/1 %% %% Return the node name encoded as optional value in a Session-Id, -%% assuming the id has been created with diameter:session_id/0. -%% -%% node() is returned if a node name can't be extracted for any -%% reason. +%% assuming the id has been created with diameter:session_id/0. Lookup +%% the node name to ensure we don't convert arbitrary binaries to +%% atom. -node_of_session_id(<<_Head:20/binary, Avps/binary>>) -> - sid_node(Avps); +node_of_session_id([_, _, _, Bin]) -> + case ets:lookup(?TABLE, Bin) of + [{_, Node}] -> + Node; + [] -> + false + end; node_of_session_id(_) -> - node(). + false. + +%% session_id/2 -%% sid_node/1 +session_id(_, 0) -> %% give up + false; %% Session-Id = Command Code 263, V-bit = 0. -sid_node(<<263:32, 0:1, _:7, Len:24, _/binary>> = Bin) -> +session_id(<<263:32, 0:1, _:7, Len:24, _/binary>> = Bin, _) -> case Bin of <> -> <<_:8/binary, Sid/binary>> = Avp, - sid_node(Sid, pattern(), 2); %% look for the optional value + split(Sid); _ -> - node() + false end; %% Jump to the next AVP. This is potentially costly for a message with @@ -118,38 +166,41 @@ sid_node(<<263:32, 0:1, _:7, Len:24, _/binary>> = Bin) -> %% 8.8 or RFC 6733 says that Session-Id SHOULD (but not MUST) appear %% immediately following the Diameter Header, so there is no %% guarantee. -sid_node(<<_:40, Len:24, _/binary>> = Bin) -> +session_id(<<_:40, Len:24, _/binary>> = Bin, N) -> Pad = (4 - (Len rem 4)) rem 4, case Bin of <<_:Len/binary, _:Pad/binary, Rest/binary>> -> - sid_node(Rest); + session_id(Rest, if N == infinity -> N; true -> N-1 end); _ -> - node() - end. + false + end; -%% sid_node/2 +session_id(_, _) -> + false. -%% Lookup the node name to ensure we don't convert arbitrary binaries -%% to atom. -sid_node(Bin, _, 0) -> - case ets:lookup(?TABLE, Bin) of - [{_, Node}] -> - Node; - [] -> - node() - end; +%% split/1 +%% +%% Split a Session-Id at no more than three semicolons: the optional +%% value (if any) follows the third. binary:split/2 does better than +%% matching character by character, especially when the pattern is +%% compiled. -%% The optional value (if any) of a Session-Id follows the third -%% semicolon. Searching with binary:match/2 does better than matching, -%% especially when the pattern is compiled. -sid_node(Bin, CP, N) -> - case binary:match(Bin, CP) of - {Offset, 1} -> - <<_:Offset/binary, _, Rest/binary>> = Bin, - sid_node(Rest, CP, N-1); - nomatch -> - node() - end. +split(Bin) -> + split(3, Bin, pattern()). + +%% split/3 + +split(0, Bin, _) -> + [Bin]; + +split(N, Bin, Pattern) -> + [H|T] = binary:split(Bin, Pattern), + [H | case T of + [] -> + T; + [Rest] -> + split(N-1, Rest, Pattern) + end]. %% pattern/0 %% @@ -166,6 +217,49 @@ pattern() -> CP end. +%% dispatch/1 + +dispatch(#{} = Opts) -> + maps:get(dispatch, Opts, []); + +dispatch(Opts) + when is_list(Opts) -> + Opts; + +dispatch(_) -> + []. + +%% search/1 +%% +%% Bound number of AVPs examined when looking for Session-Id. + +search(#{search := N}) + when is_integer(N), 0 =< N -> + N; + +search(N) + when is_integer(N), 0 =< N -> + N; + +search(_) -> + infinity. + +%% default/3 +%% +%% Choose a node when Session-Id lookup has failed. + +default(false = No, _, #{default := discard}, _) -> + No; + +default(false, Sid, #{default := {M,F,A}}, Pkt) -> + apply(M, F, [Sid, Pkt | A]); %% false | node() + +default(false, _, _, _) -> + node(); + +default(Node, _, _, _) -> + Node. + %% =========================================================================== start_link() -> diff --git a/lib/diameter/test/diameter_pool_SUITE.erl b/lib/diameter/test/diameter_pool_SUITE.erl index 97c16940ff..a36a4fa17a 100644 --- a/lib/diameter/test/diameter_pool_SUITE.erl +++ b/lib/diameter/test/diameter_pool_SUITE.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2015-2017. All Rights Reserved. +%% Copyright Ericsson AB 2015-2019. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -51,6 +51,7 @@ {'Auth-Application-Id', [0]}, %% common {'Acct-Application-Id', [3]}, %% accounting {restrict_connections, false}, + {spawn_opt, {diameter_dist, route_session, []}}, {application, [{alias, common}, {dictionary, diameter_gen_base_rfc6733}, {module, diameter_callback}]}, -- cgit v1.2.3 From 376e8fac401bd11b3cb3d2f9661eb7ff0b9bbcd7 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Wed, 6 Mar 2019 17:13:07 +0100 Subject: Add consistent hashing to diameter_dist:route_session/2 If the Session-Id optional value to node() mapping fails then hash Session-Id to a node by default, instead of selecting the local node as in the parent commit. The previous behaviour is configurable by setting default = local in an options map. Nodes make themselves part of the pool from which nodes are selected by calling diameter_dist:attach/1 with the list of service names they are willing to handle requests for, the local node being selected in the absence of any attached nodes. The original idea was to base the node pool on share_peers and/or use_shared_peers configuration, but that configuration determines where outgoing requests can be sent, while route_session/2 deals with incoming requests, so it's not obvious that conflating the two is a good thing. (Also because share_peers/use_shared_peers can be used in different ways; the former could have been skipped entirely.) The hashing effectively places nodes on a circle, a hashed Session-Id being mapped to the nearest predecessor node (clockwise). Nodes are rehashed with each Session-Id (with the id as salt) for a more even distribution, at the cost of performance, although how high the cost or how even the distribution has yet to be tested. Obviously, the larger the number of attached nodes, the higher the cost. Adding/removing an attached node only affects session ids that hash in the interval between the added/removed node and its successor (hence consistent hashing). Options are tweaked slightly compared to the parent commit, and it is now possible to restrict the optional value mapping to specific Diameter identities, to avoid mapping an id that was generated at the peer when the peer is also implemented with the diameter application. Note that diameter_dist is not yet an officially documented interface, so could change. Documentation is in the module itself. --- lib/diameter/src/base/diameter_dist.erl | 293 +++++++++++++++++++++++++---- lib/diameter/src/base/diameter_traffic.erl | 13 +- 2 files changed, 264 insertions(+), 42 deletions(-) diff --git a/lib/diameter/src/base/diameter_dist.erl b/lib/diameter/src/base/diameter_dist.erl index ed2859e914..5c29ea95a4 100644 --- a/lib/diameter/src/base/diameter_dist.erl +++ b/lib/diameter/src/base/diameter_dist.erl @@ -28,12 +28,20 @@ %% requests to handler processes (local or remote) in various ways. %% -%% spawn_opt callbacks; initial argument constructed in diameter_traffic +%% spawn_opt callbacks -export([spawn_local/2, spawn_local/1, route_session/2, route_session/1]). +%% signal availability for handling incoming requests to route_sesssion/2 +-export([attach/1, + detach/1]). + +%% consistent hashing +-export([hash/3, %% for use as default MFA in route_session/2 options map + hash/2]). %% arbitrary key/values + -include_lib("diameter/include/diameter.hrl"). %% server start @@ -50,9 +58,21 @@ -type request() :: tuple(). %% callback argument from diameter_traffic -define(SERVER, ?MODULE). %% server monitoring node connections --define(TABLE, ?MODULE). %% node() binary -> node() atom + +%% Maps a node name binary to the corresponding atom. Used by +%% route_session/2 to map the optional value of a Session-Id to +%% node(). +-define(NODE_TABLE, diameter_dist_node). + +%% Maps a diameter:service_name() to a node() that has called attach/1 +%% to declare its willingness to handle incoming requests for the +%% service. Use by route_session/2 in case the optional value mapping +%% has failed. +-define(SERVICE_TABLE, diameter_dist_service). -define(B(A), atom_to_binary(A, utf8)). +-define(ORCOND(List), list_to_tuple(['orelse', false | List])). +-define(HASH(T), erlang:phash2(T, 16#100000000)). %% spawn_local/2 %% @@ -76,32 +96,82 @@ spawn_local(ReqT) -> %% route_session/2 %% -%% Callback that routes requests containing Session-Id AVPs as -%% returned by diameter:session_id/0 back to the node on which the -%% function was called. This is only appropriate when sessions are -%% only initiated by the own (typically client) node, and ids have -%% been returned from diameter:session_id/0. +%% Callback that maps the Session-Id of an incoming request to a +%% handler node. %% -%% This can be used with #{search => 0} to route on something other -%% than Session-Id since default can be an MFA returning a node() -%% (applied to the incoming diameter_packet record) and dispatch can -%% be an MFA returning a pid() (applied to Node and the request MFA), -%% but this is no simpler than just implementing an own spawn_opt -%% callback. (Except with the default dispatch possibly.) - +%% With an options list, maps an id whose optional value is the name +%% of a connected node to the same node, to handle the case that the +%% session id has been returned from diameter:session_id/1; otherwise +%% to a node that has called diameter_dist:attach/1 using the +%% consistent hashing provided by hash/3, or to the local node() if a +%% session id could not be extracted or there are no attached nodes. A +%% handler process is spawned on the selected node using +%% erlang:spawn_opt/4. +%% +%% Different behaviour can be configured by supplying an options map +%% of the following form: +%% +%% #{search => non_neg_integer(), +%% id => [binary()], +%% default => discard | local | mfa(), +%% dispatch => list() | mfa()} +%% +%% The search member limits the number of AVPs that are examined in +%% the message (from the front), to avoid searching entire message in +%% case it's known that peers follow RFC 6733's recommendation that +%% Session-Id be placed at the head of a message. The default is to +%% search the entire message. +%% +%% The id member restricts the optional value mapping to session ids +%% whose DiamterIdentity is one of those specified. Set this to the +%% list of Diameter identities advertised by the service in question +%% (typically one) to ensure that only locally generated session ids +%% are mapped; or to the empty list to disable the mapping. +%% +%% The default member determines where to handle a message whose +%% Session-Id isn't found or whose optional value isn't mapped to the +%% name of a connected node. The atom local says the local node, an +%% MFA is invoked on Session-Id | false, the name of the diameter +%% service, and the message binary, and should return either a node() +%% or false to discard the message. Defaults to {diameter_dist, hash, []}. +%% +%% The dispatch member determines how the pid() of the request handler +%% process is retrieved. An MFA is applied to a previously selected +%% node(), and the module, function, and arguments list to apply in +%% the handler process to handle the request, the MFA being supplied +%% by diameter, and returns pid() | discard. A list is equivalent to +%% {erlang, spawn_opt, []}. Defaults to []. +%% +%% This can be used with search = 0 to route on something other than +%% Session-Id, but this is probably no simpler than just implementing +%% an own spawn_opt callback. (Except with the default dispatch possibly.) +%% +%% Note that if the peer is also implemented with OTP diameter and +%% generating session ids with diameter:session_id/1 then +%% route_session/2 can map an optional value to a local node that +%% happens to have the same name as one of the peer's nodes. This +%% could lead to an uneven distribution; for example, if the peer +%% nodes are a subset of the local nodes. In practice, it's typically +%% known if it's peers or the local node originating sessions; if the +%% former then setting id = [] disables the optional value mapping, if +%% the latter then setting default = local disables the hashing. -spec route_session(ReqT :: request(), Opts) -> discard | pid() when Opts :: pos_integer() %% aka #{search => N} | list() %% aka #{dispatch => Opts} | #{search => non_neg_integer(), %% limit number of examined AVPs - default => discard | mfa(), %% return node() | false - dispatch => list() | mfa()}. %% spawn options or return pid() + id => [binary()], %% restrict optional value map on DiamIdent + default => local %% handle locally + | discard + | mfa(), %% return node() | false + dispatch => list() %% spawn options + | mfa()}. %% (Node, M, F, A) -> pid() | discard route_session(ReqT, Opts) -> - #diameter_packet{bin = Bin} = Pkt = element(1, ReqT), + {_, Bin} = Info = diameter_traffic:request_info(ReqT), Sid = session_id(avps(Bin), search(Opts)), - Node = default(node_of_session_id(Sid), Sid, Opts, Pkt), + Node = default(node_of_session_id(Sid, Opts), Sid, Opts, Info), dispatch(Node, ReqT, dispatch(Opts)). %% avps/1 @@ -128,24 +198,31 @@ dispatch(Node, ReqT, Opts) -> route_session(ReqT) -> route_session(ReqT, []). -%% node_of_session_id/1 +%% node_of_session_id/2 %% %% Return the node name encoded as optional value in a Session-Id, %% assuming the id has been created with diameter:session_id/0. Lookup %% the node name to ensure we don't convert arbitrary binaries to %% atom. -node_of_session_id([_, _, _, Bin]) -> - case ets:lookup(?TABLE, Bin) of - [{_, Node}] -> - Node; - [] -> - false - end; +node_of_session_id([Id, _, _, Bin], #{id := Ids}) -> + lists:member(Id, Ids) andalso nodemap(Bin); -node_of_session_id(_) -> +node_of_session_id([_, _, _, Bin], _) -> + nodemap(Bin); + +node_of_session_id(_, _) -> false. +%% nodemap/1 + +nodemap(Bin) -> + try + ets:lookup_element(?NODE_TABLE, Bin, 2) + catch + error: badarg -> false + end. + %% session_id/2 session_id(_, 0) -> %% give up @@ -154,7 +231,7 @@ session_id(_, 0) -> %% give up %% Session-Id = Command Code 263, V-bit = 0. session_id(<<263:32, 0:1, _:7, Len:24, _/binary>> = Bin, _) -> case Bin of - <> -> + <> -> <<_:8/binary, Sid/binary>> = Avp, split(Sid); _ -> @@ -248,20 +325,123 @@ search(_) -> %% %% Choose a node when Session-Id lookup has failed. -default(false = No, _, #{default := discard}, _) -> - No; - -default(false, Sid, #{default := {M,F,A}}, Pkt) -> - apply(M, F, [Sid, Pkt | A]); %% false | node() +default(false, _, #{default := discard}, _) -> + false; -default(false, _, _, _) -> +default(false, _, #{default := local}, _) -> node(); +default(false, Sid, #{default := {M,F,A}}, Info) -> + {ServiceName, Bin} = Info, + apply(M, F, [Sid, ServiceName, Bin | A]); %% node() | false + +default(false, Sid, _, Info) -> %% aka {?MODULE, hash, []} + {ServiceName, Bin} = Info, + hash(Sid, ServiceName, Bin); + default(Node, _, _, _) -> Node. %% =========================================================================== +%% hash/3 +%% +%% Consistent hashing of Session-Id to an attached node, or the local +%% node if Session-Id = false or no attached nodes. + +hash(Sid, ServiceName, _) -> + case false /= Sid andalso attached(ServiceName) of + [_|_] = Nodes -> + hash(Sid, Nodes); + _ -> + node() + end. + +%% hash/2 +%% +%% Consistent hashing on arbitrary key/values. Returns false if the +%% list is empty. + +%% No key or no values. +hash(_, []) -> + false; + +%% Not much choice. +hash(_, [Value]) -> + Value; + +%% Hash on a circle and choose the closest predecessor. +hash(Key, Values) -> + Hash = ?HASH(Key), + tl(lists:foldl(fun(V,A) -> + choose(Hash, [?HASH({Key, V}) | V], A) + end, + false, %% < list() + Values)). + +%% choose/3 + +choose(Hash, [Hash1 | _] = T, [Hash2 | _]) + when Hash1 =< Hash, Hash < Hash2 -> + T; + +choose(Hash, [Hash1 | _], [Hash2 | _] = T) + when Hash2 =< Hash, Hash < Hash1 -> + T; + +choose(_, T1, T2) -> + max(T1, T2). + +%% =========================================================================== + +%% attach/1 +%% +%% Register the local node as a handler of incoming requests for the +%% specified services when using the route_session/2 spawn_opt +%% callback. + +attach(ServiceNames) -> + abcast({attach, node(), ServiceNames}). + +%% detach/1 +%% +%% Deregister the local node as a handler of incoming requests. + +detach(ServiceNames) -> + abcast({detach, node(), ServiceNames}). + +%% abcast/1 + +abcast(T) -> + gen_server:abcast([node() | nodes()], ?SERVER, T), + ok. + +%% attached/1 + +attached(ServiceName) -> + try + ets:lookup_element(?SERVICE_TABLE, ServiceName, 2) + catch + error: badarg -> [] + end. + +%% cast/2 + +cast(Node, T) -> + gen_server:cast({?SERVER, Node}, T). + +%% attach/2 + +attach(Node, S) -> + case sets:to_list(S) of + [] -> + ok; + Services -> + cast(Node, {attach, node(), Services}) + end. + +%% =========================================================================== + start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, _Args = [], _Opts = []). @@ -272,11 +452,12 @@ start_link() -> %% binaries that aren't necessarily node names. init([]) -> - ets:new(?TABLE, [set, named_table]), + ets:new(?NODE_TABLE, [set, named_table]), + ets:new(?SERVICE_TABLE, [bag, named_table]), ok = net_kernel:monitor_nodes(true, [{node_type, all}, nodedown_reason]), - ets:insert(?TABLE, [{B,N} || N <- [node() | nodes()], - B <- [?B(N)]]), - {ok, erlang:monotonic_time()}. + ets:insert(?NODE_TABLE, [{?B(N), N} || N <- [node() | nodes()]]), + abcast({attach, node()}), + {ok, sets:new()}. %% handle_call/3 @@ -285,17 +466,49 @@ handle_call(_, _From, S) -> %% handle_cast/2 +%% Remote node is asking which services the local node wants to handle. +handle_cast({attach, Node}, S) + when Node /= node() -> + attach(Node, S), + {noreply, S}; + +%% Node wants to handle incoming requests ... +handle_cast({attach, Node, ServiceNames}, S) -> + ets:insert(?SERVICE_TABLE, [{N, Node} || N <- ServiceNames]), + {noreply, case node() of + Node -> + sets:union(S, sets:from_list(ServiceNames)); + _ -> + S + end}; + +%% ... or not. +handle_cast({detach, Node, ServiceNames}, S) -> + ets:select_delete(?SERVICE_TABLE, [{{'$1', Node}, + [?ORCOND([{'==', '$1', {const, N}} + || N <- ServiceNames])], + [true]}]), + {noreply, case node() of + Node -> + sets:subtract(S, sets:from_list(ServiceNames)); + _ -> + S + end}; + handle_cast(_, S) -> {noreply, S}. %% handle_info/2 handle_info({nodeup, Node, _}, S) -> - ets:insert(?TABLE, {?B(Node), Node}), + ets:insert(?NODE_TABLE, {?B(Node), Node}), + cast(Node, {attach, node()}), %% ask which services remote node handles + attach(Node, S), %% say which service local node handles {noreply, S}; handle_info({nodedown, Node, _}, S) -> - ets:delete(?TABLE, ?B(Node)), + ets:delete(?NODE_TABLE, ?B(Node)), + ets:select_delete(?SERVICE_TABLE, [{{'_', Node}, [], [true]}]), {noreply, S}; handle_info(_, S) -> diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl index b1b797aad8..c0643402a6 100644 --- a/lib/diameter/src/base/diameter_traffic.erl +++ b/lib/diameter/src/base/diameter_traffic.erl @@ -42,6 +42,9 @@ peer_up/1, peer_down/1]). +%% towards diameter_dist +-export([request_info/1]). + %% internal -export([send/1, %% send from remote node request/1, %% process request in handler process @@ -289,8 +292,7 @@ spawn_request(false, _, _, _, _, _, _) -> %% no transport %% handler process dies (in a handle_request callback for example). spawn_request(AppT, {M,F,A}, Ack, TPid, Pkt, Dict0, RecvData) -> %% Term to pass to request/1 in an appropriate process. Module - %% diameter_dist implements callbacks, and uses the form of the - %% argument tuple constructed below. + %% diameter_dist implements callbacks. ReqT = {Pkt, AppT, Ack, TPid, Dict0, RecvData}, apply(M, F, [ReqT | A]); @@ -302,6 +304,13 @@ spawn_request(AppT, Opts, Ack, TPid, Pkt, Dict0, RecvData) -> end, Opts). +%% request_info/1 +%% +%% Limited request information for diameter_dist. + +request_info({Pkt, _AppT, _Ack, _TPid, _Dict0, RecvData} = _ReqT) -> + {RecvData#recvdata.service_name, Pkt#diameter_packet.bin}. + %% request/1 %% %% Called from a handler process chosen by a transport spawn_opt MFA -- cgit v1.2.3 From d46fcfdcac4b0dcea41add605f9419a7ea17e32c Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Mon, 4 Mar 2019 14:40:17 +0100 Subject: Add diameter_dist_SUITE to exercise diameter_dist:route_session/2 Spread a server over three nodes, one of which terminates a peer connection, the other two to handle requests. Terminate transport on one of the server nodes and ensure that answers come only from the other two. --- lib/diameter/test/diameter_dist_SUITE.erl | 332 ++++++++++++++++++++++ lib/diameter/test/diameter_distribution_SUITE.erl | 4 +- lib/diameter/test/modules.mk | 3 +- 3 files changed, 337 insertions(+), 2 deletions(-) create mode 100644 lib/diameter/test/diameter_dist_SUITE.erl diff --git a/lib/diameter/test/diameter_dist_SUITE.erl b/lib/diameter/test/diameter_dist_SUITE.erl new file mode 100644 index 0000000000..b2e4c35b9a --- /dev/null +++ b/lib/diameter/test/diameter_dist_SUITE.erl @@ -0,0 +1,332 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2019. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% %CopyrightEnd% +%% + +%% +%% Tests of traffic between two Diameter nodes, the server being +%% spread across three Erlang nodes. +%% + +-module(diameter_dist_SUITE). + +-export([suite/0, + all/0]). + +%% testcases +-export([enslave/1, enslave/0, + ping/1, + start/1, + connect/1, + send/1, + stop/1, stop/0]). + +%% diameter callbacks +-export([peer_up/3, + peer_down/3, + pick_peer/4, + prepare_request/3, + prepare_retransmit/3, + handle_answer/4, + handle_error/4, + handle_request/3]). + +-export([call/1]). + +-include("diameter.hrl"). +-include("diameter_gen_base_rfc6733.hrl"). + +%% =========================================================================== + +-define(util, diameter_util). + +-define(CLIENT, 'CLIENT'). +-define(SERVER, 'SERVER'). +-define(REALM, "erlang.org"). +-define(DICT, diameter_gen_base_rfc6733). +-define(ADDR, {127,0,0,1}). + +%% Config for diameter:start_service/2. +-define(SERVICE(Host), + [{'Origin-Host', Host ++ [$.|?REALM]}, + {'Origin-Realm', ?REALM}, + {'Host-IP-Address', [?ADDR]}, + {'Vendor-Id', 12345}, + {'Product-Name', "OTP/diameter"}, + {'Auth-Application-Id', [?DICT:id()]}, + {'Origin-State-Id', origin()}, + {spawn_opt, {diameter_dist, route_session, [#{id => []}]}}, + {sequence, fun sequence/0}, + {string_decode, false}, + {application, [{dictionary, ?DICT}, + {module, ?MODULE}, + {request_errors, callback}, + {answer_errors, callback}]}]). + +-define(SUCCESS, 2001). +-define(BUSY, 3004). +-define(LOGOUT, ?'DIAMETER_BASE_TERMINATION-CAUSE_LOGOUT'). +-define(MOVED, ?'DIAMETER_BASE_TERMINATION-CAUSE_USER_MOVED'). +-define(TIMEOUT, ?'DIAMETER_BASE_TERMINATION-CAUSE_SESSION_TIMEOUT'). + +-define(L, atom_to_list). +-define(A, list_to_atom). + +%% The order here is significant and causes the server to listen +%% before the clients connect. The server listens on the first node, +%% and distributes requests to the other two. +-define(NODES, [{server0, ?SERVER}, + {server1, ?SERVER}, + {server2, ?SERVER}, + {client, ?CLIENT}]). + +%% Options to ct_slave:start/2. +-define(TIMEOUTS, [{T, 15000} || T <- [boot_timeout, + init_timeout, + start_timeout]]). + +%% =========================================================================== + +suite() -> + [{timetrap, {seconds, 60}}]. + +all() -> + [enslave, + ping, + start, + connect, + send, + stop]. + +%% =========================================================================== +%% start/stop testcases + +%% enslave/1 +%% +%% Start four slave nodes, three to implement a Diameter server, +%% one to implement a client. + +enslave() -> + [{timetrap, {seconds, 30*length(?NODES)}}]. + +enslave(Config) -> + Here = filename:dirname(code:which(?MODULE)), + Ebin = filename:join([Here, "..", "ebin"]), + Dirs = [Here, Ebin], + Nodes = [{N,S} || {M,S} <- ?NODES, N <- [slave(M, Dirs)]], + ?util:write_priv(Config, nodes, [{N,S} || {{N,ok},S} <- Nodes]), + [] = [{T,S} || {{_,E} = T, S} <- Nodes, E /= ok]. + +slave(Name, Dirs) -> + add_pathsa(Dirs, ct_slave:start(Name, ?TIMEOUTS)). + +add_pathsa(Dirs, {ok, Node}) -> + {Node, rpc:call(Node, code, add_pathsa, [Dirs])}; +add_pathsa(_, No) -> + {No, error}. + +%% ping/1 +%% +%% Ensure the server nodes are connected so that diameter_dist can attach. + +ping({S, Nodes}) -> + ?SERVER = S, + [N || {N,_} <- Nodes, + node() /= N, + pang <- [net_adm:ping(N)]]; + +ping(Config) -> + Nodes = lists:droplast(?util:read_priv(Config, nodes)), + [] = [{N,RC} || {N,S} <- Nodes, + RC <- [rpc:call(N, ?MODULE, ping, [{S,Nodes}])], + RC /= []]. + +%% start/1 +%% +%% Start diameter services. + +start(SvcName) + when is_atom(SvcName) -> + ok = diameter:start(), + ok = diameter:start_service(SvcName, ?SERVICE((?L(SvcName)))); + +start(Config) -> + Nodes = ?util:read_priv(Config, nodes), + [] = [{N,RC} || {N,S} <- Nodes, + RC <- [rpc:call(N, ?MODULE, start, [S])], + RC /= ok]. + +sequence() -> + sequence(sname()). + +sequence(client) -> + {0,32}; +sequence(Server) -> + "server" ++ N = ?L(Server), + {list_to_integer(N), 30}. + +origin() -> + origin(sname()). + +origin(client) -> + 99; +origin(Server) -> + "server" ++ N = ?L(Server), + list_to_integer(N). + +%% connect/1 +%% +%% Establish one connection from the client, terminated on the first +%% server node, the others handling requests. + +connect({?SERVER, Config, [{Node, _} | _]}) -> + if Node == node() -> %% server0 + ?util:write_priv(Config, lref, {Node, ?util:listen(?SERVER, tcp)}); + true -> + diameter_dist:attach([?SERVER]) + end, + ok; + +connect({?CLIENT, Config, _}) -> + ?util:connect(?CLIENT, tcp, ?util:read_priv(Config, lref)), + ok; + +connect(Config) -> + Nodes = ?util:read_priv(Config, nodes), + [] = [{N,RC} || {N,S} <- Nodes, + RC <- [rpc:call(N, ?MODULE, connect, [{S, Config, Nodes}])], + RC /= ok]. + +%% stop/1 +%% +%% Stop the slave nodes. + +stop() -> + [{timetrap, {seconds, 30*length(?NODES)}}]. + +stop(_Config) -> + [] = [{N,E} || {N,_} <- ?NODES, + {error, _, _} = E <- [ct_slave:stop(N)]]. + +%% =========================================================================== +%% traffic testcases + +%% send/1 +%% +%% Send 100 requests and ensure the node name sent as User-Name isn't +%% the node terminating transport. + +send(Config) -> + send(Config, 100, dict:new()). + +%% send/2 + +send(Config, 0, Dict) -> + [{Server0, _} | _] = ?util:read_priv(Config, nodes) , + Node = atom_to_binary(Server0, utf8), + {false, _} = {dict:is_key(Node, Dict), dict:to_list(Dict)}; + +send(Config, N, Dict) -> + #diameter_base_STA{'Result-Code' = ?SUCCESS, + 'User-Name' = [ServerNode]} + = send(Config, str(?LOGOUT)), + true = is_binary(ServerNode), + send(Config, N-1, dict:update_counter(ServerNode, 1, Dict)). + +%% =========================================================================== + +str(Cause) -> + #diameter_base_STR{'Destination-Realm' = ?REALM, + 'Auth-Application-Id' = ?DICT:id(), + 'Termination-Cause' = Cause}. + +%% send/2 + +send(Config, Req) -> + {Node, _} = lists:last(?util:read_priv(Config, nodes)), + rpc:call(Node, ?MODULE, call, [Req]). + +%% call/1 + +call(Req) -> + diameter:call(?CLIENT, ?DICT, Req, []). + +%% sname/0 + +sname() -> + ?A(hd(string:tokens(?L(node()), "@"))). + +%% =========================================================================== +%% diameter callbacks + +%% peer_up/3 + +peer_up(_SvcName, _Peer, State) -> + State. + +%% peer_down/3 + +peer_down(_SvcName, _Peer, State) -> + State. + +%% pick_peer/4 + +pick_peer([Peer], [], ?CLIENT, _State) -> + {ok, Peer}. + +%% prepare_request/3 + +prepare_request(Pkt, ?CLIENT, {_Ref, Caps}) -> + #diameter_packet{msg = Req} + = Pkt, + #diameter_caps{origin_host = {OH, _}, + origin_realm = {OR, _}} + = Caps, + {send, Req#diameter_base_STR{'Origin-Host' = OH, + 'Origin-Realm' = OR, + 'Session-Id' = diameter:session_id(OH)}}. + +%% prepare_retransmit/3 + +prepare_retransmit(_, ?CLIENT, _) -> + discard. + +%% handle_answer/5 + +handle_answer(Pkt, _Req, ?CLIENT, _Peer) -> + #diameter_packet{msg = Rec, errors = []} = Pkt, + Rec. + +%% handle_error/5 + +handle_error(Reason, _Req, ?CLIENT, _Peer) -> + {error, Reason}. + +%% handle_request/3 + +handle_request(Pkt, ?SERVER, {_, Caps}) -> + #diameter_packet{msg = #diameter_base_STR{'Session-Id' = SId}} + = Pkt, + #diameter_caps{origin_host = {OH, _}, + origin_realm = {OR, _}} + = Caps, + {reply, #diameter_base_STA{'Result-Code' = ?SUCCESS, + 'Session-Id' = SId, + 'Origin-Host' = OH, + 'Origin-Realm' = OR, + 'User-Name' = [atom_to_binary(node(), utf8)]}}. diff --git a/lib/diameter/test/diameter_distribution_SUITE.erl b/lib/diameter/test/diameter_distribution_SUITE.erl index 92d5c59797..5fe02284ae 100644 --- a/lib/diameter/test/diameter_distribution_SUITE.erl +++ b/lib/diameter/test/diameter_distribution_SUITE.erl @@ -126,7 +126,7 @@ all() -> %% enslave/1 %% %% Start four slave nodes, one to implement a Diameter server, -%% two three to implement a client. +%% three to implement a client. enslave() -> [{timetrap, {seconds, 30*length(?NODES)}}]. @@ -332,6 +332,8 @@ prepare_request(Pkt, ?CLIENT, {_Ref, Caps}, {_, client0}) -> 'Origin-Realm' = OR, 'Session-Id' = diameter:session_id(OH)}}. +%% prepare_retransmit/4 + prepare_retransmit(Pkt, ?CLIENT, _, {_, client0}) -> #diameter_packet{msg = #diameter_base_STR{'Termination-Cause' = ?MOVED}} = Pkt, %% assert diff --git a/lib/diameter/test/modules.mk b/lib/diameter/test/modules.mk index 0c73adca12..90b0a25d5f 100644 --- a/lib/diameter/test/modules.mk +++ b/lib/diameter/test/modules.mk @@ -1,7 +1,7 @@ # %CopyrightBegin% # -# Copyright Ericsson AB 2010-2017. All Rights Reserved. +# Copyright Ericsson AB 2010-2019. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -31,6 +31,7 @@ MODULES = \ diameter_codec_test \ diameter_config_SUITE \ diameter_compiler_SUITE \ + diameter_dist_SUITE \ diameter_distribution_SUITE \ diameter_dpr_SUITE \ diameter_event_SUITE \ -- cgit v1.2.3