aboutsummaryrefslogblamecommitdiffstats
path: root/lib/diameter/src/base/diameter_reg.erl
blob: db01e17c86d1c2ff561a996cd629294bf8e62cd2 (plain) (tree)
1
2
3
4
5


                   
                                                        
  










                                                                           








                                                       




                       

                  


















                         












                                                                          
                                        














                                                                              






                                             





                                                                              






                                             
                                                                              



                                                                              






                           









                                                                              
 
                  




                                                 






                                                                              


                        



                                                                              












                                                                    


                          

















                                                                     


                          





                      


                                                             




                                        


                                                             
 
                                          
                                
                                                   
                              
 

                                                              
 






                                                     
 

                           
 

                                                
 
                              
                    
 


                                                             
 
                      
                 
 


                                                             
 
                                                  

                                                   
                 
 
                        
                 
 


                                                             



                            


                                                             



                                      

                                                                              

                                                                       






























                                                                   






























                                                               



                                                
%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2010-2016. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% %CopyrightEnd%
%%

%%
%% The module implements a simple term -> pid registry.
%%

-module(diameter_reg).
-behaviour(gen_server).

-export([add/1,
         add_new/1,
         del/1,
         match/1,
         wait/1]).

-export([start_link/0]).

%% gen_server callbacks
-export([init/1,
         terminate/2,
         handle_call/3,
         handle_cast/2,
         handle_info/2,
         code_change/3]).

%% test
-export([pids/0,
         terms/0]).

%% debug
-export([state/0,
         uptime/0]).

-define(SERVER, ?MODULE).
-define(TABLE, ?MODULE).

%% Table entry used to keep from starting more than one monitor on the
%% same process. This isn't a problem but there's no point in starting
%% multiple monitors if we can avoid it. Note that we can't have a 2-tuple
%% keyed on Pid since a registered term can be anything. Want the entry
%% keyed on Pid so that lookup is fast.
-define(MONITOR(Pid, MRef), {Pid, monitor, MRef}).

%% Table entry containing the Term -> Pid mapping.
-define(MAPPING(Term, Pid), {Term, Pid}).

%% Server state.
%%
%% id: value of diameter_lib:now() taken when the record is created;
%%     the uptime request passes it to diameter_lib:now_diff/1.
%% q:  callers of wait/1 that have not yet been answered, most recently
%%     queued first. Each is retried whenever an add succeeds.
-record(state, {id = diameter_lib:now(),
                q = []}). %% [{From, Pat}]

%% ===========================================================================
%% # add(T)
%%
%% Associate the specified term with self(). The list of pids having
%% this or other associations can be retrieved using match/1.
%%
%% An association is removed when the calling process dies or as a
%% result of calling del/1. Adding the same term more than once is
%% equivalent to adding it exactly once.
%%
%% Note that since match/1 takes a pattern as argument, specifying a
%% term that contains match variables is probably not a good idea
%% ===========================================================================

-spec add(any())
   -> true.

%% Ask the server to insert the mapping T -> self() using
%% ets:insert/2, so that repeated adds of the same term by the same
%% process leave a single entry in the bag table.
add(T) ->
    Request = {add, fun ets:insert/2, T, self()},
    call(Request).

%% ===========================================================================
%% # add_new(T)
%%
%% Like add/1 but only one process is allowed to have the
%% association, false being returned if an association already exists.
%% ===========================================================================

-spec add_new(any())
   -> boolean().

%% Same request as add/1 but with insert_new/2 as the insertion
%% function, so the server replies false when the term is already
%% associated with a process.
add_new(T) ->
    Request = {add, fun insert_new/2, T, self()},
    call(Request).

%% ===========================================================================
%% # del(Term)
%%
%% Remove any existing association of Term with self().
%% ===========================================================================

-spec del(any())
   -> true.

%% Ask the server to delete the mapping T -> self(), if any.
del(T) ->
    Request = {del, T, self()},
    call(Request).

%% ===========================================================================
%% # match(Pat)
%%
%% Return the list of associations whose Term, as specified to add/1
%% or add_new/1, matches the specified pattern.
%%
%% Note that there's no guarantee that the returned processes are
%% still alive. (Although one that isn't will soon have its
%% associations removed.)
%% ===========================================================================

-spec match(any())
   -> [{term(), pid()}].

%% Read the table directly, in the calling process, rather than going
%% through the server. Only Term -> Pid mappings can match since the
%% pattern is a 2-tuple.
match(Pat) ->
    Pattern = ?MAPPING(Pat, '_'),
    ets:match_object(?TABLE, Pattern).

%% ===========================================================================
%% # wait(Pat)
%%
%% Like match/1 but return only when the result is non-empty, or with
%% [] if the match itself fails.
%% It's up to the caller to ensure that the wait won't be forever.
%% ===========================================================================

-spec wait(any())
   -> [{term(), pid()}].

%% Hand the pattern to the server, which replies immediately if
%% something matches and otherwise queues the caller until a later add
%% produces a match.
wait(Pat) ->
    Request = {wait, Pat},
    call(Request).

%% ===========================================================================

%% start_link/0
%%
%% Start the registry server, registered locally as ?SERVER, with
%% spawn options obtained from diameter_lib:spawn_opts/2.

start_link() ->
    SpawnOpts = diameter_lib:spawn_opts(server, []),
    gen_server:start_link({local, ?SERVER},
                          ?MODULE,
                          [],
                          [{spawn_opt, SpawnOpts}]).

%% state/0
%%
%% Debug: return the server's #state{} record.

state() ->
    Request = state,
    call(Request).

%% uptime/0
%%
%% Debug: return diameter_lib:now_diff/1 applied to the time recorded
%% in the server state.

uptime() ->
    Request = uptime,
    call(Request).

%% pids/0

-spec pids()
   -> [{pid(), [term()]}].

%% Group the registered terms by pid: each mapping is swapped to
%% {Pid, Term} before being accumulated into an orddict.
pids() ->
    ByPid = fun swap/1,
    to_list(ByPid).

%% to_list/1
%%
%% Fold over every table entry, accumulating into an orddict. Fun maps
%% a {Term, Pid} mapping to the {Key, Value} pair to accumulate.

to_list(Fun) ->
    Collect = fun(Entry, Dict) -> acc(Fun, Entry, Dict) end,
    ets:foldl(Collect, orddict:new(), ?TABLE).

%% acc/3
%%
%% Accumulate a Term -> Pid mapping, transformed by Fun, into Dict.
%% Any other table entry (ie. a monitor entry) is skipped.

acc(Fun, ?MAPPING(_, _) = Mapping, Dict) ->
    append(Fun(Mapping), Dict);
acc(_Fun, _Other, Dict) ->
    Dict.

%% append/2
%%
%% Append Value to the list of values stored under Key in Dict.

append({Key, Value}, Dict) ->
    orddict:append(Key, Value, Dict).

%% id/1
%%
%% Identity: used by terms/0 to accumulate mappings as they are.

id(Term) ->
    Term.

%% terms/0

-spec terms()
   -> [{term(), [pid()]}].

%% Group the registered pids by term: mappings are accumulated as
%% they are, keyed on the term.
terms() ->
    ByTerm = fun id/1,
    to_list(ByTerm).

%% swap/1
%%
%% Exchange the elements of a pair: used by pids/0 to key on pid.

swap({First, Second}) ->
    {Second, First}.

%% ----------------------------------------------------------
%% # init/1
%% ----------------------------------------------------------

%% Create the named bag table holding both mapping and monitor
%% entries, and start with a default state: no waiting callers.

init(_Args) ->
    TableOpts = [bag, named_table],
    ets:new(?TABLE, TableOpts),
    State = #state{},
    {ok, State}.

%% ----------------------------------------------------------
%% # handle_call/3
%% ----------------------------------------------------------

%% add: insert the mapping with the function supplied by the caller
%% (ets:insert/2 for add/1, insert_new/2 for add_new/1), start a
%% monitor on the process if the insert succeeded and there isn't one
%% already, and retry any waiting callers.
handle_call({add, Insert, Term, Pid}, _From, State) ->
    Added = Insert(?TABLE, {Term, Pid}),
    NeedMonitor = Added andalso no_monitor(Pid),
    insert_monitor(NeedMonitor, Pid),
    {reply, Added, pending(Added, State)};

%% del: remove exactly the mapping Term -> Pid. Any monitor on the
%% process is left in place.
handle_call({del, Term, Pid}, _From, State) ->
    Reply = ets:delete_object(?TABLE, ?MAPPING(Term, Pid)),
    {reply, Reply, State};

%% wait: answer now if find/1 has a result, otherwise queue the caller
%% at the head of q to be answered by a later add.
handle_call({wait, Pat}, From, #state{q = Waiting} = State) ->
    case find(Pat) of
        false ->
            {noreply, State#state{q = [{From, Pat} | Waiting]}};
        {ok, Found} ->
            {reply, Found, State}
    end;

%% state: debug, reply with the state record itself.
handle_call(state, _From, State) ->
    {reply, State, State};

%% uptime: debug, reply with the difference from the recorded time.
handle_call(uptime, _From, #state{id = Started} = State) ->
    {reply, diameter_lib:now_diff(Started), State};

%% Anything else is answered with nok.
handle_call(_Request, _From, State) ->
    {reply, nok, State}.

%% ----------------------------------------------------------
%% # handle_cast/2
%% ----------------------------------------------------------

%% No casts are expected: ignore whatever arrives.

handle_cast(_Request, State) ->
    {noreply, State}.

%% ----------------------------------------------------------
%% # handle_info/2
%% ----------------------------------------------------------

%% A monitored process has died: remove its monitor entry and every
%% mapping that associates a term with it.
handle_info({'DOWN', MonitorRef, process, Pid, _Reason}, State) ->
    ets:delete_object(?TABLE, ?MONITOR(Pid, MonitorRef)),
    ets:match_delete(?TABLE, ?MAPPING('_', Pid)),
    {noreply, State};

%% Ignore any other message.
handle_info(_Message, State) ->
    {noreply, State}.

%% ----------------------------------------------------------
%% # terminate/2
%% ----------------------------------------------------------

%% Nothing to clean up explicitly.

terminate(_Why, _S) ->
    ok.

%% ----------------------------------------------------------
%% # code_change/3
%% ----------------------------------------------------------

%% The state is carried over unchanged.

code_change(_FromVsn, S, _Extra) ->
    {ok, S}.

%% ===========================================================================

%% insert_monitor/2
%%
%% When the first argument is true, start a monitor on Pid and record
%% it in the table as a ?MONITOR entry. When false, do nothing.

insert_monitor(false, _Pid) ->
    false;
insert_monitor(true, Pid) ->
    MonitorRef = erlang:monitor(process, Pid),
    ets:insert(?TABLE, ?MONITOR(Pid, MonitorRef)).

%% Do we need a monitor for the specified Pid?
%% True if the table has no ?MONITOR entry for Pid, in which case a
%% monitor still has to be started.
no_monitor(Pid) ->
    Existing = ets:match_object(?TABLE, ?MONITOR(Pid, '_')),
    Existing =:= [].

%% insert_new/2
%%
%% Insert the Term -> Pid mapping T unless a live process already has
%% an association on the same term. Returns true if the mapping was
%% inserted, false otherwise.
%%
%% ets:insert_new/2 isn't used for the insert since it only looks at
%% the key: when the registered term is itself a pid, a ?MONITOR entry
%% for that pid shares the key and would cause the insert to be
%% refused even though no association exists. Only mappings are
%% considered instead. Checking before inserting is not racy as long
%% as all writes to the table happen in the server process, as they do
%% in this module.

insert_new(?TABLE, ?MAPPING(Key, _) = T) ->
    flush(ets:lookup(?TABLE, Key)),
    [] == mappings(ets:lookup(?TABLE, Key))
        andalso ets:insert(?TABLE, T).

%% Remove any processes that are dead but for which we may not have
%% received 'DOWN' yet. This is to ensure that add_new can be used
%% to register a unique name each time a process restarts.
%%
%% The list is the result of a lookup on the term and can therefore
%% contain a ?MONITOR entry when the term is a pid: such entries are
%% skipped rather than causing a function_clause failure.
flush(List) ->
    lists:foreach(fun(?MAPPING(_, P) = T) ->
                          del(erlang:is_process_alive(P), T)
                  end,
                  mappings(List)).

%% Delete the table entry T unless its process is alive.
del(Alive, T) ->
    Alive orelse ets:delete_object(?TABLE, T).

%% Retain only the Term -> Pid mappings from a list of table entries.
mappings(List) ->
    [M || ?MAPPING(_, _) = M <- List].

%% repl/3

%% Given the result of looking up an existing mapping for Pid, either
%% replace that single mapping with Key -> Pid and return true, or
%% return false if there was nothing to replace.
%%
%% NOTE(review): no caller of repl/3 is visible in this module -
%% confirm whether it is still needed.
repl([], _Key, _Pid) ->
    false;
repl([?MAPPING(_, Pid) = Old], Key, Pid) ->
    ets:delete_object(?TABLE, Old),
    New = ?MAPPING(Key, Pid),
    true = ets:insert(?TABLE, New).

%% pending/1

%% After a successful add, retry the waiting callers and keep those
%% that still have no match. The queue is stored most recent first, so
%% it is reversed to answer callers in the order in which they arrived.
pending(true, #state{q = [_|_] = Waiting} = State) ->
    Oldest = lists:reverse(Waiting),
    StillWaiting = q(Oldest, []),
    State#state{q = StillWaiting};
%% Nothing was added or nobody is waiting: the state is unchanged.
pending(_, State) ->
    State.

%% q/2
%%
%% Retry each waiting caller in list order: reply to those whose
%% pattern now has a result and prepend the others to the accumulator,
%% which is returned.

q(Waiting, Acc0) ->
    lists:foldl(fun({From, Pat} = Waiter, Acc) ->
                        case find(Pat) of
                            false ->
                                [Waiter | Acc];
                            {ok, Found} ->
                                gen_server:reply(From, Found),
                                Acc
                        end
                end,
                Acc0,
                Waiting).

%% find/1

%% Look for mappings matching Pat. Returns false if there are none, so
%% that the caller keeps waiting, and {ok, List} otherwise. A failure
%% in match/1 is turned into {ok, []} so that a waiting caller is
%% answered instead of being queued.
find(Pat) ->
    try match(Pat) of
        [] ->
            false;
        Found ->
            {ok, Found}
    catch
        _Class:_Reason ->
            {ok, []}
    end.

%% call/1

%% Synchronous request to the registry server, with no timeout.
call(Request) ->
    Timeout = infinity,
    gen_server:call(?SERVER, Request, Timeout).