aboutsummaryrefslogtreecommitdiffstats
path: root/lib
diff options
context:
space:
mode:
authorAnders Svensson <[email protected]>2013-02-24 17:39:46 +0100
committerAnders Svensson <[email protected]>2013-03-17 15:07:00 +0100
commitd70a02e7415caccd13fad8dda417d0d18a112a83 (patch)
tree6b3aa51ad544e74459e5bb6c4a2fcd1d74f080ee /lib
parent63e21fa7d8aa6761640f9cf357663b03578a5446 (diff)
downloadotp-d70a02e7415caccd13fad8dda417d0d18a112a83.tar.gz
otp-d70a02e7415caccd13fad8dda417d0d18a112a83.tar.bz2
otp-d70a02e7415caccd13fad8dda417d0d18a112a83.zip
More flexible distribution config
Allow both share_peers and use_shared_peers to be a list of nodes, or a function that returns a list of nodes.
Diffstat (limited to 'lib')
-rw-r--r--lib/diameter/doc/src/diameter.xml75
-rw-r--r--lib/diameter/doc/src/diameter_app.xml8
-rw-r--r--lib/diameter/src/base/diameter.erl10
-rw-r--r--lib/diameter/src/base/diameter_config.erl27
-rw-r--r--lib/diameter/src/base/diameter_peer.erl8
-rw-r--r--lib/diameter/src/base/diameter_service.erl88
-rw-r--r--lib/diameter/test/diameter_distribution_SUITE.erl126
7 files changed, 212 insertions, 130 deletions
diff --git a/lib/diameter/doc/src/diameter.xml b/lib/diameter/doc/src/diameter.xml
index df9e03b2da..8ad4af85a3 100644
--- a/lib/diameter/doc/src/diameter.xml
+++ b/lib/diameter/doc/src/diameter.xml
@@ -43,7 +43,7 @@ under the License.
<approved></approved>
<checked></checked>
<date></date>
-<rev>%VSN%</rev>
+<rev></rev>
<file>diameter.xml</file>
</header>
@@ -774,8 +774,8 @@ Application-Id AVP's in particular.</p>
| evaluable()}</c></tag>
<item>
<p>
-Specifies the degree to which multiple transport connections to the
-same peer are accepted by the service.</p>
+Specifies the degree to which the service allows multiple transport
+connections to the same peer.</p>
<p>
If type <c>[node()]</c> then a connection is rejected if another already
@@ -831,40 +831,60 @@ outgoing requests.</p>
</warning>
</item>
-<tag><c>{share_peers, boolean()}</c></tag>
+<tag><c>{share_peers, boolean() | [node()] | evaluable()}</c></tag>
<item>
<p>
-Specifies whether or not peer connections on the local Erlang node
-are shared with services on visible nodes (as returned by &nodes;).
-Peers shared from remote nodes become available in the candidates list
-passed as the second argument to &app_pick_peer; callbacks.</p>
+Specifies nodes to which peer connections established on the local
+Erlang node are communicated.
+Shared peers become available in the remote candidates list passed to
+&app_pick_peer; callbacks on remote nodes whose services are
+configured to use them: see <c>use_shared_peers</c> below.</p>
<p>
-Defaults to <c>false</c>.</p>
+If <c>false</c> then peers are not shared.
+If <c>[node()]</c> then peers are shared with the specified list of
+nodes.
+If <c>evaluable()</c> then peers are shared with the nodes returned
+by the specified function, evaluated whenever a peer connection
+becomes available or a remote service requests information about local
+connections.
+The value <c>true</c> is equivalent to <c>fun &nodes;</c>.
+The value <c>node()</c> in a node list is ignored, so a collection of
+services can all be configured to share with the same list of
+nodes.</p>
-<note>
<p>
-Peers are only shared with other services of the same name.
-Since the value of the &application_opt; <c>alias</c> is the handle
-for identifying a peer, both local and remote, as a candidate for an
-outgoing request, services that share peers should use the same
-aliases for identifying their supported applications.</p>
-</note>
+Defaults to <c>false</c>.</p>
<note>
<p>
-Services that share peers can do so in order to distribute the
-implementation of a Diameter node across multiple Erlang nodes, in
-which case the participating services should typically be configured
-with identical &capabilities;.</p>
+Peers are only shared with services of the same name for the purpose
+of sending outgoing requests.
+Since the value of the &application_opt; <c>alias</c>, passed to
+&call;, is the handle for identifying a peer as a suitable
+candidate, services that share peers must use the same aliases to
+identify their supported applications.
+They should typically also configure identical &capabilities;, since
+by sharing peer connections they are distributing the implementation
+of a single Diameter node across multiple Erlang nodes.</p>
</note>
</item>
-<tag><c>{use_shared_peers, boolean()}</c></tag>
+<tag><c>{use_shared_peers, boolean() | [node()] | evaluable()}</c></tag>
<item>
<p>
-Specifies whether or not the service makes use of peer connections
-shared by identically named services on other Erlang nodes.</p>
+Specifies nodes from which communicated peers are made available in
+the remote candidates list of &app_pick_peer; callbacks.</p>
+
+<p>
+If <c>false</c> then remote peers are not used.
+If <c>[node()]</c> then only peers from the specified list of nodes
+are used.
+If <c>evaluable()</c> then only peers returned by the specified
+function are used, evaluated whenever a remote service communicates
+information about an available peer connection.
+The value <c>true</c> is equivalent to <c>fun &nodes;</c>.
+The value <c>node()</c> in a node list is ignored.</p>
<p>
Defaults to <c>false</c>.</p>
@@ -874,6 +894,15 @@ Defaults to <c>false</c>.</p>
A service that does not use shared peers will always pass the empty
list as the second argument of &app_pick_peer; callbacks.</p>
</note>
+
+<warning>
+<p>
+Sending a request over a peer connection on a remote node is less
+efficient than sending it over a local connection.
+It may be preferable to make use of the &service_opt;
+<c>restrict_connections</c> and maintain a dedicated connection on
+each node from which requests are sent.</p>
+</warning>
</item>
</taglist>
diff --git a/lib/diameter/doc/src/diameter_app.xml b/lib/diameter/doc/src/diameter_app.xml
index d094e1bade..d4fb792787 100644
--- a/lib/diameter/doc/src/diameter_app.xml
+++ b/lib/diameter/doc/src/diameter_app.xml
@@ -37,7 +37,7 @@ under the License.
<approved></approved>
<checked></checked>
<date></date>
-<rev>%REV%</rev>
+<rev></rev>
<file>diameter_app.xml</file>
</header>
@@ -297,9 +297,9 @@ The return values <c>false</c> and <c>{false, State}</c> (that is,
<note>
<p>
-<c>RemoteCandidates</c> is the empty list if the service has been
-configured with the (default) &mod_service_opt;
-<c>{use_shared_peers, false}</c>.</p>
+The &mod_service_opt; <c>use_shared_peers</c> determines whether or
+not a service uses peers shared from other nodes.
+If not then <c>RemoteCandidates</c> is the empty list.</p>
</note>
<warning>
diff --git a/lib/diameter/src/base/diameter.erl b/lib/diameter/src/base/diameter.erl
index 1a96cecc83..189f2b01b9 100644
--- a/lib/diameter/src/base/diameter.erl
+++ b/lib/diameter/src/base/diameter.erl
@@ -45,6 +45,7 @@
-export_type([evaluable/0,
restriction/0,
+ remotes/0,
sequence/0,
app_alias/0,
service_name/0,
@@ -292,6 +293,11 @@ call(SvcName, App, Message) ->
| [node()]
| evaluable().
+-type remotes()
+ :: boolean()
+ | [node()]
+ | evaluable().
+
%% Options passed to start_service/2
-type service_opt()
@@ -299,8 +305,8 @@ call(SvcName, App, Message) ->
| {application, [application_opt()]}
| {restrict_connections, restriction()}
| {sequence, sequence() | evaluable()}
- | {share_peers, boolean()}
- | {use_shared_peers, boolean()}.
+ | {share_peers, remotes()}
+ | {use_shared_peers, remotes()}.
-type application_opt()
:: {alias, app_alias()}
diff --git a/lib/diameter/src/base/diameter_config.erl b/lib/diameter/src/base/diameter_config.erl
index a638849b08..3a2e0d2140 100644
--- a/lib/diameter/src/base/diameter_config.erl
+++ b/lib/diameter/src/base/diameter_config.erl
@@ -573,7 +573,6 @@ make_config(SvcName, Opts) ->
{false, monitor},
{?NOMASK, sequence},
{nodes, restrict_connections}]),
- %% share_peers and use_shared_peers are currently undocumented.
#service{name = SvcName,
rec = #diameter_service{applications = Apps,
@@ -592,19 +591,27 @@ opt(K, true = B)
K == use_shared_peers ->
B;
-opt(monitor, P)
- when is_pid(P) ->
- P;
-
opt(restrict_connections, T)
when T == node;
- T == nodes;
- T == [];
- is_atom(hd(T)) ->
+ T == nodes ->
+ T;
+
+opt(K, T)
+ when (K == share_peers
+ orelse K == use_shared_peers
+ orelse K == restrict_connections), ([] == T
+ orelse is_atom(hd(T))) ->
T;
-opt(restrict_connections = K, F) ->
- try diameter_lib:eval(F) of %% no guarantee that it won't fail later
+opt(monitor, P)
+ when is_pid(P) ->
+ P;
+
+opt(K, F)
+ when K == restrict_connections;
+ K == share_peers;
+ K == use_shared_peers ->
+ try diameter_lib:eval(F) of %% but no guarantee that it won't fail later
Nodes when is_list(Nodes) ->
F;
V ->
diff --git a/lib/diameter/src/base/diameter_peer.erl b/lib/diameter/src/base/diameter_peer.erl
index 130bedda84..dfc76eb76e 100644
--- a/lib/diameter/src/base/diameter_peer.erl
+++ b/lib/diameter/src/base/diameter_peer.erl
@@ -31,7 +31,7 @@
send/2,
close/1,
abort/1,
- notify/2]).
+ notify/3]).
%% Server start.
-export([start_link/0]).
@@ -63,11 +63,11 @@
-define(DEFAULT_TTMO, infinity).
%%% ---------------------------------------------------------------------------
-%%% # notify/2
+%%% # notify/3
%%% ---------------------------------------------------------------------------
-notify(SvcName, T) ->
- rpc:abcast(nodes(), ?SERVER, {notify, SvcName, T}).
+notify(Nodes, SvcName, T) ->
+ rpc:abcast(Nodes, ?SERVER, {notify, SvcName, T}).
%%% ---------------------------------------------------------------------------
%%% # start/1
diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl
index edebd2fc77..255a3a44fd 100644
--- a/lib/diameter/src/base/diameter_service.erl
+++ b/lib/diameter/src/base/diameter_service.erl
@@ -125,9 +125,9 @@
monitor = false :: false | pid(), %% process to die with
options
:: [{sequence, diameter:sequence()} %% sequence mask
- | {restrict_connections, diameter:restriction()}
- | {share_peers, boolean()} %% broadcast peers to remote nodes?
- | {use_shared_peers, boolean()}]}).%% use broadcasted peers?
+ | {share_peers, diameter:remotes()} %% broadcast to
+ | {use_shared_peers, diameter:remotes()} %% use from
+ | {restrict_connections, diameter:restriction()}]}).
%% shared_peers reflects the peers broadcast from remote nodes.
%% Record representing an RFC 3539 watchdog process implemented by
@@ -681,11 +681,9 @@ mref(false = No) ->
mref(P) ->
erlang:monitor(process, P).
-init_shared(#state{options = [_, _, {_, true} | _],
+init_shared(#state{options = [_, _, {_,T} | _],
service_name = Svc}) ->
- diameter_peer:notify(Svc, {service, self()});
-init_shared(#state{options = [_, _, {_, false} | _]}) ->
- ok.
+ notify(T, Svc, {service, self()}).
init_mod(#diameter_app{alias = Alias,
init_state = S}) ->
@@ -698,6 +696,37 @@ get_value(Key, Vs) ->
{_, V} = lists:keyfind(Key, 1, Vs),
V.
+notify(Share, SvcName, T) ->
+ Nodes = remotes(Share),
+ [] /= Nodes andalso diameter_peer:notify(Nodes, SvcName, T).
+%% Test for the empty list for upgrade reasons: there's no
+%% diameter_peer:notify/3 in old code so no call means no load order
+%% requirement.
+
+remotes(false) ->
+ [];
+
+remotes(true) ->
+ nodes();
+
+remotes(Nodes)
+ when is_atom(hd(Nodes));
+ Nodes == [] ->
+ Nodes;
+
+remotes(F) ->
+ try diameter_lib:eval(F) of
+ L when is_list(L) ->
+ L;
+ T ->
+ diameter_lib:error_report({invalid_return, T}, F),
+ []
+ catch
+ E:R ->
+ diameter_lib:error_report({failure, {E, R, ?STACK}}, F),
+ []
+ end.
+
%% ---------------------------------------------------------------------------
%% # start/3
%% ---------------------------------------------------------------------------
@@ -1255,9 +1284,9 @@ send_event(#diameter_event{service = SvcName} = E) ->
%% # share_peer/5
%% ---------------------------------------------------------------------------
-share_peer(up, Caps, Apps, TPid, #state{options = [_, {_, true} | _],
- service_name = Svc}) ->
- diameter_peer:notify(Svc, {peer, TPid, [A || {_,A} <- Apps], Caps});
+share_peer(up, Caps, Apps, TPid, #state{options = [_, {_,T} | _],
+ service_name = Svc}) ->
+ notify(T, Svc, {peer, TPid, [A || {_,A} <- Apps], Caps});
share_peer(_, _, _, _, _) ->
ok.
@@ -1266,36 +1295,34 @@ share_peer(_, _, _, _, _) ->
%% # share_peers/2
%% ---------------------------------------------------------------------------
-share_peers(Pid, #state{options = [_, {_, true} | _],
- local_peers = PDict}) ->
- ?Dict:fold(fun(A,Ps,ok) -> sp(Pid, A, Ps), ok end, ok, PDict);
-
-share_peers(_, _) ->
- ok.
+share_peers(Pid, #state{options = [_, {_,T} | _], local_peers = PDict}) ->
+ is_remote(Pid, T)
+ andalso ?Dict:fold(fun(A,Ps,ok) -> sp(Pid, A, Ps), ok end, ok, PDict).
sp(Pid, Alias, Peers) ->
lists:foreach(fun({P,C}) -> Pid ! {peer, P, [Alias], C} end, Peers).
+is_remote(Pid, T) ->
+ Node = node(Pid),
+ Node /= node() andalso lists:member(Node, remotes(T)).
+
%% ---------------------------------------------------------------------------
%% # remote_peer_up/4
%% ---------------------------------------------------------------------------
-remote_peer_up(Pid, Aliases, Caps, #state{options = [_, _, {_, true} | _],
- service = Svc,
- shared_peers = PDict}) ->
+remote_peer_up(Pid, Aliases, Caps, #state{options = [_, _, {_,T} | _]} = S) ->
+ is_remote(Pid, T)
+ andalso rpu(Pid, Aliases, Caps, S).
+
+rpu(Pid, Aliases, Caps, #state{service = Svc, shared_peers = PDict}) ->
#diameter_service{applications = Apps} = Svc,
Key = #diameter_app.alias,
- rpu(Pid, Caps, PDict, lists:filter(fun(A) ->
- lists:keymember(A, Key, Apps)
- end,
- Aliases));
-
-remote_peer_up(_, _, _, #state{options = [_, _, {_, false} | _]}) ->
- ok.
+ F = fun(A) -> lists:keymember(A, Key, Apps) end,
+ rpu(Pid, lists:filter(F, Aliases), Caps, PDict);
-rpu(_, _, PDict, []) ->
- PDict;
-rpu(Pid, Caps, PDict, Aliases) ->
+rpu(_, [] = No, _, _) ->
+ No;
+rpu(Pid, Aliases, Caps, PDict) ->
erlang:monitor(process, Pid),
T = {Pid, Caps},
lists:foreach(fun(A) -> ?Dict:append(A, T, PDict) end, Aliases).
@@ -1304,8 +1331,7 @@ rpu(Pid, Caps, PDict, Aliases) ->
%% # remote_peer_down/2
%% ---------------------------------------------------------------------------
-remote_peer_down(Pid, #state{options = [_, _, {_, true} | _],
- shared_peers = PDict}) ->
+remote_peer_down(Pid, #state{shared_peers = PDict}) ->
lists:foreach(fun(A) -> rpd(Pid, A, PDict) end, ?Dict:fetch_keys(PDict)).
rpd(Pid, Alias, PDict) ->
diff --git a/lib/diameter/test/diameter_distribution_SUITE.erl b/lib/diameter/test/diameter_distribution_SUITE.erl
index 264def1e98..08b0870730 100644
--- a/lib/diameter/test/diameter_distribution_SUITE.erl
+++ b/lib/diameter/test/diameter_distribution_SUITE.erl
@@ -28,8 +28,9 @@
all/0]).
%% testcases
--export([start/1,
+-export([enslave/1,
ping/1,
+ start/1,
connect/1,
send_local/1,
send_remote/1,
@@ -71,8 +72,8 @@
{'Product-Name', "OTP/diameter"},
{'Auth-Application-Id', [?DICT:id()]},
{'Origin-State-Id', origin()},
- {share_peers, true},
- {use_shared_peers, true},
+ {share_peers, peers()},
+ {use_shared_peers, peers()},
{restrict_connections, false},
{sequence, fun sequence/0},
{application, [{dictionary, ?DICT},
@@ -102,8 +103,9 @@ suite() ->
[{timetrap, {seconds, 60}}].
all() ->
- [start,
+ [enslave,
ping,
+ start,
connect,
send_local,
send_remote,
@@ -114,10 +116,49 @@ all() ->
%% ===========================================================================
%% start/stop testcases
+%% enslave/1
+%%
+%% Start four slave nodes, one to implement a Diameter server,
+%% two three to implement a client.
+
+enslave(Config) ->
+ Here = filename:dirname(code:which(?MODULE)),
+ Ebin = filename:join([Here, "..", "ebin"]),
+ Dirs = [Here, Ebin],
+ Nodes = [{N,S} || {M,S} <- ?NODES, N <- [slave(M, Dirs)]],
+ ?util:write_priv(Config, nodes, [{N,S} || {{N,ok},S} <- Nodes]),
+ [] = [{T,S} || {{_,E} = T, S} <- Nodes, E /= ok].
+
+slave(Name, Dirs) ->
+ add_pathsa(Dirs, ct_slave:start(Name)).
+
+add_pathsa(Dirs, {ok, Node}) ->
+ {Node, rpc:call(Node, code, add_pathsa, [Dirs])};
+add_pathsa(_, No) ->
+ {No, error}.
+
+%% ping/1
+%%
+%% Ensure the client nodes are connected since the sharing of
+%% transports is only between connected nodes.
+
+ping({?SERVER, _Nodes}) ->
+ [];
+
+ping({?CLIENT, Nodes}) ->
+ [N || {N,_} <- Nodes,
+ node() /= N,
+ pang <- [net_adm:ping(N)]];
+
+ping(Config) ->
+ Nodes = ?util:read_priv(Config, nodes),
+ [] = [{N,RC} || {N,S} <- Nodes,
+ RC <- [rpc:call(N, ?MODULE, ping, [{S, Nodes}])],
+ RC /= []].
+
%% start/1
%%
-%% Start three slave nodes, one that implement a Diameter server and
-%% two that implement a client.
+%% Start diameter services.
start(SvcName)
when is_atom(SvcName) ->
@@ -125,21 +166,10 @@ start(SvcName)
ok = diameter:start_service(SvcName, ?SERVICE((?L(SvcName))));
start(Config) ->
- Dir = filename:dirname(code:which(?MODULE)),
- Nodes = [{N, Svcname} || {Nodename, Svcname} <- ?NODES,
- N <- [slave(Nodename, Dir)]],
- [] = [RC || {N,S} <- Nodes,
- RC <- [rpc:call(N, ?MODULE, start, [S])],
- RC /= ok],
- ?util:write_priv(Config, nodes, Nodes).
-
-slave(Name, Dir) ->
- {ok, Node} = ct_slave:start(Name),
- ok = rpc:call(Node,
- code,
- add_pathsa,
- [[Dir, filename:join([Dir, "..", "ebin"])]]),
- Node.
+ Nodes = ?util:read_priv(Config, nodes),
+ [] = [{N,RC} || {N,S} <- Nodes,
+ RC <- [rpc:call(N, ?MODULE, start, [S])],
+ RC /= ok].
sequence() ->
sequence(sname()).
@@ -159,25 +189,13 @@ origin(Client) ->
"client" ++ N = ?L(Client),
list_to_integer(N).
-%% ping/1
-%%
-%% Ensure the client nodes are connected since the sharing of
-%% transports is only between connected nodes.
+peers() ->
+ peers(sname()).
-ping({?SERVER, _Nodes}) ->
- ok;
-
-ping({?CLIENT, Nodes}) ->
- {_, []} = {node(), [N || {N,_} <- Nodes,
- node() /= N,
- pang <- [net_adm:ping(N)]]},
- ok;
-
-ping(Config) ->
- Nodes = ?util:read_priv(Config, nodes),
- [] = [RC || {N,S} <- Nodes,
- RC <- [rpc:call(N, ?MODULE, ping, [{S, Nodes}])],
- RC /= ok].
+peers(server) -> true;
+peers(client0) -> [node() | nodes()];
+peers(client1) -> fun erlang:nodes/0;
+peers(client2) -> nodes().
%% connect/1
%%
@@ -194,23 +212,17 @@ connect({?CLIENT, Config}) ->
connect(Config) ->
Nodes = ?util:read_priv(Config, nodes),
- [] = [RC || {N,S} <- Nodes,
- RC <- [rpc:call(N, ?MODULE, connect, [{S,Config}])],
- RC /= ok].
+ [] = [{N,RC} || {N,S} <- Nodes,
+ RC <- [rpc:call(N, ?MODULE, connect, [{S,Config}])],
+ RC /= ok].
%% stop/1
%%
%% Stop the slave nodes.
-stop(Name)
- when is_atom(Name) ->
- {ok, _Node} = ct_slave:stop(Name),
- ok;
-
stop(_Config) ->
- [] = [RC || {N,_} <- ?NODES,
- RC <- [stop(N)],
- RC /= ok].
+ [] = [{N,E} || {N,_} <- ?NODES,
+ {error, _, _} = E <- [ct_slave:stop(N)]].
%% ===========================================================================
%% traffic testcases
@@ -336,12 +348,7 @@ request(#diameter_base_STR{'Termination-Cause' = ?TIMEOUT}, _) ->
request(#diameter_base_STR{'Termination-Cause' = ?MOVED}, Peer) ->
{TPid, #diameter_caps{origin_state_id = {_, [N]}}} = Peer,
- if N == 0 -> %% sent from the originating node ...
- {protocol_error, ?BUSY};
- true -> %% ... or through a remote node: force failover
- exit(TPid, kill),
- discard
- end;
+ fail(N, TPid);
request(#diameter_base_STR{'Session-Id' = SId}, {_, Caps}) ->
#diameter_caps{origin_host = {OH, _},
@@ -351,3 +358,10 @@ request(#diameter_base_STR{'Session-Id' = SId}, {_, Caps}) ->
'Session-Id' = SId,
'Origin-Host' = OH,
'Origin-Realm' = OR}}.
+
+fail(0, _) -> %% sent from the originating node ...
+ {protocol_error, ?BUSY};
+
+fail(_, TPid) -> %% ... or through a remote node: force failover
+ exit(TPid, kill),
+ discard.