aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--lib/diameter/examples/code/node.erl29
-rw-r--r--lib/diameter/src/base/diameter_peer_fsm.erl20
-rw-r--r--lib/diameter/src/base/diameter_service.erl25
-rw-r--r--lib/diameter/src/transport/diameter_sctp.erl4
-rw-r--r--lib/diameter/src/transport/diameter_tcp.erl2
5 files changed, 67 insertions, 13 deletions
diff --git a/lib/diameter/examples/code/node.erl b/lib/diameter/examples/code/node.erl
index 246be4194b..fc5830f8e2 100644
--- a/lib/diameter/examples/code/node.erl
+++ b/lib/diameter/examples/code/node.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2015. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2016. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -30,6 +30,8 @@
connect/2,
stop/1]).
+-export([message/3]).
+
-type protocol()
:: tcp | sctp.
@@ -128,6 +130,8 @@ stop(Name) ->
server_opts({T, Addr, Port}) ->
[{transport_module, tmod(T)},
{transport_config, [{reuseaddr, true},
+ {sender, true},
+ {message_cb, [fun ?MODULE:message/3, 0]},
{ip, addr(Addr)},
{port, Port}]}];
@@ -173,3 +177,26 @@ addr(loopback) ->
{127,0,0,1};
addr(A) ->
A.
+
+%% ---------------------------------------------------------------------------
+
+%% message/3
+%%
+%% Simple message callback that limits the number of concurrent
+%% requests on the peer connection in question.
+
+%% Incoming request.
+message(recv, <<_:32, 1:1, _/bits>> = Bin, N) ->
+ [Bin, N < 32, fun ?MODULE:message/3, N+1];
+
+%% Outgoing request.
+message(ack, <<_:32, 1:1, _/bits>>, _) ->
+ [];
+
+%% Incoming answer or request discarded.
+message(ack, _, N) ->
+ [N =< 32, fun ?MODULE:message/3, N-1];
+
+%% Outgoing message or incoming answer.
+message(_, Bin, _) ->
+ [Bin].
diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl
index 1b0dc417e5..e43b3f54cf 100644
--- a/lib/diameter/src/base/diameter_peer_fsm.erl
+++ b/lib/diameter/src/base/diameter_peer_fsm.erl
@@ -542,11 +542,11 @@ put_route(Pid) ->
MRef = monitor(process, Pid),
put(Pid, MRef).
-%% get_route/2
+%% get_route/3
-%% incoming answer
-get_route(_, #diameter_packet{header = #diameter_header{is_request = false}}
- = Pkt) ->
+%% Incoming answer.
+get_route(_, _, #diameter_packet{header = #diameter_header{is_request = false}}
+ = Pkt) ->
Seqs = diameter_codec:sequence_numbers(Pkt),
case erase(Seqs) of
{Pid, Ref, MRef} ->
@@ -557,8 +557,14 @@ get_route(_, #diameter_packet{header = #diameter_header{is_request = false}}
false
end;
-%% incoming request
-get_route(Ack, _) ->
+%% Requests answered here ...
+get_route(_, N, _)
+ when N == 'CER';
+ N == 'DPR' ->
+ false;
+
+%% ... or not.
+get_route(Ack, _, _) ->
Ack.
%% erase_route/1
@@ -650,7 +656,7 @@ encode(Rec, Opts, Dict) ->
%% incoming/2
incoming({recv = T, Name, Pkt}, #state{parent = Pid, ack = Ack} = S) ->
- Pid ! {T, self(), get_route(Ack, Pkt), Name, Pkt},
+ Pid ! {T, self(), get_route(Ack, Name, Pkt), Name, Pkt},
rcv(Name, Pkt, S);
incoming(#diameter_header{is_request = R}, #state{transport = TPid,
diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl
index a976a8b998..788f697627 100644
--- a/lib/diameter/src/base/diameter_service.erl
+++ b/lib/diameter/src/base/diameter_service.erl
@@ -514,6 +514,13 @@ transition({tc_timeout, T}, S) ->
tc_timeout(T, S),
ok;
+transition({nodeup, Node, _}, S) ->
+ nodeup(Node, S),
+ ok;
+
+transition({nodedown, _Node, _}, _) ->
+ ok;
+
transition(Req, S) ->
unexpected(handle_info, [Req], S),
ok.
@@ -709,6 +716,8 @@ mref(P) ->
init_shared(#state{options = #{use_shared_peers := T},
service_name = Svc}) ->
+ T == false orelse net_kernel:monitor_nodes(true, [{node_type, visible},
+ nodedown_reason]),
notify(T, Svc, {service, self()}).
init_mod(#diameter_app{alias = Alias,
@@ -728,6 +737,11 @@ notify(Share, SvcName, T) ->
%% Test for the empty list for upgrade reasons: there's no
%% diameter_peer:notify/3 in old code.
+nodeup(Node, #state{options = #{share_peers := SP},
+ service_name = SvcName}) ->
+ lists:member(Node, remotes(SP))
+ andalso diameter_peer:notify([Node], SvcName, {service, self()}).
+
remotes(false) ->
[];
@@ -1400,9 +1414,15 @@ is_remote(Pid, T) ->
%% # remote_peer_up/4
%% ---------------------------------------------------------------------------
-remote_peer_up(TPid, Aliases, Caps, #state{options = #{use_shared_peers := T}}
+remote_peer_up(TPid, Aliases, Caps, #state{options = #{use_shared_peers := T},
+ remote = {PeerT, _, _}}
= S) ->
- is_remote(TPid, T) andalso rpu(TPid, Aliases, Caps, S).
+ is_remote(TPid, T)
+ andalso not ets:member(PeerT, TPid)
+ andalso rpu(TPid, Aliases, Caps, S).
+
+%% Notification can be duplicate since remote nodes push and the local
+%% node pulls.
rpu(TPid, Aliases, Caps, #state{service = Svc, remote = RT}) ->
#diameter_service{applications = Apps} = Svc,
@@ -1412,6 +1432,7 @@ rpu(TPid, Aliases, Caps, #state{service = Svc, remote = RT}) ->
rpu(_, [] = No, _, _) ->
No;
+
rpu(TPid, Aliases, Caps, {PeerT, _, _} = RT) ->
monitor(process, TPid),
ets:insert(PeerT, #peer{pid = TPid,
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl
index 6a9f1f940b..a0104fac6e 100644
--- a/lib/diameter/src/transport/diameter_sctp.erl
+++ b/lib/diameter/src/transport/diameter_sctp.erl
@@ -112,7 +112,7 @@
{transport :: pid(),
ack = false :: boolean(),
socket :: gen_sctp:sctp_socket(),
- assoc_id :: gen_sctp:assoc_id()}). %% next output stream
+ assoc_id :: gen_sctp:assoc_id()}).
%% Listener process state.
-record(listener,
@@ -565,7 +565,7 @@ transition(Msg, S)
%% Deferred actions from a message_cb.
transition({actions, Dir, Acts}, S) ->
- actions(Acts, Dir, S);
+ setopts(ok, actions(Acts, Dir, S));
%% Request to close the transport connection.
transition({diameter, {close, Pid}}, #transport{parent = Pid}) ->
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl
index a2f393d5d4..aa09a261a3 100644
--- a/lib/diameter/src/transport/diameter_tcp.erl
+++ b/lib/diameter/src/transport/diameter_tcp.erl
@@ -640,7 +640,7 @@ transition(Msg, S)
%% Deferred actions from a message_cb.
transition({actions, Dir, Acts}, S) ->
- actions(Acts, Dir, S);
+ setopts(actions(Acts, Dir, S));
%% Request to close the transport connection.
transition({diameter, {close, Pid}}, #transport{parent = Pid,