aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src/transport/diameter_tcp.erl
diff options
context:
space:
mode:
Diffstat (limited to 'lib/diameter/src/transport/diameter_tcp.erl')
-rw-r--r--lib/diameter/src/transport/diameter_tcp.erl680
1 files changed, 360 insertions, 320 deletions
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl
index 44abc5c3b4..a8639baa11 100644
--- a/lib/diameter/src/transport/diameter_tcp.erl
+++ b/lib/diameter/src/transport/diameter_tcp.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2016. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2017. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -19,7 +19,6 @@
%%
-module(diameter_tcp).
--dialyzer({no_fail_call, throttle/2}).
-behaviour(gen_server).
@@ -53,6 +52,7 @@
%% Keys into process dictionary.
-define(INFO_KEY, info).
-define(REF_KEY, ref).
+-define(TRANSPORT_KEY, transport).
-define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})).
@@ -68,20 +68,26 @@
%% The same gen_server implementation supports three different kinds
%% of processes: an actual transport process, one that will club it to
%% death should the parent die before a connection is established, and
-%% a process owning the listening port.
+%% a process owning the listening port. The monitor process
+%% historically died after connection establishment, but can now live
+%% on as the sender of outgoing messages, so that a blocking send
+%% doesn't prevent messages from being received.
%% Listener process state.
-record(listener, {socket :: inet:socket(),
+ module :: module(),
service = false :: false | pid()}). %% service process
%% Monitor process state.
-record(monitor,
- {parent :: pid(),
- transport = self() :: pid()}).
+ {parent :: reference() | false | pid(),
+ transport = self() :: pid(),
+ ack = false :: boolean(),
+ socket :: inet:socket() | ssl:sslsocket() | undefined,
+ module :: module() | undefined}).
-type length() :: 0..16#FFFFFF. %% message length from Diameter header
--type size() :: non_neg_integer(). %% accumulated binary size
--type frag() :: {length(), size(), binary(), list(binary())}
+-type frag() :: maybe_improper_list(length(), binary())
| binary().
-type connect_option() :: {raddr, inet:ip_address()}
@@ -97,25 +103,30 @@
-type listen_option() :: {accept, match()}
| {ssl_options, true | [ssl:listen_option()]}
+ | option()
| ssl:listen_option()
| gen_tcp:listen_option().
-type option() :: {port, non_neg_integer()}
- | {fragment_timer, 0..16#FFFFFFFF}
- | {throttle_cb, diameter:evaluable()}.
+ | {sender, boolean()}
+ | sender
+ | {message_cb, false | diameter:eval()}
+ | {fragment_timer, 0..16#FFFFFFFF}.
%% Accepting/connecting transport process state.
-record(transport,
{socket :: inet:socket() | ssl:sslsocket(), %% accept/connect socket
+ active = false :: boolean(), %% is socket active?
+ recv = true :: boolean(), %% should it be active?
parent :: pid(), %% of process that started us
module :: module(), %% gen_tcp-like module
- frag = <<>> :: frag(), %% message fragment
ssl :: [term()] | boolean(), %% ssl options, ssl or not
+ frag = <<>> :: frag(), %% message fragment
timeout :: infinity | 0..16#FFFFFFFF, %% fragment timeout
tref = false :: false | reference(), %% fragment timer reference
flush = false :: boolean(), %% flush fragment at timeout?
- throttle_cb :: false | diameter:evaluable(), %% ask to receive
- throttled :: boolean() | binary()}). %% stopped receiving?
+ message_cb :: false | diameter:eval(),
+ send :: pid() | false}). %% sending process
%% The usual transport using gen_tcp can be replaced by anything
%% sufficiently gen_tcp-like by passing a 'module' option as the first
@@ -131,19 +142,18 @@
-> {ok, pid(), [inet:ip_address()]}
when Ref :: diameter:transport_ref();
({connect, Ref}, #diameter_service{}, [connect_option()])
- -> {ok, pid(), [inet:ip_address()]}
- | {ok, pid()}
+ -> {ok, pid()}
when Ref :: diameter:transport_ref().
start({T, Ref}, Svc, Opts) ->
#diameter_service{capabilities = Caps,
- pid = SPid}
+ pid = SvcPid}
= Svc,
diameter_tcp_sup:start(), %% start tcp supervisors on demand
{Mod, Rest} = split(Opts),
Addrs = Caps#diameter_caps.host_ip_address,
- Arg = {T, Ref, Mod, self(), Rest, Addrs, SPid},
+ Arg = {T, Ref, Mod, self(), Rest, Addrs, SvcPid},
diameter_tcp_sup:start_child(Arg).
split([{module, M} | Opts]) ->
@@ -197,74 +207,63 @@ init(T) ->
%% i/1
%% A transport process.
-i({T, Ref, Mod, Pid, Opts, Addrs, SPid})
+i({T, Ref, Mod, Pid, Opts, Addrs, SvcPid})
when T == accept;
T == connect ->
monitor(process, Pid),
%% Since accept/connect might block indefinitely, spawn a process
- %% that does nothing but kill us with the parent until call
- %% returns.
- {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}),
+ %% that kills us with the parent until call returns, and then
+ %% sends outgoing messages.
{[SO|TO], Rest} = proplists:split(Opts, [ssl_options,
- fragment_timer,
- throttle_cb]),
+ sender,
+ message_cb,
+ fragment_timer]),
SslOpts = ssl_opts(SO),
OwnOpts = lists:append(TO),
Tmo = proplists:get_value(fragment_timer,
OwnOpts,
?DEFAULT_FRAGMENT_TIMEOUT),
+ [CB, Sender] = [proplists:get_value(K, OwnOpts, false)
+ || K <- [message_cb, sender]],
?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}),
- Throttle = proplists:get_value(throttle_cb, OwnOpts, false),
- Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs, SPid),
- MPid ! {stop, self()}, %% tell the monitor to die
+ {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}),
+ Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs, SvcPid),
M = if SslOpts -> ssl; true -> Mod end,
+ Sender andalso monitor(process, MPid),
+ false == CB orelse (Pid ! {diameter, ack}),
+ MPid ! {start, self(), Sender andalso {Sock, M}, false /= CB},
putr(?REF_KEY, Ref),
- throttle(#transport{parent = Pid,
- module = M,
- socket = Sock,
- ssl = SslOpts,
- timeout = Tmo,
- throttle_cb = Throttle,
- throttled = false /= Throttle});
+ setopts(#transport{parent = Pid,
+ module = M,
+ socket = Sock,
+ ssl = SslOpts,
+ message_cb = CB,
+ timeout = Tmo,
+ send = Sender andalso MPid});
%% Put the reference in the process dictionary since we now use it
%% advertise the ssl socket after TLS upgrade.
-i({T, _Ref, _Mod, _Pid, _Opts, _Addrs} = Arg) %% from old code
- when T == accept;
- T == connect ->
- i(erlang:append_element(Arg, _SPid = false));
-
%% A monitor process to kill the transport if the parent dies.
i(#monitor{parent = Pid, transport = TPid} = S) ->
+ putr(?TRANSPORT_KEY, TPid),
proc_lib:init_ack({ok, self()}),
- monitor(process, Pid),
monitor(process, TPid),
- S;
+ S#monitor{parent = monitor(process, Pid)};
%% In principle a link between the transport and killer processes
%% could do the same thing: have the accepting/connecting process be
%% killed when the killer process dies as a consequence of parent
%% death. However, a link can be unlinked and this is exactly what
-%% gen_tcp seems to so. Links should be left to supervisors.
-
-i({listen = L, Ref, _APid, T}) -> %% from old code
- i({L, Ref, T});
+%% gen_tcp seems to do. Links should be left to supervisors.
i({listen, Ref, {Mod, Opts, Addrs}}) ->
[_] = diameter_config:subscribe(Ref, transport), %% assert existence
- {[LA, LP], Rest} = proplists:split(Opts, [ip, port]),
- LAddrOpt = get_addr(LA, Addrs),
- LPort = get_port(LP),
- {ok, LSock} = Mod:listen(LPort, gen_opts(LAddrOpt, Rest)),
- LAddr = laddr(LAddrOpt, Mod, LSock),
+ {[LP], Rest} = proplists:split(Opts, [port]),
+ {ok, LSock} = Mod:listen(get_port(LP), gen_opts(Addrs, Rest)),
+ {ok, {LAddr, _}} = sockname(Mod, LSock),
true = diameter_reg:add_new({?MODULE, listener, {Ref, {LAddr, LSock}}}),
proc_lib:init_ack({ok, self(), {LAddr, LSock}}),
- #listener{socket = LSock}.
-
-laddr([], Mod, Sock) ->
- {ok, {Addr, _Port}} = sockname(Mod, Sock),
- Addr;
-laddr([{ip, Addr}], _, _) ->
- Addr.
+ #listener{socket = LSock,
+ module = Mod}.
ssl_opts([]) ->
false;
@@ -279,19 +278,19 @@ ssl_opts(T) ->
%% init/8
%% Establish a TLS connection before capabilities exchange ...
-init(Type, Ref, Mod, Pid, true, Opts, Addrs, SPid) ->
- init(Type, Ref, ssl, Pid, [{cb_info, ?TCP_CB(Mod)} | Opts], Addrs, SPid);
+init(Type, Ref, Mod, Pid, true, Opts, Addrs, SvcPid) ->
+ init(Type, Ref, ssl, Pid, [{cb_info, ?TCP_CB(Mod)} | Opts], Addrs, SvcPid);
%% ... or not.
-init(Type, Ref, Mod, Pid, _, Opts, Addrs, SPid) ->
- init(Type, Ref, Mod, Pid, Opts, Addrs, SPid).
+init(Type, Ref, Mod, Pid, _, Opts, Addrs, SvcPid) ->
+ init(Type, Ref, Mod, Pid, Opts, Addrs, SvcPid).
%% init/7
-init(accept = T, Ref, Mod, Pid, Opts, Addrs, SPid) ->
+init(accept = T, Ref, Mod, Pid, Opts, Addrs, SvcPid) ->
{[Matches], Rest} = proplists:split(Opts, [accept]),
{ok, LPid, {LAddr, LSock}} = listener(Ref, {Mod, Rest, Addrs}),
- ok = gen_server:call(LPid, {accept, SPid}, infinity),
+ ok = gen_server:call(LPid, {accept, SvcPid}, infinity),
proc_lib:init_ack({ok, self(), [LAddr]}),
Sock = ok(accept(Mod, LSock)),
ok = accept_peer(Mod, Sock, accept(Matches)),
@@ -299,25 +298,17 @@ init(accept = T, Ref, Mod, Pid, Opts, Addrs, SPid) ->
diameter_peer:up(Pid),
Sock;
-init(connect = T, Ref, Mod, Pid, Opts, Addrs, _SPid) ->
- {[LA, RA, RP], Rest} = proplists:split(Opts, [ip, raddr, rport]),
- LAddrOpt = get_addr(LA, Addrs),
+init(connect = T, Ref, Mod, Pid, Opts, Addrs, _SvcPid) ->
+ {[RA, RP], Rest} = proplists:split(Opts, [raddr, rport]),
RAddr = get_addr(RA),
RPort = get_port(RP),
- proc_lib:init_ack(init_rc(LAddrOpt)),
- Sock = ok(connect(Mod, RAddr, RPort, gen_opts(LAddrOpt, Rest))),
+ proc_lib:init_ack({ok, self()}),
+ Sock = ok(connect(Mod, RAddr, RPort, gen_opts(Addrs, Rest))),
publish(Mod, T, Ref, Sock),
- up(Pid, {RAddr, RPort}, LAddrOpt, Mod, Sock),
+ up(Pid, {RAddr, RPort}, Mod, Sock),
Sock.
-init_rc([{ip, Addr}]) ->
- {ok, self(), [Addr]};
-init_rc([]) ->
- {ok, self()}.
-
-up(Pid, Remote, [{ip, _Addr}], _, _) ->
- diameter_peer:up(Pid, Remote);
-up(Pid, Remote, [], Mod, Sock) ->
+up(Pid, Remote, Mod, Sock) ->
{Addr, _Port} = ok(sockname(Mod, Sock)),
diameter_peer:up(Pid, Remote, [Addr]).
@@ -374,25 +365,41 @@ l([{{?MODULE, listener, {_, AS}}, LPid}], _, _) ->
l([], Ref, T) ->
diameter_tcp_sup:start_child({listen, Ref, T}).
-%% get_addr/1
+%% addrs/2
+%%
+%% Take the first address from the service if several are specified
+%% and not address is configured.
+
+addrs(Addrs, Opts) ->
+ case lists:mapfoldr(fun ipaddr/2, [], Opts) of
+ {Os, [_]} ->
+ Os;
+ {_, []} ->
+ Opts ++ [{ip, A} || [A|_] <- [Addrs]];
+ {_, As} ->
+ ?ERROR({invalid_addrs, As, Addrs})
+ end.
-get_addr(As) ->
- diameter_lib:ipaddr(addr(As, [])).
+ipaddr({K,A}, As)
+ when K == ifaddr;
+ K == ip ->
+ {{ip, ipaddr(A)}, [A | As]};
+ipaddr(T, B) ->
+ {T, B}.
-%% get_addr/2
+ipaddr(A)
+ when A == loopback;
+ A == any ->
+ A;
+ipaddr(A) ->
+ diameter_lib:ipaddr(A).
-get_addr([], []) ->
- [];
-get_addr(As, Def) ->
- [{ip, diameter_lib:ipaddr(addr(As, Def))}].
+%% get_addr/1
-%% Take the first address from the service if several are unspecified.
-addr([], [Addr | _]) ->
- Addr;
-addr([{_, Addr}], _) ->
- Addr;
-addr(As, Addrs) ->
- ?ERROR({invalid_addrs, As, Addrs}).
+get_addr([{_, Addr}]) ->
+ diameter_lib:ipaddr(Addr);
+get_addr(Addrs) ->
+ ?ERROR({invalid_addrs, Addrs}).
%% get_port/1
@@ -405,10 +412,15 @@ get_port(Ps) ->
%% gen_opts/2
-gen_opts(LAddrOpt, Opts) ->
+gen_opts(Addrs, Opts) ->
+ gen_opts(addrs(Addrs, Opts)).
+
+%% gen_opts/1
+
+gen_opts(Opts) ->
{L,_} = proplists:split(Opts, [binary, packet, active]),
[[],[],[]] == L orelse ?ERROR({reserved_options, Opts}),
- [binary, {packet, 0}, {active, false}] ++ LAddrOpt ++ Opts.
+ [binary, {packet, 0}, {active, false} | Opts].
%% ---------------------------------------------------------------------------
%% # ports/1
@@ -451,14 +463,18 @@ portnr(Sock) ->
%% # handle_call/3
%% ---------------------------------------------------------------------------
-handle_call({accept, SPid}, _From, #listener{service = P} = S) ->
- {reply, ok, if not is_pid(P), is_pid(SPid) ->
- monitor(process, SPid),
- S#listener{service = SPid};
+handle_call({accept, SvcPid}, _From, #listener{service = P} = S) ->
+ {reply, ok, if not is_pid(P), is_pid(SvcPid) ->
+ monitor(process, SvcPid),
+ S#listener{service = SvcPid};
true ->
S
end};
-
+
+%% Transport is telling us of parent death.
+handle_call({stop, _Pid} = Reason, _From, #monitor{} = S) ->
+ {stop, {shutdown, Reason}, ok, S};
+
handle_call(_, _, State) ->
{reply, nok, State}.
@@ -480,8 +496,7 @@ handle_info(T, #listener{} = S) ->
{noreply, #listener{} = l(T,S)};
handle_info(T, #monitor{} = S) ->
- m(T,S),
- x(T).
+ {noreply, #monitor{} = m(T,S)}.
%% ---------------------------------------------------------------------------
%% # code_change/3
@@ -497,6 +512,7 @@ code_change(_, State, _) ->
terminate(_, _) ->
ok.
+
%% ---------------------------------------------------------------------------
putr(Key, Val) ->
@@ -509,18 +525,47 @@ getr(Key) ->
%%
%% Transition monitor state.
+%% Outgoing message.
+m(Msg, S)
+ when is_record(Msg, diameter_packet);
+ is_binary(Msg) ->
+ send(Msg, S),
+ S;
+
+%% Transport has established a connection. Stop monitoring on the
+%% parent so as not to die before a send from the transport.
+m({start, TPid, T, Ack} = M, #monitor{transport = TPid} = S) ->
+ case T of
+ {Sock, Mod} ->
+ demonitor(S#monitor.parent, [flush]),
+ S#monitor{parent = false,
+ socket = Sock,
+ module = Mod,
+ ack = Ack};
+ false -> %% monitor not sending
+ x(M)
+ end;
+
%% Transport is telling us to die.
-m({stop, TPid}, #monitor{transport = TPid}) ->
- ok;
+m({stop, TPid} = T, #monitor{transport = TPid}) ->
+ x(T);
-%% Transport has died.
-m({'DOWN', _, process, TPid, _}, #monitor{transport = TPid}) ->
- ok;
+%% Transport is telling us to die.
+m({stop, TPid} = T, #monitor{transport = TPid}) ->
+ x(T);
-%% Transport parent has died.
-m({'DOWN', _, process, Pid, _}, #monitor{parent = Pid,
- transport = TPid}) ->
- exit(TPid, {shutdown, parent}).
+%% Transport is telling us that TLS has been negotiated after
+%% capabilities exchange.
+m({tls, SSock}, S) ->
+ S#monitor{socket = SSock,
+ module = ssl};
+
+%% Transport or parent has died.
+m({'DOWN', M, process, P, _} = T, #monitor{parent = MRef,
+ transport = TPid})
+ when M == MRef;
+ P == TPid ->
+ x(T).
%% l/2
%%
@@ -528,18 +573,16 @@ m({'DOWN', _, process, Pid, _}, #monitor{parent = Pid,
%% Service process has died.
l({'DOWN', _, process, Pid, _} = T, #listener{service = Pid,
- socket = Sock}) ->
- gen_tcp:close(Sock),
+ socket = Sock,
+ module = M}) ->
+ M:close(Sock),
x(T);
%% Transport has been removed.
-l({transport, remove, _} = T, #listener{socket = Sock}) ->
- gen_tcp:close(Sock),
- x(T);
-
-%% Possibly death of an accepting process monitored in old code.
-l(_, S) ->
- S.
+l({transport, remove, _} = T, #listener{socket = Sock,
+ module = M}) ->
+ M:close(Sock),
+ x(T).
%% t/2
%%
@@ -557,21 +600,14 @@ t(T,S) ->
%% transition/2
-%% Incoming message.
+%% Incoming packets.
transition({P, Sock, Bin}, #transport{socket = Sock,
ssl = B,
- throttled = T}
+ frag = Frag}
= S)
when P == ssl, true == B;
P == tcp ->
- false = T, %% assert
- recv(Bin, S);
-
-%% Make a new throttling callback after a timeout.
-transition(throttle, #transport{throttled = false}) ->
- ok;
-transition(throttle, S) ->
- throttle(S);
+ recv(acc(Frag, Bin), S);
%% Capabilties exchange has decided on whether or not to run over TLS.
transition({diameter, {tls, Ref, Type, B}}, #transport{parent = Pid}
@@ -581,7 +617,7 @@ transition({diameter, {tls, Ref, Type, B}}, #transport{parent = Pid}
= NS
= tls_handshake(Type, B, S),
Pid ! {diameter, {tls, Ref}},
- throttle(NS#transport{ssl = B});
+ NS#transport{ssl = B};
transition({C, Sock}, #transport{socket = Sock,
ssl = B})
@@ -597,8 +633,18 @@ transition({E, Sock, _Reason} = T, #transport{socket = Sock,
?ERROR({T,S});
%% Outgoing message.
-transition({diameter, {send, Bin}}, S) ->
- send(Bin, S);
+transition({diameter, {send, Msg}}, #transport{} = S) ->
+ message(send, Msg, S);
+
+%% Monitor has sent an outgoing message.
+transition(Msg, S)
+ when is_record(Msg, diameter_packet);
+ is_binary(Msg) ->
+ message(ack, Msg, S);
+
+%% Deferred actions from a message_cb.
+transition({actions, Dir, Acts}, S) ->
+ setopts(actions(Acts, Dir, S));
%% Request to close the transport connection.
transition({diameter, {close, Pid}}, #transport{parent = Pid,
@@ -618,8 +664,18 @@ transition({resolve_port, Pid}, #transport{socket = Sock,
Pid ! portnr(M, Sock),
ok;
-%% Parent process has died.
-transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) ->
+%% Parent process has died: call the monitor to not close the socket
+%% during an ongoing send, but don't let it take forever.
+transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid,
+ send = MPid}) ->
+ false == MPid
+ orelse (ok == gen_server:call(MPid, {stop, self()}, 1000))
+ orelse exit(MPid, {shutdown, parent}),
+ stop;
+
+%% Monitor process has died.
+transition({'DOWN', _, process, MPid, _}, #transport{send = MPid})
+ when is_pid(MPid) ->
stop.
%% Crash on anything unexpected.
@@ -643,11 +699,13 @@ tls_handshake(_, true, #transport{ssl = false}) ->
%% Capabilities exchange negotiated TLS: upgrade the connection.
tls_handshake(Type, true, #transport{socket = Sock,
module = M,
- ssl = Opts}
+ ssl = Opts,
+ send = MPid}
= S) ->
{ok, SSock} = tls(Type, Sock, [{cb_info, ?TCP_CB(M)} | Opts]),
Ref = getr(?REF_KEY),
true = diameter_reg:add_new({?MODULE, Type, {Ref, SSock}}),
+ false == MPid orelse (MPid ! {tls, SSock}), %% tell the sender process
S#transport{socket = SSock,
module = ssl};
@@ -666,92 +724,77 @@ tls(accept, Sock, Opts) ->
%% using Nagle.
%% Receive packets until a full message is received,
-recv(Bin, #transport{frag = Head, throttled = false} = S) ->
- case rcv(Head, Bin) of
- {Msg, B} ->
- throttle(S#transport{frag = B, throttled = Msg});
- Frag ->
- setopts(S),
- start_fragment_timer(S#transport{frag = Frag,
- flush = false})
- end.
-%% recv/1
+recv({Msg, Rest}, S) -> %% have a complete message ...
+ recv(acc(Rest), message(recv, Msg, S));
-recv(#transport{throttled = false} = S) ->
- recv(<<>>, S);
+recv(Frag, #transport{recv = B,
+ socket = Sock,
+ module = M}
+ = S) -> %% or not
+ B andalso setopts(M, Sock),
+ start_fragment_timer(S#transport{frag = Frag,
+ flush = false,
+ active = B}).
-recv(#transport{} = S) ->
- S.
+%% acc/2
-%% rcv/2
+%% Know how many bytes to extract.
+acc([Len | Acc], Bin) ->
+ acc1(Len, <<Acc/binary, Bin/binary>>);
-%% No previous fragment.
-rcv(<<>>, Bin) ->
- rcv(Bin);
+%% Or not.
+acc(Head, Bin) ->
+ acc(<<Head/binary, Bin/binary>>).
-%% Not even the first four bytes of the header.
-rcv(Head, Bin)
- when is_binary(Head) ->
- rcv(<<Head/binary, Bin/binary>>);
-
-%% Or enough to know how many bytes to extract.
-rcv({Len, N, Head, Acc}, Bin) ->
- rcv(Len, N + size(Bin), Head, [Bin | Acc]).
-
-%% rcv/4
+%% acc1/3
%% Extract a message for which we have all bytes.
-rcv(Len, N, Head, Acc)
- when Len =< N ->
- recv1(Len, bin(Head, Acc));
+acc1(Len, Bin)
+ when Len =< byte_size(Bin) ->
+ split_binary(Bin, Len);
%% Wait for more packets.
-rcv(Len, N, Head, Acc) ->
- {Len, N, Head, Acc}.
-
-%% rcv/1
-
-%% Nothing left.
-rcv(<<>> = Bin) ->
- Bin;
-
-%% The Message Length isn't even sufficient for a header. Chances are
-%% things will go south from here but if we're lucky then the bytes we
-%% have extend to an intended message boundary and we can recover by
-%% simply receiving them. Make it so.
-rcv(<<_:1/binary, Len:24, _/binary>> = Bin)
- when Len < 20 ->
- {Bin, <<>>};
-
-%% Enough bytes to extract a message.
-rcv(<<_:1/binary, Len:24, _/binary>> = Bin)
- when Len =< size(Bin) ->
- recv1(Len, Bin);
-
-%% Or not: wait for more packets.
-rcv(<<_:1/binary, Len:24, _/binary>> = Head) ->
- {Len, size(Head), Head, []};
+acc1(Len, Bin) ->
+ [Len | Bin].
+
+%% acc/1
+
+%% Don't match on Bin since this results in it being copied at the
+%% next append according to the Efficiency Guide. This is also the
+%% reason that the Len is extracted and maintained when accumulating
+%% messages. The simplest implementation is just to accumulate a
+%% binary and match <<_, Len:24, _/binary>> each time the length is
+%% required, but the performance of this decays quadratically with the
+%% message length, since the binary is then copied with each append of
+%% additional bytes from gen_tcp.
+
+acc(Bin)
+ when 3 < byte_size(Bin) ->
+ {Head, _} = split_binary(Bin, 4),
+ [_,A,B,C] = binary_to_list(Head),
+ Len = (A bsl 16) bor (B bsl 8) bor C,
+ if Len < 20 ->
+ %% Message length isn't sufficient for a Diameter Header.
+ %% Chances are things will go south from here but if we're
+ %% lucky then the bytes we have extend to an intended
+ %% message boundary and we can recover by simply receiving
+ %% them. Make it so.
+ {Bin, <<>>};
+ true ->
+ acc1(Len, Bin)
+ end;
%% Not even 4 bytes yet.
-rcv(Head) ->
- Head.
-
-%% recv1/2
-
-recv1(Len, Bin) ->
- <<Msg:Len/binary, Rest/binary>> = Bin,
- {Msg, Rest}.
+acc(Bin) ->
+ Bin.
-%% bin/1-2
+%% bin/1
-bin(Head, Acc) ->
- list_to_binary([Head | lists:reverse(Acc)]).
+bin([_ | Bin]) ->
+ Bin;
-bin({_, _, Head, Acc}) ->
- bin(Head, Acc);
-bin(Bin)
- when is_binary(Bin) ->
+bin(Bin) ->
Bin.
%% flush/1
@@ -768,9 +811,7 @@ bin(Bin)
%% also eventually lead to watchdog failover.
%% No fragment to flush or not receiving messages.
-flush(#transport{frag = Frag, throttled = B} = S)
- when Frag == <<>>;
- B /= false ->
+flush(#transport{frag = <<>>} = S) ->
S;
%% Messages have been received since last timer expiry.
@@ -778,9 +819,8 @@ flush(#transport{flush = false} = S) ->
start_fragment_timer(S#transport{flush = true});
%% No messages since last expiry.
-flush(#transport{frag = Frag, parent = Pid} = S) ->
- diameter_peer:recv(Pid, bin(Frag)),
- S#transport{frag = <<>>}.
+flush(#transport{frag = Frag} = S) ->
+ message(recv, bin(Frag), S#transport{frag = <<>>}).
%% start_fragment_timer/1
%%
@@ -813,9 +853,27 @@ connect(Mod, Host, Port, Opts) ->
%% send/2
-send(Bin, #transport{socket = Sock,
- module = M}) ->
- case send(M, Sock, Bin) of
+send(Msg, #monitor{socket = Sock, module = M, transport = TPid, ack = B}) ->
+ send1(M, Sock, Msg),
+ B andalso (TPid ! Msg);
+
+send(Msg, #transport{socket = Sock, module = M, send = false} = S) ->
+ send1(M, Sock, Msg),
+ message(ack, Msg, S);
+
+%% Send from the monitor process to avoid deadlock if both the
+%% receiver and the peer were to block in send.
+send(Msg, #transport{send = Pid} = S) ->
+ Pid ! Msg,
+ S.
+
+%% send1/3
+
+send1(Mod, Sock, #diameter_packet{bin = Bin}) ->
+ send1(Mod, Sock, Bin);
+
+send1(Mod, Sock, Bin) ->
+ case send(Mod, Sock, Bin) of
ok ->
ok;
{error, Reason} ->
@@ -842,119 +900,24 @@ setopts(M, Sock, Opts) ->
%% setopts/1
-setopts(#transport{socket = Sock, module = M}) ->
- setopts(M, Sock).
+setopts(#transport{socket = Sock,
+ active = A,
+ recv = B,
+ module = M}
+ = S)
+ when B, not A ->
+ setopts(M, Sock),
+ S#transport{active = true};
+
+setopts(S) ->
+ S.
%% setopts/2
setopts(M, Sock) ->
case setopts(M, Sock, [{active, once}]) of
ok -> ok;
- X -> x({setopts, M, Sock, X}) %% possibly on peer disconnect
- end.
-
-%% throttle/1
-
-%% Still collecting packets for a complete message: keep receiving.
-throttle(#transport{throttled = false} = S) ->
- recv(S);
-
-%% Decide whether to receive another, or whether to accept a message
-%% that's been received.
-throttle(#transport{throttle_cb = F, throttled = T} = S) ->
- Res = cb(F, T),
-
- try throttle(Res, S) of
- #transport{ssl = SB} = NS when is_boolean(SB) ->
- throttle(defrag(NS));
- #transport{throttled = Msg} = NS when is_binary(Msg) ->
- %% Initial incoming message when we might need to upgrade
- %% to TLS: wait for reception of a tls tuple.
- defrag(NS)
- catch
- #transport{} = NS ->
- recv(NS)
- end.
-
-%% cb/2
-
-cb(false, _) ->
- ok;
-
-cb(F, B) ->
- diameter_lib:eval([F, true /= B andalso B]).
-
-%% throttle/2
-
-%% Callback says to receive another message.
-throttle(ok, #transport{throttled = true} = S) ->
- throw(S#transport{throttled = false});
-
-%% Callback says to accept a received message.
-throttle(ok, #transport{parent = Pid, throttled = Msg} = S)
- when is_binary(Msg) ->
- diameter_peer:recv(Pid, Msg),
- S;
-
-throttle({ok = T, F}, S) ->
- throttle(T, S#transport{throttle_cb = F});
-
-%% Callback says to accept a received message and acknowledged the
-%% returned pid with a {request, Pid} message if a request pid is
-%% spawned, a discard message otherwise. The latter does not mean that
-%% the message was necessarily discarded: it could have been an
-%% answer.
-throttle(NPid, #transport{parent = Pid, throttled = Msg} = S)
- when is_pid(NPid), is_binary(Msg) ->
- diameter_peer:recv(Pid, {Msg, NPid}),
- S;
-
-throttle({NPid, F}, #transport{throttled = Msg} = S)
- when is_pid(NPid), is_binary(Msg) ->
- throttle(NPid, S#transport{throttle_cb = F});
-
-%% Callback to accept a received message says to discard it.
-throttle(discard, #transport{throttled = Msg} = S)
- when is_binary(Msg) ->
- S;
-
-throttle({discard = T, F}, #transport{throttled = Msg} = S)
- when is_binary(Msg) ->
- throttle(T, S#transport{throttle_cb = F});
-
-%% Callback to accept a received message says to answer it with the
-%% supplied binary.
-throttle(Bin, #transport{throttled = Msg} = S)
- when is_binary(Bin), is_binary(Msg) ->
- send(Bin, S),
- S;
-
-throttle({Bin, F}, #transport{throttled = Msg} = S)
- when is_binary(Bin), is_binary(Msg) ->
- throttle(Bin, S#transport{throttle_cb = F});
-
-%% Callback says to ask again in the specified number of milliseconds.
-throttle({timeout, Tmo}, S) ->
- erlang:send_after(Tmo, self(), throttle),
- throw(S);
-
-throttle({timeout = T, Tmo, F}, S) ->
- throttle({T, Tmo}, S#transport{throttle_cb = F});
-
-throttle(T, #transport{throttle_cb = F}) ->
- ?ERROR({invalid_return, T, F}).
-
-%% defrag/1
-%%
-%% Try to extract another message from packets already read before
-%% another throttling callback.
-
-defrag(#transport{frag = Head} = S) ->
- case rcv(Head, <<>>) of
- {Msg, B} ->
- S#transport{throttled = Msg, frag = B};
- _ ->
- S#transport{throttled = true}
+ X -> x({setopts, Sock, M, X}) %% possibly on peer disconnect
end.
%% portnr/2
@@ -990,3 +953,80 @@ getstat(gen_tcp, Sock) ->
getstat(M, Sock) ->
M:getstat(Sock).
%% Note that ssl:getstat/1 doesn't yet exist in R15B01.
+
+%% A message_cb is invoked whenever a message is sent or received, or
+%% to provide acknowledgement of a completed send or discarded
+%% request. Ignoring possible extra arguments, calls are of the
+%% following form.
+%%
+%% cb(recv, Msg) Receive a message into diameter?
+%% cb(send, Msg) Send a message on the socket?
+%% cb(ack, Msg) Acknowledgement of a completed send.
+%% cb(ack, false) Acknowledgement of a discarded request.
+%%
+%% Msg will be binary() in a recv callback, but can be a
+%% diameter_packet record in a send/ack callback if a recv/send
+%% callback returns a record. Callbacks return a list of the following
+%% form.
+%%
+%% [boolean() | send | recv | binary() | #diameter_packet{}]
+%%
+%% The atoms are meaningless by themselves, but say whether subsequent
+%% messages are to be sent or received. A boolean says whether or not
+%% to continue reading on the socket. Messages can be received even
+%% after false is returned if these arrived in the same packet. A
+%% leading recv or send is implicit on the corresponding callbacks. A
+%% new callback can be returned as the tail of a returned list: any
+%% value not of the aforementioned list type is interpreted as a
+%% callback.
+
+%% message/3
+
+message(send, false = M, S) ->
+ message(ack, M, S);
+
+message(ack, _, #transport{message_cb = false} = S) ->
+ S;
+
+message(Dir, Msg, #transport{message_cb = CB} = S) ->
+ setopts(actions(cb(CB, Dir, Msg), Dir, S)).
+
+%% actions/3
+
+actions([], _, S) ->
+ S;
+
+actions([B | As], Dir, S)
+ when is_boolean(B) ->
+ actions(As, Dir, S#transport{recv = B});
+
+actions([Dir | As], _, S)
+ when Dir == send;
+ Dir == recv ->
+ actions(As, Dir, S);
+
+actions([Msg | As], send = Dir, S)
+ when is_binary(Msg);
+ is_record(Msg, diameter_packet) ->
+ actions(As, Dir, send(Msg, S));
+
+actions([Msg | As], recv = Dir, #transport{parent = Pid} = S)
+ when is_binary(Msg);
+ is_record(Msg, diameter_packet) ->
+ diameter_peer:recv(Pid, Msg),
+ actions(As, Dir, S);
+
+actions([{defer, Tmo, Acts} | As], Dir, S) ->
+ erlang:send_after(Tmo, self(), {actions, Dir, Acts}),
+ actions(As, Dir, S);
+
+actions(CB, _, S) ->
+ S#transport{message_cb = CB}.
+
+%% cb/3
+
+cb(false, _, Msg) ->
+ [Msg];
+
+cb(CB, Dir, Msg) ->
+ diameter_lib:eval([CB, Dir, Msg]).