From d70a02e7415caccd13fad8dda417d0d18a112a83 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Sun, 24 Feb 2013 17:39:46 +0100 Subject: More flexible distribution config Allow both share_peers and use_shared_peers to be a list of nodes, or a function that returns a list of nodes. --- lib/diameter/src/base/diameter.erl | 10 +++- lib/diameter/src/base/diameter_config.erl | 27 +++++---- lib/diameter/src/base/diameter_peer.erl | 8 +-- lib/diameter/src/base/diameter_service.erl | 88 +++++++++++++++++++----------- 4 files changed, 86 insertions(+), 47 deletions(-) (limited to 'lib/diameter/src') diff --git a/lib/diameter/src/base/diameter.erl b/lib/diameter/src/base/diameter.erl index 1a96cecc83..189f2b01b9 100644 --- a/lib/diameter/src/base/diameter.erl +++ b/lib/diameter/src/base/diameter.erl @@ -45,6 +45,7 @@ -export_type([evaluable/0, restriction/0, + remotes/0, sequence/0, app_alias/0, service_name/0, @@ -292,6 +293,11 @@ call(SvcName, App, Message) -> | [node()] | evaluable(). +-type remotes() + :: boolean() + | [node()] + | evaluable(). + %% Options passed to start_service/2 -type service_opt() @@ -299,8 +305,8 @@ call(SvcName, App, Message) -> | {application, [application_opt()]} | {restrict_connections, restriction()} | {sequence, sequence() | evaluable()} - | {share_peers, boolean()} - | {use_shared_peers, boolean()}. + | {share_peers, remotes()} + | {use_shared_peers, remotes()}. -type application_opt() :: {alias, app_alias()} diff --git a/lib/diameter/src/base/diameter_config.erl b/lib/diameter/src/base/diameter_config.erl index a638849b08..3a2e0d2140 100644 --- a/lib/diameter/src/base/diameter_config.erl +++ b/lib/diameter/src/base/diameter_config.erl @@ -573,7 +573,6 @@ make_config(SvcName, Opts) -> {false, monitor}, {?NOMASK, sequence}, {nodes, restrict_connections}]), - %% share_peers and use_shared_peers are currently undocumented. #service{name = SvcName, rec = #diameter_service{applications = Apps, @@ -592,19 +591,27 @@ opt(K, true = B) K == use_shared_peers -> B; -opt(monitor, P) - when is_pid(P) -> - P; - opt(restrict_connections, T) when T == node; - T == nodes; - T == []; - is_atom(hd(T)) -> + T == nodes -> + T; + +opt(K, T) + when (K == share_peers + orelse K == use_shared_peers + orelse K == restrict_connections), ([] == T + orelse is_atom(hd(T))) -> T; -opt(restrict_connections = K, F) -> - try diameter_lib:eval(F) of %% no guarantee that it won't fail later +opt(monitor, P) + when is_pid(P) -> + P; + +opt(K, F) + when K == restrict_connections; + K == share_peers; + K == use_shared_peers -> + try diameter_lib:eval(F) of %% but no guarantee that it won't fail later Nodes when is_list(Nodes) -> F; V -> diff --git a/lib/diameter/src/base/diameter_peer.erl b/lib/diameter/src/base/diameter_peer.erl index 130bedda84..dfc76eb76e 100644 --- a/lib/diameter/src/base/diameter_peer.erl +++ b/lib/diameter/src/base/diameter_peer.erl @@ -31,7 +31,7 @@ send/2, close/1, abort/1, - notify/2]). + notify/3]). %% Server start. -export([start_link/0]). @@ -63,11 +63,11 @@ -define(DEFAULT_TTMO, infinity). %%% --------------------------------------------------------------------------- -%%% # notify/2 +%%% # notify/3 %%% --------------------------------------------------------------------------- -notify(SvcName, T) -> - rpc:abcast(nodes(), ?SERVER, {notify, SvcName, T}). +notify(Nodes, SvcName, T) -> + rpc:abcast(Nodes, ?SERVER, {notify, SvcName, T}). %%% --------------------------------------------------------------------------- %%% # start/1 diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl index edebd2fc77..255a3a44fd 100644 --- a/lib/diameter/src/base/diameter_service.erl +++ b/lib/diameter/src/base/diameter_service.erl @@ -125,9 +125,9 @@ monitor = false :: false | pid(), %% process to die with options :: [{sequence, diameter:sequence()} %% sequence mask - | {restrict_connections, diameter:restriction()} - | {share_peers, boolean()} %% broadcast peers to remote nodes? - | {use_shared_peers, boolean()}]}).%% use broadcasted peers? + | {share_peers, diameter:remotes()} %% broadcast to + | {use_shared_peers, diameter:remotes()} %% use from + | {restrict_connections, diameter:restriction()}]}). %% shared_peers reflects the peers broadcast from remote nodes. %% Record representing an RFC 3539 watchdog process implemented by @@ -681,11 +681,9 @@ mref(false = No) -> mref(P) -> erlang:monitor(process, P). -init_shared(#state{options = [_, _, {_, true} | _], +init_shared(#state{options = [_, _, {_,T} | _], service_name = Svc}) -> - diameter_peer:notify(Svc, {service, self()}); -init_shared(#state{options = [_, _, {_, false} | _]}) -> - ok. + notify(T, Svc, {service, self()}). init_mod(#diameter_app{alias = Alias, init_state = S}) -> @@ -698,6 +696,37 @@ get_value(Key, Vs) -> {_, V} = lists:keyfind(Key, 1, Vs), V. +notify(Share, SvcName, T) -> + Nodes = remotes(Share), + [] /= Nodes andalso diameter_peer:notify(Nodes, SvcName, T). +%% Test for the empty list for upgrade reasons: there's no +%% diameter_peer:notify/3 in old code so no call means no load order +%% requirement. + +remotes(false) -> + []; + +remotes(true) -> + nodes(); + +remotes(Nodes) + when is_atom(hd(Nodes)); + Nodes == [] -> + Nodes; + +remotes(F) -> + try diameter_lib:eval(F) of + L when is_list(L) -> + L; + T -> + diameter_lib:error_report({invalid_return, T}, F), + [] + catch + E:R -> + diameter_lib:error_report({failure, {E, R, ?STACK}}, F), + [] + end. + %% --------------------------------------------------------------------------- %% # start/3 %% --------------------------------------------------------------------------- @@ -1255,9 +1284,9 @@ send_event(#diameter_event{service = SvcName} = E) -> %% # share_peer/5 %% --------------------------------------------------------------------------- -share_peer(up, Caps, Apps, TPid, #state{options = [_, {_, true} | _], - service_name = Svc}) -> - diameter_peer:notify(Svc, {peer, TPid, [A || {_,A} <- Apps], Caps}); +share_peer(up, Caps, Apps, TPid, #state{options = [_, {_,T} | _], + service_name = Svc}) -> + notify(T, Svc, {peer, TPid, [A || {_,A} <- Apps], Caps}); share_peer(_, _, _, _, _) -> ok. @@ -1266,36 +1295,34 @@ share_peer(_, _, _, _, _) -> %% # share_peers/2 %% --------------------------------------------------------------------------- -share_peers(Pid, #state{options = [_, {_, true} | _], - local_peers = PDict}) -> - ?Dict:fold(fun(A,Ps,ok) -> sp(Pid, A, Ps), ok end, ok, PDict); - -share_peers(_, _) -> - ok. +share_peers(Pid, #state{options = [_, {_,T} | _], local_peers = PDict}) -> + is_remote(Pid, T) + andalso ?Dict:fold(fun(A,Ps,ok) -> sp(Pid, A, Ps), ok end, ok, PDict). sp(Pid, Alias, Peers) -> lists:foreach(fun({P,C}) -> Pid ! {peer, P, [Alias], C} end, Peers). +is_remote(Pid, T) -> + Node = node(Pid), + Node /= node() andalso lists:member(Node, remotes(T)). + %% --------------------------------------------------------------------------- %% # remote_peer_up/4 %% --------------------------------------------------------------------------- -remote_peer_up(Pid, Aliases, Caps, #state{options = [_, _, {_, true} | _], - service = Svc, - shared_peers = PDict}) -> +remote_peer_up(Pid, Aliases, Caps, #state{options = [_, _, {_,T} | _]} = S) -> + is_remote(Pid, T) + andalso rpu(Pid, Aliases, Caps, S). + +rpu(Pid, Aliases, Caps, #state{service = Svc, shared_peers = PDict}) -> #diameter_service{applications = Apps} = Svc, Key = #diameter_app.alias, - rpu(Pid, Caps, PDict, lists:filter(fun(A) -> - lists:keymember(A, Key, Apps) - end, - Aliases)); - -remote_peer_up(_, _, _, #state{options = [_, _, {_, false} | _]}) -> - ok. + F = fun(A) -> lists:keymember(A, Key, Apps) end, + rpu(Pid, lists:filter(F, Aliases), Caps, PDict); -rpu(_, _, PDict, []) -> - PDict; -rpu(Pid, Caps, PDict, Aliases) -> +rpu(_, [] = No, _, _) -> + No; +rpu(Pid, Aliases, Caps, PDict) -> erlang:monitor(process, Pid), T = {Pid, Caps}, lists:foreach(fun(A) -> ?Dict:append(A, T, PDict) end, Aliases). @@ -1304,8 +1331,7 @@ rpu(Pid, Caps, PDict, Aliases) -> %% # remote_peer_down/2 %% --------------------------------------------------------------------------- -remote_peer_down(Pid, #state{options = [_, _, {_, true} | _], - shared_peers = PDict}) -> +remote_peer_down(Pid, #state{shared_peers = PDict}) -> lists:foreach(fun(A) -> rpd(Pid, A, PDict) end, ?Dict:fetch_keys(PDict)). rpd(Pid, Alias, PDict) -> -- cgit v1.2.3