aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src/transport/diameter_sctp.erl
diff options
context:
space:
mode:
Diffstat (limited to 'lib/diameter/src/transport/diameter_sctp.erl')
-rw-r--r--lib/diameter/src/transport/diameter_sctp.erl59
1 files changed, 41 insertions, 18 deletions
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl
index d23e56b413..3919596cb1 100644
--- a/lib/diameter/src/transport/diameter_sctp.erl
+++ b/lib/diameter/src/transport/diameter_sctp.erl
@@ -68,6 +68,7 @@
-type connect_option() :: {raddr, inet:ip_address()}
| {rport, inet:port_number()}
+ | option()
| term(). %% gen_sctp:open_option().
-type match() :: inet:ip_address()
@@ -75,8 +76,12 @@
| [match()].
-type listen_option() :: {accept, match()}
+ | option()
| term(). %% gen_sctp:open_option().
+-type option() :: {sender, boolean()}
+ | sender.
+
-type uint() :: non_neg_integer().
%% Accepting/connecting transport process state.
@@ -96,7 +101,7 @@
streams :: {uint(), uint()} %% {InStream, OutStream} counts
| undefined,
os = 0 :: uint(), %% next output stream
- monitor :: pid() | undefined}). %% sending process
+ send = false :: pid() | boolean()}). %% sending process
%% Monitor process state.
-record(monitor,
@@ -110,7 +115,8 @@
socket :: gen_sctp:sctp_socket(),
service :: pid(), %% service process
pending = {0, queue:new()},
- accept :: [match()]}).
+ accept :: [match()],
+ sender :: boolean()}).
%% Field pending implements two queues: the first of transport-to-be
%% processes to which an association has been assigned but for which
%% diameter hasn't yet spawned a transport process, a short-lived
@@ -236,7 +242,8 @@ i(#monitor{transport = TPid} = S) ->
i({listen, Ref, {Opts, SvcPid, Addrs}}) ->
monitor(process, SvcPid),
[_] = diameter_config:subscribe(Ref, transport), %% assert existence
- {[Matches], Rest} = proplists:split(Opts, [accept]),
+ {Split, Rest} = proplists:split(Opts, [accept, sender]),
+ OwnOpts = lists:append(Split),
{LAs, Sock} = AS = open(Addrs, Rest, ?DEFAULT_PORT),
ok = gen_sctp:listen(Sock, true),
true = diameter_reg:add_new({?MODULE, listener, {Ref, AS}}),
@@ -244,12 +251,14 @@ i({listen, Ref, {Opts, SvcPid, Addrs}}) ->
#listener{ref = Ref,
service = SvcPid,
socket = Sock,
- accept = [[M] || {accept, M} <- Matches]};
+ accept = [[M] || {accept, M} <- OwnOpts],
+ sender = proplists:get_value(sender, OwnOpts, false)};
%% A connecting transport.
i({connect, Pid, Opts, Addrs, Ref}) ->
- {[As, Ps], Rest} = proplists:split(Opts, [raddr, rport]),
- RAs = [diameter_lib:ipaddr(A) || {raddr, A} <- As],
+ {[Ps | Split], Rest} = proplists:split(Opts, [rport, raddr, sender]),
+ OwnOpts = lists:append(Split),
+ RAs = [diameter_lib:ipaddr(A) || {raddr, A} <- OwnOpts],
[RP] = [P || {rport, P} <- Ps] ++ [P || P <- [?DEFAULT_PORT], [] == Ps],
{LAs, Sock} = open(Addrs, Rest, 0),
putr(?REF_KEY, Ref),
@@ -257,7 +266,8 @@ i({connect, Pid, Opts, Addrs, Ref}) ->
monitor(process, Pid),
#transport{parent = Pid,
mode = {connect, connect(Sock, RAs, RP, [])},
- socket = Sock};
+ socket = Sock,
+ send = proplists:get_value(sender, OwnOpts, false)};
%% An accepting transport spawned by diameter, not yet owning an
%% association.
@@ -291,11 +301,12 @@ i({K, Ref}, #transport{mode = {accept, _}} = S) ->
receive
{Ref, Pid} when K == parent -> %% transport process started
S#transport{parent = Pid};
- {K, T, Matches} when K == peeloff -> %% association
+ {K, T, Matches, Bool} when K == peeloff -> %% association
{sctp, Sock, _RA, _RP, _Data} = T,
ok = accept_peer(Sock, Matches),
demonitor(Ref, [flush]),
- t(T, S#transport{socket = Sock});
+ t(T, S#transport{socket = Sock,
+ send = Bool});
accept_timeout = T ->
x(T);
{'DOWN', _, process, _, _} = T ->
@@ -466,11 +477,12 @@ getr(Key) ->
%% Incoming message from SCTP.
l({sctp, Sock, _RA, _RP, Data} = T, #listener{socket = Sock,
- accept = Matches}
+ accept = Matches,
+ sender = Sender}
= S) ->
Id = assoc_id(Data),
{TPid, NewS} = accept(S),
- TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Matches},
+ TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Matches, Sender},
setopts(Sock),
NewS;
@@ -546,14 +558,15 @@ transition({diameter, {tls, _Ref, _Type, _Bool}}, _) ->
%% Parent process has died: call the monitor to not close the socket
%% during an ongoing send, but don't let it take forever.
transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid,
- monitor = MPid}) ->
- undefined == MPid
+ send = MPid}) ->
+ is_boolean(MPid)
orelse ok == (catch gen_server:call(MPid, {stop, Pid}))
orelse exit(MPid, kill),
stop;
%% Monitor process has died.
-transition({'DOWN', _, process, MPid, _}, #transport{monitor = MPid}) ->
+transition({'DOWN', _, process, MPid, _}, #transport{send = MPid})
+ when is_pid(MPid) ->
stop;
%% Timeout after transport process has been started.
@@ -617,7 +630,7 @@ q(Ref, Pid, #listener{pending = {_,Q}}) ->
%% send/2
%% Start monitor process on first send.
-send(Msg, #transport{monitor = undefined,
+send(Msg, #transport{send = true,
socket = Sock,
assoc_id = AId}
= S) ->
@@ -625,7 +638,7 @@ send(Msg, #transport{monitor = undefined,
socket = Sock,
assoc_id = AId}),
monitor(process, MPid),
- send(Msg, S#transport{monitor = MPid});
+ send(Msg, S#transport{send = MPid});
%% Outbound Diameter message on a specified stream ...
send(#diameter_packet{bin = Bin, transport_data = {outstream, SId}},
@@ -646,13 +659,23 @@ send(Bin, #transport{streams = {_, OS},
%% send/3
-send(StreamId, Bin, #transport{monitor = MPid}) ->
+send(StreamId, Bin, #transport{send = false,
+ socket = Sock,
+ assoc_id = AId}) ->
+ send(Sock, AId, StreamId, Bin);
+
+send(StreamId, Bin, #transport{send = MPid}) ->
MPid ! {Bin, StreamId},
MPid;
send(StreamId, Bin, #monitor{socket = Sock,
assoc_id = AId}) ->
- case gen_sctp:send(Sock, AId, StreamId, Bin) of
+ send(Sock, AId, StreamId, Bin).
+
+%% send/4
+
+send(Sock, AssocId, StreamId, Bin) ->
+ case gen_sctp:send(Sock, AssocId, StreamId, Bin) of
ok ->
ok;
{error, Reason} ->