diff options
author | Anders Svensson <[email protected]> | 2013-02-24 17:39:46 +0100 |
---|---|---|
committer | Anders Svensson <[email protected]> | 2013-03-17 15:07:00 +0100 |
commit | d70a02e7415caccd13fad8dda417d0d18a112a83 (patch) | |
tree | 6b3aa51ad544e74459e5bb6c4a2fcd1d74f080ee | |
parent | 63e21fa7d8aa6761640f9cf357663b03578a5446 (diff) | |
download | otp-d70a02e7415caccd13fad8dda417d0d18a112a83.tar.gz otp-d70a02e7415caccd13fad8dda417d0d18a112a83.tar.bz2 otp-d70a02e7415caccd13fad8dda417d0d18a112a83.zip |
More flexible distribution config
Allow both share_peers and use_shared_peers to be a list of nodes, or a
function that returns a list of nodes.
-rw-r--r-- | lib/diameter/doc/src/diameter.xml | 75 | ||||
-rw-r--r-- | lib/diameter/doc/src/diameter_app.xml | 8 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter.erl | 10 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_config.erl | 27 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_peer.erl | 8 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_service.erl | 88 | ||||
-rw-r--r-- | lib/diameter/test/diameter_distribution_SUITE.erl | 126 |
7 files changed, 212 insertions, 130 deletions
diff --git a/lib/diameter/doc/src/diameter.xml b/lib/diameter/doc/src/diameter.xml index df9e03b2da..8ad4af85a3 100644 --- a/lib/diameter/doc/src/diameter.xml +++ b/lib/diameter/doc/src/diameter.xml @@ -43,7 +43,7 @@ under the License. <approved></approved> <checked></checked> <date></date> -<rev>%VSN%</rev> +<rev></rev> <file>diameter.xml</file> </header> @@ -774,8 +774,8 @@ Application-Id AVP's in particular.</p> | evaluable()}</c></tag> <item> <p> -Specifies the degree to which multiple transport connections to the -same peer are accepted by the service.</p> +Specifies the degree to which the service allows multiple transport +connections to the same peer.</p> <p> If type <c>[node()]</c> then a connection is rejected if another already @@ -831,40 +831,60 @@ outgoing requests.</p> </warning> </item> -<tag><c>{share_peers, boolean()}</c></tag> +<tag><c>{share_peers, boolean() | [node()] | evaluable()}</c></tag> <item> <p> -Specifies whether or not peer connections on the local Erlang node -are shared with services on visible nodes (as returned by &nodes;). -Peers shared from remote nodes become available in the candidates list -passed as the second argument to &app_pick_peer; callbacks.</p> +Specifies nodes to which peer connections established on the local +Erlang node are communicated. +Shared peers become available in the remote candidates list passed to +&app_pick_peer; callbacks on remote nodes whose services are +configured to use them: see <c>use_shared_peers</c> below.</p> <p> -Defaults to <c>false</c>.</p> +If <c>false</c> then peers are not shared. +If <c>[node()]</c> then peers are shared with the specified list of +nodes. +If <c>evaluable()</c> then peers are shared with the nodes returned +by the specified function, evaluated whenever a peer connection +becomes available or a remote service requests information about local +connections. +The value <c>true</c> is equivalent to <c>fun &nodes;</c>. +The value <c>node()</c> in a node list is ignored, so a collection of +services can all be configured to share with the same list of +nodes.</p> -<note> <p> -Peers are only shared with other services of the same name. -Since the value of the &application_opt; <c>alias</c> is the handle -for identifying a peer, both local and remote, as a candidate for an -outgoing request, services that share peers should use the same -aliases for identifying their supported applications.</p> -</note> +Defaults to <c>false</c>.</p> <note> <p> -Services that share peers can do so in order to distribute the -implementation of a Diameter node across multiple Erlang nodes, in -which case the participating services should typically be configured -with identical &capabilities;.</p> +Peers are only shared with services of the same name for the purpose +of sending outgoing requests. +Since the value of the &application_opt; <c>alias</c>, passed to +&call;, is the handle for identifying a peer as a suitable +candidate, services that share peers must use the same aliases to +identify their supported applications. +They should typically also configure identical &capabilities;, since +by sharing peer connections they are distributing the implementation +of a single Diameter node across multiple Erlang nodes.</p> </note> </item> -<tag><c>{use_shared_peers, boolean()}</c></tag> +<tag><c>{use_shared_peers, boolean() | [node()] | evaluable()}</c></tag> <item> <p> -Specifies whether or not the service makes use of peer connections -shared by identically named services on other Erlang nodes.</p> +Specifies nodes from which communicated peers are made available in +the remote candidates list of &app_pick_peer; callbacks.</p> + +<p> +If <c>false</c> then remote peers are not used. +If <c>[node()]</c> then only peers from the specified list of nodes +are used. +If <c>evaluable()</c> then only peers returned by the specified +function are used, evaluated whenever a remote service communicates +information about an available peer connection. +The value <c>true</c> is equivalent to <c>fun &nodes;</c>. +The value <c>node()</c> in a node list is ignored.</p> <p> Defaults to <c>false</c>.</p> @@ -874,6 +894,15 @@ Defaults to <c>false</c>.</p> A service that does not use shared peers will always pass the empty list as the second argument of &app_pick_peer; callbacks.</p> </note> + +<warning> +<p> +Sending a request over a peer connection on a remote node is less +efficient than sending it over a local connection. +It may be preferable to make use of the &service_opt; +<c>restrict_connections</c> and maintain a dedicated connection on +each node from which requests are sent.</p> +</warning> </item> </taglist> diff --git a/lib/diameter/doc/src/diameter_app.xml b/lib/diameter/doc/src/diameter_app.xml index d094e1bade..d4fb792787 100644 --- a/lib/diameter/doc/src/diameter_app.xml +++ b/lib/diameter/doc/src/diameter_app.xml @@ -37,7 +37,7 @@ under the License. <approved></approved> <checked></checked> <date></date> -<rev>%REV%</rev> +<rev></rev> <file>diameter_app.xml</file> </header> @@ -297,9 +297,9 @@ The return values <c>false</c> and <c>{false, State}</c> (that is, <note> <p> -<c>RemoteCandidates</c> is the empty list if the service has been -configured with the (default) &mod_service_opt; -<c>{use_shared_peers, false}</c>.</p> +The &mod_service_opt; <c>use_shared_peers</c> determines whether or +not a service uses peers shared from other nodes. +If not then <c>RemoteCandidates</c> is the empty list.</p> </note> <warning> diff --git a/lib/diameter/src/base/diameter.erl b/lib/diameter/src/base/diameter.erl index 1a96cecc83..189f2b01b9 100644 --- a/lib/diameter/src/base/diameter.erl +++ b/lib/diameter/src/base/diameter.erl @@ -45,6 +45,7 @@ -export_type([evaluable/0, restriction/0, + remotes/0, sequence/0, app_alias/0, service_name/0, @@ -292,6 +293,11 @@ call(SvcName, App, Message) -> | [node()] | evaluable(). +-type remotes() + :: boolean() + | [node()] + | evaluable(). + %% Options passed to start_service/2 -type service_opt() @@ -299,8 +305,8 @@ call(SvcName, App, Message) -> | {application, [application_opt()]} | {restrict_connections, restriction()} | {sequence, sequence() | evaluable()} - | {share_peers, boolean()} - | {use_shared_peers, boolean()}. + | {share_peers, remotes()} + | {use_shared_peers, remotes()}. -type application_opt() :: {alias, app_alias()} diff --git a/lib/diameter/src/base/diameter_config.erl b/lib/diameter/src/base/diameter_config.erl index a638849b08..3a2e0d2140 100644 --- a/lib/diameter/src/base/diameter_config.erl +++ b/lib/diameter/src/base/diameter_config.erl @@ -573,7 +573,6 @@ make_config(SvcName, Opts) -> {false, monitor}, {?NOMASK, sequence}, {nodes, restrict_connections}]), - %% share_peers and use_shared_peers are currently undocumented. #service{name = SvcName, rec = #diameter_service{applications = Apps, @@ -592,19 +591,27 @@ opt(K, true = B) K == use_shared_peers -> B; -opt(monitor, P) - when is_pid(P) -> - P; - opt(restrict_connections, T) when T == node; - T == nodes; - T == []; - is_atom(hd(T)) -> + T == nodes -> + T; + +opt(K, T) + when (K == share_peers + orelse K == use_shared_peers + orelse K == restrict_connections), ([] == T + orelse is_atom(hd(T))) -> T; -opt(restrict_connections = K, F) -> - try diameter_lib:eval(F) of %% no guarantee that it won't fail later +opt(monitor, P) + when is_pid(P) -> + P; + +opt(K, F) + when K == restrict_connections; + K == share_peers; + K == use_shared_peers -> + try diameter_lib:eval(F) of %% but no guarantee that it won't fail later Nodes when is_list(Nodes) -> F; V -> diff --git a/lib/diameter/src/base/diameter_peer.erl b/lib/diameter/src/base/diameter_peer.erl index 130bedda84..dfc76eb76e 100644 --- a/lib/diameter/src/base/diameter_peer.erl +++ b/lib/diameter/src/base/diameter_peer.erl @@ -31,7 +31,7 @@ send/2, close/1, abort/1, - notify/2]). + notify/3]). %% Server start. -export([start_link/0]). @@ -63,11 +63,11 @@ -define(DEFAULT_TTMO, infinity). %%% --------------------------------------------------------------------------- -%%% # notify/2 +%%% # notify/3 %%% --------------------------------------------------------------------------- -notify(SvcName, T) -> - rpc:abcast(nodes(), ?SERVER, {notify, SvcName, T}). +notify(Nodes, SvcName, T) -> + rpc:abcast(Nodes, ?SERVER, {notify, SvcName, T}). %%% --------------------------------------------------------------------------- %%% # start/1 diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl index edebd2fc77..255a3a44fd 100644 --- a/lib/diameter/src/base/diameter_service.erl +++ b/lib/diameter/src/base/diameter_service.erl @@ -125,9 +125,9 @@ monitor = false :: false | pid(), %% process to die with options :: [{sequence, diameter:sequence()} %% sequence mask - | {restrict_connections, diameter:restriction()} - | {share_peers, boolean()} %% broadcast peers to remote nodes? - | {use_shared_peers, boolean()}]}).%% use broadcasted peers? + | {share_peers, diameter:remotes()} %% broadcast to + | {use_shared_peers, diameter:remotes()} %% use from + | {restrict_connections, diameter:restriction()}]}). %% shared_peers reflects the peers broadcast from remote nodes. %% Record representing an RFC 3539 watchdog process implemented by @@ -681,11 +681,9 @@ mref(false = No) -> mref(P) -> erlang:monitor(process, P). -init_shared(#state{options = [_, _, {_, true} | _], +init_shared(#state{options = [_, _, {_,T} | _], service_name = Svc}) -> - diameter_peer:notify(Svc, {service, self()}); -init_shared(#state{options = [_, _, {_, false} | _]}) -> - ok. + notify(T, Svc, {service, self()}). init_mod(#diameter_app{alias = Alias, init_state = S}) -> @@ -698,6 +696,37 @@ get_value(Key, Vs) -> {_, V} = lists:keyfind(Key, 1, Vs), V. +notify(Share, SvcName, T) -> + Nodes = remotes(Share), + [] /= Nodes andalso diameter_peer:notify(Nodes, SvcName, T). +%% Test for the empty list for upgrade reasons: there's no +%% diameter_peer:notify/3 in old code so no call means no load order +%% requirement. + +remotes(false) -> + []; + +remotes(true) -> + nodes(); + +remotes(Nodes) + when is_atom(hd(Nodes)); + Nodes == [] -> + Nodes; + +remotes(F) -> + try diameter_lib:eval(F) of + L when is_list(L) -> + L; + T -> + diameter_lib:error_report({invalid_return, T}, F), + [] + catch + E:R -> + diameter_lib:error_report({failure, {E, R, ?STACK}}, F), + [] + end. + %% --------------------------------------------------------------------------- %% # start/3 %% --------------------------------------------------------------------------- @@ -1255,9 +1284,9 @@ send_event(#diameter_event{service = SvcName} = E) -> %% # share_peer/5 %% --------------------------------------------------------------------------- -share_peer(up, Caps, Apps, TPid, #state{options = [_, {_, true} | _], - service_name = Svc}) -> - diameter_peer:notify(Svc, {peer, TPid, [A || {_,A} <- Apps], Caps}); +share_peer(up, Caps, Apps, TPid, #state{options = [_, {_,T} | _], + service_name = Svc}) -> + notify(T, Svc, {peer, TPid, [A || {_,A} <- Apps], Caps}); share_peer(_, _, _, _, _) -> ok. @@ -1266,36 +1295,34 @@ share_peer(_, _, _, _, _) -> %% # share_peers/2 %% --------------------------------------------------------------------------- -share_peers(Pid, #state{options = [_, {_, true} | _], - local_peers = PDict}) -> - ?Dict:fold(fun(A,Ps,ok) -> sp(Pid, A, Ps), ok end, ok, PDict); - -share_peers(_, _) -> - ok. +share_peers(Pid, #state{options = [_, {_,T} | _], local_peers = PDict}) -> + is_remote(Pid, T) + andalso ?Dict:fold(fun(A,Ps,ok) -> sp(Pid, A, Ps), ok end, ok, PDict). sp(Pid, Alias, Peers) -> lists:foreach(fun({P,C}) -> Pid ! {peer, P, [Alias], C} end, Peers). +is_remote(Pid, T) -> + Node = node(Pid), + Node /= node() andalso lists:member(Node, remotes(T)). + %% --------------------------------------------------------------------------- %% # remote_peer_up/4 %% --------------------------------------------------------------------------- -remote_peer_up(Pid, Aliases, Caps, #state{options = [_, _, {_, true} | _], - service = Svc, - shared_peers = PDict}) -> +remote_peer_up(Pid, Aliases, Caps, #state{options = [_, _, {_,T} | _]} = S) -> + is_remote(Pid, T) + andalso rpu(Pid, Aliases, Caps, S). + +rpu(Pid, Aliases, Caps, #state{service = Svc, shared_peers = PDict}) -> #diameter_service{applications = Apps} = Svc, Key = #diameter_app.alias, - rpu(Pid, Caps, PDict, lists:filter(fun(A) -> - lists:keymember(A, Key, Apps) - end, - Aliases)); - -remote_peer_up(_, _, _, #state{options = [_, _, {_, false} | _]}) -> - ok. + F = fun(A) -> lists:keymember(A, Key, Apps) end, + rpu(Pid, lists:filter(F, Aliases), Caps, PDict); -rpu(_, _, PDict, []) -> - PDict; -rpu(Pid, Caps, PDict, Aliases) -> +rpu(_, [] = No, _, _) -> + No; +rpu(Pid, Aliases, Caps, PDict) -> erlang:monitor(process, Pid), T = {Pid, Caps}, lists:foreach(fun(A) -> ?Dict:append(A, T, PDict) end, Aliases). @@ -1304,8 +1331,7 @@ rpu(Pid, Caps, PDict, Aliases) -> %% # remote_peer_down/2 %% --------------------------------------------------------------------------- -remote_peer_down(Pid, #state{options = [_, _, {_, true} | _], - shared_peers = PDict}) -> +remote_peer_down(Pid, #state{shared_peers = PDict}) -> lists:foreach(fun(A) -> rpd(Pid, A, PDict) end, ?Dict:fetch_keys(PDict)). rpd(Pid, Alias, PDict) -> diff --git a/lib/diameter/test/diameter_distribution_SUITE.erl b/lib/diameter/test/diameter_distribution_SUITE.erl index 264def1e98..08b0870730 100644 --- a/lib/diameter/test/diameter_distribution_SUITE.erl +++ b/lib/diameter/test/diameter_distribution_SUITE.erl @@ -28,8 +28,9 @@ all/0]). %% testcases --export([start/1, +-export([enslave/1, ping/1, + start/1, connect/1, send_local/1, send_remote/1, @@ -71,8 +72,8 @@ {'Product-Name', "OTP/diameter"}, {'Auth-Application-Id', [?DICT:id()]}, {'Origin-State-Id', origin()}, - {share_peers, true}, - {use_shared_peers, true}, + {share_peers, peers()}, + {use_shared_peers, peers()}, {restrict_connections, false}, {sequence, fun sequence/0}, {application, [{dictionary, ?DICT}, @@ -102,8 +103,9 @@ suite() -> [{timetrap, {seconds, 60}}]. all() -> - [start, + [enslave, ping, + start, connect, send_local, send_remote, @@ -114,10 +116,49 @@ all() -> %% =========================================================================== %% start/stop testcases +%% enslave/1 +%% +%% Start four slave nodes, one to implement a Diameter server, +%% two three to implement a client. + +enslave(Config) -> + Here = filename:dirname(code:which(?MODULE)), + Ebin = filename:join([Here, "..", "ebin"]), + Dirs = [Here, Ebin], + Nodes = [{N,S} || {M,S} <- ?NODES, N <- [slave(M, Dirs)]], + ?util:write_priv(Config, nodes, [{N,S} || {{N,ok},S} <- Nodes]), + [] = [{T,S} || {{_,E} = T, S} <- Nodes, E /= ok]. + +slave(Name, Dirs) -> + add_pathsa(Dirs, ct_slave:start(Name)). + +add_pathsa(Dirs, {ok, Node}) -> + {Node, rpc:call(Node, code, add_pathsa, [Dirs])}; +add_pathsa(_, No) -> + {No, error}. + +%% ping/1 +%% +%% Ensure the client nodes are connected since the sharing of +%% transports is only between connected nodes. + +ping({?SERVER, _Nodes}) -> + []; + +ping({?CLIENT, Nodes}) -> + [N || {N,_} <- Nodes, + node() /= N, + pang <- [net_adm:ping(N)]]; + +ping(Config) -> + Nodes = ?util:read_priv(Config, nodes), + [] = [{N,RC} || {N,S} <- Nodes, + RC <- [rpc:call(N, ?MODULE, ping, [{S, Nodes}])], + RC /= []]. + %% start/1 %% -%% Start three slave nodes, one that implement a Diameter server and -%% two that implement a client. +%% Start diameter services. start(SvcName) when is_atom(SvcName) -> @@ -125,21 +166,10 @@ start(SvcName) ok = diameter:start_service(SvcName, ?SERVICE((?L(SvcName)))); start(Config) -> - Dir = filename:dirname(code:which(?MODULE)), - Nodes = [{N, Svcname} || {Nodename, Svcname} <- ?NODES, - N <- [slave(Nodename, Dir)]], - [] = [RC || {N,S} <- Nodes, - RC <- [rpc:call(N, ?MODULE, start, [S])], - RC /= ok], - ?util:write_priv(Config, nodes, Nodes). - -slave(Name, Dir) -> - {ok, Node} = ct_slave:start(Name), - ok = rpc:call(Node, - code, - add_pathsa, - [[Dir, filename:join([Dir, "..", "ebin"])]]), - Node. + Nodes = ?util:read_priv(Config, nodes), + [] = [{N,RC} || {N,S} <- Nodes, + RC <- [rpc:call(N, ?MODULE, start, [S])], + RC /= ok]. sequence() -> sequence(sname()). @@ -159,25 +189,13 @@ origin(Client) -> "client" ++ N = ?L(Client), list_to_integer(N). -%% ping/1 -%% -%% Ensure the client nodes are connected since the sharing of -%% transports is only between connected nodes. +peers() -> + peers(sname()). -ping({?SERVER, _Nodes}) -> - ok; - -ping({?CLIENT, Nodes}) -> - {_, []} = {node(), [N || {N,_} <- Nodes, - node() /= N, - pang <- [net_adm:ping(N)]]}, - ok; - -ping(Config) -> - Nodes = ?util:read_priv(Config, nodes), - [] = [RC || {N,S} <- Nodes, - RC <- [rpc:call(N, ?MODULE, ping, [{S, Nodes}])], - RC /= ok]. +peers(server) -> true; +peers(client0) -> [node() | nodes()]; +peers(client1) -> fun erlang:nodes/0; +peers(client2) -> nodes(). %% connect/1 %% @@ -194,23 +212,17 @@ connect({?CLIENT, Config}) -> connect(Config) -> Nodes = ?util:read_priv(Config, nodes), - [] = [RC || {N,S} <- Nodes, - RC <- [rpc:call(N, ?MODULE, connect, [{S,Config}])], - RC /= ok]. + [] = [{N,RC} || {N,S} <- Nodes, + RC <- [rpc:call(N, ?MODULE, connect, [{S,Config}])], + RC /= ok]. %% stop/1 %% %% Stop the slave nodes. -stop(Name) - when is_atom(Name) -> - {ok, _Node} = ct_slave:stop(Name), - ok; - stop(_Config) -> - [] = [RC || {N,_} <- ?NODES, - RC <- [stop(N)], - RC /= ok]. + [] = [{N,E} || {N,_} <- ?NODES, + {error, _, _} = E <- [ct_slave:stop(N)]]. %% =========================================================================== %% traffic testcases @@ -336,12 +348,7 @@ request(#diameter_base_STR{'Termination-Cause' = ?TIMEOUT}, _) -> request(#diameter_base_STR{'Termination-Cause' = ?MOVED}, Peer) -> {TPid, #diameter_caps{origin_state_id = {_, [N]}}} = Peer, - if N == 0 -> %% sent from the originating node ... - {protocol_error, ?BUSY}; - true -> %% ... or through a remote node: force failover - exit(TPid, kill), - discard - end; + fail(N, TPid); request(#diameter_base_STR{'Session-Id' = SId}, {_, Caps}) -> #diameter_caps{origin_host = {OH, _}, @@ -351,3 +358,10 @@ request(#diameter_base_STR{'Session-Id' = SId}, {_, Caps}) -> 'Session-Id' = SId, 'Origin-Host' = OH, 'Origin-Realm' = OR}}. + +fail(0, _) -> %% sent from the originating node ... + {protocol_error, ?BUSY}; + +fail(_, TPid) -> %% ... or through a remote node: force failover + exit(TPid, kill), + discard. |