From ebc80a66cd2251553565a2abf6d9ca23381bde22 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Fri, 20 May 2016 07:33:06 +0200 Subject: Remove diameter_reg bloat Unexpected messages don't happen in practice, and no_auto_import is neither necessery nor difficult to avoid. --- lib/diameter/src/base/diameter_reg.erl | 23 +++++++---------------- 1 file changed, 7 insertions(+), 16 deletions(-) (limited to 'lib/diameter') diff --git a/lib/diameter/src/base/diameter_reg.erl b/lib/diameter/src/base/diameter_reg.erl index 7f198080ba..2f2dd7aaa4 100644 --- a/lib/diameter/src/base/diameter_reg.erl +++ b/lib/diameter/src/base/diameter_reg.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2015. All Rights Reserved. +%% Copyright Ericsson AB 2010-2016. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -25,8 +25,6 @@ -module(diameter_reg). -behaviour(gen_server). --compile({no_auto_import, [monitor/2]}). - -export([add/1, add_new/1, del/1, @@ -52,8 +50,6 @@ -export([state/0, uptime/0]). --include("diameter_internal.hrl"). - -define(SERVER, ?MODULE). -define(TABLE, ?MODULE). @@ -211,7 +207,7 @@ init(_) -> handle_call({add, Fun, Key, Pid}, _, S) -> B = Fun(?TABLE, {Key, Pid}), - monitor(B andalso no_monitor(Pid), Pid), + insert_monitor(B andalso no_monitor(Pid), Pid), {reply, B, pending(B, S)}; handle_call({del, Key, Pid}, _, S) -> @@ -237,16 +233,14 @@ handle_call(state, _, S) -> handle_call(uptime, _, #state{id = Time} = S) -> {reply, diameter_lib:now_diff(Time), S}; -handle_call(Req, From, S) -> - ?UNEXPECTED([Req, From]), +handle_call(_Req, _From, S) -> {reply, nok, S}. %% ---------------------------------------------------------- %% # handle_cast/2 %% ---------------------------------------------------------- -handle_cast(Msg, S)-> - ?UNEXPECTED([Msg]), +handle_cast(_Msg, S)-> {noreply, S}. %% ---------------------------------------------------------- @@ -258,8 +252,7 @@ handle_info({'DOWN', MRef, process, Pid, _}, S) -> ets:match_delete(?TABLE, ?MAPPING('_', Pid)), {noreply, S}; -handle_info(Info, S) -> - ?UNEXPECTED([Info]), +handle_info(_Info, S) -> {noreply, S}. %% ---------------------------------------------------------- @@ -278,10 +271,8 @@ code_change(_OldVsn, State, _Extra) -> %% =========================================================================== -monitor(true, Pid) -> - ets:insert(?TABLE, ?MONITOR(Pid, erlang:monitor(process, Pid))); -monitor(false, _) -> - ok. +insert_monitor(B, Pid) -> + B andalso ets:insert(?TABLE, ?MONITOR(Pid, monitor(process, Pid))). %% Do we need a monitor for the specified Pid? no_monitor(Pid) -> -- cgit v1.2.3 From a2222ae07cbe05ba2a1b3d1d201c748969665ce4 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Sun, 22 May 2016 19:57:58 +0200 Subject: Remove diameter_reg:repl/2 Unused, and in the way for what's to come. --- lib/diameter/src/base/diameter_reg.erl | 20 -------------------- lib/diameter/test/diameter_reg_SUITE.erl | 11 ----------- 2 files changed, 31 deletions(-) (limited to 'lib/diameter') diff --git a/lib/diameter/src/base/diameter_reg.erl b/lib/diameter/src/base/diameter_reg.erl index 2f2dd7aaa4..76cc79c8e9 100644 --- a/lib/diameter/src/base/diameter_reg.erl +++ b/lib/diameter/src/base/diameter_reg.erl @@ -28,7 +28,6 @@ -export([add/1, add_new/1, del/1, - repl/2, match/1, wait/1]). @@ -99,19 +98,6 @@ add(T) -> add_new(T) -> call({add, fun insert_new/2, T, self()}). -%% =========================================================================== -%% # repl(T, NewT) -%% -%% Like add/1 but only replace an existing association on T, false -%% being returned if it doesn't exist. -%% =========================================================================== - --spec repl(any(), any()) - -> boolean(). - -repl(T, U) -> - call({repl, T, U, self()}). - %% =========================================================================== %% # del(Term) %% @@ -213,12 +199,6 @@ handle_call({add, Fun, Key, Pid}, _, S) -> handle_call({del, Key, Pid}, _, S) -> {reply, ets:delete_object(?TABLE, ?MAPPING(Key, Pid)), S}; -handle_call({repl, T, U, Pid}, _, S) -> - MatchSpec = [{?MAPPING('$1', Pid), - [{'=:=', '$1', {const, T}}], - ['$_']}], - {reply, repl(ets:select(?TABLE, MatchSpec), U, Pid), S}; - handle_call({wait, Pat}, From, #state{q = Q} = S) -> case find(Pat) of {ok, L} -> diff --git a/lib/diameter/test/diameter_reg_SUITE.erl b/lib/diameter/test/diameter_reg_SUITE.erl index 3d9ad8bfa8..0092553191 100644 --- a/lib/diameter/test/diameter_reg_SUITE.erl +++ b/lib/diameter/test/diameter_reg_SUITE.erl @@ -34,7 +34,6 @@ -export([add/1, add_new/1, del/1, - repl/1, terms/1, pids/1]). @@ -57,7 +56,6 @@ tc() -> [add, add_new, del, - repl, terms, pids]. @@ -90,15 +88,6 @@ del(_) -> [{Ref, Pid}] = ?reg:match(Ref), Pid = self(). -repl(_) -> - Ref = make_ref(), - true = ?reg:add_new({Ref}), - true = ?reg:repl({Ref}, Ref), - false = ?reg:add_new(Ref), - false = ?reg:repl({Ref}, Ref), - [{Ref, Pid}] = ?reg:match(Ref), - Pid = self(). - terms(_) -> Ref = make_ref(), true = ?reg:add_new(Ref), -- cgit v1.2.3 From bc1ab9f2f1a22a874415260a5d099281c35ce3d5 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Fri, 20 May 2016 07:37:13 +0200 Subject: Add dialyzer specs to diameter_reg Last missed in commit 25bef13f. --- lib/diameter/src/base/diameter_reg.erl | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) (limited to 'lib/diameter') diff --git a/lib/diameter/src/base/diameter_reg.erl b/lib/diameter/src/base/diameter_reg.erl index 76cc79c8e9..db01e17c86 100644 --- a/lib/diameter/src/base/diameter_reg.erl +++ b/lib/diameter/src/base/diameter_reg.erl @@ -134,6 +134,9 @@ match(Pat) -> %% It's up to the caller to ensure that the wait won't be forever. %% =========================================================================== +-spec wait(any()) + -> [{term(), pid()}]. + wait(Pat) -> call({wait, Pat}). @@ -151,8 +154,9 @@ uptime() -> call(uptime). %% pids/0 -%% -%% Return: list of {Pid, [Term, ...]} + +-spec pids() + -> [{pid(), [term()]}]. pids() -> to_list(fun swap/1). @@ -171,8 +175,9 @@ append({K,V}, Dict) -> id(T) -> T. %% terms/0 -%% -%% Return: list of {Term, [Pid, ...]} + +-spec terms() + -> [{term(), [pid()]}]. terms() -> to_list(fun id/1). -- cgit v1.2.3 From 5809199289664e30df9ae418efb4df9bc9d73cd4 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Fri, 20 May 2016 07:29:39 +0200 Subject: Add diameter_reg:subscribe/2 To allow processes to subscribe to a message when a matching association is added or removed. The intention is to use this in diameter_{tcp,sctp}, in order for listening processes to find out when transport their transport configuration has been removed. --- lib/diameter/src/base/diameter_reg.erl | 373 +++++++++++++++++++++++---------- 1 file changed, 267 insertions(+), 106 deletions(-) (limited to 'lib/diameter') diff --git a/lib/diameter/src/base/diameter_reg.erl b/lib/diameter/src/base/diameter_reg.erl index db01e17c86..4dba261a8b 100644 --- a/lib/diameter/src/base/diameter_reg.erl +++ b/lib/diameter/src/base/diameter_reg.erl @@ -29,7 +29,8 @@ add_new/1, del/1, match/1, - wait/1]). + wait/1, + subscribe/2]). -export([start_link/0]). @@ -43,7 +44,9 @@ %% test -export([pids/0, - terms/0]). + terms/0, + subs/0, + waits/0]). %% debug -export([state/0, @@ -52,18 +55,21 @@ -define(SERVER, ?MODULE). -define(TABLE, ?MODULE). -%% Table entry used to keep from starting more than one monitor on the -%% same process. This isn't a problem but there's no point in starting -%% multiple monitors if we can avoid it. Note that we can't have a 2-tuple -%% keyed on Pid since a registered term can be anything. Want the entry -%% keyed on Pid so that lookup is fast. --define(MONITOR(Pid, MRef), {Pid, monitor, MRef}). - -%% Table entry containing the Term -> Pid mapping. --define(MAPPING(Term, Pid), {Term, Pid}). +-type key() :: term(). +-type from() :: {pid(), term()}. +-type pattern() :: term(). -record(state, {id = diameter_lib:now(), - q = []}). %% [{From, Pat}] + receivers = dict:new() + :: dict:dict(pattern(), [[pid() | term()]%% subscribe + | from()]), %% wait + monitors = sets:new() :: sets:set(pid())}). + +%% The ?TABLE bag contains the Key -> Pid mapping, as {Key, Pid} +%% tuples. Each pid is stored in the monitors set to ensure only one +%% monitor for each pid: more are harmless, but unnecessary. A pattern +%% is added to receivers a result of calls to wait/1 or subscribe/2: +%% changes to ?TABLE causes processes to be notified as required. %% =========================================================================== %% # add(T) @@ -73,17 +79,17 @@ %% %% An association is removed when the calling process dies or as a %% result of calling del/1. Adding the same term more than once is -%% equivalent to adding it exactly once. +%% equivalent to adding it once. %% %% Note that since match/1 takes a pattern as argument, specifying a %% term that contains match variables is probably not a good idea %% =========================================================================== --spec add(any()) +-spec add(key()) -> true. add(T) -> - call({add, fun ets:insert/2, T, self()}). + call({add, false, T}). %% =========================================================================== %% # add_new(T) @@ -92,11 +98,11 @@ add(T) -> %% association, false being returned if an association already exists. %% =========================================================================== --spec add_new(any()) +-spec add_new(key()) -> boolean(). add_new(T) -> - call({add, fun insert_new/2, T, self()}). + call({add, true, T}). %% =========================================================================== %% # del(Term) @@ -104,11 +110,11 @@ add_new(T) -> %% Remove any existing association of Term with self(). %% =========================================================================== --spec del(any()) +-spec del(key()) -> true. del(T) -> - call({del, T, self()}). + call({del, T}). %% =========================================================================== %% # match(Pat) @@ -121,12 +127,17 @@ del(T) -> %% associations removed.) %% =========================================================================== --spec match(any()) - -> [{term(), pid()}]. +-spec match(pattern()) + -> [{key(), pid()}]. match(Pat) -> - ets:match_object(?TABLE, ?MAPPING(Pat, '_')). + match(Pat, '_'). + +%% match/2 +match(Pat, Pid) -> + ets:match_object(?TABLE, {Pat, Pid}). + %% =========================================================================== %% # wait(Pat) %% @@ -134,12 +145,28 @@ match(Pat) -> %% It's up to the caller to ensure that the wait won't be forever. %% =========================================================================== --spec wait(any()) - -> [{term(), pid()}]. +-spec wait(pattern()) + -> [{key(), pid()}]. wait(Pat) -> + _ = match(Pat), %% ensure match can succeed call({wait, Pat}). +%% =========================================================================== +%% # subscribe(Pat, T) +%% +%% Like match/1, but additionally receive messages of the form +%% {T, add|del, {term(), pid()} when associations are added +%% or removed. +%% =========================================================================== + +-spec subscribe(Pat :: any(), T :: term()) + -> [{term(), pid()}]. + +subscribe(Pat, T) -> + _ = match(Pat), %% ensure match can succeed + call({subscribe, Pat, T}). + %% =========================================================================== start_link() -> @@ -156,18 +183,13 @@ uptime() -> %% pids/0 -spec pids() - -> [{pid(), [term()]}]. + -> [{pid(), [key()]}]. pids() -> to_list(fun swap/1). to_list(Fun) -> - ets:foldl(fun(T,A) -> acc(Fun, T, A) end, orddict:new(), ?TABLE). - -acc(Fun, ?MAPPING(Term, Pid), Dict) -> - append(Fun({Term, Pid}), Dict); -acc(_, _, Dict) -> - Dict. + ets:foldl(fun(T,D) -> append(Fun(T), D) end, orddict:new(), ?TABLE). append({K,V}, Dict) -> orddict:append(K, V, Dict). @@ -177,13 +199,45 @@ id(T) -> T. %% terms/0 -spec terms() - -> [{term(), [pid()]}]. + -> [{key(), [pid()]}]. terms() -> to_list(fun id/1). swap({X,Y}) -> {Y,X}. +%% subs/0 + +-spec subs() + -> [{pattern(), [{pid(), term()}]}]. + +subs() -> + #state{receivers = RD} = state(), + dict:fold(fun sub/3, orddict:new(), RD). + +sub(Pat, Ps, Dict) -> + lists:foldl(fun([P|T], D) -> orddict:append(Pat, {P,T}, D); + (_, D) -> D + end, + Dict, + Ps). + +%% waits/0 + +-spec waits() + -> [{pattern(), [{from(), term()}]}]. + +waits() -> + #state{receivers = RD} = state(), + dict:fold(fun wait/3, orddict:new(), RD). + +wait(Pat, Ps, Dict) -> + lists:foldl(fun({_,_} = F, D) -> orddict:append(Pat, F, D); + (_, D) -> D + end, + Dict, + Ps). + %% ---------------------------------------------------------- %% # init/1 %% ---------------------------------------------------------- @@ -196,22 +250,34 @@ init(_) -> %% # handle_call/3 %% ---------------------------------------------------------- -handle_call({add, Fun, Key, Pid}, _, S) -> - B = Fun(?TABLE, {Key, Pid}), - insert_monitor(B andalso no_monitor(Pid), Pid), - {reply, B, pending(B, S)}; - -handle_call({del, Key, Pid}, _, S) -> - {reply, ets:delete_object(?TABLE, ?MAPPING(Key, Pid)), S}; - -handle_call({wait, Pat}, From, #state{q = Q} = S) -> - case find(Pat) of - {ok, L} -> - {reply, L, S}; - false -> - {noreply, S#state{q = [{From, Pat} | Q]}} +handle_call({add, Uniq, Key}, {Pid, _}, S0) -> + Rec = {Key, Pid}, + S1 = flush(Uniq, Rec, S0), + {Res, New} = insert(Uniq, Rec), + {Recvs, S} = add(New, Rec, S1), + notify(Recvs, Rec), + {reply, Res, S}; + +handle_call({del, Key}, {Pid, _}, S) -> + Rec = {Key, Pid}, + Recvs = delete([Rec], S), + ets:delete_object(?TABLE, Rec), + notify(Recvs, del), + {reply, true, S}; + +handle_call({wait, Pat}, {Pid, _} = From, #state{receivers = RD} = S) -> + NS = add_monitor(Pid, S), + case match(Pat) of + [_|_] = L -> + {reply, L, NS}; + [] -> + {noreply, NS#state{receivers = dict:append(Pat, From, RD)}} end; +handle_call({subscribe, Pat, T}, {Pid, _}, #state{receivers = RD} = S) -> + NS = add_monitor(Pid, S), + {reply, match(Pat), NS#state{receivers = dict:append(Pat, [Pid | T], RD)}}; + handle_call(state, _, S) -> {reply, S, S}; @@ -232,10 +298,8 @@ handle_cast(_Msg, S)-> %% # handle_info/2 %% ---------------------------------------------------------- -handle_info({'DOWN', MRef, process, Pid, _}, S) -> - ets:delete_object(?TABLE, ?MONITOR(Pid, MRef)), - ets:match_delete(?TABLE, ?MAPPING('_', Pid)), - {noreply, S}; +handle_info({'DOWN', _MRef, process, Pid, _}, S) -> + {noreply, down(Pid, S)}; handle_info(_Info, S) -> {noreply, S}. @@ -256,69 +320,166 @@ code_change(_OldVsn, State, _Extra) -> %% =========================================================================== -insert_monitor(B, Pid) -> - B andalso ets:insert(?TABLE, ?MONITOR(Pid, monitor(process, Pid))). +%% insert/2 + +insert(false, Rec) -> + Spec = [{'$1', [{'==', '$1', {const, Rec}}], ['$_']}], + X = '$end_of_table' /= ets:select(?TABLE, Spec, 1), %% entry exists? + X orelse ets:insert(?TABLE, Rec), + {true, not X}; + +insert(true, Rec) -> + B = ets:insert_new(?TABLE, Rec), %% entry inserted? + {B, B}. + +%% add/3 + +%% Only add a single monitor for any given process, since there's no +%% use to more. +add(true, {_Key, Pid} = Rec, S) -> + NS = add_monitor(Pid, S), + {Recvs, RD} = add(Rec, NS), + {Recvs, S#state{receivers = RD}}; + +add(false = No, _, S) -> + {No, S}. + +%% add/2 + +%% Notify processes whose patterns match the inserted key. +add({_Key, Pid} = Rec, #state{receivers = RD}) -> + dict:fold(fun(Pt, Ps, A) -> + add(lists:member(Rec, match(Pt, Pid)), Pt, Ps, Rec, A) + end, + {sets:new(), RD}, + RD). -%% Do we need a monitor for the specified Pid? -no_monitor(Pid) -> - [] == ets:match_object(?TABLE, ?MONITOR(Pid, '_')). +%% add/5 -%% insert_new/2 +add(true, Pat, Recvs, {_,_} = Rec, {Set, Dict}) -> + {lists:foldl(fun sets:add_element/2, Set, Recvs), + remove(fun erlang:is_list/1, Pat, Recvs, Dict)}; -insert_new(?TABLE, {Key, _} = T) -> - flush(ets:lookup(?TABLE, Key)), - ets:insert_new(?TABLE, T). +add(false, _, _, _, Acc) -> + Acc. + +%% add_monitor/2 + +add_monitor(Pid, #state{monitors = MS} = S) -> + add_monitor(sets:is_element(Pid, MS), Pid, S). + +%% add_monitor/3 + +add_monitor(false, Pid, #state{monitors = MS} = S) -> + monitor(process, Pid), + S#state{monitors = sets:add_element(Pid, MS)}; + +add_monitor(true, _, S) -> + S. + +%% delete/2 + +delete(Recs, #state{receivers = RD}) -> + lists:foldl(fun(R,S) -> delete(R, RD, S) end, sets:new(), Recs). + +%% delete/3 + +delete({_Key, Pid} = Rec, RD, Set) -> + dict:fold(fun(Pt, Ps, S) -> + delete(lists:member(Rec, match(Pt, Pid)), Rec, Ps, S) + end, + Set, + RD). + +%% delete/4 + +%% Entry matches a pattern ... +delete(true, Rec, Recvs, Set) -> + lists:foldl(fun(R,S) -> sets:add_element({R, Rec}, S) end, + Set, + Recvs); + +%% ... or not. +delete(false, _, _, Set) -> + Set. + +%% notify/2 + +notify(false = No, _) -> + No; + +notify(Recvs, del = Op) -> + sets:fold(fun({P,R}, N) -> send(P, R, Op), N+1 end, 0, Recvs); + +notify(Recvs, {_,_} = Rec) -> + sets:fold(fun(P,N) -> send(P, Rec, add), N+1 end, 0, Recvs). + +%% send/3 + +%% No processes waiting on del, by construction: they've either +%% received notification at add or aren't waiting. +send([Pid | T], Rec, Op) -> + Pid ! {T, Op, Rec}; + +send({_,_} = From, Rec, add) -> + gen_server:reply(From, [Rec]). + +%% down/2 + +down(Pid, #state{monitors = MS} = S) -> + NS = flush(Pid, S), + Recvs = delete(match('_', Pid), NS), + ets:match_delete(?TABLE, {'_', Pid}), + notify(Recvs, del), + NS#state{monitors = sets:del_element(Pid, MS)}. + +%% flush/3 %% Remove any processes that are dead but for which we may not have -%% received 'DOWN' yet. This is to ensure that add_new can be used -%% to register a unique name each time a process restarts. -flush(List) -> - lists:foreach(fun({_,P} = T) -> - del(erlang:is_process_alive(P), T) - end, - List). - -del(Alive, T) -> - Alive orelse ets:delete_object(?TABLE, T). - -%% repl/3 - -repl([?MAPPING(_, Pid) = M], Key, Pid) -> - ets:delete_object(?TABLE, M), - true = ets:insert(?TABLE, ?MAPPING(Key, Pid)); -repl([], _, _) -> - false. - -%% pending/1 - -pending(true, #state{q = [_|_] = Q} = S) -> - S#state{q = q(lists:reverse(Q), [])}; %% retain reply order -pending(_, S) -> +%% received 'DOWN' yet, to ensure that add_new can be used to register +%% a unique name each time a registering process restarts. +flush(true, {Key, Pid}, S) -> + Spec = [{{'$1', '$2'}, + [{'andalso', {'==', '$1', {const, Key}}, + {'/=', '$2', Pid}}], + ['$2']}], + lists:foldl(fun down/2, S, [P || P <- ets:select(?TABLE, Spec), + not is_process_alive(P)]); + +flush(false, _, S) -> S. -q([], Q) -> - Q; -q([{From, Pat} = T | Rest], Q) -> - case find(Pat) of - {ok, L} -> - gen_server:reply(From, L), - q(Rest, Q); - false -> - q(Rest, [T|Q]) - end. - -%% find/1 - -find(Pat) -> - try match(Pat) of - [] -> - false; - L -> - {ok, L} - catch - _:_ -> - {ok, []} - end. +%% flush/2 + +%% Process has died and should no longer receive messages/replies. +flush(Pid, #state{receivers = RD} = S) + when is_pid(Pid) -> + S#state{receivers = dict:fold(fun(Pt,Ps,D) -> flush(Pid, Pt, Ps, D) end, + RD, + RD)}. + +%% flush/4 + +flush(Pid, Pat, Recvs, Dict) -> + remove(fun(T) -> Pid /= head(T) end, Pat, Recvs, Dict). + +%% head/1 + +head([P|_]) -> + P; + +head({P,_}) -> + P. + +%% remove/4 + +remove(Pred, Key, Values, Dict) -> + case lists:filter(Pred, Values) of + [] -> + dict:erase(Key, Dict); + Rest -> + dict:store(Key, Rest, Dict) + end. %% call/1 -- cgit v1.2.3 From f83a883cbf448d2136fe716c5a318915d1e6eecc Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Tue, 24 May 2016 15:12:36 +0200 Subject: Rename diameter_reg:del -> remove Letters are cheap. --- lib/diameter/src/base/diameter_reg.erl | 24 ++++++++++++------------ lib/diameter/src/base/diameter_service.erl | 2 +- lib/diameter/test/diameter_reg_SUITE.erl | 8 ++++---- 3 files changed, 17 insertions(+), 17 deletions(-) (limited to 'lib/diameter') diff --git a/lib/diameter/src/base/diameter_reg.erl b/lib/diameter/src/base/diameter_reg.erl index 4dba261a8b..9027130063 100644 --- a/lib/diameter/src/base/diameter_reg.erl +++ b/lib/diameter/src/base/diameter_reg.erl @@ -27,7 +27,7 @@ -export([add/1, add_new/1, - del/1, + remove/1, match/1, wait/1, subscribe/2]). @@ -78,7 +78,7 @@ %% this or other assocations can be retrieved using match/1. %% %% An association is removed when the calling process dies or as a -%% result of calling del/1. Adding the same term more than once is +%% result of calling remove/1. Adding the same term more than once is %% equivalent to adding it once. %% %% Note that since match/1 takes a pattern as argument, specifying a @@ -105,16 +105,16 @@ add_new(T) -> call({add, true, T}). %% =========================================================================== -%% # del(Term) +%% # remove(Term) %% %% Remove any existing association of Term with self(). %% =========================================================================== --spec del(key()) +-spec remove(key()) -> true. -del(T) -> - call({del, T}). +remove(T) -> + call({remove, T}). %% =========================================================================== %% # match(Pat) @@ -156,7 +156,7 @@ wait(Pat) -> %% # subscribe(Pat, T) %% %% Like match/1, but additionally receive messages of the form -%% {T, add|del, {term(), pid()} when associations are added +%% {T, add|remove, {term(), pid()} when associations are added %% or removed. %% =========================================================================== @@ -258,11 +258,11 @@ handle_call({add, Uniq, Key}, {Pid, _}, S0) -> notify(Recvs, Rec), {reply, Res, S}; -handle_call({del, Key}, {Pid, _}, S) -> +handle_call({remove, Key}, {Pid, _}, S) -> Rec = {Key, Pid}, Recvs = delete([Rec], S), ets:delete_object(?TABLE, Rec), - notify(Recvs, del), + notify(Recvs, remove), {reply, true, S}; handle_call({wait, Pat}, {Pid, _} = From, #state{receivers = RD} = S) -> @@ -408,7 +408,7 @@ delete(false, _, _, Set) -> notify(false = No, _) -> No; -notify(Recvs, del = Op) -> +notify(Recvs, remove = Op) -> sets:fold(fun({P,R}, N) -> send(P, R, Op), N+1 end, 0, Recvs); notify(Recvs, {_,_} = Rec) -> @@ -416,7 +416,7 @@ notify(Recvs, {_,_} = Rec) -> %% send/3 -%% No processes waiting on del, by construction: they've either +%% No processes waiting on remove, by construction: they've either %% received notification at add or aren't waiting. send([Pid | T], Rec, Op) -> Pid ! {T, Op, Rec}; @@ -430,7 +430,7 @@ down(Pid, #state{monitors = MS} = S) -> NS = flush(Pid, S), Recvs = delete(match('_', Pid), NS), ets:match_delete(?TABLE, {'_', Pid}), - notify(Recvs, del), + notify(Recvs, remove), NS#state{monitors = sets:del_element(Pid, MS)}. %% flush/3 diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl index cfb5cb5b82..b1b4c7e050 100644 --- a/lib/diameter/src/base/diameter_service.erl +++ b/lib/diameter/src/base/diameter_service.erl @@ -250,7 +250,7 @@ subscribe(SvcName) -> diameter_reg:add({?MODULE, subscriber, SvcName}). unsubscribe(SvcName) -> - diameter_reg:del({?MODULE, subscriber, SvcName}). + diameter_reg:remove({?MODULE, subscriber, SvcName}). subscriptions(Pat) -> pmap(diameter_reg:match({?MODULE, subscriber, Pat})). diff --git a/lib/diameter/test/diameter_reg_SUITE.erl b/lib/diameter/test/diameter_reg_SUITE.erl index 0092553191..e2a1ca00c3 100644 --- a/lib/diameter/test/diameter_reg_SUITE.erl +++ b/lib/diameter/test/diameter_reg_SUITE.erl @@ -33,7 +33,7 @@ %% testcases -export([add/1, add_new/1, - del/1, + remove/1, terms/1, pids/1]). @@ -55,7 +55,7 @@ groups() -> tc() -> [add, add_new, - del, + remove, terms, pids]. @@ -80,11 +80,11 @@ add_new(_) -> true = ?reg:add_new(Ref), false = ?reg:add_new(Ref). -del(_) -> +remove(_) -> Ref = make_ref(), true = ?reg:add_new(Ref), true = ?reg:add_new({Ref}), - true = ?reg:del({Ref}), + true = ?reg:remove({Ref}), [{Ref, Pid}] = ?reg:match(Ref), Pid = self(). -- cgit v1.2.3 From 7d7271121b4d2e786bf6fb41508166efb96e5842 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Wed, 18 May 2016 11:34:48 +0200 Subject: Don't restart transport processes after transport removal A replacement accepting transport could be started after the service process received a shutdown message from diameter_config, if a connection was accepted before the transport process in question was terminated. The replacement lived on until the service needed to restart it. --- lib/diameter/src/base/diameter_service.erl | 47 +++++++++++++++++++++--------- 1 file changed, 33 insertions(+), 14 deletions(-) (limited to 'lib/diameter') diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl index b1b4c7e050..ccf68f4d93 100644 --- a/lib/diameter/src/base/diameter_service.erl +++ b/lib/diameter/src/base/diameter_service.erl @@ -136,7 +136,7 @@ state = ?WD_INITIAL :: match(wd_state()), started = diameter_lib:now(),%% at process start peer = false :: match(boolean() | pid())}). - %% true at accepted, pid() at okay/reopen + %% true at accepted/remove, pid() at okay/reopen %% Record representing a Peer State Machine processes implemented by %% diameter_peer_fsm. @@ -676,25 +676,34 @@ mod_state(Alias, ModS) -> %% remove_transport shutdown(Refs, #state{watchdogT = WatchdogT}) when is_list(Refs) -> - ets:foldl(fun(P,ok) -> st(P, Refs), ok end, ok, WatchdogT); + ets:insert(WatchdogT, ets:foldl(fun(R,A) -> st(R, Refs, A) end, + [], + WatchdogT)); %% application/service shutdown shutdown(Reason, #state{watchdogT = WatchdogT}) when Reason == application; Reason == service -> - diameter_lib:wait(ets:foldl(fun(P,A) -> st(P, Reason, A) end, + diameter_lib:wait(ets:foldl(fun(P,A) -> ss(P, Reason, A) end, [], WatchdogT)). -%% st/2 +%% st/3 -st(#watchdog{ref = Ref, pid = Pid}, Refs) -> - lists:member(Ref, Refs) - andalso (Pid ! {shutdown, self(), transport}). %% 'DOWN' cleans up +%% Mark replacement as started so that a subsequent accept doesn't +%% result in a new process that isn't terminated. +st(#watchdog{ref = Ref, pid = Pid, peer = P} = Rec, Refs, Acc) -> + case lists:member(Ref, Refs) of + true -> + Pid ! {shutdown, self(), transport}, %% 'DOWN' cleans up + [Rec#watchdog{peer = true} || P == false] ++ Acc; + false -> + Acc + end. -%% st/3 +%% ss/3 -st(#watchdog{pid = Pid}, Reason, Acc) -> +ss(#watchdog{pid = Pid}, Reason, Acc) -> MRef = monitor(process, Pid), Pid ! {shutdown, self(), Reason}, [MRef | Acc]. @@ -974,11 +983,22 @@ ms(_, Svc) -> %% --------------------------------------------------------------------------- accepted(Pid, _TPid, #state{watchdogT = WatchdogT} = S) -> - #watchdog{ref = Ref, type = accept = T, peer = false, options = Opts} + #watchdog{type = accept = T, peer = P} = Wd = fetch(WatchdogT, Pid), - ets:insert(WatchdogT, Wd#watchdog{peer = true}),%% mark replacement started - start(Ref, T, Opts, S). %% start new watchdog + if not P -> + #watchdog{ref = Ref, options = Opts} = Wd, + %% Mark replacement started, and start new watchdog. + ets:insert(WatchdogT, Wd#watchdog{peer = true}), + start(Ref, T, Opts, S); + P -> + %% Transport removal in progress: true has been set in + %% shutdown/2, and the transport will die as a + %% consequence. + ok + end. + +%% fetch/2 fetch(Tid, Key) -> [T] = ets:lookup(Tid, Key), @@ -1317,8 +1337,7 @@ start_tc(Tc, T, _) -> tc_timeout({Ref, _Type, _Opts} = T, #state{service_name = SvcName} = S) -> tc(diameter_config:have_transport(SvcName, Ref), T, S). -tc(true, {Ref, Type, Opts}, #state{service_name = SvcName} - = S) -> +tc(true, {Ref, Type, Opts}, #state{service_name = SvcName} = S) -> send_event(SvcName, {reconnect, Ref, Opts}), start(Ref, Type, Opts, S); tc(false = No, _, _) -> %% removed -- cgit v1.2.3 From 5ca5fb71695ccdf7267007b8405f7a13497a2f17 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Wed, 18 May 2016 14:39:32 +0200 Subject: Close listening sockets at transport removal The transport interface documented in diameter_transport(3) is used to start/stop accepting/connecting transport processes: they're started with a function call, and told to die with their parent process. In the accepting case, both diameter_tcp and diameter_sctp start a listening process when the first accepting transport is started. However, there's no way for a listening process to find out that that it should stop listening when transport configuration is removed. Both diameter_tcp and diameter_sctp have used a timer to terminate the listening process after all existing accepting processes have died as a consequence of transport removal. The problem with this is that nothing stops a new client from connecting before this, and also that no new transport can succeed in opening the same listening port (eg. reconfiguration) until the old listener dies. This commit solves the problem by adding diameter_reg:subscribe/2, to allow callers to subscribe to messages about added/removed associations. A call to diameter:add_transport/2 results in a new child process that registers a term that a listening process subscribes to. Transport removal results in the death of the child, and the resulting notification to the listener causes the latter to close its socket and terminate. This is still an internal interface, but the subscription mechanism should probably be made external (eg. a diameter:subscribe/1 that can be used to subscribe to specified messages), so that transport modules other than diameter's own can make use of it. There is no support for soft upgrade. --- lib/diameter/src/base/diameter_config.erl | 106 +++++++++++++++++++------ lib/diameter/src/base/diameter_config_sup.erl | 58 ++++++++++++++ lib/diameter/src/base/diameter_sup.erl | 3 +- lib/diameter/src/modules.mk | 3 +- lib/diameter/src/transport/diameter_sctp.erl | 37 +++------ lib/diameter/src/transport/diameter_tcp.erl | 31 +++----- lib/diameter/test/diameter_transport_SUITE.erl | 6 +- 7 files changed, 172 insertions(+), 72 deletions(-) create mode 100644 lib/diameter/src/base/diameter_config_sup.erl (limited to 'lib/diameter') diff --git a/lib/diameter/src/base/diameter_config.erl b/lib/diameter/src/base/diameter_config.erl index 702f11593a..73b828536b 100644 --- a/lib/diameter/src/base/diameter_config.erl +++ b/lib/diameter/src/base/diameter_config.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2015. All Rights Reserved. +%% Copyright Ericsson AB 2010-2016. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -45,10 +45,12 @@ add_transport/2, remove_transport/2, have_transport/2, - lookup/1]). + lookup/1, + subscribe/2]). -%% child server start --export([start_link/0]). +%% server start +-export([start_link/0, + start_link/1]). %% gen_server callbacks -export([init/1, @@ -58,8 +60,9 @@ handle_info/2, code_change/3]). -%% diameter_sync requests. --export([sync/1]). +%% callbacks +-export([sync/1, %% diameter_sync requests + remove/0]). %% transport server termination %% debug -export([state/0, @@ -77,6 +80,9 @@ %% Table config is written to. -define(TABLE, ?MODULE). +%% Key on which a transport-specific child registers itself. +-define(TRANSPORT_KEY(Ref), {?MODULE, transport, Ref}). + %% Workaround for dialyzer's lack of understanding of match specs. -type match(T) :: T | '_' | '$1' | '$2' | '$3' | '$4'. @@ -224,6 +230,13 @@ pred(B) pred(_) -> ?THROW(pred). +%% -------------------------------------------------------------------------- +%% # subscribe/2 +%% -------------------------------------------------------------------------- + +subscribe(Ref, T) -> + diameter_reg:subscribe(?TRANSPORT_KEY(Ref), T). + %% -------------------------------------------------------------------------- %% # have_transport/2 %% @@ -264,6 +277,9 @@ start_link() -> Options = [{spawn_opt, diameter_lib:spawn_opts(server, [])}], gen_server:start_link(ServerName, Module, Args, Options). +start_link(T) -> + proc_lib:start_link(?MODULE, init, [T], infinity, []). + state() -> call(state). @@ -274,8 +290,42 @@ uptime() -> %%% # init/1 %%% ---------------------------------------------------------- +%% ?SERVER start. init([]) -> - {ok, #state{}}. + {ok, #state{}}; + +%% Child start as a consequence of add_transport. +init({SvcName, Type, Opts}) -> + Res = try + add(SvcName, Type, Opts) + catch + ?FAILURE(Reason) -> {error, Reason} + end, + proc_lib:init_ack({ok, self(), Res}), + sleep(Res). + +%% sleep/1 + +sleep({ok, _}) -> + sleep(); + +sleep({error, _}) -> + ok. %% die + +%% sleep/0 + +sleep() -> + proc_lib:hibernate(?MODULE, remove, []). + +%% remove/0 + +remove() -> + receive + {?MODULE, stop} -> + ok; + _ -> + sleep() + end. %%% ---------------------------------------------------------- %%% # handle_call/2 @@ -404,19 +454,22 @@ sync({start_service, SvcName, Opts}) -> sync({stop_service, SvcName}) -> stop(SvcName); +%% Start a child whose only purpose is to be alive for the lifetime of +%% the transport configuration and publish itself in diameter_reg. +%% This is to provide a way for processes to to be notified when the +%% configuration is removed (diameter_reg:subscribe/2). sync({add, SvcName, Type, Opts}) -> - try - add(SvcName, Type, Opts) - catch - ?FAILURE(Reason) -> {error, Reason} - end; + {ok, _Pid, Res} = diameter_config_sup:start_child({SvcName, Type, Opts}), + Res; sync({remove, SvcName, Pred}) -> - remove(select([{#transport{service = '$1', _ = '_'}, + Recs = select([{#transport{service = '$1', _ = '_'}, [{'=:=', '$1', {const, SvcName}}], ['$_']}]), - SvcName, - Pred). + F = fun(#transport{ref = R, type = T, options = O}) -> + Pred(R,T,O) + end, + remove(SvcName, lists:filter(F, Recs)). %% start/3 @@ -503,6 +556,7 @@ add(SvcName, Type, Opts) -> ok = transport_opts(Opts), Ref = make_ref(), + true = diameter_reg:add_new(?TRANSPORT_KEY(Ref)), T = {Ref, Type, Opts}, %% The call to the service returns error if the service isn't %% started yet, which is harmless. The transport will be started @@ -594,26 +648,30 @@ start_transport(SvcName, T) -> No end. -%% remove/3 - -remove(L, SvcName, Pred) -> - rm(SvcName, lists:filter(fun(#transport{ref = R, type = T, options = O}) -> - Pred(R,T,O) - end, - L)). +%% remove/2 -rm(_, []) -> +remove(_, []) -> ok; -rm(SvcName, L) -> + +remove(SvcName, L) -> Refs = lists:map(fun(#transport{ref = R}) -> R end, L), case stop_transport(SvcName, Refs) of ok -> + lists:foreach(fun stop_child/1, Refs), diameter_stats:flush(Refs), lists:foreach(fun delete_object/1, L); {error, _} = No -> No end. +stop_child(Ref) -> + case diameter_reg:match(?TRANSPORT_KEY(Ref)) of + [{_, Pid}] -> %% tell the transport-specific child to die + Pid ! {?MODULE, stop}; + [] -> %% already removed/dead + ok + end. + stop_transport(SvcName, Refs) -> case diameter_service:stop_transport(SvcName, Refs) of ok -> diff --git a/lib/diameter/src/base/diameter_config_sup.erl b/lib/diameter/src/base/diameter_config_sup.erl new file mode 100644 index 0000000000..9524573378 --- /dev/null +++ b/lib/diameter/src/base/diameter_config_sup.erl @@ -0,0 +1,58 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2016. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% + +%% +%% Supervisor for config processes. +%% + +-module(diameter_config_sup). + +-behaviour(supervisor). + +%% interface +-export([start_link/0, %% supervisor start + start_child/1]). %% config start + +-export([init/1]). + +-define(NAME, ?MODULE). %% supervisor name + +%% start_link/0 + +start_link() -> + SupName = {local, ?NAME}, + supervisor:start_link(SupName, ?MODULE, []). + +%% start_child/1 + +start_child(T) -> + supervisor:start_child(?NAME, [T]). + +%% init/1 + +init([]) -> + Mod = diameter_config, + Flags = {simple_one_for_one, 0, 1}, + ChildSpec = {Mod, + {Mod, start_link, []}, + temporary, + 1000, + worker, + [Mod]}, + {ok, {Flags, [ChildSpec]}}. diff --git a/lib/diameter/src/base/diameter_sup.erl b/lib/diameter/src/base/diameter_sup.erl index e89ede9843..482289cb9a 100644 --- a/lib/diameter/src/base/diameter_sup.erl +++ b/lib/diameter/src/base/diameter_sup.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2015. All Rights Reserved. +%% Copyright Ericsson AB 2010-2016. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -34,6 +34,7 @@ -export([init/1]). -define(CHILDREN, [diameter_misc_sup, + diameter_config_sup, diameter_watchdog_sup, diameter_peer_fsm_sup, diameter_transport_sup, diff --git a/lib/diameter/src/modules.mk b/lib/diameter/src/modules.mk index 3b223ea391..4e4ce60ddf 100644 --- a/lib/diameter/src/modules.mk +++ b/lib/diameter/src/modules.mk @@ -1,7 +1,7 @@ # %CopyrightBegin% # -# Copyright Ericsson AB 2010-2015. All Rights Reserved. +# Copyright Ericsson AB 2010-2016. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -37,6 +37,7 @@ RT_MODULES = \ base/diameter_callback \ base/diameter_capx \ base/diameter_config \ + base/diameter_config_sup \ base/diameter_codec \ base/diameter_dict \ base/diameter_lib \ diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index 8a80ce630a..4a005b853d 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2015. All Rights Reserved. +%% Copyright Ericsson AB 2010-2016. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -61,10 +61,6 @@ %% Remote addresses to accept connections from. -define(DEFAULT_ACCEPT, []). %% any -%% How long a listener with no associations lives before offing -%% itself. --define(LISTENER_TIMEOUT, 30000). - %% How long to wait for a transport process to attach after %% association establishment. -define(ACCEPT_TIMEOUT, 5000). @@ -104,7 +100,6 @@ socket :: gen_sctp:sctp_socket(), count = 0 :: uint(), %% attached transport processes pending = {0, queue:new()}, - tref :: reference() | undefined, accept :: [match()]}). %% Field pending implements two queues: the first of transport-to-be %% processes to which an association has been assigned but for which @@ -216,14 +211,15 @@ init(T) -> %% A process owning a listening socket. i({listen, Ref, {Opts, Addrs}}) -> + [_] = diameter_config:subscribe(Ref, transport), %% assert existence {[Matches], Rest} = proplists:split(Opts, [accept]), {LAs, Sock} = AS = open(Addrs, Rest, ?DEFAULT_PORT), ok = gen_sctp:listen(Sock, true), true = diameter_reg:add_new({?MODULE, listener, {Ref, AS}}), proc_lib:init_ack({ok, self(), LAs}), - start_timer(#listener{ref = Ref, - socket = Sock, - accept = [[M] || {accept, M} <- Matches]}); + #listener{ref = Ref, + socket = Sock, + accept = [[M] || {accept, M} <- Matches]}; %% A connecting transport. i({connect, Pid, Opts, Addrs, Ref}) -> @@ -431,13 +427,6 @@ putr(Key, Val) -> getr(Key) -> get({?MODULE, Key}). -%% start_timer/1 - -start_timer(#listener{count = 0} = S) -> - S#listener{tref = erlang:start_timer(?LISTENER_TIMEOUT, self(), close)}; -start_timer(S) -> - S. - %% l/2 %% %% Transition listener state. @@ -455,12 +444,10 @@ l({sctp, Sock, _RA, _RP, Data} = T, #listener{socket = Sock, l({'DOWN', _MRef, process, TPid, _}, #listener{pending = {_,Q}} = S) -> down(queue:member(TPid, Q), TPid, S); -%% Timeout after the last accepting process has died. -l({timeout, TRef, close = T}, #listener{tref = TRef, - count = 0}) -> - x(T); -l({timeout, _, close}, #listener{} = S) -> - S. +%% Transport has been removed. +l({transport, remove, _} = T, #listener{socket = Sock}) -> + gen_sctp:close(Sock), + x(T). %% down/3 %% @@ -472,15 +459,15 @@ down(true, TPid, #listener{pending = {N,Q}, = S) -> NQ = queue:filter(fun(P) -> P /= TPid end, Q), if N < 0 -> %% awaiting an association ... - start_timer(S#listener{count = K-1, - pending = {N+1, NQ}}); + S#listener{count = K-1, + pending = {N+1, NQ}}; true -> %% ... or one has been assigned S#listener{pending = {N-1, NQ}} end; %% ... or one that's already attached. down(false, _TPid, #listener{count = K} = S) -> - start_timer(S#listener{count = K-1}). + S#listener{count = K-1}. %% t/2 %% diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index 6a5e5fe89d..546c2cfa5e 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -57,7 +57,6 @@ -define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})). -define(DEFAULT_PORT, 3868). %% RFC 3588, ch 2.1 --define(LISTENER_TIMEOUT, 30000). -define(DEFAULT_FRAGMENT_TIMEOUT, 1000). -define(IS_UINT32(N), (is_integer(N) andalso 0 =< N andalso 0 == N bsr 32)). @@ -73,8 +72,10 @@ %% Listener process state. -record(listener, {socket :: inet:socket(), - count = 1 :: non_neg_integer(), - tref :: reference() | undefined}). + count = 1 :: non_neg_integer()}). %% accepting processes +%% The count of accepting processes was previously used to terminate +%% the listening process, but diameter_reg:subscribe/2 is now used for +%% this. Leave the the count for trace purposes. %% Monitor process state. -record(monitor, @@ -240,6 +241,7 @@ i(#monitor{parent = Pid, transport = TPid} = S) -> %% gen_tcp seems to so. Links should be left to supervisors. i({listen, LRef, APid, {Mod, Opts, Addrs}}) -> + [_] = diameter_config:subscribe(LRef, transport), %% assert existence {[LA, LP], Rest} = proplists:split(Opts, [ip, port]), LAddrOpt = get_addr(LA, Addrs), LPort = get_port(LP), @@ -248,7 +250,7 @@ i({listen, LRef, APid, {Mod, Opts, Addrs}}) -> true = diameter_reg:add_new({?MODULE, listener, {LRef, {LAddr, LSock}}}), proc_lib:init_ack({ok, self(), {LAddr, LSock}}), monitor(process, APid), - start_timer(#listener{socket = LSock}). + #listener{socket = LSock}. laddr([], Mod, Sock) -> {ok, {Addr, _Port}} = sockname(Mod, Sock), @@ -484,13 +486,6 @@ putr(Key, Val) -> getr(Key) -> get({?MODULE, Key}). -%% start_timer/1 - -start_timer(#listener{count = 0} = S) -> - S#listener{tref = erlang:start_timer(?LISTENER_TIMEOUT, self(), close)}; -start_timer(S) -> - S. - %% m/2 %% %% Transition monitor state. @@ -512,21 +507,19 @@ m({'DOWN', _, process, Pid, _}, #monitor{parent = Pid, %% %% Transition listener state. -%% Another accept transport is attaching. +%% An accepting transport is attaching. l({accept, TPid}, #listener{count = N} = S) -> monitor(process, TPid), S#listener{count = N+1}; %% Accepting process has died. l({'DOWN', _, process, _, _}, #listener{count = N} = S) -> - start_timer(S#listener{count = N-1}); + S#listener{count = N-1}; -%% Timeout after the last accepting process has died. -l({timeout, TRef, close = T}, #listener{tref = TRef, - count = 0}) -> - x(T); -l({timeout, _, close}, #listener{} = S) -> - S. +%% Transport has been removed. +l({transport, remove, _} = T, #listener{socket = Sock}) -> + gen_tcp:close(Sock), + x(T). %% t/2 %% diff --git a/lib/diameter/test/diameter_transport_SUITE.erl b/lib/diameter/test/diameter_transport_SUITE.erl index 53d2d6660e..f2884eacb4 100644 --- a/lib/diameter/test/diameter_transport_SUITE.erl +++ b/lib/diameter/test/diameter_transport_SUITE.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2015. All Rights Reserved. +%% Copyright Ericsson AB 2010-2016. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -140,7 +140,9 @@ sctp_accept(Config) -> -define(PEER_COUNT, 8). accept(Prot) -> - T = {Prot, make_ref()}, + Ref = make_ref(), + true = diameter_reg:add_new({diameter_config, transport, Ref}), %% fake it + T = {Prot, Ref}, [] = ?util:run(?util:scramble(acc(2*?PEER_COUNT, T, []))). acc(0, _, Acc) -> -- cgit v1.2.3 From 80dc4f14b21cf316a8000f91cd77b1f0653afa7c Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Tue, 24 May 2016 14:58:50 +0200 Subject: Fix leaking transports in traffic/relay suites Listening transports weren't removed, which diameter_reg:subs/0 revealed. --- lib/diameter/test/diameter_relay_SUITE.erl | 12 ++++++++++-- lib/diameter/test/diameter_traffic_SUITE.erl | 6 +++++- lib/diameter/test/diameter_util.erl | 11 ++++++----- 3 files changed, 21 insertions(+), 8 deletions(-) (limited to 'lib/diameter') diff --git a/lib/diameter/test/diameter_relay_SUITE.erl b/lib/diameter/test/diameter_relay_SUITE.erl index f766f54a80..b5e520e642 100644 --- a/lib/diameter/test/diameter_relay_SUITE.erl +++ b/lib/diameter/test/diameter_relay_SUITE.erl @@ -171,8 +171,9 @@ connect(Config) -> Conns)). disconnect(Config) -> - lists:foreach(fun({{CN,CR},{SN,SR}}) -> ?util:disconnect(CN,CR,SN,SR) end, - ?util:read_priv(Config, "cfg")). + [] = [{T,C} || C <- ?util:read_priv(Config, "cfg"), + T <- [break(C)], + T /= ok]. stop_services(_Config) -> [] = [{H,T} || H <- ?SERVICES, @@ -184,6 +185,13 @@ stop(_Config) -> %% ---------------------------------------- +break({{CN,CR},{SN,SR}}) -> + try + ?util:disconnect(CN,CR,SN,SR) + after + diameter:remove_transport(SN, SR) + end. + server(Name, Dict) -> ok = diameter:start_service(Name, ?SERVICE(Name, Dict)), {Name, ?util:listen(Name, tcp)}. diff --git a/lib/diameter/test/diameter_traffic_SUITE.erl b/lib/diameter/test/diameter_traffic_SUITE.erl index 976abf9138..6f3a4801ee 100644 --- a/lib/diameter/test/diameter_traffic_SUITE.erl +++ b/lib/diameter/test/diameter_traffic_SUITE.erl @@ -428,7 +428,11 @@ remove_transports(Config) -> server_service = SN} = group(Config), [LRef | Cs] = ?util:read_priv(Config, "transport"), - [?util:disconnect(CN, C, SN, LRef) || C <- Cs]. + try + [] = [T || C <- Cs, T <- [?util:disconnect(CN, C, SN, LRef)], T /= ok] + after + ok = diameter:remove_transport(SN, LRef) + end. stop_services(Config) -> #group{client_service = CN, diff --git a/lib/diameter/test/diameter_util.erl b/lib/diameter/test/diameter_util.erl index 52b747e99c..f26f1e999a 100644 --- a/lib/diameter/test/diameter_util.erl +++ b/lib/diameter/test/diameter_util.erl @@ -345,11 +345,12 @@ transport(SvcName, Ref) -> disconnect(Client, Ref, Server, LRef) -> true = diameter:subscribe(Server), ok = diameter:remove_transport(Client, Ref), - ok = receive - {diameter_event, Server, {down, LRef, _, _}} -> ok - after 10000 -> - {Client, Ref, Server, LRef, process_info(self(), messages)} - end. + receive + {diameter_event, Server, {down, LRef, _, _}} -> + ok + after 10000 -> + {Client, Ref, Server, LRef, process_info(self(), messages)} + end. %% --------------------------------------------------------------------------- -- cgit v1.2.3