diff options
author | Anders Svensson <[email protected]> | 2017-06-10 23:15:37 +0200 |
---|---|---|
committer | Anders Svensson <[email protected]> | 2017-06-12 15:36:36 +0200 |
commit | 636a719927b23751c12563b8e137ea8698e2abd5 (patch) | |
tree | 11d55116b32b37db9a1bc6ea0691366727222d6d | |
parent | eadf4efc7e264fe8dd30befb42a42a02cdef58f1 (diff) | |
download | otp-636a719927b23751c12563b8e137ea8698e2abd5.tar.gz otp-636a719927b23751c12563b8e137ea8698e2abd5.tar.bz2 otp-636a719927b23751c12563b8e137ea8698e2abd5.zip |
Add diameter_tcp send/recv callbacks
From the receiver process, that can return binaries to send/receive and
stop the transport process from reading on the socket.
This is still undocumented, and may change.
-rw-r--r-- | lib/diameter/src/transport/diameter_tcp.erl | 156 | ||||
-rw-r--r-- | lib/diameter/test/diameter_watchdog_SUITE.erl | 112 |
2 files changed, 165 insertions, 103 deletions
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index edbbec1709..5819d52bdc 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -82,6 +82,7 @@ -record(monitor, {parent :: reference() | false | pid(), transport = self() :: pid(), + ack = false :: boolean(), socket :: inet:socket() | ssl:sslsocket() | undefined, module :: module() | undefined}). @@ -109,11 +110,15 @@ -type option() :: {port, non_neg_integer()} | {sender, boolean()} + | sender + | {message_cb, false | diameter:evaluable()} | {fragment_timer, 0..16#FFFFFFFF}. %% Accepting/connecting transport process state. -record(transport, {socket :: inet:socket() | ssl:sslsocket(), %% accept/connect socket + active = false :: boolean(), %% is socket active? + recv = true :: boolean(), %% should it be active? parent :: pid(), %% of process that started us module :: module(), %% gen_tcp-like module ssl :: [term()] | boolean(), %% ssl options, ssl or not @@ -121,7 +126,8 @@ timeout :: infinity | 0..16#FFFFFFFF, %% fragment timeout tref = false :: false | reference(), %% fragment timer reference flush = false :: boolean(), %% flush fragment at timeout? - send :: pid() | false}). %% sending process + message_cb :: false | diameter:evaluable(), + send :: pid() | false}). %% sending process %% The usual transport using gen_tcp can be replaced by anything %% sufficiently gen_tcp-like by passing a 'module' option as the first @@ -212,24 +218,28 @@ i({T, Ref, Mod, Pid, Opts, Addrs, SvcPid}) %% sends outgoing messages. {[SO|TO], Rest} = proplists:split(Opts, [ssl_options, sender, + message_cb, fragment_timer]), SslOpts = ssl_opts(SO), OwnOpts = lists:append(TO), Tmo = proplists:get_value(fragment_timer, OwnOpts, ?DEFAULT_FRAGMENT_TIMEOUT), - Sender = proplists:get_value(sender, OwnOpts, false), + [CB, Sender] = [proplists:get_value(K, OwnOpts, false) + || K <- [message_cb, sender]], ?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}), {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}), Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs, SvcPid), M = if SslOpts -> ssl; true -> Mod end, Sender andalso monitor(process, MPid), - MPid ! {start, self(), Sender andalso {Sock, M}}, %% prepare for sending + false == CB orelse (Pid ! {diameter, ack}), + MPid ! {start, self(), Sender andalso {Sock, M}, false /= CB}, putr(?REF_KEY, Ref), setopts(#transport{parent = Pid, module = M, socket = Sock, ssl = SslOpts, + message_cb = CB, timeout = Tmo, send = Sender andalso MPid}); %% Put the reference in the process dictionary since we now use it @@ -520,13 +530,14 @@ m(Bin, S) %% Transport has established a connection. Stop monitoring on the %% parent so as not to die before a send from the transport. -m({start, TPid, T} = M, #monitor{transport = TPid} = S) -> +m({start, TPid, T, Ack} = M, #monitor{transport = TPid} = S) -> case T of {Sock, Mod} -> demonitor(S#monitor.parent, [flush]), S#monitor{parent = false, socket = Sock, - module = Mod}; + module = Mod, + ack = Ack}; false -> %% monitor not sending x(M) end; @@ -591,7 +602,7 @@ transition({P, Sock, Bin}, #transport{socket = Sock, = S) when P == ssl, true == B; P == tcp -> - recv(Bin, S); + recv(Bin, S#transport{active = false}); %% Capabilties exchange has decided on whether or not to run over TLS. transition({diameter, {tls, Ref, Type, B}}, #transport{parent = Pid} @@ -618,8 +629,16 @@ transition({E, Sock, _Reason} = T, #transport{socket = Sock, %% Outgoing message. transition({diameter, {send, Msg}}, #transport{} = S) -> - send(Msg, S), - ok; + message(send, Msg, S); + +%% Monitor has sent an outgoing message. +transition(Bin, S) + when is_binary(Bin) -> + message(ack, Bin, S); + +%% Deferred actions from a message_cb. +transition({actions, Dir, Acts}, S) -> + actions(Acts, Dir, S); %% Request to close the transport connection. transition({diameter, {close, Pid}}, #transport{parent = Pid, @@ -699,12 +718,11 @@ tls(accept, Sock, Opts) -> %% using Nagle. %% Receive packets until a full message is received, -recv(Bin, #transport{parent = Pid, frag = Head} = S) -> +recv(Bin, #transport{frag = Head} = S) -> case rcv(Head, Bin) of {Msg, B} -> %% have a complete message ... - diameter_peer:recv(Pid, Msg), - recv(<<>>, S#transport{frag = B}); - Frag -> %% read more on the socket + message(recv, Msg, S#transport{frag = B}); + Frag -> %% read more on the socket start_fragment_timer(setopts(S#transport{frag = Frag, flush = false})) end. @@ -804,9 +822,8 @@ flush(#transport{flush = false} = S) -> start_fragment_timer(S#transport{flush = true}); %% No messages since last expiry. -flush(#transport{frag = Frag, parent = Pid} = S) -> - diameter_peer:recv(Pid, bin(Frag)), - S#transport{frag = <<>>}. +flush(#transport{frag = Frag} = S) -> + message(recv, bin(Frag), S#transport{frag = <<>>}). %% start_fragment_timer/1 %% @@ -839,23 +856,19 @@ connect(Mod, Host, Port, Opts) -> %% send/2 -send(false, #transport{}) -> %% ack - ok; - -send(#diameter_packet{bin = Bin}, S) -> - send(Bin, S); - -send(Bin, #transport{socket = Sock, module = M, send = false}) -> - send1(M, Sock, Bin); +send(Bin, #monitor{socket = Sock, module = M, transport = TPid, ack = B}) -> + send1(M, Sock, Bin), + B andalso (TPid ! Bin); -send(Bin, #monitor{socket = Sock, module = M}) -> - send1(M, Sock, Bin); +send(Bin, #transport{socket = Sock, module = M, send = false} = S) -> + send1(M, Sock, Bin), + message(ack, Bin, S); %% Send from the monitor process to avoid deadlock if both the %% receiver and the peer were to block in send. -send(Bin, #transport{send = Pid}) -> +send(Bin, #transport{send = Pid} = S) -> Pid ! Bin, - ok. + S. %% send1/3 @@ -866,7 +879,7 @@ send1(Mod, Sock, Bin) -> {error, Reason} -> x({send, Reason}) end. - + %% send/3 send(gen_tcp, Sock, Bin) -> @@ -888,12 +901,18 @@ setopts(M, Sock, Opts) -> %% setopts/1 setopts(#transport{socket = Sock, + active = A, + recv = B, module = M} - = S)-> + = S) + when B, not A -> case setopts(M, Sock, [{active, once}]) of - ok -> S; + ok -> S#transport{active = true}; X -> x({setopts, Sock, M, X}) %% possibly on peer disconnect - end. + end; + +setopts(S) -> + S. %% portnr/2 @@ -928,3 +947,78 @@ getstat(gen_tcp, Sock) -> getstat(M, Sock) -> M:getstat(Sock). %% Note that ssl:getstat/1 doesn't yet exist in R15B01. + +%% A message_cb is invoked whenever a message is sent or received, or +%% to provide acknowledgement of a completed send or discarded +%% request. Ignoring possible extra arguments, calls are of the +%% following form. +%% +%% cb(recv, Bin) Pass a received message into diameter? +%% cb(send, Bin) Send a message? +%% cb(ack, Bin) Acknowledgement of a completed send. +%% cb(ack, false) Acknowledgement of a discarded request. +%% +%% Callbacks return a list of the following form. +%% +%% [boolean() | send | recv | binary()] +%% +%% The atoms are meaningless by themselves, but say whether subsequent +%% binaries are to be sent or received. A boolean says whether or not +%% to continue reading on the socket. Messages can be received even +%% after false is returned if these arrived in the same packet. A +%% leading recv or send is implicit on the corresponding callbacks. A +%% new callback can be returned as the tail of a returned list: any +%% value not of the aforementioned list type is interpreted as a +%% callback. + +%% message/3 + +message(send, false = M, S) -> + message(ack, M, S); + +message(ack, _, #transport{message_cb = false} = S) -> + S; + +message(Dir, #diameter_packet{bin = Bin}, S) -> + message(Dir, Bin, S); + +message(Dir, Bin, #transport{message_cb = CB} = S) -> + recv(<<>>, actions(cb(CB, Dir, Bin), Dir, S)). + +%% actions/3 + +actions([], _, S) -> + S; + +actions([B | As], Dir, S) + when is_boolean(B) -> + actions(As, Dir, S#transport{recv = B}); + +actions([Dir | As], _, S) + when Dir == send; + Dir == recv -> + actions(As, Dir, S); + +actions([Bin | As], send = Dir, #transport{} = S) + when is_binary(Bin) -> + actions(As, Dir, send(Bin, S)); + +actions([Bin | As], recv = Dir, #transport{parent = Pid} = S) + when is_binary(Bin) -> + diameter_peer:recv(Pid, Bin), + actions(As, Dir, S); + +actions([{defer, Tmo, Acts} | As], Dir, S) -> + erlang:send_after(Tmo, self(), {actions, Dir, Acts}), + actions(As, Dir, S); + +actions(CB, _, S) -> + S#transport{message_cb = CB}. + +%% cb/3 + +cb(false, _, Bin) -> + [Bin]; + +cb(CB, Dir, Msg) -> + diameter_lib:eval([CB, Dir, Msg]). diff --git a/lib/diameter/test/diameter_watchdog_SUITE.erl b/lib/diameter/test/diameter_watchdog_SUITE.erl index 5ae951f7c2..39c4f051a5 100644 --- a/lib/diameter/test/diameter_watchdog_SUITE.erl +++ b/lib/diameter/test/diameter_watchdog_SUITE.erl @@ -44,13 +44,8 @@ -export([peer_up/3, peer_down/3]). -%% gen_tcp-ish interface --export([listen/2, - accept/1, - connect/3, - send/2, - setopts/2, - close/1]). +%% diameter_tcp message_cb +-export([message/3]). -include("diameter.hrl"). -include("diameter_ct.hrl"). @@ -161,9 +156,9 @@ reopen(Type, Test, Ref, Wd, N, M) -> reopen(Type, Test, SvcName, TRef, Wd, N, M). cfg(Type, Type, Wd) -> - {Wd, [], []}; + {Wd, [], false}; cfg(_Type, _Test, _Wd) -> - {?WD(?PEER_WD), [{okay, 0}], [{module, ?MODULE}]}. + {?WD(?PEER_WD), [{okay, 0}], true}. %% reopen/7 @@ -346,7 +341,7 @@ recv_reopen(listen, Ref) -> %% reg/3 %% %% Lookup the pid of the transport process and publish a term for -%% send/2 to lookup. +%% message/3 to lookup. reg(TRef, SvcName, T) -> TPid = tpid(TRef, diameter:service_info(SvcName, transport)), true = diameter_reg:add_new({?MODULE, TPid, T}). @@ -394,7 +389,7 @@ suspect(_) -> suspect(Type, Fake, Ref, N) when is_reference(Ref) -> {SvcName, TRef} - = start(Type, Ref, {?WD(10000), [{suspect, N}], mod(Fake)}), + = start(Type, Ref, {?WD(10000), [{suspect, N}], Fake}), {initial, okay} = ?WD_EVENT(TRef), suspect(TRef, Fake, SvcName, N); @@ -436,11 +431,6 @@ abuse([F|A], Test) -> abuse(F, Test) -> abuse([F], Test). -mod(true) -> - [{module, ?MODULE}]; -mod(false) -> - []. - %% =========================================================================== %% # okay/1 %% =========================================================================== @@ -456,7 +446,7 @@ okay(Type, Fake, Ref, N) {SvcName, TRef} = start(Type, Ref, {?WD(10000), [{okay, choose(Fake, 0, N)}], - mod(Fake)}), + Fake}), {initial, okay} = ?WD_EVENT(TRef), okay(TRef, Fake, @@ -515,12 +505,17 @@ start(Type, Ref, T) -> true = diameter_reg:add_new({Type, Ref, Name}), {Name, TRef}. -opts(Type, Ref, {Timer, Config, Mod}) -> +opts(Type, Ref, {Timer, Config, Fake}) + when is_boolean(Fake) -> [{transport_module, diameter_tcp}, - {transport_config, Mod ++ [{ip, ?ADDR}, {port, 0}] ++ cfg(Type, Ref)}, + {transport_config, mod(Fake) ++ [{ip, ?ADDR}, {port, 0}] + ++ cfg(Type, Ref)}, {watchdog_timer, Timer}, {watchdog_config, Config}]. +mod(B) -> + [{message_cb, [fun message/3, capx]} || B]. + cfg(listen, _) -> []; cfg(connect, Ref) -> @@ -531,37 +526,29 @@ cfg(connect, Ref) -> %% =========================================================================== -listen(PortNr, Opts) -> - gen_tcp:listen(PortNr, Opts). - -accept(LSock) -> - gen_tcp:accept(LSock). - -connect(Addr, Port, Opts) -> - gen_tcp:connect(Addr, Port, Opts). +%% message/3 -setopts(Sock, Opts) -> - inet:setopts(Sock, Opts). +message(send, Bin, X) -> + send(Bin, X); -send(Sock, Bin) -> - send(getr(config), Sock, Bin). +message(recv, Bin, _) -> + [Bin]; -close(Sock) -> - gen_tcp:close(Sock). +message(_, _, _) -> + []. -%% send/3 +%% send/2 %% First outgoing message from a new transport process is CER/CEA. %% Remaining outgoing messages are either DWR or DWA. -send(undefined, Sock, Bin) -> - <<_:32, _:8, 257:24, _/binary>> = Bin, - putr(config, init), - gen_tcp:send(Sock, Bin); +send(Bin, capx) -> + <<_:32, _:8, 257:24, _/binary>> = Bin, %% assert on CER/CEA + [Bin, fun message/3, init]; %% Outgoing DWR: fake reception of DWA. Use the fact that AVP values %% are ignored. This is to ensure that the peer's watchdog state %% transitions are only induced by responses to messages it sends. -send(_, Sock, <<_:32, 1:1, _:7, 280:24, _:32, EId:32, HId:32, _/binary>>) -> +send(<<_:32, 1:1, _:7, 280:24, _:32, EId:32, HId:32, _/binary>>, _) -> Pkt = #diameter_packet{header = #diameter_header{version = 1, end_to_end_id = EId, hop_by_hop_id = HId}, @@ -569,55 +556,36 @@ send(_, Sock, <<_:32, 1:1, _:7, 280:24, _:32, EId:32, HId:32, _/binary>>) -> {'Origin-Host', "XXX"}, {'Origin-Realm', ?REALM}]}, #diameter_packet{bin = Bin} = diameter_codec:encode(?BASE, Pkt), - tpid(Sock) ! {tcp, Sock, Bin}, - ok; + [recv, Bin]; %% First outgoing DWA. -send(init, Sock, Bin) -> - [{{?MODULE, _, T}, _}] = diameter_reg:wait({?MODULE, tpid(Sock), '_'}), - putr(config, T), - send(Sock, Bin); +send(Bin, init) -> + [{{?MODULE, _, T}, _}] = diameter_reg:wait({?MODULE, self(), '_'}), + send(Bin, T); %% First transport process. -send({SvcName, {_,_,_} = T}, Sock, Bin) -> +send(Bin, {SvcName, {_,_,_} = T}) -> [{'Origin-Host', _} = OH, {'Origin-Realm', _} = OR | _] = ?SERVICE(SvcName), putr(origin, [OH, OR]), - putr(config, T), - send(Sock, Bin); + send(Bin, T); %% Discard DWA, failback after another timeout in the peer. -send({Wd, 0 = No, Msg}, Sock, Bin) -> +send(Bin, {Wd, 0 = No, Msg}) -> Origin = getr(origin), - spawn(fun() -> failback(?ONE_WD(Wd), Msg, Sock, Bin, Origin) end), - putr(config, No), - ok; + [{defer, ?ONE_WD(Wd), [msg(Msg, Bin, Origin)]}, fun message/3, No]; %% Send DWA while we're in the mood (aka 0 < N). -send({Wd, N, Msg}, Sock, Bin) -> - putr(config, {Wd, N-1, Msg}), - gen_tcp:send(Sock, Bin); +send(Bin, {Wd, N, Msg}) -> + [Bin, fun message/3, {Wd, N-1, Msg}]; %% Discard DWA. -send(0, _Sock, _Bin) -> - ok; +send(_Bin, 0 = No) -> + [fun message/3, No]; %% Send DWA. -send(N, Sock, <<_:32, 0:1, _:7, 280:24, _/binary>> = Bin) -> - putr(config, N-1), - gen_tcp:send(Sock, Bin). - -%% tpid/1 - -tpid(Sock) -> - {connected, Pid} = erlang:port_info(Sock, connected), - Pid. - -%%failback/5 - -failback(Tmo, Msg, Sock, Bin, Origin) -> - timer:sleep(Tmo), - ok = gen_tcp:send(Sock, msg(Msg, Bin, Origin)). +send(<<_:32, 0:1, _:7, 280:24, _/binary>> = DWA, N) -> + [DWA, fun message/3, N-1]. %% msg/2 |