diff options
Diffstat (limited to 'lib/diameter/src/base/diameter_watchdog.erl')
-rw-r--r-- | lib/diameter/src/base/diameter_watchdog.erl | 170 |
1 files changed, 134 insertions, 36 deletions
diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl index d7474e5c56..243ad0a986 100644 --- a/lib/diameter/src/base/diameter_watchdog.erl +++ b/lib/diameter/src/base/diameter_watchdog.erl @@ -43,20 +43,24 @@ -include("diameter_internal.hrl"). -define(DEFAULT_TW_INIT, 30000). %% RFC 3539 ch 3.4.1 +-define(NOMASK, {0,32}). %% default sequence mask -record(watchdog, {%% PCB - Peer Control Block; see RFC 3539, Appendix A status = initial :: initial | okay | suspect | down | reopen, - pending = false :: boolean(), + pending = false :: boolean(), %% DWA tw :: 6000..16#FFFFFFFF | {module(), atom(), list()}, %% {M,F,A} -> integer() >= 0 num_dwa = 0 :: -1 | non_neg_integer(), %% number of DWAs received during reopen %% end PCB - parent = self() :: pid(), - transport :: pid() | undefined, + parent = self() :: pid(), %% service process + transport :: pid() | undefined, %% peer_fsm process tref :: reference(), %% reference for current watchdog timer - message_data}). %% term passed into diameter_service with message + message_data, %% term passed into diameter_service with message + sequence :: diameter:sequence(), %% mask + restrict :: {diameter:restriction(), boolean()}, + shutdown = false :: boolean()}). %% start/2 %% @@ -118,12 +122,23 @@ make_state({T, Pid, {RecvData, random:seed(now()), putr(restart, {T, Opts, Svc}), %% save seeing it in trace putr(dwr, dwr(Caps)), %% + {_,_} = Mask = call(Pid, sequence), + Restrict = call(Pid, restriction), + Nodes = restrict_nodes(Restrict), #watchdog{parent = Pid, - transport = monitor(diameter_peer_fsm:start(T, Opts, Svc)), + transport = monitor(diameter_peer_fsm:start(T, + Opts, + {Mask, Nodes, Svc})), tw = proplists:get_value(watchdog_timer, Opts, ?DEFAULT_TW_INIT), - message_data = {RecvData, SvcName, Apps}}. + message_data = {RecvData, SvcName, Apps, Mask}, + sequence = Mask, + restrict = {Restrict, lists:member(node(), Nodes)}}. + +%% Retrieve the sequence mask from the parent from the parent, rather +%% than having it passed into init/1, for upgrade reasons: the call to +%% diameter_service:receive_message/3 passes back the mask. %% handle_call/3 @@ -137,7 +152,7 @@ handle_cast(_, State) -> %% handle_info/2 -handle_info(T, State) -> +handle_info(T, #watchdog{} = State) -> case transition(T, State) of ok -> {noreply, State}; @@ -148,7 +163,14 @@ handle_info(T, State) -> ?LOG(stop, T), event(State, State#watchdog{status = down}), {stop, {shutdown, T}, State} - end. + end; + +handle_info(T, S) -> + handle_info(T, upgrade(S)). + +upgrade(S) -> + #watchdog{} = list_to_tuple(tuple_to_list(S) + ++ [?NOMASK, {nodes, true}, false]). event(#watchdog{status = T}, #watchdog{status = T}) -> ok; @@ -205,9 +227,10 @@ transition({shutdown, Pid}, #watchdog{parent = Pid, down = S, %% sanity check stop; transition({shutdown = T, Pid}, #watchdog{parent = Pid, - transport = TPid}) -> + transport = TPid} + = S) -> TPid ! {T, self()}, - ok; + S#watchdog{shutdown = true}; %% Parent process has died, transition({'DOWN', _, process, Pid, _Reason}, @@ -241,9 +264,10 @@ transition({close, TPid, _Reason}, #watchdog{transport = TPid}) -> transition({open, TPid, Hosts, T} = Open, #watchdog{transport = TPid, status = initial, - parent = Pid} + parent = Pid, + restrict = {_, R}} = S) -> - case okay(getr(restart), Hosts) of + case okay(getr(restart), Hosts, R) of okay -> open(Pid, {TPid, T}), set_watchdog(S#watchdog{status = okay}); @@ -258,12 +282,15 @@ transition({open, TPid, Hosts, T} = Open, transition({open = P, TPid, _Hosts, T}, #watchdog{transport = TPid, + parent = Pid, status = down} = S) -> %% Store the info we need to notify the parent to reopen the %% connection after the requisite DWA's are received, at which - %% time we eraser(open). + %% time we eraser(open). The reopen message is a later addition, + %% to communicate the new capabilities as soon as they're known. putr(P, {TPid, T}), + Pid ! {reopen, self(), {TPid, T}}, set_watchdog(send_watchdog(S#watchdog{status = reopen, num_dwa = 0})); @@ -277,7 +304,10 @@ transition({open = P, TPid, _Hosts, T}, transition({'DOWN', _, process, TPid, _}, #watchdog{transport = TPid, - status = initial}) -> + status = S, + shutdown = D}) + when S == initial; + D -> stop; transition({'DOWN', _, process, TPid, _}, @@ -312,6 +342,15 @@ transition({state, Pid}, #watchdog{status = S}) -> %% =========================================================================== +%% Only call "upwards", to the parent service. +call(Pid, Req) -> + try + gen_server:call(Pid, Req, infinity) + catch + exit: Reason -> + exit({shutdown, {Req, Reason}}) + end. + monitor(Pid) -> erlang:monitor(process, Pid), Pid. @@ -325,26 +364,36 @@ getr(Key) -> eraser(Key) -> erase({?MODULE, Key}). -%% encode/1 +%% encode/2 -encode(Msg) -> - #diameter_packet{bin = Bin} = diameter_codec:encode(?BASE, Msg), +encode(Msg, Mask) -> + Seq = diameter_session:sequence(Mask), + Hdr = #diameter_header{version = ?DIAMETER_VERSION, + end_to_end_id = Seq, + hop_by_hop_id = Seq}, + Pkt = #diameter_packet{header = Hdr, + msg = Msg}, + #diameter_packet{bin = Bin} = diameter_codec:encode(?BASE, Pkt), Bin. -%% okay/2 +%% okay/3 -okay({{accept, Ref}, _, _}, Hosts) -> +okay({{accept, Ref}, _, _}, Hosts, Restrict) -> T = {?MODULE, connection, Ref, Hosts}, diameter_reg:add(T), - okay(diameter_reg:match(T)); + if Restrict -> + okay(diameter_reg:match(T)); + true -> + okay + end; %% Register before matching so that at least one of two registering -%% processes will match the other. (Which can't happen as long as -%% diameter_peer_fsm guarantees at most one open connection to the same -%% peer.) +%% processes will match the other. -okay({{connect, _}, _, _}, _) -> +okay({{connect, _}, _, _}, _, _) -> okay. +%% okay/2 + %% The peer hasn't been connected recently ... okay([{_,P}]) -> P = self(), %% assert @@ -400,9 +449,10 @@ close(#watchdog{parent = Pid}) -> %% send_watchdog/1 send_watchdog(#watchdog{pending = false, - transport = TPid} + transport = TPid, + sequence = Mask} = S) -> - TPid ! {send, encode(getr(dwr))}, + TPid ! {send, encode(getr(dwr), Mask)}, ?LOG(send, 'DWR'), S#watchdog{pending = true}. @@ -437,6 +487,14 @@ throwaway(S) -> throw({?MODULE, throwaway, S}). %% rcv/2 +%% +%% The lack of Hop-by-Hop and End-to-End Identifiers checks in a +%% received DWA is intentional. The purpose of the message is to +%% demonstrate life but a peer that consistently bungles it by sending +%% the wrong identifiers causes the connection to toggle between OPEN +%% and SUSPECT, with failover and failback as result, despite there +%% being no real problem with connectivity. Thus, relax and accept any +%% incoming DWA as being in response to an outgoing DWR. %% INITIAL Receive DWA Pending = FALSE %% Throwaway() INITIAL @@ -555,7 +613,7 @@ timeout(#watchdog{status = T, = S) when T == suspect; T == reopen, P, N < 0 -> - exit(TPid, shutdown), + exit(TPid, {shutdown, watchdog_timeout}), close(S), S#watchdog{status = down}; @@ -600,19 +658,40 @@ restart(#watchdog{transport = undefined} = S) -> restart(S) -> S. +%% restart/2 +%% %% Only restart the transport in the connecting case. For an accepting -%% transport, we've registered the peer connection when leaving state -%% initial and this is used by a new accepting process to realize that -%% it's actually in state down rather then initial when receiving -%% notification of an open connection. - -restart({{connect, _} = T, Opts, Svc}, #watchdog{parent = Pid} = S) -> +%% transport, there's no guarantee that an accepted connection in a +%% restarted transport if from the peer we've lost contact with so +%% have to be prepared for another watchdog to handle it. This is what +%% the diameter_reg registration in this module is for: the peer +%% connection is registered when leaving state initial and this is +%% used by a new accepting watchdog to realize that it's actually in +%% state down rather then initial when receiving notification of an +%% open connection. + +restart({{connect, _} = T, Opts, Svc}, #watchdog{parent = Pid, + sequence = Mask, + restrict = {R,_}} + = S) -> Pid ! {reconnect, self()}, - S#watchdog{transport = monitor(diameter_peer_fsm:start(T, Opts, Svc))}; + Nodes = restrict_nodes(R), + S#watchdog{transport = monitor(diameter_peer_fsm:start(T, + Opts, + {Mask, Nodes, Svc})), + restrict = {R, lists:member(node(), Nodes)}}; + +%% No restriction on the number of connections to the same peer: just +%% die. Note that a state machine never enters state REOPEN in this +%% case. +restart({{accept, _}, _, _}, #watchdog{restrict = {_, false}}) -> + stop; + +%% Otherwise hang around until told to die. restart({{accept, _}, _, _}, S) -> S. -%% Don't currently use Opts/Svc in the accept case but having them in -%% the process dictionary is helpful if the process dies unexpectedly. + +%% Don't currently use Opts/Svc in the accept case. %% dwr/1 @@ -622,3 +701,22 @@ dwr(#diameter_caps{origin_host = OH, ['DWR', {'Origin-Host', OH}, {'Origin-Realm', OR}, {'Origin-State-Id', OSI}]. + +%% restrict_nodes/1 + +restrict_nodes(false) -> + []; + +restrict_nodes(nodes) -> + [node() | nodes()]; + +restrict_nodes(node) -> + [node()]; + +restrict_nodes(Nodes) + when [] == Nodes; + is_atom(hd(Nodes)) -> + Nodes; + +restrict_nodes(F) -> + diameter_lib:eval(F). |