diff options
Diffstat (limited to 'lib/diameter/src/base')
-rw-r--r-- | lib/diameter/src/base/diameter.app.src | 2 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter.appup.src | 24 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter.erl | 28 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_capx.erl | 8 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_codec.erl | 22 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_config.erl | 88 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_peer.erl | 143 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_peer_fsm.erl | 508 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_reg.erl | 255 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_service.erl | 1328 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_session.erl | 14 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_stats.erl | 265 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_watchdog.erl | 213 |
13 files changed, 1985 insertions, 913 deletions
diff --git a/lib/diameter/src/base/diameter.app.src b/lib/diameter/src/base/diameter.app.src index c092fdb022..7e17cd6c9f 100644 --- a/lib/diameter/src/base/diameter.app.src +++ b/lib/diameter/src/base/diameter.app.src @@ -21,7 +21,7 @@ [{description, "Diameter protocol"}, {vsn, "%VSN%"}, {modules, [%MODULES%]}, - {registered, []}, + {registered, [%REGISTERED%]}, {applications, [stdlib, kernel]}, {env, []}, {mod, {diameter_app, []}} diff --git a/lib/diameter/src/base/diameter.appup.src b/lib/diameter/src/base/diameter.appup.src index 2ebdad598f..a04a387918 100644 --- a/lib/diameter/src/base/diameter.appup.src +++ b/lib/diameter/src/base/diameter.appup.src @@ -2,7 +2,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% Copyright Ericsson AB 2010-2013. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -20,15 +20,21 @@ {"%VSN%", [ - {"0.9", [{restart_application, diameter}]}, - {"0.10", [{restart_application, diameter}]}, - {"1.0", [{update, diameter_service}, - {update, diameter_watchdog}]} + {"0.9", [{restart_application, diameter}]}, + {"0.10", [{restart_application, diameter}]}, + {"1.0", [{restart_application, diameter}]}, + {"1.1", [{restart_application, diameter}]}, + {"1.2", [{restart_application, diameter}]}, + {"1.2.1", [{restart_application, diameter}]}, + {"1.3", [{load_module, diameter_service}]} ], [ - {"0.9", [{restart_application, diameter}]}, - {"0.10", [{restart_application, diameter}]}, - {"1.0", [{update, diameter_watchdog}, - {update, diameter_service}]} + {"0.9", [{restart_application, diameter}]}, + {"0.10", [{restart_application, diameter}]}, + {"1.0", [{restart_application, diameter}]}, + {"1.1", [{restart_application, diameter}]}, + {"1.2", [{restart_application, diameter}]}, + {"1.2.1", [{restart_application, diameter}]}, + {"1.3", [{load_module, diameter_service}]} ] }. diff --git a/lib/diameter/src/base/diameter.erl b/lib/diameter/src/base/diameter.erl index 336f0c1f2d..8f9901907a 100644 --- a/lib/diameter/src/base/diameter.erl +++ b/lib/diameter/src/base/diameter.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% Copyright Ericsson AB 2010-2012. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -44,6 +44,8 @@ stop/0]). -export_type([evaluable/0, + restriction/0, + sequence/0, app_alias/0, service_name/0, capability/0, @@ -280,11 +282,23 @@ call(SvcName, App, Message) -> | fun() | maybe_improper_list(evaluable(), list()). +-type sequence() + :: {'Unsigned32'(), 0..32}. + +-type restriction() + :: false + | node + | nodes + | [node()] + | evaluable(). + %% Options passed to start_service/2 -type service_opt() :: capability() - | {application, [application_opt()]}. + | {application, [application_opt()]} + | {restrict_connections, restriction()} + | {sequence, sequence() | evaluable()}. -type application_opt() :: {alias, app_alias()} @@ -312,9 +326,12 @@ call(SvcName, App, Message) -> -type transport_opt() :: {transport_module, atom()} | {transport_config, any()} + | {transport_config, any(), non_neg_integer() | infinity} | {applications, [app_alias()]} | {capabilities, [capability()]} | {capabilities_cb, evaluable()} + | {capx_timeout, 'Unsigned32'()} + | {disconnect_cb, evaluable()} | {watchdog_timer, 'Unsigned32'() | {module(), atom(), list()}} | {reconnect_timer, 'Unsigned32'()} | {private, any()}. @@ -322,10 +339,11 @@ call(SvcName, App, Message) -> %% Predicate passed to remove_transport/2 -type transport_pred() - :: fun((reference(), connect|listen, list()) -> boolean()) - | fun((reference(), list()) -> boolean()) + :: fun((transport_ref(), connect|listen, list()) -> boolean()) + | fun((transport_ref(), list()) -> boolean()) | fun((list()) -> boolean()) - | reference() + | transport_ref() + | boolean() | list() | {connect|listen, transport_pred()} | {atom(), atom(), list()}. diff --git a/lib/diameter/src/base/diameter_capx.erl b/lib/diameter/src/base/diameter_capx.erl index 6c4d60ee9b..c6c3d2934d 100644 --- a/lib/diameter/src/base/diameter_capx.erl +++ b/lib/diameter/src/base/diameter_capx.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% Copyright Ericsson AB 2010-2012. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -141,7 +141,9 @@ cap('Host-IP-Address', Vs) when is_list(Vs) -> lists:map(fun ipaddr/1, Vs); -cap('Firmware-Revision', V) -> +cap(K, V) + when K == 'Firmware-Revision'; + K == 'Origin-State-Id' -> [V]; cap(_, Vs) @@ -149,7 +151,7 @@ cap(_, Vs) Vs; cap(K, V) -> - ?THROW({invalid, K, V}). + ?THROW({invalid, {K,V}}). ipaddr(A) -> try diff --git a/lib/diameter/src/base/diameter_codec.erl b/lib/diameter/src/base/diameter_codec.erl index fe1212b7e0..0b0bfe3f0a 100644 --- a/lib/diameter/src/base/diameter_codec.erl +++ b/lib/diameter/src/base/diameter_codec.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% Copyright Ericsson AB 2010-2012. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -63,9 +63,9 @@ encode(Mod, #diameter_packet{} = Pkt) -> e(Mod, Pkt) catch error: Reason -> - %% Be verbose rather than letting the emulator truncate the - %% error report. - X = {Reason, ?STACK}, + %% Be verbose since a crash report may be truncated and + %% encode errors are self-inflicted. + X = {?MODULE, encode, {Reason, ?STACK}}, diameter_lib:error_report(X, {?MODULE, encode, [Mod, Pkt]}), exit(X) end; @@ -91,7 +91,8 @@ e(_, #diameter_packet{msg = [#diameter_header{} = Hdr | As]} = Pkt) -> Flags = make_flags(0, Hdr), - Pkt#diameter_packet{bin = <<Vsn:8, Length:24, + Pkt#diameter_packet{header = Hdr, + bin = <<Vsn:8, Length:24, Flags:8, Code:24, Aid:32, Hid:32, @@ -192,9 +193,11 @@ encode_avps(Avps) -> msg_header(Mod, 'answer-message' = MsgName, Header) -> ?BASE = Mod, - #diameter_header{cmd_code = Code} = Header, - {_, Flags, ApplId} = ?BASE:msg_header(MsgName), - {Code, Flags, ApplId}; + #diameter_header{application_id = Aid, + cmd_code = Code} + = Header, + {-1, Flags, ?DIAMETER_APP_ID_COMMON} = ?BASE:msg_header(MsgName), + {Code, Flags, Aid}; msg_header(Mod, MsgName, _) -> Mod:msg_header(MsgName). @@ -332,6 +335,9 @@ decode_header(_) -> %% wraparound counter. The 8-bit counter is incremented each time the %% system is restarted. +sequence_numbers({_,_} = T) -> + T; + sequence_numbers(#diameter_packet{bin = Bin}) when is_binary(Bin) -> sequence_numbers(Bin); diff --git a/lib/diameter/src/base/diameter_config.erl b/lib/diameter/src/base/diameter_config.erl index 9253af0de2..63d28f25a2 100644 --- a/lib/diameter/src/base/diameter_config.erl +++ b/lib/diameter/src/base/diameter_config.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% Copyright Ericsson AB 2010-2012. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -97,6 +97,9 @@ -record(monitor, {mref = make_ref() :: reference(), service}). %% name +%% The default sequence mask. +-define(NOMASK, {0,32}). + %% Time to lay low before restarting a dead service. -define(RESTART_SLEEP, 2000). @@ -519,6 +522,7 @@ rm(SvcName, L) -> Refs = lists:map(fun(#transport{ref = R}) -> R end, L), case stop_transport(SvcName, Refs) of ok -> + diameter_stats:flush(Refs), lists:foreach(fun delete_object/1, L); {error, _} = No -> No @@ -548,9 +552,11 @@ make_config(SvcName, Opts) -> ok = encode_CER(COpts), - Os = split(Opts, [{[fun erlang:is_boolean/1], false, share_peers}, - {[fun erlang:is_boolean/1], false, use_shared_peers}, - {[fun erlang:is_pid/1, false], false, monitor}]), + Os = split(Opts, fun opt/2, [{false, share_peers}, + {false, use_shared_peers}, + {false, monitor}, + {?NOMASK, sequence}, + {nodes, restrict_connections}]), %% share_peers and use_shared_peers are currently undocumented. #service{name = SvcName, @@ -558,11 +564,66 @@ make_config(SvcName, Opts) -> capabilities = Caps}, options = Os}. +split(Opts, F, Defs) -> + [{K, F(K, get_opt(K, Opts, D))} || {D,K} <- Defs]. + +opt(K, false = B) + when K /= sequence -> + B; + +opt(K, true = B) + when K == share_peer; + K == use_shared_peers -> + B; + +opt(monitor, P) + when is_pid(P) -> + P; + +opt(restrict_connections, T) + when T == node; + T == nodes; + T == []; + is_atom(hd(T)) -> + T; + +opt(restrict_connections = K, F) -> + try diameter_lib:eval(F) of %% no guarantee that it won't fail later + Nodes when is_list(Nodes) -> + F; + V -> + ?THROW({value, {K,V}}) + catch + E:R -> + ?THROW({value, {K, E, R, ?STACK}}) + end; + +opt(sequence, {_,_} = T) -> + sequence(T); + +opt(sequence = K, F) -> + try diameter_lib:eval(F) of + T -> sequence(T) + catch + E:R -> + ?THROW({value, {K, E, R, ?STACK}}) + end; + +opt(K, _) -> + ?THROW({value, K}). + +sequence({H,N} = T) + when 0 =< N, N =< 32, 0 =< H, 0 == H bsr N -> + T; + +sequence(_) -> + ?THROW({value, sequence}). + make_caps(Caps, Opts) -> case diameter_capx:make_caps(Caps, Opts) of {ok, T} -> T; - {error, {Reason, _}} -> + {error, Reason} -> ?THROW(Reason) end. @@ -600,7 +661,7 @@ app_acc({application, Opts}, Acc) -> module = init_mod(Mod), init_state = ModS, mutable = init_mutable(M), - answer_errors = init_answers(A)} + options = [{answer_errors, init_answers(A)}]} | Acc]; app_acc(_, Acc) -> Acc. @@ -662,21 +723,6 @@ get_opt(Key, List, Def) -> _ -> ?THROW({arity, Key}) end. -split(Opts, Defs) -> - [{K, value(D, Opts)} || {_,_,K} = D <- Defs]. - -value({Preds, Def, Key}, Opts) -> - V = get_opt(Key, Opts, Def), - lists:any(fun(P) -> pred(P,V) end, Preds) - orelse ?THROW({value, Key}), - V. - -pred(F, V) - when is_function(F) -> - F(V); -pred(T, V) -> - T == V. - cb(M,F) -> try M:F() of V -> V diff --git a/lib/diameter/src/base/diameter_peer.erl b/lib/diameter/src/base/diameter_peer.erl index 3e78c4caef..1b2f32ddff 100644 --- a/lib/diameter/src/base/diameter_peer.erl +++ b/lib/diameter/src/base/diameter_peer.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% Copyright Ericsson AB 2010-2012. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -27,12 +27,15 @@ up/2]). %% ... and the stack. --export([start/3, +-export([start/1, send/2, close/1, abort/1, notify/2]). +%% Old interface only called from old code. +-export([start/3]). %% < diameter-1.2 (R15B02) + %% Server start. -export([start_link/0]). @@ -57,6 +60,11 @@ %% Server state. -record(state, {id = now()}). +%% Default transport_module/config. +-define(DEFAULT_TMOD, diameter_tcp). +-define(DEFAULT_TCFG, []). +-define(DEFAULT_TTMO, infinity). + %%% --------------------------------------------------------------------------- %%% # notify/2 %%% --------------------------------------------------------------------------- @@ -68,9 +76,119 @@ notify(SvcName, T) -> %%% # start/3 %%% --------------------------------------------------------------------------- -start(T, Opts, #diameter_service{} = Svc) -> - {Mod, Cfg} = split_transport(Opts), - apply(Mod, start, [T, Svc, Cfg]). +%% From old code: make it restart. +start(_T, _Opts, #diameter_service{}) -> + {error, restart}. + +%%% --------------------------------------------------------------------------- +%%% # start/1 +%%% --------------------------------------------------------------------------- + +-spec start({T, [Opt], #diameter_service{}}) + -> {TPid, [Addr], Tmo, Data} + | {error, [term()]} + when T :: {connect|accept, diameter:transport_ref()}, + Opt :: diameter:transport_opt(), + TPid :: pid(), + Addr :: inet:ip_address(), + Tmo :: non_neg_integer() | infinity, + Data :: {{T, Mod, Cfg}, [Mod], [{T, [Mod], Cfg}], [Err]}, + Mod :: module(), + Cfg :: term(), + Err :: term() + ; ({#diameter_service{}, Tmo, Data}) + -> {TPid, [Addr], Tmo, Data} + | {error, [term()]} + when TPid :: pid(), + Addr :: inet:ip_address(), + Tmo :: non_neg_integer() | infinity, + Data :: {{T, Mod, Cfg}, [Mod], [{T, [Mod], Cfg}], [Err]}, + T :: {connect|accept, diameter:transport_ref()}, + Mod :: module(), + Cfg :: term(), + Err :: term(). + +%% Initial start. +start({T, Opts, #diameter_service{} = Svc}) -> + start(T, Svc, pair(Opts, [], []), []); + +%% Subsequent start. +start({#diameter_service{} = Svc, Tmo, {{T, _, Cfg}, Ms, Rest, Errs}}) -> + start(T, Ms, Cfg, Svc, Tmo, Rest, Errs). + +%% pair/3 +%% +%% Pair transport modules with config. + +%% Another transport_module: accumulate it. +pair([{transport_module, M} | Rest], Mods, Acc) -> + pair(Rest, [M|Mods], Acc); + +%% Another transport_config: accumulate another tuple. +pair([{transport_config = T, C} | Rest], Mods, Acc) -> + pair([{T, C, ?DEFAULT_TTMO} | Rest], Mods, Acc); +pair([{transport_config, C, Tmo} | Rest], Mods, Acc) -> + pair(Rest, [], acc({Mods, C, Tmo}, Acc)); + +pair([_ | Rest], Mods, Acc) -> + pair(Rest, Mods, Acc); + +%% No transport_module or transport_config: defaults. +pair([], [], []) -> + [{[?DEFAULT_TMOD], ?DEFAULT_TCFG, ?DEFAULT_TTMO}]; + +%% One transport_module, one transport_config. +pair([], [M], [{[], Cfg, Tmo}]) -> + [{[M], Cfg, Tmo}]; + +%% Trailing transport_module: default transport_config. +pair([], [_|_] = Mods, Acc) -> + lists:reverse(acc({Mods, ?DEFAULT_TCFG, ?DEFAULT_TTMO}, Acc)); + +pair([], [], Acc) -> + lists:reverse(def(Acc)). + +%% acc/2 + +acc(T, Acc) -> + [T | def(Acc)]. + +%% def/1 +%% +%% Default module of previous pair if none were specified. + +def([{[], Cfg, Tmo} | Acc]) -> + [{[?DEFAULT_TMOD], Cfg, Tmo} | Acc]; +def(Acc) -> + Acc. + +%% start/4 + +start(T, Svc, [{Ms, Cfg, Tmo} | Rest], Errs) -> + start(T, Ms, Cfg, Svc, Tmo, Rest, Errs); + +start(_, _, [], Errs) -> + {error, Errs}. + +%% start/7 + +start(T, [], _, Svc, _, Rest, Errs) -> + start(T, Svc, Rest, Errs); + +start(T, [M|Ms], Cfg, Svc, Tmo, Rest, Errs) -> + case start(M, [T, Svc, Cfg]) of + {ok, TPid} -> + {TPid, [], Tmo, {{T, M, Cfg}, Ms, Rest, Errs}}; + {ok, TPid, [_|_] = Addrs} -> + {TPid, Addrs, Tmo, {{T, M, Cfg}, Ms, Rest, Errs}}; + E -> + start(T, Ms, Cfg, Svc, Tmo, Rest, [E|Errs]) + end. + +%% start/2 + +start(Mod, Args) -> + apply(Mod, start, Args). %%% --------------------------------------------------------------------------- %%% # up/[12] @@ -204,21 +322,6 @@ bang(undefined = No, _) -> bang(Pid, T) -> Pid ! T. -%% split_transport/1 -%% -%% Split options into transport module, transport config and -%% remaining options. - -split_transport(Opts) -> - {[M,C], _} = proplists:split(Opts, [transport_module, - transport_config]), - {value(M, diameter_tcp), value(C, [])}. - -value([{_,V}], _) -> - V; -value([], V) -> - V. - %% call/1 call(Request) -> diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl index 99644814d2..858870566f 100644 --- a/lib/diameter/src/base/diameter_peer_fsm.erl +++ b/lib/diameter/src/base/diameter_peer_fsm.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% Copyright Ericsson AB 2010-2012. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -48,37 +48,64 @@ -include("diameter_internal.hrl"). -include("diameter_gen_base_rfc3588.hrl"). +%% Values of Disconnect-Cause in DPR. -define(GOAWAY, ?'DIAMETER_BASE_DISCONNECT-CAUSE_DO_NOT_WANT_TO_TALK_TO_YOU'). -define(REBOOT, ?'DIAMETER_BASE_DISCONNECT-CAUSE_REBOOTING'). +-define(BUSY, ?'DIAMETER_BASE_DISCONNECT-CAUSE_BUSY'). -define(NO_INBAND_SECURITY, 0). -define(TLS, 1). +%% Keys in process dictionary. +-define(CB_KEY, cb). %% capabilities callback +-define(DPR_KEY, dpr). %% disconnect callback +-define(DWA_KEY, dwa). %% outgoing DWA +-define(REF_KEY, ref). %% transport_ref() +-define(Q_KEY, q). %% transport start queue +-define(START_KEY, start). %% start of connected transport +-define(SEQUENCE_KEY, mask). %% mask for sequence numbers +-define(RESTRICT_KEY, restrict). %% nodes for connection check + +%% The default sequence mask. +-define(NOMASK, {0,32}). + %% A 2xxx series Result-Code. Not necessarily 2001. -define(IS_SUCCESS(N), 2 == (N) div 1000). +%% Guards. +-define(IS_UINT32(N), (is_integer(N) andalso 0 =< N andalso 0 == N bsr 32)). +-define(IS_TIMEOUT(N), ?IS_UINT32(N)). +-define(IS_CAUSE(N), N == ?REBOOT; N == rebooting; + N == ?GOAWAY; N == goaway; + N == ?BUSY; N == busy). + %% RFC 3588: %% %% Timeout An application-defined timer has expired while waiting %% for some event. %% -define(EVENT_TIMEOUT, 10000). +%% Default timeout for reception of CER/CEA. -%% How long to wait for a DPA in response to DPR before simply -%% aborting. Used to distinguish between shutdown and not but there's -%% not really any need. Stopping a service will require a timeout if -%% the peer doesn't answer DPR so the value should be short-ish. +%% Default timeout for DPA in response to DPR. A bit short but the +%% timeout used to be hardcoded. (So it could be worse.) -define(DPA_TIMEOUT, 1000). +-type uint32() :: diameter:'Unsigned32'(). + -record(state, - {state = 'Wait-Conn-Ack' %% state of RFC 3588 Peer State Machine - :: 'Wait-Conn-Ack' | recv_CER | 'Wait-CEA' | 'Open', + {state %% of RFC 3588 Peer State Machine + :: 'Wait-Conn-Ack' %% old code + | {'Wait-Conn-Ack', uint32()} + | recv_CER + | 'Wait-CEA' %% old code + | {'Wait-CEA', uint32(), uint32()} + | 'Open', mode :: accept | connect | {connect, reference()}, - parent :: pid(), - transport :: pid(), + parent :: pid(), %% watchdog process + transport :: pid(), %% transport process service :: #diameter_service{}, - dpr = false :: false | {diameter:'Unsigned32'(), - diameter:'Unsigned32'()}}). + dpr = false :: false | {uint32(), uint32()}}). %% | hop by hop and end to end identifiers %% There are non-3588 states possible as a consequence of 5.6.1 of the @@ -115,16 +142,21 @@ %%% Output: Pid %%% --------------------------------------------------------------------------- +-spec start(T, [Opt], #diameter_service{} %% from old code + | {diameter:sequence(), + diameter:restriction(), + #diameter_service{}}) + -> pid() + when T :: {connect|accept, diameter:transport_ref()}, + Opt :: diameter:transport_opt(). + %% diameter_config requires a non-empty list of applications on the %% service but diameter_service then constrains the list to any %% specified on the transport in question. Check here that the list is %% still non-empty. -start({_, Ref} = Type, Opts, #diameter_service{applications = Apps} = Svc) -> - [] /= Apps orelse ?ERROR({no_apps, Type, Opts}), - T = {self(), Type, Opts, Svc}, - {ok, Pid} = diameter_peer_fsm_sup:start_child(T), - diameter_stats:reg(Pid, Ref), +start({_,_} = Type, Opts, MS) -> + {ok, Pid} = diameter_peer_fsm_sup:start_child({self(), Type, Opts, MS}), Pid. start_link(T) -> @@ -143,18 +175,31 @@ init(T) -> proc_lib:init_ack({ok, self()}), gen_server:enter_loop(?MODULE, [], i(T)). -i({WPid, T, Opts, #diameter_service{capabilities = Caps} = Svc0}) -> - putr(dwa, dwa(Caps)), +i({WPid, Type, Opts, #diameter_service{} = Svc}) -> %% from old code + i({WPid, Type, Opts, {?NOMASK, [node() | nodes()], Svc}}); + +i({WPid, T, Opts, {Mask, Nodes, #diameter_service{applications = Apps, + capabilities = LCaps} + = Svc}}) -> + [] /= Apps orelse ?ERROR({no_apps, T, Opts}), + putr(?DWA_KEY, dwa(LCaps)), {M, Ref} = T, - {[Ts], Rest} = proplists:split(Opts, [capabilities_cb]), - putr(capabilities_cb, {Ref, [F || {_,F} <- Ts]}), - {ok, TPid, Svc} = start_transport(T, Rest, Svc0), - erlang:monitor(process, TPid), + diameter_stats:reg(Ref), + {[Cs,Ds], Rest} = proplists:split(Opts, [capabilities_cb, disconnect_cb]), + putr(?CB_KEY, {Ref, [F || {_,F} <- Cs]}), + putr(?DPR_KEY, [F || {_, F} <- Ds]), + putr(?REF_KEY, Ref), + putr(?SEQUENCE_KEY, Mask), + putr(?RESTRICT_KEY, Nodes), erlang:monitor(process, WPid), - #state{parent = WPid, + {TPid, Addrs} = start_transport(T, Rest, Svc), + Tmo = proplists:get_value(capx_timeout, Opts, ?EVENT_TIMEOUT), + ?IS_TIMEOUT(Tmo) orelse ?ERROR({invalid, {capx_timeout, Tmo}}), + #state{state = {'Wait-Conn-Ack', Tmo}, + parent = WPid, transport = TPid, mode = M, - service = Svc}. + service = svc(Svc, Addrs)}. %% The transport returns its local ip addresses so that different %% transports on the same service can use different local addresses. %% The local addresses are put into Host-IP-Address avps here when @@ -164,18 +209,56 @@ i({WPid, T, Opts, #diameter_service{capabilities = Caps} = Svc0}) -> %% watchdog start (start/2) succeeds regardless so as not to crash the %% service. -start_transport(T, Opts, Svc) -> - case diameter_peer:start(T, Opts, Svc) of - {ok, TPid} -> - {ok, TPid, Svc}; - {ok, TPid, [_|_] = Addrs} -> - #diameter_service{capabilities = Caps0} = Svc, - Caps = Caps0#diameter_caps{host_ip_address = Addrs}, - {ok, TPid, Svc#diameter_service{capabilities = Caps}}; +start_transport(T, Opts, #diameter_service{capabilities = LCaps} = Svc) -> + Addrs0 = LCaps#diameter_caps.host_ip_address, + start_transport(Addrs0, {T, Opts, Svc}). + +start_transport(Addrs0, T) -> + case diameter_peer:start(T) of + {TPid, Addrs, Tmo, Data} -> + erlang:monitor(process, TPid), + q_next(TPid, Addrs0, Tmo, Data), + {TPid, addrs(Addrs, Addrs0)}; No -> exit({shutdown, No}) end. +addrs([], Addrs0) -> + Addrs0; +addrs(Addrs, _) -> + Addrs. + +svc(Svc, []) -> + Svc; +svc(Svc, Addrs) -> + readdr(Svc, Addrs). + +readdr(#diameter_service{capabilities = LCaps0} = Svc, Addrs) -> + LCaps = LCaps0#diameter_caps{host_ip_address = Addrs}, + Svc#diameter_service{capabilities = LCaps}. + +%% The 4-tuple Data returned from diameter_peer:start/1 identifies the +%% transport module/config use to start the transport process in +%% question as well as any alternates to try if a connection isn't +%% established within Tmo. +q_next(TPid, Addrs0, Tmo, {_,_,_,_} = Data) -> + send_after(Tmo, {connection_timeout, TPid}), + putr(?Q_KEY, {Addrs0, Tmo, Data}). + +%% Connection has been established: retain the started +%% pid/module/config in the process dictionary. This is a part of the +%% interface defined by this module, so that the transport pid can be +%% found when constructing service_info (in order to extract further +%% information from it). +keep_transport(TPid) -> + {_, _, {{_,_,_} = T, _, _, _}} = eraser(?Q_KEY), + putr(?START_KEY, {TPid, T}). + +send_after(infinity, _) -> + ok; +send_after(Tmo, T) -> + erlang:send_after(Tmo, self(), T). + %% handle_call/3 handle_call(_, _, State) -> @@ -202,14 +285,27 @@ handle_info(T, #state{} = State) -> ?LOG(stop, T), x(T, State) catch + exit: {diameter_codec, encode, _} = Reason -> + close_wd(Reason, State#state.parent), + ?LOG(stop, Reason), + %% diameter_codec:encode/2 emits an error report. Only + %% indicate the probable reason here. + diameter_lib:info_report(probable_configuration_error, + insufficient_capabilities), + {stop, {shutdown, Reason}, State}; {?MODULE, Tag, Reason} -> ?LOG(Tag, {Reason, T}), {stop, {shutdown, Reason}, State} end. -%% The form of the exception caught here is historical. It's +%% The form of the throw caught here is historical. It's %% significant that it's not a 2-tuple, as in ?FAILURE(Reason), %% since these are caught elsewhere. +%% Note that there's no guarantee that the service and transport +%% capabilities are good enough to build a CER/CEA that can be +%% succesfully encoded. It's not checked at diameter:add_transport/2 +%% since this can be called before creating the service. + x(Reason, #state{} = S) -> close_wd(Reason, S), {stop, {shutdown, Reason}, S}. @@ -238,34 +334,62 @@ eraser(Key) -> %% transition/2 +%% Started in old code. +transition(T, #state{state = 'Wait-Conn-Ack' = PS} = S) -> + transition(T, S#state{state = {PS, ?EVENT_TIMEOUT}}); + %% Connection to peer. transition({diameter, {TPid, connected, Remote}}, - #state{state = PS, + #state{transport = TPid, + state = PS, mode = M} = S) -> - 'Wait-Conn-Ack' = PS, %% assert + {'Wait-Conn-Ack', _} = PS, %% assert connect = M, %% - send_CER(S#state{mode = {M, Remote}, - transport = TPid}); + keep_transport(TPid), + send_CER(S#state{mode = {M, Remote}}); %% Connection from peer. transition({diameter, {TPid, connected}}, - #state{state = PS, + #state{transport = TPid, + state = PS, mode = M, parent = Pid} = S) -> - 'Wait-Conn-Ack' = PS, %% assert + {'Wait-Conn-Ack', Tmo} = PS, %% assert accept = M, %% + keep_transport(TPid), Pid ! {accepted, self()}, - start_timer(S#state{state = recv_CER, - transport = TPid}); + start_timer(Tmo, S#state{state = recv_CER}); + +%% Connection established after receiving a connection_timeout +%% message. This may be followed by an incoming message which arrived +%% before the transport was killed and this can't be distinguished +%% from one from the transport that's been started to replace it. +transition({diameter, {_, connected}}, _) -> + {stop, connection_timeout}; +transition({diameter, {_, connected, _}}, _) -> + {stop, connection_timeout}; + +%% Connection has timed out: start an alternate. +transition({connection_timeout = T, TPid}, + #state{transport = TPid, + state = {'Wait-Conn-Ack', _}} + = S) -> + exit(TPid, {shutdown, T}), + start_next(S); + +%% Connect timeout after connection or alternate start: ignore. +transition({connection_timeout, _}, _) -> + ok; %% Incoming message from the transport. transition({diameter, {recv, Pkt}}, S) -> recv(Pkt, S); %% Timeout when still in the same state ... -transition({timeout, PS}, #state{state = PS}) -> +transition({timeout = T, PS}, #state{state = PS} = S) -> + close({capx(PS), T}, S), stop; %% ... or not. @@ -277,25 +401,19 @@ transition({send, Msg}, #state{transport = TPid}) -> send(TPid, Msg), ok; -%% Request for graceful shutdown. -transition({shutdown, Pid}, #state{parent = Pid, dpr = false} = S) -> - dpr(?GOAWAY, S); -transition({shutdown, Pid}, #state{parent = Pid}) -> +%% Messages from old (diameter_service) code. +transition(shutdown = T, #state{parent = Pid} = S) -> + transition({T, Pid, service}, S); %% Reason irrelevant: old code has no cb + +%% Request for graceful shutdown at remove_transport, stop_service of +%% application shutdown. +transition({shutdown = T, Pid}, S) -> + transition({T, Pid, transport}, S); +transition({shutdown, Pid, Reason}, #state{parent = Pid, dpr = false} = S) -> + dpr(Reason, S); +transition({shutdown, Pid, _}, #state{parent = Pid}) -> ok; -%% Application shutdown. -transition(shutdown, #state{dpr = false} = S) -> - dpr(?REBOOT, S); -transition(shutdown, _) -> %% DPR already send: ensure expected timeout - dpa_timer(), - ok; - -%% Request to close the transport connection. -transition({close = T, Pid}, #state{parent = Pid, - transport = TPid}) -> - diameter_peer:close(TPid), - {stop, T}; - %% DPA reception has timed out. transition(dpa_timeout, _) -> stop; @@ -305,14 +423,21 @@ transition({resolve_port, _Pid} = T, #state{transport = TPid}) -> TPid ! T, ok; -%% Parent or transport has died. -transition({'DOWN', _, process, P, _}, - #state{parent = Pid, - transport = TPid}) - when P == Pid; - P == TPid -> +%% Parent has died. +transition({'DOWN', _, process, WPid, _}, + #state{parent = WPid}) -> stop; +%% Transport has died before connection timeout. +transition({'DOWN', _, process, TPid, _}, + #state{transport = TPid} + = S) -> + start_next(S); + +%% Transport has died after connection timeout. +transition({'DOWN', _, process, _, _}, _) -> + ok; + %% State query. transition({state, Pid}, #state{state = S, transport = TPid}) -> Pid ! {self(), [S, TPid]}, @@ -320,20 +445,43 @@ transition({state, Pid}, #state{state = S, transport = TPid}) -> %% Crash on anything unexpected. +capx(recv_CER) -> + 'CER'; +capx({'Wait-CEA', _, _}) -> + 'CEA'. + +%% start_next/1 + +start_next(#state{service = Svc0} = S) -> + case getr(?Q_KEY) of + {Addrs0, Tmo, Data} -> + Svc = readdr(Svc0, Addrs0), + {TPid, Addrs} = start_transport(Addrs0, {Svc, Tmo, Data}), + S#state{transport = TPid, + service = svc(Svc, Addrs)}; + undefined -> + stop + end. + %% send_CER/1 -send_CER(#state{mode = {connect, Remote}, - service = #diameter_service{capabilities = Caps}, +send_CER(#state{state = {'Wait-Conn-Ack', Tmo}, + mode = {connect, Remote}, + service = #diameter_service{capabilities = LCaps}, transport = TPid} = S) -> - OH = Caps#diameter_caps.origin_host, + OH = LCaps#diameter_caps.origin_host, req_send_CER(OH, Remote) orelse - close({already_connected, Remote, Caps}, S), + close({already_connected, Remote, LCaps}, S), CER = build_CER(S), ?LOG(send, 'CER'), - send(TPid, encode(CER)), - start_timer(S#state{state = 'Wait-CEA'}). + #diameter_packet{header = #diameter_header{end_to_end_id = Eid, + hop_by_hop_id = Hid}} + = Pkt + = encode(CER), + send(TPid, Pkt), + start_timer(Tmo, S#state{state = {'Wait-CEA', Hid, Eid}}). %% Register ourselves as connecting to the remote endpoint in %% question. This isn't strictly necessary since a peer implementing @@ -345,23 +493,36 @@ send_CER(#state{mode = {connect, Remote}, req_send_CER(OriginHost, Remote) -> register_everywhere({?MODULE, connection, OriginHost, {remote, Remote}}). -%% start_timer/1 +%% start_timer/2 -start_timer(#state{state = PS} = S) -> - erlang:send_after(?EVENT_TIMEOUT, self(), {timeout, PS}), +start_timer(Tmo, #state{state = PS} = S) -> + erlang:send_after(Tmo, self(), {timeout, PS}), S. %% build_CER/1 -build_CER(#state{service = #diameter_service{capabilities = Caps}}) -> - {ok, CER} = diameter_capx:build_CER(Caps), +build_CER(#state{service = #diameter_service{capabilities = LCaps}}) -> + {ok, CER} = diameter_capx:build_CER(LCaps), CER. %% encode/1 encode(Rec) -> - #diameter_packet{bin = Bin} = diameter_codec:encode(?BASE, Rec), - Bin. + Seq = diameter_session:sequence(sequence()), + Hdr = #diameter_header{version = ?DIAMETER_VERSION, + end_to_end_id = Seq, + hop_by_hop_id = Seq}, + diameter_codec:encode(?BASE, #diameter_packet{header = Hdr, + msg = Rec}). + +sequence() -> + case getr(?SEQUENCE_KEY) of + {_,_} = Mask -> + Mask; + undefined -> %% started in old code + putr(?SEQUENCE_KEY, ?NOMASK), + ?NOMASK + end. %% recv/2 @@ -420,7 +581,14 @@ discard(Reason, F, A) -> %% rcv/3 %% Incoming CEA. -rcv('CEA', Pkt, #state{state = 'Wait-CEA'} = S) -> +rcv('CEA', + #diameter_packet{header = #diameter_header{end_to_end_id = Eid, + hop_by_hop_id = Hid}} + = Pkt, + #state{state = {'Wait-CEA' = T, Hid, Eid}} + = S) -> + handle_CEA(Pkt, S#state{state = T}); +rcv('CEA', Pkt, #state{state = 'Wait-CEA'} = S) -> %% old code handle_CEA(Pkt, S); %% Incoming CER @@ -440,16 +608,16 @@ rcv(N, Pkt, S) N == 'DPR' -> handle_request(N, Pkt, S); -%% DPA even though we haven't sent DPR: ignore. -rcv('DPA', _Pkt, #state{dpr = false}) -> - ok; - -%% DPA in response to DPR. We could check the sequence numbers but -%% don't bother, just close. -rcv('DPA' = N, _Pkt, #state{transport = TPid}) -> +%% DPA in response to DPR and with the expected identifiers. +rcv('DPA' = N, + #diameter_packet{header = #diameter_header{end_to_end_id = Eid, + hop_by_hop_id = Hid}}, + #state{transport = TPid, + dpr = {Hid, Eid}}) -> diameter_peer:close(TPid), {stop, N}; +%% Ignore anything else, an unsolicited DPA in particular. rcv(_, _, _) -> ok. @@ -649,7 +817,7 @@ rc([RC|_]) -> %% answer/2 answer('DWR', _) -> - getr(dwa); + getr(?DWA_KEY); answer(Name, #state{service = #diameter_service{capabilities = Caps}}) -> a(Name, Caps). @@ -667,8 +835,8 @@ a('CER', #diameter_caps{vendor_id = Vid, {'Product-Name', Name}, {'Origin-State-Id', OSI}]; -a('DPR', #diameter_caps{origin_host = Host, - origin_realm = Realm}) -> +a('DPR', #diameter_caps{origin_host = {Host, _}, + origin_realm = {Realm, _}}) -> ['DPA', {'Origin-Host', Host}, {'Origin-Realm', Realm}]. @@ -749,15 +917,15 @@ caps(#state{service = Svc}) -> %% caps_cb/1 caps_cb(Caps) -> - {Ref, Ts} = eraser(capabilities_cb), - ccb(Ts, [Ref, Caps]). + {Ref, Ts} = eraser(?CB_KEY), + caps_cb(Ts, [Ref, Caps]). -ccb([], _) -> +caps_cb([], _) -> ok; -ccb([F | Rest], T) -> +caps_cb([F | Rest], T) -> case diameter_lib:eval([F|T]) of ok -> - ccb(Rest, T); + caps_cb(Rest, T); N when ?IS_SUCCESS(N) -> %% 2xxx result code: accept immediately N; Res -> @@ -776,7 +944,9 @@ rejected(N) %% open/5 -open(Pkt, SupportedApps, Caps, {Type, IS}, #state{parent = Pid} = S) -> +open(Pkt, SupportedApps, Caps, {Type, IS}, #state{parent = Pid, + service = Svc} + = S) -> #diameter_caps{origin_host = {_,_} = H, inband_security_id = {LS,_}} = Caps, @@ -784,7 +954,9 @@ open(Pkt, SupportedApps, Caps, {Type, IS}, #state{parent = Pid} = S) -> tls_ack(lists:member(?TLS, LS), Caps, Type, IS, S), Pid ! {open, self(), H, {Caps, SupportedApps, Pkt}}, - S#state{state = 'Open'}. + %% Replace capabilities record with local/remote pairs. + S#state{state = 'Open', + service = Svc#diameter_service{capabilities = Caps}}. %% We've advertised TLS support: tell the transport the result %% and expect a reply when the handshake is complete. @@ -837,38 +1009,148 @@ dwa(#diameter_caps{origin_host = OH, {'Origin-State-Id', OSI}]. %% dpr/2 +%% +%% The RFC isn't clear on whether DPR should be send in a non-Open +%% state. The Peer State Machine transitions it documents aren't +%% exhaustive (no Stop in Wait-I-CEA for example) so assume it's up to +%% the implementation and transition to Closed (ie. die) if we haven't +%% yet reached Open. + +%% Connection is open, DPR has not been sent. +dpr(Reason, #state{state = 'Open', + dpr = false, + service = #diameter_service{capabilities = Caps}} + = S) -> + case getr(?DPR_KEY) of + CBs when is_list(CBs) -> + Ref = getr(?REF_KEY), + Peer = {self(), Caps}, + dpr(CBs, [Reason, Ref, Peer], S); + undefined -> %% started in old code + send_dpr(Reason, [], S) + end; -dpr(Cause, #state{transport = TPid, - service = #diameter_service{capabilities = Caps}} - = S) -> - #diameter_caps{origin_host = OH, - origin_realm = OR} +%% Connection is open, DPR already sent. +dpr(_, #state{state = 'Open'}) -> + ok; + +%% Connection not open. +dpr(_Reason, _S) -> + stop. + +%% dpr/3 +%% +%% Note that an implementation that wants to do something +%% transport_module-specific can lookup the pid of the transport +%% process and contact it. (eg. diameter:service_info/2) + +dpr([CB|Rest], [Reason | _] = Args, S) -> + try diameter_lib:eval([CB | Args]) of + {dpr, Opts} when is_list(Opts) -> + send_dpr(Reason, Opts, S); + dpr -> + send_dpr(Reason, [], S); + close = T -> + {stop, {disconnect_cb, T}}; + ignore -> + dpr(Rest, Args, S); + T -> + No = {disconnect_cb, T}, + diameter_lib:error_report(invalid, No), + {stop, No} + catch + E:R -> + No = {disconnect_cb, E, R, ?STACK}, + diameter_lib:error_report(failure, No), + {stop, No} + end; + +dpr([], [Reason | _], S) -> + send_dpr(Reason, [], S). + +-record(opts, {cause, timeout = ?DPA_TIMEOUT}). + +send_dpr(Reason, Opts, #state{transport = TPid, + service = #diameter_service{capabilities = Caps}} + = S) -> + #opts{cause = Cause, timeout = Tmo} + = lists:foldl(fun opt/2, + #opts{cause = case Reason of + transport -> ?GOAWAY; + _ -> ?REBOOT + end, + timeout = ?DPA_TIMEOUT}, + Opts), + #diameter_caps{origin_host = {OH, _}, + origin_realm = {OR, _}} = Caps, - Bin = encode(['DPR', {'Origin-Host', OH}, + #diameter_packet{header = #diameter_header{end_to_end_id = Eid, + hop_by_hop_id = Hid}} + = Pkt + = encode(['DPR', {'Origin-Host', OH}, {'Origin-Realm', OR}, {'Disconnect-Cause', Cause}]), - send(TPid, Bin), - dpa_timer(), + send(TPid, Pkt), + dpa_timer(Tmo), ?LOG(send, 'DPR'), - S#state{dpr = diameter_codec:sequence_numbers(Bin)}. - -dpa_timer() -> - erlang:send_after(?DPA_TIMEOUT, self(), dpa_timeout). + S#state{dpr = {Hid, Eid}}. + +opt({timeout, Tmo}, Rec) + when ?IS_TIMEOUT(Tmo) -> + Rec#opts{timeout = Tmo}; +opt({cause, Cause}, Rec) + when ?IS_CAUSE(Cause) -> + Rec#opts{cause = cause(Cause)}; +opt(T, _) -> + ?ERROR({invalid_option, T}). + +cause(rebooting) -> ?REBOOT; +cause(goaway) -> ?GOAWAY; +cause(busy) -> ?BUSY; +cause(N) + when ?IS_CAUSE(N) -> + N; +cause(N) -> + ?ERROR({invalid_cause, N}). + +dpa_timer(Tmo) -> + erlang:send_after(Tmo, self(), dpa_timeout). %% register_everywhere/1 %% %% Register a term and ensure it's not registered elsewhere. Note that %% two process that simultaneously register the same term may well %% both fail to do so this isn't foolproof. +%% +%% Everywhere is no longer everywhere, it's where a +%% restrict_connections service_opt() specifies. register_everywhere(T) -> - diameter_reg:add_new(T) - andalso unregistered(T). + reg(getr(?RESTRICT_KEY), T). + +reg(Nodes, T) -> + add(lists:member(node(), Nodes), T) andalso unregistered(Nodes, T). + +add(true, T) -> + diameter_reg:add_new(T); +add(false, T) -> + diameter_reg:add(T). + +%% unregistered +%% +%% Ensure that the term in question isn't registered on other nodes. + +unregistered(Nodes, T) -> + {ResL, _} = rpc:multicall(Nodes, ?MODULE, match, [{node(), T}]), + lists:all(fun nomatch/1, ResL). + +nomatch({badrpc, {'EXIT', {undef, _}}}) -> %% no diameter on remote node + true; +nomatch(L) -> + [] == L. -unregistered(T) -> - {ResL, _} = rpc:multicall(?MODULE, match, [{node(), T}]), - lists:all(fun(L) -> [] == L end, ResL). +%% match/1 match({Node, _}) when Node == node() -> diff --git a/lib/diameter/src/base/diameter_reg.erl b/lib/diameter/src/base/diameter_reg.erl index 882b9da238..619b12ecad 100644 --- a/lib/diameter/src/base/diameter_reg.erl +++ b/lib/diameter/src/base/diameter_reg.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% Copyright Ericsson AB 2010-2012. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -30,7 +30,8 @@ add_new/1, del/1, repl/2, - match/1]). + match/1, + wait/1]). -export([start_link/0]). @@ -65,27 +66,22 @@ %% Table entry containing the Term -> Pid mapping. -define(MAPPING(Term, Pid), {Term, Pid}). --record(state, {id = now()}). - -%%% ---------------------------------------------------------- -%%% # add(T) -%%% -%%% Input: Term = term() -%%% -%%% Output: true -%%% -%%% Description: Associate the specified term with self(). The list of pids -%%% having this or other assocations can be retrieved using -%%% match/1. -%%% -%%% An association is removed when the calling process dies -%%% or as a result of calling del/1. Adding the same term -%%% more than once is equivalent to adding it exactly once. -%%% -%%% Note that since match/1 takes a pattern as argument, -%%% specifying a term that contains match variables is -%%% probably not a good idea -%%% ---------------------------------------------------------- +-record(state, {id = now(), + q = []}). %% [{From, Pat}] + +%% =========================================================================== +%% # add(T) +%% +%% Associate the specified term with self(). The list of pids having +%% this or other assocations can be retrieved using match/1. +%% +%% An association is removed when the calling process dies or as a +%% result of calling del/1. Adding the same term more than once is +%% equivalent to adding it exactly once. +%% +%% Note that since match/1 takes a pattern as argument, specifying a +%% term that contains match variables is probably not a good idea +%% =========================================================================== -spec add(any()) -> true. @@ -93,17 +89,12 @@ add(T) -> call({add, fun ets:insert/2, T, self()}). -%%% ---------------------------------------------------------- -%%% # add_new(T) -%%% -%%% Input: T = term() -%%% -%%% Output: true | false -%%% -%%% Description: Like add/1 but only one process is allowed to have the -%%% the association, false being returned if an association -%%% already exists. -%%% ---------------------------------------------------------- +%% =========================================================================== +%% # add_new(T) +%% +%% Like add/1 but only one process is allowed to have the the +%% association, false being returned if an association already exists. +%% =========================================================================== -spec add_new(any()) -> boolean(). @@ -111,16 +102,12 @@ add(T) -> add_new(T) -> call({add, fun insert_new/2, T, self()}). -%%% ---------------------------------------------------------- -%%% # repl(T, NewT) -%%% -%%% Input: T, NewT = term() -%%% -%%% Output: true | false -%%% -%%% Description: Like add/1 but only replace an existing association on T, -%%% false being returned if it doesn't exist. -%%% ---------------------------------------------------------- +%% =========================================================================== +%% # repl(T, NewT) +%% +%% Like add/1 but only replace an existing association on T, false +%% being returned if it doesn't exist. +%% =========================================================================== -spec repl(any(), any()) -> boolean(). @@ -128,15 +115,11 @@ add_new(T) -> repl(T, U) -> call({repl, T, U, self()}). -%%% ---------------------------------------------------------- -%%% # del(Term) -%%% -%%% Input: Term = term() -%%% -%%% Output: true -%%% -%%% Description: Remove any existing association of Term with self(). -%%% ---------------------------------------------------------- +%% =========================================================================== +%% # del(Term) +%% +%% Remove any existing association of Term with self(). +%% =========================================================================== -spec del(any()) -> true. @@ -144,20 +127,16 @@ repl(T, U) -> del(T) -> call({del, T, self()}). -%%% ---------------------------------------------------------- -%%% # match(Pat) -%%% -%%% Input: Pat = pattern in the sense of ets:match_object/2. -%%% -%%% Output: list of {Term, Pid} -%%% -%%% Description: Return the list of associations whose Term, as specified -%%% to add/1 or add_new/1, matches the specified pattern. -%%% -%%% Note that there's no guarantee that the returned processes -%%% are still alive. (Although one that isn't will soon have -%%% its associations removed.) -%%% ---------------------------------------------------------- +%% =========================================================================== +%% # match(Pat) +%% +%% Return the list of associations whose Term, as specified to add/1 +%% or add_new/1, matches the specified pattern. +%% +%% Note that there's no guarantee that the returned processes are +%% still alive. (Although one that isn't will soon have its +%% associations removed.) +%% =========================================================================== -spec match(tuple()) -> [{term(), pid()}]. @@ -165,9 +144,17 @@ del(T) -> match(Pat) -> ets:match_object(?TABLE, ?MAPPING(Pat, '_')). -%% --------------------------------------------------------- -%% EXPORTED INTERNAL FUNCTIONS -%% --------------------------------------------------------- +%% =========================================================================== +%% # wait(Pat) +%% +%% Like match/1 but return only when the result is non-empty or fails. +%% It's up to the caller to ensure that the wait won't be forever. +%% =========================================================================== + +wait(Pat) -> + call({wait, Pat}). + +%% =========================================================================== start_link() -> ServerName = {local, ?SERVER}, @@ -182,7 +169,7 @@ uptime() -> %% pids/0 %% -%% Output: list of {Pid, [Term, ...]} +%% Return: list of {Pid, [Term, ...]} pids() -> to_list(fun swap/1). @@ -202,89 +189,100 @@ id(T) -> T. %% terms/0 %% -%% Output: list of {Term, [Pid, ...]} +%% Return: list of {Term, [Pid, ...]} terms() -> to_list(fun id/1). swap({X,Y}) -> {Y,X}. -%%% ---------------------------------------------------------- -%%% # init(Role) -%%% -%%% Output: {ok, State} -%%% ---------------------------------------------------------- +%% ---------------------------------------------------------- +%% # init/1 +%% ---------------------------------------------------------- init(_) -> ets:new(?TABLE, [bag, named_table]), {ok, #state{}}. -%%% ---------------------------------------------------------- -%%% # handle_call(Request, From, State) -%%% ---------------------------------------------------------- +%% ---------------------------------------------------------- +%% # handle_call/3 +%% ---------------------------------------------------------- -handle_call({add, Fun, Key, Pid}, _, State) -> +handle_call(Req, From, S) + when not is_record(S, state) -> + handle_call(Req, From, upgrade(S)); + +handle_call({add, Fun, Key, Pid}, _, S) -> B = Fun(?TABLE, {Key, Pid}), monitor(B andalso no_monitor(Pid), Pid), - {reply, B, State}; + {reply, B, pending(B, S)}; -handle_call({del, Key, Pid}, _, State) -> - {reply, ets:delete_object(?TABLE, ?MAPPING(Key, Pid)), State}; +handle_call({del, Key, Pid}, _, S) -> + {reply, ets:delete_object(?TABLE, ?MAPPING(Key, Pid)), S}; -handle_call({repl, T, U, Pid}, _, State) -> +handle_call({repl, T, U, Pid}, _, S) -> MatchSpec = [{?MAPPING('$1', Pid), [{'=:=', '$1', {const, T}}], ['$_']}], - {reply, repl(ets:select(?TABLE, MatchSpec), U, Pid), State}; + {reply, repl(ets:select(?TABLE, MatchSpec), U, Pid), S}; + +handle_call({wait, Pat}, From, #state{q = Q} = S) -> + case find(Pat) of + {ok, L} -> + {reply, L, S}; + false -> + {noreply, S#state{q = [{From, Pat} | Q]}} + end; -handle_call(state, _, State) -> - {reply, State, State}; +handle_call(state, _, S) -> + {reply, S, S}; -handle_call(uptime, _, #state{id = Time} = State) -> - {reply, diameter_lib:now_diff(Time), State}; +handle_call(uptime, _, #state{id = Time} = S) -> + {reply, diameter_lib:now_diff(Time), S}; -handle_call(Req, From, State) -> +handle_call(Req, From, S) -> ?UNEXPECTED([Req, From]), - {reply, nok, State}. + {reply, nok, S}. -%%% ---------------------------------------------------------- -%%% # handle_cast(Request, State) -%%% ---------------------------------------------------------- +%% ---------------------------------------------------------- +%% # handle_cast/2 +%% ---------------------------------------------------------- -handle_cast(Msg, State)-> +handle_cast(Msg, S)-> ?UNEXPECTED([Msg]), - {noreply, State}. + {noreply, S}. -%%% ---------------------------------------------------------- -%%% # handle_info(Request, State) -%%% ---------------------------------------------------------- +%% ---------------------------------------------------------- +%% # handle_info/2 +%% ---------------------------------------------------------- -handle_info({'DOWN', MRef, process, Pid, _}, State) -> +handle_info({'DOWN', MRef, process, Pid, _}, S) -> ets:delete_object(?TABLE, ?MONITOR(Pid, MRef)), ets:match_delete(?TABLE, ?MAPPING('_', Pid)), - {noreply, State}; + {noreply, S}; -handle_info(Info, State) -> +handle_info(Info, S) -> ?UNEXPECTED([Info]), - {noreply, State}. + {noreply, S}. -%%% ---------------------------------------------------------- -%%% # terminate(Reason, State) -%%% ---------------------------------------------------------- +%% ---------------------------------------------------------- +%% # terminate/2 +%% ---------------------------------------------------------- terminate(_Reason, _State)-> ok. -%%% ---------------------------------------------------------- -%%% # code_change(OldVsn, State, Extra) -%%% ---------------------------------------------------------- +%% ---------------------------------------------------------- +%% # code_change/3 +%% ---------------------------------------------------------- code_change(_OldVsn, State, _Extra) -> {ok, State}. -%% --------------------------------------------------------- -%% INTERNAL FUNCTIONS -%% --------------------------------------------------------- +%% =========================================================================== + +upgrade(S) -> + #state{} = list_to_tuple(tuple_to_list(S) ++ [[]]). monitor(true, Pid) -> ets:insert(?TABLE, ?MONITOR(Pid, erlang:monitor(process, Pid))); @@ -321,6 +319,37 @@ repl([?MAPPING(_, Pid) = M], Key, Pid) -> repl([], _, _) -> false. +%% pending/1 + +pending(true, #state{q = [_|_] = Q} = S) -> + S#state{q = q(lists:reverse(Q), [])}; %% retain reply order +pending(_, S) -> + S. + +q([], Q) -> + Q; +q([{From, Pat} = T | Rest], Q) -> + case find(Pat) of + {ok, L} -> + gen_server:reply(From, L), + q(Rest, Q); + false -> + q(Rest, [T|Q]) + end. + +%% find/1 + +find(Pat) -> + try match(Pat) of + [] -> + false; + L -> + {ok, L} + catch + _:_ -> + {ok, []} + end. + %% call/1 call(Request) -> diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl index 3dfdcee2b2..b5584ca0d0 100644 --- a/lib/diameter/src/base/diameter_service.erl +++ b/lib/diameter/src/base/diameter_service.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% Copyright Ericsson AB 2010-2013. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -43,8 +43,7 @@ subscriptions/0, services/0, services/1, - whois/1, - flush_stats/1]). + whois/1]). %% test/debug -export([call_module/3, @@ -65,13 +64,33 @@ -include_lib("diameter/include/diameter.hrl"). -include("diameter_internal.hrl"). +%% The states mirrored by peer_up/peer_down callbacks. -define(STATE_UP, up). -define(STATE_DOWN, down). +-type op_state() :: ?STATE_UP + | ?STATE_DOWN. + +%% The RFC 3539 watchdog states that are now maintained, albeit +%% along with the old up/down. okay = up, else down. +-define(WD_INITIAL, initial). +-define(WD_OKAY, okay). +-define(WD_SUSPECT, suspect). +-define(WD_DOWN, down). +-define(WD_REOPEN, reopen). + +-type wd_state() :: ?WD_INITIAL + | ?WD_OKAY + | ?WD_SUSPECT + | ?WD_DOWN + | ?WD_REOPEN. + -define(DEFAULT_TC, 30000). %% RFC 3588 ch 2.1 -define(DEFAULT_TIMEOUT, 5000). %% for outgoing requests -define(RESTART_TC, 1000). %% if restart was this recent +-define(RELAY, ?DIAMETER_DICT_RELAY). + %% Used to be able to swap this with anything else dict-like but now %% rely on the fact that a service's #state{} record does not change %% in storing in it ?STATE table and not always going through the @@ -88,6 +107,12 @@ %% process. -define(STATE_TABLE, ?MODULE). +%% The default sequence mask. +-define(NOMASK, {0,32}). + +%% The default restrict_connections. +-define(RESTRICT, nodes). + %% Workaround for dialyzer's lack of understanding of match specs. -type match(T) :: T | '_' | '$1' | '$2' | '$3' | '$4'. @@ -95,15 +120,18 @@ %% State of service gen_server. -record(state, {id = now(), - service_name, %% as passed to start_service/2, key in ?STATE_TABLE + service_name, %% as passed to start_service/2, key in ?STATE_TABLE service :: #diameter_service{}, - peerT = ets_new(peers) :: ets:tid(), %% #peer{} at start_fsm - connT = ets_new(conns) :: ets:tid(), %% #conn{} at connection_up - share_peers = false :: boolean(), %% broadcast peers to remote nodes? - use_shared_peers = false :: boolean(), %% use broadcasted peers? + peerT = ets_new(peers) :: ets:tid(),%% #peer{} at start_fsm + connT = ets_new(conns) :: ets:tid(),%% #conn{} at connection_up/reopen shared_peers = ?Dict:new(), %% Alias -> [{TPid, Caps}, ...] local_peers = ?Dict:new(), %% Alias -> [{TPid, Caps}, ...] - monitor = false :: false | pid()}). %% process to die with + monitor = false :: false | pid(), %% process to die with + options + :: [{sequence, diameter:sequence()} %% sequence mask + | {restrict_connections, diameter:restriction()} + | {share_peers, boolean()} %% broadcast peers to remote nodes? + | {use_shared_peers, boolean()}]}).%% use broadcasted peers? %% shared_peers reflects the peers broadcast from remote nodes. Note %% that the state term itself doesn't change, which is relevant for %% the stateless application callbacks since the state is retrieved @@ -111,18 +139,30 @@ %% service record is used to determine whether or not we need to call %% the process for a pick_peer callback. -%% Record representing a watchdog process. +%% Record representing a watchdog process as implemented by +%% diameter_watchdog. The term "peer" here is historical, made +%% especially confusing by the fact that a peer_ref() in the +%% documentation is the key of a #conn{} record, not a #peer{} record. +%% The name is also unfortunate given the meaning of peer in the +%% Diameter sense. -record(peer, {pid :: match(pid()), type :: match(connect | accept), ref :: match(reference()), %% key into diameter_config options :: match([diameter:transport_opt()]),%% from start_transport - op_state = ?STATE_DOWN :: match(?STATE_DOWN | ?STATE_UP), + op_state = {?STATE_DOWN, ?WD_INITIAL} + :: match(op_state() | {op_state(), wd_state()}), started = now(), %% at process start conn = false :: match(boolean() | pid())}). - %% true at accept, pid() at connection_up (connT key) - -%% Record representing a peer_fsm process. + %% true at accepted, pid() at connection_up or reopen + +%% Record representing a peer process as implemented by +%% diameter_peer_fsm. The term "conn" is historical. Despite the name +%% here, comments refer to watchdog and peer processes, that are keys +%% in #peer{} and #conn{} records respectively. To add to the +%% confusion, a #request.transport is a peer process = key in a +%% #conn{} record. The actual transport process (that the peer process +%% knows about and that has a transport connection) isn't seen here. -record(conn, {pid :: pid(), apps :: [{0..16#FFFFFFFF, diameter:app_alias()}], %% {Id, Alias} @@ -136,10 +176,9 @@ handler :: match(pid()), %% request process transport :: match(pid()), %% peer process caps :: match(#diameter_caps{}), - app :: match(diameter:app_alias()), %% #diameter_app.alias - dictionary :: match(module()), %% #diameter_app.dictionary - module :: match([module() | list()]), - %% #diameter_app.module + app :: match(diameter:app_alias()),%% #diameter_app.alias + dictionary :: match(module()), %% #diameter_app.dictionary + module :: match([module() | list()]), %% #diameter_app.module filter :: match(diameter:peer_filter()), packet :: match(#diameter_packet{})}). @@ -150,20 +189,6 @@ timeout = ?DEFAULT_TIMEOUT :: 0..16#FFFFFFFF, detach = false :: boolean()}). -%% Since RFC 3588 requires that a Diameter agent not modify End-to-End -%% Identifiers, the possibility of explicitly setting an End-to-End -%% Identifier would be needed to be able to implement an agent in -%% which one side of the communication is not implemented on top of -%% diameter. For example, Diameter being sent or received encapsulated -%% in some other protocol, or even another Diameter stack in a -%% non-Erlang environment. (Not that this is likely to be a normal -%% case.) -%% -%% The implemented solution is not an option but to respect any header -%% values set in a diameter_header record returned from a -%% prepare_request callback. A call to diameter:call/4 can communicate -%% values to the callback using the 'extra' option if so desired. - %%% --------------------------------------------------------------------------- %%% # start(SvcName) %%% --------------------------------------------------------------------------- @@ -216,20 +241,20 @@ stop_transport(SvcName, [_|_] = Refs) -> %%% --------------------------------------------------------------------------- info(SvcName, Item) -> - info_rc(call_service_by_name(SvcName, {info, Item})). - -info_rc({error, _}) -> - undefined; -info_rc(Info) -> - Info. + case find_state(SvcName) of + #state{} = S -> + service_info(Item, S); + false -> + undefined + end. %%% --------------------------------------------------------------------------- %%% # receive_message(TPid, Pkt, MessageData) %%% --------------------------------------------------------------------------- -%% Handle an incoming message in the watchdog process. This used to -%% come through the service process but this avoids that becoming a -%% bottleneck. +%% Handle an incoming Diameter message in the watchdog process. This +%% used to come through the service process but this avoids that +%% becoming a bottleneck. receive_message(TPid, Pkt, T) when is_pid(TPid) -> @@ -309,21 +334,39 @@ call_rc(_, _, Sent) -> %% In the process spawned for the outgoing request. call(SvcName, App, Msg, Opts, Caller) -> - c(ets:lookup(?STATE_TABLE, SvcName), App, Msg, Opts, Caller). + c(find_state(SvcName), App, Msg, Opts, Caller). -c([#state{service_name = SvcName} = S], App, Msg, Opts, Caller) -> +c(#state{service_name = Svc, options = [{_, Mask} | _]} = S, + App, + Msg, + Opts, + Caller) -> case find_transport(App, Msg, Opts, S) of {_,_,_} = T -> - send_request(T, Msg, Opts, Caller, SvcName); + send_request(T, Mask, Msg, Opts, Caller, Svc); false -> {error, no_connection}; {error, _} = No -> No end; -c([], _, _, _, _) -> +c(false, _, _, _, _) -> {error, no_service}. +%% find_state/1 + +find_state(SvcName) -> + fs(ets:lookup(?STATE_TABLE, SvcName)). + +fs([#state{} = S]) -> + S; + +fs([S]) -> %% inserted from old code + upgrade(S); + +fs([]) -> + false. + %% make_options/1 make_options(Options) -> @@ -388,15 +431,6 @@ whois(SvcName) -> undefined end. -%%% --------------------------------------------------------------------------- -%%% # flush_stats/1 -%%% -%%% Output: list of {{SvcName, Alias, Counter}, Value} -%%% --------------------------------------------------------------------------- - -flush_stats(TPid) -> - diameter_stats:flush(TPid). - %% =========================================================================== %% =========================================================================== @@ -428,6 +462,10 @@ i(_, false) -> %%% # handle_call(Req, From, State) %%% --------------------------------------------------------------------------- +handle_call(T, From, S) + when not is_record(S, state) -> + handle_call(T, From, upgrade(S)); + handle_call(state, _, S) -> {reply, S, S}; @@ -451,16 +489,25 @@ handle_call({pick_peer, Local, Remote, App}, _From, S) -> handle_call({call_module, AppMod, Req}, From, S) -> call_module(AppMod, Req, From, S); +%% Call from old code. handle_call({info, Item}, _From, S) -> {reply, service_info(Item, S), S}; handle_call(stop, _From, S) -> - shutdown(S), + shutdown(service, S), {stop, normal, ok, S}; %% The server currently isn't guaranteed to be dead when the caller %% gets the reply. We deal with this in the call to the server, %% stating a monitor that waits for DOWN before returning. +%% Watchdog is asking for the sequence mask. +handle_call(sequence, _From, #state{options = [{_, Mask} | _]} = S) -> + {reply, Mask, S}; + +%% Watchdog is asking for the nodes restriction. +handle_call(restriction, _From, #state{options = [_,_,_,{_,R} | _]} = S) -> + {reply, R, S}; + handle_call(Req, From, S) -> unexpected(handle_call, [Req, From], S), {reply, nok, S}. @@ -477,15 +524,16 @@ handle_cast(Req, S) -> %%% # handle_info(Req, State) %%% --------------------------------------------------------------------------- -handle_info(T,S) -> +handle_info(T, #state{} = S) -> case transition(T,S) of ok -> {noreply, S}; - #state{} = NS -> - {noreply, NS}; {stop, Reason} -> {stop, {shutdown, Reason}, S} - end. + end; + +handle_info(T, S) -> + handle_info(T, upgrade(S)). %% transition/2 @@ -496,15 +544,26 @@ transition({accepted, Pid, TPid}, S) -> %% Peer process has a new open connection. transition({connection_up, Pid, T}, S) -> - connection_up(Pid, T, S); + connection_up(Pid, T, S), + ok; + +%% Watchdog has a new connection that will be opened after DW[RA] +%% exchange. This message was added long after connection_up, to +%% communicate the information as soon as it's available. Leave +%% connection_up as is it for now, duplicated information and all. +transition({reopen, Pid, T}, S) -> + reopen(Pid, T, S), + ok; -%% Peer process has left state open. +%% Watchdog has left state OKAY. transition({connection_down, Pid}, S) -> - connection_down(Pid, S); + connection_down(Pid, S), + ok; -%% Peer process has returned to state open. +%% Watchdog has returned to state OKAY. transition({connection_up, Pid}, S) -> - connection_up(Pid, S); + connection_up(Pid, S), + ok; %% Accepting transport has lost connectivity. transition({close, Pid}, S) -> @@ -516,29 +575,61 @@ transition({reconnect, Pid}, S) -> reconnect(Pid, S), ok; +%% Watchdog is sending notification of a state transition. Note that +%% the connection_up/down messages are pre-date this message and are +%% still used. A watchdog message will follow these and communicate +%% the same state as was set in handling connection_up/down. +transition({watchdog, Pid, {TPid, From, To}}, #state{service_name = SvcName, + peerT = PeerT}) -> + #peer{ref = Ref, type = T, options = Opts, op_state = {OS,_}} + = P + = fetch(PeerT, Pid), + insert(PeerT, P#peer{op_state = {OS, To}}), + send_event(SvcName, {watchdog, Ref, TPid, {From, To}, {T, Opts}}), + ok; +%% Death of a watchdog process (#peer.pid) results in the removal of +%% it's peer and any associated conn record when 'DOWN' is received +%% (after this) but the states will be {?STATE_UP, ?WD_DOWN} for a +%% short time. (No real problem since ?WD_* is only used in +%% service_info.) We set ?WD_OKAY as a consequence of connection_up +%% since we know a watchdog is coming. We can't set anything at +%% connection_down since we don't know if the subsequent watchdog +%% message will be ?WD_DOWN or ?WD_SUSPECT. We don't (yet) set +%% ?STATE_* as a consequence of a watchdog message since this requires +%% changing some of the matching on ?STATE_*. +%% +%% Death of a peer process process (#conn.pid, #peer.conn) results in +%% connection_down followed by watchdog ?WD_DOWN. The latter doesn't +%% result in the conn record being deleted since 'DOWN' from death of +%% its watchdog doesn't (yet) deal with the record having been +%% removed. + %% Monitor process has died. Just die with a reason that tells %% diameter_config about the happening. If a cleaner shutdown is %% required then someone should stop us. transition({'DOWN', MRef, process, _, Reason}, #state{monitor = MRef}) -> {stop, {monitor, Reason}}; -%% Local peer process has died. +%% Local watchdog process has died. transition({'DOWN', _, process, Pid, Reason}, S) when node(Pid) == node() -> - peer_down(Pid, Reason, S); + peer_down(Pid, Reason, S), + ok; -%% Remote service wants to know about shared transports. +%% Remote service wants to know about shared peers. transition({service, Pid}, S) -> share_peers(Pid, S), ok; %% Remote service is communicating a shared peer. transition({peer, TPid, Aliases, Caps}, S) -> - remote_peer_up(TPid, Aliases, Caps, S); + remote_peer_up(TPid, Aliases, Caps, S), + ok; %% Remote peer process has died. transition({'DOWN', _, process, TPid, _}, S) -> - remote_peer_down(TPid, S); + remote_peer_down(TPid, S), + ok; %% Restart after tc expiry. transition({tc_timeout, T}, S) -> @@ -552,18 +643,48 @@ transition({failover, TRef, Seqs}, S) -> failover(TRef, Seqs, S), ok; +%% Ensure upgraded state is stored in state table. +transition(upgrade, _) -> + ok; + transition(Req, S) -> unexpected(handle_info, [Req], S), ok. +%% upgrade/1 + +upgrade({state, Id, Svc, Name, Svc, PT, CT, SB, UB, SD, LD, MPid}) -> + S = #state{id = Id, + service_name = Name, + service = Svc, + peerT = PT, + connT = CT, + shared_peers = SD, + local_peers = LD, + monitor = MPid, + options = [{sequence, ?NOMASK}, + {share_peers, SB}, + {use_shared_peers, UB}, + {restrict_connections, ?RESTRICT}]}, + upgrade_insert(S), + S. + +upgrade_insert(#state{service = #diameter_service{pid = Pid}} = S) -> + if Pid == self() -> + ets:insert(?STATE_TABLE, S); + true -> + Pid ! upgrade + end. + %%% --------------------------------------------------------------------------- %%% # terminate(Reason, State) %%% --------------------------------------------------------------------------- terminate(Reason, #state{service_name = Name} = S) -> + send_event(Name, stop), ets:delete(?STATE_TABLE, Name), shutdown == Reason %% application shutdown - andalso shutdown(S). + andalso shutdown(application, S). %%% --------------------------------------------------------------------------- %%% # code_change(FromVsn, State, Extra) @@ -646,41 +767,49 @@ mod_state(Alias, ModS) -> %%% # shutdown/2 %%% --------------------------------------------------------------------------- -shutdown(Refs, #state{peerT = PeerT}) -> - ets:foldl(fun(P,ok) -> s(P, Refs), ok end, ok, PeerT). - -s(#peer{ref = Ref, pid = Pid}, Refs) -> - s(lists:member(Ref, Refs), Pid); - -s(true, Pid) -> - Pid ! {shutdown, self()}; %% 'DOWN' will cleanup as usual -s(false, _) -> - ok. - -%%% --------------------------------------------------------------------------- -%%% # shutdown/1 -%%% --------------------------------------------------------------------------- +%% remove_transport: ask watchdogs to terminate their transport. +shutdown(Refs, #state{peerT = PeerT}) + when is_list(Refs) -> + ets:foldl(fun(P,ok) -> sp(P, Refs), ok end, ok, PeerT); -shutdown(#state{peerT = PeerT}) -> +%% application/service shutdown: ask transports to terminate themselves. +shutdown(Reason, #state{peerT = PeerT}) -> %% A transport might not be alive to receive the shutdown request %% but give those that are a chance to shutdown gracefully. - wait(fun st/2, PeerT), + shutdown(conn, Reason, PeerT), %% Kill the watchdogs explicitly in case there was no transport. - wait(fun sw/2, PeerT). + shutdown(peer, Reason, PeerT). -wait(Fun, T) -> - diameter_lib:wait(ets:foldl(Fun, [], T)). +%% sp/2 -st(#peer{conn = B}, Acc) - when is_boolean(B) -> - Acc; -st(#peer{conn = Pid}, Acc) -> - Pid ! shutdown, - [Pid | Acc]. +sp(#peer{ref = Ref, pid = Pid}, Refs) -> + lists:member(Ref, Refs) + andalso (Pid ! {shutdown, self()}). %% 'DOWN' cleans up + +%% shutdown/3 + +shutdown(Who, Reason, T) -> + diameter_lib:wait(ets:foldl(fun(X,A) -> shutdown(Who, X, Reason, A) end, + [], + T)). + +shutdown(conn = Who, #peer{op_state = {OS,_}} = P, Reason, Acc) -> + shutdown(Who, P#peer{op_state = OS}, Reason, Acc); -sw(#peer{pid = Pid}, Acc) -> +shutdown(conn, + #peer{pid = Pid, op_state = ?STATE_UP, conn = TPid}, + Reason, + Acc) -> + TPid ! {shutdown, Pid, Reason}, + [TPid | Acc]; + +shutdown(peer, #peer{pid = Pid}, _Reason, Acc) + when is_pid(Pid) -> exit(Pid, shutdown), - [Pid | Acc]. + [Pid | Acc]; + +shutdown(_, #peer{}, _, Acc) -> + Acc. %%% --------------------------------------------------------------------------- %%% # call_service/2 @@ -730,6 +859,7 @@ i(SvcName) -> true = ets:insert_new(?STATE_TABLE, S), %% Start fsms for each transport. + send_event(SvcName, start), lists:foreach(fun(T) -> start_fsm(T,S) end, CL), init_shared(S), @@ -740,9 +870,8 @@ cfg_acc({SvcName, #diameter_service{applications = Apps} = Rec, Opts}, lists:foreach(fun init_mod/1, Apps), S = #state{service_name = SvcName, service = Rec#diameter_service{pid = self()}, - share_peers = get_value(share_peers, Opts), - use_shared_peers = get_value(use_shared_peers, Opts), - monitor = mref(get_value(monitor, Opts))}, + monitor = mref(get_value(monitor, Opts)), + options = service_options(Opts)}, {S, Acc}; cfg_acc({_Ref, Type, _Opts} = T, {S, Acc}) @@ -750,15 +879,24 @@ cfg_acc({_Ref, Type, _Opts} = T, {S, Acc}) Type == listen -> {S, [T | Acc]}. +service_options(Opts) -> + [{sequence, proplists:get_value(sequence, Opts, ?NOMASK)}, + {share_peers, get_value(share_peers, Opts)}, + {use_shared_peers, get_value(use_shared_peers, Opts)}, + {restrict_connections, proplists:get_value(restrict_connections, + Opts, + ?RESTRICT)}]. +%% The order of options is significant since we match against the list. + mref(false = No) -> No; mref(P) -> erlang:monitor(process, P). -init_shared(#state{use_shared_peers = true, +init_shared(#state{options = [_, _, {_, true} | _], service_name = Svc}) -> diameter_peer:notify(Svc, {service, self()}); -init_shared(#state{use_shared_peers = false}) -> +init_shared(#state{options = [_, _, {_, false} | _]}) -> ok. init_mod(#diameter_app{alias = Alias, @@ -821,9 +959,8 @@ start(Ref, Type, Opts, #state{peerT = PeerT, Pid. %% Note that the service record passed into the watchdog is the merged -%% record so that each watchdog (and peer_fsm) may get a different -%% record. This record is what is passed back into application -%% callbacks. +%% record so that each watchdog may get a different record. This +%% record is what is passed back into application callbacks. s(Type, Ref, T) -> case diameter_watchdog:start({Type, Ref}, T) of @@ -874,20 +1011,25 @@ accepted(Pid, _TPid, #state{peerT = PeerT} = S) -> #peer{ref = Ref, type = accept = T, conn = false, options = Opts} = P = fetch(PeerT, Pid), - insert(PeerT, P#peer{conn = true}), %% mark replacement transport started - start(Ref, T, Opts, S). %% start new peer + insert(PeerT, P#peer{conn = true}), %% mark replacement as started + start(Ref, T, Opts, S). %% start new watchdog fetch(Tid, Key) -> [T] = ets:lookup(Tid, Key), - T. + case T of + #peer{op_state = ?STATE_UP} = P -> + P#peer{op_state = {?STATE_UP, ?WD_OKAY}}; + #peer{op_state = ?STATE_DOWN} = P -> + P#peer{op_state = {?STATE_DOWN, ?WD_DOWN}}; + _ -> + T + end. %%% --------------------------------------------------------------------------- %%% # connection_up/3 -%%% -%%% Output: #state{} %%% --------------------------------------------------------------------------- -%% Peer process has reached the open state. +%% Watchdog process has reached state OKAY. connection_up(Pid, {TPid, {Caps, SApps, Pkt}}, #state{peerT = PeerT, connT = ConnT} @@ -902,9 +1044,29 @@ connection_up(Pid, {TPid, {Caps, SApps, Pkt}}, #state{peerT = PeerT, connection_up([Pkt], P#peer{conn = TPid}, C, S). %%% --------------------------------------------------------------------------- +%%% # reopen/3 +%%% --------------------------------------------------------------------------- + +%% Note that this connection_up/3 rewrites the same #conn{} now +%% written here. Both do so in case reopen has not happened in old +%% code. + +reopen(Pid, {TPid, {Caps, SApps, _Pkt}}, #state{peerT = PeerT, + connT = ConnT}) -> + P = fetch(PeerT, Pid), + C = #conn{pid = TPid, + apps = SApps, + caps = Caps, + peer = Pid}, + + insert(ConnT, C), + #peer{op_state = {?STATE_DOWN, _}} + = P, + insert(PeerT, P#peer{op_state = {?STATE_DOWN, ?WD_REOPEN}, + conn = TPid}). + +%%% --------------------------------------------------------------------------- %%% # connection_up/2 -%%% -%%% Output: #state{} %%% --------------------------------------------------------------------------- %% Peer process has transitioned back into the open state. Note that there @@ -925,18 +1087,16 @@ connection_up(T, P, C, #state{peerT = PeerT, service = #diameter_service{applications = Apps}} = S) -> - #peer{conn = TPid, op_state = ?STATE_DOWN} + #peer{conn = TPid, op_state = {?STATE_DOWN, _}} = P, #conn{apps = SApps, caps = Caps} = C, - insert(PeerT, P#peer{op_state = ?STATE_UP}), + insert(PeerT, P#peer{op_state = {?STATE_UP, ?WD_OKAY}}), request_peer_up(TPid), - report_status(up, P, C, S, T), - S#state{local_peers = insert_local_peer(SApps, - {{TPid, Caps}, {SvcName, Apps}}, - LDict)}. + insert_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict), + report_status(up, P, C, S, T). insert_local_peer(SApps, T, LDict) -> lists:foldl(fun(A,D) -> ilp(A, T, D) end, LDict, SApps). @@ -945,56 +1105,62 @@ ilp({Id, Alias}, {TC, SA}, LDict) -> init_conn(Id, Alias, TC, SA), ?Dict:append(Alias, TC, LDict). -init_conn(Id, Alias, TC, {SvcName, Apps}) -> +init_conn(Id, Alias, {TPid, _} = TC, {SvcName, Apps}) -> #diameter_app{module = ModX, id = Id} %% assert = find_app(Alias, Apps), - peer_cb({ModX, peer_up, [SvcName, TC]}, Alias). + peer_cb({ModX, peer_up, [SvcName, TC]}, Alias) + orelse exit(TPid, kill). %% fake transport failure + +%% find_app/2 find_app(Alias, Apps) -> - lists:keyfind(Alias, #diameter_app.alias, Apps). + case lists:keyfind(Alias, #diameter_app.alias, Apps) of + #diameter_app{options = E} = A when is_atom(E) -> %% upgrade + A#diameter_app{options = [{answer_errors, E}]}; + A -> + A + end. -%% A failing peer callback brings down the service. In the case of -%% peer_up we could just kill the transport and emit an error but for -%% peer_down we have no way to cleanup any state change that peer_up -%% may have introduced. +%% Don't bring down the service (and all associated connections) +%% regardless of what happens. peer_cb(MFA, Alias) -> try state_cb(MFA, Alias) of ModS -> - mod_state(Alias, ModS) + mod_state(Alias, ModS), + true catch - E: Reason -> - ?ERROR({E, Reason, MFA, ?STACK}) + E:R -> + diameter_lib:error_report({failure, {E, R, Alias, ?STACK}}, MFA), + false end. %%% --------------------------------------------------------------------------- %%% # connection_down/2 -%%% -%%% Output: #state{} %%% --------------------------------------------------------------------------- -%% Peer process has transitioned out of the open state. +%% Watchdog has transitioned out of state OKAY. connection_down(Pid, #state{peerT = PeerT, connT = ConnT} = S) -> - #peer{op_state = ?STATE_UP, %% assert + #peer{op_state = {?STATE_UP, WS}, %% assert conn = TPid} = P = fetch(PeerT, Pid), C = fetch(ConnT, TPid), - insert(PeerT, P#peer{op_state = ?STATE_DOWN}), + insert(PeerT, P#peer{op_state = {?STATE_DOWN, WS}}), connection_down(P,C,S). %% connection_down/3 -connection_down(#peer{op_state = ?STATE_DOWN}, _, S) -> - S; +connection_down(#peer{op_state = {?STATE_DOWN, _}}, _, _) -> + ok; connection_down(#peer{conn = TPid, - op_state = ?STATE_UP} + op_state = {?STATE_UP, _}} = P, #conn{caps = Caps, apps = SApps} @@ -1004,12 +1170,8 @@ connection_down(#peer{conn = TPid, local_peers = LDict} = S) -> report_status(down, P, C, S, []), - NewS = S#state{local_peers - = remove_local_peer(SApps, - {{TPid, Caps}, {SvcName, Apps}}, - LDict)}, - request_peer_down(TPid, NewS), - NewS. + remove_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict), + request_peer_down(TPid, S). remove_local_peer(SApps, T, LDict) -> lists:foldl(fun(A,D) -> rlp(A, T, D) end, LDict, SApps). @@ -1028,11 +1190,9 @@ down_conn(Id, Alias, TC, {SvcName, Apps}) -> %%% --------------------------------------------------------------------------- %%% # peer_down/3 -%%% -%%% Output: #state{} %%% --------------------------------------------------------------------------- -%% Peer process has died. +%% Watchdog process has died. peer_down(Pid, Reason, #state{peerT = PeerT} = S) -> P = fetch(PeerT, Pid), @@ -1043,7 +1203,7 @@ peer_down(Pid, Reason, #state{peerT = PeerT} = S) -> %% Send an event at connection establishment failure. closed({shutdown, {close, _TPid, Reason}}, - #peer{op_state = ?STATE_DOWN, + #peer{op_state = {?STATE_DOWN, _}, ref = Ref, type = Type, options = Opts}, @@ -1052,12 +1212,12 @@ closed({shutdown, {close, _TPid, Reason}}, closed(_, _, _) -> ok. -%% The peer has never come up ... -peer_down(#peer{conn = B}, S) +%% The watchdog has never reached OKAY ... +peer_down(#peer{conn = B}, _) when is_boolean(B) -> - S; + ok; -%% ... or it has. +%% ... or maybe it has. peer_down(#peer{conn = TPid} = P, #state{connT = ConnT} = S) -> #conn{} = C = fetch(ConnT, TPid), ets:delete_object(ConnT, C), @@ -1085,7 +1245,7 @@ restart(#peer{ref = Ref, started = Time}) -> {Time, {Ref, T, Opts}}; -%% ... or it has: a replacement transport has already been spawned. +%% ... or it has: a replacement has already been spawned. restart(#peer{type = accept}) -> false. @@ -1111,8 +1271,8 @@ default_tc(connect, Opts) -> default_tc(accept, _) -> 0. -%% Bound tc below if the peer was restarted recently to avoid -%% continuous in case of faulty config or other problems. +%% Bound tc below if the watchdog was restarted recently to avoid +%% continuous restarted in case of faulty config or other problems. tc(Time, Tc) -> choose(Tc > ?RESTART_TC orelse timer:now_diff(now(), Time) > 1000*?RESTART_TC, @@ -1234,7 +1394,7 @@ cm([_,_|_], _, _, _) -> multiple. %%% --------------------------------------------------------------------------- -%%% # send_request/5 +%%% # send_request/6 %%% --------------------------------------------------------------------------- %% Send an outgoing request in its dedicated process. @@ -1247,71 +1407,90 @@ cm([_,_|_], _, _, _) -> %% The mod field of the #diameter_app{} here includes any extra %% arguments passed to diameter:call/2. -send_request({TPid, Caps, App}, Msg, Opts, Caller, SvcName) -> +send_request({TPid, Caps, App} = T, Mask, Msg, Opts, Caller, SvcName) -> #diameter_app{module = ModX} = App, - Pkt = make_request_packet(Msg), - - case cb(ModX, prepare_request, [Pkt, SvcName, {TPid, Caps}]) of - {send, P} -> - send_request(make_request_packet(P, Pkt), - TPid, - Caps, - App, - Opts, - Caller, - SvcName); - {discard, Reason} -> - {error, Reason}; - discard -> - {error, discarded}; - T -> - ?ERROR({invalid_return, prepare_request, App, T}) - end. + Pkt = make_prepare_packet(Mask, Msg), + + send_req(cb(ModX, prepare_request, [Pkt, SvcName, {TPid, Caps}]), + Pkt, + T, + Opts, + Caller, + SvcName, + []). + +send_req({send, P}, Pkt, T, Opts, Caller, SvcName, Fs) -> + send_req(make_request_packet(P, Pkt), T, Opts, Caller, SvcName, Fs); + +send_req({discard, Reason} , _, _, _, _, _, _) -> + {error, Reason}; + +send_req(discard, _, _, _, _, _, _) -> + {error, discarded}; + +send_req({eval_packet, RC, F}, Pkt, T, Opts, Caller, SvcName, Fs) -> + send_req(RC, Pkt, T, Opts, Caller, SvcName, [F|Fs]); -%% make_request_packet/1 +send_req(E, _, {_, _, App}, _, _, _, _) -> + ?ERROR({invalid_return, prepare_request, App, E}). + +%% make_prepare_packet/2 %% %% Turn an outgoing request as passed to call/4 into a diameter_packet %% record in preparation for a prepare_request callback. -make_request_packet(Bin) +make_prepare_packet(_, Bin) when is_binary(Bin) -> #diameter_packet{header = diameter_codec:decode_header(Bin), bin = Bin}; -make_request_packet(#diameter_packet{msg = [#diameter_header{} = Hdr | Avps]} - = Pkt) -> - Pkt#diameter_packet{msg = [make_request_header(Hdr) | Avps]}; +make_prepare_packet(Mask, #diameter_packet{msg = [#diameter_header{} = Hdr + | Avps]} + = Pkt) -> + Pkt#diameter_packet{msg = [make_prepare_header(Mask, Hdr) | Avps]}; -make_request_packet(#diameter_packet{header = Hdr} = Pkt) -> - Pkt#diameter_packet{header = make_request_header(Hdr)}; +make_prepare_packet(Mask, #diameter_packet{header = Hdr} = Pkt) -> + Pkt#diameter_packet{header = make_prepare_header(Mask, Hdr)}; -make_request_packet(Msg) -> - make_request_packet(#diameter_packet{msg = Msg}). +make_prepare_packet(Mask, Msg) -> + make_prepare_packet(Mask, #diameter_packet{msg = Msg}). -%% make_request_header/1 +%% make_prepare_header/2 -make_request_header(undefined) -> - Seq = diameter_session:sequence(), - make_request_header(#diameter_header{end_to_end_id = Seq, +make_prepare_header(Mask, undefined) -> + Seq = diameter_session:sequence(Mask), + make_prepare_header(#diameter_header{end_to_end_id = Seq, hop_by_hop_id = Seq}); -make_request_header(#diameter_header{version = undefined} = Hdr) -> - make_request_header(Hdr#diameter_header{version = ?DIAMETER_VERSION}); +make_prepare_header(Mask, #diameter_header{end_to_end_id = undefined, + hop_by_hop_id = undefined} + = H) -> + Seq = diameter_session:sequence(Mask), + make_prepare_header(H#diameter_header{end_to_end_id = Seq, + hop_by_hop_id = Seq}); + +make_prepare_header(Mask, #diameter_header{end_to_end_id = undefined} = H) -> + Seq = diameter_session:sequence(Mask), + make_prepare_header(H#diameter_header{end_to_end_id = Seq}); + +make_prepare_header(Mask, #diameter_header{hop_by_hop_id = undefined} = H) -> + Seq = diameter_session:sequence(Mask), + make_prepare_header(H#diameter_header{hop_by_hop_id = Seq}); + +make_prepare_header(_, Hdr) -> + make_prepare_header(Hdr). -make_request_header(#diameter_header{end_to_end_id = undefined} = H) -> - Seq = diameter_session:sequence(), - make_request_header(H#diameter_header{end_to_end_id = Seq}); +%% make_prepare_header/1 -make_request_header(#diameter_header{hop_by_hop_id = undefined} = H) -> - Seq = diameter_session:sequence(), - make_request_header(H#diameter_header{hop_by_hop_id = Seq}); +make_prepare_header(#diameter_header{version = undefined} = Hdr) -> + make_prepare_header(Hdr#diameter_header{version = ?DIAMETER_VERSION}); -make_request_header(#diameter_header{} = Hdr) -> +make_prepare_header(#diameter_header{} = Hdr) -> Hdr; -make_request_header(T) -> +make_prepare_header(T) -> ?ERROR({invalid_header, T}). %% make_request_packet/2 @@ -1321,7 +1500,7 @@ make_request_header(T) -> make_request_packet(Bin, _) when is_binary(Bin) -> - make_request_packet(Bin); + make_prepare_packet(false, Bin); make_request_packet(#diameter_packet{msg = [#diameter_header{} | _]} = Pkt, @@ -1333,7 +1512,7 @@ make_request_packet(#diameter_packet{msg = [#diameter_header{} | _]} %% This is primarily so that the end to end and hop by hop identifiers %% are retained. make_request_packet(#diameter_packet{header = Hdr} = Pkt, - #diameter_packet{header = Hdr0}) -> + #diameter_packet{header = Hdr0}) -> Pkt#diameter_packet{header = fold_record(Hdr0, Hdr)}; make_request_packet(Msg, Pkt) -> @@ -1346,16 +1525,16 @@ fold_record(undefined, R) -> fold_record(Rec, R) -> diameter_lib:fold_tuple(2, Rec, R). -%% send_request/7 +%% send_req/6 -send_request(Pkt, TPid, Caps, App, Opts, Caller, SvcName) -> +send_req(Pkt, {TPid, Caps, App}, Opts, Caller, SvcName, Fs) -> #diameter_app{alias = Alias, dictionary = Dict, module = ModX, - answer_errors = AE} + options = [{answer_errors, AE} | _]} = App, - EPkt = encode(Dict, Pkt), + EPkt = encode(Dict, Pkt, Fs), #options{filter = Filter, timeout = Timeout} @@ -1436,6 +1615,13 @@ msg(#diameter_packet{msg = undefined, bin = Bin}) -> msg(#diameter_packet{msg = Msg}) -> Msg. +%% encode/3 + +encode(Dict, Pkt, Fs) -> + P = encode(Dict, Pkt), + eval_packet(P, Fs), + P. + %% encode/2 %% Note that prepare_request can return a diameter_packet containing @@ -1517,38 +1703,51 @@ send(Pid, Pkt) -> %% retransmit/4 -retransmit({TPid, Caps, #diameter_app{alias = Alias} = App}, - #request{app = Alias, - packet = Pkt} +retransmit({TPid, Caps, #diameter_app{alias = Alias} = App} = T, + #request{app = Alias, packet = Pkt0} = Req, SvcName, Timeout) -> - have_request(Pkt, TPid) %% Don't failover to a peer we've + have_request(Pkt0, TPid) %% Don't failover to a peer we've andalso ?THROW(timeout), %% already sent to. - case cb(App, prepare_retransmit, [Pkt, SvcName, {TPid, Caps}]) of - {send, P} -> - retransmit(make_request_packet(P, Pkt), TPid, Caps, Req, Timeout); - {discard, Reason} -> - ?THROW(Reason); - discard -> - ?THROW(discarded); - T -> - ?ERROR({invalid_return, prepare_retransmit, App, T}) - end. + #diameter_packet{header = Hdr0} = Pkt0, + Hdr = Hdr0#diameter_header{is_retransmitted = true}, + Pkt = Pkt0#diameter_packet{header = Hdr}, + + resend_req(cb(App, prepare_retransmit, [Pkt, SvcName, {TPid, Caps}]), + T, + Req#request{packet = Pkt}, + Timeout, + []). + +resend_req({send, P}, T, #request{packet = Pkt} = Req, Timeout, Fs) -> + retransmit(make_request_packet(P, Pkt), T, Req, Timeout, Fs); + +resend_req({discard, Reason}, _, _, _, _) -> + ?THROW(Reason); -%% retransmit/5 +resend_req(discard, _, _, _, _) -> + ?THROW(discarded); -retransmit(Pkt, TPid, Caps, #request{dictionary = Dict} = Req, Timeout) -> - EPkt = encode(Dict, Pkt), +resend_req({eval_packet, RC, F}, T, Req, Timeout, Fs) -> + resend_req(RC, T, Req, Timeout, [F|Fs]); + +resend_req(T, {_, _, App}, _, _, _) -> + ?ERROR({invalid_return, prepare_retransmit, App, T}). - NewReq = Req#request{transport = TPid, - packet = Pkt, - caps = Caps}, +%% retransmit/6 - ?LOG(retransmission, NewReq), - TRef = send_request(TPid, EPkt, NewReq, Timeout), - {TRef, NewReq}. +retransmit(Pkt, {TPid, Caps, _}, #request{dictionary = D} = Req0, Tmo, Fs) -> + EPkt = encode(D, Pkt, Fs), + + Req = Req0#request{transport = TPid, + packet = Pkt, + caps = Caps}, + + ?LOG(retransmission, Req), + TRef = send_request(TPid, EPkt, Req, Tmo), + {TRef, Req}. %% store_request/4 @@ -1620,10 +1819,13 @@ request_peer_down(TPid, S) -> %%% recv_request/3 %%% --------------------------------------------------------------------------- -recv_request(TPid, Pkt, {ConnT, SvcName, Apps}) -> +recv_request(TPid, Pkt, {ConnT, SvcName, Apps}) -> %% upgrade + recv_request(TPid, Pkt, {ConnT, SvcName, Apps, ?NOMASK}); + +recv_request(TPid, Pkt, {ConnT, SvcName, Apps, Mask}) -> try ets:lookup(ConnT, TPid) of [C] -> - recv_request(C, TPid, Pkt, SvcName, Apps); + recv_request(C, TPid, Pkt, SvcName, Apps, Mask); [] -> %% transport has gone down ok catch @@ -1633,7 +1835,12 @@ recv_request(TPid, Pkt, {ConnT, SvcName, Apps}) -> %% recv_request/5 -recv_request(#conn{apps = SApps, caps = Caps}, TPid, Pkt, SvcName, Apps) -> +recv_request(#conn{apps = SApps, caps = Caps}, + TPid, + Pkt, + SvcName, + Apps, + Mask) -> #diameter_caps{origin_host = {OH,_}, origin_realm = {OR,_}} = Caps, @@ -1645,6 +1852,7 @@ recv_request(#conn{apps = SApps, caps = Caps}, TPid, Pkt, SvcName, Apps) -> {SvcName, OH, OR}, TPid, Apps, + Mask, Caps, Pkt). @@ -1670,20 +1878,24 @@ keyfind([Key | Rest], Pos, L) -> T end. -%% recv_request/6 +%% recv_request/7 -recv_request({Id, Alias}, T, TPid, Apps, Caps, Pkt) -> +recv_request({Id, Alias}, T, TPid, Apps, Mask, Caps, Pkt) -> #diameter_app{dictionary = Dict} = A = find_app(Alias, Apps), - recv_request(T, {TPid, Caps}, A, diameter_codec:decode(Id, Dict, Pkt)); + recv_request(T, + {TPid, Caps}, + A, + Mask, + diameter_codec:decode(Id, Dict, Pkt)); %% Note that the decode is different depending on whether or not Id is %% ?APP_ID_RELAY. %% DIAMETER_APPLICATION_UNSUPPORTED 3007 %% A request was sent for an application that is not supported. -recv_request(false, T, TPid, _, _, Pkt) -> +recv_request(false, T, TPid, _, _, _, Pkt) -> As = collect_avps(Pkt), protocol_error(3007, T, TPid, Pkt#diameter_packet{avps = As}). @@ -1695,7 +1907,7 @@ collect_avps(Pkt) -> As end. -%% recv_request/4 +%% recv_request/5 %% Wrong number of bits somewhere in the message: reply. %% @@ -1704,7 +1916,7 @@ collect_avps(Pkt) -> %% set to an unrecognized value, or that is inconsistent with the %% AVP's definition. %% -recv_request(T, {TPid, _}, _, #diameter_packet{errors = [Bs | _]} = Pkt) +recv_request(T, {TPid, _}, _, _, #diameter_packet{errors = [Bs | _]} = Pkt) when is_bitstring(Bs) -> protocol_error(3009, T, TPid, Pkt); @@ -1719,6 +1931,7 @@ recv_request(T, {TPid, _}, _, #diameter_packet{errors = [Bs | _]} = Pkt) recv_request(T, {TPid, _}, #diameter_app{id = Id}, + _, #diameter_packet{header = #diameter_header{is_proxiable = P}, msg = M} = Pkt) @@ -1736,6 +1949,7 @@ recv_request(T, recv_request(T, {TPid, _}, _, + _, #diameter_packet{header = #diameter_header{is_error = true}} = Pkt) -> protocol_error(3008, T, TPid, Pkt); @@ -1744,14 +1958,20 @@ recv_request(T, %% in the relay application. Don't distinguish between the two since %% each application has its own callback config. That is, the user can %% easily distinguish between the two cases. -recv_request(T, TC, App, Pkt) -> - request_cb(T, TC, App, examine(Pkt)). +recv_request(T, TC, App, Mask, Pkt) -> + request_cb(T, TC, App, Mask, examine(Pkt)). %% Note that there may still be errors but these aren't protocol %% (3xxx) errors that lead to an answer-message. -request_cb({SvcName, _OH, _OR} = T, TC, App, Pkt) -> - request_cb(cb(App, handle_request, [Pkt, SvcName, TC]), App, T, TC, Pkt). +request_cb({SvcName, _OH, _OR} = T, TC, App, Mask, Pkt) -> + request_cb(cb(App, handle_request, [Pkt, SvcName, TC]), + App, + Mask, + T, + TC, + [], + Pkt). %% examine/1 %% @@ -1771,7 +1991,7 @@ examine(#diameter_packet{errors = Es} = Pkt) -> Pkt#diameter_packet{errors = [5011 | Es]}. %% It's odd/unfortunate that this isn't a protocol error. -%% request_cb/5 +%% request_cb/7 %% A reply may be an answer-message, constructed either here or by %% the handle_request callback. The header from the incoming request @@ -1781,21 +2001,23 @@ examine(#diameter_packet{errors = Es} = Pkt) -> request_cb({reply, Ans}, #diameter_app{dictionary = Dict}, _, + _, {TPid, _}, + Fs, Pkt) -> - reply(Ans, Dict, TPid, Pkt); + reply(Ans, Dict, TPid, Fs, Pkt); %% An 3xxx result code, for which the E-bit is set in the header. -request_cb({protocol_error, RC}, _, T, {TPid, _}, Pkt) +request_cb({protocol_error, RC}, _, _, T, {TPid, _}, Fs, Pkt) when 3000 =< RC, RC < 4000 -> - protocol_error(RC, T, TPid, Pkt); + protocol_error(RC, T, TPid, Fs, Pkt); %% RFC 3588 says we must reply 3001 to anything unrecognized or %% unsupported. 'noreply' is undocumented (and inappropriately named) %% backwards compatibility for this, protocol_error the documented %% alternative. -request_cb(noreply, _, T, {TPid, _}, Pkt) -> - protocol_error(3001, T, TPid, Pkt); +request_cb(noreply, _, _, T, {TPid, _}, Fs, Pkt) -> + protocol_error(3001, T, TPid, Fs, Pkt); %% Relay a request to another peer. This is equivalent to doing an %% explicit call/4 with the message in question except that (1) a loop @@ -1815,38 +2037,57 @@ request_cb(noreply, _, T, {TPid, _}, Pkt) -> request_cb({A, Opts}, #diameter_app{id = Id} = App, + Mask, T, TC, + Fs, Pkt) when A == relay, Id == ?APP_ID_RELAY; A == proxy, Id /= ?APP_ID_RELAY; A == resend -> - resend(Opts, App, T, TC, Pkt); + resend(Opts, App, Mask, T, TC, Fs, Pkt); -request_cb(discard, _, _, _, _) -> +request_cb(discard, _, _, _, _, _, _) -> ok; -request_cb({eval, RC, F}, App, T, TC, Pkt) -> - request_cb(RC, App, T, TC, Pkt), +request_cb({eval_packet, RC, F}, App, Mask, T, TC, Fs, Pkt) -> + request_cb(RC, App, Mask, T, TC, [F|Fs], Pkt); + +request_cb({eval, RC, F}, App, Mask, T, TC, Fs, Pkt) -> + request_cb(RC, App, Mask, T, TC, Fs, Pkt), diameter_lib:eval(F). -%% protocol_error/4 +%% protocol_error/5 -protocol_error(RC, {_, OH, OR}, TPid, #diameter_packet{avps = Avps} = Pkt) -> +protocol_error(RC, {_, OH, OR}, TPid, Fs, Pkt) -> + #diameter_packet{avps = Avps, errors = Es} = Pkt, ?LOG({error, RC}, Pkt), - reply(answer_message({OH, OR, RC}, Avps), ?BASE, TPid, Pkt). + reply(answer_message({OH, OR, RC}, Avps), + ?BASE, + TPid, + Fs, + Pkt#diameter_packet{errors = [RC | Es]}). +%% Note that reply/5 may set the result code once more. It's set in +%% answer_message/2 in case reply/5 doesn't. + +%% protocol_error/4 + +protocol_error(RC, T, TPid, Pkt) -> + protocol_error(RC, T, TPid, [], Pkt). -%% resend/5 +%% resend/7 %% %% Resend a message as a relay or proxy agent. resend(Opts, #diameter_app{} = App, + Mask, {_SvcName, OH, _OR} = T, {_TPid, _Caps} = TC, + Fs, #diameter_packet{avps = Avps} = Pkt) -> {Code, _Flags, Vid} = ?BASE:avp_header('Route-Record'), - resend(is_loop(Code, Vid, OH, Avps), Opts, App, T, TC, Pkt). + resend(is_loop(Code, Vid, OH, Avps), Opts, App, Mask, T, TC, Fs, Pkt). %% DIAMETER_LOOP_DETECTED 3005 %% An agent detected a loop while trying to get the message to the @@ -1854,8 +2095,8 @@ resend(Opts, %% if one is available, but the peer reporting the error has %% identified a configuration problem. -resend(true, _, _, T, {TPid, _}, Pkt) -> %% Route-Record loop - protocol_error(3005, T, TPid, Pkt); +resend(true, _, _, _, T, {TPid, _}, Fs, Pkt) -> %% Route-Record loop + protocol_error(3005, T, TPid, Fs, Pkt); %% 6.1.8. Relaying and Proxying Requests %% @@ -1866,16 +2107,18 @@ resend(true, _, _, T, {TPid, _}, Pkt) -> %% Route-Record loop resend(false, Opts, App, + Mask, {SvcName, _, _} = T, {TPid, #diameter_caps{origin_host = {_, OH}}}, + Fs, #diameter_packet{header = Hdr0, avps = Avps} = Pkt) -> Route = #diameter_avp{data = {?BASE, 'Route-Record', OH}}, - Seq = diameter_session:sequence(), + Seq = diameter_session:sequence(Mask), Hdr = Hdr0#diameter_header{hop_by_hop_id = Seq}, Msg = [Hdr, Route | Avps], - resend(call(SvcName, App, Msg, Opts), T, TPid, Pkt). + resend(call(SvcName, App, Msg, Opts), T, TPid, Fs, Pkt). %% The incoming request is relayed with the addition of a %% Route-Record. Note the requirement on the return from call/4 below, %% which places a requirement on the value returned by the @@ -1901,15 +2144,18 @@ resend(#diameter_packet{bin = B} = Pkt, _, TPid, + Fs, #diameter_packet{header = #diameter_header{hop_by_hop_id = Id}, transport_data = TD}) -> - send(TPid, Pkt#diameter_packet{bin = diameter_codec:hop_by_hop_id(Id, B), - transport_data = TD}); + P = Pkt#diameter_packet{bin = diameter_codec:hop_by_hop_id(Id, B), + transport_data = TD}, + eval_packet(P, Fs), + send(TPid, P); %% TODO: counters %% Or not: DIAMETER_UNABLE_TO_DELIVER. -resend(_, T, TPid, Pkt) -> - protocol_error(3002, T, TPid, Pkt). +resend(_, T, TPid, Fs, Pkt) -> + protocol_error(3002, T, TPid, Fs, Pkt). %% is_loop/4 %% @@ -1931,46 +2177,69 @@ is_loop(Code, Vid, OH, [_ | Avps]) is_loop(Code, Vid, OH, Avps) -> is_loop(Code, Vid, ?BASE:avp(encode, OH, 'Route-Record'), Avps). -%% reply/4 +%% reply/5 %% %% Send a locally originating reply. +%% Skip the setting of Result-Code and Failed-AVP's below. This is +%% currently undocumented. +reply([Msg], Dict, TPid, Fs, Pkt) + when is_list(Msg); + is_tuple(Msg) -> + reply(Msg, Dict, TPid, Fs, Pkt#diameter_packet{errors = []}); + %% No errors or a diameter_header/avp list. -reply(Msg, Dict, TPid, #diameter_packet{errors = Es, - transport_data = TD} - = ReqPkt) +reply(Msg, Dict, TPid, Fs, #diameter_packet{errors = Es} = ReqPkt) when [] == Es; is_record(hd(Msg), diameter_header) -> Pkt = diameter_codec:encode(Dict, make_answer_packet(Msg, ReqPkt)), - incr(send, Pkt, TPid), %% count result codes in sent answers - send(TPid, Pkt#diameter_packet{transport_data = TD}); + eval_packet(Pkt, Fs), + incr(send, Pkt, Dict, TPid), %% count result codes in sent answers + send(TPid, Pkt); %% Or not: set Result-Code and Failed-AVP AVP's. -reply(Msg, Dict, TPid, #diameter_packet{errors = [H|_] = Es} = Pkt) -> +reply(Msg, Dict, TPid, Fs, #diameter_packet{errors = [H|_] = Es} = Pkt) -> reply(rc(Msg, rc(H), [A || {_,A} <- Es], Dict), Dict, TPid, + Fs, Pkt#diameter_packet{errors = []}). +eval_packet(Pkt, Fs) -> + lists:foreach(fun(F) -> diameter_lib:eval([F,Pkt]) end, Fs). + %% make_answer_packet/2 +%% A reply message clears the R and T flags and retains the P flag. +%% The E flag will be set at encode. 6.2 of 3588 requires the same P +%% flag on an answer as on the request. A #diameter_packet{} returned +%% from a handle_request callback can circumvent this by setting its +%% own header values. +make_answer_packet(#diameter_packet{header = Hdr, + msg = Msg, + transport_data = TD}, + #diameter_packet{header = ReqHdr}) -> + Hdr0 = ReqHdr#diameter_header{version = ?DIAMETER_VERSION, + is_request = false, + is_error = undefined, + is_retransmitted = false}, + #diameter_packet{header = fold_record(Hdr0, Hdr), + msg = Msg, + transport_data = TD}; + %% Binaries and header/avp lists are sent as-is. -make_answer_packet(Bin, _) +make_answer_packet(Bin, #diameter_packet{transport_data = TD}) when is_binary(Bin) -> - #diameter_packet{bin = Bin}; -make_answer_packet([#diameter_header{} | _] = Msg, _) -> - #diameter_packet{msg = Msg}; - -%% Otherwise a reply message clears the R and T flags and retains the -%% P flag. The E flag will be set at encode. 6.2 of 3588 requires the -%% same P flag on an answer as on the request. -make_answer_packet(Msg, #diameter_packet{header = ReqHdr}) -> - Hdr = ReqHdr#diameter_header{version = ?DIAMETER_VERSION, - is_request = false, - is_error = undefined, - is_retransmitted = false}, - #diameter_packet{header = Hdr, - msg = Msg}. + #diameter_packet{bin = Bin, + transport_data = TD}; +make_answer_packet([#diameter_header{} | _] = Msg, + #diameter_packet{transport_data = TD}) -> + #diameter_packet{msg = Msg, + transport_data = TD}; + +%% Otherwise, preserve transport_data. +make_answer_packet(Msg, #diameter_packet{transport_data = TD} = Pkt) -> + make_answer_packet(#diameter_packet{msg = Msg, transport_data = TD}, Pkt). %% rc/1 @@ -1981,9 +2250,15 @@ rc(RC) -> %% rc/4 +rc(#diameter_packet{msg = Rec} = Pkt, RC, Failed, Dict) -> + Pkt#diameter_packet{msg = rc(Rec, RC, Failed, Dict)}; + rc(Rec, RC, Failed, Dict) when is_integer(RC) -> - set(Rec, [{'Result-Code', RC} | failed_avp(Rec, Failed, Dict)], Dict). + set(Rec, + lists:append([rc(Rec, {'Result-Code', RC}, Dict), + failed_avp(Rec, Failed, Dict)]), + Dict). %% Reply as name and tuple list ... set([_|_] = Ans, Avps, _) -> @@ -1993,6 +2268,22 @@ set([_|_] = Ans, Avps, _) -> set(Rec, Avps, Dict) -> Dict:'#set-'(Avps, Rec). +%% rc/3 +%% +%% Turn the result code into a list if its optional and only set it if +%% the arity is 1 or {0,1}. In other cases (which probably shouldn't +%% exist in practise) we can't know what's appropriate. + +rc([MsgName | _], {'Result-Code' = K, RC} = T, Dict) -> + case Dict:avp_arity(MsgName, 'Result-Code') of + 1 -> [T]; + {0,1} -> [{K, [RC]}]; + _ -> [] + end; + +rc(Rec, T, Dict) -> + rc([Dict:rec2msg(element(1, Rec))], T, Dict). + %% failed_avp/3 failed_avp(_, [] = No, _) -> @@ -2200,44 +2491,39 @@ handle_answer(SvcName, _, {error, Req, Reason}) -> handle_answer(SvcName, AnswerErrors, {answer, #request{dictionary = Dict} = Req, Pkt}) -> - a(examine(diameter_codec:decode(Dict, Pkt)), - SvcName, - AnswerErrors, - Req). + answer(examine(diameter_codec:decode(Dict, Pkt)), + SvcName, + AnswerErrors, + Req). %% We don't really need to do a full decode if we're a relay and will %% just resend with a new hop by hop identifier, but might a proxy %% want to examine the answer? -a(#diameter_packet{errors = []} - = Pkt, - SvcName, - AE, - #request{transport = TPid, - caps = Caps, - packet = P} - = Req) -> +answer(Pkt, SvcName, AE, #request{transport = TPid, + dictionary = Dict} + = Req) -> try - incr(in, Pkt, TPid) + incr(recv, Pkt, Dict, TPid) of - _ -> - cb(Req, handle_answer, [Pkt, msg(P), SvcName, {TPid, Caps}]) + _ -> a(Pkt, SvcName, AE, Req) catch exit: {invalid_error_bit, _} = E -> - e(Pkt#diameter_packet{errors = [E]}, SvcName, AE, Req) - end; + a(Pkt#diameter_packet{errors = [E]}, SvcName, AE, Req) + end. -a(#diameter_packet{} = Pkt, SvcName, AE, Req) -> - e(Pkt, SvcName, AE, Req). +a(#diameter_packet{errors = Es} = Pkt, SvcName, AE, #request{transport = TPid, + caps = Caps, + packet = P} + = Req) + when [] == Es; + callback == AE -> + cb(Req, handle_answer, [Pkt, msg(P), SvcName, {TPid, Caps}]); -e(Pkt, SvcName, callback, #request{transport = TPid, - caps = Caps, - packet = Pkt} - = Req) -> - cb(Req, handle_answer, [Pkt, msg(Pkt), SvcName, {TPid, Caps}]); -e(Pkt, SvcName, report, Req) -> +a(Pkt, SvcName, report, Req) -> x(errors, handle_answer, [SvcName, Req, Pkt]); -e(Pkt, SvcName, discard, Req) -> + +a(Pkt, SvcName, discard, Req) -> x({errors, handle_answer, [SvcName, Req, Pkt]}). %% Note that we don't check that the application id in the answer's @@ -2249,17 +2535,19 @@ e(Pkt, SvcName, discard, Req) -> %% Increment a stats counter for an incoming or outgoing message. %% TODO: fix -incr(_, #diameter_packet{msg = undefined}, _) -> +incr(_, #diameter_packet{msg = undefined}, _, _) -> ok; -incr(Dir, Pkt, TPid) - when is_pid(TPid) -> +incr(recv = D, #diameter_packet{header = H, errors = [_|_]}, _, TPid) -> + incr(TPid, {diameter_codec:msg_id(H), D, error}); + +incr(Dir, Pkt, Dict, TPid) -> #diameter_packet{header = #diameter_header{is_error = E} = Hdr, msg = Rec} = Pkt, - RC = int(get_avp_value(?BASE, 'Result-Code', Rec)), + RC = int(get_avp_value(Dict, 'Result-Code', Rec)), PE = is_protocol_error(RC), %% Check that the E bit is set only for 3xxx result codes. @@ -2267,15 +2555,21 @@ incr(Dir, Pkt, TPid) orelse (E andalso PE) orelse x({invalid_error_bit, RC}, answer, [Dir, Pkt]), - Ctr = rc_counter(Rec, RC), - is_tuple(Ctr) - andalso incr(TPid, {diameter_codec:msg_id(Hdr), Dir, Ctr}). + irc(TPid, Hdr, Dir, rc_counter(Dict, Rec, RC)). + +irc(_, _, _, undefined) -> + false; + +irc(TPid, Hdr, Dir, Ctr) -> + incr(TPid, {diameter_codec:msg_id(Hdr), Dir, Ctr}). %% incr/2 incr(TPid, Counter) -> diameter_stats:incr(Counter, TPid, 1). +%% error_counter/2 + %% RFC 3588, 7.6: %% %% All Diameter answer messages defined in vendor-specific @@ -2285,26 +2579,27 @@ incr(TPid, Counter) -> %% Maintain statistics assuming one or the other, not both, which is %% surely the intent of the RFC. -rc_counter(_, RC) - when is_integer(RC) -> - {'Result-Code', RC}; -rc_counter(Rec, _) -> - rcc(get_avp_value(?BASE, 'Experimental-Result', Rec)). +rc_counter(Dict, Rec, undefined) -> + er(get_avp_value(Dict, 'Experimental-Result', Rec)); +rc_counter(_, _, RC) -> + {'Result-Code', RC}. %% Outgoing answers may be in any of the forms messages can be sent %% in. Incoming messages will be records. We're assuming here that the %% arity of the result code AVP's is 0 or 1. -rcc([{_,_,RC} = T]) - when is_integer(RC) -> +er([{_,_,N} = T | _]) + when is_integer(N) -> T; -rcc({_,_,RC} = T) - when is_integer(RC) -> +er({_,_,N} = T) + when is_integer(N) -> T; -rcc(_) -> +er(_) -> undefined. -int([N]) +%% Extract the first good looking integer. There's no guarantee +%% that what we're looking for has arity 1. +int([N|_]) when is_integer(N) -> N; int(N) @@ -2349,8 +2644,11 @@ rt(#request{packet = #diameter_packet{msg = undefined}}, _) -> false; %% TODO: Not what we should do. %% ... or not. -rt(#request{packet = #diameter_packet{msg = Msg}} = Req, S) -> - find_transport(get_destination(Msg), Req, S). +rt(#request{packet = #diameter_packet{msg = Msg}, + dictionary = Dict} + = Req, + S) -> + find_transport(get_destination(Dict, Msg), Req, S). %%% --------------------------------------------------------------------------- %%% # report_status/5 @@ -2383,7 +2681,7 @@ send_event(#diameter_event{service = SvcName} = E) -> %%% # share_peer/5 %%% --------------------------------------------------------------------------- -share_peer(up, Caps, Aliases, TPid, #state{share_peers = true, +share_peer(up, Caps, Aliases, TPid, #state{options = [_, {_, true} | _], service_name = Svc}) -> diameter_peer:notify(Svc, {peer, TPid, Aliases, Caps}); @@ -2394,11 +2692,11 @@ share_peer(_, _, _, _, _) -> %%% # share_peers/2 %%% --------------------------------------------------------------------------- -share_peers(Pid, #state{share_peers = true, +share_peers(Pid, #state{options = [_, {_, true} | _], local_peers = PDict}) -> ?Dict:fold(fun(A,Ps,ok) -> sp(Pid, A, Ps), ok end, ok, PDict); -share_peers(_, #state{share_peers = false}) -> +share_peers(_, _) -> ok. sp(Pid, Alias, Peers) -> @@ -2408,39 +2706,31 @@ sp(Pid, Alias, Peers) -> %%% # remote_peer_up/4 %%% --------------------------------------------------------------------------- -remote_peer_up(Pid, Aliases, Caps, #state{use_shared_peers = true, +remote_peer_up(Pid, Aliases, Caps, #state{options = [_, _, {_, true} | _], service = Svc, - shared_peers = PDict} - = S) -> + shared_peers = PDict}) -> #diameter_service{applications = Apps} = Svc, - Update = lists:filter(fun(A) -> - lists:keymember(A, #diameter_app.alias, Apps) - end, - Aliases), - S#state{shared_peers = rpu(Pid, Caps, PDict, Update)}; + Key = #diameter_app.alias, + As = lists:filter(fun(A) -> lists:keymember(A, Key, Apps) end, Aliases), + rpu(Pid, Caps, PDict, As); -remote_peer_up(_, _, _, #state{use_shared_peers = false} = S) -> - S. +remote_peer_up(_, _, _, #state{options = [_, _, {_, false} | _]}) -> + ok. rpu(_, _, PDict, []) -> PDict; rpu(Pid, Caps, PDict, Aliases) -> erlang:monitor(process, Pid), T = {Pid, Caps}, - lists:foldl(fun(A,D) -> ?Dict:append(A, T, D) end, - PDict, - Aliases). + lists:foreach(fun(A) -> ?Dict:append(A, T, PDict) end, Aliases). %%% --------------------------------------------------------------------------- %%% # remote_peer_down/2 %%% --------------------------------------------------------------------------- -remote_peer_down(Pid, #state{use_shared_peers = true, - shared_peers = PDict} - = S) -> - S#state{shared_peers = lists:foldl(fun(A,D) -> rpd(Pid, A, D) end, - PDict, - ?Dict:fetch_keys(PDict))}. +remote_peer_down(Pid, #state{options = [_, _, {_, true} | _], + shared_peers = PDict}) -> + lists:foreach(fun(A) -> rpd(Pid, A, PDict) end, ?Dict:fetch_keys(PDict)). rpd(Pid, Alias, PDict) -> ?Dict:update(Alias, fun(Ps) -> lists:keydelete(Pid, 1, Ps) end, PDict). @@ -2462,12 +2752,12 @@ find_transport({alias, Alias}, Msg, Opts, #state{service = Svc} = S) -> find_transport(#diameter_app{} = App, Msg, Opts, S) -> ft(App, Msg, Opts, S). -ft(#diameter_app{module = Mod} = App, Msg, Opts, S) -> +ft(#diameter_app{module = Mod, dictionary = Dict} = App, Msg, Opts, S) -> #options{filter = Filter, extra = Xtra} = Opts, pick_peer(App#diameter_app{module = Mod ++ Xtra}, - get_destination(Msg), + get_destination(Dict, Msg), Filter, S); ft(false = No, _, _, _) -> @@ -2503,11 +2793,11 @@ find_transport([_,_] = RH, Filter, S). -%% get_destination/1 +%% get_destination/2 -get_destination(Msg) -> - [str(get_avp_value(?BASE, 'Destination-Realm', Msg)), - str(get_avp_value(?BASE, 'Destination-Host', Msg))]. +get_destination(Dict, Msg) -> + [str(get_avp_value(Dict, 'Destination-Realm', Msg)), + str(get_avp_value(Dict, 'Destination-Host', Msg))]. %% This is not entirely correct. The avp could have an arity 1, in %% which case an empty list is a DiameterIdentity of length 0 rather @@ -2531,6 +2821,15 @@ str(T) -> %% question. The third form allows messages to be sent as is, without %% a dictionary, which is needed in the case of relay agents, for one. +%% Messages will be header/avps list as a relay and the only AVP's we +%% look for are in the common dictionary. This is required since the +%% relay dictionary doesn't inherit the common dictionary (which maybe +%% it should). +get_avp_value(?RELAY, Name, Msg) -> + get_avp_value(?BASE, Name, Msg); + +%% Message sent as a header/avps list, probably a relay case but not +%% necessarily. get_avp_value(Dict, Name, [#diameter_header{} | Avps]) -> try {Code, _, VId} = Dict:avp_header(Name), @@ -2544,6 +2843,7 @@ get_avp_value(Dict, Name, [#diameter_header{} | Avps]) -> undefined end; +%% Outgoing message as a name/values list. get_avp_value(_, Name, [_MsgName | Avps]) -> case lists:keyfind(Name, 1, Avps) of {_, V} -> @@ -2552,6 +2852,11 @@ get_avp_value(_, Name, [_MsgName | Avps]) -> undefined end; +%% Record might be an answer message in the common dictionary. +get_avp_value(Dict, Name, Rec) + when Dict /= ?BASE, element(1, Rec) == 'diameter_base_answer-message' -> + get_avp_value(?BASE, Name, Rec); + %% Message is typically a record but not necessarily: diameter:call/4 %% can be passed an arbitrary term. get_avp_value(Dict, Name, Rec) -> @@ -2746,20 +3051,59 @@ transports(#state{peerT = PeerT}) -> 'Vendor-Specific-Application-Id', 'Firmware-Revision']). +%% The config returned by diameter:service_info(SvcName, all). -define(ALL_INFO, [capabilities, applications, transport, pending, - statistics]). + options]). + +%% The rest. +-define(OTHER_INFO, [connections, + name, + peers, + statistics]). -service_info(Items, S) - when is_list(Items) -> - [{complete(I), service_info(I,S)} || I <- Items]; service_info(Item, S) when is_atom(Item) -> - service_info(Item, S, true). + case tagged_info(Item, S) of + {_, T} -> T; + undefined = No -> No + end; + +service_info(Items, S) -> + tagged_info(Items, S). + +tagged_info(Item, S) + when is_atom(Item) -> + case complete(Item) of + {value, I} -> + {I, complete_info(I,S)}; + false -> + undefined + end; -service_info(Item, #state{service = Svc} = S, Complete) -> +tagged_info(TPid, #state{peerT = PT, connT = CT}) + when is_pid(TPid) -> + try + [#conn{peer = Pid}] = ets:lookup(CT, TPid), + [#peer{ref = Ref, type = Type, options = Opts}] = ets:lookup(PT, Pid), + [{ref, Ref}, + {type, Type}, + {options, Opts}] + catch + error:_ -> + [] + end; + +tagged_info(Items, S) + when is_list(Items) -> + [T || I <- Items, T <- [tagged_info(I,S)], T /= undefined, T /= []]; + +tagged_info(_, _) -> + undefined. + +complete_info(Item, #state{service = Svc} = S) -> case Item of name -> S#state.service_name; @@ -2802,85 +3146,176 @@ service_info(Item, #state{service = Svc} = S, Complete) -> capabilities -> service_info(?CAP_INFO, S); applications -> info_apps(S); transport -> info_transport(S); + options -> info_options(S); pending -> info_pending(S); - statistics -> info_stats(S); - keys -> ?ALL_INFO ++ ?CAP_INFO; %% mostly for test + keys -> ?ALL_INFO ++ ?CAP_INFO ++ ?OTHER_INFO; all -> service_info(?ALL_INFO, S); - _ when Complete -> service_info(complete(Item), S, false); - _ -> undefined + statistics -> info_stats(S); + connections -> info_connections(S); + peers -> info_peers(S) end. +complete(I) + when I == keys; + I == all -> + {value, I}; complete(Pre) -> P = atom_to_list(Pre), - case [I || I <- [name | ?ALL_INFO] ++ ?CAP_INFO, + case [I || I <- ?ALL_INFO ++ ?CAP_INFO ++ ?OTHER_INFO, lists:prefix(P, atom_to_list(I))] of - [I] -> I; - _ -> Pre + [I] -> {value, I}; + _ -> false end. +%% info_stats/1 + info_stats(#state{peerT = PeerT}) -> - Peers = ets:select(PeerT, [{#peer{ref = '$1', conn = '$2', _ = '_'}, - [{'is_pid', '$2'}], - [['$1', '$2']]}]), - diameter_stats:read(lists:append(Peers)). -%% TODO: include peer identities in return value - -info_transport(#state{peerT = PeerT, connT = ConnT}) -> - dict:fold(fun it/3, + MatchSpec = [{#peer{ref = '$1', conn = '$2', _ = '_'}, + [{'is_pid', '$2'}], + [['$1', '$2']]}], + try ets:select(PeerT, MatchSpec) of + L -> + diameter_stats:read(lists:append(L)) + catch + error: badarg -> [] %% service has gone down + end. + +%% info_transport/1 +%% +%% One entry per configured transport. Statistics for each entry are +%% the accumulated values for the ref and associated peer pids. + +info_transport(S) -> + PeerD = peer_dict(S, config_dict(S)), + RefsD = dict:map(fun(_, Ls) -> [P || L <- Ls, {peer, {P,_}} <- L] end, + PeerD), + Refs = lists:append(dict:fold(fun(R, Ps, A) -> [[R|Ps] | A] end, + [], + RefsD)), + Stats = diameter_stats:read(Refs), + dict:fold(fun(R, Ls, A) -> + Ps = dict:fetch(R, RefsD), + [[{ref, R} | transport(Ls)] ++ [stats([R|Ps], Stats)] + | A] + end, [], - ets:foldl(fun(T,A) -> it_acc(ConnT, A, T) end, - dict:new(), - PeerT)). - -it(Ref, [[{type, connect} | _] = L], Acc) -> - [[{ref, Ref} | L] | Acc]; -it(Ref, [[{type, accept}, {options, Opts} | _] | _] = L, Acc) -> - [[{ref, Ref}, - {type, listen}, - {options, Opts}, - {accept, [lists:nthtail(2,A) || A <- L]}] - | Acc]. -%% Each entry has the same Opts. (TODO) - -it_acc(ConnT, Acc, #peer{pid = Pid, - type = Type, - ref = Ref, - options = Opts, - op_state = OS, - started = T, - conn = TPid}) -> + PeerD). + +%% Only a config entry for a listening transport: use it. +transport([[{type, listen}, _] = L]) -> + L ++ [{accept, []}]; + +%% Only one config or peer entry for a connecting transport: use it. +transport([[{type, connect} | _] = L]) -> + L; + +%% Peer entries: discard config. Note that the peer entries have +%% length at least 3. +transport([[_,_] | L]) -> + transport(L); + +%% Possibly many peer entries for a listening transport. Note that all +%% have the same options by construction, which is not terribly space +%% efficient. (TODO: all entries for the same Ref should share options.) +transport([[{type, accept}, {options, Opts} | _] | _] = Ls) -> + [{type, listen}, + {options, Opts}, + {accept, [lists:nthtail(2,L) || L <- Ls]}]. + +peer_dict(#state{peerT = PeerT, connT = ConnT}, Dict0) -> + try ets:tab2list(PeerT) of + L -> + lists:foldl(fun(T,A) -> peer_acc(ConnT, A, T) end, Dict0, L) + catch + error: badarg -> Dict0 %% service has gone down + end. + +peer_acc(ConnT, Acc, #peer{pid = Pid, + type = Type, + ref = Ref, + options = Opts, + op_state = OS, + started = T, + conn = TPid}) -> + WS = wd_state(OS), dict:append(Ref, [{type, Type}, {options, Opts}, - {watchdog, {Pid, T, OS}} - | info_conn(ConnT, TPid)], + {watchdog, {Pid, T, WS}} + | info_conn(ConnT, TPid, WS /= ?WD_DOWN)], Acc). -info_conn(ConnT, TPid) -> - info_conn(ets:lookup(ConnT, TPid)). +info_conn(ConnT, TPid, true) + when is_pid(TPid) -> + try ets:lookup(ConnT, TPid) of + T -> info_conn(T) + catch + error: badarg -> [] %% service has gone down + end; +info_conn(_, _, _) -> + []. + +%% The point of extracting the config here is so that 'transport' info +%% has one entry for each transport ref, the peer table only +%% containing entries that have a living watchdog. + +config_dict(#state{service_name = SvcName}) -> + lists:foldl(fun config_acc/2, + dict:new(), + diameter_config:lookup(SvcName)). + +config_acc({Ref, T, Opts}, Dict) + when T == listen; + T == connect -> + dict:store(Ref, [[{type, T}, {options, Opts}]], Dict); +config_acc(_, Dict) -> + Dict. + +wd_state({_,S}) -> + S; +wd_state(?STATE_UP) -> + ?WD_OKAY; +wd_state(?STATE_DOWN) -> + ?WD_DOWN. info_conn([#conn{pid = Pid, apps = SApps, caps = Caps, started = T}]) -> [{peer, {Pid, T}}, {apps, SApps}, - {caps, info_caps(Caps)}]; + {caps, info_caps(Caps)} + | try [{port, info_port(Pid)}] catch _:_ -> [] end]; info_conn([] = No) -> No. +%% Extract information that the processes involved are expected to +%% "publish" in their process dictionaries. Simple but backhanded. +info_port(Pid) -> + {_, PD} = process_info(Pid, dictionary), + {_, T} = lists:keyfind({diameter_peer_fsm, start}, 1, PD), + {TPid, {_Type, TMod, _Cfg}} = T, + {_, TD} = process_info(TPid, dictionary), + {_, Data} = lists:keyfind({TMod, info}, 1, TD), + [{owner, TPid}, + {module, TMod} + | try TMod:info(Data) catch _:_ -> [] end]. + +%% Use the fields names from diameter_caps instead of +%% diameter_base_CER to distinguish between the 2-tuple values +%% compared to the single capabilities values. Note also that the +%% returned list is tagged 'caps' rather than 'capabilities' to +%% emphasize the difference. info_caps(#diameter_caps{} = C) -> lists:zip(record_info(fields, diameter_caps), tl(tuple_to_list(C))). info_apps(#state{service = #diameter_service{applications = Apps}}) -> lists:map(fun mk_app/1, Apps). -mk_app(#diameter_app{alias = Alias, - dictionary = Dict, - module = ModX, - id = Id}) -> - [{alias, Alias}, - {dictionary, Dict}, - {module, ModX}, - {id, Id}]. +mk_app(#diameter_app{} = A) -> + lists:zip(record_info(fields, diameter_app), tl(tuple_to_list(A))). + +%% info_pending/1 +%% +%% One entry for each outgoing request whose answer is outstanding. info_pending(#state{} = S) -> MatchSpec = [{{'$1', @@ -2894,4 +3329,67 @@ info_pending(#state{} = S) -> {{transport, '$2'}}, {{from, '$3'}}]}}]}], - ets:select(?REQUEST_TABLE, MatchSpec). + try + ets:select(?REQUEST_TABLE, MatchSpec) + catch + error: badarg -> [] %% service has gone down + end. + +%% info_connections/1 +%% +%% One entry per transport connection. Statistics for each entry are +%% for the peer pid only. + +info_connections(S) -> + ConnL = conn_list(S), + Stats = diameter_stats:read([P || L <- ConnL, {peer, {P,_}} <- L]), + [L ++ [stats([P], Stats)] || L <- ConnL, {peer, {P,_}} <- L]. + +conn_list(S) -> + lists:append(dict:fold(fun conn_acc/3, [], peer_dict(S, dict:new()))). + +conn_acc(Ref, Peers, Acc) -> + [[[{ref, Ref} | L] || L <- Peers, lists:keymember(peer, 1, L)] + | Acc]. + +stats(Refs, Stats) -> + {statistics, dict:to_list(lists:foldl(fun(R,D) -> + stats_acc(R, D, Stats) + end, + dict:new(), + Refs))}. + +stats_acc(Ref, Dict, Stats) -> + lists:foldl(fun({C,N}, D) -> dict:update_counter(C, N, D) end, + Dict, + proplists:get_value(Ref, Stats, [])). + +%% info_peers/1 +%% +%% One entry per peer Origin-Host. Statistics for each entry are +%% accumulated values for all peer pids. + +info_peers(S) -> + {PeerD, RefD} = lists:foldl(fun peer_acc/2, + {dict:new(), dict:new()}, + conn_list(S)), + Refs = lists:append(dict:fold(fun(_, Rs, A) -> [Rs|A] end, + [], + RefD)), + Stats = diameter_stats:read(Refs), + dict:fold(fun(OH, Cs, A) -> + Rs = dict:fetch(OH, RefD), + [{OH, [{connections, Cs}, stats(Rs, Stats)]} | A] + end, + [], + PeerD). + +peer_acc(Peer, {PeerD, RefD}) -> + [{TPid, _}, [{origin_host, {_, OH}} | _]] + = [proplists:get_value(K, Peer) || K <- [peer, caps]], + {dict:append(OH, Peer, PeerD), dict:append(OH, TPid, RefD)}. + +%% info_options/1 + +info_options(S) -> + S#state.options. diff --git a/lib/diameter/src/base/diameter_session.erl b/lib/diameter/src/base/diameter_session.erl index 4c468f207c..3b236f109a 100644 --- a/lib/diameter/src/base/diameter_session.erl +++ b/lib/diameter/src/base/diameter_session.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% Copyright Ericsson AB 2010-2012. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -20,6 +20,7 @@ -module(diameter_session). -export([sequence/0, + sequence/1, session_id/1, origin_state_id/0]). @@ -30,7 +31,7 @@ -define(INT32, 16#FFFFFFFF). %% --------------------------------------------------------------------------- -%% # sequence/0 +%% # sequence/0-1 %% %% Output: 32-bit %% --------------------------------------------------------------------------- @@ -77,6 +78,15 @@ sequence() -> Instr = {_Pos = 2, _Incr = 1, _Threshold = ?INT32, _SetVal = 0}, ets:update_counter(diameter_sequence, sequence, Instr). +-spec sequence(diameter:sequence()) + -> diameter:'Unsigned32'(). + +sequence({_,32}) -> + sequence(); + +sequence({H,N}) -> + (H bsl N) bor (sequence() band (1 bsl N - 1)). + %% --------------------------------------------------------------------------- %% # origin_state_id/0 %% --------------------------------------------------------------------------- diff --git a/lib/diameter/src/base/diameter_stats.erl b/lib/diameter/src/base/diameter_stats.erl index 71479afa95..70727d068e 100644 --- a/lib/diameter/src/base/diameter_stats.erl +++ b/lib/diameter/src/base/diameter_stats.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% Copyright Ericsson AB 2010-2012. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -22,14 +22,13 @@ %% -module(diameter_stats). --compile({no_auto_import, [monitor/2]}). -behaviour(gen_server). --export([reg/1, reg/2, - incr/1, incr/2, incr/3, +-export([reg/2, reg/1, + incr/3, incr/1, read/1, - flush/0, flush/1]). + flush/1]). %% supervisor callback -export([start_link/0]). @@ -48,123 +47,105 @@ -include("diameter_internal.hrl"). -%% ets table containing stats. reg(Pid, Ref) inserts a {Pid, Ref}, -%% incr(Counter, X, N) updates the counter keyed at {Counter, X}, and -%% Pid death causes counters keyed on {Counter, Pid} to be deleted and -%% added to those keyed on {Counter, Ref}. +%% ets table containing 2-tuple stats. reg(Pid, Ref) inserts a {Pid, +%% Ref}, incr(Counter, X, N) updates the counter keyed at {Counter, +%% X}, and Pid death causes counters keyed on {Counter, Pid} to be +%% deleted and added to those keyed on {Counter, Ref}. -define(TABLE, ?MODULE). %% Name of registered server. -define(SERVER, ?MODULE). -%% Entries in the table. --define(REC(Key, Value), {Key, Value}). - %% Server state. -record(state, {id = now()}). -type counter() :: any(). --type contrib() :: any(). - -%%% --------------------------------------------------------------------------- -%%% # reg(Pid, Contrib) -%%% -%%% Description: Register a process as a contributor of statistics -%%% associated with a specified term. Statistics can be -%%% contributed by specifying either Pid or Contrib as -%%% the second argument to incr/3. Statistics contributed -%%% by Pid are folded into the corresponding entry for -%%% Contrib when the process dies. -%%% -%%% Contrib can be any term but should not be a pid -%%% passed as the first argument to reg/2. Subsequent -%%% registrations for the same Pid overwrite the association -%%% --------------------------------------------------------------------------- - --spec reg(pid(), contrib()) - -> true. +-type ref() :: any(). + +%% --------------------------------------------------------------------------- +%% # reg(Pid, Ref) +%% +%% Register a process as a contributor of statistics associated with a +%% specified term. Statistics can be contributed by specifying either +%% Pid or Ref as the second argument to incr/3. Statistics contributed +%% by Pid are folded into the corresponding entry for Ref when the +%% process dies. +%% --------------------------------------------------------------------------- + +-spec reg(pid(), ref()) + -> boolean(). -reg(Pid, Contrib) +reg(Pid, Ref) when is_pid(Pid) -> - call({reg, Pid, Contrib}). + call({reg, Pid, Ref}). --spec reg(contrib()) +-spec reg(ref()) -> true. reg(Ref) -> reg(self(), Ref). -%%% --------------------------------------------------------------------------- -%%% # incr(Counter, Contrib, N) -%%% -%%% Description: Increment a counter for the specified contributor. -%%% -%%% Contrib will typically be an argument passed to reg/2 -%%% but there's nothing that requires this. In particular, -%%% if Contrib is a pid that hasn't been registered then -%%% counters are unaffected by the death of the process. -%%% --------------------------------------------------------------------------- - --spec incr(counter(), contrib(), integer()) - -> integer(). +%% --------------------------------------------------------------------------- +%% # incr(Counter, Ref, N) +%% +%% Increment a counter for the specified contributor. +%% +%% Ref will typically be an argument passed to reg/2 but there's +%% nothing that requires this. Only registered pids can contribute +%% counters however, otherwise incr/3 is a no-op. +%% --------------------------------------------------------------------------- -incr(Ctr, Contrib, N) -> - update_counter({Ctr, Contrib}, N). +-spec incr(counter(), ref(), integer()) + -> integer() | false. -incr(Ctr, N) +incr(Ctr, Ref, N) when is_integer(N) -> - incr(Ctr, self(), N); - -incr(Ctr, Contrib) -> - incr(Ctr, Contrib, 1). + update_counter({Ctr, Ref}, N). incr(Ctr) -> incr(Ctr, self(), 1). -%%% --------------------------------------------------------------------------- -%%% # read(Contribs) -%%% -%%% Description: Retrieve counters for the specified contributors. -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # read(Refs) +%% +%% Retrieve counters for the specified contributors. +%% --------------------------------------------------------------------------- --spec read([contrib()]) - -> [{contrib(), [{counter(), integer()}]}]. +-spec read([ref()]) + -> [{ref(), [{counter(), integer()}]}]. -read(Contribs) -> - lists:foldl(fun(?REC({T,C}, N), D) -> orddict:append(C, {T,N}, D) end, +read(Refs) -> + read(Refs, false). + +read(Refs, B) -> + MatchSpec = [{{{'_', '$1'}, '_'}, + [?ORCOND([{'=:=', '$1', {const, R}} + || R <- Refs])], + ['$_']}], + L = ets:select(?TABLE, MatchSpec), + B andalso delete(L), + lists:foldl(fun({{C,R}, N}, D) -> orddict:append(R, {C,N}, D) end, orddict:new(), - ets:select(?TABLE, [{?REC({'_', '$1'}, '_'), - [?ORCOND([{'=:=', '$1', {const, C}} - || C <- Contribs])], - ['$_']}])). - -%%% --------------------------------------------------------------------------- -%%% # flush(Contrib) -%%% -%%% Description: Retrieve and delete statistics for the specified -%%% contributor. -%%% -%%% If Contrib is a pid registered with reg/2 then statistics -%%% for both and its associated contributor are retrieved. -%%% --------------------------------------------------------------------------- - --spec flush(contrib()) - -> [{counter(), integer()}]. + L). + +%% --------------------------------------------------------------------------- +%% # flush(Refs) +%% +%% Retrieve and delete statistics for the specified contributors. +%% --------------------------------------------------------------------------- + +-spec flush([ref()]) + -> [{ref(), {counter(), integer()}}]. -flush(Contrib) -> +flush(Refs) -> try - call({flush, Contrib}) + call({flush, Refs}) catch exit: _ -> [] end. -flush() -> - flush(self()). - -%%% --------------------------------------------------------- -%%% EXPORTED INTERNAL FUNCTIONS -%%% --------------------------------------------------------- +%% =========================================================================== start_link() -> ServerName = {local, ?SERVER}, @@ -179,18 +160,16 @@ state() -> uptime() -> call(uptime). -%%% ---------------------------------------------------------- -%%% # init(_) -%%% -%%% Output: {ok, State} -%%% ---------------------------------------------------------- +%% ---------------------------------------------------------- +%% # init/1 +%% ---------------------------------------------------------- init([]) -> ets:new(?TABLE, [named_table, ordered_set, public]), {ok, #state{}}. %% ---------------------------------------------------------- -%% handle_call(Request, From, State) +%% # handle_call/3 %% ---------------------------------------------------------- handle_call(state, _, State) -> @@ -199,31 +178,31 @@ handle_call(state, _, State) -> handle_call(uptime, _, #state{id = Time} = State) -> {reply, diameter_lib:now_diff(Time), State}; -handle_call({reg, Pid, Contrib}, _From, State) -> - monitor(not ets:member(?TABLE, Pid), Pid), - {reply, insert(?REC(Pid, Contrib)), State}; +handle_call({incr, T}, _, State) -> + {reply, update_counter(T), State}; -handle_call({flush, Contrib}, _From, State) -> - {reply, fetch(Contrib), State}; +handle_call({reg, Pid, Ref}, _From, State) -> + B = ets:insert_new(?TABLE, {Pid, Ref}), + B andalso erlang:monitor(process, Pid), + {reply, B, State}; + +handle_call({flush, Refs}, _From, State) -> + {reply, read(Refs, true), State}; handle_call(Req, From, State) -> ?UNEXPECTED([Req, From]), {reply, nok, State}. %% ---------------------------------------------------------- -%% handle_cast(Request, State) +%% # handle_cast/2 %% ---------------------------------------------------------- -handle_cast({incr, Rec}, State) -> - update_counter(Rec), - {noreply, State}; - handle_cast(Msg, State) -> ?UNEXPECTED([Msg]), {noreply, State}. %% ---------------------------------------------------------- -%% handle_info(Request, State) +%% # handle_info/2 %% ---------------------------------------------------------- handle_info({'DOWN', _MRef, process, Pid, _}, State) -> @@ -235,91 +214,62 @@ handle_info(Info, State) -> {noreply, State}. %% ---------------------------------------------------------- -%% terminate(Reason, State) +%% # terminate/2 %% ---------------------------------------------------------- terminate(_Reason, _State) -> ok. %% ---------------------------------------------------------- -%% code_change(OldVsn, State, Extra) +%% # code_change/3 %% ---------------------------------------------------------- code_change(_OldVsn, State, _Extra) -> {ok, State}. -%%% --------------------------------------------------------- -%%% INTERNAL FUNCTIONS -%%% --------------------------------------------------------- - -%% monitor/2 - -monitor(true, Pid) -> - erlang:monitor(process, Pid); -monitor(false = No, _) -> - No. +%% =========================================================================== %% down/1 down(Pid) -> - L = ets:match_object(?TABLE, ?REC({'_', Pid}, '_')), - [?REC(_, Ref) = T] = lookup(Pid), + down(lookup(Pid), ets:match_object(?TABLE, {{'_', Pid}, '_'})). + +down([{_, Ref} = T], L) -> fold(Ref, L), - delete_object(T), + delete([T|L]); +down([], L) -> %% flushed delete(L). -%% Fold Pid-based entries into Ref-based ones. +%% Fold pid-based entries into ref-based ones. fold(Ref, L) -> - lists:foreach(fun(?REC({K, _}, V)) -> update_counter({{K, Ref}, V}) end, - L). - -delete(Objs) -> - lists:foreach(fun delete_object/1, Objs). - -%% fetch/1 - -fetch(X) -> - MatchSpec = [{?REC({'_', '$1'}, '_'), - [?ORCOND([{'==', '$1', {const, T}} || T <- [X | ref(X)]])], - ['$_']}], - L = ets:select(?TABLE, MatchSpec), - delete(L), - D = lists:foldl(fun sum/2, dict:new(), L), - dict:to_list(D). - -sum({{Ctr, _}, N}, Dict) -> - dict:update(Ctr, fun(V) -> V+N end, N, Dict). - -ref(Pid) - when is_pid(Pid) -> - ets:select(?TABLE, [{?REC(Pid, '$1'), [], ['$1']}]); -ref(_) -> - []. + lists:foreach(fun({{K, _}, V}) -> update_counter({{K, Ref}, V}) end, L). %% update_counter/2 %% -%% From an arbitrary request process. Cast to the server process to -%% insert a new element if the counter doesn't exists so that two -%% processes don't do so simultaneously. +%% From an arbitrary process. Call to the server process to insert a +%% new element if the counter doesn't exists so that two processes +%% don't insert simultaneously. update_counter(Key, N) -> try ets:update_counter(?TABLE, Key, N) catch error: badarg -> - cast({incr, ?REC(Key, N)}) + call({incr, {Key, N}}) end. %% update_counter/1 %% -%% From the server process. +%% From the server process, when update_counter/2 failed due to a +%% non-existent entry. -update_counter(?REC(Key, N) = T) -> +update_counter({{_Ctr, Ref} = Key, N} = T) -> try ets:update_counter(?TABLE, Key, N) catch error: badarg -> - insert(T) + (not is_pid(Ref) orelse ets:member(?TABLE, Ref)) + andalso begin insert(T), N end end. insert(T) -> @@ -328,13 +278,8 @@ insert(T) -> lookup(Key) -> ets:lookup(?TABLE, Key). -delete_object(T) -> - ets:delete_object(?TABLE, T). - -%% cast/1 - -cast(Msg) -> - gen_server:cast(?SERVER, Msg). +delete(Objs) -> + lists:foreach(fun({K,_}) -> ets:delete(?TABLE, K) end, Objs). %% call/1 diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl index fb22fd8275..243ad0a986 100644 --- a/lib/diameter/src/base/diameter_watchdog.erl +++ b/lib/diameter/src/base/diameter_watchdog.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% Copyright Ericsson AB 2010-2012. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -43,20 +43,24 @@ -include("diameter_internal.hrl"). -define(DEFAULT_TW_INIT, 30000). %% RFC 3539 ch 3.4.1 +-define(NOMASK, {0,32}). %% default sequence mask -record(watchdog, {%% PCB - Peer Control Block; see RFC 3539, Appendix A status = initial :: initial | okay | suspect | down | reopen, - pending = false :: boolean(), + pending = false :: boolean(), %% DWA tw :: 6000..16#FFFFFFFF | {module(), atom(), list()}, %% {M,F,A} -> integer() >= 0 num_dwa = 0 :: -1 | non_neg_integer(), %% number of DWAs received during reopen %% end PCB - parent = self() :: pid(), - transport :: pid(), + parent = self() :: pid(), %% service process + transport :: pid() | undefined, %% peer_fsm process tref :: reference(), %% reference for current watchdog timer - message_data}). %% term passed into diameter_service with message + message_data, %% term passed into diameter_service with message + sequence :: diameter:sequence(), %% mask + restrict :: {diameter:restriction(), boolean()}, + shutdown = false :: boolean()}). %% start/2 %% @@ -64,6 +68,13 @@ %% that a failed capabilities exchange produces the desired exit %% reason. +-spec start(Type, {RecvData, [Opt], SvcName, #diameter_service{}}) + -> {reference(), pid()} + when Type :: {connect|accept, diameter:transport_ref()}, + RecvData :: term(), + Opt :: diameter:transport_opt(), + SvcName :: diameter:service_name(). + start({_,_} = Type, T) -> Ref = make_ref(), {ok, Pid} = diameter_watchdog_sup:start_child({Ref, {Type, self(), T}}), @@ -102,7 +113,7 @@ i({_, Pid, _} = T) -> %% from old code erlang:monitor(process, Pid), make_state(T). -make_state({T, Pid, {ConnT, +make_state({T, Pid, {RecvData, Opts, SvcName, #diameter_service{applications = Apps, @@ -111,12 +122,23 @@ make_state({T, Pid, {ConnT, random:seed(now()), putr(restart, {T, Opts, Svc}), %% save seeing it in trace putr(dwr, dwr(Caps)), %% + {_,_} = Mask = call(Pid, sequence), + Restrict = call(Pid, restriction), + Nodes = restrict_nodes(Restrict), #watchdog{parent = Pid, - transport = monitor(diameter_peer_fsm:start(T, Opts, Svc)), + transport = monitor(diameter_peer_fsm:start(T, + Opts, + {Mask, Nodes, Svc})), tw = proplists:get_value(watchdog_timer, Opts, ?DEFAULT_TW_INIT), - message_data = {ConnT, SvcName, Apps}}. + message_data = {RecvData, SvcName, Apps, Mask}, + sequence = Mask, + restrict = {Restrict, lists:member(node(), Nodes)}}. + +%% Retrieve the sequence mask from the parent from the parent, rather +%% than having it passed into init/1, for upgrade reasons: the call to +%% diameter_service:receive_message/3 passes back the mask. %% handle_call/3 @@ -130,17 +152,46 @@ handle_cast(_, State) -> %% handle_info/2 -handle_info(T, State) -> +handle_info(T, #watchdog{} = State) -> case transition(T, State) of ok -> {noreply, State}; - #watchdog{status = X} = S -> - ?LOGC(X =/= State#watchdog.status, transition, X), + #watchdog{} = S -> + event(State, S), {noreply, S}; stop -> ?LOG(stop, T), + event(State, State#watchdog{status = down}), {stop, {shutdown, T}, State} - end. + end; + +handle_info(T, S) -> + handle_info(T, upgrade(S)). + +upgrade(S) -> + #watchdog{} = list_to_tuple(tuple_to_list(S) + ++ [?NOMASK, {nodes, true}, false]). + +event(#watchdog{status = T}, #watchdog{status = T}) -> + ok; + +event(#watchdog{transport = undefined}, #watchdog{transport = undefined}) -> + ok; + +event(#watchdog{status = From, transport = F, parent = Pid}, + #watchdog{status = To, transport = T}) -> + E = {tpid(F,T), From, To}, + notify(Pid, E), + ?LOG(transition, {self(), E}). + +tpid(_, Pid) + when is_pid(Pid) -> + Pid; +tpid(Pid, _) -> + Pid. + +notify(Pid, E) -> + Pid ! {watchdog, self(), E}. %% terminate/2 @@ -176,9 +227,10 @@ transition({shutdown, Pid}, #watchdog{parent = Pid, down = S, %% sanity check stop; transition({shutdown = T, Pid}, #watchdog{parent = Pid, - transport = TPid}) -> + transport = TPid} + = S) -> TPid ! {T, self()}, - ok; + S#watchdog{shutdown = true}; %% Parent process has died, transition({'DOWN', _, process, Pid, _Reason}, @@ -212,9 +264,10 @@ transition({close, TPid, _Reason}, #watchdog{transport = TPid}) -> transition({open, TPid, Hosts, T} = Open, #watchdog{transport = TPid, status = initial, - parent = Pid} + parent = Pid, + restrict = {_, R}} = S) -> - case okay(getr(restart), Hosts) of + case okay(getr(restart), Hosts, R) of okay -> open(Pid, {TPid, T}), set_watchdog(S#watchdog{status = okay}); @@ -229,12 +282,15 @@ transition({open, TPid, Hosts, T} = Open, transition({open = P, TPid, _Hosts, T}, #watchdog{transport = TPid, + parent = Pid, status = down} = S) -> %% Store the info we need to notify the parent to reopen the %% connection after the requisite DWA's are received, at which - %% time we eraser(open). + %% time we eraser(open). The reopen message is a later addition, + %% to communicate the new capabilities as soon as they're known. putr(P, {TPid, T}), + Pid ! {reopen, self(), {TPid, T}}, set_watchdog(send_watchdog(S#watchdog{status = reopen, num_dwa = 0})); @@ -248,11 +304,14 @@ transition({open = P, TPid, _Hosts, T}, transition({'DOWN', _, process, TPid, _}, #watchdog{transport = TPid, - status = initial}) -> + status = S, + shutdown = D}) + when S == initial; + D -> stop; -transition({'DOWN', _, process, Pid, _}, - #watchdog{transport = Pid} +transition({'DOWN', _, process, TPid, _}, + #watchdog{transport = TPid} = S) -> failover(S), close(S), @@ -283,6 +342,15 @@ transition({state, Pid}, #watchdog{status = S}) -> %% =========================================================================== +%% Only call "upwards", to the parent service. +call(Pid, Req) -> + try + gen_server:call(Pid, Req, infinity) + catch + exit: Reason -> + exit({shutdown, {Req, Reason}}) + end. + monitor(Pid) -> erlang:monitor(process, Pid), Pid. @@ -296,26 +364,36 @@ getr(Key) -> eraser(Key) -> erase({?MODULE, Key}). -%% encode/1 +%% encode/2 -encode(Msg) -> - #diameter_packet{bin = Bin} = diameter_codec:encode(?BASE, Msg), +encode(Msg, Mask) -> + Seq = diameter_session:sequence(Mask), + Hdr = #diameter_header{version = ?DIAMETER_VERSION, + end_to_end_id = Seq, + hop_by_hop_id = Seq}, + Pkt = #diameter_packet{header = Hdr, + msg = Msg}, + #diameter_packet{bin = Bin} = diameter_codec:encode(?BASE, Pkt), Bin. -%% okay/2 +%% okay/3 -okay({{accept, Ref}, _, _}, Hosts) -> +okay({{accept, Ref}, _, _}, Hosts, Restrict) -> T = {?MODULE, connection, Ref, Hosts}, diameter_reg:add(T), - okay(diameter_reg:match(T)); + if Restrict -> + okay(diameter_reg:match(T)); + true -> + okay + end; %% Register before matching so that at least one of two registering -%% processes will match the other. (Which can't happen as long as -%% diameter_peer_fsm guarantees at most one open connection to the same -%% peer.) +%% processes will match the other. -okay({{connect, _}, _, _}, _) -> +okay({{connect, _}, _, _}, _, _) -> okay. +%% okay/2 + %% The peer hasn't been connected recently ... okay([{_,P}]) -> P = self(), %% assert @@ -371,9 +449,10 @@ close(#watchdog{parent = Pid}) -> %% send_watchdog/1 send_watchdog(#watchdog{pending = false, - transport = TPid} + transport = TPid, + sequence = Mask} = S) -> - TPid ! {send, encode(getr(dwr))}, + TPid ! {send, encode(getr(dwr), Mask)}, ?LOG(send, 'DWR'), S#watchdog{pending = true}. @@ -385,7 +464,7 @@ recv(Name, Pkt, S) -> rcv(Name, Pkt, S), NS catch - throw: {?MODULE, throwaway, #watchdog{} = NS} -> + {?MODULE, throwaway, #watchdog{} = NS} -> NS end. @@ -408,6 +487,14 @@ throwaway(S) -> throw({?MODULE, throwaway, S}). %% rcv/2 +%% +%% The lack of Hop-by-Hop and End-to-End Identifiers checks in a +%% received DWA is intentional. The purpose of the message is to +%% demonstrate life but a peer that consistently bungles it by sending +%% the wrong identifiers causes the connection to toggle between OPEN +%% and SUSPECT, with failover and failback as result, despite there +%% being no real problem with connectivity. Thus, relax and accept any +%% incoming DWA as being in response to an outgoing DWR. %% INITIAL Receive DWA Pending = FALSE %% Throwaway() INITIAL @@ -526,7 +613,7 @@ timeout(#watchdog{status = T, = S) when T == suspect; T == reopen, P, N < 0 -> - exit(TPid, shutdown), + exit(TPid, {shutdown, watchdog_timeout}), close(S), S#watchdog{status = down}; @@ -571,19 +658,40 @@ restart(#watchdog{transport = undefined} = S) -> restart(S) -> S. +%% restart/2 +%% %% Only restart the transport in the connecting case. For an accepting -%% transport, we've registered the peer connection when leaving state -%% initial and this is used by a new accepting process to realize that -%% it's actually in state down rather then initial when receiving -%% notification of an open connection. - -restart({{connect, _} = T, Opts, Svc}, #watchdog{parent = Pid} = S) -> +%% transport, there's no guarantee that an accepted connection in a +%% restarted transport if from the peer we've lost contact with so +%% have to be prepared for another watchdog to handle it. This is what +%% the diameter_reg registration in this module is for: the peer +%% connection is registered when leaving state initial and this is +%% used by a new accepting watchdog to realize that it's actually in +%% state down rather then initial when receiving notification of an +%% open connection. + +restart({{connect, _} = T, Opts, Svc}, #watchdog{parent = Pid, + sequence = Mask, + restrict = {R,_}} + = S) -> Pid ! {reconnect, self()}, - S#watchdog{transport = monitor(diameter_peer_fsm:start(T, Opts, Svc))}; + Nodes = restrict_nodes(R), + S#watchdog{transport = monitor(diameter_peer_fsm:start(T, + Opts, + {Mask, Nodes, Svc})), + restrict = {R, lists:member(node(), Nodes)}}; + +%% No restriction on the number of connections to the same peer: just +%% die. Note that a state machine never enters state REOPEN in this +%% case. +restart({{accept, _}, _, _}, #watchdog{restrict = {_, false}}) -> + stop; + +%% Otherwise hang around until told to die. restart({{accept, _}, _, _}, S) -> S. -%% Don't currently use Opts/Svc in the accept case but having them in -%% the process dictionary is helpful if the process dies unexpectedly. + +%% Don't currently use Opts/Svc in the accept case. %% dwr/1 @@ -593,3 +701,22 @@ dwr(#diameter_caps{origin_host = OH, ['DWR', {'Origin-Host', OH}, {'Origin-Realm', OR}, {'Origin-State-Id', OSI}]. + +%% restrict_nodes/1 + +restrict_nodes(false) -> + []; + +restrict_nodes(nodes) -> + [node() | nodes()]; + +restrict_nodes(node) -> + [node()]; + +restrict_nodes(Nodes) + when [] == Nodes; + is_atom(hd(Nodes)) -> + Nodes; + +restrict_nodes(F) -> + diameter_lib:eval(F). |