aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--lib/diameter/src/base/diameter_service.erl216
1 files changed, 123 insertions, 93 deletions
diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl
index 6e7adb1be2..d5a09338cd 100644
--- a/lib/diameter/src/base/diameter_service.erl
+++ b/lib/diameter/src/base/diameter_service.erl
@@ -334,14 +334,14 @@ call_rc(_, _, Sent) ->
call(SvcName, App, Msg, Opts, Caller) ->
c(find_state(SvcName), App, Msg, Opts, Caller).
-c(#state{service_name = Svc, options = [{_, Mask} | _]} = S,
+c(#state{service_name = SvcName, options = [{_, Mask} | _]} = S,
App,
Msg,
Opts,
Caller) ->
case find_transport(App, Msg, Opts, S) of
{_,_,_} = T ->
- send_request(T, Mask, Msg, Opts, Caller, Svc);
+ send_request(T, Mask, Msg, Opts, Caller, SvcName);
false ->
{error, no_connection};
{error, _} = No ->
@@ -587,8 +587,8 @@ transition({tc_timeout, T}, S) ->
%% Request process is telling us it may have missed a failover message
%% after a transport went down and the service process looked up
%% outstanding requests.
-transition({failover, TRef, Seqs}, S) ->
- failover(TRef, Seqs, S),
+transition({failover, TRef, Seqs}, _) ->
+ failover(TRef, Seqs),
ok;
transition(Req, S) ->
@@ -1064,7 +1064,7 @@ connection_down(#watchdog{state = ?WD_OKAY,
= S) ->
report_status(down, Wd, Pr, S, []),
remove_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict),
- request_peer_down(TPid, S);
+ request_peer_down(TPid);
connection_down(#watchdog{}, #peer{}, _) ->
ok;
@@ -1302,7 +1302,13 @@ cm([_,_|_], _, _, _) ->
%% The mod field of the #diameter_app{} here includes any extra
%% arguments passed to diameter:call/2.
-send_request({TPid, Caps, App} = T, Mask, Msg, Opts, Caller, SvcName) ->
+send_request({TPid, Caps, App}
+ = Transport,
+ Mask,
+ Msg,
+ Opts,
+ Caller,
+ SvcName) ->
#diameter_app{module = ModX}
= App,
@@ -1310,14 +1316,19 @@ send_request({TPid, Caps, App} = T, Mask, Msg, Opts, Caller, SvcName) ->
send_req(cb(ModX, prepare_request, [Pkt, SvcName, {TPid, Caps}]),
Pkt,
- T,
+ Transport,
Opts,
Caller,
SvcName,
[]).
-send_req({send, P}, Pkt, T, Opts, Caller, SvcName, Fs) ->
- send_req(make_request_packet(P, Pkt), T, Opts, Caller, SvcName, Fs);
+send_req({send, Msg}, Pkt, Transport, Opts, Caller, SvcName, Fs) ->
+ send_req(make_request_packet(Msg, Pkt),
+ Transport,
+ Opts,
+ Caller,
+ SvcName,
+ Fs);
send_req({discard, Reason} , _, _, _, _, _, _) ->
{error, Reason};
@@ -1446,11 +1457,11 @@ send_req(Pkt0, {TPid, Caps, App}, Opts, Caller, SvcName, Fs) ->
module = ModX},
try
- TRef = send_request(TPid, Pkt, Req, Timeout),
+ TRef = send_request(TPid, Pkt, Req, SvcName, Timeout),
ack(Caller),
handle_answer(SvcName, AE, recv_answer(Timeout, SvcName, {TRef, Req}))
after
- erase_request(Pkt)
+ erase_requests(Pkt)
end.
%% Tell caller a send has been attempted.
@@ -1459,41 +1470,39 @@ ack({Pid, Ref}) ->
%% recv_answer/3
-recv_answer(Timeout,
- SvcName,
- {TRef, #request{from = {_, Ref}, packet = RPkt} = Req}
- = T) ->
-
+recv_answer(Timeout, SvcName, {TRef, #request{from = {_, Ref}} = Req}) ->
%% Matching on TRef below ensures we ignore messages that pertain
%% to a previous transport prior to failover. The answer message
%% includes the #request{} since it's not necessarily Req; that
%% is, from the last peer to which we've transmitted.
-
receive
{answer = A, Ref, Rq, Dict0, Pkt} -> %% Answer from peer
{A, Rq, Dict0, Pkt};
{timeout = Reason, TRef, _} -> %% No timely reply
{error, Req, Reason};
- {failover = Reason, TRef, false} -> %% No alternate peer
- {error, Req, Reason};
- {failover, TRef, Transport} -> %% Resend to alternate peer
- try_retransmit(Timeout, SvcName, Req, Transport);
- {failover, TRef} -> %% May have missed failover notification
- Seqs = diameter_codec:sequence_numbers(RPkt),
- Pid = whois(SvcName),
- is_pid(Pid) andalso (Pid ! {failover, TRef, Seqs}),
- recv_answer(Timeout, SvcName, T)
+ {failover, TRef} -> %% Service says peer has gone down
+ retransmit(Req, find_state(SvcName), Timeout)
end.
+
%% Note that failover starts a new timer and that expiry of an old
%% timer value is ignored. This means that an answer could be accepted
%% from a peer after timeout in the case of failover.
-try_retransmit(Timeout, SvcName, Req, Transport) ->
+retransmit(Req, #state{service_name = SvcName} = S, Timeout) ->
+ rt(find_transport(Req, S), Req, SvcName, Timeout);
+
+retransmit(Req, false, _) -> %% service has gone down
+ {error, Req, failover}.
+
+rt({_,_,_} = Transport, Req, SvcName, Timeout) ->
try retransmit(Transport, Req, SvcName, Timeout) of
T -> recv_answer(Timeout, SvcName, T)
catch
?FAILURE(Reason) -> {error, Req, Reason}
- end.
+ end;
+
+rt(_, Req, _, _) -> %% no alternate peer
+ {error, Req, failover}.
%% handle_error/3
@@ -1532,28 +1541,32 @@ encode(Dict, #diameter_packet{bin = undefined} = Pkt) ->
encode(_, #diameter_packet{} = Pkt) ->
Pkt.
-%% send_request/4
+%% send_request/5
-send_request(TPid, #diameter_packet{bin = Bin} = Pkt, Req, Timeout)
+send_request(TPid, #diameter_packet{bin = Bin} = Pkt, Req, SvcName, Timeout)
when node() == node(TPid) ->
%% Store the outgoing request before sending to avoid a race with
%% reply reception.
- TRef = store_request(TPid, Bin, Req, Timeout),
+ TRef = store_request(TPid, Bin, Req, Timeout, SvcName),
send(TPid, Pkt),
TRef;
%% Send using a remote transport: spawn a process on the remote node
%% to relay the answer.
-send_request(TPid, #diameter_packet{} = Pkt, Req, Timeout) ->
+send_request(TPid, #diameter_packet{} = Pkt, Req, SvcName, Timeout) ->
TRef = erlang:start_timer(Timeout, self(), timeout),
- T = {TPid, Pkt, Req, Timeout, TRef},
+ T = {TPid, Pkt, Req, SvcName, Timeout, TRef},
spawn(node(TPid), ?MODULE, send, [T]),
TRef.
%% send/1
-send({TPid, Pkt, #request{handler = Pid} = Req, Timeout, TRef}) ->
- Ref = send_request(TPid, Pkt, Req#request{handler = self()}, Timeout),
+send({TPid, Pkt, #request{handler = Pid} = Req, SvcName, Timeout, TRef}) ->
+ Ref = send_request(TPid,
+ Pkt,
+ Req#request{handler = self()},
+ SvcName,
+ Timeout),
Pid ! reref(receive T -> T end, Ref, TRef).
reref({T, Ref, R}, Ref, TRef) ->
@@ -1568,62 +1581,84 @@ send(Pid, Pkt) ->
%% retransmit/4
-retransmit({TPid, Caps, #diameter_app{alias = Alias} = App} = T,
+retransmit({TPid, Caps, #diameter_app{alias = Alias} = App}
+ = Transport,
#request{app = Alias, packet = Pkt0}
= Req,
SvcName,
Timeout) ->
- have_request(Pkt0, TPid) %% Don't failover to a peer we've
+ have_request(Pkt0, TPid) %% Don't failover to a peer we've
andalso ?THROW(timeout), %% already sent to.
#diameter_packet{header = Hdr0} = Pkt0,
Hdr = Hdr0#diameter_header{is_retransmitted = true},
Pkt = Pkt0#diameter_packet{header = Hdr},
- resend_req(cb(App, prepare_retransmit, [Pkt, SvcName, {TPid, Caps}]),
- T,
+ retransmit(cb(App, prepare_retransmit, [Pkt, SvcName, {TPid, Caps}]),
+ Transport,
Req#request{packet = Pkt},
+ SvcName,
Timeout,
[]).
-resend_req({send, P}, T, #request{packet = Pkt} = Req, Timeout, Fs) ->
- retransmit(make_request_packet(P, Pkt), T, Req, Timeout, Fs);
-
-resend_req({discard, Reason}, _, _, _, _) ->
+retransmit({send, Msg},
+ Transport,
+ #request{packet = Pkt}
+ = Req,
+ SvcName,
+ Timeout,
+ Fs) ->
+ resend_request(make_request_packet(Msg, Pkt),
+ Transport,
+ Req,
+ SvcName,
+ Timeout,
+ Fs);
+
+retransmit({discard, Reason}, _, _, _, _, _) ->
?THROW(Reason);
-resend_req(discard, _, _, _, _) ->
+retransmit(discard, _, _, _, _, _) ->
?THROW(discarded);
-resend_req({eval_packet, RC, F}, T, Req, Timeout, Fs) ->
- resend_req(RC, T, Req, Timeout, [F|Fs]);
+retransmit({eval_packet, RC, F}, Transport, Req, SvcName, Timeout, Fs) ->
+ retransmit(RC, Transport, Req, SvcName, Timeout, [F|Fs]);
-resend_req(T, {_, _, App}, _, _, _) ->
+retransmit(T, {_, _, App}, _, _, _, _) ->
?ERROR({invalid_return, prepare_retransmit, App, T}).
-%% retransmit/6
-
-retransmit(Pkt0, {TPid, Caps, _}, #request{dictionary = D} = Req0, Tmo, Fs) ->
- Pkt = encode(D, Pkt0, Fs),
+resend_request(Pkt0,
+ {TPid, Caps, _},
+ #request{dictionary = Dict}
+ = Req0,
+ SvcName,
+ Tmo,
+ Fs) ->
+ Pkt = encode(Dict, Pkt0, Fs),
Req = Req0#request{transport = TPid,
packet = Pkt0,
caps = Caps},
?LOG(retransmission, Req),
- TRef = send_request(TPid, Pkt, Req, Tmo),
+ TRef = send_request(TPid, Pkt, Req, SvcName, Tmo),
{TRef, Req}.
-%% store_request/4
+%% store_request/5
-store_request(TPid, Bin, Req, Timeout) ->
+store_request(TPid, Bin, Req, Timeout, SvcName) ->
Seqs = diameter_codec:sequence_numbers(Bin),
TRef = erlang:start_timer(Timeout, self(), timeout),
ets:insert(?REQUEST_TABLE, {Seqs, Req, TRef}),
ets:member(?REQUEST_TABLE, TPid)
- orelse (self() ! {failover, TRef}), %% possibly missed failover
+ orelse failover(whois(SvcName), TRef, Seqs),
TRef.
+%% Induce failover for a request that was stored after a transport
+%% went down and which the service may have missed.
+failover(Pid, TRef, Seqs) ->
+ is_pid(Pid) andalso (Pid ! {failover, TRef, Seqs}).
+
%% lookup_request/2
lookup_request(Msg, TPid)
@@ -1646,9 +1681,9 @@ lookup(Msg, TPid, TRef) ->
false
end.
-%% erase_request/1
+%% erase_requests/1
-erase_request(Pkt) ->
+erase_requests(Pkt) ->
ets:delete(?REQUEST_TABLE, diameter_codec:sequence_numbers(Pkt)).
%% match_requests/1
@@ -1671,16 +1706,14 @@ have_request(Pkt, TPid) ->
request_peer_up(TPid) ->
ets:insert(?REQUEST_TABLE, {TPid}).
-%% request_peer_down/2
+%% request_peer_down/1
-request_peer_down(TPid, S) ->
+request_peer_down(TPid) ->
ets:delete(?REQUEST_TABLE, TPid),
- lists:foreach(fun(T) -> failover(T,S) end, match_requests(TPid)).
+ lists:foreach(fun failover/1, match_requests(TPid)).
%% Note that a request process can store its request after failover
%% notifications are sent here: store_request/4 sends the notification
-%% in that case. Note also that we'll send as many notifications to a
-%% given handler as there are peers its sent to. All but one of these
-%% will be ignored.
+%% in that case.
%% ---------------------------------------------------------------------------
%% recv_request/4
@@ -2558,34 +2591,30 @@ x(T) ->
exit(T).
%% ---------------------------------------------------------------------------
-%% # failover/[23]
+%% # failover/1-2
%% ---------------------------------------------------------------------------
-%% Failover as a consequence of request_peer_down/2.
-failover({_, #request{handler = Pid} = Req, TRef}, S) ->
- Pid ! {failover, TRef, rt(Req, S)}.
+%% Failover as a consequence of request_peer_down/1: inform the
+%% request process.
+failover({_, Req, TRef}) ->
+ #request{handler = Pid,
+ packet = #diameter_packet{msg = M}}
+ = Req,
+ M /= undefined andalso (Pid ! {failover, TRef}).
+%% Failover is not performed when msg = binary() since sending
+%% pre-encoded binaries is only partially supported. (Mostly for
+%% test.)
%% Failover as a consequence of store_request/4.
-failover(TRef, Seqs, S)
+failover(TRef, Seqs)
when is_reference(TRef) ->
case lookup_request(Seqs, TRef) of
#request{} = Req ->
- failover({Seqs, Req, TRef}, S);
+ failover({Seqs, Req, TRef});
false ->
ok
end.
-%% prepare_request returned a binary ...
-rt(#request{packet = #diameter_packet{msg = undefined}}, _) ->
- false; %% Not what we should do but binaries are only parially supported
-
-%% ... or not.
-rt(#request{packet = #diameter_packet{msg = Msg},
- dictionary = Dict}
- = Req,
- S) ->
- find_transport(get_destination(Dict, Msg), Req, S).
-
%% ---------------------------------------------------------------------------
%% # report_status/5
%% ---------------------------------------------------------------------------
@@ -2672,7 +2701,7 @@ rpd(Pid, Alias, PDict) ->
?Dict:update(Alias, fun(Ps) -> lists:keydelete(Pid, 1, Ps) end, PDict).
%% ---------------------------------------------------------------------------
-%% find_transport/[34]
+%% find_transport/2-3
%%
%% Return: {TransportPid, #diameter_caps{}, #diameter_app{}}
%% | false
@@ -2709,26 +2738,27 @@ find_send_app(Alias, Apps) ->
T
end.
-%% Retransmission, in the service process.
-find_transport([_,_] = RH,
- Req,
- #state{service = #diameter_service{pid = Pid,
- applications = Apps}}
+%% Retransmission after failover.
+find_transport(#request{app = Alias,
+ filter = Filter,
+ module = ModX,
+ packet = #diameter_packet{msg = Msg},
+ dictionary = Dict},
+ #state{service = #diameter_service{applications = Apps}}
= S)
- when self() == Pid ->
- #request{app = Alias,
- filter = Filter,
- module = ModX}
- = Req,
+ when Msg /= undefined -> %% retransmission of binaries is unsupported
#diameter_app{}
= App
= lists:keyfind(Alias, #diameter_app.alias, Apps),
pick_peer(App#diameter_app{module = ModX},
- RH,
+ get_destination(Dict, Msg),
Filter,
- S).
+ S);
+find_transport(_, _) ->
+ false.
+
%% get_destination/2
get_destination(Dict, Msg) ->