aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src
diff options
context:
space:
mode:
Diffstat (limited to 'lib/diameter/src')
-rw-r--r--lib/diameter/src/base/diameter_lib.erl2
-rw-r--r--lib/diameter/src/base/diameter_peer.erl6
-rw-r--r--lib/diameter/src/base/diameter_peer_fsm.erl20
-rw-r--r--lib/diameter/src/base/diameter_reg.erl235
-rw-r--r--lib/diameter/src/base/diameter_service.erl25
-rw-r--r--lib/diameter/src/base/diameter_traffic.erl1
-rw-r--r--lib/diameter/src/base/diameter_watchdog.erl2
-rw-r--r--lib/diameter/src/compiler/diameter_exprecs.erl4
-rw-r--r--lib/diameter/src/transport/diameter_sctp.erl90
-rw-r--r--lib/diameter/src/transport/diameter_tcp.erl92
10 files changed, 241 insertions, 236 deletions
diff --git a/lib/diameter/src/base/diameter_lib.erl b/lib/diameter/src/base/diameter_lib.erl
index 8792e97621..1c1ea42cb5 100644
--- a/lib/diameter/src/base/diameter_lib.erl
+++ b/lib/diameter/src/base/diameter_lib.erl
@@ -283,7 +283,7 @@ ip(T)
%% Or not: convert from '.'/':'-separated decimal/hex.
ip(Addr) ->
- {ok, A} = inet_parse:address(Addr), %% documented in inet(3)
+ {ok, A} = inet:parse_address(Addr),
A.
%% ---------------------------------------------------------------------------
diff --git a/lib/diameter/src/base/diameter_peer.erl b/lib/diameter/src/base/diameter_peer.erl
index 2759f17e64..4cb5a57a54 100644
--- a/lib/diameter/src/base/diameter_peer.erl
+++ b/lib/diameter/src/base/diameter_peer.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2015. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2017. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -202,10 +202,10 @@ match1(Addr, Match) ->
match(Addr, {ok, A}, _) ->
Addr == A;
match(Addr, {error, _}, RE) ->
- match == re:run(inet_parse:ntoa(Addr), RE, [{capture, none}]).
+ match == re:run(inet:ntoa(Addr), RE, [{capture, none}, caseless]).
addr([_|_] = A) ->
- inet_parse:address(A);
+ inet:parse_address(A);
addr(A) ->
{ok, A}.
diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl
index 18904b8a55..d99f11a697 100644
--- a/lib/diameter/src/base/diameter_peer_fsm.erl
+++ b/lib/diameter/src/base/diameter_peer_fsm.erl
@@ -544,11 +544,11 @@ put_route(Pid) ->
MRef = monitor(process, Pid),
put(Pid, MRef).
-%% get_route/2
+%% get_route/3
-%% incoming answer
-get_route(_, #diameter_packet{header = #diameter_header{is_request = false}}
- = Pkt) ->
+%% Incoming answer.
+get_route(_, _, #diameter_packet{header = #diameter_header{is_request = false}}
+ = Pkt) ->
Seqs = diameter_codec:sequence_numbers(Pkt),
case erase(Seqs) of
{Pid, Ref, MRef} ->
@@ -559,8 +559,14 @@ get_route(_, #diameter_packet{header = #diameter_header{is_request = false}}
false
end;
-%% incoming request
-get_route(Ack, _) ->
+%% Requests answered here ...
+get_route(_, N, _)
+ when N == 'CER';
+ N == 'DPR' ->
+ false;
+
+%% ... or not.
+get_route(Ack, _, _) ->
Ack.
%% erase_route/1
@@ -745,7 +751,7 @@ recv1('DPA' = Name,
%% Any other message with a header and no length errors.
recv1(Name, H, Msg, #state{parent = Pid, ack = Ack} = S) ->
Pkt = pkt(H, Msg),
- Pid ! {recv, self(), get_route(Ack, Pkt), Name, Pkt},
+ Pid ! {recv, self(), get_route(Ack, Name, Pkt), Name, Pkt},
handle(Name, Pkt, S).
%% pkt/2
diff --git a/lib/diameter/src/base/diameter_reg.erl b/lib/diameter/src/base/diameter_reg.erl
index 97e74657bd..bd5db54a5c 100644
--- a/lib/diameter/src/base/diameter_reg.erl
+++ b/lib/diameter/src/base/diameter_reg.erl
@@ -19,10 +19,11 @@
%%
%%
-%% The module implements a simple term -> pid registry.
+%% A simple term -> pid registry.
%%
-module(diameter_reg).
+
-behaviour(gen_server).
-export([add/1,
@@ -57,18 +58,18 @@
-type key() :: term().
-type from() :: {pid(), term()}.
+-type rcvr() :: [pid() | term()] %% subscribe
+ | from(). %% wait
-type pattern() :: term().
-record(state, {id = diameter_lib:now(),
- receivers = dict:new()
- :: dict:dict(pattern(), [[pid() | term()]%% subscribe
- | from()]), %% wait
+ notify = #{} :: #{pattern() => [rcvr()]},
monitors = sets:new() :: sets:set(pid())}).
%% The ?TABLE bag contains the Key -> Pid mapping, as {Key, Pid}
%% tuples. Each pid is stored in the monitors set to ensure only one
%% monitor for each pid: more are harmless, but unnecessary. A pattern
-%% is added to receivers a result of calls to wait/1 or subscribe/2:
+%% is added to notify a result of calls to wait/1 or subscribe/2:
%% changes to ?TABLE causes processes to be notified as required.
%% ===========================================================================
@@ -156,7 +157,7 @@ wait(Pat) ->
%% # subscribe(Pat, T)
%%
%% Like match/1, but additionally receive messages of the form
-%% {T, add|remove, {term(), pid()} when associations are added
+%% {T, add|remove, {term(), pid()}} when associations are added
%% or removed.
%% ===========================================================================
@@ -186,15 +187,12 @@ uptime() ->
-> [{pid(), [key()]}].
pids() ->
- to_list(fun swap/1).
-
-to_list(Fun) ->
- ets:foldl(fun(T,D) -> append(Fun(T), D) end, orddict:new(), ?TABLE).
+ append(ets:select(?TABLE, [{{'$1','$2'}, [], [{{'$2', '$1'}}]}])).
-append({K,V}, Dict) ->
- orddict:append(K, V, Dict).
-
-id(T) -> T.
+append(Pairs) ->
+ dict:to_list(lists:foldl(fun({K,V}, D) -> dict:append(K, V, D) end,
+ dict:new(),
+ Pairs)).
%% terms/0
@@ -202,9 +200,7 @@ id(T) -> T.
-> [{key(), [pid()]}].
terms() ->
- to_list(fun id/1).
-
-swap({X,Y}) -> {Y,X}.
+ append(ets:tab2list(?TABLE)).
%% subs/0
@@ -212,31 +208,19 @@ swap({X,Y}) -> {Y,X}.
-> [{pattern(), [{pid(), term()}]}].
subs() ->
- #state{receivers = RD} = state(),
- dict:fold(fun sub/3, orddict:new(), RD).
-
-sub(Pat, Ps, Dict) ->
- lists:foldl(fun([P|T], D) -> orddict:append(Pat, {P,T}, D);
- (_, D) -> D
- end,
- Dict,
- Ps).
+ #state{notify = Dict} = state(),
+ [{K, Ts} || {K,Ps} <- maps:to_list(Dict),
+ Ts <- [[{P,T} || [P|T] <- Ps]]].
%% waits/0
-spec waits()
- -> [{pattern(), [{from(), term()}]}].
+ -> [{pattern(), [from()]}].
waits() ->
- #state{receivers = RD} = state(),
- dict:fold(fun wait/3, orddict:new(), RD).
-
-wait(Pat, Ps, Dict) ->
- lists:foldl(fun({_,_} = F, D) -> orddict:append(Pat, F, D);
- (_, D) -> D
- end,
- Dict,
- Ps).
+ #state{notify = Dict} = state(),
+ [{K, Ts} || {K,Ps} <- maps:to_list(Dict),
+ Ts <- [[T || {_,_} = T <- Ps]]].
%% ----------------------------------------------------------
%% # init/1
@@ -250,33 +234,28 @@ init(_) ->
%% # handle_call/3
%% ----------------------------------------------------------
-handle_call({add, Uniq, Key}, {Pid, _}, S0) ->
+handle_call({add, Uniq, Key}, {Pid, _}, S) ->
Rec = {Key, Pid},
- S1 = flush(Uniq, Rec, S0),
+ NS = flush(Uniq, Rec, S), %% before insert
{Res, New} = insert(Uniq, Rec),
- {Recvs, S} = add(New, Rec, S1),
- notify(Recvs, Rec),
- {reply, Res, S};
+ {reply, Res, notify(add, New andalso Rec, NS)};
handle_call({remove, Key}, {Pid, _}, S) ->
Rec = {Key, Pid},
- Recvs = delete([Rec], S),
ets:delete_object(?TABLE, Rec),
- notify(Recvs, remove),
- {reply, true, S};
+ {reply, true, notify(remove, Rec, S)};
-handle_call({wait, Pat}, {Pid, _} = From, #state{receivers = RD} = S) ->
+handle_call({wait, Pat}, {Pid, _} = From, S) ->
NS = add_monitor(Pid, S),
case match(Pat) of
- [_|_] = L ->
- {reply, L, NS};
+ [_|_] = Recs ->
+ {reply, Recs, NS};
[] ->
- {noreply, NS#state{receivers = dict:append(Pat, From, RD)}}
+ {noreply, queue(Pat, From, NS)}
end;
-handle_call({subscribe, Pat, T}, {Pid, _}, #state{receivers = RD} = S) ->
- NS = add_monitor(Pid, S),
- {reply, match(Pat), NS#state{receivers = dict:append(Pat, [Pid | T], RD)}};
+handle_call({subscribe, Pat, T}, {Pid, _}, S) ->
+ {reply, match(Pat), queue(Pat, [Pid | T], add_monitor(Pid, S))};
handle_call(state, _, S) ->
{reply, S, S};
@@ -332,106 +311,60 @@ insert(true, Rec) ->
B = ets:insert_new(?TABLE, Rec), %% entry inserted?
{B, B}.
-%% add/3
-
+%% add_monitor/2
+%%
%% Only add a single monitor for any given process, since there's no
%% use to more.
-add(true, {_Key, Pid} = Rec, S) ->
- NS = add_monitor(Pid, S),
- {Recvs, RD} = add(Rec, NS),
- {Recvs, S#state{receivers = RD}};
-
-add(false = No, _, S) ->
- {No, S}.
-
-%% add/2
-
-%% Notify processes whose patterns match the inserted key.
-add({_Key, Pid} = Rec, #state{receivers = RD}) ->
- dict:fold(fun(Pt, Ps, A) ->
- add(lists:member(Rec, match(Pt, Pid)), Pt, Ps, Rec, A)
- end,
- {sets:new(), RD},
- RD).
-
-%% add/5
-
-add(true, Pat, Recvs, {_,_} = Rec, {Set, Dict}) ->
- {lists:foldl(fun sets:add_element/2, Set, Recvs),
- remove(fun erlang:is_list/1, Pat, Recvs, Dict)};
-add(false, _, _, _, Acc) ->
- Acc.
+add_monitor(Pid, #state{monitors = Ps} = S) ->
+ case sets:is_element(Pid, Ps) of
+ false ->
+ monitor(process, Pid),
+ S#state{monitors = sets:add_element(Pid, Ps)};
+ true ->
+ S
+ end.
-%% add_monitor/2
-
-add_monitor(Pid, #state{monitors = MS} = S) ->
- add_monitor(sets:is_element(Pid, MS), Pid, S).
-
-%% add_monitor/3
-
-add_monitor(false, Pid, #state{monitors = MS} = S) ->
- monitor(process, Pid),
- S#state{monitors = sets:add_element(Pid, MS)};
-
-add_monitor(true, _, S) ->
- S.
-
-%% delete/2
-
-delete(Recs, #state{receivers = RD}) ->
- lists:foldl(fun(R,S) -> delete(R, RD, S) end, sets:new(), Recs).
+%% notify/3
-%% delete/3
+notify(_, false, S) ->
+ S;
-delete({_Key, Pid} = Rec, RD, Set) ->
- dict:fold(fun(Pt, Ps, S) ->
- delete(lists:member(Rec, match(Pt, Pid)), Rec, Ps, S)
- end,
- Set,
- RD).
+notify(Op, {_,_} = Rec, #state{notify = Dict} = S) ->
+ S#state{notify = maps:fold(fun(P,Rs,D) -> notify(Op, Rec, P, Rs, D) end,
+ Dict,
+ Dict)}.
-%% delete/4
+%% notify/5
-%% Entry matches a pattern ...
-delete(true, Rec, Recvs, Set) ->
- lists:foldl(fun(R,S) -> sets:add_element({R, Rec}, S) end,
- Set,
- Recvs);
-
-%% ... or not.
-delete(false, _, _, Set) ->
- Set.
-
-%% notify/2
-
-notify(false = No, _) ->
- No;
-
-notify(Recvs, remove = Op) ->
- sets:fold(fun({P,R}, N) -> send(P, R, Op), N+1 end, 0, Recvs);
-
-notify(Recvs, {_,_} = Rec) ->
- sets:fold(fun(P,N) -> send(P, Rec, add), N+1 end, 0, Recvs).
+notify(Op, {_, Pid} = Rec, Pat, Rcvrs, Dict) ->
+ case lists:member(Rec, match(Pat, Pid)) of
+ true ->
+ reset(Pat, Dict, [P || P <- Rcvrs, send(P, Op, Rec)]);
+ false ->
+ Dict
+ end.
%% send/3
-%% No processes waiting on remove, by construction: they've either
-%% received notification at add or aren't waiting.
-send([Pid | T], Rec, Op) ->
- Pid ! {T, Op, Rec};
+send([Pid | T], Op, Rec) ->
+ Pid ! {T, Op, Rec},
+ true;
-send({_,_} = From, Rec, add) ->
- gen_server:reply(From, [Rec]).
+%% No processes wait on remove: they receive notification immediately
+%% or at add, by construction.
+send({_,_} = From, add, Rec) ->
+ gen_server:reply(From, [Rec]),
+ false.
%% down/2
-down(Pid, #state{monitors = MS} = S) ->
- NS = flush(Pid, S),
- Recvs = delete(match('_', Pid), NS),
+down(Pid, #state{monitors = Ps} = S) ->
+ Recs = match('_', Pid),
ets:match_delete(?TABLE, {'_', Pid}),
- notify(Recvs, remove),
- NS#state{monitors = sets:del_element(Pid, MS)}.
+ lists:foldl(fun(R,NS) -> notify(remove, R, NS) end,
+ flush(Pid, S#state{monitors = sets:del_element(Pid, Ps)}),
+ Recs).
%% flush/3
@@ -452,16 +385,15 @@ flush(false, _, S) ->
%% flush/2
%% Process has died and should no longer receive messages/replies.
-flush(Pid, #state{receivers = RD} = S)
- when is_pid(Pid) ->
- S#state{receivers = dict:fold(fun(Pt,Ps,D) -> flush(Pid, Pt, Ps, D) end,
- RD,
- RD)}.
+flush(Pid, #state{notify = Dict} = S) ->
+ S#state{notify = maps:fold(fun(P,Rs,D) -> flush(Pid, P, Rs, D) end,
+ Dict,
+ Dict)}.
%% flush/4
-flush(Pid, Pat, Recvs, Dict) ->
- remove(fun(T) -> Pid /= head(T) end, Pat, Recvs, Dict).
+flush(Pid, Pat, Rcvrs, Dict) ->
+ reset(Pat, Dict, [T || T <- Rcvrs, Pid /= head(T)]).
%% head/1
@@ -471,15 +403,18 @@ head([P|_]) ->
head({P,_}) ->
P.
-%% remove/4
+%% reset/3
+
+reset(Key, Map, []) ->
+ maps:remove(Key, Map);
+
+reset(Key, Map, List) ->
+ maps:put(Key, List, Map).
+
+%% queue/3
-remove(Pred, Key, Values, Dict) ->
- case lists:filter(Pred, Values) of
- [] ->
- dict:erase(Key, Dict);
- Rest ->
- dict:store(Key, Rest, Dict)
- end.
+queue(Pat, Rcvr, #state{notify = Dict} = S) ->
+ S#state{notify = maps:put(Pat, [Rcvr | maps:get(Pat, Dict, [])], Dict)}.
%% call/1
diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl
index 7cd4d23402..31dd92f878 100644
--- a/lib/diameter/src/base/diameter_service.erl
+++ b/lib/diameter/src/base/diameter_service.erl
@@ -530,6 +530,13 @@ transition({tc_timeout, T}, S) ->
tc_timeout(T, S),
ok;
+transition({nodeup, Node, _}, S) ->
+ nodeup(Node, S),
+ ok;
+
+transition({nodedown, _Node, _}, _) ->
+ ok;
+
transition(Req, S) ->
unexpected(handle_info, [Req], S),
ok.
@@ -749,6 +756,8 @@ mref(P) ->
init_shared(#state{options = #{use_shared_peers := T},
service_name = Svc}) ->
+ T == false orelse net_kernel:monitor_nodes(true, [{node_type, visible},
+ nodedown_reason]),
notify(T, Svc, {service, self()}).
init_mod(#diameter_app{alias = Alias,
@@ -764,6 +773,11 @@ notify(Share, SvcName, T) ->
%% Test for the empty list for upgrade reasons: there's no
%% diameter_peer:notify/3 in old code.
+nodeup(Node, #state{options = #{share_peers := SP},
+ service_name = SvcName}) ->
+ lists:member(Node, remotes(SP))
+ andalso diameter_peer:notify([Node], SvcName, {service, self()}).
+
remotes(false) ->
[];
@@ -1440,9 +1454,15 @@ is_remote(Pid, T) ->
%% # remote_peer_up/4
%% ---------------------------------------------------------------------------
-remote_peer_up(TPid, Aliases, Caps, #state{options = #{use_shared_peers := T}}
+remote_peer_up(TPid, Aliases, Caps, #state{options = #{use_shared_peers := T},
+ remote = {PeerT, _, _}}
= S) ->
- is_remote(TPid, T) andalso rpu(TPid, Aliases, Caps, S).
+ is_remote(TPid, T)
+ andalso not ets:member(PeerT, TPid)
+ andalso rpu(TPid, Aliases, Caps, S).
+
+%% Notification can be duplicate since remote nodes push and the local
+%% node pulls.
rpu(TPid, Aliases, Caps, #state{service = Svc, remote = RT}) ->
#diameter_service{applications = Apps} = Svc,
@@ -1452,6 +1472,7 @@ rpu(TPid, Aliases, Caps, #state{service = Svc, remote = RT}) ->
rpu(_, [] = No, _, _) ->
No;
+
rpu(TPid, Aliases, Caps, {PeerT, _, _} = RT) ->
monitor(process, TPid),
ets:insert(PeerT, #peer{pid = TPid,
diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl
index a10bf78d6e..f510f40a17 100644
--- a/lib/diameter/src/base/diameter_traffic.erl
+++ b/lib/diameter/src/base/diameter_traffic.erl
@@ -93,6 +93,7 @@
caller :: pid() | undefined, %% calling process
handler :: pid(), %% request process
peer :: undefined | {pid(), #diameter_caps{}},
+ caps :: undefined, %% no longer used
packet :: #diameter_packet{} | undefined}). %% of request
%% ---------------------------------------------------------------------------
diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl
index c08e2da672..43623334a9 100644
--- a/lib/diameter/src/base/diameter_watchdog.erl
+++ b/lib/diameter/src/base/diameter_watchdog.erl
@@ -74,7 +74,6 @@
okay := non_neg_integer()}, %% REOPEN -> OKAY
codec :: #{decode_format := none,
string_decode := false,
- strict_arities => diameter:strict_arities(),
strict_mbit := boolean(),
rfc := 3588 | 6733,
ordered_encode := false},
@@ -152,7 +151,6 @@ i({Ack, T, Pid, {Opts,
okay => 3},
Opts)),
codec = maps:with([decode_format,
- strict_arities,
strict_mbit,
string_decode,
rfc,
diff --git a/lib/diameter/src/compiler/diameter_exprecs.erl b/lib/diameter/src/compiler/diameter_exprecs.erl
index 9a0cb6baf2..143dede037 100644
--- a/lib/diameter/src/compiler/diameter_exprecs.erl
+++ b/lib/diameter/src/compiler/diameter_exprecs.erl
@@ -110,9 +110,9 @@
%% parse_transform/2
parse_transform(Forms, _Options) ->
- Rs = [R || {attribute, _, record, R} <- Forms],
- Es = lists:append([E || {attribute, _, export_records, E} <- Forms]),
{H,T} = lists:splitwith(fun is_head/1, Forms),
+ Rs = [R || {attribute, _, record, R} <- H],
+ Es = lists:append([E || {attribute, _, export_records, E} <- H]),
H ++ [a_export(Es) | f_accessors(Es, Rs)] ++ T.
is_head(T) ->
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl
index 4f56475529..4eb3379d59 100644
--- a/lib/diameter/src/transport/diameter_sctp.erl
+++ b/lib/diameter/src/transport/diameter_sctp.erl
@@ -102,6 +102,7 @@
streams :: {uint(), uint()} %% {InStream, OutStream} counts
| undefined,
os = 0 :: uint(), %% next output stream
+ rotate = 1 :: boolean() | 0 | 1, %% rotate os?
packet = true :: boolean() %% legacy transport_data?
| raw,
message_cb = false :: false | diameter:eval(),
@@ -112,7 +113,7 @@
{transport :: pid(),
ack = false :: boolean(),
socket :: gen_sctp:sctp_socket(),
- assoc_id :: gen_sctp:assoc_id()}). %% next output stream
+ assoc_id :: gen_sctp:assoc_id()}).
%% Listener process state.
-record(listener,
@@ -156,12 +157,7 @@ start(T, Svc, Opts)
= Svc,
diameter_sctp_sup:start(), %% start supervisors on demand
Addrs = Caps#diameter_caps.host_ip_address,
- s(T, Addrs, Pid, lists:map(fun ip/1, Opts)).
-
-ip({ifaddr, A}) ->
- {ip, A};
-ip(T) ->
- T.
+ s(T, Addrs, Pid, Opts).
%% A listener spawns transports either as a consequence of this call
%% when there is not yet an association to assign it, or at comm_up on
@@ -354,23 +350,35 @@ l([], Ref, T) ->
%% open/3
open(Addrs, Opts, PortNr) ->
- {LAs, Os} = addrs(Addrs, Opts),
- {LAs, case gen_sctp:open(gen_opts(portnr(Os, PortNr))) of
- {ok, Sock} ->
- Sock;
- {error, Reason} ->
- x({open, Reason})
- end}.
+ case gen_sctp:open(gen_opts(portnr(addrs(Addrs, Opts), PortNr))) of
+ {ok, Sock} ->
+ {addrs(Sock), Sock};
+ {error, Reason} ->
+ x({open, Reason})
+ end.
addrs(Addrs, Opts) ->
- case proplists:split(Opts, [ip]) of
- {[[]], _} ->
- {Addrs, Opts ++ [{ip, A} || A <- Addrs]};
- {[As], Os} ->
- LAs = [diameter_lib:ipaddr(A) || {ip, A} <- As],
- {LAs, Os ++ [{ip, A} || A <- LAs]}
+ case lists:mapfoldl(fun ipaddr/2, false, Opts) of
+ {Os, true} ->
+ Os;
+ {_, false} ->
+ Opts ++ [{ip, A} || A <- Addrs]
end.
+ipaddr({K,A}, _)
+ when K == ifaddr;
+ K == ip ->
+ {{ip, ipaddr(A)}, true};
+ipaddr(T, B) ->
+ {T, B}.
+
+ipaddr(A)
+ when A == loopback;
+ A == any ->
+ A;
+ipaddr(A) ->
+ diameter_lib:ipaddr(A).
+
portnr(Opts, PortNr) ->
case proplists:get_value(port, Opts) of
undefined ->
@@ -379,6 +387,14 @@ portnr(Opts, PortNr) ->
Opts
end.
+addrs(Sock) ->
+ case inet:socknames(Sock) of
+ {ok, As} ->
+ [A || {A,_} <- As];
+ {error, Reason} ->
+ x({socknames, Reason})
+ end.
+
%% x/1
x(Reason) ->
@@ -565,7 +581,7 @@ transition(Msg, S)
%% Deferred actions from a message_cb.
transition({actions, Dir, Acts}, S) ->
- actions(Acts, Dir, S);
+ setopts(ok, actions(Acts, Dir, S));
%% Request to close the transport connection.
transition({diameter, {close, Pid}}, #transport{parent = Pid}) ->
@@ -677,11 +693,16 @@ send(#diameter_packet{transport_data = {outstream, SId}}
= S) ->
send(SId rem OS, Msg, S);
-%% ... or not: rotate through all streams.
-send(Msg, #transport{streams = {_, OS},
+%% ... or not: rotate when sending on multiple streams ...
+send(Msg, #transport{rotate = true,
+ streams = {_, OS},
os = N}
= S) ->
- send(N, Msg, S#transport{os = (N + 1) rem OS}).
+ send(N, Msg, S#transport{os = (N + 1) rem OS});
+
+%% ... or send on the only stream available.
+send(Msg, S) ->
+ send(0, Msg, S).
%% send/3
@@ -749,7 +770,7 @@ recv({[#sctp_sndrcvinfo{assoc_id = Id}], _Bin}
%% Inbound Diameter message.
recv({[#sctp_sndrcvinfo{}], Bin} = Msg, S)
when is_binary(Bin) ->
- message(recv, Msg, S);
+ message(recv, Msg, recv(S));
recv({_, #sctp_shutdown_event{}}, _) ->
stop;
@@ -769,6 +790,25 @@ recv({_, #sctp_paddr_change{}}, _) ->
recv({_, #sctp_pdapi_event{}}, _) ->
ok.
+%% recv/1
+%%
+%% Start sending unordered after the second reception, so that an
+%% outgoing CER/CEA will arrive at the peer before another request.
+
+recv(#transport{rotate = B} = S)
+ when is_boolean(B) ->
+ S;
+
+recv(#transport{rotate = 0, streams = {_,N}, socket = Sock} = S) ->
+ ok = inet:setopts(Sock, [{sctp_default_send_param,
+ #sctp_sndrcvinfo{flags = [unordered]}}]),
+ S#transport{rotate = 1 < N};
+
+recv(#transport{rotate = N} = S) ->
+ S#transport{rotate = N-1}.
+
+%% publish/4
+
publish(T, Ref, Id, Sock) ->
true = diameter_reg:add_new({?MODULE, T, {Ref, {Id, Sock}}}),
putr(?INFO_KEY, {gen_sctp, Sock}). %% for info/1
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl
index ac55d722fa..a8639baa11 100644
--- a/lib/diameter/src/transport/diameter_tcp.erl
+++ b/lib/diameter/src/transport/diameter_tcp.erl
@@ -142,8 +142,7 @@
-> {ok, pid(), [inet:ip_address()]}
when Ref :: diameter:transport_ref();
({connect, Ref}, #diameter_service{}, [connect_option()])
- -> {ok, pid(), [inet:ip_address()]}
- | {ok, pid()}
+ -> {ok, pid()}
when Ref :: diameter:transport_ref().
start({T, Ref}, Svc, Opts) ->
@@ -258,22 +257,14 @@ i(#monitor{parent = Pid, transport = TPid} = S) ->
i({listen, Ref, {Mod, Opts, Addrs}}) ->
[_] = diameter_config:subscribe(Ref, transport), %% assert existence
- {[LA, LP], Rest} = proplists:split(Opts, [ip, port]),
- LAddrOpt = get_addr(LA, Addrs),
- LPort = get_port(LP),
- {ok, LSock} = Mod:listen(LPort, gen_opts(LAddrOpt, Rest)),
- LAddr = laddr(LAddrOpt, Mod, LSock),
+ {[LP], Rest} = proplists:split(Opts, [port]),
+ {ok, LSock} = Mod:listen(get_port(LP), gen_opts(Addrs, Rest)),
+ {ok, {LAddr, _}} = sockname(Mod, LSock),
true = diameter_reg:add_new({?MODULE, listener, {Ref, {LAddr, LSock}}}),
proc_lib:init_ack({ok, self(), {LAddr, LSock}}),
#listener{socket = LSock,
module = Mod}.
-laddr([], Mod, Sock) ->
- {ok, {Addr, _Port}} = sockname(Mod, Sock),
- Addr;
-laddr([{ip, Addr}], _, _) ->
- Addr.
-
ssl_opts([]) ->
false;
ssl_opts([{ssl_options, true}]) ->
@@ -308,24 +299,16 @@ init(accept = T, Ref, Mod, Pid, Opts, Addrs, SvcPid) ->
Sock;
init(connect = T, Ref, Mod, Pid, Opts, Addrs, _SvcPid) ->
- {[LA, RA, RP], Rest} = proplists:split(Opts, [ip, raddr, rport]),
- LAddrOpt = get_addr(LA, Addrs),
+ {[RA, RP], Rest} = proplists:split(Opts, [raddr, rport]),
RAddr = get_addr(RA),
RPort = get_port(RP),
- proc_lib:init_ack(init_rc(LAddrOpt)),
- Sock = ok(connect(Mod, RAddr, RPort, gen_opts(LAddrOpt, Rest))),
+ proc_lib:init_ack({ok, self()}),
+ Sock = ok(connect(Mod, RAddr, RPort, gen_opts(Addrs, Rest))),
publish(Mod, T, Ref, Sock),
- up(Pid, {RAddr, RPort}, LAddrOpt, Mod, Sock),
+ up(Pid, {RAddr, RPort}, Mod, Sock),
Sock.
-init_rc([{ip, Addr}]) ->
- {ok, self(), [Addr]};
-init_rc([]) ->
- {ok, self()}.
-
-up(Pid, Remote, [{ip, _Addr}], _, _) ->
- diameter_peer:up(Pid, Remote);
-up(Pid, Remote, [], Mod, Sock) ->
+up(Pid, Remote, Mod, Sock) ->
{Addr, _Port} = ok(sockname(Mod, Sock)),
diameter_peer:up(Pid, Remote, [Addr]).
@@ -382,25 +365,41 @@ l([{{?MODULE, listener, {_, AS}}, LPid}], _, _) ->
l([], Ref, T) ->
diameter_tcp_sup:start_child({listen, Ref, T}).
-%% get_addr/1
+%% addrs/2
+%%
+%% Take the first address from the service if several are specified
+%% and not address is configured.
+
+addrs(Addrs, Opts) ->
+ case lists:mapfoldr(fun ipaddr/2, [], Opts) of
+ {Os, [_]} ->
+ Os;
+ {_, []} ->
+ Opts ++ [{ip, A} || [A|_] <- [Addrs]];
+ {_, As} ->
+ ?ERROR({invalid_addrs, As, Addrs})
+ end.
-get_addr(As) ->
- diameter_lib:ipaddr(addr(As, [])).
+ipaddr({K,A}, As)
+ when K == ifaddr;
+ K == ip ->
+ {{ip, ipaddr(A)}, [A | As]};
+ipaddr(T, B) ->
+ {T, B}.
-%% get_addr/2
+ipaddr(A)
+ when A == loopback;
+ A == any ->
+ A;
+ipaddr(A) ->
+ diameter_lib:ipaddr(A).
-get_addr([], []) ->
- [];
-get_addr(As, Def) ->
- [{ip, diameter_lib:ipaddr(addr(As, Def))}].
+%% get_addr/1
-%% Take the first address from the service if several are unspecified.
-addr([], [Addr | _]) ->
- Addr;
-addr([{_, Addr}], _) ->
- Addr;
-addr(As, Addrs) ->
- ?ERROR({invalid_addrs, As, Addrs}).
+get_addr([{_, Addr}]) ->
+ diameter_lib:ipaddr(Addr);
+get_addr(Addrs) ->
+ ?ERROR({invalid_addrs, Addrs}).
%% get_port/1
@@ -413,10 +412,15 @@ get_port(Ps) ->
%% gen_opts/2
-gen_opts(LAddrOpt, Opts) ->
+gen_opts(Addrs, Opts) ->
+ gen_opts(addrs(Addrs, Opts)).
+
+%% gen_opts/1
+
+gen_opts(Opts) ->
{L,_} = proplists:split(Opts, [binary, packet, active]),
[[],[],[]] == L orelse ?ERROR({reserved_options, Opts}),
- [binary, {packet, 0}, {active, false}] ++ LAddrOpt ++ Opts.
+ [binary, {packet, 0}, {active, false} | Opts].
%% ---------------------------------------------------------------------------
%% # ports/1
@@ -640,7 +644,7 @@ transition(Msg, S)
%% Deferred actions from a message_cb.
transition({actions, Dir, Acts}, S) ->
- actions(Acts, Dir, S);
+ setopts(actions(Acts, Dir, S));
%% Request to close the transport connection.
transition({diameter, {close, Pid}}, #transport{parent = Pid,