aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src/base
diff options
context:
space:
mode:
Diffstat (limited to 'lib/diameter/src/base')
-rw-r--r--lib/diameter/src/base/diameter_peer_fsm.erl111
-rw-r--r--lib/diameter/src/base/diameter_reg.erl9
-rw-r--r--lib/diameter/src/base/diameter_service.erl570
-rw-r--r--lib/diameter/src/base/diameter_watchdog.erl206
4 files changed, 388 insertions, 508 deletions
diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl
index 5dab6214b1..de341741db 100644
--- a/lib/diameter/src/base/diameter_peer_fsm.erl
+++ b/lib/diameter/src/base/diameter_peer_fsm.erl
@@ -126,24 +126,17 @@
%% State Machine rather than closer to the transport. This is what we
%% now do below: connect/accept call diameter_watchdog and return the
%% pid of the watchdog process, and the watchdog in turn calls start/3
-%% below to start the process implementing the Peer State Machine. The
-%% former is a "peer" in diameter_service while the latter is a
-%% "conn". In a sense, diameter_service sees the watchdog as
-%% implementing the Peer State Machine and the process implemented
-%% here as being the transport, not being aware of the watchdog at
-%% all.
+%% below to start the process implementing the Peer State Machine.
%%
-%%% ---------------------------------------------------------------------------
-%%% # start({connect|accept, Ref}, Opts, Service)
-%%%
-%%% Output: Pid
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # start/3
+%% ---------------------------------------------------------------------------
-spec start(T, [Opt], {diameter:sequence(),
diameter:restriction(),
#diameter_service{}})
- -> pid()
+ -> {reference(), pid()}
when T :: {connect|accept, diameter:transport_ref()},
Opt :: diameter:transport_opt().
@@ -152,9 +145,15 @@
%% specified on the transport in question. Check here that the list is
%% still non-empty.
-start({_,_} = Type, Opts, MS) ->
- {ok, Pid} = diameter_peer_fsm_sup:start_child({self(), Type, Opts, MS}),
- Pid.
+start({_,_} = Type, Opts, S) ->
+ Ack = make_ref(),
+ T = {Ack, self(), Type, Opts, S},
+ {ok, Pid} = diameter_peer_fsm_sup:start_child(T),
+ try
+ {erlang:monitor(process, Pid), Pid}
+ after
+ Pid ! Ack
+ end.
start_link(T) ->
{ok, _} = proc_lib:start_link(?MODULE,
@@ -163,8 +162,8 @@ start_link(T) ->
infinity,
diameter_lib:spawn_opts(server, [])).
-%%% ---------------------------------------------------------------------------
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
%% init/1
@@ -172,12 +171,13 @@ init(T) ->
proc_lib:init_ack({ok, self()}),
gen_server:enter_loop(?MODULE, [], i(T)).
-i({WPid, T, Opts, {Mask, Nodes, #diameter_service{applications = Apps,
- capabilities = LCaps}
- = Svc}}) ->
- [] /= Apps orelse ?ERROR({no_apps, T, Opts}),
+i({Ack, WPid, {M, Ref} = T, Opts, {Mask,
+ Nodes,
+ #diameter_service{capabilities = LCaps}
+ = Svc}}) ->
+ erlang:monitor(process, WPid),
+ wait(Ack, WPid),
putr(?DWA_KEY, dwa(LCaps)),
- {M, Ref} = T,
diameter_stats:reg(Ref),
{[Cs,Ds], Rest} = proplists:split(Opts, [capabilities_cb, disconnect_cb]),
putr(?CB_KEY, {Ref, [F || {_,F} <- Cs]}),
@@ -185,7 +185,6 @@ i({WPid, T, Opts, {Mask, Nodes, #diameter_service{applications = Apps,
putr(?REF_KEY, Ref),
putr(?SEQUENCE_KEY, Mask),
putr(?RESTRICT_KEY, Nodes),
- erlang:monitor(process, WPid),
{TPid, Addrs} = start_transport(T, Rest, Svc),
Tmo = proplists:get_value(capx_timeout, Opts, ?EVENT_TIMEOUT),
?IS_TIMEOUT(Tmo) orelse ?ERROR({invalid, {capx_timeout, Tmo}}),
@@ -203,6 +202,16 @@ i({WPid, T, Opts, {Mask, Nodes, #diameter_service{applications = Apps,
%% watchdog start (start/2) succeeds regardless so as not to crash the
%% service.
+%% Wait for the caller to have a monitor to avoid a race with our
+%% death. (Since the exit reason is used in diameter_service.)
+wait(Ref, Pid) ->
+ receive
+ Ref ->
+ ok;
+ {'DOWN', _, process, Pid, _} = D ->
+ exit({shutdown, D})
+ end.
+
start_transport(T, Opts, #diameter_service{capabilities = LCaps} = Svc) ->
Addrs0 = LCaps#diameter_caps.host_ip_address,
start_transport(Addrs0, {T, Opts, Svc}).
@@ -274,13 +283,12 @@ handle_info(T, #state{} = State) ->
{noreply, S};
{stop, Reason} ->
?LOG(stop, Reason),
- x(Reason, State);
+ {stop, {shutdown, Reason}, State};
stop ->
?LOG(stop, T),
- x(T, State)
+ {stop, {shutdown, T}, State}
catch
exit: {diameter_codec, encode, _} = Reason ->
- close_wd(Reason, State#state.parent),
?LOG(stop, Reason),
%% diameter_codec:encode/2 emits an error report. Only
%% indicate the probable reason here.
@@ -300,10 +308,6 @@ handle_info(T, #state{} = State) ->
%% succesfully encoded. It's not checked at diameter:add_transport/2
%% since this can be called before creating the service.
-x(Reason, #state{} = S) ->
- close_wd(Reason, S),
- {stop, {shutdown, Reason}, S}.
-
%% terminate/2
terminate(_, _) ->
@@ -314,8 +318,8 @@ terminate(_, _) ->
code_change(_, State, _) ->
{ok, State}.
-%%% ---------------------------------------------------------------------------
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
putr(Key, Val) ->
put({?MODULE, Key}, Val).
@@ -378,9 +382,8 @@ transition({diameter, {recv, Pkt}}, S) ->
recv(Pkt, S);
%% Timeout when still in the same state ...
-transition({timeout = T, PS}, #state{state = PS} = S) ->
- close({capx(PS), T}, S),
- stop;
+transition({timeout = T, PS}, #state{state = PS}) ->
+ {stop, {capx(PS), T}};
%% ... or not.
transition({timeout, _}, _) ->
@@ -393,8 +396,6 @@ transition({send, Msg}, #state{transport = TPid}) ->
%% Request for graceful shutdown at remove_transport, stop_service of
%% application shutdown.
-transition({shutdown = T, Pid}, S) ->
- transition({T, Pid, transport}, S);
transition({shutdown, Pid, Reason}, #state{parent = Pid, dpr = false} = S) ->
dpr(Reason, S);
transition({shutdown, Pid, _}, #state{parent = Pid}) ->
@@ -459,7 +460,7 @@ send_CER(#state{state = {'Wait-Conn-Ack', Tmo},
OH = LCaps#diameter_caps.origin_host,
req_send_CER(OH, Remote)
orelse
- close({already_connected, Remote, LCaps}, S),
+ close({already_connected, Remote, LCaps}),
CER = build_CER(S),
?LOG(send, 'CER'),
#diameter_packet{header = #diameter_header{end_to_end_id = Eid,
@@ -692,17 +693,17 @@ cea(CEA, RC) ->
CEA#diameter_base_CEA{'Result-Code' = RC}.
post('CER' = T, RC, Pkt, S) ->
- [fun close/2, {T, caps(S), {RC, Pkt}}];
+ [fun(_) -> close({T, caps(S), {RC, Pkt}}) end];
post(_, _, _, _) ->
ok.
rejected({capabilities_cb, _F, Reason}, T, S) ->
rejected(Reason, T, S);
-rejected(discard, T, S) ->
- close(T, S);
+rejected(discard, T, _) ->
+ close(T);
rejected({N, Es}, T, S) ->
- {answer('CER', N, Es, S), [fun close/2, T]};
+ {answer('CER', N, Es, S), [fun(_) -> close(T) end]};
rejected(N, T, S) ->
rejected({N, []}, T, S).
@@ -862,7 +863,7 @@ handle_CEA(#diameter_packet{bin = Bin}
of
_ -> open(DPkt, SApps, Caps, {connect, hd([_] = IS)}, S)
catch
- ?FAILURE(Reason) -> close({'CEA', Reason, Caps, DPkt}, S)
+ ?FAILURE(Reason) -> close({'CEA', Reason, Caps, DPkt})
end.
%% Check more than the result code since the peer could send success
%% regardless. If not 2001 then a peer_up callback could do anything
@@ -882,7 +883,7 @@ recv_CEA(#diameter_packet{header = #diameter_header{version
T;
recv_CEA(Pkt, S) ->
- close({'CEA', caps(S), Pkt}, S).
+ close({'CEA', caps(S), Pkt}).
caps(#diameter_service{capabilities = Caps}) ->
Caps;
@@ -935,14 +936,14 @@ open(Pkt, SupportedApps, Caps, {Type, IS}, #state{parent = Pid,
%% We've advertised TLS support: tell the transport the result
%% and expect a reply when the handshake is complete.
-tls_ack(true, Caps, Type, IS, #state{transport = TPid} = S) ->
+tls_ack(true, Caps, Type, IS, #state{transport = TPid}) ->
Ref = make_ref(),
TPid ! {diameter, {tls, Ref, Type, IS == ?TLS}},
receive
{diameter, {tls, Ref}} ->
ok;
{'DOWN', _, process, TPid, Reason} ->
- close({tls_ack, Reason, Caps}, S)
+ close({tls_ack, Reason, Caps})
end;
%% Or not. Don't send anything to the transport so that transports
@@ -955,25 +956,11 @@ capz(#diameter_caps{} = L, #diameter_caps{} = R) ->
= list_to_tuple([diameter_caps | lists:zip(tl(tuple_to_list(L)),
tl(tuple_to_list(R)))]).
-%% close/2
+%% close/1
-%% Tell the watchdog that our death isn't due to transport failure.
-close(Reason, #state{parent = Pid}) ->
- close_wd(Reason, Pid),
+close(Reason) ->
throw({?MODULE, close, Reason}).
-%% close_wd/2
-
-%% Ensure the watchdog dies if DPR has been sent ...
-close_wd(_, #state{dpr = false}) ->
- ok;
-close_wd(Reason, #state{parent = Pid}) ->
- close_wd(Reason, Pid);
-
-%% ... or otherwise
-close_wd(Reason, Pid) ->
- Pid ! {close, self(), Reason}.
-
%% dwa/1
dwa(#diameter_caps{origin_host = OH,
diff --git a/lib/diameter/src/base/diameter_reg.erl b/lib/diameter/src/base/diameter_reg.erl
index 619b12ecad..ac58e4bf5b 100644
--- a/lib/diameter/src/base/diameter_reg.erl
+++ b/lib/diameter/src/base/diameter_reg.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2012. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2013. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -208,10 +208,6 @@ init(_) ->
%% # handle_call/3
%% ----------------------------------------------------------
-handle_call(Req, From, S)
- when not is_record(S, state) ->
- handle_call(Req, From, upgrade(S));
-
handle_call({add, Fun, Key, Pid}, _, S) ->
B = Fun(?TABLE, {Key, Pid}),
monitor(B andalso no_monitor(Pid), Pid),
@@ -281,9 +277,6 @@ code_change(_OldVsn, State, _Extra) ->
%% ===========================================================================
-upgrade(S) ->
- #state{} = list_to_tuple(tuple_to_list(S) ++ [[]]).
-
monitor(true, Pid) ->
ets:insert(?TABLE, ?MONITOR(Pid, erlang:monitor(process, Pid)));
monitor(false, _) ->
diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl
index d2a416166f..3cab914fdb 100644
--- a/lib/diameter/src/base/diameter_service.erl
+++ b/lib/diameter/src/base/diameter_service.erl
@@ -64,15 +64,7 @@
-include_lib("diameter/include/diameter.hrl").
-include("diameter_internal.hrl").
-%% The states mirrored by peer_up/peer_down callbacks.
--define(STATE_UP, up).
--define(STATE_DOWN, down).
-
--type op_state() :: ?STATE_UP
- | ?STATE_DOWN.
-
-%% The RFC 3539 watchdog states that are now maintained, albeit
-%% along with the old up/down. okay = up, else down.
+%% RFC 3539 watchdog states.
-define(WD_INITIAL, initial).
-define(WD_OKAY, okay).
-define(WD_SUSPECT, suspect).
@@ -122,8 +114,10 @@
{id = now(),
service_name, %% as passed to start_service/2, key in ?STATE_TABLE
service :: #diameter_service{},
- peerT = ets_new(peers) :: ets:tid(),%% #peer{} at start_fsm
- connT = ets_new(conns) :: ets:tid(),%% #conn{} at connection_up/reopen
+ watchdogT = ets_new(watchdogs) %% #watchdog{} at start
+ :: ets:tid(),
+ peerT = ets_new(peers) %% #peer{pid = TPid} at okay/reopen
+ :: ets:tid(),
shared_peers = ?Dict:new(), %% Alias -> [{TPid, Caps}, ...]
local_peers = ?Dict:new(), %% Alias -> [{TPid, Caps}, ...]
monitor = false :: false | pid(), %% process to die with
@@ -139,36 +133,26 @@
%% service record is used to determine whether or not we need to call
%% the process for a pick_peer callback.
-%% Record representing a watchdog process as implemented by
-%% diameter_watchdog. The term "peer" here is historical, made
-%% especially confusing by the fact that a peer_ref() in the
-%% documentation is the key of a #conn{} record, not a #peer{} record.
-%% The name is also unfortunate given the meaning of peer in the
-%% Diameter sense.
--record(peer,
+%% Record representing an RFC 3539 watchdog process implemented by
+%% diameter_watchdog.
+-record(watchdog,
{pid :: match(pid()),
type :: match(connect | accept),
ref :: match(reference()), %% key into diameter_config
options :: match([diameter:transport_opt()]),%% from start_transport
- op_state = {?STATE_DOWN, ?WD_INITIAL}
- :: match(op_state() | {op_state(), wd_state()}),
+ state = ?WD_INITIAL :: match(wd_state()),
started = now(), %% at process start
- conn = false :: match(boolean() | pid())}).
- %% true at accepted, pid() at connection_up or reopen
-
-%% Record representing a peer process as implemented by
-%% diameter_peer_fsm. The term "conn" is historical. Despite the name
-%% here, comments refer to watchdog and peer processes, that are keys
-%% in #peer{} and #conn{} records respectively. To add to the
-%% confusion, a #request.transport is a peer process = key in a
-%% #conn{} record. The actual transport process (that the peer process
-%% knows about and that has a transport connection) isn't seen here.
--record(conn,
+ peer = false :: match(boolean() | pid())}).
+ %% true at accepted, pid() at okay/reopen
+
+%% Record representing an Peer State Machine processes implemented by
+%% diameter_peer_fsm.
+-record(peer,
{pid :: pid(),
apps :: [{0..16#FFFFFFFF, diameter:app_alias()}], %% {Id, Alias}
caps :: #diameter_caps{},
started = now(), %% at process start
- peer :: pid()}). %% key into peerT
+ watchdog :: pid()}). %% key into watchdogT
%% Record stored in diameter_request for each outgoing request.
-record(request,
@@ -520,67 +504,39 @@ transition({accepted, Pid, TPid}, S) ->
accepted(Pid, TPid, S),
ok;
-%% Peer process has a new open connection.
-transition({connection_up, Pid, T}, S) ->
- connection_up(Pid, T, S),
- ok;
-
-%% Watchdog has a new connection that will be opened after DW[RA]
-%% exchange. This message was added long after connection_up, to
-%% communicate the information as soon as it's available. Leave
-%% connection_up as is it for now, duplicated information and all.
-transition({reopen, Pid, T}, S) ->
- reopen(Pid, T, S),
- ok;
-
-%% Watchdog has left state OKAY.
-transition({connection_down, Pid}, S) ->
- connection_down(Pid, S),
- ok;
-
-%% Watchdog has returned to state OKAY.
-transition({connection_up, Pid}, S) ->
- connection_up(Pid, S),
- ok;
-
-%% Accepting transport has lost connectivity.
-transition({close, Pid}, S) ->
- close(Pid, S),
- ok;
-
%% Connecting transport is being restarted by watchdog.
transition({reconnect, Pid}, S) ->
reconnect(Pid, S),
ok;
-%% Watchdog is sending notification of a state transition. Note that
-%% the connection_up/down messages pre-date this message and are still
-%% used. A watchdog message will follow these and communicate the same
-%% state as was set in handling connection_up/down.
-transition({watchdog, Pid, {TPid, From, To}}, #state{service_name = SvcName,
- peerT = PeerT}) ->
- #peer{ref = Ref, type = T, options = Opts, op_state = {OS,_}}
- = P
- = fetch(PeerT, Pid),
- insert(PeerT, P#peer{op_state = {OS, To}}),
+%% Watchdog is sending notification of transport death.
+transition({close, Pid, Reason}, #state{service_name = SvcName,
+ watchdogT = WatchdogT}) ->
+ #watchdog{state = WS,
+ ref = Ref,
+ type = Type,
+ options = Opts}
+ = fetch(WatchdogT, Pid),
+ WS /= ?WD_OKAY
+ andalso
+ send_event(SvcName, {closed, Ref, Reason, {type(Type), Opts}}),
+ ok;
+
+%% Watchdog is sending notification of a state transition.
+transition({watchdog, Pid, {[TPid | Data], From, To}},
+ #state{service_name = SvcName,
+ watchdogT = WatchdogT}
+ = S) ->
+ #watchdog{ref = Ref, type = T, options = Opts}
+ = Wd
+ = fetch(WatchdogT, Pid),
+ watchdog(TPid, Data, From, To, Wd, S),
send_event(SvcName, {watchdog, Ref, TPid, {From, To}, {T, Opts}}),
ok;
-%% Death of a watchdog process (#peer.pid) results in the removal of
-%% it's peer and any associated conn record when 'DOWN' is received
-%% (after this) but the states will be {?STATE_UP, ?WD_DOWN} for a
-%% short time. (No real problem since ?WD_* is only used in
-%% service_info.) We set ?WD_OKAY as a consequence of connection_up
-%% since we know a watchdog is coming. We can't set anything at
-%% connection_down since we don't know if the subsequent watchdog
-%% message will be ?WD_DOWN or ?WD_SUSPECT. We don't (yet) set
-%% ?STATE_* as a consequence of a watchdog message since this requires
-%% changing some of the matching on ?STATE_*.
-%%
-%% Death of a peer process process (#conn.pid, #peer.conn) results in
-%% connection_down followed by watchdog ?WD_DOWN. The latter doesn't
-%% result in the conn record being deleted since 'DOWN' from death of
-%% its watchdog doesn't (yet) deal with the record having been
-%% removed.
+%% Death of a watchdog process (#watchdog.pid) results in the removal of
+%% it's peer and any associated conn record when 'DOWN' is received.
+%% Death of a peer process process (#peer.pid, #watchdog.peer) results in
+%% ?WD_DOWN.
%% Monitor process has died. Just die with a reason that tells
%% diameter_config about the happening. If a cleaner shutdown is
@@ -589,9 +545,9 @@ transition({'DOWN', MRef, process, _, Reason}, #state{monitor = MRef}) ->
{stop, {monitor, Reason}};
%% Local watchdog process has died.
-transition({'DOWN', _, process, Pid, Reason}, S)
+transition({'DOWN', _, process, Pid, _Reason}, S)
when node(Pid) == node() ->
- peer_down(Pid, Reason, S),
+ watchdog_down(Pid, S),
ok;
%% Remote service wants to know about shared peers.
@@ -716,49 +672,30 @@ mod_state(Alias, ModS) ->
%%% # shutdown/2
%%% ---------------------------------------------------------------------------
-%% remove_transport: ask watchdogs to terminate their transport.
-shutdown(Refs, #state{peerT = PeerT})
+%% remove_transport
+shutdown(Refs, #state{watchdogT = WatchdogT})
when is_list(Refs) ->
- ets:foldl(fun(P,ok) -> sp(P, Refs), ok end, ok, PeerT);
-
-%% application/service shutdown: ask transports to terminate themselves.
-shutdown(Reason, #state{peerT = PeerT}) ->
- %% A transport might not be alive to receive the shutdown request
- %% but give those that are a chance to shutdown gracefully.
- shutdown(conn, Reason, PeerT),
- %% Kill the watchdogs explicitly in case there was no transport.
- shutdown(peer, Reason, PeerT).
-
-%% sp/2
-
-sp(#peer{ref = Ref, pid = Pid}, Refs) ->
- lists:member(Ref, Refs)
- andalso (Pid ! {shutdown, self()}). %% 'DOWN' cleans up
+ ets:foldl(fun(P,ok) -> st(P, Refs), ok end, ok, WatchdogT);
-%% shutdown/3
-
-shutdown(Who, Reason, T) ->
- diameter_lib:wait(ets:foldl(fun(X,A) -> shutdown(Who, X, Reason, A) end,
+%% application/service shutdown
+shutdown(Reason, #state{watchdogT = WatchdogT})
+ when Reason == application;
+ Reason == service ->
+ diameter_lib:wait(ets:foldl(fun(P,A) -> st(P, Reason, A) end,
[],
- T)).
+ WatchdogT)).
-shutdown(conn = Who, #peer{op_state = {OS,_}} = P, Reason, Acc) ->
- shutdown(Who, P#peer{op_state = OS}, Reason, Acc);
+%% st/2
-shutdown(conn,
- #peer{pid = Pid, op_state = ?STATE_UP, conn = TPid},
- Reason,
- Acc) ->
- TPid ! {shutdown, Pid, Reason},
- [TPid | Acc];
+st(#watchdog{ref = Ref, pid = Pid}, Refs) ->
+ lists:member(Ref, Refs)
+ andalso (Pid ! {shutdown, self(), transport}). %% 'DOWN' cleans up
-shutdown(peer, #peer{pid = Pid}, _Reason, Acc)
- when is_pid(Pid) ->
- exit(Pid, shutdown),
- [Pid | Acc];
+%% st/3
-shutdown(_, #peer{}, _, Acc) ->
- Acc.
+st(#watchdog{pid = Pid}, Reason, Acc) ->
+ Pid ! {shutdown, self(), Reason},
+ [Pid | Acc].
%%% ---------------------------------------------------------------------------
%%% # call_service/2
@@ -787,8 +724,6 @@ cs(undefined, _) ->
%%% ---------------------------------------------------------------------------
%%% # i/1
-%%%
-%%% Output: #state{}
%%% ---------------------------------------------------------------------------
%% Intialize the state of a service gen_server.
@@ -891,22 +826,22 @@ type(connect = T) -> T.
%% start/4
-start(Ref, Type, Opts, #state{peerT = PeerT,
- connT = ConnT,
+start(Ref, Type, Opts, #state{watchdogT = WatchdogT,
+ peerT = PeerT,
options = SvcOpts,
service_name = SvcName,
service = Svc})
when Type == connect;
Type == accept ->
- Pid = s(Type, Ref, {ConnT,
+ Pid = s(Type, Ref, {PeerT,
Opts,
SvcName,
SvcOpts,
merge_service(Opts, Svc)}),
- insert(PeerT, #peer{pid = Pid,
- type = Type,
- ref = Ref,
- options = Opts}),
+ insert(WatchdogT, #watchdog{pid = Pid,
+ type = Type,
+ ref = Ref,
+ options = Opts}),
Pid.
%% Note that the service record passed into the watchdog is the merged
@@ -953,23 +888,50 @@ ms(_, Svc) ->
%%% # accepted/3
%%% ---------------------------------------------------------------------------
-accepted(Pid, _TPid, #state{peerT = PeerT} = S) ->
- #peer{ref = Ref, type = accept = T, conn = false, options = Opts}
- = P
- = fetch(PeerT, Pid),
- insert(PeerT, P#peer{conn = true}), %% mark replacement as started
- start(Ref, T, Opts, S). %% start new watchdog
+accepted(Pid, _TPid, #state{watchdogT = WatchdogT} = S) ->
+ #watchdog{ref = Ref, type = accept = T, peer = false, options = Opts}
+ = Wd
+ = fetch(WatchdogT, Pid),
+ insert(WatchdogT, Wd#watchdog{peer = true}),%% mark replacement as started
+ start(Ref, T, Opts, S). %% start new watchdog
fetch(Tid, Key) ->
[T] = ets:lookup(Tid, Key),
- case T of
- #peer{op_state = ?STATE_UP} = P ->
- P#peer{op_state = {?STATE_UP, ?WD_OKAY}};
- #peer{op_state = ?STATE_DOWN} = P ->
- P#peer{op_state = {?STATE_DOWN, ?WD_DOWN}};
- _ ->
- T
- end.
+ T.
+
+%%% ---------------------------------------------------------------------------
+%%% # watchdog/6
+%%%
+%%% React to a watchdog state transition.
+%%% ---------------------------------------------------------------------------
+
+%% Watchdog has a new open connection.
+watchdog(TPid, [T], _, ?WD_OKAY, Wd, State) ->
+ connection_up({TPid, T}, Wd, State);
+
+%% Watchdog has a new connection that will be opened after DW[RA]
+%% exchange.
+watchdog(TPid, [T], _, ?WD_REOPEN, Wd, State) ->
+ reopen({TPid, T}, Wd, State);
+
+%% Watchdog has recovered a suspect connection.
+watchdog(TPid, [], ?WD_SUSPECT, ?WD_OKAY, Wd, State) ->
+ #watchdog{peer = TPid} = Wd, %% assert
+ connection_up(Wd, State);
+
+%% Watchdog has an unresponsive connection.
+watchdog(TPid, [], ?WD_OKAY, ?WD_SUSPECT = To, Wd, State) ->
+ #watchdog{peer = TPid} = Wd, %% assert
+ connection_down(Wd, To, State);
+
+%% Watchdog has lost its connection.
+watchdog(TPid, [], _, ?WD_DOWN = To, Wd, #state{peerT = PeerT} = S) ->
+ close(Wd, S),
+ connection_down(Wd, To, S),
+ ets:delete(PeerT, TPid);
+
+watchdog(_, [], _, _, _, _) ->
+ ok.
%%% ---------------------------------------------------------------------------
%%% # connection_up/3
@@ -977,72 +939,61 @@ fetch(Tid, Key) ->
%% Watchdog process has reached state OKAY.
-connection_up(Pid, {TPid, {Caps, SApps, Pkt}}, #state{peerT = PeerT,
- connT = ConnT}
- = S) ->
- P = fetch(PeerT, Pid),
- C = #conn{pid = TPid,
- apps = SApps,
- caps = Caps,
- peer = Pid},
-
- insert(ConnT, C),
- connection_up([Pkt], P#peer{conn = TPid}, C, S).
+connection_up({TPid, {Caps, SApps, Pkt}},
+ #watchdog{pid = Pid}
+ = Wd,
+ #state{peerT = PeerT}
+ = S) ->
+ Pr = #peer{pid = TPid,
+ apps = SApps,
+ caps = Caps,
+ watchdog = Pid},
+ insert(PeerT, Pr),
+ connection_up([Pkt], Wd#watchdog{peer = TPid}, Pr, S).
%%% ---------------------------------------------------------------------------
%%% # reopen/3
%%% ---------------------------------------------------------------------------
-%% Note that this connection_up/3 rewrites the same #conn{} now
-%% written here. Both do so in case reopen has not happened in old
-%% code.
-
-reopen(Pid, {TPid, {Caps, SApps, _Pkt}}, #state{peerT = PeerT,
- connT = ConnT}) ->
- P = fetch(PeerT, Pid),
- C = #conn{pid = TPid,
- apps = SApps,
- caps = Caps,
- peer = Pid},
-
- insert(ConnT, C),
- #peer{op_state = {?STATE_DOWN, _}}
- = P,
- insert(PeerT, P#peer{op_state = {?STATE_DOWN, ?WD_REOPEN},
- conn = TPid}).
+reopen({TPid, {Caps, SApps, _Pkt}},
+ #watchdog{pid = Pid}
+ = Wd,
+ #state{watchdogT = WatchdogT,
+ peerT = PeerT}) ->
+ insert(PeerT, #peer{pid = TPid,
+ apps = SApps,
+ caps = Caps,
+ watchdog = Pid}),
+ insert(WatchdogT, Wd#watchdog{state = ?WD_REOPEN,
+ peer = TPid}).
%%% ---------------------------------------------------------------------------
%%% # connection_up/2
%%% ---------------------------------------------------------------------------
-%% Peer process has transitioned back into the open state. Note that there
-%% has been no new capabilties exchange in this case.
+%% Watchdog has recovered as suspect connection. Note that there has
+%% been no new capabilties exchange in this case.
-connection_up(Pid, #state{peerT = PeerT,
- connT = ConnT}
- = S) ->
- #peer{conn = TPid} = P = fetch(PeerT, Pid),
- C = fetch(ConnT, TPid),
- connection_up([], P, C, S).
+connection_up(#watchdog{peer = TPid} = Wd, #state{peerT = PeerT} = S) ->
+ connection_up([], Wd, fetch(PeerT, TPid), S).
%% connection_up/4
-connection_up(T, P, C, #state{peerT = PeerT,
- local_peers = LDict,
- service_name = SvcName,
- service
- = #diameter_service{applications = Apps}}
- = S) ->
- #peer{conn = TPid, op_state = {?STATE_DOWN, _}}
- = P,
- #conn{apps = SApps, caps = Caps}
- = C,
-
- insert(PeerT, P#peer{op_state = {?STATE_UP, ?WD_OKAY}}),
-
+connection_up(Extra,
+ #watchdog{peer = TPid}
+ = Wd,
+ #peer{apps = SApps, caps = Caps}
+ = Pr,
+ #state{watchdogT = WatchdogT,
+ local_peers = LDict,
+ service_name = SvcName,
+ service
+ = #diameter_service{applications = Apps}}
+ = S) ->
+ insert(WatchdogT, Wd#watchdog{state = ?WD_OKAY}),
request_peer_up(TPid),
insert_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict),
- report_status(up, P, C, S, T).
+ report_status(up, Wd, Pr, S, Extra).
insert_local_peer(SApps, T, LDict) ->
lists:foldl(fun(A,D) -> ilp(A, T, D) end, LDict, SApps).
@@ -1078,41 +1029,38 @@ peer_cb(MFA, Alias) ->
end.
%%% ---------------------------------------------------------------------------
-%%% # connection_down/2
+%%% # connection_down/3
%%% ---------------------------------------------------------------------------
-%% Watchdog has transitioned out of state OKAY.
-
-connection_down(Pid, #state{peerT = PeerT,
- connT = ConnT}
- = S) ->
- #peer{op_state = {?STATE_UP, WS}, %% assert
- conn = TPid}
- = P
- = fetch(PeerT, Pid),
-
- C = fetch(ConnT, TPid),
- insert(PeerT, P#peer{op_state = {?STATE_DOWN, WS}}),
- connection_down(P,C,S).
-
-%% connection_down/3
-
-connection_down(#peer{op_state = {?STATE_DOWN, _}}, _, _) ->
- ok;
-
-connection_down(#peer{conn = TPid,
- op_state = {?STATE_UP, _}}
- = P,
- #conn{caps = Caps,
+connection_down(#watchdog{state = ?WD_OKAY,
+ peer = TPid}
+ = Wd,
+ #peer{caps = Caps,
apps = SApps}
- = C,
+ = Pr,
#state{service_name = SvcName,
service = #diameter_service{applications = Apps},
local_peers = LDict}
= S) ->
- report_status(down, P, C, S, []),
+ report_status(down, Wd, Pr, S, []),
remove_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict),
- request_peer_down(TPid, S).
+ request_peer_down(TPid, S);
+
+connection_down(#watchdog{}, #peer{}, _) ->
+ ok;
+
+connection_down(#watchdog{state = WS,
+ peer = TPid}
+ = Wd,
+ To,
+ #state{watchdogT = WatchdogT,
+ peerT = PeerT}
+ = S)
+ when is_atom(To) ->
+ insert(WatchdogT, Wd#watchdog{state = To}),
+ ?WD_OKAY == WS
+ andalso
+ connection_down(Wd, fetch(PeerT, TPid), S).
remove_local_peer(SApps, T, LDict) ->
lists:foldl(fun(A,D) -> rlp(A, T, D) end, LDict, SApps).
@@ -1130,64 +1078,51 @@ down_conn(Id, Alias, TC, {SvcName, Apps}) ->
peer_cb({ModX, peer_down, [SvcName, TC]}, Alias).
%%% ---------------------------------------------------------------------------
-%%% # peer_down/3
+%%% # watchdog_down/2
%%% ---------------------------------------------------------------------------
%% Watchdog process has died.
-peer_down(Pid, Reason, #state{peerT = PeerT} = S) ->
- P = fetch(PeerT, Pid),
- ets:delete_object(PeerT, P),
- closed(Reason, P, S),
- restart(P,S),
- peer_down(P,S).
-
-%% Send an event at connection establishment failure.
-closed({shutdown, {close, _TPid, Reason}},
- #peer{op_state = {?STATE_DOWN, _},
- ref = Ref,
- type = Type,
- options = Opts},
- #state{service_name = SvcName}) ->
- send_event(SvcName, {closed, Ref, Reason, {type(Type), Opts}});
-closed(_, _, _) ->
- ok.
+watchdog_down(Pid, #state{watchdogT = WatchdogT} = S) ->
+ Wd = fetch(WatchdogT, Pid),
+ ets:delete_object(WatchdogT, Wd),
+ restart(Wd,S),
+ wd_down(Wd,S).
-%% The watchdog has never reached OKAY ...
-peer_down(#peer{conn = B}, _)
+%% Watchdog has never reached OKAY ...
+wd_down(#watchdog{peer = B}, _)
when is_boolean(B) ->
ok;
%% ... or maybe it has.
-peer_down(#peer{conn = TPid} = P, #state{connT = ConnT} = S) ->
- #conn{} = C = fetch(ConnT, TPid),
- ets:delete_object(ConnT, C),
- connection_down(P,C,S).
+wd_down(#watchdog{peer = TPid} = Wd, #state{peerT = PeerT} = S) ->
+ connection_down(Wd, ?WD_DOWN, S),
+ ets:delete(PeerT, TPid).
%% restart/2
-restart(P,S) ->
- q_restart(restart(P), S).
+restart(Wd, S) ->
+ q_restart(restart(Wd), S).
%% restart/1
%% Always try to reconnect.
-restart(#peer{ref = Ref,
- type = connect = T,
- options = Opts,
- started = Time}) ->
+restart(#watchdog{ref = Ref,
+ type = connect = T,
+ options = Opts,
+ started = Time}) ->
{Time, {Ref, T, Opts}};
%% Transport connection hasn't yet been accepted ...
-restart(#peer{ref = Ref,
- type = accept = T,
- options = Opts,
- conn = false,
- started = Time}) ->
+restart(#watchdog{ref = Ref,
+ type = accept = T,
+ options = Opts,
+ peer = false,
+ started = Time}) ->
{Time, {Ref, T, Opts}};
%% ... or it has: a replacement has already been spawned.
-restart(#peer{type = accept}) ->
+restart(#watchdog{type = accept}) ->
false.
%% q_restart/2
@@ -1248,14 +1183,13 @@ tc(false = No, _, _) -> %% removed
%% the accepting watchdog upon reception of a CER from the previously
%% connected peer, or us after reconnect_timer timeout.
-close(Pid, #state{service_name = SvcName,
- peerT = PeerT}) ->
- #peer{pid = Pid,
- type = accept,
- ref = Ref,
- options = Opts}
- = fetch(PeerT, Pid),
-
+close(#watchdog{type = connect}, _) ->
+ ok;
+close(#watchdog{type = accept,
+ pid = Pid,
+ ref = Ref,
+ options = Opts},
+ #state{service_name = SvcName}) ->
c(Pid, diameter_config:have_transport(SvcName, Ref), Opts).
%% Tell watchdog to (maybe) die later ...
@@ -1278,11 +1212,11 @@ c(Pid, false, _Opts) ->
%%% ---------------------------------------------------------------------------
reconnect(Pid, #state{service_name = SvcName,
- peerT = PeerT}) ->
- #peer{ref = Ref,
- type = connect,
- options = Opts}
- = fetch(PeerT, Pid),
+ watchdogT = WatchdogT}) ->
+ #watchdog{ref = Ref,
+ type = connect,
+ options = Opts}
+ = fetch(WatchdogT, Pid),
send_event(SvcName, {reconnect, Ref, Opts}).
%%% ---------------------------------------------------------------------------
@@ -1742,6 +1676,8 @@ have_request(Pkt, TPid) ->
%% request_peer_up/1
+%% Insert an element that is used to detect whether or not there has
+%% been a failover when inserting an outgoing request.
request_peer_up(TPid) ->
ets:insert(?REQUEST_TABLE, {TPid}).
@@ -1760,8 +1696,8 @@ request_peer_down(TPid, S) ->
%%% recv_request/3
%%% ---------------------------------------------------------------------------
-recv_request(TPid, Pkt, {ConnT, SvcName, Apps, Mask}) ->
- try ets:lookup(ConnT, TPid) of
+recv_request(TPid, Pkt, {PeerT, SvcName, Apps, Mask}) ->
+ try ets:lookup(PeerT, TPid) of
[C] ->
recv_request(C, TPid, Pkt, SvcName, Apps, Mask);
[] -> %% transport has gone down
@@ -1773,7 +1709,7 @@ recv_request(TPid, Pkt, {ConnT, SvcName, Apps, Mask}) ->
%% recv_request/5
-recv_request(#conn{apps = SApps, caps = Caps},
+recv_request(#peer{apps = SApps, caps = Caps},
TPid,
Pkt,
SvcName,
@@ -2593,11 +2529,11 @@ rt(#request{packet = #diameter_packet{msg = Msg},
%%% ---------------------------------------------------------------------------
report_status(Status,
- #peer{ref = Ref,
- conn = TPid,
- type = Type,
- options = Opts},
- #conn{apps = [_|_] = As,
+ #watchdog{ref = Ref,
+ peer = TPid,
+ type = Type,
+ options = Opts},
+ #peer{apps = [_|_] = As,
caps = Caps},
#state{service_name = SvcName}
= S,
@@ -2676,7 +2612,7 @@ rpd(Pid, Alias, PDict) ->
%%% ---------------------------------------------------------------------------
%%% find_transport/[34]
%%%
-%%% Output: {TransportPid, #diameter_caps{}, #diameter_app{}}
+%%% Return: {TransportPid, #diameter_caps{}, #diameter_app{}}
%%% | false
%%% | {error, Reason}
%%% ---------------------------------------------------------------------------
@@ -2814,7 +2750,7 @@ avp_decode(_, _, #diameter_avp{value = V}) ->
%%% ---------------------------------------------------------------------------
%%% # pick_peer(App, [DestRealm, DestHost], Filter, #state{})
%%%
-%%% Output: {TransportPid, #diameter_caps{}, App}
+%%% Return: {TransportPid, #diameter_caps{}, App}
%%% | false
%%% | {error, Reason}
%%% ---------------------------------------------------------------------------
@@ -2966,8 +2902,8 @@ eq(Any, Id, PeerId) ->
%% transports/1
-transports(#state{peerT = PeerT}) ->
- ets:select(PeerT, [{#peer{conn = '$1', _ = '_'},
+transports(#state{watchdogT = WatchdogT}) ->
+ ets:select(WatchdogT, [{#watchdog{peer = '$1', _ = '_'},
[{'is_pid', '$1'}],
['$1']}]).
@@ -3021,11 +2957,12 @@ tagged_info(Item, S)
undefined
end;
-tagged_info(TPid, #state{peerT = PT, connT = CT})
+tagged_info(TPid, #state{watchdogT = WatchdogT, peerT = PeerT})
when is_pid(TPid) ->
try
- [#conn{peer = Pid}] = ets:lookup(CT, TPid),
- [#peer{ref = Ref, type = Type, options = Opts}] = ets:lookup(PT, Pid),
+ [#peer{watchdog = Pid}] = ets:lookup(PeerT, TPid),
+ [#watchdog{ref = Ref, type = Type, options = Opts}]
+ = ets:lookup(WatchdogT, Pid),
[{ref, Ref},
{type, Type},
{options, Opts}]
@@ -3108,11 +3045,11 @@ complete(Pre) ->
%% info_stats/1
-info_stats(#state{peerT = PeerT}) ->
- MatchSpec = [{#peer{ref = '$1', conn = '$2', _ = '_'},
+info_stats(#state{watchdogT = WatchdogT}) ->
+ MatchSpec = [{#watchdog{ref = '$1', peer = '$2', _ = '_'},
[{'is_pid', '$2'}],
[['$1', '$2']]}],
- try ets:select(PeerT, MatchSpec) of
+ try ets:select(WatchdogT, MatchSpec) of
L ->
diameter_stats:read(lists:append(L))
catch
@@ -3122,7 +3059,8 @@ info_stats(#state{peerT = PeerT}) ->
%% info_transport/1
%%
%% One entry per configured transport. Statistics for each entry are
-%% the accumulated values for the ref and associated peer pids.
+%% the accumulated values for the ref and associated watchdog/peer
+%% pids.
info_transport(S) ->
PeerD = peer_dict(S, config_dict(S)),
@@ -3155,43 +3093,42 @@ transport([[_,_] | L]) ->
%% Possibly many peer entries for a listening transport. Note that all
%% have the same options by construction, which is not terribly space
-%% efficient. (TODO: all entries for the same Ref should share options.)
+%% efficient.
transport([[{type, accept}, {options, Opts} | _] | _] = Ls) ->
[{type, listen},
{options, Opts},
{accept, [lists:nthtail(2,L) || L <- Ls]}].
-peer_dict(#state{peerT = PeerT, connT = ConnT}, Dict0) ->
- try ets:tab2list(PeerT) of
+peer_dict(#state{watchdogT = WatchdogT, peerT = PeerT}, Dict0) ->
+ try ets:tab2list(WatchdogT) of
L ->
- lists:foldl(fun(T,A) -> peer_acc(ConnT, A, T) end, Dict0, L)
+ lists:foldl(fun(T,A) -> peer_acc(PeerT, A, T) end, Dict0, L)
catch
error: badarg -> Dict0 %% service has gone down
end.
-peer_acc(ConnT, Acc, #peer{pid = Pid,
- type = Type,
- ref = Ref,
- options = Opts,
- op_state = OS,
- started = T,
- conn = TPid}) ->
- WS = wd_state(OS),
+peer_acc(PeerT, Acc, #watchdog{pid = Pid,
+ type = Type,
+ ref = Ref,
+ options = Opts,
+ state = WS,
+ started = At,
+ peer = TPid}) ->
dict:append(Ref,
[{type, Type},
{options, Opts},
- {watchdog, {Pid, T, WS}}
- | info_conn(ConnT, TPid, WS /= ?WD_DOWN)],
+ {watchdog, {Pid, At, WS}}
+ | info_peer(PeerT, TPid, WS)],
Acc).
-info_conn(ConnT, TPid, true)
- when is_pid(TPid) ->
- try ets:lookup(ConnT, TPid) of
- T -> info_conn(T)
+info_peer(PeerT, TPid, WS)
+ when is_pid(TPid), WS /= ?WD_DOWN ->
+ try ets:lookup(PeerT, TPid) of
+ T -> info_peer(T)
catch
error: badarg -> [] %% service has gone down
end;
-info_conn(_, _, _) ->
+info_peer(_, _, _) ->
[].
%% The point of extracting the config here is so that 'transport' info
@@ -3210,19 +3147,12 @@ config_acc({Ref, T, Opts}, Dict)
config_acc(_, Dict) ->
Dict.
-wd_state({_,S}) ->
- S;
-wd_state(?STATE_UP) ->
- ?WD_OKAY;
-wd_state(?STATE_DOWN) ->
- ?WD_DOWN.
-
-info_conn([#conn{pid = Pid, apps = SApps, caps = Caps, started = T}]) ->
+info_peer([#peer{pid = Pid, apps = SApps, caps = Caps, started = T}]) ->
[{peer, {Pid, T}},
{apps, SApps},
{caps, info_caps(Caps)}
| try [{port, info_port(Pid)}] catch _:_ -> [] end];
-info_conn([] = No) ->
+info_peer([] = No) ->
No.
%% Extract information that the processes involved are expected to
diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl
index a5429c967c..10ab246b88 100644
--- a/lib/diameter/src/base/diameter_watchdog.erl
+++ b/lib/diameter/src/base/diameter_watchdog.erl
@@ -62,11 +62,13 @@
restrict :: {diameter:restriction(), boolean()},
shutdown = false :: boolean()}).
+%% ---------------------------------------------------------------------------
%% start/2
%%
%% Start a monitor before the watchdog is allowed to proceed to ensure
%% that a failed capabilities exchange produces the desired exit
%% reason.
+%% ---------------------------------------------------------------------------
-spec start(Type, {RecvData, [Opt], SvcName, SvcOpts, #diameter_service{}})
-> {reference(), pid()}
@@ -77,12 +79,12 @@
SvcName :: diameter:service_name().
start({_,_} = Type, T) ->
- Ref = make_ref(),
- {ok, Pid} = diameter_watchdog_sup:start_child({Ref, {Type, self(), T}}),
+ Ack = make_ref(),
+ {ok, Pid} = diameter_watchdog_sup:start_child({Ack, Type, self(), T}),
try
{erlang:monitor(process, Pid), Pid}
after
- Pid ! Ref
+ send(Pid, Ack)
end.
start_link(T) ->
@@ -101,22 +103,15 @@ init(T) ->
proc_lib:init_ack({ok, self()}),
gen_server:enter_loop(?MODULE, [], i(T)).
-i({Ref, {_, Pid, _} = T}) ->
- MRef = erlang:monitor(process, Pid),
- receive
- Ref ->
- make_state(T);
- {'DOWN', MRef, process, _, _} = D ->
- exit({shutdown, D})
- end.
-
-make_state({T, Pid, {RecvData,
- Opts,
- SvcName,
- SvcOpts,
- #diameter_service{applications = Apps,
- capabilities = Caps}
- = Svc}}) ->
+i({Ack, T, Pid, {RecvData,
+ Opts,
+ SvcName,
+ SvcOpts,
+ #diameter_service{applications = Apps,
+ capabilities = Caps}
+ = Svc}}) ->
+ erlang:monitor(process, Pid),
+ wait(Ack, Pid),
random:seed(now()),
putr(restart, {T, Opts, Svc}), %% save seeing it in trace
putr(dwr, dwr(Caps)), %%
@@ -124,9 +119,7 @@ make_state({T, Pid, {RecvData,
Restrict = proplists:get_value(restrict_connections, SvcOpts),
Nodes = restrict_nodes(Restrict),
#watchdog{parent = Pid,
- transport = monitor(diameter_peer_fsm:start(T,
- Opts,
- {Mask, Nodes, Svc})),
+ transport = start(T, Opts, Mask, Nodes, Svc),
tw = proplists:get_value(watchdog_timer,
Opts,
?DEFAULT_TW_INIT),
@@ -134,6 +127,21 @@ make_state({T, Pid, {RecvData,
sequence = Mask,
restrict = {Restrict, lists:member(node(), Nodes)}}.
+wait(Ref, Pid) ->
+ receive
+ Ref ->
+ ok;
+ {'DOWN', _, process, Pid, _} = D ->
+ exit({shutdown, D})
+ end.
+
+%% start/5
+
+start(T, Opts, Mask, Nodes, Svc) ->
+ {_MRef, Pid}
+ = diameter_peer_fsm:start(T, Opts, {Mask, Nodes, Svc}),
+ Pid.
+
%% handle_call/3
handle_call(_, _, State) ->
@@ -151,41 +159,59 @@ handle_info(T, #watchdog{} = State) ->
ok ->
{noreply, State};
#watchdog{} = S ->
- event(State, S),
+ close(T, State), %% service expects 'close' message
+ event(T, State, S), %% before 'watchdog'
{noreply, S};
stop ->
?LOG(stop, T),
- event(State, State#watchdog{status = down}),
+ event(T, State, State#watchdog{status = down}),
{stop, {shutdown, T}, State}
- end;
+ end.
-handle_info(T, S) ->
- handle_info(T, upgrade(S)).
+close({'DOWN', _, process, TPid, {shutdown, Reason}},
+ #watchdog{transport = TPid,
+ parent = Pid}) ->
+ send(Pid, {close, self(), Reason});
-upgrade(S) ->
- #watchdog{} = list_to_tuple(tuple_to_list(S)
- ++ [?NOMASK, {nodes, true}, false]).
+close(_, _) ->
+ ok.
-event(#watchdog{status = T}, #watchdog{status = T}) ->
+event(_, #watchdog{status = T}, #watchdog{status = T}) ->
ok;
-event(#watchdog{transport = undefined}, #watchdog{transport = undefined}) ->
+event(_, #watchdog{transport = undefined}, #watchdog{transport = undefined}) ->
ok;
-event(#watchdog{status = From, transport = F, parent = Pid},
+event(Msg,
+ #watchdog{status = From, transport = F, parent = Pid},
#watchdog{status = To, transport = T}) ->
- E = {tpid(F,T), From, To},
- notify(Pid, E),
+ TPid = tpid(F,T),
+ E = {[TPid | data(Msg, TPid, From, To)], From, To},
+ send(Pid, {watchdog, self(), E}),
?LOG(transition, {self(), E}).
+data(Msg, TPid, reopen, okay) ->
+ {recv, TPid, 'DWA', _Pkt} = Msg, %% assert
+ {TPid, T} = eraser(open),
+ [T];
+
+data({open, TPid, _Hosts, T}, TPid, _From, To)
+ when To == okay;
+ To == reopen ->
+ [T];
+
+data(_, _, _, _) ->
+ [].
+
tpid(_, Pid)
when is_pid(Pid) ->
Pid;
+
tpid(Pid, _) ->
Pid.
-notify(Pid, E) ->
- Pid ! {watchdog, self(), E}.
+send(Pid, T) ->
+ Pid ! T.
%% terminate/2
@@ -215,15 +241,13 @@ transition(close, #watchdog{}) ->
ok;
%% Service is asking for the peer to be taken down gracefully.
-transition({shutdown, Pid}, #watchdog{parent = Pid,
- transport = undefined,
- status = S}) ->
- down = S, %% sanity check
+transition({shutdown, Pid, _}, #watchdog{parent = Pid,
+ transport = undefined}) ->
stop;
-transition({shutdown = T, Pid}, #watchdog{parent = Pid,
- transport = TPid}
- = S) ->
- TPid ! {T, self()},
+transition({shutdown = T, Pid, Reason}, #watchdog{parent = Pid,
+ transport = TPid}
+ = S) ->
+ send(TPid, {T, self(), Reason}),
S#watchdog{shutdown = true};
%% Parent process has died,
@@ -234,13 +258,9 @@ transition({'DOWN', _, process, Pid, _Reason},
%% Transport has accepted a connection.
transition({accepted = T, TPid}, #watchdog{transport = TPid,
parent = Pid}) ->
- Pid ! {T, self(), TPid},
+ send(Pid, {T, self(), TPid}),
ok;
-%% Transport is telling us that its impending death isn't failure.
-transition({close, TPid, _Reason}, #watchdog{transport = TPid}) ->
- stop;
-
%% STATE Event Actions New State
%% ===== ------ ------- ----------
%% INITIAL Connection up SetWatchdog() OKAY
@@ -255,15 +275,13 @@ transition({close, TPid, _Reason}, #watchdog{transport = TPid}) ->
%% know the identity of the peer (ie. now) that we know that we're in
%% state down rather than initial.
-transition({open, TPid, Hosts, T} = Open,
+transition({open, TPid, Hosts, _} = Open,
#watchdog{transport = TPid,
status = initial,
- parent = Pid,
restrict = {_, R}}
= S) ->
case okay(getr(restart), Hosts, R) of
okay ->
- open(Pid, {TPid, T}),
set_watchdog(S#watchdog{status = okay});
reopen ->
transition(Open, S#watchdog{status = down})
@@ -274,17 +292,15 @@ transition({open, TPid, Hosts, T} = Open,
%% SetWatchdog()
%% Pending = TRUE REOPEN
-transition({open = P, TPid, _Hosts, T},
+transition({open = Key, TPid, _Hosts, T},
#watchdog{transport = TPid,
- parent = Pid,
status = down}
= S) ->
%% Store the info we need to notify the parent to reopen the
%% connection after the requisite DWA's are received, at which
%% time we eraser(open). The reopen message is a later addition,
%% to communicate the new capabilities as soon as they're known.
- putr(P, {TPid, T}),
- Pid ! {reopen, self(), {TPid, T}},
+ putr(Key, {TPid, T}),
set_watchdog(send_watchdog(S#watchdog{status = reopen,
num_dwa = 0}));
@@ -296,26 +312,18 @@ transition({open = P, TPid, _Hosts, T},
%% REOPEN Connection down CloseConnection()
%% SetWatchdog() DOWN
-transition({'DOWN', _, process, TPid, _},
+transition({'DOWN', _, process, TPid, _Reason},
#watchdog{transport = TPid,
- status = S,
- shutdown = D})
- when S == initial;
- D ->
+ shutdown = true}) ->
stop;
-transition({'DOWN', _, process, TPid, _},
- #watchdog{transport = TPid}
+transition({'DOWN', _, process, TPid, _Reason},
+ #watchdog{transport = TPid,
+ status = T}
= S) ->
- failover(S),
- close(S),
- set_watchdog(S#watchdog{status = down,
+ set_watchdog(S#watchdog{status = case T of initial -> T; _ -> down end,
pending = false,
transport = undefined});
-%% Any outstanding pending (or other messages from the transport) will
-%% have arrived before 'DOWN' since the message comes from the same
-%% process. Note that we could also get this message in the initial
-%% state.
%% Incoming message.
transition({recv, TPid, Name, Pkt}, #watchdog{transport = TPid} = S) ->
@@ -331,15 +339,11 @@ transition({timeout, _, tw}, #watchdog{}) ->
%% State query.
transition({state, Pid}, #watchdog{status = S}) ->
- Pid ! {self(), S},
+ send(Pid, {self(), S}),
ok.
%% ===========================================================================
-monitor(Pid) ->
- erlang:monitor(process, Pid),
- Pid.
-
putr(Key, Val) ->
put({?MODULE, Key}, Val).
@@ -386,7 +390,7 @@ okay([{_,P}]) ->
%% ... or it has.
okay(C) ->
- [_|_] = [P ! close || {_,P} <- C, self() /= P],
+ [_|_] = [send(P, close) || {_,P} <- C, self() /= P],
reopen.
%% set_watchdog/1
@@ -408,36 +412,13 @@ tw(T)
tw({M,F,A}) ->
apply(M,F,A).
-%% open/2
-
-open(Pid, {_,_} = T) ->
- Pid ! {connection_up, self(), T}.
-
-%% failover/1
-
-failover(#watchdog{status = okay,
- parent = Pid}) ->
- Pid ! {connection_down, self()};
-
-failover(_) ->
- ok.
-
-%% close/1
-
-close(#watchdog{status = down}) ->
- ok;
-
-close(#watchdog{parent = Pid}) ->
- {{T, _}, _, _} = getr(restart),
- T == accept andalso (Pid ! {close, self()}).
-
%% send_watchdog/1
send_watchdog(#watchdog{pending = false,
transport = TPid,
sequence = Mask}
= S) ->
- TPid ! {send, encode(getr(dwr), Mask)},
+ send(TPid, {send, encode(getr(dwr), Mask)}),
?LOG(send, 'DWR'),
S#watchdog{pending = true}.
@@ -518,12 +499,10 @@ rcv(_, #watchdog{status = okay} = S) ->
%% SetWatchdog() OKAY
rcv('DWA', #watchdog{status = suspect} = S) ->
- failback(S),
set_watchdog(S#watchdog{status = okay,
pending = false});
rcv(_, #watchdog{status = suspect} = S) ->
- failback(S),
set_watchdog(S#watchdog{status = okay});
%% REOPEN Receive DWA & Pending = FALSE
@@ -531,10 +510,8 @@ rcv(_, #watchdog{status = suspect} = S) ->
%% Failback() OKAY
rcv('DWA', #watchdog{status = reopen,
- num_dwa = 2 = N,
- parent = Pid}
+ num_dwa = 2 = N}
= S) ->
- open(Pid, eraser(open)),
S#watchdog{status = okay,
num_dwa = N+1,
pending = false};
@@ -553,11 +530,6 @@ rcv('DWA', #watchdog{status = reopen,
rcv(_, #watchdog{status = reopen} = S) ->
throwaway(S).
-%% failback/1
-
-failback(#watchdog{parent = Pid}) ->
- Pid ! {connection_up, self()}.
-
%% timeout/1
%%
%% The caller sets the watchdog on the return value.
@@ -582,7 +554,6 @@ timeout(#watchdog{status = T,
timeout(#watchdog{status = okay,
pending = true}
= S) ->
- failover(S),
S#watchdog{status = suspect};
%% SUSPECT Timer expires CloseConnection()
@@ -599,7 +570,6 @@ timeout(#watchdog{status = T,
when T == suspect;
T == reopen, P, N < 0 ->
exit(TPid, {shutdown, watchdog_timeout}),
- close(S),
S#watchdog{status = down};
%% REOPEN Timer expires & NumDWA = -1
@@ -633,7 +603,9 @@ timeout(#watchdog{status = reopen,
%% process has died. We only need to handle state down since we start
%% the first watchdog when transitioning out of initial.
-timeout(#watchdog{status = down} = S) ->
+timeout(#watchdog{status = T} = S)
+ when T == initial;
+ T == down ->
restart(S).
%% restart/1
@@ -659,11 +631,9 @@ restart({{connect, _} = T, Opts, Svc}, #watchdog{parent = Pid,
sequence = Mask,
restrict = {R,_}}
= S) ->
- Pid ! {reconnect, self()},
+ send(Pid, {reconnect, self()}),
Nodes = restrict_nodes(R),
- S#watchdog{transport = monitor(diameter_peer_fsm:start(T,
- Opts,
- {Mask, Nodes, Svc})),
+ S#watchdog{transport = start(T, Opts, Mask, Nodes, Svc),
restrict = {R, lists:member(node(), Nodes)}};
%% No restriction on the number of connections to the same peer: just