aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src/base/diameter_traffic.erl
diff options
context:
space:
mode:
authorAnders Svensson <[email protected]>2017-02-06 15:49:44 +0100
committerAnders Svensson <[email protected]>2017-02-24 10:45:45 +0100
commita5bc8a5911613ec9ddfef9984ee59a24110c8b2b (patch)
tree6343e6f55482aa094e4b34dfd335bc7667b2f8af /lib/diameter/src/base/diameter_traffic.erl
parent3473ecd83a7bbe7e0bebb865f25dddb93e3bf10f (diff)
downloadotp-a5bc8a5911613ec9ddfef9984ee59a24110c8b2b.tar.gz
otp-a5bc8a5911613ec9ddfef9984ee59a24110c8b2b.tar.bz2
otp-a5bc8a5911613ec9ddfef9984ee59a24110c8b2b.zip
Fix/redo failover optimization
Commit 9a878743 addressed inefficiency at failover, but introduced inefficiency in the sending of outgoing requests in so doing: each outgoing request added an request table entry keyed on a transport pid, then looked for a specific element with this key, and then (later) removed the inserted element. Since the request table is a bag, this results in linear searches over a potentially long list of element keyed on the same pid. The higher the rate of outgoing calls, the more costly it becomes. Instead of writing entries to the request table, the peer_up/down calls to diameter_traffic that mirror transitions to and from the OKAY state in the RFC 3539 watchdog state machine now result in a process for request processes to monitor in order to detect failover.
Diffstat (limited to 'lib/diameter/src/base/diameter_traffic.erl')
-rw-r--r--lib/diameter/src/base/diameter_traffic.erl104
1 files changed, 57 insertions, 47 deletions
diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl
index d93a3e71e3..0720057fe5 100644
--- a/lib/diameter/src/base/diameter_traffic.erl
+++ b/lib/diameter/src/base/diameter_traffic.erl
@@ -43,8 +43,9 @@
peer_down/1,
pending/1]).
-%% towards ?MODULE
--export([send/1]). %% send from remote node
+%% internal
+-export([send/1, %% send from remote node
+ init/1]). %% monitor process start
-include_lib("diameter/include/diameter.hrl").
-include("diameter_internal.hrl").
@@ -113,26 +114,30 @@ make_recvdata([SvcName, PeerT, Apps, SvcOpts | _]) ->
%% peer_up/1
%% ---------------------------------------------------------------------------
-%% Insert an element that is used to detect whether or not there has
-%% been a failover when inserting an outgoing request.
+%% Start a process that dies with peer_down/1, on which request
+%% processes can monitor. The transport process started in
+%% diameter_peer_fsm could be used for this, but starting a new
+%% process here is easier: don't otherwise have access to the
+%% transport process here (TPid here is the diameter_peer_fsm
+%% process), and the need for a process to monitor came much later
+%% than the calls to peer_up/down.
peer_up(TPid) ->
- ets:insert(?REQUEST_TABLE, {TPid}).
+ proc_lib:start(?MODULE, init, [TPid]).
+
+init(TPid) ->
+ ets:insert(?REQUEST_TABLE, {TPid, self()}),
+ proc_lib:init_ack(self()),
+ proc_lib:hibernate(erlang, exit, [{shutdown, TPid}]).
%% ---------------------------------------------------------------------------
%% peer_down/1
%% ---------------------------------------------------------------------------
peer_down(TPid) ->
- ets:delete_object(?REQUEST_TABLE, {TPid}),
- lists:foreach(fun failover/1, ets:lookup(?REQUEST_TABLE, TPid)).
-%% Note that a request process can store its request after failover
-%% notifications are sent here: insert_request/2 sends the notification
-%% in that case.
-
-%% failover/1
-
-failover({_TPid, {Pid, TRef}}) ->
- Pid ! {failover, TRef}.
+ [{_, Pid}] = ets:lookup(?REQUEST_TABLE, TPid),
+ ets:delete(?REQUEST_TABLE, TPid),
+ Pid ! ok, %% make it die
+ Pid.
%% ---------------------------------------------------------------------------
%% incr/4
@@ -1503,15 +1508,15 @@ send_R(Pkt0,
packet = Pkt0},
incr(send, Pkt, TPid, AppDict),
- TRef = send_request(TPid, Pkt, Req, SvcName, Timeout),
+ {TRef, MRef} = send_request(TPid, Pkt, Req, SvcName, Timeout),
Pid ! Ref, %% tell caller a send has been attempted
handle_answer(SvcName,
App,
- recv_A(Timeout, SvcName, App, Opts, {TRef, Req})).
+ recv_A(Timeout, SvcName, App, Opts, {TRef, MRef, Req})).
%% recv_A/5
-recv_A(Timeout, SvcName, App, Opts, {TRef, #request{ref = Ref} = Req}) ->
+recv_A(Timeout, SvcName, App, Opts, {TRef, MRef, #request{ref = Ref} = Req}) ->
%% Matching on TRef below ensures we ignore messages that pertain
%% to a previous transport prior to failover. The answer message
%% includes the #request{} since it's not necessarily Req; that
@@ -1521,14 +1526,21 @@ recv_A(Timeout, SvcName, App, Opts, {TRef, #request{ref = Ref} = Req}) ->
{A, Rq, Dict0, Pkt};
{timeout = Reason, TRef, _} -> %% No timely reply
{error, Req, Reason};
- {failover, TRef} -> %% Service says peer has gone down
- retransmit(pick_peer(SvcName, App, Req, Opts),
- Req,
- Opts,
- SvcName,
- Timeout)
+ {'DOWN', MRef, process, _, _} when false /= MRef -> %% local peer_down
+ failover(SvcName, App, Req, Opts, Timeout);
+ {failover, TRef} -> %% local or remote peer_down
+ failover(SvcName, App, Req, Opts, Timeout)
end.
+%% failover/5
+
+failover(SvcName, App, Req, Opts, Timeout) ->
+ retransmit(pick_peer(SvcName, App, Req, Opts),
+ Req,
+ Opts,
+ SvcName,
+ Timeout).
+
%% handle_answer/3
handle_answer(SvcName, App, {error, Req, Reason}) ->
@@ -1711,15 +1723,16 @@ send_request(TPid, #diameter_packet{bin = Bin} = Pkt, Req, _SvcName, Timeout)
when node() == node(TPid) ->
Seqs = diameter_codec:sequence_numbers(Bin),
TRef = erlang:start_timer(Timeout, self(), TPid),
- Entry = {Seqs, #request{handler = Pid} = Req, TRef},
+ Entry = {Seqs, Req, TRef},
%% Ensure that request table is cleaned even if the process is
%% killed.
- spawn(fun() -> diameter_lib:wait([Pid]), delete_request(Entry) end),
+ Self = self(),
+ spawn(fun() -> diameter_lib:wait([Self]), delete_request(Entry) end),
- insert_request(Entry),
+ MRef = insert_request(Entry),
send(TPid, Pkt),
- TRef;
+ {TRef, MRef};
%% Send using a remote transport: spawn a process on the remote node
%% to relay the answer.
@@ -1727,7 +1740,7 @@ send_request(TPid, #diameter_packet{} = Pkt, Req, SvcName, Timeout) ->
TRef = erlang:start_timer(Timeout, self(), TPid),
T = {TPid, Pkt, Req, SvcName, Timeout, TRef},
spawn(node(TPid), ?MODULE, send, [T]),
- TRef.
+ {TRef, false}.
%% send/1
@@ -1739,10 +1752,12 @@ send({TPid, Pkt, #request{handler = Pid} = Req0, SvcName, Timeout, TRef}) ->
%%
%% Relay an answer from a remote node.
-recv(TPid, Pid, TRef, LocalTRef) ->
+recv(TPid, Pid, TRef, {LocalTRef, MRef}) ->
receive
{answer, _, _, _, _} = A ->
Pid ! A;
+ {'DOWN', MRef, process, _, _} ->
+ Pid ! {failover, TRef};
{failover = T, LocalTRef} ->
Pid ! {T, TRef};
T ->
@@ -1822,24 +1837,20 @@ resend_request(Pkt0,
?LOG(retransmission, Pkt#diameter_packet.header),
incr(TPid, {msg_id(Pkt, AppDict), send, retransmission}),
- TRef = send_request(TPid, Pkt, Req, SvcName, Tmo),
- {TRef, Req}.
+ {TRef, MRef} = send_request(TPid, Pkt, Req, SvcName, Tmo),
+ {TRef, MRef, Req}.
%% insert_request/1
insert_request({_Seqs, #request{transport = TPid}, TRef} = T) ->
- ets:insert(?REQUEST_TABLE, [T, {TPid, {self(), TRef}}]),
- is_peer_up(TPid)
- orelse (self() ! {failover, TRef}). %% failover/1 may have missed
-
-%% is_peer_up/1
-%%
-%% Is the entry written by peer_up/1 and deleted by peer_down/1 still
-%% in the request table?
-
-is_peer_up(TPid) ->
- Spec = [{{TPid}, [], ['$_']}],
- '$end_of_table' /= ets:select(?REQUEST_TABLE, Spec, 1).
+ ets:insert(?REQUEST_TABLE, T),
+ case ets:lookup(?REQUEST_TABLE, TPid) of %% at peer_up/1
+ [{_, Pid}] ->
+ monitor(process, Pid);
+ [] -> %% transport has gone down
+ self() ! {failover, TRef},
+ false
+ end.
%% lookup_request/2
%%
@@ -1861,9 +1872,8 @@ lookup_request(Msg, TPid) ->
%% delete_request/1
-delete_request({_Seqs, #request{handler = Pid, transport = TPid}, TRef} = T) ->
- Spec = [{R, [], [true]} || R <- [T, {TPid, {Pid, TRef}}]],
- ets:select_delete(?REQUEST_TABLE, Spec).
+delete_request(T) ->
+ ets:delete_object(?REQUEST_TABLE, T).
%% have_request/2