aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src/transport/diameter_sctp.erl
diff options
context:
space:
mode:
authorAnders Svensson <[email protected]>2017-09-14 12:59:17 +0200
committerAnders Svensson <[email protected]>2017-09-17 16:54:28 +0200
commitd38295257c5309e7291487edfacdfad605eec3e9 (patch)
tree82bc062e99322f24b23a333c4a0f03761d7ed837 /lib/diameter/src/transport/diameter_sctp.erl
parenta3749fd240260958053f90539b0f7e04e720d070 (diff)
downloadotp-d38295257c5309e7291487edfacdfad605eec3e9.tar.gz
otp-d38295257c5309e7291487edfacdfad605eec3e9.tar.bz2
otp-d38295257c5309e7291487edfacdfad605eec3e9.zip
Make unordered delivery configurable
Changing the default in the parent commit is possibly a bit dangerous, even if the motivation still holds. Take a step back and make unordered delivery a matter of configuration, without changing the default: configuration is {unordered, boolean() | pos_integer()}, with false the default, and N equivalent to OS =< N, where OS is the number of outbound streams negotiated on the association in question. A user can mess with this by configuring an sctp_default_send_param of their own, but unordered sending is them from start, not only after the second message reception.
Diffstat (limited to 'lib/diameter/src/transport/diameter_sctp.erl')
-rw-r--r--lib/diameter/src/transport/diameter_sctp.erl47
1 files changed, 37 insertions, 10 deletions
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl
index 2af11729a9..73a1a9191d 100644
--- a/lib/diameter/src/transport/diameter_sctp.erl
+++ b/lib/diameter/src/transport/diameter_sctp.erl
@@ -103,6 +103,8 @@
| undefined,
os = 0 :: uint(), %% next output stream
rotate = 1 :: boolean() | 0 | 1, %% rotate os?
+ unordered = false :: boolean() %% always send unordered?
+ | pos_integer(),% or if =< N outbound streams?
packet = true :: boolean() %% legacy transport_data?
| raw,
message_cb = false :: false | diameter:eval(),
@@ -247,8 +249,11 @@ i(#monitor{transport = TPid} = S) ->
i({listen, Ref, {Opts, SvcPid, Addrs}}) ->
monitor(process, SvcPid),
[_] = diameter_config:subscribe(Ref, transport), %% assert existence
- {Split, Rest}
- = proplists:split(Opts, [accept, packet, sender, message_cb]),
+ {Split, Rest} = proplists:split(Opts, [accept,
+ packet,
+ sender,
+ message_cb,
+ unordered]),
OwnOpts = lists:append(Split),
{LAs, Sock} = AS = open(Addrs, Rest, ?DEFAULT_PORT),
ok = gen_sctp:listen(Sock, true),
@@ -260,12 +265,16 @@ i({listen, Ref, {Opts, SvcPid, Addrs}}) ->
opts = [[[M] || {accept, M} <- OwnOpts],
proplists:get_value(packet, OwnOpts, true)
| [proplists:get_value(K, OwnOpts, false)
- || K <- [sender, message_cb]]]};
+ || K <- [sender, message_cb, unordered]]]};
%% A connecting transport.
i({connect, Pid, Opts, Addrs, Ref}) ->
- {[Ps | Split], Rest}
- = proplists:split(Opts, [rport, raddr, packet, sender, message_cb]),
+ {[Ps | Split], Rest} = proplists:split(Opts, [rport,
+ raddr,
+ packet,
+ sender,
+ message_cb,
+ unordered]),
OwnOpts = lists:append(Split),
CB = proplists:get_value(message_cb, OwnOpts, false),
false == CB orelse (Pid ! {diameter, ack}),
@@ -279,6 +288,7 @@ i({connect, Pid, Opts, Addrs, Ref}) ->
mode = {connect, connect(Sock, RAs, RP, [])},
socket = Sock,
message_cb = CB,
+ unordered = proplists:get_value(ordered, OwnOpts, false),
packet = proplists:get_value(packet, OwnOpts, true),
send = proplists:get_value(sender, OwnOpts, false)};
@@ -316,12 +326,13 @@ i({K, Ref}, #transport{mode = {accept, _}} = S) ->
S#transport{parent = Pid};
{K, T, Opts} when K == peeloff -> %% association
{sctp, Sock, _RA, _RP, _Data} = T,
- [Matches, Packet, Sender, CB] = Opts,
+ [Matches, Packet, Sender, CB, Unordered] = Opts,
ok = accept_peer(Sock, Matches),
demonitor(Ref, [flush]),
false == CB orelse (S#transport.parent ! {diameter, ack}),
t(T, S#transport{socket = Sock,
message_cb = CB,
+ unordered = Unordered,
packet = Packet,
send = Sender});
accept_timeout = T ->
@@ -784,14 +795,30 @@ recv(#transport{rotate = B} = S)
when is_boolean(B) ->
S;
-recv(#transport{rotate = 0, streams = {_,N}, socket = Sock} = S) ->
- ok = inet:setopts(Sock, [{sctp_default_send_param,
- #sctp_sndrcvinfo{flags = [unordered]}}]),
- S#transport{rotate = 1 < N};
+recv(#transport{rotate = 0,
+ streams = {_,OS},
+ socket = Sock,
+ unordered = B}
+ = S) ->
+ ok = unordered(Sock, OS, B),
+ S#transport{rotate = 1 < OS};
recv(#transport{rotate = N} = S) ->
S#transport{rotate = N-1}.
+%% unordered/3
+
+unordered(Sock, OS, B)
+ when B;
+ is_integer(B), OS =< B ->
+ inet:setopts(Sock, [{sctp_default_send_param,
+ #sctp_sndrcvinfo{flags = [unordered]}}]);
+
+unordered(_, OS, B)
+ when not B;
+ is_integer(B), B < OS ->
+ ok.
+
%% publish/4
publish(T, Ref, Id, Sock) ->