diff options
Diffstat (limited to 'lib/diameter/src')
-rw-r--r-- | lib/diameter/src/Makefile | 2 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter.appup.src | 11 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter.erl | 13 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_capx.erl | 45 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_config.erl | 142 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_lib.erl | 245 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_peer.erl | 16 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_peer_fsm.erl | 42 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_reg.erl | 2 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_service.erl | 126 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_stats.erl | 79 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_traffic.erl | 16 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_watchdog.erl | 114 | ||||
-rw-r--r-- | lib/diameter/src/transport/diameter_tcp.erl | 79 |
14 files changed, 626 insertions, 306 deletions
diff --git a/lib/diameter/src/Makefile b/lib/diameter/src/Makefile index df10c33268..c0cf418f06 100644 --- a/lib/diameter/src/Makefile +++ b/lib/diameter/src/Makefile @@ -230,7 +230,7 @@ include $(ERL_TOP)/make/otp_release_targets.mk # Can't $(INSTALL_DIR) more than one directory at a time on Solaris. release_spec: opt - for d in bin ebin include src/dict; do \ + for d in bin ebin examples include src/dict; do \ $(INSTALL_DIR) "$(RELSYSDIR)/$$d"; \ done $(INSTALL_SCRIPT) $(BINS:%=../bin/%) "$(RELSYSDIR)/bin" diff --git a/lib/diameter/src/base/diameter.appup.src b/lib/diameter/src/base/diameter.appup.src index 2ce89579ff..359f434941 100644 --- a/lib/diameter/src/base/diameter.appup.src +++ b/lib/diameter/src/base/diameter.appup.src @@ -28,7 +28,13 @@ {"1.2.1", [{restart_application, diameter}]}, {"1.3", [{restart_application, diameter}]}, %% R15B03 {"1.3.1", [{restart_application, diameter}]}, - {"1.4", [{restart_application, diameter}]} %% R16A + {"1.4", [{restart_application, diameter}]}, %% R16A + {"1.4.1", [{load_module, diameter_reg}, %% R16B + {load_module, diameter_stats}, + {load_module, diameter_service}, + {load_module, diameter_watchdog}, + {load_module, diameter_capx}, + {load_module, diameter}]} ], [ {"0.9", [{restart_application, diameter}]}, @@ -39,6 +45,7 @@ {"1.2.1", [{restart_application, diameter}]}, {"1.3", [{restart_application, diameter}]}, {"1.3.1", [{restart_application, diameter}]}, - {"1.4", [{restart_application, diameter}]} + {"1.4", [{restart_application, diameter}]}, + {"1.4.1", [{restart_application, diameter}]} ] }. diff --git a/lib/diameter/src/base/diameter.erl b/lib/diameter/src/base/diameter.erl index c67fba5f89..57730cad61 100644 --- a/lib/diameter/src/base/diameter.erl +++ b/lib/diameter/src/base/diameter.erl @@ -45,6 +45,7 @@ -export_type([evaluable/0, restriction/0, + remotes/0, sequence/0, app_alias/0, service_name/0, @@ -292,13 +293,20 @@ call(SvcName, App, Message) -> | [node()] | evaluable(). +-type remotes() + :: boolean() + | [node()] + | evaluable(). + %% Options passed to start_service/2 -type service_opt() :: capability() | {application, [application_opt()]} | {restrict_connections, restriction()} - | {sequence, sequence() | evaluable()}. + | {sequence, sequence() | evaluable()} + | {share_peers, remotes()} + | {use_shared_peers, remotes()}. -type application_opt() :: {alias, app_alias()} @@ -327,7 +335,7 @@ call(SvcName, App, Message) -> -type transport_opt() :: {transport_module, atom()} | {transport_config, any()} - | {transport_config, any(), non_neg_integer() | infinity} + | {transport_config, any(), 'Unsigned32'() | infinity} | {applications, [app_alias()]} | {capabilities, [capability()]} | {capabilities_cb, evaluable()} @@ -336,6 +344,7 @@ call(SvcName, App, Message) -> | {length_errors, exit | handle | discard} | {reconnect_timer, 'Unsigned32'()} | {watchdog_timer, 'Unsigned32'() | {module(), atom(), list()}} + | {watchdog_config, [{okay|suspect, non_neg_integer()}]} | {private, any()}. %% Predicate passed to remove_transport/2 diff --git a/lib/diameter/src/base/diameter_capx.erl b/lib/diameter/src/base/diameter_capx.erl index 715b15628c..9a443fead0 100644 --- a/lib/diameter/src/base/diameter_capx.erl +++ b/lib/diameter/src/base/diameter_capx.erl @@ -172,7 +172,50 @@ ipaddr(A) -> bCER(#diameter_caps{} = Rec, Dict) -> Values = lists:zip(Dict:'#info-'(diameter_base_CER, fields), tl(tuple_to_list(Rec))), - Dict:'#new-'(diameter_base_CER, Values). + Dict:'#new-'(diameter_base_CER, [{K, map(K, V, Dict)} + || {K,V} <- Values]). + +%% map/3 +%% +%% Deal with differerences in common dictionary AVP's to make changes +%% transparent in service/transport config. In particular, one +%% annoying difference between RFC 3588 and RFC 6733. +%% +%% RFC 6773 changes the definition of Vendor-Specific-Application-Id, +%% giving Vendor-Id arity 1 instead of 3588's 1*. This causes woe +%% since the corresponding dictionaries expect different values for a +%% 'Vendor-Id': a list for 3588, an integer for 6733. + +map('Vendor-Specific-Application-Id', L, Dict) -> + Rec = Dict:'#new-'('diameter_base_Vendor-Specific-Application-Id', []), + Def = Dict:'#get-'('Vendor-Id', Rec), + [vsa(V, Def) || V <- L]; +map(_, V, _) -> + V. + +vsa({_, N, _, _} = Rec, []) + when is_integer(N) -> + setelement(2, Rec, [N]); + +vsa({_, [N], _, _} = Rec, undefined) + when is_integer(N) -> + setelement(2, Rec, N); + +vsa([_|_] = L, Def) -> + [vid(T, Def) || T <- L]; + +vsa(T, _) -> + T. + +vid({'Vendor-Id' = K, N}, []) + when is_integer(N) -> + {K, [N]}; + +vid({'Vendor-Id' = K, [N]}, undefined) -> + {K, N}; + +vid(T, _) -> + T. %% rCER/3 %% diff --git a/lib/diameter/src/base/diameter_config.erl b/lib/diameter/src/base/diameter_config.erl index 9f73815756..2a145c874b 100644 --- a/lib/diameter/src/base/diameter_config.erl +++ b/lib/diameter/src/base/diameter_config.erl @@ -103,6 +103,10 @@ %% Time to lay low before restarting a dead service. -define(RESTART_SLEEP, 2000). +%% Test for a valid timeout. +-define(IS_UINT32(N), + is_integer(N) andalso 0 =< N andalso 0 == N bsr 32). + %% A minimal diameter_caps for checking for valid capabilities values. -define(EXAMPLE_CAPS, #diameter_caps{origin_host = "TheHost", @@ -490,13 +494,11 @@ stop(SvcName) -> %% has many. add(SvcName, Type, Opts) -> - %% Ensure usable capabilities. diameter_service:merge_service/2 - %% depends on this. - lists:foreach(fun(Os) -> - is_list(Os) orelse ?THROW({capabilities, Os}), - ok = encode_CER(Os) - end, - [Os || {capabilities, Os} <- Opts, is_list(Os)]), + %% Ensure acceptable transport options. This won't catch all + %% possible errors (faulty callbacks for example) but it catches + %% many. diameter_service:merge_service/2 depends on usable + %% capabilities for example. + ok = transport_opts(Opts), Ref = make_ref(), T = {Ref, Type, Opts}, @@ -514,6 +516,61 @@ add(SvcName, Type, Opts) -> No end. +transport_opts(Opts) -> + lists:foreach(fun(T) -> opt(T) orelse ?THROW({invalid, T}) end, Opts). + +opt({transport_module, M}) -> + is_atom(M); + +opt({transport_config, _, Tmo}) -> + ?IS_UINT32(Tmo) orelse Tmo == infinity; + +opt({applications, As}) -> + is_list(As); + +opt({capabilities, Os}) -> + is_list(Os) andalso ok == encode_CER(Os); + +opt({capx_timeout, Tmo}) -> + ?IS_UINT32(Tmo); + +opt({length_errors, T}) -> + lists:member(T, [exit, handle, discard]); + +opt({reconnect_timer, Tmo}) -> + ?IS_UINT32(Tmo); + +opt({watchdog_timer, {M,F,A}}) + when is_atom(M), is_atom(F), is_list(A) -> + true; +opt({watchdog_timer, Tmo}) -> + ?IS_UINT32(Tmo); + +opt({watchdog_config, L}) -> + is_list(L) andalso lists:all(fun wdopt/1, L); + +%% Options that we can't validate. +opt({K, _}) + when K == transport_config; + K == capabilities_cb; + K == disconnect_cb; + K == private -> + true; + +%% Anything else, which is ignored by us. This makes options sensitive +%% to spelling mistakes but arbitrary options are passed by some users +%% as a way to identify transports. (That is, can't just do away with +%% it.) +opt(_) -> + true. + +wdopt({K,N}) -> + (K == okay orelse K == suspect) andalso is_integer(N) andalso 0 =< N; +wdopt(_) -> + false. + +%% start_transport/2 + start_transport(SvcName, T) -> case diameter_service:start_transport(SvcName, T) of {ok, _Pid} -> @@ -557,54 +614,69 @@ stop_transport(SvcName, Refs) -> %% make_config/2 make_config(SvcName, Opts) -> - Apps = init_apps(Opts), + AppOpts = [T || {application, _} = T <- Opts], + Apps = init_apps(AppOpts), + [] == Apps andalso ?THROW(no_apps), %% Use the fact that diameter_caps has the same field names as CER. Fields = ?BASE:'#info-'(diameter_base_CER) -- ['AVP'], - COpts = [T || {K,_} = T <- Opts, lists:member(K, Fields)], - Caps = make_caps(#diameter_caps{}, COpts), + CapOpts = [T || {K,_} = T <- Opts, lists:member(K, Fields)], + Caps = make_caps(#diameter_caps{}, CapOpts), - ok = encode_CER(COpts), + ok = encode_CER(CapOpts), - Os = split(Opts, fun opt/2, [{false, share_peers}, - {false, use_shared_peers}, - {false, monitor}, - {?NOMASK, sequence}, - {nodes, restrict_connections}]), - %% share_peers and use_shared_peers are currently undocumented. + SvcOpts = make_opts((Opts -- AppOpts) -- CapOpts, + [{false, share_peers}, + {false, use_shared_peers}, + {false, monitor}, + {?NOMASK, sequence}, + {nodes, restrict_connections}]), #service{name = SvcName, rec = #diameter_service{applications = Apps, capabilities = Caps}, - options = Os}. + options = SvcOpts}. + +make_opts(Opts, Defs) -> + Known = [{K, get_opt(K, Opts, D)} || {D,K} <- Defs], + Unknown = Opts -- Known, -split(Opts, F, Defs) -> - [{K, F(K, get_opt(K, Opts, D))} || {D,K} <- Defs]. + [] == Unknown orelse ?THROW({invalid, hd(Unknown)}), + + [{K, opt(K,V)} || {K,V} <- Known]. opt(K, false = B) when K /= sequence -> B; opt(K, true = B) - when K == share_peer; + when K == share_peers; K == use_shared_peers -> B; -opt(monitor, P) - when is_pid(P) -> - P; - opt(restrict_connections, T) when T == node; - T == nodes; - T == []; - is_atom(hd(T)) -> + T == nodes -> + T; + +opt(K, T) + when (K == share_peers + orelse K == use_shared_peers + orelse K == restrict_connections), ([] == T + orelse is_atom(hd(T))) -> T; -opt(restrict_connections = K, F) -> - try diameter_lib:eval(F) of %% no guarantee that it won't fail later +opt(monitor, P) + when is_pid(P) -> + P; + +opt(K, F) + when K == restrict_connections; + K == share_peers; + K == use_shared_peers -> + try diameter_lib:eval(F) of %% but no guarantee that it won't fail later Nodes when is_list(Nodes) -> F; V -> @@ -629,7 +701,7 @@ opt(K, _) -> ?THROW({value, K}). sequence({H,N} = T) - when 0 =< N, N =< 32, 0 =< H, 0 == H bsr N -> + when 0 =< N, N =< 32, 0 =< H, 0 == H bsr (32-N) -> T; sequence(_) -> @@ -664,8 +736,8 @@ encode_CER(Opts) -> init_apps(Opts) -> lists:foldl(fun app_acc/2, [], lists:reverse(Opts)). -app_acc({application, Opts}, Acc) -> - is_list(Opts) orelse ?THROW({application, Opts}), +app_acc({application, Opts} = T, Acc) -> + is_list(Opts) orelse ?THROW(T), [Dict, Mod] = get_opt([dictionary, module], Opts), Alias = get_opt(alias, Opts, Dict), @@ -681,9 +753,7 @@ app_acc({application, Opts}, Acc) -> mutable = M, options = [{answer_errors, A}, {request_errors, P}]} - | Acc]; -app_acc(_, Acc) -> - Acc. + | Acc]. init_mod(#diameter_callback{} = R) -> init_mod([diameter_callback, R]); diff --git a/lib/diameter/src/base/diameter_lib.erl b/lib/diameter/src/base/diameter_lib.erl index 362d593b24..44d81e2778 100644 --- a/lib/diameter/src/base/diameter_lib.erl +++ b/lib/diameter/src/base/diameter_lib.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% Copyright Ericsson AB 2010-2013. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -19,77 +19,86 @@ -module(diameter_lib). --export([report/2, info_report/2, +-export([info_report/2, error_report/2, warning_report/2, now_diff/1, time/1, eval/1, - ip4address/1, - ip6address/1, ipaddr/1, spawn_opts/2, wait/1, fold_tuple/3, log/4]). --include("diameter_internal.hrl"). - %% --------------------------------------------------------------------------- -%% # info_report(Reason, MFA) -%% -%% Input: Reason = Arbitrary term indicating the reason for the report. -%% MFA = {Module, Function, Args} to report. -%% -%% Output: true +%% # info_report/2 %% --------------------------------------------------------------------------- -report(Reason, MFA) -> - info_report(Reason, MFA). +-spec info_report(Reason :: term(), T :: term()) + -> true. -info_report(Reason, MFA) -> - report(fun error_logger:info_report/1, Reason, MFA), +info_report(Reason, T) -> + report(fun error_logger:info_report/1, Reason, T), true. -%%% --------------------------------------------------------------------------- -%%% # error_report(Reason, MFA) -%%% # warning_report(Reason, MFA) -%%% -%%% Output: false -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # error_report/2 +%% # warning_report/2 +%% --------------------------------------------------------------------------- + +-spec error_report(Reason :: term(), T :: term()) + -> false. + +error_report(Reason, T) -> + report(fun error_logger:error_report/1, Reason, T). -error_report(Reason, MFA) -> - report(fun error_logger:error_report/1, Reason, MFA). +-spec warning_report(Reason :: term(), T :: term()) + -> false. -warning_report(Reason, MFA) -> - report(fun error_logger:warning_report/1, Reason, MFA). +warning_report(Reason, T) -> + report(fun error_logger:warning_report/1, Reason, T). -report(Fun, Reason, MFA) -> - Fun([{why, Reason}, {who, self()}, {what, MFA}]), +report(Fun, Reason, T) -> + Fun([{why, Reason}, {who, self()}, {what, T}]), false. -%%% --------------------------------------------------------------------------- -%%% # now_diff(Time) -%%% -%%% Description: Return timer:now_diff(now(), Time) as an {H, M, S, MicroS} -%%% tuple instead of as integer microseconds. -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # now_diff/1 +%% --------------------------------------------------------------------------- + +-spec now_diff(NowT) + -> {Hours, Mins, Secs, MicroSecs} + when NowT :: {non_neg_integer(), 0..999999, 0..999999}, + Hours :: non_neg_integer(), + Mins :: 0..59, + Secs :: 0..59, + MicroSecs :: 0..999999. + +%% Return timer:now_diff(now(), NowT) as an {H, M, S, MicroS} tuple +%% instead of as integer microseconds. now_diff({_,_,_} = Time) -> - time(timer:now_diff(erlang:now(), Time)). - -%%% --------------------------------------------------------------------------- -%%% # time(Time) -%%% -%%% Input: Time = {MegaSec, Sec, MicroSec} -%%% | MicroSec -%%% -%%% Output: {H, M, S, MicroS} -%%% --------------------------------------------------------------------------- - -time({_,_,_} = Time) -> %% time of day + time(timer:now_diff(now(), Time)). + +%% --------------------------------------------------------------------------- +%% # time/1 +%% +%% Return an elapsed time as an {H, M, S, MicroS} tuple. +%% --------------------------------------------------------------------------- + +-spec time(NowT | Diff) + -> {Hours, Mins, Secs, MicroSecs} + when NowT :: {non_neg_integer(), 0..999999, 0..999999}, + Diff :: non_neg_integer(), + Hours :: non_neg_integer(), + Mins :: 0..59, + Secs :: 0..59, + MicroSecs :: 0..999999. + +time({_,_,_} = NowT) -> %% time of day %% 24 hours = 24*60*60*1000000 = 86400000000 microsec - time(timer:now_diff(Time, {0,0,0}) rem 86400000000); + time(timer:now_diff(NowT, {0,0,0}) rem 86400000000); time(Micro) -> %% elapsed time Seconds = Micro div 1000000, @@ -98,9 +107,21 @@ time(Micro) -> %% elapsed time S = Seconds rem 60, {H, M, S, Micro rem 1000000}. -%%% --------------------------------------------------------------------------- -%%% # eval(Func) -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # eval/1 +%% +%% Evaluate a function in various forms. +%% --------------------------------------------------------------------------- + +-type f() :: {module(), atom(), list()} + | nonempty_maybe_improper_list(fun(), list()) + | fun(). + +-spec eval(Fun) + -> term() + when Fun :: f() + | {f()} + | nonempty_maybe_improper_list(f(), list()). eval({M,F,A}) -> apply(M,F,A); @@ -120,66 +141,15 @@ eval({F}) -> eval(F) -> F(). -%%% --------------------------------------------------------------------------- -%%% # ip4address(Addr) -%%% -%%% Input: string() (eg. "10.0.0.1") -%%% | list of integer() -%%% | tuple of integer() -%%% -%%% Output: {_,_,_,_} of integer -%%% -%%% Exceptions: error: {invalid_address, Addr, erlang:get_stacktrace()} -%%% --------------------------------------------------------------------------- - -ip4address([_,_,_,_] = Addr) -> %% Length 4 string can't be an address. - ipaddr(list_to_tuple(Addr)); - -%% Be brutal. -ip4address(Addr) -> - try - {_,_,_,_} = ipaddr(Addr) - catch - error: _ -> - erlang:error({invalid_address, Addr, ?STACK}) - end. - -%%% --------------------------------------------------------------------------- -%%% # ip6address(Addr) -%%% -%%% Input: string() (eg. "1080::8:800:200C:417A") -%%% | list of integer() -%%% | tuple of integer() -%%% -%%% Output: {_,_,_,_,_,_,_,_} of integer -%%% -%%% Exceptions: error: {invalid_address, Addr, erlang:get_stacktrace()} -%%% --------------------------------------------------------------------------- - -ip6address([_,_,_,_,_,_,_,_] = Addr) -> %% Length 8 string can't be an address. - ipaddr(list_to_tuple(Addr)); - -%% Be brutal. -ip6address(Addr) -> - try - {_,_,_,_,_,_,_,_} = ipaddr(Addr) - catch - error: _ -> - erlang:error({invalid_address, Addr, ?STACK}) - end. - -%%% --------------------------------------------------------------------------- -%%% # ipaddr(Addr) -%%% -%%% Input: string() | tuple of integer() -%%% -%%% Output: {_,_,_,_} | {_,_,_,_,_,_,_,_} -%%% -%%% Exceptions: error: {invalid_address, erlang:get_stacktrace()} -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # ipaddr/1 +%% +%% Parse an IP address. +%% --------------------------------------------------------------------------- --spec ipaddr(string() | tuple()) - -> inet:ip_address(). +-spec ipaddr([byte()] | tuple()) + -> inet:ip_address() + | none(). %% Don't convert lists of integers since a length 8 list like %% [$1,$0,$.,$0,$.,$0,$.,$1] is ambiguous: is it "10.0.0.1" or @@ -193,7 +163,7 @@ ipaddr(Addr) -> ip(Addr) catch error: _ -> - erlang:error({invalid_address, ?STACK}) + erlang:error({invalid_address, erlang:get_stacktrace()}) end. %% Already a tuple: ensure non-negative integers of the right size. @@ -210,11 +180,12 @@ ip(Addr) -> {ok, A} = inet_parse:address(Addr), %% documented in inet(3) A. -%%% --------------------------------------------------------------------------- -%%% # spawn_opts(Type, Opts) -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # spawn_opts/2 +%% --------------------------------------------------------------------------- -%% TODO: config variables. +-spec spawn_opts(server|worker, list()) + -> list(). spawn_opts(server, Opts) -> opts(75000, Opts); @@ -224,24 +195,32 @@ spawn_opts(worker, Opts) -> opts(HeapSize, Opts) -> [{min_heap_size, HeapSize} | lists:keydelete(min_heap_size, 1, Opts)]. -%%% --------------------------------------------------------------------------- -%%% # wait(MRefs) -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # wait/1 +%% --------------------------------------------------------------------------- + +-spec wait([pid()]) + -> ok. wait(L) -> - w([erlang:monitor(process, P) || P <- L]). + down([erlang:monitor(process, P) || P <- L]). -w([]) -> +down([]) -> ok; -w(L) -> - receive - {'DOWN', MRef, process, _, _} -> - w(lists:delete(MRef, L)) - end. +down([MRef|T]) -> + receive {'DOWN', MRef, process, _, _} -> ok end, + down(T). -%%% --------------------------------------------------------------------------- -%%% # fold_tuple(N, T0, T) -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # fold_tuple/3 +%% --------------------------------------------------------------------------- + +-spec fold_tuple(N, T0, T) + -> tuple() + when N :: pos_integer(), + T0 :: tuple(), + T :: tuple() + | undefined. %% Replace fields in T0 by those of T starting at index N, unless the %% new value is 'undefined'. @@ -262,11 +241,11 @@ ft(undefined, {_, T}) -> ft(Value, {Idx, T}) -> setelement(Idx, T, Value). -%%% ---------------------------------------------------------- -%%% # log(Slogan, Mod, Line, Details) -%%% -%%% Called to have something to trace on for happenings of interest. -%%% ---------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # log/4 +%% +%% Called to have something to trace on for happenings of interest. +%% --------------------------------------------------------------------------- -log(_, _, _, _) -> +log(_Slogan, _Mod, _Line, _Details) -> ok. diff --git a/lib/diameter/src/base/diameter_peer.erl b/lib/diameter/src/base/diameter_peer.erl index 130bedda84..0d2efd4d1f 100644 --- a/lib/diameter/src/base/diameter_peer.erl +++ b/lib/diameter/src/base/diameter_peer.erl @@ -24,14 +24,15 @@ %% Interface towards transport modules ... -export([recv/2, up/1, - up/2]). + up/2, + up/3]). %% ... and the stack. -export([start/1, send/2, close/1, abort/1, - notify/2]). + notify/3]). %% Server start. -export([start_link/0]). @@ -63,11 +64,11 @@ -define(DEFAULT_TTMO, infinity). %%% --------------------------------------------------------------------------- -%%% # notify/2 +%%% # notify/3 %%% --------------------------------------------------------------------------- -notify(SvcName, T) -> - rpc:abcast(nodes(), ?SERVER, {notify, SvcName, T}). +notify(Nodes, SvcName, T) -> + rpc:abcast(Nodes, ?SERVER, {notify, SvcName, T}). %%% --------------------------------------------------------------------------- %%% # start/1 @@ -180,7 +181,7 @@ start(Mod, Args) -> apply(Mod, start, Args). %%% --------------------------------------------------------------------------- -%%% # up/[12] +%%% # up/1-3 %%% --------------------------------------------------------------------------- up(Pid) -> %% accepting transport @@ -189,6 +190,9 @@ up(Pid) -> %% accepting transport up(Pid, Remote) -> %% connecting transport ifc_send(Pid, {self(), connected, Remote}). +up(Pid, Remote, LAddrs) -> %% connecting transport + ifc_send(Pid, {self(), connected, Remote, LAddrs}). + %%% --------------------------------------------------------------------------- %%% # recv/2 %%% --------------------------------------------------------------------------- diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl index 66342f7b62..6be4259510 100644 --- a/lib/diameter/src/base/diameter_peer_fsm.erl +++ b/lib/diameter/src/base/diameter_peer_fsm.erl @@ -198,6 +198,7 @@ i({Ack, WPid, {M, Ref} = T, Opts, {Mask, OnLengthErr = proplists:get_value(length_errors, Opts, exit), lists:member(OnLengthErr, [exit, handle, discard]) orelse ?ERROR({invalid, {length_errors, OnLengthErr}}), + %% Error checking is for configuration added in old code. {TPid, Addrs} = start_transport(T, Rest, Svc), @@ -212,9 +213,6 @@ i({Ack, WPid, {M, Ref} = T, Opts, {Mask, %% transports on the same service can use different local addresses. %% The local addresses are put into Host-IP-Address avps here when %% sending capabilities exchange messages. -%% -%% Invalid transport config may cause us to crash but note that the -%% watchdog start (start/2) succeeds regardless. %% Wait for the caller to have a monitor to avoid a race with our %% death. (Since the exit reason is used in diameter_service.) @@ -353,10 +351,17 @@ transition({diameter, {TPid, connected, Remote}}, mode = M} = S) -> {'Wait-Conn-Ack', _} = PS, %% assert - connect = M, %% + connect = M, %% keep_transport(TPid), send_CER(S#state{mode = {M, Remote}}); +transition({diameter, {TPid, connected, Remote, LAddrs}}, + #state{transport = TPid, + service = Svc} + = S) -> + transition({diameter, {TPid, connected, Remote}}, + S#state{service = readdr(Svc, LAddrs)}); + %% Connection from peer. transition({diameter, {TPid, connected}}, #state{transport = TPid, @@ -365,7 +370,7 @@ transition({diameter, {TPid, connected}}, parent = Pid} = S) -> {'Wait-Conn-Ack', Tmo} = PS, %% assert - accept = M, %% + accept = M, %% keep_transport(TPid), Pid ! {accepted, self()}, start_timer(Tmo, S#state{state = recv_CER}); @@ -378,6 +383,8 @@ transition({diameter, {_, connected}}, _) -> {stop, connection_timeout}; transition({diameter, {_, connected, _}}, _) -> {stop, connection_timeout}; +transition({diameter, {_, connected, _, _}}, _) -> + {stop, connection_timeout}; %% Connection has timed out: start an alternate. transition({connection_timeout = T, TPid}, @@ -846,8 +853,12 @@ a('DPR', #diameter_caps{origin_host = {Host, _}, %% recv_CER/2 recv_CER(CER, #state{service = Svc, dictionary = Dict}) -> - {ok, T} = diameter_capx:recv_CER(CER, Svc, Dict), - T. + case diameter_capx:recv_CER(CER, Svc, Dict) of + {ok, T} -> + T; + {error, Reason} -> + close({'CER', CER, Svc, Dict, Reason}) + end. %% handle_CEA/1 @@ -907,8 +918,12 @@ recv_CEA(#diameter_packet{header = #diameter_header{version errors = []}, #state{service = Svc, dictionary = Dict}) -> - {ok, T} = diameter_capx:recv_CEA(CEA, Svc, Dict), - T; + case diameter_capx:recv_CEA(CEA, Svc, Dict) of + {ok, T} -> + T; + {error, Reason} -> + close({'CEA', CEA, Svc, Dict, Reason}) + end; recv_CEA(Pkt, S) -> close({'CEA', caps(S), Pkt}). @@ -987,8 +1002,17 @@ capz(#diameter_caps{} = L, #diameter_caps{} = R) -> %% close/1 close(Reason) -> + report(Reason), throw({?MODULE, close, Reason}). +%% Could possibly log more here. +report({M, _, _, _, _} = T) + when M == 'CER'; + M == 'CEA' -> + diameter_lib:error_report(failure, T); +report(_) -> + ok. + %% dwa/1 dwa(#diameter_caps{origin_host = OH, diff --git a/lib/diameter/src/base/diameter_reg.erl b/lib/diameter/src/base/diameter_reg.erl index ac58e4bf5b..3197c1aee1 100644 --- a/lib/diameter/src/base/diameter_reg.erl +++ b/lib/diameter/src/base/diameter_reg.erl @@ -138,7 +138,7 @@ del(T) -> %% associations removed.) %% =========================================================================== --spec match(tuple()) +-spec match(any()) -> [{term(), pid()}]. match(Pat) -> diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl index f1342df16c..112e83476d 100644 --- a/lib/diameter/src/base/diameter_service.erl +++ b/lib/diameter/src/base/diameter_service.erl @@ -125,9 +125,9 @@ monitor = false :: false | pid(), %% process to die with options :: [{sequence, diameter:sequence()} %% sequence mask - | {restrict_connections, diameter:restriction()} - | {share_peers, boolean()} %% broadcast peers to remote nodes? - | {use_shared_peers, boolean()}]}).%% use broadcasted peers? + | {share_peers, diameter:remotes()} %% broadcast to + | {use_shared_peers, diameter:remotes()} %% use from + | {restrict_connections, diameter:restriction()}]}). %% shared_peers reflects the peers broadcast from remote nodes. %% Record representing an RFC 3539 watchdog process implemented by @@ -681,11 +681,9 @@ mref(false = No) -> mref(P) -> erlang:monitor(process, P). -init_shared(#state{options = [_, _, {_, true} | _], +init_shared(#state{options = [_, _, {_,T} | _], service_name = Svc}) -> - diameter_peer:notify(Svc, {service, self()}); -init_shared(#state{options = [_, _, {_, false} | _]}) -> - ok. + notify(T, Svc, {service, self()}). init_mod(#diameter_app{alias = Alias, init_state = S}) -> @@ -698,6 +696,37 @@ get_value(Key, Vs) -> {_, V} = lists:keyfind(Key, 1, Vs), V. +notify(Share, SvcName, T) -> + Nodes = remotes(Share), + [] /= Nodes andalso diameter_peer:notify(Nodes, SvcName, T). +%% Test for the empty list for upgrade reasons: there's no +%% diameter_peer:notify/3 in old code so no call means no load order +%% requirement. + +remotes(false) -> + []; + +remotes(true) -> + nodes(); + +remotes(Nodes) + when is_atom(hd(Nodes)); + Nodes == [] -> + Nodes; + +remotes(F) -> + try diameter_lib:eval(F) of + L when is_list(L) -> + L; + T -> + diameter_lib:error_report({invalid_return, T}, F), + [] + catch + E:R -> + diameter_lib:error_report({failure, {E, R, ?STACK}}, F), + [] + end. + %% --------------------------------------------------------------------------- %% # start/3 %% --------------------------------------------------------------------------- @@ -832,17 +861,21 @@ watchdog(TPid, [], ?WD_SUSPECT, ?WD_OKAY, Wd, State) -> %% Watchdog has an unresponsive connection. watchdog(TPid, [], ?WD_OKAY, ?WD_SUSPECT = To, Wd, State) -> #watchdog{peer = TPid} = Wd, %% assert - connection_down(Wd, To, State); + watchdog_down(Wd, To, State); %% Watchdog has lost its connection. watchdog(TPid, [], _, ?WD_DOWN = To, Wd, #state{peerT = PeerT} = S) -> close(Wd, S), - connection_down(Wd, To, S), + watchdog_down(Wd, To, S), ets:delete(PeerT, TPid); watchdog(_, [], _, _, _, _) -> ok. +watchdog_down(Wd, To, #state{watchdogT = WatchdogT} = S) -> + insert(WatchdogT, Wd#watchdog{state = To}), + connection_down(Wd, To, S). + %% --------------------------------------------------------------------------- %% # connection_up/3 %% --------------------------------------------------------------------------- @@ -1000,21 +1033,17 @@ connection_down(#watchdog{state = ?WD_OKAY, remove_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict), diameter_traffic:peer_down(TPid); -connection_down(#watchdog{}, #peer{}, _) -> - ok; - -connection_down(#watchdog{state = WS, +connection_down(#watchdog{state = ?WD_OKAY, peer = TPid} = Wd, To, - #state{watchdogT = WatchdogT, - peerT = PeerT} + #state{peerT = PeerT} = S) when is_atom(To) -> - insert(WatchdogT, Wd#watchdog{state = To}), - ?WD_OKAY == WS - andalso - connection_down(Wd, fetch(PeerT, TPid), S). + connection_down(Wd, #peer{} = fetch(PeerT, TPid), S); + +connection_down(#watchdog{}, _, _) -> + ok. remove_local_peer(SApps, T, LDict) -> lists:foldl(fun(A,D) -> rlp(A, T, D) end, LDict, SApps). @@ -1233,12 +1262,12 @@ report_status(Status, peer = TPid, type = Type, options = Opts}, - #peer{apps = [_|_] = As, + #peer{apps = [_|_] = Apps, caps = Caps}, #state{service_name = SvcName} = S, Extra) -> - share_peer(Status, Caps, As, TPid, S), + share_peer(Status, Caps, Apps, TPid, S), Info = [Status, Ref, {TPid, Caps}, {type(Type), Opts} | Extra], send_event(SvcName, list_to_tuple(Info)). @@ -1255,9 +1284,9 @@ send_event(#diameter_event{service = SvcName} = E) -> %% # share_peer/5 %% --------------------------------------------------------------------------- -share_peer(up, Caps, Aliases, TPid, #state{options = [_, {_, true} | _], - service_name = Svc}) -> - diameter_peer:notify(Svc, {peer, TPid, Aliases, Caps}); +share_peer(up, Caps, Apps, TPid, #state{options = [_, {_,T} | _], + service_name = Svc}) -> + notify(T, Svc, {peer, TPid, [A || {_,A} <- Apps], Caps}); share_peer(_, _, _, _, _) -> ok. @@ -1266,34 +1295,34 @@ share_peer(_, _, _, _, _) -> %% # share_peers/2 %% --------------------------------------------------------------------------- -share_peers(Pid, #state{options = [_, {_, true} | _], - local_peers = PDict}) -> - ?Dict:fold(fun(A,Ps,ok) -> sp(Pid, A, Ps), ok end, ok, PDict); - -share_peers(_, _) -> - ok. +share_peers(Pid, #state{options = [_, {_,T} | _], local_peers = PDict}) -> + is_remote(Pid, T) + andalso ?Dict:fold(fun(A,Ps,ok) -> sp(Pid, A, Ps), ok end, ok, PDict). sp(Pid, Alias, Peers) -> lists:foreach(fun({P,C}) -> Pid ! {peer, P, [Alias], C} end, Peers). +is_remote(Pid, T) -> + Node = node(Pid), + Node /= node() andalso lists:member(Node, remotes(T)). + %% --------------------------------------------------------------------------- %% # remote_peer_up/4 %% --------------------------------------------------------------------------- -remote_peer_up(Pid, Aliases, Caps, #state{options = [_, _, {_, true} | _], - service = Svc, - shared_peers = PDict}) -> +remote_peer_up(Pid, Aliases, Caps, #state{options = [_, _, {_,T} | _]} = S) -> + is_remote(Pid, T) + andalso rpu(Pid, Aliases, Caps, S). + +rpu(Pid, Aliases, Caps, #state{service = Svc, shared_peers = PDict}) -> #diameter_service{applications = Apps} = Svc, Key = #diameter_app.alias, - As = lists:filter(fun(A) -> lists:keymember(A, Key, Apps) end, Aliases), - rpu(Pid, Caps, PDict, As); - -remote_peer_up(_, _, _, #state{options = [_, _, {_, false} | _]}) -> - ok. + F = fun(A) -> lists:keymember(A, Key, Apps) end, + rpu(Pid, lists:filter(F, Aliases), Caps, PDict); -rpu(_, _, PDict, []) -> - PDict; -rpu(Pid, Caps, PDict, Aliases) -> +rpu(_, [] = No, _, _) -> + No; +rpu(Pid, Aliases, Caps, PDict) -> erlang:monitor(process, Pid), T = {Pid, Caps}, lists:foreach(fun(A) -> ?Dict:append(A, T, PDict) end, Aliases). @@ -1302,8 +1331,7 @@ rpu(Pid, Caps, PDict, Aliases) -> %% # remote_peer_down/2 %% --------------------------------------------------------------------------- -remote_peer_down(Pid, #state{options = [_, _, {_, true} | _], - shared_peers = PDict}) -> +remote_peer_down(Pid, #state{shared_peers = PDict}) -> lists:foreach(fun(A) -> rpd(Pid, A, PDict) end, ?Dict:fetch_keys(PDict)). rpd(Pid, Alias, PDict) -> @@ -1626,16 +1654,10 @@ info_stats(#state{watchdogT = WatchdogT}) -> info_transport(S) -> PeerD = peer_dict(S, config_dict(S)), - RefsD = dict:map(fun(_, Ls) -> [P || L <- Ls, {peer, {P,_}} <- L] end, - PeerD), - Refs = lists:append(dict:fold(fun(R, Ps, A) -> [[R|Ps] | A] end, - [], - RefsD)), - Stats = diameter_stats:read(Refs), + Stats = diameter_stats:sum(dict:fetch_keys(PeerD)), dict:fold(fun(R, Ls, A) -> - Ps = dict:fetch(R, RefsD), - [[{ref, R} | transport(Ls)] ++ [stats([R|Ps], Stats)] - | A] + Cs = proplists:get_value(R, Stats, []), + [[{ref, R} | transport(Ls)] ++ [{statistics, Cs}] | A] end, [], PeerD). diff --git a/lib/diameter/src/base/diameter_stats.erl b/lib/diameter/src/base/diameter_stats.erl index 8fd5ded300..b68d4af11f 100644 --- a/lib/diameter/src/base/diameter_stats.erl +++ b/lib/diameter/src/base/diameter_stats.erl @@ -28,6 +28,7 @@ -export([reg/2, reg/1, incr/3, incr/1, read/1, + sum/1, flush/1]). %% supervisor callback @@ -77,10 +78,14 @@ reg(Pid, Ref) when is_pid(Pid) -> - call({reg, Pid, Ref}). + try + call({reg, Pid, Ref}) + catch + exit: _ -> false + end. -spec reg(ref()) - -> true. + -> boolean(). reg(Ref) -> reg(self(), Ref). @@ -111,11 +116,19 @@ incr(Ctr) -> %% Retrieve counters for the specified contributors. %% --------------------------------------------------------------------------- +%% Read in the server process to ensure that counters for a dying +%% contributor aren't folded concurrently with select. + -spec read([ref()]) -> [{ref(), [{counter(), integer()}]}]. -read(Refs) -> - read(Refs, false). +read(Refs) + when is_list(Refs) -> + try call({read, Refs, false}) of + L -> to_refdict(L) + catch + exit: _ -> [] + end. read(Refs, B) -> MatchSpec = [{{{'_', '$1'}, '_'}, @@ -124,11 +137,52 @@ read(Refs, B) -> ['$_']}], L = ets:select(?TABLE, MatchSpec), B andalso delete(L), + L. + +to_refdict(L) -> lists:foldl(fun({{C,R}, N}, D) -> orddict:append(R, {C,N}, D) end, orddict:new(), L). %% --------------------------------------------------------------------------- +%% # sum(Refs) +%% +%% Retrieve counters summed over all contributors for each term. +%% --------------------------------------------------------------------------- + +-spec sum([ref()]) + -> [{ref(), [{counter(), integer()}]}]. + +sum(Refs) + when is_list(Refs) -> + try call({read, Refs}) of + L -> [{R, to_ctrdict(Cs)} || {R, [_|_] = Cs} <- L] + catch + exit: _ -> [] + end. + +read_refs(Refs) -> + [{R, readr(R)} || R <- Refs]. + +readr(Ref) -> + MatchSpec = [{{{'_', '$1'}, '_'}, + [?ORCOND([{'=:=', '$1', {const, R}} + || R <- [Ref | pids(Ref)]])], + ['$_']}], + ets:select(?TABLE, MatchSpec). + +pids(Ref) -> + MatchSpec = [{{'$1', '$2'}, + [{'=:=', '$2', {const, Ref}}], + ['$1']}], + ets:select(?TABLE, MatchSpec). + +to_ctrdict(L) -> + lists:foldl(fun({{C,_}, N}, D) -> orddict:update_counter(C, N, D) end, + orddict:new(), + L). + +%% --------------------------------------------------------------------------- %% # flush(Refs) %% %% Retrieve and delete statistics for the specified contributors. @@ -138,11 +192,10 @@ read(Refs, B) -> -> [{ref(), {counter(), integer()}}]. flush(Refs) -> - try - call({flush, Refs}) + try call({read, Refs, true}) of + L -> to_refdict(L) catch - exit: _ -> - [] + exit: _ -> [] end. %% =========================================================================== @@ -186,8 +239,14 @@ handle_call({reg, Pid, Ref}, _From, State) -> B andalso erlang:monitor(process, Pid), {reply, B, State}; -handle_call({flush, Refs}, _From, State) -> - {reply, read(Refs, true), State}; +handle_call({read, Refs, Del}, _From, State) -> + {reply, read(Refs, Del), State}; + +handle_call({read, Refs}, _, State) -> + {reply, read_refs(Refs), State}; + +handle_call({flush, Refs}, _From, State) -> %% from old code + {reply, to_refdict(read(Refs, true)), State}; handle_call(Req, From, State) -> ?UNEXPECTED([Req, From]), diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl index f527f7c754..25b902e3f2 100644 --- a/lib/diameter/src/base/diameter_traffic.erl +++ b/lib/diameter/src/base/diameter_traffic.erl @@ -1479,12 +1479,14 @@ send({TPid, Pkt, #request{handler = Pid} = Req, SvcName, Timeout, TRef}) -> Req#request{handler = self()}, SvcName, Timeout), - Pid ! reref(receive T -> T end, Ref, TRef). - -reref({T, Ref, R}, Ref, TRef) -> - {T, TRef, R}; -reref(T, _, _) -> - T. + receive + {answer, _, _, _, _} = A -> + Pid ! A; + {failover = T, Ref} -> + Pid ! {T, TRef}; + T -> + exit({timeout, Ref, TPid} = T) + end. %% send/2 @@ -1559,7 +1561,7 @@ resend_request(Pkt0, store_request(TPid, Bin, Req, Timeout) -> Seqs = diameter_codec:sequence_numbers(Bin), - TRef = erlang:start_timer(Timeout, self(), timeout), + TRef = erlang:start_timer(Timeout, self(), TPid), ets:insert(?REQUEST_TABLE, {Seqs, Req, TRef}), ets:member(?REQUEST_TABLE, TPid) orelse (self() ! {failover, TRef}), %% failover/1 may have missed diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl index 073a415d10..41c493ff20 100644 --- a/lib/diameter/src/base/diameter_watchdog.erl +++ b/lib/diameter/src/base/diameter_watchdog.erl @@ -47,6 +47,14 @@ -define(BASE, ?DIAMETER_DICT_COMMON). +-define(IS_NATURAL(N), (is_integer(N) andalso 0 =< N)). + +-define(CHOOSE(B,T,F), if (B) -> T; true -> F end). + +-record(config, + {suspect = 1 :: non_neg_integer(), %% OKAY -> SUSPECT + okay = 3 :: non_neg_integer()}). %% REOPEN -> OKAY + -record(watchdog, {%% PCB - Peer Control Block; see RFC 3539, Appendix A status = initial :: initial | okay | suspect | down | reopen, @@ -54,7 +62,8 @@ tw :: 6000..16#FFFFFFFF | {module(), atom(), list()}, %% {M,F,A} -> integer() >= 0 num_dwa = 0 :: -1 | non_neg_integer(), - %% number of DWAs received during reopen + %% number of DWAs received in reopen, + %% or number of timeouts before okay -> suspect %% end PCB parent = self() :: pid(), %% service process transport :: pid() | undefined, %% peer_fsm process @@ -64,7 +73,8 @@ %% term passed into diameter_service with incoming message sequence :: diameter:sequence(), %% mask restrict :: {diameter:restriction(), boolean()}, - shutdown = false :: boolean()}). + shutdown = false :: boolean(), + config :: #config{}}). %% --------------------------------------------------------------------------- %% start/2 @@ -129,7 +139,8 @@ i({Ack, T, Pid, {RecvData, receive_data = RecvData, dictionary = Dict0, sequence = Mask, - restrict = {Restrict, lists:member(node(), Nodes)}}. + restrict = {Restrict, lists:member(node(), Nodes)}, + config = config(Opts)}. wait(Ref, Pid) -> receive @@ -139,6 +150,27 @@ wait(Ref, Pid) -> exit({shutdown, D}) end. +%% config/1 +%% +%% Could also configure counts for SUSPECT to DOWN and REOPEN to DOWN, +%% but don't. + +config(Opts) -> + Config = proplists:get_value(watchdog_config, Opts, []), + is_list(Config) orelse config_error({watchdog_config, Config}), + lists:foldl(fun config/2, #config{}, Config). %% ^ added in old code + +config({suspect, N}, Rec) + when ?IS_NATURAL(N) -> + Rec#config{suspect = N}; + +config({okay, N}, Rec) + when ?IS_NATURAL(N) -> + Rec#config{okay = N}; + +config(T, _) -> %% added in old code + config_error(T). + %% start/5 start(T, Opts, Mask, Nodes, Dict0, Svc) -> @@ -193,7 +225,8 @@ dict0(_, _, Acc) -> Acc. config_error(T) -> - ?ERROR({configuration_error, T}). + diameter_lib:error_report(configuration_error, T), + exit({shutdown, {configuration_error, T}}). %% handle_call/3 @@ -219,6 +252,17 @@ handle_info(T, #watchdog{} = State) -> ?LOG(stop, T), event(T, State, State#watchdog{status = down}), {stop, {shutdown, T}, State} + end; + +handle_info(T, State) -> %% started in old code + handle_info(T, upgrade(State)). + +upgrade(State) -> + case erlang:append_element(State, #config{}) of + #watchdog{status = okay, config = #config{suspect = OS}} = S -> + S#watchdog{num_dwa = OS}; + #watchdog{} = S -> + S end. close({'DOWN', _, process, TPid, {shutdown, Reason}}, @@ -331,11 +375,13 @@ transition({accepted = T, TPid}, #watchdog{transport = TPid, transition({open, TPid, Hosts, _} = Open, #watchdog{transport = TPid, status = initial, - restrict = {_, R}} + restrict = {_,R}, + config = #config{suspect = OS}} = S) -> case okay(getr(restart), Hosts, R) of okay -> - set_watchdog(S#watchdog{status = okay}); + set_watchdog(S#watchdog{status = okay, + num_dwa = OS}); reopen -> transition(Open, S#watchdog{status = down}) end; @@ -347,15 +393,22 @@ transition({open, TPid, Hosts, _} = Open, transition({open = Key, TPid, _Hosts, T}, #watchdog{transport = TPid, - status = down} + status = down, + config = #config{suspect = OS, + okay = RO}} = S) -> - %% Store the info we need to notify the parent to reopen the - %% connection after the requisite DWA's are received, at which - %% time we eraser(open). The reopen message is a later addition, - %% to communicate the new capabilities as soon as they're known. - putr(Key, {TPid, T}), - set_watchdog(send_watchdog(S#watchdog{status = reopen, - num_dwa = 0})); + case RO of + 0 -> %% non-standard: skip REOPEN + set_watchdog(S#watchdog{status = okay, + num_dwa = OS}); + _ -> + %% Store the info we need to notify the parent to reopen + %% the connection after the requisite DWA's are received, + %% at which time we eraser(open). + putr(Key, {TPid, T}), + set_watchdog(send_watchdog(S#watchdog{status = reopen, + num_dwa = 0})) + end; %% OKAY Connection down CloseConnection() %% Failover() @@ -374,7 +427,7 @@ transition({'DOWN', _, process, TPid, _Reason}, #watchdog{transport = TPid, status = T} = S) -> - set_watchdog(S#watchdog{status = case T of initial -> T; _ -> down end, + set_watchdog(S#watchdog{status = ?CHOOSE(initial == T, T, down), pending = false, transport = undefined}); @@ -553,22 +606,27 @@ rcv(_, #watchdog{status = okay} = S) -> %% SUSPECT Receive non-DWA Failback() %% SetWatchdog() OKAY -rcv('DWA', #watchdog{status = suspect} = S) -> +rcv('DWA', #watchdog{status = suspect, config = #config{suspect = OS}} = S) -> set_watchdog(S#watchdog{status = okay, + num_dwa = OS, pending = false}); -rcv(_, #watchdog{status = suspect} = S) -> - set_watchdog(S#watchdog{status = okay}); +rcv(_, #watchdog{status = suspect, config = #config{suspect = OS}} = S) -> + set_watchdog(S#watchdog{status = okay, + num_dwa = OS}); %% REOPEN Receive DWA & Pending = FALSE %% NumDWA == 2 NumDWA++ %% Failback() OKAY rcv('DWA', #watchdog{status = reopen, - num_dwa = 2 = N} - = S) -> + num_dwa = N, + config = #config{suspect = OS, + okay = RO}} + = S) + when N+1 == RO -> S#watchdog{status = okay, - num_dwa = N+1, + num_dwa = OS, pending = false}; %% REOPEN Receive DWA & Pending = FALSE @@ -607,9 +665,17 @@ timeout(#watchdog{status = T, %% Pending SetWatchdog() SUSPECT timeout(#watchdog{status = okay, - pending = true} - = S) -> - S#watchdog{status = suspect}; + pending = true, + num_dwa = N} + = S) -> + case N of + 1 -> + S#watchdog{status = suspect}; + 0 -> %% non-standard: never move to suspect + S; + N -> %% non-standard: more timeouts before moving + S#watchdog{num_dwa = N-1} + end; %% SUSPECT Timer expires CloseConnection() %% SetWatchdog() DOWN diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index 132088b514..cbbba714ac 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -100,6 +100,18 @@ %% # start/3 %% --------------------------------------------------------------------------- +-spec start({accept, Ref}, Svc, [Opt]) + -> {ok, pid(), [inet:ip_address()]} + when Ref :: diameter:transport_ref(), + Svc :: #diameter_service{}, + Opt :: diameter:transport_opt(); + ({connect, Ref}, Svc, [Opt]) + -> {ok, pid(), [inet:ip_address()]} + | {ok, pid()} + when Ref :: diameter:transport_ref(), + Svc :: #diameter_service{}, + Opt :: diameter:transport_opt(). + start({T, Ref}, #diameter_service{capabilities = Caps}, Opts) -> diameter_tcp_sup:start(), %% start tcp supervisors on demand {Mod, Rest} = split(Opts), @@ -172,7 +184,7 @@ i({T, Ref, Mod, Pid, Opts, Addrs}) OwnOpts, ?DEFAULT_FRAGMENT_TIMEOUT), ?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}), - Sock = i(T, Ref, Mod, Pid, SslOpts, Rest, Addrs), + Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs), MPid ! {stop, self()}, %% tell the monitor to die M = if SslOpts -> ssl; true -> Mod end, setopts(M, Sock), @@ -199,14 +211,21 @@ i(#monitor{parent = Pid, transport = TPid} = S) -> i({listen, LRef, APid, {Mod, Opts, Addrs}}) -> {[LA, LP], Rest} = proplists:split(Opts, [ip, port]), - LAddr = get_addr(LA, Addrs), + LAddrOpt = get_addr(LA, Addrs), LPort = get_port(LP), - {ok, LSock} = Mod:listen(LPort, gen_opts(LAddr, Rest)), + {ok, LSock} = Mod:listen(LPort, gen_opts(LAddrOpt, Rest)), + LAddr = laddr(LAddrOpt, Mod, LSock), true = diameter_reg:add_new({?MODULE, listener, {LRef, {LAddr, LSock}}}), proc_lib:init_ack({ok, self(), {LAddr, LSock}}), erlang:monitor(process, APid), start_timer(#listener{socket = LSock}). +laddr([], Mod, Sock) -> + {ok, {Addr, _Port}} = sockname(Mod, Sock), + Addr; +laddr([{ip, Addr}], _, _) -> + Addr. + own(Opts) -> {Own, Rest} = proplists:split(Opts, [fragment_timer]), {lists:append(Own), Rest}. @@ -225,17 +244,19 @@ ssl_opts([{ssl_options, Opts}]) ssl_opts(L) -> ?ERROR({ssl_options, L}). -%% i/7 +%% init/7 %% Establish a TLS connection before capabilities exchange ... -i(Type, Ref, Mod, Pid, true, Opts, Addrs) -> - i(Type, Ref, ssl, Pid, [{cb_info, ?TCP_CB(Mod)} | Opts], Addrs); +init(Type, Ref, Mod, Pid, true, Opts, Addrs) -> + init(Type, Ref, ssl, Pid, [{cb_info, ?TCP_CB(Mod)} | Opts], Addrs); %% ... or not. -i(Type, Ref, Mod, Pid, _, Opts, Addrs) -> - i(Type, Ref, Mod, Pid, Opts, Addrs). +init(Type, Ref, Mod, Pid, _, Opts, Addrs) -> + init(Type, Ref, Mod, Pid, Opts, Addrs). + +%% init/6 -i(accept = T, Ref, Mod, Pid, Opts, Addrs) -> +init(accept = T, Ref, Mod, Pid, Opts, Addrs) -> {LAddr, LSock} = listener(Ref, {Mod, Opts, Addrs}), proc_lib:init_ack({ok, self(), [LAddr]}), Sock = ok(accept(Mod, LSock)), @@ -243,17 +264,28 @@ i(accept = T, Ref, Mod, Pid, Opts, Addrs) -> diameter_peer:up(Pid), Sock; -i(connect = T, Ref, Mod, Pid, Opts, Addrs) -> +init(connect = T, Ref, Mod, Pid, Opts, Addrs) -> {[LA, RA, RP], Rest} = proplists:split(Opts, [ip, raddr, rport]), - LAddr = get_addr(LA, Addrs), - RAddr = get_addr(RA, []), + LAddrOpt = get_addr(LA, Addrs), + RAddr = get_addr(RA), RPort = get_port(RP), - proc_lib:init_ack({ok, self(), [LAddr]}), - Sock = ok(connect(Mod, RAddr, RPort, gen_opts(LAddr, Rest))), + proc_lib:init_ack(init_rc(LAddrOpt)), + Sock = ok(connect(Mod, RAddr, RPort, gen_opts(LAddrOpt, Rest))), publish(Mod, T, Ref, Sock), - diameter_peer:up(Pid, {RAddr, RPort}), + up(Pid, {RAddr, RPort}, LAddrOpt, Mod, Sock), Sock. +init_rc([{ip, Addr}]) -> + {ok, self(), [Addr]}; +init_rc([]) -> + {ok, self()}. + +up(Pid, Remote, [{ip, _Addr}], _, _) -> + diameter_peer:up(Pid, Remote); +up(Pid, Remote, [], Mod, Sock) -> + {Addr, _Port} = ok(sockname(Mod, Sock)), + diameter_peer:up(Pid, Remote, [Addr]). + publish(Mod, T, Ref, Sock) -> true = diameter_reg:add_new({?MODULE, T, {Ref, Sock}}), putr(?INFO_KEY, {Mod, Sock}). %% for info/1 @@ -281,10 +313,17 @@ l([], LRef, T) -> {ok, _, AS} = diameter_tcp_sup:start_child({listen, LRef, self(), T}), AS. +%% get_addr/1 + +get_addr(As) -> + diameter_lib:ipaddr(addr(As, [])). + %% get_addr/2 +get_addr([], []) -> + []; get_addr(As, Def) -> - diameter_lib:ipaddr(addr(As, Def)). + [{ip, diameter_lib:ipaddr(addr(As, Def))}]. %% Take the first address from the service if several are unspecified. addr([], [Addr | _]) -> @@ -305,14 +344,10 @@ get_port(Ps) -> %% gen_opts/2 -gen_opts(LAddr, Opts) -> +gen_opts(LAddrOpt, Opts) -> {L,_} = proplists:split(Opts, [binary, packet, active]), [[],[],[]] == L orelse ?ERROR({reserved_options, Opts}), - [binary, - {packet, 0}, - {active, once}, - {ip, LAddr} - | Opts]. + [binary, {packet, 0}, {active, once}] ++ LAddrOpt ++ Opts. %% --------------------------------------------------------------------------- %% # ports/1 |