diff options
Diffstat (limited to 'lib/diameter/src')
-rw-r--r-- | lib/diameter/src/base/diameter_service.erl | 216 |
1 files changed, 123 insertions, 93 deletions
diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl index 6e7adb1be2..d5a09338cd 100644 --- a/lib/diameter/src/base/diameter_service.erl +++ b/lib/diameter/src/base/diameter_service.erl @@ -334,14 +334,14 @@ call_rc(_, _, Sent) -> call(SvcName, App, Msg, Opts, Caller) -> c(find_state(SvcName), App, Msg, Opts, Caller). -c(#state{service_name = Svc, options = [{_, Mask} | _]} = S, +c(#state{service_name = SvcName, options = [{_, Mask} | _]} = S, App, Msg, Opts, Caller) -> case find_transport(App, Msg, Opts, S) of {_,_,_} = T -> - send_request(T, Mask, Msg, Opts, Caller, Svc); + send_request(T, Mask, Msg, Opts, Caller, SvcName); false -> {error, no_connection}; {error, _} = No -> @@ -587,8 +587,8 @@ transition({tc_timeout, T}, S) -> %% Request process is telling us it may have missed a failover message %% after a transport went down and the service process looked up %% outstanding requests. -transition({failover, TRef, Seqs}, S) -> - failover(TRef, Seqs, S), +transition({failover, TRef, Seqs}, _) -> + failover(TRef, Seqs), ok; transition(Req, S) -> @@ -1064,7 +1064,7 @@ connection_down(#watchdog{state = ?WD_OKAY, = S) -> report_status(down, Wd, Pr, S, []), remove_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict), - request_peer_down(TPid, S); + request_peer_down(TPid); connection_down(#watchdog{}, #peer{}, _) -> ok; @@ -1302,7 +1302,13 @@ cm([_,_|_], _, _, _) -> %% The mod field of the #diameter_app{} here includes any extra %% arguments passed to diameter:call/2. -send_request({TPid, Caps, App} = T, Mask, Msg, Opts, Caller, SvcName) -> +send_request({TPid, Caps, App} + = Transport, + Mask, + Msg, + Opts, + Caller, + SvcName) -> #diameter_app{module = ModX} = App, @@ -1310,14 +1316,19 @@ send_request({TPid, Caps, App} = T, Mask, Msg, Opts, Caller, SvcName) -> send_req(cb(ModX, prepare_request, [Pkt, SvcName, {TPid, Caps}]), Pkt, - T, + Transport, Opts, Caller, SvcName, []). -send_req({send, P}, Pkt, T, Opts, Caller, SvcName, Fs) -> - send_req(make_request_packet(P, Pkt), T, Opts, Caller, SvcName, Fs); +send_req({send, Msg}, Pkt, Transport, Opts, Caller, SvcName, Fs) -> + send_req(make_request_packet(Msg, Pkt), + Transport, + Opts, + Caller, + SvcName, + Fs); send_req({discard, Reason} , _, _, _, _, _, _) -> {error, Reason}; @@ -1446,11 +1457,11 @@ send_req(Pkt0, {TPid, Caps, App}, Opts, Caller, SvcName, Fs) -> module = ModX}, try - TRef = send_request(TPid, Pkt, Req, Timeout), + TRef = send_request(TPid, Pkt, Req, SvcName, Timeout), ack(Caller), handle_answer(SvcName, AE, recv_answer(Timeout, SvcName, {TRef, Req})) after - erase_request(Pkt) + erase_requests(Pkt) end. %% Tell caller a send has been attempted. @@ -1459,41 +1470,39 @@ ack({Pid, Ref}) -> %% recv_answer/3 -recv_answer(Timeout, - SvcName, - {TRef, #request{from = {_, Ref}, packet = RPkt} = Req} - = T) -> - +recv_answer(Timeout, SvcName, {TRef, #request{from = {_, Ref}} = Req}) -> %% Matching on TRef below ensures we ignore messages that pertain %% to a previous transport prior to failover. The answer message %% includes the #request{} since it's not necessarily Req; that %% is, from the last peer to which we've transmitted. - receive {answer = A, Ref, Rq, Dict0, Pkt} -> %% Answer from peer {A, Rq, Dict0, Pkt}; {timeout = Reason, TRef, _} -> %% No timely reply {error, Req, Reason}; - {failover = Reason, TRef, false} -> %% No alternate peer - {error, Req, Reason}; - {failover, TRef, Transport} -> %% Resend to alternate peer - try_retransmit(Timeout, SvcName, Req, Transport); - {failover, TRef} -> %% May have missed failover notification - Seqs = diameter_codec:sequence_numbers(RPkt), - Pid = whois(SvcName), - is_pid(Pid) andalso (Pid ! {failover, TRef, Seqs}), - recv_answer(Timeout, SvcName, T) + {failover, TRef} -> %% Service says peer has gone down + retransmit(Req, find_state(SvcName), Timeout) end. + %% Note that failover starts a new timer and that expiry of an old %% timer value is ignored. This means that an answer could be accepted %% from a peer after timeout in the case of failover. -try_retransmit(Timeout, SvcName, Req, Transport) -> +retransmit(Req, #state{service_name = SvcName} = S, Timeout) -> + rt(find_transport(Req, S), Req, SvcName, Timeout); + +retransmit(Req, false, _) -> %% service has gone down + {error, Req, failover}. + +rt({_,_,_} = Transport, Req, SvcName, Timeout) -> try retransmit(Transport, Req, SvcName, Timeout) of T -> recv_answer(Timeout, SvcName, T) catch ?FAILURE(Reason) -> {error, Req, Reason} - end. + end; + +rt(_, Req, _, _) -> %% no alternate peer + {error, Req, failover}. %% handle_error/3 @@ -1532,28 +1541,32 @@ encode(Dict, #diameter_packet{bin = undefined} = Pkt) -> encode(_, #diameter_packet{} = Pkt) -> Pkt. -%% send_request/4 +%% send_request/5 -send_request(TPid, #diameter_packet{bin = Bin} = Pkt, Req, Timeout) +send_request(TPid, #diameter_packet{bin = Bin} = Pkt, Req, SvcName, Timeout) when node() == node(TPid) -> %% Store the outgoing request before sending to avoid a race with %% reply reception. - TRef = store_request(TPid, Bin, Req, Timeout), + TRef = store_request(TPid, Bin, Req, Timeout, SvcName), send(TPid, Pkt), TRef; %% Send using a remote transport: spawn a process on the remote node %% to relay the answer. -send_request(TPid, #diameter_packet{} = Pkt, Req, Timeout) -> +send_request(TPid, #diameter_packet{} = Pkt, Req, SvcName, Timeout) -> TRef = erlang:start_timer(Timeout, self(), timeout), - T = {TPid, Pkt, Req, Timeout, TRef}, + T = {TPid, Pkt, Req, SvcName, Timeout, TRef}, spawn(node(TPid), ?MODULE, send, [T]), TRef. %% send/1 -send({TPid, Pkt, #request{handler = Pid} = Req, Timeout, TRef}) -> - Ref = send_request(TPid, Pkt, Req#request{handler = self()}, Timeout), +send({TPid, Pkt, #request{handler = Pid} = Req, SvcName, Timeout, TRef}) -> + Ref = send_request(TPid, + Pkt, + Req#request{handler = self()}, + SvcName, + Timeout), Pid ! reref(receive T -> T end, Ref, TRef). reref({T, Ref, R}, Ref, TRef) -> @@ -1568,62 +1581,84 @@ send(Pid, Pkt) -> %% retransmit/4 -retransmit({TPid, Caps, #diameter_app{alias = Alias} = App} = T, +retransmit({TPid, Caps, #diameter_app{alias = Alias} = App} + = Transport, #request{app = Alias, packet = Pkt0} = Req, SvcName, Timeout) -> - have_request(Pkt0, TPid) %% Don't failover to a peer we've + have_request(Pkt0, TPid) %% Don't failover to a peer we've andalso ?THROW(timeout), %% already sent to. #diameter_packet{header = Hdr0} = Pkt0, Hdr = Hdr0#diameter_header{is_retransmitted = true}, Pkt = Pkt0#diameter_packet{header = Hdr}, - resend_req(cb(App, prepare_retransmit, [Pkt, SvcName, {TPid, Caps}]), - T, + retransmit(cb(App, prepare_retransmit, [Pkt, SvcName, {TPid, Caps}]), + Transport, Req#request{packet = Pkt}, + SvcName, Timeout, []). -resend_req({send, P}, T, #request{packet = Pkt} = Req, Timeout, Fs) -> - retransmit(make_request_packet(P, Pkt), T, Req, Timeout, Fs); - -resend_req({discard, Reason}, _, _, _, _) -> +retransmit({send, Msg}, + Transport, + #request{packet = Pkt} + = Req, + SvcName, + Timeout, + Fs) -> + resend_request(make_request_packet(Msg, Pkt), + Transport, + Req, + SvcName, + Timeout, + Fs); + +retransmit({discard, Reason}, _, _, _, _, _) -> ?THROW(Reason); -resend_req(discard, _, _, _, _) -> +retransmit(discard, _, _, _, _, _) -> ?THROW(discarded); -resend_req({eval_packet, RC, F}, T, Req, Timeout, Fs) -> - resend_req(RC, T, Req, Timeout, [F|Fs]); +retransmit({eval_packet, RC, F}, Transport, Req, SvcName, Timeout, Fs) -> + retransmit(RC, Transport, Req, SvcName, Timeout, [F|Fs]); -resend_req(T, {_, _, App}, _, _, _) -> +retransmit(T, {_, _, App}, _, _, _, _) -> ?ERROR({invalid_return, prepare_retransmit, App, T}). -%% retransmit/6 - -retransmit(Pkt0, {TPid, Caps, _}, #request{dictionary = D} = Req0, Tmo, Fs) -> - Pkt = encode(D, Pkt0, Fs), +resend_request(Pkt0, + {TPid, Caps, _}, + #request{dictionary = Dict} + = Req0, + SvcName, + Tmo, + Fs) -> + Pkt = encode(Dict, Pkt0, Fs), Req = Req0#request{transport = TPid, packet = Pkt0, caps = Caps}, ?LOG(retransmission, Req), - TRef = send_request(TPid, Pkt, Req, Tmo), + TRef = send_request(TPid, Pkt, Req, SvcName, Tmo), {TRef, Req}. -%% store_request/4 +%% store_request/5 -store_request(TPid, Bin, Req, Timeout) -> +store_request(TPid, Bin, Req, Timeout, SvcName) -> Seqs = diameter_codec:sequence_numbers(Bin), TRef = erlang:start_timer(Timeout, self(), timeout), ets:insert(?REQUEST_TABLE, {Seqs, Req, TRef}), ets:member(?REQUEST_TABLE, TPid) - orelse (self() ! {failover, TRef}), %% possibly missed failover + orelse failover(whois(SvcName), TRef, Seqs), TRef. +%% Induce failover for a request that was stored after a transport +%% went down and which the service may have missed. +failover(Pid, TRef, Seqs) -> + is_pid(Pid) andalso (Pid ! {failover, TRef, Seqs}). + %% lookup_request/2 lookup_request(Msg, TPid) @@ -1646,9 +1681,9 @@ lookup(Msg, TPid, TRef) -> false end. -%% erase_request/1 +%% erase_requests/1 -erase_request(Pkt) -> +erase_requests(Pkt) -> ets:delete(?REQUEST_TABLE, diameter_codec:sequence_numbers(Pkt)). %% match_requests/1 @@ -1671,16 +1706,14 @@ have_request(Pkt, TPid) -> request_peer_up(TPid) -> ets:insert(?REQUEST_TABLE, {TPid}). -%% request_peer_down/2 +%% request_peer_down/1 -request_peer_down(TPid, S) -> +request_peer_down(TPid) -> ets:delete(?REQUEST_TABLE, TPid), - lists:foreach(fun(T) -> failover(T,S) end, match_requests(TPid)). + lists:foreach(fun failover/1, match_requests(TPid)). %% Note that a request process can store its request after failover %% notifications are sent here: store_request/4 sends the notification -%% in that case. Note also that we'll send as many notifications to a -%% given handler as there are peers its sent to. All but one of these -%% will be ignored. +%% in that case. %% --------------------------------------------------------------------------- %% recv_request/4 @@ -2558,34 +2591,30 @@ x(T) -> exit(T). %% --------------------------------------------------------------------------- -%% # failover/[23] +%% # failover/1-2 %% --------------------------------------------------------------------------- -%% Failover as a consequence of request_peer_down/2. -failover({_, #request{handler = Pid} = Req, TRef}, S) -> - Pid ! {failover, TRef, rt(Req, S)}. +%% Failover as a consequence of request_peer_down/1: inform the +%% request process. +failover({_, Req, TRef}) -> + #request{handler = Pid, + packet = #diameter_packet{msg = M}} + = Req, + M /= undefined andalso (Pid ! {failover, TRef}). +%% Failover is not performed when msg = binary() since sending +%% pre-encoded binaries is only partially supported. (Mostly for +%% test.) %% Failover as a consequence of store_request/4. -failover(TRef, Seqs, S) +failover(TRef, Seqs) when is_reference(TRef) -> case lookup_request(Seqs, TRef) of #request{} = Req -> - failover({Seqs, Req, TRef}, S); + failover({Seqs, Req, TRef}); false -> ok end. -%% prepare_request returned a binary ... -rt(#request{packet = #diameter_packet{msg = undefined}}, _) -> - false; %% Not what we should do but binaries are only parially supported - -%% ... or not. -rt(#request{packet = #diameter_packet{msg = Msg}, - dictionary = Dict} - = Req, - S) -> - find_transport(get_destination(Dict, Msg), Req, S). - %% --------------------------------------------------------------------------- %% # report_status/5 %% --------------------------------------------------------------------------- @@ -2672,7 +2701,7 @@ rpd(Pid, Alias, PDict) -> ?Dict:update(Alias, fun(Ps) -> lists:keydelete(Pid, 1, Ps) end, PDict). %% --------------------------------------------------------------------------- -%% find_transport/[34] +%% find_transport/2-3 %% %% Return: {TransportPid, #diameter_caps{}, #diameter_app{}} %% | false @@ -2709,26 +2738,27 @@ find_send_app(Alias, Apps) -> T end. -%% Retransmission, in the service process. -find_transport([_,_] = RH, - Req, - #state{service = #diameter_service{pid = Pid, - applications = Apps}} +%% Retransmission after failover. +find_transport(#request{app = Alias, + filter = Filter, + module = ModX, + packet = #diameter_packet{msg = Msg}, + dictionary = Dict}, + #state{service = #diameter_service{applications = Apps}} = S) - when self() == Pid -> - #request{app = Alias, - filter = Filter, - module = ModX} - = Req, + when Msg /= undefined -> %% retransmission of binaries is unsupported #diameter_app{} = App = lists:keyfind(Alias, #diameter_app.alias, Apps), pick_peer(App#diameter_app{module = ModX}, - RH, + get_destination(Dict, Msg), Filter, - S). + S); +find_transport(_, _) -> + false. + %% get_destination/2 get_destination(Dict, Msg) -> |