aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src/base/diameter_traffic.erl
diff options
context:
space:
mode:
Diffstat (limited to 'lib/diameter/src/base/diameter_traffic.erl')
-rw-r--r--lib/diameter/src/base/diameter_traffic.erl104
1 files changed, 57 insertions, 47 deletions
diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl
index d93a3e71e3..0720057fe5 100644
--- a/lib/diameter/src/base/diameter_traffic.erl
+++ b/lib/diameter/src/base/diameter_traffic.erl
@@ -43,8 +43,9 @@
peer_down/1,
pending/1]).
-%% towards ?MODULE
--export([send/1]). %% send from remote node
+%% internal
+-export([send/1, %% send from remote node
+ init/1]). %% monitor process start
-include_lib("diameter/include/diameter.hrl").
-include("diameter_internal.hrl").
@@ -113,26 +114,30 @@ make_recvdata([SvcName, PeerT, Apps, SvcOpts | _]) ->
%% peer_up/1
%% ---------------------------------------------------------------------------
-%% Insert an element that is used to detect whether or not there has
-%% been a failover when inserting an outgoing request.
+%% Start a process that dies with peer_down/1, on which request
+%% processes can monitor. The transport process started in
+%% diameter_peer_fsm could be used for this, but starting a new
+%% process here is easier: don't otherwise have access to the
+%% transport process here (TPid here is the diameter_peer_fsm
+%% process), and the need for a process to monitor came much later
+%% than the calls to peer_up/down.
peer_up(TPid) ->
- ets:insert(?REQUEST_TABLE, {TPid}).
+ proc_lib:start(?MODULE, init, [TPid]).
+
+init(TPid) ->
+ ets:insert(?REQUEST_TABLE, {TPid, self()}),
+ proc_lib:init_ack(self()),
+ proc_lib:hibernate(erlang, exit, [{shutdown, TPid}]).
%% ---------------------------------------------------------------------------
%% peer_down/1
%% ---------------------------------------------------------------------------
peer_down(TPid) ->
- ets:delete_object(?REQUEST_TABLE, {TPid}),
- lists:foreach(fun failover/1, ets:lookup(?REQUEST_TABLE, TPid)).
-%% Note that a request process can store its request after failover
-%% notifications are sent here: insert_request/2 sends the notification
-%% in that case.
-
-%% failover/1
-
-failover({_TPid, {Pid, TRef}}) ->
- Pid ! {failover, TRef}.
+ [{_, Pid}] = ets:lookup(?REQUEST_TABLE, TPid),
+ ets:delete(?REQUEST_TABLE, TPid),
+ Pid ! ok, %% make it die
+ Pid.
%% ---------------------------------------------------------------------------
%% incr/4
@@ -1503,15 +1508,15 @@ send_R(Pkt0,
packet = Pkt0},
incr(send, Pkt, TPid, AppDict),
- TRef = send_request(TPid, Pkt, Req, SvcName, Timeout),
+ {TRef, MRef} = send_request(TPid, Pkt, Req, SvcName, Timeout),
Pid ! Ref, %% tell caller a send has been attempted
handle_answer(SvcName,
App,
- recv_A(Timeout, SvcName, App, Opts, {TRef, Req})).
+ recv_A(Timeout, SvcName, App, Opts, {TRef, MRef, Req})).
%% recv_A/5
-recv_A(Timeout, SvcName, App, Opts, {TRef, #request{ref = Ref} = Req}) ->
+recv_A(Timeout, SvcName, App, Opts, {TRef, MRef, #request{ref = Ref} = Req}) ->
%% Matching on TRef below ensures we ignore messages that pertain
%% to a previous transport prior to failover. The answer message
%% includes the #request{} since it's not necessarily Req; that
@@ -1521,14 +1526,21 @@ recv_A(Timeout, SvcName, App, Opts, {TRef, #request{ref = Ref} = Req}) ->
{A, Rq, Dict0, Pkt};
{timeout = Reason, TRef, _} -> %% No timely reply
{error, Req, Reason};
- {failover, TRef} -> %% Service says peer has gone down
- retransmit(pick_peer(SvcName, App, Req, Opts),
- Req,
- Opts,
- SvcName,
- Timeout)
+ {'DOWN', MRef, process, _, _} when false /= MRef -> %% local peer_down
+ failover(SvcName, App, Req, Opts, Timeout);
+ {failover, TRef} -> %% local or remote peer_down
+ failover(SvcName, App, Req, Opts, Timeout)
end.
+%% failover/5
+
+failover(SvcName, App, Req, Opts, Timeout) ->
+ retransmit(pick_peer(SvcName, App, Req, Opts),
+ Req,
+ Opts,
+ SvcName,
+ Timeout).
+
%% handle_answer/3
handle_answer(SvcName, App, {error, Req, Reason}) ->
@@ -1711,15 +1723,16 @@ send_request(TPid, #diameter_packet{bin = Bin} = Pkt, Req, _SvcName, Timeout)
when node() == node(TPid) ->
Seqs = diameter_codec:sequence_numbers(Bin),
TRef = erlang:start_timer(Timeout, self(), TPid),
- Entry = {Seqs, #request{handler = Pid} = Req, TRef},
+ Entry = {Seqs, Req, TRef},
%% Ensure that request table is cleaned even if the process is
%% killed.
- spawn(fun() -> diameter_lib:wait([Pid]), delete_request(Entry) end),
+ Self = self(),
+ spawn(fun() -> diameter_lib:wait([Self]), delete_request(Entry) end),
- insert_request(Entry),
+ MRef = insert_request(Entry),
send(TPid, Pkt),
- TRef;
+ {TRef, MRef};
%% Send using a remote transport: spawn a process on the remote node
%% to relay the answer.
@@ -1727,7 +1740,7 @@ send_request(TPid, #diameter_packet{} = Pkt, Req, SvcName, Timeout) ->
TRef = erlang:start_timer(Timeout, self(), TPid),
T = {TPid, Pkt, Req, SvcName, Timeout, TRef},
spawn(node(TPid), ?MODULE, send, [T]),
- TRef.
+ {TRef, false}.
%% send/1
@@ -1739,10 +1752,12 @@ send({TPid, Pkt, #request{handler = Pid} = Req0, SvcName, Timeout, TRef}) ->
%%
%% Relay an answer from a remote node.
-recv(TPid, Pid, TRef, LocalTRef) ->
+recv(TPid, Pid, TRef, {LocalTRef, MRef}) ->
receive
{answer, _, _, _, _} = A ->
Pid ! A;
+ {'DOWN', MRef, process, _, _} ->
+ Pid ! {failover, TRef};
{failover = T, LocalTRef} ->
Pid ! {T, TRef};
T ->
@@ -1822,24 +1837,20 @@ resend_request(Pkt0,
?LOG(retransmission, Pkt#diameter_packet.header),
incr(TPid, {msg_id(Pkt, AppDict), send, retransmission}),
- TRef = send_request(TPid, Pkt, Req, SvcName, Tmo),
- {TRef, Req}.
+ {TRef, MRef} = send_request(TPid, Pkt, Req, SvcName, Tmo),
+ {TRef, MRef, Req}.
%% insert_request/1
insert_request({_Seqs, #request{transport = TPid}, TRef} = T) ->
- ets:insert(?REQUEST_TABLE, [T, {TPid, {self(), TRef}}]),
- is_peer_up(TPid)
- orelse (self() ! {failover, TRef}). %% failover/1 may have missed
-
-%% is_peer_up/1
-%%
-%% Is the entry written by peer_up/1 and deleted by peer_down/1 still
-%% in the request table?
-
-is_peer_up(TPid) ->
- Spec = [{{TPid}, [], ['$_']}],
- '$end_of_table' /= ets:select(?REQUEST_TABLE, Spec, 1).
+ ets:insert(?REQUEST_TABLE, T),
+ case ets:lookup(?REQUEST_TABLE, TPid) of %% at peer_up/1
+ [{_, Pid}] ->
+ monitor(process, Pid);
+ [] -> %% transport has gone down
+ self() ! {failover, TRef},
+ false
+ end.
%% lookup_request/2
%%
@@ -1861,9 +1872,8 @@ lookup_request(Msg, TPid) ->
%% delete_request/1
-delete_request({_Seqs, #request{handler = Pid, transport = TPid}, TRef} = T) ->
- Spec = [{R, [], [true]} || R <- [T, {TPid, {Pid, TRef}}]],
- ets:select_delete(?REQUEST_TABLE, Spec).
+delete_request(T) ->
+ ets:delete_object(?REQUEST_TABLE, T).
%% have_request/2