diff options
author | Anders Svensson <[email protected]> | 2016-05-20 07:29:39 +0200 |
---|---|---|
committer | Anders Svensson <[email protected]> | 2016-05-30 08:54:41 +0200 |
commit | 5809199289664e30df9ae418efb4df9bc9d73cd4 (patch) | |
tree | a6cdf6c180504a0277e10faf3630c0ef2db039aa | |
parent | bc1ab9f2f1a22a874415260a5d099281c35ce3d5 (diff) | |
download | otp-5809199289664e30df9ae418efb4df9bc9d73cd4.tar.gz otp-5809199289664e30df9ae418efb4df9bc9d73cd4.tar.bz2 otp-5809199289664e30df9ae418efb4df9bc9d73cd4.zip |
Add diameter_reg:subscribe/2
To allow processes to subscribe to a message when a matching association
is added or removed. The intention is to use this in
diameter_{tcp,sctp}, in order for listening processes to find out when
transport their transport configuration has been removed.
-rw-r--r-- | lib/diameter/src/base/diameter_reg.erl | 373 |
1 files changed, 267 insertions, 106 deletions
diff --git a/lib/diameter/src/base/diameter_reg.erl b/lib/diameter/src/base/diameter_reg.erl index db01e17c86..4dba261a8b 100644 --- a/lib/diameter/src/base/diameter_reg.erl +++ b/lib/diameter/src/base/diameter_reg.erl @@ -29,7 +29,8 @@ add_new/1, del/1, match/1, - wait/1]). + wait/1, + subscribe/2]). -export([start_link/0]). @@ -43,7 +44,9 @@ %% test -export([pids/0, - terms/0]). + terms/0, + subs/0, + waits/0]). %% debug -export([state/0, @@ -52,18 +55,21 @@ -define(SERVER, ?MODULE). -define(TABLE, ?MODULE). -%% Table entry used to keep from starting more than one monitor on the -%% same process. This isn't a problem but there's no point in starting -%% multiple monitors if we can avoid it. Note that we can't have a 2-tuple -%% keyed on Pid since a registered term can be anything. Want the entry -%% keyed on Pid so that lookup is fast. --define(MONITOR(Pid, MRef), {Pid, monitor, MRef}). - -%% Table entry containing the Term -> Pid mapping. --define(MAPPING(Term, Pid), {Term, Pid}). +-type key() :: term(). +-type from() :: {pid(), term()}. +-type pattern() :: term(). -record(state, {id = diameter_lib:now(), - q = []}). %% [{From, Pat}] + receivers = dict:new() + :: dict:dict(pattern(), [[pid() | term()]%% subscribe + | from()]), %% wait + monitors = sets:new() :: sets:set(pid())}). + +%% The ?TABLE bag contains the Key -> Pid mapping, as {Key, Pid} +%% tuples. Each pid is stored in the monitors set to ensure only one +%% monitor for each pid: more are harmless, but unnecessary. A pattern +%% is added to receivers a result of calls to wait/1 or subscribe/2: +%% changes to ?TABLE causes processes to be notified as required. %% =========================================================================== %% # add(T) @@ -73,17 +79,17 @@ %% %% An association is removed when the calling process dies or as a %% result of calling del/1. Adding the same term more than once is -%% equivalent to adding it exactly once. +%% equivalent to adding it once. %% %% Note that since match/1 takes a pattern as argument, specifying a %% term that contains match variables is probably not a good idea %% =========================================================================== --spec add(any()) +-spec add(key()) -> true. add(T) -> - call({add, fun ets:insert/2, T, self()}). + call({add, false, T}). %% =========================================================================== %% # add_new(T) @@ -92,11 +98,11 @@ add(T) -> %% association, false being returned if an association already exists. %% =========================================================================== --spec add_new(any()) +-spec add_new(key()) -> boolean(). add_new(T) -> - call({add, fun insert_new/2, T, self()}). + call({add, true, T}). %% =========================================================================== %% # del(Term) @@ -104,11 +110,11 @@ add_new(T) -> %% Remove any existing association of Term with self(). %% =========================================================================== --spec del(any()) +-spec del(key()) -> true. del(T) -> - call({del, T, self()}). + call({del, T}). %% =========================================================================== %% # match(Pat) @@ -121,12 +127,17 @@ del(T) -> %% associations removed.) %% =========================================================================== --spec match(any()) - -> [{term(), pid()}]. +-spec match(pattern()) + -> [{key(), pid()}]. match(Pat) -> - ets:match_object(?TABLE, ?MAPPING(Pat, '_')). + match(Pat, '_'). + +%% match/2 +match(Pat, Pid) -> + ets:match_object(?TABLE, {Pat, Pid}). + %% =========================================================================== %% # wait(Pat) %% @@ -134,13 +145,29 @@ match(Pat) -> %% It's up to the caller to ensure that the wait won't be forever. %% =========================================================================== --spec wait(any()) - -> [{term(), pid()}]. +-spec wait(pattern()) + -> [{key(), pid()}]. wait(Pat) -> + _ = match(Pat), %% ensure match can succeed call({wait, Pat}). %% =========================================================================== +%% # subscribe(Pat, T) +%% +%% Like match/1, but additionally receive messages of the form +%% {T, add|del, {term(), pid()} when associations are added +%% or removed. +%% =========================================================================== + +-spec subscribe(Pat :: any(), T :: term()) + -> [{term(), pid()}]. + +subscribe(Pat, T) -> + _ = match(Pat), %% ensure match can succeed + call({subscribe, Pat, T}). + +%% =========================================================================== start_link() -> ServerName = {local, ?SERVER}, @@ -156,18 +183,13 @@ uptime() -> %% pids/0 -spec pids() - -> [{pid(), [term()]}]. + -> [{pid(), [key()]}]. pids() -> to_list(fun swap/1). to_list(Fun) -> - ets:foldl(fun(T,A) -> acc(Fun, T, A) end, orddict:new(), ?TABLE). - -acc(Fun, ?MAPPING(Term, Pid), Dict) -> - append(Fun({Term, Pid}), Dict); -acc(_, _, Dict) -> - Dict. + ets:foldl(fun(T,D) -> append(Fun(T), D) end, orddict:new(), ?TABLE). append({K,V}, Dict) -> orddict:append(K, V, Dict). @@ -177,13 +199,45 @@ id(T) -> T. %% terms/0 -spec terms() - -> [{term(), [pid()]}]. + -> [{key(), [pid()]}]. terms() -> to_list(fun id/1). swap({X,Y}) -> {Y,X}. +%% subs/0 + +-spec subs() + -> [{pattern(), [{pid(), term()}]}]. + +subs() -> + #state{receivers = RD} = state(), + dict:fold(fun sub/3, orddict:new(), RD). + +sub(Pat, Ps, Dict) -> + lists:foldl(fun([P|T], D) -> orddict:append(Pat, {P,T}, D); + (_, D) -> D + end, + Dict, + Ps). + +%% waits/0 + +-spec waits() + -> [{pattern(), [{from(), term()}]}]. + +waits() -> + #state{receivers = RD} = state(), + dict:fold(fun wait/3, orddict:new(), RD). + +wait(Pat, Ps, Dict) -> + lists:foldl(fun({_,_} = F, D) -> orddict:append(Pat, F, D); + (_, D) -> D + end, + Dict, + Ps). + %% ---------------------------------------------------------- %% # init/1 %% ---------------------------------------------------------- @@ -196,22 +250,34 @@ init(_) -> %% # handle_call/3 %% ---------------------------------------------------------- -handle_call({add, Fun, Key, Pid}, _, S) -> - B = Fun(?TABLE, {Key, Pid}), - insert_monitor(B andalso no_monitor(Pid), Pid), - {reply, B, pending(B, S)}; - -handle_call({del, Key, Pid}, _, S) -> - {reply, ets:delete_object(?TABLE, ?MAPPING(Key, Pid)), S}; - -handle_call({wait, Pat}, From, #state{q = Q} = S) -> - case find(Pat) of - {ok, L} -> - {reply, L, S}; - false -> - {noreply, S#state{q = [{From, Pat} | Q]}} +handle_call({add, Uniq, Key}, {Pid, _}, S0) -> + Rec = {Key, Pid}, + S1 = flush(Uniq, Rec, S0), + {Res, New} = insert(Uniq, Rec), + {Recvs, S} = add(New, Rec, S1), + notify(Recvs, Rec), + {reply, Res, S}; + +handle_call({del, Key}, {Pid, _}, S) -> + Rec = {Key, Pid}, + Recvs = delete([Rec], S), + ets:delete_object(?TABLE, Rec), + notify(Recvs, del), + {reply, true, S}; + +handle_call({wait, Pat}, {Pid, _} = From, #state{receivers = RD} = S) -> + NS = add_monitor(Pid, S), + case match(Pat) of + [_|_] = L -> + {reply, L, NS}; + [] -> + {noreply, NS#state{receivers = dict:append(Pat, From, RD)}} end; +handle_call({subscribe, Pat, T}, {Pid, _}, #state{receivers = RD} = S) -> + NS = add_monitor(Pid, S), + {reply, match(Pat), NS#state{receivers = dict:append(Pat, [Pid | T], RD)}}; + handle_call(state, _, S) -> {reply, S, S}; @@ -232,10 +298,8 @@ handle_cast(_Msg, S)-> %% # handle_info/2 %% ---------------------------------------------------------- -handle_info({'DOWN', MRef, process, Pid, _}, S) -> - ets:delete_object(?TABLE, ?MONITOR(Pid, MRef)), - ets:match_delete(?TABLE, ?MAPPING('_', Pid)), - {noreply, S}; +handle_info({'DOWN', _MRef, process, Pid, _}, S) -> + {noreply, down(Pid, S)}; handle_info(_Info, S) -> {noreply, S}. @@ -256,69 +320,166 @@ code_change(_OldVsn, State, _Extra) -> %% =========================================================================== -insert_monitor(B, Pid) -> - B andalso ets:insert(?TABLE, ?MONITOR(Pid, monitor(process, Pid))). +%% insert/2 + +insert(false, Rec) -> + Spec = [{'$1', [{'==', '$1', {const, Rec}}], ['$_']}], + X = '$end_of_table' /= ets:select(?TABLE, Spec, 1), %% entry exists? + X orelse ets:insert(?TABLE, Rec), + {true, not X}; + +insert(true, Rec) -> + B = ets:insert_new(?TABLE, Rec), %% entry inserted? + {B, B}. + +%% add/3 + +%% Only add a single monitor for any given process, since there's no +%% use to more. +add(true, {_Key, Pid} = Rec, S) -> + NS = add_monitor(Pid, S), + {Recvs, RD} = add(Rec, NS), + {Recvs, S#state{receivers = RD}}; + +add(false = No, _, S) -> + {No, S}. + +%% add/2 + +%% Notify processes whose patterns match the inserted key. +add({_Key, Pid} = Rec, #state{receivers = RD}) -> + dict:fold(fun(Pt, Ps, A) -> + add(lists:member(Rec, match(Pt, Pid)), Pt, Ps, Rec, A) + end, + {sets:new(), RD}, + RD). -%% Do we need a monitor for the specified Pid? -no_monitor(Pid) -> - [] == ets:match_object(?TABLE, ?MONITOR(Pid, '_')). +%% add/5 -%% insert_new/2 +add(true, Pat, Recvs, {_,_} = Rec, {Set, Dict}) -> + {lists:foldl(fun sets:add_element/2, Set, Recvs), + remove(fun erlang:is_list/1, Pat, Recvs, Dict)}; -insert_new(?TABLE, {Key, _} = T) -> - flush(ets:lookup(?TABLE, Key)), - ets:insert_new(?TABLE, T). +add(false, _, _, _, Acc) -> + Acc. + +%% add_monitor/2 + +add_monitor(Pid, #state{monitors = MS} = S) -> + add_monitor(sets:is_element(Pid, MS), Pid, S). + +%% add_monitor/3 + +add_monitor(false, Pid, #state{monitors = MS} = S) -> + monitor(process, Pid), + S#state{monitors = sets:add_element(Pid, MS)}; + +add_monitor(true, _, S) -> + S. + +%% delete/2 + +delete(Recs, #state{receivers = RD}) -> + lists:foldl(fun(R,S) -> delete(R, RD, S) end, sets:new(), Recs). + +%% delete/3 + +delete({_Key, Pid} = Rec, RD, Set) -> + dict:fold(fun(Pt, Ps, S) -> + delete(lists:member(Rec, match(Pt, Pid)), Rec, Ps, S) + end, + Set, + RD). + +%% delete/4 + +%% Entry matches a pattern ... +delete(true, Rec, Recvs, Set) -> + lists:foldl(fun(R,S) -> sets:add_element({R, Rec}, S) end, + Set, + Recvs); + +%% ... or not. +delete(false, _, _, Set) -> + Set. + +%% notify/2 + +notify(false = No, _) -> + No; + +notify(Recvs, del = Op) -> + sets:fold(fun({P,R}, N) -> send(P, R, Op), N+1 end, 0, Recvs); + +notify(Recvs, {_,_} = Rec) -> + sets:fold(fun(P,N) -> send(P, Rec, add), N+1 end, 0, Recvs). + +%% send/3 + +%% No processes waiting on del, by construction: they've either +%% received notification at add or aren't waiting. +send([Pid | T], Rec, Op) -> + Pid ! {T, Op, Rec}; + +send({_,_} = From, Rec, add) -> + gen_server:reply(From, [Rec]). + +%% down/2 + +down(Pid, #state{monitors = MS} = S) -> + NS = flush(Pid, S), + Recvs = delete(match('_', Pid), NS), + ets:match_delete(?TABLE, {'_', Pid}), + notify(Recvs, del), + NS#state{monitors = sets:del_element(Pid, MS)}. + +%% flush/3 %% Remove any processes that are dead but for which we may not have -%% received 'DOWN' yet. This is to ensure that add_new can be used -%% to register a unique name each time a process restarts. -flush(List) -> - lists:foreach(fun({_,P} = T) -> - del(erlang:is_process_alive(P), T) - end, - List). - -del(Alive, T) -> - Alive orelse ets:delete_object(?TABLE, T). - -%% repl/3 - -repl([?MAPPING(_, Pid) = M], Key, Pid) -> - ets:delete_object(?TABLE, M), - true = ets:insert(?TABLE, ?MAPPING(Key, Pid)); -repl([], _, _) -> - false. - -%% pending/1 - -pending(true, #state{q = [_|_] = Q} = S) -> - S#state{q = q(lists:reverse(Q), [])}; %% retain reply order -pending(_, S) -> +%% received 'DOWN' yet, to ensure that add_new can be used to register +%% a unique name each time a registering process restarts. +flush(true, {Key, Pid}, S) -> + Spec = [{{'$1', '$2'}, + [{'andalso', {'==', '$1', {const, Key}}, + {'/=', '$2', Pid}}], + ['$2']}], + lists:foldl(fun down/2, S, [P || P <- ets:select(?TABLE, Spec), + not is_process_alive(P)]); + +flush(false, _, S) -> S. -q([], Q) -> - Q; -q([{From, Pat} = T | Rest], Q) -> - case find(Pat) of - {ok, L} -> - gen_server:reply(From, L), - q(Rest, Q); - false -> - q(Rest, [T|Q]) - end. - -%% find/1 - -find(Pat) -> - try match(Pat) of - [] -> - false; - L -> - {ok, L} - catch - _:_ -> - {ok, []} - end. +%% flush/2 + +%% Process has died and should no longer receive messages/replies. +flush(Pid, #state{receivers = RD} = S) + when is_pid(Pid) -> + S#state{receivers = dict:fold(fun(Pt,Ps,D) -> flush(Pid, Pt, Ps, D) end, + RD, + RD)}. + +%% flush/4 + +flush(Pid, Pat, Recvs, Dict) -> + remove(fun(T) -> Pid /= head(T) end, Pat, Recvs, Dict). + +%% head/1 + +head([P|_]) -> + P; + +head({P,_}) -> + P. + +%% remove/4 + +remove(Pred, Key, Values, Dict) -> + case lists:filter(Pred, Values) of + [] -> + dict:erase(Key, Dict); + Rest -> + dict:store(Key, Rest, Dict) + end. %% call/1 |