From b3ac2949c102d9569c14f6f20af31283fed6b459 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Thu, 24 Aug 2017 20:13:15 +0200 Subject: Fix minor monitor blunder in diameter_reg Commit 58091992 discarded a new monitor reference, so didn't avoid multiple monitors as was the intention. The blunder was harmless since all but the first DOWN message resulted in notifications. --- lib/diameter/src/base/diameter_reg.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/diameter/src/base/diameter_reg.erl b/lib/diameter/src/base/diameter_reg.erl index 97e74657bd..e6a82e1764 100644 --- a/lib/diameter/src/base/diameter_reg.erl +++ b/lib/diameter/src/base/diameter_reg.erl @@ -339,7 +339,7 @@ insert(true, Rec) -> add(true, {_Key, Pid} = Rec, S) -> NS = add_monitor(Pid, S), {Recvs, RD} = add(Rec, NS), - {Recvs, S#state{receivers = RD}}; + {Recvs, NS#state{receivers = RD}}; add(false = No, _, S) -> {No, S}. -- cgit v1.2.3 From 257268ba6a2faa38032b2a2a1b6e0276103fca6b Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Wed, 6 Sep 2017 14:03:05 +0200 Subject: Fix type spec That dialyzer hasn't noticed is wrong. --- lib/diameter/src/base/diameter_reg.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/diameter/src/base/diameter_reg.erl b/lib/diameter/src/base/diameter_reg.erl index e6a82e1764..cabe90ffe8 100644 --- a/lib/diameter/src/base/diameter_reg.erl +++ b/lib/diameter/src/base/diameter_reg.erl @@ -225,7 +225,7 @@ sub(Pat, Ps, Dict) -> %% waits/0 -spec waits() - -> [{pattern(), [{from(), term()}]}]. + -> [{pattern(), [from()]}]. waits() -> #state{receivers = RD} = state(), -- cgit v1.2.3 From fae8ca0c37e54ad109b1612185d17dc5695bebcc Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Thu, 24 Aug 2017 20:35:27 +0200 Subject: Simplify implementation in diameter_reg Use maps instead of dict for traceability more than performance. --- lib/diameter/src/base/diameter_reg.erl | 251 ++++++++++++--------------------- 1 file changed, 94 insertions(+), 157 deletions(-) diff --git a/lib/diameter/src/base/diameter_reg.erl b/lib/diameter/src/base/diameter_reg.erl index cabe90ffe8..9ada36acc5 100644 --- a/lib/diameter/src/base/diameter_reg.erl +++ b/lib/diameter/src/base/diameter_reg.erl @@ -19,10 +19,11 @@ %% %% -%% The module implements a simple term -> pid registry. +%% A simple term -> pid registry. %% -module(diameter_reg). + -behaviour(gen_server). -export([add/1, @@ -57,18 +58,18 @@ -type key() :: term(). -type from() :: {pid(), term()}. +-type rcvr() :: [pid() | term()] %% subscribe + | from(). %% wait -type pattern() :: term(). -record(state, {id = diameter_lib:now(), - receivers = dict:new() - :: dict:dict(pattern(), [[pid() | term()]%% subscribe - | from()]), %% wait + notify = #{} :: #{pattern() => [rcvr()]}, monitors = sets:new() :: sets:set(pid())}). %% The ?TABLE bag contains the Key -> Pid mapping, as {Key, Pid} %% tuples. Each pid is stored in the monitors set to ensure only one %% monitor for each pid: more are harmless, but unnecessary. A pattern -%% is added to receivers a result of calls to wait/1 or subscribe/2: +%% is added to notify a result of calls to wait/1 or subscribe/2: %% changes to ?TABLE causes processes to be notified as required. %% =========================================================================== @@ -156,7 +157,7 @@ wait(Pat) -> %% # subscribe(Pat, T) %% %% Like match/1, but additionally receive messages of the form -%% {T, add|remove, {term(), pid()} when associations are added +%% {T, add|remove, {term(), pid()}} when associations are added %% or removed. %% =========================================================================== @@ -186,15 +187,12 @@ uptime() -> -> [{pid(), [key()]}]. pids() -> - to_list(fun swap/1). - -to_list(Fun) -> - ets:foldl(fun(T,D) -> append(Fun(T), D) end, orddict:new(), ?TABLE). - -append({K,V}, Dict) -> - orddict:append(K, V, Dict). + append(ets:select(?TABLE, [{{'$1','$2'}, [], [{{'$2', '$1'}}]}])). -id(T) -> T. +append(Pairs) -> + dict:to_list(lists:foldl(fun({K,V}, D) -> dict:append(K, V, D) end, + dict:new(), + Pairs)). %% terms/0 @@ -202,9 +200,7 @@ id(T) -> T. -> [{key(), [pid()]}]. terms() -> - to_list(fun id/1). - -swap({X,Y}) -> {Y,X}. + append(ets:tab2list(?TABLE)). %% subs/0 @@ -212,15 +208,9 @@ swap({X,Y}) -> {Y,X}. -> [{pattern(), [{pid(), term()}]}]. subs() -> - #state{receivers = RD} = state(), - dict:fold(fun sub/3, orddict:new(), RD). - -sub(Pat, Ps, Dict) -> - lists:foldl(fun([P|T], D) -> orddict:append(Pat, {P,T}, D); - (_, D) -> D - end, - Dict, - Ps). + #state{notify = Dict} = state(), + [{K, Ts} || {K,Ps} <- maps:to_list(Dict), + Ts <- [[{P,T} || [P|T] <- Ps]]]. %% waits/0 @@ -228,15 +218,9 @@ sub(Pat, Ps, Dict) -> -> [{pattern(), [from()]}]. waits() -> - #state{receivers = RD} = state(), - dict:fold(fun wait/3, orddict:new(), RD). - -wait(Pat, Ps, Dict) -> - lists:foldl(fun({_,_} = F, D) -> orddict:append(Pat, F, D); - (_, D) -> D - end, - Dict, - Ps). + #state{notify = Dict} = state(), + [{K, Ts} || {K,Ps} <- maps:to_list(Dict), + Ts <- [[T || {_,_} = T <- Ps]]]. %% ---------------------------------------------------------- %% # init/1 @@ -250,33 +234,28 @@ init(_) -> %% # handle_call/3 %% ---------------------------------------------------------- -handle_call({add, Uniq, Key}, {Pid, _}, S0) -> +handle_call({add, Uniq, Key}, {Pid, _}, S) -> Rec = {Key, Pid}, - S1 = flush(Uniq, Rec, S0), + NS = flush(Uniq, Rec, S), %% before insert {Res, New} = insert(Uniq, Rec), - {Recvs, S} = add(New, Rec, S1), - notify(Recvs, Rec), - {reply, Res, S}; + {reply, Res, notify(add, New andalso Rec, NS)}; handle_call({remove, Key}, {Pid, _}, S) -> Rec = {Key, Pid}, - Recvs = delete([Rec], S), ets:delete_object(?TABLE, Rec), - notify(Recvs, remove), - {reply, true, S}; + {reply, true, notify(remove, Rec, S)}; -handle_call({wait, Pat}, {Pid, _} = From, #state{receivers = RD} = S) -> +handle_call({wait, Pat}, {Pid, _} = From, S) -> NS = add_monitor(Pid, S), case match(Pat) of - [_|_] = L -> - {reply, L, NS}; + [_|_] = Recs -> + {reply, Recs, NS}; [] -> - {noreply, NS#state{receivers = dict:append(Pat, From, RD)}} + {noreply, queue(Pat, From, NS)} end; -handle_call({subscribe, Pat, T}, {Pid, _}, #state{receivers = RD} = S) -> - NS = add_monitor(Pid, S), - {reply, match(Pat), NS#state{receivers = dict:append(Pat, [Pid | T], RD)}}; +handle_call({subscribe, Pat, T}, {Pid, _}, S) -> + {reply, match(Pat), queue(Pat, [Pid | T], add_monitor(Pid, S))}; handle_call(state, _, S) -> {reply, S, S}; @@ -332,106 +311,62 @@ insert(true, Rec) -> B = ets:insert_new(?TABLE, Rec), %% entry inserted? {B, B}. -%% add/3 - +%% add_monitor/2 +%% %% Only add a single monitor for any given process, since there's no %% use to more. -add(true, {_Key, Pid} = Rec, S) -> - NS = add_monitor(Pid, S), - {Recvs, RD} = add(Rec, NS), - {Recvs, NS#state{receivers = RD}}; - -add(false = No, _, S) -> - {No, S}. - -%% add/2 - -%% Notify processes whose patterns match the inserted key. -add({_Key, Pid} = Rec, #state{receivers = RD}) -> - dict:fold(fun(Pt, Ps, A) -> - add(lists:member(Rec, match(Pt, Pid)), Pt, Ps, Rec, A) - end, - {sets:new(), RD}, - RD). - -%% add/5 -add(true, Pat, Recvs, {_,_} = Rec, {Set, Dict}) -> - {lists:foldl(fun sets:add_element/2, Set, Recvs), - remove(fun erlang:is_list/1, Pat, Recvs, Dict)}; - -add(false, _, _, _, Acc) -> +add_monitor(Pid, #state{monitors = Ps} = S) -> + case sets:is_element(Pid, Ps) of + false -> + monitor(process, Pid), + S#state{monitors = sets:add_element(Pid, Ps)}; + true -> + S + end. + +%% notify/3 + +notify(_, false, S) -> + S; + +notify(Op, {_,_} = Rec, #state{notify = Dict} = S) -> + S#state{notify = maps:fold(fun(P,Rs,D) -> notify(Op, Rec, P, Rs, D) end, + Dict, + Dict)}. + +%% notify/5 + +notify(Op, {_, Pid} = Rec, Pat, Rcvrs, Dict) -> + case lists:member(Rec, match(Pat, Pid)) of + true -> + reset(Pat, Dict, lists:foldr(fun(P,A) -> send(P, Op, Rec, A) end, + [], + Rcvrs)); + false -> + Dict + end. + +%% send/4 + +send([Pid | T] = Rcvr, Op, Rec, Acc) -> + Pid ! {T, Op, Rec}, + [Rcvr | Acc]; + +%% No processes wait on remove: they receive notification immediately +%% or at add, by construction. +send({_,_} = From, add, Rec, Acc) -> + gen_server:reply(From, [Rec]), Acc. -%% add_monitor/2 - -add_monitor(Pid, #state{monitors = MS} = S) -> - add_monitor(sets:is_element(Pid, MS), Pid, S). - -%% add_monitor/3 - -add_monitor(false, Pid, #state{monitors = MS} = S) -> - monitor(process, Pid), - S#state{monitors = sets:add_element(Pid, MS)}; - -add_monitor(true, _, S) -> - S. - -%% delete/2 - -delete(Recs, #state{receivers = RD}) -> - lists:foldl(fun(R,S) -> delete(R, RD, S) end, sets:new(), Recs). - -%% delete/3 - -delete({_Key, Pid} = Rec, RD, Set) -> - dict:fold(fun(Pt, Ps, S) -> - delete(lists:member(Rec, match(Pt, Pid)), Rec, Ps, S) - end, - Set, - RD). - -%% delete/4 - -%% Entry matches a pattern ... -delete(true, Rec, Recvs, Set) -> - lists:foldl(fun(R,S) -> sets:add_element({R, Rec}, S) end, - Set, - Recvs); - -%% ... or not. -delete(false, _, _, Set) -> - Set. - -%% notify/2 - -notify(false = No, _) -> - No; - -notify(Recvs, remove = Op) -> - sets:fold(fun({P,R}, N) -> send(P, R, Op), N+1 end, 0, Recvs); - -notify(Recvs, {_,_} = Rec) -> - sets:fold(fun(P,N) -> send(P, Rec, add), N+1 end, 0, Recvs). - -%% send/3 - -%% No processes waiting on remove, by construction: they've either -%% received notification at add or aren't waiting. -send([Pid | T], Rec, Op) -> - Pid ! {T, Op, Rec}; - -send({_,_} = From, Rec, add) -> - gen_server:reply(From, [Rec]). - %% down/2 -down(Pid, #state{monitors = MS} = S) -> - NS = flush(Pid, S), - Recvs = delete(match('_', Pid), NS), +down(Pid, #state{monitors = Ps} = S) -> + Recs = match('_', Pid), ets:match_delete(?TABLE, {'_', Pid}), - notify(Recvs, remove), - NS#state{monitors = sets:del_element(Pid, MS)}. + lists:foldl(fun(R,NS) -> notify(remove, R, NS) end, + flush(Pid, S#state{monitors = sets:del_element(Pid, Ps)}), + Recs). %% flush/3 @@ -452,16 +387,15 @@ flush(false, _, S) -> %% flush/2 %% Process has died and should no longer receive messages/replies. -flush(Pid, #state{receivers = RD} = S) - when is_pid(Pid) -> - S#state{receivers = dict:fold(fun(Pt,Ps,D) -> flush(Pid, Pt, Ps, D) end, - RD, - RD)}. +flush(Pid, #state{notify = Dict} = S) -> + S#state{notify = maps:fold(fun(P,Rs,D) -> flush(Pid, P, Rs, D) end, + Dict, + Dict)}. %% flush/4 -flush(Pid, Pat, Recvs, Dict) -> - remove(fun(T) -> Pid /= head(T) end, Pat, Recvs, Dict). +flush(Pid, Pat, Rcvrs, Dict) -> + reset(Pat, Dict, [T || T <- Rcvrs, Pid /= head(T)]). %% head/1 @@ -471,15 +405,18 @@ head([P|_]) -> head({P,_}) -> P. -%% remove/4 +%% reset/3 + +reset(Key, Map, []) -> + maps:remove(Key, Map); + +reset(Key, Map, List) -> + maps:put(Key, List, Map). + +%% queue/3 -remove(Pred, Key, Values, Dict) -> - case lists:filter(Pred, Values) of - [] -> - dict:erase(Key, Dict); - Rest -> - dict:store(Key, Rest, Dict) - end. +queue(Pat, Rcvr, #state{notify = Dict} = S) -> + S#state{notify = maps:put(Pat, [Rcvr | maps:get(Pat, Dict, [])], Dict)}. %% call/1 -- cgit v1.2.3