From 2ffb288d8daeb72c27c5cead30ce779682bdd8b0 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Sat, 5 Mar 2016 16:18:52 +0100 Subject: Let throttling callback return a notification pid In addition to returning ok or {timeout, Tmo}, let a throttling callback for message reception return a pid(), which is then notified if the message in question is either discarded or results in a request process. Notification is by way of messages of the form {diameter, discard | {request, pid()}} where the pid is that of a request process resulting from the received message. This allows the notification process to keep track of the maximum number of request processes a peer connection can have given rise to. --- lib/diameter/src/base/diameter_peer_fsm.erl | 33 ++++++++++++--- lib/diameter/src/base/diameter_traffic.erl | 30 ++++++++++---- lib/diameter/src/base/diameter_watchdog.erl | 64 ++++++++++++++++++----------- 3 files changed, 88 insertions(+), 39 deletions(-) (limited to 'lib/diameter/src/base') diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl index a9ee4940a3..83e0dda501 100644 --- a/lib/diameter/src/base/diameter_peer_fsm.erl +++ b/lib/diameter/src/base/diameter_peer_fsm.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2015. All Rights Reserved. +%% Copyright Ericsson AB 2010-2016. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -426,8 +426,8 @@ transition({connection_timeout, _}, _) -> ok; %% Incoming message from the transport. -transition({diameter, {recv, Pkt}}, S) -> - recv(Pkt, S); +transition({diameter, {recv, MsgT}}, S) -> + incoming(MsgT, S); %% Timeout when still in the same state ... transition({timeout = T, PS}, #state{state = PS}) -> @@ -553,6 +553,28 @@ encode(Rec, Dict) -> diameter_codec:encode(Dict, #diameter_packet{header = Hdr, msg = Rec}). +%% incoming/2 + +incoming({Msg, NPid}, S) -> + try recv(Msg, S) of + T -> + NPid ! {diameter, discard}, + T + catch + {?MODULE, Name, Pkt} -> + S#state.parent ! {recv, self(), Name, {Pkt, NPid}}, + rcv(Name, Pkt, S) + end; + +incoming(Msg, S) -> + try + recv(Msg, S) + catch + {?MODULE, Name, Pkt} -> + S#state.parent ! {recv, self(), Name, Pkt}, + rcv(Name, Pkt, S) + end. + %% recv/2 recv(#diameter_packet{header = #diameter_header{} = Hdr} @@ -607,9 +629,8 @@ recv1('DPA' = N, %% Any other message with a header and no length errors: send to the %% parent. -recv1(Name, Pkt, #state{parent = Pid} = S) -> - Pid ! {recv, self(), Name, Pkt}, - rcv(Name, Pkt, S). +recv1(Name, Pkt, #state{}) -> + throw({?MODULE, Name, Pkt}). %% recv/3 diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl index 5d39c08213..fba4d3736b 100644 --- a/lib/diameter/src/base/diameter_traffic.erl +++ b/lib/diameter/src/base/diameter_traffic.erl @@ -232,7 +232,20 @@ pending(TPids) -> %% used to come through the service process but this avoids that %% becoming a bottleneck. -receive_message(TPid, Pkt, Dict0, RecvData) +receive_message(TPid, {Pkt, NPid}, Dict0, RecvData) -> + NPid ! {diameter, case incoming(TPid, Pkt, Dict0, RecvData) of + Pid when is_pid(Pid) -> + {request, Pid}; + _ -> + discard + end}; + +receive_message(TPid, Pkt, Dict0, RecvData) -> + incoming(TPid, Pkt, Dict0, RecvData). + +%% incoming/4 + +incoming(TPid, Pkt, Dict0, RecvData) when is_pid(TPid) -> #diameter_packet{header = #diameter_header{is_request = R}} = Pkt, recv(R, @@ -246,7 +259,13 @@ receive_message(TPid, Pkt, Dict0, RecvData) %% Incoming request ... recv(true, false, TPid, Pkt, Dict0, T) -> - spawn_request(TPid, Pkt, Dict0, T); + try + spawn_request(TPid, Pkt, Dict0, T) + catch + error: system_limit = E -> %% discard + ?LOG(error, E), + {error, E} + end; %% ... answer to known request ... recv(false, #request{ref = Ref, handler = Pid} = Req, _, Pkt, Dict0, _) -> @@ -275,12 +294,7 @@ spawn_request(TPid, Pkt, Dict0, RecvData) -> spawn_request(TPid, Pkt, Dict0, ?DEFAULT_SPAWN_OPTS, RecvData). spawn_request(TPid, Pkt, Dict0, Opts, RecvData) -> - try - spawn_opt(fun() -> recv_request(TPid, Pkt, Dict0, RecvData) end, Opts) - catch - error: system_limit = E -> %% discard - ?LOG(error, E) - end. + spawn_opt(fun() -> recv_request(TPid, Pkt, Dict0, RecvData) end, Opts). %% --------------------------------------------------------------------------- %% recv_request/4 diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl index 26bca7a5bc..43a8a5223f 100644 --- a/lib/diameter/src/base/diameter_watchdog.erl +++ b/lib/diameter/src/base/diameter_watchdog.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2015. All Rights Reserved. +%% Copyright Ericsson AB 2010-2016. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -447,8 +447,14 @@ transition({'DOWN', _, process, TPid, _Reason} = D, end; %% Incoming message. -transition({recv, TPid, Name, Pkt}, #watchdog{transport = TPid} = S) -> - recv(Name, Pkt, S); +transition({recv, TPid, Name, PktT}, #watchdog{transport = TPid} = S) -> + try + incoming(Name, PktT, S) + catch + #watchdog{dictionary = Dict0, receive_data = T} = NS -> + diameter_traffic:receive_message(TPid, PktT, Dict0, T), + NS + end; %% Current watchdog has timed out. transition({timeout, TRef, tw}, #watchdog{tref = TRef} = S) -> @@ -576,22 +582,32 @@ send_watchdog(#watchdog{pending = false, %% Don't count encode errors since we don't expect any on DWR/DWA. +%% incoming/3 + +incoming(Name, {Pkt, NPid}, S) -> + NS = recv(Name, Pkt, S), + NPid ! {diameter, discard}, + NS; + +incoming(Name, Pkt, S) -> + recv(Name, Pkt, S). + %% recv/3 recv(Name, Pkt, S) -> - try rcv(Name, S) of + try rcv(Name, Pkt, rcv(Name, S)) of #watchdog{} = NS -> - rcv(Name, Pkt, S), - NS + throw(NS) catch - {?MODULE, throwaway, #watchdog{} = NS} -> + #watchdog{} = NS -> %% throwaway NS end. %% rcv/3 rcv('DWR', Pkt, #watchdog{transport = TPid, - dictionary = Dict0}) -> + dictionary = Dict0} + = S) -> ?LOG(recv, 'DWR'), DPkt = diameter_codec:decode(Dict0, Pkt), diameter_traffic:incr(recv, DPkt, TPid, Dict0), @@ -608,32 +624,30 @@ rcv('DWR', Pkt, #watchdog{transport = TPid, send(TPid, {send, #diameter_packet{header = H, transport_data = T, bin = Bin}}), - ?LOG(send, 'DWA'); + ?LOG(send, 'DWA'), + throw(S); rcv('DWA', Pkt, #watchdog{transport = TPid, - dictionary = Dict0}) -> + dictionary = Dict0} + = S) -> ?LOG(recv, 'DWA'), diameter_traffic:incr(recv, Pkt, TPid, Dict0), diameter_traffic:incr_rc(recv, diameter_codec:decode(Dict0, Pkt), TPid, - Dict0); + Dict0), + throw(S); -rcv(N, _, _) +rcv(N, _, S) when N == 'CER'; N == 'CEA'; N == 'DPR' -> - false; + throw(S); %% DPR can be sent explicitly with diameter:call/4. Only the %% corresponding DPAs arrive here. -rcv(_, Pkt, #watchdog{transport = TPid, - dictionary = Dict0, - receive_data = T}) -> - diameter_traffic:receive_message(TPid, Pkt, Dict0, T). - -throwaway(S) -> - throw({?MODULE, throwaway, S}). +rcv(_, _, S)-> + S. %% rcv/2 %% @@ -650,20 +664,20 @@ throwaway(S) -> %% INITIAL Receive non-DWA Throwaway() INITIAL rcv('DWA', #watchdog{status = initial} = S) -> - throwaway(S#watchdog{pending = false}); + throw(S#watchdog{pending = false}); rcv(_, #watchdog{status = initial} = S) -> - throwaway(S); + throw(S); %% DOWN Receive DWA Pending = FALSE %% Throwaway() DOWN %% DOWN Receive non-DWA Throwaway() DOWN rcv('DWA', #watchdog{status = down} = S) -> - throwaway(S#watchdog{pending = false}); + throw(S#watchdog{pending = false}); rcv(_, #watchdog{status = down} = S) -> - throwaway(S); + throw(S); %% OKAY Receive DWA Pending = FALSE %% SetWatchdog() OKAY @@ -719,7 +733,7 @@ rcv('DWR', #watchdog{status = reopen} = S) -> S; %% ensure DWA: the RFC isn't explicit about answering rcv(_, #watchdog{status = reopen} = S) -> - throwaway(S). + throw(S). %% timeout/1 %% -- cgit v1.2.3 From c322099e7e7efeb01577e4c8efd52579beb90949 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Sun, 13 Mar 2016 07:26:30 +0100 Subject: Acknowledge answers to notification pids when throttling By sending {diameter, {answer, pid()}} when an incoming answer is sent to the specified pid, instead of a discard message as previously. The latter now literally means that the message has been discarded. --- lib/diameter/src/base/diameter_traffic.erl | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) (limited to 'lib/diameter/src/base') diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl index fba4d3736b..724fa855d8 100644 --- a/lib/diameter/src/base/diameter_traffic.erl +++ b/lib/diameter/src/base/diameter_traffic.erl @@ -233,12 +233,7 @@ pending(TPids) -> %% becoming a bottleneck. receive_message(TPid, {Pkt, NPid}, Dict0, RecvData) -> - NPid ! {diameter, case incoming(TPid, Pkt, Dict0, RecvData) of - Pid when is_pid(Pid) -> - {request, Pid}; - _ -> - discard - end}; + NPid ! {diameter, incoming(TPid, Pkt, Dict0, RecvData)}; receive_message(TPid, Pkt, Dict0, RecvData) -> incoming(TPid, Pkt, Dict0, RecvData). @@ -260,16 +255,17 @@ incoming(TPid, Pkt, Dict0, RecvData) %% Incoming request ... recv(true, false, TPid, Pkt, Dict0, T) -> try - spawn_request(TPid, Pkt, Dict0, T) + {request, spawn_request(TPid, Pkt, Dict0, T)} catch error: system_limit = E -> %% discard ?LOG(error, E), - {error, E} + discard end; %% ... answer to known request ... recv(false, #request{ref = Ref, handler = Pid} = Req, _, Pkt, Dict0, _) -> - Pid ! {answer, Ref, Req, Dict0, Pkt}; + Pid ! {answer, Ref, Req, Dict0, Pkt}, + {answer, Pid}; %% Note that failover could have happened prior to this message being %% received and triggering failback. That is, both a failover message @@ -284,7 +280,7 @@ recv(false, #request{ref = Ref, handler = Pid} = Req, _, Pkt, Dict0, _) -> recv(false, false, TPid, Pkt, _, _) -> ?LOG(discarded, Pkt#diameter_packet.header), incr(TPid, {{unknown, 0}, recv, discarded}), - ok. + discard. %% spawn_request/4 -- cgit v1.2.3