diff options
Diffstat (limited to 'lib/diameter/src/base/diameter_watchdog.erl')
-rw-r--r-- | lib/diameter/src/base/diameter_watchdog.erl | 187 |
1 files changed, 107 insertions, 80 deletions
diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl index 2ba60a65fb..a63425d92a 100644 --- a/lib/diameter/src/base/diameter_watchdog.erl +++ b/lib/diameter/src/base/diameter_watchdog.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2016. All Rights Reserved. +%% Copyright Ericsson AB 2010-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -50,10 +50,6 @@ -define(IS_NATURAL(N), (is_integer(N) andalso 0 =< N)). --record(config, - {suspect = 1 :: non_neg_integer(), %% OKAY -> SUSPECT - okay = 3 :: non_neg_integer()}). %% REOPEN -> OKAY - -record(watchdog, {%% PCB - Peer Control Block; see RFC 3539, Appendix A status = initial :: initial | okay | suspect | down | reopen, @@ -70,12 +66,19 @@ | integer() %% monotonic time | undefined, dictionary :: module(), %% common dictionary - receive_data :: term(), - %% term passed into diameter_service with incoming message - sequence :: diameter:sequence(), %% mask - restrict :: {diameter:restriction(), boolean()}, - shutdown = false :: boolean(), - config :: #config{}}). + receive_data :: term(), %% term passed with incoming message + config :: #{sequence := diameter:sequence(), %% mask + restrict_connections := diameter:restriction(), + restrict := boolean(), + suspect := non_neg_integer(), %% OKAY -> SUSPECT + okay := non_neg_integer()}, %% REOPEN -> OKAY + codec :: #{string_decode := false, + strict_mbit := boolean(), + failed_avp := false, + rfc := 3588 | 6733, + ordered_encode := false, + incoming_maxlen := diameter:message_length()}, + shutdown = false :: boolean()}). %% --------------------------------------------------------------------------- %% start/2 @@ -85,12 +88,12 @@ %% reason. %% --------------------------------------------------------------------------- --spec start(Type, {RecvData, [Opt], SvcOpts, #diameter_service{}}) +-spec start(Type, {[Opt], SvcOpts, RecvData, #diameter_service{}}) -> {reference(), pid()} when Type :: {connect|accept, diameter:transport_ref()}, - RecvData :: term(), Opt :: diameter:transport_opt(), - SvcOpts :: [diameter:service_opt()]. + SvcOpts :: map(), + RecvData :: term(). start({_,_} = Type, T) -> Ack = make_ref(), @@ -117,22 +120,28 @@ init(T) -> proc_lib:init_ack({ok, self()}), gen_server:enter_loop(?MODULE, [], i(T)). -i({Ack, T, Pid, {RecvData, - Opts, - SvcOpts, +i({Ack, T, Pid, {Opts, + #{restrict_connections := Restrict} + = SvcOpts0, + RecvData, #diameter_service{applications = Apps, capabilities = Caps} = Svc}}) -> monitor(process, Pid), wait(Ack, Pid), + + Dict0 = common_dictionary(Apps), + SvcOpts = SvcOpts0#{rfc => rfc(Dict0)}, putr(restart, {T, Opts, Svc, SvcOpts}), %% save seeing it in trace putr(dwr, dwr(Caps)), %% - {_,_} = Mask = proplists:get_value(sequence, SvcOpts), - Restrict = proplists:get_value(restrict_connections, SvcOpts), Nodes = restrict_nodes(Restrict), - Dict0 = common_dictionary(Apps), - diameter_codec:setopts([{common_dictionary, Dict0}, - {string_decode, false}]), + CodecKeys = [string_decode, + strict_mbit, + incoming_maxlen, + spawn_opt, + rfc, + ordered_encode], + #watchdog{parent = Pid, transport = start(T, Opts, SvcOpts, Nodes, Dict0, Svc), tw = proplists:get_value(watchdog_timer, @@ -140,9 +149,14 @@ i({Ack, T, Pid, {RecvData, ?DEFAULT_TW_INIT), receive_data = RecvData, dictionary = Dict0, - sequence = Mask, - restrict = {Restrict, lists:member(node(), Nodes)}, - config = config(Opts)}. + config = + maps:without(CodecKeys, + config(SvcOpts#{restrict => restrict(Nodes), + suspect => 1, + okay => 3}, + Opts)), + codec = maps:with(CodecKeys, SvcOpts#{string_decode := false, + ordered_encode => false})}. wait(Ref, Pid) -> receive @@ -152,22 +166,31 @@ wait(Ref, Pid) -> exit({shutdown, D}) end. -%% config/1 +%% Regard anything but the generated RFC 3588 dictionary as modern. +%% This affects the interpretation of defaults during the decode +%% of values of type DiameterURI, this having changed from RFC 3588. +%% (So much for backwards compatibility.) +rfc(?BASE) -> + 3588; +rfc(_) -> + 6733. + +%% config/2 %% %% Could also configure counts for SUSPECT to DOWN and REOPEN to DOWN, %% but don't. -config(Opts) -> +config(Map, Opts) -> Config = proplists:get_value(watchdog_config, Opts, []), - lists:foldl(fun config/2, #config{}, Config). + lists:foldl(fun cfg/2, Map, Config). -config({suspect, N}, Rec) +cfg({suspect, N}, Map) when ?IS_NATURAL(N) -> - Rec#config{suspect = N}; + Map#{suspect := N}; -config({okay, N}, Rec) +cfg({okay, N}, Map) when ?IS_NATURAL(N) -> - Rec#config{okay = N}. + Map#{okay := N}. %% start/6 @@ -283,7 +306,7 @@ event(Msg, ?LOG(transition, {From, To}). data(Msg, TPid, reopen, okay) -> - {recv, TPid, 'DWA', _Pkt} = Msg, %% assert + {recv, TPid, _, 'DWA', _Pkt} = Msg, %% assert {TPid, T} = eraser(open), [T]; @@ -302,6 +325,8 @@ tpid(_, Pid) tpid(Pid, _) -> Pid. +%% send/2 + send(Pid, T) -> Pid ! T. @@ -375,8 +400,8 @@ transition({accepted = T, TPid}, #watchdog{transport = TPid, transition({open, TPid, Hosts, _} = Open, #watchdog{transport = TPid, status = initial, - restrict = {_,R}, - config = #config{suspect = OS}} + config = #{restrict := R, + suspect := OS}} = S) -> case okay(role(), Hosts, R) of okay -> @@ -394,8 +419,8 @@ transition({open, TPid, Hosts, _} = Open, transition({open = Key, TPid, _Hosts, T}, #watchdog{transport = TPid, status = down, - config = #config{suspect = OS, - okay = RO}} + config = #{suspect := OS, + okay := RO}} = S) -> case RO of 0 -> %% non-standard: skip REOPEN @@ -428,7 +453,7 @@ transition({'DOWN', _, process, TPid, _Reason}, transition({'DOWN', _, process, TPid, _Reason} = D, #watchdog{transport = TPid, status = T, - restrict = {_,R}} + config = #{restrict := R}} = S0) -> S = S0#watchdog{pending = false, transport = undefined}, @@ -447,12 +472,15 @@ transition({'DOWN', _, process, TPid, _Reason} = D, end; %% Incoming message. -transition({recv, TPid, Name, PktT}, #watchdog{transport = TPid} = S) -> - try - incoming(Name, PktT, S) - catch +transition({recv, TPid, Route, Name, Pkt}, + #watchdog{transport = TPid} + = S) -> + try incoming(Route, Name, Pkt, S) of #watchdog{dictionary = Dict0, receive_data = T} = NS -> - diameter_traffic:receive_message(TPid, PktT, Dict0, T), + diameter_traffic:receive_message(TPid, Route, Pkt, Dict0, T), + NS + catch + #watchdog{} = NS -> NS end; @@ -481,9 +509,9 @@ getr(Key) -> eraser(Key) -> erase({?MODULE, Key}). -%% encode/3 +%% encode/4 -encode(dwr = M, Dict0, Mask) -> +encode(dwr = M, Dict0, Opts, Mask) -> Msg = getr(M), Seq = diameter_session:sequence(Mask), Hdr = #diameter_header{version = ?DIAMETER_VERSION, @@ -491,10 +519,10 @@ encode(dwr = M, Dict0, Mask) -> hop_by_hop_id = Seq}, Pkt = #diameter_packet{header = Hdr, msg = Msg}, - diameter_codec:encode(Dict0, Pkt); + diameter_codec:encode(Dict0, Opts, Pkt); -encode(dwa, Dict0, #diameter_packet{header = H, transport_data = TD} - = ReqPkt) -> +encode(dwa, Dict0, Opts, #diameter_packet{header = H, transport_data = TD} + = ReqPkt) -> AnsPkt = #diameter_packet{header = H#diameter_header{is_request = false, is_error = undefined, @@ -502,7 +530,7 @@ encode(dwa, Dict0, #diameter_packet{header = H, transport_data = TD} msg = dwa(ReqPkt), transport_data = TD}, - diameter_codec:encode(Dict0, AnsPkt). + diameter_codec:encode(Dict0, Opts, AnsPkt). %% okay/3 @@ -572,9 +600,10 @@ tw({M,F,A}) -> send_watchdog(#watchdog{pending = false, transport = TPid, dictionary = Dict0, - sequence = Mask} + config = #{sequence := Mask}, + codec = Opts} = S) -> - #diameter_packet{bin = Bin} = EPkt = encode(dwr, Dict0, Mask), + #diameter_packet{bin = Bin} = EPkt = encode(dwr, Dict0, Opts, Mask), diameter_traffic:incr(send, EPkt, TPid, Dict0), send(TPid, {send, Bin}), ?LOG(send, 'DWR'), @@ -582,41 +611,32 @@ send_watchdog(#watchdog{pending = false, %% Don't count encode errors since we don't expect any on DWR/DWA. -%% incoming/3 - -incoming(Name, {Pkt, NPid}, S) -> - NS = recv(Name, Pkt, S), - NPid ! {diameter, discard}, - NS; +%% incoming/4 -incoming(Name, Pkt, S) -> - recv(Name, Pkt, S). - -%% recv/3 - -recv(Name, Pkt, S) -> - try rcv(Name, Pkt, rcv(Name, S)) of - #watchdog{} = NS -> - throw(NS) +incoming(Route, Name, Pkt, S) -> + try rcv(Name, S) of + NS -> rcv(Name, Pkt, NS) catch - #watchdog{} = NS -> %% throwaway - NS + #watchdog{transport = TPid} = NS when Route -> %% incoming request + send(TPid, {send, false}), %% requiring ack + throw(NS) end. %% rcv/3 rcv('DWR', Pkt, #watchdog{transport = TPid, - dictionary = Dict0} + dictionary = Dict0, + codec = Opts} = S) -> ?LOG(recv, 'DWR'), - DPkt = diameter_codec:decode(Dict0, Pkt), + DPkt = diameter_codec:decode(Dict0, Opts, Pkt), diameter_traffic:incr(recv, DPkt, TPid, Dict0), diameter_traffic:incr_error(recv, DPkt, TPid, Dict0), #diameter_packet{header = H, transport_data = T, bin = Bin} = EPkt - = encode(dwa, Dict0, Pkt), + = encode(dwa, Dict0, Opts, Pkt), diameter_traffic:incr(send, EPkt, TPid, Dict0), diameter_traffic:incr_rc(send, EPkt, TPid, Dict0), @@ -628,12 +648,13 @@ rcv('DWR', Pkt, #watchdog{transport = TPid, throw(S); rcv('DWA', Pkt, #watchdog{transport = TPid, - dictionary = Dict0} + dictionary = Dict0, + codec = Opts} = S) -> ?LOG(recv, 'DWA'), diameter_traffic:incr(recv, Pkt, TPid, Dict0), diameter_traffic:incr_rc(recv, - diameter_codec:decode(Dict0, Pkt), + diameter_codec:decode(Dict0, Opts, Pkt), TPid, Dict0), throw(S); @@ -695,12 +716,12 @@ rcv(_, #watchdog{status = okay} = S) -> %% SUSPECT Receive non-DWA Failback() %% SetWatchdog() OKAY -rcv('DWA', #watchdog{status = suspect, config = #config{suspect = OS}} = S) -> +rcv('DWA', #watchdog{status = suspect, config = #{suspect := OS}} = S) -> set_watchdog(S#watchdog{status = okay, num_dwa = OS, pending = false}); -rcv(_, #watchdog{status = suspect, config = #config{suspect = OS}} = S) -> +rcv(_, #watchdog{status = suspect, config = #{suspect := OS}} = S) -> set_watchdog(S#watchdog{status = okay, num_dwa = OS}); @@ -710,8 +731,8 @@ rcv(_, #watchdog{status = suspect, config = #config{suspect = OS}} = S) -> rcv('DWA', #watchdog{status = reopen, num_dwa = N, - config = #config{suspect = OS, - okay = RO}} + config = #{suspect := OS, + okay := RO}} = S) when N+1 == RO -> S#watchdog{status = okay, @@ -842,18 +863,19 @@ restart(S) -> %% reconnect has won race with timeout restart({{connect, _} = T, Opts, Svc, SvcOpts}, #watchdog{parent = Pid, - restrict = {R,_}, + config = #{restrict_connections := R} + = M, dictionary = Dict0} = S) -> send(Pid, {reconnect, self()}), Nodes = restrict_nodes(R), S#watchdog{transport = start(T, Opts, SvcOpts, Nodes, Dict0, Svc), - restrict = {R, lists:member(node(), Nodes)}}; + config = M#{restrict => restrict(Nodes)}}; %% No restriction on the number of connections to the same peer: just %% die. Note that a state machine never enters state REOPEN in this %% case. -restart({{accept, _}, _, _, _}, #watchdog{restrict = {_, false}}) -> +restart({{accept, _}, _, _, _}, #watchdog{config = #{restrict := false}}) -> stop; %% Otherwise hang around until told to die, either by the service or @@ -897,3 +919,8 @@ restrict_nodes(Nodes) restrict_nodes(F) -> diameter_lib:eval(F). + +%% restrict/1 + +restrict(Nodes) -> + lists:member(node(), Nodes). |