diff options
-rw-r--r-- | lib/diameter/src/base/diameter_service.erl | 253 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_watchdog.erl | 76 |
2 files changed, 137 insertions, 192 deletions
diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl index d01a5e5ab6..a0af92c2a2 100644 --- a/lib/diameter/src/base/diameter_service.erl +++ b/lib/diameter/src/base/diameter_service.erl @@ -64,15 +64,7 @@ -include_lib("diameter/include/diameter.hrl"). -include("diameter_internal.hrl"). -%% The states mirrored by peer_up/peer_down callbacks. --define(STATE_UP, up). --define(STATE_DOWN, down). - --type op_state() :: ?STATE_UP - | ?STATE_DOWN. - -%% The RFC 3539 watchdog states that are now maintained, albeit -%% along with the old up/down. okay = up, else down. +%% RFC 3539 watchdog states. -define(WD_INITIAL, initial). -define(WD_OKAY, okay). -define(WD_SUSPECT, suspect). @@ -150,11 +142,10 @@ type :: match(connect | accept), ref :: match(reference()), %% key into diameter_config options :: match([diameter:transport_opt()]),%% from start_transport - op_state = {?STATE_DOWN, ?WD_INITIAL} - :: match({op_state(), wd_state()}), + wd_state = ?WD_INITIAL :: match(wd_state()), started = now(), %% at process start conn = false :: match(boolean() | pid())}). - %% true at accepted, pid() at connection_up or reopen + %% true at accepted, pid() at okay/reopen %% Record representing a peer process as implemented by %% diameter_peer_fsm. The term "conn" is historical. Despite the name @@ -520,67 +511,26 @@ transition({accepted, Pid, TPid}, S) -> accepted(Pid, TPid, S), ok; -%% Peer process has a new open connection. -transition({connection_up, Pid, T}, S) -> - connection_up(Pid, T, S), - ok; - -%% Watchdog has a new connection that will be opened after DW[RA] -%% exchange. This message was added long after connection_up, to -%% communicate the information as soon as it's available. Leave -%% connection_up as is it for now, duplicated information and all. -transition({reopen, Pid, T}, S) -> - reopen(Pid, T, S), - ok; - -%% Watchdog has left state OKAY. -transition({connection_down, Pid}, S) -> - connection_down(Pid, S), - ok; - -%% Watchdog has returned to state OKAY. -transition({connection_up, Pid}, S) -> - connection_up(Pid, S), - ok; - -%% Accepting transport has lost connectivity. -transition({close, Pid}, S) -> - close(Pid, S), - ok; - %% Connecting transport is being restarted by watchdog. transition({reconnect, Pid}, S) -> reconnect(Pid, S), ok; -%% Watchdog is sending notification of a state transition. Note that -%% the connection_up/down messages pre-date this message and are still -%% used. A watchdog message will follow these and communicate the same -%% state as was set in handling connection_up/down. -transition({watchdog, Pid, {TPid, From, To}}, #state{service_name = SvcName, - peerT = PeerT}) -> - #peer{ref = Ref, type = T, options = Opts, op_state = {OS,_}} +%% Watchdog is sending notification of a state transition. +transition({watchdog, Pid, {[TPid | Data], From, To}}, + #state{service_name = SvcName, + peerT = PeerT} + = S) -> + #peer{ref = Ref, type = T, options = Opts} = P = fetch(PeerT, Pid), - insert(PeerT, P#peer{op_state = {OS, To}}), + watchdog(TPid, Data, From, To, P, S), send_event(SvcName, {watchdog, Ref, TPid, {From, To}, {T, Opts}}), ok; %% Death of a watchdog process (#peer.pid) results in the removal of -%% it's peer and any associated conn record when 'DOWN' is received -%% (after this) but the states will be {?STATE_UP, ?WD_DOWN} for a -%% short time. (No real problem since ?WD_* is only used in -%% service_info.) We set ?WD_OKAY as a consequence of connection_up -%% since we know a watchdog is coming. We can't set anything at -%% connection_down since we don't know if the subsequent watchdog -%% message will be ?WD_DOWN or ?WD_SUSPECT. We don't (yet) set -%% ?STATE_* as a consequence of a watchdog message since this requires -%% changing some of the matching on ?STATE_*. -%% +%% it's peer and any associated conn record when 'DOWN' is received. %% Death of a peer process process (#conn.pid, #peer.conn) results in -%% connection_down followed by watchdog ?WD_DOWN. The latter doesn't -%% result in the conn record being deleted since 'DOWN' from death of -%% its watchdog doesn't (yet) deal with the record having been -%% removed. +%% ?WD_DOWN. %% Monitor process has died. Just die with a reason that tells %% diameter_config about the happening. If a cleaner shutdown is @@ -946,20 +896,52 @@ fetch(Tid, Key) -> T. %%% --------------------------------------------------------------------------- +%%% # watchdog/6 +%%% --------------------------------------------------------------------------- + +%% Watchdog has a new open connection. +watchdog(TPid, [T], _, ?WD_OKAY, Peer, State) -> + connection_up({TPid, T}, Peer, State); + +%% Watchdog has a new connection that will be opened after DW[RA] +%% exchange. +watchdog(TPid, [T], _, ?WD_REOPEN, Peer, State) -> + reopen({TPid, T}, Peer, State); + +%% Watchdog has recovered a suspect connection. +watchdog(TPid, [], ?WD_SUSPECT, ?WD_OKAY, Peer, State) -> + #peer{conn = TPid} = Peer, %% assert + connection_up(Peer, State); + +%% Watchdog has an unresponsive connection. +watchdog(TPid, [], ?WD_OKAY, ?WD_SUSPECT = To, Peer, State) -> + #peer{conn = TPid} = Peer, %% assert + connection_down(Peer, To, State); + +%% Watchdog has lost its connection. +watchdog(TPid, [], _, ?WD_DOWN = To, Peer, #state{connT = ConnT} = S) -> + close(Peer, S), + connection_down(Peer, To, S), + ets:delete(ConnT, TPid); + +watchdog(_, [], _, _, _, _) -> + ok. + +%%% --------------------------------------------------------------------------- %%% # connection_up/3 %%% --------------------------------------------------------------------------- %% Watchdog process has reached state OKAY. -connection_up(Pid, {TPid, {Caps, SApps, Pkt}}, #state{peerT = PeerT, - connT = ConnT} - = S) -> - P = fetch(PeerT, Pid), +connection_up({TPid, {Caps, SApps, Pkt}}, + #peer{pid = Pid} + = P, + #state{connT = ConnT} + = S) -> C = #conn{pid = TPid, apps = SApps, caps = Caps, peer = Pid}, - insert(ConnT, C), connection_up([Pkt], P#peer{conn = TPid}, C, S). @@ -967,56 +949,45 @@ connection_up(Pid, {TPid, {Caps, SApps, Pkt}}, #state{peerT = PeerT, %%% # reopen/3 %%% --------------------------------------------------------------------------- -%% Note that this connection_up/3 rewrites the same #conn{} now -%% written here. Both do so in case reopen has not happened in old -%% code. - -reopen(Pid, {TPid, {Caps, SApps, _Pkt}}, #state{peerT = PeerT, - connT = ConnT}) -> - P = fetch(PeerT, Pid), - C = #conn{pid = TPid, - apps = SApps, - caps = Caps, - peer = Pid}, - - insert(ConnT, C), - #peer{op_state = {?STATE_DOWN, _}} - = P, - insert(PeerT, P#peer{op_state = {?STATE_DOWN, ?WD_REOPEN}, +reopen({TPid, {Caps, SApps, _Pkt}}, + #peer{pid = Pid} + = P, + #state{peerT = PeerT, + connT = ConnT}) -> + insert(ConnT, #conn{pid = TPid, + apps = SApps, + caps = Caps, + peer = Pid}), + insert(PeerT, P#peer{wd_state = ?WD_REOPEN, conn = TPid}). %%% --------------------------------------------------------------------------- %%% # connection_up/2 %%% --------------------------------------------------------------------------- -%% Peer process has transitioned back into the open state. Note that there -%% has been no new capabilties exchange in this case. +%% Watchdog has recovered as suspect connection. Note that there has +%% been no new capabilties exchange in this case. -connection_up(Pid, #state{peerT = PeerT, - connT = ConnT} - = S) -> - #peer{conn = TPid} = P = fetch(PeerT, Pid), - C = fetch(ConnT, TPid), - connection_up([], P, C, S). +connection_up(#peer{conn = TPid} = P, #state{connT = ConnT} = S) -> + connection_up([], P, fetch(ConnT, TPid), S). %% connection_up/4 -connection_up(T, P, C, #state{peerT = PeerT, - local_peers = LDict, - service_name = SvcName, - service - = #diameter_service{applications = Apps}} - = S) -> - #peer{conn = TPid, op_state = {?STATE_DOWN, _}} - = P, - #conn{apps = SApps, caps = Caps} - = C, - - insert(PeerT, P#peer{op_state = {?STATE_UP, ?WD_OKAY}}), - +connection_up(Extra, + #peer{conn = TPid} + = P, + #conn{apps = SApps, caps = Caps} + = C, + #state{peerT = PeerT, + local_peers = LDict, + service_name = SvcName, + service + = #diameter_service{applications = Apps}} + = S) -> + insert(PeerT, P#peer{wd_state = ?WD_OKAY}), request_peer_up(TPid), insert_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict), - report_status(up, P, C, S, T). + report_status(up, P, C, S, Extra). insert_local_peer(SApps, T, LDict) -> lists:foldl(fun(A,D) -> ilp(A, T, D) end, LDict, SApps). @@ -1052,30 +1023,11 @@ peer_cb(MFA, Alias) -> end. %%% --------------------------------------------------------------------------- -%%% # connection_down/2 +%%% # connection_down/3 %%% --------------------------------------------------------------------------- -%% Watchdog has transitioned out of state OKAY. - -connection_down(Pid, #state{peerT = PeerT, - connT = ConnT} - = S) -> - #peer{op_state = {?STATE_UP, WS}, %% assert - conn = TPid} - = P - = fetch(PeerT, Pid), - - C = fetch(ConnT, TPid), - insert(PeerT, P#peer{op_state = {?STATE_DOWN, WS}}), - connection_down(P,C,S). - -%% connection_down/3 - -connection_down(#peer{op_state = {?STATE_DOWN, _}}, _, _) -> - ok; - -connection_down(#peer{conn = TPid, - op_state = {?STATE_UP, _}} +connection_down(#peer{wd_state = ?WD_OKAY, + conn = TPid} = P, #conn{caps = Caps, apps = SApps} @@ -1086,7 +1038,23 @@ connection_down(#peer{conn = TPid, = S) -> report_status(down, P, C, S, []), remove_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict), - request_peer_down(TPid, S). + request_peer_down(TPid, S); + +connection_down(#peer{}, #conn{}, _) -> + ok; + +connection_down(#peer{wd_state = WS, + conn = TPid} + = P, + To, + #state{peerT = PeerT, + connT = ConnT} + = S) + when is_atom(To) -> + insert(PeerT, P#peer{wd_state = To}), + ?WD_OKAY == WS + andalso + connection_down(P, fetch(ConnT, TPid), S). remove_local_peer(SApps, T, LDict) -> lists:foldl(fun(A,D) -> rlp(A, T, D) end, LDict, SApps). @@ -1118,11 +1086,12 @@ peer_down(Pid, Reason, #state{peerT = PeerT} = S) -> %% Send an event at connection establishment failure. closed({shutdown, {close, _TPid, Reason}}, - #peer{op_state = {?STATE_DOWN, _}, + #peer{wd_state = WS, ref = Ref, type = Type, options = Opts}, - #state{service_name = SvcName}) -> + #state{service_name = SvcName}) + when WS /= ?WD_OKAY -> send_event(SvcName, {closed, Ref, Reason, {type(Type), Opts}}); closed(_, _, _) -> ok. @@ -1134,9 +1103,8 @@ peer_down(#peer{conn = B}, _) %% ... or maybe it has. peer_down(#peer{conn = TPid} = P, #state{connT = ConnT} = S) -> - #conn{} = C = fetch(ConnT, TPid), - ets:delete_object(ConnT, C), - connection_down(P,C,S). + connection_down(P, ?WD_DOWN, S), + ets:delete(ConnT, TPid). %% restart/2 @@ -1222,14 +1190,13 @@ tc(false = No, _, _) -> %% removed %% the accepting watchdog upon reception of a CER from the previously %% connected peer, or us after reconnect_timer timeout. -close(Pid, #state{service_name = SvcName, - peerT = PeerT}) -> - #peer{pid = Pid, - type = accept, - ref = Ref, - options = Opts} - = fetch(PeerT, Pid), - +close(#peer{type = connect}, _) -> + ok; +close(#peer{type = accept, + pid = Pid, + ref = Ref, + options = Opts}, + #state{service_name = SvcName}) -> c(Pid, diameter_config:have_transport(SvcName, Ref), Opts). %% Tell watchdog to (maybe) die later ... @@ -1716,6 +1683,8 @@ have_request(Pkt, TPid) -> %% request_peer_up/1 +%% Insert an element that is used to detect whether or not there has +%% been a failover when inserting an outgoing request. request_peer_up(TPid) -> ets:insert(?REQUEST_TABLE, {TPid}). @@ -3147,7 +3116,7 @@ peer_acc(ConnT, Acc, #peer{pid = Pid, type = Type, ref = Ref, options = Opts, - op_state = {_, WS}, + wd_state = WS, started = T, conn = TPid}) -> dict:append(Ref, diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl index 49b6776a84..8d10755180 100644 --- a/lib/diameter/src/base/diameter_watchdog.erl +++ b/lib/diameter/src/base/diameter_watchdog.erl @@ -151,29 +151,45 @@ handle_info(T, #watchdog{} = State) -> ok -> {noreply, State}; #watchdog{} = S -> - event(State, S), + event(T, State, S), {noreply, S}; stop -> ?LOG(stop, T), - event(State, State#watchdog{status = down}), + event(T, State, State#watchdog{status = down}), {stop, {shutdown, T}, State} end. -event(#watchdog{status = T}, #watchdog{status = T}) -> +event(_, #watchdog{status = T}, #watchdog{status = T}) -> ok; -event(#watchdog{transport = undefined}, #watchdog{transport = undefined}) -> +event(_, #watchdog{transport = undefined}, #watchdog{transport = undefined}) -> ok; -event(#watchdog{status = From, transport = F, parent = Pid}, +event(Msg, + #watchdog{status = From, transport = F, parent = Pid}, #watchdog{status = To, transport = T}) -> - E = {tpid(F,T), From, To}, + TPid = tpid(F,T), + E = {[TPid | data(Msg, TPid, From, To)], From, To}, notify(Pid, E), ?LOG(transition, {self(), E}). +data(Msg, TPid, reopen, okay) -> + {recv, TPid, 'DWA', _Pkt} = Msg, %% assert + {TPid, T} = eraser(open), + [T]; + +data({open, TPid, _Hosts, T}, TPid, _From, To) + when To == okay; + To == reopen -> + [T]; + +data(_, _, _, _) -> + []. + tpid(_, Pid) when is_pid(Pid) -> Pid; + tpid(Pid, _) -> Pid. @@ -248,15 +264,13 @@ transition({close, TPid, _Reason}, #watchdog{transport = TPid}) -> %% know the identity of the peer (ie. now) that we know that we're in %% state down rather than initial. -transition({open, TPid, Hosts, T} = Open, +transition({open, TPid, Hosts, _} = Open, #watchdog{transport = TPid, status = initial, - parent = Pid, restrict = {_, R}} = S) -> case okay(getr(restart), Hosts, R) of okay -> - open(Pid, {TPid, T}), set_watchdog(S#watchdog{status = okay}); reopen -> transition(Open, S#watchdog{status = down}) @@ -267,17 +281,15 @@ transition({open, TPid, Hosts, T} = Open, %% SetWatchdog() %% Pending = TRUE REOPEN -transition({open = P, TPid, _Hosts, T}, +transition({open = Key, TPid, _Hosts, T}, #watchdog{transport = TPid, - parent = Pid, status = down} = S) -> %% Store the info we need to notify the parent to reopen the %% connection after the requisite DWA's are received, at which %% time we eraser(open). The reopen message is a later addition, %% to communicate the new capabilities as soon as they're known. - putr(P, {TPid, T}), - Pid ! {reopen, self(), {TPid, T}}, + putr(Key, {TPid, T}), set_watchdog(send_watchdog(S#watchdog{status = reopen, num_dwa = 0})); @@ -300,8 +312,6 @@ transition({'DOWN', _, process, TPid, _}, transition({'DOWN', _, process, TPid, _}, #watchdog{transport = TPid} = S) -> - failover(S), - close(S), set_watchdog(S#watchdog{status = down, pending = false, transport = undefined}); @@ -401,29 +411,6 @@ tw(T) tw({M,F,A}) -> apply(M,F,A). -%% open/2 - -open(Pid, {_,_} = T) -> - Pid ! {connection_up, self(), T}. - -%% failover/1 - -failover(#watchdog{status = okay, - parent = Pid}) -> - Pid ! {connection_down, self()}; - -failover(_) -> - ok. - -%% close/1 - -close(#watchdog{status = down}) -> - ok; - -close(#watchdog{parent = Pid}) -> - {{T, _}, _, _} = getr(restart), - T == accept andalso (Pid ! {close, self()}). - %% send_watchdog/1 send_watchdog(#watchdog{pending = false, @@ -511,12 +498,10 @@ rcv(_, #watchdog{status = okay} = S) -> %% SetWatchdog() OKAY rcv('DWA', #watchdog{status = suspect} = S) -> - failback(S), set_watchdog(S#watchdog{status = okay, pending = false}); rcv(_, #watchdog{status = suspect} = S) -> - failback(S), set_watchdog(S#watchdog{status = okay}); %% REOPEN Receive DWA & Pending = FALSE @@ -524,10 +509,8 @@ rcv(_, #watchdog{status = suspect} = S) -> %% Failback() OKAY rcv('DWA', #watchdog{status = reopen, - num_dwa = 2 = N, - parent = Pid} + num_dwa = 2 = N} = S) -> - open(Pid, eraser(open)), S#watchdog{status = okay, num_dwa = N+1, pending = false}; @@ -546,11 +529,6 @@ rcv('DWA', #watchdog{status = reopen, rcv(_, #watchdog{status = reopen} = S) -> throwaway(S). -%% failback/1 - -failback(#watchdog{parent = Pid}) -> - Pid ! {connection_up, self()}. - %% timeout/1 %% %% The caller sets the watchdog on the return value. @@ -575,7 +553,6 @@ timeout(#watchdog{status = T, timeout(#watchdog{status = okay, pending = true} = S) -> - failover(S), S#watchdog{status = suspect}; %% SUSPECT Timer expires CloseConnection() @@ -592,7 +569,6 @@ timeout(#watchdog{status = T, when T == suspect; T == reopen, P, N < 0 -> exit(TPid, {shutdown, watchdog_timeout}), - close(S), S#watchdog{status = down}; %% REOPEN Timer expires & NumDWA = -1 |