aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src/base/diameter_traffic.erl
diff options
context:
space:
mode:
authorAnders Svensson <[email protected]>2017-03-15 17:04:44 +0100
committerAnders Svensson <[email protected]>2017-06-11 16:30:38 +0200
commitca09cf7b697798aca5a4f81a11d5ad1d90f4107e (patch)
tree129600135950d661fbf23544701ecf47999d8f72 /lib/diameter/src/base/diameter_traffic.erl
parentea339754d579b7e917dab0afa9a4694689f1f990 (diff)
downloadotp-ca09cf7b697798aca5a4f81a11d5ad1d90f4107e.tar.gz
otp-ca09cf7b697798aca5a4f81a11d5ad1d90f4107e.tar.bz2
otp-ca09cf7b697798aca5a4f81a11d5ad1d90f4107e.zip
Simplify acks to transport processes
What's interesting when implementing some form of load regulation is when an incoming request has been answered or discarded. Acknowledge exactly this, not the identity of handler processes as previously. A transport process can request acks of nonforthcoming answers by sending {diameter, ack} to the parent peer_fsm, a handler processes identifies itself with a {handler, pid()} message, and the peer_fsm monitors on this to be able to send a notification to the transport if the handler dies before sending an answer.
Diffstat (limited to 'lib/diameter/src/base/diameter_traffic.erl')
-rw-r--r--lib/diameter/src/base/diameter_traffic.erl80
1 files changed, 37 insertions, 43 deletions
diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl
index bc1ccf4feb..96f3a307f9 100644
--- a/lib/diameter/src/base/diameter_traffic.erl
+++ b/lib/diameter/src/base/diameter_traffic.erl
@@ -30,7 +30,7 @@
-export([send_request/4]).
%% towards diameter_watchdog
--export([receive_message/6]).
+-export([receive_message/5]).
%% towards diameter_peer_fsm and diameter_watchdog
-export([incr/4,
@@ -93,7 +93,7 @@
packet :: #diameter_packet{} | undefined}). %% of request
%% ---------------------------------------------------------------------------
-%% # make_recvdata/1
+%% make_recvdata/1
%% ---------------------------------------------------------------------------
make_recvdata([SvcName, PeerT, Apps, SvcOpts | _]) ->
@@ -206,42 +206,36 @@ incr_rc(Dir, Pkt, TPid, Dict0) ->
incr_rc(Dir, Pkt, TPid, {Dict0, Dict0, Dict0}).
%% ---------------------------------------------------------------------------
-%% # receive_message/6
+%% receive_message/5
%%
-%% Handle an incoming Diameter message.
+%% Handle an incoming Diameter message in a watchdog process.
%% ---------------------------------------------------------------------------
-%% Handle an incoming Diameter message in the watchdog process.
-
-receive_message(TPid, Route, Pkt, false, Dict0, RecvData) ->
- incoming(TPid, Route, Pkt, Dict0, RecvData);
-
-receive_message(TPid, Route, Pkt, NPid, Dict0, RecvData) ->
- NPid ! {diameter, incoming(TPid, Route, Pkt, Dict0, RecvData)}.
-
-%% incoming/4
-
-incoming(TPid, Route, Pkt, Dict0, RecvData)
- when is_pid(TPid) ->
+-spec receive_message(pid(), Route, #diameter_packet{}, module(), #recvdata{})
+ -> pid()
+ | boolean()
+ when Route :: {Handler, RequestRef, Seqs}
+ | Ack,
+ Handler :: pid(),
+ RequestRef :: reference(),
+ Seqs :: {0..16#FFFFFFFF, 0..16#FFFFFFFF},
+ Ack :: boolean().
+
+receive_message(TPid, Route, Pkt, Dict0, RecvData) ->
#diameter_packet{header = #diameter_header{is_request = R}} = Pkt,
recv(R, Route, TPid, Pkt, Dict0, RecvData).
%% recv/6
%% Incoming request ...
-recv(true, false, TPid, Pkt, Dict0, T) ->
- try
- {request, spawn_request(TPid, Pkt, Dict0, T)}
- catch
- error: system_limit = E -> %% discard
- ?LOG(error, E),
- discard
- end;
+recv(true, Ack, TPid, Pkt, Dict0, T)
+ when is_boolean(Ack) ->
+ spawn_request(Ack, TPid, Pkt, Dict0, T);
%% ... answer to known request ...
recv(false, {Pid, Ref, TPid}, _, Pkt, Dict0, _) ->
Pid ! {answer, Ref, TPid, Dict0, Pkt},
- {answer, Pid};
+ true;
%% Note that failover could have happened prior to this message being
%% received and triggering failback. That is, both a failover message
@@ -256,23 +250,27 @@ recv(false, {Pid, Ref, TPid}, _, Pkt, Dict0, _) ->
recv(false, false, TPid, Pkt, _, _) ->
?LOG(discarded, Pkt#diameter_packet.header),
incr(TPid, {{unknown, 0}, recv, discarded}),
- discard.
+ false.
-%% spawn_request/4
+%% spawn_request/5
-spawn_request(TPid, Pkt, Dict0, {Opts, RecvData}) ->
- spawn_request(TPid, Pkt, Dict0, Opts, RecvData);
-spawn_request(TPid, Pkt, Dict0, RecvData) ->
- spawn_request(TPid, Pkt, Dict0, ?DEFAULT_SPAWN_OPTS, RecvData).
+spawn_request(Ack, TPid, Pkt, Dict0, {Opts, RecvData}) ->
+ spawn_request(Ack, TPid, Pkt, Dict0, Opts, RecvData);
+spawn_request(Ack, TPid, Pkt, Dict0, RecvData) ->
+ spawn_request(Ack, TPid, Pkt, Dict0, ?DEFAULT_SPAWN_OPTS, RecvData).
-spawn_request(TPid, Pkt, Dict0, Opts, RecvData) ->
- spawn_opt(fun() -> recv_request(TPid, Pkt, Dict0, RecvData) end, Opts).
+spawn_request(Ack, TPid, Pkt, Dict0, Opts, RecvData) ->
+ spawn_opt(fun() ->
+ recv_request(Ack, TPid, Pkt, Dict0, RecvData)
+ end,
+ Opts).
%% ---------------------------------------------------------------------------
-%% recv_request/4
+%% recv_request/5
%% ---------------------------------------------------------------------------
-recv_request(TPid,
+recv_request(Ack,
+ TPid,
#diameter_packet{header = #diameter_header{application_id = Id}}
= Pkt,
Dict0,
@@ -280,6 +278,7 @@ recv_request(TPid,
apps = Apps,
codec = Opts}
= RecvData) ->
+ Ack andalso (TPid ! {handler, self()}),
diameter_codec:setopts([{common_dictionary, Dict0} | Opts]),
send_A(recv_R(diameter_service:find_incoming_app(PeerT, TPid, Id, Apps),
TPid,
@@ -511,7 +510,7 @@ send_A(T, TPid, {AppDict, Dict0} = DictT0, ReqPkt, EvalPktFs, EvalFs) ->
{MsgDict, Pkt} = reply(T, TPid, DictT0, EvalPktFs, ReqPkt),
incr(send, Pkt, TPid, AppDict),
incr_rc(send, Pkt, TPid, {MsgDict, AppDict, Dict0}), %% count outgoing
- send(TPid, Pkt),
+ send(TPid, Pkt, _Route = self()),
lists:foreach(fun diameter_lib:eval/1, EvalFs).
%% answer/6
@@ -1207,7 +1206,7 @@ x(T) ->
exit(T).
%% ---------------------------------------------------------------------------
-%% # send_request/4
+%% send_request/4
%%
%% Handle an outgoing Diameter request.
%% ---------------------------------------------------------------------------
@@ -1296,7 +1295,7 @@ mo(T, _) ->
?ERROR({invalid_option, T}).
%% ---------------------------------------------------------------------------
-%% # send_request/6
+%% send_request/6
%% ---------------------------------------------------------------------------
%% Send an outgoing request in its dedicated process.
@@ -1745,11 +1744,6 @@ recv(TPid, Pid, TRef, {LocalTRef, MRef}) ->
exit({timeout, LocalTRef, TPid} = T)
end.
-%% send/2
-
-send(Pid, Pkt) ->
- Pid ! {send, Pkt}.
-
%% send/3
send(Pid, Pkt, Route) ->