%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2010-2017. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% %CopyrightEnd%
%%
%%
%% The module implements a simple term -> pid registry.
%%
-module(diameter_reg).
-behaviour(gen_server).
-export([add/1,
add_new/1,
remove/1,
match/1,
wait/1,
subscribe/2]).
-export([start_link/0]).
%% gen_server callbacks
-export([init/1,
terminate/2,
handle_call/3,
handle_cast/2,
handle_info/2,
code_change/3]).
%% test
-export([pids/0,
terms/0,
subs/0,
waits/0]).
%% debug
-export([state/0,
uptime/0]).
-define(SERVER, ?MODULE).
-define(TABLE, ?MODULE).
-type key() :: term().
-type from() :: {pid(), term()}.
-type pattern() :: term().
-record(state, {id = diameter_lib:now(),
receivers = dict:new()
:: dict:dict(pattern(), [[pid() | term()]%% subscribe
| from()]), %% wait
monitors = sets:new() :: sets:set(pid())}).
%% The ?TABLE bag contains the Key -> Pid mapping, as {Key, Pid}
%% tuples. Each pid is stored in the monitors set to ensure only one
%% monitor for each pid: more are harmless, but unnecessary. A pattern
%% is added to receivers a result of calls to wait/1 or subscribe/2:
%% changes to ?TABLE causes processes to be notified as required.
%% ===========================================================================
%% # add(T)
%%
%% Associate the specified term with self(). The list of pids having
%% this or other assocations can be retrieved using match/1.
%%
%% An association is removed when the calling process dies or as a
%% result of calling remove/1. Adding the same term more than once is
%% equivalent to adding it once.
%%
%% Note that since match/1 takes a pattern as argument, specifying a
%% term that contains match variables is probably not a good idea
%% ===========================================================================
-spec add(key())
-> true.
add(T) ->
call({add, false, T}).
%% ===========================================================================
%% # add_new(T)
%%
%% Like add/1 but only one process is allowed to have the the
%% association, false being returned if an association already exists.
%% ===========================================================================
-spec add_new(key())
-> boolean().
add_new(T) ->
call({add, true, T}).
%% ===========================================================================
%% # remove(Term)
%%
%% Remove any existing association of Term with self().
%% ===========================================================================
-spec remove(key())
-> true.
remove(T) ->
call({remove, T}).
%% ===========================================================================
%% # match(Pat)
%%
%% Return the list of associations whose Term, as specified to add/1
%% or add_new/1, matches the specified pattern.
%%
%% Note that there's no guarantee that the returned processes are
%% still alive. (Although one that isn't will soon have its
%% associations removed.)
%% ===========================================================================
-spec match(pattern())
-> [{key(), pid()}].
match(Pat) ->
match(Pat, '_').
%% match/2
match(Pat, Pid) ->
ets:match_object(?TABLE, {Pat, Pid}).
%% ===========================================================================
%% # wait(Pat)
%%
%% Like match/1 but return only when the result is non-empty or fails.
%% It's up to the caller to ensure that the wait won't be forever.
%% ===========================================================================
-spec wait(pattern())
-> [{key(), pid()}].
wait(Pat) ->
_ = match(Pat), %% ensure match can succeed
call({wait, Pat}).
%% ===========================================================================
%% # subscribe(Pat, T)
%%
%% Like match/1, but additionally receive messages of the form
%% {T, add|remove, {term(), pid()} when associations are added
%% or removed.
%% ===========================================================================
-spec subscribe(Pat :: any(), T :: term())
-> [{term(), pid()}].
subscribe(Pat, T) ->
_ = match(Pat), %% ensure match can succeed
call({subscribe, Pat, T}).
%% ===========================================================================
start_link() ->
ServerName = {local, ?SERVER},
Options = [{spawn_opt, diameter_lib:spawn_opts(server, [])}],
gen_server:start_link(ServerName, ?MODULE, [], Options).
state() ->
call(state).
uptime() ->
call(uptime).
%% pids/0
-spec pids()
-> [{pid(), [key()]}].
pids() ->
to_list(fun swap/1).
to_list(Fun) ->
ets:foldl(fun(T,D) -> append(Fun(T), D) end, orddict:new(), ?TABLE).
append({K,V}, Dict) ->
orddict:append(K, V, Dict).
id(T) -> T.
%% terms/0
-spec terms()
-> [{key(), [pid()]}].
terms() ->
to_list(fun id/1).
swap({X,Y}) -> {Y,X}.
%% subs/0
-spec subs()
-> [{pattern(), [{pid(), term()}]}].
subs() ->
#state{receivers = RD} = state(),
dict:fold(fun sub/3, orddict:new(), RD).
sub(Pat, Ps, Dict) ->
lists:foldl(fun([P|T], D) -> orddict:append(Pat, {P,T}, D);
(_, D) -> D
end,
Dict,
Ps).
%% waits/0
-spec waits()
-> [{pattern(), [{from(), term()}]}].
waits() ->
#state{receivers = RD} = state(),
dict:fold(fun wait/3, orddict:new(), RD).
wait(Pat, Ps, Dict) ->
lists:foldl(fun({_,_} = F, D) -> orddict:append(Pat, F, D);
(_, D) -> D
end,
Dict,
Ps).
%% ----------------------------------------------------------
%% # init/1
%% ----------------------------------------------------------
init(_) ->
ets:new(?TABLE, [bag, named_table]),
{ok, #state{}}.
%% ----------------------------------------------------------
%% # handle_call/3
%% ----------------------------------------------------------
handle_call({add, Uniq, Key}, {Pid, _}, S0) ->
Rec = {Key, Pid},
S1 = flush(Uniq, Rec, S0),
{Res, New} = insert(Uniq, Rec),
{Recvs, S} = add(New, Rec, S1),
notify(Recvs, Rec),
{reply, Res, S};
handle_call({remove, Key}, {Pid, _}, S) ->
Rec = {Key, Pid},
Recvs = delete([Rec], S),
ets:delete_object(?TABLE, Rec),
notify(Recvs, remove),
{reply, true, S};
handle_call({wait, Pat}, {Pid, _} = From, #state{receivers = RD} = S) ->
NS = add_monitor(Pid, S),
case match(Pat) of
[_|_] = L ->
{reply, L, NS};
[] ->
{noreply, NS#state{receivers = dict:append(Pat, From, RD)}}
end;
handle_call({subscribe, Pat, T}, {Pid, _}, #state{receivers = RD} = S) ->
NS = add_monitor(Pid, S),
{reply, match(Pat), NS#state{receivers = dict:append(Pat, [Pid | T], RD)}};
handle_call(state, _, S) ->
{reply, S, S};
handle_call(uptime, _, #state{id = Time} = S) ->
{reply, diameter_lib:now_diff(Time), S};
handle_call(_Req, _From, S) ->
{reply, nok, S}.
%% ----------------------------------------------------------
%% # handle_cast/2
%% ----------------------------------------------------------
handle_cast(_Msg, S)->
{noreply, S}.
%% ----------------------------------------------------------
%% # handle_info/2
%% ----------------------------------------------------------
handle_info({'DOWN', _MRef, process, Pid, _}, S) ->
{noreply, down(Pid, S)};
handle_info(_Info, S) ->
{noreply, S}.
%% ----------------------------------------------------------
%% # terminate/2
%% ----------------------------------------------------------
terminate(_Reason, _State)->
ok.
%% ----------------------------------------------------------
%% # code_change/3
%% ----------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%% ===========================================================================
%% insert/2
insert(false, Rec) ->
Spec = [{'$1', [{'==', '$1', {const, Rec}}], ['$_']}],
X = '$end_of_table' /= ets:select(?TABLE, Spec, 1), %% entry exists?
X orelse ets:insert(?TABLE, Rec),
{true, not X};
insert(true, Rec) ->
B = ets:insert_new(?TABLE, Rec), %% entry inserted?
{B, B}.
%% add/3
%% Only add a single monitor for any given process, since there's no
%% use to more.
add(true, {_Key, Pid} = Rec, S) ->
NS = add_monitor(Pid, S),
{Recvs, RD} = add(Rec, NS),
{Recvs, S#state{receivers = RD}};
add(false = No, _, S) ->
{No, S}.
%% add/2
%% Notify processes whose patterns match the inserted key.
add({_Key, Pid} = Rec, #state{receivers = RD}) ->
dict:fold(fun(Pt, Ps, A) ->
add(lists:member(Rec, match(Pt, Pid)), Pt, Ps, Rec, A)
end,
{sets:new(), RD},
RD).
%% add/5
add(true, Pat, Recvs, {_,_} = Rec, {Set, Dict}) ->
{lists:foldl(fun sets:add_element/2, Set, Recvs),
remove(fun erlang:is_list/1, Pat, Recvs, Dict)};
add(false, _, _, _, Acc) ->
Acc.
%% add_monitor/2
add_monitor(Pid, #state{monitors = MS} = S) ->
add_monitor(sets:is_element(Pid, MS), Pid, S).
%% add_monitor/3
add_monitor(false, Pid, #state{monitors = MS} = S) ->
monitor(process, Pid),
S#state{monitors = sets:add_element(Pid, MS)};
add_monitor(true, _, S) ->
S.
%% delete/2
delete(Recs, #state{receivers = RD}) ->
lists:foldl(fun(R,S) -> delete(R, RD, S) end, sets:new(), Recs).
%% delete/3
delete({_Key, Pid} = Rec, RD, Set) ->
dict:fold(fun(Pt, Ps, S) ->
delete(lists:member(Rec, match(Pt, Pid)), Rec, Ps, S)
end,
Set,
RD).
%% delete/4
%% Entry matches a pattern ...
delete(true, Rec, Recvs, Set) ->
lists:foldl(fun(R,S) -> sets:add_element({R, Rec}, S) end,
Set,
Recvs);
%% ... or not.
delete(false, _, _, Set) ->
Set.
%% notify/2
notify(false = No, _) ->
No;
notify(Recvs, remove = Op) ->
sets:fold(fun({P,R}, N) -> send(P, R, Op), N+1 end, 0, Recvs);
notify(Recvs, {_,_} = Rec) ->
sets:fold(fun(P,N) -> send(P, Rec, add), N+1 end, 0, Recvs).
%% send/3
%% No processes waiting on remove, by construction: they've either
%% received notification at add or aren't waiting.
send([Pid | T], Rec, Op) ->
Pid ! {T, Op, Rec};
send({_,_} = From, Rec, add) ->
gen_server:reply(From, [Rec]).
%% down/2
down(Pid, #state{monitors = MS} = S) ->
NS = flush(Pid, S),
Recvs = delete(match('_', Pid), NS),
ets:match_delete(?TABLE, {'_', Pid}),
notify(Recvs, remove),
NS#state{monitors = sets:del_element(Pid, MS)}.
%% flush/3
%% Remove any processes that are dead but for which we may not have
%% received 'DOWN' yet, to ensure that add_new can be used to register
%% a unique name each time a registering process restarts.
flush(true, {Key, Pid}, S) ->
Spec = [{{'$1', '$2'},
[{'andalso', {'==', '$1', {const, Key}},
{'/=', '$2', Pid}}],
['$2']}],
lists:foldl(fun down/2, S, [P || P <- ets:select(?TABLE, Spec),
not is_process_alive(P)]);
flush(false, _, S) ->
S.
%% flush/2
%% Process has died and should no longer receive messages/replies.
flush(Pid, #state{receivers = RD} = S)
when is_pid(Pid) ->
S#state{receivers = dict:fold(fun(Pt,Ps,D) -> flush(Pid, Pt, Ps, D) end,
RD,
RD)}.
%% flush/4
flush(Pid, Pat, Recvs, Dict) ->
remove(fun(T) -> Pid /= head(T) end, Pat, Recvs, Dict).
%% head/1
head([P|_]) ->
P;
head({P,_}) ->
P.
%% remove/4
remove(Pred, Key, Values, Dict) ->
case lists:filter(Pred, Values) of
[] ->
dict:erase(Key, Dict);
Rest ->
dict:store(Key, Rest, Dict)
end.
%% call/1
call(Request) ->
gen_server:call(?SERVER, Request, infinity).