diff options
Diffstat (limited to 'lib/diameter/src/base/diameter_watchdog.erl')
-rw-r--r-- | lib/diameter/src/base/diameter_watchdog.erl | 223 |
1 files changed, 148 insertions, 75 deletions
diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl index 53e659e3f6..ea8b2fdb0e 100644 --- a/lib/diameter/src/base/diameter_watchdog.erl +++ b/lib/diameter/src/base/diameter_watchdog.erl @@ -1,18 +1,19 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2014. All Rights Reserved. +%% Copyright Ericsson AB 2010-2015. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% @@ -49,8 +50,6 @@ -define(IS_NATURAL(N), (is_integer(N) andalso 0 =< N)). --define(CHOOSE(B,T,F), if (B) -> T; true -> F end). - -record(config, {suspect = 1 :: non_neg_integer(), %% OKAY -> SUSPECT okay = 3 :: non_neg_integer()}). %% REOPEN -> OKAY @@ -67,7 +66,9 @@ %% end PCB parent = self() :: pid(), %% service process transport :: pid() | undefined, %% peer_fsm process - tref :: reference(), %% reference for current watchdog timer + tref :: reference() %% reference for current watchdog timer + | integer() %% monotonic time + | undefined, dictionary :: module(), %% common dictionary receive_data :: term(), %% term passed into diameter_service with incoming message @@ -95,7 +96,7 @@ start({_,_} = Type, T) -> Ack = make_ref(), {ok, Pid} = diameter_watchdog_sup:start_child({Ack, Type, self(), T}), try - {erlang:monitor(process, Pid), Pid} + {monitor(process, Pid), Pid} after send(Pid, Ack) end. @@ -122,17 +123,20 @@ i({Ack, T, Pid, {RecvData, #diameter_service{applications = Apps, capabilities = Caps} = Svc}}) -> - erlang:monitor(process, Pid), + monitor(process, Pid), wait(Ack, Pid), - random:seed(now()), - putr(restart, {T, Opts, Svc}), %% save seeing it in trace - putr(dwr, dwr(Caps)), %% + {_, Seed} = diameter_lib:seed(), + random:seed(Seed), + putr(restart, {T, Opts, Svc, SvcOpts}), %% save seeing it in trace + putr(dwr, dwr(Caps)), %% {_,_} = Mask = proplists:get_value(sequence, SvcOpts), Restrict = proplists:get_value(restrict_connections, SvcOpts), Nodes = restrict_nodes(Restrict), Dict0 = common_dictionary(Apps), + diameter_codec:setopts([{common_dictionary, Dict0}, + {string_decode, false}]), #watchdog{parent = Pid, - transport = start(T, Opts, Mask, Nodes, Dict0, Svc), + transport = start(T, Opts, SvcOpts, Nodes, Dict0, Svc), tw = proplists:get_value(watchdog_timer, Opts, ?DEFAULT_TW_INIT), @@ -167,11 +171,11 @@ config({okay, N}, Rec) when ?IS_NATURAL(N) -> Rec#config{okay = N}. -%% start/5 +%% start/6 -start(T, Opts, Mask, Nodes, Dict0, Svc) -> +start(T, Opts, SvcOpts, Nodes, Dict0, Svc) -> {_MRef, Pid} - = diameter_peer_fsm:start(T, Opts, {Mask, Nodes, Dict0, Svc}), + = diameter_peer_fsm:start(T, Opts, {SvcOpts, Nodes, Dict0, Svc}), Pid. %% common_dictionary/1 @@ -221,7 +225,6 @@ dict0(_, _, Acc) -> Acc. config_error(T) -> - diameter_lib:error_report(configuration_error, T), exit({shutdown, {configuration_error, T}}). %% handle_call/3 @@ -245,11 +248,16 @@ handle_info(T, #watchdog{} = State) -> event(T, State, S), %% before 'watchdog' {noreply, S}; stop -> - ?LOG(stop, T), + ?LOG(stop, truncate(T)), event(T, State, State#watchdog{status = down}), {stop, {shutdown, T}, State} end. +truncate({'DOWN' = T, _, process, Pid, _}) -> + {T, Pid}; +truncate(T) -> + T. + close({'DOWN', _, process, TPid, {shutdown, Reason}}, #watchdog{transport = TPid, parent = Pid}) -> @@ -258,11 +266,15 @@ close({'DOWN', _, process, TPid, {shutdown, Reason}}, close(_, _) -> ok. -event(_, #watchdog{status = T}, #watchdog{status = T}) -> - ok; - -event(_, #watchdog{transport = undefined}, #watchdog{transport = undefined}) -> +event(_, + #watchdog{status = From, transport = F}, + #watchdog{status = To, transport = T}) + when F == undefined, T == undefined; %% transport not started + From == initial, To == down; %% never really left INITIAL + From == To -> %% no state transition ok; +%% Note that there is no INITIAL -> DOWN transition in RFC 3539: ours +%% is just a consequence of stop. event(Msg, #watchdog{status = From, transport = F, parent = Pid}, @@ -270,7 +282,7 @@ event(Msg, TPid = tpid(F,T), E = {[TPid | data(Msg, TPid, From, To)], From, To}, send(Pid, {watchdog, self(), E}), - ?LOG(transition, {self(), E}). + ?LOG(transition, {From, To}). data(Msg, TPid, reopen, okay) -> {recv, TPid, 'DWA', _Pkt} = Msg, %% assert @@ -313,14 +325,13 @@ code_change(_, State, _) -> %% The state transitions documented here are extracted from RFC 3539, %% the commentary is ours. -%% Service or watchdog is telling the watchdog of an accepting -%% transport to die after connect_timer expiry or reestablished -%% connection (in another transport process) respectively. -transition(close, #watchdog{status = down}) -> - {{accept, _}, _, _} = getr(restart), %% assert - stop; +%% Service is telling the watchdog of an accepting transport to die +%% following transport death in state INITIAL, or after connect_timer +%% expiry; or another watchdog is saying the same after reestablishing +%% a connection previously had by this one. transition(close, #watchdog{}) -> - ok; + {accept, _} = role(), %% assert + stop; %% Service is asking for the peer to be taken down gracefully. transition({shutdown, Pid, _}, #watchdog{parent = Pid, @@ -332,6 +343,12 @@ transition({shutdown = T, Pid, Reason}, #watchdog{parent = Pid, send(TPid, {T, self(), Reason}), S#watchdog{shutdown = true}; +%% Transport is telling us that DPA has been sent in response to DPR, +%% or that DPR has been explicitly sent: transport death should lead +%% to ours. +transition({'DPR', TPid}, #watchdog{transport = TPid} = S) -> + S#watchdog{shutdown = true}; + %% Parent process has died, transition({'DOWN', _, process, Pid, _Reason}, #watchdog{parent = Pid}) -> @@ -363,7 +380,7 @@ transition({open, TPid, Hosts, _} = Open, restrict = {_,R}, config = #config{suspect = OS}} = S) -> - case okay(getr(restart), Hosts, R) of + case okay(role(), Hosts, R) of okay -> set_watchdog(S#watchdog{status = okay, num_dwa = OS}); @@ -403,18 +420,33 @@ transition({open = Key, TPid, _Hosts, T}, %% REOPEN Connection down CloseConnection() %% SetWatchdog() DOWN +%% Transport has died after DPA or service requested termination ... transition({'DOWN', _, process, TPid, _Reason}, #watchdog{transport = TPid, shutdown = true}) -> stop; -transition({'DOWN', _, process, TPid, _Reason}, +%% ... or not. +transition({'DOWN', _, process, TPid, _Reason} = D, #watchdog{transport = TPid, - status = T} - = S) -> - set_watchdog(S#watchdog{status = ?CHOOSE(initial == T, T, down), - pending = false, - transport = undefined}); + status = T, + restrict = {_,R}} + = S0) -> + S = S0#watchdog{pending = false, + transport = undefined}, + {M,_} = role(), + + %% Close an accepting watchdog immediately if there's no + %% restriction on the number of connections to the same peer: the + %% state machine never enters state REOPEN in this case. + + if T == initial; + M == accept, not R -> + close(D, S0), + stop; + true -> + set_watchdog(S#watchdog{status = down}) + end; %% Incoming message. transition({recv, TPid, Name, Pkt}, #watchdog{transport = TPid} = S) -> @@ -422,11 +454,12 @@ transition({recv, TPid, Name, Pkt}, #watchdog{transport = TPid} = S) -> %% Current watchdog has timed out. transition({timeout, TRef, tw}, #watchdog{tref = TRef} = S) -> - set_watchdog(timeout(S)); + set_watchdog(0, timeout(S)); -%% Timer was canceled after message was already sent. -transition({timeout, _, tw}, #watchdog{}) -> - ok; +%% Message has arrived since the timer was started: subtract time +%% already elapsed from new timer. +transition({timeout, _, tw}, #watchdog{tref = T0} = S) -> + set_watchdog(diameter_lib:micro_diff(T0) div 1000, S); %% State query. transition({state, Pid}, #watchdog{status = S}) -> @@ -454,9 +487,7 @@ encode(dwr = M, Dict0, Mask) -> hop_by_hop_id = Seq}, Pkt = #diameter_packet{header = Hdr, msg = Msg}, - #diameter_packet{bin = Bin} = diameter_codec:encode(Dict0, Pkt), - Bin; - + diameter_codec:encode(Dict0, Pkt); encode(dwa, Dict0, #diameter_packet{header = H, transport_data = TD} = ReqPkt) -> @@ -471,7 +502,7 @@ encode(dwa, Dict0, #diameter_packet{header = H, transport_data = TD} %% okay/3 -okay({{accept, Ref}, _, _}, Hosts, Restrict) -> +okay({accept, Ref}, Hosts, Restrict) -> T = {?MODULE, connection, Ref, Hosts}, diameter_reg:add(T), if Restrict -> @@ -482,7 +513,7 @@ okay({{accept, Ref}, _, _}, Hosts, Restrict) -> %% Register before matching so that at least one of two registering %% processes will match the other. -okay({{connect, _}, _, _}, _, _) -> +okay({connect, _}, _, _) -> okay. %% okay/2 @@ -497,20 +528,34 @@ okay(C) -> [_|_] = [send(P, close) || {_,P} <- C, self() /= P], reopen. +%% role/0 + +role() -> + element(1, getr(restart)). + %% set_watchdog/1 -set_watchdog(#watchdog{tw = TwInit, - tref = TRef} - = S) -> - cancel(TRef), - S#watchdog{tref = erlang:start_timer(tw(TwInit), self(), tw)}; -set_watchdog(stop = No) -> - No. +%% Timer not yet set. +set_watchdog(#watchdog{tref = undefined} = S) -> + set_watchdog(0, S); -cancel(undefined) -> - ok; -cancel(TRef) -> - erlang:cancel_timer(TRef). +%% Timer already set: start at new one only at expiry. +set_watchdog(#watchdog{} = S) -> + S#watchdog{tref = diameter_lib:now()}. + +%% set_watchdog/2 + +set_watchdog(_, stop = No) -> + No; + +set_watchdog(Ms, #watchdog{tw = TwInit} = S) -> + S#watchdog{tref = erlang:start_timer(tw(TwInit, Ms), self(), tw)}. + +%% A callback could return anything, so ensure the result isn't +%% negative. Don't prevent abuse, even though the smallest valid +%% timeout is 4000. +tw(TwInit, Ms) -> + max(tw(TwInit) - Ms, 0). tw(T) when is_integer(T), T >= 6000 -> @@ -525,10 +570,14 @@ send_watchdog(#watchdog{pending = false, dictionary = Dict0, sequence = Mask} = S) -> - send(TPid, {send, encode(dwr, Dict0, Mask)}), + #diameter_packet{bin = Bin} = EPkt = encode(dwr, Dict0, Mask), + diameter_traffic:incr(send, EPkt, TPid, Dict0), + send(TPid, {send, Bin}), ?LOG(send, 'DWR'), S#watchdog{pending = true}. +%% Don't count encode errors since we don't expect any on DWR/DWA. + %% recv/3 recv(Name, Pkt, S) -> @@ -545,16 +594,40 @@ recv(Name, Pkt, S) -> rcv('DWR', Pkt, #watchdog{transport = TPid, dictionary = Dict0}) -> - send(TPid, {send, encode(dwa, Dict0, Pkt)}), + ?LOG(recv, 'DWR'), + DPkt = diameter_codec:decode(Dict0, Pkt), + diameter_traffic:incr(recv, DPkt, TPid, Dict0), + diameter_traffic:incr_error(recv, DPkt, TPid, Dict0), + #diameter_packet{header = H, + transport_data = T, + bin = Bin} + = EPkt + = encode(dwa, Dict0, Pkt), + diameter_traffic:incr(send, EPkt, TPid, Dict0), + diameter_traffic:incr_rc(send, EPkt, TPid, Dict0), + + %% Strip potentially large message terms. + send(TPid, {send, #diameter_packet{header = H, + transport_data = T, + bin = Bin}}), ?LOG(send, 'DWA'); +rcv('DWA', Pkt, #watchdog{transport = TPid, + dictionary = Dict0}) -> + ?LOG(recv, 'DWA'), + diameter_traffic:incr(recv, Pkt, TPid, Dict0), + diameter_traffic:incr_rc(recv, + diameter_codec:decode(Dict0, Pkt), + TPid, + Dict0); + rcv(N, _, _) when N == 'CER'; N == 'CEA'; - N == 'DWA'; - N == 'DPR'; - N == 'DPA' -> + N == 'DPR' -> false; +%% DPR can be sent explicitly with diameter:call/4. Only the +%% corresponding DPAs arrive here. rcv(_, Pkt, #watchdog{transport = TPid, dictionary = Dict0, @@ -740,7 +813,7 @@ timeout(#watchdog{status = T} = S) restart(#watchdog{transport = undefined} = S) -> restart(getr(restart), S); -restart(S) -> +restart(S) -> %% reconnect has won race with timeout S. %% restart/2 @@ -755,25 +828,25 @@ restart(S) -> %% state down rather then initial when receiving notification of an %% open connection. -restart({{connect, _} = T, Opts, Svc}, +restart({{connect, _} = T, Opts, Svc, SvcOpts}, #watchdog{parent = Pid, - sequence = Mask, restrict = {R,_}, dictionary = Dict0} = S) -> send(Pid, {reconnect, self()}), Nodes = restrict_nodes(R), - S#watchdog{transport = start(T, Opts, Mask, Nodes, Dict0, Svc), + S#watchdog{transport = start(T, Opts, SvcOpts, Nodes, Dict0, Svc), restrict = {R, lists:member(node(), Nodes)}}; %% No restriction on the number of connections to the same peer: just %% die. Note that a state machine never enters state REOPEN in this %% case. -restart({{accept, _}, _, _}, #watchdog{restrict = {_, false}}) -> +restart({{accept, _}, _, _, _}, #watchdog{restrict = {_, false}}) -> stop; -%% Otherwise hang around until told to die. -restart({{accept, _}, _, _}, S) -> +%% Otherwise hang around until told to die, either by the service or +%% by another watchdog. +restart({{accept, _}, _, _, _}, S) -> S. %% Don't currently use Opts/Svc in the accept case. |