diff options
author | Anders Svensson <[email protected]> | 2017-06-10 23:15:37 +0200 |
---|---|---|
committer | Anders Svensson <[email protected]> | 2017-06-12 15:36:36 +0200 |
commit | 636a719927b23751c12563b8e137ea8698e2abd5 (patch) | |
tree | 11d55116b32b37db9a1bc6ea0691366727222d6d /lib/diameter/src | |
parent | eadf4efc7e264fe8dd30befb42a42a02cdef58f1 (diff) | |
download | otp-636a719927b23751c12563b8e137ea8698e2abd5.tar.gz otp-636a719927b23751c12563b8e137ea8698e2abd5.tar.bz2 otp-636a719927b23751c12563b8e137ea8698e2abd5.zip |
Add diameter_tcp send/recv callbacks
From the receiver process, that can return binaries to send/receive and
stop the transport process from reading on the socket.
This is still undocumented, and may change.
Diffstat (limited to 'lib/diameter/src')
-rw-r--r-- | lib/diameter/src/transport/diameter_tcp.erl | 156 |
1 files changed, 125 insertions, 31 deletions
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index edbbec1709..5819d52bdc 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -82,6 +82,7 @@ -record(monitor, {parent :: reference() | false | pid(), transport = self() :: pid(), + ack = false :: boolean(), socket :: inet:socket() | ssl:sslsocket() | undefined, module :: module() | undefined}). @@ -109,11 +110,15 @@ -type option() :: {port, non_neg_integer()} | {sender, boolean()} + | sender + | {message_cb, false | diameter:evaluable()} | {fragment_timer, 0..16#FFFFFFFF}. %% Accepting/connecting transport process state. -record(transport, {socket :: inet:socket() | ssl:sslsocket(), %% accept/connect socket + active = false :: boolean(), %% is socket active? + recv = true :: boolean(), %% should it be active? parent :: pid(), %% of process that started us module :: module(), %% gen_tcp-like module ssl :: [term()] | boolean(), %% ssl options, ssl or not @@ -121,7 +126,8 @@ timeout :: infinity | 0..16#FFFFFFFF, %% fragment timeout tref = false :: false | reference(), %% fragment timer reference flush = false :: boolean(), %% flush fragment at timeout? - send :: pid() | false}). %% sending process + message_cb :: false | diameter:evaluable(), + send :: pid() | false}). %% sending process %% The usual transport using gen_tcp can be replaced by anything %% sufficiently gen_tcp-like by passing a 'module' option as the first @@ -212,24 +218,28 @@ i({T, Ref, Mod, Pid, Opts, Addrs, SvcPid}) %% sends outgoing messages. {[SO|TO], Rest} = proplists:split(Opts, [ssl_options, sender, + message_cb, fragment_timer]), SslOpts = ssl_opts(SO), OwnOpts = lists:append(TO), Tmo = proplists:get_value(fragment_timer, OwnOpts, ?DEFAULT_FRAGMENT_TIMEOUT), - Sender = proplists:get_value(sender, OwnOpts, false), + [CB, Sender] = [proplists:get_value(K, OwnOpts, false) + || K <- [message_cb, sender]], ?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}), {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}), Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs, SvcPid), M = if SslOpts -> ssl; true -> Mod end, Sender andalso monitor(process, MPid), - MPid ! {start, self(), Sender andalso {Sock, M}}, %% prepare for sending + false == CB orelse (Pid ! {diameter, ack}), + MPid ! {start, self(), Sender andalso {Sock, M}, false /= CB}, putr(?REF_KEY, Ref), setopts(#transport{parent = Pid, module = M, socket = Sock, ssl = SslOpts, + message_cb = CB, timeout = Tmo, send = Sender andalso MPid}); %% Put the reference in the process dictionary since we now use it @@ -520,13 +530,14 @@ m(Bin, S) %% Transport has established a connection. Stop monitoring on the %% parent so as not to die before a send from the transport. -m({start, TPid, T} = M, #monitor{transport = TPid} = S) -> +m({start, TPid, T, Ack} = M, #monitor{transport = TPid} = S) -> case T of {Sock, Mod} -> demonitor(S#monitor.parent, [flush]), S#monitor{parent = false, socket = Sock, - module = Mod}; + module = Mod, + ack = Ack}; false -> %% monitor not sending x(M) end; @@ -591,7 +602,7 @@ transition({P, Sock, Bin}, #transport{socket = Sock, = S) when P == ssl, true == B; P == tcp -> - recv(Bin, S); + recv(Bin, S#transport{active = false}); %% Capabilties exchange has decided on whether or not to run over TLS. transition({diameter, {tls, Ref, Type, B}}, #transport{parent = Pid} @@ -618,8 +629,16 @@ transition({E, Sock, _Reason} = T, #transport{socket = Sock, %% Outgoing message. transition({diameter, {send, Msg}}, #transport{} = S) -> - send(Msg, S), - ok; + message(send, Msg, S); + +%% Monitor has sent an outgoing message. +transition(Bin, S) + when is_binary(Bin) -> + message(ack, Bin, S); + +%% Deferred actions from a message_cb. +transition({actions, Dir, Acts}, S) -> + actions(Acts, Dir, S); %% Request to close the transport connection. transition({diameter, {close, Pid}}, #transport{parent = Pid, @@ -699,12 +718,11 @@ tls(accept, Sock, Opts) -> %% using Nagle. %% Receive packets until a full message is received, -recv(Bin, #transport{parent = Pid, frag = Head} = S) -> +recv(Bin, #transport{frag = Head} = S) -> case rcv(Head, Bin) of {Msg, B} -> %% have a complete message ... - diameter_peer:recv(Pid, Msg), - recv(<<>>, S#transport{frag = B}); - Frag -> %% read more on the socket + message(recv, Msg, S#transport{frag = B}); + Frag -> %% read more on the socket start_fragment_timer(setopts(S#transport{frag = Frag, flush = false})) end. @@ -804,9 +822,8 @@ flush(#transport{flush = false} = S) -> start_fragment_timer(S#transport{flush = true}); %% No messages since last expiry. -flush(#transport{frag = Frag, parent = Pid} = S) -> - diameter_peer:recv(Pid, bin(Frag)), - S#transport{frag = <<>>}. +flush(#transport{frag = Frag} = S) -> + message(recv, bin(Frag), S#transport{frag = <<>>}). %% start_fragment_timer/1 %% @@ -839,23 +856,19 @@ connect(Mod, Host, Port, Opts) -> %% send/2 -send(false, #transport{}) -> %% ack - ok; - -send(#diameter_packet{bin = Bin}, S) -> - send(Bin, S); - -send(Bin, #transport{socket = Sock, module = M, send = false}) -> - send1(M, Sock, Bin); +send(Bin, #monitor{socket = Sock, module = M, transport = TPid, ack = B}) -> + send1(M, Sock, Bin), + B andalso (TPid ! Bin); -send(Bin, #monitor{socket = Sock, module = M}) -> - send1(M, Sock, Bin); +send(Bin, #transport{socket = Sock, module = M, send = false} = S) -> + send1(M, Sock, Bin), + message(ack, Bin, S); %% Send from the monitor process to avoid deadlock if both the %% receiver and the peer were to block in send. -send(Bin, #transport{send = Pid}) -> +send(Bin, #transport{send = Pid} = S) -> Pid ! Bin, - ok. + S. %% send1/3 @@ -866,7 +879,7 @@ send1(Mod, Sock, Bin) -> {error, Reason} -> x({send, Reason}) end. - + %% send/3 send(gen_tcp, Sock, Bin) -> @@ -888,12 +901,18 @@ setopts(M, Sock, Opts) -> %% setopts/1 setopts(#transport{socket = Sock, + active = A, + recv = B, module = M} - = S)-> + = S) + when B, not A -> case setopts(M, Sock, [{active, once}]) of - ok -> S; + ok -> S#transport{active = true}; X -> x({setopts, Sock, M, X}) %% possibly on peer disconnect - end. + end; + +setopts(S) -> + S. %% portnr/2 @@ -928,3 +947,78 @@ getstat(gen_tcp, Sock) -> getstat(M, Sock) -> M:getstat(Sock). %% Note that ssl:getstat/1 doesn't yet exist in R15B01. + +%% A message_cb is invoked whenever a message is sent or received, or +%% to provide acknowledgement of a completed send or discarded +%% request. Ignoring possible extra arguments, calls are of the +%% following form. +%% +%% cb(recv, Bin) Pass a received message into diameter? +%% cb(send, Bin) Send a message? +%% cb(ack, Bin) Acknowledgement of a completed send. +%% cb(ack, false) Acknowledgement of a discarded request. +%% +%% Callbacks return a list of the following form. +%% +%% [boolean() | send | recv | binary()] +%% +%% The atoms are meaningless by themselves, but say whether subsequent +%% binaries are to be sent or received. A boolean says whether or not +%% to continue reading on the socket. Messages can be received even +%% after false is returned if these arrived in the same packet. A +%% leading recv or send is implicit on the corresponding callbacks. A +%% new callback can be returned as the tail of a returned list: any +%% value not of the aforementioned list type is interpreted as a +%% callback. + +%% message/3 + +message(send, false = M, S) -> + message(ack, M, S); + +message(ack, _, #transport{message_cb = false} = S) -> + S; + +message(Dir, #diameter_packet{bin = Bin}, S) -> + message(Dir, Bin, S); + +message(Dir, Bin, #transport{message_cb = CB} = S) -> + recv(<<>>, actions(cb(CB, Dir, Bin), Dir, S)). + +%% actions/3 + +actions([], _, S) -> + S; + +actions([B | As], Dir, S) + when is_boolean(B) -> + actions(As, Dir, S#transport{recv = B}); + +actions([Dir | As], _, S) + when Dir == send; + Dir == recv -> + actions(As, Dir, S); + +actions([Bin | As], send = Dir, #transport{} = S) + when is_binary(Bin) -> + actions(As, Dir, send(Bin, S)); + +actions([Bin | As], recv = Dir, #transport{parent = Pid} = S) + when is_binary(Bin) -> + diameter_peer:recv(Pid, Bin), + actions(As, Dir, S); + +actions([{defer, Tmo, Acts} | As], Dir, S) -> + erlang:send_after(Tmo, self(), {actions, Dir, Acts}), + actions(As, Dir, S); + +actions(CB, _, S) -> + S#transport{message_cb = CB}. + +%% cb/3 + +cb(false, _, Bin) -> + [Bin]; + +cb(CB, Dir, Msg) -> + diameter_lib:eval([CB, Dir, Msg]). |