%% %% %CopyrightBegin% %% %% Copyright Ericsson AB 2010-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %% %% %CopyrightEnd% %% %% %% A simple term -> pid registry. %% -module(diameter_reg). -behaviour(gen_server). -export([add/1, add_new/1, remove/1, match/1, wait/1, subscribe/2]). -export([start_link/0]). %% gen_server callbacks -export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2, code_change/3]). %% test -export([pids/0, terms/0, subs/0, waits/0]). %% debug -export([state/0, uptime/0]). -define(SERVER, ?MODULE). -define(TABLE, ?MODULE). -type key() :: term(). -type from() :: {pid(), term()}. -type rcvr() :: [pid() | term()] %% subscribe | from(). %% wait -type pattern() :: term(). -record(state, {id = diameter_lib:now(), notify = #{} :: #{pattern() => [rcvr()]}, monitors = sets:new() :: sets:set(pid())}). %% The ?TABLE bag contains the Key -> Pid mapping, as {Key, Pid} %% tuples. Each pid is stored in the monitors set to ensure only one %% monitor for each pid: more are harmless, but unnecessary. A pattern %% is added to notify a result of calls to wait/1 or subscribe/2: %% changes to ?TABLE causes processes to be notified as required. %% =========================================================================== %% # add(T) %% %% Associate the specified term with self(). The list of pids having %% this or other assocations can be retrieved using match/1. %% %% An association is removed when the calling process dies or as a %% result of calling remove/1. Adding the same term more than once is %% equivalent to adding it once. %% %% Note that since match/1 takes a pattern as argument, specifying a %% term that contains match variables is probably not a good idea %% =========================================================================== -spec add(key()) -> true. add(T) -> call({add, false, T}). %% =========================================================================== %% # add_new(T) %% %% Like add/1 but only one process is allowed to have the the %% association, false being returned if an association already exists. %% =========================================================================== -spec add_new(key()) -> boolean(). add_new(T) -> call({add, true, T}). %% =========================================================================== %% # remove(Term) %% %% Remove any existing association of Term with self(). %% =========================================================================== -spec remove(key()) -> true. remove(T) -> call({remove, T}). %% =========================================================================== %% # match(Pat) %% %% Return the list of associations whose Term, as specified to add/1 %% or add_new/1, matches the specified pattern. %% %% Note that there's no guarantee that the returned processes are %% still alive. (Although one that isn't will soon have its %% associations removed.) %% =========================================================================== -spec match(pattern()) -> [{key(), pid()}]. match(Pat) -> match(Pat, '_'). %% match/2 match(Pat, Pid) -> ets:match_object(?TABLE, {Pat, Pid}). %% =========================================================================== %% # wait(Pat) %% %% Like match/1 but return only when the result is non-empty or fails. %% It's up to the caller to ensure that the wait won't be forever. %% =========================================================================== -spec wait(pattern()) -> [{key(), pid()}]. wait(Pat) -> _ = match(Pat), %% ensure match can succeed call({wait, Pat}). %% =========================================================================== %% # subscribe(Pat, T) %% %% Like match/1, but additionally receive messages of the form %% {T, add|remove, {term(), pid()}} when associations are added %% or removed. %% =========================================================================== -spec subscribe(Pat :: any(), T :: term()) -> [{term(), pid()}]. subscribe(Pat, T) -> _ = match(Pat), %% ensure match can succeed call({subscribe, Pat, T}). %% =========================================================================== start_link() -> ServerName = {local, ?SERVER}, Options = [{spawn_opt, diameter_lib:spawn_opts(server, [])}], gen_server:start_link(ServerName, ?MODULE, [], Options). state() -> call(state). uptime() -> call(uptime). %% pids/0 -spec pids() -> [{pid(), [key()]}]. pids() -> append(ets:select(?TABLE, [{{'$1','$2'}, [], [{{'$2', '$1'}}]}])). append(Pairs) -> dict:to_list(lists:foldl(fun({K,V}, D) -> dict:append(K, V, D) end, dict:new(), Pairs)). %% terms/0 -spec terms() -> [{key(), [pid()]}]. terms() -> append(ets:tab2list(?TABLE)). %% subs/0 -spec subs() -> [{pattern(), [{pid(), term()}]}]. subs() -> #state{notify = Dict} = state(), [{K, Ts} || {K,Ps} <- maps:to_list(Dict), Ts <- [[{P,T} || [P|T] <- Ps]]]. %% waits/0 -spec waits() -> [{pattern(), [from()]}]. waits() -> #state{notify = Dict} = state(), [{K, Ts} || {K,Ps} <- maps:to_list(Dict), Ts <- [[T || {_,_} = T <- Ps]]]. %% ---------------------------------------------------------- %% # init/1 %% ---------------------------------------------------------- init(_) -> ets:new(?TABLE, [bag, named_table]), {ok, #state{}}. %% ---------------------------------------------------------- %% # handle_call/3 %% ---------------------------------------------------------- handle_call({add, Uniq, Key}, {Pid, _}, S) -> Rec = {Key, Pid}, NS = flush(Uniq, Rec, S), %% before insert {Res, New} = insert(Uniq, Rec), {reply, Res, notify(add, New andalso Rec, if New -> add_monitor(Pid, NS); true -> NS end)}; handle_call({remove, Key}, {Pid, _}, S) -> Rec = {Key, Pid}, {reply, true, try notify(remove, Rec, S) after ets:delete_object(?TABLE, Rec) end}; handle_call({wait, Pat}, {Pid, _} = From, S) -> NS = add_monitor(Pid, S), case match(Pat) of [_|_] = Recs -> {reply, Recs, NS}; [] -> {noreply, queue(Pat, From, NS)} end; handle_call({subscribe, Pat, T}, {Pid, _}, S) -> {reply, match(Pat), queue(Pat, [Pid | T], add_monitor(Pid, S))}; handle_call(state, _, S) -> {reply, S, S}; handle_call(uptime, _, #state{id = Time} = S) -> {reply, diameter_lib:now_diff(Time), S}; handle_call(_Req, _From, S) -> {reply, nok, S}. %% ---------------------------------------------------------- %% # handle_cast/2 %% ---------------------------------------------------------- handle_cast(_Msg, S)-> {noreply, S}. %% ---------------------------------------------------------- %% # handle_info/2 %% ---------------------------------------------------------- handle_info({'DOWN', _MRef, process, Pid, _}, S) -> {noreply, down(Pid, S)}; handle_info(_Info, S) -> {noreply, S}. %% ---------------------------------------------------------- %% # terminate/2 %% ---------------------------------------------------------- terminate(_Reason, _State)-> ok. %% ---------------------------------------------------------- %% # code_change/3 %% ---------------------------------------------------------- code_change(_, State, "2.1") -> {ok, lists:foldl(fun add_monitor/2, State, ets:select(?TABLE, [{{'_', '$1'}, [], ['$1']}]))}; code_change(_OldVsn, State, _Extra) -> {ok, State}. %% =========================================================================== %% insert/2 insert(false, Rec) -> Spec = [{'$1', [{'==', '$1', {const, Rec}}], ['$_']}], X = '$end_of_table' /= ets:select(?TABLE, Spec, 1), %% entry exists? X orelse ets:insert(?TABLE, Rec), {true, not X}; insert(true, Rec) -> B = ets:insert_new(?TABLE, Rec), %% entry inserted? {B, B}. %% add_monitor/2 %% %% Only add a single monitor for any given process, since there's no %% use to more. add_monitor(Pid, #state{monitors = Ps} = S) -> case sets:is_element(Pid, Ps) of false -> monitor(process, Pid), S#state{monitors = sets:add_element(Pid, Ps)}; true -> S end. %% notify/3 notify(_, false, S) -> S; notify(Op, {_,_} = Rec, #state{notify = Dict} = S) -> S#state{notify = maps:fold(fun(P,Rs,D) -> notify(Op, Rec, P, Rs, D) end, Dict, Dict)}. %% notify/5 notify(Op, {_, Pid} = Rec, Pat, Rcvrs, Dict) -> case lists:member(Rec, match(Pat, Pid)) of true -> reset(Pat, Dict, [P || P <- Rcvrs, send(P, Op, Rec)]); false -> Dict end. %% send/3 send([Pid | T], Op, Rec) -> Pid ! {T, Op, Rec}, true; %% No processes wait on remove: they receive notification immediately %% or at add, by construction. send({_,_} = From, add, Rec) -> gen_server:reply(From, [Rec]), false. %% down/2 down(Pid, #state{monitors = Ps} = S) -> Recs = match('_', Pid), Acc0 = flush(Pid, S#state{monitors = sets:del_element(Pid, Ps)}), try lists:foldl(fun(R,NS) -> notify(remove, R, NS) end, Acc0, Recs) after ets:match_delete(?TABLE, {'_', Pid}) end. %% flush/3 %% Remove any processes that are dead but for which we may not have %% received 'DOWN' yet, to ensure that add_new can be used to register %% a unique name each time a registering process restarts. flush(true, {Key, Pid}, S) -> Spec = [{{'$1', '$2'}, [{'andalso', {'==', '$1', {const, Key}}, {'/=', '$2', Pid}}], ['$2']}], lists:foldl(fun down/2, S, [P || P <- ets:select(?TABLE, Spec), not is_process_alive(P)]); flush(false, _, S) -> S. %% flush/2 %% Process has died and should no longer receive messages/replies. flush(Pid, #state{notify = Dict} = S) -> S#state{notify = maps:fold(fun(P,Rs,D) -> flush(Pid, P, Rs, D) end, Dict, Dict)}. %% flush/4 flush(Pid, Pat, Rcvrs, Dict) -> reset(Pat, Dict, [T || T <- Rcvrs, Pid /= head(T)]). %% head/1 head([P|_]) -> P; head({P,_}) -> P. %% reset/3 reset(Key, Map, []) -> maps:remove(Key, Map); reset(Key, Map, List) -> maps:put(Key, List, Map). %% queue/3 queue(Pat, Rcvr, #state{notify = Dict} = S) -> S#state{notify = maps:put(Pat, [Rcvr | maps:get(Pat, Dict, [])], Dict)}. %% call/1 call(Request) -> gen_server:call(?SERVER, Request, infinity).