aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--lib/diameter/doc/src/diameter.xml13
-rw-r--r--lib/diameter/src/base/diameter_service.erl91
-rw-r--r--lib/diameter/src/base/diameter_watchdog.erl47
3 files changed, 129 insertions, 22 deletions
diff --git a/lib/diameter/doc/src/diameter.xml b/lib/diameter/doc/src/diameter.xml
index ec7bd2012e..10139b90c7 100644
--- a/lib/diameter/doc/src/diameter.xml
+++ b/lib/diameter/doc/src/diameter.xml
@@ -692,6 +692,19 @@ The packet record contains the CEA in question.</p>
</taglist>
</item>
+<tag><c>{watchdog, Ref, PeerRef, {From, To}, Config}</c></tag>
+<item>
+<code>
+Ref = transport_ref()
+PeerRef = diameter_app:peer_ref()
+From, To = initial | okay | suspect | down | reopen
+Config = {connect|listen, [transport_opt()]}
+</code>
+
+<p>
+An RFC 3539 watchdog state machine has changed state.</p>
+</item>
+
</taglist>
<p>
diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl
index dc74c57334..26790ebfb4 100644
--- a/lib/diameter/src/base/diameter_service.erl
+++ b/lib/diameter/src/base/diameter_service.erl
@@ -64,9 +64,27 @@
-include_lib("diameter/include/diameter.hrl").
-include("diameter_internal.hrl").
+%% The "old" states maintained in this module historically.
-define(STATE_UP, up).
-define(STATE_DOWN, down).
+-type op_state() :: ?STATE_UP
+ | ?STATE_DOWN.
+
+%% The RFC 3539 watchdog states, which are now maintained alongside
+%% the old up/down states: okay corresponds to up, all others to down.
+-define(WD_INITIAL, initial).
+-define(WD_OKAY, okay).
+-define(WD_SUSPECT, suspect).
+-define(WD_DOWN, down).
+-define(WD_REOPEN, reopen).
+
+-type wd_state() :: ?WD_INITIAL
+ | ?WD_OKAY
+ | ?WD_SUSPECT
+ | ?WD_DOWN
+ | ?WD_REOPEN.
+
-define(DEFAULT_TC, 30000). %% RFC 3588 ch 2.1
-define(DEFAULT_TIMEOUT, 5000). %% for outgoing requests
-define(RESTART_TC, 1000). %% if restart was this recent
@@ -118,7 +136,8 @@
type :: match(connect | accept),
ref :: match(reference()), %% key into diameter_config
options :: match([diameter:transport_opt()]),%% from start_transport
- op_state = ?STATE_DOWN :: match(?STATE_DOWN | ?STATE_UP),
+ op_state = {?STATE_DOWN, ?WD_INITIAL}
+ :: match(op_state() | {op_state(), wd_state()}),
started = now(), %% at process start
conn = false :: match(boolean() | pid())}).
%% true at accept, pid() at connection_up (connT key)
@@ -508,6 +527,34 @@ transition({reconnect, Pid}, S) ->
reconnect(Pid, S),
ok;
+%% Watchdog is sending notification of a state transition. Note that
+%% the connection_up/down messages pre-date this message and are
+%% still used. A 'watchdog' message will follow these and communicate
+%% the same state as was set in handling connection_up/down.
+transition({watchdog, Pid, {TPid, From, To}}, #state{service_name = SvcName,
+ peerT = PeerT}) ->
+ #peer{ref = Ref, type = T, options = Opts, op_state = {OS,_}}
+ = P
+ = fetch(PeerT, Pid),
+ insert(PeerT, P#peer{op_state = {OS, To}}),
+ send_event(SvcName, {watchdog, Ref, TPid, {From, To}, {T, Opts}}),
+ ok;
+%% Death of a peer process results in the removal of its peer and any
+%% associated conn record when 'DOWN' is received (after this) but the
+%% states will be {?STATE_UP, ?WD_DOWN} for a short time. (No real
+%% problem since ?WD_* is only used in service_info.) We set ?WD_OKAY
+%% as a consequence of connection_up since we know a watchdog is
+%% coming. We can't set anything at connection_down since we don't
+%% know if the subsequent watchdog message will be ?WD_DOWN or
+%% ?WD_SUSPECT. We don't (yet) set ?STATE_* as a consequence of a
+%% watchdog message since this requires changing some of the matching
+%% on ?STATE_*.
+%%
+%% Death of a conn process results in connection_down followed by
+%% watchdog ?WD_DOWN. The latter doesn't result in the conn record
+%% being deleted since 'DOWN' from death of its peer doesn't (yet)
+%% deal with the record having been removed.
+
%% Monitor process has died. Just die with a reason that tells
%% diameter_config about the happening. If a cleaner shutdown is
%% required then someone should stop us.
@@ -871,7 +918,14 @@ accepted(Pid, _TPid, #state{peerT = PeerT} = S) ->
fetch(Tid, Key) ->
[T] = ets:lookup(Tid, Key),
- T.
+ case T of
+ #peer{op_state = ?STATE_UP} = P ->
+ P#peer{op_state = {?STATE_UP, ?WD_OKAY}};
+ #peer{op_state = ?STATE_DOWN} = P ->
+ P#peer{op_state = {?STATE_DOWN, ?WD_DOWN}};
+ _ ->
+ T
+ end.
%%% ---------------------------------------------------------------------------
%%% # connection_up/3
@@ -917,12 +971,12 @@ connection_up(T, P, C, #state{peerT = PeerT,
service
= #diameter_service{applications = Apps}}
= S) ->
- #peer{conn = TPid, op_state = ?STATE_DOWN}
+ #peer{conn = TPid, op_state = {?STATE_DOWN, _}}
= P,
#conn{apps = SApps, caps = Caps}
= C,
- insert(PeerT, P#peer{op_state = ?STATE_UP}),
+ insert(PeerT, P#peer{op_state = {?STATE_UP, ?WD_OKAY}}),
request_peer_up(TPid),
report_status(up, P, C, S, T),
@@ -971,22 +1025,22 @@ peer_cb(MFA, Alias) ->
connection_down(Pid, #state{peerT = PeerT,
connT = ConnT}
= S) ->
- #peer{op_state = ?STATE_UP, %% assert
+ #peer{op_state = {?STATE_UP, WS}, %% assert
conn = TPid}
= P
= fetch(PeerT, Pid),
C = fetch(ConnT, TPid),
- insert(PeerT, P#peer{op_state = ?STATE_DOWN}),
+ insert(PeerT, P#peer{op_state = {?STATE_DOWN, WS}}),
connection_down(P,C,S).
%% connection_down/3
-connection_down(#peer{op_state = ?STATE_DOWN}, _, S) ->
+connection_down(#peer{op_state = {?STATE_DOWN, _}}, _, S) ->
S;
connection_down(#peer{conn = TPid,
- op_state = ?STATE_UP}
+ op_state = {?STATE_UP, _}}
= P,
#conn{caps = Caps,
apps = SApps}
@@ -1035,7 +1089,7 @@ peer_down(Pid, Reason, #state{peerT = PeerT} = S) ->
%% Send an event at connection establishment failure.
closed({shutdown, {close, _TPid, Reason}},
- #peer{op_state = ?STATE_DOWN,
+ #peer{op_state = {?STATE_DOWN, _},
ref = Ref,
type = Type,
options = Opts},
@@ -2876,15 +2930,26 @@ it_acc(ConnT, Acc, #peer{pid = Pid,
op_state = OS,
started = T,
conn = TPid}) ->
+ WS = wd_state(OS),
dict:append(Ref,
[{type, Type},
{options, Opts},
- {watchdog, {Pid, T, OS}}
- | info_conn(ConnT, TPid)],
+ {watchdog, {Pid, T, WS}}
+ | info_conn(ConnT, TPid, WS /= ?WD_DOWN)],
Acc).
-info_conn(ConnT, TPid) ->
- info_conn(ets:lookup(ConnT, TPid)).
+info_conn(ConnT, TPid, true)
+ when is_pid(TPid) ->
+ info_conn(ets:lookup(ConnT, TPid));
+info_conn(_, _, _) ->
+ [].
+
+wd_state({_,S}) ->
+ S;
+wd_state(?STATE_UP) ->
+ ?WD_OKAY;
+wd_state(?STATE_DOWN) ->
+ ?WD_DOWN.
info_conn([#conn{pid = Pid, apps = SApps, caps = Caps, started = T}]) ->
[{peer, {Pid, T}},
diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl
index fb22fd8275..d7474e5c56 100644
--- a/lib/diameter/src/base/diameter_watchdog.erl
+++ b/lib/diameter/src/base/diameter_watchdog.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2011. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2012. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -54,7 +54,7 @@
%% number of DWAs received during reopen
%% end PCB
parent = self() :: pid(),
- transport :: pid(),
+ transport :: pid() | undefined,
tref :: reference(), %% reference for current watchdog timer
message_data}). %% term passed into diameter_service with message
@@ -64,6 +64,13 @@
%% that a failed capabilities exchange produces the desired exit
%% reason.
+-spec start(Type, {RecvData, [Opt], SvcName, #diameter_service{}})
+ -> {reference(), pid()}
+ when Type :: {connect|accept, diameter:transport_ref()},
+ RecvData :: term(),
+ Opt :: diameter:transport_opt(),
+ SvcName :: diameter:service_name().
+
start({_,_} = Type, T) ->
Ref = make_ref(),
{ok, Pid} = diameter_watchdog_sup:start_child({Ref, {Type, self(), T}}),
@@ -102,7 +109,7 @@ i({_, Pid, _} = T) -> %% from old code
erlang:monitor(process, Pid),
make_state(T).
-make_state({T, Pid, {ConnT,
+make_state({T, Pid, {RecvData,
Opts,
SvcName,
#diameter_service{applications = Apps,
@@ -116,7 +123,7 @@ make_state({T, Pid, {ConnT,
tw = proplists:get_value(watchdog_timer,
Opts,
?DEFAULT_TW_INIT),
- message_data = {ConnT, SvcName, Apps}}.
+ message_data = {RecvData, SvcName, Apps}}.
%% handle_call/3
@@ -134,14 +141,36 @@ handle_info(T, State) ->
case transition(T, State) of
ok ->
{noreply, State};
- #watchdog{status = X} = S ->
- ?LOGC(X =/= State#watchdog.status, transition, X),
+ #watchdog{} = S ->
+ event(State, S),
{noreply, S};
stop ->
?LOG(stop, T),
+ event(State, State#watchdog{status = down}),
{stop, {shutdown, T}, State}
end.
+event(#watchdog{status = T}, #watchdog{status = T}) ->
+ ok;
+
+event(#watchdog{transport = undefined}, #watchdog{transport = undefined}) ->
+ ok;
+
+event(#watchdog{status = From, transport = F, parent = Pid},
+ #watchdog{status = To, transport = T}) ->
+ E = {tpid(F,T), From, To},
+ notify(Pid, E),
+ ?LOG(transition, {self(), E}).
+
+tpid(_, Pid)
+ when is_pid(Pid) ->
+ Pid;
+tpid(Pid, _) ->
+ Pid.
+
+notify(Pid, E) ->
+ Pid ! {watchdog, self(), E}.
+
%% terminate/2
terminate(_, _) ->
@@ -251,8 +280,8 @@ transition({'DOWN', _, process, TPid, _},
status = initial}) ->
stop;
-transition({'DOWN', _, process, Pid, _},
- #watchdog{transport = Pid}
+transition({'DOWN', _, process, TPid, _},
+ #watchdog{transport = TPid}
= S) ->
failover(S),
close(S),
@@ -385,7 +414,7 @@ recv(Name, Pkt, S) ->
rcv(Name, Pkt, S),
NS
catch
- throw: {?MODULE, throwaway, #watchdog{} = NS} ->
+ {?MODULE, throwaway, #watchdog{} = NS} ->
NS
end.