aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src/base/diameter_peer_fsm.erl
diff options
context:
space:
mode:
Diffstat (limited to 'lib/diameter/src/base/diameter_peer_fsm.erl')
-rw-r--r--lib/diameter/src/base/diameter_peer_fsm.erl128
1 files changed, 103 insertions, 25 deletions
diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl
index 23bba701eb..9ff6845ab7 100644
--- a/lib/diameter/src/base/diameter_peer_fsm.erl
+++ b/lib/diameter/src/base/diameter_peer_fsm.erl
@@ -63,6 +63,7 @@
%% Keys in process dictionary.
-define(CB_KEY, cb). %% capabilities callback
-define(DPR_KEY, dpr). %% disconnect callback
+-define(DPA_KEY, dpa). %% timeout for DPA reception
-define(REF_KEY, ref). %% transport_ref()
-define(Q_KEY, q). %% transport start queue
-define(START_KEY, start). %% start of connected transport
@@ -107,7 +108,8 @@
transport :: pid(), %% transport process
dictionary :: module(), %% common dictionary
service :: #diameter_service{},
- dpr = false :: false | {uint32(), uint32()},
+ dpr = false :: false | {uint32(), uint32()} %% set in old code
+ | {boolean(), uint32(), uint32()},
%% | hop by hop and end to end identifiers
length_errors :: exit | handle | discard}).
@@ -187,6 +189,7 @@ i({Ack, WPid, {M, Ref} = T, Opts, {Mask, Nodes, Dict0, Svc}}) ->
putr(?REF_KEY, Ref),
putr(?SEQUENCE_KEY, Mask),
putr(?RESTRICT_KEY, Nodes),
+ putr(?DPA_KEY, proplists:get_value(dpa_timeout, Opts, ?DPA_TIMEOUT)),
Tmo = proplists:get_value(capx_timeout, Opts, ?EVENT_TIMEOUT),
OnLengthErr = proplists:get_value(length_errors, Opts, exit),
@@ -212,9 +215,12 @@ wait(Ref, Pid) ->
Ref ->
ok;
{'DOWN', _, process, Pid, _} = D ->
- exit({shutdown, D})
+ x(D)
end.
+x(T) ->
+ exit({shutdown, T}).
+
start_transport(T, Opts, #diameter_service{capabilities = LCaps} = Svc) ->
Addrs0 = LCaps#diameter_caps.host_ip_address,
start_transport(Addrs0, {T, Opts, Svc}).
@@ -226,7 +232,7 @@ start_transport(Addrs0, T) ->
q_next(TPid, Addrs0, Tmo, Data),
{TPid, Addrs};
{error, No} ->
- exit({shutdown, {no_connection, No}})
+ x({no_connection, No})
end.
svc(#diameter_service{capabilities = LCaps0} = Svc, Addrs) ->
@@ -333,6 +339,9 @@ eraser(Key) ->
%% transition/2
+transition(T, #state{dpr = {Hid, Eid}} = S) -> %% DPR sent from old code
+ transition(T, S#state{dpr = {false, Hid, Eid}});
+
%% Connection to peer.
transition({diameter, {TPid, connected, Remote}},
#state{transport = TPid,
@@ -398,8 +407,7 @@ transition({timeout, _}, _) ->
%% Outgoing message.
transition({send, Msg}, S) ->
- outgoing(Msg, S),
- ok;
+ outgoing(Msg, S);
%% Request for graceful shutdown at remove_transport, stop_service of
%% application shutdown.
@@ -535,10 +543,23 @@ recv(Bin, S) ->
%% DPR, so both ends don't do so when sending simultaneously.
recv1(Name,
#diameter_packet{header = #diameter_header{is_request = true} = H},
- #state{dpr = {_,_}})
+ #state{dpr = {_,_,_}})
when Name /= 'DPR' ->
invalid(false, recv_after_dpr, H);
+%% DPA with identifier mismatch, or in response to a DPR initiated by
+%% the service.
+recv1('DPA' = N,
+ #diameter_packet{header = #diameter_header{hop_by_hop_id = Hid,
+ end_to_end_id = Eid}}
+ = Pkt,
+ #state{dpr = {X,H,E}}
+ = S)
+ when H /= Hid;
+ E /= Eid;
+ not X ->
+ rcv(N, Pkt, S);
+
%% Any other message with a header and no length errors: send to the
%% parent.
recv1(Name, Pkt, #state{parent = Pid} = S) ->
@@ -601,7 +622,7 @@ rcv(Name, _, #state{state = PS})
rcv('DPR' = N, Pkt, S) ->
handle_request(N, Pkt, S);
-%% DPA in response to DPR and with the expected identifiers.
+%% DPA in response to DPR, with the expected identifiers.
rcv('DPA' = N,
#diameter_packet{header = #diameter_header{end_to_end_id = Eid,
hop_by_hop_id = Hid}
@@ -609,10 +630,15 @@ rcv('DPA' = N,
= Pkt,
#state{dictionary = Dict0,
transport = TPid,
- dpr = {Hid, Eid}}) ->
+ dpr = {X, Hid, Eid}}) ->
?LOG(recv, N),
- incr(recv, H, Dict0),
- incr_rc(recv, diameter_codec:decode(Dict0, Pkt), Dict0),
+ X orelse begin
+ %% Only count DPA in response to a DPR sent by the
+ %% service: explicit DPR is counted in the same way
+ %% as other explicitly sent requests.
+ incr(recv, H, Dict0),
+ incr_rc(recv, diameter_codec:decode(Dict0, Pkt), Dict0)
+ end,
diameter_peer:close(TPid),
{stop, N};
@@ -650,23 +676,52 @@ incr_error(Dir, Pkt, Dict0) ->
%% Msg here could be a #diameter_packet or a binary depending on who's
%% sending. In particular, the watchdog will send DWR as a binary
%% while messages coming from clients will be in a #diameter_packet.
+
send(Pid, Msg) ->
diameter_peer:send(Pid, Msg).
%% outgoing/2
+%% Explicit DPR.
+outgoing(#diameter_packet{header = #diameter_header{application_id = 0,
+ cmd_code = 282,
+ is_request = true}
+ = H}
+ = Pkt,
+ #state{dpr = T,
+ parent = Pid}
+ = S) ->
+ if T == false ->
+ inform_dpr(Pid),
+ send_dpr(true, Pkt, dpa_timeout(), S);
+ true ->
+ invalid(false, dpr_after_dpr, H) %% already sent: discard
+ end;
+
+%% Explict CER or DWR: discard. These are sent by us.
+outgoing(#diameter_packet{header = #diameter_header{application_id = 0,
+ cmd_code = C,
+ is_request = true}
+ = H},
+ _)
+ when 257 == C; %% CER
+ 280 == C -> %% DWR
+ invalid(false, invalid_request, H);
+
%% DPR not sent: send.
outgoing(Msg, #state{transport = TPid, dpr = false}) ->
- send(TPid, Msg);
+ send(TPid, Msg),
+ ok;
%% Outgoing answer: send.
outgoing(#diameter_packet{header = #diameter_header{is_request = false}}
= Pkt,
#state{transport = TPid}) ->
- send(TPid, Pkt);
+ send(TPid, Pkt),
+ ok;
%% Outgoing request: discard.
-outgoing(Msg, #state{dpr = {_,_}}) ->
+outgoing(Msg, #state{dpr = {_,_,_}}) ->
invalid(false, send_after_dpr, header(Msg)).
header(#diameter_packet{header = H}) ->
@@ -782,8 +837,11 @@ cea(CEA, RC, Dict0) ->
post('CER' = T, RC, Pkt, S) ->
{T, caps(S), {RC, Pkt}};
-post('DPR' = T, _, _, #state{parent = Pid}) ->
- [fun(S) -> Pid ! {T, self()}, S end].
+post('DPR', _, _, #state{parent = Pid}) ->
+ [fun(S) -> inform_dpr(Pid), S end].
+
+inform_dpr(Pid) ->
+ Pid ! {'DPR', self()}. %% tell watchdog to die with us
rejected({capabilities_cb, _F, Reason}, T, S) ->
rejected(Reason, T, S);
@@ -1107,10 +1165,9 @@ dpr([CB|Rest], [Reason | _] = Args, S) ->
dpr([], [Reason | _], S) ->
send_dpr(Reason, [], S).
--record(opts, {cause, timeout = ?DPA_TIMEOUT}).
+-record(opts, {cause, timeout}).
-send_dpr(Reason, Opts, #state{transport = TPid,
- dictionary = Dict,
+send_dpr(Reason, Opts, #state{dictionary = Dict,
service = #diameter_service{capabilities = Caps}}
= S) ->
#opts{cause = Cause, timeout = Tmo}
@@ -1119,24 +1176,37 @@ send_dpr(Reason, Opts, #state{transport = TPid,
transport -> ?GOAWAY;
_ -> ?REBOOT
end,
- timeout = ?DPA_TIMEOUT},
+ timeout = dpa_timeout()},
Opts),
#diameter_caps{origin_host = {OH, _},
origin_realm = {OR, _}}
= Caps,
- #diameter_packet{header = #diameter_header{end_to_end_id = Eid,
- hop_by_hop_id = Hid}}
- = Pkt
- = encode(['DPR', {'Origin-Host', OH},
+ Pkt = encode(['DPR', {'Origin-Host', OH},
{'Origin-Realm', OR},
{'Disconnect-Cause', Cause}],
Dict),
- incr(send, Pkt, Dict),
+ send_dpr(false, Pkt, Tmo, S).
+
+%% send_dpr/4
+
+send_dpr(X,
+ #diameter_packet{header = #diameter_header{end_to_end_id = Eid,
+ hop_by_hop_id = Hid}}
+ = Pkt,
+ Tmo,
+ #state{transport = TPid,
+ dictionary = Dict}
+ = S) ->
+ %% Only count DPR sent by the service: explicit DPR is counted in
+ %% the same way as other explicitly sent requests.
+ X orelse incr(send, Pkt, Dict),
send(TPid, Pkt),
dpa_timer(Tmo),
?LOG(send, 'DPR'),
- S#state{dpr = {Hid, Eid}}.
+ S#state{dpr = {X, Hid, Eid}}.
+
+%% opt/2
opt({timeout, Tmo}, Rec)
when ?IS_TIMEOUT(Tmo) ->
@@ -1159,6 +1229,14 @@ cause(N) ->
dpa_timer(Tmo) ->
erlang:send_after(Tmo, self(), dpa_timeout).
+dpa_timeout() ->
+ dpa_timeout(getr(?DPA_KEY)).
+
+dpa_timeout(undefined) ->
+ ?DPA_TIMEOUT;
+dpa_timeout(Tmo) ->
+ Tmo.
+
%% register_everywhere/1
%%
%% Register a term and ensure it's not registered elsewhere. Note that