aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--lib/diameter/src/base/diameter_service.erl253
-rw-r--r--lib/diameter/src/base/diameter_watchdog.erl76
2 files changed, 137 insertions, 192 deletions
diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl
index d01a5e5ab6..a0af92c2a2 100644
--- a/lib/diameter/src/base/diameter_service.erl
+++ b/lib/diameter/src/base/diameter_service.erl
@@ -64,15 +64,7 @@
-include_lib("diameter/include/diameter.hrl").
-include("diameter_internal.hrl").
-%% The states mirrored by peer_up/peer_down callbacks.
--define(STATE_UP, up).
--define(STATE_DOWN, down).
-
--type op_state() :: ?STATE_UP
- | ?STATE_DOWN.
-
-%% The RFC 3539 watchdog states that are now maintained, albeit
-%% along with the old up/down. okay = up, else down.
+%% RFC 3539 watchdog states.
-define(WD_INITIAL, initial).
-define(WD_OKAY, okay).
-define(WD_SUSPECT, suspect).
@@ -150,11 +142,10 @@
type :: match(connect | accept),
ref :: match(reference()), %% key into diameter_config
options :: match([diameter:transport_opt()]),%% from start_transport
- op_state = {?STATE_DOWN, ?WD_INITIAL}
- :: match({op_state(), wd_state()}),
+ wd_state = ?WD_INITIAL :: match(wd_state()),
started = now(), %% at process start
conn = false :: match(boolean() | pid())}).
- %% true at accepted, pid() at connection_up or reopen
+ %% true at accepted, pid() at okay/reopen
%% Record representing a peer process as implemented by
%% diameter_peer_fsm. The term "conn" is historical. Despite the name
@@ -520,67 +511,26 @@ transition({accepted, Pid, TPid}, S) ->
accepted(Pid, TPid, S),
ok;
-%% Peer process has a new open connection.
-transition({connection_up, Pid, T}, S) ->
- connection_up(Pid, T, S),
- ok;
-
-%% Watchdog has a new connection that will be opened after DW[RA]
-%% exchange. This message was added long after connection_up, to
-%% communicate the information as soon as it's available. Leave
-%% connection_up as is it for now, duplicated information and all.
-transition({reopen, Pid, T}, S) ->
- reopen(Pid, T, S),
- ok;
-
-%% Watchdog has left state OKAY.
-transition({connection_down, Pid}, S) ->
- connection_down(Pid, S),
- ok;
-
-%% Watchdog has returned to state OKAY.
-transition({connection_up, Pid}, S) ->
- connection_up(Pid, S),
- ok;
-
-%% Accepting transport has lost connectivity.
-transition({close, Pid}, S) ->
- close(Pid, S),
- ok;
-
%% Connecting transport is being restarted by watchdog.
transition({reconnect, Pid}, S) ->
reconnect(Pid, S),
ok;
-%% Watchdog is sending notification of a state transition. Note that
-%% the connection_up/down messages pre-date this message and are still
-%% used. A watchdog message will follow these and communicate the same
-%% state as was set in handling connection_up/down.
-transition({watchdog, Pid, {TPid, From, To}}, #state{service_name = SvcName,
- peerT = PeerT}) ->
- #peer{ref = Ref, type = T, options = Opts, op_state = {OS,_}}
+%% Watchdog is sending notification of a state transition.
+transition({watchdog, Pid, {[TPid | Data], From, To}},
+ #state{service_name = SvcName,
+ peerT = PeerT}
+ = S) ->
+ #peer{ref = Ref, type = T, options = Opts}
= P
= fetch(PeerT, Pid),
- insert(PeerT, P#peer{op_state = {OS, To}}),
+ watchdog(TPid, Data, From, To, P, S),
send_event(SvcName, {watchdog, Ref, TPid, {From, To}, {T, Opts}}),
ok;
%% Death of a watchdog process (#peer.pid) results in the removal of
-%% it's peer and any associated conn record when 'DOWN' is received
-%% (after this) but the states will be {?STATE_UP, ?WD_DOWN} for a
-%% short time. (No real problem since ?WD_* is only used in
-%% service_info.) We set ?WD_OKAY as a consequence of connection_up
-%% since we know a watchdog is coming. We can't set anything at
-%% connection_down since we don't know if the subsequent watchdog
-%% message will be ?WD_DOWN or ?WD_SUSPECT. We don't (yet) set
-%% ?STATE_* as a consequence of a watchdog message since this requires
-%% changing some of the matching on ?STATE_*.
-%%
+%% it's peer and any associated conn record when 'DOWN' is received.
%% Death of a peer process process (#conn.pid, #peer.conn) results in
-%% connection_down followed by watchdog ?WD_DOWN. The latter doesn't
-%% result in the conn record being deleted since 'DOWN' from death of
-%% its watchdog doesn't (yet) deal with the record having been
-%% removed.
+%% ?WD_DOWN.
%% Monitor process has died. Just die with a reason that tells
%% diameter_config about the happening. If a cleaner shutdown is
@@ -946,20 +896,52 @@ fetch(Tid, Key) ->
T.
%%% ---------------------------------------------------------------------------
+%%% # watchdog/6
+%%% ---------------------------------------------------------------------------
+
+%% Watchdog has a new open connection.
+watchdog(TPid, [T], _, ?WD_OKAY, Peer, State) ->
+ connection_up({TPid, T}, Peer, State);
+
+%% Watchdog has a new connection that will be opened after DW[RA]
+%% exchange.
+watchdog(TPid, [T], _, ?WD_REOPEN, Peer, State) ->
+ reopen({TPid, T}, Peer, State);
+
+%% Watchdog has recovered a suspect connection.
+watchdog(TPid, [], ?WD_SUSPECT, ?WD_OKAY, Peer, State) ->
+ #peer{conn = TPid} = Peer, %% assert
+ connection_up(Peer, State);
+
+%% Watchdog has an unresponsive connection.
+watchdog(TPid, [], ?WD_OKAY, ?WD_SUSPECT = To, Peer, State) ->
+ #peer{conn = TPid} = Peer, %% assert
+ connection_down(Peer, To, State);
+
+%% Watchdog has lost its connection.
+watchdog(TPid, [], _, ?WD_DOWN = To, Peer, #state{connT = ConnT} = S) ->
+ close(Peer, S),
+ connection_down(Peer, To, S),
+ ets:delete(ConnT, TPid);
+
+watchdog(_, [], _, _, _, _) ->
+ ok.
+
+%%% ---------------------------------------------------------------------------
%%% # connection_up/3
%%% ---------------------------------------------------------------------------
%% Watchdog process has reached state OKAY.
-connection_up(Pid, {TPid, {Caps, SApps, Pkt}}, #state{peerT = PeerT,
- connT = ConnT}
- = S) ->
- P = fetch(PeerT, Pid),
+connection_up({TPid, {Caps, SApps, Pkt}},
+ #peer{pid = Pid}
+ = P,
+ #state{connT = ConnT}
+ = S) ->
C = #conn{pid = TPid,
apps = SApps,
caps = Caps,
peer = Pid},
-
insert(ConnT, C),
connection_up([Pkt], P#peer{conn = TPid}, C, S).
@@ -967,56 +949,45 @@ connection_up(Pid, {TPid, {Caps, SApps, Pkt}}, #state{peerT = PeerT,
%%% # reopen/3
%%% ---------------------------------------------------------------------------
-%% Note that this connection_up/3 rewrites the same #conn{} now
-%% written here. Both do so in case reopen has not happened in old
-%% code.
-
-reopen(Pid, {TPid, {Caps, SApps, _Pkt}}, #state{peerT = PeerT,
- connT = ConnT}) ->
- P = fetch(PeerT, Pid),
- C = #conn{pid = TPid,
- apps = SApps,
- caps = Caps,
- peer = Pid},
-
- insert(ConnT, C),
- #peer{op_state = {?STATE_DOWN, _}}
- = P,
- insert(PeerT, P#peer{op_state = {?STATE_DOWN, ?WD_REOPEN},
+reopen({TPid, {Caps, SApps, _Pkt}},
+ #peer{pid = Pid}
+ = P,
+ #state{peerT = PeerT,
+ connT = ConnT}) ->
+ insert(ConnT, #conn{pid = TPid,
+ apps = SApps,
+ caps = Caps,
+ peer = Pid}),
+ insert(PeerT, P#peer{wd_state = ?WD_REOPEN,
conn = TPid}).
%%% ---------------------------------------------------------------------------
%%% # connection_up/2
%%% ---------------------------------------------------------------------------
-%% Peer process has transitioned back into the open state. Note that there
-%% has been no new capabilties exchange in this case.
+%% Watchdog has recovered as suspect connection. Note that there has
+%% been no new capabilties exchange in this case.
-connection_up(Pid, #state{peerT = PeerT,
- connT = ConnT}
- = S) ->
- #peer{conn = TPid} = P = fetch(PeerT, Pid),
- C = fetch(ConnT, TPid),
- connection_up([], P, C, S).
+connection_up(#peer{conn = TPid} = P, #state{connT = ConnT} = S) ->
+ connection_up([], P, fetch(ConnT, TPid), S).
%% connection_up/4
-connection_up(T, P, C, #state{peerT = PeerT,
- local_peers = LDict,
- service_name = SvcName,
- service
- = #diameter_service{applications = Apps}}
- = S) ->
- #peer{conn = TPid, op_state = {?STATE_DOWN, _}}
- = P,
- #conn{apps = SApps, caps = Caps}
- = C,
-
- insert(PeerT, P#peer{op_state = {?STATE_UP, ?WD_OKAY}}),
-
+connection_up(Extra,
+ #peer{conn = TPid}
+ = P,
+ #conn{apps = SApps, caps = Caps}
+ = C,
+ #state{peerT = PeerT,
+ local_peers = LDict,
+ service_name = SvcName,
+ service
+ = #diameter_service{applications = Apps}}
+ = S) ->
+ insert(PeerT, P#peer{wd_state = ?WD_OKAY}),
request_peer_up(TPid),
insert_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict),
- report_status(up, P, C, S, T).
+ report_status(up, P, C, S, Extra).
insert_local_peer(SApps, T, LDict) ->
lists:foldl(fun(A,D) -> ilp(A, T, D) end, LDict, SApps).
@@ -1052,30 +1023,11 @@ peer_cb(MFA, Alias) ->
end.
%%% ---------------------------------------------------------------------------
-%%% # connection_down/2
+%%% # connection_down/3
%%% ---------------------------------------------------------------------------
-%% Watchdog has transitioned out of state OKAY.
-
-connection_down(Pid, #state{peerT = PeerT,
- connT = ConnT}
- = S) ->
- #peer{op_state = {?STATE_UP, WS}, %% assert
- conn = TPid}
- = P
- = fetch(PeerT, Pid),
-
- C = fetch(ConnT, TPid),
- insert(PeerT, P#peer{op_state = {?STATE_DOWN, WS}}),
- connection_down(P,C,S).
-
-%% connection_down/3
-
-connection_down(#peer{op_state = {?STATE_DOWN, _}}, _, _) ->
- ok;
-
-connection_down(#peer{conn = TPid,
- op_state = {?STATE_UP, _}}
+connection_down(#peer{wd_state = ?WD_OKAY,
+ conn = TPid}
= P,
#conn{caps = Caps,
apps = SApps}
@@ -1086,7 +1038,23 @@ connection_down(#peer{conn = TPid,
= S) ->
report_status(down, P, C, S, []),
remove_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict),
- request_peer_down(TPid, S).
+ request_peer_down(TPid, S);
+
+connection_down(#peer{}, #conn{}, _) ->
+ ok;
+
+connection_down(#peer{wd_state = WS,
+ conn = TPid}
+ = P,
+ To,
+ #state{peerT = PeerT,
+ connT = ConnT}
+ = S)
+ when is_atom(To) ->
+ insert(PeerT, P#peer{wd_state = To}),
+ ?WD_OKAY == WS
+ andalso
+ connection_down(P, fetch(ConnT, TPid), S).
remove_local_peer(SApps, T, LDict) ->
lists:foldl(fun(A,D) -> rlp(A, T, D) end, LDict, SApps).
@@ -1118,11 +1086,12 @@ peer_down(Pid, Reason, #state{peerT = PeerT} = S) ->
%% Send an event at connection establishment failure.
closed({shutdown, {close, _TPid, Reason}},
- #peer{op_state = {?STATE_DOWN, _},
+ #peer{wd_state = WS,
ref = Ref,
type = Type,
options = Opts},
- #state{service_name = SvcName}) ->
+ #state{service_name = SvcName})
+ when WS /= ?WD_OKAY ->
send_event(SvcName, {closed, Ref, Reason, {type(Type), Opts}});
closed(_, _, _) ->
ok.
@@ -1134,9 +1103,8 @@ peer_down(#peer{conn = B}, _)
%% ... or maybe it has.
peer_down(#peer{conn = TPid} = P, #state{connT = ConnT} = S) ->
- #conn{} = C = fetch(ConnT, TPid),
- ets:delete_object(ConnT, C),
- connection_down(P,C,S).
+ connection_down(P, ?WD_DOWN, S),
+ ets:delete(ConnT, TPid).
%% restart/2
@@ -1222,14 +1190,13 @@ tc(false = No, _, _) -> %% removed
%% the accepting watchdog upon reception of a CER from the previously
%% connected peer, or us after reconnect_timer timeout.
-close(Pid, #state{service_name = SvcName,
- peerT = PeerT}) ->
- #peer{pid = Pid,
- type = accept,
- ref = Ref,
- options = Opts}
- = fetch(PeerT, Pid),
-
+close(#peer{type = connect}, _) ->
+ ok;
+close(#peer{type = accept,
+ pid = Pid,
+ ref = Ref,
+ options = Opts},
+ #state{service_name = SvcName}) ->
c(Pid, diameter_config:have_transport(SvcName, Ref), Opts).
%% Tell watchdog to (maybe) die later ...
@@ -1716,6 +1683,8 @@ have_request(Pkt, TPid) ->
%% request_peer_up/1
+%% Insert an element that is used to detect whether or not there has
+%% been a failover when inserting an outgoing request.
request_peer_up(TPid) ->
ets:insert(?REQUEST_TABLE, {TPid}).
@@ -3147,7 +3116,7 @@ peer_acc(ConnT, Acc, #peer{pid = Pid,
type = Type,
ref = Ref,
options = Opts,
- op_state = {_, WS},
+ wd_state = WS,
started = T,
conn = TPid}) ->
dict:append(Ref,
diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl
index 49b6776a84..8d10755180 100644
--- a/lib/diameter/src/base/diameter_watchdog.erl
+++ b/lib/diameter/src/base/diameter_watchdog.erl
@@ -151,29 +151,45 @@ handle_info(T, #watchdog{} = State) ->
ok ->
{noreply, State};
#watchdog{} = S ->
- event(State, S),
+ event(T, State, S),
{noreply, S};
stop ->
?LOG(stop, T),
- event(State, State#watchdog{status = down}),
+ event(T, State, State#watchdog{status = down}),
{stop, {shutdown, T}, State}
end.
-event(#watchdog{status = T}, #watchdog{status = T}) ->
+event(_, #watchdog{status = T}, #watchdog{status = T}) ->
ok;
-event(#watchdog{transport = undefined}, #watchdog{transport = undefined}) ->
+event(_, #watchdog{transport = undefined}, #watchdog{transport = undefined}) ->
ok;
-event(#watchdog{status = From, transport = F, parent = Pid},
+event(Msg,
+ #watchdog{status = From, transport = F, parent = Pid},
#watchdog{status = To, transport = T}) ->
- E = {tpid(F,T), From, To},
+ TPid = tpid(F,T),
+ E = {[TPid | data(Msg, TPid, From, To)], From, To},
notify(Pid, E),
?LOG(transition, {self(), E}).
+data(Msg, TPid, reopen, okay) ->
+ {recv, TPid, 'DWA', _Pkt} = Msg, %% assert
+ {TPid, T} = eraser(open),
+ [T];
+
+data({open, TPid, _Hosts, T}, TPid, _From, To)
+ when To == okay;
+ To == reopen ->
+ [T];
+
+data(_, _, _, _) ->
+ [].
+
tpid(_, Pid)
when is_pid(Pid) ->
Pid;
+
tpid(Pid, _) ->
Pid.
@@ -248,15 +264,13 @@ transition({close, TPid, _Reason}, #watchdog{transport = TPid}) ->
%% know the identity of the peer (ie. now) that we know that we're in
%% state down rather than initial.
-transition({open, TPid, Hosts, T} = Open,
+transition({open, TPid, Hosts, _} = Open,
#watchdog{transport = TPid,
status = initial,
- parent = Pid,
restrict = {_, R}}
= S) ->
case okay(getr(restart), Hosts, R) of
okay ->
- open(Pid, {TPid, T}),
set_watchdog(S#watchdog{status = okay});
reopen ->
transition(Open, S#watchdog{status = down})
@@ -267,17 +281,15 @@ transition({open, TPid, Hosts, T} = Open,
%% SetWatchdog()
%% Pending = TRUE REOPEN
-transition({open = P, TPid, _Hosts, T},
+transition({open = Key, TPid, _Hosts, T},
#watchdog{transport = TPid,
- parent = Pid,
status = down}
= S) ->
%% Store the info we need to notify the parent to reopen the
%% connection after the requisite DWA's are received, at which
%% time we eraser(open). The reopen message is a later addition,
%% to communicate the new capabilities as soon as they're known.
- putr(P, {TPid, T}),
- Pid ! {reopen, self(), {TPid, T}},
+ putr(Key, {TPid, T}),
set_watchdog(send_watchdog(S#watchdog{status = reopen,
num_dwa = 0}));
@@ -300,8 +312,6 @@ transition({'DOWN', _, process, TPid, _},
transition({'DOWN', _, process, TPid, _},
#watchdog{transport = TPid}
= S) ->
- failover(S),
- close(S),
set_watchdog(S#watchdog{status = down,
pending = false,
transport = undefined});
@@ -401,29 +411,6 @@ tw(T)
tw({M,F,A}) ->
apply(M,F,A).
-%% open/2
-
-open(Pid, {_,_} = T) ->
- Pid ! {connection_up, self(), T}.
-
-%% failover/1
-
-failover(#watchdog{status = okay,
- parent = Pid}) ->
- Pid ! {connection_down, self()};
-
-failover(_) ->
- ok.
-
-%% close/1
-
-close(#watchdog{status = down}) ->
- ok;
-
-close(#watchdog{parent = Pid}) ->
- {{T, _}, _, _} = getr(restart),
- T == accept andalso (Pid ! {close, self()}).
-
%% send_watchdog/1
send_watchdog(#watchdog{pending = false,
@@ -511,12 +498,10 @@ rcv(_, #watchdog{status = okay} = S) ->
%% SetWatchdog() OKAY
rcv('DWA', #watchdog{status = suspect} = S) ->
- failback(S),
set_watchdog(S#watchdog{status = okay,
pending = false});
rcv(_, #watchdog{status = suspect} = S) ->
- failback(S),
set_watchdog(S#watchdog{status = okay});
%% REOPEN Receive DWA & Pending = FALSE
@@ -524,10 +509,8 @@ rcv(_, #watchdog{status = suspect} = S) ->
%% Failback() OKAY
rcv('DWA', #watchdog{status = reopen,
- num_dwa = 2 = N,
- parent = Pid}
+ num_dwa = 2 = N}
= S) ->
- open(Pid, eraser(open)),
S#watchdog{status = okay,
num_dwa = N+1,
pending = false};
@@ -546,11 +529,6 @@ rcv('DWA', #watchdog{status = reopen,
rcv(_, #watchdog{status = reopen} = S) ->
throwaway(S).
-%% failback/1
-
-failback(#watchdog{parent = Pid}) ->
- Pid ! {connection_up, self()}.
-
%% timeout/1
%%
%% The caller sets the watchdog on the return value.
@@ -575,7 +553,6 @@ timeout(#watchdog{status = T,
timeout(#watchdog{status = okay,
pending = true}
= S) ->
- failover(S),
S#watchdog{status = suspect};
%% SUSPECT Timer expires CloseConnection()
@@ -592,7 +569,6 @@ timeout(#watchdog{status = T,
when T == suspect;
T == reopen, P, N < 0 ->
exit(TPid, {shutdown, watchdog_timeout}),
- close(S),
S#watchdog{status = down};
%% REOPEN Timer expires & NumDWA = -1