aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src/base
diff options
context:
space:
mode:
Diffstat (limited to 'lib/diameter/src/base')
-rw-r--r--lib/diameter/src/base/diameter_peer_fsm.erl57
-rw-r--r--lib/diameter/src/base/diameter_service.erl52
-rw-r--r--lib/diameter/src/base/diameter_watchdog.erl48
3 files changed, 65 insertions, 92 deletions
diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl
index b515a599ed..d593d0ab84 100644
--- a/lib/diameter/src/base/diameter_peer_fsm.erl
+++ b/lib/diameter/src/base/diameter_peer_fsm.erl
@@ -126,12 +126,7 @@
%% State Machine rather than closer to the transport. This is what we
%% now do below: connect/accept call diameter_watchdog and return the
%% pid of the watchdog process, and the watchdog in turn calls start/3
-%% below to start the process implementing the Peer State Machine. The
-%% former is a "peer" in diameter_service while the latter is a
-%% "conn". In a sense, diameter_service sees the watchdog as
-%% implementing the Peer State Machine and the process implemented
-%% here as being the transport, not being aware of the watchdog at
-%% all.
+%% below to start the process implementing the Peer State Machine.
%%
%%% ---------------------------------------------------------------------------
@@ -274,13 +269,12 @@ handle_info(T, #state{} = State) ->
{noreply, S};
{stop, Reason} ->
?LOG(stop, Reason),
- x(Reason, State);
+ {stop, {shutdown, Reason}, State};
stop ->
?LOG(stop, T),
- x(T, State)
+ {stop, {shutdown, T}, State}
catch
exit: {diameter_codec, encode, _} = Reason ->
- close_wd(Reason, State#state.parent),
?LOG(stop, Reason),
%% diameter_codec:encode/2 emits an error report. Only
%% indicate the probable reason here.
@@ -300,10 +294,6 @@ handle_info(T, #state{} = State) ->
%% succesfully encoded. It's not checked at diameter:add_transport/2
%% since this can be called before creating the service.
-x(Reason, #state{} = S) ->
- close_wd(Reason, S),
- {stop, {shutdown, Reason}, S}.
-
%% terminate/2
terminate(_, _) ->
@@ -378,9 +368,8 @@ transition({diameter, {recv, Pkt}}, S) ->
recv(Pkt, S);
%% Timeout when still in the same state ...
-transition({timeout = T, PS}, #state{state = PS} = S) ->
- close({capx(PS), T}, S),
- stop;
+transition({timeout = T, PS}, #state{state = PS}) ->
+ {stop, {capx(PS), T}};
%% ... or not.
transition({timeout, _}, _) ->
@@ -457,7 +446,7 @@ send_CER(#state{state = {'Wait-Conn-Ack', Tmo},
OH = LCaps#diameter_caps.origin_host,
req_send_CER(OH, Remote)
orelse
- close({already_connected, Remote, LCaps}, S),
+ close({already_connected, Remote, LCaps}),
CER = build_CER(S),
?LOG(send, 'CER'),
#diameter_packet{header = #diameter_header{end_to_end_id = Eid,
@@ -690,17 +679,17 @@ cea(CEA, RC) ->
CEA#diameter_base_CEA{'Result-Code' = RC}.
post('CER' = T, RC, Pkt, S) ->
- [fun close/2, {T, caps(S), {RC, Pkt}}];
+ [fun(_) -> close({T, caps(S), {RC, Pkt}}) end];
post(_, _, _, _) ->
ok.
rejected({capabilities_cb, _F, Reason}, T, S) ->
rejected(Reason, T, S);
-rejected(discard, T, S) ->
- close(T, S);
+rejected(discard, T, _) ->
+ close(T);
rejected({N, Es}, T, S) ->
- {answer('CER', N, Es, S), [fun close/2, T]};
+ {answer('CER', N, Es, S), [fun(_) -> close(T) end]};
rejected(N, T, S) ->
rejected({N, []}, T, S).
@@ -860,7 +849,7 @@ handle_CEA(#diameter_packet{bin = Bin}
of
_ -> open(DPkt, SApps, Caps, {connect, hd([_] = IS)}, S)
catch
- ?FAILURE(Reason) -> close({'CEA', Reason, Caps, DPkt}, S)
+ ?FAILURE(Reason) -> close({'CEA', Reason, Caps, DPkt})
end.
%% Check more than the result code since the peer could send success
%% regardless. If not 2001 then a peer_up callback could do anything
@@ -880,7 +869,7 @@ recv_CEA(#diameter_packet{header = #diameter_header{version
T;
recv_CEA(Pkt, S) ->
- close({'CEA', caps(S), Pkt}, S).
+ close({'CEA', caps(S), Pkt}).
caps(#diameter_service{capabilities = Caps}) ->
Caps;
@@ -933,14 +922,14 @@ open(Pkt, SupportedApps, Caps, {Type, IS}, #state{parent = Pid,
%% We've advertised TLS support: tell the transport the result
%% and expect a reply when the handshake is complete.
-tls_ack(true, Caps, Type, IS, #state{transport = TPid} = S) ->
+tls_ack(true, Caps, Type, IS, #state{transport = TPid}) ->
Ref = make_ref(),
TPid ! {diameter, {tls, Ref, Type, IS == ?TLS}},
receive
{diameter, {tls, Ref}} ->
ok;
{'DOWN', _, process, TPid, Reason} ->
- close({tls_ack, Reason, Caps}, S)
+ close({tls_ack, Reason, Caps})
end;
%% Or not. Don't send anything to the transport so that transports
@@ -953,25 +942,11 @@ capz(#diameter_caps{} = L, #diameter_caps{} = R) ->
= list_to_tuple([diameter_caps | lists:zip(tl(tuple_to_list(L)),
tl(tuple_to_list(R)))]).
-%% close/2
+%% close/1
-%% Tell the watchdog that our death isn't due to transport failure.
-close(Reason, #state{parent = Pid}) ->
- close_wd(Reason, Pid),
+close(Reason) ->
throw({?MODULE, close, Reason}).
-%% close_wd/2
-
-%% Ensure the watchdog dies if DPR has been sent ...
-close_wd(_, #state{dpr = false}) ->
- ok;
-close_wd(Reason, #state{parent = Pid}) ->
- close_wd(Reason, Pid);
-
-%% ... or otherwise
-close_wd(Reason, Pid) ->
- Pid ! {close, self(), Reason}.
-
%% dwa/1
dwa(#diameter_caps{origin_host = OH,
diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl
index 4d21d28512..3cab914fdb 100644
--- a/lib/diameter/src/base/diameter_service.erl
+++ b/lib/diameter/src/base/diameter_service.erl
@@ -509,6 +509,19 @@ transition({reconnect, Pid}, S) ->
reconnect(Pid, S),
ok;
+%% Watchdog is sending notification of transport death.
+transition({close, Pid, Reason}, #state{service_name = SvcName,
+ watchdogT = WatchdogT}) ->
+ #watchdog{state = WS,
+ ref = Ref,
+ type = Type,
+ options = Opts}
+ = fetch(WatchdogT, Pid),
+ WS /= ?WD_OKAY
+ andalso
+ send_event(SvcName, {closed, Ref, Reason, {type(Type), Opts}}),
+ ok;
+
%% Watchdog is sending notification of a state transition.
transition({watchdog, Pid, {[TPid | Data], From, To}},
#state{service_name = SvcName,
@@ -532,9 +545,9 @@ transition({'DOWN', MRef, process, _, Reason}, #state{monitor = MRef}) ->
{stop, {monitor, Reason}};
%% Local watchdog process has died.
-transition({'DOWN', _, process, Pid, Reason}, S)
+transition({'DOWN', _, process, Pid, _Reason}, S)
when node(Pid) == node() ->
- peer_down(Pid, Reason, S),
+ watchdog_down(Pid, S),
ok;
%% Remote service wants to know about shared peers.
@@ -1065,44 +1078,31 @@ down_conn(Id, Alias, TC, {SvcName, Apps}) ->
peer_cb({ModX, peer_down, [SvcName, TC]}, Alias).
%%% ---------------------------------------------------------------------------
-%%% # peer_down/3
+%%% # watchdog_down/2
%%% ---------------------------------------------------------------------------
%% Watchdog process has died.
-peer_down(Pid, Reason, #state{watchdogT = WatchdogT} = S) ->
- P = fetch(WatchdogT, Pid),
- ets:delete_object(WatchdogT, P),
- closed(Reason, P, S),
- restart(P,S),
- peer_down(P,S).
-
-%% Send an event at connection establishment failure.
-closed({shutdown, {close, _TPid, Reason}},
- #watchdog{state = WS,
- ref = Ref,
- type = Type,
- options = Opts},
- #state{service_name = SvcName})
- when WS /= ?WD_OKAY ->
- send_event(SvcName, {closed, Ref, Reason, {type(Type), Opts}});
-closed(_, _, _) ->
- ok.
+watchdog_down(Pid, #state{watchdogT = WatchdogT} = S) ->
+ Wd = fetch(WatchdogT, Pid),
+ ets:delete_object(WatchdogT, Wd),
+ restart(Wd,S),
+ wd_down(Wd,S).
-%% The watchdog has never reached OKAY ...
-peer_down(#watchdog{peer = B}, _)
+%% Watchdog has never reached OKAY ...
+wd_down(#watchdog{peer = B}, _)
when is_boolean(B) ->
ok;
%% ... or maybe it has.
-peer_down(#watchdog{peer = TPid} = Wd, #state{peerT = PeerT} = S) ->
+wd_down(#watchdog{peer = TPid} = Wd, #state{peerT = PeerT} = S) ->
connection_down(Wd, ?WD_DOWN, S),
ets:delete(PeerT, TPid).
%% restart/2
-restart(P,S) ->
- q_restart(restart(P), S).
+restart(Wd, S) ->
+ q_restart(restart(Wd), S).
%% restart/1
diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl
index 8d10755180..ece5f0e359 100644
--- a/lib/diameter/src/base/diameter_watchdog.erl
+++ b/lib/diameter/src/base/diameter_watchdog.erl
@@ -82,7 +82,7 @@ start({_,_} = Type, T) ->
try
{erlang:monitor(process, Pid), Pid}
after
- Pid ! Ref
+ send(Pid, Ref)
end.
start_link(T) ->
@@ -151,7 +151,8 @@ handle_info(T, #watchdog{} = State) ->
ok ->
{noreply, State};
#watchdog{} = S ->
- event(T, State, S),
+ close(T, State), %% service expects 'close' message
+ event(T, State, S), %% before 'watchdog'
{noreply, S};
stop ->
?LOG(stop, T),
@@ -159,6 +160,14 @@ handle_info(T, #watchdog{} = State) ->
{stop, {shutdown, T}, State}
end.
+close({'DOWN', _, process, TPid, {shutdown, Reason}},
+ #watchdog{transport = TPid,
+ parent = Pid}) ->
+ send(Pid, {close, self(), Reason});
+
+close(_, _) ->
+ ok.
+
event(_, #watchdog{status = T}, #watchdog{status = T}) ->
ok;
@@ -170,7 +179,7 @@ event(Msg,
#watchdog{status = To, transport = T}) ->
TPid = tpid(F,T),
E = {[TPid | data(Msg, TPid, From, To)], From, To},
- notify(Pid, E),
+ send(Pid, {watchdog, self(), E}),
?LOG(transition, {self(), E}).
data(Msg, TPid, reopen, okay) ->
@@ -193,8 +202,8 @@ tpid(_, Pid)
tpid(Pid, _) ->
Pid.
-notify(Pid, E) ->
- Pid ! {watchdog, self(), E}.
+send(Pid, T) ->
+ Pid ! T.
%% terminate/2
@@ -232,7 +241,7 @@ transition({shutdown, Pid, _}, #watchdog{parent = Pid,
transition({shutdown = T, Pid, Reason}, #watchdog{parent = Pid,
transport = TPid}
= S) ->
- TPid ! {T, self(), Reason},
+ send(TPid, {T, self(), Reason}),
S#watchdog{shutdown = true};
%% Parent process has died,
@@ -243,13 +252,9 @@ transition({'DOWN', _, process, Pid, _Reason},
%% Transport has accepted a connection.
transition({accepted = T, TPid}, #watchdog{transport = TPid,
parent = Pid}) ->
- Pid ! {T, self(), TPid},
+ send(Pid, {T, self(), TPid}),
ok;
-%% Transport is telling us that its impending death isn't failure.
-transition({close, TPid, _Reason}, #watchdog{transport = TPid}) ->
- stop;
-
%% STATE Event Actions New State
%% ===== ------ ------- ----------
%% INITIAL Connection up SetWatchdog() OKAY
@@ -301,24 +306,17 @@ transition({open = Key, TPid, _Hosts, T},
%% REOPEN Connection down CloseConnection()
%% SetWatchdog() DOWN
-transition({'DOWN', _, process, TPid, _},
+transition({'DOWN', _, process, TPid, _Reason},
#watchdog{transport = TPid,
- status = S,
- shutdown = D})
- when S == initial;
- D ->
+ shutdown = true}) ->
stop;
-transition({'DOWN', _, process, TPid, _},
+transition({'DOWN', _, process, TPid, _Reason},
#watchdog{transport = TPid}
= S) ->
set_watchdog(S#watchdog{status = down,
pending = false,
transport = undefined});
-%% Any outstanding pending (or other messages from the transport) will
-%% have arrived before 'DOWN' since the message comes from the same
-%% process. Note that we could also get this message in the initial
-%% state.
%% Incoming message.
transition({recv, TPid, Name, Pkt}, #watchdog{transport = TPid} = S) ->
@@ -334,7 +332,7 @@ transition({timeout, _, tw}, #watchdog{}) ->
%% State query.
transition({state, Pid}, #watchdog{status = S}) ->
- Pid ! {self(), S},
+ send(Pid, {self(), S}),
ok.
%% ===========================================================================
@@ -389,7 +387,7 @@ okay([{_,P}]) ->
%% ... or it has.
okay(C) ->
- [_|_] = [P ! close || {_,P} <- C, self() /= P],
+ [_|_] = [send(P, close) || {_,P} <- C, self() /= P],
reopen.
%% set_watchdog/1
@@ -417,7 +415,7 @@ send_watchdog(#watchdog{pending = false,
transport = TPid,
sequence = Mask}
= S) ->
- TPid ! {send, encode(getr(dwr), Mask)},
+ send(TPid, {send, encode(getr(dwr), Mask)}),
?LOG(send, 'DWR'),
S#watchdog{pending = true}.
@@ -628,7 +626,7 @@ restart({{connect, _} = T, Opts, Svc}, #watchdog{parent = Pid,
sequence = Mask,
restrict = {R,_}}
= S) ->
- Pid ! {reconnect, self()},
+ send(Pid, {reconnect, self()}),
Nodes = restrict_nodes(R),
S#watchdog{transport = monitor(diameter_peer_fsm:start(T,
Opts,