aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src/transport/diameter_tcp.erl
diff options
context:
space:
mode:
Diffstat (limited to 'lib/diameter/src/transport/diameter_tcp.erl')
-rw-r--r--lib/diameter/src/transport/diameter_tcp.erl156
1 files changed, 125 insertions, 31 deletions
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl
index edbbec1709..5819d52bdc 100644
--- a/lib/diameter/src/transport/diameter_tcp.erl
+++ b/lib/diameter/src/transport/diameter_tcp.erl
@@ -82,6 +82,7 @@
-record(monitor,
{parent :: reference() | false | pid(),
transport = self() :: pid(),
+ ack = false :: boolean(),
socket :: inet:socket() | ssl:sslsocket() | undefined,
module :: module() | undefined}).
@@ -109,11 +110,15 @@
-type option() :: {port, non_neg_integer()}
| {sender, boolean()}
+ | sender
+ | {message_cb, false | diameter:evaluable()}
| {fragment_timer, 0..16#FFFFFFFF}.
%% Accepting/connecting transport process state.
-record(transport,
{socket :: inet:socket() | ssl:sslsocket(), %% accept/connect socket
+ active = false :: boolean(), %% is socket active?
+ recv = true :: boolean(), %% should it be active?
parent :: pid(), %% of process that started us
module :: module(), %% gen_tcp-like module
ssl :: [term()] | boolean(), %% ssl options, ssl or not
@@ -121,7 +126,8 @@
timeout :: infinity | 0..16#FFFFFFFF, %% fragment timeout
tref = false :: false | reference(), %% fragment timer reference
flush = false :: boolean(), %% flush fragment at timeout?
- send :: pid() | false}). %% sending process
+ message_cb :: false | diameter:evaluable(),
+ send :: pid() | false}). %% sending process
%% The usual transport using gen_tcp can be replaced by anything
%% sufficiently gen_tcp-like by passing a 'module' option as the first
@@ -212,24 +218,28 @@ i({T, Ref, Mod, Pid, Opts, Addrs, SvcPid})
%% sends outgoing messages.
{[SO|TO], Rest} = proplists:split(Opts, [ssl_options,
sender,
+ message_cb,
fragment_timer]),
SslOpts = ssl_opts(SO),
OwnOpts = lists:append(TO),
Tmo = proplists:get_value(fragment_timer,
OwnOpts,
?DEFAULT_FRAGMENT_TIMEOUT),
- Sender = proplists:get_value(sender, OwnOpts, false),
+ [CB, Sender] = [proplists:get_value(K, OwnOpts, false)
+ || K <- [message_cb, sender]],
?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}),
{ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}),
Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs, SvcPid),
M = if SslOpts -> ssl; true -> Mod end,
Sender andalso monitor(process, MPid),
- MPid ! {start, self(), Sender andalso {Sock, M}}, %% prepare for sending
+ false == CB orelse (Pid ! {diameter, ack}),
+ MPid ! {start, self(), Sender andalso {Sock, M}, false /= CB},
putr(?REF_KEY, Ref),
setopts(#transport{parent = Pid,
module = M,
socket = Sock,
ssl = SslOpts,
+ message_cb = CB,
timeout = Tmo,
send = Sender andalso MPid});
%% Put the reference in the process dictionary since we now use it
@@ -520,13 +530,14 @@ m(Bin, S)
%% Transport has established a connection. Stop monitoring on the
%% parent so as not to die before a send from the transport.
-m({start, TPid, T} = M, #monitor{transport = TPid} = S) ->
+m({start, TPid, T, Ack} = M, #monitor{transport = TPid} = S) ->
case T of
{Sock, Mod} ->
demonitor(S#monitor.parent, [flush]),
S#monitor{parent = false,
socket = Sock,
- module = Mod};
+ module = Mod,
+ ack = Ack};
false -> %% monitor not sending
x(M)
end;
@@ -591,7 +602,7 @@ transition({P, Sock, Bin}, #transport{socket = Sock,
= S)
when P == ssl, true == B;
P == tcp ->
- recv(Bin, S);
+ recv(Bin, S#transport{active = false});
%% Capabilties exchange has decided on whether or not to run over TLS.
transition({diameter, {tls, Ref, Type, B}}, #transport{parent = Pid}
@@ -618,8 +629,16 @@ transition({E, Sock, _Reason} = T, #transport{socket = Sock,
%% Outgoing message.
transition({diameter, {send, Msg}}, #transport{} = S) ->
- send(Msg, S),
- ok;
+ message(send, Msg, S);
+
+%% Monitor has sent an outgoing message.
+transition(Bin, S)
+ when is_binary(Bin) ->
+ message(ack, Bin, S);
+
+%% Deferred actions from a message_cb.
+transition({actions, Dir, Acts}, S) ->
+ actions(Acts, Dir, S);
%% Request to close the transport connection.
transition({diameter, {close, Pid}}, #transport{parent = Pid,
@@ -699,12 +718,11 @@ tls(accept, Sock, Opts) ->
%% using Nagle.
%% Receive packets until a full message is received,
-recv(Bin, #transport{parent = Pid, frag = Head} = S) ->
+recv(Bin, #transport{frag = Head} = S) ->
case rcv(Head, Bin) of
{Msg, B} -> %% have a complete message ...
- diameter_peer:recv(Pid, Msg),
- recv(<<>>, S#transport{frag = B});
- Frag -> %% read more on the socket
+ message(recv, Msg, S#transport{frag = B});
+ Frag -> %% read more on the socket
start_fragment_timer(setopts(S#transport{frag = Frag,
flush = false}))
end.
@@ -804,9 +822,8 @@ flush(#transport{flush = false} = S) ->
start_fragment_timer(S#transport{flush = true});
%% No messages since last expiry.
-flush(#transport{frag = Frag, parent = Pid} = S) ->
- diameter_peer:recv(Pid, bin(Frag)),
- S#transport{frag = <<>>}.
+flush(#transport{frag = Frag} = S) ->
+ message(recv, bin(Frag), S#transport{frag = <<>>}).
%% start_fragment_timer/1
%%
@@ -839,23 +856,19 @@ connect(Mod, Host, Port, Opts) ->
%% send/2
-send(false, #transport{}) -> %% ack
- ok;
-
-send(#diameter_packet{bin = Bin}, S) ->
- send(Bin, S);
-
-send(Bin, #transport{socket = Sock, module = M, send = false}) ->
- send1(M, Sock, Bin);
+send(Bin, #monitor{socket = Sock, module = M, transport = TPid, ack = B}) ->
+ send1(M, Sock, Bin),
+ B andalso (TPid ! Bin);
-send(Bin, #monitor{socket = Sock, module = M}) ->
- send1(M, Sock, Bin);
+send(Bin, #transport{socket = Sock, module = M, send = false} = S) ->
+ send1(M, Sock, Bin),
+ message(ack, Bin, S);
%% Send from the monitor process to avoid deadlock if both the
%% receiver and the peer were to block in send.
-send(Bin, #transport{send = Pid}) ->
+send(Bin, #transport{send = Pid} = S) ->
Pid ! Bin,
- ok.
+ S.
%% send1/3
@@ -866,7 +879,7 @@ send1(Mod, Sock, Bin) ->
{error, Reason} ->
x({send, Reason})
end.
-
+
%% send/3
send(gen_tcp, Sock, Bin) ->
@@ -888,12 +901,18 @@ setopts(M, Sock, Opts) ->
%% setopts/1
setopts(#transport{socket = Sock,
+ active = A,
+ recv = B,
module = M}
- = S)->
+ = S)
+ when B, not A ->
case setopts(M, Sock, [{active, once}]) of
- ok -> S;
+ ok -> S#transport{active = true};
X -> x({setopts, Sock, M, X}) %% possibly on peer disconnect
- end.
+ end;
+
+setopts(S) ->
+ S.
%% portnr/2
@@ -928,3 +947,78 @@ getstat(gen_tcp, Sock) ->
getstat(M, Sock) ->
M:getstat(Sock).
%% Note that ssl:getstat/1 doesn't yet exist in R15B01.
+
+%% A message_cb is invoked whenever a message is sent or received, or
+%% to provide acknowledgement of a completed send or discarded
+%% request. Ignoring possible extra arguments, calls are of the
+%% following form.
+%%
+%% cb(recv, Bin) Pass a received message into diameter?
+%% cb(send, Bin) Send a message?
+%% cb(ack, Bin) Acknowledgement of a completed send.
+%% cb(ack, false) Acknowledgement of a discarded request.
+%%
+%% Callbacks return a list of the following form.
+%%
+%% [boolean() | send | recv | binary()]
+%%
+%% The atoms are meaningless by themselves, but say whether subsequent
+%% binaries are to be sent or received. A boolean says whether or not
+%% to continue reading on the socket. Messages can be received even
+%% after false is returned if these arrived in the same packet. A
+%% leading recv or send is implicit on the corresponding callbacks. A
+%% new callback can be returned as the tail of a returned list: any
+%% value not of the aforementioned list type is interpreted as a
+%% callback.
+
+%% message/3
+
+message(send, false = M, S) ->
+ message(ack, M, S);
+
+message(ack, _, #transport{message_cb = false} = S) ->
+ S;
+
+message(Dir, #diameter_packet{bin = Bin}, S) ->
+ message(Dir, Bin, S);
+
+message(Dir, Bin, #transport{message_cb = CB} = S) ->
+ recv(<<>>, actions(cb(CB, Dir, Bin), Dir, S)).
+
+%% actions/3
+
+actions([], _, S) ->
+ S;
+
+actions([B | As], Dir, S)
+ when is_boolean(B) ->
+ actions(As, Dir, S#transport{recv = B});
+
+actions([Dir | As], _, S)
+ when Dir == send;
+ Dir == recv ->
+ actions(As, Dir, S);
+
+actions([Bin | As], send = Dir, #transport{} = S)
+ when is_binary(Bin) ->
+ actions(As, Dir, send(Bin, S));
+
+actions([Bin | As], recv = Dir, #transport{parent = Pid} = S)
+ when is_binary(Bin) ->
+ diameter_peer:recv(Pid, Bin),
+ actions(As, Dir, S);
+
+actions([{defer, Tmo, Acts} | As], Dir, S) ->
+ erlang:send_after(Tmo, self(), {actions, Dir, Acts}),
+ actions(As, Dir, S);
+
+actions(CB, _, S) ->
+ S#transport{message_cb = CB}.
+
+%% cb/3
+
+cb(false, _, Bin) ->
+ [Bin];
+
+cb(CB, Dir, Msg) ->
+ diameter_lib:eval([CB, Dir, Msg]).