diff options
-rw-r--r-- | lib/diameter/src/base/diameter_peer_fsm.erl | 60 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_service.erl | 18 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_sup.erl | 4 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_traffic.erl | 174 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_watchdog.erl | 28 |
5 files changed, 125 insertions, 159 deletions
diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl index 996e75a8d3..0c24ebeb19 100644 --- a/lib/diameter/src/base/diameter_peer_fsm.erl +++ b/lib/diameter/src/base/diameter_peer_fsm.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2016. All Rights Reserved. +%% Copyright Ericsson AB 2010-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -454,6 +454,9 @@ transition({timeout, _}, _) -> %% Outgoing message. transition({send, Msg}, S) -> outgoing(Msg, S); +transition({send, Route, Msg}, S) -> + put_route(Route), + outgoing(Msg, S); %% Request for graceful shutdown at remove_transport, stop_service of %% application shutdown. @@ -483,8 +486,10 @@ transition({'DOWN', _, process, TPid, _}, = S) -> start_next(S); -%% Transport has died after connection timeout. -transition({'DOWN', _, process, _, _}, _) -> +%% Transport has died after connection timeout, or handler process has +%% died. +transition({'DOWN', _, process, Pid, _}, _) -> + erase_route(Pid), ok; %% State query. @@ -494,6 +499,43 @@ transition({state, Pid}, #state{state = S, transport = TPid}) -> %% Crash on anything unexpected. +%% put_route/1 +%% +%% Map identifiers in an outgoing request to be able to lookup the +%% handler process when the answer is received. + +put_route(false) -> + ok; + +put_route({Pid, Ref, Seqs}) -> + MRef = monitor(process, Pid), + put(Pid, Seqs), + put(Seqs, {Pid, Ref, MRef}). + +%% get_route/1 + +get_route(#diameter_packet{header = #diameter_header{is_request = false}} + = Pkt) -> + Seqs = diameter_codec:sequence_numbers(Pkt), + case erase(Seqs) of + {Pid, Ref, MRef} -> + demonitor(MRef), + erase(Pid), + {Pid, Ref, self()}; + undefined -> + false + end; + +get_route(_) -> + false. + +%% erase_route/1 + +erase_route(Pid) -> + erase(erase(Pid)). + +%% capx/1 + capx(recv_CER) -> 'CER'; capx({'Wait-CEA', _, _}) -> @@ -576,8 +618,7 @@ incoming({Msg, NPid}, S) -> T catch {?MODULE, Name, Pkt} -> - S#state.parent ! {recv, self(), Name, {Pkt, NPid}}, - rcv(Name, Pkt, S) + incoming(Name, Pkt, NPid, S) end; incoming(Msg, S) -> @@ -585,10 +626,15 @@ incoming(Msg, S) -> recv(Msg, S) catch {?MODULE, Name, Pkt} -> - S#state.parent ! {recv, self(), Name, Pkt}, - rcv(Name, Pkt, S) + incoming(Name, Pkt, false, S) end. +%% incoming/4 + +incoming(Name, Pkt, NPid, #state{parent = Pid} = S) -> + Pid ! {recv, self(), get_route(Pkt), Name, Pkt, NPid}, + rcv(Name, Pkt, S). + %% recv/2 recv(#diameter_packet{header = #diameter_header{} = Hdr} diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl index ccf68f4d93..e4f77e3a24 100644 --- a/lib/diameter/src/base/diameter_service.erl +++ b/lib/diameter/src/base/diameter_service.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2016. All Rights Reserved. +%% Copyright Ericsson AB 2010-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -1858,13 +1858,6 @@ eq(Any, Id, PeerId) -> %% OctetString() can be specified as an iolist() so test for string %% rather then term equality. -%% transports/1 - -transports(#state{watchdogT = WatchdogT}) -> - ets:select(WatchdogT, [{#watchdog{peer = '$1', _ = '_'}, - [{'is_pid', '$1'}], - ['$1']}]). - %% --------------------------------------------------------------------------- %% # service_info/2 %% --------------------------------------------------------------------------- @@ -1887,7 +1880,6 @@ transports(#state{watchdogT = WatchdogT}) -> -define(ALL_INFO, [capabilities, applications, transport, - pending, options]). %% The rest. @@ -1981,7 +1973,6 @@ complete_info(Item, #state{service = Svc} = S) -> applications -> info_apps(S); transport -> info_transport(S); options -> info_options(S); - pending -> info_pending(S); keys -> ?ALL_INFO ++ ?CAP_INFO ++ ?OTHER_INFO; all -> service_info(?ALL_INFO, S); statistics -> info_stats(S); @@ -2189,13 +2180,6 @@ info_apps(#state{service = #diameter_service{applications = Apps}}) -> mk_app(#diameter_app{} = A) -> lists:zip(record_info(fields, diameter_app), tl(tuple_to_list(A))). -%% info_pending/1 -%% -%% One entry for each outgoing request whose answer is outstanding. - -info_pending(#state{} = S) -> - diameter_traffic:pending(transports(S)). - %% info_info/1 %% %% Extract process_info from connections info. diff --git a/lib/diameter/src/base/diameter_sup.erl b/lib/diameter/src/base/diameter_sup.erl index 482289cb9a..01c51f0856 100644 --- a/lib/diameter/src/base/diameter_sup.erl +++ b/lib/diameter/src/base/diameter_sup.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2016. All Rights Reserved. +%% Copyright Ericsson AB 2010-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -42,7 +42,7 @@ -define(TABLES, [{diameter_sequence, [set]}, {diameter_service, [set, {keypos, 3}]}, - {diameter_request, [bag]}, + {diameter_request, [set]}, {diameter_config, [bag, {keypos, 2}]}]). %% start_link/0 diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl index 0720057fe5..a39e0502d1 100644 --- a/lib/diameter/src/base/diameter_traffic.erl +++ b/lib/diameter/src/base/diameter_traffic.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2013-2016. All Rights Reserved. +%% Copyright Ericsson AB 2013-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -30,7 +30,7 @@ -export([send_request/4]). %% towards diameter_watchdog --export([receive_message/4]). +-export([receive_message/6]). %% towards diameter_peer_fsm and diameter_watchdog -export([incr/4, @@ -40,8 +40,7 @@ %% towards diameter_service -export([make_recvdata/1, peer_up/1, - peer_down/1, - pending/1]). + peer_down/1]). %% internal -export([send/1, %% send from remote node @@ -58,14 +57,12 @@ -define(DEFAULT_TIMEOUT, 5000). %% for outgoing requests -define(DEFAULT_SPAWN_OPTS, []). -%% Table containing outgoing requests for which a reply has yet to be -%% received. +%% Table containing outgoing entries that live and die with +%% peer_up/down. The name is historic, since the table used to contain +%% information about outgoing requests for which an answer has yet to +%% be received. -define(REQUEST_TABLE, diameter_request). -%% Workaround for dialyzer's lack of understanding of match specs. --type match(T) - :: T | '_' | '$1' | '$2' | '$3' | '$4'. - %% Record diameter:call/4 options are parsed into. -record(options, {filter = none :: diameter:peer_filter(), @@ -73,7 +70,7 @@ timeout = ?DEFAULT_TIMEOUT :: 0..16#FFFFFFFF, detach = false :: boolean()}). -%% Term passed back to receive_message/4 with every incoming message. +%% Term passed back to receive_message/6 with every incoming message. -record(recvdata, {peerT :: ets:tid(), service_name :: diameter:service_name(), @@ -88,12 +85,12 @@ %% Record stored in diameter_request for each outgoing request. -record(request, - {ref :: match(reference()), %% used to receive answer - caller :: match(pid()), %% calling process - handler :: match(pid()), %% request process - transport :: match(pid()), %% peer process - caps :: match(#diameter_caps{}), %% of connection - packet :: match(#diameter_packet{})}). %% of request + {ref :: reference(), %% used to receive answer + caller :: pid() | undefined, %% calling process + handler :: pid(), %% request process + transport :: pid() | undefined, %% peer process + caps :: #diameter_caps{} | undefined, %% of connection + packet :: #diameter_packet{} | undefined}). %% of request %% --------------------------------------------------------------------------- %% # make_recvdata/1 @@ -115,12 +112,9 @@ make_recvdata([SvcName, PeerT, Apps, SvcOpts | _]) -> %% --------------------------------------------------------------------------- %% Start a process that dies with peer_down/1, on which request -%% processes can monitor. The transport process started in -%% diameter_peer_fsm could be used for this, but starting a new -%% process here is easier: don't otherwise have access to the -%% transport process here (TPid here is the diameter_peer_fsm -%% process), and the need for a process to monitor came much later -%% than the calls to peer_up/down. +%% processes can monitor. There is no other process that dies with +%% peer_down since failover doesn't imply the loss of transport in the +%% case of a watchdog transition into state SUSPECT. peer_up(TPid) -> proc_lib:start(?MODULE, init, [TPid]). @@ -212,54 +206,25 @@ incr_rc(Dir, Pkt, TPid, Dict0) -> incr_rc(Dir, Pkt, TPid, {Dict0, Dict0, Dict0}). %% --------------------------------------------------------------------------- -%% pending/1 -%% --------------------------------------------------------------------------- - -pending(TPids) -> - MatchSpec = [{{'$1', - #request{caller = '$2', - handler = '$3', - transport = '$4', - _ = '_'}, - '_'}, - [?ORCOND([{'==', T, '$4'} || T <- TPids])], - [{{'$1', [{{caller, '$2'}}, - {{handler, '$3'}}, - {{transport, '$4'}}]}}]}], - - try - ets:select(?REQUEST_TABLE, MatchSpec) - catch - error: badarg -> [] %% service has gone down - end. - -%% --------------------------------------------------------------------------- -%% # receive_message/4 +%% # receive_message/6 %% %% Handle an incoming Diameter message. %% --------------------------------------------------------------------------- -%% Handle an incoming Diameter message in the watchdog process. This -%% used to come through the service process but this avoids that -%% becoming a bottleneck. +%% Handle an incoming Diameter message in the watchdog process. -receive_message(TPid, {Pkt, NPid}, Dict0, RecvData) -> - NPid ! {diameter, incoming(TPid, Pkt, Dict0, RecvData)}; +receive_message(TPid, Route, Pkt, false, Dict0, RecvData) -> + incoming(TPid, Route, Pkt, Dict0, RecvData); -receive_message(TPid, Pkt, Dict0, RecvData) -> - incoming(TPid, Pkt, Dict0, RecvData). +receive_message(TPid, Route, Pkt, NPid, Dict0, RecvData) -> + NPid ! {diameter, incoming(TPid, Route, Pkt, Dict0, RecvData)}. %% incoming/4 -incoming(TPid, Pkt, Dict0, RecvData) +incoming(TPid, Route, Pkt, Dict0, RecvData) when is_pid(TPid) -> #diameter_packet{header = #diameter_header{is_request = R}} = Pkt, - recv(R, - (not R) andalso lookup_request(Pkt, TPid), - TPid, - Pkt, - Dict0, - RecvData). + recv(R, Route, TPid, Pkt, Dict0, RecvData). %% recv/6 @@ -274,8 +239,8 @@ recv(true, false, TPid, Pkt, Dict0, T) -> end; %% ... answer to known request ... -recv(false, #request{ref = Ref, handler = Pid} = Req, _, Pkt, Dict0, _) -> - Pid ! {answer, Ref, Req, Dict0, Pkt}, +recv(false, {Pid, Ref, TPid}, _, Pkt, Dict0, _) -> + Pid ! {answer, Ref, TPid, Dict0, Pkt}, {answer, Pid}; %% Note that failover could have happened prior to this message being @@ -546,7 +511,7 @@ send_A(T, TPid, {AppDict, Dict0} = DictT0, ReqPkt, EvalPktFs, EvalFs) -> {MsgDict, Pkt} = reply(T, TPid, DictT0, EvalPktFs, ReqPkt), incr(send, Pkt, TPid, AppDict), incr_rc(send, Pkt, TPid, {MsgDict, AppDict, Dict0}), %% count outgoing - send(TPid, Pkt), + send(TPid, false, Pkt), lists:foreach(fun diameter_lib:eval/1, EvalFs). %% answer/6 @@ -1508,7 +1473,7 @@ send_R(Pkt0, packet = Pkt0}, incr(send, Pkt, TPid, AppDict), - {TRef, MRef} = send_request(TPid, Pkt, Req, SvcName, Timeout), + {TRef, MRef} = zend_requezt(TPid, Pkt, Req, SvcName, Timeout), Pid ! Ref, %% tell caller a send has been attempted handle_answer(SvcName, App, @@ -1519,11 +1484,11 @@ send_R(Pkt0, recv_A(Timeout, SvcName, App, Opts, {TRef, MRef, #request{ref = Ref} = Req}) -> %% Matching on TRef below ensures we ignore messages that pertain %% to a previous transport prior to failover. The answer message - %% includes the #request{} since it's not necessarily Req; that - %% is, from the last peer to which we've transmitted. + %% includes the pid of the transport on which it was received, + %% which may not be the last peer to which we've transmitted. receive - {answer = A, Ref, Rq, Dict0, Pkt} -> %% Answer from peer - {A, Rq, Dict0, Pkt}; + {answer = A, Ref, TPid, Dict0, Pkt} -> %% Answer from peer + {A, #request{} = erase(TPid), Dict0, Pkt}; {timeout = Reason, TRef, _} -> %% No timely reply {error, Req, Reason}; {'DOWN', MRef, process, _, _} when false /= MRef -> %% local peer_down @@ -1717,22 +1682,20 @@ encode(DictT, TPid, #diameter_packet{bin = undefined} = Pkt) -> encode(_, _, #diameter_packet{} = Pkt) -> Pkt. +%% zend_requezt/5 + +zend_requezt(TPid, Pkt, Req, SvcName, Timeout) -> + put(TPid, Req), + send_request(TPid, Pkt, Req, SvcName, Timeout). + %% send_request/5 send_request(TPid, #diameter_packet{bin = Bin} = Pkt, Req, _SvcName, Timeout) when node() == node(TPid) -> Seqs = diameter_codec:sequence_numbers(Bin), TRef = erlang:start_timer(Timeout, self(), TPid), - Entry = {Seqs, Req, TRef}, - - %% Ensure that request table is cleaned even if the process is - %% killed. - Self = self(), - spawn(fun() -> diameter_lib:wait([Self]), delete_request(Entry) end), - - MRef = insert_request(Entry), - send(TPid, Pkt), - {TRef, MRef}; + send(TPid, _Route = {self(), Req#request.ref, Seqs}, Pkt), + {TRef, _MRef = peer_monitor(TPid, TRef)}; %% Send using a remote transport: spawn a process on the remote node %% to relay the answer. @@ -1764,16 +1727,16 @@ recv(TPid, Pid, TRef, {LocalTRef, MRef}) -> exit({timeout, LocalTRef, TPid} = T) end. -%% send/2 +%% send/3 -send(Pid, Pkt) -> %% Strip potentially large message terms. +send(Pid, Route, Pkt) -> %% Strip potentially large message terms. #diameter_packet{header = H, bin = Bin, transport_data = T} = Pkt, - Pid ! {send, #diameter_packet{header = H, - bin = Bin, - transport_data = T}}. + Pid ! {send, Route, #diameter_packet{header = H, + bin = Bin, + transport_data = T}}. %% retransmit/4 @@ -1783,8 +1746,8 @@ retransmit({TPid, Caps, App} = Req, SvcName, Timeout) -> - have_request(Pkt0, TPid) %% Don't failover to a peer we've - andalso ?THROW(timeout), %% already sent to. + undefined == get(TPid) %% Don't failover to a peer we've + orelse ?THROW(timeout), %% already sent to. Pkt = make_retransmit_packet(Pkt0), @@ -1837,51 +1800,20 @@ resend_request(Pkt0, ?LOG(retransmission, Pkt#diameter_packet.header), incr(TPid, {msg_id(Pkt, AppDict), send, retransmission}), - {TRef, MRef} = send_request(TPid, Pkt, Req, SvcName, Tmo), + {TRef, MRef} = zend_requezt(TPid, Pkt, Req, SvcName, Tmo), {TRef, MRef, Req}. -%% insert_request/1 +%% peer_monitor/2 -insert_request({_Seqs, #request{transport = TPid}, TRef} = T) -> - ets:insert(?REQUEST_TABLE, T), +peer_monitor(TPid, TRef) -> case ets:lookup(?REQUEST_TABLE, TPid) of %% at peer_up/1 - [{_, Pid}] -> - monitor(process, Pid); + [{_, MPid}] -> + monitor(process, MPid); [] -> %% transport has gone down self() ! {failover, TRef}, false end. -%% lookup_request/2 -%% -%% Note the match on both the key and transport pid. The latter is -%% necessary since the same Hop-by-Hop and End-to-End identifiers are -%% reused in the case of retransmission. - -lookup_request(Msg, TPid) -> - Seqs = diameter_codec:sequence_numbers(Msg), - Spec = [{{Seqs, #request{transport = TPid, _ = '_'}, '_'}, - [], - ['$_']}], - case ets:select(?REQUEST_TABLE, Spec) of - [{_, Req, _}] -> - Req; - [] -> - false - end. - -%% delete_request/1 - -delete_request(T) -> - ets:delete_object(?REQUEST_TABLE, T). - -%% have_request/2 - -have_request(Pkt, TPid) -> - Seqs = diameter_codec:sequence_numbers(Pkt), - Pat = {Seqs, #request{transport = TPid, _ = '_'}, '_'}, - '$end_of_table' /= ets:select(?REQUEST_TABLE, [{Pat, [], ['$_']}], 1). - %% get_destination/2 get_destination(Dict, Msg) -> diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl index 2ba60a65fb..f28b8f2910 100644 --- a/lib/diameter/src/base/diameter_watchdog.erl +++ b/lib/diameter/src/base/diameter_watchdog.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2016. All Rights Reserved. +%% Copyright Ericsson AB 2010-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -283,7 +283,7 @@ event(Msg, ?LOG(transition, {From, To}). data(Msg, TPid, reopen, okay) -> - {recv, TPid, 'DWA', _Pkt} = Msg, %% assert + {recv, TPid, false, 'DWA', _Pkt, _NPid} = Msg, %% assert {TPid, T} = eraser(open), [T]; @@ -447,12 +447,14 @@ transition({'DOWN', _, process, TPid, _Reason} = D, end; %% Incoming message. -transition({recv, TPid, Name, PktT}, #watchdog{transport = TPid} = S) -> +transition({recv, TPid, Route, Name, Pkt, NPid}, + #watchdog{transport = TPid} + = S) -> try - incoming(Name, PktT, S) + incoming(Name, Pkt, NPid, S) catch #watchdog{dictionary = Dict0, receive_data = T} = NS -> - diameter_traffic:receive_message(TPid, PktT, Dict0, T), + diameter_traffic:receive_message(TPid, Route, Pkt, NPid, Dict0, T), NS end; @@ -582,15 +584,17 @@ send_watchdog(#watchdog{pending = false, %% Don't count encode errors since we don't expect any on DWR/DWA. -%% incoming/3 +%% incoming/4 -incoming(Name, {Pkt, NPid}, S) -> - NS = recv(Name, Pkt, S), - NPid ! {diameter, discard}, - NS; +incoming(Name, Pkt, false, S) -> + recv(Name, Pkt, S); -incoming(Name, Pkt, S) -> - recv(Name, Pkt, S). +incoming(Name, Pkt, NPid, S) -> + try + recv(Name, Pkt, S) + after + NPid ! {diameter, discard} + end. %% recv/3 |