aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--lib/diameter/doc/src/diameter.xml90
-rw-r--r--lib/diameter/doc/src/diameter_app.xml34
-rw-r--r--lib/diameter/src/base/diameter.erl10
-rw-r--r--lib/diameter/src/base/diameter_config.erl29
-rw-r--r--lib/diameter/src/base/diameter_peer.erl8
-rw-r--r--lib/diameter/src/base/diameter_service.erl90
-rw-r--r--lib/diameter/src/base/diameter_traffic.erl16
-rw-r--r--lib/diameter/test/diameter_distribution_SUITE.erl372
-rw-r--r--lib/diameter/test/diameter_util.erl3
-rw-r--r--lib/diameter/test/modules.mk1
10 files changed, 583 insertions, 70 deletions
diff --git a/lib/diameter/doc/src/diameter.xml b/lib/diameter/doc/src/diameter.xml
index 379e9f0738..8ad4af85a3 100644
--- a/lib/diameter/doc/src/diameter.xml
+++ b/lib/diameter/doc/src/diameter.xml
@@ -1,5 +1,7 @@
<?xml version="1.0" encoding="latin1" ?>
<!DOCTYPE erlref SYSTEM "erlref.dtd" [
+ <!ENTITY nodes
+ '<seealso marker="erts:erlang#nodes-0">erlang:nodes/0</seealso>'>
<!ENTITY make_ref
'<seealso marker="erts:erlang#make_ref-0">erlang:make_ref/0</seealso>'>
<!ENTITY transport_module
@@ -41,7 +43,7 @@ under the License.
<approved></approved>
<checked></checked>
<date></date>
-<rev>%VSN%</rev>
+<rev></rev>
<file>diameter.xml</file>
</header>
@@ -772,8 +774,8 @@ Application-Id AVP's in particular.</p>
| evaluable()}</c></tag>
<item>
<p>
-Specifies the degree to which multiple transport connections to the
-same peer are accepted by the service.</p>
+Specifies the degree to which the service allows multiple transport
+connections to the same peer.</p>
<p>
If type <c>[node()]</c> then a connection is rejected if another already
@@ -819,6 +821,88 @@ non-negative integer less than <c>1 bsl (32-N)</c>.</p>
<p>
Defaults to <c>{0,32}</c>.</p>
+
+<warning>
+<p>
+Multiple Erlang nodes implementing the same Diameter node should
+be configured with different sequence masks to ensure that each node
+uses a unique range of End-to-End and Hop-by-Hop identifiers for
+outgoing requests.</p>
+</warning>
+</item>
+
+<tag><c>{share_peers, boolean() | [node()] | evaluable()}</c></tag>
+<item>
+<p>
+Specifies nodes to which peer connections established on the local
+Erlang node are communicated.
+Shared peers become available in the remote candidates list passed to
+&app_pick_peer; callbacks on remote nodes whose services are
+configured to use them: see <c>use_shared_peers</c> below.</p>
+
+<p>
+If <c>false</c> then peers are not shared.
+If <c>[node()]</c> then peers are shared with the specified list of
+nodes.
+If <c>evaluable()</c> then peers are shared with the nodes returned
+by the specified function, evaluated whenever a peer connection
+becomes available or a remote service requests information about local
+connections.
+The value <c>true</c> is equivalent to <c>fun &nodes;</c>.
+The value <c>node()</c> in a node list is ignored, so a collection of
+services can all be configured to share with the same list of
+nodes.</p>
+
+<p>
+Defaults to <c>false</c>.</p>
+
+<note>
+<p>
+Peers are only shared with services of the same name for the purpose
+of sending outgoing requests.
+Since the value of the &application_opt; <c>alias</c>, passed to
+&call;, is the handle for identifying a peer as a suitable
+candidate, services that share peers must use the same aliases to
+identify their supported applications.
+They should typically also configure identical &capabilities;, since
+by sharing peer connections they are distributing the implementation
+of a single Diameter node across multiple Erlang nodes.</p>
+</note>
+</item>
+
+<tag><c>{use_shared_peers, boolean() | [node()] | evaluable()}</c></tag>
+<item>
+<p>
+Specifies nodes from which communicated peers are made available in
+the remote candidates list of &app_pick_peer; callbacks.</p>
+
+<p>
+If <c>false</c> then remote peers are not used.
+If <c>[node()]</c> then only peers from the specified list of nodes
+are used.
+If <c>evaluable()</c> then only peers returned by the specified
+function are used, evaluated whenever a remote service communicates
+information about an available peer connection.
+The value <c>true</c> is equivalent to <c>fun &nodes;</c>.
+The value <c>node()</c> in a node list is ignored.</p>
+
+<p>
+Defaults to <c>false</c>.</p>
+
+<note>
+<p>
+A service that does not use shared peers will always pass the empty
+list as the second argument of &app_pick_peer; callbacks.</p>
+</note>
+
+<warning>
+<p>
+Sending a request over a peer connection on a remote node is less
+efficient than sending it over a local connection.
+It may be preferable to make use of the &service_opt;
+<c>restrict_connections</c> and maintain a dedicated connection on
+each node from which requests are sent.</p>
+</warning>
</item>
</taglist>
diff --git a/lib/diameter/doc/src/diameter_app.xml b/lib/diameter/doc/src/diameter_app.xml
index d0f1b22ebd..d4fb792787 100644
--- a/lib/diameter/doc/src/diameter_app.xml
+++ b/lib/diameter/doc/src/diameter_app.xml
@@ -37,7 +37,7 @@ under the License.
<approved></approved>
<checked></checked>
<date></date>
-<rev>%REV%</rev>
+<rev></rev>
<file>diameter_app.xml</file>
</header>
@@ -196,7 +196,8 @@ process.</p>
</type>
<desc>
<p>
-Invoked to signal the availability of a peer connection.
+Invoked to signal the availability of a peer connection on the local
+Erlang node.
In particular, capabilities exchange with the peer has indicated
support for the application in question, the RFC 3539 watchdog state
machine for the connection has reached state <c>OKAY</c> and Diameter
@@ -230,8 +231,8 @@ handled independently of &peer_up; and &peer_down;.</p>
</type>
<desc>
<p>
-Invoked to signal that a peer connection is no longer available
-following a previous call to &peer_up;.
+Invoked to signal that a peer connection on the local Erlang node is
+no longer available following a previous call to &peer_up;.
In particular, that the RFC 3539 watchdog state machine for the
connection has left state <c>OKAY</c> and the peer will no longer be a
candidate in &pick_peer; callbacks.</p>
@@ -240,11 +241,11 @@ candidate in &pick_peer; callbacks.</p>
</func>
<func>
-<name>Mod:pick_peer(Candidates, _Reserved, SvcName, State)
+<name>Mod:pick_peer(LocalCandidates, RemoteCandidates, SvcName, State)
-> Selection | false</name>
<fsummary>Select a target peer for an outgoing request.</fsummary>
<type>
-<v>Candidates = [&peer;]</v>
+<v>LocalCandidates = RemoteCandidates = [&peer;]</v>
<v>SvcName = &mod_service_name;</v>
<v>State = NewState = &state;</v>
<v>Selection = {ok, Peer} | {Peer, NewState}</v>
@@ -257,7 +258,7 @@ peer for an outgoing request.
The return value indicates the selected peer.</p>
<p>
-The candidate list contains only those peers that have advertised
+The candidate lists contain only those peers that have advertised
support for the Diameter application in question during capabilities
exchange, that have not be excluded by a <c>filter</c> option in
the call to &mod_call;
@@ -266,7 +267,11 @@ The order of the elements is unspecified except that any
peers whose Origin-Host and Origin-Realm matches that of the
outgoing request (in the sense of a <c>{filter, {all, [host, realm]}}</c>
option to &mod_call;)
-will be placed at the head of the list.</p>
+will be placed at the head of the list.
+<c>LocalCandidates</c> contains peers whose transport process resides
+on the local Erlang node while
+<c>RemoteCandidates</c> contains peers that have been communicated
+from other nodes by services of the same name.</p>
<p>
A callback that returns a peer() will be followed by a
@@ -286,16 +291,19 @@ retransmission to an alternate peer is abandoned if an answer is
received from a previously selected peer.</p>
<p>
-Returning <c>false</c> or <c>{false, NewState}</c> causes <c>{error,
-no_connection}</c> to be returned from &mod_call;.</p>
-
-<p>
The return values <c>false</c> and <c>{false, State}</c> (that is,
<c>NewState = State</c>) are equivalent, as are <c>{ok, Peer}</c> and
<c>{Peer, State}</c>.</p>
<note>
<p>
+The &mod_service_opt; <c>use_shared_peers</c> determines whether or
+not a service uses peers shared from other nodes.
+If not then <c>RemoteCandidates</c> is the empty list.</p>
+</note>
+
+<warning>
+<p>
The return value <c>{Peer, NewState}</c> is only allowed if
the Diameter application in question was configured with the
&mod_application_opt; <c>{call_mutates_state, true}</c>.
@@ -303,7 +311,7 @@ Otherwise, the <c>State</c> argument is always
the intial value as configured on the application, not any subsequent
value returned by a &peer_up;
or &peer_down; callback.</p>
-</note>
+</warning>
</desc>
diff --git a/lib/diameter/src/base/diameter.erl b/lib/diameter/src/base/diameter.erl
index c67fba5f89..189f2b01b9 100644
--- a/lib/diameter/src/base/diameter.erl
+++ b/lib/diameter/src/base/diameter.erl
@@ -45,6 +45,7 @@
-export_type([evaluable/0,
restriction/0,
+ remotes/0,
sequence/0,
app_alias/0,
service_name/0,
@@ -292,13 +293,20 @@ call(SvcName, App, Message) ->
| [node()]
| evaluable().
+-type remotes()
+ :: boolean()
+ | [node()]
+ | evaluable().
+
%% Options passed to start_service/2
-type service_opt()
:: capability()
| {application, [application_opt()]}
| {restrict_connections, restriction()}
- | {sequence, sequence() | evaluable()}.
+ | {sequence, sequence() | evaluable()}
+ | {share_peers, remotes()}
+ | {use_shared_peers, remotes()}.
-type application_opt()
:: {alias, app_alias()}
diff --git a/lib/diameter/src/base/diameter_config.erl b/lib/diameter/src/base/diameter_config.erl
index 9f73815756..3a2e0d2140 100644
--- a/lib/diameter/src/base/diameter_config.erl
+++ b/lib/diameter/src/base/diameter_config.erl
@@ -573,7 +573,6 @@ make_config(SvcName, Opts) ->
{false, monitor},
{?NOMASK, sequence},
{nodes, restrict_connections}]),
- %% share_peers and use_shared_peers are currently undocumented.
#service{name = SvcName,
rec = #diameter_service{applications = Apps,
@@ -588,23 +587,31 @@ opt(K, false = B)
B;
opt(K, true = B)
- when K == share_peer;
+ when K == share_peers;
K == use_shared_peers ->
B;
-opt(monitor, P)
- when is_pid(P) ->
- P;
-
opt(restrict_connections, T)
when T == node;
- T == nodes;
- T == [];
- is_atom(hd(T)) ->
+ T == nodes ->
+ T;
+
+opt(K, T)
+ when (K == share_peers
+ orelse K == use_shared_peers
+ orelse K == restrict_connections), ([] == T
+ orelse is_atom(hd(T))) ->
T;
-opt(restrict_connections = K, F) ->
- try diameter_lib:eval(F) of %% no guarantee that it won't fail later
+opt(monitor, P)
+ when is_pid(P) ->
+ P;
+
+opt(K, F)
+ when K == restrict_connections;
+ K == share_peers;
+ K == use_shared_peers ->
+ try diameter_lib:eval(F) of %% but no guarantee that it won't fail later
Nodes when is_list(Nodes) ->
F;
V ->
diff --git a/lib/diameter/src/base/diameter_peer.erl b/lib/diameter/src/base/diameter_peer.erl
index 130bedda84..dfc76eb76e 100644
--- a/lib/diameter/src/base/diameter_peer.erl
+++ b/lib/diameter/src/base/diameter_peer.erl
@@ -31,7 +31,7 @@
send/2,
close/1,
abort/1,
- notify/2]).
+ notify/3]).
%% Server start.
-export([start_link/0]).
@@ -63,11 +63,11 @@
-define(DEFAULT_TTMO, infinity).
%%% ---------------------------------------------------------------------------
-%%% # notify/2
+%%% # notify/3
%%% ---------------------------------------------------------------------------
-notify(SvcName, T) ->
- rpc:abcast(nodes(), ?SERVER, {notify, SvcName, T}).
+notify(Nodes, SvcName, T) ->
+ rpc:abcast(Nodes, ?SERVER, {notify, SvcName, T}).
%%% ---------------------------------------------------------------------------
%%% # start/1
diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl
index f1342df16c..255a3a44fd 100644
--- a/lib/diameter/src/base/diameter_service.erl
+++ b/lib/diameter/src/base/diameter_service.erl
@@ -125,9 +125,9 @@
monitor = false :: false | pid(), %% process to die with
options
:: [{sequence, diameter:sequence()} %% sequence mask
- | {restrict_connections, diameter:restriction()}
- | {share_peers, boolean()} %% broadcast peers to remote nodes?
- | {use_shared_peers, boolean()}]}).%% use broadcasted peers?
+ | {share_peers, diameter:remotes()} %% broadcast to
+ | {use_shared_peers, diameter:remotes()} %% use from
+ | {restrict_connections, diameter:restriction()}]}).
%% shared_peers reflects the peers broadcast from remote nodes.
%% Record representing an RFC 3539 watchdog process implemented by
@@ -681,11 +681,9 @@ mref(false = No) ->
mref(P) ->
erlang:monitor(process, P).
-init_shared(#state{options = [_, _, {_, true} | _],
+init_shared(#state{options = [_, _, {_,T} | _],
service_name = Svc}) ->
- diameter_peer:notify(Svc, {service, self()});
-init_shared(#state{options = [_, _, {_, false} | _]}) ->
- ok.
+ notify(T, Svc, {service, self()}).
init_mod(#diameter_app{alias = Alias,
init_state = S}) ->
@@ -698,6 +696,37 @@ get_value(Key, Vs) ->
{_, V} = lists:keyfind(Key, 1, Vs),
V.
+notify(Share, SvcName, T) ->
+ Nodes = remotes(Share),
+ [] /= Nodes andalso diameter_peer:notify(Nodes, SvcName, T).
+%% Test for the empty list for upgrade reasons: there's no
+%% diameter_peer:notify/3 in old code so no call means no load order
+%% requirement.
+
+remotes(false) ->
+ [];
+
+remotes(true) ->
+ nodes();
+
+remotes(Nodes)
+ when is_atom(hd(Nodes));
+ Nodes == [] ->
+ Nodes;
+
+remotes(F) ->
+ try diameter_lib:eval(F) of
+ L when is_list(L) ->
+ L;
+ T ->
+ diameter_lib:error_report({invalid_return, T}, F),
+ []
+ catch
+ E:R ->
+ diameter_lib:error_report({failure, {E, R, ?STACK}}, F),
+ []
+ end.
+
%% ---------------------------------------------------------------------------
%% # start/3
%% ---------------------------------------------------------------------------
@@ -1233,12 +1262,12 @@ report_status(Status,
peer = TPid,
type = Type,
options = Opts},
- #peer{apps = [_|_] = As,
+ #peer{apps = [_|_] = Apps,
caps = Caps},
#state{service_name = SvcName}
= S,
Extra) ->
- share_peer(Status, Caps, As, TPid, S),
+ share_peer(Status, Caps, Apps, TPid, S),
Info = [Status, Ref, {TPid, Caps}, {type(Type), Opts} | Extra],
send_event(SvcName, list_to_tuple(Info)).
@@ -1255,9 +1284,9 @@ send_event(#diameter_event{service = SvcName} = E) ->
%% # share_peer/5
%% ---------------------------------------------------------------------------
-share_peer(up, Caps, Aliases, TPid, #state{options = [_, {_, true} | _],
- service_name = Svc}) ->
- diameter_peer:notify(Svc, {peer, TPid, Aliases, Caps});
+share_peer(up, Caps, Apps, TPid, #state{options = [_, {_,T} | _],
+ service_name = Svc}) ->
+ notify(T, Svc, {peer, TPid, [A || {_,A} <- Apps], Caps});
share_peer(_, _, _, _, _) ->
ok.
@@ -1266,34 +1295,34 @@ share_peer(_, _, _, _, _) ->
%% # share_peers/2
%% ---------------------------------------------------------------------------
-share_peers(Pid, #state{options = [_, {_, true} | _],
- local_peers = PDict}) ->
- ?Dict:fold(fun(A,Ps,ok) -> sp(Pid, A, Ps), ok end, ok, PDict);
-
-share_peers(_, _) ->
- ok.
+share_peers(Pid, #state{options = [_, {_,T} | _], local_peers = PDict}) ->
+ is_remote(Pid, T)
+ andalso ?Dict:fold(fun(A,Ps,ok) -> sp(Pid, A, Ps), ok end, ok, PDict).
sp(Pid, Alias, Peers) ->
lists:foreach(fun({P,C}) -> Pid ! {peer, P, [Alias], C} end, Peers).
+is_remote(Pid, T) ->
+ Node = node(Pid),
+ Node /= node() andalso lists:member(Node, remotes(T)).
+
%% ---------------------------------------------------------------------------
%% # remote_peer_up/4
%% ---------------------------------------------------------------------------
-remote_peer_up(Pid, Aliases, Caps, #state{options = [_, _, {_, true} | _],
- service = Svc,
- shared_peers = PDict}) ->
+remote_peer_up(Pid, Aliases, Caps, #state{options = [_, _, {_,T} | _]} = S) ->
+ is_remote(Pid, T)
+ andalso rpu(Pid, Aliases, Caps, S).
+
+rpu(Pid, Aliases, Caps, #state{service = Svc, shared_peers = PDict}) ->
#diameter_service{applications = Apps} = Svc,
Key = #diameter_app.alias,
- As = lists:filter(fun(A) -> lists:keymember(A, Key, Apps) end, Aliases),
- rpu(Pid, Caps, PDict, As);
-
-remote_peer_up(_, _, _, #state{options = [_, _, {_, false} | _]}) ->
- ok.
+ F = fun(A) -> lists:keymember(A, Key, Apps) end,
+ rpu(Pid, lists:filter(F, Aliases), Caps, PDict);
-rpu(_, _, PDict, []) ->
- PDict;
-rpu(Pid, Caps, PDict, Aliases) ->
+rpu(_, [] = No, _, _) ->
+ No;
+rpu(Pid, Aliases, Caps, PDict) ->
erlang:monitor(process, Pid),
T = {Pid, Caps},
lists:foreach(fun(A) -> ?Dict:append(A, T, PDict) end, Aliases).
@@ -1302,8 +1331,7 @@ rpu(Pid, Caps, PDict, Aliases) ->
%% # remote_peer_down/2
%% ---------------------------------------------------------------------------
-remote_peer_down(Pid, #state{options = [_, _, {_, true} | _],
- shared_peers = PDict}) ->
+remote_peer_down(Pid, #state{shared_peers = PDict}) ->
lists:foreach(fun(A) -> rpd(Pid, A, PDict) end, ?Dict:fetch_keys(PDict)).
rpd(Pid, Alias, PDict) ->
diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl
index f527f7c754..25b902e3f2 100644
--- a/lib/diameter/src/base/diameter_traffic.erl
+++ b/lib/diameter/src/base/diameter_traffic.erl
@@ -1479,12 +1479,14 @@ send({TPid, Pkt, #request{handler = Pid} = Req, SvcName, Timeout, TRef}) ->
Req#request{handler = self()},
SvcName,
Timeout),
- Pid ! reref(receive T -> T end, Ref, TRef).
-
-reref({T, Ref, R}, Ref, TRef) ->
- {T, TRef, R};
-reref(T, _, _) ->
- T.
+ receive
+ {answer, _, _, _, _} = A ->
+ Pid ! A;
+ {failover = T, Ref} ->
+ Pid ! {T, TRef};
+ T ->
+ exit({timeout, Ref, TPid} = T)
+ end.
%% send/2
@@ -1559,7 +1561,7 @@ resend_request(Pkt0,
store_request(TPid, Bin, Req, Timeout) ->
Seqs = diameter_codec:sequence_numbers(Bin),
- TRef = erlang:start_timer(Timeout, self(), timeout),
+ TRef = erlang:start_timer(Timeout, self(), TPid),
ets:insert(?REQUEST_TABLE, {Seqs, Req, TRef}),
ets:member(?REQUEST_TABLE, TPid)
orelse (self() ! {failover, TRef}), %% failover/1 may have missed
diff --git a/lib/diameter/test/diameter_distribution_SUITE.erl b/lib/diameter/test/diameter_distribution_SUITE.erl
new file mode 100644
index 0000000000..01d3507b27
--- /dev/null
+++ b/lib/diameter/test/diameter_distribution_SUITE.erl
@@ -0,0 +1,372 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2013. All Rights Reserved.
+%%
+%% The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved online at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% %CopyrightEnd%
+%%
+
+%%
+%% Tests of traffic between two Diameter nodes, the client being
+%% spread across three Erlang nodes.
+%%
+
+-module(diameter_distribution_SUITE).
+
+-export([suite/0,
+ all/0]).
+
+%% testcases
+-export([enslave/1,
+ ping/1,
+ start/1,
+ connect/1,
+ send_local/1,
+ send_remote/1,
+ send_timeout/1,
+ send_failover/1,
+ stop/1]).
+
+%% diameter callbacks
+-export([peer_up/3,
+ peer_down/3,
+ pick_peer/5,
+ prepare_request/4,
+ prepare_retransmit/4,
+ handle_answer/5,
+ handle_error/5,
+ handle_request/3]).
+
+-export([call/1]).
+
+-include("diameter.hrl").
+-include("diameter_gen_base_rfc6733.hrl").
+
+%% ===========================================================================
+
+-define(util, diameter_util).
+
+-define(CLIENT, 'CLIENT').
+-define(SERVER, 'SERVER').
+-define(REALM, "erlang.org").
+-define(DICT, diameter_gen_base_rfc6733).
+-define(ADDR, {127,0,0,1}).
+
+%% Config for diameter:start_service/2.
+-define(SERVICE(Host),
+ [{'Origin-Host', Host ++ [$.|?REALM]},
+ {'Origin-Realm', ?REALM},
+ {'Host-IP-Address', [?ADDR]},
+ {'Vendor-Id', 12345},
+ {'Product-Name', "OTP/diameter"},
+ {'Auth-Application-Id', [?DICT:id()]},
+ {'Origin-State-Id', origin()},
+ {share_peers, peers()},
+ {use_shared_peers, peers()},
+ {restrict_connections, false},
+ {sequence, fun sequence/0},
+ {application, [{dictionary, ?DICT},
+ {module, ?MODULE},
+ {request_errors, callback},
+ {answer_errors, callback}]}]).
+
+-define(SUCCESS, 2001).
+-define(BUSY, 3004).
+-define(LOGOUT, ?'DIAMETER_BASE_TERMINATION-CAUSE_LOGOUT').
+-define(MOVED, ?'DIAMETER_BASE_TERMINATION-CAUSE_USER_MOVED').
+-define(TIMEOUT, ?'DIAMETER_BASE_TERMINATION-CAUSE_SESSION_TIMEOUT').
+
+-define(L, atom_to_list).
+-define(A, list_to_atom).
+
+%% The order here is significant and causes the server to listen
+%% before the clients connect.
+-define(NODES, [{server, ?SERVER},
+ {client0, ?CLIENT},
+ {client1, ?CLIENT},
+ {client2, ?CLIENT}]).
+
+%% Options to ct_slave:start/2.
+-define(TIMEOUTS, [{T, 15000} || T <- [boot_timeout,
+ init_timeout,
+ start_timeout]]).
+
+%% ===========================================================================
+
+suite() ->
+ [{timetrap, {seconds, 60}}].
+
+all() ->
+ [enslave,
+ ping,
+ start,
+ connect,
+ send_local,
+ send_remote,
+ send_timeout,
+ send_failover,
+ stop].
+
+%% ===========================================================================
+%% start/stop testcases
+
+%% enslave/1
+%%
+%% Start four slave nodes, one to implement a Diameter server,
+%% two three to implement a client.
+
+enslave(Config) ->
+ Here = filename:dirname(code:which(?MODULE)),
+ Ebin = filename:join([Here, "..", "ebin"]),
+ Dirs = [Here, Ebin],
+ Nodes = [{N,S} || {M,S} <- ?NODES, N <- [slave(M, Dirs)]],
+ ?util:write_priv(Config, nodes, [{N,S} || {{N,ok},S} <- Nodes]),
+ [] = [{T,S} || {{_,E} = T, S} <- Nodes, E /= ok].
+
+slave(Name, Dirs) ->
+ add_pathsa(Dirs, ct_slave:start(Name, ?TIMEOUTS)).
+
+add_pathsa(Dirs, {ok, Node}) ->
+ {Node, rpc:call(Node, code, add_pathsa, [Dirs])};
+add_pathsa(_, No) ->
+ {No, error}.
+
+%% ping/1
+%%
+%% Ensure the client nodes are connected since the sharing of
+%% transports is only between connected nodes.
+
+ping({?SERVER, _Nodes}) ->
+ [];
+
+ping({?CLIENT, Nodes}) ->
+ [N || {N,_} <- Nodes,
+ node() /= N,
+ pang <- [net_adm:ping(N)]];
+
+ping(Config) ->
+ Nodes = ?util:read_priv(Config, nodes),
+ [] = [{N,RC} || {N,S} <- Nodes,
+ RC <- [rpc:call(N, ?MODULE, ping, [{S, Nodes}])],
+ RC /= []].
+
+%% start/1
+%%
+%% Start diameter services.
+
+start(SvcName)
+ when is_atom(SvcName) ->
+ ok = diameter:start(),
+ ok = diameter:start_service(SvcName, ?SERVICE((?L(SvcName))));
+
+start(Config) ->
+ Nodes = ?util:read_priv(Config, nodes),
+ [] = [{N,RC} || {N,S} <- Nodes,
+ RC <- [rpc:call(N, ?MODULE, start, [S])],
+ RC /= ok].
+
+sequence() ->
+ sequence(sname()).
+
+sequence(server) ->
+ {0,32};
+sequence(Client) ->
+ "client" ++ N = ?L(Client),
+ {list_to_integer(N), 30}.
+
+origin() ->
+ origin(sname()).
+
+origin(server) ->
+ 99;
+origin(Client) ->
+ "client" ++ N = ?L(Client),
+ list_to_integer(N).
+
+peers() ->
+ peers(sname()).
+
+peers(server) -> true;
+peers(client0) -> [node() | nodes()];
+peers(client1) -> fun erlang:nodes/0;
+peers(client2) -> nodes().
+
+%% connect/1
+%%
+%% Establish one connection to the server from each of the client
+%% nodes.
+
+connect({?SERVER, Config}) ->
+ ?util:write_priv(Config, lref, {node(), ?util:listen(?SERVER, tcp)}),
+ ok;
+
+connect({?CLIENT, Config}) ->
+ ?util:connect(?CLIENT, tcp, ?util:read_priv(Config, lref)),
+ ok;
+
+connect(Config) ->
+ Nodes = ?util:read_priv(Config, nodes),
+ [] = [{N,RC} || {N,S} <- Nodes,
+ RC <- [rpc:call(N, ?MODULE, connect, [{S,Config}])],
+ RC /= ok].
+
+%% stop/1
+%%
+%% Stop the slave nodes.
+
+stop(_Config) ->
+ [] = [{N,E} || {N,_} <- ?NODES,
+ {error, _, _} = E <- [ct_slave:stop(N)]].
+
+%% ===========================================================================
+%% traffic testcases
+
+%% send_local/1
+%%
+%% Send a request from the first client node, using a the local
+%% transport.
+
+send_local(Config) ->
+ #diameter_base_STA{'Result-Code' = ?SUCCESS}
+ = send(Config, local, str(?LOGOUT)).
+
+%% send_remote/1
+%%
+%% Send a request from the first client node, using a transport on the
+%% another node.
+
+send_remote(Config) ->
+ #diameter_base_STA{'Result-Code' = ?SUCCESS}
+ = send(Config, remote, str(?LOGOUT)).
+
+%% send_timeout/1
+%%
+%% Send a request that the server discards.
+
+send_timeout(Config) ->
+ {error, timeout} = send(Config, remote, str(?TIMEOUT)).
+
+%% send_failover/1
+%%
+%% Send a request that causes the server to remote transports down.
+
+send_failover(Config) ->
+ #'diameter_base_answer-message'{'Result-Code' = ?BUSY}
+ = send(Config, remote, str(?MOVED)).
+
+%% ===========================================================================
+
+str(Cause) ->
+ #diameter_base_STR{'Destination-Realm' = ?REALM,
+ 'Auth-Application-Id' = ?DICT:id(),
+ 'Termination-Cause' = Cause}.
+
+%% send/2
+
+send(Config, Where, Req) ->
+ [_, {Node, _} | _] = ?util:read_priv(Config, nodes) ,
+ rpc:call(Node, ?MODULE, call, [{Where, Req}]).
+
+%% call/1
+
+call({Where, Req}) ->
+ diameter:call(?CLIENT, ?DICT, Req, [{extra, [{Where, sname()}]}]).
+
+%% sname/0
+
+sname() ->
+ ?A(hd(string:tokens(?L(node()), "@"))).
+
+%% ===========================================================================
+%% diameter callbacks
+
+%% peer_up/3
+
+peer_up(_SvcName, _Peer, State) ->
+ State.
+
+%% peer_down/3
+
+peer_down(_SvcName, _Peer, State) ->
+ State.
+
+%% pick_peer/4
+
+pick_peer([LP], [_, _], ?CLIENT, _State, {local, client0}) ->
+ {ok, LP};
+
+pick_peer([_], [RP | _], ?CLIENT, _State, {remote, client0}) ->
+ {ok, RP};
+
+pick_peer([LP], [], ?CLIENT, _State, {remote, client0}) ->
+ {ok, LP}.
+
+%% prepare_request/4
+
+prepare_request(Pkt, ?CLIENT, {_Ref, Caps}, {_, client0}) ->
+ #diameter_packet{msg = Req}
+ = Pkt,
+ #diameter_caps{origin_host = {OH, _},
+ origin_realm = {OR, _}}
+ = Caps,
+ {send, Req#diameter_base_STR{'Origin-Host' = OH,
+ 'Origin-Realm' = OR,
+ 'Session-Id' = diameter:session_id(OH)}}.
+
+prepare_retransmit(Pkt, ?CLIENT, _, {_, client0}) ->
+ #diameter_packet{msg = #diameter_base_STR{'Termination-Cause' = ?MOVED}}
+ = Pkt, %% assert
+ {send, Pkt}.
+
+%% handle_answer/5
+
+handle_answer(Pkt, _Req, ?CLIENT, _Peer, {_, client0}) ->
+ #diameter_packet{msg = Rec, errors = []} = Pkt,
+ Rec.
+
+%% handle_error/5
+
+handle_error(Reason, _Req, ?CLIENT, _Peer, {_, client0}) ->
+ {error, Reason}.
+
+%% handle_request/3
+
+handle_request(Pkt, ?SERVER, Peer) ->
+ server = sname(), %% assert
+ #diameter_packet{msg = Req}
+ = Pkt,
+ request(Req, Peer).
+
+request(#diameter_base_STR{'Termination-Cause' = ?TIMEOUT}, _) ->
+ discard;
+
+request(#diameter_base_STR{'Termination-Cause' = ?MOVED}, Peer) ->
+ {TPid, #diameter_caps{origin_state_id = {_, [N]}}} = Peer,
+ fail(N, TPid);
+
+request(#diameter_base_STR{'Session-Id' = SId}, {_, Caps}) ->
+ #diameter_caps{origin_host = {OH, _},
+ origin_realm = {OR, _}}
+ = Caps,
+ {reply, #diameter_base_STA{'Result-Code' = ?SUCCESS,
+ 'Session-Id' = SId,
+ 'Origin-Host' = OH,
+ 'Origin-Realm' = OR}}.
+
+fail(0, _) -> %% sent from the originating node ...
+ {protocol_error, ?BUSY};
+
+fail(_, TPid) -> %% ... or through a remote node: force failover
+ exit(TPid, kill),
+ discard.
diff --git a/lib/diameter/test/diameter_util.erl b/lib/diameter/test/diameter_util.erl
index 5af4ad9ba5..a9872f32e1 100644
--- a/lib/diameter/test/diameter_util.erl
+++ b/lib/diameter/test/diameter_util.erl
@@ -258,6 +258,9 @@ path(Config, Name) ->
lport(M, Ref) ->
lport(M, Ref, 1).
+lport(M, {Node, Ref}, Tries) ->
+ rpc:call(Node, ?MODULE, lport, [M, Ref, Tries]);
+
lport(M, Ref, Tries) ->
lp(tmod(M), Ref, Tries).
diff --git a/lib/diameter/test/modules.mk b/lib/diameter/test/modules.mk
index c4a713fb10..beff588a02 100644
--- a/lib/diameter/test/modules.mk
+++ b/lib/diameter/test/modules.mk
@@ -31,6 +31,7 @@ MODULES = \
diameter_codec_test \
diameter_compiler_SUITE \
diameter_dict_SUITE \
+ diameter_distribution_SUITE \
diameter_dpr_SUITE \
diameter_event_SUITE \
diameter_failover_SUITE \