diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/diameter/src/base/diameter_peer_fsm.erl | 57 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_service.erl | 52 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_watchdog.erl | 48 |
3 files changed, 65 insertions, 92 deletions
diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl index b515a599ed..d593d0ab84 100644 --- a/lib/diameter/src/base/diameter_peer_fsm.erl +++ b/lib/diameter/src/base/diameter_peer_fsm.erl @@ -126,12 +126,7 @@ %% State Machine rather than closer to the transport. This is what we %% now do below: connect/accept call diameter_watchdog and return the %% pid of the watchdog process, and the watchdog in turn calls start/3 -%% below to start the process implementing the Peer State Machine. The -%% former is a "peer" in diameter_service while the latter is a -%% "conn". In a sense, diameter_service sees the watchdog as -%% implementing the Peer State Machine and the process implemented -%% here as being the transport, not being aware of the watchdog at -%% all. +%% below to start the process implementing the Peer State Machine. %% %%% --------------------------------------------------------------------------- @@ -274,13 +269,12 @@ handle_info(T, #state{} = State) -> {noreply, S}; {stop, Reason} -> ?LOG(stop, Reason), - x(Reason, State); + {stop, {shutdown, Reason}, State}; stop -> ?LOG(stop, T), - x(T, State) + {stop, {shutdown, T}, State} catch exit: {diameter_codec, encode, _} = Reason -> - close_wd(Reason, State#state.parent), ?LOG(stop, Reason), %% diameter_codec:encode/2 emits an error report. Only %% indicate the probable reason here. @@ -300,10 +294,6 @@ handle_info(T, #state{} = State) -> %% succesfully encoded. It's not checked at diameter:add_transport/2 %% since this can be called before creating the service. -x(Reason, #state{} = S) -> - close_wd(Reason, S), - {stop, {shutdown, Reason}, S}. - %% terminate/2 terminate(_, _) -> @@ -378,9 +368,8 @@ transition({diameter, {recv, Pkt}}, S) -> recv(Pkt, S); %% Timeout when still in the same state ... -transition({timeout = T, PS}, #state{state = PS} = S) -> - close({capx(PS), T}, S), - stop; +transition({timeout = T, PS}, #state{state = PS}) -> + {stop, {capx(PS), T}}; %% ... or not. transition({timeout, _}, _) -> @@ -457,7 +446,7 @@ send_CER(#state{state = {'Wait-Conn-Ack', Tmo}, OH = LCaps#diameter_caps.origin_host, req_send_CER(OH, Remote) orelse - close({already_connected, Remote, LCaps}, S), + close({already_connected, Remote, LCaps}), CER = build_CER(S), ?LOG(send, 'CER'), #diameter_packet{header = #diameter_header{end_to_end_id = Eid, @@ -690,17 +679,17 @@ cea(CEA, RC) -> CEA#diameter_base_CEA{'Result-Code' = RC}. post('CER' = T, RC, Pkt, S) -> - [fun close/2, {T, caps(S), {RC, Pkt}}]; + [fun(_) -> close({T, caps(S), {RC, Pkt}}) end]; post(_, _, _, _) -> ok. rejected({capabilities_cb, _F, Reason}, T, S) -> rejected(Reason, T, S); -rejected(discard, T, S) -> - close(T, S); +rejected(discard, T, _) -> + close(T); rejected({N, Es}, T, S) -> - {answer('CER', N, Es, S), [fun close/2, T]}; + {answer('CER', N, Es, S), [fun(_) -> close(T) end]}; rejected(N, T, S) -> rejected({N, []}, T, S). @@ -860,7 +849,7 @@ handle_CEA(#diameter_packet{bin = Bin} of _ -> open(DPkt, SApps, Caps, {connect, hd([_] = IS)}, S) catch - ?FAILURE(Reason) -> close({'CEA', Reason, Caps, DPkt}, S) + ?FAILURE(Reason) -> close({'CEA', Reason, Caps, DPkt}) end. %% Check more than the result code since the peer could send success %% regardless. If not 2001 then a peer_up callback could do anything @@ -880,7 +869,7 @@ recv_CEA(#diameter_packet{header = #diameter_header{version T; recv_CEA(Pkt, S) -> - close({'CEA', caps(S), Pkt}, S). + close({'CEA', caps(S), Pkt}). caps(#diameter_service{capabilities = Caps}) -> Caps; @@ -933,14 +922,14 @@ open(Pkt, SupportedApps, Caps, {Type, IS}, #state{parent = Pid, %% We've advertised TLS support: tell the transport the result %% and expect a reply when the handshake is complete. -tls_ack(true, Caps, Type, IS, #state{transport = TPid} = S) -> +tls_ack(true, Caps, Type, IS, #state{transport = TPid}) -> Ref = make_ref(), TPid ! {diameter, {tls, Ref, Type, IS == ?TLS}}, receive {diameter, {tls, Ref}} -> ok; {'DOWN', _, process, TPid, Reason} -> - close({tls_ack, Reason, Caps}, S) + close({tls_ack, Reason, Caps}) end; %% Or not. Don't send anything to the transport so that transports @@ -953,25 +942,11 @@ capz(#diameter_caps{} = L, #diameter_caps{} = R) -> = list_to_tuple([diameter_caps | lists:zip(tl(tuple_to_list(L)), tl(tuple_to_list(R)))]). -%% close/2 +%% close/1 -%% Tell the watchdog that our death isn't due to transport failure. -close(Reason, #state{parent = Pid}) -> - close_wd(Reason, Pid), +close(Reason) -> throw({?MODULE, close, Reason}). -%% close_wd/2 - -%% Ensure the watchdog dies if DPR has been sent ... -close_wd(_, #state{dpr = false}) -> - ok; -close_wd(Reason, #state{parent = Pid}) -> - close_wd(Reason, Pid); - -%% ... or otherwise -close_wd(Reason, Pid) -> - Pid ! {close, self(), Reason}. - %% dwa/1 dwa(#diameter_caps{origin_host = OH, diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl index 4d21d28512..3cab914fdb 100644 --- a/lib/diameter/src/base/diameter_service.erl +++ b/lib/diameter/src/base/diameter_service.erl @@ -509,6 +509,19 @@ transition({reconnect, Pid}, S) -> reconnect(Pid, S), ok; +%% Watchdog is sending notification of transport death. +transition({close, Pid, Reason}, #state{service_name = SvcName, + watchdogT = WatchdogT}) -> + #watchdog{state = WS, + ref = Ref, + type = Type, + options = Opts} + = fetch(WatchdogT, Pid), + WS /= ?WD_OKAY + andalso + send_event(SvcName, {closed, Ref, Reason, {type(Type), Opts}}), + ok; + %% Watchdog is sending notification of a state transition. transition({watchdog, Pid, {[TPid | Data], From, To}}, #state{service_name = SvcName, @@ -532,9 +545,9 @@ transition({'DOWN', MRef, process, _, Reason}, #state{monitor = MRef}) -> {stop, {monitor, Reason}}; %% Local watchdog process has died. -transition({'DOWN', _, process, Pid, Reason}, S) +transition({'DOWN', _, process, Pid, _Reason}, S) when node(Pid) == node() -> - peer_down(Pid, Reason, S), + watchdog_down(Pid, S), ok; %% Remote service wants to know about shared peers. @@ -1065,44 +1078,31 @@ down_conn(Id, Alias, TC, {SvcName, Apps}) -> peer_cb({ModX, peer_down, [SvcName, TC]}, Alias). %%% --------------------------------------------------------------------------- -%%% # peer_down/3 +%%% # watchdog_down/2 %%% --------------------------------------------------------------------------- %% Watchdog process has died. -peer_down(Pid, Reason, #state{watchdogT = WatchdogT} = S) -> - P = fetch(WatchdogT, Pid), - ets:delete_object(WatchdogT, P), - closed(Reason, P, S), - restart(P,S), - peer_down(P,S). - -%% Send an event at connection establishment failure. -closed({shutdown, {close, _TPid, Reason}}, - #watchdog{state = WS, - ref = Ref, - type = Type, - options = Opts}, - #state{service_name = SvcName}) - when WS /= ?WD_OKAY -> - send_event(SvcName, {closed, Ref, Reason, {type(Type), Opts}}); -closed(_, _, _) -> - ok. +watchdog_down(Pid, #state{watchdogT = WatchdogT} = S) -> + Wd = fetch(WatchdogT, Pid), + ets:delete_object(WatchdogT, Wd), + restart(Wd,S), + wd_down(Wd,S). -%% The watchdog has never reached OKAY ... -peer_down(#watchdog{peer = B}, _) +%% Watchdog has never reached OKAY ... +wd_down(#watchdog{peer = B}, _) when is_boolean(B) -> ok; %% ... or maybe it has. -peer_down(#watchdog{peer = TPid} = Wd, #state{peerT = PeerT} = S) -> +wd_down(#watchdog{peer = TPid} = Wd, #state{peerT = PeerT} = S) -> connection_down(Wd, ?WD_DOWN, S), ets:delete(PeerT, TPid). %% restart/2 -restart(P,S) -> - q_restart(restart(P), S). +restart(Wd, S) -> + q_restart(restart(Wd), S). %% restart/1 diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl index 8d10755180..ece5f0e359 100644 --- a/lib/diameter/src/base/diameter_watchdog.erl +++ b/lib/diameter/src/base/diameter_watchdog.erl @@ -82,7 +82,7 @@ start({_,_} = Type, T) -> try {erlang:monitor(process, Pid), Pid} after - Pid ! Ref + send(Pid, Ref) end. start_link(T) -> @@ -151,7 +151,8 @@ handle_info(T, #watchdog{} = State) -> ok -> {noreply, State}; #watchdog{} = S -> - event(T, State, S), + close(T, State), %% service expects 'close' message + event(T, State, S), %% before 'watchdog' {noreply, S}; stop -> ?LOG(stop, T), @@ -159,6 +160,14 @@ handle_info(T, #watchdog{} = State) -> {stop, {shutdown, T}, State} end. +close({'DOWN', _, process, TPid, {shutdown, Reason}}, + #watchdog{transport = TPid, + parent = Pid}) -> + send(Pid, {close, self(), Reason}); + +close(_, _) -> + ok. + event(_, #watchdog{status = T}, #watchdog{status = T}) -> ok; @@ -170,7 +179,7 @@ event(Msg, #watchdog{status = To, transport = T}) -> TPid = tpid(F,T), E = {[TPid | data(Msg, TPid, From, To)], From, To}, - notify(Pid, E), + send(Pid, {watchdog, self(), E}), ?LOG(transition, {self(), E}). data(Msg, TPid, reopen, okay) -> @@ -193,8 +202,8 @@ tpid(_, Pid) tpid(Pid, _) -> Pid. -notify(Pid, E) -> - Pid ! {watchdog, self(), E}. +send(Pid, T) -> + Pid ! T. %% terminate/2 @@ -232,7 +241,7 @@ transition({shutdown, Pid, _}, #watchdog{parent = Pid, transition({shutdown = T, Pid, Reason}, #watchdog{parent = Pid, transport = TPid} = S) -> - TPid ! {T, self(), Reason}, + send(TPid, {T, self(), Reason}), S#watchdog{shutdown = true}; %% Parent process has died, @@ -243,13 +252,9 @@ transition({'DOWN', _, process, Pid, _Reason}, %% Transport has accepted a connection. transition({accepted = T, TPid}, #watchdog{transport = TPid, parent = Pid}) -> - Pid ! {T, self(), TPid}, + send(Pid, {T, self(), TPid}), ok; -%% Transport is telling us that its impending death isn't failure. -transition({close, TPid, _Reason}, #watchdog{transport = TPid}) -> - stop; - %% STATE Event Actions New State %% ===== ------ ------- ---------- %% INITIAL Connection up SetWatchdog() OKAY @@ -301,24 +306,17 @@ transition({open = Key, TPid, _Hosts, T}, %% REOPEN Connection down CloseConnection() %% SetWatchdog() DOWN -transition({'DOWN', _, process, TPid, _}, +transition({'DOWN', _, process, TPid, _Reason}, #watchdog{transport = TPid, - status = S, - shutdown = D}) - when S == initial; - D -> + shutdown = true}) -> stop; -transition({'DOWN', _, process, TPid, _}, +transition({'DOWN', _, process, TPid, _Reason}, #watchdog{transport = TPid} = S) -> set_watchdog(S#watchdog{status = down, pending = false, transport = undefined}); -%% Any outstanding pending (or other messages from the transport) will -%% have arrived before 'DOWN' since the message comes from the same -%% process. Note that we could also get this message in the initial -%% state. %% Incoming message. transition({recv, TPid, Name, Pkt}, #watchdog{transport = TPid} = S) -> @@ -334,7 +332,7 @@ transition({timeout, _, tw}, #watchdog{}) -> %% State query. transition({state, Pid}, #watchdog{status = S}) -> - Pid ! {self(), S}, + send(Pid, {self(), S}), ok. %% =========================================================================== @@ -389,7 +387,7 @@ okay([{_,P}]) -> %% ... or it has. okay(C) -> - [_|_] = [P ! close || {_,P} <- C, self() /= P], + [_|_] = [send(P, close) || {_,P} <- C, self() /= P], reopen. %% set_watchdog/1 @@ -417,7 +415,7 @@ send_watchdog(#watchdog{pending = false, transport = TPid, sequence = Mask} = S) -> - TPid ! {send, encode(getr(dwr), Mask)}, + send(TPid, {send, encode(getr(dwr), Mask)}), ?LOG(send, 'DWR'), S#watchdog{pending = true}. @@ -628,7 +626,7 @@ restart({{connect, _} = T, Opts, Svc}, #watchdog{parent = Pid, sequence = Mask, restrict = {R,_}} = S) -> - Pid ! {reconnect, self()}, + send(Pid, {reconnect, self()}), Nodes = restrict_nodes(R), S#watchdog{transport = monitor(diameter_peer_fsm:start(T, Opts, |