diff options
Diffstat (limited to 'lib/diameter/src/base')
-rw-r--r-- | lib/diameter/src/base/diameter_lib.erl | 2 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_peer.erl | 6 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_peer_fsm.erl | 20 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_reg.erl | 235 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_service.erl | 25 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_traffic.erl | 1 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_watchdog.erl | 2 |
7 files changed, 126 insertions, 165 deletions
diff --git a/lib/diameter/src/base/diameter_lib.erl b/lib/diameter/src/base/diameter_lib.erl index 8792e97621..1c1ea42cb5 100644 --- a/lib/diameter/src/base/diameter_lib.erl +++ b/lib/diameter/src/base/diameter_lib.erl @@ -283,7 +283,7 @@ ip(T) %% Or not: convert from '.'/':'-separated decimal/hex. ip(Addr) -> - {ok, A} = inet_parse:address(Addr), %% documented in inet(3) + {ok, A} = inet:parse_address(Addr), A. %% --------------------------------------------------------------------------- diff --git a/lib/diameter/src/base/diameter_peer.erl b/lib/diameter/src/base/diameter_peer.erl index 2759f17e64..4cb5a57a54 100644 --- a/lib/diameter/src/base/diameter_peer.erl +++ b/lib/diameter/src/base/diameter_peer.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2015. All Rights Reserved. +%% Copyright Ericsson AB 2010-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -202,10 +202,10 @@ match1(Addr, Match) -> match(Addr, {ok, A}, _) -> Addr == A; match(Addr, {error, _}, RE) -> - match == re:run(inet_parse:ntoa(Addr), RE, [{capture, none}]). + match == re:run(inet:ntoa(Addr), RE, [{capture, none}, caseless]). addr([_|_] = A) -> - inet_parse:address(A); + inet:parse_address(A); addr(A) -> {ok, A}. diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl index 18904b8a55..d99f11a697 100644 --- a/lib/diameter/src/base/diameter_peer_fsm.erl +++ b/lib/diameter/src/base/diameter_peer_fsm.erl @@ -544,11 +544,11 @@ put_route(Pid) -> MRef = monitor(process, Pid), put(Pid, MRef). -%% get_route/2 +%% get_route/3 -%% incoming answer -get_route(_, #diameter_packet{header = #diameter_header{is_request = false}} - = Pkt) -> +%% Incoming answer. +get_route(_, _, #diameter_packet{header = #diameter_header{is_request = false}} + = Pkt) -> Seqs = diameter_codec:sequence_numbers(Pkt), case erase(Seqs) of {Pid, Ref, MRef} -> @@ -559,8 +559,14 @@ get_route(_, #diameter_packet{header = #diameter_header{is_request = false}} false end; -%% incoming request -get_route(Ack, _) -> +%% Requests answered here ... +get_route(_, N, _) + when N == 'CER'; + N == 'DPR' -> + false; + +%% ... or not. +get_route(Ack, _, _) -> Ack. %% erase_route/1 @@ -745,7 +751,7 @@ recv1('DPA' = Name, %% Any other message with a header and no length errors. recv1(Name, H, Msg, #state{parent = Pid, ack = Ack} = S) -> Pkt = pkt(H, Msg), - Pid ! {recv, self(), get_route(Ack, Pkt), Name, Pkt}, + Pid ! {recv, self(), get_route(Ack, Name, Pkt), Name, Pkt}, handle(Name, Pkt, S). %% pkt/2 diff --git a/lib/diameter/src/base/diameter_reg.erl b/lib/diameter/src/base/diameter_reg.erl index 97e74657bd..bd5db54a5c 100644 --- a/lib/diameter/src/base/diameter_reg.erl +++ b/lib/diameter/src/base/diameter_reg.erl @@ -19,10 +19,11 @@ %% %% -%% The module implements a simple term -> pid registry. +%% A simple term -> pid registry. %% -module(diameter_reg). + -behaviour(gen_server). -export([add/1, @@ -57,18 +58,18 @@ -type key() :: term(). -type from() :: {pid(), term()}. +-type rcvr() :: [pid() | term()] %% subscribe + | from(). %% wait -type pattern() :: term(). -record(state, {id = diameter_lib:now(), - receivers = dict:new() - :: dict:dict(pattern(), [[pid() | term()]%% subscribe - | from()]), %% wait + notify = #{} :: #{pattern() => [rcvr()]}, monitors = sets:new() :: sets:set(pid())}). %% The ?TABLE bag contains the Key -> Pid mapping, as {Key, Pid} %% tuples. Each pid is stored in the monitors set to ensure only one %% monitor for each pid: more are harmless, but unnecessary. A pattern -%% is added to receivers a result of calls to wait/1 or subscribe/2: +%% is added to notify a result of calls to wait/1 or subscribe/2: %% changes to ?TABLE causes processes to be notified as required. %% =========================================================================== @@ -156,7 +157,7 @@ wait(Pat) -> %% # subscribe(Pat, T) %% %% Like match/1, but additionally receive messages of the form -%% {T, add|remove, {term(), pid()} when associations are added +%% {T, add|remove, {term(), pid()}} when associations are added %% or removed. %% =========================================================================== @@ -186,15 +187,12 @@ uptime() -> -> [{pid(), [key()]}]. pids() -> - to_list(fun swap/1). - -to_list(Fun) -> - ets:foldl(fun(T,D) -> append(Fun(T), D) end, orddict:new(), ?TABLE). + append(ets:select(?TABLE, [{{'$1','$2'}, [], [{{'$2', '$1'}}]}])). -append({K,V}, Dict) -> - orddict:append(K, V, Dict). - -id(T) -> T. +append(Pairs) -> + dict:to_list(lists:foldl(fun({K,V}, D) -> dict:append(K, V, D) end, + dict:new(), + Pairs)). %% terms/0 @@ -202,9 +200,7 @@ id(T) -> T. -> [{key(), [pid()]}]. terms() -> - to_list(fun id/1). - -swap({X,Y}) -> {Y,X}. + append(ets:tab2list(?TABLE)). %% subs/0 @@ -212,31 +208,19 @@ swap({X,Y}) -> {Y,X}. -> [{pattern(), [{pid(), term()}]}]. subs() -> - #state{receivers = RD} = state(), - dict:fold(fun sub/3, orddict:new(), RD). - -sub(Pat, Ps, Dict) -> - lists:foldl(fun([P|T], D) -> orddict:append(Pat, {P,T}, D); - (_, D) -> D - end, - Dict, - Ps). + #state{notify = Dict} = state(), + [{K, Ts} || {K,Ps} <- maps:to_list(Dict), + Ts <- [[{P,T} || [P|T] <- Ps]]]. %% waits/0 -spec waits() - -> [{pattern(), [{from(), term()}]}]. + -> [{pattern(), [from()]}]. waits() -> - #state{receivers = RD} = state(), - dict:fold(fun wait/3, orddict:new(), RD). - -wait(Pat, Ps, Dict) -> - lists:foldl(fun({_,_} = F, D) -> orddict:append(Pat, F, D); - (_, D) -> D - end, - Dict, - Ps). + #state{notify = Dict} = state(), + [{K, Ts} || {K,Ps} <- maps:to_list(Dict), + Ts <- [[T || {_,_} = T <- Ps]]]. %% ---------------------------------------------------------- %% # init/1 @@ -250,33 +234,28 @@ init(_) -> %% # handle_call/3 %% ---------------------------------------------------------- -handle_call({add, Uniq, Key}, {Pid, _}, S0) -> +handle_call({add, Uniq, Key}, {Pid, _}, S) -> Rec = {Key, Pid}, - S1 = flush(Uniq, Rec, S0), + NS = flush(Uniq, Rec, S), %% before insert {Res, New} = insert(Uniq, Rec), - {Recvs, S} = add(New, Rec, S1), - notify(Recvs, Rec), - {reply, Res, S}; + {reply, Res, notify(add, New andalso Rec, NS)}; handle_call({remove, Key}, {Pid, _}, S) -> Rec = {Key, Pid}, - Recvs = delete([Rec], S), ets:delete_object(?TABLE, Rec), - notify(Recvs, remove), - {reply, true, S}; + {reply, true, notify(remove, Rec, S)}; -handle_call({wait, Pat}, {Pid, _} = From, #state{receivers = RD} = S) -> +handle_call({wait, Pat}, {Pid, _} = From, S) -> NS = add_monitor(Pid, S), case match(Pat) of - [_|_] = L -> - {reply, L, NS}; + [_|_] = Recs -> + {reply, Recs, NS}; [] -> - {noreply, NS#state{receivers = dict:append(Pat, From, RD)}} + {noreply, queue(Pat, From, NS)} end; -handle_call({subscribe, Pat, T}, {Pid, _}, #state{receivers = RD} = S) -> - NS = add_monitor(Pid, S), - {reply, match(Pat), NS#state{receivers = dict:append(Pat, [Pid | T], RD)}}; +handle_call({subscribe, Pat, T}, {Pid, _}, S) -> + {reply, match(Pat), queue(Pat, [Pid | T], add_monitor(Pid, S))}; handle_call(state, _, S) -> {reply, S, S}; @@ -332,106 +311,60 @@ insert(true, Rec) -> B = ets:insert_new(?TABLE, Rec), %% entry inserted? {B, B}. -%% add/3 - +%% add_monitor/2 +%% %% Only add a single monitor for any given process, since there's no %% use to more. -add(true, {_Key, Pid} = Rec, S) -> - NS = add_monitor(Pid, S), - {Recvs, RD} = add(Rec, NS), - {Recvs, S#state{receivers = RD}}; - -add(false = No, _, S) -> - {No, S}. - -%% add/2 - -%% Notify processes whose patterns match the inserted key. -add({_Key, Pid} = Rec, #state{receivers = RD}) -> - dict:fold(fun(Pt, Ps, A) -> - add(lists:member(Rec, match(Pt, Pid)), Pt, Ps, Rec, A) - end, - {sets:new(), RD}, - RD). - -%% add/5 - -add(true, Pat, Recvs, {_,_} = Rec, {Set, Dict}) -> - {lists:foldl(fun sets:add_element/2, Set, Recvs), - remove(fun erlang:is_list/1, Pat, Recvs, Dict)}; -add(false, _, _, _, Acc) -> - Acc. +add_monitor(Pid, #state{monitors = Ps} = S) -> + case sets:is_element(Pid, Ps) of + false -> + monitor(process, Pid), + S#state{monitors = sets:add_element(Pid, Ps)}; + true -> + S + end. -%% add_monitor/2 - -add_monitor(Pid, #state{monitors = MS} = S) -> - add_monitor(sets:is_element(Pid, MS), Pid, S). - -%% add_monitor/3 - -add_monitor(false, Pid, #state{monitors = MS} = S) -> - monitor(process, Pid), - S#state{monitors = sets:add_element(Pid, MS)}; - -add_monitor(true, _, S) -> - S. - -%% delete/2 - -delete(Recs, #state{receivers = RD}) -> - lists:foldl(fun(R,S) -> delete(R, RD, S) end, sets:new(), Recs). +%% notify/3 -%% delete/3 +notify(_, false, S) -> + S; -delete({_Key, Pid} = Rec, RD, Set) -> - dict:fold(fun(Pt, Ps, S) -> - delete(lists:member(Rec, match(Pt, Pid)), Rec, Ps, S) - end, - Set, - RD). +notify(Op, {_,_} = Rec, #state{notify = Dict} = S) -> + S#state{notify = maps:fold(fun(P,Rs,D) -> notify(Op, Rec, P, Rs, D) end, + Dict, + Dict)}. -%% delete/4 +%% notify/5 -%% Entry matches a pattern ... -delete(true, Rec, Recvs, Set) -> - lists:foldl(fun(R,S) -> sets:add_element({R, Rec}, S) end, - Set, - Recvs); - -%% ... or not. -delete(false, _, _, Set) -> - Set. - -%% notify/2 - -notify(false = No, _) -> - No; - -notify(Recvs, remove = Op) -> - sets:fold(fun({P,R}, N) -> send(P, R, Op), N+1 end, 0, Recvs); - -notify(Recvs, {_,_} = Rec) -> - sets:fold(fun(P,N) -> send(P, Rec, add), N+1 end, 0, Recvs). +notify(Op, {_, Pid} = Rec, Pat, Rcvrs, Dict) -> + case lists:member(Rec, match(Pat, Pid)) of + true -> + reset(Pat, Dict, [P || P <- Rcvrs, send(P, Op, Rec)]); + false -> + Dict + end. %% send/3 -%% No processes waiting on remove, by construction: they've either -%% received notification at add or aren't waiting. -send([Pid | T], Rec, Op) -> - Pid ! {T, Op, Rec}; +send([Pid | T], Op, Rec) -> + Pid ! {T, Op, Rec}, + true; -send({_,_} = From, Rec, add) -> - gen_server:reply(From, [Rec]). +%% No processes wait on remove: they receive notification immediately +%% or at add, by construction. +send({_,_} = From, add, Rec) -> + gen_server:reply(From, [Rec]), + false. %% down/2 -down(Pid, #state{monitors = MS} = S) -> - NS = flush(Pid, S), - Recvs = delete(match('_', Pid), NS), +down(Pid, #state{monitors = Ps} = S) -> + Recs = match('_', Pid), ets:match_delete(?TABLE, {'_', Pid}), - notify(Recvs, remove), - NS#state{monitors = sets:del_element(Pid, MS)}. + lists:foldl(fun(R,NS) -> notify(remove, R, NS) end, + flush(Pid, S#state{monitors = sets:del_element(Pid, Ps)}), + Recs). %% flush/3 @@ -452,16 +385,15 @@ flush(false, _, S) -> %% flush/2 %% Process has died and should no longer receive messages/replies. -flush(Pid, #state{receivers = RD} = S) - when is_pid(Pid) -> - S#state{receivers = dict:fold(fun(Pt,Ps,D) -> flush(Pid, Pt, Ps, D) end, - RD, - RD)}. +flush(Pid, #state{notify = Dict} = S) -> + S#state{notify = maps:fold(fun(P,Rs,D) -> flush(Pid, P, Rs, D) end, + Dict, + Dict)}. %% flush/4 -flush(Pid, Pat, Recvs, Dict) -> - remove(fun(T) -> Pid /= head(T) end, Pat, Recvs, Dict). +flush(Pid, Pat, Rcvrs, Dict) -> + reset(Pat, Dict, [T || T <- Rcvrs, Pid /= head(T)]). %% head/1 @@ -471,15 +403,18 @@ head([P|_]) -> head({P,_}) -> P. -%% remove/4 +%% reset/3 + +reset(Key, Map, []) -> + maps:remove(Key, Map); + +reset(Key, Map, List) -> + maps:put(Key, List, Map). + +%% queue/3 -remove(Pred, Key, Values, Dict) -> - case lists:filter(Pred, Values) of - [] -> - dict:erase(Key, Dict); - Rest -> - dict:store(Key, Rest, Dict) - end. +queue(Pat, Rcvr, #state{notify = Dict} = S) -> + S#state{notify = maps:put(Pat, [Rcvr | maps:get(Pat, Dict, [])], Dict)}. %% call/1 diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl index 7cd4d23402..31dd92f878 100644 --- a/lib/diameter/src/base/diameter_service.erl +++ b/lib/diameter/src/base/diameter_service.erl @@ -530,6 +530,13 @@ transition({tc_timeout, T}, S) -> tc_timeout(T, S), ok; +transition({nodeup, Node, _}, S) -> + nodeup(Node, S), + ok; + +transition({nodedown, _Node, _}, _) -> + ok; + transition(Req, S) -> unexpected(handle_info, [Req], S), ok. @@ -749,6 +756,8 @@ mref(P) -> init_shared(#state{options = #{use_shared_peers := T}, service_name = Svc}) -> + T == false orelse net_kernel:monitor_nodes(true, [{node_type, visible}, + nodedown_reason]), notify(T, Svc, {service, self()}). init_mod(#diameter_app{alias = Alias, @@ -764,6 +773,11 @@ notify(Share, SvcName, T) -> %% Test for the empty list for upgrade reasons: there's no %% diameter_peer:notify/3 in old code. +nodeup(Node, #state{options = #{share_peers := SP}, + service_name = SvcName}) -> + lists:member(Node, remotes(SP)) + andalso diameter_peer:notify([Node], SvcName, {service, self()}). + remotes(false) -> []; @@ -1440,9 +1454,15 @@ is_remote(Pid, T) -> %% # remote_peer_up/4 %% --------------------------------------------------------------------------- -remote_peer_up(TPid, Aliases, Caps, #state{options = #{use_shared_peers := T}} +remote_peer_up(TPid, Aliases, Caps, #state{options = #{use_shared_peers := T}, + remote = {PeerT, _, _}} = S) -> - is_remote(TPid, T) andalso rpu(TPid, Aliases, Caps, S). + is_remote(TPid, T) + andalso not ets:member(PeerT, TPid) + andalso rpu(TPid, Aliases, Caps, S). + +%% Notification can be duplicate since remote nodes push and the local +%% node pulls. rpu(TPid, Aliases, Caps, #state{service = Svc, remote = RT}) -> #diameter_service{applications = Apps} = Svc, @@ -1452,6 +1472,7 @@ rpu(TPid, Aliases, Caps, #state{service = Svc, remote = RT}) -> rpu(_, [] = No, _, _) -> No; + rpu(TPid, Aliases, Caps, {PeerT, _, _} = RT) -> monitor(process, TPid), ets:insert(PeerT, #peer{pid = TPid, diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl index a10bf78d6e..f510f40a17 100644 --- a/lib/diameter/src/base/diameter_traffic.erl +++ b/lib/diameter/src/base/diameter_traffic.erl @@ -93,6 +93,7 @@ caller :: pid() | undefined, %% calling process handler :: pid(), %% request process peer :: undefined | {pid(), #diameter_caps{}}, + caps :: undefined, %% no longer used packet :: #diameter_packet{} | undefined}). %% of request %% --------------------------------------------------------------------------- diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl index c08e2da672..43623334a9 100644 --- a/lib/diameter/src/base/diameter_watchdog.erl +++ b/lib/diameter/src/base/diameter_watchdog.erl @@ -74,7 +74,6 @@ okay := non_neg_integer()}, %% REOPEN -> OKAY codec :: #{decode_format := none, string_decode := false, - strict_arities => diameter:strict_arities(), strict_mbit := boolean(), rfc := 3588 | 6733, ordered_encode := false}, @@ -152,7 +151,6 @@ i({Ack, T, Pid, {Opts, okay => 3}, Opts)), codec = maps:with([decode_format, - strict_arities, strict_mbit, string_decode, rfc, |