From 9589d10c491f2660995a8424c21d6d72a198c315 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Tue, 15 Jan 2019 12:21:23 +0100 Subject: Fix comment typo --- lib/diameter/src/base/diameter_callback.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'lib/diameter/src') diff --git a/lib/diameter/src/base/diameter_callback.erl b/lib/diameter/src/base/diameter_callback.erl index d04a416bef..3bcf550cd8 100644 --- a/lib/diameter/src/base/diameter_callback.erl +++ b/lib/diameter/src/base/diameter_callback.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2017. All Rights Reserved. +%% Copyright Ericsson AB 2010-2019. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -70,7 +70,7 @@ -module(diameter_callback). -%% Default callbacks when no aleternate is specified. +%% Default callbacks when no alternate is specified. -export([peer_up/3, peer_down/3, pick_peer/4, -- cgit v1.2.3 From f1cdd72110184460f76630db79ce6fc0ead44ba6 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Fri, 18 Jan 2019 13:22:31 +0100 Subject: Tweak/document request handler callback The possibility of configuring an MFA as spawn_opt was added in commit fd285079, the callback being passed an arity-0 fun to be applied in an appropriate handler process. Replace the fun by a tuple to be passed to diameter_traffic:request/1, to avoid passing funs between nodes when handler processes are remote. A list-valued spawn_opt is now equivalent to the following configured as {spawn_opt, {Mod, spawn_local, [Opts]}}. spawn_local(ReqT, Opts) -> spawn_opt(diameter_traffic, request, [ReqT], Opts). ReqT is passed by diameter and contains information that the callback may want to decide where to handle the request in question (which wasn't accessible with a fun), but this information isn't exposed in a documented way. 
The intention is instead to add an own callback implementation to make use of the information. Note that application lookup now takes place in the watchdog process in both the list-valued (or no configuration) and mfa-valued cases. Whether this is good, bad, or (probably) inconsequential remains to be seen. --- lib/diameter/src/base/diameter.erl | 4 +- lib/diameter/src/base/diameter_traffic.erl | 86 ++++++++++++++++++------------ 2 files changed, 54 insertions(+), 36 deletions(-) (limited to 'lib/diameter/src') diff --git a/lib/diameter/src/base/diameter.erl b/lib/diameter/src/base/diameter.erl index b90b794611..7f172e1fa1 100644 --- a/lib/diameter/src/base/diameter.erl +++ b/lib/diameter/src/base/diameter.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2017. All Rights Reserved. +%% Copyright Ericsson AB 2010-2019. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -365,7 +365,7 @@ call(SvcName, App, Message) -> | {connect_timer, 'Unsigned32'()} | {watchdog_timer, 'Unsigned32'() | {module(), atom(), list()}} | {watchdog_config, [{okay|suspect, non_neg_integer()}]} - | {spawn_opt, list()}. + | {spawn_opt, list() | mfa()}. %% Options passed to start_service/2 diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl index d2856ae530..e9acb5c0e8 100644 --- a/lib/diameter/src/base/diameter_traffic.erl +++ b/lib/diameter/src/base/diameter_traffic.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2013-2017. All Rights Reserved. +%% Copyright Ericsson AB 2013-2019. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -44,6 +44,7 @@ %% internal -export([send/1, %% send from remote node + request/1, %% process request in handler process init/1]). 
%% monitor process start -include_lib("diameter/include/diameter.hrl"). @@ -232,7 +233,7 @@ incr_rc(Dir, Pkt, TPid, MsgDict, AppDict, Dict0) -> -spec receive_message(pid(), Route, #diameter_packet{}, module(), RecvData) -> pid() %% request handler | boolean() %% answer, known request or not - | discard %% request discarded by MFA + | discard %% request discarded when Route :: {Handler, RequestRef, TPid} | Ack, RecvData :: {[SpawnOpt], #recvdata{}}, @@ -252,7 +253,8 @@ receive_message(TPid, Route, Pkt, Dict0, RecvData) -> recv(true, Ack, TPid, Pkt, Dict0, T) when is_boolean(Ack) -> {Opts, RecvData} = T, - spawn_request(Ack, TPid, Pkt, Dict0, RecvData, Opts); + AppT = find_app(TPid, Pkt, RecvData), + ack(Ack, TPid, spawn_request(AppT, Opts, Ack, TPid, Pkt, Dict0, RecvData)); %% ... answer to known request ... recv(false, {Pid, Ref, TPid}, _, Pkt, Dict0, _) -> @@ -274,58 +276,65 @@ recv(false, false, TPid, Pkt, _, _) -> incr(TPid, {{unknown, 0}, recv, discarded}), false. -%% spawn_request/6 +%% spawn_request/7 -%% An MFA should return a pid() or the atom 'discard'. The latter -%% results in an acknowledgment back to the transport process when -%% appropriate, to ensure that send/recv callbacks can count -%% outstanding requests. Acknowledgement is implicit if the +spawn_request(false, _, _, _, _, _, _) -> %% no transport + discard; + +%% An MFA should return the pid() of a process in which the argument +%% fun in applied, or the atom 'discard' if the fun is not applied. +%% The latter results in an acknowledgment back to the transport +%% process when appropriate, to ensure that send/recv callbacks can +%% count outstanding requests. Acknowledgement is implicit if the %% handler process dies (in a handle_request callback for example). 
-spawn_request(Ack, TPid, Pkt, Dict0, RecvData, {M,F,A}) -> - ReqF = fun() -> - ack(Ack, TPid, recv_request(Ack, TPid, Pkt, Dict0, RecvData)) - end, - ack(Ack, TPid, apply(M, F, [ReqF | A])); +spawn_request(AppT, {M,F,A}, Ack, TPid, Pkt, Dict0, RecvData) -> + %% Term to pass to request/1 in an appropriate process. + ReqT = {Pkt, AppT, Ack, TPid, Dict0, RecvData}, + apply(M, F, [ReqT | A]); %% A spawned process acks implicitly when it dies, so there's no need %% to handle 'discard'. -spawn_request(Ack, TPid, Pkt, Dict0, RecvData, Opts) -> +spawn_request(AppT, Opts, Ack, TPid, Pkt, Dict0, RecvData) -> spawn_opt(fun() -> - recv_request(Ack, TPid, Pkt, Dict0, RecvData) + recv_request(Ack, TPid, Pkt, Dict0, RecvData, AppT) end, Opts). +%% request/1 +%% +%% Called from a handler process chosen by a transport spawn_opt MFA +%% to process an incoming request. + +request({Pkt, AppT, Ack, TPid, Dict0, RecvData} = _ReqT) -> + ack(Ack, TPid, recv_request(Ack, TPid, Pkt, Dict0, RecvData, AppT)). + %% ack/3 ack(Ack, TPid, RC) -> - RC == discard andalso Ack andalso (TPid ! {send, false}), + RC == discard + andalso Ack + andalso (TPid ! {send, false}), RC. %% --------------------------------------------------------------------------- -%% recv_request/5 +%% recv_request/6 %% --------------------------------------------------------------------------- -spec recv_request(Ack :: boolean(), TPid :: pid(), #diameter_packet{}, Dict0 :: module(), - #recvdata{}) + #recvdata{}, + AppT :: {#diameter_app{}, #diameter_caps{}} + | #diameter_caps{}) %% no suitable app -> ok %% answer was sent - | discard %% or not - | false. %% no transport - -recv_request(Ack, - TPid, - #diameter_packet{header = #diameter_header{application_id = Id}} - = Pkt, - Dict0, - #recvdata{peerT = PeerT, - apps = Apps, - counters = Count} - = RecvData) -> + | discard. %% or not + +recv_request(Ack, TPid, Pkt, Dict0, RecvData, AppT) -> Ack andalso (TPid ! 
{handler, self()}), - case diameter_service:find_incoming_app(PeerT, TPid, Id, Apps) of + case AppT of {#diameter_app{id = Aid, dictionary = AppDict} = App, Caps} -> + Count = RecvData#recvdata.counters, Count andalso incr(recv, Pkt, TPid, AppDict), DecPkt = decode(Aid, AppDict, RecvData, Pkt), Count andalso incr_error(recv, DecPkt, TPid, AppDict), @@ -349,11 +358,20 @@ recv_request(Ack, Dict0, RecvData, DecPkt, - [[]]); - false = No -> %% transport has gone down - No + [[]]) end. +%% find_app/3 +%% +%% Lookup the application of a received Diameter request on the node +%% on which it's received. + +find_app(TPid, + #diameter_packet{header = #diameter_header{application_id = Id}}, + #recvdata{peerT = PeerT, + apps = Apps}) -> + diameter_service:find_incoming_app(PeerT, TPid, Id, Apps). + %% decode/4 decode(Id, Dict, #recvdata{codec = Opts}, Pkt) -> -- cgit v1.2.3 From d9d918b2e31daca8b3d904ffbd26a9e4207b166f Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Wed, 20 Feb 2019 01:42:17 +0100 Subject: Add diameter_dist for ready spawn_opt callbacks That is, of functions that can be configured as spawn_opt MFAs in transport configuration. This commit adds the spawn_local described in the parent commit, and a route_session that assumes that the local node initiates all sessions with Session-Id returned by diameter:session_id/1, and handles incoming requests on the node on which the id in question was returned, diameter:session_id/1 using node() as optional value in the Session-Id format. 
--- lib/diameter/src/base/diameter_dist.erl | 218 ++++++++++++++++++++++++++++ lib/diameter/src/base/diameter_misc_sup.erl | 3 +- lib/diameter/src/base/diameter_traffic.erl | 4 +- lib/diameter/src/modules.mk | 3 +- 4 files changed, 225 insertions(+), 3 deletions(-) create mode 100644 lib/diameter/src/base/diameter_dist.erl (limited to 'lib/diameter/src') diff --git a/lib/diameter/src/base/diameter_dist.erl b/lib/diameter/src/base/diameter_dist.erl new file mode 100644 index 0000000000..cef9522c9d --- /dev/null +++ b/lib/diameter/src/base/diameter_dist.erl @@ -0,0 +1,218 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2019. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% %CopyrightEnd% +%% + +-module(diameter_dist). + +-behaviour(gen_server). + +%% +%% Implements callbacks that can be configured as a spawn_opt +%% transport configuration, to be able to distribute incoming Diameter +%% requests to handler processes (local or remote) in various ways. +%% + +%% spawn_opt callbacks; initial argument constructed in diameter_traffic +-export([spawn_local/2, + spawn_local/1, + route_session/2, + route_session/1]). + +-include_lib("diameter/include/diameter.hrl"). + +%% server start +-export([start_link/0]). + +%% gen_server callbacks +-export([init/1, + handle_info/2, + handle_cast/2, + handle_call/3, + code_change/3, + terminate/2]). + +-define(SERVER, ?MODULE). %% server monitoring node connections +-define(TABLE, ?MODULE). 
%% node() binary -> node() atom + +-define(B(A), atom_to_binary(A, utf8)). + +%% spawn_local/2 +%% +%% Callback that is equivalent to an options list. That is, the +%% following are equivalent when passed as options to +%% diameter:add_transport/2. +%% +%% {spawn_opt, Opts} +%% {spawn_opt, {diameter_dist, spawn_local, [Opts]}} + +spawn_local(ReqT, Opts) -> + spawn_opt(diameter_traffic, request, [ReqT], Opts). + +%% spawn_local/1 + +spawn_local(ReqT) -> + spawn_local(ReqT, []). + +%% route_session/2 +%% +%% Callback that routes requests containing Session-Id AVPs as +%% returned by diameter:session_id/0 back to the node on which the +%% function was called. This is only appropriate when sessions are +%% initiated by the own (typically client) node, and ids have been +%% returned from diameter:session_id/0. + +route_session(ReqT, Opts) -> + #diameter_packet{bin = Bin} = element(1, ReqT), + Node = node_of_session_id(Bin), + spawn_opt(Node, diameter_traffic, request, [ReqT], Opts). + +%% route_session/1 + +route_session(ReqT) -> + route_session(ReqT, []). + +%% node_of_session_id/1 +%% +%% Return the node name encoded as optional value in a Session-Id, +%% assuming the id has been created with diameter:session_id/0. +%% +%% node() is returned if a node name can't be extracted for any +%% reason. + +node_of_session_id(<<_Head:20/binary, Avps/binary>>) -> + sid_node(Avps); + +node_of_session_id(_) -> + node(). + +%% sid_node/1 + +%% Session-Id = Command Code 263, V-bit = 0. +sid_node(<<263:32, 0:1, _:7, Len:24, _/binary>> = Bin) -> + case Bin of + <> -> + <<_:8/binary, Sid/binary>> = Avp, + sid_node(Sid, pattern(), 2); %% look for the optional value + _ -> + node() + end; + +%% Jump to the next AVP. This is potentially costly for a message with +%% many AVPs and no Session-Id, which an attacker is prone to send. +%% 8.8 or RFC 6733 says that Session-Id SHOULD (but not MUST) appear +%% immediately following the Diameter Header, so there is no +%% guarantee. 
+sid_node(<<_:40, Len:24, _/binary>> = Bin) -> + Pad = (4 - (Len rem 4)) rem 4, + case Bin of + <<_:Len/binary, _:Pad/binary, Rest/binary>> -> + sid_node(Rest); + _ -> + node() + end. + +%% sid_node/2 + +%% Lookup the node name to ensure we don't convert arbitrary binaries +%% to atom. +sid_node(Bin, _, 0) -> + case ets:lookup(?TABLE, Bin) of + [{_, Node}] -> + Node; + [] -> + node() + end; + +%% The optional value (if any) of a Session-Id follows the third +%% semicolon. Searching with binary:match/2 does better than matching, +%% especially when the pattern is compiled. +sid_node(Bin, CP, N) -> + case binary:match(Bin, CP) of + {Offset, 1} -> + <<_:Offset/binary, _, Rest/binary>> = Bin, + sid_node(Rest, CP, N-1); + nomatch -> + node() + end. + +%% pattern/0 +%% +%% Since this is being called in a watchdog process, compile the +%% pattern once and maintain it in the process dictionary. + +pattern() -> + case get(?MODULE) of + undefined -> + CP = binary:compile_pattern(<<$;>>), %% tuple + put(?MODULE, CP), + CP; + CP -> + CP + end. + +%% =========================================================================== + +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, _Args = [], _Opts = []). + +%% init/1 +%% +%% Maintain [node() | nodes()] in a table that maps from binary-valued +%% names, so we can lookup the corresponding atoms rather than convert +%% binaries that aren't necessarily node names. + +init([]) -> + ets:new(?TABLE, [set, named_table]), + ok = net_kernel:monitor_nodes(true, [{node_type, all}, nodedown_reason]), + ets:insert(?TABLE, [{B,N} || N <- [node() | nodes()], + B <- [?B(N)]]), + {ok, erlang:monotonic_time()}. + +%% handle_call/3 + +handle_call(_, _From, S) -> + {reply, nok, S}. + +%% handle_cast/2 + +handle_cast(_, S) -> + {noreply, S}. 
+ +%% handle_info/2 + +handle_info({nodeup, Node, _}, S) -> + ets:insert(?TABLE, {?B(Node), Node}), + {noreply, S}; + +handle_info({nodedown, Node, _}, S) -> + ets:delete(?TABLE, ?B(Node)), + {noreply, S}; + +handle_info(_, S) -> + {noreply, S}. + +%% terminate/2 + +terminate(_, _) -> + ok. + +%% code_change/3 + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/lib/diameter/src/base/diameter_misc_sup.erl b/lib/diameter/src/base/diameter_misc_sup.erl index 343688be23..fec5a41b5c 100644 --- a/lib/diameter/src/base/diameter_misc_sup.erl +++ b/lib/diameter/src/base/diameter_misc_sup.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2016. All Rights Reserved. +%% Copyright Ericsson AB 2010-2019. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -35,6 +35,7 @@ diameter_stats, %% statistics counter management diameter_reg, %% service/property publishing diameter_peer, %% remote peer manager + diameter_dist, %% request distribution diameter_config]). %% configuration/restart %% start_link/0 diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl index e9acb5c0e8..b1b797aad8 100644 --- a/lib/diameter/src/base/diameter_traffic.erl +++ b/lib/diameter/src/base/diameter_traffic.erl @@ -288,7 +288,9 @@ spawn_request(false, _, _, _, _, _, _) -> %% no transport %% count outstanding requests. Acknowledgement is implicit if the %% handler process dies (in a handle_request callback for example). spawn_request(AppT, {M,F,A}, Ack, TPid, Pkt, Dict0, RecvData) -> - %% Term to pass to request/1 in an appropriate process. + %% Term to pass to request/1 in an appropriate process. Module + %% diameter_dist implements callbacks, and uses the form of the + %% argument tuple constructed below. 
ReqT = {Pkt, AppT, Ack, TPid, Dict0, RecvData}, apply(M, F, [ReqT | A]); diff --git a/lib/diameter/src/modules.mk b/lib/diameter/src/modules.mk index bb86de016a..d16292bb88 100644 --- a/lib/diameter/src/modules.mk +++ b/lib/diameter/src/modules.mk @@ -1,7 +1,7 @@ # %CopyrightBegin% # -# Copyright Ericsson AB 2010-2017. All Rights Reserved. +# Copyright Ericsson AB 2010-2019. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -40,6 +40,7 @@ RT_MODULES = \ base/diameter_config \ base/diameter_config_sup \ base/diameter_codec \ + base/diameter_dist \ base/diameter_gen \ base/diameter_lib \ base/diameter_misc_sup \ -- cgit v1.2.3 From 734a7daf2e556d684850a3cb278684ba522a29de Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Mon, 4 Mar 2019 17:31:13 +0100 Subject: Add options to diameter_dist:route_session/2 node selection To be able to restrict how many AVPs will be examined (from the front of a message) when looking for Session-Id, and to decide what to do if the AVP isn't found. Options are specified as a map of the following form. #{search => non_neg_integer(), default => discard | mfa(), dispatch => list() | mfa()} The search member says how many AVPs to examine at most, from the front of the message. If the optional value of a Session-Id is not the name of a connected node then the default member determines what to do with the request, handle it locally (the default), discard it, or invoke an MFA on the Session-Id | false (if none was found) and diameter_packet record to return a node() | false; if the latter then the request is discarded. If a node is identified then the dispatch MFA is invoked on the node and the request MFA (as three arguments), a list Opts being equivalent to the MFA {erlang, spawn_opt, [Opts]}, and the default being the empty list. Integer- or list-valued options are equivalent to the corresponding map with a single value. 
Limiting the search is to avoid searching messages containing many AVPs for a Session-Id that is known to occur near the header, since section 8.8 of RFC 6733 says this: When present, the Session-Id SHOULD appear immediately following the Diameter header (see Section 3). There's no guarantee, but in practice it may well be known that peers are respecting the RFC, and in that case limiting the search is a defense against searching messages from a malicious peer unnecessarily. The search is unlimited by default. A default is only used when a search fails to locate a Session-Id, and can be to discard the message, or have a node() or false be returned from an MFA applied to the diameter_packet in question. The local node is chosen by default. --- lib/diameter/src/base/diameter_dist.erl | 174 ++++++++++++++++++++++++-------- 1 file changed, 134 insertions(+), 40 deletions(-) (limited to 'lib/diameter/src') diff --git a/lib/diameter/src/base/diameter_dist.erl b/lib/diameter/src/base/diameter_dist.erl index cef9522c9d..ed2859e914 100644 --- a/lib/diameter/src/base/diameter_dist.erl +++ b/lib/diameter/src/base/diameter_dist.erl @@ -47,6 +47,8 @@ code_change/3, terminate/2]). +-type request() :: tuple(). %% callback argument from diameter_traffic + -define(SERVER, ?MODULE). %% server monitoring node connections -define(TABLE, ?MODULE). %% node() binary -> node() atom @@ -61,6 +63,9 @@ %% {spawn_opt, Opts} %% {spawn_opt, {diameter_dist, spawn_local, [Opts]}} +-spec spawn_local(ReqT :: request(), Opts :: list()) + -> pid(). + spawn_local(ReqT, Opts) -> spawn_opt(diameter_traffic, request, [ReqT], Opts). @@ -74,12 +79,48 @@ spawn_local(ReqT) -> %% Callback that routes requests containing Session-Id AVPs as %% returned by diameter:session_id/0 back to the node on which the %% function was called. This is only appropriate when sessions are -%% initiated by the own (typically client) node, and ids have been -%% returned from diameter:session_id/0. 
+%% only initiated by the own (typically client) node, and ids have +%% been returned from diameter:session_id/0. +%% +%% This can be used with #{search => 0} to route on something other +%% than Session-Id since default can be an MFA returning a node() +%% (applied to the incoming diameter_packet record) and dispatch can +%% be an MFA returning a pid() (applied to Node and the request MFA), +%% but this is no simpler than just implementing an own spawn_opt +%% callback. (Except with the default dispatch possibly.) + +-spec route_session(ReqT :: request(), Opts) + -> discard + | pid() + when Opts :: pos_integer() %% aka #{search => N} + | list() %% aka #{dispatch => Opts} + | #{search => non_neg_integer(), %% limit number of examined AVPs + default => discard | mfa(), %% return node() | false + dispatch => list() | mfa()}. %% spawn options or return pid() route_session(ReqT, Opts) -> - #diameter_packet{bin = Bin} = element(1, ReqT), - Node = node_of_session_id(Bin), + #diameter_packet{bin = Bin} = Pkt = element(1, ReqT), + Sid = session_id(avps(Bin), search(Opts)), + Node = default(node_of_session_id(Sid), Sid, Opts, Pkt), + dispatch(Node, ReqT, dispatch(Opts)). + +%% avps/1 + +avps(<<_:20/binary, Bin/binary>>) -> + Bin; + +avps(_) -> + false. + +%% dispatch/3 + +dispatch(false, _, _) -> + discard; + +dispatch(Node, ReqT, {M,F,A}) -> + apply(M, F, [Node, diameter_traffic, request, [ReqT] | A]); + +dispatch(Node, ReqT, Opts) -> spawn_opt(Node, diameter_traffic, request, [ReqT], Opts). %% route_session/1 @@ -90,27 +131,34 @@ route_session(ReqT) -> %% node_of_session_id/1 %% %% Return the node name encoded as optional value in a Session-Id, -%% assuming the id has been created with diameter:session_id/0. -%% -%% node() is returned if a node name can't be extracted for any -%% reason. +%% assuming the id has been created with diameter:session_id/0. Lookup +%% the node name to ensure we don't convert arbitrary binaries to +%% atom. 
-node_of_session_id(<<_Head:20/binary, Avps/binary>>) -> - sid_node(Avps); +node_of_session_id([_, _, _, Bin]) -> + case ets:lookup(?TABLE, Bin) of + [{_, Node}] -> + Node; + [] -> + false + end; node_of_session_id(_) -> - node(). + false. + +%% session_id/2 -%% sid_node/1 +session_id(_, 0) -> %% give up + false; %% Session-Id = Command Code 263, V-bit = 0. -sid_node(<<263:32, 0:1, _:7, Len:24, _/binary>> = Bin) -> +session_id(<<263:32, 0:1, _:7, Len:24, _/binary>> = Bin, _) -> case Bin of <> -> <<_:8/binary, Sid/binary>> = Avp, - sid_node(Sid, pattern(), 2); %% look for the optional value + split(Sid); _ -> - node() + false end; %% Jump to the next AVP. This is potentially costly for a message with @@ -118,38 +166,41 @@ sid_node(<<263:32, 0:1, _:7, Len:24, _/binary>> = Bin) -> %% 8.8 or RFC 6733 says that Session-Id SHOULD (but not MUST) appear %% immediately following the Diameter Header, so there is no %% guarantee. -sid_node(<<_:40, Len:24, _/binary>> = Bin) -> +session_id(<<_:40, Len:24, _/binary>> = Bin, N) -> Pad = (4 - (Len rem 4)) rem 4, case Bin of <<_:Len/binary, _:Pad/binary, Rest/binary>> -> - sid_node(Rest); + session_id(Rest, if N == infinity -> N; true -> N-1 end); _ -> - node() - end. + false + end; -%% sid_node/2 +session_id(_, _) -> + false. -%% Lookup the node name to ensure we don't convert arbitrary binaries -%% to atom. -sid_node(Bin, _, 0) -> - case ets:lookup(?TABLE, Bin) of - [{_, Node}] -> - Node; - [] -> - node() - end; +%% split/1 +%% +%% Split a Session-Id at no more than three semicolons: the optional +%% value (if any) follows the third. binary:split/2 does better than +%% matching character by character, especially when the pattern is +%% compiled. -%% The optional value (if any) of a Session-Id follows the third -%% semicolon. Searching with binary:match/2 does better than matching, -%% especially when the pattern is compiled. 
-sid_node(Bin, CP, N) -> - case binary:match(Bin, CP) of - {Offset, 1} -> - <<_:Offset/binary, _, Rest/binary>> = Bin, - sid_node(Rest, CP, N-1); - nomatch -> - node() - end. +split(Bin) -> + split(3, Bin, pattern()). + +%% split/3 + +split(0, Bin, _) -> + [Bin]; + +split(N, Bin, Pattern) -> + [H|T] = binary:split(Bin, Pattern), + [H | case T of + [] -> + T; + [Rest] -> + split(N-1, Rest, Pattern) + end]. %% pattern/0 %% @@ -166,6 +217,49 @@ pattern() -> CP end. +%% dispatch/1 + +dispatch(#{} = Opts) -> + maps:get(dispatch, Opts, []); + +dispatch(Opts) + when is_list(Opts) -> + Opts; + +dispatch(_) -> + []. + +%% search/1 +%% +%% Bound number of AVPs examined when looking for Session-Id. + +search(#{search := N}) + when is_integer(N), 0 =< N -> + N; + +search(N) + when is_integer(N), 0 =< N -> + N; + +search(_) -> + infinity. + +%% default/3 +%% +%% Choose a node when Session-Id lookup has failed. + +default(false = No, _, #{default := discard}, _) -> + No; + +default(false, Sid, #{default := {M,F,A}}, Pkt) -> + apply(M, F, [Sid, Pkt | A]); %% false | node() + +default(false, _, _, _) -> + node(); + +default(Node, _, _, _) -> + Node. + %% =========================================================================== start_link() -> -- cgit v1.2.3 From 376e8fac401bd11b3cb3d2f9661eb7ff0b9bbcd7 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Wed, 6 Mar 2019 17:13:07 +0100 Subject: Add consistent hashing to diameter_dist:route_session/2 If the Session-Id optional value to node() mapping fails then hash Session-Id to a node by default, instead of selecting the local node as in the parent commit. The previous behaviour is configurable by setting default = local in an options map. Nodes make themselves part of the pool from which nodes are selected by calling diameter_dist:attach/1 with the list of service names they are willing to handle requests for, the local node being selected in the absence of any attached nodes. 
The original idea was to base the node pool on share_peers and/or use_shared_peers configuration, but that configuration determines where outgoing requests can be sent, while route_session/2 deals with incoming requests, so it's not obvious that conflating the two is a good thing. (Also because share_peers/use_shared_peers can be used in different ways; the former could have been skipped entirely.) The hashing effectively places nodes on a circle, a hashed Session-Id being mapped to the nearest predecessor node (clockwise). Nodes are rehashed with each Session-Id (with the id as salt) for a more even distribution, at the cost of performance, although how high the cost or how even the distribution has yet to be tested. Obviously, the larger the number of attached nodes, the higher the cost. Adding/removing an attached node only affects session ids that hash in the interval between the added/removed node and its successor (hence consistent hashing). Options are tweaked slightly compared to the parent commit, and it is now possible to restrict the optional value mapping to specific Diameter identities, to avoid mapping an id that was generated at the peer when the peer is also implemented with the diameter application. Note that diameter_dist is not yet an officially documented interface, so could change. Documentation is in the module itself. --- lib/diameter/src/base/diameter_dist.erl | 293 +++++++++++++++++++++++++---- lib/diameter/src/base/diameter_traffic.erl | 13 +- 2 files changed, 264 insertions(+), 42 deletions(-) (limited to 'lib/diameter/src') diff --git a/lib/diameter/src/base/diameter_dist.erl b/lib/diameter/src/base/diameter_dist.erl index ed2859e914..5c29ea95a4 100644 --- a/lib/diameter/src/base/diameter_dist.erl +++ b/lib/diameter/src/base/diameter_dist.erl @@ -28,12 +28,20 @@ %% requests to handler processes (local or remote) in various ways. 
%% -%% spawn_opt callbacks; initial argument constructed in diameter_traffic +%% spawn_opt callbacks -export([spawn_local/2, spawn_local/1, route_session/2, route_session/1]). +%% signal availability for handling incoming requests to route_sesssion/2 +-export([attach/1, + detach/1]). + +%% consistent hashing +-export([hash/3, %% for use as default MFA in route_session/2 options map + hash/2]). %% arbitrary key/values + -include_lib("diameter/include/diameter.hrl"). %% server start @@ -50,9 +58,21 @@ -type request() :: tuple(). %% callback argument from diameter_traffic -define(SERVER, ?MODULE). %% server monitoring node connections --define(TABLE, ?MODULE). %% node() binary -> node() atom + +%% Maps a node name binary to the corresponding atom. Used by +%% route_session/2 to map the optional value of a Session-Id to +%% node(). +-define(NODE_TABLE, diameter_dist_node). + +%% Maps a diameter:service_name() to a node() that has called attach/1 +%% to declare its willingness to handle incoming requests for the +%% service. Use by route_session/2 in case the optional value mapping +%% has failed. +-define(SERVICE_TABLE, diameter_dist_service). -define(B(A), atom_to_binary(A, utf8)). +-define(ORCOND(List), list_to_tuple(['orelse', false | List])). +-define(HASH(T), erlang:phash2(T, 16#100000000)). %% spawn_local/2 %% @@ -76,32 +96,82 @@ spawn_local(ReqT) -> %% route_session/2 %% -%% Callback that routes requests containing Session-Id AVPs as -%% returned by diameter:session_id/0 back to the node on which the -%% function was called. This is only appropriate when sessions are -%% only initiated by the own (typically client) node, and ids have -%% been returned from diameter:session_id/0. +%% Callback that maps the Session-Id of an incoming request to a +%% handler node. 
%% -%% This can be used with #{search => 0} to route on something other -%% than Session-Id since default can be an MFA returning a node() -%% (applied to the incoming diameter_packet record) and dispatch can -%% be an MFA returning a pid() (applied to Node and the request MFA), -%% but this is no simpler than just implementing an own spawn_opt -%% callback. (Except with the default dispatch possibly.) - +%% With an options list, maps an id whose optional value is the name +%% of a connected node to the same node, to handle the case that the +%% session id has been returned from diameter:session_id/1; otherwise +%% to a node that has called diameter_dist:attach/1 using the +%% consistent hashing provided by hash/3, or to the local node() if a +%% session id could not be extracted or there are no attached nodes. A +%% handler process is spawned on the selected node using +%% erlang:spawn_opt/4. +%% +%% Different behaviour can be configured by supplying an options map +%% of the following form: +%% +%% #{search => non_neg_integer(), +%% id => [binary()], +%% default => discard | local | mfa(), +%% dispatch => list() | mfa()} +%% +%% The search member limits the number of AVPs that are examined in +%% the message (from the front), to avoid searching entire message in +%% case it's known that peers follow RFC 6733's recommendation that +%% Session-Id be placed at the head of a message. The default is to +%% search the entire message. +%% +%% The id member restricts the optional value mapping to session ids +%% whose DiamterIdentity is one of those specified. Set this to the +%% list of Diameter identities advertised by the service in question +%% (typically one) to ensure that only locally generated session ids +%% are mapped; or to the empty list to disable the mapping. +%% +%% The default member determines where to handle a message whose +%% Session-Id isn't found or whose optional value isn't mapped to the +%% name of a connected node. 
The atom local says the local node, an +%% MFA is invoked on Session-Id | false, the name of the diameter +%% service, and the message binary, and should return either a node() +%% or false to discard the message. Defaults to {diameter_dist, hash, []}. +%% +%% The dispatch member determines how the pid() of the request handler +%% process is retrieved. An MFA is applied to a previously selected +%% node(), and the module, function, and arguments list to apply in +%% the handler process to handle the request, the MFA being supplied +%% by diameter, and returns pid() | discard. A list is equivalent to +%% {erlang, spawn_opt, []}. Defaults to []. +%% +%% This can be used with search = 0 to route on something other than +%% Session-Id, but this is probably no simpler than just implementing +%% an own spawn_opt callback. (Except with the default dispatch possibly.) +%% +%% Note that if the peer is also implemented with OTP diameter and +%% generating session ids with diameter:session_id/1 then +%% route_session/2 can map an optional value to a local node that +%% happens to have the same name as one of the peer's nodes. This +%% could lead to an uneven distribution; for example, if the peer +%% nodes are a subset of the local nodes. In practice, it's typically +%% known if it's peers or the local node originating sessions; if the +%% former then setting id = [] disables the optional value mapping, if +%% the latter then setting default = local disables the hashing. -spec route_session(ReqT :: request(), Opts) -> discard | pid() when Opts :: pos_integer() %% aka #{search => N} | list() %% aka #{dispatch => Opts} | #{search => non_neg_integer(), %% limit number of examined AVPs - default => discard | mfa(), %% return node() | false - dispatch => list() | mfa()}. 
%% spawn options or return pid() + id => [binary()], %% restrict optional value map on DiamIdent + default => local %% handle locally + | discard + | mfa(), %% return node() | false + dispatch => list() %% spawn options + | mfa()}. %% (Node, M, F, A) -> pid() | discard route_session(ReqT, Opts) -> - #diameter_packet{bin = Bin} = Pkt = element(1, ReqT), + {_, Bin} = Info = diameter_traffic:request_info(ReqT), Sid = session_id(avps(Bin), search(Opts)), - Node = default(node_of_session_id(Sid), Sid, Opts, Pkt), + Node = default(node_of_session_id(Sid, Opts), Sid, Opts, Info), dispatch(Node, ReqT, dispatch(Opts)). %% avps/1 @@ -128,24 +198,31 @@ dispatch(Node, ReqT, Opts) -> route_session(ReqT) -> route_session(ReqT, []). -%% node_of_session_id/1 +%% node_of_session_id/2 %% %% Return the node name encoded as optional value in a Session-Id, %% assuming the id has been created with diameter:session_id/0. Lookup %% the node name to ensure we don't convert arbitrary binaries to %% atom. -node_of_session_id([_, _, _, Bin]) -> - case ets:lookup(?TABLE, Bin) of - [{_, Node}] -> - Node; - [] -> - false - end; +node_of_session_id([Id, _, _, Bin], #{id := Ids}) -> + lists:member(Id, Ids) andalso nodemap(Bin); -node_of_session_id(_) -> +node_of_session_id([_, _, _, Bin], _) -> + nodemap(Bin); + +node_of_session_id(_, _) -> false. +%% nodemap/1 + +nodemap(Bin) -> + try + ets:lookup_element(?NODE_TABLE, Bin, 2) + catch + error: badarg -> false + end. + %% session_id/2 session_id(_, 0) -> %% give up @@ -154,7 +231,7 @@ session_id(_, 0) -> %% give up %% Session-Id = Command Code 263, V-bit = 0. session_id(<<263:32, 0:1, _:7, Len:24, _/binary>> = Bin, _) -> case Bin of - <> -> + <> -> <<_:8/binary, Sid/binary>> = Avp, split(Sid); _ -> @@ -248,20 +325,123 @@ search(_) -> %% %% Choose a node when Session-Id lookup has failed. 
-default(false = No, _, #{default := discard}, _) -> - No; - -default(false, Sid, #{default := {M,F,A}}, Pkt) -> - apply(M, F, [Sid, Pkt | A]); %% false | node() +default(false, _, #{default := discard}, _) -> + false; -default(false, _, _, _) -> +default(false, _, #{default := local}, _) -> node(); +default(false, Sid, #{default := {M,F,A}}, Info) -> + {ServiceName, Bin} = Info, + apply(M, F, [Sid, ServiceName, Bin | A]); %% node() | false + +default(false, Sid, _, Info) -> %% aka {?MODULE, hash, []} + {ServiceName, Bin} = Info, + hash(Sid, ServiceName, Bin); + default(Node, _, _, _) -> Node. %% =========================================================================== +%% hash/3 +%% +%% Consistent hashing of Session-Id to an attached node, or the local +%% node if Session-Id = false or no attached nodes. + +hash(Sid, ServiceName, _) -> + case false /= Sid andalso attached(ServiceName) of + [_|_] = Nodes -> + hash(Sid, Nodes); + _ -> + node() + end. + +%% hash/2 +%% +%% Consistent hashing on arbitrary key/values. Returns false if the +%% list is empty. + +%% No key or no values. +hash(_, []) -> + false; + +%% Not much choice. +hash(_, [Value]) -> + Value; + +%% Hash on a circle and choose the closest predecessor. +hash(Key, Values) -> + Hash = ?HASH(Key), + tl(lists:foldl(fun(V,A) -> + choose(Hash, [?HASH({Key, V}) | V], A) + end, + false, %% < list() + Values)). + +%% choose/3 + +choose(Hash, [Hash1 | _] = T, [Hash2 | _]) + when Hash1 =< Hash, Hash < Hash2 -> + T; + +choose(Hash, [Hash1 | _], [Hash2 | _] = T) + when Hash2 =< Hash, Hash < Hash1 -> + T; + +choose(_, T1, T2) -> + max(T1, T2). + +%% =========================================================================== + +%% attach/1 +%% +%% Register the local node as a handler of incoming requests for the +%% specified services when using the route_session/2 spawn_opt +%% callback. + +attach(ServiceNames) -> + abcast({attach, node(), ServiceNames}). 
+ +%% detach/1 +%% +%% Deregister the local node as a handler of incoming requests. + +detach(ServiceNames) -> + abcast({detach, node(), ServiceNames}). + +%% abcast/1 + +abcast(T) -> + gen_server:abcast([node() | nodes()], ?SERVER, T), + ok. + +%% attached/1 + +attached(ServiceName) -> + try + ets:lookup_element(?SERVICE_TABLE, ServiceName, 2) + catch + error: badarg -> [] + end. + +%% cast/2 + +cast(Node, T) -> + gen_server:cast({?SERVER, Node}, T). + +%% attach/2 + +attach(Node, S) -> + case sets:to_list(S) of + [] -> + ok; + Services -> + cast(Node, {attach, node(), Services}) + end. + +%% =========================================================================== + start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, _Args = [], _Opts = []). @@ -272,11 +452,12 @@ start_link() -> %% binaries that aren't necessarily node names. init([]) -> - ets:new(?TABLE, [set, named_table]), + ets:new(?NODE_TABLE, [set, named_table]), + ets:new(?SERVICE_TABLE, [bag, named_table]), ok = net_kernel:monitor_nodes(true, [{node_type, all}, nodedown_reason]), - ets:insert(?TABLE, [{B,N} || N <- [node() | nodes()], - B <- [?B(N)]]), - {ok, erlang:monotonic_time()}. + ets:insert(?NODE_TABLE, [{?B(N), N} || N <- [node() | nodes()]]), + abcast({attach, node()}), + {ok, sets:new()}. %% handle_call/3 @@ -285,17 +466,49 @@ handle_call(_, _From, S) -> %% handle_cast/2 +%% Remote node is asking which services the local node wants to handle. +handle_cast({attach, Node}, S) + when Node /= node() -> + attach(Node, S), + {noreply, S}; + +%% Node wants to handle incoming requests ... +handle_cast({attach, Node, ServiceNames}, S) -> + ets:insert(?SERVICE_TABLE, [{N, Node} || N <- ServiceNames]), + {noreply, case node() of + Node -> + sets:union(S, sets:from_list(ServiceNames)); + _ -> + S + end}; + +%% ... or not. 
+handle_cast({detach, Node, ServiceNames}, S) -> + ets:select_delete(?SERVICE_TABLE, [{{'$1', Node}, + [?ORCOND([{'==', '$1', {const, N}} + || N <- ServiceNames])], + [true]}]), + {noreply, case node() of + Node -> + sets:subtract(S, sets:from_list(ServiceNames)); + _ -> + S + end}; + handle_cast(_, S) -> {noreply, S}. %% handle_info/2 handle_info({nodeup, Node, _}, S) -> - ets:insert(?TABLE, {?B(Node), Node}), + ets:insert(?NODE_TABLE, {?B(Node), Node}), + cast(Node, {attach, node()}), %% ask which services remote node handles + attach(Node, S), %% say which service local node handles {noreply, S}; handle_info({nodedown, Node, _}, S) -> - ets:delete(?TABLE, ?B(Node)), + ets:delete(?NODE_TABLE, ?B(Node)), + ets:select_delete(?SERVICE_TABLE, [{{'_', Node}, [], [true]}]), {noreply, S}; handle_info(_, S) -> diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl index b1b797aad8..c0643402a6 100644 --- a/lib/diameter/src/base/diameter_traffic.erl +++ b/lib/diameter/src/base/diameter_traffic.erl @@ -42,6 +42,9 @@ peer_up/1, peer_down/1]). +%% towards diameter_dist +-export([request_info/1]). + %% internal -export([send/1, %% send from remote node request/1, %% process request in handler process @@ -289,8 +292,7 @@ spawn_request(false, _, _, _, _, _, _) -> %% no transport %% handler process dies (in a handle_request callback for example). spawn_request(AppT, {M,F,A}, Ack, TPid, Pkt, Dict0, RecvData) -> %% Term to pass to request/1 in an appropriate process. Module - %% diameter_dist implements callbacks, and uses the form of the - %% argument tuple constructed below. + %% diameter_dist implements callbacks. ReqT = {Pkt, AppT, Ack, TPid, Dict0, RecvData}, apply(M, F, [ReqT | A]); @@ -302,6 +304,13 @@ spawn_request(AppT, Opts, Ack, TPid, Pkt, Dict0, RecvData) -> end, Opts). +%% request_info/1 +%% +%% Limited request information for diameter_dist. 
+ +request_info({Pkt, _AppT, _Ack, _TPid, _Dict0, RecvData} = _ReqT) -> + {RecvData#recvdata.service_name, Pkt#diameter_packet.bin}. + %% request/1 %% %% Called from a handler process chosen by a transport spawn_opt MFA -- cgit v1.2.3