From 2d74fa618f3a34a5487f5de37c4f6e2870b58273 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Wed, 15 Mar 2017 22:33:17 +0100 Subject: Fix handling of message length errors Message length errors in incoming messages were misinterpreted with transport_opt() {length_errors, exit} due to the throw introduced in commit 2ffb288: the corresponding catch in incoming/2 caught errors thrown by close/1, leading to failure when the error reason was interpreted as a diameter_packet record. Do away with the throw, that also caused woe in the parent commit. --- lib/diameter/src/base/diameter_peer_fsm.erl | 42 +++++++++++++---------------- 1 file changed, 19 insertions(+), 23 deletions(-) (limited to 'lib/diameter/src/base/diameter_peer_fsm.erl') diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl index 7ee1e5fe59..d394156367 100644 --- a/lib/diameter/src/base/diameter_peer_fsm.erl +++ b/lib/diameter/src/base/diameter_peer_fsm.erl @@ -444,7 +444,8 @@ transition({connection_timeout, _}, _) -> %% Incoming message from the transport. transition({diameter, {recv, MsgT}}, S) -> - incoming(MsgT, S); + {Msg, NPid} = msg(MsgT), + incoming(recv(Msg, S), NPid, S); %% Timeout when still in the same state ... transition({timeout = T, PS}, #state{state = PS}) -> @@ -609,31 +610,26 @@ encode(Rec, Dict) -> diameter_codec:encode(Dict, #diameter_packet{header = Hdr, msg = Rec}). -%% incoming/2 +%% incoming/3 -incoming({Msg, NPid}, S) -> - try recv(Msg, S) of - T -> - NPid ! {diameter, discard}, - T - catch - {?MODULE, Name, Pkt} -> - incoming(Name, Pkt, NPid, S) - end; +incoming({recv, Name, Pkt}, NPid, #state{parent = Pid} = S) -> + Pid ! {recv, self(), get_route(Pkt), Name, Pkt, NPid}, + rcv(Name, Pkt, S); -incoming(Msg, S) -> - try - recv(Msg, S) - catch - {?MODULE, Name, Pkt} -> - incoming(Name, Pkt, false, S) - end. +incoming(T, false, _) -> + T; -%% incoming/4 +incoming(T, NPid, _) -> + NPid ! {diameter, discard}, + T. -incoming(Name, Pkt, NPid, #state{parent = Pid} = S) -> - Pid ! {recv, self(), get_route(Pkt), Name, Pkt, NPid}, - rcv(Name, Pkt, S). +%% msg/1 + +msg({_,_} = T) -> + T; + +msg(Msg) -> + {Msg, false}. %% recv/2 @@ -701,7 +697,7 @@ recv1('DPA' = N, %% Any other message with a header and no length errors: send to the %% parent. recv1(Name, Pkt, #state{}) -> - throw({?MODULE, Name, Pkt}). + {recv, Name, Pkt}. %% recv/3 -- cgit v1.2.3 From ca09cf7b697798aca5a4f81a11d5ad1d90f4107e Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Wed, 15 Mar 2017 17:04:44 +0100 Subject: Simplify acks to transport processes What's interesting when implementing some form of load regulation is when an incoming request has been answered or discarded. Acknowledge exactly this, not the identity of handler processes as previously. A transport process can request acks of nonforthcoming answers by sending {diameter, ack} to the parent peer_fsm, a handler processes identifies itself with a {handler, pid()} message, and the peer_fsm monitors on this to be able to send a notification to the transport if the handler dies before sending an answer. --- lib/diameter/src/base/diameter_peer_fsm.erl | 138 +++++++++++++++++----------- 1 file changed, 84 insertions(+), 54 deletions(-) (limited to 'lib/diameter/src/base/diameter_peer_fsm.erl') diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl index d394156367..d2af8fe425 100644 --- a/lib/diameter/src/base/diameter_peer_fsm.erl +++ b/lib/diameter/src/base/diameter_peer_fsm.erl @@ -129,6 +129,7 @@ %% the request was sent explicitly with %% diameter:call/4. strict :: boolean(), + ack = false :: boolean(), length_errors :: exit | handle | discard, incoming_maxlen :: integer() | infinity}). @@ -235,7 +236,7 @@ i({Ack, WPid, {M, Ref} = T, Opts, {SvcOpts, Nodes, Dict0, Svc}}) -> Tmo = proplists:get_value(capx_timeout, Opts, ?CAPX_TIMEOUT), Strictness = proplists:get_value(capx_strictness, Opts, true), - OnLengthErr = proplists:get_value(length_errors, Opts, exit), + LengthErr = proplists:get_value(length_errors, Opts, exit), {TPid, Addrs} = start_transport(T, Rest, Svc), @@ -247,7 +248,7 @@ i({Ack, WPid, {M, Ref} = T, Opts, {SvcOpts, Nodes, Dict0, Svc}}) -> dictionary = Dict0, mode = M, service = svc(Svc, Addrs), - length_errors = OnLengthErr, + length_errors = LengthErr, strict = Strictness, incoming_maxlen = Maxlen}. %% The transport returns its local ip addresses so that different @@ -442,10 +443,18 @@ transition({connection_timeout = T, TPid}, transition({connection_timeout, _}, _) -> ok; +%% Requests for acknowledgements to the transport. +transition({diameter, ack}, S) -> + S#state{ack = true}; + %% Incoming message from the transport. -transition({diameter, {recv, MsgT}}, S) -> - {Msg, NPid} = msg(MsgT), - incoming(recv(Msg, S), NPid, S); +transition({diameter, {recv, Msg}}, S) -> + incoming(recv(Msg, S), S); + +%% Handler of an incoming request is telling of its existence. +transition({handler, Pid}, _) -> + put_route(Pid), + ok; %% Timeout when still in the same state ... transition({timeout = T, PS}, #state{state = PS}) -> @@ -459,7 +468,7 @@ transition({timeout, _}, _) -> transition({send, Msg}, S) -> outgoing(Msg, S); transition({send, Msg, Route}, S) -> - put_route(Route), + route_outgoing(Route), outgoing(Msg, S); %% Request for graceful shutdown at remove_transport, stop_service of @@ -488,12 +497,13 @@ transition({'DOWN', _, process, WPid, _}, transition({'DOWN', _, process, TPid, _}, #state{transport = TPid} = S) -> - start_next(S); + start_next(S#state{ack = false}); %% Transport has died after connection timeout, or handler process has %% died. -transition({'DOWN', _, process, Pid, _}, _) -> - erase_route(Pid), +transition({'DOWN', _, process, Pid, _}, #state{transport = TPid}) -> + is_reference(erase_route(Pid)) + andalso send(TPid, false), %% answer not forthcoming ok; %% State query. @@ -503,37 +513,56 @@ transition({state, Pid}, #state{state = S, transport = TPid}) -> %% Crash on anything unexpected. -%% put_route/1 -%% +%% route_outgoing/1 + %% Map identifiers in an outgoing request to be able to lookup the %% handler process when the answer is received. - -put_route({Pid, Ref, Seqs}) -> +route_outgoing({Pid, Ref, Seqs}) -> %% request MRef = monitor(process, Pid), put(Pid, Seqs), - put(Seqs, {Pid, Ref, MRef}). + put(Seqs, {Pid, Ref, MRef}); -%% get_route/1 +%% Remove a mapping made for an incoming request. +route_outgoing(Pid) + when is_pid(Pid) -> %% answer + MRef = erase_route(Pid), + undefined == MRef orelse demonitor(MRef). -get_route(#diameter_packet{header = #diameter_header{is_request = false}} - = Pkt) -> +%% put_route/1 + +%% Monitor on a handler process for an incoming request. +put_route(Pid) -> + MRef = monitor(process, Pid), + put(Pid, MRef). + +%% get_route/2 + +%% incoming answer +get_route(_, #diameter_packet{header = #diameter_header{is_request = false}} + = Pkt) -> Seqs = diameter_codec:sequence_numbers(Pkt), case erase(Seqs) of {Pid, Ref, MRef} -> demonitor(MRef), erase(Pid), {Pid, Ref, self()}; - undefined -> + undefined -> %% request unknown false end; -get_route(_) -> - false. +%% incoming request +get_route(Ack, _) -> + Ack. %% erase_route/1 erase_route(Pid) -> - erase(erase(Pid)). + case erase(Pid) of + {_,_} = Seqs -> + erase(Seqs); + T -> + T + end. %% capx/1 @@ -610,26 +639,26 @@ encode(Rec, Dict) -> diameter_codec:encode(Dict, #diameter_packet{header = Hdr, msg = Rec}). -%% incoming/3 +%% incoming/2 -incoming({recv, Name, Pkt}, NPid, #state{parent = Pid} = S) -> - Pid ! {recv, self(), get_route(Pkt), Name, Pkt, NPid}, +incoming({recv = T, Name, Pkt}, #state{parent = Pid, ack = Ack} = S) -> + Pid ! {T, self(), get_route(Ack, Pkt), Name, Pkt}, rcv(Name, Pkt, S); -incoming(T, false, _) -> - T; - -incoming(T, NPid, _) -> - NPid ! {diameter, discard}, - T. +incoming(#diameter_header{is_request = R}, #state{transport = TPid, + ack = Ack}) -> + R andalso Ack andalso send(TPid, false), + ok; -%% msg/1 +incoming(<<_:32, 1:1, _/bits>>, #state{ack = true} = S) -> + send(S#state.transport, false), + ok; -msg({_,_} = T) -> - T; +incoming(<<_/bits>>, _) -> + ok; -msg(Msg) -> - {Msg, false}. +incoming(T, _) -> + T. %% recv/2 @@ -654,18 +683,19 @@ recv1(_, #diameter_packet{header = H, bin = Bin}, #state{incoming_maxlen = M}) when M < size(Bin) -> - invalid(false, incoming_maxlen_exceeded, {size(Bin), H}); + invalid(false, incoming_maxlen_exceeded, {size(Bin), H}), + H; %% Ignore anything but an expected CER/CEA if so configured. This is %% non-standard behaviour. -recv1(Name, _, #state{state = {'Wait-CEA', _, _}, - strict = false}) +recv1(Name, #diameter_packet{header = H}, #state{state = {'Wait-CEA', _, _}, + strict = false}) when Name /= 'CEA' -> - ok; -recv1(Name, _, #state{state = recv_CER, - strict = false}) + H; +recv1(Name, #diameter_packet{header = H}, #state{state = recv_CER, + strict = false}) when Name /= 'CER' -> - ok; + H; %% Incoming request after outgoing DPR: discard. Don't discard DPR, so %% both ends don't do so when sending simultaneously. @@ -673,13 +703,15 @@ recv1(Name, #diameter_packet{header = #diameter_header{is_request = true} = H}, #state{dpr = {_,_,_}}) when Name /= 'DPR' -> - invalid(false, recv_after_outgoing_dpr, H); + invalid(false, recv_after_outgoing_dpr, H), + H; %% Incoming request after incoming DPR: discard. recv1(_, #diameter_packet{header = #diameter_header{is_request = true} = H}, #state{dpr = true}) -> - invalid(false, recv_after_incoming_dpr, H); + invalid(false, recv_after_incoming_dpr, H), + H; %% DPA with identifier mismatch, or in response to a DPR initiated by %% the service. @@ -716,10 +748,12 @@ recv(#diameter_header{} #diameter_packet{bin = Bin}, #state{length_errors = E}) -> T = {size(Bin), bit_size(Bin) rem 8, H}, - invalid(E, message_length_mismatch, T); + invalid(E, message_length_mismatch, T), + Bin; recv(false, #diameter_packet{bin = Bin}, #state{length_errors = E}) -> - invalid(E, truncated_header, Bin). + invalid(E, truncated_header, Bin), + Bin. %% Note that counters here only count discarded messages. invalid(E, Reason, T) -> @@ -775,14 +809,10 @@ rcv('DPA' = N, diameter_peer:close(TPid), {stop, N}; -%% Ignore anything else, an unsolicited DPA in particular. Note that -%% dpa_timeout deals with the case in which the peer sends the wrong -%% identifiers in DPA. -rcv(N, #diameter_packet{header = H}, _) - when N == 'CER'; - N == 'CEA'; - N == 'DPR'; - N == 'DPA' -> +%% Ignore an unsolicited DPA in particular. Note that dpa_timeout +%% deals with the case in which the peer sends the wrong identifiers +%% in DPA. +rcv('DPA' = N, #diameter_packet{header = H}, _) -> ?LOG(ignored, N), %% Note that these aren't counted in the normal recv counter. diameter_stats:incr({diameter_codec:msg_id(H), recv, ignored}), -- cgit v1.2.3