aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--lib/diameter/src/base/diameter_peer_fsm.erl60
-rw-r--r--lib/diameter/src/base/diameter_service.erl18
-rw-r--r--lib/diameter/src/base/diameter_sup.erl4
-rw-r--r--lib/diameter/src/base/diameter_traffic.erl174
-rw-r--r--lib/diameter/src/base/diameter_watchdog.erl28
5 files changed, 125 insertions, 159 deletions
diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl
index 996e75a8d3..0c24ebeb19 100644
--- a/lib/diameter/src/base/diameter_peer_fsm.erl
+++ b/lib/diameter/src/base/diameter_peer_fsm.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2016. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2017. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -454,6 +454,9 @@ transition({timeout, _}, _) ->
%% Outgoing message.
transition({send, Msg}, S) ->
outgoing(Msg, S);
+transition({send, Route, Msg}, S) ->
+ put_route(Route),
+ outgoing(Msg, S);
%% Request for graceful shutdown at remove_transport, stop_service of
%% application shutdown.
@@ -483,8 +486,10 @@ transition({'DOWN', _, process, TPid, _},
= S) ->
start_next(S);
-%% Transport has died after connection timeout.
-transition({'DOWN', _, process, _, _}, _) ->
+%% Transport has died after connection timeout, or handler process has
+%% died.
+transition({'DOWN', _, process, Pid, _}, _) ->
+ erase_route(Pid),
ok;
%% State query.
@@ -494,6 +499,43 @@ transition({state, Pid}, #state{state = S, transport = TPid}) ->
%% Crash on anything unexpected.
+%% put_route/1
+%%
+%% Map identifiers in an outgoing request to be able to lookup the
+%% handler process when the answer is received.
+
+put_route(false) ->
+ ok;
+
+put_route({Pid, Ref, Seqs}) ->
+ MRef = monitor(process, Pid),
+ put(Pid, Seqs),
+ put(Seqs, {Pid, Ref, MRef}).
+
+%% get_route/1
+
+get_route(#diameter_packet{header = #diameter_header{is_request = false}}
+ = Pkt) ->
+ Seqs = diameter_codec:sequence_numbers(Pkt),
+ case erase(Seqs) of
+ {Pid, Ref, MRef} ->
+ demonitor(MRef),
+ erase(Pid),
+ {Pid, Ref, self()};
+ undefined ->
+ false
+ end;
+
+get_route(_) ->
+ false.
+
+%% erase_route/1
+
+erase_route(Pid) ->
+ erase(erase(Pid)).
+
+%% capx/1
+
capx(recv_CER) ->
'CER';
capx({'Wait-CEA', _, _}) ->
@@ -576,8 +618,7 @@ incoming({Msg, NPid}, S) ->
T
catch
{?MODULE, Name, Pkt} ->
- S#state.parent ! {recv, self(), Name, {Pkt, NPid}},
- rcv(Name, Pkt, S)
+ incoming(Name, Pkt, NPid, S)
end;
incoming(Msg, S) ->
@@ -585,10 +626,15 @@ incoming(Msg, S) ->
recv(Msg, S)
catch
{?MODULE, Name, Pkt} ->
- S#state.parent ! {recv, self(), Name, Pkt},
- rcv(Name, Pkt, S)
+ incoming(Name, Pkt, false, S)
end.
+%% incoming/4
+
+incoming(Name, Pkt, NPid, #state{parent = Pid} = S) ->
+ Pid ! {recv, self(), get_route(Pkt), Name, Pkt, NPid},
+ rcv(Name, Pkt, S).
+
%% recv/2
recv(#diameter_packet{header = #diameter_header{} = Hdr}
diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl
index ccf68f4d93..e4f77e3a24 100644
--- a/lib/diameter/src/base/diameter_service.erl
+++ b/lib/diameter/src/base/diameter_service.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2016. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2017. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -1858,13 +1858,6 @@ eq(Any, Id, PeerId) ->
%% OctetString() can be specified as an iolist() so test for string
%% rather then term equality.
-%% transports/1
-
-transports(#state{watchdogT = WatchdogT}) ->
- ets:select(WatchdogT, [{#watchdog{peer = '$1', _ = '_'},
- [{'is_pid', '$1'}],
- ['$1']}]).
-
%% ---------------------------------------------------------------------------
%% # service_info/2
%% ---------------------------------------------------------------------------
@@ -1887,7 +1880,6 @@ transports(#state{watchdogT = WatchdogT}) ->
-define(ALL_INFO, [capabilities,
applications,
transport,
- pending,
options]).
%% The rest.
@@ -1981,7 +1973,6 @@ complete_info(Item, #state{service = Svc} = S) ->
applications -> info_apps(S);
transport -> info_transport(S);
options -> info_options(S);
- pending -> info_pending(S);
keys -> ?ALL_INFO ++ ?CAP_INFO ++ ?OTHER_INFO;
all -> service_info(?ALL_INFO, S);
statistics -> info_stats(S);
@@ -2189,13 +2180,6 @@ info_apps(#state{service = #diameter_service{applications = Apps}}) ->
mk_app(#diameter_app{} = A) ->
lists:zip(record_info(fields, diameter_app), tl(tuple_to_list(A))).
-%% info_pending/1
-%%
-%% One entry for each outgoing request whose answer is outstanding.
-
-info_pending(#state{} = S) ->
- diameter_traffic:pending(transports(S)).
-
%% info_info/1
%%
%% Extract process_info from connections info.
diff --git a/lib/diameter/src/base/diameter_sup.erl b/lib/diameter/src/base/diameter_sup.erl
index 482289cb9a..01c51f0856 100644
--- a/lib/diameter/src/base/diameter_sup.erl
+++ b/lib/diameter/src/base/diameter_sup.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2016. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2017. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -42,7 +42,7 @@
-define(TABLES, [{diameter_sequence, [set]},
{diameter_service, [set, {keypos, 3}]},
- {diameter_request, [bag]},
+ {diameter_request, [set]},
{diameter_config, [bag, {keypos, 2}]}]).
%% start_link/0
diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl
index 0720057fe5..a39e0502d1 100644
--- a/lib/diameter/src/base/diameter_traffic.erl
+++ b/lib/diameter/src/base/diameter_traffic.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2013-2016. All Rights Reserved.
+%% Copyright Ericsson AB 2013-2017. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -30,7 +30,7 @@
-export([send_request/4]).
%% towards diameter_watchdog
--export([receive_message/4]).
+-export([receive_message/6]).
%% towards diameter_peer_fsm and diameter_watchdog
-export([incr/4,
@@ -40,8 +40,7 @@
%% towards diameter_service
-export([make_recvdata/1,
peer_up/1,
- peer_down/1,
- pending/1]).
+ peer_down/1]).
%% internal
-export([send/1, %% send from remote node
@@ -58,14 +57,12 @@
-define(DEFAULT_TIMEOUT, 5000). %% for outgoing requests
-define(DEFAULT_SPAWN_OPTS, []).
-%% Table containing outgoing requests for which a reply has yet to be
-%% received.
+%% Table containing outgoing entries that live and die with
+%% peer_up/down. The name is historic, since the table used to contain
+%% information about outgoing requests for which an answer has yet to
+%% be received.
-define(REQUEST_TABLE, diameter_request).
-%% Workaround for dialyzer's lack of understanding of match specs.
--type match(T)
- :: T | '_' | '$1' | '$2' | '$3' | '$4'.
-
%% Record diameter:call/4 options are parsed into.
-record(options,
{filter = none :: diameter:peer_filter(),
@@ -73,7 +70,7 @@
timeout = ?DEFAULT_TIMEOUT :: 0..16#FFFFFFFF,
detach = false :: boolean()}).
-%% Term passed back to receive_message/4 with every incoming message.
+%% Term passed back to receive_message/6 with every incoming message.
-record(recvdata,
{peerT :: ets:tid(),
service_name :: diameter:service_name(),
@@ -88,12 +85,12 @@
%% Record stored in diameter_request for each outgoing request.
-record(request,
- {ref :: match(reference()), %% used to receive answer
- caller :: match(pid()), %% calling process
- handler :: match(pid()), %% request process
- transport :: match(pid()), %% peer process
- caps :: match(#diameter_caps{}), %% of connection
- packet :: match(#diameter_packet{})}). %% of request
+ {ref :: reference(), %% used to receive answer
+ caller :: pid() | undefined, %% calling process
+ handler :: pid(), %% request process
+ transport :: pid() | undefined, %% peer process
+ caps :: #diameter_caps{} | undefined, %% of connection
+ packet :: #diameter_packet{} | undefined}). %% of request
%% ---------------------------------------------------------------------------
%% # make_recvdata/1
@@ -115,12 +112,9 @@ make_recvdata([SvcName, PeerT, Apps, SvcOpts | _]) ->
%% ---------------------------------------------------------------------------
%% Start a process that dies with peer_down/1, on which request
-%% processes can monitor. The transport process started in
-%% diameter_peer_fsm could be used for this, but starting a new
-%% process here is easier: don't otherwise have access to the
-%% transport process here (TPid here is the diameter_peer_fsm
-%% process), and the need for a process to monitor came much later
-%% than the calls to peer_up/down.
+%% processes can monitor. There is no other process that dies with
+%% peer_down since failover doesn't imply the loss of transport in the
+%% case of a watchdog transition into state SUSPECT.
peer_up(TPid) ->
proc_lib:start(?MODULE, init, [TPid]).
@@ -212,54 +206,25 @@ incr_rc(Dir, Pkt, TPid, Dict0) ->
incr_rc(Dir, Pkt, TPid, {Dict0, Dict0, Dict0}).
%% ---------------------------------------------------------------------------
-%% pending/1
-%% ---------------------------------------------------------------------------
-
-pending(TPids) ->
- MatchSpec = [{{'$1',
- #request{caller = '$2',
- handler = '$3',
- transport = '$4',
- _ = '_'},
- '_'},
- [?ORCOND([{'==', T, '$4'} || T <- TPids])],
- [{{'$1', [{{caller, '$2'}},
- {{handler, '$3'}},
- {{transport, '$4'}}]}}]}],
-
- try
- ets:select(?REQUEST_TABLE, MatchSpec)
- catch
- error: badarg -> [] %% service has gone down
- end.
-
-%% ---------------------------------------------------------------------------
-%% # receive_message/4
+%% # receive_message/6
%%
%% Handle an incoming Diameter message.
%% ---------------------------------------------------------------------------
-%% Handle an incoming Diameter message in the watchdog process. This
-%% used to come through the service process but this avoids that
-%% becoming a bottleneck.
+%% Handle an incoming Diameter message in the watchdog process.
-receive_message(TPid, {Pkt, NPid}, Dict0, RecvData) ->
- NPid ! {diameter, incoming(TPid, Pkt, Dict0, RecvData)};
+receive_message(TPid, Route, Pkt, false, Dict0, RecvData) ->
+ incoming(TPid, Route, Pkt, Dict0, RecvData);
-receive_message(TPid, Pkt, Dict0, RecvData) ->
- incoming(TPid, Pkt, Dict0, RecvData).
+receive_message(TPid, Route, Pkt, NPid, Dict0, RecvData) ->
+ NPid ! {diameter, incoming(TPid, Route, Pkt, Dict0, RecvData)}.
%% incoming/4
-incoming(TPid, Pkt, Dict0, RecvData)
+incoming(TPid, Route, Pkt, Dict0, RecvData)
when is_pid(TPid) ->
#diameter_packet{header = #diameter_header{is_request = R}} = Pkt,
- recv(R,
- (not R) andalso lookup_request(Pkt, TPid),
- TPid,
- Pkt,
- Dict0,
- RecvData).
+ recv(R, Route, TPid, Pkt, Dict0, RecvData).
%% recv/6
@@ -274,8 +239,8 @@ recv(true, false, TPid, Pkt, Dict0, T) ->
end;
%% ... answer to known request ...
-recv(false, #request{ref = Ref, handler = Pid} = Req, _, Pkt, Dict0, _) ->
- Pid ! {answer, Ref, Req, Dict0, Pkt},
+recv(false, {Pid, Ref, TPid}, _, Pkt, Dict0, _) ->
+ Pid ! {answer, Ref, TPid, Dict0, Pkt},
{answer, Pid};
%% Note that failover could have happened prior to this message being
@@ -546,7 +511,7 @@ send_A(T, TPid, {AppDict, Dict0} = DictT0, ReqPkt, EvalPktFs, EvalFs) ->
{MsgDict, Pkt} = reply(T, TPid, DictT0, EvalPktFs, ReqPkt),
incr(send, Pkt, TPid, AppDict),
incr_rc(send, Pkt, TPid, {MsgDict, AppDict, Dict0}), %% count outgoing
- send(TPid, Pkt),
+ send(TPid, false, Pkt),
lists:foreach(fun diameter_lib:eval/1, EvalFs).
%% answer/6
@@ -1508,7 +1473,7 @@ send_R(Pkt0,
packet = Pkt0},
incr(send, Pkt, TPid, AppDict),
- {TRef, MRef} = send_request(TPid, Pkt, Req, SvcName, Timeout),
+ {TRef, MRef} = zend_requezt(TPid, Pkt, Req, SvcName, Timeout),
Pid ! Ref, %% tell caller a send has been attempted
handle_answer(SvcName,
App,
@@ -1519,11 +1484,11 @@ send_R(Pkt0,
recv_A(Timeout, SvcName, App, Opts, {TRef, MRef, #request{ref = Ref} = Req}) ->
%% Matching on TRef below ensures we ignore messages that pertain
%% to a previous transport prior to failover. The answer message
- %% includes the #request{} since it's not necessarily Req; that
- %% is, from the last peer to which we've transmitted.
+ %% includes the pid of the transport on which it was received,
+ %% which may not be the last peer to which we've transmitted.
receive
- {answer = A, Ref, Rq, Dict0, Pkt} -> %% Answer from peer
- {A, Rq, Dict0, Pkt};
+ {answer = A, Ref, TPid, Dict0, Pkt} -> %% Answer from peer
+ {A, #request{} = erase(TPid), Dict0, Pkt};
{timeout = Reason, TRef, _} -> %% No timely reply
{error, Req, Reason};
{'DOWN', MRef, process, _, _} when false /= MRef -> %% local peer_down
@@ -1717,22 +1682,20 @@ encode(DictT, TPid, #diameter_packet{bin = undefined} = Pkt) ->
encode(_, _, #diameter_packet{} = Pkt) ->
Pkt.
+%% zend_requezt/5
+
+zend_requezt(TPid, Pkt, Req, SvcName, Timeout) ->
+ put(TPid, Req),
+ send_request(TPid, Pkt, Req, SvcName, Timeout).
+
%% send_request/5
send_request(TPid, #diameter_packet{bin = Bin} = Pkt, Req, _SvcName, Timeout)
when node() == node(TPid) ->
Seqs = diameter_codec:sequence_numbers(Bin),
TRef = erlang:start_timer(Timeout, self(), TPid),
- Entry = {Seqs, Req, TRef},
-
- %% Ensure that request table is cleaned even if the process is
- %% killed.
- Self = self(),
- spawn(fun() -> diameter_lib:wait([Self]), delete_request(Entry) end),
-
- MRef = insert_request(Entry),
- send(TPid, Pkt),
- {TRef, MRef};
+ send(TPid, _Route = {self(), Req#request.ref, Seqs}, Pkt),
+ {TRef, _MRef = peer_monitor(TPid, TRef)};
%% Send using a remote transport: spawn a process on the remote node
%% to relay the answer.
@@ -1764,16 +1727,16 @@ recv(TPid, Pid, TRef, {LocalTRef, MRef}) ->
exit({timeout, LocalTRef, TPid} = T)
end.
-%% send/2
+%% send/3
-send(Pid, Pkt) -> %% Strip potentially large message terms.
+send(Pid, Route, Pkt) -> %% Strip potentially large message terms.
#diameter_packet{header = H,
bin = Bin,
transport_data = T}
= Pkt,
- Pid ! {send, #diameter_packet{header = H,
- bin = Bin,
- transport_data = T}}.
+ Pid ! {send, Route, #diameter_packet{header = H,
+ bin = Bin,
+ transport_data = T}}.
%% retransmit/4
@@ -1783,8 +1746,8 @@ retransmit({TPid, Caps, App}
= Req,
SvcName,
Timeout) ->
- have_request(Pkt0, TPid) %% Don't failover to a peer we've
- andalso ?THROW(timeout), %% already sent to.
+ undefined == get(TPid) %% Don't failover to a peer we've
+ orelse ?THROW(timeout), %% already sent to.
Pkt = make_retransmit_packet(Pkt0),
@@ -1837,51 +1800,20 @@ resend_request(Pkt0,
?LOG(retransmission, Pkt#diameter_packet.header),
incr(TPid, {msg_id(Pkt, AppDict), send, retransmission}),
- {TRef, MRef} = send_request(TPid, Pkt, Req, SvcName, Tmo),
+ {TRef, MRef} = zend_requezt(TPid, Pkt, Req, SvcName, Tmo),
{TRef, MRef, Req}.
-%% insert_request/1
+%% peer_monitor/2
-insert_request({_Seqs, #request{transport = TPid}, TRef} = T) ->
- ets:insert(?REQUEST_TABLE, T),
+peer_monitor(TPid, TRef) ->
case ets:lookup(?REQUEST_TABLE, TPid) of %% at peer_up/1
- [{_, Pid}] ->
- monitor(process, Pid);
+ [{_, MPid}] ->
+ monitor(process, MPid);
[] -> %% transport has gone down
self() ! {failover, TRef},
false
end.
-%% lookup_request/2
-%%
-%% Note the match on both the key and transport pid. The latter is
-%% necessary since the same Hop-by-Hop and End-to-End identifiers are
-%% reused in the case of retransmission.
-
-lookup_request(Msg, TPid) ->
- Seqs = diameter_codec:sequence_numbers(Msg),
- Spec = [{{Seqs, #request{transport = TPid, _ = '_'}, '_'},
- [],
- ['$_']}],
- case ets:select(?REQUEST_TABLE, Spec) of
- [{_, Req, _}] ->
- Req;
- [] ->
- false
- end.
-
-%% delete_request/1
-
-delete_request(T) ->
- ets:delete_object(?REQUEST_TABLE, T).
-
-%% have_request/2
-
-have_request(Pkt, TPid) ->
- Seqs = diameter_codec:sequence_numbers(Pkt),
- Pat = {Seqs, #request{transport = TPid, _ = '_'}, '_'},
- '$end_of_table' /= ets:select(?REQUEST_TABLE, [{Pat, [], ['$_']}], 1).
-
%% get_destination/2
get_destination(Dict, Msg) ->
diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl
index 2ba60a65fb..f28b8f2910 100644
--- a/lib/diameter/src/base/diameter_watchdog.erl
+++ b/lib/diameter/src/base/diameter_watchdog.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2016. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2017. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -283,7 +283,7 @@ event(Msg,
?LOG(transition, {From, To}).
data(Msg, TPid, reopen, okay) ->
- {recv, TPid, 'DWA', _Pkt} = Msg, %% assert
+ {recv, TPid, false, 'DWA', _Pkt, _NPid} = Msg, %% assert
{TPid, T} = eraser(open),
[T];
@@ -447,12 +447,14 @@ transition({'DOWN', _, process, TPid, _Reason} = D,
end;
%% Incoming message.
-transition({recv, TPid, Name, PktT}, #watchdog{transport = TPid} = S) ->
+transition({recv, TPid, Route, Name, Pkt, NPid},
+ #watchdog{transport = TPid}
+ = S) ->
try
- incoming(Name, PktT, S)
+ incoming(Name, Pkt, NPid, S)
catch
#watchdog{dictionary = Dict0, receive_data = T} = NS ->
- diameter_traffic:receive_message(TPid, PktT, Dict0, T),
+ diameter_traffic:receive_message(TPid, Route, Pkt, NPid, Dict0, T),
NS
end;
@@ -582,15 +584,17 @@ send_watchdog(#watchdog{pending = false,
%% Don't count encode errors since we don't expect any on DWR/DWA.
-%% incoming/3
+%% incoming/4
-incoming(Name, {Pkt, NPid}, S) ->
- NS = recv(Name, Pkt, S),
- NPid ! {diameter, discard},
- NS;
+incoming(Name, Pkt, false, S) ->
+ recv(Name, Pkt, S);
-incoming(Name, Pkt, S) ->
- recv(Name, Pkt, S).
+incoming(Name, Pkt, NPid, S) ->
+ try
+ recv(Name, Pkt, S)
+ after
+ NPid ! {diameter, discard}
+ end.
%% recv/3