aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src/transport/diameter_tcp.erl
diff options
context:
space:
mode:
authorSverker Eriksson <[email protected]>2017-08-30 21:00:35 +0200
committerSverker Eriksson <[email protected]>2017-08-30 21:00:35 +0200
commit44a83c8860bbd00878c720a7b9d940b4630bab8a (patch)
tree101b3c52ec505a94f56c8f70e078ecb8a2e8c6cd /lib/diameter/src/transport/diameter_tcp.erl
parent7c67bbddb53c364086f66260701bc54a61c9659c (diff)
parent040bdce67f88d833bfb59adae130a4ffb4c180f0 (diff)
downloadotp-44a83c8860bbd00878c720a7b9d940b4630bab8a.tar.gz
otp-44a83c8860bbd00878c720a7b9d940b4630bab8a.tar.bz2
otp-44a83c8860bbd00878c720a7b9d940b4630bab8a.zip
Merge tag 'OTP-20.0' into sverker/20/binary_to_atom-utf8-crash/ERL-474/OTP-14590
Diffstat (limited to 'lib/diameter/src/transport/diameter_tcp.erl')
-rw-r--r--lib/diameter/src/transport/diameter_tcp.erl533
1 files changed, 296 insertions, 237 deletions
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl
index 546c2cfa5e..a2f393d5d4 100644
--- a/lib/diameter/src/transport/diameter_tcp.erl
+++ b/lib/diameter/src/transport/diameter_tcp.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2016. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2017. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -19,7 +19,6 @@
%%
-module(diameter_tcp).
--dialyzer({no_fail_call, throttle/2}).
-behaviour(gen_server).
@@ -53,6 +52,7 @@
%% Keys into process dictionary.
-define(INFO_KEY, info).
-define(REF_KEY, ref).
+-define(TRANSPORT_KEY, transport).
-define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})).
@@ -68,19 +68,23 @@
%% The same gen_server implementation supports three different kinds
%% of processes: an actual transport process, one that will club it to
%% death should the parent die before a connection is established, and
-%% a process owning the listening port.
+%% a process owning the listening port. The monitor process
+%% historically died after connection establishment, but can now live
+%% on as the sender of outgoing messages, so that a blocking send
+%% doesn't prevent messages from being received.
%% Listener process state.
--record(listener, {socket :: inet:socket(),
- count = 1 :: non_neg_integer()}). %% accepting processes
-%% The count of accepting processes was previously used to terminate
-%% the listening process, but diameter_reg:subscribe/2 is now used for
-%% this. Leave the the count for trace purposes.
+-record(listener, {socket :: inet:socket(),
+ module :: module(),
+ service = false :: false | pid()}). %% service process
%% Monitor process state.
-record(monitor,
- {parent :: pid(),
- transport = self() :: pid()}).
+ {parent :: reference() | false | pid(),
+ transport = self() :: pid(),
+ ack = false :: boolean(),
+ socket :: inet:socket() | ssl:sslsocket() | undefined,
+ module :: module() | undefined}).
-type length() :: 0..16#FFFFFF. %% message length from Diameter header
-type size() :: non_neg_integer(). %% accumulated binary size
@@ -100,25 +104,30 @@
-type listen_option() :: {accept, match()}
| {ssl_options, true | [ssl:listen_option()]}
+ | option()
| ssl:listen_option()
| gen_tcp:listen_option().
-type option() :: {port, non_neg_integer()}
- | {fragment_timer, 0..16#FFFFFFFF}
- | {throttle_cb, diameter:evaluable()}.
+ | {sender, boolean()}
+ | sender
+ | {message_cb, false | diameter:evaluable()}
+ | {fragment_timer, 0..16#FFFFFFFF}.
%% Accepting/connecting transport process state.
-record(transport,
{socket :: inet:socket() | ssl:sslsocket(), %% accept/connect socket
+ active = false :: boolean(), %% is socket active?
+ recv = true :: boolean(), %% should it be active?
parent :: pid(), %% of process that started us
module :: module(), %% gen_tcp-like module
- frag = <<>> :: frag(), %% message fragment
ssl :: [term()] | boolean(), %% ssl options, ssl or not
+ frag = <<>> :: frag(), %% message fragment
timeout :: infinity | 0..16#FFFFFFFF, %% fragment timeout
tref = false :: false | reference(), %% fragment timer reference
flush = false :: boolean(), %% flush fragment at timeout?
- throttle_cb :: false | diameter:evaluable(), %% ask to receive
- throttled :: boolean() | binary()}). %% stopped receiving?
+ message_cb :: false | diameter:evaluable(),
+ send :: pid() | false}). %% sending process
%% The usual transport using gen_tcp can be replaced by anything
%% sufficiently gen_tcp-like by passing a 'module' option as the first
@@ -138,11 +147,15 @@
| {ok, pid()}
when Ref :: diameter:transport_ref().
-start({T, Ref}, #diameter_service{capabilities = Caps}, Opts) ->
+start({T, Ref}, Svc, Opts) ->
+ #diameter_service{capabilities = Caps,
+ pid = SvcPid}
+ = Svc,
+
diameter_tcp_sup:start(), %% start tcp supervisors on demand
{Mod, Rest} = split(Opts),
Addrs = Caps#diameter_caps.host_ip_address,
- Arg = {T, Ref, Mod, self(), Rest, Addrs},
+ Arg = {T, Ref, Mod, self(), Rest, Addrs, SvcPid},
diameter_tcp_sup:start_child(Arg).
split([{module, M} | Opts]) ->
@@ -196,61 +209,65 @@ init(T) ->
%% i/1
%% A transport process.
-i({T, Ref, Mod, Pid, Opts, Addrs})
+i({T, Ref, Mod, Pid, Opts, Addrs, SvcPid})
when T == accept;
T == connect ->
monitor(process, Pid),
%% Since accept/connect might block indefinitely, spawn a process
- %% that does nothing but kill us with the parent until call
- %% returns.
- {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}),
+ %% that kills us with the parent until call returns, and then
+ %% sends outgoing messages.
{[SO|TO], Rest} = proplists:split(Opts, [ssl_options,
- fragment_timer,
- throttle_cb]),
+ sender,
+ message_cb,
+ fragment_timer]),
SslOpts = ssl_opts(SO),
OwnOpts = lists:append(TO),
Tmo = proplists:get_value(fragment_timer,
OwnOpts,
?DEFAULT_FRAGMENT_TIMEOUT),
+ [CB, Sender] = [proplists:get_value(K, OwnOpts, false)
+ || K <- [message_cb, sender]],
?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}),
- Throttle = proplists:get_value(throttle_cb, OwnOpts, false),
- Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs),
- MPid ! {stop, self()}, %% tell the monitor to die
+ {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}),
+ Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs, SvcPid),
M = if SslOpts -> ssl; true -> Mod end,
+ Sender andalso monitor(process, MPid),
+ false == CB orelse (Pid ! {diameter, ack}),
+ MPid ! {start, self(), Sender andalso {Sock, M}, false /= CB},
putr(?REF_KEY, Ref),
- throttle(#transport{parent = Pid,
- module = M,
- socket = Sock,
- ssl = SslOpts,
- timeout = Tmo,
- throttle_cb = Throttle,
- throttled = false /= Throttle});
+ setopts(#transport{parent = Pid,
+ module = M,
+ socket = Sock,
+ ssl = SslOpts,
+ message_cb = CB,
+ timeout = Tmo,
+ send = Sender andalso MPid});
%% Put the reference in the process dictionary since we now use it
%% advertise the ssl socket after TLS upgrade.
%% A monitor process to kill the transport if the parent dies.
i(#monitor{parent = Pid, transport = TPid} = S) ->
+ putr(?TRANSPORT_KEY, TPid),
proc_lib:init_ack({ok, self()}),
- monitor(process, Pid),
monitor(process, TPid),
- S;
+ S#monitor{parent = monitor(process, Pid)};
%% In principle a link between the transport and killer processes
%% could do the same thing: have the accepting/connecting process be
%% killed when the killer process dies as a consequence of parent
%% death. However, a link can be unlinked and this is exactly what
-%% gen_tcp seems to so. Links should be left to supervisors.
+%% gen_tcp seems to do. Links should be left to supervisors.
-i({listen, LRef, APid, {Mod, Opts, Addrs}}) ->
- [_] = diameter_config:subscribe(LRef, transport), %% assert existence
+i({listen, Ref, {Mod, Opts, Addrs}}) ->
+ [_] = diameter_config:subscribe(Ref, transport), %% assert existence
{[LA, LP], Rest} = proplists:split(Opts, [ip, port]),
LAddrOpt = get_addr(LA, Addrs),
LPort = get_port(LP),
{ok, LSock} = Mod:listen(LPort, gen_opts(LAddrOpt, Rest)),
LAddr = laddr(LAddrOpt, Mod, LSock),
- true = diameter_reg:add_new({?MODULE, listener, {LRef, {LAddr, LSock}}}),
+ true = diameter_reg:add_new({?MODULE, listener, {Ref, {LAddr, LSock}}}),
proc_lib:init_ack({ok, self(), {LAddr, LSock}}),
- monitor(process, APid),
- #listener{socket = LSock}.
+ #listener{socket = LSock,
+ module = Mod}.
laddr([], Mod, Sock) ->
{ok, {Addr, _Port}} = sockname(Mod, Sock),
@@ -268,21 +285,22 @@ ssl_opts([{ssl_options, Opts}])
ssl_opts(T) ->
?ERROR({ssl_options, T}).
-%% init/7
+%% init/8
%% Establish a TLS connection before capabilities exchange ...
-init(Type, Ref, Mod, Pid, true, Opts, Addrs) ->
- init(Type, Ref, ssl, Pid, [{cb_info, ?TCP_CB(Mod)} | Opts], Addrs);
+init(Type, Ref, Mod, Pid, true, Opts, Addrs, SvcPid) ->
+ init(Type, Ref, ssl, Pid, [{cb_info, ?TCP_CB(Mod)} | Opts], Addrs, SvcPid);
%% ... or not.
-init(Type, Ref, Mod, Pid, _, Opts, Addrs) ->
- init(Type, Ref, Mod, Pid, Opts, Addrs).
+init(Type, Ref, Mod, Pid, _, Opts, Addrs, SvcPid) ->
+ init(Type, Ref, Mod, Pid, Opts, Addrs, SvcPid).
-%% init/6
+%% init/7
-init(accept = T, Ref, Mod, Pid, Opts, Addrs) ->
+init(accept = T, Ref, Mod, Pid, Opts, Addrs, SvcPid) ->
{[Matches], Rest} = proplists:split(Opts, [accept]),
- {LAddr, LSock} = listener(Ref, {Mod, Rest, Addrs}),
+ {ok, LPid, {LAddr, LSock}} = listener(Ref, {Mod, Rest, Addrs}),
+ ok = gen_server:call(LPid, {accept, SvcPid}, infinity),
proc_lib:init_ack({ok, self(), [LAddr]}),
Sock = ok(accept(Mod, LSock)),
ok = accept_peer(Mod, Sock, accept(Matches)),
@@ -290,7 +308,7 @@ init(accept = T, Ref, Mod, Pid, Opts, Addrs) ->
diameter_peer:up(Pid),
Sock;
-init(connect = T, Ref, Mod, Pid, Opts, Addrs) ->
+init(connect = T, Ref, Mod, Pid, Opts, Addrs, _SvcPid) ->
{[LA, RA, RP], Rest} = proplists:split(Opts, [ip, raddr, rport]),
LAddrOpt = get_addr(LA, Addrs),
RAddr = get_addr(RA),
@@ -344,24 +362,26 @@ accept(Opts) ->
%% Accepting processes can be started concurrently: ensure only one
%% listener is started.
-listener(LRef, T) ->
- diameter_sync:call({?MODULE, listener, LRef},
- {?MODULE, listener, [{LRef, T, self()}]},
+listener(Ref, T) ->
+ diameter_sync:call({?MODULE, listener, Ref},
+ {?MODULE, listener, [{Ref, T, self()}]},
infinity,
infinity).
-listener({LRef, T, TPid}) ->
- l(diameter_reg:match({?MODULE, listener, {LRef, '_'}}), LRef, T, TPid).
+%% listener/1
+
+listener({Ref, T, _TPid}) ->
+ l(diameter_reg:match({?MODULE, listener, {Ref, '_'}}), Ref, T).
+
+%% l/3
%% Existing listening process ...
-l([{{?MODULE, listener, {_, AS}}, LPid}], _, _, TPid) ->
- LPid ! {accept, TPid},
- AS;
+l([{{?MODULE, listener, {_, AS}}, LPid}], _, _) ->
+ {ok, LPid, AS};
%% ... or not.
-l([], LRef, T, TPid) ->
- {ok, _, AS} = diameter_tcp_sup:start_child({listen, LRef, TPid, T}),
- AS.
+l([], Ref, T) ->
+ diameter_tcp_sup:start_child({listen, Ref, T}).
%% get_addr/1
@@ -440,6 +460,18 @@ portnr(Sock) ->
%% # handle_call/3
%% ---------------------------------------------------------------------------
+handle_call({accept, SvcPid}, _From, #listener{service = P} = S) ->
+ {reply, ok, if not is_pid(P), is_pid(SvcPid) ->
+ monitor(process, SvcPid),
+ S#listener{service = SvcPid};
+ true ->
+ S
+ end};
+
+%% Transport is telling us of parent death.
+handle_call({stop, _Pid} = Reason, _From, #monitor{} = S) ->
+ {stop, {shutdown, Reason}, ok, S};
+
handle_call(_, _, State) ->
{reply, nok, State}.
@@ -461,8 +493,7 @@ handle_info(T, #listener{} = S) ->
{noreply, #listener{} = l(T,S)};
handle_info(T, #monitor{} = S) ->
- m(T,S),
- x(T).
+ {noreply, #monitor{} = m(T,S)}.
%% ---------------------------------------------------------------------------
%% # code_change/3
@@ -478,6 +509,7 @@ code_change(_, State, _) ->
terminate(_, _) ->
ok.
+
%% ---------------------------------------------------------------------------
putr(Key, Val) ->
@@ -490,35 +522,63 @@ getr(Key) ->
%%
%% Transition monitor state.
-%% Transport is telling us to die.
-m({stop, TPid}, #monitor{transport = TPid}) ->
- ok;
+%% Outgoing message.
+m(Msg, S)
+ when is_record(Msg, diameter_packet);
+ is_binary(Msg) ->
+ send(Msg, S),
+ S;
-%% Transport has died.
-m({'DOWN', _, process, TPid, _}, #monitor{transport = TPid}) ->
- ok;
+%% Transport has established a connection. Stop monitoring on the
+%% parent so as not to die before a send from the transport.
+m({start, TPid, T, Ack} = M, #monitor{transport = TPid} = S) ->
+ case T of
+ {Sock, Mod} ->
+ demonitor(S#monitor.parent, [flush]),
+ S#monitor{parent = false,
+ socket = Sock,
+ module = Mod,
+ ack = Ack};
+ false -> %% monitor not sending
+ x(M)
+ end;
-%% Transport parent has died.
-m({'DOWN', _, process, Pid, _}, #monitor{parent = Pid,
- transport = TPid}) ->
- exit(TPid, {shutdown, parent}).
+%% Transport is telling us to die.
+m({stop, TPid} = T, #monitor{transport = TPid}) ->
+ x(T);
+
+%% Transport is telling us to die.
+m({stop, TPid} = T, #monitor{transport = TPid}) ->
+ x(T);
+
+%% Transport is telling us that TLS has been negotiated after
+%% capabilities exchange.
+m({tls, SSock}, S) ->
+ S#monitor{socket = SSock,
+ module = ssl};
+
+%% Transport or parent has died.
+m({'DOWN', M, process, P, _} = T, #monitor{parent = MRef,
+ transport = TPid})
+ when M == MRef;
+ P == TPid ->
+ x(T).
%% l/2
%%
%% Transition listener state.
-%% An accepting transport is attaching.
-l({accept, TPid}, #listener{count = N} = S) ->
- monitor(process, TPid),
- S#listener{count = N+1};
-
-%% Accepting process has died.
-l({'DOWN', _, process, _, _}, #listener{count = N} = S) ->
- S#listener{count = N-1};
+%% Service process has died.
+l({'DOWN', _, process, Pid, _} = T, #listener{service = Pid,
+ socket = Sock,
+ module = M}) ->
+ M:close(Sock),
+ x(T);
%% Transport has been removed.
-l({transport, remove, _} = T, #listener{socket = Sock}) ->
- gen_tcp:close(Sock),
+l({transport, remove, _} = T, #listener{socket = Sock,
+ module = M}) ->
+ M:close(Sock),
x(T).
%% t/2
@@ -537,21 +597,13 @@ t(T,S) ->
%% transition/2
-%% Incoming message.
+%% Incoming packets.
transition({P, Sock, Bin}, #transport{socket = Sock,
- ssl = B,
- throttled = T}
+ ssl = B}
= S)
when P == ssl, true == B;
P == tcp ->
- false = T, %% assert
- recv(Bin, S);
-
-%% Make a new throttling callback after a timeout.
-transition(throttle, #transport{throttled = false}) ->
- ok;
-transition(throttle, S) ->
- throttle(S);
+ recv(Bin, S#transport{active = false});
%% Capabilties exchange has decided on whether or not to run over TLS.
transition({diameter, {tls, Ref, Type, B}}, #transport{parent = Pid}
@@ -561,7 +613,7 @@ transition({diameter, {tls, Ref, Type, B}}, #transport{parent = Pid}
= NS
= tls_handshake(Type, B, S),
Pid ! {diameter, {tls, Ref}},
- throttle(NS#transport{ssl = B});
+ NS#transport{ssl = B};
transition({C, Sock}, #transport{socket = Sock,
ssl = B})
@@ -577,8 +629,18 @@ transition({E, Sock, _Reason} = T, #transport{socket = Sock,
?ERROR({T,S});
%% Outgoing message.
-transition({diameter, {send, Bin}}, S) ->
- send(Bin, S);
+transition({diameter, {send, Msg}}, #transport{} = S) ->
+ message(send, Msg, S);
+
+%% Monitor has sent an outgoing message.
+transition(Msg, S)
+ when is_record(Msg, diameter_packet);
+ is_binary(Msg) ->
+ message(ack, Msg, S);
+
+%% Deferred actions from a message_cb.
+transition({actions, Dir, Acts}, S) ->
+ actions(Acts, Dir, S);
%% Request to close the transport connection.
transition({diameter, {close, Pid}}, #transport{parent = Pid,
@@ -598,8 +660,18 @@ transition({resolve_port, Pid}, #transport{socket = Sock,
Pid ! portnr(M, Sock),
ok;
-%% Parent process has died.
-transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) ->
+%% Parent process has died: call the monitor to not close the socket
+%% during an ongoing send, but don't let it take forever.
+transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid,
+ send = MPid}) ->
+ false == MPid
+ orelse (ok == gen_server:call(MPid, {stop, self()}, 1000))
+ orelse exit(MPid, {shutdown, parent}),
+ stop;
+
+%% Monitor process has died.
+transition({'DOWN', _, process, MPid, _}, #transport{send = MPid})
+ when is_pid(MPid) ->
stop.
%% Crash on anything unexpected.
@@ -623,11 +695,13 @@ tls_handshake(_, true, #transport{ssl = false}) ->
%% Capabilities exchange negotiated TLS: upgrade the connection.
tls_handshake(Type, true, #transport{socket = Sock,
module = M,
- ssl = Opts}
+ ssl = Opts,
+ send = MPid}
= S) ->
{ok, SSock} = tls(Type, Sock, [{cb_info, ?TCP_CB(M)} | Opts]),
Ref = getr(?REF_KEY),
true = diameter_reg:add_new({?MODULE, Type, {Ref, SSock}}),
+ false == MPid orelse (MPid ! {tls, SSock}), %% tell the sender process
S#transport{socket = SSock,
module = ssl};
@@ -646,24 +720,15 @@ tls(accept, Sock, Opts) ->
%% using Nagle.
%% Receive packets until a full message is received,
-recv(Bin, #transport{frag = Head, throttled = false} = S) ->
+recv(Bin, #transport{frag = Head} = S) ->
case rcv(Head, Bin) of
- {Msg, B} ->
- throttle(S#transport{frag = B, throttled = Msg});
- Frag ->
- setopts(S),
- start_fragment_timer(S#transport{frag = Frag,
- flush = false})
+ {Msg, B} -> %% have a complete message ...
+ message(recv, Msg, S#transport{frag = B});
+ Frag -> %% read more on the socket
+ start_fragment_timer(setopts(S#transport{frag = Frag,
+ flush = false}))
end.
-%% recv/1
-
-recv(#transport{throttled = false} = S) ->
- recv(<<>>, S);
-
-recv(#transport{} = S) ->
- S.
-
%% rcv/2
%% No previous fragment.
@@ -723,13 +788,16 @@ recv1(Len, Bin) ->
<<Msg:Len/binary, Rest/binary>> = Bin,
{Msg, Rest}.
-%% bin/1-2
+%% bin/2
bin(Head, Acc) ->
list_to_binary([Head | lists:reverse(Acc)]).
+%% bin/1
+
bin({_, _, Head, Acc}) ->
bin(Head, Acc);
+
bin(Bin)
when is_binary(Bin) ->
Bin.
@@ -748,9 +816,7 @@ bin(Bin)
%% also eventually lead to watchdog failover.
%% No fragment to flush or not receiving messages.
-flush(#transport{frag = Frag, throttled = B} = S)
- when Frag == <<>>;
- B /= false ->
+flush(#transport{frag = <<>>} = S) ->
S;
%% Messages have been received since last timer expiry.
@@ -758,9 +824,8 @@ flush(#transport{flush = false} = S) ->
start_fragment_timer(S#transport{flush = true});
%% No messages since last expiry.
-flush(#transport{frag = Frag, parent = Pid} = S) ->
- diameter_peer:recv(Pid, bin(Frag)),
- S#transport{frag = <<>>}.
+flush(#transport{frag = Frag} = S) ->
+ message(recv, bin(Frag), S#transport{frag = <<>>}).
%% start_fragment_timer/1
%%
@@ -793,9 +858,27 @@ connect(Mod, Host, Port, Opts) ->
%% send/2
-send(Bin, #transport{socket = Sock,
- module = M}) ->
- case send(M, Sock, Bin) of
+send(Msg, #monitor{socket = Sock, module = M, transport = TPid, ack = B}) ->
+ send1(M, Sock, Msg),
+ B andalso (TPid ! Msg);
+
+send(Msg, #transport{socket = Sock, module = M, send = false} = S) ->
+ send1(M, Sock, Msg),
+ message(ack, Msg, S);
+
+%% Send from the monitor process to avoid deadlock if both the
+%% receiver and the peer were to block in send.
+send(Msg, #transport{send = Pid} = S) ->
+ Pid ! Msg,
+ S.
+
+%% send1/3
+
+send1(Mod, Sock, #diameter_packet{bin = Bin}) ->
+ send1(Mod, Sock, Bin);
+
+send1(Mod, Sock, Bin) ->
+ case send(Mod, Sock, Bin) of
ok ->
ok;
{error, Reason} ->
@@ -822,120 +905,19 @@ setopts(M, Sock, Opts) ->
%% setopts/1
-setopts(#transport{socket = Sock, module = M}) ->
- setopts(M, Sock).
-
-%% setopts/2
-
-setopts(M, Sock) ->
+setopts(#transport{socket = Sock,
+ active = A,
+ recv = B,
+ module = M}
+ = S)
+ when B, not A ->
case setopts(M, Sock, [{active, once}]) of
- ok -> ok;
- X -> x({setopts, M, Sock, X}) %% possibly on peer disconnect
- end.
-
-%% throttle/1
-
-%% Still collecting packets for a complete message: keep receiving.
-throttle(#transport{throttled = false} = S) ->
- recv(S);
-
-%% Decide whether to receive another, or whether to accept a message
-%% that's been received.
-throttle(#transport{throttle_cb = F, throttled = T} = S) ->
- Res = cb(F, T),
-
- try throttle(Res, S) of
- #transport{ssl = SB} = NS when is_boolean(SB) ->
- throttle(defrag(NS));
- #transport{throttled = Msg} = NS when is_binary(Msg) ->
- %% Initial incoming message when we might need to upgrade
- %% to TLS: wait for reception of a tls tuple.
- defrag(NS)
- catch
- #transport{} = NS ->
- recv(NS)
- end.
-
-%% cb/2
-
-cb(false, _) ->
- ok;
-
-cb(F, B) ->
- diameter_lib:eval([F, true /= B andalso B]).
-
-%% throttle/2
-
-%% Callback says to receive another message.
-throttle(ok, #transport{throttled = true} = S) ->
- throw(S#transport{throttled = false});
-
-%% Callback says to accept a received message.
-throttle(ok, #transport{parent = Pid, throttled = Msg} = S)
- when is_binary(Msg) ->
- diameter_peer:recv(Pid, Msg),
- S;
-
-throttle({ok = T, F}, S) ->
- throttle(T, S#transport{throttle_cb = F});
-
-%% Callback says to accept a received message and acknowledged the
-%% returned pid with a {request, Pid} message if a request pid is
-%% spawned, a discard message otherwise. The latter does not mean that
-%% the message was necessarily discarded: it could have been an
-%% answer.
-throttle(NPid, #transport{parent = Pid, throttled = Msg} = S)
- when is_pid(NPid), is_binary(Msg) ->
- diameter_peer:recv(Pid, {Msg, NPid}),
- S;
-
-throttle({NPid, F}, #transport{throttled = Msg} = S)
- when is_pid(NPid), is_binary(Msg) ->
- throttle(NPid, S#transport{throttle_cb = F});
-
-%% Callback to accept a received message says to discard it.
-throttle(discard, #transport{throttled = Msg} = S)
- when is_binary(Msg) ->
- S;
-
-throttle({discard = T, F}, #transport{throttled = Msg} = S)
- when is_binary(Msg) ->
- throttle(T, S#transport{throttle_cb = F});
-
-%% Callback to accept a received message says to answer it with the
-%% supplied binary.
-throttle(Bin, #transport{throttled = Msg} = S)
- when is_binary(Bin), is_binary(Msg) ->
- send(Bin, S),
- S;
-
-throttle({Bin, F}, #transport{throttled = Msg} = S)
- when is_binary(Bin), is_binary(Msg) ->
- throttle(Bin, S#transport{throttle_cb = F});
-
-%% Callback says to ask again in the specified number of milliseconds.
-throttle({timeout, Tmo}, S) ->
- erlang:send_after(Tmo, self(), throttle),
- throw(S);
-
-throttle({timeout = T, Tmo, F}, S) ->
- throttle({T, Tmo}, S#transport{throttle_cb = F});
-
-throttle(T, #transport{throttle_cb = F}) ->
- ?ERROR({invalid_return, T, F}).
-
-%% defrag/1
-%%
-%% Try to extract another message from packets already read before
-%% another throttling callback.
+ ok -> S#transport{active = true};
+ X -> x({setopts, Sock, M, X}) %% possibly on peer disconnect
+ end;
-defrag(#transport{frag = Head} = S) ->
- case rcv(Head, <<>>) of
- {Msg, B} ->
- S#transport{throttled = Msg, frag = B};
- _ ->
- S#transport{throttled = true}
- end.
+setopts(S) ->
+ S.
%% portnr/2
@@ -970,3 +952,80 @@ getstat(gen_tcp, Sock) ->
getstat(M, Sock) ->
M:getstat(Sock).
%% Note that ssl:getstat/1 doesn't yet exist in R15B01.
+
+%% A message_cb is invoked whenever a message is sent or received, or
+%% to provide acknowledgement of a completed send or discarded
+%% request. Ignoring possible extra arguments, calls are of the
+%% following form.
+%%
+%% cb(recv, Msg) Receive a message into diameter?
+%% cb(send, Msg) Send a message on the socket?
+%% cb(ack, Msg) Acknowledgement of a completed send.
+%% cb(ack, false) Acknowledgement of a discarded request.
+%%
+%% Msg will be binary() in a recv callback, but can be a
+%% diameter_packet record in a send/ack callback if a recv/send
+%% callback returns a record. Callbacks return a list of the following
+%% form.
+%%
+%% [boolean() | send | recv | binary() | #diameter_packet{}]
+%%
+%% The atoms are meaningless by themselves, but say whether subsequent
+%% messages are to be sent or received. A boolean says whether or not
+%% to continue reading on the socket. Messages can be received even
+%% after false is returned if these arrived in the same packet. A
+%% leading recv or send is implicit on the corresponding callbacks. A
+%% new callback can be returned as the tail of a returned list: any
+%% value not of the aforementioned list type is interpreted as a
+%% callback.
+
+%% message/3
+
+message(send, false = M, S) ->
+ message(ack, M, S);
+
+message(ack, _, #transport{message_cb = false} = S) ->
+ S;
+
+message(Dir, Msg, #transport{message_cb = CB} = S) ->
+ recv(<<>>, actions(cb(CB, Dir, Msg), Dir, S)).
+
+%% actions/3
+
+actions([], _, S) ->
+ S;
+
+actions([B | As], Dir, S)
+ when is_boolean(B) ->
+ actions(As, Dir, S#transport{recv = B});
+
+actions([Dir | As], _, S)
+ when Dir == send;
+ Dir == recv ->
+ actions(As, Dir, S);
+
+actions([Msg | As], send = Dir, S)
+ when is_binary(Msg);
+ is_record(Msg, diameter_packet) ->
+ actions(As, Dir, send(Msg, S));
+
+actions([Msg | As], recv = Dir, #transport{parent = Pid} = S)
+ when is_binary(Msg);
+ is_record(Msg, diameter_packet) ->
+ diameter_peer:recv(Pid, Msg),
+ actions(As, Dir, S);
+
+actions([{defer, Tmo, Acts} | As], Dir, S) ->
+ erlang:send_after(Tmo, self(), {actions, Dir, Acts}),
+ actions(As, Dir, S);
+
+actions(CB, _, S) ->
+ S#transport{message_cb = CB}.
+
+%% cb/3
+
+cb(false, _, Msg) ->
+ [Msg];
+
+cb(CB, Dir, Msg) ->
+ diameter_lib:eval([CB, Dir, Msg]).