aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src/base
diff options
context:
space:
mode:
Diffstat (limited to 'lib/diameter/src/base')
-rw-r--r--lib/diameter/src/base/diameter.erl10
-rw-r--r--lib/diameter/src/base/diameter_config.erl29
-rw-r--r--lib/diameter/src/base/diameter_peer.erl8
-rw-r--r--lib/diameter/src/base/diameter_service.erl90
-rw-r--r--lib/diameter/src/base/diameter_traffic.erl16
5 files changed, 99 insertions, 54 deletions
diff --git a/lib/diameter/src/base/diameter.erl b/lib/diameter/src/base/diameter.erl
index c67fba5f89..189f2b01b9 100644
--- a/lib/diameter/src/base/diameter.erl
+++ b/lib/diameter/src/base/diameter.erl
@@ -45,6 +45,7 @@
-export_type([evaluable/0,
restriction/0,
+ remotes/0,
sequence/0,
app_alias/0,
service_name/0,
@@ -292,13 +293,20 @@ call(SvcName, App, Message) ->
| [node()]
| evaluable().
+-type remotes()
+ :: boolean()
+ | [node()]
+ | evaluable().
+
%% Options passed to start_service/2
-type service_opt()
:: capability()
| {application, [application_opt()]}
| {restrict_connections, restriction()}
- | {sequence, sequence() | evaluable()}.
+ | {sequence, sequence() | evaluable()}
+ | {share_peers, remotes()}
+ | {use_shared_peers, remotes()}.
-type application_opt()
:: {alias, app_alias()}
diff --git a/lib/diameter/src/base/diameter_config.erl b/lib/diameter/src/base/diameter_config.erl
index 9f73815756..3a2e0d2140 100644
--- a/lib/diameter/src/base/diameter_config.erl
+++ b/lib/diameter/src/base/diameter_config.erl
@@ -573,7 +573,6 @@ make_config(SvcName, Opts) ->
{false, monitor},
{?NOMASK, sequence},
{nodes, restrict_connections}]),
- %% share_peers and use_shared_peers are currently undocumented.
#service{name = SvcName,
rec = #diameter_service{applications = Apps,
@@ -588,23 +587,31 @@ opt(K, false = B)
B;
opt(K, true = B)
- when K == share_peer;
+ when K == share_peers;
K == use_shared_peers ->
B;
-opt(monitor, P)
- when is_pid(P) ->
- P;
-
opt(restrict_connections, T)
when T == node;
- T == nodes;
- T == [];
- is_atom(hd(T)) ->
+ T == nodes ->
+ T;
+
+opt(K, T)
+ when (K == share_peers
+ orelse K == use_shared_peers
+ orelse K == restrict_connections), ([] == T
+ orelse is_atom(hd(T))) ->
T;
-opt(restrict_connections = K, F) ->
- try diameter_lib:eval(F) of %% no guarantee that it won't fail later
+opt(monitor, P)
+ when is_pid(P) ->
+ P;
+
+opt(K, F)
+ when K == restrict_connections;
+ K == share_peers;
+ K == use_shared_peers ->
+ try diameter_lib:eval(F) of %% but no guarantee that it won't fail later
Nodes when is_list(Nodes) ->
F;
V ->
diff --git a/lib/diameter/src/base/diameter_peer.erl b/lib/diameter/src/base/diameter_peer.erl
index 130bedda84..dfc76eb76e 100644
--- a/lib/diameter/src/base/diameter_peer.erl
+++ b/lib/diameter/src/base/diameter_peer.erl
@@ -31,7 +31,7 @@
send/2,
close/1,
abort/1,
- notify/2]).
+ notify/3]).
%% Server start.
-export([start_link/0]).
@@ -63,11 +63,11 @@
-define(DEFAULT_TTMO, infinity).
%%% ---------------------------------------------------------------------------
-%%% # notify/2
+%%% # notify/3
%%% ---------------------------------------------------------------------------
-notify(SvcName, T) ->
- rpc:abcast(nodes(), ?SERVER, {notify, SvcName, T}).
+notify(Nodes, SvcName, T) ->
+ rpc:abcast(Nodes, ?SERVER, {notify, SvcName, T}).
%%% ---------------------------------------------------------------------------
%%% # start/1
diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl
index f1342df16c..255a3a44fd 100644
--- a/lib/diameter/src/base/diameter_service.erl
+++ b/lib/diameter/src/base/diameter_service.erl
@@ -125,9 +125,9 @@
monitor = false :: false | pid(), %% process to die with
options
:: [{sequence, diameter:sequence()} %% sequence mask
- | {restrict_connections, diameter:restriction()}
- | {share_peers, boolean()} %% broadcast peers to remote nodes?
- | {use_shared_peers, boolean()}]}).%% use broadcasted peers?
+ | {share_peers, diameter:remotes()} %% broadcast to
+ | {use_shared_peers, diameter:remotes()} %% use from
+ | {restrict_connections, diameter:restriction()}]}).
%% shared_peers reflects the peers broadcast from remote nodes.
%% Record representing an RFC 3539 watchdog process implemented by
@@ -681,11 +681,9 @@ mref(false = No) ->
mref(P) ->
erlang:monitor(process, P).
-init_shared(#state{options = [_, _, {_, true} | _],
+init_shared(#state{options = [_, _, {_,T} | _],
service_name = Svc}) ->
- diameter_peer:notify(Svc, {service, self()});
-init_shared(#state{options = [_, _, {_, false} | _]}) ->
- ok.
+ notify(T, Svc, {service, self()}).
init_mod(#diameter_app{alias = Alias,
init_state = S}) ->
@@ -698,6 +696,37 @@ get_value(Key, Vs) ->
{_, V} = lists:keyfind(Key, 1, Vs),
V.
+notify(Share, SvcName, T) ->
+ Nodes = remotes(Share),
+ [] /= Nodes andalso diameter_peer:notify(Nodes, SvcName, T).
+%% Test for the empty list for upgrade reasons: there's no
+%% diameter_peer:notify/3 in old code so no call means no load order
+%% requirement.
+
+remotes(false) ->
+ [];
+
+remotes(true) ->
+ nodes();
+
+remotes(Nodes)
+ when is_atom(hd(Nodes));
+ Nodes == [] ->
+ Nodes;
+
+remotes(F) ->
+ try diameter_lib:eval(F) of
+ L when is_list(L) ->
+ L;
+ T ->
+ diameter_lib:error_report({invalid_return, T}, F),
+ []
+ catch
+ E:R ->
+ diameter_lib:error_report({failure, {E, R, ?STACK}}, F),
+ []
+ end.
+
%% ---------------------------------------------------------------------------
%% # start/3
%% ---------------------------------------------------------------------------
@@ -1233,12 +1262,12 @@ report_status(Status,
peer = TPid,
type = Type,
options = Opts},
- #peer{apps = [_|_] = As,
+ #peer{apps = [_|_] = Apps,
caps = Caps},
#state{service_name = SvcName}
= S,
Extra) ->
- share_peer(Status, Caps, As, TPid, S),
+ share_peer(Status, Caps, Apps, TPid, S),
Info = [Status, Ref, {TPid, Caps}, {type(Type), Opts} | Extra],
send_event(SvcName, list_to_tuple(Info)).
@@ -1255,9 +1284,9 @@ send_event(#diameter_event{service = SvcName} = E) ->
%% # share_peer/5
%% ---------------------------------------------------------------------------
-share_peer(up, Caps, Aliases, TPid, #state{options = [_, {_, true} | _],
- service_name = Svc}) ->
- diameter_peer:notify(Svc, {peer, TPid, Aliases, Caps});
+share_peer(up, Caps, Apps, TPid, #state{options = [_, {_,T} | _],
+ service_name = Svc}) ->
+ notify(T, Svc, {peer, TPid, [A || {_,A} <- Apps], Caps});
share_peer(_, _, _, _, _) ->
ok.
@@ -1266,34 +1295,34 @@ share_peer(_, _, _, _, _) ->
%% # share_peers/2
%% ---------------------------------------------------------------------------
-share_peers(Pid, #state{options = [_, {_, true} | _],
- local_peers = PDict}) ->
- ?Dict:fold(fun(A,Ps,ok) -> sp(Pid, A, Ps), ok end, ok, PDict);
-
-share_peers(_, _) ->
- ok.
+share_peers(Pid, #state{options = [_, {_,T} | _], local_peers = PDict}) ->
+ is_remote(Pid, T)
+ andalso ?Dict:fold(fun(A,Ps,ok) -> sp(Pid, A, Ps), ok end, ok, PDict).
sp(Pid, Alias, Peers) ->
lists:foreach(fun({P,C}) -> Pid ! {peer, P, [Alias], C} end, Peers).
+is_remote(Pid, T) ->
+ Node = node(Pid),
+ Node /= node() andalso lists:member(Node, remotes(T)).
+
%% ---------------------------------------------------------------------------
%% # remote_peer_up/4
%% ---------------------------------------------------------------------------
-remote_peer_up(Pid, Aliases, Caps, #state{options = [_, _, {_, true} | _],
- service = Svc,
- shared_peers = PDict}) ->
+remote_peer_up(Pid, Aliases, Caps, #state{options = [_, _, {_,T} | _]} = S) ->
+ is_remote(Pid, T)
+ andalso rpu(Pid, Aliases, Caps, S).
+
+rpu(Pid, Aliases, Caps, #state{service = Svc, shared_peers = PDict}) ->
#diameter_service{applications = Apps} = Svc,
Key = #diameter_app.alias,
- As = lists:filter(fun(A) -> lists:keymember(A, Key, Apps) end, Aliases),
- rpu(Pid, Caps, PDict, As);
-
-remote_peer_up(_, _, _, #state{options = [_, _, {_, false} | _]}) ->
- ok.
+ F = fun(A) -> lists:keymember(A, Key, Apps) end,
+ rpu(Pid, lists:filter(F, Aliases), Caps, PDict);
-rpu(_, _, PDict, []) ->
- PDict;
-rpu(Pid, Caps, PDict, Aliases) ->
+rpu(_, [] = No, _, _) ->
+ No;
+rpu(Pid, Aliases, Caps, PDict) ->
erlang:monitor(process, Pid),
T = {Pid, Caps},
lists:foreach(fun(A) -> ?Dict:append(A, T, PDict) end, Aliases).
@@ -1302,8 +1331,7 @@ rpu(Pid, Caps, PDict, Aliases) ->
%% # remote_peer_down/2
%% ---------------------------------------------------------------------------
-remote_peer_down(Pid, #state{options = [_, _, {_, true} | _],
- shared_peers = PDict}) ->
+remote_peer_down(Pid, #state{shared_peers = PDict}) ->
lists:foreach(fun(A) -> rpd(Pid, A, PDict) end, ?Dict:fetch_keys(PDict)).
rpd(Pid, Alias, PDict) ->
diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl
index f527f7c754..25b902e3f2 100644
--- a/lib/diameter/src/base/diameter_traffic.erl
+++ b/lib/diameter/src/base/diameter_traffic.erl
@@ -1479,12 +1479,14 @@ send({TPid, Pkt, #request{handler = Pid} = Req, SvcName, Timeout, TRef}) ->
Req#request{handler = self()},
SvcName,
Timeout),
- Pid ! reref(receive T -> T end, Ref, TRef).
-
-reref({T, Ref, R}, Ref, TRef) ->
- {T, TRef, R};
-reref(T, _, _) ->
- T.
+ receive
+ {answer, _, _, _, _} = A ->
+ Pid ! A;
+ {failover = T, Ref} ->
+ Pid ! {T, TRef};
+ T ->
+ exit({timeout, Ref, TPid} = T)
+ end.
%% send/2
@@ -1559,7 +1561,7 @@ resend_request(Pkt0,
store_request(TPid, Bin, Req, Timeout) ->
Seqs = diameter_codec:sequence_numbers(Bin),
- TRef = erlang:start_timer(Timeout, self(), timeout),
+ TRef = erlang:start_timer(Timeout, self(), TPid),
ets:insert(?REQUEST_TABLE, {Seqs, Req, TRef}),
ets:member(?REQUEST_TABLE, TPid)
orelse (self() ! {failover, TRef}), %% failover/1 may have missed