Diffstat (limited to 'lib/diameter/src/base/diameter_watchdog.erl')
-rw-r--r--  lib/diameter/src/base/diameter_watchdog.erl  390
1 file changed, 250 insertions(+), 140 deletions(-)
diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl
index d7474e5c56..073a415d10 100644
--- a/lib/diameter/src/base/diameter_watchdog.erl
+++ b/lib/diameter/src/base/diameter_watchdog.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2012. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2013. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -43,41 +43,51 @@
-include("diameter_internal.hrl").
-define(DEFAULT_TW_INIT, 30000). %% RFC 3539 ch 3.4.1
+-define(NOMASK, {0,32}). %% default sequence mask
+
+-define(BASE, ?DIAMETER_DICT_COMMON).
-record(watchdog,
{%% PCB - Peer Control Block; see RFC 3539, Appendix A
status = initial :: initial | okay | suspect | down | reopen,
- pending = false :: boolean(),
+ pending = false :: boolean(), %% DWA
tw :: 6000..16#FFFFFFFF | {module(), atom(), list()},
%% {M,F,A} -> integer() >= 0
num_dwa = 0 :: -1 | non_neg_integer(),
%% number of DWAs received during reopen
%% end PCB
- parent = self() :: pid(),
- transport :: pid() | undefined,
- tref :: reference(), %% reference for current watchdog timer
- message_data}). %% term passed into diameter_service with message
-
+ parent = self() :: pid(), %% service process
+ transport :: pid() | undefined, %% peer_fsm process
+ tref :: reference(), %% reference for current watchdog timer
+ dictionary :: module(), %% common dictionary
+ receive_data :: term(),
+ %% term passed into diameter_service with incoming message
+ sequence :: diameter:sequence(), %% mask
+ restrict :: {diameter:restriction(), boolean()},
+ shutdown = false :: boolean()}).
+
+%% ---------------------------------------------------------------------------
%% start/2
%%
%% Start a monitor before the watchdog is allowed to proceed to ensure
%% that a failed capabilities exchange produces the desired exit
%% reason.
+%% ---------------------------------------------------------------------------
--spec start(Type, {RecvData, [Opt], SvcName, #diameter_service{}})
+-spec start(Type, {RecvData, [Opt], SvcOpts, #diameter_service{}})
-> {reference(), pid()}
when Type :: {connect|accept, diameter:transport_ref()},
RecvData :: term(),
Opt :: diameter:transport_opt(),
- SvcName :: diameter:service_name().
+ SvcOpts :: [diameter:service_opt()].
start({_,_} = Type, T) ->
- Ref = make_ref(),
- {ok, Pid} = diameter_watchdog_sup:start_child({Ref, {Type, self(), T}}),
+ Ack = make_ref(),
+ {ok, Pid} = diameter_watchdog_sup:start_child({Ack, Type, self(), T}),
try
{erlang:monitor(process, Pid), Pid}
after
- Pid ! Ref
+ send(Pid, Ack)
end.
start_link(T) ->
@@ -96,34 +106,94 @@ init(T) ->
proc_lib:init_ack({ok, self()}),
gen_server:enter_loop(?MODULE, [], i(T)).
-i({Ref, {_, Pid, _} = T}) ->
- MRef = erlang:monitor(process, Pid),
- receive
- Ref ->
- make_state(T);
- {'DOWN', MRef, process, _, _} = D ->
- exit({shutdown, D})
- end;
-
-i({_, Pid, _} = T) -> %% from old code
+i({Ack, T, Pid, {RecvData,
+ Opts,
+ SvcOpts,
+ #diameter_service{applications = Apps,
+ capabilities = Caps}
+ = Svc}}) ->
erlang:monitor(process, Pid),
- make_state(T).
-
-make_state({T, Pid, {RecvData,
- Opts,
- SvcName,
- #diameter_service{applications = Apps,
- capabilities = Caps}
- = Svc}}) ->
+ wait(Ack, Pid),
random:seed(now()),
putr(restart, {T, Opts, Svc}), %% save seeing it in trace
putr(dwr, dwr(Caps)), %%
+ {_,_} = Mask = proplists:get_value(sequence, SvcOpts),
+ Restrict = proplists:get_value(restrict_connections, SvcOpts),
+ Nodes = restrict_nodes(Restrict),
+ Dict0 = common_dictionary(Apps),
#watchdog{parent = Pid,
- transport = monitor(diameter_peer_fsm:start(T, Opts, Svc)),
+ transport = start(T, Opts, Mask, Nodes, Dict0, Svc),
tw = proplists:get_value(watchdog_timer,
Opts,
?DEFAULT_TW_INIT),
- message_data = {RecvData, SvcName, Apps}}.
+ receive_data = RecvData,
+ dictionary = Dict0,
+ sequence = Mask,
+ restrict = {Restrict, lists:member(node(), Nodes)}}.
+
+wait(Ref, Pid) ->
+ receive
+ Ref ->
+ ok;
+ {'DOWN', _, process, Pid, _} = D ->
+ exit({shutdown, D})
+ end.
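To make the handshake concrete: the starter takes its monitor before releasing the child with the ack reference, and the child does nothing observable until it sees either the ack or its parent's death. A minimal standalone sketch of the same pattern (hypothetical module, not part of the patch):

-module(ack_start).
-export([start/0, init/2]).

%% Starter: spawn the child, monitor it, then release it with the ack.
start() ->
    Ack = make_ref(),
    Pid = spawn(?MODULE, init, [Ack, self()]),
    MRef = erlang:monitor(process, Pid),   %% monitor is in place ...
    Pid ! Ack,                             %% ... before the child proceeds
    {MRef, Pid}.

%% Child: wait for the ack, or exit if the starter dies first.
init(Ack, Parent) ->
    erlang:monitor(process, Parent),
    receive
        Ack ->
            ok;                            %% real work would start here
        {'DOWN', _, process, Parent, _} = D ->
            exit({shutdown, D})
    end.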
+
+%% start/5
+
+start(T, Opts, Mask, Nodes, Dict0, Svc) ->
+ {_MRef, Pid}
+ = diameter_peer_fsm:start(T, Opts, {Mask, Nodes, Dict0, Svc}),
+ Pid.
+
+%% common_dictionary/1
+%%
+%% Determine the dictionary of the Diameter common application with
+%% Application Id 0. Fail on config errors.
+
+common_dictionary(Apps) ->
+ case
+ orddict:fold(fun dict0/3,
+ false,
+ lists:foldl(fun(#diameter_app{dictionary = M}, D) ->
+ orddict:append(M:id(), M, D)
+ end,
+ orddict:new(),
+ Apps))
+ of
+ {value, Mod} ->
+ Mod;
+ false ->
+ %% A transport should configure a common dictionary, but we
+ %% don't require it. Not configuring a common dictionary
+ %% means a user won't be able to either send or receive
+ %% messages in the common dictionary: incoming requests
+ %% will be answered with 3007 and outgoing requests cannot
+ %% be sent. The dictionary returned here is only used for
+ %% messages diameter itself sends and receives: CER/CEA,
+ %% DPR/DPA and DWR/DWA.
+ ?BASE
+ end.
+
+%% Each application should be represented by a single dictionary.
+dict0(Id, [_,_|_] = Ms, _) ->
+ config_error({multiple_dictionaries, Ms, {application_id, Id}});
+
+%% An explicit common dictionary.
+dict0(?APP_ID_COMMON, [Mod], _) ->
+ {value, Mod};
+
+%% A pure relay, in which case the common application is implicit.
+%% This uses the fact that the common application will already have
+%% been folded.
+dict0(?APP_ID_RELAY, _, false) ->
+ {value, ?BASE};
+
+dict0(_, _, Acc) ->
+ Acc.
+
+config_error(T) ->
+ ?ERROR({configuration_error, T}).
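As a simplified model of the selection above, here is a hypothetical helper working on plain {AppId, DictMod} pairs instead of #diameter_app{} records; ?BASE is assumed to be the common base dictionary module:

-define(BASE, diameter_gen_base_rfc3588).  %% assumed common dictionary

common_dict(Apps) ->                       %% Apps :: [{AppId, DictMod}]
    ById = lists:foldl(fun({Id, M}, D) -> orddict:append(Id, M, D) end,
                       orddict:new(),
                       Apps),
    case orddict:fold(fun pick/3, false, ById) of
        {value, Mod} -> Mod;
        false        -> ?BASE              %% no common application: default
    end.

pick(Id, [_,_|_] = Ms, _) ->               %% two dictionaries for one app id
    error({multiple_dictionaries, Ms, {application_id, Id}});
pick(0, [Mod], _) ->                       %% explicit common application
    {value, Mod};
pick(16#FFFFFFFF, _, false) ->             %% pure relay: common is implicit
    {value, ?BASE};
pick(_, _, Acc) ->
    Acc.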
%% handle_call/3
@@ -137,39 +207,64 @@ handle_cast(_, State) ->
%% handle_info/2
-handle_info(T, State) ->
+handle_info(T, #watchdog{} = State) ->
case transition(T, State) of
ok ->
{noreply, State};
#watchdog{} = S ->
- event(State, S),
+ close(T, State), %% service expects 'close' message
+ event(T, State, S), %% before 'watchdog'
{noreply, S};
stop ->
?LOG(stop, T),
- event(State, State#watchdog{status = down}),
+ event(T, State, State#watchdog{status = down}),
{stop, {shutdown, T}, State}
end.
-event(#watchdog{status = T}, #watchdog{status = T}) ->
+close({'DOWN', _, process, TPid, {shutdown, Reason}},
+ #watchdog{transport = TPid,
+ parent = Pid}) ->
+ send(Pid, {close, self(), Reason});
+
+close(_, _) ->
+ ok.
+
+event(_, #watchdog{status = T}, #watchdog{status = T}) ->
ok;
-event(#watchdog{transport = undefined}, #watchdog{transport = undefined}) ->
+event(_, #watchdog{transport = undefined}, #watchdog{transport = undefined}) ->
ok;
-event(#watchdog{status = From, transport = F, parent = Pid},
+event(Msg,
+ #watchdog{status = From, transport = F, parent = Pid},
#watchdog{status = To, transport = T}) ->
- E = {tpid(F,T), From, To},
- notify(Pid, E),
+ TPid = tpid(F,T),
+ E = {[TPid | data(Msg, TPid, From, To)], From, To},
+ send(Pid, {watchdog, self(), E}),
?LOG(transition, {self(), E}).
+data(Msg, TPid, reopen, okay) ->
+ {recv, TPid, 'DWA', _Pkt} = Msg, %% assert
+ {TPid, T} = eraser(open),
+ [T];
+
+data({open, TPid, _Hosts, T}, TPid, _From, To)
+ when To == okay;
+ To == reopen ->
+ [T];
+
+data(_, _, _, _) ->
+ [].
+
tpid(_, Pid)
when is_pid(Pid) ->
Pid;
+
tpid(Pid, _) ->
Pid.
-notify(Pid, E) ->
- Pid ! {watchdog, self(), E}.
+send(Pid, T) ->
+ Pid ! T.
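For reference, a receiver of these events would match something of the following shape; the extra list elements are whatever data/4 adds for the transition (a hedged sketch, the handler name is hypothetical):

%% Sketch of matching the 'watchdog' events sent above. From/To are
%% watchdog states, TPid the transport (peer_fsm) pid, and Data is
%% [] or [T] with the term saved at 'open', depending on the transition.
handle_watchdog({watchdog, WPid, {[TPid | Data], From, To}}) ->
    io:format("watchdog ~p: ~p -> ~p (transport ~p, data ~p)~n",
              [WPid, From, To, TPid, Data]).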
%% terminate/2
@@ -199,15 +294,14 @@ transition(close, #watchdog{}) ->
ok;
%% Service is asking for the peer to be taken down gracefully.
-transition({shutdown, Pid}, #watchdog{parent = Pid,
- transport = undefined,
- status = S}) ->
- down = S, %% sanity check
+transition({shutdown, Pid, _}, #watchdog{parent = Pid,
+ transport = undefined}) ->
stop;
-transition({shutdown = T, Pid}, #watchdog{parent = Pid,
- transport = TPid}) ->
- TPid ! {T, self()},
- ok;
+transition({shutdown = T, Pid, Reason}, #watchdog{parent = Pid,
+ transport = TPid}
+ = S) ->
+ send(TPid, {T, self(), Reason}),
+ S#watchdog{shutdown = true};
%% Parent process has died,
transition({'DOWN', _, process, Pid, _Reason},
@@ -217,13 +311,9 @@ transition({'DOWN', _, process, Pid, _Reason},
%% Transport has accepted a connection.
transition({accepted = T, TPid}, #watchdog{transport = TPid,
parent = Pid}) ->
- Pid ! {T, self(), TPid},
+ send(Pid, {T, self(), TPid}),
ok;
-%% Transport is telling us that its impending death isn't failure.
-transition({close, TPid, _Reason}, #watchdog{transport = TPid}) ->
- stop;
-
%% STATE Event Actions New State
%% ===== ------ ------- ----------
%% INITIAL Connection up SetWatchdog() OKAY
@@ -238,14 +328,13 @@ transition({close, TPid, _Reason}, #watchdog{transport = TPid}) ->
%% know the identity of the peer (ie. now) that we know that we're in
%% state down rather than initial.
-transition({open, TPid, Hosts, T} = Open,
+transition({open, TPid, Hosts, _} = Open,
#watchdog{transport = TPid,
status = initial,
- parent = Pid}
+ restrict = {_, R}}
= S) ->
- case okay(getr(restart), Hosts) of
+ case okay(getr(restart), Hosts, R) of
okay ->
- open(Pid, {TPid, T}),
set_watchdog(S#watchdog{status = okay});
reopen ->
transition(Open, S#watchdog{status = down})
@@ -256,14 +345,15 @@ transition({open, TPid, Hosts, T} = Open,
%% SetWatchdog()
%% Pending = TRUE REOPEN
-transition({open = P, TPid, _Hosts, T},
+transition({open = Key, TPid, _Hosts, T},
#watchdog{transport = TPid,
status = down}
= S) ->
%% Store the info we need to notify the parent to reopen the
%% connection after the requisite DWA's are received, at which
- %% time we eraser(open).
- putr(P, {TPid, T}),
+ %% time we eraser(open). The reopen message is a later addition,
+ %% to communicate the new capabilities as soon as they're known.
+ putr(Key, {TPid, T}),
set_watchdog(send_watchdog(S#watchdog{status = reopen,
num_dwa = 0}));
@@ -275,23 +365,18 @@ transition({open = P, TPid, _Hosts, T},
%% REOPEN Connection down CloseConnection()
%% SetWatchdog() DOWN
-transition({'DOWN', _, process, TPid, _},
+transition({'DOWN', _, process, TPid, _Reason},
#watchdog{transport = TPid,
- status = initial}) ->
+ shutdown = true}) ->
stop;
-transition({'DOWN', _, process, TPid, _},
- #watchdog{transport = TPid}
+transition({'DOWN', _, process, TPid, _Reason},
+ #watchdog{transport = TPid,
+ status = T}
= S) ->
- failover(S),
- close(S),
- set_watchdog(S#watchdog{status = down,
+ set_watchdog(S#watchdog{status = case T of initial -> T; _ -> down end,
pending = false,
transport = undefined});
-%% Any outstanding pending (or other messages from the transport) will
-%% have arrived before 'DOWN' since the message comes from the same
-%% process. Note that we could also get this message in the initial
-%% state.
%% Incoming message.
transition({recv, TPid, Name, Pkt}, #watchdog{transport = TPid} = S) ->
@@ -307,15 +392,11 @@ transition({timeout, _, tw}, #watchdog{}) ->
%% State query.
transition({state, Pid}, #watchdog{status = S}) ->
- Pid ! {self(), S},
+ send(Pid, {self(), S}),
ok.
%% ===========================================================================
-monitor(Pid) ->
- erlang:monitor(process, Pid),
- Pid.
-
putr(Key, Val) ->
put({?MODULE, Key}, Val).
@@ -325,26 +406,36 @@ getr(Key) ->
eraser(Key) ->
erase({?MODULE, Key}).
-%% encode/1
+%% encode/3
-encode(Msg) ->
- #diameter_packet{bin = Bin} = diameter_codec:encode(?BASE, Msg),
+encode(Msg, Mask, Dict) ->
+ Seq = diameter_session:sequence(Mask),
+ Hdr = #diameter_header{version = ?DIAMETER_VERSION,
+ end_to_end_id = Seq,
+ hop_by_hop_id = Seq},
+ Pkt = #diameter_packet{header = Hdr,
+ msg = Msg},
+ #diameter_packet{bin = Bin} = diameter_codec:encode(Dict, Pkt),
Bin.
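The {H,N} sequence mask fixes the topmost 32-N bits of generated identifiers to H, leaving the low N bits for the running counter (?NOMASK = {0,32} leaves all 32 bits free). A simplified sketch of that constraint, not the diameter_session:sequence/1 implementation:

%% Simplified sketch of an {H,N} mask: H occupies the topmost 32-N bits,
%% the counter the low N bits.
masked_id(Counter, {H, N}) ->
    (H bsl N) bor (Counter band ((1 bsl N) - 1)).

%% masked_id(5, {0,32}) =:= 5
%% masked_id(5, {1,20}) =:= (1 bsl 20) bor 5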
-%% okay/2
+%% okay/3
-okay({{accept, Ref}, _, _}, Hosts) ->
+okay({{accept, Ref}, _, _}, Hosts, Restrict) ->
T = {?MODULE, connection, Ref, Hosts},
diameter_reg:add(T),
- okay(diameter_reg:match(T));
+ if Restrict ->
+ okay(diameter_reg:match(T));
+ true ->
+ okay
+ end;
%% Register before matching so that at least one of two registering
-%% processes will match the other. (Which can't happen as long as
-%% diameter_peer_fsm guarantees at most one open connection to the same
-%% peer.)
+%% processes will match the other.
-okay({{connect, _}, _, _}, _) ->
+okay({{connect, _}, _, _}, _, _) ->
okay.
+%% okay/2
+
%% The peer hasn't been connected recently ...
okay([{_,P}]) ->
P = self(), %% assert
@@ -352,7 +443,7 @@ okay([{_,P}]) ->
%% ... or it has.
okay(C) ->
- [_|_] = [P ! close || {_,P} <- C, self() /= P],
+ [_|_] = [send(P, close) || {_,P} <- C, self() /= P],
reopen.
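Condensing the two clauses: register first, then look for other registrants, so of two watchdogs racing on the same peer at least one is guaranteed to see the other. A sketch of the accept-side logic (assumes diameter, and so diameter_reg, is running; not the patch's code):

peer_state(Ref, Hosts) ->
    Key = {diameter_watchdog, connection, Ref, Hosts},
    diameter_reg:add(Key),                  %% register first ...
    case [P || {_, P} <- diameter_reg:match(Key), P /= self()] of
        [] ->
            okay;                           %% peer not connected recently
        Peers ->                            %% ... then match: tell the old
            [P ! close || P <- Peers],      %% watchdog(s) to go
            reopen
    end.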
%% set_watchdog/1
@@ -374,35 +465,14 @@ tw(T)
tw({M,F,A}) ->
apply(M,F,A).
-%% open/2
-
-open(Pid, {_,_} = T) ->
- Pid ! {connection_up, self(), T}.
-
-%% failover/1
-
-failover(#watchdog{status = okay,
- parent = Pid}) ->
- Pid ! {connection_down, self()};
-
-failover(_) ->
- ok.
-
-%% close/1
-
-close(#watchdog{status = down}) ->
- ok;
-
-close(#watchdog{parent = Pid}) ->
- {{T, _}, _, _} = getr(restart),
- T == accept andalso (Pid ! {close, self()}).
-
%% send_watchdog/1
send_watchdog(#watchdog{pending = false,
- transport = TPid}
+ transport = TPid,
+ dictionary = Dict0,
+ sequence = Mask}
= S) ->
- TPid ! {send, encode(getr(dwr))},
+ send(TPid, {send, encode(getr(dwr), Mask, Dict0)}),
?LOG(send, 'DWR'),
S#watchdog{pending = true}.
@@ -430,13 +500,22 @@ rcv(N, _, _)
false;
rcv(_, Pkt, #watchdog{transport = TPid,
- message_data = T}) ->
- diameter_service:receive_message(TPid, Pkt, T).
+ dictionary = Dict0,
+ receive_data = T}) ->
+ diameter_traffic:receive_message(TPid, Pkt, Dict0, T).
throwaway(S) ->
throw({?MODULE, throwaway, S}).
%% rcv/2
+%%
+%% The lack of Hop-by-Hop and End-to-End Identifiers checks in a
+%% received DWA is intentional. The purpose of the message is to
+%% demonstrate life, but a peer that consistently bungles it by sending
+%% the wrong identifiers causes the connection to toggle between OPEN
+%% and SUSPECT, with failover and failback as a result, despite there
+%% being no real problem with connectivity. Thus, relax and accept any
+%% incoming DWA as being in response to an outgoing DWR.
%% INITIAL Receive DWA Pending = FALSE
%% Throwaway() INITIAL
@@ -475,12 +554,10 @@ rcv(_, #watchdog{status = okay} = S) ->
%% SetWatchdog() OKAY
rcv('DWA', #watchdog{status = suspect} = S) ->
- failback(S),
set_watchdog(S#watchdog{status = okay,
pending = false});
rcv(_, #watchdog{status = suspect} = S) ->
- failback(S),
set_watchdog(S#watchdog{status = okay});
%% REOPEN Receive DWA & Pending = FALSE
@@ -488,10 +565,8 @@ rcv(_, #watchdog{status = suspect} = S) ->
%% Failback() OKAY
rcv('DWA', #watchdog{status = reopen,
- num_dwa = 2 = N,
- parent = Pid}
+ num_dwa = 2 = N}
= S) ->
- open(Pid, eraser(open)),
S#watchdog{status = okay,
num_dwa = N+1,
pending = false};
@@ -510,11 +585,6 @@ rcv('DWA', #watchdog{status = reopen,
rcv(_, #watchdog{status = reopen} = S) ->
throwaway(S).
-%% failback/1
-
-failback(#watchdog{parent = Pid}) ->
- Pid ! {connection_up, self()}.
-
%% timeout/1
%%
%% The caller sets the watchdog on the return value.
@@ -539,7 +609,6 @@ timeout(#watchdog{status = T,
timeout(#watchdog{status = okay,
pending = true}
= S) ->
- failover(S),
S#watchdog{status = suspect};
%% SUSPECT Timer expires CloseConnection()
@@ -555,8 +624,7 @@ timeout(#watchdog{status = T,
= S)
when T == suspect;
T == reopen, P, N < 0 ->
- exit(TPid, shutdown),
- close(S),
+ exit(TPid, {shutdown, watchdog_timeout}),
S#watchdog{status = down};
%% REOPEN Timer expires & NumDWA = -1
@@ -590,7 +658,9 @@ timeout(#watchdog{status = reopen,
%% process has died. We only need to handle state down since we start
%% the first watchdog when transitioning out of initial.
-timeout(#watchdog{status = down} = S) ->
+timeout(#watchdog{status = T} = S)
+ when T == initial;
+ T == down ->
restart(S).
%% restart/1
@@ -600,19 +670,40 @@ restart(#watchdog{transport = undefined} = S) ->
restart(S) ->
S.
+%% restart/2
+%%
%% Only restart the transport in the connecting case. For an accepting
-%% transport, we've registered the peer connection when leaving state
-%% initial and this is used by a new accepting process to realize that
-%% it's actually in state down rather then initial when receiving
-%% notification of an open connection.
-
-restart({{connect, _} = T, Opts, Svc}, #watchdog{parent = Pid} = S) ->
- Pid ! {reconnect, self()},
- S#watchdog{transport = monitor(diameter_peer_fsm:start(T, Opts, Svc))};
+%% transport, there's no guarantee that an accepted connection in a
+%% restarted transport is from the peer we've lost contact with, so we
+%% have to be prepared for another watchdog to handle it. This is what
+%% the diameter_reg registration in this module is for: the peer
+%% connection is registered when leaving state initial and this is
+%% used by a new accepting watchdog to realize that it's actually in
+%% state down rather than initial when receiving notification of an
+%% open connection.
+
+restart({{connect, _} = T, Opts, Svc},
+ #watchdog{parent = Pid,
+ sequence = Mask,
+ restrict = {R,_},
+ dictionary = Dict0}
+ = S) ->
+ send(Pid, {reconnect, self()}),
+ Nodes = restrict_nodes(R),
+ S#watchdog{transport = start(T, Opts, Mask, Nodes, Dict0, Svc),
+ restrict = {R, lists:member(node(), Nodes)}};
+
+%% No restriction on the number of connections to the same peer: just
+%% die. Note that a state machine never enters state REOPEN in this
+%% case.
+restart({{accept, _}, _, _}, #watchdog{restrict = {_, false}}) ->
+ stop;
+
+%% Otherwise hang around until told to die.
restart({{accept, _}, _, _}, S) ->
S.
-%% Don't currently use Opts/Svc in the accept case but having them in
-%% the process dictionary is helpful if the process dies unexpectedly.
+
+%% Don't currently use Opts/Svc in the accept case.
%% dwr/1
@@ -622,3 +713,22 @@ dwr(#diameter_caps{origin_host = OH,
['DWR', {'Origin-Host', OH},
{'Origin-Realm', OR},
{'Origin-State-Id', OSI}].
+
+%% restrict_nodes/1
+
+restrict_nodes(false) ->
+ [];
+
+restrict_nodes(nodes) ->
+ [node() | nodes()];
+
+restrict_nodes(node) ->
+ [node()];
+
+restrict_nodes(Nodes)
+ when [] == Nodes;
+ is_atom(hd(Nodes)) ->
+ Nodes;
+
+restrict_nodes(F) ->
+ diameter_lib:eval(F).
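
For reference, the mapping from the restrict_connections service option to a node list works out as follows (illustrative values; the final clause goes through diameter_lib:eval/1):

%% restrict_nodes(false)                 -> []
%% restrict_nodes(node)                  -> [node()]
%% restrict_nodes(nodes)                 -> [node() | nodes()]
%% restrict_nodes(['other@host'])        -> ['other@host']
%% restrict_nodes(fun() -> [node()] end) -> [node()]   %% via diameter_lib:eval/1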