diff options
author | Anders Svensson <[email protected]> | 2017-02-06 15:49:44 +0100 |
---|---|---|
committer | Anders Svensson <[email protected]> | 2017-02-24 10:45:45 +0100 |
commit | a5bc8a5911613ec9ddfef9984ee59a24110c8b2b (patch) | |
tree | 6343e6f55482aa094e4b34dfd335bc7667b2f8af /lib | |
parent | 3473ecd83a7bbe7e0bebb865f25dddb93e3bf10f (diff) | |
download | otp-a5bc8a5911613ec9ddfef9984ee59a24110c8b2b.tar.gz otp-a5bc8a5911613ec9ddfef9984ee59a24110c8b2b.tar.bz2 otp-a5bc8a5911613ec9ddfef9984ee59a24110c8b2b.zip |
Fix/redo failover optimization
Commit 9a878743 addressed inefficiency at failover, but introduced
inefficiency in the sending of outgoing requests in so doing: each
outgoing request added an request table entry keyed on a transport pid,
then looked for a specific element with this key, and then (later)
removed the inserted element. Since the request table is a bag, this
results in linear searches over a potentially long list of element
keyed on the same pid. The higher the rate of outgoing calls, the more
costly it becomes.
Instead of writing entries to the request table, the peer_up/down calls
to diameter_traffic that mirror transitions to and from the OKAY state
in the RFC 3539 watchdog state machine now result in a process for
request processes to monitor in order to detect failover.
Diffstat (limited to 'lib')
-rw-r--r-- | lib/diameter/src/base/diameter_traffic.erl | 104 |
1 files changed, 57 insertions, 47 deletions
diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl index d93a3e71e3..0720057fe5 100644 --- a/lib/diameter/src/base/diameter_traffic.erl +++ b/lib/diameter/src/base/diameter_traffic.erl @@ -43,8 +43,9 @@ peer_down/1, pending/1]). -%% towards ?MODULE --export([send/1]). %% send from remote node +%% internal +-export([send/1, %% send from remote node + init/1]). %% monitor process start -include_lib("diameter/include/diameter.hrl"). -include("diameter_internal.hrl"). @@ -113,26 +114,30 @@ make_recvdata([SvcName, PeerT, Apps, SvcOpts | _]) -> %% peer_up/1 %% --------------------------------------------------------------------------- -%% Insert an element that is used to detect whether or not there has -%% been a failover when inserting an outgoing request. +%% Start a process that dies with peer_down/1, on which request +%% processes can monitor. The transport process started in +%% diameter_peer_fsm could be used for this, but starting a new +%% process here is easier: don't otherwise have access to the +%% transport process here (TPid here is the diameter_peer_fsm +%% process), and the need for a process to monitor came much later +%% than the calls to peer_up/down. peer_up(TPid) -> - ets:insert(?REQUEST_TABLE, {TPid}). + proc_lib:start(?MODULE, init, [TPid]). + +init(TPid) -> + ets:insert(?REQUEST_TABLE, {TPid, self()}), + proc_lib:init_ack(self()), + proc_lib:hibernate(erlang, exit, [{shutdown, TPid}]). %% --------------------------------------------------------------------------- %% peer_down/1 %% --------------------------------------------------------------------------- peer_down(TPid) -> - ets:delete_object(?REQUEST_TABLE, {TPid}), - lists:foreach(fun failover/1, ets:lookup(?REQUEST_TABLE, TPid)). -%% Note that a request process can store its request after failover -%% notifications are sent here: insert_request/2 sends the notification -%% in that case. - -%% failover/1 - -failover({_TPid, {Pid, TRef}}) -> - Pid ! {failover, TRef}. + [{_, Pid}] = ets:lookup(?REQUEST_TABLE, TPid), + ets:delete(?REQUEST_TABLE, TPid), + Pid ! ok, %% make it die + Pid. %% --------------------------------------------------------------------------- %% incr/4 @@ -1503,15 +1508,15 @@ send_R(Pkt0, packet = Pkt0}, incr(send, Pkt, TPid, AppDict), - TRef = send_request(TPid, Pkt, Req, SvcName, Timeout), + {TRef, MRef} = send_request(TPid, Pkt, Req, SvcName, Timeout), Pid ! Ref, %% tell caller a send has been attempted handle_answer(SvcName, App, - recv_A(Timeout, SvcName, App, Opts, {TRef, Req})). + recv_A(Timeout, SvcName, App, Opts, {TRef, MRef, Req})). %% recv_A/5 -recv_A(Timeout, SvcName, App, Opts, {TRef, #request{ref = Ref} = Req}) -> +recv_A(Timeout, SvcName, App, Opts, {TRef, MRef, #request{ref = Ref} = Req}) -> %% Matching on TRef below ensures we ignore messages that pertain %% to a previous transport prior to failover. The answer message %% includes the #request{} since it's not necessarily Req; that @@ -1521,14 +1526,21 @@ recv_A(Timeout, SvcName, App, Opts, {TRef, #request{ref = Ref} = Req}) -> {A, Rq, Dict0, Pkt}; {timeout = Reason, TRef, _} -> %% No timely reply {error, Req, Reason}; - {failover, TRef} -> %% Service says peer has gone down - retransmit(pick_peer(SvcName, App, Req, Opts), - Req, - Opts, - SvcName, - Timeout) + {'DOWN', MRef, process, _, _} when false /= MRef -> %% local peer_down + failover(SvcName, App, Req, Opts, Timeout); + {failover, TRef} -> %% local or remote peer_down + failover(SvcName, App, Req, Opts, Timeout) end. +%% failover/5 + +failover(SvcName, App, Req, Opts, Timeout) -> + retransmit(pick_peer(SvcName, App, Req, Opts), + Req, + Opts, + SvcName, + Timeout). + %% handle_answer/3 handle_answer(SvcName, App, {error, Req, Reason}) -> @@ -1711,15 +1723,16 @@ send_request(TPid, #diameter_packet{bin = Bin} = Pkt, Req, _SvcName, Timeout) when node() == node(TPid) -> Seqs = diameter_codec:sequence_numbers(Bin), TRef = erlang:start_timer(Timeout, self(), TPid), - Entry = {Seqs, #request{handler = Pid} = Req, TRef}, + Entry = {Seqs, Req, TRef}, %% Ensure that request table is cleaned even if the process is %% killed. - spawn(fun() -> diameter_lib:wait([Pid]), delete_request(Entry) end), + Self = self(), + spawn(fun() -> diameter_lib:wait([Self]), delete_request(Entry) end), - insert_request(Entry), + MRef = insert_request(Entry), send(TPid, Pkt), - TRef; + {TRef, MRef}; %% Send using a remote transport: spawn a process on the remote node %% to relay the answer. @@ -1727,7 +1740,7 @@ send_request(TPid, #diameter_packet{} = Pkt, Req, SvcName, Timeout) -> TRef = erlang:start_timer(Timeout, self(), TPid), T = {TPid, Pkt, Req, SvcName, Timeout, TRef}, spawn(node(TPid), ?MODULE, send, [T]), - TRef. + {TRef, false}. %% send/1 @@ -1739,10 +1752,12 @@ send({TPid, Pkt, #request{handler = Pid} = Req0, SvcName, Timeout, TRef}) -> %% %% Relay an answer from a remote node. -recv(TPid, Pid, TRef, LocalTRef) -> +recv(TPid, Pid, TRef, {LocalTRef, MRef}) -> receive {answer, _, _, _, _} = A -> Pid ! A; + {'DOWN', MRef, process, _, _} -> + Pid ! {failover, TRef}; {failover = T, LocalTRef} -> Pid ! {T, TRef}; T -> @@ -1822,24 +1837,20 @@ resend_request(Pkt0, ?LOG(retransmission, Pkt#diameter_packet.header), incr(TPid, {msg_id(Pkt, AppDict), send, retransmission}), - TRef = send_request(TPid, Pkt, Req, SvcName, Tmo), - {TRef, Req}. + {TRef, MRef} = send_request(TPid, Pkt, Req, SvcName, Tmo), + {TRef, MRef, Req}. %% insert_request/1 insert_request({_Seqs, #request{transport = TPid}, TRef} = T) -> - ets:insert(?REQUEST_TABLE, [T, {TPid, {self(), TRef}}]), - is_peer_up(TPid) - orelse (self() ! {failover, TRef}). %% failover/1 may have missed - -%% is_peer_up/1 -%% -%% Is the entry written by peer_up/1 and deleted by peer_down/1 still -%% in the request table? - -is_peer_up(TPid) -> - Spec = [{{TPid}, [], ['$_']}], - '$end_of_table' /= ets:select(?REQUEST_TABLE, Spec, 1). + ets:insert(?REQUEST_TABLE, T), + case ets:lookup(?REQUEST_TABLE, TPid) of %% at peer_up/1 + [{_, Pid}] -> + monitor(process, Pid); + [] -> %% transport has gone down + self() ! {failover, TRef}, + false + end. %% lookup_request/2 %% @@ -1861,9 +1872,8 @@ lookup_request(Msg, TPid) -> %% delete_request/1 -delete_request({_Seqs, #request{handler = Pid, transport = TPid}, TRef} = T) -> - Spec = [{R, [], [true]} || R <- [T, {TPid, {Pid, TRef}}]], - ets:select_delete(?REQUEST_TABLE, Spec). +delete_request(T) -> + ets:delete_object(?REQUEST_TABLE, T). %% have_request/2 |