aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAnders Svensson <[email protected]>2017-09-14 12:59:17 +0200
committerAnders Svensson <[email protected]>2017-09-17 16:54:28 +0200
commitd38295257c5309e7291487edfacdfad605eec3e9 (patch)
tree82bc062e99322f24b23a333c4a0f03761d7ed837
parenta3749fd240260958053f90539b0f7e04e720d070 (diff)
downloadotp-d38295257c5309e7291487edfacdfad605eec3e9.tar.gz
otp-d38295257c5309e7291487edfacdfad605eec3e9.tar.bz2
otp-d38295257c5309e7291487edfacdfad605eec3e9.zip
Make unordered delivery configurable
Changing the default in the parent commit is possibly a bit dangerous, even if the motivation still holds. Take a step back and make unordered delivery a matter of configuration, without changing the default: configuration is {unordered, boolean() | pos_integer()}, with false the default, and N equivalent to OS =< N, where OS is the number of outbound streams negotiated on the association in question. A user can mess with this by configuring an sctp_default_send_param of their own, but unordered sending is them from start, not only after the second message reception.
-rw-r--r--lib/diameter/src/transport/diameter_sctp.erl47
-rw-r--r--lib/diameter/test/diameter_traffic_SUITE.erl19
2 files changed, 49 insertions, 17 deletions
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl
index 2af11729a9..73a1a9191d 100644
--- a/lib/diameter/src/transport/diameter_sctp.erl
+++ b/lib/diameter/src/transport/diameter_sctp.erl
@@ -103,6 +103,8 @@
| undefined,
os = 0 :: uint(), %% next output stream
rotate = 1 :: boolean() | 0 | 1, %% rotate os?
+ unordered = false :: boolean() %% always send unordered?
+ | pos_integer(),% or if =< N outbound streams?
packet = true :: boolean() %% legacy transport_data?
| raw,
message_cb = false :: false | diameter:eval(),
@@ -247,8 +249,11 @@ i(#monitor{transport = TPid} = S) ->
i({listen, Ref, {Opts, SvcPid, Addrs}}) ->
monitor(process, SvcPid),
[_] = diameter_config:subscribe(Ref, transport), %% assert existence
- {Split, Rest}
- = proplists:split(Opts, [accept, packet, sender, message_cb]),
+ {Split, Rest} = proplists:split(Opts, [accept,
+ packet,
+ sender,
+ message_cb,
+ unordered]),
OwnOpts = lists:append(Split),
{LAs, Sock} = AS = open(Addrs, Rest, ?DEFAULT_PORT),
ok = gen_sctp:listen(Sock, true),
@@ -260,12 +265,16 @@ i({listen, Ref, {Opts, SvcPid, Addrs}}) ->
opts = [[[M] || {accept, M} <- OwnOpts],
proplists:get_value(packet, OwnOpts, true)
| [proplists:get_value(K, OwnOpts, false)
- || K <- [sender, message_cb]]]};
+ || K <- [sender, message_cb, unordered]]]};
%% A connecting transport.
i({connect, Pid, Opts, Addrs, Ref}) ->
- {[Ps | Split], Rest}
- = proplists:split(Opts, [rport, raddr, packet, sender, message_cb]),
+ {[Ps | Split], Rest} = proplists:split(Opts, [rport,
+ raddr,
+ packet,
+ sender,
+ message_cb,
+ unordered]),
OwnOpts = lists:append(Split),
CB = proplists:get_value(message_cb, OwnOpts, false),
false == CB orelse (Pid ! {diameter, ack}),
@@ -279,6 +288,7 @@ i({connect, Pid, Opts, Addrs, Ref}) ->
mode = {connect, connect(Sock, RAs, RP, [])},
socket = Sock,
message_cb = CB,
+ unordered = proplists:get_value(ordered, OwnOpts, false),
packet = proplists:get_value(packet, OwnOpts, true),
send = proplists:get_value(sender, OwnOpts, false)};
@@ -316,12 +326,13 @@ i({K, Ref}, #transport{mode = {accept, _}} = S) ->
S#transport{parent = Pid};
{K, T, Opts} when K == peeloff -> %% association
{sctp, Sock, _RA, _RP, _Data} = T,
- [Matches, Packet, Sender, CB] = Opts,
+ [Matches, Packet, Sender, CB, Unordered] = Opts,
ok = accept_peer(Sock, Matches),
demonitor(Ref, [flush]),
false == CB orelse (S#transport.parent ! {diameter, ack}),
t(T, S#transport{socket = Sock,
message_cb = CB,
+ unordered = Unordered,
packet = Packet,
send = Sender});
accept_timeout = T ->
@@ -784,14 +795,30 @@ recv(#transport{rotate = B} = S)
when is_boolean(B) ->
S;
-recv(#transport{rotate = 0, streams = {_,N}, socket = Sock} = S) ->
- ok = inet:setopts(Sock, [{sctp_default_send_param,
- #sctp_sndrcvinfo{flags = [unordered]}}]),
- S#transport{rotate = 1 < N};
+recv(#transport{rotate = 0,
+ streams = {_,OS},
+ socket = Sock,
+ unordered = B}
+ = S) ->
+ ok = unordered(Sock, OS, B),
+ S#transport{rotate = 1 < OS};
recv(#transport{rotate = N} = S) ->
S#transport{rotate = N-1}.
+%% unordered/3
+
+unordered(Sock, OS, B)
+ when B;
+ is_integer(B), OS =< B ->
+ inet:setopts(Sock, [{sctp_default_send_param,
+ #sctp_sndrcvinfo{flags = [unordered]}}]);
+
+unordered(_, OS, B)
+ when not B;
+ is_integer(B), B < OS ->
+ ok.
+
%% publish/4
publish(T, Ref, Id, Sock) ->
diff --git a/lib/diameter/test/diameter_traffic_SUITE.erl b/lib/diameter/test/diameter_traffic_SUITE.erl
index f058ed65b8..2c478140f8 100644
--- a/lib/diameter/test/diameter_traffic_SUITE.erl
+++ b/lib/diameter/test/diameter_traffic_SUITE.erl
@@ -480,9 +480,10 @@ add_transports(Config) ->
LRef = ?util:listen(SN,
[T,
{sender, SS},
- {message_cb, ST andalso {?MODULE, message, [0]}}
- | [{packet, hd(?util:scramble([false, raw]))}
- || T == sctp andalso CS]],
+ {message_cb, ST andalso {?MODULE, message, [0]}}]
+ ++ [{packet, hd(?util:scramble([false, raw]))}
+ || T == sctp andalso CS]
+ ++ [{unordered, unordered()} || T == sctp],
[{capabilities_cb, fun capx/2},
{pool_size, 8}
| server_apps()]
@@ -498,13 +499,17 @@ add_transports(Config) ->
Id <- [{D,E}]],
?util:write_priv(Config, "transport", [LRef | Cs]).
+unordered() ->
+ element(rand:uniform(4), {true, false, 1, 2}).
+
client_opts(tcp) ->
[];
client_opts(sctp) ->
- [{sctp_initmsg, #sctp_initmsg{num_ostreams = N,
- max_instreams = 5}}
- || N <- [rand:uniform(8)],
- N =< 6].
+ [{unordered, unordered()}
+ | [{sctp_initmsg, #sctp_initmsg{num_ostreams = N,
+ max_instreams = 5}}
+ || N <- [rand:uniform(8)],
+ N =< 6]].
server_apps() ->
B = have_nas(),