aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src/base/diameter_watchdog.erl
diff options
context:
space:
mode:
authorAnders Svensson <[email protected]>2012-10-10 10:59:08 +0200
committerAnders Svensson <[email protected]>2012-11-05 11:54:30 +0100
commit0b7c87dc62d845d059d250ba152f16e94c660e55 (patch)
tree84e7f32c851cca24ad39b5b875d284e2794cee3d /lib/diameter/src/base/diameter_watchdog.erl
parentf3ea0395506e7e80f9efb53d8c96c28bd288a066 (diff)
downloadotp-0b7c87dc62d845d059d250ba152f16e94c660e55.tar.gz
otp-0b7c87dc62d845d059d250ba152f16e94c660e55.tar.bz2
otp-0b7c87dc62d845d059d250ba152f16e94c660e55.zip
Implement service_opt() restrict_connections
Diffstat (limited to 'lib/diameter/src/base/diameter_watchdog.erl')
-rw-r--r--lib/diameter/src/base/diameter_watchdog.erl99
1 files changed, 73 insertions, 26 deletions
diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl
index b37a1a10e9..d814f1afe2 100644
--- a/lib/diameter/src/base/diameter_watchdog.erl
+++ b/lib/diameter/src/base/diameter_watchdog.erl
@@ -57,8 +57,9 @@
parent = self() :: pid(),
transport :: pid() | undefined,
tref :: reference(), %% reference for current watchdog timer
- message_data, %% term passed into diameter_service with message
- sequence :: diameter:sequence()}). %% mask
+ message_data, %% term passed into diameter_service with message
+ sequence :: diameter:sequence(), %% mask
+ restrict :: {diameter:restriction(), boolean()}}).
%% start/2
%%
@@ -121,15 +122,18 @@ make_state({T, Pid, {RecvData,
putr(restart, {T, Opts, Svc}), %% save seeing it in trace
putr(dwr, dwr(Caps)), %%
{_,_} = Mask = call(Pid, sequence),
+ Restrict = call(Pid, restriction),
+ Nodes = restrict_nodes(Restrict),
#watchdog{parent = Pid,
transport = monitor(diameter_peer_fsm:start(T,
- Opts,
- {Mask, Svc})),
+ Opts,
+ {Mask, Nodes, Svc})),
tw = proplists:get_value(watchdog_timer,
Opts,
?DEFAULT_TW_INIT),
message_data = {RecvData, SvcName, Apps, Mask},
- sequence = Mask}.
+ sequence = Mask,
+ restrict = {Restrict, lists:member(node(), Nodes)}}.
%% Retrieve the sequence mask from the parent from the parent, rather
%% than having it passed into init/1, for upgrade reasons: the call to
@@ -160,9 +164,11 @@ handle_info(T, #watchdog{} = State) ->
{stop, {shutdown, T}, State}
end;
-handle_info(T, S) -> %% upgrade
- handle_info(T, #watchdog{} = list_to_tuple(tuple_to_list(S)
- ++ [?NOMASK])).
+handle_info(T, S) ->
+ handle_info(T, upgrade(S)).
+
+upgrade(S) ->
+ #watchdog{} = list_to_tuple(tuple_to_list(S) ++ [?NOMASK, {nodes, true}]).
event(#watchdog{status = T}, #watchdog{status = T}) ->
ok;
@@ -255,9 +261,10 @@ transition({close, TPid, _Reason}, #watchdog{transport = TPid}) ->
transition({open, TPid, Hosts, T} = Open,
#watchdog{transport = TPid,
status = initial,
- parent = Pid}
+ parent = Pid,
+ restrict = {_, R}}
= S) ->
- case okay(getr(restart), Hosts) of
+ case okay(getr(restart), Hosts, R) of
okay ->
open(Pid, {TPid, T}),
set_watchdog(S#watchdog{status = okay});
@@ -363,20 +370,24 @@ encode(Msg, Mask) ->
#diameter_packet{bin = Bin} = diameter_codec:encode(?BASE, Pkt),
Bin.
-%% okay/2
+%% okay/3
-okay({{accept, Ref}, _, _}, Hosts) ->
+okay({{accept, Ref}, _, _}, Hosts, Restrict) ->
T = {?MODULE, connection, Ref, Hosts},
diameter_reg:add(T),
- okay(diameter_reg:match(T));
+ if Restrict ->
+ okay(diameter_reg:match(T));
+ true ->
+ okay
+ end;
%% Register before matching so that at least one of two registering
-%% processes will match the other. (Which can't happen as long as
-%% diameter_peer_fsm guarantees at most one open connection to the same
-%% peer.)
+%% processes will match the other.
-okay({{connect, _}, _, _}, _) ->
+okay({{connect, _}, _, _}, _, _) ->
okay.
+%% okay/2
+
%% The peer hasn't been connected recently ...
okay([{_,P}]) ->
P = self(), %% assert
@@ -633,23 +644,40 @@ restart(#watchdog{transport = undefined} = S) ->
restart(S) ->
S.
+%% restart/2
+%%
%% Only restart the transport in the connecting case. For an accepting
-%% transport, we've registered the peer connection when leaving state
-%% initial and this is used by a new accepting process to realize that
-%% it's actually in state down rather then initial when receiving
-%% notification of an open connection.
+%% transport, there's no guarantee that an accepted connection in a
+%% restarted transport if from the peer we've lost contact with so
+%% have to be prepared for another watchdog to handle it. This is what
+%% the diameter_reg registration in this module is for: the peer
+%% connection is registered when leaving state initial and this is
+%% used by a new accepting watchdog to realize that it's actually in
+%% state down rather then initial when receiving notification of an
+%% open connection.
restart({{connect, _} = T, Opts, Svc}, #watchdog{parent = Pid,
- sequence = Mask}
+ sequence = Mask,
+ restrict = {R,_}}
= S) ->
Pid ! {reconnect, self()},
+ Nodes = restrict_nodes(R),
S#watchdog{transport = monitor(diameter_peer_fsm:start(T,
- Opts,
- {Mask, Svc}))};
+ Opts,
+ {Mask, Nodes, Svc})),
+ restrict = {R, lists:member(node(), Nodes)}};
+
+%% No restriction on the number of connections to the same peer: just
+%% die. Note that a state machine never enters state REOPEN in this
+%% case.
+restart({{accept, _}, _, _}, #watchdog{restrict = {_, false}}) ->
+ stop;
+
+%% Otherwise hang around until told to die.
restart({{accept, _}, _, _}, S) ->
S.
-%% Don't currently use Opts/Svc in the accept case but having them in
-%% the process dictionary is helpful if the process dies unexpectedly.
+
+%% Don't currently use Opts/Svc in the accept case.
%% dwr/1
@@ -659,3 +687,22 @@ dwr(#diameter_caps{origin_host = OH,
['DWR', {'Origin-Host', OH},
{'Origin-Realm', OR},
{'Origin-State-Id', OSI}].
+
+%% restrict_nodes/1
+
+restrict_nodes(false) ->
+ [];
+
+restrict_nodes(nodes) ->
+ [node() | nodes()];
+
+restrict_nodes(node) ->
+ [node()];
+
+restrict_nodes(Nodes)
+ when [] == Nodes;
+ is_atom(hd(Nodes)) ->
+ Nodes;
+
+restrict_nodes(F) ->
+ diameter_lib:eval(F).