diff options
Diffstat (limited to 'lib/diameter/src/base/diameter_traffic.erl')
-rw-r--r-- | lib/diameter/src/base/diameter_traffic.erl | 104 |
1 files changed, 57 insertions, 47 deletions
diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl index d93a3e71e3..0720057fe5 100644 --- a/lib/diameter/src/base/diameter_traffic.erl +++ b/lib/diameter/src/base/diameter_traffic.erl @@ -43,8 +43,9 @@ peer_down/1, pending/1]). -%% towards ?MODULE --export([send/1]). %% send from remote node +%% internal +-export([send/1, %% send from remote node + init/1]). %% monitor process start -include_lib("diameter/include/diameter.hrl"). -include("diameter_internal.hrl"). @@ -113,26 +114,30 @@ make_recvdata([SvcName, PeerT, Apps, SvcOpts | _]) -> %% peer_up/1 %% --------------------------------------------------------------------------- -%% Insert an element that is used to detect whether or not there has -%% been a failover when inserting an outgoing request. +%% Start a process that dies with peer_down/1, on which request +%% processes can monitor. The transport process started in +%% diameter_peer_fsm could be used for this, but starting a new +%% process here is easier: don't otherwise have access to the +%% transport process here (TPid here is the diameter_peer_fsm +%% process), and the need for a process to monitor came much later +%% than the calls to peer_up/down. peer_up(TPid) -> - ets:insert(?REQUEST_TABLE, {TPid}). + proc_lib:start(?MODULE, init, [TPid]). + +init(TPid) -> + ets:insert(?REQUEST_TABLE, {TPid, self()}), + proc_lib:init_ack(self()), + proc_lib:hibernate(erlang, exit, [{shutdown, TPid}]). %% --------------------------------------------------------------------------- %% peer_down/1 %% --------------------------------------------------------------------------- peer_down(TPid) -> - ets:delete_object(?REQUEST_TABLE, {TPid}), - lists:foreach(fun failover/1, ets:lookup(?REQUEST_TABLE, TPid)). -%% Note that a request process can store its request after failover -%% notifications are sent here: insert_request/2 sends the notification -%% in that case. - -%% failover/1 - -failover({_TPid, {Pid, TRef}}) -> - Pid ! {failover, TRef}. + [{_, Pid}] = ets:lookup(?REQUEST_TABLE, TPid), + ets:delete(?REQUEST_TABLE, TPid), + Pid ! ok, %% make it die + Pid. %% --------------------------------------------------------------------------- %% incr/4 @@ -1503,15 +1508,15 @@ send_R(Pkt0, packet = Pkt0}, incr(send, Pkt, TPid, AppDict), - TRef = send_request(TPid, Pkt, Req, SvcName, Timeout), + {TRef, MRef} = send_request(TPid, Pkt, Req, SvcName, Timeout), Pid ! Ref, %% tell caller a send has been attempted handle_answer(SvcName, App, - recv_A(Timeout, SvcName, App, Opts, {TRef, Req})). + recv_A(Timeout, SvcName, App, Opts, {TRef, MRef, Req})). %% recv_A/5 -recv_A(Timeout, SvcName, App, Opts, {TRef, #request{ref = Ref} = Req}) -> +recv_A(Timeout, SvcName, App, Opts, {TRef, MRef, #request{ref = Ref} = Req}) -> %% Matching on TRef below ensures we ignore messages that pertain %% to a previous transport prior to failover. The answer message %% includes the #request{} since it's not necessarily Req; that @@ -1521,14 +1526,21 @@ recv_A(Timeout, SvcName, App, Opts, {TRef, #request{ref = Ref} = Req}) -> {A, Rq, Dict0, Pkt}; {timeout = Reason, TRef, _} -> %% No timely reply {error, Req, Reason}; - {failover, TRef} -> %% Service says peer has gone down - retransmit(pick_peer(SvcName, App, Req, Opts), - Req, - Opts, - SvcName, - Timeout) + {'DOWN', MRef, process, _, _} when false /= MRef -> %% local peer_down + failover(SvcName, App, Req, Opts, Timeout); + {failover, TRef} -> %% local or remote peer_down + failover(SvcName, App, Req, Opts, Timeout) end. +%% failover/5 + +failover(SvcName, App, Req, Opts, Timeout) -> + retransmit(pick_peer(SvcName, App, Req, Opts), + Req, + Opts, + SvcName, + Timeout). + %% handle_answer/3 handle_answer(SvcName, App, {error, Req, Reason}) -> @@ -1711,15 +1723,16 @@ send_request(TPid, #diameter_packet{bin = Bin} = Pkt, Req, _SvcName, Timeout) when node() == node(TPid) -> Seqs = diameter_codec:sequence_numbers(Bin), TRef = erlang:start_timer(Timeout, self(), TPid), - Entry = {Seqs, #request{handler = Pid} = Req, TRef}, + Entry = {Seqs, Req, TRef}, %% Ensure that request table is cleaned even if the process is %% killed. - spawn(fun() -> diameter_lib:wait([Pid]), delete_request(Entry) end), + Self = self(), + spawn(fun() -> diameter_lib:wait([Self]), delete_request(Entry) end), - insert_request(Entry), + MRef = insert_request(Entry), send(TPid, Pkt), - TRef; + {TRef, MRef}; %% Send using a remote transport: spawn a process on the remote node %% to relay the answer. @@ -1727,7 +1740,7 @@ send_request(TPid, #diameter_packet{} = Pkt, Req, SvcName, Timeout) -> TRef = erlang:start_timer(Timeout, self(), TPid), T = {TPid, Pkt, Req, SvcName, Timeout, TRef}, spawn(node(TPid), ?MODULE, send, [T]), - TRef. + {TRef, false}. %% send/1 @@ -1739,10 +1752,12 @@ send({TPid, Pkt, #request{handler = Pid} = Req0, SvcName, Timeout, TRef}) -> %% %% Relay an answer from a remote node. -recv(TPid, Pid, TRef, LocalTRef) -> +recv(TPid, Pid, TRef, {LocalTRef, MRef}) -> receive {answer, _, _, _, _} = A -> Pid ! A; + {'DOWN', MRef, process, _, _} -> + Pid ! {failover, TRef}; {failover = T, LocalTRef} -> Pid ! {T, TRef}; T -> @@ -1822,24 +1837,20 @@ resend_request(Pkt0, ?LOG(retransmission, Pkt#diameter_packet.header), incr(TPid, {msg_id(Pkt, AppDict), send, retransmission}), - TRef = send_request(TPid, Pkt, Req, SvcName, Tmo), - {TRef, Req}. + {TRef, MRef} = send_request(TPid, Pkt, Req, SvcName, Tmo), + {TRef, MRef, Req}. %% insert_request/1 insert_request({_Seqs, #request{transport = TPid}, TRef} = T) -> - ets:insert(?REQUEST_TABLE, [T, {TPid, {self(), TRef}}]), - is_peer_up(TPid) - orelse (self() ! {failover, TRef}). %% failover/1 may have missed - -%% is_peer_up/1 -%% -%% Is the entry written by peer_up/1 and deleted by peer_down/1 still -%% in the request table? - -is_peer_up(TPid) -> - Spec = [{{TPid}, [], ['$_']}], - '$end_of_table' /= ets:select(?REQUEST_TABLE, Spec, 1). + ets:insert(?REQUEST_TABLE, T), + case ets:lookup(?REQUEST_TABLE, TPid) of %% at peer_up/1 + [{_, Pid}] -> + monitor(process, Pid); + [] -> %% transport has gone down + self() ! {failover, TRef}, + false + end. %% lookup_request/2 %% @@ -1861,9 +1872,8 @@ lookup_request(Msg, TPid) -> %% delete_request/1 -delete_request({_Seqs, #request{handler = Pid, transport = TPid}, TRef} = T) -> - Spec = [{R, [], [true]} || R <- [T, {TPid, {Pid, TRef}}]], - ets:select_delete(?REQUEST_TABLE, Spec). +delete_request(T) -> + ets:delete_object(?REQUEST_TABLE, T). %% have_request/2 |