diff options
Diffstat (limited to 'lib/diameter/src/base')
-rw-r--r-- | lib/diameter/src/base/diameter.erl | 1 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_config.erl | 4 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_peer_fsm.erl | 168 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_watchdog.erl | 12 |
4 files changed, 151 insertions, 34 deletions
diff --git a/lib/diameter/src/base/diameter.erl b/lib/diameter/src/base/diameter.erl index 1bbdf6e34d..a45d84f95b 100644 --- a/lib/diameter/src/base/diameter.erl +++ b/lib/diameter/src/base/diameter.erl @@ -343,6 +343,7 @@ call(SvcName, App, Message) -> | {capabilities_cb, evaluable()} | {capx_timeout, 'Unsigned32'()} | {disconnect_cb, evaluable()} + | {dpa_timeout, 'Unsigned32'()} | {length_errors, exit | handle | discard} | {connect_timer, 'Unsigned32'()} | {watchdog_timer, 'Unsigned32'() | {module(), atom(), list()}} diff --git a/lib/diameter/src/base/diameter_config.erl b/lib/diameter/src/base/diameter_config.erl index c0a4f7df69..aa4d6e5a20 100644 --- a/lib/diameter/src/base/diameter_config.erl +++ b/lib/diameter/src/base/diameter_config.erl @@ -532,7 +532,9 @@ opt({applications, As}) -> opt({capabilities, Os}) -> is_list(Os) andalso ok == encode_CER(Os); -opt({capx_timeout, Tmo}) -> +opt({K, Tmo}) + when K == capx_timeout; + K == dpa_timeout -> ?IS_UINT32(Tmo); opt({length_errors, T}) -> diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl index ee6e7dd89e..9ff6845ab7 100644 --- a/lib/diameter/src/base/diameter_peer_fsm.erl +++ b/lib/diameter/src/base/diameter_peer_fsm.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2014. All Rights Reserved. +%% Copyright Ericsson AB 2010-2015. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -63,6 +63,7 @@ %% Keys in process dictionary. -define(CB_KEY, cb). %% capabilities callback -define(DPR_KEY, dpr). %% disconnect callback +-define(DPA_KEY, dpa). %% timeout for DPA reception -define(REF_KEY, ref). %% transport_ref() -define(Q_KEY, q). %% transport start queue -define(START_KEY, start). %% start of connected transport @@ -107,7 +108,8 @@ transport :: pid(), %% transport process dictionary :: module(), %% common dictionary service :: #diameter_service{}, - dpr = false :: false | {uint32(), uint32()}, + dpr = false :: false | {uint32(), uint32()} %% set in old code + | {boolean(), uint32(), uint32()}, %% | hop by hop and end to end identifiers length_errors :: exit | handle | discard}). @@ -187,6 +189,7 @@ i({Ack, WPid, {M, Ref} = T, Opts, {Mask, Nodes, Dict0, Svc}}) -> putr(?REF_KEY, Ref), putr(?SEQUENCE_KEY, Mask), putr(?RESTRICT_KEY, Nodes), + putr(?DPA_KEY, proplists:get_value(dpa_timeout, Opts, ?DPA_TIMEOUT)), Tmo = proplists:get_value(capx_timeout, Opts, ?EVENT_TIMEOUT), OnLengthErr = proplists:get_value(length_errors, Opts, exit), @@ -212,9 +215,12 @@ wait(Ref, Pid) -> Ref -> ok; {'DOWN', _, process, Pid, _} = D -> - exit({shutdown, D}) + x(D) end. +x(T) -> + exit({shutdown, T}). + start_transport(T, Opts, #diameter_service{capabilities = LCaps} = Svc) -> Addrs0 = LCaps#diameter_caps.host_ip_address, start_transport(Addrs0, {T, Opts, Svc}). @@ -226,7 +232,7 @@ start_transport(Addrs0, T) -> q_next(TPid, Addrs0, Tmo, Data), {TPid, Addrs}; {error, No} -> - exit({shutdown, {no_connection, No}}) + x({no_connection, No}) end. svc(#diameter_service{capabilities = LCaps0} = Svc, Addrs) -> @@ -333,6 +339,9 @@ eraser(Key) -> %% transition/2 +transition(T, #state{dpr = {Hid, Eid}} = S) -> %% DPR sent from old code + transition(T, S#state{dpr = {false, Hid, Eid}}); + %% Connection to peer. transition({diameter, {TPid, connected, Remote}}, #state{transport = TPid, @@ -397,9 +406,8 @@ transition({timeout, _}, _) -> ok; %% Outgoing message. -transition({send, Msg}, #state{transport = TPid}) -> - send(TPid, Msg), - ok; +transition({send, Msg}, S) -> + outgoing(Msg, S); %% Request for graceful shutdown at remove_transport, stop_service of %% application shutdown. @@ -516,12 +524,9 @@ encode(Rec, Dict) -> recv(#diameter_packet{header = #diameter_header{} = Hdr} = Pkt, - #state{parent = Pid, - dictionary = Dict0} + #state{dictionary = Dict0} = S) -> - Name = diameter_codec:msg_name(Dict0, Hdr), - Pid ! {recv, self(), Name, Pkt}, - rcv(Name, Pkt, S); + recv1(diameter_codec:msg_name(Dict0, Hdr), Pkt, S); recv(#diameter_packet{header = undefined, bin = Bin} @@ -532,6 +537,35 @@ recv(#diameter_packet{header = undefined, recv(Bin, S) -> recv(#diameter_packet{bin = Bin}, S). +%% recv1/3 + +%% Incoming request after DPR has been sent: discard. Don't discard +%% DPR, so both ends don't do so when sending simultaneously. +recv1(Name, + #diameter_packet{header = #diameter_header{is_request = true} = H}, + #state{dpr = {_,_,_}}) + when Name /= 'DPR' -> + invalid(false, recv_after_dpr, H); + +%% DPA with identifier mismatch, or in response to a DPR initiated by +%% the service. +recv1('DPA' = N, + #diameter_packet{header = #diameter_header{hop_by_hop_id = Hid, + end_to_end_id = Eid}} + = Pkt, + #state{dpr = {X,H,E}} + = S) + when H /= Hid; + E /= Eid; + not X -> + rcv(N, Pkt, S); + +%% Any other message with a header and no length errors: send to the +%% parent. +recv1(Name, Pkt, #state{parent = Pid} = S) -> + Pid ! {recv, self(), Name, Pkt}, + rcv(Name, Pkt, S). + %% recv/3 recv(#diameter_header{length = Len} @@ -588,7 +622,7 @@ rcv(Name, _, #state{state = PS}) rcv('DPR' = N, Pkt, S) -> handle_request(N, Pkt, S); -%% DPA in response to DPR and with the expected identifiers. +%% DPA in response to DPR, with the expected identifiers. rcv('DPA' = N, #diameter_packet{header = #diameter_header{end_to_end_id = Eid, hop_by_hop_id = Hid} @@ -596,10 +630,15 @@ rcv('DPA' = N, = Pkt, #state{dictionary = Dict0, transport = TPid, - dpr = {Hid, Eid}}) -> + dpr = {X, Hid, Eid}}) -> ?LOG(recv, N), - incr(recv, H, Dict0), - incr_rc(recv, diameter_codec:decode(Dict0, Pkt), Dict0), + X orelse begin + %% Only count DPA in response to a DPR sent by the + %% service: explicit DPR is counted in the same way + %% as other explicitly sent requests. + incr(recv, H, Dict0), + incr_rc(recv, diameter_codec:decode(Dict0, Pkt), Dict0) + end, diameter_peer:close(TPid), {stop, N}; @@ -637,9 +676,59 @@ incr_error(Dir, Pkt, Dict0) -> %% Msg here could be a #diameter_packet or a binary depending on who's %% sending. In particular, the watchdog will send DWR as a binary %% while messages coming from clients will be in a #diameter_packet. + send(Pid, Msg) -> diameter_peer:send(Pid, Msg). +%% outgoing/2 + +%% Explicit DPR. +outgoing(#diameter_packet{header = #diameter_header{application_id = 0, + cmd_code = 282, + is_request = true} + = H} + = Pkt, + #state{dpr = T, + parent = Pid} + = S) -> + if T == false -> + inform_dpr(Pid), + send_dpr(true, Pkt, dpa_timeout(), S); + true -> + invalid(false, dpr_after_dpr, H) %% already sent: discard + end; + +%% Explict CER or DWR: discard. These are sent by us. +outgoing(#diameter_packet{header = #diameter_header{application_id = 0, + cmd_code = C, + is_request = true} + = H}, + _) + when 257 == C; %% CER + 280 == C -> %% DWR + invalid(false, invalid_request, H); + +%% DPR not sent: send. +outgoing(Msg, #state{transport = TPid, dpr = false}) -> + send(TPid, Msg), + ok; + +%% Outgoing answer: send. +outgoing(#diameter_packet{header = #diameter_header{is_request = false}} + = Pkt, + #state{transport = TPid}) -> + send(TPid, Pkt), + ok; + +%% Outgoing request: discard. +outgoing(Msg, #state{dpr = {_,_,_}}) -> + invalid(false, send_after_dpr, header(Msg)). + +header(#diameter_packet{header = H}) -> + H; +header(Bin) -> %% DWR + diameter_codec:decode_header(Bin). + %% handle_request/3 %% %% Incoming CER or DPR. @@ -748,8 +837,11 @@ cea(CEA, RC, Dict0) -> post('CER' = T, RC, Pkt, S) -> {T, caps(S), {RC, Pkt}}; -post('DPR' = T, _, _, #state{parent = Pid}) -> - [fun(S) -> Pid ! {T, self()}, S end]. +post('DPR', _, _, #state{parent = Pid}) -> + [fun(S) -> inform_dpr(Pid), S end]. + +inform_dpr(Pid) -> + Pid ! {'DPR', self()}. %% tell watchdog to die with us rejected({capabilities_cb, _F, Reason}, T, S) -> rejected(Reason, T, S); @@ -1073,10 +1165,9 @@ dpr([CB|Rest], [Reason | _] = Args, S) -> dpr([], [Reason | _], S) -> send_dpr(Reason, [], S). --record(opts, {cause, timeout = ?DPA_TIMEOUT}). +-record(opts, {cause, timeout}). -send_dpr(Reason, Opts, #state{transport = TPid, - dictionary = Dict, +send_dpr(Reason, Opts, #state{dictionary = Dict, service = #diameter_service{capabilities = Caps}} = S) -> #opts{cause = Cause, timeout = Tmo} @@ -1085,24 +1176,37 @@ send_dpr(Reason, Opts, #state{transport = TPid, transport -> ?GOAWAY; _ -> ?REBOOT end, - timeout = ?DPA_TIMEOUT}, + timeout = dpa_timeout()}, Opts), #diameter_caps{origin_host = {OH, _}, origin_realm = {OR, _}} = Caps, - #diameter_packet{header = #diameter_header{end_to_end_id = Eid, - hop_by_hop_id = Hid}} - = Pkt - = encode(['DPR', {'Origin-Host', OH}, + Pkt = encode(['DPR', {'Origin-Host', OH}, {'Origin-Realm', OR}, {'Disconnect-Cause', Cause}], Dict), - incr(send, Pkt, Dict), + send_dpr(false, Pkt, Tmo, S). + +%% send_dpr/4 + +send_dpr(X, + #diameter_packet{header = #diameter_header{end_to_end_id = Eid, + hop_by_hop_id = Hid}} + = Pkt, + Tmo, + #state{transport = TPid, + dictionary = Dict} + = S) -> + %% Only count DPR sent by the service: explicit DPR is counted in + %% the same way as other explicitly sent requests. + X orelse incr(send, Pkt, Dict), send(TPid, Pkt), dpa_timer(Tmo), ?LOG(send, 'DPR'), - S#state{dpr = {Hid, Eid}}. + S#state{dpr = {X, Hid, Eid}}. + +%% opt/2 opt({timeout, Tmo}, Rec) when ?IS_TIMEOUT(Tmo) -> @@ -1125,6 +1229,14 @@ cause(N) -> dpa_timer(Tmo) -> erlang:send_after(Tmo, self(), dpa_timeout). +dpa_timeout() -> + dpa_timeout(getr(?DPA_KEY)). + +dpa_timeout(undefined) -> + ?DPA_TIMEOUT; +dpa_timeout(Tmo) -> + Tmo. + %% register_everywhere/1 %% %% Register a term and ensure it's not registered elsewhere. Note that diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl index 67715906e8..ff51c6dcf7 100644 --- a/lib/diameter/src/base/diameter_watchdog.erl +++ b/lib/diameter/src/base/diameter_watchdog.erl @@ -333,8 +333,9 @@ transition({shutdown = T, Pid, Reason}, #watchdog{parent = Pid, send(TPid, {T, self(), Reason}), S#watchdog{shutdown = true}; -%% Transport is telling us that DPA has been sent in response to DPR: -%% its death should lead to ours. +%% Transport is telling us that DPA has been sent in response to DPR, +%% or that DPR has been explicitly sent: transport death should lead +%% to ours. transition({'DPR', TPid}, #watchdog{transport = TPid} = S) -> S#watchdog{shutdown = true}; @@ -550,7 +551,7 @@ send_watchdog(#watchdog{pending = false, ?LOG(send, 'DWR'), S#watchdog{pending = true}. -%% Dont' count encode errors since we don't expect any on DWR/DWA. +%% Don't count encode errors since we don't expect any on DWR/DWA. %% recv/3 @@ -591,9 +592,10 @@ rcv('DWA', Pkt, #watchdog{transport = TPid, rcv(N, _, _) when N == 'CER'; N == 'CEA'; - N == 'DPR'; - N == 'DPA' -> + N == 'DPR' -> false; +%% DPR can be sent explicitly with diameter:call/4. Only the +%% corresponding DPAs arrive here. rcv(_, Pkt, #watchdog{transport = TPid, dictionary = Dict0, |