From 472a080ccf2f725e2f5277fa5feb76aaf9ce2e67 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Sun, 21 Feb 2016 22:42:55 +0100 Subject: Add diameter_tcp option throttle_cb To let a callback module decide whether or to receive another message from the peer, so that backpressure can be applied when it's inappropriate. This is to let a callback protect against reading more than can be processed, which is otherwise possible since diameter_tcp otherwise always asks for more. A callback is made after each message, and can answer to continue reading or to ask again after a timeout. It's each message instead of each packet partly for simplicity, but also since this should be sufficiently fine-grained. Per packet would require some interaction with the fragment timer that flushes partial messages that haven't been completely received. --- lib/diameter/src/transport/diameter_tcp.erl | 121 ++++++++++++++++++---------- 1 file changed, 80 insertions(+), 41 deletions(-) diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index 0b26f429fb..51969a09c0 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2015. All Rights Reserved. +%% Copyright Ericsson AB 2010-2016. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -101,7 +101,8 @@ | gen_tcp:listen_option(). -type option() :: {port, non_neg_integer()} - | {fragment_timer, 0..16#FFFFFFFF}. + | {fragment_timer, 0..16#FFFFFFFF} + | {throttle_cb, diameter:evaluable()}. %% Accepting/connecting transport process state. -record(transport, @@ -112,7 +113,9 @@ ssl :: boolean() | [term()], %% ssl options timeout :: infinity | 0..16#FFFFFFFF, %% fragment timeout tref = false :: false | reference(), %% fragment timer reference - flush = false :: boolean()}). %% flush fragment at timeout? + flush = false :: boolean(), %% flush fragment at timeout? + throttle_cb :: false | diameter:evaluable(), %% ask to receive + throttled = false :: boolean()}). %% stopped receiving? %% The usual transport using gen_tcp can be replaced by anything %% sufficiently gen_tcp-like by passing a 'module' option as the first %% (for simplicity) transport option. The transport_module diameter_etcp @@ -197,22 +200,26 @@ i({T, Ref, Mod, Pid, Opts, Addrs}) %% that does nothing but kill us with the parent until call %% returns. {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}), - {SslOpts, Rest0} = ssl(Opts), - {OwnOpts, Rest} = own(Rest0), + {[SO|TO], Rest} = proplists:split(Opts, [ssl_options, + fragment_timer, + throttle_cb]), + SslOpts = ssl_opts(SO), + OwnOpts = lists:append(TO), Tmo = proplists:get_value(fragment_timer, OwnOpts, ?DEFAULT_FRAGMENT_TIMEOUT), ?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}), + Throttle = proplists:get_value(throttle_cb, OwnOpts, false), Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs), MPid ! {stop, self()}, %% tell the monitor to die M = if SslOpts -> ssl; true -> Mod end, - setopts(M, Sock), putr(?REF_KEY, Ref), - #transport{parent = Pid, - module = M, - socket = Sock, - ssl = SslOpts, - timeout = Tmo}; + throttle(#transport{parent = Pid, + module = M, + socket = Sock, + ssl = SslOpts, + timeout = Tmo, + throttle_cb = Throttle}); %% Put the reference in the process dictionary since we now use it %% advertise the ssl socket after TLS upgrade. @@ -245,14 +252,6 @@ laddr([], Mod, Sock) -> laddr([{ip, Addr}], _, _) -> Addr. -own(Opts) -> - {[Own], Rest} = proplists:split(Opts, [fragment_timer]), - {Own, Rest}. - -ssl(Opts) -> - {[SslOpts], Rest} = proplists:split(Opts, [ssl_options]), - {ssl_opts(SslOpts), Rest}. - ssl_opts([]) -> false; ssl_opts([{ssl_options, true}]) -> @@ -260,8 +259,8 @@ ssl_opts([{ssl_options, true}]) -> ssl_opts([{ssl_options, Opts}]) when is_list(Opts) -> Opts; -ssl_opts(L) -> - ?ERROR({ssl_options, L}). +ssl_opts(T) -> + ?ERROR({ssl_options, T}). %% init/7 @@ -392,7 +391,7 @@ get_port(Ps) -> gen_opts(LAddrOpt, Opts) -> {L,_} = proplists:split(Opts, [binary, packet, active]), [[],[],[]] == L orelse ?ERROR({reserved_options, Opts}), - [binary, {packet, 0}, {active, once}] ++ LAddrOpt ++ Opts. + [binary, {packet, 0}, {active, false}] ++ LAddrOpt ++ Opts. %% --------------------------------------------------------------------------- %% # ports/1 @@ -545,43 +544,44 @@ t(T,S) -> %% Initial incoming message when we might need to upgrade to TLS: %% don't request another message until we know. - transition({tcp, Sock, Bin}, #transport{socket = Sock, parent = Pid, frag = Head, - module = M, ssl = Opts} = S) when is_list(Opts) -> case rcv(Head, Bin) of - {Msg, B} when is_binary(Msg) -> + {Msg, B} -> diameter_peer:recv(Pid, Msg), S#transport{frag = B}; Frag -> - setopts(M, Sock), + setopts(S), start_fragment_timer(S#transport{frag = Frag}) end; %% Incoming message. transition({P, Sock, Bin}, #transport{socket = Sock, - module = M, - ssl = B} + ssl = B, + throttled = T} = S) when P == tcp, not B; P == ssl, B -> - setopts(M, Sock), - start_fragment_timer(recv(Bin, S)); + false = T, %% assert + recv(Bin, S); + +%% Check whether or not to read more after a throttle_cb timeout. +transition(throttle, #transport{throttled = B} = S) -> + true = B, %% assert + throttle(S); %% Capabilties exchange has decided on whether or not to run over TLS. transition({diameter, {tls, Ref, Type, B}}, #transport{parent = Pid} = S) -> - #transport{socket = Sock, - module = M} + #transport{} = NS = tls_handshake(Type, B, S), Pid ! {diameter, {tls, Ref}}, - setopts(M, Sock), - start_fragment_timer(NS#transport{ssl = B}); + throttle(NS#transport{ssl = B}); transition({C, Sock}, #transport{socket = Sock, ssl = B}) @@ -671,14 +671,17 @@ tls(accept, Sock, Opts) -> %% Reassemble fragmented messages and extract multiple message sent %% using Nagle. -recv(Bin, #transport{parent = Pid, frag = Head} = S) -> +%% Receive packets until a full message is received, then check +%% whether to keep receiving. +recv(Bin, #transport{parent = Pid, frag = Head, throttled = false} = S) -> case rcv(Head, Bin) of - {Msg, B} when is_binary(Msg) -> + {Msg, B} -> diameter_peer:recv(Pid, Msg), - recv(B, S#transport{frag = <<>>}); + throttle(S#transport{frag = B}); Frag -> - S#transport{frag = Frag, - flush = false} + setopts(S), + start_fragment_timer(S#transport{frag = Frag, + flush = false}) end. %% rcv/2 @@ -764,8 +767,10 @@ bin(Bin) %% since all messages with length problems are discarded this should %% also eventually lead to watchdog failover. -%% No fragment to flush. -flush(#transport{frag = <<>>} = S) -> +%% No fragment to flush or not receiving messages. +flush(#transport{frag = Frag, throttled = B} = S) + when Frag == <<>>; + B -> S; %% Messages have been received since last timer expiry. @@ -824,6 +829,11 @@ setopts(ssl, Sock, Opts) -> setopts(M, Sock, Opts) -> M:setopts(Sock, Opts). +%% setopts/1 + +setopts(#transport{socket = Sock, module = M}) -> + setopts(M, Sock). + %% setopts/2 setopts(M, Sock) -> @@ -832,6 +842,35 @@ setopts(M, Sock) -> X -> x({setopts, M, Sock, X}) %% possibly on peer disconnect end. +%% throttle/1 + +throttle(#transport{throttle_cb = false} = S) -> + recv(<<>>, S); + +throttle(#transport{throttle_cb = F} = S) -> + throttle(diameter_lib:eval(F), S). + +%% throttle/2 + +%% Don't ask for more packets as long as there are previously received +%% messages to extract. +throttle(ok, S) -> + recv(<<>>, S#transport{throttled = false}); + +throttle({ok = T, F}, S) -> + throttle(T, S#transport{throttle_cb = F}); + +%% Ask again after the specified number of milliseconds. +throttle({timeout, Tmo}, #transport{} = S) -> + erlang:send_after(Tmo, self(), throttle), + S#transport{throttled = true}; + +throttle({timeout = T, Tmo, F}, S) -> + throttle({T, Tmo}, S#transport{throttle_cb = F}); + +throttle(T, #transport{throttle_cb = F}) -> + ?ERROR({invalid_return, T, F}). + %% portnr/2 portnr(gen_tcp, Sock) -> -- cgit v1.2.3 From e7b286c95531595daa26b09edffbf2f081c5455a Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Sun, 13 Mar 2016 00:31:40 +0100 Subject: Make throttling callbacks on message reception The callback is now applied to the atom 'false' when asking if another message should be received on the socket, and to a received binary message after reception. Throttling on received messages makes it possible to distinguish between requests and answers. There is no callback on outgoing messages since these don't have to go through the transport process, even if they currently do. --- lib/diameter/src/transport/diameter_tcp.erl | 56 +++++++++++++++++++---------- 1 file changed, 37 insertions(+), 19 deletions(-) diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index 51969a09c0..91e8d26d36 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -115,7 +115,8 @@ tref = false :: false | reference(), %% fragment timer reference flush = false :: boolean(), %% flush fragment at timeout? throttle_cb :: false | diameter:evaluable(), %% ask to receive - throttled = false :: boolean()}). %% stopped receiving? + throttled :: boolean() | binary()}). %% stopped receiving? + %% The usual transport using gen_tcp can be replaced by anything %% sufficiently gen_tcp-like by passing a 'module' option as the first %% (for simplicity) transport option. The transport_module diameter_etcp @@ -219,7 +220,8 @@ i({T, Ref, Mod, Pid, Opts, Addrs}) socket = Sock, ssl = SslOpts, timeout = Tmo, - throttle_cb = Throttle}); + throttle_cb = Throttle, + throttled = false /= Throttle}); %% Put the reference in the process dictionary since we now use it %% advertise the ssl socket after TLS upgrade. @@ -543,7 +545,7 @@ t(T,S) -> %% transition/2 %% Initial incoming message when we might need to upgrade to TLS: -%% don't request another message until we know. +%% don't receive another message until we know. transition({tcp, Sock, Bin}, #transport{socket = Sock, parent = Pid, frag = Head, @@ -569,9 +571,9 @@ transition({P, Sock, Bin}, #transport{socket = Sock, false = T, %% assert recv(Bin, S); -%% Check whether or not to read more after a throttle_cb timeout. +%% Make a new throttling callback after a timeout. transition(throttle, #transport{throttled = B} = S) -> - true = B, %% assert + true = false /= B, %% assert throttle(S); %% Capabilties exchange has decided on whether or not to run over TLS. @@ -673,17 +675,21 @@ tls(accept, Sock, Opts) -> %% Receive packets until a full message is received, then check %% whether to keep receiving. -recv(Bin, #transport{parent = Pid, frag = Head, throttled = false} = S) -> +recv(Bin, #transport{frag = Head, throttled = false} = S) -> case rcv(Head, Bin) of {Msg, B} -> - diameter_peer:recv(Pid, Msg), - throttle(S#transport{frag = B}); + throttle(S#transport{frag = B, throttled = Msg}); Frag -> setopts(S), start_fragment_timer(S#transport{frag = Frag, flush = false}) end. +%% recv/1 + +recv(S) -> + recv(<<>>, S). + %% rcv/2 %% No previous fragment. @@ -770,7 +776,7 @@ bin(Bin) %% No fragment to flush or not receiving messages. flush(#transport{frag = Frag, throttled = B} = S) when Frag == <<>>; - B -> + B /= false -> S; %% Messages have been received since last timer expiry. @@ -844,26 +850,38 @@ setopts(M, Sock) -> %% throttle/1 -throttle(#transport{throttle_cb = false} = S) -> - recv(<<>>, S); +throttle(#transport{throttled = false} = S) -> + recv(S); + +throttle(#transport{throttle_cb = F, throttled = B} = S) -> + throttle(cb(F, B), S). -throttle(#transport{throttle_cb = F} = S) -> - throttle(diameter_lib:eval(F), S). +%% cb/2 + +cb(false, _) -> + ok; + +cb(F, B) -> + diameter_lib:eval([F, true /= B andalso B]). %% throttle/2 -%% Don't ask for more packets as long as there are previously received -%% messages to extract. -throttle(ok, S) -> - recv(<<>>, S#transport{throttled = false}); +%% Callback says to receive another message. +throttle(ok, #transport{throttled = true} = S) -> + recv(S#transport{throttled = false}); + +%% Callback says to accept a received message. +throttle(ok, #transport{parent = Pid, throttled = Msg} = S) -> + diameter_peer:recv(Pid, Msg), + throttle(S#transport{throttled = true}); throttle({ok = T, F}, S) -> throttle(T, S#transport{throttle_cb = F}); -%% Ask again after the specified number of milliseconds. +%% Callback says to ask again in the specified number of milliseconds. throttle({timeout, Tmo}, #transport{} = S) -> erlang:send_after(Tmo, self(), throttle), - S#transport{throttled = true}; + S; throttle({timeout = T, Tmo, F}, S) -> throttle({T, Tmo}, S#transport{throttle_cb = F}); -- cgit v1.2.3 From 2ffb288d8daeb72c27c5cead30ce779682bdd8b0 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Sat, 5 Mar 2016 16:18:52 +0100 Subject: Let throttling callback return a notification pid In addition to returning ok or {timeout, Tmo}, let a throttling callback for message reception return a pid(), which is then notified if the message in question is either discarded or results in a request process. Notification is by way of messages of the form {diameter, discard | {request, pid()}} where the pid is that of a request process resulting from the received message. This allows the notification process to keep track of the maximum number of request processes a peer connection can have given rise to. --- lib/diameter/src/base/diameter_peer_fsm.erl | 33 ++++++++++++--- lib/diameter/src/base/diameter_traffic.erl | 30 ++++++++++---- lib/diameter/src/base/diameter_watchdog.erl | 64 ++++++++++++++++++----------- lib/diameter/src/transport/diameter_tcp.erl | 14 +++++++ 4 files changed, 102 insertions(+), 39 deletions(-) diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl index a9ee4940a3..83e0dda501 100644 --- a/lib/diameter/src/base/diameter_peer_fsm.erl +++ b/lib/diameter/src/base/diameter_peer_fsm.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2015. All Rights Reserved. +%% Copyright Ericsson AB 2010-2016. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -426,8 +426,8 @@ transition({connection_timeout, _}, _) -> ok; %% Incoming message from the transport. -transition({diameter, {recv, Pkt}}, S) -> - recv(Pkt, S); +transition({diameter, {recv, MsgT}}, S) -> + incoming(MsgT, S); %% Timeout when still in the same state ... transition({timeout = T, PS}, #state{state = PS}) -> @@ -553,6 +553,28 @@ encode(Rec, Dict) -> diameter_codec:encode(Dict, #diameter_packet{header = Hdr, msg = Rec}). +%% incoming/2 + +incoming({Msg, NPid}, S) -> + try recv(Msg, S) of + T -> + NPid ! {diameter, discard}, + T + catch + {?MODULE, Name, Pkt} -> + S#state.parent ! {recv, self(), Name, {Pkt, NPid}}, + rcv(Name, Pkt, S) + end; + +incoming(Msg, S) -> + try + recv(Msg, S) + catch + {?MODULE, Name, Pkt} -> + S#state.parent ! {recv, self(), Name, Pkt}, + rcv(Name, Pkt, S) + end. + %% recv/2 recv(#diameter_packet{header = #diameter_header{} = Hdr} @@ -607,9 +629,8 @@ recv1('DPA' = N, %% Any other message with a header and no length errors: send to the %% parent. -recv1(Name, Pkt, #state{parent = Pid} = S) -> - Pid ! {recv, self(), Name, Pkt}, - rcv(Name, Pkt, S). +recv1(Name, Pkt, #state{}) -> + throw({?MODULE, Name, Pkt}). %% recv/3 diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl index 5d39c08213..fba4d3736b 100644 --- a/lib/diameter/src/base/diameter_traffic.erl +++ b/lib/diameter/src/base/diameter_traffic.erl @@ -232,7 +232,20 @@ pending(TPids) -> %% used to come through the service process but this avoids that %% becoming a bottleneck. -receive_message(TPid, Pkt, Dict0, RecvData) +receive_message(TPid, {Pkt, NPid}, Dict0, RecvData) -> + NPid ! {diameter, case incoming(TPid, Pkt, Dict0, RecvData) of + Pid when is_pid(Pid) -> + {request, Pid}; + _ -> + discard + end}; + +receive_message(TPid, Pkt, Dict0, RecvData) -> + incoming(TPid, Pkt, Dict0, RecvData). + +%% incoming/4 + +incoming(TPid, Pkt, Dict0, RecvData) when is_pid(TPid) -> #diameter_packet{header = #diameter_header{is_request = R}} = Pkt, recv(R, @@ -246,7 +259,13 @@ receive_message(TPid, Pkt, Dict0, RecvData) %% Incoming request ... recv(true, false, TPid, Pkt, Dict0, T) -> - spawn_request(TPid, Pkt, Dict0, T); + try + spawn_request(TPid, Pkt, Dict0, T) + catch + error: system_limit = E -> %% discard + ?LOG(error, E), + {error, E} + end; %% ... answer to known request ... recv(false, #request{ref = Ref, handler = Pid} = Req, _, Pkt, Dict0, _) -> @@ -275,12 +294,7 @@ spawn_request(TPid, Pkt, Dict0, RecvData) -> spawn_request(TPid, Pkt, Dict0, ?DEFAULT_SPAWN_OPTS, RecvData). spawn_request(TPid, Pkt, Dict0, Opts, RecvData) -> - try - spawn_opt(fun() -> recv_request(TPid, Pkt, Dict0, RecvData) end, Opts) - catch - error: system_limit = E -> %% discard - ?LOG(error, E) - end. + spawn_opt(fun() -> recv_request(TPid, Pkt, Dict0, RecvData) end, Opts). %% --------------------------------------------------------------------------- %% recv_request/4 diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl index 26bca7a5bc..43a8a5223f 100644 --- a/lib/diameter/src/base/diameter_watchdog.erl +++ b/lib/diameter/src/base/diameter_watchdog.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2015. All Rights Reserved. +%% Copyright Ericsson AB 2010-2016. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -447,8 +447,14 @@ transition({'DOWN', _, process, TPid, _Reason} = D, end; %% Incoming message. -transition({recv, TPid, Name, Pkt}, #watchdog{transport = TPid} = S) -> - recv(Name, Pkt, S); +transition({recv, TPid, Name, PktT}, #watchdog{transport = TPid} = S) -> + try + incoming(Name, PktT, S) + catch + #watchdog{dictionary = Dict0, receive_data = T} = NS -> + diameter_traffic:receive_message(TPid, PktT, Dict0, T), + NS + end; %% Current watchdog has timed out. transition({timeout, TRef, tw}, #watchdog{tref = TRef} = S) -> @@ -576,22 +582,32 @@ send_watchdog(#watchdog{pending = false, %% Don't count encode errors since we don't expect any on DWR/DWA. +%% incoming/3 + +incoming(Name, {Pkt, NPid}, S) -> + NS = recv(Name, Pkt, S), + NPid ! {diameter, discard}, + NS; + +incoming(Name, Pkt, S) -> + recv(Name, Pkt, S). + %% recv/3 recv(Name, Pkt, S) -> - try rcv(Name, S) of + try rcv(Name, Pkt, rcv(Name, S)) of #watchdog{} = NS -> - rcv(Name, Pkt, S), - NS + throw(NS) catch - {?MODULE, throwaway, #watchdog{} = NS} -> + #watchdog{} = NS -> %% throwaway NS end. %% rcv/3 rcv('DWR', Pkt, #watchdog{transport = TPid, - dictionary = Dict0}) -> + dictionary = Dict0} + = S) -> ?LOG(recv, 'DWR'), DPkt = diameter_codec:decode(Dict0, Pkt), diameter_traffic:incr(recv, DPkt, TPid, Dict0), @@ -608,32 +624,30 @@ rcv('DWR', Pkt, #watchdog{transport = TPid, send(TPid, {send, #diameter_packet{header = H, transport_data = T, bin = Bin}}), - ?LOG(send, 'DWA'); + ?LOG(send, 'DWA'), + throw(S); rcv('DWA', Pkt, #watchdog{transport = TPid, - dictionary = Dict0}) -> + dictionary = Dict0} + = S) -> ?LOG(recv, 'DWA'), diameter_traffic:incr(recv, Pkt, TPid, Dict0), diameter_traffic:incr_rc(recv, diameter_codec:decode(Dict0, Pkt), TPid, - Dict0); + Dict0), + throw(S); -rcv(N, _, _) +rcv(N, _, S) when N == 'CER'; N == 'CEA'; N == 'DPR' -> - false; + throw(S); %% DPR can be sent explicitly with diameter:call/4. Only the %% corresponding DPAs arrive here. -rcv(_, Pkt, #watchdog{transport = TPid, - dictionary = Dict0, - receive_data = T}) -> - diameter_traffic:receive_message(TPid, Pkt, Dict0, T). - -throwaway(S) -> - throw({?MODULE, throwaway, S}). +rcv(_, _, S)-> + S. %% rcv/2 %% @@ -650,20 +664,20 @@ throwaway(S) -> %% INITIAL Receive non-DWA Throwaway() INITIAL rcv('DWA', #watchdog{status = initial} = S) -> - throwaway(S#watchdog{pending = false}); + throw(S#watchdog{pending = false}); rcv(_, #watchdog{status = initial} = S) -> - throwaway(S); + throw(S); %% DOWN Receive DWA Pending = FALSE %% Throwaway() DOWN %% DOWN Receive non-DWA Throwaway() DOWN rcv('DWA', #watchdog{status = down} = S) -> - throwaway(S#watchdog{pending = false}); + throw(S#watchdog{pending = false}); rcv(_, #watchdog{status = down} = S) -> - throwaway(S); + throw(S); %% OKAY Receive DWA Pending = FALSE %% SetWatchdog() OKAY @@ -719,7 +733,7 @@ rcv('DWR', #watchdog{status = reopen} = S) -> S; %% ensure DWA: the RFC isn't explicit about answering rcv(_, #watchdog{status = reopen} = S) -> - throwaway(S). + throw(S). %% timeout/1 %% diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index 91e8d26d36..7f9255c097 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -878,6 +878,20 @@ throttle(ok, #transport{parent = Pid, throttled = Msg} = S) -> throttle({ok = T, F}, S) -> throttle(T, S#transport{throttle_cb = F}); +%% Callback says to accept a received message and acknowledged the +%% returned pid with a {request, Pid} message if a request pid is +%% spawned, a discard message otherwise. The latter does not mean that +%% the message was necessarily discarded: it could have been an +%% answer. +throttle(NPid, #transport{parent = Pid, throttled = Msg} = S) + when is_pid(NPid), is_binary(Msg) -> + diameter_peer:recv(Pid, {Msg, NPid}), + throttle(S#transport{throttled = true}); + +throttle({NPid, F}, #transport{throttled = Msg} = S) + when is_pid(NPid), is_binary(Msg) -> + throttle(NPid, S#transport{throttle_cb = F}); + %% Callback says to ask again in the specified number of milliseconds. throttle({timeout, Tmo}, #transport{} = S) -> erlang:send_after(Tmo, self(), throttle), -- cgit v1.2.3 From 993d540a3ad0f2fc932fdfb0aabba06cb03f320c Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Thu, 10 Mar 2016 17:43:50 +0100 Subject: Let a throttling callback discard a received message This can be used as a simple form of overload protection, discarding the message before it's passed into diameter to become one more request process in a flood. Replying with 3004 would be more appropriate when the request has been directed at a specific server (the RFC's requirement) however, and possibly it should be possible for a callback to do this as well. --- lib/diameter/src/transport/diameter_tcp.erl | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index 7f9255c097..b68a16b521 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -892,6 +892,15 @@ throttle({NPid, F}, #transport{throttled = Msg} = S) when is_pid(NPid), is_binary(Msg) -> throttle(NPid, S#transport{throttle_cb = F}); +%% Callback to accept a received message says to discard it. +throttle(discard, #transport{throttled = Msg} = S) + when is_binary(Msg) -> + throttle(S#transport{throttled = true}); + +throttle({discard = T, F}, #transport{throttled = Msg} = S) + when is_binary(Msg) -> + throttle(T, S#transport{throttle_cb = F}); + %% Callback says to ask again in the specified number of milliseconds. throttle({timeout, Tmo}, #transport{} = S) -> erlang:send_after(Tmo, self(), throttle), -- cgit v1.2.3 From 9298872b8771dff87e732237e36fddc81bfbcbde Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Thu, 10 Mar 2016 18:38:53 +0100 Subject: Let a throttling callback answer a received message As discussed in the parent commit. This is easier said than done in practice, but there's no harm in allowing it. --- lib/diameter/src/transport/diameter_tcp.erl | 32 +++++++++++++++++++++-------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index b68a16b521..d22b0ff320 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -599,14 +599,8 @@ transition({E, Sock, _Reason} = T, #transport{socket = Sock, ?ERROR({T,S}); %% Outgoing message. -transition({diameter, {send, Bin}}, #transport{socket = Sock, - module = M}) -> - case send(M, Sock, Bin) of - ok -> - ok; - {error, Reason} -> - {stop, {send, Reason}} - end; +transition({diameter, {send, Bin}}, S) -> + send(Bin, S); %% Request to close the transport connection. transition({diameter, {close, Pid}}, #transport{parent = Pid, @@ -817,6 +811,17 @@ accept(Mod, LSock) -> connect(Mod, Host, Port, Opts) -> Mod:connect(Host, Port, Opts). +%% send/2 + +send(Bin, #transport{socket = Sock, + module = M}) -> + case send(M, Sock, Bin) of + ok -> + ok; + {error, Reason} -> + x({send, Reason}) + end. + %% send/3 send(gen_tcp, Sock, Bin) -> @@ -901,6 +906,17 @@ throttle({discard = T, F}, #transport{throttled = Msg} = S) when is_binary(Msg) -> throttle(T, S#transport{throttle_cb = F}); +%% Callback to accept a received message says to answer it with the +%% supplied binary. +throttle(Bin, #transport{throttled = Msg} = S) + when is_binary(Bin), is_binary(Msg) -> + send(Bin, S), + throttle(S#transport{throttled = true}); + +throttle({Bin, F}, #transport{throttled = Msg} = S) + when is_binary(Bin), is_binary(Msg) -> + throttle(Bin, S#transport{throttle_cb = F}); + %% Callback says to ask again in the specified number of milliseconds. throttle({timeout, Tmo}, #transport{} = S) -> erlang:send_after(Tmo, self(), throttle), -- cgit v1.2.3 From eae5e8163f3b0784848ff0310ba3c8adca0bcb87 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Fri, 11 Mar 2016 06:07:19 +0100 Subject: Don't ask throttling callback to receive more unless needed TCP packets can contain more than one message, so only ask to receive another message if it hasn't already been received. --- lib/diameter/src/transport/diameter_tcp.erl | 42 ++++++++++++++++++++++------- 1 file changed, 33 insertions(+), 9 deletions(-) diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index d22b0ff320..e317922e7e 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -859,7 +859,17 @@ throttle(#transport{throttled = false} = S) -> recv(S); throttle(#transport{throttle_cb = F, throttled = B} = S) -> - throttle(cb(F, B), S). + Res = cb(F, B), + + try throttle(Res, S) of + #transport{} = NS -> + throttle(defrag(NS)) + catch + #transport{throttled = false} = NS -> + recv(NS); + #transport{} = NS -> + NS + end. %% cb/2 @@ -873,12 +883,13 @@ cb(F, B) -> %% Callback says to receive another message. throttle(ok, #transport{throttled = true} = S) -> - recv(S#transport{throttled = false}); + throw(S#transport{throttled = false}); %% Callback says to accept a received message. -throttle(ok, #transport{parent = Pid, throttled = Msg} = S) -> +throttle(ok, #transport{parent = Pid, throttled = Msg} = S) + when is_binary(Msg) -> diameter_peer:recv(Pid, Msg), - throttle(S#transport{throttled = true}); + S; throttle({ok = T, F}, S) -> throttle(T, S#transport{throttle_cb = F}); @@ -891,7 +902,7 @@ throttle({ok = T, F}, S) -> throttle(NPid, #transport{parent = Pid, throttled = Msg} = S) when is_pid(NPid), is_binary(Msg) -> diameter_peer:recv(Pid, {Msg, NPid}), - throttle(S#transport{throttled = true}); + S; throttle({NPid, F}, #transport{throttled = Msg} = S) when is_pid(NPid), is_binary(Msg) -> @@ -900,7 +911,7 @@ throttle({NPid, F}, #transport{throttled = Msg} = S) %% Callback to accept a received message says to discard it. throttle(discard, #transport{throttled = Msg} = S) when is_binary(Msg) -> - throttle(S#transport{throttled = true}); + S; throttle({discard = T, F}, #transport{throttled = Msg} = S) when is_binary(Msg) -> @@ -911,16 +922,16 @@ throttle({discard = T, F}, #transport{throttled = Msg} = S) throttle(Bin, #transport{throttled = Msg} = S) when is_binary(Bin), is_binary(Msg) -> send(Bin, S), - throttle(S#transport{throttled = true}); + S; throttle({Bin, F}, #transport{throttled = Msg} = S) when is_binary(Bin), is_binary(Msg) -> throttle(Bin, S#transport{throttle_cb = F}); %% Callback says to ask again in the specified number of milliseconds. -throttle({timeout, Tmo}, #transport{} = S) -> +throttle({timeout, Tmo}, S) -> erlang:send_after(Tmo, self(), throttle), - S; + throw(S); throttle({timeout = T, Tmo, F}, S) -> throttle({T, Tmo}, S#transport{throttle_cb = F}); @@ -928,6 +939,19 @@ throttle({timeout = T, Tmo, F}, S) -> throttle(T, #transport{throttle_cb = F}) -> ?ERROR({invalid_return, T, F}). +%% defrag/1 +%% +%% Try to extract another message from packets already read before +%% another throttling callback. + +defrag(#transport{frag = Head} = S) -> + case rcv(Head, <<>>) of + {Msg, B} -> + S#transport{throttled = Msg, frag = B}; + _ -> + S#transport{throttled = true} + end. + %% portnr/2 portnr(gen_tcp, Sock) -> -- cgit v1.2.3 From 8f9173b675015a1efc34466dace11bfd35a063bd Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Sat, 12 Mar 2016 08:51:37 +0100 Subject: Throttle properly with TLS In particular, let a callback decide when to receive the initial message. --- lib/diameter/src/transport/diameter_tcp.erl | 53 ++++++++++++----------------- 1 file changed, 22 insertions(+), 31 deletions(-) diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index e317922e7e..01f5bb8e66 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -110,7 +110,7 @@ parent :: pid(), %% of process that started us module :: module(), %% gen_tcp-like module frag = <<>> :: frag(), %% message fragment - ssl :: boolean() | [term()], %% ssl options + ssl :: [term()] | boolean(), %% ssl options, ssl or not timeout :: infinity | 0..16#FFFFFFFF, %% fragment timeout tref = false :: false | reference(), %% fragment timer reference flush = false :: boolean(), %% flush fragment at timeout? @@ -544,30 +544,13 @@ t(T,S) -> %% transition/2 -%% Initial incoming message when we might need to upgrade to TLS: -%% don't receive another message until we know. -transition({tcp, Sock, Bin}, #transport{socket = Sock, - parent = Pid, - frag = Head, - ssl = Opts} - = S) - when is_list(Opts) -> - case rcv(Head, Bin) of - {Msg, B} -> - diameter_peer:recv(Pid, Msg), - S#transport{frag = B}; - Frag -> - setopts(S), - start_fragment_timer(S#transport{frag = Frag}) - end; - %% Incoming message. transition({P, Sock, Bin}, #transport{socket = Sock, ssl = B, throttled = T} = S) - when P == tcp, not B; - P == ssl, B -> + when P == ssl, true == B; + P == tcp -> false = T, %% assert recv(Bin, S); @@ -579,6 +562,7 @@ transition(throttle, #transport{throttled = B} = S) -> %% Capabilties exchange has decided on whether or not to run over TLS. transition({diameter, {tls, Ref, Type, B}}, #transport{parent = Pid} = S) -> + true = is_boolean(B), %% assert #transport{} = NS = tls_handshake(Type, B, S), @@ -667,8 +651,7 @@ tls(accept, Sock, Opts) -> %% Reassemble fragmented messages and extract multiple message sent %% using Nagle. -%% Receive packets until a full message is received, then check -%% whether to keep receiving. +%% Receive packets until a full message is received, recv(Bin, #transport{frag = Head, throttled = false} = S) -> case rcv(Head, Bin) of {Msg, B} -> @@ -681,8 +664,11 @@ recv(Bin, #transport{frag = Head, throttled = false} = S) -> %% recv/1 -recv(S) -> - recv(<<>>, S). +recv(#transport{throttled = false} = S) -> + recv(<<>>, S); + +recv(#transport{} = S) -> + S. %% rcv/2 @@ -855,20 +841,25 @@ setopts(M, Sock) -> %% throttle/1 +%% Still collecting packets for a complete message: keep receiving. throttle(#transport{throttled = false} = S) -> recv(S); -throttle(#transport{throttle_cb = F, throttled = B} = S) -> - Res = cb(F, B), +%% Decide whether to receive another, or whether to accept a message +%% that's been received. +throttle(#transport{throttle_cb = F, throttled = T} = S) -> + Res = cb(F, T), try throttle(Res, S) of - #transport{} = NS -> - throttle(defrag(NS)) + #transport{ssl = SB} = NS when is_boolean(SB) -> + throttle(defrag(NS)); + #transport{throttled = Msg} = NS when is_binary(Msg) -> + %% Initial incoming message when we might need to upgrade + %% to TLS: wait for reception of a tls tuple. + defrag(NS) catch - #transport{throttled = false} = NS -> - recv(NS); #transport{} = NS -> - NS + recv(NS) end. %% cb/2 -- cgit v1.2.3 From c322099e7e7efeb01577e4c8efd52579beb90949 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Sun, 13 Mar 2016 07:26:30 +0100 Subject: Acknowledge answers to notification pids when throttling By sending {diameter, {answer, pid()}} when an incoming answer is sent to the specified pid, instead of a discard message as previously. The latter now literally means that the message has been discarded. --- lib/diameter/src/base/diameter_traffic.erl | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl index fba4d3736b..724fa855d8 100644 --- a/lib/diameter/src/base/diameter_traffic.erl +++ b/lib/diameter/src/base/diameter_traffic.erl @@ -233,12 +233,7 @@ pending(TPids) -> %% becoming a bottleneck. receive_message(TPid, {Pkt, NPid}, Dict0, RecvData) -> - NPid ! {diameter, case incoming(TPid, Pkt, Dict0, RecvData) of - Pid when is_pid(Pid) -> - {request, Pid}; - _ -> - discard - end}; + NPid ! {diameter, incoming(TPid, Pkt, Dict0, RecvData)}; receive_message(TPid, Pkt, Dict0, RecvData) -> incoming(TPid, Pkt, Dict0, RecvData). @@ -260,16 +255,17 @@ incoming(TPid, Pkt, Dict0, RecvData) %% Incoming request ... recv(true, false, TPid, Pkt, Dict0, T) -> try - spawn_request(TPid, Pkt, Dict0, T) + {request, spawn_request(TPid, Pkt, Dict0, T)} catch error: system_limit = E -> %% discard ?LOG(error, E), - {error, E} + discard end; %% ... answer to known request ... recv(false, #request{ref = Ref, handler = Pid} = Req, _, Pkt, Dict0, _) -> - Pid ! {answer, Ref, Req, Dict0, Pkt}; + Pid ! {answer, Ref, Req, Dict0, Pkt}, + {answer, Pid}; %% Note that failover could have happened prior to this message being %% received and triggering failback. That is, both a failover message @@ -284,7 +280,7 @@ recv(false, #request{ref = Ref, handler = Pid} = Req, _, Pkt, Dict0, _) -> recv(false, false, TPid, Pkt, _, _) -> ?LOG(discarded, Pkt#diameter_packet.header), incr(TPid, {{unknown, 0}, recv, discarded}), - ok. + discard. %% spawn_request/4 -- cgit v1.2.3 From 14eb86d85c5f191d78013aaa45b7e1aabf04a937 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Thu, 17 Mar 2016 14:50:53 +0100 Subject: Let throttling callback send a throttle message That is, don't assume that it's only diameter_tcp doing so: allow it to be received when not throttling. This lets a callback module trigger a new throttling callback itself, but it's not clear if this will be useful in practice. --- lib/diameter/src/transport/diameter_tcp.erl | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index 01f5bb8e66..1622e1b3d4 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -555,8 +555,9 @@ transition({P, Sock, Bin}, #transport{socket = Sock, recv(Bin, S); %% Make a new throttling callback after a timeout. -transition(throttle, #transport{throttled = B} = S) -> - true = false /= B, %% assert +transition(throttle, #transport{throttled = false}) -> + ok; +transition(throttle, S) -> throttle(S); %% Capabilties exchange has decided on whether or not to run over TLS. -- cgit v1.2.3 From 9169589569cda14c1c142cc396ab612697972dea Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Tue, 3 May 2016 13:57:58 +0200 Subject: Remove dead case clause Orphaned in commit 9298872b. --- lib/diameter/src/transport/diameter_tcp.erl | 2 -- 1 file changed, 2 deletions(-) diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index 1622e1b3d4..5c7c7cb700 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -536,8 +536,6 @@ t(T,S) -> S; #transport{} = NS -> NS; - {stop, Reason} -> - x(Reason); stop -> x(T) end. -- cgit v1.2.3 From 47092580a7ca6de581abf672e45a1c11e1d4561e Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Wed, 4 May 2016 19:14:27 +0200 Subject: Suppress dialyzer warning This one: diameter_tcp.erl:928: (call) The call diameter_tcp:throttle({'timeout',_},#transport{socket::port() | {'sslsocket',_,_},parent::pid(),module::atom(),frag::binary() | {non_neg_integer(),non_neg_integer(),binary(),[binary()]},ssl::boolean() | [any()],timeout::'infinity' | non_neg_integer(),tref::'false' | reference(),flush::boolean(),throttle_cb::'false' | fun() | maybe_improper_list(fun() | maybe_improper_list(any(),[any()]) | {atom(),atom(),[any()]},[any()]) | {atom(),atom(),[any()]},throttled::'true' | binary()}) will never return since it differs in the 1st argument from the success typing arguments: ('discard' | 'ok' | binary() | pid() | {'discard' | 'ok' | binary() | pid(),'false' | fun() | [fun() | [any()] | {atom(),atom(),[any()]}] | {atom(),atom(),[any()]}},#transport{socket::port() | {'sslsocket',_,_},parent::pid(),module::atom(),frag::binary() | {non_neg_integer(),non_neg_integer(),binary(),[binary()]},ssl::boolean() | [any()],timeout::'infinity' | non_neg_integer(),tref::'false' | reference(),flush::boolean(),throttle_cb::'false' | fun() | [fun() | [any()] | {atom(),atom(),[any()]}] | {atom(),atom(),[any()]},throttled::binary()}) It's true that the clause doesn't return, because of the throw, and that's the intention. --- lib/diameter/src/transport/diameter_tcp.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index 5c7c7cb700..783dc9b1a8 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -18,6 +18,7 @@ %% -module(diameter_tcp). +-dialyzer({no_fail_call, throttle/2}). -behaviour(gen_server). -- cgit v1.2.3