diff options
author | Anders Svensson <[email protected]> | 2017-06-10 23:40:53 +0200 |
---|---|---|
committer | Anders Svensson <[email protected]> | 2017-06-11 16:30:39 +0200 |
commit | eadf4efc7e264fe8dd30befb42a42a02cdef58f1 (patch) | |
tree | 6ac266589946ff35f843cbf89d6913113cbb1629 /lib/diameter/src/transport/diameter_sctp.erl | |
parent | 034089ed1ba3f78c732edcfc84d85a6ed4a4854e (diff) | |
download | otp-eadf4efc7e264fe8dd30befb42a42a02cdef58f1.tar.gz otp-eadf4efc7e264fe8dd30befb42a42a02cdef58f1.tar.bz2 otp-eadf4efc7e264fe8dd30befb42a42a02cdef58f1.zip |
Make diameter_{tcp,sctp} sender configurable
With sends still from the receiving process by default, since changing
the default behaviour may well have negative effects. A separate sender
probably implies a greater need for some form of load regulation for
one, since a blocking send would no longer imply that incoming messages
are no longer recevied. Dealing with this could result in the same
deadlock that the sending process intends to avoid, but the user should
be in control over how/when incoming traffic is regulated.
Diffstat (limited to 'lib/diameter/src/transport/diameter_sctp.erl')
-rw-r--r-- | lib/diameter/src/transport/diameter_sctp.erl | 59 |
1 files changed, 41 insertions, 18 deletions
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index d23e56b413..3919596cb1 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -68,6 +68,7 @@ -type connect_option() :: {raddr, inet:ip_address()} | {rport, inet:port_number()} + | option() | term(). %% gen_sctp:open_option(). -type match() :: inet:ip_address() @@ -75,8 +76,12 @@ | [match()]. -type listen_option() :: {accept, match()} + | option() | term(). %% gen_sctp:open_option(). +-type option() :: {sender, boolean()} + | sender. + -type uint() :: non_neg_integer(). %% Accepting/connecting transport process state. @@ -96,7 +101,7 @@ streams :: {uint(), uint()} %% {InStream, OutStream} counts | undefined, os = 0 :: uint(), %% next output stream - monitor :: pid() | undefined}). %% sending process + send = false :: pid() | boolean()}). %% sending process %% Monitor process state. -record(monitor, @@ -110,7 +115,8 @@ socket :: gen_sctp:sctp_socket(), service :: pid(), %% service process pending = {0, queue:new()}, - accept :: [match()]}). + accept :: [match()], + sender :: boolean()}). %% Field pending implements two queues: the first of transport-to-be %% processes to which an association has been assigned but for which %% diameter hasn't yet spawned a transport process, a short-lived @@ -236,7 +242,8 @@ i(#monitor{transport = TPid} = S) -> i({listen, Ref, {Opts, SvcPid, Addrs}}) -> monitor(process, SvcPid), [_] = diameter_config:subscribe(Ref, transport), %% assert existence - {[Matches], Rest} = proplists:split(Opts, [accept]), + {Split, Rest} = proplists:split(Opts, [accept, sender]), + OwnOpts = lists:append(Split), {LAs, Sock} = AS = open(Addrs, Rest, ?DEFAULT_PORT), ok = gen_sctp:listen(Sock, true), true = diameter_reg:add_new({?MODULE, listener, {Ref, AS}}), @@ -244,12 +251,14 @@ i({listen, Ref, {Opts, SvcPid, Addrs}}) -> #listener{ref = Ref, service = SvcPid, socket = Sock, - accept = [[M] || {accept, M} <- Matches]}; + accept = [[M] || {accept, M} <- OwnOpts], + sender = proplists:get_value(sender, OwnOpts, false)}; %% A connecting transport. i({connect, Pid, Opts, Addrs, Ref}) -> - {[As, Ps], Rest} = proplists:split(Opts, [raddr, rport]), - RAs = [diameter_lib:ipaddr(A) || {raddr, A} <- As], + {[Ps | Split], Rest} = proplists:split(Opts, [rport, raddr, sender]), + OwnOpts = lists:append(Split), + RAs = [diameter_lib:ipaddr(A) || {raddr, A} <- OwnOpts], [RP] = [P || {rport, P} <- Ps] ++ [P || P <- [?DEFAULT_PORT], [] == Ps], {LAs, Sock} = open(Addrs, Rest, 0), putr(?REF_KEY, Ref), @@ -257,7 +266,8 @@ i({connect, Pid, Opts, Addrs, Ref}) -> monitor(process, Pid), #transport{parent = Pid, mode = {connect, connect(Sock, RAs, RP, [])}, - socket = Sock}; + socket = Sock, + send = proplists:get_value(sender, OwnOpts, false)}; %% An accepting transport spawned by diameter, not yet owning an %% association. @@ -291,11 +301,12 @@ i({K, Ref}, #transport{mode = {accept, _}} = S) -> receive {Ref, Pid} when K == parent -> %% transport process started S#transport{parent = Pid}; - {K, T, Matches} when K == peeloff -> %% association + {K, T, Matches, Bool} when K == peeloff -> %% association {sctp, Sock, _RA, _RP, _Data} = T, ok = accept_peer(Sock, Matches), demonitor(Ref, [flush]), - t(T, S#transport{socket = Sock}); + t(T, S#transport{socket = Sock, + send = Bool}); accept_timeout = T -> x(T); {'DOWN', _, process, _, _} = T -> @@ -466,11 +477,12 @@ getr(Key) -> %% Incoming message from SCTP. l({sctp, Sock, _RA, _RP, Data} = T, #listener{socket = Sock, - accept = Matches} + accept = Matches, + sender = Sender} = S) -> Id = assoc_id(Data), {TPid, NewS} = accept(S), - TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Matches}, + TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Matches, Sender}, setopts(Sock), NewS; @@ -546,14 +558,15 @@ transition({diameter, {tls, _Ref, _Type, _Bool}}, _) -> %% Parent process has died: call the monitor to not close the socket %% during an ongoing send, but don't let it take forever. transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid, - monitor = MPid}) -> - undefined == MPid + send = MPid}) -> + is_boolean(MPid) orelse ok == (catch gen_server:call(MPid, {stop, Pid})) orelse exit(MPid, kill), stop; %% Monitor process has died. -transition({'DOWN', _, process, MPid, _}, #transport{monitor = MPid}) -> +transition({'DOWN', _, process, MPid, _}, #transport{send = MPid}) + when is_pid(MPid) -> stop; %% Timeout after transport process has been started. @@ -617,7 +630,7 @@ q(Ref, Pid, #listener{pending = {_,Q}}) -> %% send/2 %% Start monitor process on first send. -send(Msg, #transport{monitor = undefined, +send(Msg, #transport{send = true, socket = Sock, assoc_id = AId} = S) -> @@ -625,7 +638,7 @@ send(Msg, #transport{monitor = undefined, socket = Sock, assoc_id = AId}), monitor(process, MPid), - send(Msg, S#transport{monitor = MPid}); + send(Msg, S#transport{send = MPid}); %% Outbound Diameter message on a specified stream ... send(#diameter_packet{bin = Bin, transport_data = {outstream, SId}}, @@ -646,13 +659,23 @@ send(Bin, #transport{streams = {_, OS}, %% send/3 -send(StreamId, Bin, #transport{monitor = MPid}) -> +send(StreamId, Bin, #transport{send = false, + socket = Sock, + assoc_id = AId}) -> + send(Sock, AId, StreamId, Bin); + +send(StreamId, Bin, #transport{send = MPid}) -> MPid ! {Bin, StreamId}, MPid; send(StreamId, Bin, #monitor{socket = Sock, assoc_id = AId}) -> - case gen_sctp:send(Sock, AId, StreamId, Bin) of + send(Sock, AId, StreamId, Bin). + +%% send/4 + +send(Sock, AssocId, StreamId, Bin) -> + case gen_sctp:send(Sock, AssocId, StreamId, Bin) of ok -> ok; {error, Reason} -> |