aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src
diff options
context:
space:
mode:
Diffstat (limited to 'lib/diameter/src')
-rw-r--r--lib/diameter/src/Makefile2
-rw-r--r--lib/diameter/src/base/diameter.appup.src11
-rw-r--r--lib/diameter/src/base/diameter.erl13
-rw-r--r--lib/diameter/src/base/diameter_capx.erl45
-rw-r--r--lib/diameter/src/base/diameter_config.erl142
-rw-r--r--lib/diameter/src/base/diameter_lib.erl245
-rw-r--r--lib/diameter/src/base/diameter_peer.erl16
-rw-r--r--lib/diameter/src/base/diameter_peer_fsm.erl42
-rw-r--r--lib/diameter/src/base/diameter_reg.erl2
-rw-r--r--lib/diameter/src/base/diameter_service.erl126
-rw-r--r--lib/diameter/src/base/diameter_stats.erl79
-rw-r--r--lib/diameter/src/base/diameter_traffic.erl16
-rw-r--r--lib/diameter/src/base/diameter_watchdog.erl114
-rw-r--r--lib/diameter/src/transport/diameter_tcp.erl79
14 files changed, 626 insertions, 306 deletions
diff --git a/lib/diameter/src/Makefile b/lib/diameter/src/Makefile
index df10c33268..c0cf418f06 100644
--- a/lib/diameter/src/Makefile
+++ b/lib/diameter/src/Makefile
@@ -230,7 +230,7 @@ include $(ERL_TOP)/make/otp_release_targets.mk
# Can't $(INSTALL_DIR) more than one directory at a time on Solaris.
release_spec: opt
- for d in bin ebin include src/dict; do \
+ for d in bin ebin examples include src/dict; do \
$(INSTALL_DIR) "$(RELSYSDIR)/$$d"; \
done
$(INSTALL_SCRIPT) $(BINS:%=../bin/%) "$(RELSYSDIR)/bin"
diff --git a/lib/diameter/src/base/diameter.appup.src b/lib/diameter/src/base/diameter.appup.src
index 2ce89579ff..359f434941 100644
--- a/lib/diameter/src/base/diameter.appup.src
+++ b/lib/diameter/src/base/diameter.appup.src
@@ -28,7 +28,13 @@
{"1.2.1", [{restart_application, diameter}]},
{"1.3", [{restart_application, diameter}]}, %% R15B03
{"1.3.1", [{restart_application, diameter}]},
- {"1.4", [{restart_application, diameter}]} %% R16A
+ {"1.4", [{restart_application, diameter}]}, %% R16A
+ {"1.4.1", [{load_module, diameter_reg}, %% R16B
+ {load_module, diameter_stats},
+ {load_module, diameter_service},
+ {load_module, diameter_watchdog},
+ {load_module, diameter_capx},
+ {load_module, diameter}]}
],
[
{"0.9", [{restart_application, diameter}]},
@@ -39,6 +45,7 @@
{"1.2.1", [{restart_application, diameter}]},
{"1.3", [{restart_application, diameter}]},
{"1.3.1", [{restart_application, diameter}]},
- {"1.4", [{restart_application, diameter}]}
+ {"1.4", [{restart_application, diameter}]},
+ {"1.4.1", [{restart_application, diameter}]}
]
}.
diff --git a/lib/diameter/src/base/diameter.erl b/lib/diameter/src/base/diameter.erl
index c67fba5f89..57730cad61 100644
--- a/lib/diameter/src/base/diameter.erl
+++ b/lib/diameter/src/base/diameter.erl
@@ -45,6 +45,7 @@
-export_type([evaluable/0,
restriction/0,
+ remotes/0,
sequence/0,
app_alias/0,
service_name/0,
@@ -292,13 +293,20 @@ call(SvcName, App, Message) ->
| [node()]
| evaluable().
+-type remotes()
+ :: boolean()
+ | [node()]
+ | evaluable().
+
%% Options passed to start_service/2
-type service_opt()
:: capability()
| {application, [application_opt()]}
| {restrict_connections, restriction()}
- | {sequence, sequence() | evaluable()}.
+ | {sequence, sequence() | evaluable()}
+ | {share_peers, remotes()}
+ | {use_shared_peers, remotes()}.
-type application_opt()
:: {alias, app_alias()}
@@ -327,7 +335,7 @@ call(SvcName, App, Message) ->
-type transport_opt()
:: {transport_module, atom()}
| {transport_config, any()}
- | {transport_config, any(), non_neg_integer() | infinity}
+ | {transport_config, any(), 'Unsigned32'() | infinity}
| {applications, [app_alias()]}
| {capabilities, [capability()]}
| {capabilities_cb, evaluable()}
@@ -336,6 +344,7 @@ call(SvcName, App, Message) ->
| {length_errors, exit | handle | discard}
| {reconnect_timer, 'Unsigned32'()}
| {watchdog_timer, 'Unsigned32'() | {module(), atom(), list()}}
+ | {watchdog_config, [{okay|suspect, non_neg_integer()}]}
| {private, any()}.
%% Predicate passed to remove_transport/2
diff --git a/lib/diameter/src/base/diameter_capx.erl b/lib/diameter/src/base/diameter_capx.erl
index 715b15628c..9a443fead0 100644
--- a/lib/diameter/src/base/diameter_capx.erl
+++ b/lib/diameter/src/base/diameter_capx.erl
@@ -172,7 +172,50 @@ ipaddr(A) ->
bCER(#diameter_caps{} = Rec, Dict) ->
Values = lists:zip(Dict:'#info-'(diameter_base_CER, fields),
tl(tuple_to_list(Rec))),
- Dict:'#new-'(diameter_base_CER, Values).
+ Dict:'#new-'(diameter_base_CER, [{K, map(K, V, Dict)}
+ || {K,V} <- Values]).
+
+%% map/3
+%%
+%% Deal with differerences in common dictionary AVP's to make changes
+%% transparent in service/transport config. In particular, one
+%% annoying difference between RFC 3588 and RFC 6733.
+%%
+%% RFC 6773 changes the definition of Vendor-Specific-Application-Id,
+%% giving Vendor-Id arity 1 instead of 3588's 1*. This causes woe
+%% since the corresponding dictionaries expect different values for a
+%% 'Vendor-Id': a list for 3588, an integer for 6733.
+
+map('Vendor-Specific-Application-Id', L, Dict) ->
+ Rec = Dict:'#new-'('diameter_base_Vendor-Specific-Application-Id', []),
+ Def = Dict:'#get-'('Vendor-Id', Rec),
+ [vsa(V, Def) || V <- L];
+map(_, V, _) ->
+ V.
+
+vsa({_, N, _, _} = Rec, [])
+ when is_integer(N) ->
+ setelement(2, Rec, [N]);
+
+vsa({_, [N], _, _} = Rec, undefined)
+ when is_integer(N) ->
+ setelement(2, Rec, N);
+
+vsa([_|_] = L, Def) ->
+ [vid(T, Def) || T <- L];
+
+vsa(T, _) ->
+ T.
+
+vid({'Vendor-Id' = K, N}, [])
+ when is_integer(N) ->
+ {K, [N]};
+
+vid({'Vendor-Id' = K, [N]}, undefined) ->
+ {K, N};
+
+vid(T, _) ->
+ T.
%% rCER/3
%%
diff --git a/lib/diameter/src/base/diameter_config.erl b/lib/diameter/src/base/diameter_config.erl
index 9f73815756..2a145c874b 100644
--- a/lib/diameter/src/base/diameter_config.erl
+++ b/lib/diameter/src/base/diameter_config.erl
@@ -103,6 +103,10 @@
%% Time to lay low before restarting a dead service.
-define(RESTART_SLEEP, 2000).
+%% Test for a valid timeout.
+-define(IS_UINT32(N),
+ is_integer(N) andalso 0 =< N andalso 0 == N bsr 32).
+
%% A minimal diameter_caps for checking for valid capabilities values.
-define(EXAMPLE_CAPS,
#diameter_caps{origin_host = "TheHost",
@@ -490,13 +494,11 @@ stop(SvcName) ->
%% has many.
add(SvcName, Type, Opts) ->
- %% Ensure usable capabilities. diameter_service:merge_service/2
- %% depends on this.
- lists:foreach(fun(Os) ->
- is_list(Os) orelse ?THROW({capabilities, Os}),
- ok = encode_CER(Os)
- end,
- [Os || {capabilities, Os} <- Opts, is_list(Os)]),
+ %% Ensure acceptable transport options. This won't catch all
+ %% possible errors (faulty callbacks for example) but it catches
+ %% many. diameter_service:merge_service/2 depends on usable
+ %% capabilities for example.
+ ok = transport_opts(Opts),
Ref = make_ref(),
T = {Ref, Type, Opts},
@@ -514,6 +516,61 @@ add(SvcName, Type, Opts) ->
No
end.
+transport_opts(Opts) ->
+ lists:foreach(fun(T) -> opt(T) orelse ?THROW({invalid, T}) end, Opts).
+
+opt({transport_module, M}) ->
+ is_atom(M);
+
+opt({transport_config, _, Tmo}) ->
+ ?IS_UINT32(Tmo) orelse Tmo == infinity;
+
+opt({applications, As}) ->
+ is_list(As);
+
+opt({capabilities, Os}) ->
+ is_list(Os) andalso ok == encode_CER(Os);
+
+opt({capx_timeout, Tmo}) ->
+ ?IS_UINT32(Tmo);
+
+opt({length_errors, T}) ->
+ lists:member(T, [exit, handle, discard]);
+
+opt({reconnect_timer, Tmo}) ->
+ ?IS_UINT32(Tmo);
+
+opt({watchdog_timer, {M,F,A}})
+ when is_atom(M), is_atom(F), is_list(A) ->
+ true;
+opt({watchdog_timer, Tmo}) ->
+ ?IS_UINT32(Tmo);
+
+opt({watchdog_config, L}) ->
+ is_list(L) andalso lists:all(fun wdopt/1, L);
+
+%% Options that we can't validate.
+opt({K, _})
+ when K == transport_config;
+ K == capabilities_cb;
+ K == disconnect_cb;
+ K == private ->
+ true;
+
+%% Anything else, which is ignored by us. This makes options sensitive
+%% to spelling mistakes but arbitrary options are passed by some users
+%% as a way to identify transports. (That is, can't just do away with
+%% it.)
+opt(_) ->
+ true.
+
+wdopt({K,N}) ->
+ (K == okay orelse K == suspect) andalso is_integer(N) andalso 0 =< N;
+wdopt(_) ->
+ false.
+
+%% start_transport/2
+
start_transport(SvcName, T) ->
case diameter_service:start_transport(SvcName, T) of
{ok, _Pid} ->
@@ -557,54 +614,69 @@ stop_transport(SvcName, Refs) ->
%% make_config/2
make_config(SvcName, Opts) ->
- Apps = init_apps(Opts),
+ AppOpts = [T || {application, _} = T <- Opts],
+ Apps = init_apps(AppOpts),
+
[] == Apps andalso ?THROW(no_apps),
%% Use the fact that diameter_caps has the same field names as CER.
Fields = ?BASE:'#info-'(diameter_base_CER) -- ['AVP'],
- COpts = [T || {K,_} = T <- Opts, lists:member(K, Fields)],
- Caps = make_caps(#diameter_caps{}, COpts),
+ CapOpts = [T || {K,_} = T <- Opts, lists:member(K, Fields)],
+ Caps = make_caps(#diameter_caps{}, CapOpts),
- ok = encode_CER(COpts),
+ ok = encode_CER(CapOpts),
- Os = split(Opts, fun opt/2, [{false, share_peers},
- {false, use_shared_peers},
- {false, monitor},
- {?NOMASK, sequence},
- {nodes, restrict_connections}]),
- %% share_peers and use_shared_peers are currently undocumented.
+ SvcOpts = make_opts((Opts -- AppOpts) -- CapOpts,
+ [{false, share_peers},
+ {false, use_shared_peers},
+ {false, monitor},
+ {?NOMASK, sequence},
+ {nodes, restrict_connections}]),
#service{name = SvcName,
rec = #diameter_service{applications = Apps,
capabilities = Caps},
- options = Os}.
+ options = SvcOpts}.
+
+make_opts(Opts, Defs) ->
+ Known = [{K, get_opt(K, Opts, D)} || {D,K} <- Defs],
+ Unknown = Opts -- Known,
-split(Opts, F, Defs) ->
- [{K, F(K, get_opt(K, Opts, D))} || {D,K} <- Defs].
+ [] == Unknown orelse ?THROW({invalid, hd(Unknown)}),
+
+ [{K, opt(K,V)} || {K,V} <- Known].
opt(K, false = B)
when K /= sequence ->
B;
opt(K, true = B)
- when K == share_peer;
+ when K == share_peers;
K == use_shared_peers ->
B;
-opt(monitor, P)
- when is_pid(P) ->
- P;
-
opt(restrict_connections, T)
when T == node;
- T == nodes;
- T == [];
- is_atom(hd(T)) ->
+ T == nodes ->
+ T;
+
+opt(K, T)
+ when (K == share_peers
+ orelse K == use_shared_peers
+ orelse K == restrict_connections), ([] == T
+ orelse is_atom(hd(T))) ->
T;
-opt(restrict_connections = K, F) ->
- try diameter_lib:eval(F) of %% no guarantee that it won't fail later
+opt(monitor, P)
+ when is_pid(P) ->
+ P;
+
+opt(K, F)
+ when K == restrict_connections;
+ K == share_peers;
+ K == use_shared_peers ->
+ try diameter_lib:eval(F) of %% but no guarantee that it won't fail later
Nodes when is_list(Nodes) ->
F;
V ->
@@ -629,7 +701,7 @@ opt(K, _) ->
?THROW({value, K}).
sequence({H,N} = T)
- when 0 =< N, N =< 32, 0 =< H, 0 == H bsr N ->
+ when 0 =< N, N =< 32, 0 =< H, 0 == H bsr (32-N) ->
T;
sequence(_) ->
@@ -664,8 +736,8 @@ encode_CER(Opts) ->
init_apps(Opts) ->
lists:foldl(fun app_acc/2, [], lists:reverse(Opts)).
-app_acc({application, Opts}, Acc) ->
- is_list(Opts) orelse ?THROW({application, Opts}),
+app_acc({application, Opts} = T, Acc) ->
+ is_list(Opts) orelse ?THROW(T),
[Dict, Mod] = get_opt([dictionary, module], Opts),
Alias = get_opt(alias, Opts, Dict),
@@ -681,9 +753,7 @@ app_acc({application, Opts}, Acc) ->
mutable = M,
options = [{answer_errors, A},
{request_errors, P}]}
- | Acc];
-app_acc(_, Acc) ->
- Acc.
+ | Acc].
init_mod(#diameter_callback{} = R) ->
init_mod([diameter_callback, R]);
diff --git a/lib/diameter/src/base/diameter_lib.erl b/lib/diameter/src/base/diameter_lib.erl
index 362d593b24..44d81e2778 100644
--- a/lib/diameter/src/base/diameter_lib.erl
+++ b/lib/diameter/src/base/diameter_lib.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2011. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2013. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -19,77 +19,86 @@
-module(diameter_lib).
--export([report/2, info_report/2,
+-export([info_report/2,
error_report/2,
warning_report/2,
now_diff/1,
time/1,
eval/1,
- ip4address/1,
- ip6address/1,
ipaddr/1,
spawn_opts/2,
wait/1,
fold_tuple/3,
log/4]).
--include("diameter_internal.hrl").
-
%% ---------------------------------------------------------------------------
-%% # info_report(Reason, MFA)
-%%
-%% Input: Reason = Arbitrary term indicating the reason for the report.
-%% MFA = {Module, Function, Args} to report.
-%%
-%% Output: true
+%% # info_report/2
%% ---------------------------------------------------------------------------
-report(Reason, MFA) ->
- info_report(Reason, MFA).
+-spec info_report(Reason :: term(), T :: term())
+ -> true.
-info_report(Reason, MFA) ->
- report(fun error_logger:info_report/1, Reason, MFA),
+info_report(Reason, T) ->
+ report(fun error_logger:info_report/1, Reason, T),
true.
-%%% ---------------------------------------------------------------------------
-%%% # error_report(Reason, MFA)
-%%% # warning_report(Reason, MFA)
-%%%
-%%% Output: false
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # error_report/2
+%% # warning_report/2
+%% ---------------------------------------------------------------------------
+
+-spec error_report(Reason :: term(), T :: term())
+ -> false.
+
+error_report(Reason, T) ->
+ report(fun error_logger:error_report/1, Reason, T).
-error_report(Reason, MFA) ->
- report(fun error_logger:error_report/1, Reason, MFA).
+-spec warning_report(Reason :: term(), T :: term())
+ -> false.
-warning_report(Reason, MFA) ->
- report(fun error_logger:warning_report/1, Reason, MFA).
+warning_report(Reason, T) ->
+ report(fun error_logger:warning_report/1, Reason, T).
-report(Fun, Reason, MFA) ->
- Fun([{why, Reason}, {who, self()}, {what, MFA}]),
+report(Fun, Reason, T) ->
+ Fun([{why, Reason}, {who, self()}, {what, T}]),
false.
-%%% ---------------------------------------------------------------------------
-%%% # now_diff(Time)
-%%%
-%%% Description: Return timer:now_diff(now(), Time) as an {H, M, S, MicroS}
-%%% tuple instead of as integer microseconds.
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # now_diff/1
+%% ---------------------------------------------------------------------------
+
+-spec now_diff(NowT)
+ -> {Hours, Mins, Secs, MicroSecs}
+ when NowT :: {non_neg_integer(), 0..999999, 0..999999},
+ Hours :: non_neg_integer(),
+ Mins :: 0..59,
+ Secs :: 0..59,
+ MicroSecs :: 0..999999.
+
+%% Return timer:now_diff(now(), NowT) as an {H, M, S, MicroS} tuple
+%% instead of as integer microseconds.
now_diff({_,_,_} = Time) ->
- time(timer:now_diff(erlang:now(), Time)).
-
-%%% ---------------------------------------------------------------------------
-%%% # time(Time)
-%%%
-%%% Input: Time = {MegaSec, Sec, MicroSec}
-%%% | MicroSec
-%%%
-%%% Output: {H, M, S, MicroS}
-%%% ---------------------------------------------------------------------------
-
-time({_,_,_} = Time) -> %% time of day
+ time(timer:now_diff(now(), Time)).
+
+%% ---------------------------------------------------------------------------
+%% # time/1
+%%
+%% Return an elapsed time as an {H, M, S, MicroS} tuple.
+%% ---------------------------------------------------------------------------
+
+-spec time(NowT | Diff)
+ -> {Hours, Mins, Secs, MicroSecs}
+ when NowT :: {non_neg_integer(), 0..999999, 0..999999},
+ Diff :: non_neg_integer(),
+ Hours :: non_neg_integer(),
+ Mins :: 0..59,
+ Secs :: 0..59,
+ MicroSecs :: 0..999999.
+
+time({_,_,_} = NowT) -> %% time of day
%% 24 hours = 24*60*60*1000000 = 86400000000 microsec
- time(timer:now_diff(Time, {0,0,0}) rem 86400000000);
+ time(timer:now_diff(NowT, {0,0,0}) rem 86400000000);
time(Micro) -> %% elapsed time
Seconds = Micro div 1000000,
@@ -98,9 +107,21 @@ time(Micro) -> %% elapsed time
S = Seconds rem 60,
{H, M, S, Micro rem 1000000}.
-%%% ---------------------------------------------------------------------------
-%%% # eval(Func)
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # eval/1
+%%
+%% Evaluate a function in various forms.
+%% ---------------------------------------------------------------------------
+
+-type f() :: {module(), atom(), list()}
+ | nonempty_maybe_improper_list(fun(), list())
+ | fun().
+
+-spec eval(Fun)
+ -> term()
+ when Fun :: f()
+ | {f()}
+ | nonempty_maybe_improper_list(f(), list()).
eval({M,F,A}) ->
apply(M,F,A);
@@ -120,66 +141,15 @@ eval({F}) ->
eval(F) ->
F().
-%%% ---------------------------------------------------------------------------
-%%% # ip4address(Addr)
-%%%
-%%% Input: string() (eg. "10.0.0.1")
-%%% | list of integer()
-%%% | tuple of integer()
-%%%
-%%% Output: {_,_,_,_} of integer
-%%%
-%%% Exceptions: error: {invalid_address, Addr, erlang:get_stacktrace()}
-%%% ---------------------------------------------------------------------------
-
-ip4address([_,_,_,_] = Addr) -> %% Length 4 string can't be an address.
- ipaddr(list_to_tuple(Addr));
-
-%% Be brutal.
-ip4address(Addr) ->
- try
- {_,_,_,_} = ipaddr(Addr)
- catch
- error: _ ->
- erlang:error({invalid_address, Addr, ?STACK})
- end.
-
-%%% ---------------------------------------------------------------------------
-%%% # ip6address(Addr)
-%%%
-%%% Input: string() (eg. "1080::8:800:200C:417A")
-%%% | list of integer()
-%%% | tuple of integer()
-%%%
-%%% Output: {_,_,_,_,_,_,_,_} of integer
-%%%
-%%% Exceptions: error: {invalid_address, Addr, erlang:get_stacktrace()}
-%%% ---------------------------------------------------------------------------
-
-ip6address([_,_,_,_,_,_,_,_] = Addr) -> %% Length 8 string can't be an address.
- ipaddr(list_to_tuple(Addr));
-
-%% Be brutal.
-ip6address(Addr) ->
- try
- {_,_,_,_,_,_,_,_} = ipaddr(Addr)
- catch
- error: _ ->
- erlang:error({invalid_address, Addr, ?STACK})
- end.
-
-%%% ---------------------------------------------------------------------------
-%%% # ipaddr(Addr)
-%%%
-%%% Input: string() | tuple of integer()
-%%%
-%%% Output: {_,_,_,_} | {_,_,_,_,_,_,_,_}
-%%%
-%%% Exceptions: error: {invalid_address, erlang:get_stacktrace()}
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # ipaddr/1
+%%
+%% Parse an IP address.
+%% ---------------------------------------------------------------------------
--spec ipaddr(string() | tuple())
- -> inet:ip_address().
+-spec ipaddr([byte()] | tuple())
+ -> inet:ip_address()
+ | none().
%% Don't convert lists of integers since a length 8 list like
%% [$1,$0,$.,$0,$.,$0,$.,$1] is ambiguous: is it "10.0.0.1" or
@@ -193,7 +163,7 @@ ipaddr(Addr) ->
ip(Addr)
catch
error: _ ->
- erlang:error({invalid_address, ?STACK})
+ erlang:error({invalid_address, erlang:get_stacktrace()})
end.
%% Already a tuple: ensure non-negative integers of the right size.
@@ -210,11 +180,12 @@ ip(Addr) ->
{ok, A} = inet_parse:address(Addr), %% documented in inet(3)
A.
-%%% ---------------------------------------------------------------------------
-%%% # spawn_opts(Type, Opts)
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # spawn_opts/2
+%% ---------------------------------------------------------------------------
-%% TODO: config variables.
+-spec spawn_opts(server|worker, list())
+ -> list().
spawn_opts(server, Opts) ->
opts(75000, Opts);
@@ -224,24 +195,32 @@ spawn_opts(worker, Opts) ->
opts(HeapSize, Opts) ->
[{min_heap_size, HeapSize} | lists:keydelete(min_heap_size, 1, Opts)].
-%%% ---------------------------------------------------------------------------
-%%% # wait(MRefs)
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # wait/1
+%% ---------------------------------------------------------------------------
+
+-spec wait([pid()])
+ -> ok.
wait(L) ->
- w([erlang:monitor(process, P) || P <- L]).
+ down([erlang:monitor(process, P) || P <- L]).
-w([]) ->
+down([]) ->
ok;
-w(L) ->
- receive
- {'DOWN', MRef, process, _, _} ->
- w(lists:delete(MRef, L))
- end.
+down([MRef|T]) ->
+ receive {'DOWN', MRef, process, _, _} -> ok end,
+ down(T).
-%%% ---------------------------------------------------------------------------
-%%% # fold_tuple(N, T0, T)
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # fold_tuple/3
+%% ---------------------------------------------------------------------------
+
+-spec fold_tuple(N, T0, T)
+ -> tuple()
+ when N :: pos_integer(),
+ T0 :: tuple(),
+ T :: tuple()
+ | undefined.
%% Replace fields in T0 by those of T starting at index N, unless the
%% new value is 'undefined'.
@@ -262,11 +241,11 @@ ft(undefined, {_, T}) ->
ft(Value, {Idx, T}) ->
setelement(Idx, T, Value).
-%%% ----------------------------------------------------------
-%%% # log(Slogan, Mod, Line, Details)
-%%%
-%%% Called to have something to trace on for happenings of interest.
-%%% ----------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # log/4
+%%
+%% Called to have something to trace on for happenings of interest.
+%% ---------------------------------------------------------------------------
-log(_, _, _, _) ->
+log(_Slogan, _Mod, _Line, _Details) ->
ok.
diff --git a/lib/diameter/src/base/diameter_peer.erl b/lib/diameter/src/base/diameter_peer.erl
index 130bedda84..0d2efd4d1f 100644
--- a/lib/diameter/src/base/diameter_peer.erl
+++ b/lib/diameter/src/base/diameter_peer.erl
@@ -24,14 +24,15 @@
%% Interface towards transport modules ...
-export([recv/2,
up/1,
- up/2]).
+ up/2,
+ up/3]).
%% ... and the stack.
-export([start/1,
send/2,
close/1,
abort/1,
- notify/2]).
+ notify/3]).
%% Server start.
-export([start_link/0]).
@@ -63,11 +64,11 @@
-define(DEFAULT_TTMO, infinity).
%%% ---------------------------------------------------------------------------
-%%% # notify/2
+%%% # notify/3
%%% ---------------------------------------------------------------------------
-notify(SvcName, T) ->
- rpc:abcast(nodes(), ?SERVER, {notify, SvcName, T}).
+notify(Nodes, SvcName, T) ->
+ rpc:abcast(Nodes, ?SERVER, {notify, SvcName, T}).
%%% ---------------------------------------------------------------------------
%%% # start/1
@@ -180,7 +181,7 @@ start(Mod, Args) ->
apply(Mod, start, Args).
%%% ---------------------------------------------------------------------------
-%%% # up/[12]
+%%% # up/1-3
%%% ---------------------------------------------------------------------------
up(Pid) -> %% accepting transport
@@ -189,6 +190,9 @@ up(Pid) -> %% accepting transport
up(Pid, Remote) -> %% connecting transport
ifc_send(Pid, {self(), connected, Remote}).
+up(Pid, Remote, LAddrs) -> %% connecting transport
+ ifc_send(Pid, {self(), connected, Remote, LAddrs}).
+
%%% ---------------------------------------------------------------------------
%%% # recv/2
%%% ---------------------------------------------------------------------------
diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl
index 66342f7b62..6be4259510 100644
--- a/lib/diameter/src/base/diameter_peer_fsm.erl
+++ b/lib/diameter/src/base/diameter_peer_fsm.erl
@@ -198,6 +198,7 @@ i({Ack, WPid, {M, Ref} = T, Opts, {Mask,
OnLengthErr = proplists:get_value(length_errors, Opts, exit),
lists:member(OnLengthErr, [exit, handle, discard])
orelse ?ERROR({invalid, {length_errors, OnLengthErr}}),
+ %% Error checking is for configuration added in old code.
{TPid, Addrs} = start_transport(T, Rest, Svc),
@@ -212,9 +213,6 @@ i({Ack, WPid, {M, Ref} = T, Opts, {Mask,
%% transports on the same service can use different local addresses.
%% The local addresses are put into Host-IP-Address avps here when
%% sending capabilities exchange messages.
-%%
-%% Invalid transport config may cause us to crash but note that the
-%% watchdog start (start/2) succeeds regardless.
%% Wait for the caller to have a monitor to avoid a race with our
%% death. (Since the exit reason is used in diameter_service.)
@@ -353,10 +351,17 @@ transition({diameter, {TPid, connected, Remote}},
mode = M}
= S) ->
{'Wait-Conn-Ack', _} = PS, %% assert
- connect = M, %%
+ connect = M, %%
keep_transport(TPid),
send_CER(S#state{mode = {M, Remote}});
+transition({diameter, {TPid, connected, Remote, LAddrs}},
+ #state{transport = TPid,
+ service = Svc}
+ = S) ->
+ transition({diameter, {TPid, connected, Remote}},
+ S#state{service = readdr(Svc, LAddrs)});
+
%% Connection from peer.
transition({diameter, {TPid, connected}},
#state{transport = TPid,
@@ -365,7 +370,7 @@ transition({diameter, {TPid, connected}},
parent = Pid}
= S) ->
{'Wait-Conn-Ack', Tmo} = PS, %% assert
- accept = M, %%
+ accept = M, %%
keep_transport(TPid),
Pid ! {accepted, self()},
start_timer(Tmo, S#state{state = recv_CER});
@@ -378,6 +383,8 @@ transition({diameter, {_, connected}}, _) ->
{stop, connection_timeout};
transition({diameter, {_, connected, _}}, _) ->
{stop, connection_timeout};
+transition({diameter, {_, connected, _, _}}, _) ->
+ {stop, connection_timeout};
%% Connection has timed out: start an alternate.
transition({connection_timeout = T, TPid},
@@ -846,8 +853,12 @@ a('DPR', #diameter_caps{origin_host = {Host, _},
%% recv_CER/2
recv_CER(CER, #state{service = Svc, dictionary = Dict}) ->
- {ok, T} = diameter_capx:recv_CER(CER, Svc, Dict),
- T.
+ case diameter_capx:recv_CER(CER, Svc, Dict) of
+ {ok, T} ->
+ T;
+ {error, Reason} ->
+ close({'CER', CER, Svc, Dict, Reason})
+ end.
%% handle_CEA/1
@@ -907,8 +918,12 @@ recv_CEA(#diameter_packet{header = #diameter_header{version
errors = []},
#state{service = Svc,
dictionary = Dict}) ->
- {ok, T} = diameter_capx:recv_CEA(CEA, Svc, Dict),
- T;
+ case diameter_capx:recv_CEA(CEA, Svc, Dict) of
+ {ok, T} ->
+ T;
+ {error, Reason} ->
+ close({'CEA', CEA, Svc, Dict, Reason})
+ end;
recv_CEA(Pkt, S) ->
close({'CEA', caps(S), Pkt}).
@@ -987,8 +1002,17 @@ capz(#diameter_caps{} = L, #diameter_caps{} = R) ->
%% close/1
close(Reason) ->
+ report(Reason),
throw({?MODULE, close, Reason}).
+%% Could possibly log more here.
+report({M, _, _, _, _} = T)
+ when M == 'CER';
+ M == 'CEA' ->
+ diameter_lib:error_report(failure, T);
+report(_) ->
+ ok.
+
%% dwa/1
dwa(#diameter_caps{origin_host = OH,
diff --git a/lib/diameter/src/base/diameter_reg.erl b/lib/diameter/src/base/diameter_reg.erl
index ac58e4bf5b..3197c1aee1 100644
--- a/lib/diameter/src/base/diameter_reg.erl
+++ b/lib/diameter/src/base/diameter_reg.erl
@@ -138,7 +138,7 @@ del(T) ->
%% associations removed.)
%% ===========================================================================
--spec match(tuple())
+-spec match(any())
-> [{term(), pid()}].
match(Pat) ->
diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl
index f1342df16c..112e83476d 100644
--- a/lib/diameter/src/base/diameter_service.erl
+++ b/lib/diameter/src/base/diameter_service.erl
@@ -125,9 +125,9 @@
monitor = false :: false | pid(), %% process to die with
options
:: [{sequence, diameter:sequence()} %% sequence mask
- | {restrict_connections, diameter:restriction()}
- | {share_peers, boolean()} %% broadcast peers to remote nodes?
- | {use_shared_peers, boolean()}]}).%% use broadcasted peers?
+ | {share_peers, diameter:remotes()} %% broadcast to
+ | {use_shared_peers, diameter:remotes()} %% use from
+ | {restrict_connections, diameter:restriction()}]}).
%% shared_peers reflects the peers broadcast from remote nodes.
%% Record representing an RFC 3539 watchdog process implemented by
@@ -681,11 +681,9 @@ mref(false = No) ->
mref(P) ->
erlang:monitor(process, P).
-init_shared(#state{options = [_, _, {_, true} | _],
+init_shared(#state{options = [_, _, {_,T} | _],
service_name = Svc}) ->
- diameter_peer:notify(Svc, {service, self()});
-init_shared(#state{options = [_, _, {_, false} | _]}) ->
- ok.
+ notify(T, Svc, {service, self()}).
init_mod(#diameter_app{alias = Alias,
init_state = S}) ->
@@ -698,6 +696,37 @@ get_value(Key, Vs) ->
{_, V} = lists:keyfind(Key, 1, Vs),
V.
+notify(Share, SvcName, T) ->
+ Nodes = remotes(Share),
+ [] /= Nodes andalso diameter_peer:notify(Nodes, SvcName, T).
+%% Test for the empty list for upgrade reasons: there's no
+%% diameter_peer:notify/3 in old code so no call means no load order
+%% requirement.
+
+remotes(false) ->
+ [];
+
+remotes(true) ->
+ nodes();
+
+remotes(Nodes)
+ when is_atom(hd(Nodes));
+ Nodes == [] ->
+ Nodes;
+
+remotes(F) ->
+ try diameter_lib:eval(F) of
+ L when is_list(L) ->
+ L;
+ T ->
+ diameter_lib:error_report({invalid_return, T}, F),
+ []
+ catch
+ E:R ->
+ diameter_lib:error_report({failure, {E, R, ?STACK}}, F),
+ []
+ end.
+
%% ---------------------------------------------------------------------------
%% # start/3
%% ---------------------------------------------------------------------------
@@ -832,17 +861,21 @@ watchdog(TPid, [], ?WD_SUSPECT, ?WD_OKAY, Wd, State) ->
%% Watchdog has an unresponsive connection.
watchdog(TPid, [], ?WD_OKAY, ?WD_SUSPECT = To, Wd, State) ->
#watchdog{peer = TPid} = Wd, %% assert
- connection_down(Wd, To, State);
+ watchdog_down(Wd, To, State);
%% Watchdog has lost its connection.
watchdog(TPid, [], _, ?WD_DOWN = To, Wd, #state{peerT = PeerT} = S) ->
close(Wd, S),
- connection_down(Wd, To, S),
+ watchdog_down(Wd, To, S),
ets:delete(PeerT, TPid);
watchdog(_, [], _, _, _, _) ->
ok.
+watchdog_down(Wd, To, #state{watchdogT = WatchdogT} = S) ->
+ insert(WatchdogT, Wd#watchdog{state = To}),
+ connection_down(Wd, To, S).
+
%% ---------------------------------------------------------------------------
%% # connection_up/3
%% ---------------------------------------------------------------------------
@@ -1000,21 +1033,17 @@ connection_down(#watchdog{state = ?WD_OKAY,
remove_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict),
diameter_traffic:peer_down(TPid);
-connection_down(#watchdog{}, #peer{}, _) ->
- ok;
-
-connection_down(#watchdog{state = WS,
+connection_down(#watchdog{state = ?WD_OKAY,
peer = TPid}
= Wd,
To,
- #state{watchdogT = WatchdogT,
- peerT = PeerT}
+ #state{peerT = PeerT}
= S)
when is_atom(To) ->
- insert(WatchdogT, Wd#watchdog{state = To}),
- ?WD_OKAY == WS
- andalso
- connection_down(Wd, fetch(PeerT, TPid), S).
+ connection_down(Wd, #peer{} = fetch(PeerT, TPid), S);
+
+connection_down(#watchdog{}, _, _) ->
+ ok.
remove_local_peer(SApps, T, LDict) ->
lists:foldl(fun(A,D) -> rlp(A, T, D) end, LDict, SApps).
@@ -1233,12 +1262,12 @@ report_status(Status,
peer = TPid,
type = Type,
options = Opts},
- #peer{apps = [_|_] = As,
+ #peer{apps = [_|_] = Apps,
caps = Caps},
#state{service_name = SvcName}
= S,
Extra) ->
- share_peer(Status, Caps, As, TPid, S),
+ share_peer(Status, Caps, Apps, TPid, S),
Info = [Status, Ref, {TPid, Caps}, {type(Type), Opts} | Extra],
send_event(SvcName, list_to_tuple(Info)).
@@ -1255,9 +1284,9 @@ send_event(#diameter_event{service = SvcName} = E) ->
%% # share_peer/5
%% ---------------------------------------------------------------------------
-share_peer(up, Caps, Aliases, TPid, #state{options = [_, {_, true} | _],
- service_name = Svc}) ->
- diameter_peer:notify(Svc, {peer, TPid, Aliases, Caps});
+share_peer(up, Caps, Apps, TPid, #state{options = [_, {_,T} | _],
+ service_name = Svc}) ->
+ notify(T, Svc, {peer, TPid, [A || {_,A} <- Apps], Caps});
share_peer(_, _, _, _, _) ->
ok.
@@ -1266,34 +1295,34 @@ share_peer(_, _, _, _, _) ->
%% # share_peers/2
%% ---------------------------------------------------------------------------
-share_peers(Pid, #state{options = [_, {_, true} | _],
- local_peers = PDict}) ->
- ?Dict:fold(fun(A,Ps,ok) -> sp(Pid, A, Ps), ok end, ok, PDict);
-
-share_peers(_, _) ->
- ok.
+share_peers(Pid, #state{options = [_, {_,T} | _], local_peers = PDict}) ->
+ is_remote(Pid, T)
+ andalso ?Dict:fold(fun(A,Ps,ok) -> sp(Pid, A, Ps), ok end, ok, PDict).
sp(Pid, Alias, Peers) ->
lists:foreach(fun({P,C}) -> Pid ! {peer, P, [Alias], C} end, Peers).
+is_remote(Pid, T) ->
+ Node = node(Pid),
+ Node /= node() andalso lists:member(Node, remotes(T)).
+
%% ---------------------------------------------------------------------------
%% # remote_peer_up/4
%% ---------------------------------------------------------------------------
-remote_peer_up(Pid, Aliases, Caps, #state{options = [_, _, {_, true} | _],
- service = Svc,
- shared_peers = PDict}) ->
+remote_peer_up(Pid, Aliases, Caps, #state{options = [_, _, {_,T} | _]} = S) ->
+ is_remote(Pid, T)
+ andalso rpu(Pid, Aliases, Caps, S).
+
+rpu(Pid, Aliases, Caps, #state{service = Svc, shared_peers = PDict}) ->
#diameter_service{applications = Apps} = Svc,
Key = #diameter_app.alias,
- As = lists:filter(fun(A) -> lists:keymember(A, Key, Apps) end, Aliases),
- rpu(Pid, Caps, PDict, As);
-
-remote_peer_up(_, _, _, #state{options = [_, _, {_, false} | _]}) ->
- ok.
+ F = fun(A) -> lists:keymember(A, Key, Apps) end,
+ rpu(Pid, lists:filter(F, Aliases), Caps, PDict);
-rpu(_, _, PDict, []) ->
- PDict;
-rpu(Pid, Caps, PDict, Aliases) ->
+rpu(_, [] = No, _, _) ->
+ No;
+rpu(Pid, Aliases, Caps, PDict) ->
erlang:monitor(process, Pid),
T = {Pid, Caps},
lists:foreach(fun(A) -> ?Dict:append(A, T, PDict) end, Aliases).
@@ -1302,8 +1331,7 @@ rpu(Pid, Caps, PDict, Aliases) ->
%% # remote_peer_down/2
%% ---------------------------------------------------------------------------
-remote_peer_down(Pid, #state{options = [_, _, {_, true} | _],
- shared_peers = PDict}) ->
+remote_peer_down(Pid, #state{shared_peers = PDict}) ->
lists:foreach(fun(A) -> rpd(Pid, A, PDict) end, ?Dict:fetch_keys(PDict)).
rpd(Pid, Alias, PDict) ->
@@ -1626,16 +1654,10 @@ info_stats(#state{watchdogT = WatchdogT}) ->
info_transport(S) ->
PeerD = peer_dict(S, config_dict(S)),
- RefsD = dict:map(fun(_, Ls) -> [P || L <- Ls, {peer, {P,_}} <- L] end,
- PeerD),
- Refs = lists:append(dict:fold(fun(R, Ps, A) -> [[R|Ps] | A] end,
- [],
- RefsD)),
- Stats = diameter_stats:read(Refs),
+ Stats = diameter_stats:sum(dict:fetch_keys(PeerD)),
dict:fold(fun(R, Ls, A) ->
- Ps = dict:fetch(R, RefsD),
- [[{ref, R} | transport(Ls)] ++ [stats([R|Ps], Stats)]
- | A]
+ Cs = proplists:get_value(R, Stats, []),
+ [[{ref, R} | transport(Ls)] ++ [{statistics, Cs}] | A]
end,
[],
PeerD).
diff --git a/lib/diameter/src/base/diameter_stats.erl b/lib/diameter/src/base/diameter_stats.erl
index 8fd5ded300..b68d4af11f 100644
--- a/lib/diameter/src/base/diameter_stats.erl
+++ b/lib/diameter/src/base/diameter_stats.erl
@@ -28,6 +28,7 @@
-export([reg/2, reg/1,
incr/3, incr/1,
read/1,
+ sum/1,
flush/1]).
%% supervisor callback
@@ -77,10 +78,14 @@
reg(Pid, Ref)
when is_pid(Pid) ->
- call({reg, Pid, Ref}).
+ try
+ call({reg, Pid, Ref})
+ catch
+ exit: _ -> false
+ end.
-spec reg(ref())
- -> true.
+ -> boolean().
reg(Ref) ->
reg(self(), Ref).
@@ -111,11 +116,19 @@ incr(Ctr) ->
%% Retrieve counters for the specified contributors.
%% ---------------------------------------------------------------------------
+%% Read in the server process to ensure that counters for a dying
+%% contributor aren't folded concurrently with select.
+
-spec read([ref()])
-> [{ref(), [{counter(), integer()}]}].
-read(Refs) ->
- read(Refs, false).
+read(Refs)
+ when is_list(Refs) ->
+ try call({read, Refs, false}) of
+ L -> to_refdict(L)
+ catch
+ exit: _ -> []
+ end.
read(Refs, B) ->
MatchSpec = [{{{'_', '$1'}, '_'},
@@ -124,11 +137,52 @@ read(Refs, B) ->
['$_']}],
L = ets:select(?TABLE, MatchSpec),
B andalso delete(L),
+ L.
+
+to_refdict(L) ->
lists:foldl(fun({{C,R}, N}, D) -> orddict:append(R, {C,N}, D) end,
orddict:new(),
L).
%% ---------------------------------------------------------------------------
+%% # sum(Refs)
+%%
+%% Retrieve counters summed over all contributors for each term.
+%% ---------------------------------------------------------------------------
+
+-spec sum([ref()])
+ -> [{ref(), [{counter(), integer()}]}].
+
+sum(Refs)
+ when is_list(Refs) ->
+ try call({read, Refs}) of
+ L -> [{R, to_ctrdict(Cs)} || {R, [_|_] = Cs} <- L]
+ catch
+ exit: _ -> []
+ end.
+
+read_refs(Refs) ->
+ [{R, readr(R)} || R <- Refs].
+
+readr(Ref) ->
+ MatchSpec = [{{{'_', '$1'}, '_'},
+ [?ORCOND([{'=:=', '$1', {const, R}}
+ || R <- [Ref | pids(Ref)]])],
+ ['$_']}],
+ ets:select(?TABLE, MatchSpec).
+
+pids(Ref) ->
+ MatchSpec = [{{'$1', '$2'},
+ [{'=:=', '$2', {const, Ref}}],
+ ['$1']}],
+ ets:select(?TABLE, MatchSpec).
+
+to_ctrdict(L) ->
+ lists:foldl(fun({{C,_}, N}, D) -> orddict:update_counter(C, N, D) end,
+ orddict:new(),
+ L).
+
+%% ---------------------------------------------------------------------------
%% # flush(Refs)
%%
%% Retrieve and delete statistics for the specified contributors.
@@ -138,11 +192,10 @@ read(Refs, B) ->
-> [{ref(), {counter(), integer()}}].
flush(Refs) ->
- try
- call({flush, Refs})
+ try call({read, Refs, true}) of
+ L -> to_refdict(L)
catch
- exit: _ ->
- []
+ exit: _ -> []
end.
%% ===========================================================================
@@ -186,8 +239,14 @@ handle_call({reg, Pid, Ref}, _From, State) ->
B andalso erlang:monitor(process, Pid),
{reply, B, State};
-handle_call({flush, Refs}, _From, State) ->
- {reply, read(Refs, true), State};
+handle_call({read, Refs, Del}, _From, State) ->
+ {reply, read(Refs, Del), State};
+
+handle_call({read, Refs}, _, State) ->
+ {reply, read_refs(Refs), State};
+
+handle_call({flush, Refs}, _From, State) -> %% from old code
+ {reply, to_refdict(read(Refs, true)), State};
handle_call(Req, From, State) ->
?UNEXPECTED([Req, From]),
diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl
index f527f7c754..25b902e3f2 100644
--- a/lib/diameter/src/base/diameter_traffic.erl
+++ b/lib/diameter/src/base/diameter_traffic.erl
@@ -1479,12 +1479,14 @@ send({TPid, Pkt, #request{handler = Pid} = Req, SvcName, Timeout, TRef}) ->
Req#request{handler = self()},
SvcName,
Timeout),
- Pid ! reref(receive T -> T end, Ref, TRef).
-
-reref({T, Ref, R}, Ref, TRef) ->
- {T, TRef, R};
-reref(T, _, _) ->
- T.
+ receive
+ {answer, _, _, _, _} = A ->
+ Pid ! A;
+ {failover = T, Ref} ->
+ Pid ! {T, TRef};
+ T ->
+ exit({timeout, Ref, TPid} = T)
+ end.
%% send/2
@@ -1559,7 +1561,7 @@ resend_request(Pkt0,
store_request(TPid, Bin, Req, Timeout) ->
Seqs = diameter_codec:sequence_numbers(Bin),
- TRef = erlang:start_timer(Timeout, self(), timeout),
+ TRef = erlang:start_timer(Timeout, self(), TPid),
ets:insert(?REQUEST_TABLE, {Seqs, Req, TRef}),
ets:member(?REQUEST_TABLE, TPid)
orelse (self() ! {failover, TRef}), %% failover/1 may have missed
diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl
index 073a415d10..41c493ff20 100644
--- a/lib/diameter/src/base/diameter_watchdog.erl
+++ b/lib/diameter/src/base/diameter_watchdog.erl
@@ -47,6 +47,14 @@
-define(BASE, ?DIAMETER_DICT_COMMON).
+-define(IS_NATURAL(N), (is_integer(N) andalso 0 =< N)).
+
+-define(CHOOSE(B,T,F), if (B) -> T; true -> F end).
+
+-record(config,
+ {suspect = 1 :: non_neg_integer(), %% OKAY -> SUSPECT
+ okay = 3 :: non_neg_integer()}). %% REOPEN -> OKAY
+
-record(watchdog,
{%% PCB - Peer Control Block; see RFC 3539, Appendix A
status = initial :: initial | okay | suspect | down | reopen,
@@ -54,7 +62,8 @@
tw :: 6000..16#FFFFFFFF | {module(), atom(), list()},
%% {M,F,A} -> integer() >= 0
num_dwa = 0 :: -1 | non_neg_integer(),
- %% number of DWAs received during reopen
+ %% number of DWAs received in reopen,
+ %% or number of timeouts before okay -> suspect
%% end PCB
parent = self() :: pid(), %% service process
transport :: pid() | undefined, %% peer_fsm process
@@ -64,7 +73,8 @@
%% term passed into diameter_service with incoming message
sequence :: diameter:sequence(), %% mask
restrict :: {diameter:restriction(), boolean()},
- shutdown = false :: boolean()}).
+ shutdown = false :: boolean(),
+ config :: #config{}}).
%% ---------------------------------------------------------------------------
%% start/2
@@ -129,7 +139,8 @@ i({Ack, T, Pid, {RecvData,
receive_data = RecvData,
dictionary = Dict0,
sequence = Mask,
- restrict = {Restrict, lists:member(node(), Nodes)}}.
+ restrict = {Restrict, lists:member(node(), Nodes)},
+ config = config(Opts)}.
wait(Ref, Pid) ->
receive
@@ -139,6 +150,27 @@ wait(Ref, Pid) ->
exit({shutdown, D})
end.
+%% config/1
+%%
+%% Could also configure counts for SUSPECT to DOWN and REOPEN to DOWN,
+%% but don't.
+
+config(Opts) ->
+ Config = proplists:get_value(watchdog_config, Opts, []),
+ is_list(Config) orelse config_error({watchdog_config, Config}),
+ lists:foldl(fun config/2, #config{}, Config). %% ^ added in old code
+
+config({suspect, N}, Rec)
+ when ?IS_NATURAL(N) ->
+ Rec#config{suspect = N};
+
+config({okay, N}, Rec)
+ when ?IS_NATURAL(N) ->
+ Rec#config{okay = N};
+
+config(T, _) -> %% added in old code
+ config_error(T).
+
%% start/5
start(T, Opts, Mask, Nodes, Dict0, Svc) ->
@@ -193,7 +225,8 @@ dict0(_, _, Acc) ->
Acc.
config_error(T) ->
- ?ERROR({configuration_error, T}).
+ diameter_lib:error_report(configuration_error, T),
+ exit({shutdown, {configuration_error, T}}).
%% handle_call/3
@@ -219,6 +252,17 @@ handle_info(T, #watchdog{} = State) ->
?LOG(stop, T),
event(T, State, State#watchdog{status = down}),
{stop, {shutdown, T}, State}
+ end;
+
+handle_info(T, State) -> %% started in old code
+ handle_info(T, upgrade(State)).
+
+upgrade(State) ->
+ case erlang:append_element(State, #config{}) of
+ #watchdog{status = okay, config = #config{suspect = OS}} = S ->
+ S#watchdog{num_dwa = OS};
+ #watchdog{} = S ->
+ S
end.
close({'DOWN', _, process, TPid, {shutdown, Reason}},
@@ -331,11 +375,13 @@ transition({accepted = T, TPid}, #watchdog{transport = TPid,
transition({open, TPid, Hosts, _} = Open,
#watchdog{transport = TPid,
status = initial,
- restrict = {_, R}}
+ restrict = {_,R},
+ config = #config{suspect = OS}}
= S) ->
case okay(getr(restart), Hosts, R) of
okay ->
- set_watchdog(S#watchdog{status = okay});
+ set_watchdog(S#watchdog{status = okay,
+ num_dwa = OS});
reopen ->
transition(Open, S#watchdog{status = down})
end;
@@ -347,15 +393,22 @@ transition({open, TPid, Hosts, _} = Open,
transition({open = Key, TPid, _Hosts, T},
#watchdog{transport = TPid,
- status = down}
+ status = down,
+ config = #config{suspect = OS,
+ okay = RO}}
= S) ->
- %% Store the info we need to notify the parent to reopen the
- %% connection after the requisite DWA's are received, at which
- %% time we eraser(open). The reopen message is a later addition,
- %% to communicate the new capabilities as soon as they're known.
- putr(Key, {TPid, T}),
- set_watchdog(send_watchdog(S#watchdog{status = reopen,
- num_dwa = 0}));
+ case RO of
+ 0 -> %% non-standard: skip REOPEN
+ set_watchdog(S#watchdog{status = okay,
+ num_dwa = OS});
+ _ ->
+ %% Store the info we need to notify the parent to reopen
+ %% the connection after the requisite DWA's are received,
+ %% at which time we eraser(open).
+ putr(Key, {TPid, T}),
+ set_watchdog(send_watchdog(S#watchdog{status = reopen,
+ num_dwa = 0}))
+ end;
%% OKAY Connection down CloseConnection()
%% Failover()
@@ -374,7 +427,7 @@ transition({'DOWN', _, process, TPid, _Reason},
#watchdog{transport = TPid,
status = T}
= S) ->
- set_watchdog(S#watchdog{status = case T of initial -> T; _ -> down end,
+ set_watchdog(S#watchdog{status = ?CHOOSE(initial == T, T, down),
pending = false,
transport = undefined});
@@ -553,22 +606,27 @@ rcv(_, #watchdog{status = okay} = S) ->
%% SUSPECT Receive non-DWA Failback()
%% SetWatchdog() OKAY
-rcv('DWA', #watchdog{status = suspect} = S) ->
+rcv('DWA', #watchdog{status = suspect, config = #config{suspect = OS}} = S) ->
set_watchdog(S#watchdog{status = okay,
+ num_dwa = OS,
pending = false});
-rcv(_, #watchdog{status = suspect} = S) ->
- set_watchdog(S#watchdog{status = okay});
+rcv(_, #watchdog{status = suspect, config = #config{suspect = OS}} = S) ->
+ set_watchdog(S#watchdog{status = okay,
+ num_dwa = OS});
%% REOPEN Receive DWA & Pending = FALSE
%% NumDWA == 2 NumDWA++
%% Failback() OKAY
rcv('DWA', #watchdog{status = reopen,
- num_dwa = 2 = N}
- = S) ->
+ num_dwa = N,
+ config = #config{suspect = OS,
+ okay = RO}}
+ = S)
+ when N+1 == RO ->
S#watchdog{status = okay,
- num_dwa = N+1,
+ num_dwa = OS,
pending = false};
%% REOPEN Receive DWA & Pending = FALSE
@@ -607,9 +665,17 @@ timeout(#watchdog{status = T,
%% Pending SetWatchdog() SUSPECT
timeout(#watchdog{status = okay,
- pending = true}
- = S) ->
- S#watchdog{status = suspect};
+ pending = true,
+ num_dwa = N}
+ = S) ->
+ case N of
+ 1 ->
+ S#watchdog{status = suspect};
+ 0 -> %% non-standard: never move to suspect
+ S;
+ N -> %% non-standard: more timeouts before moving
+ S#watchdog{num_dwa = N-1}
+ end;
%% SUSPECT Timer expires CloseConnection()
%% SetWatchdog() DOWN
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl
index 132088b514..cbbba714ac 100644
--- a/lib/diameter/src/transport/diameter_tcp.erl
+++ b/lib/diameter/src/transport/diameter_tcp.erl
@@ -100,6 +100,18 @@
%% # start/3
%% ---------------------------------------------------------------------------
+-spec start({accept, Ref}, Svc, [Opt])
+ -> {ok, pid(), [inet:ip_address()]}
+ when Ref :: diameter:transport_ref(),
+ Svc :: #diameter_service{},
+ Opt :: diameter:transport_opt();
+ ({connect, Ref}, Svc, [Opt])
+ -> {ok, pid(), [inet:ip_address()]}
+ | {ok, pid()}
+ when Ref :: diameter:transport_ref(),
+ Svc :: #diameter_service{},
+ Opt :: diameter:transport_opt().
+
start({T, Ref}, #diameter_service{capabilities = Caps}, Opts) ->
diameter_tcp_sup:start(), %% start tcp supervisors on demand
{Mod, Rest} = split(Opts),
@@ -172,7 +184,7 @@ i({T, Ref, Mod, Pid, Opts, Addrs})
OwnOpts,
?DEFAULT_FRAGMENT_TIMEOUT),
?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}),
- Sock = i(T, Ref, Mod, Pid, SslOpts, Rest, Addrs),
+ Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs),
MPid ! {stop, self()}, %% tell the monitor to die
M = if SslOpts -> ssl; true -> Mod end,
setopts(M, Sock),
@@ -199,14 +211,21 @@ i(#monitor{parent = Pid, transport = TPid} = S) ->
i({listen, LRef, APid, {Mod, Opts, Addrs}}) ->
{[LA, LP], Rest} = proplists:split(Opts, [ip, port]),
- LAddr = get_addr(LA, Addrs),
+ LAddrOpt = get_addr(LA, Addrs),
LPort = get_port(LP),
- {ok, LSock} = Mod:listen(LPort, gen_opts(LAddr, Rest)),
+ {ok, LSock} = Mod:listen(LPort, gen_opts(LAddrOpt, Rest)),
+ LAddr = laddr(LAddrOpt, Mod, LSock),
true = diameter_reg:add_new({?MODULE, listener, {LRef, {LAddr, LSock}}}),
proc_lib:init_ack({ok, self(), {LAddr, LSock}}),
erlang:monitor(process, APid),
start_timer(#listener{socket = LSock}).
+laddr([], Mod, Sock) ->
+ {ok, {Addr, _Port}} = sockname(Mod, Sock),
+ Addr;
+laddr([{ip, Addr}], _, _) ->
+ Addr.
+
own(Opts) ->
{Own, Rest} = proplists:split(Opts, [fragment_timer]),
{lists:append(Own), Rest}.
@@ -225,17 +244,19 @@ ssl_opts([{ssl_options, Opts}])
ssl_opts(L) ->
?ERROR({ssl_options, L}).
-%% i/7
+%% init/7
%% Establish a TLS connection before capabilities exchange ...
-i(Type, Ref, Mod, Pid, true, Opts, Addrs) ->
- i(Type, Ref, ssl, Pid, [{cb_info, ?TCP_CB(Mod)} | Opts], Addrs);
+init(Type, Ref, Mod, Pid, true, Opts, Addrs) ->
+ init(Type, Ref, ssl, Pid, [{cb_info, ?TCP_CB(Mod)} | Opts], Addrs);
%% ... or not.
-i(Type, Ref, Mod, Pid, _, Opts, Addrs) ->
- i(Type, Ref, Mod, Pid, Opts, Addrs).
+init(Type, Ref, Mod, Pid, _, Opts, Addrs) ->
+ init(Type, Ref, Mod, Pid, Opts, Addrs).
+
+%% init/6
-i(accept = T, Ref, Mod, Pid, Opts, Addrs) ->
+init(accept = T, Ref, Mod, Pid, Opts, Addrs) ->
{LAddr, LSock} = listener(Ref, {Mod, Opts, Addrs}),
proc_lib:init_ack({ok, self(), [LAddr]}),
Sock = ok(accept(Mod, LSock)),
@@ -243,17 +264,28 @@ i(accept = T, Ref, Mod, Pid, Opts, Addrs) ->
diameter_peer:up(Pid),
Sock;
-i(connect = T, Ref, Mod, Pid, Opts, Addrs) ->
+init(connect = T, Ref, Mod, Pid, Opts, Addrs) ->
{[LA, RA, RP], Rest} = proplists:split(Opts, [ip, raddr, rport]),
- LAddr = get_addr(LA, Addrs),
- RAddr = get_addr(RA, []),
+ LAddrOpt = get_addr(LA, Addrs),
+ RAddr = get_addr(RA),
RPort = get_port(RP),
- proc_lib:init_ack({ok, self(), [LAddr]}),
- Sock = ok(connect(Mod, RAddr, RPort, gen_opts(LAddr, Rest))),
+ proc_lib:init_ack(init_rc(LAddrOpt)),
+ Sock = ok(connect(Mod, RAddr, RPort, gen_opts(LAddrOpt, Rest))),
publish(Mod, T, Ref, Sock),
- diameter_peer:up(Pid, {RAddr, RPort}),
+ up(Pid, {RAddr, RPort}, LAddrOpt, Mod, Sock),
Sock.
+init_rc([{ip, Addr}]) ->
+ {ok, self(), [Addr]};
+init_rc([]) ->
+ {ok, self()}.
+
+up(Pid, Remote, [{ip, _Addr}], _, _) ->
+ diameter_peer:up(Pid, Remote);
+up(Pid, Remote, [], Mod, Sock) ->
+ {Addr, _Port} = ok(sockname(Mod, Sock)),
+ diameter_peer:up(Pid, Remote, [Addr]).
+
publish(Mod, T, Ref, Sock) ->
true = diameter_reg:add_new({?MODULE, T, {Ref, Sock}}),
putr(?INFO_KEY, {Mod, Sock}). %% for info/1
@@ -281,10 +313,17 @@ l([], LRef, T) ->
{ok, _, AS} = diameter_tcp_sup:start_child({listen, LRef, self(), T}),
AS.
+%% get_addr/1
+
+get_addr(As) ->
+ diameter_lib:ipaddr(addr(As, [])).
+
%% get_addr/2
+get_addr([], []) ->
+ [];
get_addr(As, Def) ->
- diameter_lib:ipaddr(addr(As, Def)).
+ [{ip, diameter_lib:ipaddr(addr(As, Def))}].
%% Take the first address from the service if several are unspecified.
addr([], [Addr | _]) ->
@@ -305,14 +344,10 @@ get_port(Ps) ->
%% gen_opts/2
-gen_opts(LAddr, Opts) ->
+gen_opts(LAddrOpt, Opts) ->
{L,_} = proplists:split(Opts, [binary, packet, active]),
[[],[],[]] == L orelse ?ERROR({reserved_options, Opts}),
- [binary,
- {packet, 0},
- {active, once},
- {ip, LAddr}
- | Opts].
+ [binary, {packet, 0}, {active, once}] ++ LAddrOpt ++ Opts.
%% ---------------------------------------------------------------------------
%% # ports/1