diff options
-rw-r--r-- | lib/diameter/src/base/diameter_peer_fsm.erl | 138 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_traffic.erl | 80 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_watchdog.erl | 37 |
3 files changed, 136 insertions, 119 deletions
diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl index d394156367..d2af8fe425 100644 --- a/lib/diameter/src/base/diameter_peer_fsm.erl +++ b/lib/diameter/src/base/diameter_peer_fsm.erl @@ -129,6 +129,7 @@ %% the request was sent explicitly with %% diameter:call/4. strict :: boolean(), + ack = false :: boolean(), length_errors :: exit | handle | discard, incoming_maxlen :: integer() | infinity}). @@ -235,7 +236,7 @@ i({Ack, WPid, {M, Ref} = T, Opts, {SvcOpts, Nodes, Dict0, Svc}}) -> Tmo = proplists:get_value(capx_timeout, Opts, ?CAPX_TIMEOUT), Strictness = proplists:get_value(capx_strictness, Opts, true), - OnLengthErr = proplists:get_value(length_errors, Opts, exit), + LengthErr = proplists:get_value(length_errors, Opts, exit), {TPid, Addrs} = start_transport(T, Rest, Svc), @@ -247,7 +248,7 @@ i({Ack, WPid, {M, Ref} = T, Opts, {SvcOpts, Nodes, Dict0, Svc}}) -> dictionary = Dict0, mode = M, service = svc(Svc, Addrs), - length_errors = OnLengthErr, + length_errors = LengthErr, strict = Strictness, incoming_maxlen = Maxlen}. %% The transport returns its local ip addresses so that different @@ -442,10 +443,18 @@ transition({connection_timeout = T, TPid}, transition({connection_timeout, _}, _) -> ok; +%% Requests for acknowledgements to the transport. +transition({diameter, ack}, S) -> + S#state{ack = true}; + %% Incoming message from the transport. -transition({diameter, {recv, MsgT}}, S) -> - {Msg, NPid} = msg(MsgT), - incoming(recv(Msg, S), NPid, S); +transition({diameter, {recv, Msg}}, S) -> + incoming(recv(Msg, S), S); + +%% Handler of an incoming request is telling of its existence. +transition({handler, Pid}, _) -> + put_route(Pid), + ok; %% Timeout when still in the same state ... transition({timeout = T, PS}, #state{state = PS}) -> @@ -459,7 +468,7 @@ transition({timeout, _}, _) -> transition({send, Msg}, S) -> outgoing(Msg, S); transition({send, Msg, Route}, S) -> - put_route(Route), + route_outgoing(Route), outgoing(Msg, S); %% Request for graceful shutdown at remove_transport, stop_service of @@ -488,12 +497,13 @@ transition({'DOWN', _, process, WPid, _}, transition({'DOWN', _, process, TPid, _}, #state{transport = TPid} = S) -> - start_next(S); + start_next(S#state{ack = false}); %% Transport has died after connection timeout, or handler process has %% died. -transition({'DOWN', _, process, Pid, _}, _) -> - erase_route(Pid), +transition({'DOWN', _, process, Pid, _}, #state{transport = TPid}) -> + is_reference(erase_route(Pid)) + andalso send(TPid, false), %% answer not forthcoming ok; %% State query. @@ -503,37 +513,56 @@ transition({state, Pid}, #state{state = S, transport = TPid}) -> %% Crash on anything unexpected. -%% put_route/1 -%% +%% route_outgoing/1 + %% Map identifiers in an outgoing request to be able to lookup the %% handler process when the answer is received. - -put_route({Pid, Ref, Seqs}) -> +route_outgoing({Pid, Ref, Seqs}) -> %% request MRef = monitor(process, Pid), put(Pid, Seqs), - put(Seqs, {Pid, Ref, MRef}). + put(Seqs, {Pid, Ref, MRef}); -%% get_route/1 +%% Remove a mapping made for an incoming request. +route_outgoing(Pid) + when is_pid(Pid) -> %% answer + MRef = erase_route(Pid), + undefined == MRef orelse demonitor(MRef). -get_route(#diameter_packet{header = #diameter_header{is_request = false}} - = Pkt) -> +%% put_route/1 + +%% Monitor on a handler process for an incoming request. +put_route(Pid) -> + MRef = monitor(process, Pid), + put(Pid, MRef). + +%% get_route/2 + +%% incoming answer +get_route(_, #diameter_packet{header = #diameter_header{is_request = false}} + = Pkt) -> Seqs = diameter_codec:sequence_numbers(Pkt), case erase(Seqs) of {Pid, Ref, MRef} -> demonitor(MRef), erase(Pid), {Pid, Ref, self()}; - undefined -> + undefined -> %% request unknown false end; -get_route(_) -> - false. +%% incoming request +get_route(Ack, _) -> + Ack. %% erase_route/1 erase_route(Pid) -> - erase(erase(Pid)). + case erase(Pid) of + {_,_} = Seqs -> + erase(Seqs); + T -> + T + end. %% capx/1 @@ -610,26 +639,26 @@ encode(Rec, Dict) -> diameter_codec:encode(Dict, #diameter_packet{header = Hdr, msg = Rec}). -%% incoming/3 +%% incoming/2 -incoming({recv, Name, Pkt}, NPid, #state{parent = Pid} = S) -> - Pid ! {recv, self(), get_route(Pkt), Name, Pkt, NPid}, +incoming({recv = T, Name, Pkt}, #state{parent = Pid, ack = Ack} = S) -> + Pid ! {T, self(), get_route(Ack, Pkt), Name, Pkt}, rcv(Name, Pkt, S); -incoming(T, false, _) -> - T; - -incoming(T, NPid, _) -> - NPid ! {diameter, discard}, - T. +incoming(#diameter_header{is_request = R}, #state{transport = TPid, + ack = Ack}) -> + R andalso Ack andalso send(TPid, false), + ok; -%% msg/1 +incoming(<<_:32, 1:1, _/bits>>, #state{ack = true} = S) -> + send(S#state.transport, false), + ok; -msg({_,_} = T) -> - T; +incoming(<<_/bits>>, _) -> + ok; -msg(Msg) -> - {Msg, false}. +incoming(T, _) -> + T. %% recv/2 @@ -654,18 +683,19 @@ recv1(_, #diameter_packet{header = H, bin = Bin}, #state{incoming_maxlen = M}) when M < size(Bin) -> - invalid(false, incoming_maxlen_exceeded, {size(Bin), H}); + invalid(false, incoming_maxlen_exceeded, {size(Bin), H}), + H; %% Ignore anything but an expected CER/CEA if so configured. This is %% non-standard behaviour. -recv1(Name, _, #state{state = {'Wait-CEA', _, _}, - strict = false}) +recv1(Name, #diameter_packet{header = H}, #state{state = {'Wait-CEA', _, _}, + strict = false}) when Name /= 'CEA' -> - ok; -recv1(Name, _, #state{state = recv_CER, - strict = false}) + H; +recv1(Name, #diameter_packet{header = H}, #state{state = recv_CER, + strict = false}) when Name /= 'CER' -> - ok; + H; %% Incoming request after outgoing DPR: discard. Don't discard DPR, so %% both ends don't do so when sending simultaneously. @@ -673,13 +703,15 @@ recv1(Name, #diameter_packet{header = #diameter_header{is_request = true} = H}, #state{dpr = {_,_,_}}) when Name /= 'DPR' -> - invalid(false, recv_after_outgoing_dpr, H); + invalid(false, recv_after_outgoing_dpr, H), + H; %% Incoming request after incoming DPR: discard. recv1(_, #diameter_packet{header = #diameter_header{is_request = true} = H}, #state{dpr = true}) -> - invalid(false, recv_after_incoming_dpr, H); + invalid(false, recv_after_incoming_dpr, H), + H; %% DPA with identifier mismatch, or in response to a DPR initiated by %% the service. @@ -716,10 +748,12 @@ recv(#diameter_header{} #diameter_packet{bin = Bin}, #state{length_errors = E}) -> T = {size(Bin), bit_size(Bin) rem 8, H}, - invalid(E, message_length_mismatch, T); + invalid(E, message_length_mismatch, T), + Bin; recv(false, #diameter_packet{bin = Bin}, #state{length_errors = E}) -> - invalid(E, truncated_header, Bin). + invalid(E, truncated_header, Bin), + Bin. %% Note that counters here only count discarded messages. invalid(E, Reason, T) -> @@ -775,14 +809,10 @@ rcv('DPA' = N, diameter_peer:close(TPid), {stop, N}; -%% Ignore anything else, an unsolicited DPA in particular. Note that -%% dpa_timeout deals with the case in which the peer sends the wrong -%% identifiers in DPA. -rcv(N, #diameter_packet{header = H}, _) - when N == 'CER'; - N == 'CEA'; - N == 'DPR'; - N == 'DPA' -> +%% Ignore an unsolicited DPA in particular. Note that dpa_timeout +%% deals with the case in which the peer sends the wrong identifiers +%% in DPA. +rcv('DPA' = N, #diameter_packet{header = H}, _) -> ?LOG(ignored, N), %% Note that these aren't counted in the normal recv counter. diameter_stats:incr({diameter_codec:msg_id(H), recv, ignored}), diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl index bc1ccf4feb..96f3a307f9 100644 --- a/lib/diameter/src/base/diameter_traffic.erl +++ b/lib/diameter/src/base/diameter_traffic.erl @@ -30,7 +30,7 @@ -export([send_request/4]). %% towards diameter_watchdog --export([receive_message/6]). +-export([receive_message/5]). %% towards diameter_peer_fsm and diameter_watchdog -export([incr/4, @@ -93,7 +93,7 @@ packet :: #diameter_packet{} | undefined}). %% of request %% --------------------------------------------------------------------------- -%% # make_recvdata/1 +%% make_recvdata/1 %% --------------------------------------------------------------------------- make_recvdata([SvcName, PeerT, Apps, SvcOpts | _]) -> @@ -206,42 +206,36 @@ incr_rc(Dir, Pkt, TPid, Dict0) -> incr_rc(Dir, Pkt, TPid, {Dict0, Dict0, Dict0}). %% --------------------------------------------------------------------------- -%% # receive_message/6 +%% receive_message/5 %% -%% Handle an incoming Diameter message. +%% Handle an incoming Diameter message in a watchdog process. %% --------------------------------------------------------------------------- -%% Handle an incoming Diameter message in the watchdog process. - -receive_message(TPid, Route, Pkt, false, Dict0, RecvData) -> - incoming(TPid, Route, Pkt, Dict0, RecvData); - -receive_message(TPid, Route, Pkt, NPid, Dict0, RecvData) -> - NPid ! {diameter, incoming(TPid, Route, Pkt, Dict0, RecvData)}. - -%% incoming/4 - -incoming(TPid, Route, Pkt, Dict0, RecvData) - when is_pid(TPid) -> +-spec receive_message(pid(), Route, #diameter_packet{}, module(), #recvdata{}) + -> pid() + | boolean() + when Route :: {Handler, RequestRef, Seqs} + | Ack, + Handler :: pid(), + RequestRef :: reference(), + Seqs :: {0..16#FFFFFFFF, 0..16#FFFFFFFF}, + Ack :: boolean(). + +receive_message(TPid, Route, Pkt, Dict0, RecvData) -> #diameter_packet{header = #diameter_header{is_request = R}} = Pkt, recv(R, Route, TPid, Pkt, Dict0, RecvData). %% recv/6 %% Incoming request ... -recv(true, false, TPid, Pkt, Dict0, T) -> - try - {request, spawn_request(TPid, Pkt, Dict0, T)} - catch - error: system_limit = E -> %% discard - ?LOG(error, E), - discard - end; +recv(true, Ack, TPid, Pkt, Dict0, T) + when is_boolean(Ack) -> + spawn_request(Ack, TPid, Pkt, Dict0, T); %% ... answer to known request ... recv(false, {Pid, Ref, TPid}, _, Pkt, Dict0, _) -> Pid ! {answer, Ref, TPid, Dict0, Pkt}, - {answer, Pid}; + true; %% Note that failover could have happened prior to this message being %% received and triggering failback. That is, both a failover message @@ -256,23 +250,27 @@ recv(false, {Pid, Ref, TPid}, _, Pkt, Dict0, _) -> recv(false, false, TPid, Pkt, _, _) -> ?LOG(discarded, Pkt#diameter_packet.header), incr(TPid, {{unknown, 0}, recv, discarded}), - discard. + false. -%% spawn_request/4 +%% spawn_request/5 -spawn_request(TPid, Pkt, Dict0, {Opts, RecvData}) -> - spawn_request(TPid, Pkt, Dict0, Opts, RecvData); -spawn_request(TPid, Pkt, Dict0, RecvData) -> - spawn_request(TPid, Pkt, Dict0, ?DEFAULT_SPAWN_OPTS, RecvData). +spawn_request(Ack, TPid, Pkt, Dict0, {Opts, RecvData}) -> + spawn_request(Ack, TPid, Pkt, Dict0, Opts, RecvData); +spawn_request(Ack, TPid, Pkt, Dict0, RecvData) -> + spawn_request(Ack, TPid, Pkt, Dict0, ?DEFAULT_SPAWN_OPTS, RecvData). -spawn_request(TPid, Pkt, Dict0, Opts, RecvData) -> - spawn_opt(fun() -> recv_request(TPid, Pkt, Dict0, RecvData) end, Opts). +spawn_request(Ack, TPid, Pkt, Dict0, Opts, RecvData) -> + spawn_opt(fun() -> + recv_request(Ack, TPid, Pkt, Dict0, RecvData) + end, + Opts). %% --------------------------------------------------------------------------- -%% recv_request/4 +%% recv_request/5 %% --------------------------------------------------------------------------- -recv_request(TPid, +recv_request(Ack, + TPid, #diameter_packet{header = #diameter_header{application_id = Id}} = Pkt, Dict0, @@ -280,6 +278,7 @@ recv_request(TPid, apps = Apps, codec = Opts} = RecvData) -> + Ack andalso (TPid ! {handler, self()}), diameter_codec:setopts([{common_dictionary, Dict0} | Opts]), send_A(recv_R(diameter_service:find_incoming_app(PeerT, TPid, Id, Apps), TPid, @@ -511,7 +510,7 @@ send_A(T, TPid, {AppDict, Dict0} = DictT0, ReqPkt, EvalPktFs, EvalFs) -> {MsgDict, Pkt} = reply(T, TPid, DictT0, EvalPktFs, ReqPkt), incr(send, Pkt, TPid, AppDict), incr_rc(send, Pkt, TPid, {MsgDict, AppDict, Dict0}), %% count outgoing - send(TPid, Pkt), + send(TPid, Pkt, _Route = self()), lists:foreach(fun diameter_lib:eval/1, EvalFs). %% answer/6 @@ -1207,7 +1206,7 @@ x(T) -> exit(T). %% --------------------------------------------------------------------------- -%% # send_request/4 +%% send_request/4 %% %% Handle an outgoing Diameter request. %% --------------------------------------------------------------------------- @@ -1296,7 +1295,7 @@ mo(T, _) -> ?ERROR({invalid_option, T}). %% --------------------------------------------------------------------------- -%% # send_request/6 +%% send_request/6 %% --------------------------------------------------------------------------- %% Send an outgoing request in its dedicated process. @@ -1745,11 +1744,6 @@ recv(TPid, Pid, TRef, {LocalTRef, MRef}) -> exit({timeout, LocalTRef, TPid} = T) end. -%% send/2 - -send(Pid, Pkt) -> - Pid ! {send, Pkt}. - %% send/3 send(Pid, Pkt, Route) -> diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl index 4484b7ee2c..a2eb661870 100644 --- a/lib/diameter/src/base/diameter_watchdog.erl +++ b/lib/diameter/src/base/diameter_watchdog.erl @@ -283,7 +283,7 @@ event(Msg, ?LOG(transition, {From, To}). data(Msg, TPid, reopen, okay) -> - {recv, TPid, false, 'DWA', _Pkt, _NPid} = Msg, %% assert + {recv, TPid, _, 'DWA', _Pkt} = Msg, %% assert {TPid, T} = eraser(open), [T]; @@ -302,6 +302,8 @@ tpid(_, Pid) tpid(Pid, _) -> Pid. +%% send/2 + send(Pid, T) -> Pid ! T. @@ -447,14 +449,15 @@ transition({'DOWN', _, process, TPid, _Reason} = D, end; %% Incoming message. -transition({recv, TPid, Route, Name, Pkt, NPid}, +transition({recv, TPid, Route, Name, Pkt}, #watchdog{transport = TPid} = S) -> - try - incoming(Name, Pkt, NPid, S) - catch + try incoming(Route, Name, Pkt, S) of #watchdog{dictionary = Dict0, receive_data = T} = NS -> - diameter_traffic:receive_message(TPid, Route, Pkt, NPid, Dict0, T), + diameter_traffic:receive_message(TPid, Route, Pkt, Dict0, T), + NS + catch + #watchdog{} = NS -> NS end; @@ -586,23 +589,13 @@ send_watchdog(#watchdog{pending = false, %% incoming/4 -incoming(Name, Pkt, false, S) -> - recv(Name, Pkt, S); - -incoming(Name, Pkt, NPid, S) -> - NS = recv(Name, Pkt, S), - NPid ! {diameter, discard}, - NS. - -%% recv/3 - -recv(Name, Pkt, S) -> - try rcv(Name, Pkt, rcv(Name, S)) of - #watchdog{} = NS -> - throw(NS) +incoming(Route, Name, Pkt, S) -> + try rcv(Name, S) of + NS -> rcv(Name, Pkt, NS) catch - #watchdog{} = NS -> %% throwaway - NS + #watchdog{transport = TPid} = NS when Route -> %% incoming request + send(TPid, {send, false}), %% requiring ack + throw(NS) end. %% rcv/3 |