aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--lib/diameter/src/transport/diameter_sctp.erl59
-rw-r--r--lib/diameter/src/transport/diameter_tcp.erl81
2 files changed, 90 insertions, 50 deletions
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl
index d23e56b413..3919596cb1 100644
--- a/lib/diameter/src/transport/diameter_sctp.erl
+++ b/lib/diameter/src/transport/diameter_sctp.erl
@@ -68,6 +68,7 @@
-type connect_option() :: {raddr, inet:ip_address()}
| {rport, inet:port_number()}
+ | option()
| term(). %% gen_sctp:open_option().
-type match() :: inet:ip_address()
@@ -75,8 +76,12 @@
| [match()].
-type listen_option() :: {accept, match()}
+ | option()
| term(). %% gen_sctp:open_option().
+-type option() :: {sender, boolean()}
+ | sender.
+
-type uint() :: non_neg_integer().
%% Accepting/connecting transport process state.
@@ -96,7 +101,7 @@
streams :: {uint(), uint()} %% {InStream, OutStream} counts
| undefined,
os = 0 :: uint(), %% next output stream
- monitor :: pid() | undefined}). %% sending process
+ send = false :: pid() | boolean()}). %% sending process
%% Monitor process state.
-record(monitor,
@@ -110,7 +115,8 @@
socket :: gen_sctp:sctp_socket(),
service :: pid(), %% service process
pending = {0, queue:new()},
- accept :: [match()]}).
+ accept :: [match()],
+ sender :: boolean()}).
%% Field pending implements two queues: the first of transport-to-be
%% processes to which an association has been assigned but for which
%% diameter hasn't yet spawned a transport process, a short-lived
@@ -236,7 +242,8 @@ i(#monitor{transport = TPid} = S) ->
i({listen, Ref, {Opts, SvcPid, Addrs}}) ->
monitor(process, SvcPid),
[_] = diameter_config:subscribe(Ref, transport), %% assert existence
- {[Matches], Rest} = proplists:split(Opts, [accept]),
+ {Split, Rest} = proplists:split(Opts, [accept, sender]),
+ OwnOpts = lists:append(Split),
{LAs, Sock} = AS = open(Addrs, Rest, ?DEFAULT_PORT),
ok = gen_sctp:listen(Sock, true),
true = diameter_reg:add_new({?MODULE, listener, {Ref, AS}}),
@@ -244,12 +251,14 @@ i({listen, Ref, {Opts, SvcPid, Addrs}}) ->
#listener{ref = Ref,
service = SvcPid,
socket = Sock,
- accept = [[M] || {accept, M} <- Matches]};
+ accept = [[M] || {accept, M} <- OwnOpts],
+ sender = proplists:get_value(sender, OwnOpts, false)};
%% A connecting transport.
i({connect, Pid, Opts, Addrs, Ref}) ->
- {[As, Ps], Rest} = proplists:split(Opts, [raddr, rport]),
- RAs = [diameter_lib:ipaddr(A) || {raddr, A} <- As],
+ {[Ps | Split], Rest} = proplists:split(Opts, [rport, raddr, sender]),
+ OwnOpts = lists:append(Split),
+ RAs = [diameter_lib:ipaddr(A) || {raddr, A} <- OwnOpts],
[RP] = [P || {rport, P} <- Ps] ++ [P || P <- [?DEFAULT_PORT], [] == Ps],
{LAs, Sock} = open(Addrs, Rest, 0),
putr(?REF_KEY, Ref),
@@ -257,7 +266,8 @@ i({connect, Pid, Opts, Addrs, Ref}) ->
monitor(process, Pid),
#transport{parent = Pid,
mode = {connect, connect(Sock, RAs, RP, [])},
- socket = Sock};
+ socket = Sock,
+ send = proplists:get_value(sender, OwnOpts, false)};
%% An accepting transport spawned by diameter, not yet owning an
%% association.
@@ -291,11 +301,12 @@ i({K, Ref}, #transport{mode = {accept, _}} = S) ->
receive
{Ref, Pid} when K == parent -> %% transport process started
S#transport{parent = Pid};
- {K, T, Matches} when K == peeloff -> %% association
+ {K, T, Matches, Bool} when K == peeloff -> %% association
{sctp, Sock, _RA, _RP, _Data} = T,
ok = accept_peer(Sock, Matches),
demonitor(Ref, [flush]),
- t(T, S#transport{socket = Sock});
+ t(T, S#transport{socket = Sock,
+ send = Bool});
accept_timeout = T ->
x(T);
{'DOWN', _, process, _, _} = T ->
@@ -466,11 +477,12 @@ getr(Key) ->
%% Incoming message from SCTP.
l({sctp, Sock, _RA, _RP, Data} = T, #listener{socket = Sock,
- accept = Matches}
+ accept = Matches,
+ sender = Sender}
= S) ->
Id = assoc_id(Data),
{TPid, NewS} = accept(S),
- TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Matches},
+ TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Matches, Sender},
setopts(Sock),
NewS;
@@ -546,14 +558,15 @@ transition({diameter, {tls, _Ref, _Type, _Bool}}, _) ->
%% Parent process has died: call the monitor to not close the socket
%% during an ongoing send, but don't let it take forever.
transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid,
- monitor = MPid}) ->
- undefined == MPid
+ send = MPid}) ->
+ is_boolean(MPid)
orelse ok == (catch gen_server:call(MPid, {stop, Pid}))
orelse exit(MPid, kill),
stop;
%% Monitor process has died.
-transition({'DOWN', _, process, MPid, _}, #transport{monitor = MPid}) ->
+transition({'DOWN', _, process, MPid, _}, #transport{send = MPid})
+ when is_pid(MPid) ->
stop;
%% Timeout after transport process has been started.
@@ -617,7 +630,7 @@ q(Ref, Pid, #listener{pending = {_,Q}}) ->
%% send/2
%% Start monitor process on first send.
-send(Msg, #transport{monitor = undefined,
+send(Msg, #transport{send = true,
socket = Sock,
assoc_id = AId}
= S) ->
@@ -625,7 +638,7 @@ send(Msg, #transport{monitor = undefined,
socket = Sock,
assoc_id = AId}),
monitor(process, MPid),
- send(Msg, S#transport{monitor = MPid});
+ send(Msg, S#transport{send = MPid});
%% Outbound Diameter message on a specified stream ...
send(#diameter_packet{bin = Bin, transport_data = {outstream, SId}},
@@ -646,13 +659,23 @@ send(Bin, #transport{streams = {_, OS},
%% send/3
-send(StreamId, Bin, #transport{monitor = MPid}) ->
+send(StreamId, Bin, #transport{send = false,
+ socket = Sock,
+ assoc_id = AId}) ->
+ send(Sock, AId, StreamId, Bin);
+
+send(StreamId, Bin, #transport{send = MPid}) ->
MPid ! {Bin, StreamId},
MPid;
send(StreamId, Bin, #monitor{socket = Sock,
assoc_id = AId}) ->
- case gen_sctp:send(Sock, AId, StreamId, Bin) of
+ send(Sock, AId, StreamId, Bin).
+
+%% send/4
+
+send(Sock, AssocId, StreamId, Bin) ->
+ case gen_sctp:send(Sock, AssocId, StreamId, Bin) of
ok ->
ok;
{error, Reason} ->
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl
index 427b2395b9..edbbec1709 100644
--- a/lib/diameter/src/transport/diameter_tcp.erl
+++ b/lib/diameter/src/transport/diameter_tcp.erl
@@ -69,16 +69,16 @@
%% of processes: an actual transport process, one that will club it to
%% death should the parent die before a connection is established, and
%% a process owning the listening port. The monitor process
-%% historically died after connection establishment, but now lives on
-%% as the sender of outgoing messages, so that a blocking send doesn't
-%% prevent messages from being received.
+%% historically died after connection establishment, but can now live
+%% on as the sender of outgoing messages, so that a blocking send
+%% doesn't prevent messages from being received.
%% Listener process state.
-record(listener, {socket :: inet:socket(),
module :: module(),
service = false :: false | pid()}). %% service process
-%% Monitor process state. The name monitor predates its role as sender.
+%% Monitor process state.
-record(monitor,
{parent :: reference() | false | pid(),
transport = self() :: pid(),
@@ -108,6 +108,7 @@
| gen_tcp:listen_option().
-type option() :: {port, non_neg_integer()}
+ | {sender, boolean()}
| {fragment_timer, 0..16#FFFFFFFF}.
%% Accepting/connecting transport process state.
@@ -120,7 +121,7 @@
timeout :: infinity | 0..16#FFFFFFFF, %% fragment timeout
tref = false :: false | reference(), %% fragment timer reference
flush = false :: boolean(), %% flush fragment at timeout?
- monitor :: pid()}). %% monitor/sender process
+ send :: pid() | false}). %% sending process
%% The usual transport using gen_tcp can be replaced by anything
%% sufficiently gen_tcp-like by passing a 'module' option as the first
@@ -210,25 +211,27 @@ i({T, Ref, Mod, Pid, Opts, Addrs, SvcPid})
%% that kills us with the parent until call returns, and then
%% sends outgoing messages.
{[SO|TO], Rest} = proplists:split(Opts, [ssl_options,
+ sender,
fragment_timer]),
SslOpts = ssl_opts(SO),
OwnOpts = lists:append(TO),
Tmo = proplists:get_value(fragment_timer,
OwnOpts,
?DEFAULT_FRAGMENT_TIMEOUT),
+ Sender = proplists:get_value(sender, OwnOpts, false),
?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}),
{ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}),
Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs, SvcPid),
M = if SslOpts -> ssl; true -> Mod end,
- monitor(process, MPid),
- MPid ! {start, self(), Sock, M}, %% prepare monitor for sending
+ Sender andalso monitor(process, MPid),
+ MPid ! {start, self(), Sender andalso {Sock, M}}, %% prepare for sending
putr(?REF_KEY, Ref),
setopts(#transport{parent = Pid,
module = M,
socket = Sock,
ssl = SslOpts,
timeout = Tmo,
- monitor = MPid});
+ send = Sender andalso MPid});
%% Put the reference in the process dictionary since we now use it
%% advertise the ssl socket after TLS upgrade.
@@ -515,15 +518,22 @@ m(Bin, S)
send(Bin, S),
S;
-%% Transport is telling us to be ready to send. Stop monitoring on the
+%% Transport has established a connection. Stop monitoring on the
%% parent so as not to die before a send from the transport.
-m({start, TPid, Sock, Mod}, #monitor{parent = MRef,
- transport = TPid}
- = S) ->
- demonitor(MRef, [flush]),
- S#monitor{parent = false,
- socket = Sock,
- module = Mod};
+m({start, TPid, T} = M, #monitor{transport = TPid} = S) ->
+ case T of
+ {Sock, Mod} ->
+ demonitor(S#monitor.parent, [flush]),
+ S#monitor{parent = false,
+ socket = Sock,
+ module = Mod};
+ false -> %% monitor not sending
+ x(M)
+ end;
+
+%% Transport is telling us to die.
+m({stop, TPid} = T, #monitor{transport = TPid}) ->
+ x(T);
%% Transport is telling us to die.
m({stop, TPid} = T, #monitor{transport = TPid}) ->
@@ -632,13 +642,15 @@ transition({resolve_port, Pid}, #transport{socket = Sock,
%% Parent process has died: call the monitor to not close the socket
%% during an ongoing send, but don't let it take forever.
transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid,
- monitor = MPid}) ->
- ok == (catch gen_server:call(MPid, {stop, Pid}))
- orelse exit(MPid, kill),
+ send = MPid}) ->
+ false == MPid
+ orelse (ok == gen_server:call(MPid, {stop, self()}, 1000))
+ orelse exit(MPid, {shutdown, parent}),
stop;
%% Monitor process has died.
-transition({'DOWN', _, process, MPid, _}, #transport{monitor = MPid}) ->
+transition({'DOWN', _, process, MPid, _}, #transport{send = MPid})
+ when is_pid(MPid) ->
stop.
%% Crash on anything unexpected.
@@ -663,12 +675,12 @@ tls_handshake(_, true, #transport{ssl = false}) ->
tls_handshake(Type, true, #transport{socket = Sock,
module = M,
ssl = Opts,
- monitor = MPid}
+ send = MPid}
= S) ->
{ok, SSock} = tls(Type, Sock, [{cb_info, ?TCP_CB(M)} | Opts]),
Ref = getr(?REF_KEY),
true = diameter_reg:add_new({?MODULE, Type, {Ref, SSock}}),
- MPid ! {tls, SSock}, %% tell the monitor process
+ false == MPid orelse (MPid ! {tls, SSock}), %% tell the sender process
S#transport{socket = SSock,
module = ssl};
@@ -827,29 +839,34 @@ connect(Mod, Host, Port, Opts) ->
%% send/2
+send(false, #transport{}) -> %% ack
+ ok;
+
send(#diameter_packet{bin = Bin}, S) ->
send(Bin, S);
-send(Bin, #monitor{} = S) ->
- send1(Bin, S);
+send(Bin, #transport{socket = Sock, module = M, send = false}) ->
+ send1(M, Sock, Bin);
+
+send(Bin, #monitor{socket = Sock, module = M}) ->
+ send1(M, Sock, Bin);
%% Send from the monitor process to avoid deadlock if both the
%% receiver and the peer were to block in send.
-send(Bin, #transport{monitor = MPid}) ->
- MPid ! Bin,
- MPid.
+send(Bin, #transport{send = Pid}) ->
+ Pid ! Bin,
+ ok.
-%% send1/2
+%% send1/3
-send1(Bin, #monitor{socket = Sock,
- module = M}) ->
- case send(M, Sock, Bin) of
+send1(Mod, Sock, Bin) ->
+ case send(Mod, Sock, Bin) of
ok ->
ok;
{error, Reason} ->
x({send, Reason})
end.
-
+
%% send/3
send(gen_tcp, Sock, Bin) ->