aboutsummaryrefslogtreecommitdiffstats
path: root/lib
diff options
context:
space:
mode:
authorAnders Svensson <[email protected]>2017-03-11 21:11:48 +0100
committerAnders Svensson <[email protected]>2017-06-11 16:30:38 +0200
commitea339754d579b7e917dab0afa9a4694689f1f990 (patch)
tree3f26802ca88d4a8a009c5d660c68193c42e00641 /lib
parent9a90f20f365a90612ee5e5f7db14fd71b4875b84 (diff)
downloadotp-ea339754d579b7e917dab0afa9a4694689f1f990.tar.gz
otp-ea339754d579b7e917dab0afa9a4694689f1f990.tar.bz2
otp-ea339754d579b7e917dab0afa9a4694689f1f990.zip
Strip throttling callbacks from diameter_tcp
Commits starting at 472a080c added a throttle_cb option to diameter_tcp to let a callback apply backpressure when it decides that additional requests should not be read. It didn't provide a hook for knowing that an answer was sent however, which is needed when sends no longer take place in the receiver process, and is more complicated than it should be. Strip it all away, in preparation for a simpler incarnation.
Diffstat (limited to 'lib')
-rw-r--r--lib/diameter/src/transport/diameter_tcp.erl221
1 files changed, 51 insertions, 170 deletions
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl
index f745e2a6f7..c81701b624 100644
--- a/lib/diameter/src/transport/diameter_tcp.erl
+++ b/lib/diameter/src/transport/diameter_tcp.erl
@@ -19,7 +19,6 @@
%%
-module(diameter_tcp).
--dialyzer({no_fail_call, throttle/2}).
-behaviour(gen_server).
@@ -109,22 +108,19 @@
| gen_tcp:listen_option().
-type option() :: {port, non_neg_integer()}
- | {fragment_timer, 0..16#FFFFFFFF}
- | {throttle_cb, diameter:evaluable()}.
+ | {fragment_timer, 0..16#FFFFFFFF}.
%% Accepting/connecting transport process state.
-record(transport,
{socket :: inet:socket() | ssl:sslsocket(), %% accept/connect socket
parent :: pid(), %% of process that started us
module :: module(), %% gen_tcp-like module
- frag = <<>> :: frag(), %% message fragment
ssl :: [term()] | boolean(), %% ssl options, ssl or not
+ frag = <<>> :: frag(), %% message fragment
timeout :: infinity | 0..16#FFFFFFFF, %% fragment timeout
tref = false :: false | reference(), %% fragment timer reference
flush = false :: boolean(), %% flush fragment at timeout?
- throttle_cb :: false | diameter:evaluable(), %% ask to receive
- throttled :: boolean() | binary(), %% stopped receiving?
- monitor :: pid()}).
+ monitor :: pid()}). %% monitor/sender process
%% The usual transport using gen_tcp can be replaced by anything
%% sufficiently gen_tcp-like by passing a 'module' option as the first
@@ -213,30 +209,26 @@ i({T, Ref, Mod, Pid, Opts, Addrs, SPid})
%% Since accept/connect might block indefinitely, spawn a process
%% that kills us with the parent until call returns, and then
%% sends outgoing messages.
- {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}),
{[SO|TO], Rest} = proplists:split(Opts, [ssl_options,
- fragment_timer,
- throttle_cb]),
+ fragment_timer]),
SslOpts = ssl_opts(SO),
OwnOpts = lists:append(TO),
Tmo = proplists:get_value(fragment_timer,
OwnOpts,
?DEFAULT_FRAGMENT_TIMEOUT),
?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}),
- Throttle = proplists:get_value(throttle_cb, OwnOpts, false),
+ {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}),
Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs, SPid),
M = if SslOpts -> ssl; true -> Mod end,
monitor(process, MPid),
MPid ! {start, self(), Sock, M}, %% prepare monitor for sending
putr(?REF_KEY, Ref),
- throttle(#transport{parent = Pid,
- module = M,
- socket = Sock,
- ssl = SslOpts,
- timeout = Tmo,
- throttle_cb = Throttle,
- throttled = false /= Throttle,
- monitor = MPid});
+ setopts(#transport{parent = Pid,
+ module = M,
+ socket = Sock,
+ ssl = SslOpts,
+ timeout = Tmo,
+ monitor = MPid});
%% Put the reference in the process dictionary since we now use it
%% advertise the ssl socket after TLS upgrade.
@@ -518,7 +510,7 @@ getr(Key) ->
%% Transition monitor state.
%% Outgoing message.
-m(Bin, #monitor{} = S)
+m(Bin, S)
when is_binary(Bin) ->
send(Bin, S),
S;
@@ -539,7 +531,7 @@ m({stop, TPid} = T, #monitor{transport = TPid}) ->
%% Transport is telling us that TLS has been negotiated after
%% capabilities exchange.
-m({tls, SSock}, #monitor{} = S) ->
+m({tls, SSock}, S) ->
S#monitor{socket = SSock,
module = ssl};
@@ -583,22 +575,14 @@ t(T,S) ->
%% transition/2
-%% Incoming message.
+%% Incoming packets.
transition({P, Sock, Bin}, #transport{socket = Sock,
- ssl = B,
- throttled = T}
+ ssl = B}
= S)
when P == ssl, true == B;
P == tcp ->
- false = T, %% assert
recv(Bin, S);
-%% Make a new throttling callback after a timeout.
-transition(throttle, #transport{throttled = false}) ->
- ok;
-transition(throttle, S) ->
- throttle(S);
-
%% Capabilties exchange has decided on whether or not to run over TLS.
transition({diameter, {tls, Ref, Type, B}}, #transport{parent = Pid}
= S) ->
@@ -607,7 +591,7 @@ transition({diameter, {tls, Ref, Type, B}}, #transport{parent = Pid}
= NS
= tls_handshake(Type, B, S),
Pid ! {diameter, {tls, Ref}},
- throttle(NS#transport{ssl = B});
+ NS#transport{ssl = B};
transition({C, Sock}, #transport{socket = Sock,
ssl = B})
@@ -623,8 +607,8 @@ transition({E, Sock, _Reason} = T, #transport{socket = Sock,
?ERROR({T,S});
%% Outgoing message.
-transition({diameter, {send, Bin}}, #transport{} = S) ->
- send(Bin, S),
+transition({diameter, {send, Msg}}, #transport{} = S) ->
+ send(Msg, S),
ok;
%% Request to close the transport connection.
@@ -703,24 +687,16 @@ tls(accept, Sock, Opts) ->
%% using Nagle.
%% Receive packets until a full message is received,
-recv(Bin, #transport{frag = Head, throttled = false} = S) ->
+recv(Bin, #transport{parent = Pid, frag = Head} = S) ->
case rcv(Head, Bin) of
- {Msg, B} ->
- throttle(S#transport{frag = B, throttled = Msg});
- Frag ->
- setopts(S),
- start_fragment_timer(S#transport{frag = Frag,
- flush = false})
+ {Msg, B} -> %% have a complete message ...
+ diameter_peer:recv(Pid, Msg),
+ recv(<<>>, S#transport{frag = B});
+ Frag -> %% read more on the socket
+ start_fragment_timer(setopts(S#transport{frag = Frag,
+ flush = false}))
end.
-%% recv/1
-
-recv(#transport{throttled = false} = S) ->
- recv(<<>>, S);
-
-recv(#transport{} = S) ->
- S.
-
%% rcv/2
%% No previous fragment.
@@ -780,13 +756,16 @@ recv1(Len, Bin) ->
<<Msg:Len/binary, Rest/binary>> = Bin,
{Msg, Rest}.
-%% bin/1-2
+%% bin/2
bin(Head, Acc) ->
list_to_binary([Head | lists:reverse(Acc)]).
+%% bin/1
+
bin({_, _, Head, Acc}) ->
bin(Head, Acc);
+
bin(Bin)
when is_binary(Bin) ->
Bin.
@@ -805,9 +784,7 @@ bin(Bin)
%% also eventually lead to watchdog failover.
%% No fragment to flush or not receiving messages.
-flush(#transport{frag = Frag, throttled = B} = S)
- when Frag == <<>>;
- B /= false ->
+flush(#transport{frag = <<>>} = S) ->
S;
%% Messages have been received since last timer expiry.
@@ -850,14 +827,11 @@ connect(Mod, Host, Port, Opts) ->
%% send/2
-send(Bin, #monitor{socket = Sock,
- module = M}) ->
- case send(M, Sock, Bin) of
- ok ->
- ok;
- {error, Reason} ->
- x({send, Reason})
- end;
+send(#diameter_packet{bin = Bin}, S) ->
+ send(Bin, S);
+
+send(Bin, #monitor{} = S) ->
+ send1(Bin, S);
%% Send from the monitor process to avoid deadlock if both the
%% receiver and the peer were to block in send.
@@ -865,6 +839,17 @@ send(Bin, #transport{monitor = MPid}) ->
MPid ! Bin,
MPid.
+%% send1/2
+
+send1(Bin, #monitor{socket = Sock,
+ module = M}) ->
+ case send(M, Sock, Bin) of
+ ok ->
+ ok;
+ {error, Reason} ->
+ x({send, Reason})
+ end.
+
%% send/3
send(gen_tcp, Sock, Bin) ->
@@ -885,116 +870,12 @@ setopts(M, Sock, Opts) ->
%% setopts/1
-setopts(#transport{socket = Sock, module = M}) ->
- setopts(M, Sock).
-
-%% setopts/2
-
-setopts(M, Sock) ->
+setopts(#transport{socket = Sock,
+ module = M}
+ = S)->
case setopts(M, Sock, [{active, once}]) of
- ok -> ok;
- X -> x({setopts, M, Sock, X}) %% possibly on peer disconnect
- end.
-
-%% throttle/1
-
-%% Still collecting packets for a complete message: keep receiving.
-throttle(#transport{throttled = false} = S) ->
- recv(S);
-
-%% Decide whether to receive another, or whether to accept a message
-%% that's been received.
-throttle(#transport{throttle_cb = F, throttled = T} = S) ->
- Res = cb(F, T),
-
- try throttle(Res, S) of
- #transport{ssl = SB} = NS when is_boolean(SB) ->
- throttle(defrag(NS));
- #transport{throttled = Msg} = NS when is_binary(Msg) ->
- %% Initial incoming message when we might need to upgrade
- %% to TLS: wait for reception of a tls tuple.
- defrag(NS)
- catch
- #transport{} = NS ->
- recv(NS)
- end.
-
-%% cb/2
-
-cb(false, _) ->
- ok;
-
-cb(F, B) ->
- diameter_lib:eval([F, true /= B andalso B]).
-
-%% throttle/2
-
-%% Callback says to receive another message.
-throttle(ok, #transport{throttled = true} = S) ->
- throw(S#transport{throttled = false});
-
-%% Callback says to accept a received message.
-throttle(ok, #transport{parent = Pid, throttled = Msg} = S)
- when is_binary(Msg) ->
- diameter_peer:recv(Pid, Msg),
- S;
-
-throttle({ok = T, F}, S) ->
- throttle(T, S#transport{throttle_cb = F});
-
-%% Callback says to accept a received message and acknowledged the
-%% returned pid with a {request, Pid} message if a request pid is
-%% spawned, a discard message otherwise. The latter does not mean that
-%% the message was necessarily discarded: it could have been an
-%% answer.
-throttle(NPid, #transport{parent = Pid, throttled = Msg} = S)
- when is_pid(NPid), is_binary(Msg) ->
- diameter_peer:recv(Pid, {Msg, NPid}),
- S;
-
-throttle({NPid, F}, #transport{throttled = Msg} = S)
- when is_pid(NPid), is_binary(Msg) ->
- throttle(NPid, S#transport{throttle_cb = F});
-
-%% Callback to accept a received message says to discard it.
-throttle(discard, #transport{throttled = Msg} = S)
- when is_binary(Msg) ->
- S;
-throttle({discard = T, F}, #transport{throttled = Msg} = S)
- when is_binary(Msg) ->
- throttle(T, S#transport{throttle_cb = F});
-
-%% Callback to accept a received message says to answer it with the
-%% supplied binary.
-throttle(Bin, #transport{throttled = Msg} = S)
- when is_binary(Bin), is_binary(Msg) ->
- send(Bin, S),
- S;
-throttle({Bin, F}, #transport{throttled = Msg} = S)
- when is_binary(Bin), is_binary(Msg) ->
- throttle(Bin, S#transport{throttle_cb = F});
-
-%% Callback says to ask again in the specified number of milliseconds.
-throttle({timeout, Tmo}, S) ->
- erlang:send_after(Tmo, self(), throttle),
- throw(S);
-throttle({timeout = T, Tmo, F}, S) ->
- throttle({T, Tmo}, S#transport{throttle_cb = F});
-
-throttle(T, #transport{throttle_cb = F}) ->
- ?ERROR({invalid_return, T, F}).
-
-%% defrag/1
-%%
-%% Try to extract another message from packets already read before
-%% another throttling callback.
-
-defrag(#transport{frag = Head} = S) ->
- case rcv(Head, <<>>) of
- {Msg, B} ->
- S#transport{throttled = Msg, frag = B};
- _ ->
- S#transport{throttled = true}
+ ok -> S;
+ X -> x({setopts, Sock, M, X}) %% possibly on peer disconnect
end.
%% portnr/2