From d38295257c5309e7291487edfacdfad605eec3e9 Mon Sep 17 00:00:00 2001
From: Anders Svensson <anders@erlang.org>
Date: Thu, 14 Sep 2017 12:59:17 +0200
Subject: Make unordered delivery configurable

Changing the default in the parent commit is possibly a bit dangerous,
even if the motivation still holds. Take a step back and make unordered
delivery a matter of configuration, without changing the default:
configuration is {unordered, boolean() | pos_integer()}, with false the
default, and N equivalent to OS =< N, where OS is the number of outbound
streams negotiated on the association in question.

A user can mess with this by configuring an sctp_default_send_param of
their own, but unordered sending is them from start, not only after the
second message reception.
---
 lib/diameter/src/transport/diameter_sctp.erl | 47 ++++++++++++++++++++++------
 lib/diameter/test/diameter_traffic_SUITE.erl | 19 ++++++-----
 2 files changed, 49 insertions(+), 17 deletions(-)

(limited to 'lib/diameter')

diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl
index 2af11729a9..73a1a9191d 100644
--- a/lib/diameter/src/transport/diameter_sctp.erl
+++ b/lib/diameter/src/transport/diameter_sctp.erl
@@ -103,6 +103,8 @@
                    | undefined,
          os = 0   :: uint(),               %% next output stream
          rotate = 1 :: boolean() | 0 | 1,  %% rotate os?
+         unordered = false :: boolean()    %% always send unordered?
+                            | pos_integer(),% or if =< N outbound streams?
          packet = true :: boolean()        %% legacy transport_data?
                         | raw,
          message_cb = false :: false | diameter:eval(),
@@ -247,8 +249,11 @@ i(#monitor{transport = TPid} = S) ->
 i({listen, Ref, {Opts, SvcPid, Addrs}}) ->
     monitor(process, SvcPid),
     [_] = diameter_config:subscribe(Ref, transport), %% assert existence
-    {Split, Rest}
-        = proplists:split(Opts, [accept, packet, sender, message_cb]),
+    {Split, Rest} = proplists:split(Opts, [accept,
+                                           packet,
+                                           sender,
+                                           message_cb,
+                                           unordered]),
     OwnOpts = lists:append(Split),
     {LAs, Sock} = AS = open(Addrs, Rest, ?DEFAULT_PORT),
     ok = gen_sctp:listen(Sock, true),
@@ -260,12 +265,16 @@ i({listen, Ref, {Opts, SvcPid, Addrs}}) ->
               opts = [[[M] || {accept, M} <- OwnOpts],
                       proplists:get_value(packet, OwnOpts, true)
                       | [proplists:get_value(K, OwnOpts, false)
-                         || K <- [sender, message_cb]]]};
+                         || K <- [sender, message_cb, unordered]]]};
 
 %% A connecting transport.
 i({connect, Pid, Opts, Addrs, Ref}) ->
-    {[Ps | Split], Rest}
-        = proplists:split(Opts, [rport, raddr, packet, sender, message_cb]),
+    {[Ps | Split], Rest} = proplists:split(Opts, [rport,
+                                                  raddr,
+                                                  packet,
+                                                  sender,
+                                                  message_cb,
+                                                  unordered]),
     OwnOpts = lists:append(Split),
     CB = proplists:get_value(message_cb, OwnOpts, false),
     false == CB orelse (Pid ! {diameter, ack}),
@@ -279,6 +288,7 @@ i({connect, Pid, Opts, Addrs, Ref}) ->
                mode = {connect, connect(Sock, RAs, RP, [])},
                socket = Sock,
                message_cb = CB,
+               unordered = proplists:get_value(ordered, OwnOpts, false),
                packet = proplists:get_value(packet, OwnOpts, true),
                send = proplists:get_value(sender, OwnOpts, false)};
 
@@ -316,12 +326,13 @@ i({K, Ref}, #transport{mode = {accept, _}} = S) ->
             S#transport{parent = Pid};
         {K, T, Opts} when K == peeloff ->  %% association
             {sctp, Sock, _RA, _RP, _Data} = T,
-            [Matches, Packet, Sender, CB] = Opts,
+            [Matches, Packet, Sender, CB, Unordered] = Opts,
             ok = accept_peer(Sock, Matches),
             demonitor(Ref, [flush]),
             false == CB orelse (S#transport.parent ! {diameter, ack}),
             t(T, S#transport{socket = Sock,
                              message_cb = CB,
+                             unordered = Unordered,
                              packet = Packet,
                              send = Sender});
         accept_timeout = T ->
@@ -784,14 +795,30 @@ recv(#transport{rotate = B} = S)
   when is_boolean(B) ->
     S;
 
-recv(#transport{rotate = 0, streams = {_,N}, socket = Sock} = S) ->
-    ok = inet:setopts(Sock, [{sctp_default_send_param,
-                              #sctp_sndrcvinfo{flags = [unordered]}}]),
-    S#transport{rotate = 1 < N};
+recv(#transport{rotate = 0,
+                streams = {_,OS},
+                socket = Sock,
+                unordered = B}
+     = S) ->
+    ok = unordered(Sock, OS, B),
+    S#transport{rotate = 1 < OS};
 
 recv(#transport{rotate = N} = S) ->
     S#transport{rotate = N-1}.
 
+%% unordered/3
+
+unordered(Sock, OS, B)
+  when B;
+       is_integer(B), OS =< B ->
+    inet:setopts(Sock, [{sctp_default_send_param,
+                         #sctp_sndrcvinfo{flags = [unordered]}}]);
+
+unordered(_, OS, B)
+  when not B;
+       is_integer(B), B < OS ->
+    ok.
+
 %% publish/4
 
 publish(T, Ref, Id, Sock) ->
diff --git a/lib/diameter/test/diameter_traffic_SUITE.erl b/lib/diameter/test/diameter_traffic_SUITE.erl
index f058ed65b8..2c478140f8 100644
--- a/lib/diameter/test/diameter_traffic_SUITE.erl
+++ b/lib/diameter/test/diameter_traffic_SUITE.erl
@@ -480,9 +480,10 @@ add_transports(Config) ->
     LRef = ?util:listen(SN,
                         [T,
                          {sender, SS},
-                         {message_cb, ST andalso {?MODULE, message, [0]}}
-                         | [{packet, hd(?util:scramble([false, raw]))}
-                            || T == sctp andalso CS]],
+                         {message_cb, ST andalso {?MODULE, message, [0]}}]
+                        ++ [{packet, hd(?util:scramble([false, raw]))}
+                            || T == sctp andalso CS]
+                        ++ [{unordered, unordered()} || T == sctp],
                         [{capabilities_cb, fun capx/2},
                          {pool_size, 8}
                          | server_apps()]
@@ -498,13 +499,17 @@ add_transports(Config) ->
              Id <- [{D,E}]],
     ?util:write_priv(Config, "transport", [LRef | Cs]).
 
+unordered() ->
+    element(rand:uniform(4), {true, false, 1, 2}).
+
 client_opts(tcp) ->
     [];
 client_opts(sctp) ->
-    [{sctp_initmsg, #sctp_initmsg{num_ostreams = N,
-                                  max_instreams = 5}}
-     || N <- [rand:uniform(8)],
-        N =< 6].
+    [{unordered, unordered()}
+     | [{sctp_initmsg, #sctp_initmsg{num_ostreams = N,
+                                     max_instreams = 5}}
+        || N <- [rand:uniform(8)],
+           N =< 6]].
 
 server_apps() ->
     B = have_nas(),
-- 
cgit v1.2.3