aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAnders Svensson <[email protected]>2017-06-10 23:15:37 +0200
committerAnders Svensson <[email protected]>2017-06-12 15:36:36 +0200
commit636a719927b23751c12563b8e137ea8698e2abd5 (patch)
tree11d55116b32b37db9a1bc6ea0691366727222d6d
parenteadf4efc7e264fe8dd30befb42a42a02cdef58f1 (diff)
downloadotp-636a719927b23751c12563b8e137ea8698e2abd5.tar.gz
otp-636a719927b23751c12563b8e137ea8698e2abd5.tar.bz2
otp-636a719927b23751c12563b8e137ea8698e2abd5.zip
Add diameter_tcp send/recv callbacks
From the receiver process, that can return binaries to send/receive and stop the transport process from reading on the socket. This is still undocumented, and may change.
-rw-r--r--lib/diameter/src/transport/diameter_tcp.erl156
-rw-r--r--lib/diameter/test/diameter_watchdog_SUITE.erl112
2 files changed, 165 insertions, 103 deletions
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl
index edbbec1709..5819d52bdc 100644
--- a/lib/diameter/src/transport/diameter_tcp.erl
+++ b/lib/diameter/src/transport/diameter_tcp.erl
@@ -82,6 +82,7 @@
-record(monitor,
{parent :: reference() | false | pid(),
transport = self() :: pid(),
+ ack = false :: boolean(),
socket :: inet:socket() | ssl:sslsocket() | undefined,
module :: module() | undefined}).
@@ -109,11 +110,15 @@
-type option() :: {port, non_neg_integer()}
| {sender, boolean()}
+ | sender
+ | {message_cb, false | diameter:evaluable()}
| {fragment_timer, 0..16#FFFFFFFF}.
%% Accepting/connecting transport process state.
-record(transport,
{socket :: inet:socket() | ssl:sslsocket(), %% accept/connect socket
+ active = false :: boolean(), %% is socket active?
+ recv = true :: boolean(), %% should it be active?
parent :: pid(), %% of process that started us
module :: module(), %% gen_tcp-like module
ssl :: [term()] | boolean(), %% ssl options, ssl or not
@@ -121,7 +126,8 @@
timeout :: infinity | 0..16#FFFFFFFF, %% fragment timeout
tref = false :: false | reference(), %% fragment timer reference
flush = false :: boolean(), %% flush fragment at timeout?
- send :: pid() | false}). %% sending process
+ message_cb :: false | diameter:evaluable(),
+ send :: pid() | false}). %% sending process
%% The usual transport using gen_tcp can be replaced by anything
%% sufficiently gen_tcp-like by passing a 'module' option as the first
@@ -212,24 +218,28 @@ i({T, Ref, Mod, Pid, Opts, Addrs, SvcPid})
%% sends outgoing messages.
{[SO|TO], Rest} = proplists:split(Opts, [ssl_options,
sender,
+ message_cb,
fragment_timer]),
SslOpts = ssl_opts(SO),
OwnOpts = lists:append(TO),
Tmo = proplists:get_value(fragment_timer,
OwnOpts,
?DEFAULT_FRAGMENT_TIMEOUT),
- Sender = proplists:get_value(sender, OwnOpts, false),
+ [CB, Sender] = [proplists:get_value(K, OwnOpts, false)
+ || K <- [message_cb, sender]],
?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}),
{ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}),
Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs, SvcPid),
M = if SslOpts -> ssl; true -> Mod end,
Sender andalso monitor(process, MPid),
- MPid ! {start, self(), Sender andalso {Sock, M}}, %% prepare for sending
+ false == CB orelse (Pid ! {diameter, ack}),
+ MPid ! {start, self(), Sender andalso {Sock, M}, false /= CB},
putr(?REF_KEY, Ref),
setopts(#transport{parent = Pid,
module = M,
socket = Sock,
ssl = SslOpts,
+ message_cb = CB,
timeout = Tmo,
send = Sender andalso MPid});
%% Put the reference in the process dictionary since we now use it
@@ -520,13 +530,14 @@ m(Bin, S)
%% Transport has established a connection. Stop monitoring on the
%% parent so as not to die before a send from the transport.
-m({start, TPid, T} = M, #monitor{transport = TPid} = S) ->
+m({start, TPid, T, Ack} = M, #monitor{transport = TPid} = S) ->
case T of
{Sock, Mod} ->
demonitor(S#monitor.parent, [flush]),
S#monitor{parent = false,
socket = Sock,
- module = Mod};
+ module = Mod,
+ ack = Ack};
false -> %% monitor not sending
x(M)
end;
@@ -591,7 +602,7 @@ transition({P, Sock, Bin}, #transport{socket = Sock,
= S)
when P == ssl, true == B;
P == tcp ->
- recv(Bin, S);
+ recv(Bin, S#transport{active = false});
%% Capabilties exchange has decided on whether or not to run over TLS.
transition({diameter, {tls, Ref, Type, B}}, #transport{parent = Pid}
@@ -618,8 +629,16 @@ transition({E, Sock, _Reason} = T, #transport{socket = Sock,
%% Outgoing message.
transition({diameter, {send, Msg}}, #transport{} = S) ->
- send(Msg, S),
- ok;
+ message(send, Msg, S);
+
+%% Monitor has sent an outgoing message.
+transition(Bin, S)
+ when is_binary(Bin) ->
+ message(ack, Bin, S);
+
+%% Deferred actions from a message_cb.
+transition({actions, Dir, Acts}, S) ->
+ actions(Acts, Dir, S);
%% Request to close the transport connection.
transition({diameter, {close, Pid}}, #transport{parent = Pid,
@@ -699,12 +718,11 @@ tls(accept, Sock, Opts) ->
%% using Nagle.
%% Receive packets until a full message is received,
-recv(Bin, #transport{parent = Pid, frag = Head} = S) ->
+recv(Bin, #transport{frag = Head} = S) ->
case rcv(Head, Bin) of
{Msg, B} -> %% have a complete message ...
- diameter_peer:recv(Pid, Msg),
- recv(<<>>, S#transport{frag = B});
- Frag -> %% read more on the socket
+ message(recv, Msg, S#transport{frag = B});
+ Frag -> %% read more on the socket
start_fragment_timer(setopts(S#transport{frag = Frag,
flush = false}))
end.
@@ -804,9 +822,8 @@ flush(#transport{flush = false} = S) ->
start_fragment_timer(S#transport{flush = true});
%% No messages since last expiry.
-flush(#transport{frag = Frag, parent = Pid} = S) ->
- diameter_peer:recv(Pid, bin(Frag)),
- S#transport{frag = <<>>}.
+flush(#transport{frag = Frag} = S) ->
+ message(recv, bin(Frag), S#transport{frag = <<>>}).
%% start_fragment_timer/1
%%
@@ -839,23 +856,19 @@ connect(Mod, Host, Port, Opts) ->
%% send/2
-send(false, #transport{}) -> %% ack
- ok;
-
-send(#diameter_packet{bin = Bin}, S) ->
- send(Bin, S);
-
-send(Bin, #transport{socket = Sock, module = M, send = false}) ->
- send1(M, Sock, Bin);
+send(Bin, #monitor{socket = Sock, module = M, transport = TPid, ack = B}) ->
+ send1(M, Sock, Bin),
+ B andalso (TPid ! Bin);
-send(Bin, #monitor{socket = Sock, module = M}) ->
- send1(M, Sock, Bin);
+send(Bin, #transport{socket = Sock, module = M, send = false} = S) ->
+ send1(M, Sock, Bin),
+ message(ack, Bin, S);
%% Send from the monitor process to avoid deadlock if both the
%% receiver and the peer were to block in send.
-send(Bin, #transport{send = Pid}) ->
+send(Bin, #transport{send = Pid} = S) ->
Pid ! Bin,
- ok.
+ S.
%% send1/3
@@ -866,7 +879,7 @@ send1(Mod, Sock, Bin) ->
{error, Reason} ->
x({send, Reason})
end.
-
+
%% send/3
send(gen_tcp, Sock, Bin) ->
@@ -888,12 +901,18 @@ setopts(M, Sock, Opts) ->
%% setopts/1
setopts(#transport{socket = Sock,
+ active = A,
+ recv = B,
module = M}
- = S)->
+ = S)
+ when B, not A ->
case setopts(M, Sock, [{active, once}]) of
- ok -> S;
+ ok -> S#transport{active = true};
X -> x({setopts, Sock, M, X}) %% possibly on peer disconnect
- end.
+ end;
+
+setopts(S) ->
+ S.
%% portnr/2
@@ -928,3 +947,78 @@ getstat(gen_tcp, Sock) ->
getstat(M, Sock) ->
M:getstat(Sock).
%% Note that ssl:getstat/1 doesn't yet exist in R15B01.
+
+%% A message_cb is invoked whenever a message is sent or received, or
+%% to provide acknowledgement of a completed send or discarded
+%% request. Ignoring possible extra arguments, calls are of the
+%% following form.
+%%
+%% cb(recv, Bin) Pass a received message into diameter?
+%% cb(send, Bin) Send a message?
+%% cb(ack, Bin) Acknowledgement of a completed send.
+%% cb(ack, false) Acknowledgement of a discarded request.
+%%
+%% Callbacks return a list of the following form.
+%%
+%% [boolean() | send | recv | binary()]
+%%
+%% The atoms are meaningless by themselves, but say whether subsequent
+%% binaries are to be sent or received. A boolean says whether or not
+%% to continue reading on the socket. Messages can be received even
+%% after false is returned if these arrived in the same packet. A
+%% leading recv or send is implicit on the corresponding callbacks. A
+%% new callback can be returned as the tail of a returned list: any
+%% value not of the aforementioned list type is interpreted as a
+%% callback.
+
+%% message/3
+
+message(send, false = M, S) ->
+ message(ack, M, S);
+
+message(ack, _, #transport{message_cb = false} = S) ->
+ S;
+
+message(Dir, #diameter_packet{bin = Bin}, S) ->
+ message(Dir, Bin, S);
+
+message(Dir, Bin, #transport{message_cb = CB} = S) ->
+ recv(<<>>, actions(cb(CB, Dir, Bin), Dir, S)).
+
+%% actions/3
+
+actions([], _, S) ->
+ S;
+
+actions([B | As], Dir, S)
+ when is_boolean(B) ->
+ actions(As, Dir, S#transport{recv = B});
+
+actions([Dir | As], _, S)
+ when Dir == send;
+ Dir == recv ->
+ actions(As, Dir, S);
+
+actions([Bin | As], send = Dir, #transport{} = S)
+ when is_binary(Bin) ->
+ actions(As, Dir, send(Bin, S));
+
+actions([Bin | As], recv = Dir, #transport{parent = Pid} = S)
+ when is_binary(Bin) ->
+ diameter_peer:recv(Pid, Bin),
+ actions(As, Dir, S);
+
+actions([{defer, Tmo, Acts} | As], Dir, S) ->
+ erlang:send_after(Tmo, self(), {actions, Dir, Acts}),
+ actions(As, Dir, S);
+
+actions(CB, _, S) ->
+ S#transport{message_cb = CB}.
+
+%% cb/3
+
+cb(false, _, Bin) ->
+ [Bin];
+
+cb(CB, Dir, Msg) ->
+ diameter_lib:eval([CB, Dir, Msg]).
diff --git a/lib/diameter/test/diameter_watchdog_SUITE.erl b/lib/diameter/test/diameter_watchdog_SUITE.erl
index 5ae951f7c2..39c4f051a5 100644
--- a/lib/diameter/test/diameter_watchdog_SUITE.erl
+++ b/lib/diameter/test/diameter_watchdog_SUITE.erl
@@ -44,13 +44,8 @@
-export([peer_up/3,
peer_down/3]).
-%% gen_tcp-ish interface
--export([listen/2,
- accept/1,
- connect/3,
- send/2,
- setopts/2,
- close/1]).
+%% diameter_tcp message_cb
+-export([message/3]).
-include("diameter.hrl").
-include("diameter_ct.hrl").
@@ -161,9 +156,9 @@ reopen(Type, Test, Ref, Wd, N, M) ->
reopen(Type, Test, SvcName, TRef, Wd, N, M).
cfg(Type, Type, Wd) ->
- {Wd, [], []};
+ {Wd, [], false};
cfg(_Type, _Test, _Wd) ->
- {?WD(?PEER_WD), [{okay, 0}], [{module, ?MODULE}]}.
+ {?WD(?PEER_WD), [{okay, 0}], true}.
%% reopen/7
@@ -346,7 +341,7 @@ recv_reopen(listen, Ref) ->
%% reg/3
%%
%% Lookup the pid of the transport process and publish a term for
-%% send/2 to lookup.
+%% message/3 to lookup.
reg(TRef, SvcName, T) ->
TPid = tpid(TRef, diameter:service_info(SvcName, transport)),
true = diameter_reg:add_new({?MODULE, TPid, T}).
@@ -394,7 +389,7 @@ suspect(_) ->
suspect(Type, Fake, Ref, N)
when is_reference(Ref) ->
{SvcName, TRef}
- = start(Type, Ref, {?WD(10000), [{suspect, N}], mod(Fake)}),
+ = start(Type, Ref, {?WD(10000), [{suspect, N}], Fake}),
{initial, okay} = ?WD_EVENT(TRef),
suspect(TRef, Fake, SvcName, N);
@@ -436,11 +431,6 @@ abuse([F|A], Test) ->
abuse(F, Test) ->
abuse([F], Test).
-mod(true) ->
- [{module, ?MODULE}];
-mod(false) ->
- [].
-
%% ===========================================================================
%% # okay/1
%% ===========================================================================
@@ -456,7 +446,7 @@ okay(Type, Fake, Ref, N)
{SvcName, TRef}
= start(Type, Ref, {?WD(10000),
[{okay, choose(Fake, 0, N)}],
- mod(Fake)}),
+ Fake}),
{initial, okay} = ?WD_EVENT(TRef),
okay(TRef,
Fake,
@@ -515,12 +505,17 @@ start(Type, Ref, T) ->
true = diameter_reg:add_new({Type, Ref, Name}),
{Name, TRef}.
-opts(Type, Ref, {Timer, Config, Mod}) ->
+opts(Type, Ref, {Timer, Config, Fake})
+ when is_boolean(Fake) ->
[{transport_module, diameter_tcp},
- {transport_config, Mod ++ [{ip, ?ADDR}, {port, 0}] ++ cfg(Type, Ref)},
+ {transport_config, mod(Fake) ++ [{ip, ?ADDR}, {port, 0}]
+ ++ cfg(Type, Ref)},
{watchdog_timer, Timer},
{watchdog_config, Config}].
+mod(B) ->
+ [{message_cb, [fun message/3, capx]} || B].
+
cfg(listen, _) ->
[];
cfg(connect, Ref) ->
@@ -531,37 +526,29 @@ cfg(connect, Ref) ->
%% ===========================================================================
-listen(PortNr, Opts) ->
- gen_tcp:listen(PortNr, Opts).
-
-accept(LSock) ->
- gen_tcp:accept(LSock).
-
-connect(Addr, Port, Opts) ->
- gen_tcp:connect(Addr, Port, Opts).
+%% message/3
-setopts(Sock, Opts) ->
- inet:setopts(Sock, Opts).
+message(send, Bin, X) ->
+ send(Bin, X);
-send(Sock, Bin) ->
- send(getr(config), Sock, Bin).
+message(recv, Bin, _) ->
+ [Bin];
-close(Sock) ->
- gen_tcp:close(Sock).
+message(_, _, _) ->
+ [].
-%% send/3
+%% send/2
%% First outgoing message from a new transport process is CER/CEA.
%% Remaining outgoing messages are either DWR or DWA.
-send(undefined, Sock, Bin) ->
- <<_:32, _:8, 257:24, _/binary>> = Bin,
- putr(config, init),
- gen_tcp:send(Sock, Bin);
+send(Bin, capx) ->
+ <<_:32, _:8, 257:24, _/binary>> = Bin, %% assert on CER/CEA
+ [Bin, fun message/3, init];
%% Outgoing DWR: fake reception of DWA. Use the fact that AVP values
%% are ignored. This is to ensure that the peer's watchdog state
%% transitions are only induced by responses to messages it sends.
-send(_, Sock, <<_:32, 1:1, _:7, 280:24, _:32, EId:32, HId:32, _/binary>>) ->
+send(<<_:32, 1:1, _:7, 280:24, _:32, EId:32, HId:32, _/binary>>, _) ->
Pkt = #diameter_packet{header = #diameter_header{version = 1,
end_to_end_id = EId,
hop_by_hop_id = HId},
@@ -569,55 +556,36 @@ send(_, Sock, <<_:32, 1:1, _:7, 280:24, _:32, EId:32, HId:32, _/binary>>) ->
{'Origin-Host', "XXX"},
{'Origin-Realm', ?REALM}]},
#diameter_packet{bin = Bin} = diameter_codec:encode(?BASE, Pkt),
- tpid(Sock) ! {tcp, Sock, Bin},
- ok;
+ [recv, Bin];
%% First outgoing DWA.
-send(init, Sock, Bin) ->
- [{{?MODULE, _, T}, _}] = diameter_reg:wait({?MODULE, tpid(Sock), '_'}),
- putr(config, T),
- send(Sock, Bin);
+send(Bin, init) ->
+ [{{?MODULE, _, T}, _}] = diameter_reg:wait({?MODULE, self(), '_'}),
+ send(Bin, T);
%% First transport process.
-send({SvcName, {_,_,_} = T}, Sock, Bin) ->
+send(Bin, {SvcName, {_,_,_} = T}) ->
[{'Origin-Host', _} = OH, {'Origin-Realm', _} = OR | _]
= ?SERVICE(SvcName),
putr(origin, [OH, OR]),
- putr(config, T),
- send(Sock, Bin);
+ send(Bin, T);
%% Discard DWA, failback after another timeout in the peer.
-send({Wd, 0 = No, Msg}, Sock, Bin) ->
+send(Bin, {Wd, 0 = No, Msg}) ->
Origin = getr(origin),
- spawn(fun() -> failback(?ONE_WD(Wd), Msg, Sock, Bin, Origin) end),
- putr(config, No),
- ok;
+ [{defer, ?ONE_WD(Wd), [msg(Msg, Bin, Origin)]}, fun message/3, No];
%% Send DWA while we're in the mood (aka 0 < N).
-send({Wd, N, Msg}, Sock, Bin) ->
- putr(config, {Wd, N-1, Msg}),
- gen_tcp:send(Sock, Bin);
+send(Bin, {Wd, N, Msg}) ->
+ [Bin, fun message/3, {Wd, N-1, Msg}];
%% Discard DWA.
-send(0, _Sock, _Bin) ->
- ok;
+send(_Bin, 0 = No) ->
+ [fun message/3, No];
%% Send DWA.
-send(N, Sock, <<_:32, 0:1, _:7, 280:24, _/binary>> = Bin) ->
- putr(config, N-1),
- gen_tcp:send(Sock, Bin).
-
-%% tpid/1
-
-tpid(Sock) ->
- {connected, Pid} = erlang:port_info(Sock, connected),
- Pid.
-
-%%failback/5
-
-failback(Tmo, Msg, Sock, Bin, Origin) ->
- timer:sleep(Tmo),
- ok = gen_tcp:send(Sock, msg(Msg, Bin, Origin)).
+send(<<_:32, 0:1, _:7, 280:24, _/binary>> = DWA, N) ->
+ [DWA, fun message/3, N-1].
%% msg/2