diff options
Diffstat (limited to 'lib/diameter/src/base')
-rw-r--r-- | lib/diameter/src/base/diameter.erl | 16 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_capx.erl | 6 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_codec.erl | 3 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_config.erl | 83 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_peer.erl | 2 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_peer_fsm.erl | 82 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_reg.erl | 255 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_service.erl | 735 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_session.erl | 14 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_watchdog.erl | 144 |
10 files changed, 895 insertions, 445 deletions
diff --git a/lib/diameter/src/base/diameter.erl b/lib/diameter/src/base/diameter.erl index 4f90b741ae..3e3a6be0ef 100644 --- a/lib/diameter/src/base/diameter.erl +++ b/lib/diameter/src/base/diameter.erl @@ -44,6 +44,8 @@ stop/0]). -export_type([evaluable/0, + restriction/0, + sequence/0, app_alias/0, service_name/0, capability/0, @@ -280,11 +282,23 @@ call(SvcName, App, Message) -> | fun() | maybe_improper_list(evaluable(), list()). +-type sequence() + :: {'Unsigned32'(), 0..32}. + +-type restriction() + :: false + | node + | nodes + | [node()] + | evaluable(). + %% Options passed to start_service/2 -type service_opt() :: capability() - | {application, [application_opt()]}. + | {application, [application_opt()]} + | {restrict_connections, restriction()} + | {sequence, sequence() | evaluable()}. -type application_opt() :: {alias, app_alias()} diff --git a/lib/diameter/src/base/diameter_capx.erl b/lib/diameter/src/base/diameter_capx.erl index 6c4d60ee9b..190d37262b 100644 --- a/lib/diameter/src/base/diameter_capx.erl +++ b/lib/diameter/src/base/diameter_capx.erl @@ -141,7 +141,9 @@ cap('Host-IP-Address', Vs) when is_list(Vs) -> lists:map(fun ipaddr/1, Vs); -cap('Firmware-Revision', V) -> +cap(K, V) + when K == 'Firmware-Revision'; + K == 'Origin-State-Id' -> [V]; cap(_, Vs) @@ -149,7 +151,7 @@ cap(_, Vs) Vs; cap(K, V) -> - ?THROW({invalid, K, V}). + ?THROW({invalid, {K,V}}). ipaddr(A) -> try diff --git a/lib/diameter/src/base/diameter_codec.erl b/lib/diameter/src/base/diameter_codec.erl index 421e280422..a94d37f7a8 100644 --- a/lib/diameter/src/base/diameter_codec.erl +++ b/lib/diameter/src/base/diameter_codec.erl @@ -333,6 +333,9 @@ decode_header(_) -> %% wraparound counter. The 8-bit counter is incremented each time the %% system is restarted. +sequence_numbers({_,_} = T) -> + T; + sequence_numbers(#diameter_packet{bin = Bin}) when is_binary(Bin) -> sequence_numbers(Bin); diff --git a/lib/diameter/src/base/diameter_config.erl b/lib/diameter/src/base/diameter_config.erl index e47f63f814..63d28f25a2 100644 --- a/lib/diameter/src/base/diameter_config.erl +++ b/lib/diameter/src/base/diameter_config.erl @@ -97,6 +97,9 @@ -record(monitor, {mref = make_ref() :: reference(), service}). %% name +%% The default sequence mask. +-define(NOMASK, {0,32}). + %% Time to lay low before restarting a dead service. -define(RESTART_SLEEP, 2000). @@ -549,9 +552,11 @@ make_config(SvcName, Opts) -> ok = encode_CER(COpts), - Os = split(Opts, [{[fun erlang:is_boolean/1], false, share_peers}, - {[fun erlang:is_boolean/1], false, use_shared_peers}, - {[fun erlang:is_pid/1, false], false, monitor}]), + Os = split(Opts, fun opt/2, [{false, share_peers}, + {false, use_shared_peers}, + {false, monitor}, + {?NOMASK, sequence}, + {nodes, restrict_connections}]), %% share_peers and use_shared_peers are currently undocumented. #service{name = SvcName, @@ -559,11 +564,66 @@ make_config(SvcName, Opts) -> capabilities = Caps}, options = Os}. +split(Opts, F, Defs) -> + [{K, F(K, get_opt(K, Opts, D))} || {D,K} <- Defs]. + +opt(K, false = B) + when K /= sequence -> + B; + +opt(K, true = B) + when K == share_peer; + K == use_shared_peers -> + B; + +opt(monitor, P) + when is_pid(P) -> + P; + +opt(restrict_connections, T) + when T == node; + T == nodes; + T == []; + is_atom(hd(T)) -> + T; + +opt(restrict_connections = K, F) -> + try diameter_lib:eval(F) of %% no guarantee that it won't fail later + Nodes when is_list(Nodes) -> + F; + V -> + ?THROW({value, {K,V}}) + catch + E:R -> + ?THROW({value, {K, E, R, ?STACK}}) + end; + +opt(sequence, {_,_} = T) -> + sequence(T); + +opt(sequence = K, F) -> + try diameter_lib:eval(F) of + T -> sequence(T) + catch + E:R -> + ?THROW({value, {K, E, R, ?STACK}}) + end; + +opt(K, _) -> + ?THROW({value, K}). + +sequence({H,N} = T) + when 0 =< N, N =< 32, 0 =< H, 0 == H bsr N -> + T; + +sequence(_) -> + ?THROW({value, sequence}). + make_caps(Caps, Opts) -> case diameter_capx:make_caps(Caps, Opts) of {ok, T} -> T; - {error, {Reason, _}} -> + {error, Reason} -> ?THROW(Reason) end. @@ -663,21 +723,6 @@ get_opt(Key, List, Def) -> _ -> ?THROW({arity, Key}) end. -split(Opts, Defs) -> - [{K, value(D, Opts)} || {_,_,K} = D <- Defs]. - -value({Preds, Def, Key}, Opts) -> - V = get_opt(Key, Opts, Def), - lists:any(fun(P) -> pred(P,V) end, Preds) - orelse ?THROW({value, Key}), - V. - -pred(F, V) - when is_function(F) -> - F(V); -pred(T, V) -> - T == V. - cb(M,F) -> try M:F() of V -> V diff --git a/lib/diameter/src/base/diameter_peer.erl b/lib/diameter/src/base/diameter_peer.erl index 46b2ba9465..1b2f32ddff 100644 --- a/lib/diameter/src/base/diameter_peer.erl +++ b/lib/diameter/src/base/diameter_peer.erl @@ -76,7 +76,7 @@ notify(SvcName, T) -> %%% # start/3 %%% --------------------------------------------------------------------------- -%% From old code: make is restart. +%% From old code: make it restart. start(_T, _Opts, #diameter_service{}) -> {error, restart}. diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl index 302540e76b..3f4945f7a6 100644 --- a/lib/diameter/src/base/diameter_peer_fsm.erl +++ b/lib/diameter/src/base/diameter_peer_fsm.erl @@ -55,10 +55,15 @@ -define(TLS, 1). %% Keys in process dictionary. --define(CB_KEY, cb). %% capabilities callback --define(DWA_KEY, dwa). %% outgoing DWA --define(Q_KEY, q). %% transport start queue --define(START_KEY, start). %% start of connected transport +-define(CB_KEY, cb). %% capabilities callback +-define(DWA_KEY, dwa). %% outgoing DWA +-define(Q_KEY, q). %% transport start queue +-define(START_KEY, start). %% start of connected transport +-define(SEQUENCE_KEY, mask). %% mask for sequence numbers +-define(RESTRICT_KEY, restrict). %% nodes for connection check + +%% The default sequence mask. +-define(NOMASK, {0,32}). %% A 2xxx series Result-Code. Not necessarily 2001. -define(IS_SUCCESS(N), 2 == (N) div 1000). @@ -121,7 +126,10 @@ %%% Output: Pid %%% --------------------------------------------------------------------------- --spec start(T, [Opt], #diameter_service{}) +-spec start(T, [Opt], #diameter_service{} %% from old code + | {diameter:sequence(), + diameter:restriction(), + #diameter_service{}}) -> pid() when T :: {connect|accept, diameter:transport_ref()}, Opt :: diameter:transport_opt(). @@ -131,10 +139,8 @@ %% specified on the transport in question. Check here that the list is %% still non-empty. -start({_,_} = Type, Opts, #diameter_service{applications = Apps} = Svc) -> - [] /= Apps orelse ?ERROR({no_apps, Type, Opts}), - T = {self(), Type, Opts, Svc}, - {ok, Pid} = diameter_peer_fsm_sup:start_child(T), +start({_,_} = Type, Opts, MS) -> + {ok, Pid} = diameter_peer_fsm_sup:start_child({self(), Type, Opts, MS}), Pid. start_link(T) -> @@ -153,12 +159,20 @@ init(T) -> proc_lib:init_ack({ok, self()}), gen_server:enter_loop(?MODULE, [], i(T)). -i({WPid, T, Opts, #diameter_service{capabilities = Caps} = Svc}) -> +i({WPid, Type, Opts, #diameter_service{} = Svc}) -> %% from old code + i({WPid, Type, Opts, {?NOMASK, [node() | nodes()], Svc}}); + +i({WPid, T, Opts, {Mask, Nodes, #diameter_service{applications = Apps, + capabilities = Caps} + = Svc}}) -> + [] /= Apps orelse ?ERROR({no_apps, T, Opts}), putr(?DWA_KEY, dwa(Caps)), {M, Ref} = T, diameter_stats:reg(Ref), {[Ts], Rest} = proplists:split(Opts, [capabilities_cb]), putr(?CB_KEY, {Ref, [F || {_,F} <- Ts]}), + putr(?SEQUENCE_KEY, Mask), + putr(?RESTRICT_KEY, Nodes), erlang:monitor(process, WPid), {TPid, Addrs} = start_transport(T, Rest, Svc), #state{parent = WPid, @@ -464,9 +478,24 @@ build_CER(#state{service = #diameter_service{capabilities = Caps}}) -> %% encode/1 encode(Rec) -> - #diameter_packet{bin = Bin} = diameter_codec:encode(?BASE, Rec), + Seq = diameter_session:sequence(sequence()), + Hdr = #diameter_header{version = ?DIAMETER_VERSION, + end_to_end_id = Seq, + hop_by_hop_id = Seq}, + Pkt = #diameter_packet{header = Hdr, + msg = Rec}, + #diameter_packet{bin = Bin} = diameter_codec:encode(?BASE, Pkt), Bin. +sequence() -> + case getr(?SEQUENCE_KEY) of + {_,_} = Mask -> + Mask; + undefined -> %% started in old code + putr(?SEQUENCE_KEY, ?NOMASK), + ?NOMASK + end. + %% recv/2 %% RFC 3588 has result code 5015 for an invalid length but if a @@ -965,14 +994,35 @@ dpa_timer() -> %% Register a term and ensure it's not registered elsewhere. Note that %% two process that simultaneously register the same term may well %% both fail to do so this isn't foolproof. +%% +%% Everywhere is no longer everywhere, it's where a +%% restrict_connections service_opt() specifies. register_everywhere(T) -> - diameter_reg:add_new(T) - andalso unregistered(T). + reg(getr(?RESTRICT_KEY), T). + +reg(Nodes, T) -> + add(lists:member(node(), Nodes), T) andalso unregistered(Nodes, T). + +add(true, T) -> + diameter_reg:add_new(T); +add(false, T) -> + diameter_reg:add(T). + +%% unregistered +%% +%% Ensure that the term in question isn't registered on other nodes. + +unregistered(Nodes, T) -> + {ResL, _} = rpc:multicall(Nodes, ?MODULE, match, [{node(), T}]), + lists:all(fun nomatch/1, ResL). + +nomatch({badrpc, {'EXIT', {undef, _}}}) -> %% no diameter on remote node + true; +nomatch(L) -> + [] == L. -unregistered(T) -> - {ResL, _} = rpc:multicall(?MODULE, match, [{node(), T}]), - lists:all(fun(L) -> [] == L end, ResL). +%% match/1 match({Node, _}) when Node == node() -> diff --git a/lib/diameter/src/base/diameter_reg.erl b/lib/diameter/src/base/diameter_reg.erl index 882b9da238..619b12ecad 100644 --- a/lib/diameter/src/base/diameter_reg.erl +++ b/lib/diameter/src/base/diameter_reg.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% Copyright Ericsson AB 2010-2012. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -30,7 +30,8 @@ add_new/1, del/1, repl/2, - match/1]). + match/1, + wait/1]). -export([start_link/0]). @@ -65,27 +66,22 @@ %% Table entry containing the Term -> Pid mapping. -define(MAPPING(Term, Pid), {Term, Pid}). --record(state, {id = now()}). - -%%% ---------------------------------------------------------- -%%% # add(T) -%%% -%%% Input: Term = term() -%%% -%%% Output: true -%%% -%%% Description: Associate the specified term with self(). The list of pids -%%% having this or other assocations can be retrieved using -%%% match/1. -%%% -%%% An association is removed when the calling process dies -%%% or as a result of calling del/1. Adding the same term -%%% more than once is equivalent to adding it exactly once. -%%% -%%% Note that since match/1 takes a pattern as argument, -%%% specifying a term that contains match variables is -%%% probably not a good idea -%%% ---------------------------------------------------------- +-record(state, {id = now(), + q = []}). %% [{From, Pat}] + +%% =========================================================================== +%% # add(T) +%% +%% Associate the specified term with self(). The list of pids having +%% this or other assocations can be retrieved using match/1. +%% +%% An association is removed when the calling process dies or as a +%% result of calling del/1. Adding the same term more than once is +%% equivalent to adding it exactly once. +%% +%% Note that since match/1 takes a pattern as argument, specifying a +%% term that contains match variables is probably not a good idea +%% =========================================================================== -spec add(any()) -> true. @@ -93,17 +89,12 @@ add(T) -> call({add, fun ets:insert/2, T, self()}). -%%% ---------------------------------------------------------- -%%% # add_new(T) -%%% -%%% Input: T = term() -%%% -%%% Output: true | false -%%% -%%% Description: Like add/1 but only one process is allowed to have the -%%% the association, false being returned if an association -%%% already exists. -%%% ---------------------------------------------------------- +%% =========================================================================== +%% # add_new(T) +%% +%% Like add/1 but only one process is allowed to have the the +%% association, false being returned if an association already exists. +%% =========================================================================== -spec add_new(any()) -> boolean(). @@ -111,16 +102,12 @@ add(T) -> add_new(T) -> call({add, fun insert_new/2, T, self()}). -%%% ---------------------------------------------------------- -%%% # repl(T, NewT) -%%% -%%% Input: T, NewT = term() -%%% -%%% Output: true | false -%%% -%%% Description: Like add/1 but only replace an existing association on T, -%%% false being returned if it doesn't exist. -%%% ---------------------------------------------------------- +%% =========================================================================== +%% # repl(T, NewT) +%% +%% Like add/1 but only replace an existing association on T, false +%% being returned if it doesn't exist. +%% =========================================================================== -spec repl(any(), any()) -> boolean(). @@ -128,15 +115,11 @@ add_new(T) -> repl(T, U) -> call({repl, T, U, self()}). -%%% ---------------------------------------------------------- -%%% # del(Term) -%%% -%%% Input: Term = term() -%%% -%%% Output: true -%%% -%%% Description: Remove any existing association of Term with self(). -%%% ---------------------------------------------------------- +%% =========================================================================== +%% # del(Term) +%% +%% Remove any existing association of Term with self(). +%% =========================================================================== -spec del(any()) -> true. @@ -144,20 +127,16 @@ repl(T, U) -> del(T) -> call({del, T, self()}). -%%% ---------------------------------------------------------- -%%% # match(Pat) -%%% -%%% Input: Pat = pattern in the sense of ets:match_object/2. -%%% -%%% Output: list of {Term, Pid} -%%% -%%% Description: Return the list of associations whose Term, as specified -%%% to add/1 or add_new/1, matches the specified pattern. -%%% -%%% Note that there's no guarantee that the returned processes -%%% are still alive. (Although one that isn't will soon have -%%% its associations removed.) -%%% ---------------------------------------------------------- +%% =========================================================================== +%% # match(Pat) +%% +%% Return the list of associations whose Term, as specified to add/1 +%% or add_new/1, matches the specified pattern. +%% +%% Note that there's no guarantee that the returned processes are +%% still alive. (Although one that isn't will soon have its +%% associations removed.) +%% =========================================================================== -spec match(tuple()) -> [{term(), pid()}]. @@ -165,9 +144,17 @@ del(T) -> match(Pat) -> ets:match_object(?TABLE, ?MAPPING(Pat, '_')). -%% --------------------------------------------------------- -%% EXPORTED INTERNAL FUNCTIONS -%% --------------------------------------------------------- +%% =========================================================================== +%% # wait(Pat) +%% +%% Like match/1 but return only when the result is non-empty or fails. +%% It's up to the caller to ensure that the wait won't be forever. +%% =========================================================================== + +wait(Pat) -> + call({wait, Pat}). + +%% =========================================================================== start_link() -> ServerName = {local, ?SERVER}, @@ -182,7 +169,7 @@ uptime() -> %% pids/0 %% -%% Output: list of {Pid, [Term, ...]} +%% Return: list of {Pid, [Term, ...]} pids() -> to_list(fun swap/1). @@ -202,89 +189,100 @@ id(T) -> T. %% terms/0 %% -%% Output: list of {Term, [Pid, ...]} +%% Return: list of {Term, [Pid, ...]} terms() -> to_list(fun id/1). swap({X,Y}) -> {Y,X}. -%%% ---------------------------------------------------------- -%%% # init(Role) -%%% -%%% Output: {ok, State} -%%% ---------------------------------------------------------- +%% ---------------------------------------------------------- +%% # init/1 +%% ---------------------------------------------------------- init(_) -> ets:new(?TABLE, [bag, named_table]), {ok, #state{}}. -%%% ---------------------------------------------------------- -%%% # handle_call(Request, From, State) -%%% ---------------------------------------------------------- +%% ---------------------------------------------------------- +%% # handle_call/3 +%% ---------------------------------------------------------- -handle_call({add, Fun, Key, Pid}, _, State) -> +handle_call(Req, From, S) + when not is_record(S, state) -> + handle_call(Req, From, upgrade(S)); + +handle_call({add, Fun, Key, Pid}, _, S) -> B = Fun(?TABLE, {Key, Pid}), monitor(B andalso no_monitor(Pid), Pid), - {reply, B, State}; + {reply, B, pending(B, S)}; -handle_call({del, Key, Pid}, _, State) -> - {reply, ets:delete_object(?TABLE, ?MAPPING(Key, Pid)), State}; +handle_call({del, Key, Pid}, _, S) -> + {reply, ets:delete_object(?TABLE, ?MAPPING(Key, Pid)), S}; -handle_call({repl, T, U, Pid}, _, State) -> +handle_call({repl, T, U, Pid}, _, S) -> MatchSpec = [{?MAPPING('$1', Pid), [{'=:=', '$1', {const, T}}], ['$_']}], - {reply, repl(ets:select(?TABLE, MatchSpec), U, Pid), State}; + {reply, repl(ets:select(?TABLE, MatchSpec), U, Pid), S}; + +handle_call({wait, Pat}, From, #state{q = Q} = S) -> + case find(Pat) of + {ok, L} -> + {reply, L, S}; + false -> + {noreply, S#state{q = [{From, Pat} | Q]}} + end; -handle_call(state, _, State) -> - {reply, State, State}; +handle_call(state, _, S) -> + {reply, S, S}; -handle_call(uptime, _, #state{id = Time} = State) -> - {reply, diameter_lib:now_diff(Time), State}; +handle_call(uptime, _, #state{id = Time} = S) -> + {reply, diameter_lib:now_diff(Time), S}; -handle_call(Req, From, State) -> +handle_call(Req, From, S) -> ?UNEXPECTED([Req, From]), - {reply, nok, State}. + {reply, nok, S}. -%%% ---------------------------------------------------------- -%%% # handle_cast(Request, State) -%%% ---------------------------------------------------------- +%% ---------------------------------------------------------- +%% # handle_cast/2 +%% ---------------------------------------------------------- -handle_cast(Msg, State)-> +handle_cast(Msg, S)-> ?UNEXPECTED([Msg]), - {noreply, State}. + {noreply, S}. -%%% ---------------------------------------------------------- -%%% # handle_info(Request, State) -%%% ---------------------------------------------------------- +%% ---------------------------------------------------------- +%% # handle_info/2 +%% ---------------------------------------------------------- -handle_info({'DOWN', MRef, process, Pid, _}, State) -> +handle_info({'DOWN', MRef, process, Pid, _}, S) -> ets:delete_object(?TABLE, ?MONITOR(Pid, MRef)), ets:match_delete(?TABLE, ?MAPPING('_', Pid)), - {noreply, State}; + {noreply, S}; -handle_info(Info, State) -> +handle_info(Info, S) -> ?UNEXPECTED([Info]), - {noreply, State}. + {noreply, S}. -%%% ---------------------------------------------------------- -%%% # terminate(Reason, State) -%%% ---------------------------------------------------------- +%% ---------------------------------------------------------- +%% # terminate/2 +%% ---------------------------------------------------------- terminate(_Reason, _State)-> ok. -%%% ---------------------------------------------------------- -%%% # code_change(OldVsn, State, Extra) -%%% ---------------------------------------------------------- +%% ---------------------------------------------------------- +%% # code_change/3 +%% ---------------------------------------------------------- code_change(_OldVsn, State, _Extra) -> {ok, State}. -%% --------------------------------------------------------- -%% INTERNAL FUNCTIONS -%% --------------------------------------------------------- +%% =========================================================================== + +upgrade(S) -> + #state{} = list_to_tuple(tuple_to_list(S) ++ [[]]). monitor(true, Pid) -> ets:insert(?TABLE, ?MONITOR(Pid, erlang:monitor(process, Pid))); @@ -321,6 +319,37 @@ repl([?MAPPING(_, Pid) = M], Key, Pid) -> repl([], _, _) -> false. +%% pending/1 + +pending(true, #state{q = [_|_] = Q} = S) -> + S#state{q = q(lists:reverse(Q), [])}; %% retain reply order +pending(_, S) -> + S. + +q([], Q) -> + Q; +q([{From, Pat} = T | Rest], Q) -> + case find(Pat) of + {ok, L} -> + gen_server:reply(From, L), + q(Rest, Q); + false -> + q(Rest, [T|Q]) + end. + +%% find/1 + +find(Pat) -> + try match(Pat) of + [] -> + false; + L -> + {ok, L} + catch + _:_ -> + {ok, []} + end. + %% call/1 call(Request) -> diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl index 725cccda1e..cffba4fc94 100644 --- a/lib/diameter/src/base/diameter_service.erl +++ b/lib/diameter/src/base/diameter_service.erl @@ -64,7 +64,7 @@ -include_lib("diameter/include/diameter.hrl"). -include("diameter_internal.hrl"). -%% The "old" states maintained in this module historically. +%% The states mirrored by peer_up/peer_down callbacks. -define(STATE_UP, up). -define(STATE_DOWN, down). @@ -107,6 +107,12 @@ %% process. -define(STATE_TABLE, ?MODULE). +%% The default sequence mask. +-define(NOMASK, {0,32}). + +%% The default restrict_connections. +-define(RESTRICT, nodes). + %% Workaround for dialyzer's lack of understanding of match specs. -type match(T) :: T | '_' | '$1' | '$2' | '$3' | '$4'. @@ -114,15 +120,18 @@ %% State of service gen_server. -record(state, {id = now(), - service_name, %% as passed to start_service/2, key in ?STATE_TABLE + service_name, %% as passed to start_service/2, key in ?STATE_TABLE service :: #diameter_service{}, - peerT = ets_new(peers) :: ets:tid(), %% #peer{} at start_fsm - connT = ets_new(conns) :: ets:tid(), %% #conn{} at connection_up - share_peers = false :: boolean(), %% broadcast peers to remote nodes? - use_shared_peers = false :: boolean(), %% use broadcasted peers? + peerT = ets_new(peers) :: ets:tid(),%% #peer{} at start_fsm + connT = ets_new(conns) :: ets:tid(),%% #conn{} at connection_up/reopen shared_peers = ?Dict:new(), %% Alias -> [{TPid, Caps}, ...] local_peers = ?Dict:new(), %% Alias -> [{TPid, Caps}, ...] - monitor = false :: false | pid()}). %% process to die with + monitor = false :: false | pid(), %% process to die with + options + :: [{sequence, diameter:sequence()} %% sequence mask + | {restrict_connections, diameter:restriction()} + | {share_peers, boolean()} %% broadcast peers to remote nodes? + | {use_shared_peers, boolean()}]}).%% use broadcasted peers? %% shared_peers reflects the peers broadcast from remote nodes. Note %% that the state term itself doesn't change, which is relevant for %% the stateless application callbacks since the state is retrieved @@ -130,7 +139,12 @@ %% service record is used to determine whether or not we need to call %% the process for a pick_peer callback. -%% Record representing a watchdog process. +%% Record representing a watchdog process as implemented by +%% diameter_watchdog. The term "peer" here is historical, made +%% especially confusing by the fact that a peer_ref() in the +%% documentation is the key of a #conn{} record, not a #peer{} record. +%% The name is also unfortunate given the meaning of peer in the +%% Diameter sense. -record(peer, {pid :: match(pid()), type :: match(connect | accept), @@ -140,9 +154,15 @@ :: match(op_state() | {op_state(), wd_state()}), started = now(), %% at process start conn = false :: match(boolean() | pid())}). - %% true at accept, pid() at connection_up (connT key) - -%% Record representing a peer_fsm process. + %% true at accepted, pid() at connection_up or reopen + +%% Record representing a peer process as implemented by +%% diameter_peer_fsm. The term "conn" is historical. Despite the name +%% here, comments refer to watchdog and peer processes, that are keys +%% in #peer{} and #conn{} records respectively. To add to the +%% confusion, a #request.transport is a peer process = key in a +%% #conn{} record. The actual transport process (that the peer process +%% knows about and that has a transport connection) isn't seen here. -record(conn, {pid :: pid(), apps :: [{0..16#FFFFFFFF, diameter:app_alias()}], %% {Id, Alias} @@ -156,10 +176,9 @@ handler :: match(pid()), %% request process transport :: match(pid()), %% peer process caps :: match(#diameter_caps{}), - app :: match(diameter:app_alias()), %% #diameter_app.alias - dictionary :: match(module()), %% #diameter_app.dictionary - module :: match([module() | list()]), - %% #diameter_app.module + app :: match(diameter:app_alias()),%% #diameter_app.alias + dictionary :: match(module()), %% #diameter_app.dictionary + module :: match([module() | list()]), %% #diameter_app.module filter :: match(diameter:peer_filter()), packet :: match(#diameter_packet{})}). @@ -170,20 +189,6 @@ timeout = ?DEFAULT_TIMEOUT :: 0..16#FFFFFFFF, detach = false :: boolean()}). -%% Since RFC 3588 requires that a Diameter agent not modify End-to-End -%% Identifiers, the possibility of explicitly setting an End-to-End -%% Identifier would be needed to be able to implement an agent in -%% which one side of the communication is not implemented on top of -%% diameter. For example, Diameter being sent or received encapsulated -%% in some other protocol, or even another Diameter stack in a -%% non-Erlang environment. (Not that this is likely to be a normal -%% case.) -%% -%% The implemented solution is not an option but to respect any header -%% values set in a diameter_header record returned from a -%% prepare_request callback. A call to diameter:call/4 can communicate -%% values to the callback using the 'extra' option if so desired. - %%% --------------------------------------------------------------------------- %%% # start(SvcName) %%% --------------------------------------------------------------------------- @@ -236,20 +241,20 @@ stop_transport(SvcName, [_|_] = Refs) -> %%% --------------------------------------------------------------------------- info(SvcName, Item) -> - info_rc(call_service_by_name(SvcName, {info, Item})). - -info_rc({error, _}) -> - undefined; -info_rc(Info) -> - Info. + case find_state(SvcName) of + #state{} = S -> + service_info(Item, S); + false -> + undefined + end. %%% --------------------------------------------------------------------------- %%% # receive_message(TPid, Pkt, MessageData) %%% --------------------------------------------------------------------------- -%% Handle an incoming message in the watchdog process. This used to -%% come through the service process but this avoids that becoming a -%% bottleneck. +%% Handle an incoming Diameter message in the watchdog process. This +%% used to come through the service process but this avoids that +%% becoming a bottleneck. receive_message(TPid, Pkt, T) when is_pid(TPid) -> @@ -329,21 +334,39 @@ call_rc(_, _, Sent) -> %% In the process spawned for the outgoing request. call(SvcName, App, Msg, Opts, Caller) -> - c(ets:lookup(?STATE_TABLE, SvcName), App, Msg, Opts, Caller). + c(find_state(SvcName), App, Msg, Opts, Caller). -c([#state{service_name = SvcName} = S], App, Msg, Opts, Caller) -> +c(#state{service_name = Svc, options = [{_, Mask} | _]} = S, + App, + Msg, + Opts, + Caller) -> case find_transport(App, Msg, Opts, S) of {_,_,_} = T -> - send_request(T, Msg, Opts, Caller, SvcName); + send_request(T, Mask, Msg, Opts, Caller, Svc); false -> {error, no_connection}; {error, _} = No -> No end; -c([], _, _, _, _) -> +c(false, _, _, _, _) -> {error, no_service}. +%% find_state/1 + +find_state(SvcName) -> + fs(ets:lookup(?STATE_TABLE, SvcName)). + +fs([#state{} = S]) -> + S; + +fs([S]) -> %% inserted from old code + upgrade(S); + +fs([]) -> + false. + %% make_options/1 make_options(Options) -> @@ -439,6 +462,10 @@ i(_, false) -> %%% # handle_call(Req, From, State) %%% --------------------------------------------------------------------------- +handle_call(T, From, S) + when not is_record(S, state) -> + handle_call(T, From, upgrade(S)); + handle_call(state, _, S) -> {reply, S, S}; @@ -462,6 +489,7 @@ handle_call({pick_peer, Local, Remote, App}, _From, S) -> handle_call({call_module, AppMod, Req}, From, S) -> call_module(AppMod, Req, From, S); +%% Call from old code. handle_call({info, Item}, _From, S) -> {reply, service_info(Item, S), S}; @@ -472,6 +500,14 @@ handle_call(stop, _From, S) -> %% gets the reply. We deal with this in the call to the server, %% stating a monitor that waits for DOWN before returning. +%% Watchdog is asking for the sequence mask. +handle_call(sequence, _From, #state{options = [{_, Mask} | _]} = S) -> + {reply, Mask, S}; + +%% Watchdog is asking for the nodes restriction. +handle_call(restriction, _From, #state{options = [_,_,_,{_,R} | _]} = S) -> + {reply, R, S}; + handle_call(Req, From, S) -> unexpected(handle_call, [Req, From], S), {reply, nok, S}. @@ -488,15 +524,16 @@ handle_cast(Req, S) -> %%% # handle_info(Req, State) %%% --------------------------------------------------------------------------- -handle_info(T,S) -> +handle_info(T, #state{} = S) -> case transition(T,S) of ok -> {noreply, S}; - #state{} = NS -> - {noreply, NS}; {stop, Reason} -> {stop, {shutdown, Reason}, S} - end. + end; + +handle_info(T, S) -> + handle_info(T, upgrade(S)). %% transition/2 @@ -507,15 +544,26 @@ transition({accepted, Pid, TPid}, S) -> %% Peer process has a new open connection. transition({connection_up, Pid, T}, S) -> - connection_up(Pid, T, S); + connection_up(Pid, T, S), + ok; + +%% Watchdog has a new connection that will be opened after DW[RA] +%% exchange. This message was added long after connection_up, to +%% communicate the information as soon as it's available. Leave +%% connection_up as is it for now, duplicated information and all. +transition({reopen, Pid, T}, S) -> + reopen(Pid, T, S), + ok; -%% Peer process has left state open. +%% Watchdog has left state OKAY. transition({connection_down, Pid}, S) -> - connection_down(Pid, S); + connection_down(Pid, S), + ok; -%% Peer process has returned to state open. +%% Watchdog has returned to state OKAY. transition({connection_up, Pid}, S) -> - connection_up(Pid, S); + connection_up(Pid, S), + ok; %% Accepting transport has lost connectivity. transition({close, Pid}, S) -> @@ -529,7 +577,7 @@ transition({reconnect, Pid}, S) -> %% Watchdog is sending notification of a state transition. Note that %% the connection_up/down messages are pre-date this message and are -%% still used. A 'watchdog' message will follow these and communicate +%% still used. A watchdog message will follow these and communicate %% the same state as was set in handling connection_up/down. transition({watchdog, Pid, {TPid, From, To}}, #state{service_name = SvcName, peerT = PeerT}) -> @@ -539,21 +587,22 @@ transition({watchdog, Pid, {TPid, From, To}}, #state{service_name = SvcName, insert(PeerT, P#peer{op_state = {OS, To}}), send_event(SvcName, {watchdog, Ref, TPid, {From, To}, {T, Opts}}), ok; -%% Death of a peer process results in the removal of it's peer and any -%% associated conn record when 'DOWN' is received (after this) but the -%% states will be {?STATE_UP, ?WD_DOWN} for a short time. (No real -%% problem since ?WD_* is only used in service_info.) We set ?WD_OKAY -%% as a consequence of connection_up since we know a watchdog is -%% coming. We can't set anything at connection_down since we don't -%% know if the subsequent watchdog message will be ?WD_DOWN or -%% ?WD_SUSPECT. We don't (yet) set ?STATE_* as a consequence of a -%% watchdog message since this requires changing some of the matching -%% on ?STATE_*. -%% -%% Death of a conn process results in connection_down followed by -%% watchdog ?WD_DOWN. The latter doesn't result in the conn record -%% being deleted since 'DOWN' from death of its peer doesn't (yet) -%% deal with the record having been removed. +%% Death of a watchdog process (#peer.pid) results in the removal of +%% it's peer and any associated conn record when 'DOWN' is received +%% (after this) but the states will be {?STATE_UP, ?WD_DOWN} for a +%% short time. (No real problem since ?WD_* is only used in +%% service_info.) We set ?WD_OKAY as a consequence of connection_up +%% since we know a watchdog is coming. We can't set anything at +%% connection_down since we don't know if the subsequent watchdog +%% message will be ?WD_DOWN or ?WD_SUSPECT. We don't (yet) set +%% ?STATE_* as a consequence of a watchdog message since this requires +%% changing some of the matching on ?STATE_*. +%% +%% Death of a peer process process (#conn.pid, #peer.conn) results in +%% connection_down followed by watchdog ?WD_DOWN. The latter doesn't +%% result in the conn record being deleted since 'DOWN' from death of +%% its watchdog doesn't (yet) deal with the record having been +%% removed. %% Monitor process has died. Just die with a reason that tells %% diameter_config about the happening. If a cleaner shutdown is @@ -561,23 +610,26 @@ transition({watchdog, Pid, {TPid, From, To}}, #state{service_name = SvcName, transition({'DOWN', MRef, process, _, Reason}, #state{monitor = MRef}) -> {stop, {monitor, Reason}}; -%% Local peer process has died. +%% Local watchdog process has died. transition({'DOWN', _, process, Pid, Reason}, S) when node(Pid) == node() -> - peer_down(Pid, Reason, S); + peer_down(Pid, Reason, S), + ok; -%% Remote service wants to know about shared transports. +%% Remote service wants to know about shared peers. transition({service, Pid}, S) -> share_peers(Pid, S), ok; %% Remote service is communicating a shared peer. transition({peer, TPid, Aliases, Caps}, S) -> - remote_peer_up(TPid, Aliases, Caps, S); + remote_peer_up(TPid, Aliases, Caps, S), + ok; %% Remote peer process has died. transition({'DOWN', _, process, TPid, _}, S) -> - remote_peer_down(TPid, S); + remote_peer_down(TPid, S), + ok; %% Restart after tc expiry. transition({tc_timeout, T}, S) -> @@ -591,10 +643,39 @@ transition({failover, TRef, Seqs}, S) -> failover(TRef, Seqs, S), ok; +%% Ensure upgraded state is stored in state table. +transition(upgrade, _) -> + ok; + transition(Req, S) -> unexpected(handle_info, [Req], S), ok. +%% upgrade/1 + +upgrade({state, Id, Svc, Name, Svc, PT, CT, SB, UB, SD, LD, MPid}) -> + S = #state{id = Id, + service_name = Name, + service = Svc, + peerT = PT, + connT = CT, + shared_peers = SD, + local_peers = LD, + monitor = MPid, + options = [{sequence, ?NOMASK}, + {share_peers, SB}, + {use_shared_peers, UB}, + {restrict_connections, ?RESTRICT}]}, + upgrade_insert(S), + S. + +upgrade_insert(#state{service = #diameter_service{pid = Pid}} = S) -> + if Pid == self() -> + ets:insert(?STATE_TABLE, S); + true -> + Pid ! upgrade + end. + %%% --------------------------------------------------------------------------- %%% # terminate(Reason, State) %%% --------------------------------------------------------------------------- @@ -710,16 +791,20 @@ shutdown(#state{peerT = PeerT}) -> wait(Fun, T) -> diameter_lib:wait(ets:foldl(Fun, [], T)). -st(#peer{conn = B}, Acc) - when is_boolean(B) -> - Acc; -st(#peer{conn = Pid}, Acc) -> +st(#peer{op_state = {OS,_}} = P, Acc) -> + st(P#peer{op_state = OS}, Acc); +st(#peer{op_state = ?STATE_UP, conn = Pid}, Acc) -> Pid ! shutdown, - [Pid | Acc]. + [Pid | Acc]; +st(#peer{}, Acc) -> + Acc. -sw(#peer{pid = Pid}, Acc) -> +sw(#peer{pid = Pid}, Acc) + when is_pid(Pid) -> exit(Pid, shutdown), - [Pid | Acc]. + [Pid | Acc]; +sw(#peer{}, Acc) -> + Acc. %%% --------------------------------------------------------------------------- %%% # call_service/2 @@ -779,9 +864,8 @@ cfg_acc({SvcName, #diameter_service{applications = Apps} = Rec, Opts}, lists:foreach(fun init_mod/1, Apps), S = #state{service_name = SvcName, service = Rec#diameter_service{pid = self()}, - share_peers = get_value(share_peers, Opts), - use_shared_peers = get_value(use_shared_peers, Opts), - monitor = mref(get_value(monitor, Opts))}, + monitor = mref(get_value(monitor, Opts)), + options = service_options(Opts)}, {S, Acc}; cfg_acc({_Ref, Type, _Opts} = T, {S, Acc}) @@ -789,15 +873,24 @@ cfg_acc({_Ref, Type, _Opts} = T, {S, Acc}) Type == listen -> {S, [T | Acc]}. +service_options(Opts) -> + [{sequence, proplists:get_value(sequence, Opts, ?NOMASK)}, + {share_peers, get_value(share_peers, Opts)}, + {use_shared_peers, get_value(use_shared_peers, Opts)}, + {restrict_connections, proplists:get_value(restrict_connections, + Opts, + ?RESTRICT)}]. +%% The order of options is significant since we match against the list. + mref(false = No) -> No; mref(P) -> erlang:monitor(process, P). -init_shared(#state{use_shared_peers = true, +init_shared(#state{options = [_, _, {_, true} | _], service_name = Svc}) -> diameter_peer:notify(Svc, {service, self()}); -init_shared(#state{use_shared_peers = false}) -> +init_shared(#state{options = [_, _, {_, false} | _]}) -> ok. init_mod(#diameter_app{alias = Alias, @@ -860,9 +953,8 @@ start(Ref, Type, Opts, #state{peerT = PeerT, Pid. %% Note that the service record passed into the watchdog is the merged -%% record so that each watchdog (and peer_fsm) may get a different -%% record. This record is what is passed back into application -%% callbacks. +%% record so that each watchdog may get a different record. This +%% record is what is passed back into application callbacks. s(Type, Ref, T) -> case diameter_watchdog:start({Type, Ref}, T) of @@ -913,8 +1005,8 @@ accepted(Pid, _TPid, #state{peerT = PeerT} = S) -> #peer{ref = Ref, type = accept = T, conn = false, options = Opts} = P = fetch(PeerT, Pid), - insert(PeerT, P#peer{conn = true}), %% mark replacement transport started - start(Ref, T, Opts, S). %% start new peer + insert(PeerT, P#peer{conn = true}), %% mark replacement as started + start(Ref, T, Opts, S). %% start new watchdog fetch(Tid, Key) -> [T] = ets:lookup(Tid, Key), @@ -929,11 +1021,9 @@ fetch(Tid, Key) -> %%% --------------------------------------------------------------------------- %%% # connection_up/3 -%%% -%%% Output: #state{} %%% --------------------------------------------------------------------------- -%% Peer process has reached the open state. +%% Watchdog process has reached state OKAY. connection_up(Pid, {TPid, {Caps, SApps, Pkt}}, #state{peerT = PeerT, connT = ConnT} @@ -948,9 +1038,29 @@ connection_up(Pid, {TPid, {Caps, SApps, Pkt}}, #state{peerT = PeerT, connection_up([Pkt], P#peer{conn = TPid}, C, S). %%% --------------------------------------------------------------------------- +%%% # reopen/3 +%%% --------------------------------------------------------------------------- + +%% Note that this connection_up/3 rewrites the same #conn{} now +%% written here. Both do so in case reopen has not happened in old +%% code. + +reopen(Pid, {TPid, {Caps, SApps, _Pkt}}, #state{peerT = PeerT, + connT = ConnT}) -> + P = fetch(PeerT, Pid), + C = #conn{pid = TPid, + apps = SApps, + caps = Caps, + peer = Pid}, + + insert(ConnT, C), + #peer{op_state = {?STATE_DOWN, _}} + = P, + insert(PeerT, P#peer{op_state = {?STATE_DOWN, ?WD_REOPEN}, + conn = TPid}). + +%%% --------------------------------------------------------------------------- %%% # connection_up/2 -%%% -%%% Output: #state{} %%% --------------------------------------------------------------------------- %% Peer process has transitioned back into the open state. Note that there @@ -979,10 +1089,8 @@ connection_up(T, P, C, #state{peerT = PeerT, insert(PeerT, P#peer{op_state = {?STATE_UP, ?WD_OKAY}}), request_peer_up(TPid), - report_status(up, P, C, S, T), - S#state{local_peers = insert_local_peer(SApps, - {{TPid, Caps}, {SvcName, Apps}}, - LDict)}. + insert_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict), + report_status(up, P, C, S, T). insert_local_peer(SApps, T, LDict) -> lists:foldl(fun(A,D) -> ilp(A, T, D) end, LDict, SApps). @@ -1024,11 +1132,9 @@ peer_cb(MFA, Alias) -> %%% --------------------------------------------------------------------------- %%% # connection_down/2 -%%% -%%% Output: #state{} %%% --------------------------------------------------------------------------- -%% Peer process has transitioned out of the open state. +%% Watchdog has transitioned out of state OKAY. connection_down(Pid, #state{peerT = PeerT, connT = ConnT} @@ -1044,8 +1150,8 @@ connection_down(Pid, #state{peerT = PeerT, %% connection_down/3 -connection_down(#peer{op_state = {?STATE_DOWN, _}}, _, S) -> - S; +connection_down(#peer{op_state = {?STATE_DOWN, _}}, _, _) -> + ok; connection_down(#peer{conn = TPid, op_state = {?STATE_UP, _}} @@ -1058,12 +1164,8 @@ connection_down(#peer{conn = TPid, local_peers = LDict} = S) -> report_status(down, P, C, S, []), - NewS = S#state{local_peers - = remove_local_peer(SApps, - {{TPid, Caps}, {SvcName, Apps}}, - LDict)}, - request_peer_down(TPid, NewS), - NewS. + remove_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict), + request_peer_down(TPid, S). remove_local_peer(SApps, T, LDict) -> lists:foldl(fun(A,D) -> rlp(A, T, D) end, LDict, SApps). @@ -1082,11 +1184,9 @@ down_conn(Id, Alias, TC, {SvcName, Apps}) -> %%% --------------------------------------------------------------------------- %%% # peer_down/3 -%%% -%%% Output: #state{} %%% --------------------------------------------------------------------------- -%% Peer process has died. +%% Watchdog process has died. peer_down(Pid, Reason, #state{peerT = PeerT} = S) -> P = fetch(PeerT, Pid), @@ -1106,12 +1206,12 @@ closed({shutdown, {close, _TPid, Reason}}, closed(_, _, _) -> ok. -%% The peer has never come up ... -peer_down(#peer{conn = B}, S) +%% The watchdog has never reached OKAY ... +peer_down(#peer{conn = B}, _) when is_boolean(B) -> - S; + ok; -%% ... or it has. +%% ... or maybe it has. peer_down(#peer{conn = TPid} = P, #state{connT = ConnT} = S) -> #conn{} = C = fetch(ConnT, TPid), ets:delete_object(ConnT, C), @@ -1139,7 +1239,7 @@ restart(#peer{ref = Ref, started = Time}) -> {Time, {Ref, T, Opts}}; -%% ... or it has: a replacement transport has already been spawned. +%% ... or it has: a replacement has already been spawned. restart(#peer{type = accept}) -> false. @@ -1165,8 +1265,8 @@ default_tc(connect, Opts) -> default_tc(accept, _) -> 0. -%% Bound tc below if the peer was restarted recently to avoid -%% continuous in case of faulty config or other problems. +%% Bound tc below if the watchdog was restarted recently to avoid +%% continuous restarted in case of faulty config or other problems. tc(Time, Tc) -> choose(Tc > ?RESTART_TC orelse timer:now_diff(now(), Time) > 1000*?RESTART_TC, @@ -1288,7 +1388,7 @@ cm([_,_|_], _, _, _) -> multiple. %%% --------------------------------------------------------------------------- -%%% # send_request/5 +%%% # send_request/6 %%% --------------------------------------------------------------------------- %% Send an outgoing request in its dedicated process. @@ -1301,71 +1401,89 @@ cm([_,_|_], _, _, _) -> %% The mod field of the #diameter_app{} here includes any extra %% arguments passed to diameter:call/2. -send_request({TPid, Caps, App}, Msg, Opts, Caller, SvcName) -> +send_request({TPid, Caps, App} = T, Mask, Msg, Opts, Caller, SvcName) -> #diameter_app{module = ModX} = App, - Pkt = make_request_packet(Msg), - - case cb(ModX, prepare_request, [Pkt, SvcName, {TPid, Caps}]) of - {send, P} -> - send_request(make_request_packet(P, Pkt), - TPid, - Caps, - App, - Opts, - Caller, - SvcName); - {discard, Reason} -> - {error, Reason}; - discard -> - {error, discarded}; - T -> - ?ERROR({invalid_return, prepare_request, App, T}) - end. + Pkt = make_prepare_packet(Mask, Msg), + + send_req(cb(ModX, prepare_request, [Pkt, SvcName, {TPid, Caps}]), + Pkt, + T, + Opts, + Caller, + SvcName, + []). + +send_req({send, P}, Pkt, T, Opts, Caller, SvcName, Fs) -> + send_req(make_request_packet(P, Pkt), T, Opts, Caller, SvcName, Fs); -%% make_request_packet/1 +send_req({discard, Reason} , _, _, _, _, _, _) -> + {error, Reason}; + +send_req(discard, _, _, _, _, _, _) -> + {error, discarded}; + +send_req({eval_packet, RC, F}, Pkt, T, Opts, Caller, SvcName, Fs) -> + send_req(RC, Pkt, T, Opts, Caller, SvcName, [F|Fs]); + +send_req(E, _, {_, _, App}, _, _, _, _) -> + ?ERROR({invalid_return, prepare_request, App, E}). + +%% make_prepare_packet/2 %% %% Turn an outgoing request as passed to call/4 into a diameter_packet %% record in preparation for a prepare_request callback. -make_request_packet(Bin) +make_prepare_packet(_, Bin) when is_binary(Bin) -> #diameter_packet{header = diameter_codec:decode_header(Bin), bin = Bin}; -make_request_packet(#diameter_packet{msg = [#diameter_header{} = Hdr | Avps]} - = Pkt) -> - Pkt#diameter_packet{msg = [make_request_header(Hdr) | Avps]}; +make_prepare_packet(Mask, #diameter_packet{msg = [#diameter_header{} = Hdr + | Avps]} + = Pkt) -> + Pkt#diameter_packet{msg = [make_prepare_header(Mask, Hdr) | Avps]}; -make_request_packet(#diameter_packet{header = Hdr} = Pkt) -> - Pkt#diameter_packet{header = make_request_header(Hdr)}; +make_prepare_packet(Mask, #diameter_packet{header = Hdr} = Pkt) -> + Pkt#diameter_packet{header = make_prepare_header(Mask, Hdr)}; -make_request_packet(Msg) -> - make_request_packet(#diameter_packet{msg = Msg}). +make_prepare_packet(Mask, Msg) -> + make_prepare_packet(Mask, #diameter_packet{msg = Msg}). -%% make_request_header/1 +%% make_prepare_header/1 -make_request_header(undefined) -> - Seq = diameter_session:sequence(), - make_request_header(#diameter_header{end_to_end_id = Seq, +make_prepare_header(Mask, undefined) -> + Seq = diameter_session:sequence(Mask), + make_prepare_header(#diameter_header{end_to_end_id = Seq, hop_by_hop_id = Seq}); -make_request_header(#diameter_header{version = undefined} = Hdr) -> - make_request_header(Hdr#diameter_header{version = ?DIAMETER_VERSION}); +make_prepare_header(Mask, #diameter_header{end_to_end_id = undefined, + hop_by_hop_id = undefined}) -> + Seq = diameter_session:sequence(Mask), + make_prepare_header(#diameter_header{end_to_end_id = Seq, + hop_by_hop_id = Seq}); + +make_prepare_header(Mask, #diameter_header{end_to_end_id = undefined} = H) -> + Seq = diameter_session:sequence(Mask), + make_prepare_header(H#diameter_header{end_to_end_id = Seq}); + +make_prepare_header(Mask, #diameter_header{hop_by_hop_id = undefined} = H) -> + Seq = diameter_session:sequence(Mask), + make_prepare_header(H#diameter_header{hop_by_hop_id = Seq}); + +make_prepare_header(_, Hdr) -> + make_prepare_header(Hdr). -make_request_header(#diameter_header{end_to_end_id = undefined} = H) -> - Seq = diameter_session:sequence(), - make_request_header(H#diameter_header{end_to_end_id = Seq}); +%% make_prepare_header/1 -make_request_header(#diameter_header{hop_by_hop_id = undefined} = H) -> - Seq = diameter_session:sequence(), - make_request_header(H#diameter_header{hop_by_hop_id = Seq}); +make_prepare_header(#diameter_header{version = undefined} = Hdr) -> + make_prepare_header(Hdr#diameter_header{version = ?DIAMETER_VERSION}); -make_request_header(#diameter_header{} = Hdr) -> +make_prepare_header(#diameter_header{} = Hdr) -> Hdr; -make_request_header(T) -> +make_prepare_header(T) -> ?ERROR({invalid_header, T}). %% make_request_packet/2 @@ -1375,7 +1493,7 @@ make_request_header(T) -> make_request_packet(Bin, _) when is_binary(Bin) -> - make_request_packet(Bin); + make_prepare_packet(false, Bin); make_request_packet(#diameter_packet{msg = [#diameter_header{} | _]} = Pkt, @@ -1387,7 +1505,7 @@ make_request_packet(#diameter_packet{msg = [#diameter_header{} | _]} %% This is primarily so that the end to end and hop by hop identifiers %% are retained. make_request_packet(#diameter_packet{header = Hdr} = Pkt, - #diameter_packet{header = Hdr0}) -> + #diameter_packet{header = Hdr0}) -> Pkt#diameter_packet{header = fold_record(Hdr0, Hdr)}; make_request_packet(Msg, Pkt) -> @@ -1400,16 +1518,16 @@ fold_record(undefined, R) -> fold_record(Rec, R) -> diameter_lib:fold_tuple(2, Rec, R). -%% send_request/7 +%% send_req/6 -send_request(Pkt, TPid, Caps, App, Opts, Caller, SvcName) -> +send_req(Pkt, {TPid, Caps, App}, Opts, Caller, SvcName, Fs) -> #diameter_app{alias = Alias, dictionary = Dict, module = ModX, options = [{answer_errors, AE} | _]} = App, - EPkt = encode(Dict, Pkt), + EPkt = encode(Dict, Pkt, Fs), #options{filter = Filter, timeout = Timeout} @@ -1490,6 +1608,13 @@ msg(#diameter_packet{msg = undefined, bin = Bin}) -> msg(#diameter_packet{msg = Msg}) -> Msg. +%% encode/3 + +encode(Dict, Pkt, Fs) -> + P = encode(Dict, Pkt), + eval_packet(P, Fs), + P. + %% encode/2 %% Note that prepare_request can return a diameter_packet containing @@ -1571,38 +1696,47 @@ send(Pid, Pkt) -> %% retransmit/4 -retransmit({TPid, Caps, #diameter_app{alias = Alias} = App}, - #request{app = Alias, - packet = Pkt} +retransmit({TPid, Caps, #diameter_app{alias = Alias} = App} = T, + #request{app = Alias, packet = Pkt} = Req, SvcName, Timeout) -> have_request(Pkt, TPid) %% Don't failover to a peer we've andalso ?THROW(timeout), %% already sent to. - case cb(App, prepare_retransmit, [Pkt, SvcName, {TPid, Caps}]) of - {send, P} -> - retransmit(make_request_packet(P, Pkt), TPid, Caps, Req, Timeout); - {discard, Reason} -> - ?THROW(Reason); - discard -> - ?THROW(discarded); - T -> - ?ERROR({invalid_return, prepare_retransmit, App, T}) - end. + resend_req(cb(App, prepare_retransmit, [Pkt, SvcName, {TPid, Caps}]), + T, + Req, + Timeout, + []). + +resend_req({send, P}, T, #request{packet = Pkt} = Req, Timeout, Fs) -> + retransmit(make_request_packet(P, Pkt), T, Req, Timeout, Fs); + +resend_req({discard, Reason}, _, _, _, _) -> + ?THROW(Reason); + +resend_req(discard, _, _, _, _) -> + ?THROW(discarded); -%% retransmit/5 +resend_req({eval_packet, RC, F}, T, Req, Timeout, Fs) -> + resend_req(RC, T, Req, Timeout, [F|Fs]); + +resend_req(T, {_, _, App}, _, _, _) -> + ?ERROR({invalid_return, prepare_retransmit, App, T}). + +%% retransmit/6 -retransmit(Pkt, TPid, Caps, #request{dictionary = Dict} = Req, Timeout) -> - EPkt = encode(Dict, Pkt), +retransmit(Pkt, {TPid, Caps, _}, #request{dictionary = D} = Req0, Tmo, Fs) -> + EPkt = encode(D, Pkt, Fs), - NewReq = Req#request{transport = TPid, - packet = Pkt, - caps = Caps}, + Req = Req0#request{transport = TPid, + packet = Pkt, + caps = Caps}, - ?LOG(retransmission, NewReq), - TRef = send_request(TPid, EPkt, NewReq, Timeout), - {TRef, NewReq}. + ?LOG(retransmission, Req), + TRef = send_request(TPid, EPkt, Req, Tmo), + {TRef, Req}. %% store_request/4 @@ -1674,10 +1808,13 @@ request_peer_down(TPid, S) -> %%% recv_request/3 %%% --------------------------------------------------------------------------- -recv_request(TPid, Pkt, {ConnT, SvcName, Apps}) -> +recv_request(TPid, Pkt, {ConnT, SvcName, Apps}) -> %% upgrade + recv_request(TPid, Pkt, {ConnT, SvcName, Apps, ?NOMASK}); + +recv_request(TPid, Pkt, {ConnT, SvcName, Apps, Mask}) -> try ets:lookup(ConnT, TPid) of [C] -> - recv_request(C, TPid, Pkt, SvcName, Apps); + recv_request(C, TPid, Pkt, SvcName, Apps, Mask); [] -> %% transport has gone down ok catch @@ -1687,7 +1824,12 @@ recv_request(TPid, Pkt, {ConnT, SvcName, Apps}) -> %% recv_request/5 -recv_request(#conn{apps = SApps, caps = Caps}, TPid, Pkt, SvcName, Apps) -> +recv_request(#conn{apps = SApps, caps = Caps}, + TPid, + Pkt, + SvcName, + Apps, + Mask) -> #diameter_caps{origin_host = {OH,_}, origin_realm = {OR,_}} = Caps, @@ -1699,6 +1841,7 @@ recv_request(#conn{apps = SApps, caps = Caps}, TPid, Pkt, SvcName, Apps) -> {SvcName, OH, OR}, TPid, Apps, + Mask, Caps, Pkt). @@ -1724,20 +1867,24 @@ keyfind([Key | Rest], Pos, L) -> T end. -%% recv_request/6 +%% recv_request/7 -recv_request({Id, Alias}, T, TPid, Apps, Caps, Pkt) -> +recv_request({Id, Alias}, T, TPid, Apps, Mask, Caps, Pkt) -> #diameter_app{dictionary = Dict} = A = find_app(Alias, Apps), - recv_request(T, {TPid, Caps}, A, diameter_codec:decode(Id, Dict, Pkt)); + recv_request(T, + {TPid, Caps}, + A, + Mask, + diameter_codec:decode(Id, Dict, Pkt)); %% Note that the decode is different depending on whether or not Id is %% ?APP_ID_RELAY. %% DIAMETER_APPLICATION_UNSUPPORTED 3007 %% A request was sent for an application that is not supported. -recv_request(false, T, TPid, _, _, Pkt) -> +recv_request(false, T, TPid, _, _, _, Pkt) -> As = collect_avps(Pkt), protocol_error(3007, T, TPid, Pkt#diameter_packet{avps = As}). @@ -1749,7 +1896,7 @@ collect_avps(Pkt) -> As end. -%% recv_request/4 +%% recv_request/5 %% Wrong number of bits somewhere in the message: reply. %% @@ -1758,7 +1905,7 @@ collect_avps(Pkt) -> %% set to an unrecognized value, or that is inconsistent with the %% AVP's definition. %% -recv_request(T, {TPid, _}, _, #diameter_packet{errors = [Bs | _]} = Pkt) +recv_request(T, {TPid, _}, _, _, #diameter_packet{errors = [Bs | _]} = Pkt) when is_bitstring(Bs) -> protocol_error(3009, T, TPid, Pkt); @@ -1773,6 +1920,7 @@ recv_request(T, {TPid, _}, _, #diameter_packet{errors = [Bs | _]} = Pkt) recv_request(T, {TPid, _}, #diameter_app{id = Id}, + _, #diameter_packet{header = #diameter_header{is_proxiable = P}, msg = M} = Pkt) @@ -1790,6 +1938,7 @@ recv_request(T, recv_request(T, {TPid, _}, _, + _, #diameter_packet{header = #diameter_header{is_error = true}} = Pkt) -> protocol_error(3008, T, TPid, Pkt); @@ -1798,14 +1947,20 @@ recv_request(T, %% in the relay application. Don't distinguish between the two since %% each application has its own callback config. That is, the user can %% easily distinguish between the two cases. -recv_request(T, TC, App, Pkt) -> - request_cb(T, TC, App, examine(Pkt)). +recv_request(T, TC, App, Mask, Pkt) -> + request_cb(T, TC, App, Mask, examine(Pkt)). %% Note that there may still be errors but these aren't protocol %% (3xxx) errors that lead to an answer-message. -request_cb({SvcName, _OH, _OR} = T, TC, App, Pkt) -> - request_cb(cb(App, handle_request, [Pkt, SvcName, TC]), App, T, TC, Pkt). +request_cb({SvcName, _OH, _OR} = T, TC, App, Mask, Pkt) -> + request_cb(cb(App, handle_request, [Pkt, SvcName, TC]), + App, + Mask, + T, + TC, + [], + Pkt). %% examine/1 %% @@ -1825,7 +1980,7 @@ examine(#diameter_packet{errors = Es} = Pkt) -> Pkt#diameter_packet{errors = [5011 | Es]}. %% It's odd/unfortunate that this isn't a protocol error. -%% request_cb/5 +%% request_cb/7 %% A reply may be an answer-message, constructed either here or by %% the handle_request callback. The header from the incoming request @@ -1835,21 +1990,23 @@ examine(#diameter_packet{errors = Es} = Pkt) -> request_cb({reply, Ans}, #diameter_app{dictionary = Dict}, _, + _, {TPid, _}, + Fs, Pkt) -> - reply(Ans, Dict, TPid, Pkt); + reply(Ans, Dict, TPid, Fs, Pkt); %% An 3xxx result code, for which the E-bit is set in the header. -request_cb({protocol_error, RC}, _, T, {TPid, _}, Pkt) +request_cb({protocol_error, RC}, _, _, T, {TPid, _}, Fs, Pkt) when 3000 =< RC, RC < 4000 -> - protocol_error(RC, T, TPid, Pkt); + protocol_error(RC, T, TPid, Fs, Pkt); %% RFC 3588 says we must reply 3001 to anything unrecognized or %% unsupported. 'noreply' is undocumented (and inappropriately named) %% backwards compatibility for this, protocol_error the documented %% alternative. -request_cb(noreply, _, T, {TPid, _}, Pkt) -> - protocol_error(3001, T, TPid, Pkt); +request_cb(noreply, _, _, T, {TPid, _}, Fs, Pkt) -> + protocol_error(3001, T, TPid, Fs, Pkt); %% Relay a request to another peer. This is equivalent to doing an %% explicit call/4 with the message in question except that (1) a loop @@ -1869,38 +2026,51 @@ request_cb(noreply, _, T, {TPid, _}, Pkt) -> request_cb({A, Opts}, #diameter_app{id = Id} = App, + Mask, T, TC, + Fs, Pkt) when A == relay, Id == ?APP_ID_RELAY; A == proxy, Id /= ?APP_ID_RELAY; A == resend -> - resend(Opts, App, T, TC, Pkt); + resend(Opts, App, Mask, T, TC, Fs, Pkt); -request_cb(discard, _, _, _, _) -> +request_cb(discard, _, _, _, _, _, _) -> ok; -request_cb({eval, RC, F}, App, T, TC, Pkt) -> - request_cb(RC, App, T, TC, Pkt), +request_cb({eval_packet, RC, F}, App, Mask, T, TC, Fs, Pkt) -> + request_cb(RC, App, Mask, T, TC, [F|Fs], Pkt); + +request_cb({eval, RC, F}, App, Mask, T, TC, Fs, Pkt) -> + request_cb(RC, App, Mask, T, TC, Pkt, Fs), diameter_lib:eval(F). -%% protocol_error/4 +%% protocol_error/5 -protocol_error(RC, {_, OH, OR}, TPid, #diameter_packet{avps = Avps} = Pkt) -> +protocol_error(RC, {_, OH, OR}, TPid, Fs, Pkt) -> + #diameter_packet{avps = Avps} = Pkt, ?LOG({error, RC}, Pkt), - reply(answer_message({OH, OR, RC}, Avps), ?BASE, TPid, Pkt). + reply(answer_message({OH, OR, RC}, Avps), ?BASE, TPid, Fs, Pkt). + +%% protocol_error/4 -%% resend/5 +protocol_error(RC, T, TPid, Pkt) -> + protocol_error(RC, T, TPid, [], Pkt). + +%% resend/7 %% %% Resend a message as a relay or proxy agent. resend(Opts, #diameter_app{} = App, + Mask, {_SvcName, OH, _OR} = T, {_TPid, _Caps} = TC, + Fs, #diameter_packet{avps = Avps} = Pkt) -> {Code, _Flags, Vid} = ?BASE:avp_header('Route-Record'), - resend(is_loop(Code, Vid, OH, Avps), Opts, App, T, TC, Pkt). + resend(is_loop(Code, Vid, OH, Avps), Opts, App, Mask, T, TC, Fs, Pkt). %% DIAMETER_LOOP_DETECTED 3005 %% An agent detected a loop while trying to get the message to the @@ -1908,8 +2078,8 @@ resend(Opts, %% if one is available, but the peer reporting the error has %% identified a configuration problem. -resend(true, _, _, T, {TPid, _}, Pkt) -> %% Route-Record loop - protocol_error(3005, T, TPid, Pkt); +resend(true, _, _, _, T, {TPid, _}, Fs, Pkt) -> %% Route-Record loop + protocol_error(3005, T, TPid, Fs, Pkt); %% 6.1.8. Relaying and Proxying Requests %% @@ -1920,16 +2090,18 @@ resend(true, _, _, T, {TPid, _}, Pkt) -> %% Route-Record loop resend(false, Opts, App, + Mask, {SvcName, _, _} = T, {TPid, #diameter_caps{origin_host = {_, OH}}}, + Fs, #diameter_packet{header = Hdr0, avps = Avps} = Pkt) -> Route = #diameter_avp{data = {?BASE, 'Route-Record', OH}}, - Seq = diameter_session:sequence(), + Seq = diameter_session:sequence(Mask), Hdr = Hdr0#diameter_header{hop_by_hop_id = Seq}, Msg = [Hdr, Route | Avps], - resend(call(SvcName, App, Msg, Opts), T, TPid, Pkt). + resend(call(SvcName, App, Msg, Opts), T, TPid, Fs, Pkt). %% The incoming request is relayed with the addition of a %% Route-Record. Note the requirement on the return from call/4 below, %% which places a requirement on the value returned by the @@ -1955,15 +2127,18 @@ resend(#diameter_packet{bin = B} = Pkt, _, TPid, + Fs, #diameter_packet{header = #diameter_header{hop_by_hop_id = Id}, transport_data = TD}) -> - send(TPid, Pkt#diameter_packet{bin = diameter_codec:hop_by_hop_id(Id, B), - transport_data = TD}); + P = Pkt#diameter_packet{bin = diameter_codec:hop_by_hop_id(Id, B), + transport_data = TD}, + eval_packet(P, Fs), + send(TPid, P); %% TODO: counters %% Or not: DIAMETER_UNABLE_TO_DELIVER. -resend(_, T, TPid, Pkt) -> - protocol_error(3002, T, TPid, Pkt). +resend(_, T, TPid, Fs, Pkt) -> + protocol_error(3002, T, TPid, Fs, Pkt). %% is_loop/4 %% @@ -1985,33 +2160,38 @@ is_loop(Code, Vid, OH, [_ | Avps]) is_loop(Code, Vid, OH, Avps) -> is_loop(Code, Vid, ?BASE:avp(encode, OH, 'Route-Record'), Avps). -%% reply/4 +%% reply/5 %% %% Send a locally originating reply. %% Skip the setting of Result-Code and Failed-AVP's below. -reply([Msg], Dict, TPid, Pkt) +reply([Msg], Dict, TPid, Fs, Pkt) when is_list(Msg); is_tuple(Msg) -> - reply(Msg, Dict, TPid, Pkt#diameter_packet{errors = []}); + reply(Msg, Dict, TPid, Fs, Pkt#diameter_packet{errors = []}); %% No errors or a diameter_header/avp list. -reply(Msg, Dict, TPid, #diameter_packet{errors = Es, - transport_data = TD} - = ReqPkt) +reply(Msg, Dict, TPid, Fs, #diameter_packet{errors = Es, + transport_data = TD} + = ReqPkt) when [] == Es; is_record(hd(Msg), diameter_header) -> Pkt = diameter_codec:encode(Dict, make_answer_packet(Msg, ReqPkt)), + eval_packet(Pkt, Fs), incr(send, Pkt, Dict, TPid), %% count result codes in sent answers send(TPid, Pkt#diameter_packet{transport_data = TD}); %% Or not: set Result-Code and Failed-AVP AVP's. -reply(Msg, Dict, TPid, #diameter_packet{errors = [H|_] = Es} = Pkt) -> +reply(Msg, Dict, TPid, Fs, #diameter_packet{errors = [H|_] = Es} = Pkt) -> reply(rc(Msg, rc(H), [A || {_,A} <- Es], Dict), Dict, TPid, + Fs, Pkt#diameter_packet{errors = []}). +eval_packet(Pkt, Fs) -> + lists:foreach(fun(F) -> diameter_lib:eval([F,Pkt]) end, Fs). + %% make_answer_packet/2 %% Binaries and header/avp lists are sent as-is. @@ -2071,7 +2251,7 @@ rc([MsgName | _], {'Result-Code' = K, RC} = T, Dict) -> rc(Rec, T, Dict) -> rc([Dict:rec2msg(element(1, Rec))], T, Dict). - + %% failed_avp/3 failed_avp(_, [] = No, _) -> @@ -2307,7 +2487,7 @@ a(#diameter_packet{errors = Es} = Pkt, SvcName, AE, #request{transport = TPid, when [] == Es; callback == AE -> cb(Req, handle_answer, [Pkt, msg(P), SvcName, {TPid, Caps}]); - + a(Pkt, SvcName, report, Req) -> x(errors, handle_answer, [SvcName, Req, Pkt]); @@ -2469,7 +2649,7 @@ send_event(#diameter_event{service = SvcName} = E) -> %%% # share_peer/5 %%% --------------------------------------------------------------------------- -share_peer(up, Caps, Aliases, TPid, #state{share_peers = true, +share_peer(up, Caps, Aliases, TPid, #state{options = [_, {_, true} | _], service_name = Svc}) -> diameter_peer:notify(Svc, {peer, TPid, Aliases, Caps}); @@ -2480,11 +2660,11 @@ share_peer(_, _, _, _, _) -> %%% # share_peers/2 %%% --------------------------------------------------------------------------- -share_peers(Pid, #state{share_peers = true, +share_peers(Pid, #state{options = [_, {_, true} | _], local_peers = PDict}) -> ?Dict:fold(fun(A,Ps,ok) -> sp(Pid, A, Ps), ok end, ok, PDict); -share_peers(_, #state{share_peers = false}) -> +share_peers(_, _) -> ok. sp(Pid, Alias, Peers) -> @@ -2494,39 +2674,31 @@ sp(Pid, Alias, Peers) -> %%% # remote_peer_up/4 %%% --------------------------------------------------------------------------- -remote_peer_up(Pid, Aliases, Caps, #state{use_shared_peers = true, +remote_peer_up(Pid, Aliases, Caps, #state{options = [_, _, {_, true} | _], service = Svc, - shared_peers = PDict} - = S) -> + shared_peers = PDict}) -> #diameter_service{applications = Apps} = Svc, - Update = lists:filter(fun(A) -> - lists:keymember(A, #diameter_app.alias, Apps) - end, - Aliases), - S#state{shared_peers = rpu(Pid, Caps, PDict, Update)}; + Key = #diameter_app.alias, + As = lists:filter(fun(A) -> lists:keymember(A, Key, Apps) end, Aliases), + rpu(Pid, Caps, PDict, As); -remote_peer_up(_, _, _, #state{use_shared_peers = false} = S) -> - S. +remote_peer_up(_, _, _, #state{options = [_, _, {_, false} | _]}) -> + ok. rpu(_, _, PDict, []) -> PDict; rpu(Pid, Caps, PDict, Aliases) -> erlang:monitor(process, Pid), T = {Pid, Caps}, - lists:foldl(fun(A,D) -> ?Dict:append(A, T, D) end, - PDict, - Aliases). + lists:foreach(fun(A) -> ?Dict:append(A, T, PDict) end, Aliases). %%% --------------------------------------------------------------------------- %%% # remote_peer_down/2 %%% --------------------------------------------------------------------------- -remote_peer_down(Pid, #state{use_shared_peers = true, - shared_peers = PDict} - = S) -> - S#state{shared_peers = lists:foldl(fun(A,D) -> rpd(Pid, A, D) end, - PDict, - ?Dict:fetch_keys(PDict))}. +remote_peer_down(Pid, #state{options = [_, _, {_, true} | _], + shared_peers = PDict}) -> + lists:foreach(fun(A) -> rpd(Pid, A, PDict) end, ?Dict:fetch_keys(PDict)). rpd(Pid, Alias, PDict) -> ?Dict:update(Alias, fun(Ps) -> lists:keydelete(Pid, 1, Ps) end, PDict). @@ -2851,7 +3023,8 @@ transports(#state{peerT = PeerT}) -> -define(ALL_INFO, [capabilities, applications, transport, - pending]). + pending, + options]). %% The rest. -define(OTHER_INFO, [connections, @@ -2878,6 +3051,19 @@ tagged_info(Item, S) undefined end; +tagged_info(TPid, #state{peerT = PT, connT = CT}) + when is_pid(TPid) -> + try + [#conn{peer = Pid}] = ets:lookup(CT, TPid), + [#peer{ref = Ref, type = Type, options = Opts}] = ets:lookup(PT, Pid), + [{ref, Ref}, + {type, Type}, + {options, Opts}] + catch + error:_ -> + [] + end; + tagged_info(Items, S) when is_list(Items) -> [T || I <- Items, T <- [tagged_info(I,S)], T /= undefined, T /= []]; @@ -2928,6 +3114,7 @@ complete_info(Item, #state{service = Svc} = S) -> capabilities -> service_info(?CAP_INFO, S); applications -> info_apps(S); transport -> info_transport(S); + options -> info_options(S); pending -> info_pending(S); keys -> ?ALL_INFO ++ ?CAP_INFO ++ ?OTHER_INFO; all -> service_info(?ALL_INFO, S); @@ -2955,7 +3142,12 @@ info_stats(#state{peerT = PeerT}) -> MatchSpec = [{#peer{ref = '$1', conn = '$2', _ = '_'}, [{'is_pid', '$2'}], [['$1', '$2']]}], - diameter_stats:read(lists:append(ets:select(PeerT, MatchSpec))). + try ets:select(PeerT, MatchSpec) of + L -> + diameter_stats:read(lists:append(L)) + catch + error: badarg -> [] %% service has gone down + end. %% info_transport/1 %% @@ -3000,7 +3192,12 @@ transport([[{type, accept}, {options, Opts} | _] | _] = Ls) -> {accept, [lists:nthtail(2,L) || L <- Ls]}]. peer_dict(#state{peerT = PeerT, connT = ConnT}, Dict0) -> - ets:foldl(fun(T,A) -> peer_acc(ConnT, A, T) end, Dict0, PeerT). + try ets:tab2list(PeerT) of + L -> + lists:foldl(fun(T,A) -> peer_acc(ConnT, A, T) end, Dict0, L) + catch + error: badarg -> Dict0 %% service has gone down + end. peer_acc(ConnT, Acc, #peer{pid = Pid, type = Type, @@ -3017,9 +3214,16 @@ peer_acc(ConnT, Acc, #peer{pid = Pid, | info_conn(ConnT, TPid, WS /= ?WD_DOWN)], Acc). +info_conn(ConnT, [TPid], B) -> + info_conn(ConnT, TPid, B); + info_conn(ConnT, TPid, true) when is_pid(TPid) -> - info_conn(ets:lookup(ConnT, TPid)); + try ets:lookup(ConnT, TPid) of + T -> info_conn(T) + catch + error: badarg -> [] %% service has gone down + end; info_conn(_, _, _) -> []. @@ -3096,7 +3300,11 @@ info_pending(#state{} = S) -> {{transport, '$2'}}, {{from, '$3'}}]}}]}], - ets:select(?REQUEST_TABLE, MatchSpec). + try + ets:select(?REQUEST_TABLE, MatchSpec) + catch + error: badarg -> [] %% service has gone down + end. %% info_connections/1 %% @@ -3151,3 +3359,8 @@ peer_acc(Peer, {PeerD, RefD}) -> [{TPid, _}, [{origin_host, {_, OH}} | _]] = [proplists:get_value(K, Peer) || K <- [peer, caps]], {dict:append(OH, Peer, PeerD), dict:append(OH, TPid, RefD)}. + +%% info_options/1 + +info_options(S) -> + S#state.options. diff --git a/lib/diameter/src/base/diameter_session.erl b/lib/diameter/src/base/diameter_session.erl index 4c468f207c..3b236f109a 100644 --- a/lib/diameter/src/base/diameter_session.erl +++ b/lib/diameter/src/base/diameter_session.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% Copyright Ericsson AB 2010-2012. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -20,6 +20,7 @@ -module(diameter_session). -export([sequence/0, + sequence/1, session_id/1, origin_state_id/0]). @@ -30,7 +31,7 @@ -define(INT32, 16#FFFFFFFF). %% --------------------------------------------------------------------------- -%% # sequence/0 +%% # sequence/0-1 %% %% Output: 32-bit %% --------------------------------------------------------------------------- @@ -77,6 +78,15 @@ sequence() -> Instr = {_Pos = 2, _Incr = 1, _Threshold = ?INT32, _SetVal = 0}, ets:update_counter(diameter_sequence, sequence, Instr). +-spec sequence(diameter:sequence()) + -> diameter:'Unsigned32'(). + +sequence({_,32}) -> + sequence(); + +sequence({H,N}) -> + (H bsl N) bor (sequence() band (1 bsl N - 1)). + %% --------------------------------------------------------------------------- %% # origin_state_id/0 %% --------------------------------------------------------------------------- diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl index d7474e5c56..d814f1afe2 100644 --- a/lib/diameter/src/base/diameter_watchdog.erl +++ b/lib/diameter/src/base/diameter_watchdog.erl @@ -43,6 +43,7 @@ -include("diameter_internal.hrl"). -define(DEFAULT_TW_INIT, 30000). %% RFC 3539 ch 3.4.1 +-define(NOMASK, {0,32}). %% default sequence mask -record(watchdog, {%% PCB - Peer Control Block; see RFC 3539, Appendix A @@ -56,7 +57,9 @@ parent = self() :: pid(), transport :: pid() | undefined, tref :: reference(), %% reference for current watchdog timer - message_data}). %% term passed into diameter_service with message + message_data, %% term passed into diameter_service with message + sequence :: diameter:sequence(), %% mask + restrict :: {diameter:restriction(), boolean()}}). %% start/2 %% @@ -118,12 +121,23 @@ make_state({T, Pid, {RecvData, random:seed(now()), putr(restart, {T, Opts, Svc}), %% save seeing it in trace putr(dwr, dwr(Caps)), %% + {_,_} = Mask = call(Pid, sequence), + Restrict = call(Pid, restriction), + Nodes = restrict_nodes(Restrict), #watchdog{parent = Pid, - transport = monitor(diameter_peer_fsm:start(T, Opts, Svc)), + transport = monitor(diameter_peer_fsm:start(T, + Opts, + {Mask, Nodes, Svc})), tw = proplists:get_value(watchdog_timer, Opts, ?DEFAULT_TW_INIT), - message_data = {RecvData, SvcName, Apps}}. + message_data = {RecvData, SvcName, Apps, Mask}, + sequence = Mask, + restrict = {Restrict, lists:member(node(), Nodes)}}. + +%% Retrieve the sequence mask from the parent from the parent, rather +%% than having it passed into init/1, for upgrade reasons: the call to +%% diameter_service:receive_message/3 passes back the mask. %% handle_call/3 @@ -137,7 +151,7 @@ handle_cast(_, State) -> %% handle_info/2 -handle_info(T, State) -> +handle_info(T, #watchdog{} = State) -> case transition(T, State) of ok -> {noreply, State}; @@ -148,7 +162,13 @@ handle_info(T, State) -> ?LOG(stop, T), event(State, State#watchdog{status = down}), {stop, {shutdown, T}, State} - end. + end; + +handle_info(T, S) -> + handle_info(T, upgrade(S)). + +upgrade(S) -> + #watchdog{} = list_to_tuple(tuple_to_list(S) ++ [?NOMASK, {nodes, true}]). event(#watchdog{status = T}, #watchdog{status = T}) -> ok; @@ -241,9 +261,10 @@ transition({close, TPid, _Reason}, #watchdog{transport = TPid}) -> transition({open, TPid, Hosts, T} = Open, #watchdog{transport = TPid, status = initial, - parent = Pid} + parent = Pid, + restrict = {_, R}} = S) -> - case okay(getr(restart), Hosts) of + case okay(getr(restart), Hosts, R) of okay -> open(Pid, {TPid, T}), set_watchdog(S#watchdog{status = okay}); @@ -258,12 +279,15 @@ transition({open, TPid, Hosts, T} = Open, transition({open = P, TPid, _Hosts, T}, #watchdog{transport = TPid, + parent = Pid, status = down} = S) -> %% Store the info we need to notify the parent to reopen the %% connection after the requisite DWA's are received, at which - %% time we eraser(open). + %% time we eraser(open). The reopen message is a later addition, + %% to communicate the new capabilities as soon as they're known. putr(P, {TPid, T}), + Pid ! {reopen, self(), {TPid, T}}, set_watchdog(send_watchdog(S#watchdog{status = reopen, num_dwa = 0})); @@ -312,6 +336,15 @@ transition({state, Pid}, #watchdog{status = S}) -> %% =========================================================================== +%% Only call "upwards", to the parent service. +call(Pid, Req) -> + try + gen_server:call(Pid, Req, infinity) + catch + exit: Reason -> + exit({shutdown, {Req, Reason}}) + end. + monitor(Pid) -> erlang:monitor(process, Pid), Pid. @@ -325,26 +358,36 @@ getr(Key) -> eraser(Key) -> erase({?MODULE, Key}). -%% encode/1 +%% encode/2 -encode(Msg) -> - #diameter_packet{bin = Bin} = diameter_codec:encode(?BASE, Msg), +encode(Msg, Mask) -> + Seq = diameter_session:sequence(Mask), + Hdr = #diameter_header{version = ?DIAMETER_VERSION, + end_to_end_id = Seq, + hop_by_hop_id = Seq}, + Pkt = #diameter_packet{header = Hdr, + msg = Msg}, + #diameter_packet{bin = Bin} = diameter_codec:encode(?BASE, Pkt), Bin. -%% okay/2 +%% okay/3 -okay({{accept, Ref}, _, _}, Hosts) -> +okay({{accept, Ref}, _, _}, Hosts, Restrict) -> T = {?MODULE, connection, Ref, Hosts}, diameter_reg:add(T), - okay(diameter_reg:match(T)); + if Restrict -> + okay(diameter_reg:match(T)); + true -> + okay + end; %% Register before matching so that at least one of two registering -%% processes will match the other. (Which can't happen as long as -%% diameter_peer_fsm guarantees at most one open connection to the same -%% peer.) +%% processes will match the other. -okay({{connect, _}, _, _}, _) -> +okay({{connect, _}, _, _}, _, _) -> okay. +%% okay/2 + %% The peer hasn't been connected recently ... okay([{_,P}]) -> P = self(), %% assert @@ -400,9 +443,10 @@ close(#watchdog{parent = Pid}) -> %% send_watchdog/1 send_watchdog(#watchdog{pending = false, - transport = TPid} + transport = TPid, + sequence = Mask} = S) -> - TPid ! {send, encode(getr(dwr))}, + TPid ! {send, encode(getr(dwr), Mask)}, ?LOG(send, 'DWR'), S#watchdog{pending = true}. @@ -555,7 +599,7 @@ timeout(#watchdog{status = T, = S) when T == suspect; T == reopen, P, N < 0 -> - exit(TPid, shutdown), + exit(TPid, {shutdown, watchdog_timeout}), close(S), S#watchdog{status = down}; @@ -600,19 +644,40 @@ restart(#watchdog{transport = undefined} = S) -> restart(S) -> S. +%% restart/2 +%% %% Only restart the transport in the connecting case. For an accepting -%% transport, we've registered the peer connection when leaving state -%% initial and this is used by a new accepting process to realize that -%% it's actually in state down rather then initial when receiving -%% notification of an open connection. - -restart({{connect, _} = T, Opts, Svc}, #watchdog{parent = Pid} = S) -> +%% transport, there's no guarantee that an accepted connection in a +%% restarted transport if from the peer we've lost contact with so +%% have to be prepared for another watchdog to handle it. This is what +%% the diameter_reg registration in this module is for: the peer +%% connection is registered when leaving state initial and this is +%% used by a new accepting watchdog to realize that it's actually in +%% state down rather then initial when receiving notification of an +%% open connection. + +restart({{connect, _} = T, Opts, Svc}, #watchdog{parent = Pid, + sequence = Mask, + restrict = {R,_}} + = S) -> Pid ! {reconnect, self()}, - S#watchdog{transport = monitor(diameter_peer_fsm:start(T, Opts, Svc))}; + Nodes = restrict_nodes(R), + S#watchdog{transport = monitor(diameter_peer_fsm:start(T, + Opts, + {Mask, Nodes, Svc})), + restrict = {R, lists:member(node(), Nodes)}}; + +%% No restriction on the number of connections to the same peer: just +%% die. Note that a state machine never enters state REOPEN in this +%% case. +restart({{accept, _}, _, _}, #watchdog{restrict = {_, false}}) -> + stop; + +%% Otherwise hang around until told to die. restart({{accept, _}, _, _}, S) -> S. -%% Don't currently use Opts/Svc in the accept case but having them in -%% the process dictionary is helpful if the process dies unexpectedly. + +%% Don't currently use Opts/Svc in the accept case. %% dwr/1 @@ -622,3 +687,22 @@ dwr(#diameter_caps{origin_host = OH, ['DWR', {'Origin-Host', OH}, {'Origin-Realm', OR}, {'Origin-State-Id', OSI}]. + +%% restrict_nodes/1 + +restrict_nodes(false) -> + []; + +restrict_nodes(nodes) -> + [node() | nodes()]; + +restrict_nodes(node) -> + [node()]; + +restrict_nodes(Nodes) + when [] == Nodes; + is_atom(hd(Nodes)) -> + Nodes; + +restrict_nodes(F) -> + diameter_lib:eval(F). |