aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src/transport
diff options
context:
space:
mode:
Diffstat (limited to 'lib/diameter/src/transport')
-rw-r--r--lib/diameter/src/transport/diameter_tcp.erl251
1 files changed, 181 insertions, 70 deletions
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl
index c79d85820b..6a5e5fe89d 100644
--- a/lib/diameter/src/transport/diameter_tcp.erl
+++ b/lib/diameter/src/transport/diameter_tcp.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2015. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2016. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
%%
-module(diameter_tcp).
+-dialyzer({no_fail_call, throttle/2}).
-behaviour(gen_server).
@@ -102,7 +103,8 @@
| gen_tcp:listen_option().
-type option() :: {port, non_neg_integer()}
- | {fragment_timer, 0..16#FFFFFFFF}.
+ | {fragment_timer, 0..16#FFFFFFFF}
+ | {throttle_cb, diameter:evaluable()}.
%% Accepting/connecting transport process state.
-record(transport,
@@ -110,10 +112,13 @@
parent :: pid(), %% of process that started us
module :: module(), %% gen_tcp-like module
frag = <<>> :: frag(), %% message fragment
- ssl :: boolean() | [term()], %% ssl options
+ ssl :: [term()] | boolean(), %% ssl options, ssl or not
timeout :: infinity | 0..16#FFFFFFFF, %% fragment timeout
tref = false :: false | reference(), %% fragment timer reference
- flush = false :: boolean()}). %% flush fragment at timeout?
+ flush = false :: boolean(), %% flush fragment at timeout?
+ throttle_cb :: false | diameter:evaluable(), %% ask to receive
+ throttled :: boolean() | binary()}). %% stopped receiving?
+
%% The usual transport using gen_tcp can be replaced by anything
%% sufficiently gen_tcp-like by passing a 'module' option as the first
%% (for simplicity) transport option. The transport_module diameter_etcp
@@ -198,22 +203,27 @@ i({T, Ref, Mod, Pid, Opts, Addrs})
%% that does nothing but kill us with the parent until call
%% returns.
{ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}),
- {SslOpts, Rest0} = ssl(Opts),
- {OwnOpts, Rest} = own(Rest0),
+ {[SO|TO], Rest} = proplists:split(Opts, [ssl_options,
+ fragment_timer,
+ throttle_cb]),
+ SslOpts = ssl_opts(SO),
+ OwnOpts = lists:append(TO),
Tmo = proplists:get_value(fragment_timer,
OwnOpts,
?DEFAULT_FRAGMENT_TIMEOUT),
?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}),
+ Throttle = proplists:get_value(throttle_cb, OwnOpts, false),
Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs),
MPid ! {stop, self()}, %% tell the monitor to die
M = if SslOpts -> ssl; true -> Mod end,
- setopts(M, Sock),
putr(?REF_KEY, Ref),
- #transport{parent = Pid,
- module = M,
- socket = Sock,
- ssl = SslOpts,
- timeout = Tmo};
+ throttle(#transport{parent = Pid,
+ module = M,
+ socket = Sock,
+ ssl = SslOpts,
+ timeout = Tmo,
+ throttle_cb = Throttle,
+ throttled = false /= Throttle});
%% Put the reference in the process dictionary since we now use it
%% advertise the ssl socket after TLS upgrade.
@@ -246,14 +256,6 @@ laddr([], Mod, Sock) ->
laddr([{ip, Addr}], _, _) ->
Addr.
-own(Opts) ->
- {[Own], Rest} = proplists:split(Opts, [fragment_timer]),
- {Own, Rest}.
-
-ssl(Opts) ->
- {[SslOpts], Rest} = proplists:split(Opts, [ssl_options]),
- {ssl_opts(SslOpts), Rest}.
-
ssl_opts([]) ->
false;
ssl_opts([{ssl_options, true}]) ->
@@ -261,8 +263,8 @@ ssl_opts([{ssl_options, true}]) ->
ssl_opts([{ssl_options, Opts}])
when is_list(Opts) ->
Opts;
-ssl_opts(L) ->
- ?ERROR({ssl_options, L}).
+ssl_opts(T) ->
+ ?ERROR({ssl_options, T}).
%% init/7
@@ -393,7 +395,7 @@ get_port(Ps) ->
gen_opts(LAddrOpt, Opts) ->
{L,_} = proplists:split(Opts, [binary, packet, active]),
[[],[],[]] == L orelse ?ERROR({reserved_options, Opts}),
- [binary, {packet, 0}, {active, once}] ++ LAddrOpt ++ Opts.
+ [binary, {packet, 0}, {active, false}] ++ LAddrOpt ++ Opts.
%% ---------------------------------------------------------------------------
%% # ports/1
@@ -536,53 +538,37 @@ t(T,S) ->
S;
#transport{} = NS ->
NS;
- {stop, Reason} ->
- x(Reason);
stop ->
x(T)
end.
%% transition/2
-%% Initial incoming message when we might need to upgrade to TLS:
-%% don't request another message until we know.
-
-transition({tcp, Sock, Bin}, #transport{socket = Sock,
- parent = Pid,
- frag = Head,
- module = M,
- ssl = Opts}
- = S)
- when is_list(Opts) ->
- case rcv(Head, Bin) of
- {Msg, B} when is_binary(Msg) ->
- diameter_peer:recv(Pid, Msg),
- S#transport{frag = B};
- Frag ->
- setopts(M, Sock),
- start_fragment_timer(S#transport{frag = Frag})
- end;
-
%% Incoming message.
transition({P, Sock, Bin}, #transport{socket = Sock,
- module = M,
- ssl = B}
+ ssl = B,
+ throttled = T}
= S)
- when P == tcp, not B;
- P == ssl, B ->
- setopts(M, Sock),
- start_fragment_timer(recv(Bin, S));
+ when P == ssl, true == B;
+ P == tcp ->
+ false = T, %% assert
+ recv(Bin, S);
+
+%% Make a new throttling callback after a timeout.
+transition(throttle, #transport{throttled = false}) ->
+ ok;
+transition(throttle, S) ->
+ throttle(S);
%% Capabilties exchange has decided on whether or not to run over TLS.
transition({diameter, {tls, Ref, Type, B}}, #transport{parent = Pid}
= S) ->
- #transport{socket = Sock,
- module = M}
+ true = is_boolean(B), %% assert
+ #transport{}
= NS
= tls_handshake(Type, B, S),
Pid ! {diameter, {tls, Ref}},
- setopts(M, Sock),
- start_fragment_timer(NS#transport{ssl = B});
+ throttle(NS#transport{ssl = B});
transition({C, Sock}, #transport{socket = Sock,
ssl = B})
@@ -598,14 +584,8 @@ transition({E, Sock, _Reason} = T, #transport{socket = Sock,
?ERROR({T,S});
%% Outgoing message.
-transition({diameter, {send, Bin}}, #transport{socket = Sock,
- module = M}) ->
- case send(M, Sock, Bin) of
- ok ->
- ok;
- {error, Reason} ->
- {stop, {send, Reason}}
- end;
+transition({diameter, {send, Bin}}, S) ->
+ send(Bin, S);
%% Request to close the transport connection.
transition({diameter, {close, Pid}}, #transport{parent = Pid,
@@ -672,16 +652,25 @@ tls(accept, Sock, Opts) ->
%% Reassemble fragmented messages and extract multiple message sent
%% using Nagle.
-recv(Bin, #transport{parent = Pid, frag = Head} = S) ->
+%% Receive packets until a full message is received,
+recv(Bin, #transport{frag = Head, throttled = false} = S) ->
case rcv(Head, Bin) of
- {Msg, B} when is_binary(Msg) ->
- diameter_peer:recv(Pid, Msg),
- recv(B, S#transport{frag = <<>>});
+ {Msg, B} ->
+ throttle(S#transport{frag = B, throttled = Msg});
Frag ->
- S#transport{frag = Frag,
- flush = false}
+ setopts(S),
+ start_fragment_timer(S#transport{frag = Frag,
+ flush = false})
end.
+%% recv/1
+
+recv(#transport{throttled = false} = S) ->
+ recv(<<>>, S);
+
+recv(#transport{} = S) ->
+ S.
+
%% rcv/2
%% No previous fragment.
@@ -765,8 +754,10 @@ bin(Bin)
%% since all messages with length problems are discarded this should
%% also eventually lead to watchdog failover.
-%% No fragment to flush.
-flush(#transport{frag = <<>>} = S) ->
+%% No fragment to flush or not receiving messages.
+flush(#transport{frag = Frag, throttled = B} = S)
+ when Frag == <<>>;
+ B /= false ->
S;
%% Messages have been received since last timer expiry.
@@ -807,6 +798,17 @@ accept(Mod, LSock) ->
connect(Mod, Host, Port, Opts) ->
Mod:connect(Host, Port, Opts).
+%% send/2
+
+send(Bin, #transport{socket = Sock,
+ module = M}) ->
+ case send(M, Sock, Bin) of
+ ok ->
+ ok;
+ {error, Reason} ->
+ x({send, Reason})
+ end.
+
%% send/3
send(gen_tcp, Sock, Bin) ->
@@ -825,6 +827,11 @@ setopts(ssl, Sock, Opts) ->
setopts(M, Sock, Opts) ->
M:setopts(Sock, Opts).
+%% setopts/1
+
+setopts(#transport{socket = Sock, module = M}) ->
+ setopts(M, Sock).
+
%% setopts/2
setopts(M, Sock) ->
@@ -833,6 +840,110 @@ setopts(M, Sock) ->
X -> x({setopts, M, Sock, X}) %% possibly on peer disconnect
end.
+%% throttle/1
+
+%% Still collecting packets for a complete message: keep receiving.
+throttle(#transport{throttled = false} = S) ->
+ recv(S);
+
+%% Decide whether to receive another, or whether to accept a message
+%% that's been received.
+throttle(#transport{throttle_cb = F, throttled = T} = S) ->
+ Res = cb(F, T),
+
+ try throttle(Res, S) of
+ #transport{ssl = SB} = NS when is_boolean(SB) ->
+ throttle(defrag(NS));
+ #transport{throttled = Msg} = NS when is_binary(Msg) ->
+ %% Initial incoming message when we might need to upgrade
+ %% to TLS: wait for reception of a tls tuple.
+ defrag(NS)
+ catch
+ #transport{} = NS ->
+ recv(NS)
+ end.
+
+%% cb/2
+
+cb(false, _) ->
+ ok;
+
+cb(F, B) ->
+ diameter_lib:eval([F, true /= B andalso B]).
+
+%% throttle/2
+
+%% Callback says to receive another message.
+throttle(ok, #transport{throttled = true} = S) ->
+ throw(S#transport{throttled = false});
+
+%% Callback says to accept a received message.
+throttle(ok, #transport{parent = Pid, throttled = Msg} = S)
+ when is_binary(Msg) ->
+ diameter_peer:recv(Pid, Msg),
+ S;
+
+throttle({ok = T, F}, S) ->
+ throttle(T, S#transport{throttle_cb = F});
+
+%% Callback says to accept a received message and acknowledged the
+%% returned pid with a {request, Pid} message if a request pid is
+%% spawned, a discard message otherwise. The latter does not mean that
+%% the message was necessarily discarded: it could have been an
+%% answer.
+throttle(NPid, #transport{parent = Pid, throttled = Msg} = S)
+ when is_pid(NPid), is_binary(Msg) ->
+ diameter_peer:recv(Pid, {Msg, NPid}),
+ S;
+
+throttle({NPid, F}, #transport{throttled = Msg} = S)
+ when is_pid(NPid), is_binary(Msg) ->
+ throttle(NPid, S#transport{throttle_cb = F});
+
+%% Callback to accept a received message says to discard it.
+throttle(discard, #transport{throttled = Msg} = S)
+ when is_binary(Msg) ->
+ S;
+
+throttle({discard = T, F}, #transport{throttled = Msg} = S)
+ when is_binary(Msg) ->
+ throttle(T, S#transport{throttle_cb = F});
+
+%% Callback to accept a received message says to answer it with the
+%% supplied binary.
+throttle(Bin, #transport{throttled = Msg} = S)
+ when is_binary(Bin), is_binary(Msg) ->
+ send(Bin, S),
+ S;
+
+throttle({Bin, F}, #transport{throttled = Msg} = S)
+ when is_binary(Bin), is_binary(Msg) ->
+ throttle(Bin, S#transport{throttle_cb = F});
+
+%% Callback says to ask again in the specified number of milliseconds.
+throttle({timeout, Tmo}, S) ->
+ erlang:send_after(Tmo, self(), throttle),
+ throw(S);
+
+throttle({timeout = T, Tmo, F}, S) ->
+ throttle({T, Tmo}, S#transport{throttle_cb = F});
+
+throttle(T, #transport{throttle_cb = F}) ->
+ ?ERROR({invalid_return, T, F}).
+
+%% defrag/1
+%%
+%% Try to extract another message from packets already read before
+%% another throttling callback.
+
+defrag(#transport{frag = Head} = S) ->
+ case rcv(Head, <<>>) of
+ {Msg, B} ->
+ S#transport{throttled = Msg, frag = B};
+ _ ->
+ S#transport{throttled = true}
+ end.
+
%% portnr/2
portnr(gen_tcp, Sock) ->