aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src/transport/diameter_sctp.erl
diff options
context:
space:
mode:
Diffstat (limited to 'lib/diameter/src/transport/diameter_sctp.erl')
-rw-r--r--lib/diameter/src/transport/diameter_sctp.erl135
1 files changed, 101 insertions, 34 deletions
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl
index 6a9f1f940b..64b34da690 100644
--- a/lib/diameter/src/transport/diameter_sctp.erl
+++ b/lib/diameter/src/transport/diameter_sctp.erl
@@ -79,7 +79,7 @@
-type option() :: {sender, boolean()}
| sender
| {packet, boolean() | raw}
- | {message_cb, false | diameter:evaluable()}.
+ | {message_cb, false | diameter:eval()}.
-type uint() :: non_neg_integer().
@@ -102,9 +102,12 @@
streams :: {uint(), uint()} %% {InStream, OutStream} counts
| undefined,
os = 0 :: uint(), %% next output stream
+ rotate = 1 :: boolean() | 0 | 1, %% rotate os?
+ unordered = false :: boolean() %% always send unordered?
+ | pos_integer(),% or if =< N outbound streams?
packet = true :: boolean() %% legacy transport_data?
| raw,
- message_cb = false :: false | diameter:evaluable(),
+ message_cb = false :: false | diameter:eval(),
send = false :: pid() | boolean()}). %% sending process
%% Monitor process state.
@@ -112,7 +115,7 @@
{transport :: pid(),
ack = false :: boolean(),
socket :: gen_sctp:sctp_socket(),
- assoc_id :: gen_sctp:assoc_id()}). %% next output stream
+ assoc_id :: gen_sctp:assoc_id()}).
%% Listener process state.
-record(listener,
@@ -120,7 +123,7 @@
socket :: gen_sctp:sctp_socket(),
service :: pid(), %% service process
pending = {0, queue:new()},
- opts :: [[match()] | boolean() | diameter:evaluable()]}).
+ opts :: [[match()] | boolean() | diameter:eval()]}).
%% Field pending implements two queues: the first of transport-to-be
%% processes to which an association has been assigned but for which
%% diameter hasn't yet spawned a transport process, a short-lived
@@ -156,12 +159,7 @@ start(T, Svc, Opts)
= Svc,
diameter_sctp_sup:start(), %% start supervisors on demand
Addrs = Caps#diameter_caps.host_ip_address,
- s(T, Addrs, Pid, lists:map(fun ip/1, Opts)).
-
-ip({ifaddr, A}) ->
- {ip, A};
-ip(T) ->
- T.
+ s(T, Addrs, Pid, Opts).
%% A listener spawns transports either as a consequence of this call
%% when there is not yet an association to assign it, or at comm_up on
@@ -246,8 +244,11 @@ i(#monitor{transport = TPid} = S) ->
i({listen, Ref, {Opts, SvcPid, Addrs}}) ->
monitor(process, SvcPid),
[_] = diameter_config:subscribe(Ref, transport), %% assert existence
- {Split, Rest}
- = proplists:split(Opts, [accept, packet, sender, message_cb]),
+ {Split, Rest} = proplists:split(Opts, [accept,
+ packet,
+ sender,
+ message_cb,
+ unordered]),
OwnOpts = lists:append(Split),
{LAs, Sock} = AS = open(Addrs, Rest, ?DEFAULT_PORT),
ok = gen_sctp:listen(Sock, true),
@@ -259,12 +260,16 @@ i({listen, Ref, {Opts, SvcPid, Addrs}}) ->
opts = [[[M] || {accept, M} <- OwnOpts],
proplists:get_value(packet, OwnOpts, true)
| [proplists:get_value(K, OwnOpts, false)
- || K <- [sender, message_cb]]]};
+ || K <- [sender, message_cb, unordered]]]};
%% A connecting transport.
i({connect, Pid, Opts, Addrs, Ref}) ->
- {[Ps | Split], Rest}
- = proplists:split(Opts, [rport, raddr, packet, sender, message_cb]),
+ {[Ps | Split], Rest} = proplists:split(Opts, [rport,
+ raddr,
+ packet,
+ sender,
+ message_cb,
+ unordered]),
OwnOpts = lists:append(Split),
CB = proplists:get_value(message_cb, OwnOpts, false),
false == CB orelse (Pid ! {diameter, ack}),
@@ -278,6 +283,7 @@ i({connect, Pid, Opts, Addrs, Ref}) ->
mode = {connect, connect(Sock, RAs, RP, [])},
socket = Sock,
message_cb = CB,
+ unordered = proplists:get_value(ordered, OwnOpts, false),
packet = proplists:get_value(packet, OwnOpts, true),
send = proplists:get_value(sender, OwnOpts, false)};
@@ -315,12 +321,13 @@ i({K, Ref}, #transport{mode = {accept, _}} = S) ->
S#transport{parent = Pid};
{K, T, Opts} when K == peeloff -> %% association
{sctp, Sock, _RA, _RP, _Data} = T,
- [Matches, Packet, Sender, CB] = Opts,
+ [Matches, Packet, Sender, CB, Unordered] = Opts,
ok = accept_peer(Sock, Matches),
demonitor(Ref, [flush]),
false == CB orelse (S#transport.parent ! {diameter, ack}),
t(T, S#transport{socket = Sock,
message_cb = CB,
+ unordered = Unordered,
packet = Packet,
send = Sender});
accept_timeout = T ->
@@ -354,23 +361,35 @@ l([], Ref, T) ->
%% open/3
open(Addrs, Opts, PortNr) ->
- {LAs, Os} = addrs(Addrs, Opts),
- {LAs, case gen_sctp:open(gen_opts(portnr(Os, PortNr))) of
- {ok, Sock} ->
- Sock;
- {error, Reason} ->
- x({open, Reason})
- end}.
+ case gen_sctp:open(gen_opts(portnr(addrs(Addrs, Opts), PortNr))) of
+ {ok, Sock} ->
+ {addrs(Sock), Sock};
+ {error, Reason} ->
+ x({open, Reason})
+ end.
addrs(Addrs, Opts) ->
- case proplists:split(Opts, [ip]) of
- {[[]], _} ->
- {Addrs, Opts ++ [{ip, A} || A <- Addrs]};
- {[As], Os} ->
- LAs = [diameter_lib:ipaddr(A) || {ip, A} <- As],
- {LAs, Os ++ [{ip, A} || A <- LAs]}
+ case lists:mapfoldl(fun ipaddr/2, false, Opts) of
+ {Os, true} ->
+ Os;
+ {_, false} ->
+ Opts ++ [{ip, A} || A <- Addrs]
end.
+ipaddr({K,A}, _)
+ when K == ifaddr;
+ K == ip ->
+ {{ip, ipaddr(A)}, true};
+ipaddr(T, B) ->
+ {T, B}.
+
+ipaddr(A)
+ when A == loopback;
+ A == any ->
+ A;
+ipaddr(A) ->
+ diameter_lib:ipaddr(A).
+
portnr(Opts, PortNr) ->
case proplists:get_value(port, Opts) of
undefined ->
@@ -379,6 +398,14 @@ portnr(Opts, PortNr) ->
Opts
end.
+addrs(Sock) ->
+ case inet:socknames(Sock) of
+ {ok, As} ->
+ [A || {A,_} <- As];
+ {error, Reason} ->
+ x({socknames, Reason})
+ end.
+
%% x/1
x(Reason) ->
@@ -565,7 +592,7 @@ transition(Msg, S)
%% Deferred actions from a message_cb.
transition({actions, Dir, Acts}, S) ->
- actions(Acts, Dir, S);
+ setopts(ok, actions(Acts, Dir, S));
%% Request to close the transport connection.
transition({diameter, {close, Pid}}, #transport{parent = Pid}) ->
@@ -677,11 +704,16 @@ send(#diameter_packet{transport_data = {outstream, SId}}
= S) ->
send(SId rem OS, Msg, S);
-%% ... or not: rotate through all streams.
-send(Msg, #transport{streams = {_, OS},
+%% ... or not: rotate when sending on multiple streams ...
+send(Msg, #transport{rotate = true,
+ streams = {_, OS},
os = N}
= S) ->
- send(N, Msg, S#transport{os = (N + 1) rem OS}).
+ send(N, Msg, S#transport{os = (N + 1) rem OS});
+
+%% ... or send on the only stream available.
+send(Msg, S) ->
+ send(0, Msg, S).
%% send/3
@@ -749,7 +781,7 @@ recv({[#sctp_sndrcvinfo{assoc_id = Id}], _Bin}
%% Inbound Diameter message.
recv({[#sctp_sndrcvinfo{}], Bin} = Msg, S)
when is_binary(Bin) ->
- message(recv, Msg, S);
+ message(recv, Msg, recv(S));
recv({_, #sctp_shutdown_event{}}, _) ->
stop;
@@ -769,6 +801,41 @@ recv({_, #sctp_paddr_change{}}, _) ->
recv({_, #sctp_pdapi_event{}}, _) ->
ok.
+%% recv/1
+%%
+%% Start sending unordered after the second reception, so that an
+%% outgoing CER/CEA will arrive at the peer before another request.
+
+recv(#transport{rotate = B} = S)
+ when is_boolean(B) ->
+ S;
+
+recv(#transport{rotate = 0,
+ streams = {_,OS},
+ socket = Sock,
+ unordered = B}
+ = S) ->
+ ok = unordered(Sock, OS, B),
+ S#transport{rotate = 1 < OS};
+
+recv(#transport{rotate = N} = S) ->
+ S#transport{rotate = N-1}.
+
+%% unordered/3
+
+unordered(Sock, OS, B)
+ when B;
+ is_integer(B), OS =< B ->
+ inet:setopts(Sock, [{sctp_default_send_param,
+ #sctp_sndrcvinfo{flags = [unordered]}}]);
+
+unordered(_, OS, B)
+ when not B;
+ is_integer(B), B < OS ->
+ ok.
+
+%% publish/4
+
publish(T, Ref, Id, Sock) ->
true = diameter_reg:add_new({?MODULE, T, {Ref, {Id, Sock}}}),
putr(?INFO_KEY, {gen_sctp, Sock}). %% for info/1