diff options
-rw-r--r-- | lib/diameter/src/transport/diameter_tcp.erl | 121 |
1 files changed, 80 insertions, 41 deletions
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index 0b26f429fb..51969a09c0 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2015. All Rights Reserved. +%% Copyright Ericsson AB 2010-2016. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -101,7 +101,8 @@ | gen_tcp:listen_option(). -type option() :: {port, non_neg_integer()} - | {fragment_timer, 0..16#FFFFFFFF}. + | {fragment_timer, 0..16#FFFFFFFF} + | {throttle_cb, diameter:evaluable()}. %% Accepting/connecting transport process state. -record(transport, @@ -112,7 +113,9 @@ ssl :: boolean() | [term()], %% ssl options timeout :: infinity | 0..16#FFFFFFFF, %% fragment timeout tref = false :: false | reference(), %% fragment timer reference - flush = false :: boolean()}). %% flush fragment at timeout? + flush = false :: boolean(), %% flush fragment at timeout? + throttle_cb :: false | diameter:evaluable(), %% ask to receive + throttled = false :: boolean()}). %% stopped receiving? %% The usual transport using gen_tcp can be replaced by anything %% sufficiently gen_tcp-like by passing a 'module' option as the first %% (for simplicity) transport option. The transport_module diameter_etcp @@ -197,22 +200,26 @@ i({T, Ref, Mod, Pid, Opts, Addrs}) %% that does nothing but kill us with the parent until call %% returns. {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}), - {SslOpts, Rest0} = ssl(Opts), - {OwnOpts, Rest} = own(Rest0), + {[SO|TO], Rest} = proplists:split(Opts, [ssl_options, + fragment_timer, + throttle_cb]), + SslOpts = ssl_opts(SO), + OwnOpts = lists:append(TO), Tmo = proplists:get_value(fragment_timer, OwnOpts, ?DEFAULT_FRAGMENT_TIMEOUT), ?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}), + Throttle = proplists:get_value(throttle_cb, OwnOpts, false), Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs), MPid ! {stop, self()}, %% tell the monitor to die M = if SslOpts -> ssl; true -> Mod end, - setopts(M, Sock), putr(?REF_KEY, Ref), - #transport{parent = Pid, - module = M, - socket = Sock, - ssl = SslOpts, - timeout = Tmo}; + throttle(#transport{parent = Pid, + module = M, + socket = Sock, + ssl = SslOpts, + timeout = Tmo, + throttle_cb = Throttle}); %% Put the reference in the process dictionary since we now use it %% advertise the ssl socket after TLS upgrade. @@ -245,14 +252,6 @@ laddr([], Mod, Sock) -> laddr([{ip, Addr}], _, _) -> Addr. -own(Opts) -> - {[Own], Rest} = proplists:split(Opts, [fragment_timer]), - {Own, Rest}. - -ssl(Opts) -> - {[SslOpts], Rest} = proplists:split(Opts, [ssl_options]), - {ssl_opts(SslOpts), Rest}. - ssl_opts([]) -> false; ssl_opts([{ssl_options, true}]) -> @@ -260,8 +259,8 @@ ssl_opts([{ssl_options, true}]) -> ssl_opts([{ssl_options, Opts}]) when is_list(Opts) -> Opts; -ssl_opts(L) -> - ?ERROR({ssl_options, L}). +ssl_opts(T) -> + ?ERROR({ssl_options, T}). %% init/7 @@ -392,7 +391,7 @@ get_port(Ps) -> gen_opts(LAddrOpt, Opts) -> {L,_} = proplists:split(Opts, [binary, packet, active]), [[],[],[]] == L orelse ?ERROR({reserved_options, Opts}), - [binary, {packet, 0}, {active, once}] ++ LAddrOpt ++ Opts. + [binary, {packet, 0}, {active, false}] ++ LAddrOpt ++ Opts. %% --------------------------------------------------------------------------- %% # ports/1 @@ -545,43 +544,44 @@ t(T,S) -> %% Initial incoming message when we might need to upgrade to TLS: %% don't request another message until we know. - transition({tcp, Sock, Bin}, #transport{socket = Sock, parent = Pid, frag = Head, - module = M, ssl = Opts} = S) when is_list(Opts) -> case rcv(Head, Bin) of - {Msg, B} when is_binary(Msg) -> + {Msg, B} -> diameter_peer:recv(Pid, Msg), S#transport{frag = B}; Frag -> - setopts(M, Sock), + setopts(S), start_fragment_timer(S#transport{frag = Frag}) end; %% Incoming message. transition({P, Sock, Bin}, #transport{socket = Sock, - module = M, - ssl = B} + ssl = B, + throttled = T} = S) when P == tcp, not B; P == ssl, B -> - setopts(M, Sock), - start_fragment_timer(recv(Bin, S)); + false = T, %% assert + recv(Bin, S); + +%% Check whether or not to read more after a throttle_cb timeout. +transition(throttle, #transport{throttled = B} = S) -> + true = B, %% assert + throttle(S); %% Capabilties exchange has decided on whether or not to run over TLS. transition({diameter, {tls, Ref, Type, B}}, #transport{parent = Pid} = S) -> - #transport{socket = Sock, - module = M} + #transport{} = NS = tls_handshake(Type, B, S), Pid ! {diameter, {tls, Ref}}, - setopts(M, Sock), - start_fragment_timer(NS#transport{ssl = B}); + throttle(NS#transport{ssl = B}); transition({C, Sock}, #transport{socket = Sock, ssl = B}) @@ -671,14 +671,17 @@ tls(accept, Sock, Opts) -> %% Reassemble fragmented messages and extract multiple message sent %% using Nagle. -recv(Bin, #transport{parent = Pid, frag = Head} = S) -> +%% Receive packets until a full message is received, then check +%% whether to keep receiving. +recv(Bin, #transport{parent = Pid, frag = Head, throttled = false} = S) -> case rcv(Head, Bin) of - {Msg, B} when is_binary(Msg) -> + {Msg, B} -> diameter_peer:recv(Pid, Msg), - recv(B, S#transport{frag = <<>>}); + throttle(S#transport{frag = B}); Frag -> - S#transport{frag = Frag, - flush = false} + setopts(S), + start_fragment_timer(S#transport{frag = Frag, + flush = false}) end. %% rcv/2 @@ -764,8 +767,10 @@ bin(Bin) %% since all messages with length problems are discarded this should %% also eventually lead to watchdog failover. -%% No fragment to flush. -flush(#transport{frag = <<>>} = S) -> +%% No fragment to flush or not receiving messages. +flush(#transport{frag = Frag, throttled = B} = S) + when Frag == <<>>; + B -> S; %% Messages have been received since last timer expiry. @@ -824,6 +829,11 @@ setopts(ssl, Sock, Opts) -> setopts(M, Sock, Opts) -> M:setopts(Sock, Opts). +%% setopts/1 + +setopts(#transport{socket = Sock, module = M}) -> + setopts(M, Sock). + %% setopts/2 setopts(M, Sock) -> @@ -832,6 +842,35 @@ setopts(M, Sock) -> X -> x({setopts, M, Sock, X}) %% possibly on peer disconnect end. +%% throttle/1 + +throttle(#transport{throttle_cb = false} = S) -> + recv(<<>>, S); + +throttle(#transport{throttle_cb = F} = S) -> + throttle(diameter_lib:eval(F), S). + +%% throttle/2 + +%% Don't ask for more packets as long as there are previously received +%% messages to extract. +throttle(ok, S) -> + recv(<<>>, S#transport{throttled = false}); + +throttle({ok = T, F}, S) -> + throttle(T, S#transport{throttle_cb = F}); + +%% Ask again after the specified number of milliseconds. +throttle({timeout, Tmo}, #transport{} = S) -> + erlang:send_after(Tmo, self(), throttle), + S#transport{throttled = true}; + +throttle({timeout = T, Tmo, F}, S) -> + throttle({T, Tmo}, S#transport{throttle_cb = F}); + +throttle(T, #transport{throttle_cb = F}) -> + ?ERROR({invalid_return, T, F}). + %% portnr/2 portnr(gen_tcp, Sock) -> |