aboutsummaryrefslogblamecommitdiffstats
path: root/lib/diameter/src/base/diameter_dist.erl
blob: ed23152b8b723c617b6ea525df42986be0f2cef0 (plain) (tree)





























                                                                           
                      




                           







                                                                           












                                              

                                                                       
                                                                










                                                                      

                                       

                                                               









                                                               


                                                    









                                                       

                                                                
  























































                                                                          





                                                                             





                                                                               

                            
                                                          
                                              
                                                                   


















                                                               






                                                             
                       

                                                                  


                                                                      
 

                                                    
 



                                        

          








                                               
               
 

                               

                                            
                                                              
               
                                       
                                             
                       
            
                 






                                                                      
                                                  


                                                      
                                                                     
            

                 
 

                   
 





                                                                    
 















                                          















                                                                






























                                                             

                                              
 
                                            

           







                                                                


                         

                                                                              

































































































                                                                              









                                                                              

                                                

                                                              


                                                                     







                           




























                                                                          





                                    


                                                                            


                                      

                                                                   











                    








                                                              

                                      
%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2019. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% %CopyrightEnd%
%%

-module(diameter_dist).

-behaviour(gen_server).

%%
%% Implements callbacks that can be configured as a spawn_opt
%% transport configuration, to be able to distribute incoming Diameter
%% requests to handler processes (local or remote) in various ways.
%%

%% spawn_opt callbacks
-export([spawn_local/2,
         spawn_local/1,
         route_session/2,
         route_session/1]).

%% signal availability for handling incoming requests to route_sesssion/2
-export([attach/1,
         detach/1]).

%% consistent hashing
-export([hash/3,   %% for use as default MFA in route_session/2 options map
         hash/2]). %% arbitrary key/values

-include_lib("diameter/include/diameter.hrl").

%% server start
-export([start_link/0]).

%% gen_server callbacks
-export([init/1,
         handle_info/2,
         handle_cast/2,
         handle_call/3,
         code_change/3,
         terminate/2]).

-type request() :: tuple().  %% callback argument from diameter_traffic

-define(SERVER, ?MODULE).  %% server monitoring node connections

%% Maps a node name binary to the corresponding atom. Used by
%% route_session/2 to map the optional value of a Session-Id to
%% node().
-define(NODE_TABLE, diameter_dist_node).

%% Maps a diameter:service_name() to a node() that has called attach/1
%% to declare its willingness to handle incoming requests for the
%% service. Use by route_session/2 in case the optional value mapping
%% has failed.
-define(SERVICE_TABLE, diameter_dist_service).

-define(B(A), atom_to_binary(A, utf8)).
-define(ORCOND(List), list_to_tuple(['orelse', false | List])).
-define(HASH(T), erlang:phash2(T, 16#100000000)).

%% spawn_local/2
%%
%% Callback that is equivalent to an options list. That is, the
%% following are equivalent when passed as options to
%% diameter:add_transport/2.
%%
%%   {spawn_opt, Opts}
%%   {spawn_opt, {diameter_dist, spawn_local, [Opts]}}

-spec spawn_local(ReqT :: request(), Opts :: list())
   -> pid().

spawn_local(ReqT, Opts) ->
    spawn_opt(diameter_traffic, request, [ReqT], Opts).

%% spawn_local/1

spawn_local(ReqT) ->
    spawn_local(ReqT, []).

%% route_session/2
%%
%% Callback that maps the Session-Id of an incoming request to a
%% handler node.
%%
%% With an options list, maps an id whose optional value is the name
%% of a connected node to the same node, to handle the case that the
%% session id has been returned from diameter:session_id/1; otherwise
%% to a node that has called diameter_dist:attach/1 using the
%% consistent hashing provided by hash/3, or to the local node() if a
%% session id could not be extracted or there are no attached nodes. A
%% handler process is spawned on the selected node using
%% erlang:spawn_opt/4.
%%
%% Different behaviour can be configured by supplying an options map
%% of the following form:
%%
%%   #{search => non_neg_integer(),
%%     id => [binary()],
%%     default => discard | local | mfa(),
%%     dispatch => list() | mfa()}
%%
%% The search member limits the number of AVPs that are examined in
%% the message (from the front), to avoid searching entire message in
%% case it's known that peers follow RFC 6733's recommendation that
%% Session-Id be placed at the head of a message. The default is to
%% search the entire message.
%%
%% The id member restricts the optional value mapping to session ids
%% whose DiamterIdentity is one of those specified. Set this to the
%% list of Diameter identities advertised by the service in question
%% (typically one) to ensure that only locally generated session ids
%% are mapped; or to the empty list to disable the mapping.
%%
%% The default member determines where to handle a message whose
%% Session-Id isn't found or whose optional value isn't mapped to the
%% name of a connected node. The atom local says the local node, an
%% MFA is invoked on Session-Id | false, the name of the diameter
%% service, and the message binary, and should return either a node()
%% or false to discard the message. Defaults to {diameter_dist, hash, []}.
%%
%% The dispatch member determines how the pid() of the request handler
%% process is retrieved. An MFA is applied to a previously selected
%% node(), and the module, function, and arguments list to apply in
%% the handler process to handle the request, the MFA being supplied
%% by diameter, and returns pid() | discard. A list is equivalent to
%% {erlang, spawn_opt, []}. Defaults to [].
%%
%% This can be used with search = 0 to route on something other than
%% Session-Id, but this is probably no simpler than just implementing
%% an own spawn_opt callback. (Except with the default dispatch possibly.)
%%
%% Note that if the peer is also implemented with OTP diameter and
%% generating session ids with diameter:session_id/1 then
%% route_session/2 can map an optional value to a local node that
%% happens to have the same name as one of the peer's nodes. This
%% could lead to an uneven distribution; for example, if the peer
%% nodes are a subset of the local nodes. In practice, it's typically
%% known if it's peers or the local node originating sessions; if the
%% former then setting id = [] disables the optional value mapping, if
%% the latter then setting default = local disables the hashing.
-spec route_session(ReqT :: request(), Opts)
   -> discard
    | pid()
 when Opts :: pos_integer()   %% aka #{search => N}
            | list()          %% aka #{dispatch => Opts}
            | #{search => non_neg_integer(), %% limit number of examined AVPs
                id => [binary()],   %% restrict optional value map on DiamIdent
                default => local    %% handle locally
                         | discard
                         | mfa(),   %% return node() | false
                dispatch => list()  %% spawn options
                          | mfa()}. %% (Node, M, F, A) -> pid() | discard

route_session(ReqT, Opts) ->
    {_, Bin} = Info = diameter_traffic:request_info(ReqT),
    Sid = session_id(avps(Bin), search(Opts)),
    Node = default(node_of_session_id(Sid, Opts), Sid, Opts, Info),
    dispatch(Node, ReqT, dispatch(Opts)).

%% avps/1

avps(<<_:20/binary, Bin/binary>>) ->
    Bin;

avps(_) ->
    false.

%% dispatch/3

dispatch(false, _, _) ->
    discard;

dispatch(Node, ReqT, {M,F,A}) ->
    apply(M, F, [Node, diameter_traffic, request, [ReqT] | A]);

dispatch(Node, ReqT, Opts) ->
    spawn_opt(Node, diameter_traffic, request, [ReqT], Opts).

%% route_session/1

route_session(ReqT) ->
    route_session(ReqT, []).

%% node_of_session_id/2
%%
%% Return the node name encoded as optional value in a Session-Id,
%% assuming the id has been created with diameter:session_id/0. Lookup
%% the node name to ensure we don't convert arbitrary binaries to
%% atom.

node_of_session_id([Id, _, _, Bin], #{id := Ids}) ->
    lists:member(Id, Ids) andalso nodemap(Bin);

node_of_session_id([_, _, _, Bin], _) ->
    nodemap(Bin);

node_of_session_id(_, _) ->
    false.

%% nodemap/1

nodemap(Bin) ->
    try
        ets:lookup_element(?NODE_TABLE, Bin, 2)
    catch
        error: badarg -> false
    end.

%% session_id/2

session_id(_, 0) ->  %% give up
    false;

%% Session-Id = Command Code 263, V-bit = 0.
session_id(<<263:32, 0:1, _:7, Len:24, _/binary>> = Bin, _) ->
    case Bin of
        <<Avp:Len/binary, _/binary>> ->
            <<_:8/binary, Sid/binary>> = Avp,
            split(Sid);
        _ ->
            false
    end;

%% Jump to the next AVP. This is potentially costly for a message with
%% many AVPs and no Session-Id, which an attacker is prone to send.
%% 8.8 or RFC 6733 says that Session-Id SHOULD (but not MUST) appear
%% immediately following the Diameter Header, so there is no
%% guarantee.
session_id(<<_:40, Len:24, _/binary>> = Bin, N) ->
    Pad = (4 - (Len rem 4)) rem 4,
    case Bin of
        <<_:Len/binary, _:Pad/binary, Rest/binary>> ->
            session_id(Rest, if N == infinity -> N; true -> N-1 end);
        _ ->
            false
    end;

session_id(_, _) ->
    false.

%% split/1
%%
%% Split a Session-Id at no more than three semicolons: the optional
%% value (if any) follows the third. binary:split/2 does better than
%% matching character by character, especially when the pattern is
%% compiled.

split(Bin) ->
    split(3, Bin, pattern()).

%% split/3

split(0, Bin, _) ->
    [Bin];

split(N, Bin, Pattern) ->
    [H|T] = binary:split(Bin, Pattern),
    [H | case T of
             [] ->
                 T;
             [Rest] ->
                 split(N-1, Rest, Pattern)
         end].

%% pattern/0
%%
%% Since this is being called in a watchdog process, compile the
%% pattern once and maintain it in the process dictionary.

pattern() ->
    case get(?MODULE) of
        undefined ->
            CP = binary:compile_pattern(<<$;>>), %% tuple
            put(?MODULE, CP),
            CP;
        CP ->
            CP
    end.

%% dispatch/1

dispatch(#{} = Opts) ->
    maps:get(dispatch, Opts, []);

dispatch(Opts)
  when is_list(Opts) ->
    Opts;

dispatch(_) ->
    [].

%% search/1
%%
%% Bound number of AVPs examined when looking for Session-Id.

search(#{search := N})
  when is_integer(N), 0 =< N ->
    N;

search(N)
  when is_integer(N), 0 =< N ->
    N;

search(_) ->
    infinity.

%% default/3
%%
%% Choose a node when Session-Id lookup has failed.

default(false, _, #{default := discard}, _) ->
    false;

default(false, _, #{default := local}, _) ->
    node();

default(false, Sid, #{default := {M,F,A}}, Info) ->
    {ServiceName, Bin} = Info,
    apply(M, F, [Sid, ServiceName, Bin | A]);  %% node() | false

default(false, Sid, _, Info) -> %% aka {?MODULE, hash, []}
    {ServiceName, Bin} = Info,
    hash(Sid, ServiceName, Bin);

default(Node, _, _, _) ->
    Node.

%% ===========================================================================

%% hash/3
%%
%% Consistent hashing of Session-Id to an attached node, or the local
%% node if Session-Id = false or no attached nodes.

hash(Sid, ServiceName, _) ->
    case false /= Sid andalso attached(ServiceName) of
        [_|_] = Nodes ->
            hash(Sid, Nodes);
        _ ->
            node()
    end.

%% hash/2
%%
%% Consistent hashing on arbitrary key/values. Returns false if the
%% list is empty.

%% No key or no values.
hash(_, []) ->
    false;

%% Not much choice.
hash(_, [Value]) ->
    Value;

%% Hash on a circle and choose the closest predecessor.
hash(Key, Values) ->
    Hash = ?HASH(Key),
    tl(lists:foldl(fun(V,A) ->
                           choose(Hash, [?HASH({Key, V}) | V], A)
                   end,
                   false,  %% < list()
                   Values)).

%% choose/3

choose(Hash, [Hash1 | _] = T, [Hash2 | _])
  when Hash1 =< Hash, Hash < Hash2 ->
    T;

choose(Hash, [Hash1 | _], [Hash2 | _] = T)
  when Hash2 =< Hash, Hash < Hash1 ->
    T;

choose(_, T1, T2) ->
    max(T1, T2).

%% ===========================================================================

%% attach/1
%%
%% Register the local node as a handler of incoming requests for the
%% specified services when using the route_session/2 spawn_opt
%% callback.

attach(ServiceNames) ->
    abcast({attach, node(), ServiceNames}).

%% detach/1
%%
%% Deregister the local node as a handler of incoming requests.

detach(ServiceNames) ->
    abcast({detach, node(), ServiceNames}).

%% abcast/1

abcast(T) ->
    gen_server:abcast([node() | nodes()], ?SERVER, T),
    ok.

%% attached/1

attached(ServiceName) ->
    try
        ets:lookup_element(?SERVICE_TABLE, ServiceName, 2)
    catch
        error: badarg -> []
    end.

%% cast/2

cast(Node, T) ->
    gen_server:cast({?SERVER, Node}, T).

%% attach/2

attach(Node, S) ->
    case sets:to_list(S) of
        [] ->
            ok;
        Services ->
            cast(Node, {attach, node(), Services})
    end.

%% ===========================================================================

start_link() ->
    gen_server:start_link({local, ?SERVER}, ?MODULE, _Args = [], _Opts  = []).

%% init/1
%%
%% Maintain [node() | nodes()] in a table that maps from binary-valued
%% names, so we can lookup the corresponding atoms rather than convert
%% binaries that aren't necessarily node names.

init([]) ->
    ets:new(?NODE_TABLE,    [set, named_table]),
    ets:new(?SERVICE_TABLE, [bag, named_table]),
    ok = net_kernel:monitor_nodes(true, [{node_type, visible},
                                         nodedown_reason]),
    ets:insert(?NODE_TABLE, [{?B(N), N} || N <- [node() | nodes()]]),
    abcast({attach, node()}),
    {ok, sets:new()}.

%% handle_call/3

handle_call(_, _From, S) ->
    {reply, nok, S}.

%% handle_cast/2

%% Remote node is asking which services the local node wants to handle.
handle_cast({attach, Node}, S)
  when Node /= node() ->
    attach(Node, S),
    {noreply, S};

%% Node wants to handle incoming requests ...
handle_cast({attach, Node, ServiceNames}, S) ->
    ets:insert(?SERVICE_TABLE, [{N, Node} || N <- ServiceNames]),
    {noreply, case node() of
                  Node ->
                      sets:union(S, sets:from_list(ServiceNames));
                  _ ->
                      S
              end};

%% ... or not.
handle_cast({detach, Node, ServiceNames}, S) ->
    ets:select_delete(?SERVICE_TABLE, [{{'$1', Node},
                                        [?ORCOND([{'==', '$1', {const, N}}
                                                  || N <- ServiceNames])],
                                        [true]}]),
    {noreply, case node() of
                  Node ->
                      sets:subtract(S, sets:from_list(ServiceNames));
                  _ ->
                      S
              end};

handle_cast(_, S) ->
    {noreply, S}.

%% handle_info/2

handle_info({nodeup, Node, _}, S) ->
    ets:insert(?NODE_TABLE, {?B(Node), Node}),
    cast(Node, {attach, node()}),  %% ask which services remote node handles
    attach(Node, S),               %% say which service local node handles
    {noreply, S};

handle_info({nodedown, Node, _}, S) ->
    ets:delete(?NODE_TABLE, ?B(Node)),
    ets:select_delete(?SERVICE_TABLE, [{{'_', Node}, [], [true]}]),
    {noreply, S};

handle_info(_, S) ->
    {noreply, S}.

%% terminate/2

terminate(_, _) ->
    ok.

%% code_change/3

%% Old code inadvertently monitored all nodes: start a new
%% subscription and remove the old one.
code_change(_OldVsn, State, "2.2") ->
    ok = net_kernel:monitor_nodes(true, [{node_type, visible},
                                         nodedown_reason]),
    ok = net_kernel:monitor_nodes(false, [{node_type, all},
                                          nodedown_reason]),
    {ok, State};

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.