aboutsummaryrefslogtreecommitdiffstats
path: root/lib
diff options
context:
space:
mode:
authorAnders Svensson <[email protected]>2013-02-05 13:36:07 +0100
committerAnders Svensson <[email protected]>2013-02-08 19:28:47 +0100
commit554f669f6884121fd846d398c81b27446a382c84 (patch)
tree8c96e45dd9a5e393b9b511d84c293331768cc76b /lib
parentea1c1a1046df63be2851f732cb4b1916856b5dbc (diff)
downloadotp-554f669f6884121fd846d398c81b27446a382c84.tar.gz
otp-554f669f6884121fd846d398c81b27446a382c84.tar.bz2
otp-554f669f6884121fd846d398c81b27446a382c84.zip
Move failover out of service process
In order to be able to remove fields from the request process that don't need to be there and do less in the service process. The pick_peer callback now takes place in the request process in the case of immutable state, just as in the case of the initial send.
Diffstat (limited to 'lib')
-rw-r--r--lib/diameter/src/base/diameter_service.erl216
1 files changed, 123 insertions, 93 deletions
diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl
index 6e7adb1be2..d5a09338cd 100644
--- a/lib/diameter/src/base/diameter_service.erl
+++ b/lib/diameter/src/base/diameter_service.erl
@@ -334,14 +334,14 @@ call_rc(_, _, Sent) ->
call(SvcName, App, Msg, Opts, Caller) ->
c(find_state(SvcName), App, Msg, Opts, Caller).
-c(#state{service_name = Svc, options = [{_, Mask} | _]} = S,
+c(#state{service_name = SvcName, options = [{_, Mask} | _]} = S,
App,
Msg,
Opts,
Caller) ->
case find_transport(App, Msg, Opts, S) of
{_,_,_} = T ->
- send_request(T, Mask, Msg, Opts, Caller, Svc);
+ send_request(T, Mask, Msg, Opts, Caller, SvcName);
false ->
{error, no_connection};
{error, _} = No ->
@@ -587,8 +587,8 @@ transition({tc_timeout, T}, S) ->
%% Request process is telling us it may have missed a failover message
%% after a transport went down and the service process looked up
%% outstanding requests.
-transition({failover, TRef, Seqs}, S) ->
- failover(TRef, Seqs, S),
+transition({failover, TRef, Seqs}, _) ->
+ failover(TRef, Seqs),
ok;
transition(Req, S) ->
@@ -1064,7 +1064,7 @@ connection_down(#watchdog{state = ?WD_OKAY,
= S) ->
report_status(down, Wd, Pr, S, []),
remove_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict),
- request_peer_down(TPid, S);
+ request_peer_down(TPid);
connection_down(#watchdog{}, #peer{}, _) ->
ok;
@@ -1302,7 +1302,13 @@ cm([_,_|_], _, _, _) ->
%% The mod field of the #diameter_app{} here includes any extra
%% arguments passed to diameter:call/2.
-send_request({TPid, Caps, App} = T, Mask, Msg, Opts, Caller, SvcName) ->
+send_request({TPid, Caps, App}
+ = Transport,
+ Mask,
+ Msg,
+ Opts,
+ Caller,
+ SvcName) ->
#diameter_app{module = ModX}
= App,
@@ -1310,14 +1316,19 @@ send_request({TPid, Caps, App} = T, Mask, Msg, Opts, Caller, SvcName) ->
send_req(cb(ModX, prepare_request, [Pkt, SvcName, {TPid, Caps}]),
Pkt,
- T,
+ Transport,
Opts,
Caller,
SvcName,
[]).
-send_req({send, P}, Pkt, T, Opts, Caller, SvcName, Fs) ->
- send_req(make_request_packet(P, Pkt), T, Opts, Caller, SvcName, Fs);
+send_req({send, Msg}, Pkt, Transport, Opts, Caller, SvcName, Fs) ->
+ send_req(make_request_packet(Msg, Pkt),
+ Transport,
+ Opts,
+ Caller,
+ SvcName,
+ Fs);
send_req({discard, Reason} , _, _, _, _, _, _) ->
{error, Reason};
@@ -1446,11 +1457,11 @@ send_req(Pkt0, {TPid, Caps, App}, Opts, Caller, SvcName, Fs) ->
module = ModX},
try
- TRef = send_request(TPid, Pkt, Req, Timeout),
+ TRef = send_request(TPid, Pkt, Req, SvcName, Timeout),
ack(Caller),
handle_answer(SvcName, AE, recv_answer(Timeout, SvcName, {TRef, Req}))
after
- erase_request(Pkt)
+ erase_requests(Pkt)
end.
%% Tell caller a send has been attempted.
@@ -1459,41 +1470,39 @@ ack({Pid, Ref}) ->
%% recv_answer/3
-recv_answer(Timeout,
- SvcName,
- {TRef, #request{from = {_, Ref}, packet = RPkt} = Req}
- = T) ->
-
+recv_answer(Timeout, SvcName, {TRef, #request{from = {_, Ref}} = Req}) ->
%% Matching on TRef below ensures we ignore messages that pertain
%% to a previous transport prior to failover. The answer message
%% includes the #request{} since it's not necessarily Req; that
%% is, from the last peer to which we've transmitted.
-
receive
{answer = A, Ref, Rq, Dict0, Pkt} -> %% Answer from peer
{A, Rq, Dict0, Pkt};
{timeout = Reason, TRef, _} -> %% No timely reply
{error, Req, Reason};
- {failover = Reason, TRef, false} -> %% No alternate peer
- {error, Req, Reason};
- {failover, TRef, Transport} -> %% Resend to alternate peer
- try_retransmit(Timeout, SvcName, Req, Transport);
- {failover, TRef} -> %% May have missed failover notification
- Seqs = diameter_codec:sequence_numbers(RPkt),
- Pid = whois(SvcName),
- is_pid(Pid) andalso (Pid ! {failover, TRef, Seqs}),
- recv_answer(Timeout, SvcName, T)
+ {failover, TRef} -> %% Service says peer has gone down
+ retransmit(Req, find_state(SvcName), Timeout)
end.
+
%% Note that failover starts a new timer and that expiry of an old
%% timer value is ignored. This means that an answer could be accepted
%% from a peer after timeout in the case of failover.
-try_retransmit(Timeout, SvcName, Req, Transport) ->
+retransmit(Req, #state{service_name = SvcName} = S, Timeout) ->
+ rt(find_transport(Req, S), Req, SvcName, Timeout);
+
+retransmit(Req, false, _) -> %% service has gone down
+ {error, Req, failover}.
+
+rt({_,_,_} = Transport, Req, SvcName, Timeout) ->
try retransmit(Transport, Req, SvcName, Timeout) of
T -> recv_answer(Timeout, SvcName, T)
catch
?FAILURE(Reason) -> {error, Req, Reason}
- end.
+ end;
+
+rt(_, Req, _, _) -> %% no alternate peer
+ {error, Req, failover}.
%% handle_error/3
@@ -1532,28 +1541,32 @@ encode(Dict, #diameter_packet{bin = undefined} = Pkt) ->
encode(_, #diameter_packet{} = Pkt) ->
Pkt.
-%% send_request/4
+%% send_request/5
-send_request(TPid, #diameter_packet{bin = Bin} = Pkt, Req, Timeout)
+send_request(TPid, #diameter_packet{bin = Bin} = Pkt, Req, SvcName, Timeout)
when node() == node(TPid) ->
%% Store the outgoing request before sending to avoid a race with
%% reply reception.
- TRef = store_request(TPid, Bin, Req, Timeout),
+ TRef = store_request(TPid, Bin, Req, Timeout, SvcName),
send(TPid, Pkt),
TRef;
%% Send using a remote transport: spawn a process on the remote node
%% to relay the answer.
-send_request(TPid, #diameter_packet{} = Pkt, Req, Timeout) ->
+send_request(TPid, #diameter_packet{} = Pkt, Req, SvcName, Timeout) ->
TRef = erlang:start_timer(Timeout, self(), timeout),
- T = {TPid, Pkt, Req, Timeout, TRef},
+ T = {TPid, Pkt, Req, SvcName, Timeout, TRef},
spawn(node(TPid), ?MODULE, send, [T]),
TRef.
%% send/1
-send({TPid, Pkt, #request{handler = Pid} = Req, Timeout, TRef}) ->
- Ref = send_request(TPid, Pkt, Req#request{handler = self()}, Timeout),
+send({TPid, Pkt, #request{handler = Pid} = Req, SvcName, Timeout, TRef}) ->
+ Ref = send_request(TPid,
+ Pkt,
+ Req#request{handler = self()},
+ SvcName,
+ Timeout),
Pid ! reref(receive T -> T end, Ref, TRef).
reref({T, Ref, R}, Ref, TRef) ->
@@ -1568,62 +1581,84 @@ send(Pid, Pkt) ->
%% retransmit/4
-retransmit({TPid, Caps, #diameter_app{alias = Alias} = App} = T,
+retransmit({TPid, Caps, #diameter_app{alias = Alias} = App}
+ = Transport,
#request{app = Alias, packet = Pkt0}
= Req,
SvcName,
Timeout) ->
- have_request(Pkt0, TPid) %% Don't failover to a peer we've
+ have_request(Pkt0, TPid) %% Don't failover to a peer we've
andalso ?THROW(timeout), %% already sent to.
#diameter_packet{header = Hdr0} = Pkt0,
Hdr = Hdr0#diameter_header{is_retransmitted = true},
Pkt = Pkt0#diameter_packet{header = Hdr},
- resend_req(cb(App, prepare_retransmit, [Pkt, SvcName, {TPid, Caps}]),
- T,
+ retransmit(cb(App, prepare_retransmit, [Pkt, SvcName, {TPid, Caps}]),
+ Transport,
Req#request{packet = Pkt},
+ SvcName,
Timeout,
[]).
-resend_req({send, P}, T, #request{packet = Pkt} = Req, Timeout, Fs) ->
- retransmit(make_request_packet(P, Pkt), T, Req, Timeout, Fs);
-
-resend_req({discard, Reason}, _, _, _, _) ->
+retransmit({send, Msg},
+ Transport,
+ #request{packet = Pkt}
+ = Req,
+ SvcName,
+ Timeout,
+ Fs) ->
+ resend_request(make_request_packet(Msg, Pkt),
+ Transport,
+ Req,
+ SvcName,
+ Timeout,
+ Fs);
+
+retransmit({discard, Reason}, _, _, _, _, _) ->
?THROW(Reason);
-resend_req(discard, _, _, _, _) ->
+retransmit(discard, _, _, _, _, _) ->
?THROW(discarded);
-resend_req({eval_packet, RC, F}, T, Req, Timeout, Fs) ->
- resend_req(RC, T, Req, Timeout, [F|Fs]);
+retransmit({eval_packet, RC, F}, Transport, Req, SvcName, Timeout, Fs) ->
+ retransmit(RC, Transport, Req, SvcName, Timeout, [F|Fs]);
-resend_req(T, {_, _, App}, _, _, _) ->
+retransmit(T, {_, _, App}, _, _, _, _) ->
?ERROR({invalid_return, prepare_retransmit, App, T}).
-%% retransmit/6
-
-retransmit(Pkt0, {TPid, Caps, _}, #request{dictionary = D} = Req0, Tmo, Fs) ->
- Pkt = encode(D, Pkt0, Fs),
+resend_request(Pkt0,
+ {TPid, Caps, _},
+ #request{dictionary = Dict}
+ = Req0,
+ SvcName,
+ Tmo,
+ Fs) ->
+ Pkt = encode(Dict, Pkt0, Fs),
Req = Req0#request{transport = TPid,
packet = Pkt0,
caps = Caps},
?LOG(retransmission, Req),
- TRef = send_request(TPid, Pkt, Req, Tmo),
+ TRef = send_request(TPid, Pkt, Req, SvcName, Tmo),
{TRef, Req}.
-%% store_request/4
+%% store_request/5
-store_request(TPid, Bin, Req, Timeout) ->
+store_request(TPid, Bin, Req, Timeout, SvcName) ->
Seqs = diameter_codec:sequence_numbers(Bin),
TRef = erlang:start_timer(Timeout, self(), timeout),
ets:insert(?REQUEST_TABLE, {Seqs, Req, TRef}),
ets:member(?REQUEST_TABLE, TPid)
- orelse (self() ! {failover, TRef}), %% possibly missed failover
+ orelse failover(whois(SvcName), TRef, Seqs),
TRef.
+%% Induce failover for a request that was stored after a transport
+%% went down and which the service may have missed.
+failover(Pid, TRef, Seqs) ->
+ is_pid(Pid) andalso (Pid ! {failover, TRef, Seqs}).
+
%% lookup_request/2
lookup_request(Msg, TPid)
@@ -1646,9 +1681,9 @@ lookup(Msg, TPid, TRef) ->
false
end.
-%% erase_request/1
+%% erase_requests/1
-erase_request(Pkt) ->
+erase_requests(Pkt) ->
ets:delete(?REQUEST_TABLE, diameter_codec:sequence_numbers(Pkt)).
%% match_requests/1
@@ -1671,16 +1706,14 @@ have_request(Pkt, TPid) ->
request_peer_up(TPid) ->
ets:insert(?REQUEST_TABLE, {TPid}).
-%% request_peer_down/2
+%% request_peer_down/1
-request_peer_down(TPid, S) ->
+request_peer_down(TPid) ->
ets:delete(?REQUEST_TABLE, TPid),
- lists:foreach(fun(T) -> failover(T,S) end, match_requests(TPid)).
+ lists:foreach(fun failover/1, match_requests(TPid)).
%% Note that a request process can store its request after failover
%% notifications are sent here: store_request/4 sends the notification
-%% in that case. Note also that we'll send as many notifications to a
-%% given handler as there are peers its sent to. All but one of these
-%% will be ignored.
+%% in that case.
%% ---------------------------------------------------------------------------
%% recv_request/4
@@ -2558,34 +2591,30 @@ x(T) ->
exit(T).
%% ---------------------------------------------------------------------------
-%% # failover/[23]
+%% # failover/1-2
%% ---------------------------------------------------------------------------
-%% Failover as a consequence of request_peer_down/2.
-failover({_, #request{handler = Pid} = Req, TRef}, S) ->
- Pid ! {failover, TRef, rt(Req, S)}.
+%% Failover as a consequence of request_peer_down/1: inform the
+%% request process.
+failover({_, Req, TRef}) ->
+ #request{handler = Pid,
+ packet = #diameter_packet{msg = M}}
+ = Req,
+ M /= undefined andalso (Pid ! {failover, TRef}).
+%% Failover is not performed when msg = binary() since sending
+%% pre-encoded binaries is only partially supported. (Mostly for
+%% test.)
%% Failover as a consequence of store_request/4.
-failover(TRef, Seqs, S)
+failover(TRef, Seqs)
when is_reference(TRef) ->
case lookup_request(Seqs, TRef) of
#request{} = Req ->
- failover({Seqs, Req, TRef}, S);
+ failover({Seqs, Req, TRef});
false ->
ok
end.
-%% prepare_request returned a binary ...
-rt(#request{packet = #diameter_packet{msg = undefined}}, _) ->
- false; %% Not what we should do but binaries are only parially supported
-
-%% ... or not.
-rt(#request{packet = #diameter_packet{msg = Msg},
- dictionary = Dict}
- = Req,
- S) ->
- find_transport(get_destination(Dict, Msg), Req, S).
-
%% ---------------------------------------------------------------------------
%% # report_status/5
%% ---------------------------------------------------------------------------
@@ -2672,7 +2701,7 @@ rpd(Pid, Alias, PDict) ->
?Dict:update(Alias, fun(Ps) -> lists:keydelete(Pid, 1, Ps) end, PDict).
%% ---------------------------------------------------------------------------
-%% find_transport/[34]
+%% find_transport/2-3
%%
%% Return: {TransportPid, #diameter_caps{}, #diameter_app{}}
%% | false
@@ -2709,26 +2738,27 @@ find_send_app(Alias, Apps) ->
T
end.
-%% Retransmission, in the service process.
-find_transport([_,_] = RH,
- Req,
- #state{service = #diameter_service{pid = Pid,
- applications = Apps}}
+%% Retransmission after failover.
+find_transport(#request{app = Alias,
+ filter = Filter,
+ module = ModX,
+ packet = #diameter_packet{msg = Msg},
+ dictionary = Dict},
+ #state{service = #diameter_service{applications = Apps}}
= S)
- when self() == Pid ->
- #request{app = Alias,
- filter = Filter,
- module = ModX}
- = Req,
+ when Msg /= undefined -> %% retransmission of binaries is unsupported
#diameter_app{}
= App
= lists:keyfind(Alias, #diameter_app.alias, Apps),
pick_peer(App#diameter_app{module = ModX},
- RH,
+ get_destination(Dict, Msg),
Filter,
- S).
+ S);
+find_transport(_, _) ->
+ false.
+
%% get_destination/2
get_destination(Dict, Msg) ->