From a5bc8a5911613ec9ddfef9984ee59a24110c8b2b Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Mon, 6 Feb 2017 15:49:44 +0100 Subject: Fix/redo failover optimization Commit 9a878743 addressed inefficiency at failover, but introduced inefficiency in the sending of outgoing requests in so doing: each outgoing request added an request table entry keyed on a transport pid, then looked for a specific element with this key, and then (later) removed the inserted element. Since the request table is a bag, this results in linear searches over a potentially long list of element keyed on the same pid. The higher the rate of outgoing calls, the more costly it becomes. Instead of writing entries to the request table, the peer_up/down calls to diameter_traffic that mirror transitions to and from the OKAY state in the RFC 3539 watchdog state machine now result in a process for request processes to monitor in order to detect failover. --- lib/diameter/src/base/diameter_traffic.erl | 104 ++++++++++++++++------------- 1 file changed, 57 insertions(+), 47 deletions(-) diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl index d93a3e71e3..0720057fe5 100644 --- a/lib/diameter/src/base/diameter_traffic.erl +++ b/lib/diameter/src/base/diameter_traffic.erl @@ -43,8 +43,9 @@ peer_down/1, pending/1]). -%% towards ?MODULE --export([send/1]). %% send from remote node +%% internal +-export([send/1, %% send from remote node + init/1]). %% monitor process start -include_lib("diameter/include/diameter.hrl"). -include("diameter_internal.hrl"). @@ -113,26 +114,30 @@ make_recvdata([SvcName, PeerT, Apps, SvcOpts | _]) -> %% peer_up/1 %% --------------------------------------------------------------------------- -%% Insert an element that is used to detect whether or not there has -%% been a failover when inserting an outgoing request. +%% Start a process that dies with peer_down/1, on which request +%% processes can monitor. The transport process started in +%% diameter_peer_fsm could be used for this, but starting a new +%% process here is easier: don't otherwise have access to the +%% transport process here (TPid here is the diameter_peer_fsm +%% process), and the need for a process to monitor came much later +%% than the calls to peer_up/down. peer_up(TPid) -> - ets:insert(?REQUEST_TABLE, {TPid}). + proc_lib:start(?MODULE, init, [TPid]). + +init(TPid) -> + ets:insert(?REQUEST_TABLE, {TPid, self()}), + proc_lib:init_ack(self()), + proc_lib:hibernate(erlang, exit, [{shutdown, TPid}]). %% --------------------------------------------------------------------------- %% peer_down/1 %% --------------------------------------------------------------------------- peer_down(TPid) -> - ets:delete_object(?REQUEST_TABLE, {TPid}), - lists:foreach(fun failover/1, ets:lookup(?REQUEST_TABLE, TPid)). -%% Note that a request process can store its request after failover -%% notifications are sent here: insert_request/2 sends the notification -%% in that case. - -%% failover/1 - -failover({_TPid, {Pid, TRef}}) -> - Pid ! {failover, TRef}. + [{_, Pid}] = ets:lookup(?REQUEST_TABLE, TPid), + ets:delete(?REQUEST_TABLE, TPid), + Pid ! ok, %% make it die + Pid. %% --------------------------------------------------------------------------- %% incr/4 @@ -1503,15 +1508,15 @@ send_R(Pkt0, packet = Pkt0}, incr(send, Pkt, TPid, AppDict), - TRef = send_request(TPid, Pkt, Req, SvcName, Timeout), + {TRef, MRef} = send_request(TPid, Pkt, Req, SvcName, Timeout), Pid ! Ref, %% tell caller a send has been attempted handle_answer(SvcName, App, - recv_A(Timeout, SvcName, App, Opts, {TRef, Req})). + recv_A(Timeout, SvcName, App, Opts, {TRef, MRef, Req})). %% recv_A/5 -recv_A(Timeout, SvcName, App, Opts, {TRef, #request{ref = Ref} = Req}) -> +recv_A(Timeout, SvcName, App, Opts, {TRef, MRef, #request{ref = Ref} = Req}) -> %% Matching on TRef below ensures we ignore messages that pertain %% to a previous transport prior to failover. The answer message %% includes the #request{} since it's not necessarily Req; that @@ -1521,14 +1526,21 @@ recv_A(Timeout, SvcName, App, Opts, {TRef, #request{ref = Ref} = Req}) -> {A, Rq, Dict0, Pkt}; {timeout = Reason, TRef, _} -> %% No timely reply {error, Req, Reason}; - {failover, TRef} -> %% Service says peer has gone down - retransmit(pick_peer(SvcName, App, Req, Opts), - Req, - Opts, - SvcName, - Timeout) + {'DOWN', MRef, process, _, _} when false /= MRef -> %% local peer_down + failover(SvcName, App, Req, Opts, Timeout); + {failover, TRef} -> %% local or remote peer_down + failover(SvcName, App, Req, Opts, Timeout) end. +%% failover/5 + +failover(SvcName, App, Req, Opts, Timeout) -> + retransmit(pick_peer(SvcName, App, Req, Opts), + Req, + Opts, + SvcName, + Timeout). + %% handle_answer/3 handle_answer(SvcName, App, {error, Req, Reason}) -> @@ -1711,15 +1723,16 @@ send_request(TPid, #diameter_packet{bin = Bin} = Pkt, Req, _SvcName, Timeout) when node() == node(TPid) -> Seqs = diameter_codec:sequence_numbers(Bin), TRef = erlang:start_timer(Timeout, self(), TPid), - Entry = {Seqs, #request{handler = Pid} = Req, TRef}, + Entry = {Seqs, Req, TRef}, %% Ensure that request table is cleaned even if the process is %% killed. - spawn(fun() -> diameter_lib:wait([Pid]), delete_request(Entry) end), + Self = self(), + spawn(fun() -> diameter_lib:wait([Self]), delete_request(Entry) end), - insert_request(Entry), + MRef = insert_request(Entry), send(TPid, Pkt), - TRef; + {TRef, MRef}; %% Send using a remote transport: spawn a process on the remote node %% to relay the answer. @@ -1727,7 +1740,7 @@ send_request(TPid, #diameter_packet{} = Pkt, Req, SvcName, Timeout) -> TRef = erlang:start_timer(Timeout, self(), TPid), T = {TPid, Pkt, Req, SvcName, Timeout, TRef}, spawn(node(TPid), ?MODULE, send, [T]), - TRef. + {TRef, false}. %% send/1 @@ -1739,10 +1752,12 @@ send({TPid, Pkt, #request{handler = Pid} = Req0, SvcName, Timeout, TRef}) -> %% %% Relay an answer from a remote node. -recv(TPid, Pid, TRef, LocalTRef) -> +recv(TPid, Pid, TRef, {LocalTRef, MRef}) -> receive {answer, _, _, _, _} = A -> Pid ! A; + {'DOWN', MRef, process, _, _} -> + Pid ! {failover, TRef}; {failover = T, LocalTRef} -> Pid ! {T, TRef}; T -> @@ -1822,24 +1837,20 @@ resend_request(Pkt0, ?LOG(retransmission, Pkt#diameter_packet.header), incr(TPid, {msg_id(Pkt, AppDict), send, retransmission}), - TRef = send_request(TPid, Pkt, Req, SvcName, Tmo), - {TRef, Req}. + {TRef, MRef} = send_request(TPid, Pkt, Req, SvcName, Tmo), + {TRef, MRef, Req}. %% insert_request/1 insert_request({_Seqs, #request{transport = TPid}, TRef} = T) -> - ets:insert(?REQUEST_TABLE, [T, {TPid, {self(), TRef}}]), - is_peer_up(TPid) - orelse (self() ! {failover, TRef}). %% failover/1 may have missed - -%% is_peer_up/1 -%% -%% Is the entry written by peer_up/1 and deleted by peer_down/1 still -%% in the request table? - -is_peer_up(TPid) -> - Spec = [{{TPid}, [], ['$_']}], - '$end_of_table' /= ets:select(?REQUEST_TABLE, Spec, 1). + ets:insert(?REQUEST_TABLE, T), + case ets:lookup(?REQUEST_TABLE, TPid) of %% at peer_up/1 + [{_, Pid}] -> + monitor(process, Pid); + [] -> %% transport has gone down + self() ! {failover, TRef}, + false + end. %% lookup_request/2 %% @@ -1861,9 +1872,8 @@ lookup_request(Msg, TPid) -> %% delete_request/1 -delete_request({_Seqs, #request{handler = Pid, transport = TPid}, TRef} = T) -> - Spec = [{R, [], [true]} || R <- [T, {TPid, {Pid, TRef}}]], - ets:select_delete(?REQUEST_TABLE, Spec). +delete_request(T) -> + ets:delete_object(?REQUEST_TABLE, T). %% have_request/2 -- cgit v1.2.3 From a4da06a56f778b7ac44c33a4fea38d1f60cac28d Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Thu, 2 Mar 2017 12:57:19 +0100 Subject: Don't use request table for answer routing The table has existed forever, to route incoming answers to a waiting request process: each outgoing request writes to the table, and each incoming answer reads. This has been seen to suffer from lock contention at high load however, so this commit moves the routing into the diameter_peer_fsm processes that are diameter's conduit to transport processes. The request table is still used for failover detection, but entries are only written when a watchdog state transitions leaves or enters state OKAY. --- lib/diameter/src/base/diameter_peer_fsm.erl | 60 ++++++++-- lib/diameter/src/base/diameter_service.erl | 18 +-- lib/diameter/src/base/diameter_sup.erl | 4 +- lib/diameter/src/base/diameter_traffic.erl | 174 +++++++++------------------- lib/diameter/src/base/diameter_watchdog.erl | 28 +++-- 5 files changed, 125 insertions(+), 159 deletions(-) diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl index 996e75a8d3..0c24ebeb19 100644 --- a/lib/diameter/src/base/diameter_peer_fsm.erl +++ b/lib/diameter/src/base/diameter_peer_fsm.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2016. All Rights Reserved. +%% Copyright Ericsson AB 2010-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -454,6 +454,9 @@ transition({timeout, _}, _) -> %% Outgoing message. transition({send, Msg}, S) -> outgoing(Msg, S); +transition({send, Route, Msg}, S) -> + put_route(Route), + outgoing(Msg, S); %% Request for graceful shutdown at remove_transport, stop_service of %% application shutdown. @@ -483,8 +486,10 @@ transition({'DOWN', _, process, TPid, _}, = S) -> start_next(S); -%% Transport has died after connection timeout. -transition({'DOWN', _, process, _, _}, _) -> +%% Transport has died after connection timeout, or handler process has +%% died. +transition({'DOWN', _, process, Pid, _}, _) -> + erase_route(Pid), ok; %% State query. @@ -494,6 +499,43 @@ transition({state, Pid}, #state{state = S, transport = TPid}) -> %% Crash on anything unexpected. +%% put_route/1 +%% +%% Map identifiers in an outgoing request to be able to lookup the +%% handler process when the answer is received. + +put_route(false) -> + ok; + +put_route({Pid, Ref, Seqs}) -> + MRef = monitor(process, Pid), + put(Pid, Seqs), + put(Seqs, {Pid, Ref, MRef}). + +%% get_route/1 + +get_route(#diameter_packet{header = #diameter_header{is_request = false}} + = Pkt) -> + Seqs = diameter_codec:sequence_numbers(Pkt), + case erase(Seqs) of + {Pid, Ref, MRef} -> + demonitor(MRef), + erase(Pid), + {Pid, Ref, self()}; + undefined -> + false + end; + +get_route(_) -> + false. + +%% erase_route/1 + +erase_route(Pid) -> + erase(erase(Pid)). + +%% capx/1 + capx(recv_CER) -> 'CER'; capx({'Wait-CEA', _, _}) -> @@ -576,8 +618,7 @@ incoming({Msg, NPid}, S) -> T catch {?MODULE, Name, Pkt} -> - S#state.parent ! {recv, self(), Name, {Pkt, NPid}}, - rcv(Name, Pkt, S) + incoming(Name, Pkt, NPid, S) end; incoming(Msg, S) -> @@ -585,10 +626,15 @@ incoming(Msg, S) -> recv(Msg, S) catch {?MODULE, Name, Pkt} -> - S#state.parent ! {recv, self(), Name, Pkt}, - rcv(Name, Pkt, S) + incoming(Name, Pkt, false, S) end. +%% incoming/4 + +incoming(Name, Pkt, NPid, #state{parent = Pid} = S) -> + Pid ! {recv, self(), get_route(Pkt), Name, Pkt, NPid}, + rcv(Name, Pkt, S). + %% recv/2 recv(#diameter_packet{header = #diameter_header{} = Hdr} diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl index ccf68f4d93..e4f77e3a24 100644 --- a/lib/diameter/src/base/diameter_service.erl +++ b/lib/diameter/src/base/diameter_service.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2016. All Rights Reserved. +%% Copyright Ericsson AB 2010-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -1858,13 +1858,6 @@ eq(Any, Id, PeerId) -> %% OctetString() can be specified as an iolist() so test for string %% rather then term equality. -%% transports/1 - -transports(#state{watchdogT = WatchdogT}) -> - ets:select(WatchdogT, [{#watchdog{peer = '$1', _ = '_'}, - [{'is_pid', '$1'}], - ['$1']}]). - %% --------------------------------------------------------------------------- %% # service_info/2 %% --------------------------------------------------------------------------- @@ -1887,7 +1880,6 @@ transports(#state{watchdogT = WatchdogT}) -> -define(ALL_INFO, [capabilities, applications, transport, - pending, options]). %% The rest. @@ -1981,7 +1973,6 @@ complete_info(Item, #state{service = Svc} = S) -> applications -> info_apps(S); transport -> info_transport(S); options -> info_options(S); - pending -> info_pending(S); keys -> ?ALL_INFO ++ ?CAP_INFO ++ ?OTHER_INFO; all -> service_info(?ALL_INFO, S); statistics -> info_stats(S); @@ -2189,13 +2180,6 @@ info_apps(#state{service = #diameter_service{applications = Apps}}) -> mk_app(#diameter_app{} = A) -> lists:zip(record_info(fields, diameter_app), tl(tuple_to_list(A))). -%% info_pending/1 -%% -%% One entry for each outgoing request whose answer is outstanding. - -info_pending(#state{} = S) -> - diameter_traffic:pending(transports(S)). - %% info_info/1 %% %% Extract process_info from connections info. diff --git a/lib/diameter/src/base/diameter_sup.erl b/lib/diameter/src/base/diameter_sup.erl index 482289cb9a..01c51f0856 100644 --- a/lib/diameter/src/base/diameter_sup.erl +++ b/lib/diameter/src/base/diameter_sup.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2016. All Rights Reserved. +%% Copyright Ericsson AB 2010-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -42,7 +42,7 @@ -define(TABLES, [{diameter_sequence, [set]}, {diameter_service, [set, {keypos, 3}]}, - {diameter_request, [bag]}, + {diameter_request, [set]}, {diameter_config, [bag, {keypos, 2}]}]). %% start_link/0 diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl index 0720057fe5..a39e0502d1 100644 --- a/lib/diameter/src/base/diameter_traffic.erl +++ b/lib/diameter/src/base/diameter_traffic.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2013-2016. All Rights Reserved. +%% Copyright Ericsson AB 2013-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -30,7 +30,7 @@ -export([send_request/4]). %% towards diameter_watchdog --export([receive_message/4]). +-export([receive_message/6]). %% towards diameter_peer_fsm and diameter_watchdog -export([incr/4, @@ -40,8 +40,7 @@ %% towards diameter_service -export([make_recvdata/1, peer_up/1, - peer_down/1, - pending/1]). + peer_down/1]). %% internal -export([send/1, %% send from remote node @@ -58,14 +57,12 @@ -define(DEFAULT_TIMEOUT, 5000). %% for outgoing requests -define(DEFAULT_SPAWN_OPTS, []). -%% Table containing outgoing requests for which a reply has yet to be -%% received. +%% Table containing outgoing entries that live and die with +%% peer_up/down. The name is historic, since the table used to contain +%% information about outgoing requests for which an answer has yet to +%% be received. -define(REQUEST_TABLE, diameter_request). -%% Workaround for dialyzer's lack of understanding of match specs. --type match(T) - :: T | '_' | '$1' | '$2' | '$3' | '$4'. - %% Record diameter:call/4 options are parsed into. -record(options, {filter = none :: diameter:peer_filter(), @@ -73,7 +70,7 @@ timeout = ?DEFAULT_TIMEOUT :: 0..16#FFFFFFFF, detach = false :: boolean()}). -%% Term passed back to receive_message/4 with every incoming message. +%% Term passed back to receive_message/6 with every incoming message. -record(recvdata, {peerT :: ets:tid(), service_name :: diameter:service_name(), @@ -88,12 +85,12 @@ %% Record stored in diameter_request for each outgoing request. -record(request, - {ref :: match(reference()), %% used to receive answer - caller :: match(pid()), %% calling process - handler :: match(pid()), %% request process - transport :: match(pid()), %% peer process - caps :: match(#diameter_caps{}), %% of connection - packet :: match(#diameter_packet{})}). %% of request + {ref :: reference(), %% used to receive answer + caller :: pid() | undefined, %% calling process + handler :: pid(), %% request process + transport :: pid() | undefined, %% peer process + caps :: #diameter_caps{} | undefined, %% of connection + packet :: #diameter_packet{} | undefined}). %% of request %% --------------------------------------------------------------------------- %% # make_recvdata/1 @@ -115,12 +112,9 @@ make_recvdata([SvcName, PeerT, Apps, SvcOpts | _]) -> %% --------------------------------------------------------------------------- %% Start a process that dies with peer_down/1, on which request -%% processes can monitor. The transport process started in -%% diameter_peer_fsm could be used for this, but starting a new -%% process here is easier: don't otherwise have access to the -%% transport process here (TPid here is the diameter_peer_fsm -%% process), and the need for a process to monitor came much later -%% than the calls to peer_up/down. +%% processes can monitor. There is no other process that dies with +%% peer_down since failover doesn't imply the loss of transport in the +%% case of a watchdog transition into state SUSPECT. peer_up(TPid) -> proc_lib:start(?MODULE, init, [TPid]). @@ -212,54 +206,25 @@ incr_rc(Dir, Pkt, TPid, Dict0) -> incr_rc(Dir, Pkt, TPid, {Dict0, Dict0, Dict0}). %% --------------------------------------------------------------------------- -%% pending/1 -%% --------------------------------------------------------------------------- - -pending(TPids) -> - MatchSpec = [{{'$1', - #request{caller = '$2', - handler = '$3', - transport = '$4', - _ = '_'}, - '_'}, - [?ORCOND([{'==', T, '$4'} || T <- TPids])], - [{{'$1', [{{caller, '$2'}}, - {{handler, '$3'}}, - {{transport, '$4'}}]}}]}], - - try - ets:select(?REQUEST_TABLE, MatchSpec) - catch - error: badarg -> [] %% service has gone down - end. - -%% --------------------------------------------------------------------------- -%% # receive_message/4 +%% # receive_message/6 %% %% Handle an incoming Diameter message. %% --------------------------------------------------------------------------- -%% Handle an incoming Diameter message in the watchdog process. This -%% used to come through the service process but this avoids that -%% becoming a bottleneck. +%% Handle an incoming Diameter message in the watchdog process. -receive_message(TPid, {Pkt, NPid}, Dict0, RecvData) -> - NPid ! {diameter, incoming(TPid, Pkt, Dict0, RecvData)}; +receive_message(TPid, Route, Pkt, false, Dict0, RecvData) -> + incoming(TPid, Route, Pkt, Dict0, RecvData); -receive_message(TPid, Pkt, Dict0, RecvData) -> - incoming(TPid, Pkt, Dict0, RecvData). +receive_message(TPid, Route, Pkt, NPid, Dict0, RecvData) -> + NPid ! {diameter, incoming(TPid, Route, Pkt, Dict0, RecvData)}. %% incoming/4 -incoming(TPid, Pkt, Dict0, RecvData) +incoming(TPid, Route, Pkt, Dict0, RecvData) when is_pid(TPid) -> #diameter_packet{header = #diameter_header{is_request = R}} = Pkt, - recv(R, - (not R) andalso lookup_request(Pkt, TPid), - TPid, - Pkt, - Dict0, - RecvData). + recv(R, Route, TPid, Pkt, Dict0, RecvData). %% recv/6 @@ -274,8 +239,8 @@ recv(true, false, TPid, Pkt, Dict0, T) -> end; %% ... answer to known request ... -recv(false, #request{ref = Ref, handler = Pid} = Req, _, Pkt, Dict0, _) -> - Pid ! {answer, Ref, Req, Dict0, Pkt}, +recv(false, {Pid, Ref, TPid}, _, Pkt, Dict0, _) -> + Pid ! {answer, Ref, TPid, Dict0, Pkt}, {answer, Pid}; %% Note that failover could have happened prior to this message being @@ -546,7 +511,7 @@ send_A(T, TPid, {AppDict, Dict0} = DictT0, ReqPkt, EvalPktFs, EvalFs) -> {MsgDict, Pkt} = reply(T, TPid, DictT0, EvalPktFs, ReqPkt), incr(send, Pkt, TPid, AppDict), incr_rc(send, Pkt, TPid, {MsgDict, AppDict, Dict0}), %% count outgoing - send(TPid, Pkt), + send(TPid, false, Pkt), lists:foreach(fun diameter_lib:eval/1, EvalFs). %% answer/6 @@ -1508,7 +1473,7 @@ send_R(Pkt0, packet = Pkt0}, incr(send, Pkt, TPid, AppDict), - {TRef, MRef} = send_request(TPid, Pkt, Req, SvcName, Timeout), + {TRef, MRef} = zend_requezt(TPid, Pkt, Req, SvcName, Timeout), Pid ! Ref, %% tell caller a send has been attempted handle_answer(SvcName, App, @@ -1519,11 +1484,11 @@ send_R(Pkt0, recv_A(Timeout, SvcName, App, Opts, {TRef, MRef, #request{ref = Ref} = Req}) -> %% Matching on TRef below ensures we ignore messages that pertain %% to a previous transport prior to failover. The answer message - %% includes the #request{} since it's not necessarily Req; that - %% is, from the last peer to which we've transmitted. + %% includes the pid of the transport on which it was received, + %% which may not be the last peer to which we've transmitted. receive - {answer = A, Ref, Rq, Dict0, Pkt} -> %% Answer from peer - {A, Rq, Dict0, Pkt}; + {answer = A, Ref, TPid, Dict0, Pkt} -> %% Answer from peer + {A, #request{} = erase(TPid), Dict0, Pkt}; {timeout = Reason, TRef, _} -> %% No timely reply {error, Req, Reason}; {'DOWN', MRef, process, _, _} when false /= MRef -> %% local peer_down @@ -1717,22 +1682,20 @@ encode(DictT, TPid, #diameter_packet{bin = undefined} = Pkt) -> encode(_, _, #diameter_packet{} = Pkt) -> Pkt. +%% zend_requezt/5 + +zend_requezt(TPid, Pkt, Req, SvcName, Timeout) -> + put(TPid, Req), + send_request(TPid, Pkt, Req, SvcName, Timeout). + %% send_request/5 send_request(TPid, #diameter_packet{bin = Bin} = Pkt, Req, _SvcName, Timeout) when node() == node(TPid) -> Seqs = diameter_codec:sequence_numbers(Bin), TRef = erlang:start_timer(Timeout, self(), TPid), - Entry = {Seqs, Req, TRef}, - - %% Ensure that request table is cleaned even if the process is - %% killed. - Self = self(), - spawn(fun() -> diameter_lib:wait([Self]), delete_request(Entry) end), - - MRef = insert_request(Entry), - send(TPid, Pkt), - {TRef, MRef}; + send(TPid, _Route = {self(), Req#request.ref, Seqs}, Pkt), + {TRef, _MRef = peer_monitor(TPid, TRef)}; %% Send using a remote transport: spawn a process on the remote node %% to relay the answer. @@ -1764,16 +1727,16 @@ recv(TPid, Pid, TRef, {LocalTRef, MRef}) -> exit({timeout, LocalTRef, TPid} = T) end. -%% send/2 +%% send/3 -send(Pid, Pkt) -> %% Strip potentially large message terms. +send(Pid, Route, Pkt) -> %% Strip potentially large message terms. #diameter_packet{header = H, bin = Bin, transport_data = T} = Pkt, - Pid ! {send, #diameter_packet{header = H, - bin = Bin, - transport_data = T}}. + Pid ! {send, Route, #diameter_packet{header = H, + bin = Bin, + transport_data = T}}. %% retransmit/4 @@ -1783,8 +1746,8 @@ retransmit({TPid, Caps, App} = Req, SvcName, Timeout) -> - have_request(Pkt0, TPid) %% Don't failover to a peer we've - andalso ?THROW(timeout), %% already sent to. + undefined == get(TPid) %% Don't failover to a peer we've + orelse ?THROW(timeout), %% already sent to. Pkt = make_retransmit_packet(Pkt0), @@ -1837,51 +1800,20 @@ resend_request(Pkt0, ?LOG(retransmission, Pkt#diameter_packet.header), incr(TPid, {msg_id(Pkt, AppDict), send, retransmission}), - {TRef, MRef} = send_request(TPid, Pkt, Req, SvcName, Tmo), + {TRef, MRef} = zend_requezt(TPid, Pkt, Req, SvcName, Tmo), {TRef, MRef, Req}. -%% insert_request/1 +%% peer_monitor/2 -insert_request({_Seqs, #request{transport = TPid}, TRef} = T) -> - ets:insert(?REQUEST_TABLE, T), +peer_monitor(TPid, TRef) -> case ets:lookup(?REQUEST_TABLE, TPid) of %% at peer_up/1 - [{_, Pid}] -> - monitor(process, Pid); + [{_, MPid}] -> + monitor(process, MPid); [] -> %% transport has gone down self() ! {failover, TRef}, false end. -%% lookup_request/2 -%% -%% Note the match on both the key and transport pid. The latter is -%% necessary since the same Hop-by-Hop and End-to-End identifiers are -%% reused in the case of retransmission. - -lookup_request(Msg, TPid) -> - Seqs = diameter_codec:sequence_numbers(Msg), - Spec = [{{Seqs, #request{transport = TPid, _ = '_'}, '_'}, - [], - ['$_']}], - case ets:select(?REQUEST_TABLE, Spec) of - [{_, Req, _}] -> - Req; - [] -> - false - end. - -%% delete_request/1 - -delete_request(T) -> - ets:delete_object(?REQUEST_TABLE, T). - -%% have_request/2 - -have_request(Pkt, TPid) -> - Seqs = diameter_codec:sequence_numbers(Pkt), - Pat = {Seqs, #request{transport = TPid, _ = '_'}, '_'}, - '$end_of_table' /= ets:select(?REQUEST_TABLE, [{Pat, [], ['$_']}], 1). - %% get_destination/2 get_destination(Dict, Msg) -> diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl index 2ba60a65fb..f28b8f2910 100644 --- a/lib/diameter/src/base/diameter_watchdog.erl +++ b/lib/diameter/src/base/diameter_watchdog.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2016. All Rights Reserved. +%% Copyright Ericsson AB 2010-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -283,7 +283,7 @@ event(Msg, ?LOG(transition, {From, To}). data(Msg, TPid, reopen, okay) -> - {recv, TPid, 'DWA', _Pkt} = Msg, %% assert + {recv, TPid, false, 'DWA', _Pkt, _NPid} = Msg, %% assert {TPid, T} = eraser(open), [T]; @@ -447,12 +447,14 @@ transition({'DOWN', _, process, TPid, _Reason} = D, end; %% Incoming message. -transition({recv, TPid, Name, PktT}, #watchdog{transport = TPid} = S) -> +transition({recv, TPid, Route, Name, Pkt, NPid}, + #watchdog{transport = TPid} + = S) -> try - incoming(Name, PktT, S) + incoming(Name, Pkt, NPid, S) catch #watchdog{dictionary = Dict0, receive_data = T} = NS -> - diameter_traffic:receive_message(TPid, PktT, Dict0, T), + diameter_traffic:receive_message(TPid, Route, Pkt, NPid, Dict0, T), NS end; @@ -582,15 +584,17 @@ send_watchdog(#watchdog{pending = false, %% Don't count encode errors since we don't expect any on DWR/DWA. -%% incoming/3 +%% incoming/4 -incoming(Name, {Pkt, NPid}, S) -> - NS = recv(Name, Pkt, S), - NPid ! {diameter, discard}, - NS; +incoming(Name, Pkt, false, S) -> + recv(Name, Pkt, S); -incoming(Name, Pkt, S) -> - recv(Name, Pkt, S). +incoming(Name, Pkt, NPid, S) -> + try + recv(Name, Pkt, S) + after + NPid ! {diameter, discard} + end. %% recv/3 -- cgit v1.2.3 From fb14eac92b8bc08293443196ed5ab3896d8fb7a1 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Fri, 3 Mar 2017 14:25:11 +0100 Subject: Avoid sending large terms between nodes unnecessarily When relaying outgoing requests through transport on a remote node, terms that were stripped when sending to the transport process weren't stripped when spawning a process on the remote node. Also, don't save the request to the process dictionary in a process that just relays an answer. --- lib/diameter/src/base/diameter_peer_fsm.erl | 5 +--- lib/diameter/src/base/diameter_traffic.erl | 43 ++++++++++++++++++++--------- 2 files changed, 31 insertions(+), 17 deletions(-) diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl index 0c24ebeb19..0df36efa16 100644 --- a/lib/diameter/src/base/diameter_peer_fsm.erl +++ b/lib/diameter/src/base/diameter_peer_fsm.erl @@ -454,7 +454,7 @@ transition({timeout, _}, _) -> %% Outgoing message. transition({send, Msg}, S) -> outgoing(Msg, S); -transition({send, Route, Msg}, S) -> +transition({send, Msg, Route}, S) -> put_route(Route), outgoing(Msg, S); @@ -504,9 +504,6 @@ transition({state, Pid}, #state{state = S, transport = TPid}) -> %% Map identifiers in an outgoing request to be able to lookup the %% handler process when the answer is received. -put_route(false) -> - ok; - put_route({Pid, Ref, Seqs}) -> MRef = monitor(process, Pid), put(Pid, Seqs), diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl index a39e0502d1..bc1ccf4feb 100644 --- a/lib/diameter/src/base/diameter_traffic.erl +++ b/lib/diameter/src/base/diameter_traffic.erl @@ -511,7 +511,7 @@ send_A(T, TPid, {AppDict, Dict0} = DictT0, ReqPkt, EvalPktFs, EvalFs) -> {MsgDict, Pkt} = reply(T, TPid, DictT0, EvalPktFs, ReqPkt), incr(send, Pkt, TPid, AppDict), incr_rc(send, Pkt, TPid, {MsgDict, AppDict, Dict0}), %% count outgoing - send(TPid, false, Pkt), + send(TPid, Pkt), lists:foreach(fun diameter_lib:eval/1, EvalFs). %% answer/6 @@ -1683,10 +1683,13 @@ encode(_, _, #diameter_packet{} = Pkt) -> Pkt. %% zend_requezt/5 +%% +%% Strip potentially large record fields that aren't used by the +%% processes the records can be send to, possibly on a remote node. zend_requezt(TPid, Pkt, Req, SvcName, Timeout) -> put(TPid, Req), - send_request(TPid, Pkt, Req, SvcName, Timeout). + send_request(TPid, z(Pkt), Req, SvcName, Timeout). %% send_request/5 @@ -1694,22 +1697,37 @@ send_request(TPid, #diameter_packet{bin = Bin} = Pkt, Req, _SvcName, Timeout) when node() == node(TPid) -> Seqs = diameter_codec:sequence_numbers(Bin), TRef = erlang:start_timer(Timeout, self(), TPid), - send(TPid, _Route = {self(), Req#request.ref, Seqs}, Pkt), + send(TPid, Pkt, _Route = {self(), Req#request.ref, Seqs}), {TRef, _MRef = peer_monitor(TPid, TRef)}; %% Send using a remote transport: spawn a process on the remote node %% to relay the answer. send_request(TPid, #diameter_packet{} = Pkt, Req, SvcName, Timeout) -> TRef = erlang:start_timer(Timeout, self(), TPid), - T = {TPid, Pkt, Req, SvcName, Timeout, TRef}, + T = {TPid, Pkt, z(Req), SvcName, Timeout, TRef}, spawn(node(TPid), ?MODULE, send, [T]), {TRef, false}. +%% z/1 +%% +%% Avoid sending potentially large terms unnecessarily. The records +%% themselves are retained since they're sent between nodes in send/1 +%% and changing what's sent causes upgrade issues. + +z(#request{ref = Ref, handler = Pid}) -> + #request{ref = Ref, + handler = Pid}; + +z(#diameter_packet{header = H, bin = Bin, transport_data = T}) -> + #diameter_packet{header = H, + bin = Bin, + transport_data = T}. + %% send/1 send({TPid, Pkt, #request{handler = Pid} = Req0, SvcName, Timeout, TRef}) -> Req = Req0#request{handler = self()}, - recv(TPid, Pid, TRef, send_request(TPid, Pkt, Req, SvcName, Timeout)). + recv(TPid, Pid, TRef, zend_requezt(TPid, Pkt, Req, SvcName, Timeout)). %% recv/4 %% @@ -1727,16 +1745,15 @@ recv(TPid, Pid, TRef, {LocalTRef, MRef}) -> exit({timeout, LocalTRef, TPid} = T) end. +%% send/2 + +send(Pid, Pkt) -> + Pid ! {send, Pkt}. + %% send/3 -send(Pid, Route, Pkt) -> %% Strip potentially large message terms. - #diameter_packet{header = H, - bin = Bin, - transport_data = T} - = Pkt, - Pid ! {send, Route, #diameter_packet{header = H, - bin = Bin, - transport_data = T}}. +send(Pid, Pkt, Route) -> + Pid ! {send, Pkt, Route}. %% retransmit/4 -- cgit v1.2.3