From d38295257c5309e7291487edfacdfad605eec3e9 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Thu, 14 Sep 2017 12:59:17 +0200 Subject: Make unordered delivery configurable Changing the default in the parent commit is possibly a bit dangerous, even if the motivation still holds. Take a step back and make unordered delivery a matter of configuration, without changing the default: configuration is {unordered, boolean() | pos_integer()}, with false the default, and N equivalent to OS =< N, where OS is the number of outbound streams negotiated on the association in question. A user can mess with this by configuring an sctp_default_send_param of their own, but unordered sending is them from start, not only after the second message reception. --- lib/diameter/src/transport/diameter_sctp.erl | 47 ++++++++++++++++++++++------ lib/diameter/test/diameter_traffic_SUITE.erl | 19 ++++++----- 2 files changed, 49 insertions(+), 17 deletions(-) (limited to 'lib/diameter') diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index 2af11729a9..73a1a9191d 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -103,6 +103,8 @@ | undefined, os = 0 :: uint(), %% next output stream rotate = 1 :: boolean() | 0 | 1, %% rotate os? + unordered = false :: boolean() %% always send unordered? + | pos_integer(),% or if =< N outbound streams? packet = true :: boolean() %% legacy transport_data? | raw, message_cb = false :: false | diameter:eval(), @@ -247,8 +249,11 @@ i(#monitor{transport = TPid} = S) -> i({listen, Ref, {Opts, SvcPid, Addrs}}) -> monitor(process, SvcPid), [_] = diameter_config:subscribe(Ref, transport), %% assert existence - {Split, Rest} - = proplists:split(Opts, [accept, packet, sender, message_cb]), + {Split, Rest} = proplists:split(Opts, [accept, + packet, + sender, + message_cb, + unordered]), OwnOpts = lists:append(Split), {LAs, Sock} = AS = open(Addrs, Rest, ?DEFAULT_PORT), ok = gen_sctp:listen(Sock, true), @@ -260,12 +265,16 @@ i({listen, Ref, {Opts, SvcPid, Addrs}}) -> opts = [[[M] || {accept, M} <- OwnOpts], proplists:get_value(packet, OwnOpts, true) | [proplists:get_value(K, OwnOpts, false) - || K <- [sender, message_cb]]]}; + || K <- [sender, message_cb, unordered]]]}; %% A connecting transport. i({connect, Pid, Opts, Addrs, Ref}) -> - {[Ps | Split], Rest} - = proplists:split(Opts, [rport, raddr, packet, sender, message_cb]), + {[Ps | Split], Rest} = proplists:split(Opts, [rport, + raddr, + packet, + sender, + message_cb, + unordered]), OwnOpts = lists:append(Split), CB = proplists:get_value(message_cb, OwnOpts, false), false == CB orelse (Pid ! {diameter, ack}), @@ -279,6 +288,7 @@ i({connect, Pid, Opts, Addrs, Ref}) -> mode = {connect, connect(Sock, RAs, RP, [])}, socket = Sock, message_cb = CB, + unordered = proplists:get_value(ordered, OwnOpts, false), packet = proplists:get_value(packet, OwnOpts, true), send = proplists:get_value(sender, OwnOpts, false)}; @@ -316,12 +326,13 @@ i({K, Ref}, #transport{mode = {accept, _}} = S) -> S#transport{parent = Pid}; {K, T, Opts} when K == peeloff -> %% association {sctp, Sock, _RA, _RP, _Data} = T, - [Matches, Packet, Sender, CB] = Opts, + [Matches, Packet, Sender, CB, Unordered] = Opts, ok = accept_peer(Sock, Matches), demonitor(Ref, [flush]), false == CB orelse (S#transport.parent ! {diameter, ack}), t(T, S#transport{socket = Sock, message_cb = CB, + unordered = Unordered, packet = Packet, send = Sender}); accept_timeout = T -> @@ -784,14 +795,30 @@ recv(#transport{rotate = B} = S) when is_boolean(B) -> S; -recv(#transport{rotate = 0, streams = {_,N}, socket = Sock} = S) -> - ok = inet:setopts(Sock, [{sctp_default_send_param, - #sctp_sndrcvinfo{flags = [unordered]}}]), - S#transport{rotate = 1 < N}; +recv(#transport{rotate = 0, + streams = {_,OS}, + socket = Sock, + unordered = B} + = S) -> + ok = unordered(Sock, OS, B), + S#transport{rotate = 1 < OS}; recv(#transport{rotate = N} = S) -> S#transport{rotate = N-1}. +%% unordered/3 + +unordered(Sock, OS, B) + when B; + is_integer(B), OS =< B -> + inet:setopts(Sock, [{sctp_default_send_param, + #sctp_sndrcvinfo{flags = [unordered]}}]); + +unordered(_, OS, B) + when not B; + is_integer(B), B < OS -> + ok. + %% publish/4 publish(T, Ref, Id, Sock) -> diff --git a/lib/diameter/test/diameter_traffic_SUITE.erl b/lib/diameter/test/diameter_traffic_SUITE.erl index f058ed65b8..2c478140f8 100644 --- a/lib/diameter/test/diameter_traffic_SUITE.erl +++ b/lib/diameter/test/diameter_traffic_SUITE.erl @@ -480,9 +480,10 @@ add_transports(Config) -> LRef = ?util:listen(SN, [T, {sender, SS}, - {message_cb, ST andalso {?MODULE, message, [0]}} - | [{packet, hd(?util:scramble([false, raw]))} - || T == sctp andalso CS]], + {message_cb, ST andalso {?MODULE, message, [0]}}] + ++ [{packet, hd(?util:scramble([false, raw]))} + || T == sctp andalso CS] + ++ [{unordered, unordered()} || T == sctp], [{capabilities_cb, fun capx/2}, {pool_size, 8} | server_apps()] @@ -498,13 +499,17 @@ add_transports(Config) -> Id <- [{D,E}]], ?util:write_priv(Config, "transport", [LRef | Cs]). +unordered() -> + element(rand:uniform(4), {true, false, 1, 2}). + client_opts(tcp) -> []; client_opts(sctp) -> - [{sctp_initmsg, #sctp_initmsg{num_ostreams = N, - max_instreams = 5}} - || N <- [rand:uniform(8)], - N =< 6]. + [{unordered, unordered()} + | [{sctp_initmsg, #sctp_initmsg{num_ostreams = N, + max_instreams = 5}} + || N <- [rand:uniform(8)], + N =< 6]]. server_apps() -> B = have_nas(), -- cgit v1.2.3