From 0b7c87dc62d845d059d250ba152f16e94c660e55 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Wed, 10 Oct 2012 10:59:08 +0200 Subject: Implement service_opt() restrict_connections --- lib/diameter/src/base/diameter_watchdog.erl | 99 +++++++++++++++++++++-------- 1 file changed, 73 insertions(+), 26 deletions(-) (limited to 'lib/diameter/src/base/diameter_watchdog.erl') diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl index b37a1a10e9..d814f1afe2 100644 --- a/lib/diameter/src/base/diameter_watchdog.erl +++ b/lib/diameter/src/base/diameter_watchdog.erl @@ -57,8 +57,9 @@ parent = self() :: pid(), transport :: pid() | undefined, tref :: reference(), %% reference for current watchdog timer - message_data, %% term passed into diameter_service with message - sequence :: diameter:sequence()}). %% mask + message_data, %% term passed into diameter_service with message + sequence :: diameter:sequence(), %% mask + restrict :: {diameter:restriction(), boolean()}}). %% start/2 %% @@ -121,15 +122,18 @@ make_state({T, Pid, {RecvData, putr(restart, {T, Opts, Svc}), %% save seeing it in trace putr(dwr, dwr(Caps)), %% {_,_} = Mask = call(Pid, sequence), + Restrict = call(Pid, restriction), + Nodes = restrict_nodes(Restrict), #watchdog{parent = Pid, transport = monitor(diameter_peer_fsm:start(T, - Opts, - {Mask, Svc})), + Opts, + {Mask, Nodes, Svc})), tw = proplists:get_value(watchdog_timer, Opts, ?DEFAULT_TW_INIT), message_data = {RecvData, SvcName, Apps, Mask}, - sequence = Mask}. + sequence = Mask, + restrict = {Restrict, lists:member(node(), Nodes)}}. %% Retrieve the sequence mask from the parent from the parent, rather %% than having it passed into init/1, for upgrade reasons: the call to @@ -160,9 +164,11 @@ handle_info(T, #watchdog{} = State) -> {stop, {shutdown, T}, State} end; -handle_info(T, S) -> %% upgrade - handle_info(T, #watchdog{} = list_to_tuple(tuple_to_list(S) - ++ [?NOMASK])). +handle_info(T, S) -> + handle_info(T, upgrade(S)). + +upgrade(S) -> + #watchdog{} = list_to_tuple(tuple_to_list(S) ++ [?NOMASK, {nodes, true}]). event(#watchdog{status = T}, #watchdog{status = T}) -> ok; @@ -255,9 +261,10 @@ transition({close, TPid, _Reason}, #watchdog{transport = TPid}) -> transition({open, TPid, Hosts, T} = Open, #watchdog{transport = TPid, status = initial, - parent = Pid} + parent = Pid, + restrict = {_, R}} = S) -> - case okay(getr(restart), Hosts) of + case okay(getr(restart), Hosts, R) of okay -> open(Pid, {TPid, T}), set_watchdog(S#watchdog{status = okay}); @@ -363,20 +370,24 @@ encode(Msg, Mask) -> #diameter_packet{bin = Bin} = diameter_codec:encode(?BASE, Pkt), Bin. -%% okay/2 +%% okay/3 -okay({{accept, Ref}, _, _}, Hosts) -> +okay({{accept, Ref}, _, _}, Hosts, Restrict) -> T = {?MODULE, connection, Ref, Hosts}, diameter_reg:add(T), - okay(diameter_reg:match(T)); + if Restrict -> + okay(diameter_reg:match(T)); + true -> + okay + end; %% Register before matching so that at least one of two registering -%% processes will match the other. (Which can't happen as long as -%% diameter_peer_fsm guarantees at most one open connection to the same -%% peer.) +%% processes will match the other. -okay({{connect, _}, _, _}, _) -> +okay({{connect, _}, _, _}, _, _) -> okay. +%% okay/2 + %% The peer hasn't been connected recently ... okay([{_,P}]) -> P = self(), %% assert @@ -633,23 +644,40 @@ restart(#watchdog{transport = undefined} = S) -> restart(S) -> S. +%% restart/2 +%% %% Only restart the transport in the connecting case. For an accepting -%% transport, we've registered the peer connection when leaving state -%% initial and this is used by a new accepting process to realize that -%% it's actually in state down rather then initial when receiving -%% notification of an open connection. +%% transport, there's no guarantee that an accepted connection in a +%% restarted transport if from the peer we've lost contact with so +%% have to be prepared for another watchdog to handle it. This is what +%% the diameter_reg registration in this module is for: the peer +%% connection is registered when leaving state initial and this is +%% used by a new accepting watchdog to realize that it's actually in +%% state down rather then initial when receiving notification of an +%% open connection. restart({{connect, _} = T, Opts, Svc}, #watchdog{parent = Pid, - sequence = Mask} + sequence = Mask, + restrict = {R,_}} = S) -> Pid ! {reconnect, self()}, + Nodes = restrict_nodes(R), S#watchdog{transport = monitor(diameter_peer_fsm:start(T, - Opts, - {Mask, Svc}))}; + Opts, + {Mask, Nodes, Svc})), + restrict = {R, lists:member(node(), Nodes)}}; + +%% No restriction on the number of connections to the same peer: just +%% die. Note that a state machine never enters state REOPEN in this +%% case. +restart({{accept, _}, _, _}, #watchdog{restrict = {_, false}}) -> + stop; + +%% Otherwise hang around until told to die. restart({{accept, _}, _, _}, S) -> S. -%% Don't currently use Opts/Svc in the accept case but having them in -%% the process dictionary is helpful if the process dies unexpectedly. + +%% Don't currently use Opts/Svc in the accept case. %% dwr/1 @@ -659,3 +687,22 @@ dwr(#diameter_caps{origin_host = OH, ['DWR', {'Origin-Host', OH}, {'Origin-Realm', OR}, {'Origin-State-Id', OSI}]. + +%% restrict_nodes/1 + +restrict_nodes(false) -> + []; + +restrict_nodes(nodes) -> + [node() | nodes()]; + +restrict_nodes(node) -> + [node()]; + +restrict_nodes(Nodes) + when [] == Nodes; + is_atom(hd(Nodes)) -> + Nodes; + +restrict_nodes(F) -> + diameter_lib:eval(F). -- cgit v1.2.3