aboutsummaryrefslogblamecommitdiffstats
path: root/lib/diameter/src/base/diameter_reg.erl
blob: e6a82e1764a030ffc13f4f1c4d7bd7686de9d41c (plain) (tree)
1
2
3
4
5


                   
                                                        
  










                                                                           








                                                       



                       
                  
                 

                       












                         


                   




                    


                         


                                
 
                                        









                                                                              







                                                                              
                                                                     
                                



                                                                              
 
                


           
                          
 





                                                                              
 
                    


                
                         
 
                                                                              
                 


                                                                              
 
                   

           

                      
 









                                                                              
 

                       

             


                    
 

                                         
 






                                                                              

                       
 
            
                                                


                                                                              


                                                              
                                                              










                                                                              












                                                                    

            
                         




                        
                                                                        






                               

             
                         





                      































                                                               


                                                             




                                        


                                                             
 







                                              
                                          


                                   
                          








                                                                        
        
 



                                                                               

                           
 

                                                
 
                              
                    
 


                                                             
 
                      
                 
 


                                                             
 

                                                   
 
                        
                 
 


                                                             



                            


                                                             



                                      

                                                                              


















                                                                         
                                      












                                                                            
 
        
 


                                                     
 















































                                                                           
                             






                                                                  
                                                                  












                                                  
                          


                                                   

                                                                   










                                                                      

      






























                                                                            
 



                                                
%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2010-2017. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% %CopyrightEnd%
%%

%%
%% The module implements a simple term -> pid registry.
%%

-module(diameter_reg).
-behaviour(gen_server).

-export([add/1,
         add_new/1,
         remove/1,
         match/1,
         wait/1,
         subscribe/2]).

-export([start_link/0]).

%% gen_server callbacks
-export([init/1,
         terminate/2,
         handle_call/3,
         handle_cast/2,
         handle_info/2,
         code_change/3]).

%% test
-export([pids/0,
         terms/0,
         subs/0,
         waits/0]).

%% debug
-export([state/0,
         uptime/0]).

%% Name under which the server process is locally registered.
-define(SERVER, ?MODULE).
%% Name of the ets table holding the {Key, Pid} associations.
-define(TABLE, ?MODULE).

%% A term associated with a process by add/1 or add_new/1.
-type key() :: term().
%% The From argument of handle_call/3, stored for blocked wait/1 callers.
-type from() :: {pid(), term()}.
%% A pattern matched against keys using ets:match_object/2.
-type pattern() :: term().

%% Server state:
%%   id        - value of diameter_lib:now() when the state was created,
%%               from which uptime is computed
%%   receivers - pattern -> processes to notify or reply to when an
%%               association matching the pattern is added or removed
%%   monitors  - pids on which the server has already set a monitor
-record(state, {id = diameter_lib:now(),
                receivers = dict:new()
                         :: dict:dict(pattern(), [[pid() | term()]%% subscribe
                                                  | from()]),     %% wait
                monitors = sets:new() :: sets:set(pid())}).

%% The ?TABLE bag contains the Key -> Pid mapping, as {Key, Pid}
%% tuples. Each pid is stored in the monitors set to ensure only one
%% monitor for each pid: more are harmless, but unnecessary. A pattern
%% is added to receivers as a result of calls to wait/1 or subscribe/2:
%% changes to ?TABLE cause processes to be notified as required.

%% ===========================================================================
%% # add(T)
%%
%% Associate the specified term with self(). The list of pids having
%% this or other associations can be retrieved using match/1.
%%
%% An association is removed when the calling process dies or as a
%% result of calling remove/1. Adding the same term more than once is
%% equivalent to adding it once.
%%
%% Note that since match/1 takes a pattern as argument, specifying a
%% term that contains match variables is probably not a good idea
%% ===========================================================================

-spec add(key())
   -> true.

add(Key) ->
    %% false = no uniqueness requirement, in contrast to add_new/1.
    Request = {add, false, Key},
    call(Request).

%% ===========================================================================
%% # add_new(T)
%%
%% Like add/1 but only one process is allowed to have the
%% association, false being returned if an association already exists.
%% ===========================================================================

-spec add_new(key())
   -> boolean().

add_new(Key) ->
    %% true = the association must be unique, in contrast to add/1.
    Request = {add, true, Key},
    call(Request).

%% ===========================================================================
%% # remove(Term)
%%
%% Remove any existing association of Term with self().
%% ===========================================================================

-spec remove(key())
   -> true.

remove(Key) ->
    %% The server pairs Key with the pid of the calling process.
    Request = {remove, Key},
    call(Request).

%% ===========================================================================
%% # match(Pat)
%%
%% Return the list of associations whose Term, as specified to add/1
%% or add_new/1, matches the specified pattern.
%%
%% Note that there's no guarantee that the returned processes are
%% still alive. (Although one that isn't will soon have its
%% associations removed.)
%% ===========================================================================

-spec match(pattern())
   -> [{key(), pid()}].

match(KeyPat) ->
    %% Any pid will do: only the key is constrained.
    AnyPid = '_',
    match(KeyPat, AnyPid).

%% match/2

%% Return the {Key, Pid} entries of ?TABLE that match both the key
%% pattern and the pid pattern.
match(KeyPat, PidPat) ->
    Object = {KeyPat, PidPat},
    ets:match_object(?TABLE, Object).

%% ===========================================================================
%% # wait(Pat)
%%
%% Like match/1 but return only when the result is non-empty, or fail
%% if the match itself fails. It's up to the caller to ensure that the
%% wait won't be forever.
%% ===========================================================================

-spec wait(pattern())
   -> [{key(), pid()}].

wait(KeyPat) ->
    %% Match in the calling process first, discarding the result, so
    %% that a pattern that can't be matched fails here and not in the
    %% server.
    _ = match(KeyPat),
    Request = {wait, KeyPat},
    call(Request).

%% ===========================================================================
%% # subscribe(Pat, T)
%%
%% Like match/1, but additionally receive messages of the form
%% {T, add|remove, {term(), pid()}} when associations are added
%% or removed.
%% ===========================================================================

-spec subscribe(Pat :: any(), T :: term())
   -> [{term(), pid()}].

subscribe(KeyPat, Tag) ->
    %% Match in the calling process first, discarding the result, so
    %% that a pattern that can't be matched fails here and not in the
    %% server.
    _ = match(KeyPat),
    Request = {subscribe, KeyPat, Tag},
    call(Request).

%% ===========================================================================

%% Start the registry as a gen_server registered locally as ?SERVER,
%% with spawn options taken from diameter_lib:spawn_opts/2.
start_link() ->
    SpawnOpts = diameter_lib:spawn_opts(server, []),
    gen_server:start_link({local, ?SERVER},
                          ?MODULE,
                          [],
                          [{spawn_opt, SpawnOpts}]).

%% Return the server's #state{} record.
state() ->
    Request = state,
    call(Request).

%% Return the time elapsed since the server state was created, as
%% computed by diameter_lib:now_diff/1 in the server.
uptime() ->
    Request = uptime,
    call(Request).

%% pids/0

-spec pids()
   -> [{pid(), [key()]}].

pids() ->
    %% Swap each {Key, Pid} entry so that the result is keyed on pid.
    KeyOnPid = fun swap/1,
    to_list(KeyOnPid).

%% Fold ?TABLE into an orddict: Fun maps each table entry to a {K, V}
%% pair, V being appended to the list of values stored under K.
to_list(Fun) ->
    Collect = fun(Entry, Acc) -> append(Fun(Entry), Acc) end,
    ets:foldl(Collect, orddict:new(), ?TABLE).

%% Append Value to the list stored under Key in the orddict.
append({Key, Value}, Acc) ->
    orddict:append(Key, Value, Acc).

%% Identity: leaves a {Key, Pid} entry as is.
id(Entry) ->
    Entry.

%% terms/0

-spec terms()
   -> [{key(), [pid()]}].

terms() ->
    %% Entries are already {Key, Pid}: no transformation required.
    KeyOnTerm = fun id/1,
    to_list(KeyOnTerm).

%% Exchange the elements of a pair.
swap({First, Second}) ->
    {Second, First}.

%% subs/0

-spec subs()
   -> [{pattern(), [{pid(), term()}]}].

subs() ->
    #state{receivers = Receivers} = state(),
    %% Keep only the subscriptions stored under each pattern.
    Collect = fun(Pat, Recvs, Acc) -> sub(Pat, Recvs, Acc) end,
    dict:fold(Collect, orddict:new(), Receivers).

%% Append the subscriptions among the receivers Ps, those of the form
%% [Pid | T], to the list stored under Pat, as {Pid, T}. Other
%% receivers are skipped.
sub(Pat, Ps, Dict) ->
    Subs = [{Pid, Tag} || [Pid | Tag] <- Ps],
    lists:foldl(fun(Sub, Acc) -> orddict:append(Pat, Sub, Acc) end,
                Dict,
                Subs).

%% waits/0

%% Return the blocked wait/1 callers, per pattern. The values are the
%% from() tuples themselves, as stored in the receivers dict by
%% handle_call/3 and collected by wait/3.
-spec waits()
   -> [{pattern(), [from()]}].

waits() ->
    #state{receivers = RD} = state(),
    dict:fold(fun wait/3, orddict:new(), RD).

%% Append the waiting callers among the receivers Ps, those that are
%% 2-tuples, to the list stored under Pat. Other receivers are skipped.
wait(Pat, Ps, Dict) ->
    Waiting = [From || {_, _} = From <- Ps],
    lists:foldl(fun(From, Acc) -> orddict:append(Pat, From, Acc) end,
                Dict,
                Waiting).

%% ----------------------------------------------------------
%% # init/1
%% ----------------------------------------------------------

%% Create the named bag table holding the associations and start with
%% a default state.
init(_Args) ->
    TableOpts = [bag, named_table],
    _ = ets:new(?TABLE, TableOpts),
    {ok, #state{}}.

%% ----------------------------------------------------------
%% # handle_call/3
%% ----------------------------------------------------------

%% add/1 (Uniq = false) or add_new/1 (Uniq = true): associate Key with
%% the calling process. The order matters: dead holders of Key are
%% flushed before the insert so that a unique insert can succeed, and
%% receivers are notified only if the table was actually modified.
handle_call({add, Uniq, Key}, {Pid, _}, S0) ->
    Rec = {Key, Pid},
    S1 = flush(Uniq, Rec, S0),
    %% Res is the reply; New says whether or not Rec was inserted.
    {Res, New} = insert(Uniq, Rec),
    %% Recvs is false if nothing was inserted, which notify/2 ignores.
    {Recvs, S} = add(New, Rec, S1),
    notify(Recvs, Rec),
    {reply, Res, S};

%% remove/1: remove the association of Key with the calling process.
%% Receivers are collected before the entry is deleted since delete/2
%% finds them by matching their patterns against the table.
handle_call({remove, Key}, {Pid, _}, S) ->
    Rec = {Key, Pid},
    Recvs = delete([Rec], S),
    ets:delete_object(?TABLE, Rec),
    notify(Recvs, remove),
    {reply, true, S};

%% wait/1: reply at once if the pattern already matches something,
%% otherwise store From under Pat and leave the caller blocked until a
%% matching entry is added. The caller is monitored so that its entry
%% in receivers can be removed if it dies.
handle_call({wait, Pat}, {Pid, _} = From, #state{receivers = RD} = S) ->
    NS = add_monitor(Pid, S),
    case match(Pat) of
        [_|_] = L ->
            {reply, L, NS};
        [] ->
            {noreply, NS#state{receivers = dict:append(Pat, From, RD)}}
    end;

%% subscribe/2: reply with the current matches and store [Pid | T]
%% under Pat so that the caller is sent messages on later changes.
handle_call({subscribe, Pat, T}, {Pid, _}, #state{receivers = RD} = S) ->
    NS = add_monitor(Pid, S),
    {reply, match(Pat), NS#state{receivers = dict:append(Pat, [Pid | T], RD)}};

%% state/0: return the state record itself.
handle_call(state, _, S) ->
    {reply, S, S};

%% uptime/0: time elapsed since the id field was set.
handle_call(uptime, _, #state{id = Time} = S) ->
    {reply, diameter_lib:now_diff(Time), S};

%% Any other request is answered with nok.
handle_call(_Req, _From, S) ->
    {reply, nok, S}.

%% ----------------------------------------------------------
%% # handle_cast/2
%% ----------------------------------------------------------

%% No casts are expected: ignore whatever arrives.
handle_cast(_Request, State) ->
    {noreply, State}.

%% ----------------------------------------------------------
%% # handle_info/2
%% ----------------------------------------------------------

%% A monitored process has died: remove everything associated with it.
handle_info({'DOWN', _Ref, process, Pid, _Reason}, State) ->
    NewState = down(Pid, State),
    {noreply, NewState};

%% Any other message is ignored.
handle_info(_Other, State) ->
    {noreply, State}.

%% ----------------------------------------------------------
%% # terminate/2
%% ----------------------------------------------------------

%% Nothing to clean up explicitly.
terminate(_Why, _S) ->
    ok.

%% ----------------------------------------------------------
%% # code_change/3
%% ----------------------------------------------------------

%% The state is carried over unchanged.
code_change(_FromVsn, S, _Opts) ->
    {ok, S}.

%% ===========================================================================

%% insert/2

%% Insert Rec = {Key, Pid} into ?TABLE. Returns {Reply, Inserted}: the
%% value with which to answer the caller and whether or not the table
%% was modified.

%% Not unique: insert unless the very same entry already exists, in
%% which case the add is a no-op that still succeeds.
insert(false, Rec) ->
    %% Rec is wrapped in const so that any atoms like '_' or '$1' in
    %% it are compared literally instead of being interpreted. The
    %% comparison is exact (=:=) to agree with how ets itself compares
    %% keys: with == an entry like {1.0, Pid} would be considered
    %% present when only {1, Pid} is, and never be inserted.
    Spec = [{'$1', [{'=:=', '$1', {const, Rec}}], ['$_']}],
    Exists = '$end_of_table' /= ets:select(?TABLE, Spec, 1),
    Exists orelse ets:insert(?TABLE, Rec),
    {true, not Exists};

%% Unique: ets:insert_new/2 fails if any entry with the same key
%% exists, regardless of pid.
insert(true, Rec) ->
    Inserted = ets:insert_new(?TABLE, Rec),
    {Inserted, Inserted}.

%% add/3

%% An entry was inserted: make sure its process is monitored (one
%% monitor per process is sufficient) and work out which receivers are
%% to be notified, together with the receivers that remain.
add(true, {_Key, Pid} = Rec, S0) ->
    S1 = add_monitor(Pid, S0),
    {Recvs, Receivers} = add(Rec, S1),
    {Recvs, S1#state{receivers = Receivers}};

%% Nothing was inserted: nobody to notify and no change of state.
add(false, _Rec, S) ->
    {false, S}.

%% add/2

%% Find the processes whose patterns match the inserted entry. Returns
%% the set of receivers to notify and the updated receivers dict.
add({_Key, Pid} = Rec, #state{receivers = RD}) ->
    Collect = fun(Pat, Recvs, Acc) ->
                      IsMatch = lists:member(Rec, match(Pat, Pid)),
                      add(IsMatch, Pat, Recvs, Rec, Acc)
              end,
    dict:fold(Collect, {sets:new(), RD}, RD).

%% add/5

%% No match: the accumulator is untouched.
add(false, _Pat, _Recvs, _Rec, Acc) ->
    Acc;

%% The pattern matches the inserted entry: all of its receivers are to
%% be notified, but only those that are lists (subscriptions) remain
%% in the dict afterwards.
add(true, Pat, Recvs, {_, _}, {Set0, Dict0}) ->
    Set = lists:foldl(fun(R, Acc) -> sets:add_element(R, Acc) end,
                      Set0,
                      Recvs),
    Dict = remove(fun(R) -> is_list(R) end, Pat, Recvs, Dict0),
    {Set, Dict}.

%% add_monitor/2

%% Monitor Pid unless it's already in the set of monitored processes.
add_monitor(Pid, #state{monitors = Monitored} = State) ->
    IsMonitored = sets:is_element(Pid, Monitored),
    add_monitor(IsMonitored, Pid, State).

%% add_monitor/3

%% Already monitored: nothing to do.
add_monitor(true, _Pid, State) ->
    State;

%% Not yet monitored: set a monitor and remember the pid.
add_monitor(false, Pid, #state{monitors = Monitored} = State) ->
    monitor(process, Pid),
    NewMonitored = sets:add_element(Pid, Monitored),
    State#state{monitors = NewMonitored}.

%% delete/2

%% Return the set of {Receiver, Rec} pairs to notify of the removal of
%% the entries Recs, which must still be in ?TABLE.
delete(Recs, #state{receivers = RD}) ->
    Collect = fun(Rec, Acc) -> delete(Rec, RD, Acc) end,
    lists:foldl(Collect, sets:new(), Recs).

%% delete/3

%% Add a {Receiver, Rec} pair to Set for each receiver whose pattern
%% matches the entry Rec.
delete({_Key, Pid} = Rec, RD, Set) ->
    Collect = fun(Pat, Recvs, Acc) ->
                      IsMatch = lists:member(Rec, match(Pat, Pid)),
                      delete(IsMatch, Rec, Recvs, Acc)
              end,
    dict:fold(Collect, Set, RD).

%% delete/4

%% The entry doesn't match the pattern: no receivers to add.
delete(false, _Rec, _Recvs, Set) ->
    Set;

%% The entry matches the pattern: pair each of the pattern's receivers
%% with the entry.
delete(true, Rec, Recvs, Set) ->
    Pairs = [{Recv, Rec} || Recv <- Recvs],
    lists:foldl(fun sets:add_element/2, Set, Pairs).

%% notify/2

%% Notify receivers of an add or remove. Returns false if there was
%% nothing to notify of, or the number of receivers otherwise.

%% Nothing was inserted.
notify(false, _) ->
    false;

%% Removal: the set contains {Receiver, Rec} pairs.
notify(Pairs, remove) ->
    Send = fun({Recv, Rec}, Count) ->
                   send(Recv, Rec, remove),
                   Count + 1
           end,
    sets:fold(Send, 0, Pairs);

%% Addition of Rec: the set contains the receivers themselves.
notify(Recvs, {_, _} = Rec) ->
    Send = fun(Recv, Count) ->
                   send(Recv, Rec, add),
                   Count + 1
           end,
    sets:fold(Send, 0, Recvs).

%% send/3

%% A subscriber is sent a message tagged with the term it specified
%% when subscribing.
send([Pid | Tag], Rec, Op) ->
    Msg = {Tag, Op, Rec},
    Pid ! Msg;

%% A waiting caller gets its reply. There's deliberately no clause for
%% remove: by construction no process is waiting at removal, since it
%% was either replied to when the entry was added or isn't waiting.
send({_, _} = From, Rec, add) ->
    Reply = [Rec],
    gen_server:reply(From, Reply).

%% down/2

%% Remove everything associated with the process Pid: its entries in
%% receivers, its associations in ?TABLE and its membership of the
%% monitors set. Called when a 'DOWN' message arrives, and from flush/3
%% for processes found to no longer be alive. Returns the new state.
down(Pid, #state{monitors = MS} = S) ->
    %% Remove Pid as a receiver first so that it isn't itself notified
    %% of the removals below.
    NS = flush(Pid, S),
    %% Collect the receivers to notify while Pid's entries are still in
    %% the table: delete/2 matches patterns against table contents.
    Recvs = delete(match('_', Pid), NS),
    ets:match_delete(?TABLE, {'_', Pid}),
    notify(Recvs, remove),
    NS#state{monitors = sets:del_element(Pid, MS)}.

%% flush/3

%% A non-unique add needs no flushing.
flush(false, _Rec, S) ->
    S;

%% A unique add: remove any other processes associated with Key that
%% are dead but for which 'DOWN' may not have been received yet. This
%% ensures that add_new can be used to register a unique name each
%% time a registering process restarts.
flush(true, {Key, Pid}, S) ->
    KeyMatches = {'==', '$1', {const, Key}},
    OtherPid = {'/=', '$2', Pid},
    Spec = [{{'$1', '$2'}, [{'andalso', KeyMatches, OtherPid}], ['$2']}],
    Holders = ets:select(?TABLE, Spec),
    Dead = lists:filter(fun(P) -> not is_process_alive(P) end, Holders),
    lists:foldl(fun down/2, S, Dead).

%% flush/2

%% Pid has died and should no longer receive messages or replies:
%% remove it from the receivers of every pattern.
flush(Pid, #state{receivers = RD} = S)
  when is_pid(Pid) ->
    Prune = fun(Pat, Recvs, Acc) -> flush(Pid, Pat, Recvs, Acc) end,
    Receivers = dict:fold(Prune, RD, RD),
    S#state{receivers = Receivers}.

%% flush/4

%% Keep only those receivers of Pat that belong to a process other
%% than Pid.
flush(Pid, Pat, Recvs, Dict) ->
    IsOther = fun(Recv) -> Pid /= head(Recv) end,
    remove(IsOther, Pat, Recvs, Dict).

%% head/1

%% Extract the pid from a receiver: the first element of a waiting
%% caller's 2-tuple or the head of a subscriber's list.
head({Pid, _Tag}) ->
    Pid;

head([Pid | _Tag]) ->
    Pid.

%% remove/4

%% Store under Key those of Values that satisfy Pred, erasing Key from
%% the dict altogether if there are none.
remove(Pred, Key, Values, Dict) ->
    case [V || V <- Values, Pred(V)] of
        [] ->
            dict:erase(Key, Dict);
        [_ | _] = Kept ->
            dict:store(Key, Kept, Dict)
    end.

%% call/1

%% Make a synchronous request to the server, waiting indefinitely for
%% the reply.
call(Req) ->
    Timeout = infinity,
    gen_server:call(?SERVER, Req, Timeout).