diff options
Diffstat (limited to 'lib/diameter/src')
-rw-r--r-- | lib/diameter/src/base/diameter.erl | 29 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_peer_fsm.erl | 57 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_service.erl | 24 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_traffic.erl | 30 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_watchdog.erl | 64 | ||||
-rw-r--r-- | lib/diameter/src/diameter.appup.src | 10 | ||||
-rw-r--r-- | lib/diameter/src/transport/diameter_tcp.erl | 251 |
7 files changed, 348 insertions, 117 deletions
diff --git a/lib/diameter/src/base/diameter.erl b/lib/diameter/src/base/diameter.erl index de88f6befd..e8f2f63f86 100644 --- a/lib/diameter/src/base/diameter.erl +++ b/lib/diameter/src/base/diameter.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2015. All Rights Reserved. +%% Copyright Ericsson AB 2010-2016. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -36,6 +36,8 @@ %% Information. -export([services/0, + peer_info/1, + peer_find/1, service_info/2]). %% Start/stop the application. In a "real" application this should @@ -53,6 +55,7 @@ service_name/0, capability/0, peer_filter/0, + peer_ref/0, service_opt/0, application_opt/0, app_module/0, @@ -147,6 +150,27 @@ service_info(SvcName, Option) -> diameter_service:info(SvcName, Option). %% --------------------------------------------------------------------------- +%% peer_info/2 +%% --------------------------------------------------------------------------- + +-spec peer_info(peer_ref()) + -> [tuple()]. + +peer_info(PeerRef) -> + diameter_service:peer_info(PeerRef). + +%% --------------------------------------------------------------------------- +%% peer_find/1 +%% --------------------------------------------------------------------------- + +-spec peer_find(peer_ref() | pid()) + -> {peer_ref(), pid()} + | false. + +peer_find(Pid) -> + diameter_peer_fsm:find(Pid). + +%% --------------------------------------------------------------------------- %% add_transport/3 %% --------------------------------------------------------------------------- @@ -280,6 +304,9 @@ call(SvcName, App, Message) -> | {all, [peer_filter()]} | {any, [peer_filter()]}. +-opaque peer_ref() + :: pid(). + -type evaluable() :: {module(), atom(), list()} | fun() diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl index fb874013a3..996e75a8d3 100644 --- a/lib/diameter/src/base/diameter_peer_fsm.erl +++ b/lib/diameter/src/base/diameter_peer_fsm.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2015. All Rights Reserved. +%% Copyright Ericsson AB 2010-2016. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -32,6 +32,9 @@ -export([start/3, result_code/2]). +%% Interface towards diameter. +-export([find/1]). + %% gen_server callbacks -export([init/1, handle_call/3, @@ -185,6 +188,25 @@ start_link(T) -> infinity, diameter_lib:spawn_opts(server, [])). +%% find/1 +%% +%% Identify both pids of a peer_fsm/transport pair. + +find(Pid) -> + findl([{?MODULE, '_', Pid}, {?MODULE, Pid, '_'}]). + +findl([]) -> + false; + +findl([Pat | Rest]) -> + try + [{{_, Pid, TPid}, Pid}] = diameter_reg:match(Pat), + {Pid, TPid} + catch + error:_ -> + findl(Rest) + end. + %% --------------------------------------------------------------------------- %% --------------------------------------------------------------------------- @@ -215,6 +237,8 @@ i({Ack, WPid, {M, Ref} = T, Opts, {SvcOpts, Nodes, Dict0, Svc}}) -> {TPid, Addrs} = start_transport(T, Rest, Svc), + diameter_reg:add({?MODULE, self(), TPid}), %% lets pairs be discovered + #state{state = {'Wait-Conn-Ack', Tmo}, parent = WPid, transport = TPid, @@ -416,8 +440,8 @@ transition({connection_timeout, _}, _) -> ok; %% Incoming message from the transport. -transition({diameter, {recv, Pkt}}, S) -> - recv(Pkt, S); +transition({diameter, {recv, MsgT}}, S) -> + incoming(MsgT, S); %% Timeout when still in the same state ... transition({timeout = T, PS}, #state{state = PS}) -> @@ -543,6 +567,28 @@ encode(Rec, Dict) -> diameter_codec:encode(Dict, #diameter_packet{header = Hdr, msg = Rec}). +%% incoming/2 + +incoming({Msg, NPid}, S) -> + try recv(Msg, S) of + T -> + NPid ! {diameter, discard}, + T + catch + {?MODULE, Name, Pkt} -> + S#state.parent ! {recv, self(), Name, {Pkt, NPid}}, + rcv(Name, Pkt, S) + end; + +incoming(Msg, S) -> + try + recv(Msg, S) + catch + {?MODULE, Name, Pkt} -> + S#state.parent ! {recv, self(), Name, Pkt}, + rcv(Name, Pkt, S) + end. + %% recv/2 recv(#diameter_packet{header = #diameter_header{} = Hdr} @@ -597,9 +643,8 @@ recv1('DPA' = N, %% Any other message with a header and no length errors: send to the %% parent. -recv1(Name, Pkt, #state{parent = Pid} = S) -> - Pid ! {recv, self(), Name, Pkt}, - rcv(Name, Pkt, S). +recv1(Name, Pkt, #state{}) -> + throw({?MODULE, Name, Pkt}). %% recv/3 diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl index efa4cc9108..cfb5cb5b82 100644 --- a/lib/diameter/src/base/diameter_service.erl +++ b/lib/diameter/src/base/diameter_service.erl @@ -32,6 +32,7 @@ -export([subscribe/1, unsubscribe/1, services/0, + peer_info/1, info/2]). %% towards diameter_config @@ -218,6 +219,29 @@ lookup_state(SvcName) -> end. %% --------------------------------------------------------------------------- +%% # peer_info/2 +%% --------------------------------------------------------------------------- + +%% An extended version of info_peer/1 for peer_info/1. +peer_info(Pid) -> + try + {_, PD} = process_info(Pid, dictionary), + {_, T} = lists:keyfind({diameter_peer_fsm, start}, 1, PD), + {TPid, {{Type, Ref}, TMod, Cfg}} = T, + {_, TD} = process_info(TPid, dictionary), + {_, Data} = lists:keyfind({TMod, info}, 1, TD), + [{ref, Ref}, + {type, Type}, + {owner, TPid}, + {module, TMod}, + {config, Cfg} + | try TMod:info(Data) catch _:_ -> [] end] + catch + error:_ -> + [] + end. + +%% --------------------------------------------------------------------------- %% # subscribe/1 %% # unsubscribe/1 %% --------------------------------------------------------------------------- diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl index c169d3fc2c..2112941d5e 100644 --- a/lib/diameter/src/base/diameter_traffic.erl +++ b/lib/diameter/src/base/diameter_traffic.erl @@ -230,7 +230,15 @@ pending(TPids) -> %% used to come through the service process but this avoids that %% becoming a bottleneck. -receive_message(TPid, Pkt, Dict0, RecvData) +receive_message(TPid, {Pkt, NPid}, Dict0, RecvData) -> + NPid ! {diameter, incoming(TPid, Pkt, Dict0, RecvData)}; + +receive_message(TPid, Pkt, Dict0, RecvData) -> + incoming(TPid, Pkt, Dict0, RecvData). + +%% incoming/4 + +incoming(TPid, Pkt, Dict0, RecvData) when is_pid(TPid) -> #diameter_packet{header = #diameter_header{is_request = R}} = Pkt, recv(R, @@ -244,11 +252,18 @@ receive_message(TPid, Pkt, Dict0, RecvData) %% Incoming request ... recv(true, false, TPid, Pkt, Dict0, T) -> - spawn_request(TPid, Pkt, Dict0, T); + try + {request, spawn_request(TPid, Pkt, Dict0, T)} + catch + error: system_limit = E -> %% discard + ?LOG(error, E), + discard + end; %% ... answer to known request ... recv(false, #request{ref = Ref, handler = Pid} = Req, _, Pkt, Dict0, _) -> - Pid ! {answer, Ref, Req, Dict0, Pkt}; + Pid ! {answer, Ref, Req, Dict0, Pkt}, + {answer, Pid}; %% Note that failover could have happened prior to this message being %% received and triggering failback. That is, both a failover message @@ -263,7 +278,7 @@ recv(false, #request{ref = Ref, handler = Pid} = Req, _, Pkt, Dict0, _) -> recv(false, false, TPid, Pkt, _, _) -> ?LOG(discarded, Pkt#diameter_packet.header), incr(TPid, {{unknown, 0}, recv, discarded}), - ok. + discard. %% spawn_request/4 @@ -273,12 +288,7 @@ spawn_request(TPid, Pkt, Dict0, RecvData) -> spawn_request(TPid, Pkt, Dict0, ?DEFAULT_SPAWN_OPTS, RecvData). spawn_request(TPid, Pkt, Dict0, Opts, RecvData) -> - try - spawn_opt(fun() -> recv_request(TPid, Pkt, Dict0, RecvData) end, Opts) - catch - error: system_limit = E -> %% discard - ?LOG(error, E) - end. + spawn_opt(fun() -> recv_request(TPid, Pkt, Dict0, RecvData) end, Opts). %% --------------------------------------------------------------------------- %% recv_request/4 diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl index ea8b2fdb0e..3fd87b223e 100644 --- a/lib/diameter/src/base/diameter_watchdog.erl +++ b/lib/diameter/src/base/diameter_watchdog.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2015. All Rights Reserved. +%% Copyright Ericsson AB 2010-2016. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -449,8 +449,14 @@ transition({'DOWN', _, process, TPid, _Reason} = D, end; %% Incoming message. -transition({recv, TPid, Name, Pkt}, #watchdog{transport = TPid} = S) -> - recv(Name, Pkt, S); +transition({recv, TPid, Name, PktT}, #watchdog{transport = TPid} = S) -> + try + incoming(Name, PktT, S) + catch + #watchdog{dictionary = Dict0, receive_data = T} = NS -> + diameter_traffic:receive_message(TPid, PktT, Dict0, T), + NS + end; %% Current watchdog has timed out. transition({timeout, TRef, tw}, #watchdog{tref = TRef} = S) -> @@ -578,22 +584,32 @@ send_watchdog(#watchdog{pending = false, %% Don't count encode errors since we don't expect any on DWR/DWA. +%% incoming/3 + +incoming(Name, {Pkt, NPid}, S) -> + NS = recv(Name, Pkt, S), + NPid ! {diameter, discard}, + NS; + +incoming(Name, Pkt, S) -> + recv(Name, Pkt, S). + %% recv/3 recv(Name, Pkt, S) -> - try rcv(Name, S) of + try rcv(Name, Pkt, rcv(Name, S)) of #watchdog{} = NS -> - rcv(Name, Pkt, S), - NS + throw(NS) catch - {?MODULE, throwaway, #watchdog{} = NS} -> + #watchdog{} = NS -> %% throwaway NS end. %% rcv/3 rcv('DWR', Pkt, #watchdog{transport = TPid, - dictionary = Dict0}) -> + dictionary = Dict0} + = S) -> ?LOG(recv, 'DWR'), DPkt = diameter_codec:decode(Dict0, Pkt), diameter_traffic:incr(recv, DPkt, TPid, Dict0), @@ -610,32 +626,30 @@ rcv('DWR', Pkt, #watchdog{transport = TPid, send(TPid, {send, #diameter_packet{header = H, transport_data = T, bin = Bin}}), - ?LOG(send, 'DWA'); + ?LOG(send, 'DWA'), + throw(S); rcv('DWA', Pkt, #watchdog{transport = TPid, - dictionary = Dict0}) -> + dictionary = Dict0} + = S) -> ?LOG(recv, 'DWA'), diameter_traffic:incr(recv, Pkt, TPid, Dict0), diameter_traffic:incr_rc(recv, diameter_codec:decode(Dict0, Pkt), TPid, - Dict0); + Dict0), + throw(S); -rcv(N, _, _) +rcv(N, _, S) when N == 'CER'; N == 'CEA'; N == 'DPR' -> - false; + throw(S); %% DPR can be sent explicitly with diameter:call/4. Only the %% corresponding DPAs arrive here. -rcv(_, Pkt, #watchdog{transport = TPid, - dictionary = Dict0, - receive_data = T}) -> - diameter_traffic:receive_message(TPid, Pkt, Dict0, T). - -throwaway(S) -> - throw({?MODULE, throwaway, S}). +rcv(_, _, S)-> + S. %% rcv/2 %% @@ -652,20 +666,20 @@ throwaway(S) -> %% INITIAL Receive non-DWA Throwaway() INITIAL rcv('DWA', #watchdog{status = initial} = S) -> - throwaway(S#watchdog{pending = false}); + throw(S#watchdog{pending = false}); rcv(_, #watchdog{status = initial} = S) -> - throwaway(S); + throw(S); %% DOWN Receive DWA Pending = FALSE %% Throwaway() DOWN %% DOWN Receive non-DWA Throwaway() DOWN rcv('DWA', #watchdog{status = down} = S) -> - throwaway(S#watchdog{pending = false}); + throw(S#watchdog{pending = false}); rcv(_, #watchdog{status = down} = S) -> - throwaway(S); + throw(S); %% OKAY Receive DWA Pending = FALSE %% SetWatchdog() OKAY @@ -721,7 +735,7 @@ rcv('DWR', #watchdog{status = reopen} = S) -> S; %% ensure DWA: the RFC isn't explicit about answering rcv(_, #watchdog{status = reopen} = S) -> - throwaway(S). + throw(S). %% timeout/1 %% diff --git a/lib/diameter/src/diameter.appup.src b/lib/diameter/src/diameter.appup.src index 12b09889d1..618d5a7f10 100644 --- a/lib/diameter/src/diameter.appup.src +++ b/lib/diameter/src/diameter.appup.src @@ -47,10 +47,9 @@ {"1.9.2.2", [{restart_application, diameter}]}, %% 17.5.6.7 {"1.9.2.3", [{restart_application, diameter}]}, %% 17.5.6.8 {"1.10", [{restart_application, diameter}]}, %% 18.0 - {"1.11", [{load_module, diameter_traffic}, %% 18.1 - {update, diameter_service, {advanced, []}}]}, - {"1.11.1", [{load_module, diameter_traffic}, %% 18.2 - {update, diameter_service, {advanced, []}}]} + {"1.11", [{restart_application, diameter}]}, %% 18.1 + {"1.11.1", [{restart_application, diameter}]}, %% 18.2 + {"1.11.2", [{restart_application, diameter}]} %% 18.3 ], [ {"0.9", [{restart_application, diameter}]}, @@ -80,6 +79,7 @@ {"1.9.2.3", [{restart_application, diameter}]}, {"1.10", [{restart_application, diameter}]}, {"1.11", [{restart_application, diameter}]}, - {"1.11.1", [{restart_application, diameter}]} + {"1.11.1", [{restart_application, diameter}]}, + {"1.11.2", [{restart_application, diameter}]} ] }. diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index c79d85820b..6a5e5fe89d 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2015. All Rights Reserved. +%% Copyright Ericsson AB 2010-2016. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ %% -module(diameter_tcp). +-dialyzer({no_fail_call, throttle/2}). -behaviour(gen_server). @@ -102,7 +103,8 @@ | gen_tcp:listen_option(). -type option() :: {port, non_neg_integer()} - | {fragment_timer, 0..16#FFFFFFFF}. + | {fragment_timer, 0..16#FFFFFFFF} + | {throttle_cb, diameter:evaluable()}. %% Accepting/connecting transport process state. -record(transport, @@ -110,10 +112,13 @@ parent :: pid(), %% of process that started us module :: module(), %% gen_tcp-like module frag = <<>> :: frag(), %% message fragment - ssl :: boolean() | [term()], %% ssl options + ssl :: [term()] | boolean(), %% ssl options, ssl or not timeout :: infinity | 0..16#FFFFFFFF, %% fragment timeout tref = false :: false | reference(), %% fragment timer reference - flush = false :: boolean()}). %% flush fragment at timeout? + flush = false :: boolean(), %% flush fragment at timeout? + throttle_cb :: false | diameter:evaluable(), %% ask to receive + throttled :: boolean() | binary()}). %% stopped receiving? + %% The usual transport using gen_tcp can be replaced by anything %% sufficiently gen_tcp-like by passing a 'module' option as the first %% (for simplicity) transport option. The transport_module diameter_etcp @@ -198,22 +203,27 @@ i({T, Ref, Mod, Pid, Opts, Addrs}) %% that does nothing but kill us with the parent until call %% returns. {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}), - {SslOpts, Rest0} = ssl(Opts), - {OwnOpts, Rest} = own(Rest0), + {[SO|TO], Rest} = proplists:split(Opts, [ssl_options, + fragment_timer, + throttle_cb]), + SslOpts = ssl_opts(SO), + OwnOpts = lists:append(TO), Tmo = proplists:get_value(fragment_timer, OwnOpts, ?DEFAULT_FRAGMENT_TIMEOUT), ?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}), + Throttle = proplists:get_value(throttle_cb, OwnOpts, false), Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs), MPid ! {stop, self()}, %% tell the monitor to die M = if SslOpts -> ssl; true -> Mod end, - setopts(M, Sock), putr(?REF_KEY, Ref), - #transport{parent = Pid, - module = M, - socket = Sock, - ssl = SslOpts, - timeout = Tmo}; + throttle(#transport{parent = Pid, + module = M, + socket = Sock, + ssl = SslOpts, + timeout = Tmo, + throttle_cb = Throttle, + throttled = false /= Throttle}); %% Put the reference in the process dictionary since we now use it %% advertise the ssl socket after TLS upgrade. @@ -246,14 +256,6 @@ laddr([], Mod, Sock) -> laddr([{ip, Addr}], _, _) -> Addr. -own(Opts) -> - {[Own], Rest} = proplists:split(Opts, [fragment_timer]), - {Own, Rest}. - -ssl(Opts) -> - {[SslOpts], Rest} = proplists:split(Opts, [ssl_options]), - {ssl_opts(SslOpts), Rest}. - ssl_opts([]) -> false; ssl_opts([{ssl_options, true}]) -> @@ -261,8 +263,8 @@ ssl_opts([{ssl_options, true}]) -> ssl_opts([{ssl_options, Opts}]) when is_list(Opts) -> Opts; -ssl_opts(L) -> - ?ERROR({ssl_options, L}). +ssl_opts(T) -> + ?ERROR({ssl_options, T}). %% init/7 @@ -393,7 +395,7 @@ get_port(Ps) -> gen_opts(LAddrOpt, Opts) -> {L,_} = proplists:split(Opts, [binary, packet, active]), [[],[],[]] == L orelse ?ERROR({reserved_options, Opts}), - [binary, {packet, 0}, {active, once}] ++ LAddrOpt ++ Opts. + [binary, {packet, 0}, {active, false}] ++ LAddrOpt ++ Opts. %% --------------------------------------------------------------------------- %% # ports/1 @@ -536,53 +538,37 @@ t(T,S) -> S; #transport{} = NS -> NS; - {stop, Reason} -> - x(Reason); stop -> x(T) end. %% transition/2 -%% Initial incoming message when we might need to upgrade to TLS: -%% don't request another message until we know. - -transition({tcp, Sock, Bin}, #transport{socket = Sock, - parent = Pid, - frag = Head, - module = M, - ssl = Opts} - = S) - when is_list(Opts) -> - case rcv(Head, Bin) of - {Msg, B} when is_binary(Msg) -> - diameter_peer:recv(Pid, Msg), - S#transport{frag = B}; - Frag -> - setopts(M, Sock), - start_fragment_timer(S#transport{frag = Frag}) - end; - %% Incoming message. transition({P, Sock, Bin}, #transport{socket = Sock, - module = M, - ssl = B} + ssl = B, + throttled = T} = S) - when P == tcp, not B; - P == ssl, B -> - setopts(M, Sock), - start_fragment_timer(recv(Bin, S)); + when P == ssl, true == B; + P == tcp -> + false = T, %% assert + recv(Bin, S); + +%% Make a new throttling callback after a timeout. +transition(throttle, #transport{throttled = false}) -> + ok; +transition(throttle, S) -> + throttle(S); %% Capabilties exchange has decided on whether or not to run over TLS. transition({diameter, {tls, Ref, Type, B}}, #transport{parent = Pid} = S) -> - #transport{socket = Sock, - module = M} + true = is_boolean(B), %% assert + #transport{} = NS = tls_handshake(Type, B, S), Pid ! {diameter, {tls, Ref}}, - setopts(M, Sock), - start_fragment_timer(NS#transport{ssl = B}); + throttle(NS#transport{ssl = B}); transition({C, Sock}, #transport{socket = Sock, ssl = B}) @@ -598,14 +584,8 @@ transition({E, Sock, _Reason} = T, #transport{socket = Sock, ?ERROR({T,S}); %% Outgoing message. -transition({diameter, {send, Bin}}, #transport{socket = Sock, - module = M}) -> - case send(M, Sock, Bin) of - ok -> - ok; - {error, Reason} -> - {stop, {send, Reason}} - end; +transition({diameter, {send, Bin}}, S) -> + send(Bin, S); %% Request to close the transport connection. transition({diameter, {close, Pid}}, #transport{parent = Pid, @@ -672,16 +652,25 @@ tls(accept, Sock, Opts) -> %% Reassemble fragmented messages and extract multiple message sent %% using Nagle. -recv(Bin, #transport{parent = Pid, frag = Head} = S) -> +%% Receive packets until a full message is received, +recv(Bin, #transport{frag = Head, throttled = false} = S) -> case rcv(Head, Bin) of - {Msg, B} when is_binary(Msg) -> - diameter_peer:recv(Pid, Msg), - recv(B, S#transport{frag = <<>>}); + {Msg, B} -> + throttle(S#transport{frag = B, throttled = Msg}); Frag -> - S#transport{frag = Frag, - flush = false} + setopts(S), + start_fragment_timer(S#transport{frag = Frag, + flush = false}) end. +%% recv/1 + +recv(#transport{throttled = false} = S) -> + recv(<<>>, S); + +recv(#transport{} = S) -> + S. + %% rcv/2 %% No previous fragment. @@ -765,8 +754,10 @@ bin(Bin) %% since all messages with length problems are discarded this should %% also eventually lead to watchdog failover. -%% No fragment to flush. -flush(#transport{frag = <<>>} = S) -> +%% No fragment to flush or not receiving messages. +flush(#transport{frag = Frag, throttled = B} = S) + when Frag == <<>>; + B /= false -> S; %% Messages have been received since last timer expiry. @@ -807,6 +798,17 @@ accept(Mod, LSock) -> connect(Mod, Host, Port, Opts) -> Mod:connect(Host, Port, Opts). +%% send/2 + +send(Bin, #transport{socket = Sock, + module = M}) -> + case send(M, Sock, Bin) of + ok -> + ok; + {error, Reason} -> + x({send, Reason}) + end. + %% send/3 send(gen_tcp, Sock, Bin) -> @@ -825,6 +827,11 @@ setopts(ssl, Sock, Opts) -> setopts(M, Sock, Opts) -> M:setopts(Sock, Opts). +%% setopts/1 + +setopts(#transport{socket = Sock, module = M}) -> + setopts(M, Sock). + %% setopts/2 setopts(M, Sock) -> @@ -833,6 +840,110 @@ setopts(M, Sock) -> X -> x({setopts, M, Sock, X}) %% possibly on peer disconnect end. +%% throttle/1 + +%% Still collecting packets for a complete message: keep receiving. +throttle(#transport{throttled = false} = S) -> + recv(S); + +%% Decide whether to receive another, or whether to accept a message +%% that's been received. +throttle(#transport{throttle_cb = F, throttled = T} = S) -> + Res = cb(F, T), + + try throttle(Res, S) of + #transport{ssl = SB} = NS when is_boolean(SB) -> + throttle(defrag(NS)); + #transport{throttled = Msg} = NS when is_binary(Msg) -> + %% Initial incoming message when we might need to upgrade + %% to TLS: wait for reception of a tls tuple. + defrag(NS) + catch + #transport{} = NS -> + recv(NS) + end. + +%% cb/2 + +cb(false, _) -> + ok; + +cb(F, B) -> + diameter_lib:eval([F, true /= B andalso B]). + +%% throttle/2 + +%% Callback says to receive another message. +throttle(ok, #transport{throttled = true} = S) -> + throw(S#transport{throttled = false}); + +%% Callback says to accept a received message. +throttle(ok, #transport{parent = Pid, throttled = Msg} = S) + when is_binary(Msg) -> + diameter_peer:recv(Pid, Msg), + S; + +throttle({ok = T, F}, S) -> + throttle(T, S#transport{throttle_cb = F}); + +%% Callback says to accept a received message and acknowledged the +%% returned pid with a {request, Pid} message if a request pid is +%% spawned, a discard message otherwise. The latter does not mean that +%% the message was necessarily discarded: it could have been an +%% answer. +throttle(NPid, #transport{parent = Pid, throttled = Msg} = S) + when is_pid(NPid), is_binary(Msg) -> + diameter_peer:recv(Pid, {Msg, NPid}), + S; + +throttle({NPid, F}, #transport{throttled = Msg} = S) + when is_pid(NPid), is_binary(Msg) -> + throttle(NPid, S#transport{throttle_cb = F}); + +%% Callback to accept a received message says to discard it. +throttle(discard, #transport{throttled = Msg} = S) + when is_binary(Msg) -> + S; + +throttle({discard = T, F}, #transport{throttled = Msg} = S) + when is_binary(Msg) -> + throttle(T, S#transport{throttle_cb = F}); + +%% Callback to accept a received message says to answer it with the +%% supplied binary. +throttle(Bin, #transport{throttled = Msg} = S) + when is_binary(Bin), is_binary(Msg) -> + send(Bin, S), + S; + +throttle({Bin, F}, #transport{throttled = Msg} = S) + when is_binary(Bin), is_binary(Msg) -> + throttle(Bin, S#transport{throttle_cb = F}); + +%% Callback says to ask again in the specified number of milliseconds. +throttle({timeout, Tmo}, S) -> + erlang:send_after(Tmo, self(), throttle), + throw(S); + +throttle({timeout = T, Tmo, F}, S) -> + throttle({T, Tmo}, S#transport{throttle_cb = F}); + +throttle(T, #transport{throttle_cb = F}) -> + ?ERROR({invalid_return, T, F}). + +%% defrag/1 +%% +%% Try to extract another message from packets already read before +%% another throttling callback. + +defrag(#transport{frag = Head} = S) -> + case rcv(Head, <<>>) of + {Msg, B} -> + S#transport{throttled = Msg, frag = B}; + _ -> + S#transport{throttled = true} + end. + %% portnr/2 portnr(gen_tcp, Sock) -> |