aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src/base
diff options
context:
space:
mode:
Diffstat (limited to 'lib/diameter/src/base')
-rw-r--r--lib/diameter/src/base/diameter_lib.erl2
-rw-r--r--lib/diameter/src/base/diameter_peer.erl6
-rw-r--r--lib/diameter/src/base/diameter_peer_fsm.erl20
-rw-r--r--lib/diameter/src/base/diameter_reg.erl235
-rw-r--r--lib/diameter/src/base/diameter_service.erl25
-rw-r--r--lib/diameter/src/base/diameter_traffic.erl1
-rw-r--r--lib/diameter/src/base/diameter_watchdog.erl2
7 files changed, 126 insertions, 165 deletions
diff --git a/lib/diameter/src/base/diameter_lib.erl b/lib/diameter/src/base/diameter_lib.erl
index 8792e97621..1c1ea42cb5 100644
--- a/lib/diameter/src/base/diameter_lib.erl
+++ b/lib/diameter/src/base/diameter_lib.erl
@@ -283,7 +283,7 @@ ip(T)
%% Or not: convert from '.'/':'-separated decimal/hex.
ip(Addr) ->
- {ok, A} = inet_parse:address(Addr), %% documented in inet(3)
+ {ok, A} = inet:parse_address(Addr),
A.
%% ---------------------------------------------------------------------------
diff --git a/lib/diameter/src/base/diameter_peer.erl b/lib/diameter/src/base/diameter_peer.erl
index 2759f17e64..4cb5a57a54 100644
--- a/lib/diameter/src/base/diameter_peer.erl
+++ b/lib/diameter/src/base/diameter_peer.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2015. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2017. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -202,10 +202,10 @@ match1(Addr, Match) ->
match(Addr, {ok, A}, _) ->
Addr == A;
match(Addr, {error, _}, RE) ->
- match == re:run(inet_parse:ntoa(Addr), RE, [{capture, none}]).
+ match == re:run(inet:ntoa(Addr), RE, [{capture, none}, caseless]).
addr([_|_] = A) ->
- inet_parse:address(A);
+ inet:parse_address(A);
addr(A) ->
{ok, A}.
diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl
index 18904b8a55..d99f11a697 100644
--- a/lib/diameter/src/base/diameter_peer_fsm.erl
+++ b/lib/diameter/src/base/diameter_peer_fsm.erl
@@ -544,11 +544,11 @@ put_route(Pid) ->
MRef = monitor(process, Pid),
put(Pid, MRef).
-%% get_route/2
+%% get_route/3
-%% incoming answer
-get_route(_, #diameter_packet{header = #diameter_header{is_request = false}}
- = Pkt) ->
+%% Incoming answer.
+get_route(_, _, #diameter_packet{header = #diameter_header{is_request = false}}
+ = Pkt) ->
Seqs = diameter_codec:sequence_numbers(Pkt),
case erase(Seqs) of
{Pid, Ref, MRef} ->
@@ -559,8 +559,14 @@ get_route(_, #diameter_packet{header = #diameter_header{is_request = false}}
false
end;
-%% incoming request
-get_route(Ack, _) ->
+%% Requests answered here ...
+get_route(_, N, _)
+ when N == 'CER';
+ N == 'DPR' ->
+ false;
+
+%% ... or not.
+get_route(Ack, _, _) ->
Ack.
%% erase_route/1
@@ -745,7 +751,7 @@ recv1('DPA' = Name,
%% Any other message with a header and no length errors.
recv1(Name, H, Msg, #state{parent = Pid, ack = Ack} = S) ->
Pkt = pkt(H, Msg),
- Pid ! {recv, self(), get_route(Ack, Pkt), Name, Pkt},
+ Pid ! {recv, self(), get_route(Ack, Name, Pkt), Name, Pkt},
handle(Name, Pkt, S).
%% pkt/2
diff --git a/lib/diameter/src/base/diameter_reg.erl b/lib/diameter/src/base/diameter_reg.erl
index 97e74657bd..bd5db54a5c 100644
--- a/lib/diameter/src/base/diameter_reg.erl
+++ b/lib/diameter/src/base/diameter_reg.erl
@@ -19,10 +19,11 @@
%%
%%
-%% The module implements a simple term -> pid registry.
+%% A simple term -> pid registry.
%%
-module(diameter_reg).
+
-behaviour(gen_server).
-export([add/1,
@@ -57,18 +58,18 @@
-type key() :: term().
-type from() :: {pid(), term()}.
+-type rcvr() :: [pid() | term()] %% subscribe
+ | from(). %% wait
-type pattern() :: term().
-record(state, {id = diameter_lib:now(),
- receivers = dict:new()
- :: dict:dict(pattern(), [[pid() | term()]%% subscribe
- | from()]), %% wait
+ notify = #{} :: #{pattern() => [rcvr()]},
monitors = sets:new() :: sets:set(pid())}).
%% The ?TABLE bag contains the Key -> Pid mapping, as {Key, Pid}
%% tuples. Each pid is stored in the monitors set to ensure only one
%% monitor for each pid: more are harmless, but unnecessary. A pattern
-%% is added to receivers a result of calls to wait/1 or subscribe/2:
+%% is added to notify a result of calls to wait/1 or subscribe/2:
%% changes to ?TABLE causes processes to be notified as required.
%% ===========================================================================
@@ -156,7 +157,7 @@ wait(Pat) ->
%% # subscribe(Pat, T)
%%
%% Like match/1, but additionally receive messages of the form
-%% {T, add|remove, {term(), pid()} when associations are added
+%% {T, add|remove, {term(), pid()}} when associations are added
%% or removed.
%% ===========================================================================
@@ -186,15 +187,12 @@ uptime() ->
-> [{pid(), [key()]}].
pids() ->
- to_list(fun swap/1).
-
-to_list(Fun) ->
- ets:foldl(fun(T,D) -> append(Fun(T), D) end, orddict:new(), ?TABLE).
+ append(ets:select(?TABLE, [{{'$1','$2'}, [], [{{'$2', '$1'}}]}])).
-append({K,V}, Dict) ->
- orddict:append(K, V, Dict).
-
-id(T) -> T.
+append(Pairs) ->
+ dict:to_list(lists:foldl(fun({K,V}, D) -> dict:append(K, V, D) end,
+ dict:new(),
+ Pairs)).
%% terms/0
@@ -202,9 +200,7 @@ id(T) -> T.
-> [{key(), [pid()]}].
terms() ->
- to_list(fun id/1).
-
-swap({X,Y}) -> {Y,X}.
+ append(ets:tab2list(?TABLE)).
%% subs/0
@@ -212,31 +208,19 @@ swap({X,Y}) -> {Y,X}.
-> [{pattern(), [{pid(), term()}]}].
subs() ->
- #state{receivers = RD} = state(),
- dict:fold(fun sub/3, orddict:new(), RD).
-
-sub(Pat, Ps, Dict) ->
- lists:foldl(fun([P|T], D) -> orddict:append(Pat, {P,T}, D);
- (_, D) -> D
- end,
- Dict,
- Ps).
+ #state{notify = Dict} = state(),
+ [{K, Ts} || {K,Ps} <- maps:to_list(Dict),
+ Ts <- [[{P,T} || [P|T] <- Ps]]].
%% waits/0
-spec waits()
- -> [{pattern(), [{from(), term()}]}].
+ -> [{pattern(), [from()]}].
waits() ->
- #state{receivers = RD} = state(),
- dict:fold(fun wait/3, orddict:new(), RD).
-
-wait(Pat, Ps, Dict) ->
- lists:foldl(fun({_,_} = F, D) -> orddict:append(Pat, F, D);
- (_, D) -> D
- end,
- Dict,
- Ps).
+ #state{notify = Dict} = state(),
+ [{K, Ts} || {K,Ps} <- maps:to_list(Dict),
+ Ts <- [[T || {_,_} = T <- Ps]]].
%% ----------------------------------------------------------
%% # init/1
@@ -250,33 +234,28 @@ init(_) ->
%% # handle_call/3
%% ----------------------------------------------------------
-handle_call({add, Uniq, Key}, {Pid, _}, S0) ->
+handle_call({add, Uniq, Key}, {Pid, _}, S) ->
Rec = {Key, Pid},
- S1 = flush(Uniq, Rec, S0),
+ NS = flush(Uniq, Rec, S), %% before insert
{Res, New} = insert(Uniq, Rec),
- {Recvs, S} = add(New, Rec, S1),
- notify(Recvs, Rec),
- {reply, Res, S};
+ {reply, Res, notify(add, New andalso Rec, NS)};
handle_call({remove, Key}, {Pid, _}, S) ->
Rec = {Key, Pid},
- Recvs = delete([Rec], S),
ets:delete_object(?TABLE, Rec),
- notify(Recvs, remove),
- {reply, true, S};
+ {reply, true, notify(remove, Rec, S)};
-handle_call({wait, Pat}, {Pid, _} = From, #state{receivers = RD} = S) ->
+handle_call({wait, Pat}, {Pid, _} = From, S) ->
NS = add_monitor(Pid, S),
case match(Pat) of
- [_|_] = L ->
- {reply, L, NS};
+ [_|_] = Recs ->
+ {reply, Recs, NS};
[] ->
- {noreply, NS#state{receivers = dict:append(Pat, From, RD)}}
+ {noreply, queue(Pat, From, NS)}
end;
-handle_call({subscribe, Pat, T}, {Pid, _}, #state{receivers = RD} = S) ->
- NS = add_monitor(Pid, S),
- {reply, match(Pat), NS#state{receivers = dict:append(Pat, [Pid | T], RD)}};
+handle_call({subscribe, Pat, T}, {Pid, _}, S) ->
+ {reply, match(Pat), queue(Pat, [Pid | T], add_monitor(Pid, S))};
handle_call(state, _, S) ->
{reply, S, S};
@@ -332,106 +311,60 @@ insert(true, Rec) ->
B = ets:insert_new(?TABLE, Rec), %% entry inserted?
{B, B}.
-%% add/3
-
+%% add_monitor/2
+%%
%% Only add a single monitor for any given process, since there's no
%% use to more.
-add(true, {_Key, Pid} = Rec, S) ->
- NS = add_monitor(Pid, S),
- {Recvs, RD} = add(Rec, NS),
- {Recvs, S#state{receivers = RD}};
-
-add(false = No, _, S) ->
- {No, S}.
-
-%% add/2
-
-%% Notify processes whose patterns match the inserted key.
-add({_Key, Pid} = Rec, #state{receivers = RD}) ->
- dict:fold(fun(Pt, Ps, A) ->
- add(lists:member(Rec, match(Pt, Pid)), Pt, Ps, Rec, A)
- end,
- {sets:new(), RD},
- RD).
-
-%% add/5
-
-add(true, Pat, Recvs, {_,_} = Rec, {Set, Dict}) ->
- {lists:foldl(fun sets:add_element/2, Set, Recvs),
- remove(fun erlang:is_list/1, Pat, Recvs, Dict)};
-add(false, _, _, _, Acc) ->
- Acc.
+add_monitor(Pid, #state{monitors = Ps} = S) ->
+ case sets:is_element(Pid, Ps) of
+ false ->
+ monitor(process, Pid),
+ S#state{monitors = sets:add_element(Pid, Ps)};
+ true ->
+ S
+ end.
-%% add_monitor/2
-
-add_monitor(Pid, #state{monitors = MS} = S) ->
- add_monitor(sets:is_element(Pid, MS), Pid, S).
-
-%% add_monitor/3
-
-add_monitor(false, Pid, #state{monitors = MS} = S) ->
- monitor(process, Pid),
- S#state{monitors = sets:add_element(Pid, MS)};
-
-add_monitor(true, _, S) ->
- S.
-
-%% delete/2
-
-delete(Recs, #state{receivers = RD}) ->
- lists:foldl(fun(R,S) -> delete(R, RD, S) end, sets:new(), Recs).
+%% notify/3
-%% delete/3
+notify(_, false, S) ->
+ S;
-delete({_Key, Pid} = Rec, RD, Set) ->
- dict:fold(fun(Pt, Ps, S) ->
- delete(lists:member(Rec, match(Pt, Pid)), Rec, Ps, S)
- end,
- Set,
- RD).
+notify(Op, {_,_} = Rec, #state{notify = Dict} = S) ->
+ S#state{notify = maps:fold(fun(P,Rs,D) -> notify(Op, Rec, P, Rs, D) end,
+ Dict,
+ Dict)}.
-%% delete/4
+%% notify/5
-%% Entry matches a pattern ...
-delete(true, Rec, Recvs, Set) ->
- lists:foldl(fun(R,S) -> sets:add_element({R, Rec}, S) end,
- Set,
- Recvs);
-
-%% ... or not.
-delete(false, _, _, Set) ->
- Set.
-
-%% notify/2
-
-notify(false = No, _) ->
- No;
-
-notify(Recvs, remove = Op) ->
- sets:fold(fun({P,R}, N) -> send(P, R, Op), N+1 end, 0, Recvs);
-
-notify(Recvs, {_,_} = Rec) ->
- sets:fold(fun(P,N) -> send(P, Rec, add), N+1 end, 0, Recvs).
+notify(Op, {_, Pid} = Rec, Pat, Rcvrs, Dict) ->
+ case lists:member(Rec, match(Pat, Pid)) of
+ true ->
+ reset(Pat, Dict, [P || P <- Rcvrs, send(P, Op, Rec)]);
+ false ->
+ Dict
+ end.
%% send/3
-%% No processes waiting on remove, by construction: they've either
-%% received notification at add or aren't waiting.
-send([Pid | T], Rec, Op) ->
- Pid ! {T, Op, Rec};
+send([Pid | T], Op, Rec) ->
+ Pid ! {T, Op, Rec},
+ true;
-send({_,_} = From, Rec, add) ->
- gen_server:reply(From, [Rec]).
+%% No processes wait on remove: they receive notification immediately
+%% or at add, by construction.
+send({_,_} = From, add, Rec) ->
+ gen_server:reply(From, [Rec]),
+ false.
%% down/2
-down(Pid, #state{monitors = MS} = S) ->
- NS = flush(Pid, S),
- Recvs = delete(match('_', Pid), NS),
+down(Pid, #state{monitors = Ps} = S) ->
+ Recs = match('_', Pid),
ets:match_delete(?TABLE, {'_', Pid}),
- notify(Recvs, remove),
- NS#state{monitors = sets:del_element(Pid, MS)}.
+ lists:foldl(fun(R,NS) -> notify(remove, R, NS) end,
+ flush(Pid, S#state{monitors = sets:del_element(Pid, Ps)}),
+ Recs).
%% flush/3
@@ -452,16 +385,15 @@ flush(false, _, S) ->
%% flush/2
%% Process has died and should no longer receive messages/replies.
-flush(Pid, #state{receivers = RD} = S)
- when is_pid(Pid) ->
- S#state{receivers = dict:fold(fun(Pt,Ps,D) -> flush(Pid, Pt, Ps, D) end,
- RD,
- RD)}.
+flush(Pid, #state{notify = Dict} = S) ->
+ S#state{notify = maps:fold(fun(P,Rs,D) -> flush(Pid, P, Rs, D) end,
+ Dict,
+ Dict)}.
%% flush/4
-flush(Pid, Pat, Recvs, Dict) ->
- remove(fun(T) -> Pid /= head(T) end, Pat, Recvs, Dict).
+flush(Pid, Pat, Rcvrs, Dict) ->
+ reset(Pat, Dict, [T || T <- Rcvrs, Pid /= head(T)]).
%% head/1
@@ -471,15 +403,18 @@ head([P|_]) ->
head({P,_}) ->
P.
-%% remove/4
+%% reset/3
+
+reset(Key, Map, []) ->
+ maps:remove(Key, Map);
+
+reset(Key, Map, List) ->
+ maps:put(Key, List, Map).
+
+%% queue/3
-remove(Pred, Key, Values, Dict) ->
- case lists:filter(Pred, Values) of
- [] ->
- dict:erase(Key, Dict);
- Rest ->
- dict:store(Key, Rest, Dict)
- end.
+queue(Pat, Rcvr, #state{notify = Dict} = S) ->
+ S#state{notify = maps:put(Pat, [Rcvr | maps:get(Pat, Dict, [])], Dict)}.
%% call/1
diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl
index 7cd4d23402..31dd92f878 100644
--- a/lib/diameter/src/base/diameter_service.erl
+++ b/lib/diameter/src/base/diameter_service.erl
@@ -530,6 +530,13 @@ transition({tc_timeout, T}, S) ->
tc_timeout(T, S),
ok;
+transition({nodeup, Node, _}, S) ->
+ nodeup(Node, S),
+ ok;
+
+transition({nodedown, _Node, _}, _) ->
+ ok;
+
transition(Req, S) ->
unexpected(handle_info, [Req], S),
ok.
@@ -749,6 +756,8 @@ mref(P) ->
init_shared(#state{options = #{use_shared_peers := T},
service_name = Svc}) ->
+ T == false orelse net_kernel:monitor_nodes(true, [{node_type, visible},
+ nodedown_reason]),
notify(T, Svc, {service, self()}).
init_mod(#diameter_app{alias = Alias,
@@ -764,6 +773,11 @@ notify(Share, SvcName, T) ->
%% Test for the empty list for upgrade reasons: there's no
%% diameter_peer:notify/3 in old code.
+nodeup(Node, #state{options = #{share_peers := SP},
+ service_name = SvcName}) ->
+ lists:member(Node, remotes(SP))
+ andalso diameter_peer:notify([Node], SvcName, {service, self()}).
+
remotes(false) ->
[];
@@ -1440,9 +1454,15 @@ is_remote(Pid, T) ->
%% # remote_peer_up/4
%% ---------------------------------------------------------------------------
-remote_peer_up(TPid, Aliases, Caps, #state{options = #{use_shared_peers := T}}
+remote_peer_up(TPid, Aliases, Caps, #state{options = #{use_shared_peers := T},
+ remote = {PeerT, _, _}}
= S) ->
- is_remote(TPid, T) andalso rpu(TPid, Aliases, Caps, S).
+ is_remote(TPid, T)
+ andalso not ets:member(PeerT, TPid)
+ andalso rpu(TPid, Aliases, Caps, S).
+
+%% Notification can be duplicate since remote nodes push and the local
+%% node pulls.
rpu(TPid, Aliases, Caps, #state{service = Svc, remote = RT}) ->
#diameter_service{applications = Apps} = Svc,
@@ -1452,6 +1472,7 @@ rpu(TPid, Aliases, Caps, #state{service = Svc, remote = RT}) ->
rpu(_, [] = No, _, _) ->
No;
+
rpu(TPid, Aliases, Caps, {PeerT, _, _} = RT) ->
monitor(process, TPid),
ets:insert(PeerT, #peer{pid = TPid,
diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl
index a10bf78d6e..f510f40a17 100644
--- a/lib/diameter/src/base/diameter_traffic.erl
+++ b/lib/diameter/src/base/diameter_traffic.erl
@@ -93,6 +93,7 @@
caller :: pid() | undefined, %% calling process
handler :: pid(), %% request process
peer :: undefined | {pid(), #diameter_caps{}},
+ caps :: undefined, %% no longer used
packet :: #diameter_packet{} | undefined}). %% of request
%% ---------------------------------------------------------------------------
diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl
index c08e2da672..43623334a9 100644
--- a/lib/diameter/src/base/diameter_watchdog.erl
+++ b/lib/diameter/src/base/diameter_watchdog.erl
@@ -74,7 +74,6 @@
okay := non_neg_integer()}, %% REOPEN -> OKAY
codec :: #{decode_format := none,
string_decode := false,
- strict_arities => diameter:strict_arities(),
strict_mbit := boolean(),
rfc := 3588 | 6733,
ordered_encode := false},
@@ -152,7 +151,6 @@ i({Ack, T, Pid, {Opts,
okay => 3},
Opts)),
codec = maps:with([decode_format,
- strict_arities,
strict_mbit,
string_decode,
rfc,