aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAnders Svensson <[email protected]>2016-05-20 07:29:39 +0200
committerAnders Svensson <[email protected]>2016-05-30 08:54:41 +0200
commit5809199289664e30df9ae418efb4df9bc9d73cd4 (patch)
treea6cdf6c180504a0277e10faf3630c0ef2db039aa
parentbc1ab9f2f1a22a874415260a5d099281c35ce3d5 (diff)
downloadotp-5809199289664e30df9ae418efb4df9bc9d73cd4.tar.gz
otp-5809199289664e30df9ae418efb4df9bc9d73cd4.tar.bz2
otp-5809199289664e30df9ae418efb4df9bc9d73cd4.zip
Add diameter_reg:subscribe/2
To allow processes to subscribe to a message when a matching association is added or removed. The intention is to use this in diameter_{tcp,sctp}, in order for listening processes to find out when transport their transport configuration has been removed.
-rw-r--r--lib/diameter/src/base/diameter_reg.erl373
1 files changed, 267 insertions, 106 deletions
diff --git a/lib/diameter/src/base/diameter_reg.erl b/lib/diameter/src/base/diameter_reg.erl
index db01e17c86..4dba261a8b 100644
--- a/lib/diameter/src/base/diameter_reg.erl
+++ b/lib/diameter/src/base/diameter_reg.erl
@@ -29,7 +29,8 @@
add_new/1,
del/1,
match/1,
- wait/1]).
+ wait/1,
+ subscribe/2]).
-export([start_link/0]).
@@ -43,7 +44,9 @@
%% test
-export([pids/0,
- terms/0]).
+ terms/0,
+ subs/0,
+ waits/0]).
%% debug
-export([state/0,
@@ -52,18 +55,21 @@
-define(SERVER, ?MODULE).
-define(TABLE, ?MODULE).
-%% Table entry used to keep from starting more than one monitor on the
-%% same process. This isn't a problem but there's no point in starting
-%% multiple monitors if we can avoid it. Note that we can't have a 2-tuple
-%% keyed on Pid since a registered term can be anything. Want the entry
-%% keyed on Pid so that lookup is fast.
--define(MONITOR(Pid, MRef), {Pid, monitor, MRef}).
-
-%% Table entry containing the Term -> Pid mapping.
--define(MAPPING(Term, Pid), {Term, Pid}).
+-type key() :: term().
+-type from() :: {pid(), term()}.
+-type pattern() :: term().
-record(state, {id = diameter_lib:now(),
- q = []}). %% [{From, Pat}]
+ receivers = dict:new()
+ :: dict:dict(pattern(), [[pid() | term()]%% subscribe
+ | from()]), %% wait
+ monitors = sets:new() :: sets:set(pid())}).
+
+%% The ?TABLE bag contains the Key -> Pid mapping, as {Key, Pid}
+%% tuples. Each pid is stored in the monitors set to ensure only one
+%% monitor for each pid: more are harmless, but unnecessary. A pattern
+%% is added to receivers a result of calls to wait/1 or subscribe/2:
+%% changes to ?TABLE causes processes to be notified as required.
%% ===========================================================================
%% # add(T)
@@ -73,17 +79,17 @@
%%
%% An association is removed when the calling process dies or as a
%% result of calling del/1. Adding the same term more than once is
-%% equivalent to adding it exactly once.
+%% equivalent to adding it once.
%%
%% Note that since match/1 takes a pattern as argument, specifying a
%% term that contains match variables is probably not a good idea
%% ===========================================================================
--spec add(any())
+-spec add(key())
-> true.
add(T) ->
- call({add, fun ets:insert/2, T, self()}).
+ call({add, false, T}).
%% ===========================================================================
%% # add_new(T)
@@ -92,11 +98,11 @@ add(T) ->
%% association, false being returned if an association already exists.
%% ===========================================================================
--spec add_new(any())
+-spec add_new(key())
-> boolean().
add_new(T) ->
- call({add, fun insert_new/2, T, self()}).
+ call({add, true, T}).
%% ===========================================================================
%% # del(Term)
@@ -104,11 +110,11 @@ add_new(T) ->
%% Remove any existing association of Term with self().
%% ===========================================================================
--spec del(any())
+-spec del(key())
-> true.
del(T) ->
- call({del, T, self()}).
+ call({del, T}).
%% ===========================================================================
%% # match(Pat)
@@ -121,12 +127,17 @@ del(T) ->
%% associations removed.)
%% ===========================================================================
--spec match(any())
- -> [{term(), pid()}].
+-spec match(pattern())
+ -> [{key(), pid()}].
match(Pat) ->
- ets:match_object(?TABLE, ?MAPPING(Pat, '_')).
+ match(Pat, '_').
+
+%% match/2
+match(Pat, Pid) ->
+ ets:match_object(?TABLE, {Pat, Pid}).
+
%% ===========================================================================
%% # wait(Pat)
%%
@@ -134,13 +145,29 @@ match(Pat) ->
%% It's up to the caller to ensure that the wait won't be forever.
%% ===========================================================================
--spec wait(any())
- -> [{term(), pid()}].
+-spec wait(pattern())
+ -> [{key(), pid()}].
wait(Pat) ->
+ _ = match(Pat), %% ensure match can succeed
call({wait, Pat}).
%% ===========================================================================
+%% # subscribe(Pat, T)
+%%
+%% Like match/1, but additionally receive messages of the form
+%% {T, add|del, {term(), pid()} when associations are added
+%% or removed.
+%% ===========================================================================
+
+-spec subscribe(Pat :: any(), T :: term())
+ -> [{term(), pid()}].
+
+subscribe(Pat, T) ->
+ _ = match(Pat), %% ensure match can succeed
+ call({subscribe, Pat, T}).
+
+%% ===========================================================================
start_link() ->
ServerName = {local, ?SERVER},
@@ -156,18 +183,13 @@ uptime() ->
%% pids/0
-spec pids()
- -> [{pid(), [term()]}].
+ -> [{pid(), [key()]}].
pids() ->
to_list(fun swap/1).
to_list(Fun) ->
- ets:foldl(fun(T,A) -> acc(Fun, T, A) end, orddict:new(), ?TABLE).
-
-acc(Fun, ?MAPPING(Term, Pid), Dict) ->
- append(Fun({Term, Pid}), Dict);
-acc(_, _, Dict) ->
- Dict.
+ ets:foldl(fun(T,D) -> append(Fun(T), D) end, orddict:new(), ?TABLE).
append({K,V}, Dict) ->
orddict:append(K, V, Dict).
@@ -177,13 +199,45 @@ id(T) -> T.
%% terms/0
-spec terms()
- -> [{term(), [pid()]}].
+ -> [{key(), [pid()]}].
terms() ->
to_list(fun id/1).
swap({X,Y}) -> {Y,X}.
+%% subs/0
+
+-spec subs()
+ -> [{pattern(), [{pid(), term()}]}].
+
+subs() ->
+ #state{receivers = RD} = state(),
+ dict:fold(fun sub/3, orddict:new(), RD).
+
+sub(Pat, Ps, Dict) ->
+ lists:foldl(fun([P|T], D) -> orddict:append(Pat, {P,T}, D);
+ (_, D) -> D
+ end,
+ Dict,
+ Ps).
+
+%% waits/0
+
+-spec waits()
+ -> [{pattern(), [{from(), term()}]}].
+
+waits() ->
+ #state{receivers = RD} = state(),
+ dict:fold(fun wait/3, orddict:new(), RD).
+
+wait(Pat, Ps, Dict) ->
+ lists:foldl(fun({_,_} = F, D) -> orddict:append(Pat, F, D);
+ (_, D) -> D
+ end,
+ Dict,
+ Ps).
+
%% ----------------------------------------------------------
%% # init/1
%% ----------------------------------------------------------
@@ -196,22 +250,34 @@ init(_) ->
%% # handle_call/3
%% ----------------------------------------------------------
-handle_call({add, Fun, Key, Pid}, _, S) ->
- B = Fun(?TABLE, {Key, Pid}),
- insert_monitor(B andalso no_monitor(Pid), Pid),
- {reply, B, pending(B, S)};
-
-handle_call({del, Key, Pid}, _, S) ->
- {reply, ets:delete_object(?TABLE, ?MAPPING(Key, Pid)), S};
-
-handle_call({wait, Pat}, From, #state{q = Q} = S) ->
- case find(Pat) of
- {ok, L} ->
- {reply, L, S};
- false ->
- {noreply, S#state{q = [{From, Pat} | Q]}}
+handle_call({add, Uniq, Key}, {Pid, _}, S0) ->
+ Rec = {Key, Pid},
+ S1 = flush(Uniq, Rec, S0),
+ {Res, New} = insert(Uniq, Rec),
+ {Recvs, S} = add(New, Rec, S1),
+ notify(Recvs, Rec),
+ {reply, Res, S};
+
+handle_call({del, Key}, {Pid, _}, S) ->
+ Rec = {Key, Pid},
+ Recvs = delete([Rec], S),
+ ets:delete_object(?TABLE, Rec),
+ notify(Recvs, del),
+ {reply, true, S};
+
+handle_call({wait, Pat}, {Pid, _} = From, #state{receivers = RD} = S) ->
+ NS = add_monitor(Pid, S),
+ case match(Pat) of
+ [_|_] = L ->
+ {reply, L, NS};
+ [] ->
+ {noreply, NS#state{receivers = dict:append(Pat, From, RD)}}
end;
+handle_call({subscribe, Pat, T}, {Pid, _}, #state{receivers = RD} = S) ->
+ NS = add_monitor(Pid, S),
+ {reply, match(Pat), NS#state{receivers = dict:append(Pat, [Pid | T], RD)}};
+
handle_call(state, _, S) ->
{reply, S, S};
@@ -232,10 +298,8 @@ handle_cast(_Msg, S)->
%% # handle_info/2
%% ----------------------------------------------------------
-handle_info({'DOWN', MRef, process, Pid, _}, S) ->
- ets:delete_object(?TABLE, ?MONITOR(Pid, MRef)),
- ets:match_delete(?TABLE, ?MAPPING('_', Pid)),
- {noreply, S};
+handle_info({'DOWN', _MRef, process, Pid, _}, S) ->
+ {noreply, down(Pid, S)};
handle_info(_Info, S) ->
{noreply, S}.
@@ -256,69 +320,166 @@ code_change(_OldVsn, State, _Extra) ->
%% ===========================================================================
-insert_monitor(B, Pid) ->
- B andalso ets:insert(?TABLE, ?MONITOR(Pid, monitor(process, Pid))).
+%% insert/2
+
+insert(false, Rec) ->
+ Spec = [{'$1', [{'==', '$1', {const, Rec}}], ['$_']}],
+ X = '$end_of_table' /= ets:select(?TABLE, Spec, 1), %% entry exists?
+ X orelse ets:insert(?TABLE, Rec),
+ {true, not X};
+
+insert(true, Rec) ->
+ B = ets:insert_new(?TABLE, Rec), %% entry inserted?
+ {B, B}.
+
+%% add/3
+
+%% Only add a single monitor for any given process, since there's no
+%% use to more.
+add(true, {_Key, Pid} = Rec, S) ->
+ NS = add_monitor(Pid, S),
+ {Recvs, RD} = add(Rec, NS),
+ {Recvs, S#state{receivers = RD}};
+
+add(false = No, _, S) ->
+ {No, S}.
+
+%% add/2
+
+%% Notify processes whose patterns match the inserted key.
+add({_Key, Pid} = Rec, #state{receivers = RD}) ->
+ dict:fold(fun(Pt, Ps, A) ->
+ add(lists:member(Rec, match(Pt, Pid)), Pt, Ps, Rec, A)
+ end,
+ {sets:new(), RD},
+ RD).
-%% Do we need a monitor for the specified Pid?
-no_monitor(Pid) ->
- [] == ets:match_object(?TABLE, ?MONITOR(Pid, '_')).
+%% add/5
-%% insert_new/2
+add(true, Pat, Recvs, {_,_} = Rec, {Set, Dict}) ->
+ {lists:foldl(fun sets:add_element/2, Set, Recvs),
+ remove(fun erlang:is_list/1, Pat, Recvs, Dict)};
-insert_new(?TABLE, {Key, _} = T) ->
- flush(ets:lookup(?TABLE, Key)),
- ets:insert_new(?TABLE, T).
+add(false, _, _, _, Acc) ->
+ Acc.
+
+%% add_monitor/2
+
+add_monitor(Pid, #state{monitors = MS} = S) ->
+ add_monitor(sets:is_element(Pid, MS), Pid, S).
+
+%% add_monitor/3
+
+add_monitor(false, Pid, #state{monitors = MS} = S) ->
+ monitor(process, Pid),
+ S#state{monitors = sets:add_element(Pid, MS)};
+
+add_monitor(true, _, S) ->
+ S.
+
+%% delete/2
+
+delete(Recs, #state{receivers = RD}) ->
+ lists:foldl(fun(R,S) -> delete(R, RD, S) end, sets:new(), Recs).
+
+%% delete/3
+
+delete({_Key, Pid} = Rec, RD, Set) ->
+ dict:fold(fun(Pt, Ps, S) ->
+ delete(lists:member(Rec, match(Pt, Pid)), Rec, Ps, S)
+ end,
+ Set,
+ RD).
+
+%% delete/4
+
+%% Entry matches a pattern ...
+delete(true, Rec, Recvs, Set) ->
+ lists:foldl(fun(R,S) -> sets:add_element({R, Rec}, S) end,
+ Set,
+ Recvs);
+
+%% ... or not.
+delete(false, _, _, Set) ->
+ Set.
+
+%% notify/2
+
+notify(false = No, _) ->
+ No;
+
+notify(Recvs, del = Op) ->
+ sets:fold(fun({P,R}, N) -> send(P, R, Op), N+1 end, 0, Recvs);
+
+notify(Recvs, {_,_} = Rec) ->
+ sets:fold(fun(P,N) -> send(P, Rec, add), N+1 end, 0, Recvs).
+
+%% send/3
+
+%% No processes waiting on del, by construction: they've either
+%% received notification at add or aren't waiting.
+send([Pid | T], Rec, Op) ->
+ Pid ! {T, Op, Rec};
+
+send({_,_} = From, Rec, add) ->
+ gen_server:reply(From, [Rec]).
+
+%% down/2
+
+down(Pid, #state{monitors = MS} = S) ->
+ NS = flush(Pid, S),
+ Recvs = delete(match('_', Pid), NS),
+ ets:match_delete(?TABLE, {'_', Pid}),
+ notify(Recvs, del),
+ NS#state{monitors = sets:del_element(Pid, MS)}.
+
+%% flush/3
%% Remove any processes that are dead but for which we may not have
-%% received 'DOWN' yet. This is to ensure that add_new can be used
-%% to register a unique name each time a process restarts.
-flush(List) ->
- lists:foreach(fun({_,P} = T) ->
- del(erlang:is_process_alive(P), T)
- end,
- List).
-
-del(Alive, T) ->
- Alive orelse ets:delete_object(?TABLE, T).
-
-%% repl/3
-
-repl([?MAPPING(_, Pid) = M], Key, Pid) ->
- ets:delete_object(?TABLE, M),
- true = ets:insert(?TABLE, ?MAPPING(Key, Pid));
-repl([], _, _) ->
- false.
-
-%% pending/1
-
-pending(true, #state{q = [_|_] = Q} = S) ->
- S#state{q = q(lists:reverse(Q), [])}; %% retain reply order
-pending(_, S) ->
+%% received 'DOWN' yet, to ensure that add_new can be used to register
+%% a unique name each time a registering process restarts.
+flush(true, {Key, Pid}, S) ->
+ Spec = [{{'$1', '$2'},
+ [{'andalso', {'==', '$1', {const, Key}},
+ {'/=', '$2', Pid}}],
+ ['$2']}],
+ lists:foldl(fun down/2, S, [P || P <- ets:select(?TABLE, Spec),
+ not is_process_alive(P)]);
+
+flush(false, _, S) ->
S.
-q([], Q) ->
- Q;
-q([{From, Pat} = T | Rest], Q) ->
- case find(Pat) of
- {ok, L} ->
- gen_server:reply(From, L),
- q(Rest, Q);
- false ->
- q(Rest, [T|Q])
- end.
-
-%% find/1
-
-find(Pat) ->
- try match(Pat) of
- [] ->
- false;
- L ->
- {ok, L}
- catch
- _:_ ->
- {ok, []}
- end.
+%% flush/2
+
+%% Process has died and should no longer receive messages/replies.
+flush(Pid, #state{receivers = RD} = S)
+ when is_pid(Pid) ->
+ S#state{receivers = dict:fold(fun(Pt,Ps,D) -> flush(Pid, Pt, Ps, D) end,
+ RD,
+ RD)}.
+
+%% flush/4
+
+flush(Pid, Pat, Recvs, Dict) ->
+ remove(fun(T) -> Pid /= head(T) end, Pat, Recvs, Dict).
+
+%% head/1
+
+head([P|_]) ->
+ P;
+
+head({P,_}) ->
+ P.
+
+%% remove/4
+
+remove(Pred, Key, Values, Dict) ->
+ case lists:filter(Pred, Values) of
+ [] ->
+ dict:erase(Key, Dict);
+ Rest ->
+ dict:store(Key, Rest, Dict)
+ end.
%% call/1