aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--lib/diameter/src/base/diameter_peer_fsm.erl138
-rw-r--r--lib/diameter/src/base/diameter_traffic.erl80
-rw-r--r--lib/diameter/src/base/diameter_watchdog.erl37
3 files changed, 136 insertions, 119 deletions
diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl
index d394156367..d2af8fe425 100644
--- a/lib/diameter/src/base/diameter_peer_fsm.erl
+++ b/lib/diameter/src/base/diameter_peer_fsm.erl
@@ -129,6 +129,7 @@
%% the request was sent explicitly with
%% diameter:call/4.
strict :: boolean(),
+ ack = false :: boolean(),
length_errors :: exit | handle | discard,
incoming_maxlen :: integer() | infinity}).
@@ -235,7 +236,7 @@ i({Ack, WPid, {M, Ref} = T, Opts, {SvcOpts, Nodes, Dict0, Svc}}) ->
Tmo = proplists:get_value(capx_timeout, Opts, ?CAPX_TIMEOUT),
Strictness = proplists:get_value(capx_strictness, Opts, true),
- OnLengthErr = proplists:get_value(length_errors, Opts, exit),
+ LengthErr = proplists:get_value(length_errors, Opts, exit),
{TPid, Addrs} = start_transport(T, Rest, Svc),
@@ -247,7 +248,7 @@ i({Ack, WPid, {M, Ref} = T, Opts, {SvcOpts, Nodes, Dict0, Svc}}) ->
dictionary = Dict0,
mode = M,
service = svc(Svc, Addrs),
- length_errors = OnLengthErr,
+ length_errors = LengthErr,
strict = Strictness,
incoming_maxlen = Maxlen}.
%% The transport returns its local ip addresses so that different
@@ -442,10 +443,18 @@ transition({connection_timeout = T, TPid},
transition({connection_timeout, _}, _) ->
ok;
+%% Requests for acknowledgements to the transport.
+transition({diameter, ack}, S) ->
+ S#state{ack = true};
+
%% Incoming message from the transport.
-transition({diameter, {recv, MsgT}}, S) ->
- {Msg, NPid} = msg(MsgT),
- incoming(recv(Msg, S), NPid, S);
+transition({diameter, {recv, Msg}}, S) ->
+ incoming(recv(Msg, S), S);
+
+%% Handler of an incoming request is telling of its existence.
+transition({handler, Pid}, _) ->
+ put_route(Pid),
+ ok;
%% Timeout when still in the same state ...
transition({timeout = T, PS}, #state{state = PS}) ->
@@ -459,7 +468,7 @@ transition({timeout, _}, _) ->
transition({send, Msg}, S) ->
outgoing(Msg, S);
transition({send, Msg, Route}, S) ->
- put_route(Route),
+ route_outgoing(Route),
outgoing(Msg, S);
%% Request for graceful shutdown at remove_transport, stop_service of
@@ -488,12 +497,13 @@ transition({'DOWN', _, process, WPid, _},
transition({'DOWN', _, process, TPid, _},
#state{transport = TPid}
= S) ->
- start_next(S);
+ start_next(S#state{ack = false});
%% Transport has died after connection timeout, or handler process has
%% died.
-transition({'DOWN', _, process, Pid, _}, _) ->
- erase_route(Pid),
+transition({'DOWN', _, process, Pid, _}, #state{transport = TPid}) ->
+ is_reference(erase_route(Pid))
+ andalso send(TPid, false), %% answer not forthcoming
ok;
%% State query.
@@ -503,37 +513,56 @@ transition({state, Pid}, #state{state = S, transport = TPid}) ->
%% Crash on anything unexpected.
-%% put_route/1
-%%
+%% route_outgoing/1
+
%% Map identifiers in an outgoing request to be able to lookup the
%% handler process when the answer is received.
-
-put_route({Pid, Ref, Seqs}) ->
+route_outgoing({Pid, Ref, Seqs}) -> %% request
MRef = monitor(process, Pid),
put(Pid, Seqs),
- put(Seqs, {Pid, Ref, MRef}).
+ put(Seqs, {Pid, Ref, MRef});
-%% get_route/1
+%% Remove a mapping made for an incoming request.
+route_outgoing(Pid)
+ when is_pid(Pid) -> %% answer
+ MRef = erase_route(Pid),
+ undefined == MRef orelse demonitor(MRef).
-get_route(#diameter_packet{header = #diameter_header{is_request = false}}
- = Pkt) ->
+%% put_route/1
+
+%% Monitor on a handler process for an incoming request.
+put_route(Pid) ->
+ MRef = monitor(process, Pid),
+ put(Pid, MRef).
+
+%% get_route/2
+
+%% incoming answer
+get_route(_, #diameter_packet{header = #diameter_header{is_request = false}}
+ = Pkt) ->
Seqs = diameter_codec:sequence_numbers(Pkt),
case erase(Seqs) of
{Pid, Ref, MRef} ->
demonitor(MRef),
erase(Pid),
{Pid, Ref, self()};
- undefined ->
+ undefined -> %% request unknown
false
end;
-get_route(_) ->
- false.
+%% incoming request
+get_route(Ack, _) ->
+ Ack.
%% erase_route/1
erase_route(Pid) ->
- erase(erase(Pid)).
+ case erase(Pid) of
+ {_,_} = Seqs ->
+ erase(Seqs);
+ T ->
+ T
+ end.
%% capx/1
@@ -610,26 +639,26 @@ encode(Rec, Dict) ->
diameter_codec:encode(Dict, #diameter_packet{header = Hdr,
msg = Rec}).
-%% incoming/3
+%% incoming/2
-incoming({recv, Name, Pkt}, NPid, #state{parent = Pid} = S) ->
- Pid ! {recv, self(), get_route(Pkt), Name, Pkt, NPid},
+incoming({recv = T, Name, Pkt}, #state{parent = Pid, ack = Ack} = S) ->
+ Pid ! {T, self(), get_route(Ack, Pkt), Name, Pkt},
rcv(Name, Pkt, S);
-incoming(T, false, _) ->
- T;
-
-incoming(T, NPid, _) ->
- NPid ! {diameter, discard},
- T.
+incoming(#diameter_header{is_request = R}, #state{transport = TPid,
+ ack = Ack}) ->
+ R andalso Ack andalso send(TPid, false),
+ ok;
-%% msg/1
+incoming(<<_:32, 1:1, _/bits>>, #state{ack = true} = S) ->
+ send(S#state.transport, false),
+ ok;
-msg({_,_} = T) ->
- T;
+incoming(<<_/bits>>, _) ->
+ ok;
-msg(Msg) ->
- {Msg, false}.
+incoming(T, _) ->
+ T.
%% recv/2
@@ -654,18 +683,19 @@ recv1(_,
#diameter_packet{header = H, bin = Bin},
#state{incoming_maxlen = M})
when M < size(Bin) ->
- invalid(false, incoming_maxlen_exceeded, {size(Bin), H});
+ invalid(false, incoming_maxlen_exceeded, {size(Bin), H}),
+ H;
%% Ignore anything but an expected CER/CEA if so configured. This is
%% non-standard behaviour.
-recv1(Name, _, #state{state = {'Wait-CEA', _, _},
- strict = false})
+recv1(Name, #diameter_packet{header = H}, #state{state = {'Wait-CEA', _, _},
+ strict = false})
when Name /= 'CEA' ->
- ok;
-recv1(Name, _, #state{state = recv_CER,
- strict = false})
+ H;
+recv1(Name, #diameter_packet{header = H}, #state{state = recv_CER,
+ strict = false})
when Name /= 'CER' ->
- ok;
+ H;
%% Incoming request after outgoing DPR: discard. Don't discard DPR, so
%% both ends don't do so when sending simultaneously.
@@ -673,13 +703,15 @@ recv1(Name,
#diameter_packet{header = #diameter_header{is_request = true} = H},
#state{dpr = {_,_,_}})
when Name /= 'DPR' ->
- invalid(false, recv_after_outgoing_dpr, H);
+ invalid(false, recv_after_outgoing_dpr, H),
+ H;
%% Incoming request after incoming DPR: discard.
recv1(_,
#diameter_packet{header = #diameter_header{is_request = true} = H},
#state{dpr = true}) ->
- invalid(false, recv_after_incoming_dpr, H);
+ invalid(false, recv_after_incoming_dpr, H),
+ H;
%% DPA with identifier mismatch, or in response to a DPR initiated by
%% the service.
@@ -716,10 +748,12 @@ recv(#diameter_header{}
#diameter_packet{bin = Bin},
#state{length_errors = E}) ->
T = {size(Bin), bit_size(Bin) rem 8, H},
- invalid(E, message_length_mismatch, T);
+ invalid(E, message_length_mismatch, T),
+ Bin;
recv(false, #diameter_packet{bin = Bin}, #state{length_errors = E}) ->
- invalid(E, truncated_header, Bin).
+ invalid(E, truncated_header, Bin),
+ Bin.
%% Note that counters here only count discarded messages.
invalid(E, Reason, T) ->
@@ -775,14 +809,10 @@ rcv('DPA' = N,
diameter_peer:close(TPid),
{stop, N};
-%% Ignore anything else, an unsolicited DPA in particular. Note that
-%% dpa_timeout deals with the case in which the peer sends the wrong
-%% identifiers in DPA.
-rcv(N, #diameter_packet{header = H}, _)
- when N == 'CER';
- N == 'CEA';
- N == 'DPR';
- N == 'DPA' ->
+%% Ignore an unsolicited DPA in particular. Note that dpa_timeout
+%% deals with the case in which the peer sends the wrong identifiers
+%% in DPA.
+rcv('DPA' = N, #diameter_packet{header = H}, _) ->
?LOG(ignored, N),
%% Note that these aren't counted in the normal recv counter.
diameter_stats:incr({diameter_codec:msg_id(H), recv, ignored}),
diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl
index bc1ccf4feb..96f3a307f9 100644
--- a/lib/diameter/src/base/diameter_traffic.erl
+++ b/lib/diameter/src/base/diameter_traffic.erl
@@ -30,7 +30,7 @@
-export([send_request/4]).
%% towards diameter_watchdog
--export([receive_message/6]).
+-export([receive_message/5]).
%% towards diameter_peer_fsm and diameter_watchdog
-export([incr/4,
@@ -93,7 +93,7 @@
packet :: #diameter_packet{} | undefined}). %% of request
%% ---------------------------------------------------------------------------
-%% # make_recvdata/1
+%% make_recvdata/1
%% ---------------------------------------------------------------------------
make_recvdata([SvcName, PeerT, Apps, SvcOpts | _]) ->
@@ -206,42 +206,36 @@ incr_rc(Dir, Pkt, TPid, Dict0) ->
incr_rc(Dir, Pkt, TPid, {Dict0, Dict0, Dict0}).
%% ---------------------------------------------------------------------------
-%% # receive_message/6
+%% receive_message/5
%%
-%% Handle an incoming Diameter message.
+%% Handle an incoming Diameter message in a watchdog process.
%% ---------------------------------------------------------------------------
-%% Handle an incoming Diameter message in the watchdog process.
-
-receive_message(TPid, Route, Pkt, false, Dict0, RecvData) ->
- incoming(TPid, Route, Pkt, Dict0, RecvData);
-
-receive_message(TPid, Route, Pkt, NPid, Dict0, RecvData) ->
- NPid ! {diameter, incoming(TPid, Route, Pkt, Dict0, RecvData)}.
-
-%% incoming/4
-
-incoming(TPid, Route, Pkt, Dict0, RecvData)
- when is_pid(TPid) ->
+-spec receive_message(pid(), Route, #diameter_packet{}, module(), #recvdata{})
+ -> pid()
+ | boolean()
+ when Route :: {Handler, RequestRef, Seqs}
+ | Ack,
+ Handler :: pid(),
+ RequestRef :: reference(),
+ Seqs :: {0..16#FFFFFFFF, 0..16#FFFFFFFF},
+ Ack :: boolean().
+
+receive_message(TPid, Route, Pkt, Dict0, RecvData) ->
#diameter_packet{header = #diameter_header{is_request = R}} = Pkt,
recv(R, Route, TPid, Pkt, Dict0, RecvData).
%% recv/6
%% Incoming request ...
-recv(true, false, TPid, Pkt, Dict0, T) ->
- try
- {request, spawn_request(TPid, Pkt, Dict0, T)}
- catch
- error: system_limit = E -> %% discard
- ?LOG(error, E),
- discard
- end;
+recv(true, Ack, TPid, Pkt, Dict0, T)
+ when is_boolean(Ack) ->
+ spawn_request(Ack, TPid, Pkt, Dict0, T);
%% ... answer to known request ...
recv(false, {Pid, Ref, TPid}, _, Pkt, Dict0, _) ->
Pid ! {answer, Ref, TPid, Dict0, Pkt},
- {answer, Pid};
+ true;
%% Note that failover could have happened prior to this message being
%% received and triggering failback. That is, both a failover message
@@ -256,23 +250,27 @@ recv(false, {Pid, Ref, TPid}, _, Pkt, Dict0, _) ->
recv(false, false, TPid, Pkt, _, _) ->
?LOG(discarded, Pkt#diameter_packet.header),
incr(TPid, {{unknown, 0}, recv, discarded}),
- discard.
+ false.
-%% spawn_request/4
+%% spawn_request/5
-spawn_request(TPid, Pkt, Dict0, {Opts, RecvData}) ->
- spawn_request(TPid, Pkt, Dict0, Opts, RecvData);
-spawn_request(TPid, Pkt, Dict0, RecvData) ->
- spawn_request(TPid, Pkt, Dict0, ?DEFAULT_SPAWN_OPTS, RecvData).
+spawn_request(Ack, TPid, Pkt, Dict0, {Opts, RecvData}) ->
+ spawn_request(Ack, TPid, Pkt, Dict0, Opts, RecvData);
+spawn_request(Ack, TPid, Pkt, Dict0, RecvData) ->
+ spawn_request(Ack, TPid, Pkt, Dict0, ?DEFAULT_SPAWN_OPTS, RecvData).
-spawn_request(TPid, Pkt, Dict0, Opts, RecvData) ->
- spawn_opt(fun() -> recv_request(TPid, Pkt, Dict0, RecvData) end, Opts).
+spawn_request(Ack, TPid, Pkt, Dict0, Opts, RecvData) ->
+ spawn_opt(fun() ->
+ recv_request(Ack, TPid, Pkt, Dict0, RecvData)
+ end,
+ Opts).
%% ---------------------------------------------------------------------------
-%% recv_request/4
+%% recv_request/5
%% ---------------------------------------------------------------------------
-recv_request(TPid,
+recv_request(Ack,
+ TPid,
#diameter_packet{header = #diameter_header{application_id = Id}}
= Pkt,
Dict0,
@@ -280,6 +278,7 @@ recv_request(TPid,
apps = Apps,
codec = Opts}
= RecvData) ->
+ Ack andalso (TPid ! {handler, self()}),
diameter_codec:setopts([{common_dictionary, Dict0} | Opts]),
send_A(recv_R(diameter_service:find_incoming_app(PeerT, TPid, Id, Apps),
TPid,
@@ -511,7 +510,7 @@ send_A(T, TPid, {AppDict, Dict0} = DictT0, ReqPkt, EvalPktFs, EvalFs) ->
{MsgDict, Pkt} = reply(T, TPid, DictT0, EvalPktFs, ReqPkt),
incr(send, Pkt, TPid, AppDict),
incr_rc(send, Pkt, TPid, {MsgDict, AppDict, Dict0}), %% count outgoing
- send(TPid, Pkt),
+ send(TPid, Pkt, _Route = self()),
lists:foreach(fun diameter_lib:eval/1, EvalFs).
%% answer/6
@@ -1207,7 +1206,7 @@ x(T) ->
exit(T).
%% ---------------------------------------------------------------------------
-%% # send_request/4
+%% send_request/4
%%
%% Handle an outgoing Diameter request.
%% ---------------------------------------------------------------------------
@@ -1296,7 +1295,7 @@ mo(T, _) ->
?ERROR({invalid_option, T}).
%% ---------------------------------------------------------------------------
-%% # send_request/6
+%% send_request/6
%% ---------------------------------------------------------------------------
%% Send an outgoing request in its dedicated process.
@@ -1745,11 +1744,6 @@ recv(TPid, Pid, TRef, {LocalTRef, MRef}) ->
exit({timeout, LocalTRef, TPid} = T)
end.
-%% send/2
-
-send(Pid, Pkt) ->
- Pid ! {send, Pkt}.
-
%% send/3
send(Pid, Pkt, Route) ->
diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl
index 4484b7ee2c..a2eb661870 100644
--- a/lib/diameter/src/base/diameter_watchdog.erl
+++ b/lib/diameter/src/base/diameter_watchdog.erl
@@ -283,7 +283,7 @@ event(Msg,
?LOG(transition, {From, To}).
data(Msg, TPid, reopen, okay) ->
- {recv, TPid, false, 'DWA', _Pkt, _NPid} = Msg, %% assert
+ {recv, TPid, _, 'DWA', _Pkt} = Msg, %% assert
{TPid, T} = eraser(open),
[T];
@@ -302,6 +302,8 @@ tpid(_, Pid)
tpid(Pid, _) ->
Pid.
+%% send/2
+
send(Pid, T) ->
Pid ! T.
@@ -447,14 +449,15 @@ transition({'DOWN', _, process, TPid, _Reason} = D,
end;
%% Incoming message.
-transition({recv, TPid, Route, Name, Pkt, NPid},
+transition({recv, TPid, Route, Name, Pkt},
#watchdog{transport = TPid}
= S) ->
- try
- incoming(Name, Pkt, NPid, S)
- catch
+ try incoming(Route, Name, Pkt, S) of
#watchdog{dictionary = Dict0, receive_data = T} = NS ->
- diameter_traffic:receive_message(TPid, Route, Pkt, NPid, Dict0, T),
+ diameter_traffic:receive_message(TPid, Route, Pkt, Dict0, T),
+ NS
+ catch
+ #watchdog{} = NS ->
NS
end;
@@ -586,23 +589,13 @@ send_watchdog(#watchdog{pending = false,
%% incoming/4
-incoming(Name, Pkt, false, S) ->
- recv(Name, Pkt, S);
-
-incoming(Name, Pkt, NPid, S) ->
- NS = recv(Name, Pkt, S),
- NPid ! {diameter, discard},
- NS.
-
-%% recv/3
-
-recv(Name, Pkt, S) ->
- try rcv(Name, Pkt, rcv(Name, S)) of
- #watchdog{} = NS ->
- throw(NS)
+incoming(Route, Name, Pkt, S) ->
+ try rcv(Name, S) of
+ NS -> rcv(Name, Pkt, NS)
catch
- #watchdog{} = NS -> %% throwaway
- NS
+ #watchdog{transport = TPid} = NS when Route -> %% incoming request
+ send(TPid, {send, false}), %% requiring ack
+ throw(NS)
end.
%% rcv/3