aboutsummaryrefslogtreecommitdiffstats
path: root/lib
diff options
context:
space:
mode:
authorAnders Svensson <[email protected]>2019-03-06 17:13:07 +0100
committerAnders Svensson <[email protected]>2019-03-06 17:42:44 +0100
commit376e8fac401bd11b3cb3d2f9661eb7ff0b9bbcd7 (patch)
treefc9937d824a9e24e4724119ed12c62a97c012369 /lib
parent734a7daf2e556d684850a3cb278684ba522a29de (diff)
downloadotp-376e8fac401bd11b3cb3d2f9661eb7ff0b9bbcd7.tar.gz
otp-376e8fac401bd11b3cb3d2f9661eb7ff0b9bbcd7.tar.bz2
otp-376e8fac401bd11b3cb3d2f9661eb7ff0b9bbcd7.zip
Add consistent hashing to diameter_dist:route_session/2
If the Session-Id optional value to node() mapping fails then hash Session-Id to a node by default, instead of selecting the local node as in the parent commit. The previous behaviour is configurable by setting default = local in an options map. Nodes make themselves part of the pool from which nodes are selected by calling diameter_dist:attach/1 with the list of service names they are willing to handle requests for, the local node being selected in the absence of any attached nodes. The original idea was to base the node pool on share_peers and/or use_shared_peers configuration, but that configuration determines where outgoing requests can be sent, while route_session/2 deals with incoming requests, so it's not obvious that conflating the two is a good thing. (Also because share_peers/use_shared_peers can be used in different ways; the former could have been skipped entirely.) The hashing effectively places nodes on a circle, a hashed Session-Id being mapped to the nearest predecessor node (clockwise). Nodes are rehashed with each Session-Id (with the id as salt) for a more even distribution, at the cost of performance, although how high the cost or how even the distribution has yet to be tested. Obviously, the larger the number of attached nodes, the higher the cost. Adding/removing an attached node only affects session ids that hash in the interval between the added/removed node and its successor (hence consistent hashing). Options are tweaked slightly compared to the parent commit, and it is now possible to restrict the optional value mapping to specific Diameter identities, to avoid mapping an id that was generated at the peer when the peer is also implemented with the diameter application. Note that diameter_dist is not yet an officially documented interface, so could change. Documentation is in the module itself.
Diffstat (limited to 'lib')
-rw-r--r--lib/diameter/src/base/diameter_dist.erl293
-rw-r--r--lib/diameter/src/base/diameter_traffic.erl13
2 files changed, 264 insertions, 42 deletions
diff --git a/lib/diameter/src/base/diameter_dist.erl b/lib/diameter/src/base/diameter_dist.erl
index ed2859e914..5c29ea95a4 100644
--- a/lib/diameter/src/base/diameter_dist.erl
+++ b/lib/diameter/src/base/diameter_dist.erl
@@ -28,12 +28,20 @@
%% requests to handler processes (local or remote) in various ways.
%%
-%% spawn_opt callbacks; initial argument constructed in diameter_traffic
+%% spawn_opt callbacks
-export([spawn_local/2,
spawn_local/1,
route_session/2,
route_session/1]).
+%% signal availability for handling incoming requests to route_sesssion/2
+-export([attach/1,
+ detach/1]).
+
+%% consistent hashing
+-export([hash/3, %% for use as default MFA in route_session/2 options map
+ hash/2]). %% arbitrary key/values
+
-include_lib("diameter/include/diameter.hrl").
%% server start
@@ -50,9 +58,21 @@
-type request() :: tuple(). %% callback argument from diameter_traffic
-define(SERVER, ?MODULE). %% server monitoring node connections
--define(TABLE, ?MODULE). %% node() binary -> node() atom
+
+%% Maps a node name binary to the corresponding atom. Used by
+%% route_session/2 to map the optional value of a Session-Id to
+%% node().
+-define(NODE_TABLE, diameter_dist_node).
+
+%% Maps a diameter:service_name() to a node() that has called attach/1
+%% to declare its willingness to handle incoming requests for the
+%% service. Use by route_session/2 in case the optional value mapping
+%% has failed.
+-define(SERVICE_TABLE, diameter_dist_service).
-define(B(A), atom_to_binary(A, utf8)).
+-define(ORCOND(List), list_to_tuple(['orelse', false | List])).
+-define(HASH(T), erlang:phash2(T, 16#100000000)).
%% spawn_local/2
%%
@@ -76,32 +96,82 @@ spawn_local(ReqT) ->
%% route_session/2
%%
-%% Callback that routes requests containing Session-Id AVPs as
-%% returned by diameter:session_id/0 back to the node on which the
-%% function was called. This is only appropriate when sessions are
-%% only initiated by the own (typically client) node, and ids have
-%% been returned from diameter:session_id/0.
+%% Callback that maps the Session-Id of an incoming request to a
+%% handler node.
%%
-%% This can be used with #{search => 0} to route on something other
-%% than Session-Id since default can be an MFA returning a node()
-%% (applied to the incoming diameter_packet record) and dispatch can
-%% be an MFA returning a pid() (applied to Node and the request MFA),
-%% but this is no simpler than just implementing an own spawn_opt
-%% callback. (Except with the default dispatch possibly.)
-
+%% With an options list, maps an id whose optional value is the name
+%% of a connected node to the same node, to handle the case that the
+%% session id has been returned from diameter:session_id/1; otherwise
+%% to a node that has called diameter_dist:attach/1 using the
+%% consistent hashing provided by hash/3, or to the local node() if a
+%% session id could not be extracted or there are no attached nodes. A
+%% handler process is spawned on the selected node using
+%% erlang:spawn_opt/4.
+%%
+%% Different behaviour can be configured by supplying an options map
+%% of the following form:
+%%
+%% #{search => non_neg_integer(),
+%% id => [binary()],
+%% default => discard | local | mfa(),
+%% dispatch => list() | mfa()}
+%%
+%% The search member limits the number of AVPs that are examined in
+%% the message (from the front), to avoid searching entire message in
+%% case it's known that peers follow RFC 6733's recommendation that
+%% Session-Id be placed at the head of a message. The default is to
+%% search the entire message.
+%%
+%% The id member restricts the optional value mapping to session ids
+%% whose DiamterIdentity is one of those specified. Set this to the
+%% list of Diameter identities advertised by the service in question
+%% (typically one) to ensure that only locally generated session ids
+%% are mapped; or to the empty list to disable the mapping.
+%%
+%% The default member determines where to handle a message whose
+%% Session-Id isn't found or whose optional value isn't mapped to the
+%% name of a connected node. The atom local says the local node, an
+%% MFA is invoked on Session-Id | false, the name of the diameter
+%% service, and the message binary, and should return either a node()
+%% or false to discard the message. Defaults to {diameter_dist, hash, []}.
+%%
+%% The dispatch member determines how the pid() of the request handler
+%% process is retrieved. An MFA is applied to a previously selected
+%% node(), and the module, function, and arguments list to apply in
+%% the handler process to handle the request, the MFA being supplied
+%% by diameter, and returns pid() | discard. A list is equivalent to
+%% {erlang, spawn_opt, []}. Defaults to [].
+%%
+%% This can be used with search = 0 to route on something other than
+%% Session-Id, but this is probably no simpler than just implementing
+%% an own spawn_opt callback. (Except with the default dispatch possibly.)
+%%
+%% Note that if the peer is also implemented with OTP diameter and
+%% generating session ids with diameter:session_id/1 then
+%% route_session/2 can map an optional value to a local node that
+%% happens to have the same name as one of the peer's nodes. This
+%% could lead to an uneven distribution; for example, if the peer
+%% nodes are a subset of the local nodes. In practice, it's typically
+%% known if it's peers or the local node originating sessions; if the
+%% former then setting id = [] disables the optional value mapping, if
+%% the latter then setting default = local disables the hashing.
-spec route_session(ReqT :: request(), Opts)
-> discard
| pid()
when Opts :: pos_integer() %% aka #{search => N}
| list() %% aka #{dispatch => Opts}
| #{search => non_neg_integer(), %% limit number of examined AVPs
- default => discard | mfa(), %% return node() | false
- dispatch => list() | mfa()}. %% spawn options or return pid()
+ id => [binary()], %% restrict optional value map on DiamIdent
+ default => local %% handle locally
+ | discard
+ | mfa(), %% return node() | false
+ dispatch => list() %% spawn options
+ | mfa()}. %% (Node, M, F, A) -> pid() | discard
route_session(ReqT, Opts) ->
- #diameter_packet{bin = Bin} = Pkt = element(1, ReqT),
+ {_, Bin} = Info = diameter_traffic:request_info(ReqT),
Sid = session_id(avps(Bin), search(Opts)),
- Node = default(node_of_session_id(Sid), Sid, Opts, Pkt),
+ Node = default(node_of_session_id(Sid, Opts), Sid, Opts, Info),
dispatch(Node, ReqT, dispatch(Opts)).
%% avps/1
@@ -128,24 +198,31 @@ dispatch(Node, ReqT, Opts) ->
route_session(ReqT) ->
route_session(ReqT, []).
-%% node_of_session_id/1
+%% node_of_session_id/2
%%
%% Return the node name encoded as optional value in a Session-Id,
%% assuming the id has been created with diameter:session_id/0. Lookup
%% the node name to ensure we don't convert arbitrary binaries to
%% atom.
-node_of_session_id([_, _, _, Bin]) ->
- case ets:lookup(?TABLE, Bin) of
- [{_, Node}] ->
- Node;
- [] ->
- false
- end;
+node_of_session_id([Id, _, _, Bin], #{id := Ids}) ->
+ lists:member(Id, Ids) andalso nodemap(Bin);
-node_of_session_id(_) ->
+node_of_session_id([_, _, _, Bin], _) ->
+ nodemap(Bin);
+
+node_of_session_id(_, _) ->
false.
+%% nodemap/1
+
+nodemap(Bin) ->
+ try
+ ets:lookup_element(?NODE_TABLE, Bin, 2)
+ catch
+ error: badarg -> false
+ end.
+
%% session_id/2
session_id(_, 0) -> %% give up
@@ -154,7 +231,7 @@ session_id(_, 0) -> %% give up
%% Session-Id = Command Code 263, V-bit = 0.
session_id(<<263:32, 0:1, _:7, Len:24, _/binary>> = Bin, _) ->
case Bin of
- <<Avp:Len/binary>> ->
+ <<Avp:Len/binary, _/binary>> ->
<<_:8/binary, Sid/binary>> = Avp,
split(Sid);
_ ->
@@ -248,20 +325,123 @@ search(_) ->
%%
%% Choose a node when Session-Id lookup has failed.
-default(false = No, _, #{default := discard}, _) ->
- No;
-
-default(false, Sid, #{default := {M,F,A}}, Pkt) ->
- apply(M, F, [Sid, Pkt | A]); %% false | node()
+default(false, _, #{default := discard}, _) ->
+ false;
-default(false, _, _, _) ->
+default(false, _, #{default := local}, _) ->
node();
+default(false, Sid, #{default := {M,F,A}}, Info) ->
+ {ServiceName, Bin} = Info,
+ apply(M, F, [Sid, ServiceName, Bin | A]); %% node() | false
+
+default(false, Sid, _, Info) -> %% aka {?MODULE, hash, []}
+ {ServiceName, Bin} = Info,
+ hash(Sid, ServiceName, Bin);
+
default(Node, _, _, _) ->
Node.
%% ===========================================================================
+%% hash/3
+%%
+%% Consistent hashing of Session-Id to an attached node, or the local
+%% node if Session-Id = false or no attached nodes.
+
+hash(Sid, ServiceName, _) ->
+ case false /= Sid andalso attached(ServiceName) of
+ [_|_] = Nodes ->
+ hash(Sid, Nodes);
+ _ ->
+ node()
+ end.
+
+%% hash/2
+%%
+%% Consistent hashing on arbitrary key/values. Returns false if the
+%% list is empty.
+
+%% No key or no values.
+hash(_, []) ->
+ false;
+
+%% Not much choice.
+hash(_, [Value]) ->
+ Value;
+
+%% Hash on a circle and choose the closest predecessor.
+hash(Key, Values) ->
+ Hash = ?HASH(Key),
+ tl(lists:foldl(fun(V,A) ->
+ choose(Hash, [?HASH({Key, V}) | V], A)
+ end,
+ false, %% < list()
+ Values)).
+
+%% choose/3
+
+choose(Hash, [Hash1 | _] = T, [Hash2 | _])
+ when Hash1 =< Hash, Hash < Hash2 ->
+ T;
+
+choose(Hash, [Hash1 | _], [Hash2 | _] = T)
+ when Hash2 =< Hash, Hash < Hash1 ->
+ T;
+
+choose(_, T1, T2) ->
+ max(T1, T2).
+
+%% ===========================================================================
+
+%% attach/1
+%%
+%% Register the local node as a handler of incoming requests for the
+%% specified services when using the route_session/2 spawn_opt
+%% callback.
+
+attach(ServiceNames) ->
+ abcast({attach, node(), ServiceNames}).
+
+%% detach/1
+%%
+%% Deregister the local node as a handler of incoming requests.
+
+detach(ServiceNames) ->
+ abcast({detach, node(), ServiceNames}).
+
+%% abcast/1
+
+abcast(T) ->
+ gen_server:abcast([node() | nodes()], ?SERVER, T),
+ ok.
+
+%% attached/1
+
+attached(ServiceName) ->
+ try
+ ets:lookup_element(?SERVICE_TABLE, ServiceName, 2)
+ catch
+ error: badarg -> []
+ end.
+
+%% cast/2
+
+cast(Node, T) ->
+ gen_server:cast({?SERVER, Node}, T).
+
+%% attach/2
+
+attach(Node, S) ->
+ case sets:to_list(S) of
+ [] ->
+ ok;
+ Services ->
+ cast(Node, {attach, node(), Services})
+ end.
+
+%% ===========================================================================
+
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, _Args = [], _Opts = []).
@@ -272,11 +452,12 @@ start_link() ->
%% binaries that aren't necessarily node names.
init([]) ->
- ets:new(?TABLE, [set, named_table]),
+ ets:new(?NODE_TABLE, [set, named_table]),
+ ets:new(?SERVICE_TABLE, [bag, named_table]),
ok = net_kernel:monitor_nodes(true, [{node_type, all}, nodedown_reason]),
- ets:insert(?TABLE, [{B,N} || N <- [node() | nodes()],
- B <- [?B(N)]]),
- {ok, erlang:monotonic_time()}.
+ ets:insert(?NODE_TABLE, [{?B(N), N} || N <- [node() | nodes()]]),
+ abcast({attach, node()}),
+ {ok, sets:new()}.
%% handle_call/3
@@ -285,17 +466,49 @@ handle_call(_, _From, S) ->
%% handle_cast/2
+%% Remote node is asking which services the local node wants to handle.
+handle_cast({attach, Node}, S)
+ when Node /= node() ->
+ attach(Node, S),
+ {noreply, S};
+
+%% Node wants to handle incoming requests ...
+handle_cast({attach, Node, ServiceNames}, S) ->
+ ets:insert(?SERVICE_TABLE, [{N, Node} || N <- ServiceNames]),
+ {noreply, case node() of
+ Node ->
+ sets:union(S, sets:from_list(ServiceNames));
+ _ ->
+ S
+ end};
+
+%% ... or not.
+handle_cast({detach, Node, ServiceNames}, S) ->
+ ets:select_delete(?SERVICE_TABLE, [{{'$1', Node},
+ [?ORCOND([{'==', '$1', {const, N}}
+ || N <- ServiceNames])],
+ [true]}]),
+ {noreply, case node() of
+ Node ->
+ sets:subtract(S, sets:from_list(ServiceNames));
+ _ ->
+ S
+ end};
+
handle_cast(_, S) ->
{noreply, S}.
%% handle_info/2
handle_info({nodeup, Node, _}, S) ->
- ets:insert(?TABLE, {?B(Node), Node}),
+ ets:insert(?NODE_TABLE, {?B(Node), Node}),
+ cast(Node, {attach, node()}), %% ask which services remote node handles
+ attach(Node, S), %% say which service local node handles
{noreply, S};
handle_info({nodedown, Node, _}, S) ->
- ets:delete(?TABLE, ?B(Node)),
+ ets:delete(?NODE_TABLE, ?B(Node)),
+ ets:select_delete(?SERVICE_TABLE, [{{'_', Node}, [], [true]}]),
{noreply, S};
handle_info(_, S) ->
diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl
index b1b797aad8..c0643402a6 100644
--- a/lib/diameter/src/base/diameter_traffic.erl
+++ b/lib/diameter/src/base/diameter_traffic.erl
@@ -42,6 +42,9 @@
peer_up/1,
peer_down/1]).
+%% towards diameter_dist
+-export([request_info/1]).
+
%% internal
-export([send/1, %% send from remote node
request/1, %% process request in handler process
@@ -289,8 +292,7 @@ spawn_request(false, _, _, _, _, _, _) -> %% no transport
%% handler process dies (in a handle_request callback for example).
spawn_request(AppT, {M,F,A}, Ack, TPid, Pkt, Dict0, RecvData) ->
%% Term to pass to request/1 in an appropriate process. Module
- %% diameter_dist implements callbacks, and uses the form of the
- %% argument tuple constructed below.
+ %% diameter_dist implements callbacks.
ReqT = {Pkt, AppT, Ack, TPid, Dict0, RecvData},
apply(M, F, [ReqT | A]);
@@ -302,6 +304,13 @@ spawn_request(AppT, Opts, Ack, TPid, Pkt, Dict0, RecvData) ->
end,
Opts).
+%% request_info/1
+%%
+%% Limited request information for diameter_dist.
+
+request_info({Pkt, _AppT, _Ack, _TPid, _Dict0, RecvData} = _ReqT) ->
+ {RecvData#recvdata.service_name, Pkt#diameter_packet.bin}.
+
%% request/1
%%
%% Called from a handler process chosen by a transport spawn_opt MFA