aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src
diff options
context:
space:
mode:
Diffstat (limited to 'lib/diameter/src')
-rw-r--r--lib/diameter/src/base/diameter.erl29
-rw-r--r--lib/diameter/src/base/diameter_peer_fsm.erl57
-rw-r--r--lib/diameter/src/base/diameter_service.erl24
-rw-r--r--lib/diameter/src/base/diameter_traffic.erl30
-rw-r--r--lib/diameter/src/base/diameter_watchdog.erl64
-rw-r--r--lib/diameter/src/diameter.appup.src10
-rw-r--r--lib/diameter/src/transport/diameter_tcp.erl251
7 files changed, 348 insertions, 117 deletions
diff --git a/lib/diameter/src/base/diameter.erl b/lib/diameter/src/base/diameter.erl
index de88f6befd..e8f2f63f86 100644
--- a/lib/diameter/src/base/diameter.erl
+++ b/lib/diameter/src/base/diameter.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2015. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2016. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -36,6 +36,8 @@
%% Information.
-export([services/0,
+ peer_info/1,
+ peer_find/1,
service_info/2]).
%% Start/stop the application. In a "real" application this should
@@ -53,6 +55,7 @@
service_name/0,
capability/0,
peer_filter/0,
+ peer_ref/0,
service_opt/0,
application_opt/0,
app_module/0,
@@ -147,6 +150,27 @@ service_info(SvcName, Option) ->
diameter_service:info(SvcName, Option).
%% ---------------------------------------------------------------------------
+%% peer_info/2
+%% ---------------------------------------------------------------------------
+
+-spec peer_info(peer_ref())
+ -> [tuple()].
+
+peer_info(PeerRef) ->
+ diameter_service:peer_info(PeerRef).
+
+%% ---------------------------------------------------------------------------
+%% peer_find/1
+%% ---------------------------------------------------------------------------
+
+-spec peer_find(peer_ref() | pid())
+ -> {peer_ref(), pid()}
+ | false.
+
+peer_find(Pid) ->
+ diameter_peer_fsm:find(Pid).
+
+%% ---------------------------------------------------------------------------
%% add_transport/3
%% ---------------------------------------------------------------------------
@@ -280,6 +304,9 @@ call(SvcName, App, Message) ->
| {all, [peer_filter()]}
| {any, [peer_filter()]}.
+-opaque peer_ref()
+ :: pid().
+
-type evaluable()
:: {module(), atom(), list()}
| fun()
diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl
index fb874013a3..996e75a8d3 100644
--- a/lib/diameter/src/base/diameter_peer_fsm.erl
+++ b/lib/diameter/src/base/diameter_peer_fsm.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2015. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2016. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -32,6 +32,9 @@
-export([start/3,
result_code/2]).
+%% Interface towards diameter.
+-export([find/1]).
+
%% gen_server callbacks
-export([init/1,
handle_call/3,
@@ -185,6 +188,25 @@ start_link(T) ->
infinity,
diameter_lib:spawn_opts(server, [])).
+%% find/1
+%%
+%% Identify both pids of a peer_fsm/transport pair.
+
+find(Pid) ->
+ findl([{?MODULE, '_', Pid}, {?MODULE, Pid, '_'}]).
+
+findl([]) ->
+ false;
+
+findl([Pat | Rest]) ->
+ try
+ [{{_, Pid, TPid}, Pid}] = diameter_reg:match(Pat),
+ {Pid, TPid}
+ catch
+ error:_ ->
+ findl(Rest)
+ end.
+
%% ---------------------------------------------------------------------------
%% ---------------------------------------------------------------------------
@@ -215,6 +237,8 @@ i({Ack, WPid, {M, Ref} = T, Opts, {SvcOpts, Nodes, Dict0, Svc}}) ->
{TPid, Addrs} = start_transport(T, Rest, Svc),
+ diameter_reg:add({?MODULE, self(), TPid}), %% lets pairs be discovered
+
#state{state = {'Wait-Conn-Ack', Tmo},
parent = WPid,
transport = TPid,
@@ -416,8 +440,8 @@ transition({connection_timeout, _}, _) ->
ok;
%% Incoming message from the transport.
-transition({diameter, {recv, Pkt}}, S) ->
- recv(Pkt, S);
+transition({diameter, {recv, MsgT}}, S) ->
+ incoming(MsgT, S);
%% Timeout when still in the same state ...
transition({timeout = T, PS}, #state{state = PS}) ->
@@ -543,6 +567,28 @@ encode(Rec, Dict) ->
diameter_codec:encode(Dict, #diameter_packet{header = Hdr,
msg = Rec}).
+%% incoming/2
+
+incoming({Msg, NPid}, S) ->
+ try recv(Msg, S) of
+ T ->
+ NPid ! {diameter, discard},
+ T
+ catch
+ {?MODULE, Name, Pkt} ->
+ S#state.parent ! {recv, self(), Name, {Pkt, NPid}},
+ rcv(Name, Pkt, S)
+ end;
+
+incoming(Msg, S) ->
+ try
+ recv(Msg, S)
+ catch
+ {?MODULE, Name, Pkt} ->
+ S#state.parent ! {recv, self(), Name, Pkt},
+ rcv(Name, Pkt, S)
+ end.
+
%% recv/2
recv(#diameter_packet{header = #diameter_header{} = Hdr}
@@ -597,9 +643,8 @@ recv1('DPA' = N,
%% Any other message with a header and no length errors: send to the
%% parent.
-recv1(Name, Pkt, #state{parent = Pid} = S) ->
- Pid ! {recv, self(), Name, Pkt},
- rcv(Name, Pkt, S).
+recv1(Name, Pkt, #state{}) ->
+ throw({?MODULE, Name, Pkt}).
%% recv/3
diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl
index efa4cc9108..cfb5cb5b82 100644
--- a/lib/diameter/src/base/diameter_service.erl
+++ b/lib/diameter/src/base/diameter_service.erl
@@ -32,6 +32,7 @@
-export([subscribe/1,
unsubscribe/1,
services/0,
+ peer_info/1,
info/2]).
%% towards diameter_config
@@ -218,6 +219,29 @@ lookup_state(SvcName) ->
end.
%% ---------------------------------------------------------------------------
+%% # peer_info/2
+%% ---------------------------------------------------------------------------
+
+%% An extended version of info_peer/1 for peer_info/1.
+peer_info(Pid) ->
+ try
+ {_, PD} = process_info(Pid, dictionary),
+ {_, T} = lists:keyfind({diameter_peer_fsm, start}, 1, PD),
+ {TPid, {{Type, Ref}, TMod, Cfg}} = T,
+ {_, TD} = process_info(TPid, dictionary),
+ {_, Data} = lists:keyfind({TMod, info}, 1, TD),
+ [{ref, Ref},
+ {type, Type},
+ {owner, TPid},
+ {module, TMod},
+ {config, Cfg}
+ | try TMod:info(Data) catch _:_ -> [] end]
+ catch
+ error:_ ->
+ []
+ end.
+
+%% ---------------------------------------------------------------------------
%% # subscribe/1
%% # unsubscribe/1
%% ---------------------------------------------------------------------------
diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl
index c169d3fc2c..2112941d5e 100644
--- a/lib/diameter/src/base/diameter_traffic.erl
+++ b/lib/diameter/src/base/diameter_traffic.erl
@@ -230,7 +230,15 @@ pending(TPids) ->
%% used to come through the service process but this avoids that
%% becoming a bottleneck.
-receive_message(TPid, Pkt, Dict0, RecvData)
+receive_message(TPid, {Pkt, NPid}, Dict0, RecvData) ->
+ NPid ! {diameter, incoming(TPid, Pkt, Dict0, RecvData)};
+
+receive_message(TPid, Pkt, Dict0, RecvData) ->
+ incoming(TPid, Pkt, Dict0, RecvData).
+
+%% incoming/4
+
+incoming(TPid, Pkt, Dict0, RecvData)
when is_pid(TPid) ->
#diameter_packet{header = #diameter_header{is_request = R}} = Pkt,
recv(R,
@@ -244,11 +252,18 @@ receive_message(TPid, Pkt, Dict0, RecvData)
%% Incoming request ...
recv(true, false, TPid, Pkt, Dict0, T) ->
- spawn_request(TPid, Pkt, Dict0, T);
+ try
+ {request, spawn_request(TPid, Pkt, Dict0, T)}
+ catch
+ error: system_limit = E -> %% discard
+ ?LOG(error, E),
+ discard
+ end;
%% ... answer to known request ...
recv(false, #request{ref = Ref, handler = Pid} = Req, _, Pkt, Dict0, _) ->
- Pid ! {answer, Ref, Req, Dict0, Pkt};
+ Pid ! {answer, Ref, Req, Dict0, Pkt},
+ {answer, Pid};
%% Note that failover could have happened prior to this message being
%% received and triggering failback. That is, both a failover message
@@ -263,7 +278,7 @@ recv(false, #request{ref = Ref, handler = Pid} = Req, _, Pkt, Dict0, _) ->
recv(false, false, TPid, Pkt, _, _) ->
?LOG(discarded, Pkt#diameter_packet.header),
incr(TPid, {{unknown, 0}, recv, discarded}),
- ok.
+ discard.
%% spawn_request/4
@@ -273,12 +288,7 @@ spawn_request(TPid, Pkt, Dict0, RecvData) ->
spawn_request(TPid, Pkt, Dict0, ?DEFAULT_SPAWN_OPTS, RecvData).
spawn_request(TPid, Pkt, Dict0, Opts, RecvData) ->
- try
- spawn_opt(fun() -> recv_request(TPid, Pkt, Dict0, RecvData) end, Opts)
- catch
- error: system_limit = E -> %% discard
- ?LOG(error, E)
- end.
+ spawn_opt(fun() -> recv_request(TPid, Pkt, Dict0, RecvData) end, Opts).
%% ---------------------------------------------------------------------------
%% recv_request/4
diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl
index ea8b2fdb0e..3fd87b223e 100644
--- a/lib/diameter/src/base/diameter_watchdog.erl
+++ b/lib/diameter/src/base/diameter_watchdog.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2015. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2016. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -449,8 +449,14 @@ transition({'DOWN', _, process, TPid, _Reason} = D,
end;
%% Incoming message.
-transition({recv, TPid, Name, Pkt}, #watchdog{transport = TPid} = S) ->
- recv(Name, Pkt, S);
+transition({recv, TPid, Name, PktT}, #watchdog{transport = TPid} = S) ->
+ try
+ incoming(Name, PktT, S)
+ catch
+ #watchdog{dictionary = Dict0, receive_data = T} = NS ->
+ diameter_traffic:receive_message(TPid, PktT, Dict0, T),
+ NS
+ end;
%% Current watchdog has timed out.
transition({timeout, TRef, tw}, #watchdog{tref = TRef} = S) ->
@@ -578,22 +584,32 @@ send_watchdog(#watchdog{pending = false,
%% Don't count encode errors since we don't expect any on DWR/DWA.
+%% incoming/3
+
+incoming(Name, {Pkt, NPid}, S) ->
+ NS = recv(Name, Pkt, S),
+ NPid ! {diameter, discard},
+ NS;
+
+incoming(Name, Pkt, S) ->
+ recv(Name, Pkt, S).
+
%% recv/3
recv(Name, Pkt, S) ->
- try rcv(Name, S) of
+ try rcv(Name, Pkt, rcv(Name, S)) of
#watchdog{} = NS ->
- rcv(Name, Pkt, S),
- NS
+ throw(NS)
catch
- {?MODULE, throwaway, #watchdog{} = NS} ->
+ #watchdog{} = NS -> %% throwaway
NS
end.
%% rcv/3
rcv('DWR', Pkt, #watchdog{transport = TPid,
- dictionary = Dict0}) ->
+ dictionary = Dict0}
+ = S) ->
?LOG(recv, 'DWR'),
DPkt = diameter_codec:decode(Dict0, Pkt),
diameter_traffic:incr(recv, DPkt, TPid, Dict0),
@@ -610,32 +626,30 @@ rcv('DWR', Pkt, #watchdog{transport = TPid,
send(TPid, {send, #diameter_packet{header = H,
transport_data = T,
bin = Bin}}),
- ?LOG(send, 'DWA');
+ ?LOG(send, 'DWA'),
+ throw(S);
rcv('DWA', Pkt, #watchdog{transport = TPid,
- dictionary = Dict0}) ->
+ dictionary = Dict0}
+ = S) ->
?LOG(recv, 'DWA'),
diameter_traffic:incr(recv, Pkt, TPid, Dict0),
diameter_traffic:incr_rc(recv,
diameter_codec:decode(Dict0, Pkt),
TPid,
- Dict0);
+ Dict0),
+ throw(S);
-rcv(N, _, _)
+rcv(N, _, S)
when N == 'CER';
N == 'CEA';
N == 'DPR' ->
- false;
+ throw(S);
%% DPR can be sent explicitly with diameter:call/4. Only the
%% corresponding DPAs arrive here.
-rcv(_, Pkt, #watchdog{transport = TPid,
- dictionary = Dict0,
- receive_data = T}) ->
- diameter_traffic:receive_message(TPid, Pkt, Dict0, T).
-
-throwaway(S) ->
- throw({?MODULE, throwaway, S}).
+rcv(_, _, S)->
+ S.
%% rcv/2
%%
@@ -652,20 +666,20 @@ throwaway(S) ->
%% INITIAL Receive non-DWA Throwaway() INITIAL
rcv('DWA', #watchdog{status = initial} = S) ->
- throwaway(S#watchdog{pending = false});
+ throw(S#watchdog{pending = false});
rcv(_, #watchdog{status = initial} = S) ->
- throwaway(S);
+ throw(S);
%% DOWN Receive DWA Pending = FALSE
%% Throwaway() DOWN
%% DOWN Receive non-DWA Throwaway() DOWN
rcv('DWA', #watchdog{status = down} = S) ->
- throwaway(S#watchdog{pending = false});
+ throw(S#watchdog{pending = false});
rcv(_, #watchdog{status = down} = S) ->
- throwaway(S);
+ throw(S);
%% OKAY Receive DWA Pending = FALSE
%% SetWatchdog() OKAY
@@ -721,7 +735,7 @@ rcv('DWR', #watchdog{status = reopen} = S) ->
S; %% ensure DWA: the RFC isn't explicit about answering
rcv(_, #watchdog{status = reopen} = S) ->
- throwaway(S).
+ throw(S).
%% timeout/1
%%
diff --git a/lib/diameter/src/diameter.appup.src b/lib/diameter/src/diameter.appup.src
index 12b09889d1..618d5a7f10 100644
--- a/lib/diameter/src/diameter.appup.src
+++ b/lib/diameter/src/diameter.appup.src
@@ -47,10 +47,9 @@
{"1.9.2.2", [{restart_application, diameter}]}, %% 17.5.6.7
{"1.9.2.3", [{restart_application, diameter}]}, %% 17.5.6.8
{"1.10", [{restart_application, diameter}]}, %% 18.0
- {"1.11", [{load_module, diameter_traffic}, %% 18.1
- {update, diameter_service, {advanced, []}}]},
- {"1.11.1", [{load_module, diameter_traffic}, %% 18.2
- {update, diameter_service, {advanced, []}}]}
+ {"1.11", [{restart_application, diameter}]}, %% 18.1
+ {"1.11.1", [{restart_application, diameter}]}, %% 18.2
+ {"1.11.2", [{restart_application, diameter}]} %% 18.3
],
[
{"0.9", [{restart_application, diameter}]},
@@ -80,6 +79,7 @@
{"1.9.2.3", [{restart_application, diameter}]},
{"1.10", [{restart_application, diameter}]},
{"1.11", [{restart_application, diameter}]},
- {"1.11.1", [{restart_application, diameter}]}
+ {"1.11.1", [{restart_application, diameter}]},
+ {"1.11.2", [{restart_application, diameter}]}
]
}.
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl
index c79d85820b..6a5e5fe89d 100644
--- a/lib/diameter/src/transport/diameter_tcp.erl
+++ b/lib/diameter/src/transport/diameter_tcp.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2015. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2016. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
%%
-module(diameter_tcp).
+-dialyzer({no_fail_call, throttle/2}).
-behaviour(gen_server).
@@ -102,7 +103,8 @@
| gen_tcp:listen_option().
-type option() :: {port, non_neg_integer()}
- | {fragment_timer, 0..16#FFFFFFFF}.
+ | {fragment_timer, 0..16#FFFFFFFF}
+ | {throttle_cb, diameter:evaluable()}.
%% Accepting/connecting transport process state.
-record(transport,
@@ -110,10 +112,13 @@
parent :: pid(), %% of process that started us
module :: module(), %% gen_tcp-like module
frag = <<>> :: frag(), %% message fragment
- ssl :: boolean() | [term()], %% ssl options
+ ssl :: [term()] | boolean(), %% ssl options, ssl or not
timeout :: infinity | 0..16#FFFFFFFF, %% fragment timeout
tref = false :: false | reference(), %% fragment timer reference
- flush = false :: boolean()}). %% flush fragment at timeout?
+ flush = false :: boolean(), %% flush fragment at timeout?
+ throttle_cb :: false | diameter:evaluable(), %% ask to receive
+ throttled :: boolean() | binary()}). %% stopped receiving?
+
%% The usual transport using gen_tcp can be replaced by anything
%% sufficiently gen_tcp-like by passing a 'module' option as the first
%% (for simplicity) transport option. The transport_module diameter_etcp
@@ -198,22 +203,27 @@ i({T, Ref, Mod, Pid, Opts, Addrs})
%% that does nothing but kill us with the parent until call
%% returns.
{ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}),
- {SslOpts, Rest0} = ssl(Opts),
- {OwnOpts, Rest} = own(Rest0),
+ {[SO|TO], Rest} = proplists:split(Opts, [ssl_options,
+ fragment_timer,
+ throttle_cb]),
+ SslOpts = ssl_opts(SO),
+ OwnOpts = lists:append(TO),
Tmo = proplists:get_value(fragment_timer,
OwnOpts,
?DEFAULT_FRAGMENT_TIMEOUT),
?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}),
+ Throttle = proplists:get_value(throttle_cb, OwnOpts, false),
Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs),
MPid ! {stop, self()}, %% tell the monitor to die
M = if SslOpts -> ssl; true -> Mod end,
- setopts(M, Sock),
putr(?REF_KEY, Ref),
- #transport{parent = Pid,
- module = M,
- socket = Sock,
- ssl = SslOpts,
- timeout = Tmo};
+ throttle(#transport{parent = Pid,
+ module = M,
+ socket = Sock,
+ ssl = SslOpts,
+ timeout = Tmo,
+ throttle_cb = Throttle,
+ throttled = false /= Throttle});
%% Put the reference in the process dictionary since we now use it
%% advertise the ssl socket after TLS upgrade.
@@ -246,14 +256,6 @@ laddr([], Mod, Sock) ->
laddr([{ip, Addr}], _, _) ->
Addr.
-own(Opts) ->
- {[Own], Rest} = proplists:split(Opts, [fragment_timer]),
- {Own, Rest}.
-
-ssl(Opts) ->
- {[SslOpts], Rest} = proplists:split(Opts, [ssl_options]),
- {ssl_opts(SslOpts), Rest}.
-
ssl_opts([]) ->
false;
ssl_opts([{ssl_options, true}]) ->
@@ -261,8 +263,8 @@ ssl_opts([{ssl_options, true}]) ->
ssl_opts([{ssl_options, Opts}])
when is_list(Opts) ->
Opts;
-ssl_opts(L) ->
- ?ERROR({ssl_options, L}).
+ssl_opts(T) ->
+ ?ERROR({ssl_options, T}).
%% init/7
@@ -393,7 +395,7 @@ get_port(Ps) ->
gen_opts(LAddrOpt, Opts) ->
{L,_} = proplists:split(Opts, [binary, packet, active]),
[[],[],[]] == L orelse ?ERROR({reserved_options, Opts}),
- [binary, {packet, 0}, {active, once}] ++ LAddrOpt ++ Opts.
+ [binary, {packet, 0}, {active, false}] ++ LAddrOpt ++ Opts.
%% ---------------------------------------------------------------------------
%% # ports/1
@@ -536,53 +538,37 @@ t(T,S) ->
S;
#transport{} = NS ->
NS;
- {stop, Reason} ->
- x(Reason);
stop ->
x(T)
end.
%% transition/2
-%% Initial incoming message when we might need to upgrade to TLS:
-%% don't request another message until we know.
-
-transition({tcp, Sock, Bin}, #transport{socket = Sock,
- parent = Pid,
- frag = Head,
- module = M,
- ssl = Opts}
- = S)
- when is_list(Opts) ->
- case rcv(Head, Bin) of
- {Msg, B} when is_binary(Msg) ->
- diameter_peer:recv(Pid, Msg),
- S#transport{frag = B};
- Frag ->
- setopts(M, Sock),
- start_fragment_timer(S#transport{frag = Frag})
- end;
-
%% Incoming message.
transition({P, Sock, Bin}, #transport{socket = Sock,
- module = M,
- ssl = B}
+ ssl = B,
+ throttled = T}
= S)
- when P == tcp, not B;
- P == ssl, B ->
- setopts(M, Sock),
- start_fragment_timer(recv(Bin, S));
+ when P == ssl, true == B;
+ P == tcp ->
+ false = T, %% assert
+ recv(Bin, S);
+
+%% Make a new throttling callback after a timeout.
+transition(throttle, #transport{throttled = false}) ->
+ ok;
+transition(throttle, S) ->
+ throttle(S);
%% Capabilties exchange has decided on whether or not to run over TLS.
transition({diameter, {tls, Ref, Type, B}}, #transport{parent = Pid}
= S) ->
- #transport{socket = Sock,
- module = M}
+ true = is_boolean(B), %% assert
+ #transport{}
= NS
= tls_handshake(Type, B, S),
Pid ! {diameter, {tls, Ref}},
- setopts(M, Sock),
- start_fragment_timer(NS#transport{ssl = B});
+ throttle(NS#transport{ssl = B});
transition({C, Sock}, #transport{socket = Sock,
ssl = B})
@@ -598,14 +584,8 @@ transition({E, Sock, _Reason} = T, #transport{socket = Sock,
?ERROR({T,S});
%% Outgoing message.
-transition({diameter, {send, Bin}}, #transport{socket = Sock,
- module = M}) ->
- case send(M, Sock, Bin) of
- ok ->
- ok;
- {error, Reason} ->
- {stop, {send, Reason}}
- end;
+transition({diameter, {send, Bin}}, S) ->
+ send(Bin, S);
%% Request to close the transport connection.
transition({diameter, {close, Pid}}, #transport{parent = Pid,
@@ -672,16 +652,25 @@ tls(accept, Sock, Opts) ->
%% Reassemble fragmented messages and extract multiple message sent
%% using Nagle.
-recv(Bin, #transport{parent = Pid, frag = Head} = S) ->
+%% Receive packets until a full message is received,
+recv(Bin, #transport{frag = Head, throttled = false} = S) ->
case rcv(Head, Bin) of
- {Msg, B} when is_binary(Msg) ->
- diameter_peer:recv(Pid, Msg),
- recv(B, S#transport{frag = <<>>});
+ {Msg, B} ->
+ throttle(S#transport{frag = B, throttled = Msg});
Frag ->
- S#transport{frag = Frag,
- flush = false}
+ setopts(S),
+ start_fragment_timer(S#transport{frag = Frag,
+ flush = false})
end.
+%% recv/1
+
+recv(#transport{throttled = false} = S) ->
+ recv(<<>>, S);
+
+recv(#transport{} = S) ->
+ S.
+
%% rcv/2
%% No previous fragment.
@@ -765,8 +754,10 @@ bin(Bin)
%% since all messages with length problems are discarded this should
%% also eventually lead to watchdog failover.
-%% No fragment to flush.
-flush(#transport{frag = <<>>} = S) ->
+%% No fragment to flush or not receiving messages.
+flush(#transport{frag = Frag, throttled = B} = S)
+ when Frag == <<>>;
+ B /= false ->
S;
%% Messages have been received since last timer expiry.
@@ -807,6 +798,17 @@ accept(Mod, LSock) ->
connect(Mod, Host, Port, Opts) ->
Mod:connect(Host, Port, Opts).
+%% send/2
+
+send(Bin, #transport{socket = Sock,
+ module = M}) ->
+ case send(M, Sock, Bin) of
+ ok ->
+ ok;
+ {error, Reason} ->
+ x({send, Reason})
+ end.
+
%% send/3
send(gen_tcp, Sock, Bin) ->
@@ -825,6 +827,11 @@ setopts(ssl, Sock, Opts) ->
setopts(M, Sock, Opts) ->
M:setopts(Sock, Opts).
+%% setopts/1
+
+setopts(#transport{socket = Sock, module = M}) ->
+ setopts(M, Sock).
+
%% setopts/2
setopts(M, Sock) ->
@@ -833,6 +840,110 @@ setopts(M, Sock) ->
X -> x({setopts, M, Sock, X}) %% possibly on peer disconnect
end.
+%% throttle/1
+
+%% Still collecting packets for a complete message: keep receiving.
+throttle(#transport{throttled = false} = S) ->
+ recv(S);
+
+%% Decide whether to receive another, or whether to accept a message
+%% that's been received.
+throttle(#transport{throttle_cb = F, throttled = T} = S) ->
+ Res = cb(F, T),
+
+ try throttle(Res, S) of
+ #transport{ssl = SB} = NS when is_boolean(SB) ->
+ throttle(defrag(NS));
+ #transport{throttled = Msg} = NS when is_binary(Msg) ->
+ %% Initial incoming message when we might need to upgrade
+ %% to TLS: wait for reception of a tls tuple.
+ defrag(NS)
+ catch
+ #transport{} = NS ->
+ recv(NS)
+ end.
+
+%% cb/2
+
+cb(false, _) ->
+ ok;
+
+cb(F, B) ->
+ diameter_lib:eval([F, true /= B andalso B]).
+
+%% throttle/2
+
+%% Callback says to receive another message.
+throttle(ok, #transport{throttled = true} = S) ->
+ throw(S#transport{throttled = false});
+
+%% Callback says to accept a received message.
+throttle(ok, #transport{parent = Pid, throttled = Msg} = S)
+ when is_binary(Msg) ->
+ diameter_peer:recv(Pid, Msg),
+ S;
+
+throttle({ok = T, F}, S) ->
+ throttle(T, S#transport{throttle_cb = F});
+
+%% Callback says to accept a received message and acknowledged the
+%% returned pid with a {request, Pid} message if a request pid is
+%% spawned, a discard message otherwise. The latter does not mean that
+%% the message was necessarily discarded: it could have been an
+%% answer.
+throttle(NPid, #transport{parent = Pid, throttled = Msg} = S)
+ when is_pid(NPid), is_binary(Msg) ->
+ diameter_peer:recv(Pid, {Msg, NPid}),
+ S;
+
+throttle({NPid, F}, #transport{throttled = Msg} = S)
+ when is_pid(NPid), is_binary(Msg) ->
+ throttle(NPid, S#transport{throttle_cb = F});
+
+%% Callback to accept a received message says to discard it.
+throttle(discard, #transport{throttled = Msg} = S)
+ when is_binary(Msg) ->
+ S;
+
+throttle({discard = T, F}, #transport{throttled = Msg} = S)
+ when is_binary(Msg) ->
+ throttle(T, S#transport{throttle_cb = F});
+
+%% Callback to accept a received message says to answer it with the
+%% supplied binary.
+throttle(Bin, #transport{throttled = Msg} = S)
+ when is_binary(Bin), is_binary(Msg) ->
+ send(Bin, S),
+ S;
+
+throttle({Bin, F}, #transport{throttled = Msg} = S)
+ when is_binary(Bin), is_binary(Msg) ->
+ throttle(Bin, S#transport{throttle_cb = F});
+
+%% Callback says to ask again in the specified number of milliseconds.
+throttle({timeout, Tmo}, S) ->
+ erlang:send_after(Tmo, self(), throttle),
+ throw(S);
+
+throttle({timeout = T, Tmo, F}, S) ->
+ throttle({T, Tmo}, S#transport{throttle_cb = F});
+
+throttle(T, #transport{throttle_cb = F}) ->
+ ?ERROR({invalid_return, T, F}).
+
+%% defrag/1
+%%
+%% Try to extract another message from packets already read before
+%% another throttling callback.
+
+defrag(#transport{frag = Head} = S) ->
+ case rcv(Head, <<>>) of
+ {Msg, B} ->
+ S#transport{throttled = Msg, frag = B};
+ _ ->
+ S#transport{throttled = true}
+ end.
+
%% portnr/2
portnr(gen_tcp, Sock) ->