From f115a9f7428abd12b8ec50d4cbeb654b3efa0eb1 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Fri, 25 Jan 2013 18:49:50 +0100 Subject: Simplify watchdog transitions in service process In particular, use watchdog messages as input and do away with the older connection_up/down (and other) messages. Also, only maintain the watchdog state, not the older up/down op state. --- lib/diameter/src/base/diameter_service.erl | 253 +++++++++++++---------------- 1 file changed, 111 insertions(+), 142 deletions(-) (limited to 'lib/diameter/src/base/diameter_service.erl') diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl index d01a5e5ab6..a0af92c2a2 100644 --- a/lib/diameter/src/base/diameter_service.erl +++ b/lib/diameter/src/base/diameter_service.erl @@ -64,15 +64,7 @@ -include_lib("diameter/include/diameter.hrl"). -include("diameter_internal.hrl"). -%% The states mirrored by peer_up/peer_down callbacks. --define(STATE_UP, up). --define(STATE_DOWN, down). - --type op_state() :: ?STATE_UP - | ?STATE_DOWN. - -%% The RFC 3539 watchdog states that are now maintained, albeit -%% along with the old up/down. okay = up, else down. +%% RFC 3539 watchdog states. -define(WD_INITIAL, initial). -define(WD_OKAY, okay). -define(WD_SUSPECT, suspect). @@ -150,11 +142,10 @@ type :: match(connect | accept), ref :: match(reference()), %% key into diameter_config options :: match([diameter:transport_opt()]),%% from start_transport - op_state = {?STATE_DOWN, ?WD_INITIAL} - :: match({op_state(), wd_state()}), + wd_state = ?WD_INITIAL :: match(wd_state()), started = now(), %% at process start conn = false :: match(boolean() | pid())}). - %% true at accepted, pid() at connection_up or reopen + %% true at accepted, pid() at okay/reopen %% Record representing a peer process as implemented by %% diameter_peer_fsm. The term "conn" is historical. Despite the name @@ -520,67 +511,26 @@ transition({accepted, Pid, TPid}, S) -> accepted(Pid, TPid, S), ok; -%% Peer process has a new open connection. -transition({connection_up, Pid, T}, S) -> - connection_up(Pid, T, S), - ok; - -%% Watchdog has a new connection that will be opened after DW[RA] -%% exchange. This message was added long after connection_up, to -%% communicate the information as soon as it's available. Leave -%% connection_up as is it for now, duplicated information and all. -transition({reopen, Pid, T}, S) -> - reopen(Pid, T, S), - ok; - -%% Watchdog has left state OKAY. -transition({connection_down, Pid}, S) -> - connection_down(Pid, S), - ok; - -%% Watchdog has returned to state OKAY. -transition({connection_up, Pid}, S) -> - connection_up(Pid, S), - ok; - -%% Accepting transport has lost connectivity. -transition({close, Pid}, S) -> - close(Pid, S), - ok; - %% Connecting transport is being restarted by watchdog. transition({reconnect, Pid}, S) -> reconnect(Pid, S), ok; -%% Watchdog is sending notification of a state transition. Note that -%% the connection_up/down messages pre-date this message and are still -%% used. A watchdog message will follow these and communicate the same -%% state as was set in handling connection_up/down. -transition({watchdog, Pid, {TPid, From, To}}, #state{service_name = SvcName, - peerT = PeerT}) -> - #peer{ref = Ref, type = T, options = Opts, op_state = {OS,_}} +%% Watchdog is sending notification of a state transition. +transition({watchdog, Pid, {[TPid | Data], From, To}}, + #state{service_name = SvcName, + peerT = PeerT} + = S) -> + #peer{ref = Ref, type = T, options = Opts} = P = fetch(PeerT, Pid), - insert(PeerT, P#peer{op_state = {OS, To}}), + watchdog(TPid, Data, From, To, P, S), send_event(SvcName, {watchdog, Ref, TPid, {From, To}, {T, Opts}}), ok; %% Death of a watchdog process (#peer.pid) results in the removal of -%% it's peer and any associated conn record when 'DOWN' is received -%% (after this) but the states will be {?STATE_UP, ?WD_DOWN} for a -%% short time. (No real problem since ?WD_* is only used in -%% service_info.) We set ?WD_OKAY as a consequence of connection_up -%% since we know a watchdog is coming. We can't set anything at -%% connection_down since we don't know if the subsequent watchdog -%% message will be ?WD_DOWN or ?WD_SUSPECT. We don't (yet) set -%% ?STATE_* as a consequence of a watchdog message since this requires -%% changing some of the matching on ?STATE_*. -%% +%% it's peer and any associated conn record when 'DOWN' is received. %% Death of a peer process process (#conn.pid, #peer.conn) results in -%% connection_down followed by watchdog ?WD_DOWN. The latter doesn't -%% result in the conn record being deleted since 'DOWN' from death of -%% its watchdog doesn't (yet) deal with the record having been -%% removed. +%% ?WD_DOWN. %% Monitor process has died. Just die with a reason that tells %% diameter_config about the happening. If a cleaner shutdown is @@ -945,21 +895,53 @@ fetch(Tid, Key) -> [T] = ets:lookup(Tid, Key), T. +%%% --------------------------------------------------------------------------- +%%% # watchdog/6 +%%% --------------------------------------------------------------------------- + +%% Watchdog has a new open connection. +watchdog(TPid, [T], _, ?WD_OKAY, Peer, State) -> + connection_up({TPid, T}, Peer, State); + +%% Watchdog has a new connection that will be opened after DW[RA] +%% exchange. +watchdog(TPid, [T], _, ?WD_REOPEN, Peer, State) -> + reopen({TPid, T}, Peer, State); + +%% Watchdog has recovered a suspect connection. +watchdog(TPid, [], ?WD_SUSPECT, ?WD_OKAY, Peer, State) -> + #peer{conn = TPid} = Peer, %% assert + connection_up(Peer, State); + +%% Watchdog has an unresponsive connection. +watchdog(TPid, [], ?WD_OKAY, ?WD_SUSPECT = To, Peer, State) -> + #peer{conn = TPid} = Peer, %% assert + connection_down(Peer, To, State); + +%% Watchdog has lost its connection. +watchdog(TPid, [], _, ?WD_DOWN = To, Peer, #state{connT = ConnT} = S) -> + close(Peer, S), + connection_down(Peer, To, S), + ets:delete(ConnT, TPid); + +watchdog(_, [], _, _, _, _) -> + ok. + %%% --------------------------------------------------------------------------- %%% # connection_up/3 %%% --------------------------------------------------------------------------- %% Watchdog process has reached state OKAY. -connection_up(Pid, {TPid, {Caps, SApps, Pkt}}, #state{peerT = PeerT, - connT = ConnT} - = S) -> - P = fetch(PeerT, Pid), +connection_up({TPid, {Caps, SApps, Pkt}}, + #peer{pid = Pid} + = P, + #state{connT = ConnT} + = S) -> C = #conn{pid = TPid, apps = SApps, caps = Caps, peer = Pid}, - insert(ConnT, C), connection_up([Pkt], P#peer{conn = TPid}, C, S). @@ -967,56 +949,45 @@ connection_up(Pid, {TPid, {Caps, SApps, Pkt}}, #state{peerT = PeerT, %%% # reopen/3 %%% --------------------------------------------------------------------------- -%% Note that this connection_up/3 rewrites the same #conn{} now -%% written here. Both do so in case reopen has not happened in old -%% code. - -reopen(Pid, {TPid, {Caps, SApps, _Pkt}}, #state{peerT = PeerT, - connT = ConnT}) -> - P = fetch(PeerT, Pid), - C = #conn{pid = TPid, - apps = SApps, - caps = Caps, - peer = Pid}, - - insert(ConnT, C), - #peer{op_state = {?STATE_DOWN, _}} - = P, - insert(PeerT, P#peer{op_state = {?STATE_DOWN, ?WD_REOPEN}, +reopen({TPid, {Caps, SApps, _Pkt}}, + #peer{pid = Pid} + = P, + #state{peerT = PeerT, + connT = ConnT}) -> + insert(ConnT, #conn{pid = TPid, + apps = SApps, + caps = Caps, + peer = Pid}), + insert(PeerT, P#peer{wd_state = ?WD_REOPEN, conn = TPid}). %%% --------------------------------------------------------------------------- %%% # connection_up/2 %%% --------------------------------------------------------------------------- -%% Peer process has transitioned back into the open state. Note that there -%% has been no new capabilties exchange in this case. +%% Watchdog has recovered as suspect connection. Note that there has +%% been no new capabilties exchange in this case. -connection_up(Pid, #state{peerT = PeerT, - connT = ConnT} - = S) -> - #peer{conn = TPid} = P = fetch(PeerT, Pid), - C = fetch(ConnT, TPid), - connection_up([], P, C, S). +connection_up(#peer{conn = TPid} = P, #state{connT = ConnT} = S) -> + connection_up([], P, fetch(ConnT, TPid), S). %% connection_up/4 -connection_up(T, P, C, #state{peerT = PeerT, - local_peers = LDict, - service_name = SvcName, - service - = #diameter_service{applications = Apps}} - = S) -> - #peer{conn = TPid, op_state = {?STATE_DOWN, _}} - = P, - #conn{apps = SApps, caps = Caps} - = C, - - insert(PeerT, P#peer{op_state = {?STATE_UP, ?WD_OKAY}}), - +connection_up(Extra, + #peer{conn = TPid} + = P, + #conn{apps = SApps, caps = Caps} + = C, + #state{peerT = PeerT, + local_peers = LDict, + service_name = SvcName, + service + = #diameter_service{applications = Apps}} + = S) -> + insert(PeerT, P#peer{wd_state = ?WD_OKAY}), request_peer_up(TPid), insert_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict), - report_status(up, P, C, S, T). + report_status(up, P, C, S, Extra). insert_local_peer(SApps, T, LDict) -> lists:foldl(fun(A,D) -> ilp(A, T, D) end, LDict, SApps). @@ -1052,30 +1023,11 @@ peer_cb(MFA, Alias) -> end. %%% --------------------------------------------------------------------------- -%%% # connection_down/2 +%%% # connection_down/3 %%% --------------------------------------------------------------------------- -%% Watchdog has transitioned out of state OKAY. - -connection_down(Pid, #state{peerT = PeerT, - connT = ConnT} - = S) -> - #peer{op_state = {?STATE_UP, WS}, %% assert - conn = TPid} - = P - = fetch(PeerT, Pid), - - C = fetch(ConnT, TPid), - insert(PeerT, P#peer{op_state = {?STATE_DOWN, WS}}), - connection_down(P,C,S). - -%% connection_down/3 - -connection_down(#peer{op_state = {?STATE_DOWN, _}}, _, _) -> - ok; - -connection_down(#peer{conn = TPid, - op_state = {?STATE_UP, _}} +connection_down(#peer{wd_state = ?WD_OKAY, + conn = TPid} = P, #conn{caps = Caps, apps = SApps} @@ -1086,7 +1038,23 @@ connection_down(#peer{conn = TPid, = S) -> report_status(down, P, C, S, []), remove_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict), - request_peer_down(TPid, S). + request_peer_down(TPid, S); + +connection_down(#peer{}, #conn{}, _) -> + ok; + +connection_down(#peer{wd_state = WS, + conn = TPid} + = P, + To, + #state{peerT = PeerT, + connT = ConnT} + = S) + when is_atom(To) -> + insert(PeerT, P#peer{wd_state = To}), + ?WD_OKAY == WS + andalso + connection_down(P, fetch(ConnT, TPid), S). remove_local_peer(SApps, T, LDict) -> lists:foldl(fun(A,D) -> rlp(A, T, D) end, LDict, SApps). @@ -1118,11 +1086,12 @@ peer_down(Pid, Reason, #state{peerT = PeerT} = S) -> %% Send an event at connection establishment failure. closed({shutdown, {close, _TPid, Reason}}, - #peer{op_state = {?STATE_DOWN, _}, + #peer{wd_state = WS, ref = Ref, type = Type, options = Opts}, - #state{service_name = SvcName}) -> + #state{service_name = SvcName}) + when WS /= ?WD_OKAY -> send_event(SvcName, {closed, Ref, Reason, {type(Type), Opts}}); closed(_, _, _) -> ok. @@ -1134,9 +1103,8 @@ peer_down(#peer{conn = B}, _) %% ... or maybe it has. peer_down(#peer{conn = TPid} = P, #state{connT = ConnT} = S) -> - #conn{} = C = fetch(ConnT, TPid), - ets:delete_object(ConnT, C), - connection_down(P,C,S). + connection_down(P, ?WD_DOWN, S), + ets:delete(ConnT, TPid). %% restart/2 @@ -1222,14 +1190,13 @@ tc(false = No, _, _) -> %% removed %% the accepting watchdog upon reception of a CER from the previously %% connected peer, or us after reconnect_timer timeout. -close(Pid, #state{service_name = SvcName, - peerT = PeerT}) -> - #peer{pid = Pid, - type = accept, - ref = Ref, - options = Opts} - = fetch(PeerT, Pid), - +close(#peer{type = connect}, _) -> + ok; +close(#peer{type = accept, + pid = Pid, + ref = Ref, + options = Opts}, + #state{service_name = SvcName}) -> c(Pid, diameter_config:have_transport(SvcName, Ref), Opts). %% Tell watchdog to (maybe) die later ... @@ -1716,6 +1683,8 @@ have_request(Pkt, TPid) -> %% request_peer_up/1 +%% Insert an element that is used to detect whether or not there has +%% been a failover when inserting an outgoing request. request_peer_up(TPid) -> ets:insert(?REQUEST_TABLE, {TPid}). @@ -3147,7 +3116,7 @@ peer_acc(ConnT, Acc, #peer{pid = Pid, type = Type, ref = Ref, options = Opts, - op_state = {_, WS}, + wd_state = WS, started = T, conn = TPid}) -> dict:append(Ref, -- cgit v1.2.3