diff options
Diffstat (limited to 'lib/diameter/src/transport')
-rw-r--r-- | lib/diameter/src/transport/diameter_tcp.erl | 251 |
1 files changed, 181 insertions, 70 deletions
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index c79d85820b..6a5e5fe89d 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2015. All Rights Reserved. +%% Copyright Ericsson AB 2010-2016. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ %% -module(diameter_tcp). +-dialyzer({no_fail_call, throttle/2}). -behaviour(gen_server). @@ -102,7 +103,8 @@ | gen_tcp:listen_option(). -type option() :: {port, non_neg_integer()} - | {fragment_timer, 0..16#FFFFFFFF}. + | {fragment_timer, 0..16#FFFFFFFF} + | {throttle_cb, diameter:evaluable()}. %% Accepting/connecting transport process state. -record(transport, @@ -110,10 +112,13 @@ parent :: pid(), %% of process that started us module :: module(), %% gen_tcp-like module frag = <<>> :: frag(), %% message fragment - ssl :: boolean() | [term()], %% ssl options + ssl :: [term()] | boolean(), %% ssl options, ssl or not timeout :: infinity | 0..16#FFFFFFFF, %% fragment timeout tref = false :: false | reference(), %% fragment timer reference - flush = false :: boolean()}). %% flush fragment at timeout? + flush = false :: boolean(), %% flush fragment at timeout? + throttle_cb :: false | diameter:evaluable(), %% ask to receive + throttled :: boolean() | binary()}). %% stopped receiving? + %% The usual transport using gen_tcp can be replaced by anything %% sufficiently gen_tcp-like by passing a 'module' option as the first %% (for simplicity) transport option. The transport_module diameter_etcp @@ -198,22 +203,27 @@ i({T, Ref, Mod, Pid, Opts, Addrs}) %% that does nothing but kill us with the parent until call %% returns. {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}), - {SslOpts, Rest0} = ssl(Opts), - {OwnOpts, Rest} = own(Rest0), + {[SO|TO], Rest} = proplists:split(Opts, [ssl_options, + fragment_timer, + throttle_cb]), + SslOpts = ssl_opts(SO), + OwnOpts = lists:append(TO), Tmo = proplists:get_value(fragment_timer, OwnOpts, ?DEFAULT_FRAGMENT_TIMEOUT), ?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}), + Throttle = proplists:get_value(throttle_cb, OwnOpts, false), Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs), MPid ! {stop, self()}, %% tell the monitor to die M = if SslOpts -> ssl; true -> Mod end, - setopts(M, Sock), putr(?REF_KEY, Ref), - #transport{parent = Pid, - module = M, - socket = Sock, - ssl = SslOpts, - timeout = Tmo}; + throttle(#transport{parent = Pid, + module = M, + socket = Sock, + ssl = SslOpts, + timeout = Tmo, + throttle_cb = Throttle, + throttled = false /= Throttle}); %% Put the reference in the process dictionary since we now use it %% advertise the ssl socket after TLS upgrade. @@ -246,14 +256,6 @@ laddr([], Mod, Sock) -> laddr([{ip, Addr}], _, _) -> Addr. -own(Opts) -> - {[Own], Rest} = proplists:split(Opts, [fragment_timer]), - {Own, Rest}. - -ssl(Opts) -> - {[SslOpts], Rest} = proplists:split(Opts, [ssl_options]), - {ssl_opts(SslOpts), Rest}. - ssl_opts([]) -> false; ssl_opts([{ssl_options, true}]) -> @@ -261,8 +263,8 @@ ssl_opts([{ssl_options, true}]) -> ssl_opts([{ssl_options, Opts}]) when is_list(Opts) -> Opts; -ssl_opts(L) -> - ?ERROR({ssl_options, L}). +ssl_opts(T) -> + ?ERROR({ssl_options, T}). %% init/7 @@ -393,7 +395,7 @@ get_port(Ps) -> gen_opts(LAddrOpt, Opts) -> {L,_} = proplists:split(Opts, [binary, packet, active]), [[],[],[]] == L orelse ?ERROR({reserved_options, Opts}), - [binary, {packet, 0}, {active, once}] ++ LAddrOpt ++ Opts. + [binary, {packet, 0}, {active, false}] ++ LAddrOpt ++ Opts. %% --------------------------------------------------------------------------- %% # ports/1 @@ -536,53 +538,37 @@ t(T,S) -> S; #transport{} = NS -> NS; - {stop, Reason} -> - x(Reason); stop -> x(T) end. %% transition/2 -%% Initial incoming message when we might need to upgrade to TLS: -%% don't request another message until we know. - -transition({tcp, Sock, Bin}, #transport{socket = Sock, - parent = Pid, - frag = Head, - module = M, - ssl = Opts} - = S) - when is_list(Opts) -> - case rcv(Head, Bin) of - {Msg, B} when is_binary(Msg) -> - diameter_peer:recv(Pid, Msg), - S#transport{frag = B}; - Frag -> - setopts(M, Sock), - start_fragment_timer(S#transport{frag = Frag}) - end; - %% Incoming message. transition({P, Sock, Bin}, #transport{socket = Sock, - module = M, - ssl = B} + ssl = B, + throttled = T} = S) - when P == tcp, not B; - P == ssl, B -> - setopts(M, Sock), - start_fragment_timer(recv(Bin, S)); + when P == ssl, true == B; + P == tcp -> + false = T, %% assert + recv(Bin, S); + +%% Make a new throttling callback after a timeout. +transition(throttle, #transport{throttled = false}) -> + ok; +transition(throttle, S) -> + throttle(S); %% Capabilties exchange has decided on whether or not to run over TLS. transition({diameter, {tls, Ref, Type, B}}, #transport{parent = Pid} = S) -> - #transport{socket = Sock, - module = M} + true = is_boolean(B), %% assert + #transport{} = NS = tls_handshake(Type, B, S), Pid ! {diameter, {tls, Ref}}, - setopts(M, Sock), - start_fragment_timer(NS#transport{ssl = B}); + throttle(NS#transport{ssl = B}); transition({C, Sock}, #transport{socket = Sock, ssl = B}) @@ -598,14 +584,8 @@ transition({E, Sock, _Reason} = T, #transport{socket = Sock, ?ERROR({T,S}); %% Outgoing message. -transition({diameter, {send, Bin}}, #transport{socket = Sock, - module = M}) -> - case send(M, Sock, Bin) of - ok -> - ok; - {error, Reason} -> - {stop, {send, Reason}} - end; +transition({diameter, {send, Bin}}, S) -> + send(Bin, S); %% Request to close the transport connection. transition({diameter, {close, Pid}}, #transport{parent = Pid, @@ -672,16 +652,25 @@ tls(accept, Sock, Opts) -> %% Reassemble fragmented messages and extract multiple message sent %% using Nagle. -recv(Bin, #transport{parent = Pid, frag = Head} = S) -> +%% Receive packets until a full message is received, +recv(Bin, #transport{frag = Head, throttled = false} = S) -> case rcv(Head, Bin) of - {Msg, B} when is_binary(Msg) -> - diameter_peer:recv(Pid, Msg), - recv(B, S#transport{frag = <<>>}); + {Msg, B} -> + throttle(S#transport{frag = B, throttled = Msg}); Frag -> - S#transport{frag = Frag, - flush = false} + setopts(S), + start_fragment_timer(S#transport{frag = Frag, + flush = false}) end. +%% recv/1 + +recv(#transport{throttled = false} = S) -> + recv(<<>>, S); + +recv(#transport{} = S) -> + S. + %% rcv/2 %% No previous fragment. @@ -765,8 +754,10 @@ bin(Bin) %% since all messages with length problems are discarded this should %% also eventually lead to watchdog failover. -%% No fragment to flush. -flush(#transport{frag = <<>>} = S) -> +%% No fragment to flush or not receiving messages. +flush(#transport{frag = Frag, throttled = B} = S) + when Frag == <<>>; + B /= false -> S; %% Messages have been received since last timer expiry. @@ -807,6 +798,17 @@ accept(Mod, LSock) -> connect(Mod, Host, Port, Opts) -> Mod:connect(Host, Port, Opts). +%% send/2 + +send(Bin, #transport{socket = Sock, + module = M}) -> + case send(M, Sock, Bin) of + ok -> + ok; + {error, Reason} -> + x({send, Reason}) + end. + %% send/3 send(gen_tcp, Sock, Bin) -> @@ -825,6 +827,11 @@ setopts(ssl, Sock, Opts) -> setopts(M, Sock, Opts) -> M:setopts(Sock, Opts). +%% setopts/1 + +setopts(#transport{socket = Sock, module = M}) -> + setopts(M, Sock). + %% setopts/2 setopts(M, Sock) -> @@ -833,6 +840,110 @@ setopts(M, Sock) -> X -> x({setopts, M, Sock, X}) %% possibly on peer disconnect end. +%% throttle/1 + +%% Still collecting packets for a complete message: keep receiving. +throttle(#transport{throttled = false} = S) -> + recv(S); + +%% Decide whether to receive another, or whether to accept a message +%% that's been received. +throttle(#transport{throttle_cb = F, throttled = T} = S) -> + Res = cb(F, T), + + try throttle(Res, S) of + #transport{ssl = SB} = NS when is_boolean(SB) -> + throttle(defrag(NS)); + #transport{throttled = Msg} = NS when is_binary(Msg) -> + %% Initial incoming message when we might need to upgrade + %% to TLS: wait for reception of a tls tuple. + defrag(NS) + catch + #transport{} = NS -> + recv(NS) + end. + +%% cb/2 + +cb(false, _) -> + ok; + +cb(F, B) -> + diameter_lib:eval([F, true /= B andalso B]). + +%% throttle/2 + +%% Callback says to receive another message. +throttle(ok, #transport{throttled = true} = S) -> + throw(S#transport{throttled = false}); + +%% Callback says to accept a received message. +throttle(ok, #transport{parent = Pid, throttled = Msg} = S) + when is_binary(Msg) -> + diameter_peer:recv(Pid, Msg), + S; + +throttle({ok = T, F}, S) -> + throttle(T, S#transport{throttle_cb = F}); + +%% Callback says to accept a received message and acknowledged the +%% returned pid with a {request, Pid} message if a request pid is +%% spawned, a discard message otherwise. The latter does not mean that +%% the message was necessarily discarded: it could have been an +%% answer. +throttle(NPid, #transport{parent = Pid, throttled = Msg} = S) + when is_pid(NPid), is_binary(Msg) -> + diameter_peer:recv(Pid, {Msg, NPid}), + S; + +throttle({NPid, F}, #transport{throttled = Msg} = S) + when is_pid(NPid), is_binary(Msg) -> + throttle(NPid, S#transport{throttle_cb = F}); + +%% Callback to accept a received message says to discard it. +throttle(discard, #transport{throttled = Msg} = S) + when is_binary(Msg) -> + S; + +throttle({discard = T, F}, #transport{throttled = Msg} = S) + when is_binary(Msg) -> + throttle(T, S#transport{throttle_cb = F}); + +%% Callback to accept a received message says to answer it with the +%% supplied binary. +throttle(Bin, #transport{throttled = Msg} = S) + when is_binary(Bin), is_binary(Msg) -> + send(Bin, S), + S; + +throttle({Bin, F}, #transport{throttled = Msg} = S) + when is_binary(Bin), is_binary(Msg) -> + throttle(Bin, S#transport{throttle_cb = F}); + +%% Callback says to ask again in the specified number of milliseconds. +throttle({timeout, Tmo}, S) -> + erlang:send_after(Tmo, self(), throttle), + throw(S); + +throttle({timeout = T, Tmo, F}, S) -> + throttle({T, Tmo}, S#transport{throttle_cb = F}); + +throttle(T, #transport{throttle_cb = F}) -> + ?ERROR({invalid_return, T, F}). + +%% defrag/1 +%% +%% Try to extract another message from packets already read before +%% another throttling callback. + +defrag(#transport{frag = Head} = S) -> + case rcv(Head, <<>>) of + {Msg, B} -> + S#transport{throttled = Msg, frag = B}; + _ -> + S#transport{throttled = true} + end. + %% portnr/2 portnr(gen_tcp, Sock) -> |