aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src/transport/diameter_sctp.erl
diff options
context:
space:
mode:
authorAnders Svensson <[email protected]>2017-06-10 23:40:53 +0200
committerAnders Svensson <[email protected]>2017-06-11 16:30:39 +0200
commiteadf4efc7e264fe8dd30befb42a42a02cdef58f1 (patch)
tree6ac266589946ff35f843cbf89d6913113cbb1629 /lib/diameter/src/transport/diameter_sctp.erl
parent034089ed1ba3f78c732edcfc84d85a6ed4a4854e (diff)
downloadotp-eadf4efc7e264fe8dd30befb42a42a02cdef58f1.tar.gz
otp-eadf4efc7e264fe8dd30befb42a42a02cdef58f1.tar.bz2
otp-eadf4efc7e264fe8dd30befb42a42a02cdef58f1.zip
Make diameter_{tcp,sctp} sender configurable
With sends still from the receiving process by default, since changing the default behaviour may well have negative effects. A separate sender probably implies a greater need for some form of load regulation for one, since a blocking send would no longer imply that incoming messages are no longer recevied. Dealing with this could result in the same deadlock that the sending process intends to avoid, but the user should be in control over how/when incoming traffic is regulated.
Diffstat (limited to 'lib/diameter/src/transport/diameter_sctp.erl')
-rw-r--r--lib/diameter/src/transport/diameter_sctp.erl59
1 files changed, 41 insertions, 18 deletions
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl
index d23e56b413..3919596cb1 100644
--- a/lib/diameter/src/transport/diameter_sctp.erl
+++ b/lib/diameter/src/transport/diameter_sctp.erl
@@ -68,6 +68,7 @@
-type connect_option() :: {raddr, inet:ip_address()}
| {rport, inet:port_number()}
+ | option()
| term(). %% gen_sctp:open_option().
-type match() :: inet:ip_address()
@@ -75,8 +76,12 @@
| [match()].
-type listen_option() :: {accept, match()}
+ | option()
| term(). %% gen_sctp:open_option().
+-type option() :: {sender, boolean()}
+ | sender.
+
-type uint() :: non_neg_integer().
%% Accepting/connecting transport process state.
@@ -96,7 +101,7 @@
streams :: {uint(), uint()} %% {InStream, OutStream} counts
| undefined,
os = 0 :: uint(), %% next output stream
- monitor :: pid() | undefined}). %% sending process
+ send = false :: pid() | boolean()}). %% sending process
%% Monitor process state.
-record(monitor,
@@ -110,7 +115,8 @@
socket :: gen_sctp:sctp_socket(),
service :: pid(), %% service process
pending = {0, queue:new()},
- accept :: [match()]}).
+ accept :: [match()],
+ sender :: boolean()}).
%% Field pending implements two queues: the first of transport-to-be
%% processes to which an association has been assigned but for which
%% diameter hasn't yet spawned a transport process, a short-lived
@@ -236,7 +242,8 @@ i(#monitor{transport = TPid} = S) ->
i({listen, Ref, {Opts, SvcPid, Addrs}}) ->
monitor(process, SvcPid),
[_] = diameter_config:subscribe(Ref, transport), %% assert existence
- {[Matches], Rest} = proplists:split(Opts, [accept]),
+ {Split, Rest} = proplists:split(Opts, [accept, sender]),
+ OwnOpts = lists:append(Split),
{LAs, Sock} = AS = open(Addrs, Rest, ?DEFAULT_PORT),
ok = gen_sctp:listen(Sock, true),
true = diameter_reg:add_new({?MODULE, listener, {Ref, AS}}),
@@ -244,12 +251,14 @@ i({listen, Ref, {Opts, SvcPid, Addrs}}) ->
#listener{ref = Ref,
service = SvcPid,
socket = Sock,
- accept = [[M] || {accept, M} <- Matches]};
+ accept = [[M] || {accept, M} <- OwnOpts],
+ sender = proplists:get_value(sender, OwnOpts, false)};
%% A connecting transport.
i({connect, Pid, Opts, Addrs, Ref}) ->
- {[As, Ps], Rest} = proplists:split(Opts, [raddr, rport]),
- RAs = [diameter_lib:ipaddr(A) || {raddr, A} <- As],
+ {[Ps | Split], Rest} = proplists:split(Opts, [rport, raddr, sender]),
+ OwnOpts = lists:append(Split),
+ RAs = [diameter_lib:ipaddr(A) || {raddr, A} <- OwnOpts],
[RP] = [P || {rport, P} <- Ps] ++ [P || P <- [?DEFAULT_PORT], [] == Ps],
{LAs, Sock} = open(Addrs, Rest, 0),
putr(?REF_KEY, Ref),
@@ -257,7 +266,8 @@ i({connect, Pid, Opts, Addrs, Ref}) ->
monitor(process, Pid),
#transport{parent = Pid,
mode = {connect, connect(Sock, RAs, RP, [])},
- socket = Sock};
+ socket = Sock,
+ send = proplists:get_value(sender, OwnOpts, false)};
%% An accepting transport spawned by diameter, not yet owning an
%% association.
@@ -291,11 +301,12 @@ i({K, Ref}, #transport{mode = {accept, _}} = S) ->
receive
{Ref, Pid} when K == parent -> %% transport process started
S#transport{parent = Pid};
- {K, T, Matches} when K == peeloff -> %% association
+ {K, T, Matches, Bool} when K == peeloff -> %% association
{sctp, Sock, _RA, _RP, _Data} = T,
ok = accept_peer(Sock, Matches),
demonitor(Ref, [flush]),
- t(T, S#transport{socket = Sock});
+ t(T, S#transport{socket = Sock,
+ send = Bool});
accept_timeout = T ->
x(T);
{'DOWN', _, process, _, _} = T ->
@@ -466,11 +477,12 @@ getr(Key) ->
%% Incoming message from SCTP.
l({sctp, Sock, _RA, _RP, Data} = T, #listener{socket = Sock,
- accept = Matches}
+ accept = Matches,
+ sender = Sender}
= S) ->
Id = assoc_id(Data),
{TPid, NewS} = accept(S),
- TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Matches},
+ TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Matches, Sender},
setopts(Sock),
NewS;
@@ -546,14 +558,15 @@ transition({diameter, {tls, _Ref, _Type, _Bool}}, _) ->
%% Parent process has died: call the monitor to not close the socket
%% during an ongoing send, but don't let it take forever.
transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid,
- monitor = MPid}) ->
- undefined == MPid
+ send = MPid}) ->
+ is_boolean(MPid)
orelse ok == (catch gen_server:call(MPid, {stop, Pid}))
orelse exit(MPid, kill),
stop;
%% Monitor process has died.
-transition({'DOWN', _, process, MPid, _}, #transport{monitor = MPid}) ->
+transition({'DOWN', _, process, MPid, _}, #transport{send = MPid})
+ when is_pid(MPid) ->
stop;
%% Timeout after transport process has been started.
@@ -617,7 +630,7 @@ q(Ref, Pid, #listener{pending = {_,Q}}) ->
%% send/2
%% Start monitor process on first send.
-send(Msg, #transport{monitor = undefined,
+send(Msg, #transport{send = true,
socket = Sock,
assoc_id = AId}
= S) ->
@@ -625,7 +638,7 @@ send(Msg, #transport{monitor = undefined,
socket = Sock,
assoc_id = AId}),
monitor(process, MPid),
- send(Msg, S#transport{monitor = MPid});
+ send(Msg, S#transport{send = MPid});
%% Outbound Diameter message on a specified stream ...
send(#diameter_packet{bin = Bin, transport_data = {outstream, SId}},
@@ -646,13 +659,23 @@ send(Bin, #transport{streams = {_, OS},
%% send/3
-send(StreamId, Bin, #transport{monitor = MPid}) ->
+send(StreamId, Bin, #transport{send = false,
+ socket = Sock,
+ assoc_id = AId}) ->
+ send(Sock, AId, StreamId, Bin);
+
+send(StreamId, Bin, #transport{send = MPid}) ->
MPid ! {Bin, StreamId},
MPid;
send(StreamId, Bin, #monitor{socket = Sock,
assoc_id = AId}) ->
- case gen_sctp:send(Sock, AId, StreamId, Bin) of
+ send(Sock, AId, StreamId, Bin).
+
+%% send/4
+
+send(Sock, AssocId, StreamId, Bin) ->
+ case gen_sctp:send(Sock, AssocId, StreamId, Bin) of
ok ->
ok;
{error, Reason} ->