aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--lib/diameter/src/base/diameter_config.erl2
-rw-r--r--lib/diameter/src/base/diameter_peer_fsm.erl146
-rw-r--r--lib/diameter/src/base/diameter_reg.erl2
-rw-r--r--lib/diameter/src/base/diameter_traffic.erl84
-rw-r--r--lib/diameter/src/base/diameter_watchdog.erl39
-rw-r--r--lib/diameter/src/transport/diameter_sctp.erl319
-rw-r--r--lib/diameter/src/transport/diameter_sctp_sup.erl3
-rw-r--r--lib/diameter/src/transport/diameter_tcp.erl491
-rw-r--r--lib/diameter/test/diameter_capx_SUITE.erl2
-rw-r--r--lib/diameter/test/diameter_examples_SUITE.erl2
-rw-r--r--lib/diameter/test/diameter_gen_sctp_SUITE.erl457
-rw-r--r--lib/diameter/test/diameter_gen_tcp_SUITE.erl4
-rw-r--r--lib/diameter/test/diameter_traffic_SUITE.erl207
-rw-r--r--lib/diameter/test/diameter_transport_SUITE.erl30
-rw-r--r--lib/diameter/test/diameter_util.erl65
-rw-r--r--lib/diameter/test/diameter_watchdog_SUITE.erl104
16 files changed, 1175 insertions, 782 deletions
diff --git a/lib/diameter/src/base/diameter_config.erl b/lib/diameter/src/base/diameter_config.erl
index e10804c931..cea671f275 100644
--- a/lib/diameter/src/base/diameter_config.erl
+++ b/lib/diameter/src/base/diameter_config.erl
@@ -277,7 +277,7 @@ start_link() ->
start_link(T) ->
proc_lib:start_link(?MODULE, init, [T], infinity, []).
-
+
state() ->
call(state).
diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl
index 46d231da74..601e48e817 100644
--- a/lib/diameter/src/base/diameter_peer_fsm.erl
+++ b/lib/diameter/src/base/diameter_peer_fsm.erl
@@ -129,6 +129,7 @@
%% the request was sent explicitly with
%% diameter:call/4.
strict :: boolean(),
+ ack = false :: boolean(),
length_errors :: exit | handle | discard,
incoming_maxlen :: integer() | infinity}).
@@ -235,7 +236,7 @@ i({Ack, WPid, {M, Ref} = T, Opts, {SvcOpts, Nodes, Dict0, Svc}}) ->
Tmo = proplists:get_value(capx_timeout, Opts, ?CAPX_TIMEOUT),
Strictness = proplists:get_value(capx_strictness, Opts, true),
- OnLengthErr = proplists:get_value(length_errors, Opts, exit),
+ LengthErr = proplists:get_value(length_errors, Opts, exit),
{TPid, Addrs} = start_transport(T, Rest, Svc),
@@ -247,7 +248,7 @@ i({Ack, WPid, {M, Ref} = T, Opts, {SvcOpts, Nodes, Dict0, Svc}}) ->
dictionary = Dict0,
mode = M,
service = svc(Svc, Addrs),
- length_errors = OnLengthErr,
+ length_errors = LengthErr,
strict = Strictness,
incoming_maxlen = Maxlen}.
%% The transport returns its local ip addresses so that different
@@ -442,9 +443,18 @@ transition({connection_timeout = T, TPid},
transition({connection_timeout, _}, _) ->
ok;
+%% Requests for acknowledgements to the transport.
+transition({diameter, ack}, S) ->
+ S#state{ack = true};
+
%% Incoming message from the transport.
-transition({diameter, {recv, MsgT}}, S) ->
- incoming(MsgT, S);
+transition({diameter, {recv, Msg}}, S) ->
+ incoming(recv(Msg, S), S);
+
+%% Handler of an incoming request is telling of its existence.
+transition({handler, Pid}, _) ->
+ put_route(Pid),
+ ok;
%% Timeout when still in the same state ...
transition({timeout = T, PS}, #state{state = PS}) ->
@@ -458,7 +468,7 @@ transition({timeout, _}, _) ->
transition({send, Msg}, S) ->
outgoing(Msg, S);
transition({send, Msg, Route}, S) ->
- put_route(Route),
+ route_outgoing(Route),
outgoing(Msg, S);
%% Request for graceful shutdown at remove_transport, stop_service of
@@ -487,12 +497,13 @@ transition({'DOWN', _, process, WPid, _},
transition({'DOWN', _, process, TPid, _},
#state{transport = TPid}
= S) ->
- start_next(S);
+ start_next(S#state{ack = false});
%% Transport has died after connection timeout, or handler process has
%% died.
-transition({'DOWN', _, process, Pid, _}, _) ->
- erase_route(Pid),
+transition({'DOWN', _, process, Pid, _}, #state{transport = TPid}) ->
+ is_reference(erase_route(Pid))
+ andalso send(TPid, false), %% answer not forthcoming
ok;
%% State query.
@@ -502,37 +513,56 @@ transition({state, Pid}, #state{state = S, transport = TPid}) ->
%% Crash on anything unexpected.
-%% put_route/1
-%%
+%% route_outgoing/1
+
%% Map identifiers in an outgoing request to be able to lookup the
%% handler process when the answer is received.
-
-put_route({Pid, Ref, Seqs}) ->
+route_outgoing({Pid, Ref, Seqs}) -> %% request
MRef = monitor(process, Pid),
put(Pid, Seqs),
- put(Seqs, {Pid, Ref, MRef}).
+ put(Seqs, {Pid, Ref, MRef});
+
+%% Remove a mapping made for an incoming request.
+route_outgoing(Pid)
+ when is_pid(Pid) -> %% answer
+ MRef = erase_route(Pid),
+ undefined == MRef orelse demonitor(MRef).
-%% get_route/1
+%% put_route/1
+
+%% Monitor on a handler process for an incoming request.
+put_route(Pid) ->
+ MRef = monitor(process, Pid),
+ put(Pid, MRef).
-get_route(#diameter_packet{header = #diameter_header{is_request = false}}
- = Pkt) ->
+%% get_route/2
+
+%% incoming answer
+get_route(_, #diameter_packet{header = #diameter_header{is_request = false}}
+ = Pkt) ->
Seqs = diameter_codec:sequence_numbers(Pkt),
case erase(Seqs) of
{Pid, Ref, MRef} ->
demonitor(MRef),
erase(Pid),
{Pid, Ref, self()};
- undefined ->
+ undefined -> %% request unknown
false
end;
-get_route(_) ->
- false.
+%% incoming request
+get_route(Ack, _) ->
+ Ack.
%% erase_route/1
erase_route(Pid) ->
- erase(erase(Pid)).
+ case erase(Pid) of
+ {_,_} = Seqs ->
+ erase(Seqs);
+ T ->
+ T
+ end.
%% capx/1
@@ -611,29 +641,24 @@ encode(Rec, Dict) ->
%% incoming/2
-incoming({Msg, NPid}, S) ->
- try recv(Msg, S) of
- T ->
- NPid ! {diameter, discard},
- T
- catch
- {?MODULE, Name, Pkt} ->
- incoming(Name, Pkt, NPid, S)
- end;
+incoming({recv = T, Name, Pkt}, #state{parent = Pid, ack = Ack} = S) ->
+ Pid ! {T, self(), get_route(Ack, Pkt), Name, Pkt},
+ rcv(Name, Pkt, S);
-incoming(Msg, S) ->
- try
- recv(Msg, S)
- catch
- {?MODULE, Name, Pkt} ->
- incoming(Name, Pkt, false, S)
- end.
+incoming(#diameter_header{is_request = R}, #state{transport = TPid,
+ ack = Ack}) ->
+ R andalso Ack andalso send(TPid, false),
+ ok;
+
+incoming(<<_:32, 1:1, _/bits>>, #state{ack = true} = S) ->
+ send(S#state.transport, false),
+ ok;
-%% incoming/4
+incoming(<<_/bits>>, _) ->
+ ok;
-incoming(Name, Pkt, NPid, #state{parent = Pid} = S) ->
- Pid ! {recv, self(), get_route(Pkt), Name, Pkt, NPid},
- rcv(Name, Pkt, S).
+incoming(T, _) ->
+ T.
%% recv/2
@@ -658,18 +683,19 @@ recv1(_,
#diameter_packet{header = H, bin = Bin},
#state{incoming_maxlen = M})
when M < size(Bin) ->
- invalid(false, incoming_maxlen_exceeded, {size(Bin), H});
+ invalid(false, incoming_maxlen_exceeded, {size(Bin), H}),
+ H;
%% Ignore anything but an expected CER/CEA if so configured. This is
%% non-standard behaviour.
-recv1(Name, _, #state{state = {'Wait-CEA', _, _},
- strict = false})
+recv1(Name, #diameter_packet{header = H}, #state{state = {'Wait-CEA', _, _},
+ strict = false})
when Name /= 'CEA' ->
- ok;
-recv1(Name, _, #state{state = recv_CER,
- strict = false})
+ H;
+recv1(Name, #diameter_packet{header = H}, #state{state = recv_CER,
+ strict = false})
when Name /= 'CER' ->
- ok;
+ H;
%% Incoming request after outgoing DPR: discard. Don't discard DPR, so
%% both ends don't do so when sending simultaneously.
@@ -677,13 +703,15 @@ recv1(Name,
#diameter_packet{header = #diameter_header{is_request = true} = H},
#state{dpr = {_,_,_}})
when Name /= 'DPR' ->
- invalid(false, recv_after_outgoing_dpr, H);
+ invalid(false, recv_after_outgoing_dpr, H),
+ H;
%% Incoming request after incoming DPR: discard.
recv1(_,
#diameter_packet{header = #diameter_header{is_request = true} = H},
#state{dpr = true}) ->
- invalid(false, recv_after_incoming_dpr, H);
+ invalid(false, recv_after_incoming_dpr, H),
+ H;
%% DPA with identifier mismatch, or in response to a DPR initiated by
%% the service.
@@ -701,7 +729,7 @@ recv1('DPA' = N,
%% Any other message with a header and no length errors: send to the
%% parent.
recv1(Name, Pkt, #state{}) ->
- throw({?MODULE, Name, Pkt}).
+ {recv, Name, Pkt}.
%% recv/3
@@ -720,10 +748,12 @@ recv(#diameter_header{}
#diameter_packet{bin = Bin},
#state{length_errors = E}) ->
T = {size(Bin), bit_size(Bin) rem 8, H},
- invalid(E, message_length_mismatch, T);
+ invalid(E, message_length_mismatch, T),
+ Bin;
recv(false, #diameter_packet{bin = Bin}, #state{length_errors = E}) ->
- invalid(E, truncated_header, Bin).
+ invalid(E, truncated_header, Bin),
+ Bin.
%% Note that counters here only count discarded messages.
invalid(E, Reason, T) ->
@@ -779,14 +809,10 @@ rcv('DPA' = N,
diameter_peer:close(TPid),
{stop, N};
-%% Ignore anything else, an unsolicited DPA in particular. Note that
-%% dpa_timeout deals with the case in which the peer sends the wrong
-%% identifiers in DPA.
-rcv(N, #diameter_packet{header = H}, _)
- when N == 'CER';
- N == 'CEA';
- N == 'DPR';
- N == 'DPA' ->
+%% Ignore an unsolicited DPA in particular. Note that dpa_timeout
+%% deals with the case in which the peer sends the wrong identifiers
+%% in DPA.
+rcv('DPA' = N, #diameter_packet{header = H}, _) ->
?LOG(ignored, N),
%% Note that these aren't counted in the normal recv counter.
diameter_stats:incr({diameter_codec:msg_id(H), recv, ignored}),
diff --git a/lib/diameter/src/base/diameter_reg.erl b/lib/diameter/src/base/diameter_reg.erl
index 9027130063..4910979219 100644
--- a/lib/diameter/src/base/diameter_reg.erl
+++ b/lib/diameter/src/base/diameter_reg.erl
@@ -137,7 +137,7 @@ match(Pat) ->
match(Pat, Pid) ->
ets:match_object(?TABLE, {Pat, Pid}).
-
+
%% ===========================================================================
%% # wait(Pat)
%%
diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl
index bc1ccf4feb..ccfab22e9c 100644
--- a/lib/diameter/src/base/diameter_traffic.erl
+++ b/lib/diameter/src/base/diameter_traffic.erl
@@ -30,7 +30,7 @@
-export([send_request/4]).
%% towards diameter_watchdog
--export([receive_message/6]).
+-export([receive_message/5]).
%% towards diameter_peer_fsm and diameter_watchdog
-export([incr/4,
@@ -54,9 +54,6 @@
-define(RELAY, ?DIAMETER_DICT_RELAY).
-define(BASE, ?DIAMETER_DICT_COMMON). %% Note: the RFC 3588 dictionary
--define(DEFAULT_TIMEOUT, 5000). %% for outgoing requests
--define(DEFAULT_SPAWN_OPTS, []).
-
%% Table containing outgoing entries that live and die with
%% peer_up/down. The name is historic, since the table used to contain
%% information about outgoing requests for which an answer has yet to
@@ -67,7 +64,7 @@
-record(options,
{filter = none :: diameter:peer_filter(),
extra = [] :: list(),
- timeout = ?DEFAULT_TIMEOUT :: 0..16#FFFFFFFF,
+ timeout = 5000 :: 0..16#FFFFFFFF, %% for outgoing requests
detach = false :: boolean()}).
%% Term passed back to receive_message/6 with every incoming message.
@@ -93,7 +90,7 @@
packet :: #diameter_packet{} | undefined}). %% of request
%% ---------------------------------------------------------------------------
-%% # make_recvdata/1
+%% make_recvdata/1
%% ---------------------------------------------------------------------------
make_recvdata([SvcName, PeerT, Apps, SvcOpts | _]) ->
@@ -206,42 +203,38 @@ incr_rc(Dir, Pkt, TPid, Dict0) ->
incr_rc(Dir, Pkt, TPid, {Dict0, Dict0, Dict0}).
%% ---------------------------------------------------------------------------
-%% # receive_message/6
+%% receive_message/5
%%
-%% Handle an incoming Diameter message.
+%% Handle an incoming Diameter message in a watchdog process.
%% ---------------------------------------------------------------------------
-%% Handle an incoming Diameter message in the watchdog process.
-
-receive_message(TPid, Route, Pkt, false, Dict0, RecvData) ->
- incoming(TPid, Route, Pkt, Dict0, RecvData);
-
-receive_message(TPid, Route, Pkt, NPid, Dict0, RecvData) ->
- NPid ! {diameter, incoming(TPid, Route, Pkt, Dict0, RecvData)}.
-
-%% incoming/4
-
-incoming(TPid, Route, Pkt, Dict0, RecvData)
- when is_pid(TPid) ->
+-spec receive_message(pid(), Route, #diameter_packet{}, module(), RecvData)
+ -> pid()
+ | boolean()
+ when Route :: {Handler, RequestRef, Seqs}
+ | Ack,
+ RecvData :: {[SpawnOpt], #recvdata{}},
+ SpawnOpt :: term(),
+ Handler :: pid(),
+ RequestRef :: reference(),
+ Seqs :: {0..16#FFFFFFFF, 0..16#FFFFFFFF},
+ Ack :: boolean().
+
+receive_message(TPid, Route, Pkt, Dict0, RecvData) ->
#diameter_packet{header = #diameter_header{is_request = R}} = Pkt,
recv(R, Route, TPid, Pkt, Dict0, RecvData).
%% recv/6
%% Incoming request ...
-recv(true, false, TPid, Pkt, Dict0, T) ->
- try
- {request, spawn_request(TPid, Pkt, Dict0, T)}
- catch
- error: system_limit = E -> %% discard
- ?LOG(error, E),
- discard
- end;
+recv(true, Ack, TPid, Pkt, Dict0, T)
+ when is_boolean(Ack) ->
+ spawn_request(Ack, TPid, Pkt, Dict0, T);
%% ... answer to known request ...
recv(false, {Pid, Ref, TPid}, _, Pkt, Dict0, _) ->
Pid ! {answer, Ref, TPid, Dict0, Pkt},
- {answer, Pid};
+ true;
%% Note that failover could have happened prior to this message being
%% received and triggering failback. That is, both a failover message
@@ -256,23 +249,22 @@ recv(false, {Pid, Ref, TPid}, _, Pkt, Dict0, _) ->
recv(false, false, TPid, Pkt, _, _) ->
?LOG(discarded, Pkt#diameter_packet.header),
incr(TPid, {{unknown, 0}, recv, discarded}),
- discard.
-
-%% spawn_request/4
+ false.
-spawn_request(TPid, Pkt, Dict0, {Opts, RecvData}) ->
- spawn_request(TPid, Pkt, Dict0, Opts, RecvData);
-spawn_request(TPid, Pkt, Dict0, RecvData) ->
- spawn_request(TPid, Pkt, Dict0, ?DEFAULT_SPAWN_OPTS, RecvData).
+%% spawn_request/5
-spawn_request(TPid, Pkt, Dict0, Opts, RecvData) ->
- spawn_opt(fun() -> recv_request(TPid, Pkt, Dict0, RecvData) end, Opts).
+spawn_request(Ack, TPid, Pkt, Dict0, {Opts, RecvData}) ->
+ spawn_opt(fun() ->
+ recv_request(Ack, TPid, Pkt, Dict0, RecvData)
+ end,
+ Opts).
%% ---------------------------------------------------------------------------
-%% recv_request/4
+%% recv_request/5
%% ---------------------------------------------------------------------------
-recv_request(TPid,
+recv_request(Ack,
+ TPid,
#diameter_packet{header = #diameter_header{application_id = Id}}
= Pkt,
Dict0,
@@ -280,6 +272,7 @@ recv_request(TPid,
apps = Apps,
codec = Opts}
= RecvData) ->
+ Ack andalso (TPid ! {handler, self()}),
diameter_codec:setopts([{common_dictionary, Dict0} | Opts]),
send_A(recv_R(diameter_service:find_incoming_app(PeerT, TPid, Id, Apps),
TPid,
@@ -511,7 +504,7 @@ send_A(T, TPid, {AppDict, Dict0} = DictT0, ReqPkt, EvalPktFs, EvalFs) ->
{MsgDict, Pkt} = reply(T, TPid, DictT0, EvalPktFs, ReqPkt),
incr(send, Pkt, TPid, AppDict),
incr_rc(send, Pkt, TPid, {MsgDict, AppDict, Dict0}), %% count outgoing
- send(TPid, Pkt),
+ send(TPid, Pkt, _Route = self()),
lists:foreach(fun diameter_lib:eval/1, EvalFs).
%% answer/6
@@ -1207,7 +1200,7 @@ x(T) ->
exit(T).
%% ---------------------------------------------------------------------------
-%% # send_request/4
+%% send_request/4
%%
%% Handle an outgoing Diameter request.
%% ---------------------------------------------------------------------------
@@ -1296,7 +1289,7 @@ mo(T, _) ->
?ERROR({invalid_option, T}).
%% ---------------------------------------------------------------------------
-%% # send_request/6
+%% send_request/6
%% ---------------------------------------------------------------------------
%% Send an outgoing request in its dedicated process.
@@ -1745,11 +1738,6 @@ recv(TPid, Pid, TRef, {LocalTRef, MRef}) ->
exit({timeout, LocalTRef, TPid} = T)
end.
-%% send/2
-
-send(Pid, Pkt) ->
- Pid ! {send, Pkt}.
-
%% send/3
send(Pid, Pkt, Route) ->
diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl
index f28b8f2910..a2eb661870 100644
--- a/lib/diameter/src/base/diameter_watchdog.erl
+++ b/lib/diameter/src/base/diameter_watchdog.erl
@@ -283,7 +283,7 @@ event(Msg,
?LOG(transition, {From, To}).
data(Msg, TPid, reopen, okay) ->
- {recv, TPid, false, 'DWA', _Pkt, _NPid} = Msg, %% assert
+ {recv, TPid, _, 'DWA', _Pkt} = Msg, %% assert
{TPid, T} = eraser(open),
[T];
@@ -302,6 +302,8 @@ tpid(_, Pid)
tpid(Pid, _) ->
Pid.
+%% send/2
+
send(Pid, T) ->
Pid ! T.
@@ -447,14 +449,15 @@ transition({'DOWN', _, process, TPid, _Reason} = D,
end;
%% Incoming message.
-transition({recv, TPid, Route, Name, Pkt, NPid},
+transition({recv, TPid, Route, Name, Pkt},
#watchdog{transport = TPid}
= S) ->
- try
- incoming(Name, Pkt, NPid, S)
- catch
+ try incoming(Route, Name, Pkt, S) of
#watchdog{dictionary = Dict0, receive_data = T} = NS ->
- diameter_traffic:receive_message(TPid, Route, Pkt, NPid, Dict0, T),
+ diameter_traffic:receive_message(TPid, Route, Pkt, Dict0, T),
+ NS
+ catch
+ #watchdog{} = NS ->
NS
end;
@@ -586,25 +589,13 @@ send_watchdog(#watchdog{pending = false,
%% incoming/4
-incoming(Name, Pkt, false, S) ->
- recv(Name, Pkt, S);
-
-incoming(Name, Pkt, NPid, S) ->
- try
- recv(Name, Pkt, S)
- after
- NPid ! {diameter, discard}
- end.
-
-%% recv/3
-
-recv(Name, Pkt, S) ->
- try rcv(Name, Pkt, rcv(Name, S)) of
- #watchdog{} = NS ->
- throw(NS)
+incoming(Route, Name, Pkt, S) ->
+ try rcv(Name, S) of
+ NS -> rcv(Name, Pkt, NS)
catch
- #watchdog{} = NS -> %% throwaway
- NS
+ #watchdog{transport = TPid} = NS when Route -> %% incoming request
+ send(TPid, {send, false}), %% requiring ack
+ throw(NS)
end.
%% rcv/3
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl
index 76aacabcb8..6a9f1f940b 100644
--- a/lib/diameter/src/transport/diameter_sctp.erl
+++ b/lib/diameter/src/transport/diameter_sctp.erl
@@ -52,21 +52,20 @@
%% Keys into process dictionary.
-define(INFO_KEY, info).
-define(REF_KEY, ref).
+-define(TRANSPORT_KEY, transport).
-define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})).
%% The default port for a listener.
-define(DEFAULT_PORT, 3868). %% RFC 3588, ch 2.1
-%% Remote addresses to accept connections from.
--define(DEFAULT_ACCEPT, []). %% any
-
%% How long to wait for a transport process to attach after
%% association establishment.
-define(ACCEPT_TIMEOUT, 5000).
-type connect_option() :: {raddr, inet:ip_address()}
| {rport, inet:port_number()}
+ | option()
| term(). %% gen_sctp:open_option().
-type match() :: inet:ip_address()
@@ -74,8 +73,14 @@
| [match()].
-type listen_option() :: {accept, match()}
+ | option()
| term(). %% gen_sctp:open_option().
+-type option() :: {sender, boolean()}
+ | sender
+ | {packet, boolean() | raw}
+ | {message_cb, false | diameter:evaluable()}.
+
-type uint() :: non_neg_integer().
%% Accepting/connecting transport process state.
@@ -87,20 +92,35 @@
%% {RAs, RP, Errors}
| connect,
socket :: gen_sctp:sctp_socket() | undefined,
- assoc_id :: gen_sctp:assoc_id(), %% association identifier
+ active = false :: boolean(), %% is socket active?
+ recv = true :: boolean(), %% should it be active?
+ assoc_id :: gen_sctp:assoc_id() %% association identifier
+ | undefined
+ | true,
peer :: {[inet:ip_address()], uint()} %% {RAs, RP}
| undefined,
streams :: {uint(), uint()} %% {InStream, OutStream} counts
| undefined,
- os = 0 :: uint()}). %% next output stream
+ os = 0 :: uint(), %% next output stream
+ packet = true :: boolean() %% legacy transport_data?
+ | raw,
+ message_cb = false :: false | diameter:evaluable(),
+ send = false :: pid() | boolean()}). %% sending process
+
+%% Monitor process state.
+-record(monitor,
+ {transport :: pid(),
+ ack = false :: boolean(),
+ socket :: gen_sctp:sctp_socket(),
+ assoc_id :: gen_sctp:assoc_id()}). %% next output stream
%% Listener process state.
-record(listener,
{ref :: reference(),
socket :: gen_sctp:sctp_socket(),
- service = false :: false | pid(), %% service process
+ service :: pid(), %% service process
pending = {0, queue:new()},
- accept :: [match()]}).
+ opts :: [[match()] | boolean() | diameter:evaluable()]}).
%% Field pending implements two queues: the first of transport-to-be
%% processes to which an association has been assigned but for which
%% diameter hasn't yet spawned a transport process, a short-lived
@@ -132,11 +152,11 @@
start(T, Svc, Opts)
when is_list(Opts) ->
#diameter_service{capabilities = Caps,
- pid = SPid}
+ pid = Pid}
= Svc,
diameter_sctp_sup:start(), %% start supervisors on demand
Addrs = Caps#diameter_caps.host_ip_address,
- s(T, Addrs, SPid, lists:map(fun ip/1, Opts)).
+ s(T, Addrs, Pid, lists:map(fun ip/1, Opts)).
ip({ifaddr, A}) ->
{ip, A};
@@ -147,9 +167,9 @@ ip(T) ->
%% when there is not yet an association to assign it, or at comm_up on
%% a new association in which case the call retrieves a transport from
%% the pending queue.
-s({accept, Ref} = A, Addrs, SPid, Opts) ->
- {ok, LPid, LAs} = listener(Ref, {Opts, Addrs}),
- try gen_server:call(LPid, {A, self(), SPid}, infinity) of
+s({accept, Ref} = A, Addrs, SvcPid, Opts) ->
+ {ok, LPid, LAs} = listener(Ref, {Opts, SvcPid, Addrs}),
+ try gen_server:call(LPid, {A, self()}, infinity) of
{ok, TPid} ->
{ok, TPid, LAs};
No ->
@@ -162,7 +182,7 @@ s({accept, Ref} = A, Addrs, SPid, Opts) ->
%% gen_sctp in order to be able to accept a new association only
%% *after* an accepting transport has been spawned.
-s({connect = C, Ref}, Addrs, _SPid, Opts) ->
+s({connect = C, Ref}, Addrs, _SvcPid, Opts) ->
diameter_sctp_sup:start_child({C, self(), Opts, Addrs, Ref}).
%% start_link/1
@@ -216,22 +236,39 @@ init(T) ->
%% i/1
+i(#monitor{transport = TPid} = S) ->
+ monitor(process, TPid),
+ putr(?TRANSPORT_KEY, TPid),
+ proc_lib:init_ack({ok, self()}),
+ S;
+
%% A process owning a listening socket.
-i({listen, Ref, {Opts, Addrs}}) ->
+i({listen, Ref, {Opts, SvcPid, Addrs}}) ->
+ monitor(process, SvcPid),
[_] = diameter_config:subscribe(Ref, transport), %% assert existence
- {[Matches], Rest} = proplists:split(Opts, [accept]),
+ {Split, Rest}
+ = proplists:split(Opts, [accept, packet, sender, message_cb]),
+ OwnOpts = lists:append(Split),
{LAs, Sock} = AS = open(Addrs, Rest, ?DEFAULT_PORT),
ok = gen_sctp:listen(Sock, true),
true = diameter_reg:add_new({?MODULE, listener, {Ref, AS}}),
proc_lib:init_ack({ok, self(), LAs}),
#listener{ref = Ref,
+ service = SvcPid,
socket = Sock,
- accept = [[M] || {accept, M} <- Matches]};
+ opts = [[[M] || {accept, M} <- OwnOpts],
+ proplists:get_value(packet, OwnOpts, true)
+ | [proplists:get_value(K, OwnOpts, false)
+ || K <- [sender, message_cb]]]};
%% A connecting transport.
i({connect, Pid, Opts, Addrs, Ref}) ->
- {[As, Ps], Rest} = proplists:split(Opts, [raddr, rport]),
- RAs = [diameter_lib:ipaddr(A) || {raddr, A} <- As],
+ {[Ps | Split], Rest}
+ = proplists:split(Opts, [rport, raddr, packet, sender, message_cb]),
+ OwnOpts = lists:append(Split),
+ CB = proplists:get_value(message_cb, OwnOpts, false),
+ false == CB orelse (Pid ! {diameter, ack}),
+ RAs = [diameter_lib:ipaddr(A) || {raddr, A} <- OwnOpts],
[RP] = [P || {rport, P} <- Ps] ++ [P || P <- [?DEFAULT_PORT], [] == Ps],
{LAs, Sock} = open(Addrs, Rest, 0),
putr(?REF_KEY, Ref),
@@ -239,7 +276,10 @@ i({connect, Pid, Opts, Addrs, Ref}) ->
monitor(process, Pid),
#transport{parent = Pid,
mode = {connect, connect(Sock, RAs, RP, [])},
- socket = Sock};
+ socket = Sock,
+ message_cb = CB,
+ packet = proplists:get_value(packet, OwnOpts, true),
+ send = proplists:get_value(sender, OwnOpts, false)};
%% An accepting transport spawned by diameter, not yet owning an
%% association.
@@ -273,11 +313,16 @@ i({K, Ref}, #transport{mode = {accept, _}} = S) ->
receive
{Ref, Pid} when K == parent -> %% transport process started
S#transport{parent = Pid};
- {K, T, Matches} when K == peeloff -> %% association
+ {K, T, Opts} when K == peeloff -> %% association
{sctp, Sock, _RA, _RP, _Data} = T,
+ [Matches, Packet, Sender, CB] = Opts,
ok = accept_peer(Sock, Matches),
demonitor(Ref, [flush]),
- t(T, S#transport{socket = Sock});
+ false == CB orelse (S#transport.parent ! {diameter, ack}),
+ t(T, S#transport{socket = Sock,
+ message_cb = CB,
+ packet = Packet,
+ send = Sender});
accept_timeout = T ->
x(T);
{'DOWN', _, process, _, _} = T ->
@@ -374,13 +419,9 @@ handle_call({{accept, Ref}, Pid}, _, #listener{ref = Ref} = S) ->
{TPid, NewS} = accept(Ref, Pid, S),
{reply, {ok, TPid}, NewS};
-handle_call({{accept, _} = T, Pid, SPid}, From, #listener{service = P} = S) ->
- handle_call({T, Pid}, From, if not is_pid(P), is_pid(SPid) ->
- monitor(process, SPid),
- S#listener{service = SPid};
- true ->
- S
- end);
+%% Transport is telling us of parent death.
+handle_call({stop, _Pid} = Reason, _From, #monitor{} = S) ->
+ {stop, {shutdown, Reason}, ok, S};
handle_call(_, _, State) ->
{reply, nok, State}.
@@ -400,7 +441,11 @@ handle_info(T, #transport{} = S) ->
{noreply, #transport{} = t(T,S)};
handle_info(T, #listener{} = S) ->
- {noreply, #listener{} = l(T,S)}.
+ {noreply, #listener{} = l(T,S)};
+
+handle_info(T, #monitor{} = S) ->
+ m(T,S),
+ {noreply, S}.
%% Prior to the possibility of setting pool_size on in transport
%% configuration, a new accepting transport was only started following
@@ -422,6 +467,9 @@ code_change(_, State, _) ->
%% # terminate/2
%% ---------------------------------------------------------------------------
+terminate(_, #monitor{}) ->
+ ok;
+
terminate(_, #transport{assoc_id = undefined}) ->
ok;
@@ -445,11 +493,11 @@ getr(Key) ->
%% Incoming message from SCTP.
l({sctp, Sock, _RA, _RP, Data} = T, #listener{socket = Sock,
- accept = Matches}
+ opts = Opts}
= S) ->
Id = assoc_id(Data),
{TPid, NewS} = accept(S),
- TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Matches},
+ TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Opts},
setopts(Sock),
NewS;
@@ -503,12 +551,21 @@ t(T,S) ->
%% Incoming message.
transition({sctp, Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) ->
- setopts(Sock),
- recv(Data, S);
+ setopts(S, recv(Data, S#transport{active = false}));
%% Outgoing message.
transition({diameter, {send, Msg}}, S) ->
- send(Msg, S);
+ message(send, Msg, S);
+
+%% Monitor has sent an outgoing message.
+transition(Msg, S)
+ when is_record(Msg, diameter_packet);
+ is_binary(Msg) ->
+ message(ack, Msg, S);
+
+%% Deferred actions from a message_cb.
+transition({actions, Dir, Acts}, S) ->
+ actions(Acts, Dir, S);
%% Request to close the transport connection.
transition({diameter, {close, Pid}}, #transport{parent = Pid}) ->
@@ -522,8 +579,18 @@ transition({diameter, {close, Pid}}, #transport{parent = Pid}) ->
transition({diameter, {tls, _Ref, _Type, _Bool}}, _) ->
stop;
-%% Parent process has died.
-transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) ->
+%% Parent process has died: call the monitor to not close the socket
+%% during an ongoing send, but don't let it take forever.
+transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid,
+ send = MPid}) ->
+ is_boolean(MPid)
+ orelse ok == (catch gen_server:call(MPid, {stop, Pid}))
+ orelse exit(MPid, kill),
+ stop;
+
+%% Monitor process has died.
+transition({'DOWN', _, process, MPid, _}, #transport{send = MPid})
+ when is_pid(MPid) ->
stop;
%% Timeout after transport process has been started.
@@ -536,6 +603,18 @@ transition({resolve_port, Pid}, #transport{socket = Sock})
Pid ! inet:port(Sock),
ok.
+%% m/2
+
+m({Msg, StreamId}, #monitor{socket = Sock,
+ transport = TPid,
+ assoc_id = AId,
+ ack = B}) ->
+ send(Sock, AId, StreamId, Msg),
+ B andalso (TPid ! Msg);
+
+m({'DOWN', _, process, TPid, _} = T, #monitor{transport = TPid}) ->
+ x(T).
+
%% Crash on anything unexpected.
ok({ok, T}) ->
@@ -578,33 +657,52 @@ q(Ref, Pid, #listener{pending = {_,Q}}) ->
%% send/2
+%% Start monitor process on first send.
+send(Msg, #transport{send = true,
+ socket = Sock,
+ assoc_id = AId,
+ message_cb = CB}
+ = S) ->
+ {ok, MPid} = diameter_sctp_sup:start_child(#monitor{transport = self(),
+ socket = Sock,
+ assoc_id = AId,
+ ack = false /= CB}),
+ monitor(process, MPid),
+ send(Msg, S#transport{send = MPid});
+
%% Outbound Diameter message on a specified stream ...
-send(#diameter_packet{bin = Bin, transport_data = {outstream, SId}},
+send(#diameter_packet{transport_data = {outstream, SId}}
+ = Msg,
#transport{streams = {_, OS}}
= S) ->
- send(SId rem OS, Bin, S),
- S;
+ send(SId rem OS, Msg, S);
%% ... or not: rotate through all streams.
-send(#diameter_packet{bin = Bin}, S) ->
- send(Bin, S);
-send(Bin, #transport{streams = {_, OS},
+send(Msg, #transport{streams = {_, OS},
os = N}
- = S)
- when is_binary(Bin) ->
- send(N, Bin, S),
- S#transport{os = (N + 1) rem OS}.
+ = S) ->
+ send(N, Msg, S#transport{os = (N + 1) rem OS}).
%% send/3
-send(StreamId, Bin, #transport{socket = Sock,
- assoc_id = AId}) ->
- send(Sock, AId, StreamId, Bin).
+send(StreamId, Msg, #transport{send = false,
+ socket = Sock,
+ assoc_id = AId}
+ = S) ->
+ send(Sock, AId, StreamId, Msg),
+ message(ack, Msg, S);
+
+send(StreamId, Msg, #transport{send = MPid} = S) ->
+ MPid ! {Msg, StreamId},
+ S.
%% send/4
-send(Sock, AssocId, Stream, Bin) ->
- case gen_sctp:send(Sock, AssocId, Stream, Bin) of
+send(Sock, AssocId, StreamId, #diameter_packet{bin = Bin}) ->
+ send(Sock, AssocId, StreamId, Bin);
+
+send(Sock, AssocId, StreamId, Bin) ->
+ case gen_sctp:send(Sock, AssocId, StreamId, Bin) of
ok ->
ok;
{error, Reason} ->
@@ -624,7 +722,9 @@ recv({_, #sctp_assoc_change{state = comm_up,
= S) ->
Ref = getr(?REF_KEY),
publish(T, Ref, Id, Sock),
- up(S#transport{assoc_id = Id,
+ %% Deal with different association id after peeloff on Solaris by
+ %% taking the id from the first reception.
+ up(S#transport{assoc_id = T == accept orelse Id,
streams = {IS, OS}});
%% ... or not: try the next address.
@@ -639,17 +739,19 @@ recv({_, #sctp_assoc_change{} = E},
recv({_, #sctp_assoc_change{}}, _) ->
stop;
+%% First inbound on an accepting transport.
+recv({[#sctp_sndrcvinfo{assoc_id = Id}], _Bin}
+ = T,
+ #transport{assoc_id = true}
+ = S) ->
+ recv(T, S#transport{assoc_id = Id});
+
%% Inbound Diameter message.
-recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, #transport{parent = Pid})
+recv({[#sctp_sndrcvinfo{}], Bin} = Msg, S)
when is_binary(Bin) ->
- diameter_peer:recv(Pid, #diameter_packet{transport_data = {stream, Id},
- bin = Bin}),
- ok;
+ message(recv, Msg, S);
-recv({_, #sctp_shutdown_event{assoc_id = A}},
- #transport{assoc_id = Id})
- when A == Id;
- A == 0 ->
+recv({_, #sctp_shutdown_event{}}, _) ->
stop;
%% Note that diameter_sctp(3) documents that sctp_events cannot be
@@ -765,6 +867,23 @@ connect(Sock, [Addr | AT] = As, Port, Reasons) ->
connect(Sock, AT, Port, [{Addr, E} | Reasons])
end.
+%% setopts/2
+
+setopts(_, #transport{socket = Sock,
+ active = A,
+ recv = B}
+ = S)
+ when B, not A ->
+ setopts(Sock),
+ S#transport{active = true};
+
+setopts(_, #transport{} = S) ->
+ S;
+
+setopts(#transport{socket = Sock}, T) ->
+ setopts(Sock),
+ T.
+
%% setopts/1
setopts(Sock) ->
@@ -772,3 +891,83 @@ setopts(Sock) ->
ok -> ok;
X -> x({setopts, Sock, X}) %% possibly on peer disconnect
end.
+
+%% A message_cb is invoked whenever a message is sent or received, or
+%% to provide acknowledgement of a completed send or discarded
+%% request. See diameter_tcp for semantics, the only difference being
+%% that a recv callback can get a diameter_packet record as Msg
+%% depending on how/if option packet has been specified.
+
+%% message/3
+
+message(send, false = M, S) ->
+ message(ack, M, S);
+
+message(ack, _, #transport{message_cb = false} = S) ->
+ S;
+
+message(Dir, Msg, S) ->
+ setopts(S, actions(cb(S, Dir, Msg), Dir, S)).
+
+%% actions/3
+
+actions([], _, S) ->
+ S;
+
+actions([B | As], Dir, S)
+ when is_boolean(B) ->
+ actions(As, Dir, S#transport{recv = B});
+
+actions([Dir | As], _, S)
+ when Dir == send;
+ Dir == recv ->
+ actions(As, Dir, S);
+
+actions([Msg | As], send = Dir, S)
+ when is_record(Msg, diameter_packet);
+ is_binary(Msg) ->
+ actions(As, Dir, send(Msg, S));
+
+actions([Msg | As], recv = Dir, #transport{parent = Pid} = S)
+ when is_record(Msg, diameter_packet);
+ is_binary(Msg) ->
+ diameter_peer:recv(Pid, Msg),
+ actions(As, Dir, S);
+
+actions([{defer, Tmo, Acts} | As], Dir, S) ->
+ erlang:send_after(Tmo, self(), {actions, Dir, Acts}),
+ actions(As, Dir, S);
+
+actions(CB, _, S) ->
+ S#transport{message_cb = CB}.
+
+%% cb/3
+
+cb(#transport{message_cb = false, packet = P}, recv, Msg) ->
+ [pkt(P, true, Msg)];
+
+cb(#transport{message_cb = CB, packet = P}, recv = D, Msg) ->
+ cb(CB, D, pkt(P, false, Msg));
+
+cb(#transport{message_cb = CB}, Dir, Msg) ->
+ cb(CB, Dir, Msg);
+
+cb(false, send, Msg) ->
+ [Msg];
+
+cb(CB, Dir, Msg) ->
+ diameter_lib:eval([CB, Dir, Msg]).
+
+%% pkt/3
+
+pkt(false, _, {_Info, Bin}) ->
+ Bin;
+
+pkt(true, _, {[#sctp_sndrcvinfo{stream = Id}], Bin}) ->
+ #diameter_packet{bin = Bin, transport_data = {stream, Id}};
+
+pkt(raw, true, {[Info], Bin}) ->
+ #diameter_packet{bin = Bin, transport_data = Info};
+
+pkt(raw, false, {[_], _} = Msg) ->
+ Msg.
diff --git a/lib/diameter/src/transport/diameter_sctp_sup.erl b/lib/diameter/src/transport/diameter_sctp_sup.erl
index 36050aaf28..e8e26ec7c5 100644
--- a/lib/diameter/src/transport/diameter_sctp_sup.erl
+++ b/lib/diameter/src/transport/diameter_sctp_sup.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2016. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2017. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -49,6 +49,7 @@ start() ->
start_child(T) ->
SupRef = case element(1,T) of
+ monitor -> ?TRANSPORT_SUP;
connect -> ?TRANSPORT_SUP;
accept -> ?TRANSPORT_SUP;
listen -> ?LISTENER_SUP
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl
index 44abc5c3b4..a2f393d5d4 100644
--- a/lib/diameter/src/transport/diameter_tcp.erl
+++ b/lib/diameter/src/transport/diameter_tcp.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2016. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2017. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -19,7 +19,6 @@
%%
-module(diameter_tcp).
--dialyzer({no_fail_call, throttle/2}).
-behaviour(gen_server).
@@ -53,6 +52,7 @@
%% Keys into process dictionary.
-define(INFO_KEY, info).
-define(REF_KEY, ref).
+-define(TRANSPORT_KEY, transport).
-define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})).
@@ -68,16 +68,23 @@
%% The same gen_server implementation supports three different kinds
%% of processes: an actual transport process, one that will club it to
%% death should the parent die before a connection is established, and
-%% a process owning the listening port.
+%% a process owning the listening port. The monitor process
+%% historically died after connection establishment, but can now live
+%% on as the sender of outgoing messages, so that a blocking send
+%% doesn't prevent messages from being received.
%% Listener process state.
-record(listener, {socket :: inet:socket(),
+ module :: module(),
service = false :: false | pid()}). %% service process
%% Monitor process state.
-record(monitor,
- {parent :: pid(),
- transport = self() :: pid()}).
+ {parent :: reference() | false | pid(),
+ transport = self() :: pid(),
+ ack = false :: boolean(),
+ socket :: inet:socket() | ssl:sslsocket() | undefined,
+ module :: module() | undefined}).
-type length() :: 0..16#FFFFFF. %% message length from Diameter header
-type size() :: non_neg_integer(). %% accumulated binary size
@@ -97,25 +104,30 @@
-type listen_option() :: {accept, match()}
| {ssl_options, true | [ssl:listen_option()]}
+ | option()
| ssl:listen_option()
| gen_tcp:listen_option().
-type option() :: {port, non_neg_integer()}
- | {fragment_timer, 0..16#FFFFFFFF}
- | {throttle_cb, diameter:evaluable()}.
+ | {sender, boolean()}
+ | sender
+ | {message_cb, false | diameter:evaluable()}
+ | {fragment_timer, 0..16#FFFFFFFF}.
%% Accepting/connecting transport process state.
-record(transport,
{socket :: inet:socket() | ssl:sslsocket(), %% accept/connect socket
+ active = false :: boolean(), %% is socket active?
+ recv = true :: boolean(), %% should it be active?
parent :: pid(), %% of process that started us
module :: module(), %% gen_tcp-like module
- frag = <<>> :: frag(), %% message fragment
ssl :: [term()] | boolean(), %% ssl options, ssl or not
+ frag = <<>> :: frag(), %% message fragment
timeout :: infinity | 0..16#FFFFFFFF, %% fragment timeout
tref = false :: false | reference(), %% fragment timer reference
flush = false :: boolean(), %% flush fragment at timeout?
- throttle_cb :: false | diameter:evaluable(), %% ask to receive
- throttled :: boolean() | binary()}). %% stopped receiving?
+ message_cb :: false | diameter:evaluable(),
+ send :: pid() | false}). %% sending process
%% The usual transport using gen_tcp can be replaced by anything
%% sufficiently gen_tcp-like by passing a 'module' option as the first
@@ -137,13 +149,13 @@
start({T, Ref}, Svc, Opts) ->
#diameter_service{capabilities = Caps,
- pid = SPid}
+ pid = SvcPid}
= Svc,
diameter_tcp_sup:start(), %% start tcp supervisors on demand
{Mod, Rest} = split(Opts),
Addrs = Caps#diameter_caps.host_ip_address,
- Arg = {T, Ref, Mod, self(), Rest, Addrs, SPid},
+ Arg = {T, Ref, Mod, self(), Rest, Addrs, SvcPid},
diameter_tcp_sup:start_child(Arg).
split([{module, M} | Opts]) ->
@@ -197,57 +209,53 @@ init(T) ->
%% i/1
%% A transport process.
-i({T, Ref, Mod, Pid, Opts, Addrs, SPid})
+i({T, Ref, Mod, Pid, Opts, Addrs, SvcPid})
when T == accept;
T == connect ->
monitor(process, Pid),
%% Since accept/connect might block indefinitely, spawn a process
- %% that does nothing but kill us with the parent until call
- %% returns.
- {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}),
+ %% that kills us with the parent until call returns, and then
+ %% sends outgoing messages.
{[SO|TO], Rest} = proplists:split(Opts, [ssl_options,
- fragment_timer,
- throttle_cb]),
+ sender,
+ message_cb,
+ fragment_timer]),
SslOpts = ssl_opts(SO),
OwnOpts = lists:append(TO),
Tmo = proplists:get_value(fragment_timer,
OwnOpts,
?DEFAULT_FRAGMENT_TIMEOUT),
+ [CB, Sender] = [proplists:get_value(K, OwnOpts, false)
+ || K <- [message_cb, sender]],
?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}),
- Throttle = proplists:get_value(throttle_cb, OwnOpts, false),
- Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs, SPid),
- MPid ! {stop, self()}, %% tell the monitor to die
+ {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}),
+ Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs, SvcPid),
M = if SslOpts -> ssl; true -> Mod end,
+ Sender andalso monitor(process, MPid),
+ false == CB orelse (Pid ! {diameter, ack}),
+ MPid ! {start, self(), Sender andalso {Sock, M}, false /= CB},
putr(?REF_KEY, Ref),
- throttle(#transport{parent = Pid,
- module = M,
- socket = Sock,
- ssl = SslOpts,
- timeout = Tmo,
- throttle_cb = Throttle,
- throttled = false /= Throttle});
+ setopts(#transport{parent = Pid,
+ module = M,
+ socket = Sock,
+ ssl = SslOpts,
+ message_cb = CB,
+ timeout = Tmo,
+ send = Sender andalso MPid});
%% Put the reference in the process dictionary since we now use it
%% advertise the ssl socket after TLS upgrade.
-i({T, _Ref, _Mod, _Pid, _Opts, _Addrs} = Arg) %% from old code
- when T == accept;
- T == connect ->
- i(erlang:append_element(Arg, _SPid = false));
-
%% A monitor process to kill the transport if the parent dies.
i(#monitor{parent = Pid, transport = TPid} = S) ->
+ putr(?TRANSPORT_KEY, TPid),
proc_lib:init_ack({ok, self()}),
- monitor(process, Pid),
monitor(process, TPid),
- S;
+ S#monitor{parent = monitor(process, Pid)};
%% In principle a link between the transport and killer processes
%% could do the same thing: have the accepting/connecting process be
%% killed when the killer process dies as a consequence of parent
%% death. However, a link can be unlinked and this is exactly what
-%% gen_tcp seems to so. Links should be left to supervisors.
-
-i({listen = L, Ref, _APid, T}) -> %% from old code
- i({L, Ref, T});
+%% gen_tcp seems to do. Links should be left to supervisors.
i({listen, Ref, {Mod, Opts, Addrs}}) ->
[_] = diameter_config:subscribe(Ref, transport), %% assert existence
@@ -258,7 +266,8 @@ i({listen, Ref, {Mod, Opts, Addrs}}) ->
LAddr = laddr(LAddrOpt, Mod, LSock),
true = diameter_reg:add_new({?MODULE, listener, {Ref, {LAddr, LSock}}}),
proc_lib:init_ack({ok, self(), {LAddr, LSock}}),
- #listener{socket = LSock}.
+ #listener{socket = LSock,
+ module = Mod}.
laddr([], Mod, Sock) ->
{ok, {Addr, _Port}} = sockname(Mod, Sock),
@@ -279,19 +288,19 @@ ssl_opts(T) ->
%% init/8
%% Establish a TLS connection before capabilities exchange ...
-init(Type, Ref, Mod, Pid, true, Opts, Addrs, SPid) ->
- init(Type, Ref, ssl, Pid, [{cb_info, ?TCP_CB(Mod)} | Opts], Addrs, SPid);
+init(Type, Ref, Mod, Pid, true, Opts, Addrs, SvcPid) ->
+ init(Type, Ref, ssl, Pid, [{cb_info, ?TCP_CB(Mod)} | Opts], Addrs, SvcPid);
%% ... or not.
-init(Type, Ref, Mod, Pid, _, Opts, Addrs, SPid) ->
- init(Type, Ref, Mod, Pid, Opts, Addrs, SPid).
+init(Type, Ref, Mod, Pid, _, Opts, Addrs, SvcPid) ->
+ init(Type, Ref, Mod, Pid, Opts, Addrs, SvcPid).
%% init/7
-init(accept = T, Ref, Mod, Pid, Opts, Addrs, SPid) ->
+init(accept = T, Ref, Mod, Pid, Opts, Addrs, SvcPid) ->
{[Matches], Rest} = proplists:split(Opts, [accept]),
{ok, LPid, {LAddr, LSock}} = listener(Ref, {Mod, Rest, Addrs}),
- ok = gen_server:call(LPid, {accept, SPid}, infinity),
+ ok = gen_server:call(LPid, {accept, SvcPid}, infinity),
proc_lib:init_ack({ok, self(), [LAddr]}),
Sock = ok(accept(Mod, LSock)),
ok = accept_peer(Mod, Sock, accept(Matches)),
@@ -299,7 +308,7 @@ init(accept = T, Ref, Mod, Pid, Opts, Addrs, SPid) ->
diameter_peer:up(Pid),
Sock;
-init(connect = T, Ref, Mod, Pid, Opts, Addrs, _SPid) ->
+init(connect = T, Ref, Mod, Pid, Opts, Addrs, _SvcPid) ->
{[LA, RA, RP], Rest} = proplists:split(Opts, [ip, raddr, rport]),
LAddrOpt = get_addr(LA, Addrs),
RAddr = get_addr(RA),
@@ -451,14 +460,18 @@ portnr(Sock) ->
%% # handle_call/3
%% ---------------------------------------------------------------------------
-handle_call({accept, SPid}, _From, #listener{service = P} = S) ->
- {reply, ok, if not is_pid(P), is_pid(SPid) ->
- monitor(process, SPid),
- S#listener{service = SPid};
+handle_call({accept, SvcPid}, _From, #listener{service = P} = S) ->
+ {reply, ok, if not is_pid(P), is_pid(SvcPid) ->
+ monitor(process, SvcPid),
+ S#listener{service = SvcPid};
true ->
S
end};
-
+
+%% Transport is telling us of parent death.
+handle_call({stop, _Pid} = Reason, _From, #monitor{} = S) ->
+ {stop, {shutdown, Reason}, ok, S};
+
handle_call(_, _, State) ->
{reply, nok, State}.
@@ -480,8 +493,7 @@ handle_info(T, #listener{} = S) ->
{noreply, #listener{} = l(T,S)};
handle_info(T, #monitor{} = S) ->
- m(T,S),
- x(T).
+ {noreply, #monitor{} = m(T,S)}.
%% ---------------------------------------------------------------------------
%% # code_change/3
@@ -497,6 +509,7 @@ code_change(_, State, _) ->
terminate(_, _) ->
ok.
+
%% ---------------------------------------------------------------------------
putr(Key, Val) ->
@@ -509,18 +522,47 @@ getr(Key) ->
%%
%% Transition monitor state.
+%% Outgoing message.
+m(Msg, S)
+ when is_record(Msg, diameter_packet);
+ is_binary(Msg) ->
+ send(Msg, S),
+ S;
+
+%% Transport has established a connection. Stop monitoring on the
+%% parent so as not to die before a send from the transport.
+m({start, TPid, T, Ack} = M, #monitor{transport = TPid} = S) ->
+ case T of
+ {Sock, Mod} ->
+ demonitor(S#monitor.parent, [flush]),
+ S#monitor{parent = false,
+ socket = Sock,
+ module = Mod,
+ ack = Ack};
+ false -> %% monitor not sending
+ x(M)
+ end;
+
%% Transport is telling us to die.
-m({stop, TPid}, #monitor{transport = TPid}) ->
- ok;
+m({stop, TPid} = T, #monitor{transport = TPid}) ->
+ x(T);
-%% Transport has died.
-m({'DOWN', _, process, TPid, _}, #monitor{transport = TPid}) ->
- ok;
+%% Transport is telling us to die.
+m({stop, TPid} = T, #monitor{transport = TPid}) ->
+ x(T);
-%% Transport parent has died.
-m({'DOWN', _, process, Pid, _}, #monitor{parent = Pid,
- transport = TPid}) ->
- exit(TPid, {shutdown, parent}).
+%% Transport is telling us that TLS has been negotiated after
+%% capabilities exchange.
+m({tls, SSock}, S) ->
+ S#monitor{socket = SSock,
+ module = ssl};
+
+%% Transport or parent has died.
+m({'DOWN', M, process, P, _} = T, #monitor{parent = MRef,
+ transport = TPid})
+ when M == MRef;
+ P == TPid ->
+ x(T).
%% l/2
%%
@@ -528,18 +570,16 @@ m({'DOWN', _, process, Pid, _}, #monitor{parent = Pid,
%% Service process has died.
l({'DOWN', _, process, Pid, _} = T, #listener{service = Pid,
- socket = Sock}) ->
- gen_tcp:close(Sock),
+ socket = Sock,
+ module = M}) ->
+ M:close(Sock),
x(T);
%% Transport has been removed.
-l({transport, remove, _} = T, #listener{socket = Sock}) ->
- gen_tcp:close(Sock),
- x(T);
-
-%% Possibly death of an accepting process monitored in old code.
-l(_, S) ->
- S.
+l({transport, remove, _} = T, #listener{socket = Sock,
+ module = M}) ->
+ M:close(Sock),
+ x(T).
%% t/2
%%
@@ -557,21 +597,13 @@ t(T,S) ->
%% transition/2
-%% Incoming message.
+%% Incoming packets.
transition({P, Sock, Bin}, #transport{socket = Sock,
- ssl = B,
- throttled = T}
+ ssl = B}
= S)
when P == ssl, true == B;
P == tcp ->
- false = T, %% assert
- recv(Bin, S);
-
-%% Make a new throttling callback after a timeout.
-transition(throttle, #transport{throttled = false}) ->
- ok;
-transition(throttle, S) ->
- throttle(S);
+ recv(Bin, S#transport{active = false});
%% Capabilties exchange has decided on whether or not to run over TLS.
transition({diameter, {tls, Ref, Type, B}}, #transport{parent = Pid}
@@ -581,7 +613,7 @@ transition({diameter, {tls, Ref, Type, B}}, #transport{parent = Pid}
= NS
= tls_handshake(Type, B, S),
Pid ! {diameter, {tls, Ref}},
- throttle(NS#transport{ssl = B});
+ NS#transport{ssl = B};
transition({C, Sock}, #transport{socket = Sock,
ssl = B})
@@ -597,8 +629,18 @@ transition({E, Sock, _Reason} = T, #transport{socket = Sock,
?ERROR({T,S});
%% Outgoing message.
-transition({diameter, {send, Bin}}, S) ->
- send(Bin, S);
+transition({diameter, {send, Msg}}, #transport{} = S) ->
+ message(send, Msg, S);
+
+%% Monitor has sent an outgoing message.
+transition(Msg, S)
+ when is_record(Msg, diameter_packet);
+ is_binary(Msg) ->
+ message(ack, Msg, S);
+
+%% Deferred actions from a message_cb.
+transition({actions, Dir, Acts}, S) ->
+ actions(Acts, Dir, S);
%% Request to close the transport connection.
transition({diameter, {close, Pid}}, #transport{parent = Pid,
@@ -618,8 +660,18 @@ transition({resolve_port, Pid}, #transport{socket = Sock,
Pid ! portnr(M, Sock),
ok;
-%% Parent process has died.
-transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) ->
+%% Parent process has died: call the monitor to not close the socket
+%% during an ongoing send, but don't let it take forever.
+transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid,
+ send = MPid}) ->
+ false == MPid
+ orelse (ok == gen_server:call(MPid, {stop, self()}, 1000))
+ orelse exit(MPid, {shutdown, parent}),
+ stop;
+
+%% Monitor process has died.
+transition({'DOWN', _, process, MPid, _}, #transport{send = MPid})
+ when is_pid(MPid) ->
stop.
%% Crash on anything unexpected.
@@ -643,11 +695,13 @@ tls_handshake(_, true, #transport{ssl = false}) ->
%% Capabilities exchange negotiated TLS: upgrade the connection.
tls_handshake(Type, true, #transport{socket = Sock,
module = M,
- ssl = Opts}
+ ssl = Opts,
+ send = MPid}
= S) ->
{ok, SSock} = tls(Type, Sock, [{cb_info, ?TCP_CB(M)} | Opts]),
Ref = getr(?REF_KEY),
true = diameter_reg:add_new({?MODULE, Type, {Ref, SSock}}),
+ false == MPid orelse (MPid ! {tls, SSock}), %% tell the sender process
S#transport{socket = SSock,
module = ssl};
@@ -666,24 +720,15 @@ tls(accept, Sock, Opts) ->
%% using Nagle.
%% Receive packets until a full message is received,
-recv(Bin, #transport{frag = Head, throttled = false} = S) ->
+recv(Bin, #transport{frag = Head} = S) ->
case rcv(Head, Bin) of
- {Msg, B} ->
- throttle(S#transport{frag = B, throttled = Msg});
- Frag ->
- setopts(S),
- start_fragment_timer(S#transport{frag = Frag,
- flush = false})
+ {Msg, B} -> %% have a complete message ...
+ message(recv, Msg, S#transport{frag = B});
+ Frag -> %% read more on the socket
+ start_fragment_timer(setopts(S#transport{frag = Frag,
+ flush = false}))
end.
-%% recv/1
-
-recv(#transport{throttled = false} = S) ->
- recv(<<>>, S);
-
-recv(#transport{} = S) ->
- S.
-
%% rcv/2
%% No previous fragment.
@@ -743,13 +788,16 @@ recv1(Len, Bin) ->
<<Msg:Len/binary, Rest/binary>> = Bin,
{Msg, Rest}.
-%% bin/1-2
+%% bin/2
bin(Head, Acc) ->
list_to_binary([Head | lists:reverse(Acc)]).
+%% bin/1
+
bin({_, _, Head, Acc}) ->
bin(Head, Acc);
+
bin(Bin)
when is_binary(Bin) ->
Bin.
@@ -768,9 +816,7 @@ bin(Bin)
%% also eventually lead to watchdog failover.
%% No fragment to flush or not receiving messages.
-flush(#transport{frag = Frag, throttled = B} = S)
- when Frag == <<>>;
- B /= false ->
+flush(#transport{frag = <<>>} = S) ->
S;
%% Messages have been received since last timer expiry.
@@ -778,9 +824,8 @@ flush(#transport{flush = false} = S) ->
start_fragment_timer(S#transport{flush = true});
%% No messages since last expiry.
-flush(#transport{frag = Frag, parent = Pid} = S) ->
- diameter_peer:recv(Pid, bin(Frag)),
- S#transport{frag = <<>>}.
+flush(#transport{frag = Frag} = S) ->
+ message(recv, bin(Frag), S#transport{frag = <<>>}).
%% start_fragment_timer/1
%%
@@ -813,9 +858,27 @@ connect(Mod, Host, Port, Opts) ->
%% send/2
-send(Bin, #transport{socket = Sock,
- module = M}) ->
- case send(M, Sock, Bin) of
+send(Msg, #monitor{socket = Sock, module = M, transport = TPid, ack = B}) ->
+ send1(M, Sock, Msg),
+ B andalso (TPid ! Msg);
+
+send(Msg, #transport{socket = Sock, module = M, send = false} = S) ->
+ send1(M, Sock, Msg),
+ message(ack, Msg, S);
+
+%% Send from the monitor process to avoid deadlock if both the
+%% receiver and the peer were to block in send.
+send(Msg, #transport{send = Pid} = S) ->
+ Pid ! Msg,
+ S.
+
+%% send1/3
+
+send1(Mod, Sock, #diameter_packet{bin = Bin}) ->
+ send1(Mod, Sock, Bin);
+
+send1(Mod, Sock, Bin) ->
+ case send(Mod, Sock, Bin) of
ok ->
ok;
{error, Reason} ->
@@ -842,120 +905,19 @@ setopts(M, Sock, Opts) ->
%% setopts/1
-setopts(#transport{socket = Sock, module = M}) ->
- setopts(M, Sock).
-
-%% setopts/2
-
-setopts(M, Sock) ->
+setopts(#transport{socket = Sock,
+ active = A,
+ recv = B,
+ module = M}
+ = S)
+ when B, not A ->
case setopts(M, Sock, [{active, once}]) of
- ok -> ok;
- X -> x({setopts, M, Sock, X}) %% possibly on peer disconnect
- end.
-
-%% throttle/1
-
-%% Still collecting packets for a complete message: keep receiving.
-throttle(#transport{throttled = false} = S) ->
- recv(S);
-
-%% Decide whether to receive another, or whether to accept a message
-%% that's been received.
-throttle(#transport{throttle_cb = F, throttled = T} = S) ->
- Res = cb(F, T),
-
- try throttle(Res, S) of
- #transport{ssl = SB} = NS when is_boolean(SB) ->
- throttle(defrag(NS));
- #transport{throttled = Msg} = NS when is_binary(Msg) ->
- %% Initial incoming message when we might need to upgrade
- %% to TLS: wait for reception of a tls tuple.
- defrag(NS)
- catch
- #transport{} = NS ->
- recv(NS)
- end.
-
-%% cb/2
-
-cb(false, _) ->
- ok;
-
-cb(F, B) ->
- diameter_lib:eval([F, true /= B andalso B]).
-
-%% throttle/2
-
-%% Callback says to receive another message.
-throttle(ok, #transport{throttled = true} = S) ->
- throw(S#transport{throttled = false});
-
-%% Callback says to accept a received message.
-throttle(ok, #transport{parent = Pid, throttled = Msg} = S)
- when is_binary(Msg) ->
- diameter_peer:recv(Pid, Msg),
- S;
-
-throttle({ok = T, F}, S) ->
- throttle(T, S#transport{throttle_cb = F});
-
-%% Callback says to accept a received message and acknowledged the
-%% returned pid with a {request, Pid} message if a request pid is
-%% spawned, a discard message otherwise. The latter does not mean that
-%% the message was necessarily discarded: it could have been an
-%% answer.
-throttle(NPid, #transport{parent = Pid, throttled = Msg} = S)
- when is_pid(NPid), is_binary(Msg) ->
- diameter_peer:recv(Pid, {Msg, NPid}),
- S;
-
-throttle({NPid, F}, #transport{throttled = Msg} = S)
- when is_pid(NPid), is_binary(Msg) ->
- throttle(NPid, S#transport{throttle_cb = F});
-
-%% Callback to accept a received message says to discard it.
-throttle(discard, #transport{throttled = Msg} = S)
- when is_binary(Msg) ->
- S;
-
-throttle({discard = T, F}, #transport{throttled = Msg} = S)
- when is_binary(Msg) ->
- throttle(T, S#transport{throttle_cb = F});
-
-%% Callback to accept a received message says to answer it with the
-%% supplied binary.
-throttle(Bin, #transport{throttled = Msg} = S)
- when is_binary(Bin), is_binary(Msg) ->
- send(Bin, S),
- S;
-
-throttle({Bin, F}, #transport{throttled = Msg} = S)
- when is_binary(Bin), is_binary(Msg) ->
- throttle(Bin, S#transport{throttle_cb = F});
-
-%% Callback says to ask again in the specified number of milliseconds.
-throttle({timeout, Tmo}, S) ->
- erlang:send_after(Tmo, self(), throttle),
- throw(S);
-
-throttle({timeout = T, Tmo, F}, S) ->
- throttle({T, Tmo}, S#transport{throttle_cb = F});
-
-throttle(T, #transport{throttle_cb = F}) ->
- ?ERROR({invalid_return, T, F}).
-
-%% defrag/1
-%%
-%% Try to extract another message from packets already read before
-%% another throttling callback.
+ ok -> S#transport{active = true};
+ X -> x({setopts, Sock, M, X}) %% possibly on peer disconnect
+ end;
-defrag(#transport{frag = Head} = S) ->
- case rcv(Head, <<>>) of
- {Msg, B} ->
- S#transport{throttled = Msg, frag = B};
- _ ->
- S#transport{throttled = true}
- end.
+setopts(S) ->
+ S.
%% portnr/2
@@ -990,3 +952,80 @@ getstat(gen_tcp, Sock) ->
getstat(M, Sock) ->
M:getstat(Sock).
%% Note that ssl:getstat/1 doesn't yet exist in R15B01.
+
+%% A message_cb is invoked whenever a message is sent or received, or
+%% to provide acknowledgement of a completed send or discarded
+%% request. Ignoring possible extra arguments, calls are of the
+%% following form.
+%%
+%% cb(recv, Msg) Receive a message into diameter?
+%% cb(send, Msg) Send a message on the socket?
+%% cb(ack, Msg) Acknowledgement of a completed send.
+%% cb(ack, false) Acknowledgement of a discarded request.
+%%
+%% Msg will be binary() in a recv callback, but can be a
+%% diameter_packet record in a send/ack callback if a recv/send
+%% callback returns a record. Callbacks return a list of the following
+%% form.
+%%
+%% [boolean() | send | recv | binary() | #diameter_packet{}]
+%%
+%% The atoms are meaningless by themselves, but say whether subsequent
+%% messages are to be sent or received. A boolean says whether or not
+%% to continue reading on the socket. Messages can be received even
+%% after false is returned if these arrived in the same packet. A
+%% leading recv or send is implicit on the corresponding callbacks. A
+%% new callback can be returned as the tail of a returned list: any
+%% value not of the aforementioned list type is interpreted as a
+%% callback.
+
+%% message/3
+
+message(send, false = M, S) ->
+ message(ack, M, S);
+
+message(ack, _, #transport{message_cb = false} = S) ->
+ S;
+
+message(Dir, Msg, #transport{message_cb = CB} = S) ->
+ recv(<<>>, actions(cb(CB, Dir, Msg), Dir, S)).
+
+%% actions/3
+
+actions([], _, S) ->
+ S;
+
+actions([B | As], Dir, S)
+ when is_boolean(B) ->
+ actions(As, Dir, S#transport{recv = B});
+
+actions([Dir | As], _, S)
+ when Dir == send;
+ Dir == recv ->
+ actions(As, Dir, S);
+
+actions([Msg | As], send = Dir, S)
+ when is_binary(Msg);
+ is_record(Msg, diameter_packet) ->
+ actions(As, Dir, send(Msg, S));
+
+actions([Msg | As], recv = Dir, #transport{parent = Pid} = S)
+ when is_binary(Msg);
+ is_record(Msg, diameter_packet) ->
+ diameter_peer:recv(Pid, Msg),
+ actions(As, Dir, S);
+
+actions([{defer, Tmo, Acts} | As], Dir, S) ->
+ erlang:send_after(Tmo, self(), {actions, Dir, Acts}),
+ actions(As, Dir, S);
+
+actions(CB, _, S) ->
+ S#transport{message_cb = CB}.
+
+%% cb/3
+
+cb(false, _, Msg) ->
+ [Msg];
+
+cb(CB, Dir, Msg) ->
+ diameter_lib:eval([CB, Dir, Msg]).
diff --git a/lib/diameter/test/diameter_capx_SUITE.erl b/lib/diameter/test/diameter_capx_SUITE.erl
index fdeff96a58..51b6c1d7f2 100644
--- a/lib/diameter/test/diameter_capx_SUITE.erl
+++ b/lib/diameter/test/diameter_capx_SUITE.erl
@@ -433,7 +433,7 @@ server_reject(Config, F, RC) ->
?fail({LRef, OH})
end.
-%% cliient_closed/4
+%% client_closed/4
client_closed(Config, Host, F, RC) ->
true = diameter:subscribe(?CLIENT),
diff --git a/lib/diameter/test/diameter_examples_SUITE.erl b/lib/diameter/test/diameter_examples_SUITE.erl
index e4ed2b227d..fad54d62b2 100644
--- a/lib/diameter/test/diameter_examples_SUITE.erl
+++ b/lib/diameter/test/diameter_examples_SUITE.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2013-2015. All Rights Reserved.
+%% Copyright Ericsson AB 2013-2017. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
diff --git a/lib/diameter/test/diameter_gen_sctp_SUITE.erl b/lib/diameter/test/diameter_gen_sctp_SUITE.erl
index 79db39ca45..d76d2bdbd3 100644
--- a/lib/diameter/test/diameter_gen_sctp_SUITE.erl
+++ b/lib/diameter/test/diameter_gen_sctp_SUITE.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2016. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2017. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -33,8 +33,8 @@
end_per_suite/1]).
%% testcases
--export([send_not_from_controlling_process/1,
- send_from_multiple_clients/1, send_from_multiple_clients/0,
+-export([send_one_from_many/1, send_one_from_many/0,
+ send_many_from_one/1, send_many_from_one/0,
receive_what_was_sent/1]).
-include_lib("kernel/include/inet_sctp.hrl").
@@ -45,16 +45,24 @@
%% Open sockets on the loopback address.
-define(ADDR, {127,0,0,1}).
-%% Snooze, nap, siesta.
--define(SLEEP(T), receive after T -> ok end).
-
%% An indescribably long number of milliseconds after which everthing
%% that should have happened has.
-define(FOREVER, 2000).
+%% How many milliseconds to tolerate between the fastest and slowest
+%% turnaround times.
+-define(VARIANCE, 100).
+
%% The first byte in each message we send as a simple guard against
%% not receiving what was sent.
--define(MAGIC, 42).
+-define(MAGIC, 0).
+
+%% Requested number of inbound/outbound streams.
+-define(STREAMS, 5).
+
+%% Success for send_multiple. Match in each testcase rather than in
+%% send_multiple itself for a better failure in common_test.
+-define(OK, {_, true, _, [true, true], [], _}).
%% ===========================================================================
@@ -62,8 +70,8 @@ suite() ->
[{timetrap, {seconds, 10}}].
all() ->
- [send_not_from_controlling_process,
- send_from_multiple_clients,
+ [send_one_from_many,
+ send_many_from_one,
receive_what_was_sent].
init_per_suite(Config) ->
@@ -81,130 +89,37 @@ end_per_suite(_Config) ->
%% ===========================================================================
-%% send_not_from_controlling_process/1
-%%
-%% This testcase failing shows gen_sctp:send/4 hanging when called
-%% outside the controlling process of the socket in question.
-
-send_not_from_controlling_process(_) ->
- Pids = send_not_from_controlling_process(),
- ?SLEEP(?FOREVER),
- try
- [] = [{P,I} || P <- Pids, I <- [process_info(P)], I /= undefined]
- after
- lists:foreach(fun(P) -> exit(P, kill) end, Pids)
- end.
-
-%% send_not_from_controlling_process/0
-%%
-%% Returns the pids of three spawned processes: a listening process, a
-%% connecting process and a sending process.
-%%
-%% The expected behaviour is that all three processes exit:
-%%
-%% - The listening process exits upon receiving an SCTP message
-%% sent by the sending process.
-%% - The connecting process exits upon listening process exit.
-%% - The sending process exits upon gen_sctp:send/4 return.
-%%
-%% The observed behaviour is that all three processes remain alive
-%% indefinitely:
-%%
-%% - The listening process never receives the SCTP message sent
-%% by the sending process.
-%% - The connecting process has an inet_reply message in its mailbox
-%% as a consequence of the call to gen_sctp:send/4 call from the
-%% sending process.
-%% - The call to gen_sctp:send/4 in the sending process doesn't return,
-%% hanging in prim_inet:getopts/2.
-
-send_not_from_controlling_process() ->
- FPid = self(),
- {L, MRef} = spawn_monitor(fun() -> listen(FPid) end),
- receive
- {?MODULE, C, S} ->
- demonitor(MRef, [flush]),
- [L,C,S];
- {'DOWN', MRef, process, _, _} = T ->
- error(T)
- end.
-
-%% listen/1
-
-listen(FPid) ->
- {ok, Sock} = open(),
- ok = gen_sctp:listen(Sock, true),
- {ok, PortNr} = inet:port(Sock),
- LPid = self(),
- spawn(fun() -> connect1(PortNr, FPid, LPid) end), %% connecting process
- Id = assoc(Sock),
- recv(Sock, Id).
-
-%% connect1/3
-
-connect1(PortNr, FPid, LPid) ->
- {ok, Sock} = open(),
- ok = gen_sctp:connect_init(Sock, ?ADDR, PortNr, []),
- Id = assoc(Sock),
- FPid ! {?MODULE,
- self(),
- spawn(fun() -> send(Sock, Id) end)}, %% sending process
- MRef = monitor(process, LPid),
- down(MRef). %% Waits with this as current_function.
-
-%% down/1
-
-down(MRef) ->
- receive {'DOWN', MRef, process, _, Reason} -> Reason end.
-
-%% send/2
-
-send(Sock, Id) ->
- ok = gen_sctp:send(Sock, Id, 0, <<0:32>>).
-
-%% ===========================================================================
-
-%% send_from_multiple_clients/0
+%% send_one_from_many/0
%%
%% Demonstrates sluggish delivery of messages.
-send_from_multiple_clients() ->
- [{timetrap, {seconds, 60}}].
+send_one_from_many() ->
+ [{timetrap, {seconds, 30}}].
-send_from_multiple_clients(_) ->
- {S, Rs} = T = send_from_multiple_clients(8, 1024),
- Max = ?FOREVER*1000,
- {false, [], _} = {Max < S,
- Rs -- [OI || {O,_} = OI <- Rs, is_integer(O)],
- T}.
+send_one_from_many(_) ->
+ ?OK = send_multiple(128, 1, 1024).
-%% send_from_multiple_clients/2
+%% send_one_from_many/2
%%
%% Opens a listening socket and then spawns a specified number of
-%% processes, each of which connects to the listening socket. Each
-%% connecting process then sends a message, whose size in bytes is
-%% passed as an argument, the listening process sends a reply
-%% containing the time at which the message was received, and the
-%% connecting process then exits upon reception of this reply.
+%% processes, each of which connects, sends a message, receives a
+%% reply, and exits.
%%
%% Returns the elapsed time for all connecting process to exit
-%% together with a list of exit reasons for the connecting processes.
-%% In the successful case a connecting process exits with the
-%% outbound/inbound transit times for the sent/received message as
-%% reason.
+%% together with a list of exit reasons. In the successful case a
+%% connecting process exits with the outbound/inbound transit times
+%% for the sent/received message as reason.
%%
%% The observed behaviour is that some outbound messages (that is,
%% from a connecting process to the listening process) can take an
%% unexpectedly long time to complete their journey. The more
-%% connecting processes, the longer the possible delay it seems.
+%% connecting processes, the longer it can take it seems.
%%
-%% eg. (With F = fun send_from_multiple_clients/2.)
-%%
-%% 5> F(2, 1024).
+%% eg. 5> send_one_from_many(2, 1024).
%% {875,[{128,116},{113,139}]}
-%% 6> F(4, 1024).
+%% 6> send_one_from_many(4, 1024).
%% {2995290,[{2994022,250},{2994071,80},{200,130},{211,113}]}
-%% 7> F(8, 1024).
+%% 7> send_one_from_many(8, 1024).
%% {8997461,[{8996161,116},
%% {2996471,86},
%% {2996278,116},
@@ -213,7 +128,7 @@ send_from_multiple_clients(_) ->
%% {213,159},
%% {373,173},
%% {376,118}]}
-%% 8> F(8, 1024).
+%% 8> send_one_from_many(8, 1024).
%% {21001891,[{20999968,128},
%% {8997891,172},
%% {8997927,91},
@@ -223,120 +138,279 @@ send_from_multiple_clients(_) ->
%% {117,98},
%% {149,125}]}
%%
-%% This turns out to have been due to SCTP resends as a consequence of
-%% the listener having an insufficient recbuf. Increasing the size
-%% solves the problem.
-%%
-
-send_from_multiple_clients(N, Sz)
- when is_integer(N), 0 < N, is_integer(Sz), 0 < Sz ->
- timer:tc(fun listen/2, [N, <<?MAGIC, 0:Sz/unit:8>>]).
-%% listen/2
-
-listen(N, Bin) ->
+send_multiple(Clients, Msgs, Sz)
+ when is_integer(Clients), 0 < Clients,
+ is_integer(Msgs), 0 < Msgs,
+ is_integer(Sz), 0 < Sz ->
+ T0 = diameter_lib:now(),
+ {S, Res} = timer:tc(fun listen/3, [Clients, Msgs, Sz]),
+ report(T0, Res),
+ Ts = lists:append(Res),
+ Outgoing = [DT || {_,{_,_,DT},{_,_,_},_} <- Ts],
+ Incoming = [DT || {_,{_,_,_},{_,_,DT},_} <- Ts],
+ Diffs = [lists:max(L) - lists:min(L) || L <- [Outgoing, Incoming]],
+ {S,
+ S < ?FOREVER*1000,
+ Diffs,
+ [D < V || V <- [?VARIANCE*1000], D <- Diffs],
+ [T || T <- Ts, [] == [T || {_,{_,_,_},{_,_,_},_} <- [T]]],
+ Res}.
+
+%% listen/3
+
+listen(Clients, Msgs, Sz) ->
{ok, Sock} = open(),
ok = gen_sctp:listen(Sock, true),
{ok, PortNr} = inet:port(Sock),
%% Spawn a middleman that in turn spawns N connecting processes,
%% collects a list of exit reasons and then exits with the list as
- %% reason. loop/3 returns when we receive this list from the
+ %% reason. accept/2 returns when we receive this list from the
%% middleman's 'DOWN'.
Self = self(),
- Fun = fun() -> exit(connect2(Self, PortNr, Bin)) end,
- {_, MRef} = spawn_monitor(fun() -> exit(fold(N, Fun)) end),
- loop(Sock, MRef, Bin).
+ Fun = fun() -> exit(client(Self, PortNr, Msgs, Sz)) end, %% start clients
+ {_, MRef} = spawn_monitor(fun() -> exit(clients(Clients, Fun)) end),
+ accept_loop(Sock, MRef).
-%% fold/2
+%% fclients/2
%%
%% Spawn N processes and collect their exit reasons in a list.
-fold(N, Fun) ->
+clients(N, Fun) ->
start(N, Fun),
acc(N, []).
+%% start/2
+
start(0, _) ->
ok;
+
start(N, Fun) ->
spawn_monitor(Fun),
start(N-1, Fun).
+%% acc/2
+
acc(0, Acc) ->
Acc;
+
acc(N, Acc) ->
receive
{'DOWN', _MRef, process, _, RC} ->
acc(N-1, [RC | Acc])
end.
-%% loop/3
+%% accept_loop/2
-loop(Sock, MRef, Bin) ->
+accept_loop(Sock, MRef) ->
+ ok = inet:setopts(Sock, [{active, once}]),
receive
- ?SCTP(Sock, {[#sctp_sndrcvinfo{assoc_id = Id}], B})
- when is_binary(B) ->
- Sz = size(Bin),
- {Sz, Bin} = {size(B), B}, %% assert
- ok = send(Sock, Id, mark(Bin)),
- loop(Sock, MRef, Bin);
+ ?SCTP(Sock, {_, #sctp_assoc_change{state = comm_up,
+ outbound_streams = OS,
+ assoc_id = Id}}) ->
+ Self = self(),
+ TPid = spawn(fun() -> assoc(monitor(process, Self), Id, OS) end),
+ NewSock = peeloff(Sock, Id, TPid),
+ TPid ! {peeloff, NewSock},
+ accept_loop(Sock, MRef);
?SCTP(Sock, _) ->
- loop(Sock, MRef, Bin);
+ accept_loop(Sock, MRef);
{'DOWN', MRef, process, _, Reason} ->
- Reason
+ Reason;
+ T ->
+ error(T)
end.
-%% connect2/3
+%% assoc/3
+%%
+%% Server process that answers incoming messages as long as the parent
+%% lives.
+
+assoc(MRef, _Id, OS)
+ when is_reference(MRef) ->
+ {peeloff, Sock} = receive T -> T end,
+ recv_loop(Sock, false, sender(Sock, false, OS), MRef).
+
+%% recv_loop/4
+
+recv_loop(Sock, Id, Pid, MRef) ->
+ ok = inet:setopts(Sock, [{active, once}]),
+ recv(Sock, Id, Pid, MRef, receive T -> T end).
+
+%% recv/5
+
+%% Association id can change on a peeloff socket on some versions of
+%% Solaris.
+recv(Sock,
+ false,
+ Pid,
+ MRef,
+ ?SCTP(Sock, {[#sctp_sndrcvinfo{assoc_id = Id}], _})
+ = T) ->
+ Pid ! {assoc_id, Id},
+ recv(Sock, Id, Pid, MRef, T);
+
+recv(Sock, Id, Pid, MRef, ?SCTP(Sock, {[#sctp_sndrcvinfo{assoc_id = I}], B}))
+ when is_binary(B) ->
+ T2 = diameter_lib:now(),
+ Id = I, %% assert
+ <<?MAGIC, Bin/binary>> = B, %% assert
+ {[_,_,_,Sz] = L, Bytes} = unmark(Bin),
+ Sz = size(Bin) - Bytes, %% assert
+ <<_:Bytes/binary, Body:Sz/binary>> = Bin,
+ send(Pid, [T2|L], Body), %% answer
+ recv_loop(Sock, Id, Pid, MRef);
+
+recv(Sock, Id, Pid, MRef, ?SCTP(Sock, _)) ->
+ recv_loop(Sock, Id, Pid, MRef);
+
+recv(_, _, _, MRef, {'DOWN', MRef, process, _, Reason}) ->
+ Reason;
+
+recv(_, _, _, _, T) ->
+ error(T).
-connect2(Pid, PortNr, Bin) ->
- monitor(process, Pid),
+%% send/3
- {ok, Sock} = open(),
- ok = gen_sctp:connect_init(Sock, ?ADDR, PortNr, []),
- Id = assoc(Sock),
+send(Pid, Header, Body) ->
+ Pid ! {send, Header, Body}.
- %% T1 = time before send
- %% T2 = time after listening process received our message
- %% T3 = time after reply is received
+%% sender/3
+%%
+%% Start a process that sends, so as not to block the controlling process.
- T1 = diameter_lib:now(),
- ok = send(Sock, Id, Bin),
- T2 = unmark(recv(Sock, Id)),
- T3 = diameter_lib:now(),
- {diameter_lib:micro_diff(T2, T1), %% Outbound
- diameter_lib:micro_diff(T3, T2)}. %% Inbound
+sender(Sock, Id, OS) ->
+ Pid = self(),
+ spawn(fun() -> send_loop(Sock, Id, OS, 1, monitor(process, Pid)) end).
-%% recv/2
+%% send_loop/5
-recv(Sock, Id) ->
+send_loop(Sock, Id, OS, N, MRef) ->
receive
- ?SCTP(Sock, {[#sctp_sndrcvinfo{assoc_id = I}], Bin})
- when is_binary(Bin) ->
- Id = I, %% assert
- Bin;
- ?SCTP(S, _) ->
- Sock = S, %% assert
- recv(Sock, Id);
+ {assoc_id, I} ->
+ send_loop(Sock, I, OS, N, MRef);
+ {send, L, Body} ->
+ Stream = N rem OS,
+ ok = send(Sock, Id, Stream, mark(Body, [N, Stream | L])),
+ send_loop(Sock, Id, OS, N+1, MRef);
+ {'DOWN', MRef, process, _, _} = T ->
+ T;
T ->
- exit(T)
+ error(T)
end.
-%% send/3
+%% peeloff/3
-send(Sock, Id, Bin) ->
- gen_sctp:send(Sock, Id, 0, Bin).
+peeloff(LSock, Id, TPid) ->
+ {ok, Sock} = gen_sctp:peeloff(LSock, Id),
+ ok = gen_sctp:controlling_process(Sock, TPid),
+ Sock.
-%% mark/1
+%% client/4
-mark(Bin) ->
- Info = term_to_binary(diameter_lib:now()),
- <<Info/binary, Bin/binary>>.
+client(Pid, PortNr, Msgs, Sz) ->
+ monitor(process, Pid),
+ {ok, Sock} = open(),
+ ok = gen_sctp:connect_init(Sock, ?ADDR, PortNr, []),
+ recv_loop(Sock, Msgs, Sz).
+
+%% recv_loop/3
+
+recv_loop(_, 0, T) ->
+ [_,_|Acc] = T,
+ Acc;
+
+recv_loop(Sock, Msgs, T) ->
+ ok = inet:setopts(Sock, [{active, once}]),
+ {I, NewT} = recv(Sock, Msgs, T, receive X -> X end),
+ recv_loop(Sock, Msgs - I, NewT).
+
+%% recv/4
+
+recv(Sock, Msgs, Sz, ?SCTP(Sock, {_, #sctp_assoc_change{} = A})) ->
+ #sctp_assoc_change{state = comm_up, %% assert
+ assoc_id = Id,
+ outbound_streams = OS}
+ = A,
+ true = is_integer(Sz), %% assert
+ send_n(Msgs, sender(Sock, Id, OS), Sz),
+ {0, [Id, OS]};
+
+recv(Sock, _, T, ?SCTP(Sock, {[#sctp_sndrcvinfo{assoc_id = Id}], Bin})) ->
+ T4 = diameter_lib:now(),
+ [Id, OS | Acc] = T,
+ {1, [Id, OS, stat(T4, Bin) | Acc]};
+
+recv(Sock, _, T, ?SCTP(Sock, _)) ->
+ {0, [_,_|_] = T};
+
+recv(_, _, _, T) ->
+ error(T).
+
+%% send_n/3
+%%
+%% Send messages to the server from dedicated processes.
+
+send_n(0, _, _) ->
+ ok;
+send_n(N, Pid, Sz) ->
+ M = rand:uniform(255),
+ send(Pid, [Sz], binary:copy(<<M>>, Sz)),
+ send_n(N-1, Pid, Sz).
+
+%% send/4
+
+send(Sock, Id, Stream, Bin) ->
+ case gen_sctp:send(Sock, Id, Stream, <<?MAGIC, Bin/binary>>) of
+ {error, eagain} ->
+ send(Sock, Id, Stream, Bin);
+ RC ->
+ RC
+ end.
+
+%% stat/2
+
+stat(T4, <<?MAGIC, Bin/binary>>) ->
+ %% T1 = time at send
+ %% T2 = time at reception by server
+ %% T3 = time at reception by server's sender
+ %% T4 = time at reception of answer
+
+ {[T3,NI,SI,T2,T1,NO,SO,Sz], Bytes} = unmark(Bin),
+
+ Sz = size(Bin) - Bytes, %% assert
+
+ {T1,
+ {NO, SO, diameter_lib:micro_diff(T2, T1)}, %% Outbound
+ {NI, SI, diameter_lib:micro_diff(T4, T3)}, %% Inbound
+ T4}.
+
+%% mark/2
+
+mark(Bin, T) ->
+ Info = term_to_binary([diameter_lib:now() | T]),
+ <<Info/binary, Bin/binary>>.
+
%% unmark/1
unmark(Bin) ->
- binary_to_term(Bin).
+ T = binary_to_term(Bin),
+ {T, size(term_to_binary(T))}.
+
+%% ===========================================================================
+
+%% send_many_from_one/0
+%%
+%% Demonstrates sluggish delivery of messages.
+
+send_many_from_one() ->
+ [{timetrap, {seconds, 30}}].
+
+send_many_from_one(_) ->
+ ?OK = send_multiple(1, 128, 1024).
%% ===========================================================================
@@ -345,7 +419,7 @@ unmark(Bin) ->
%% Demonstrates reception of a message that differs from that sent.
receive_what_was_sent(_Config) ->
- send_from_multiple_clients(1, 1024*32). %% fails
+ ?OK = send_multiple(1, 1, 1024*32).
%% ===========================================================================
@@ -357,16 +431,23 @@ open() ->
%% open/1
open(Opts) ->
- gen_sctp:open([{ip, ?ADDR}, {port, 0}, {active, true}, binary,
+ gen_sctp:open([{ip, ?ADDR}, {port, 0}, {active, false}, binary,
+ {sctp_initmsg, #sctp_initmsg{num_ostreams = ?STREAMS,
+ max_instreams = ?STREAMS}},
{recbuf, 1 bsl 16}, {sndbuf, 1 bsl 16}
| Opts]).
-%% assoc/1
+%% report/2
-assoc(Sock) ->
- receive
- ?SCTP(Sock, {_, #sctp_assoc_change{state = S,
- assoc_id = Id}}) ->
- comm_up = S, %% assert
- Id
- end.
+report(T0, Ts) ->
+ ct:pal("~p~n", [lists:sort([sort([{diameter_lib:micro_diff(T1,T0),
+ OT,
+ IT,
+ diameter_lib:micro_diff(T4,T0)}
+ || {T1,OT,IT,T4} <- L])
+ || L <- Ts])]).
+
+%% sort/1
+
+sort(L) ->
+ lists:sort(fun({_,{N,_,_},_,_}, {_,{M,_,_},_,_}) -> N =< M end, L).
diff --git a/lib/diameter/test/diameter_gen_tcp_SUITE.erl b/lib/diameter/test/diameter_gen_tcp_SUITE.erl
index 2be2cf4b35..db42ea813e 100644
--- a/lib/diameter/test/diameter_gen_tcp_SUITE.erl
+++ b/lib/diameter/test/diameter_gen_tcp_SUITE.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2014-2015. All Rights Reserved.
+%% Copyright Ericsson AB 2014-2017. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -54,7 +54,7 @@ all() ->
send_long(_) ->
{Sock, SendF} = connection(),
- B = list_to_binary(lists:duplicate(1 bsl 20, $X)),
+ B = binary:copy(<<$X>>, 1 bsl 20),
ok = SendF(B),
B = recv(Sock, size(B), []).
diff --git a/lib/diameter/test/diameter_traffic_SUITE.erl b/lib/diameter/test/diameter_traffic_SUITE.erl
index 4c82d4dee2..c6d63a9345 100644
--- a/lib/diameter/test/diameter_traffic_SUITE.erl
+++ b/lib/diameter/test/diameter_traffic_SUITE.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2016. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2017. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -27,6 +27,8 @@
-export([suite/0,
all/0,
groups/0,
+ init_per_suite/1,
+ end_per_suite/1,
init_per_group/2,
end_per_group/2,
init_per_testcase/2,
@@ -90,7 +92,6 @@
send_multiple_filters_2/1,
send_multiple_filters_3/1,
send_anything/1,
- outstanding/1,
remove_transports/1,
empty/1,
stop_services/1,
@@ -106,6 +107,9 @@
handle_error/6,
handle_request/3]).
+%% diameter_{tcp,sctp} callbacks
+-export([message/3]).
+
-include("diameter.hrl").
-include("diameter_gen_base_rfc3588.hrl").
-include("diameter_gen_base_accounting.hrl").
@@ -145,21 +149,29 @@
%% Whether to decode stringish Diameter types to strings, or leave
%% them as binary.
--define(STRING_DECODES, [true, false]).
+-define(STRING_DECODES, [false, true]).
%% Which transport protocol to use.
-define(TRANSPORTS, [tcp, sctp]).
+%% Send from a dedicated process?
+-define(SENDERS, [true, false]).
+
+%% Message callbacks from diameter_{tcp,sctp}?
+-define(CALLBACKS, [true, false]).
+
-record(group,
{transport,
+ strings,
client_service,
client_encoding,
client_dict0,
- client_strings,
+ client_sender,
server_service,
server_encoding,
server_container,
- server_strings}).
+ server_sender,
+ server_throttle}).
%% Not really what we should be setting unless the message is sent in
%% the common application but diameter doesn't care.
@@ -243,76 +255,128 @@ suite() ->
[{timetrap, {seconds, 10}}].
all() ->
- [start, result_codes, {group, traffic}, outstanding, empty, stop].
+ [start, result_codes, {group, traffic}, empty, stop].
groups() ->
- Ts = tc(),
- Sctp = ?util:have_sctp(),
- [{B, [P], Ts} || {B,P} <- [{true, shuffle}, {false, parallel}]]
+ [{P, [P], Ts} || Ts <- [tc(tc())], P <- [shuffle, parallel]]
++
- [{?util:name([T,R,D,A,C,SD,CD]),
+ [{?util:name([T,R,D,A,C,S,SS,ST,CS]),
[],
- [start_services,
- add_transports,
- result_codes,
- {group, SD orelse CD},
- remove_transports,
- stop_services]}
+ [{group, if S -> shuffle; not S -> parallel end}]}
|| T <- ?TRANSPORTS,
- T /= sctp orelse Sctp,
R <- ?ENCODINGS,
D <- ?RFCS,
A <- ?ENCODINGS,
C <- ?CONTAINERS,
- SD <- ?STRING_DECODES,
- CD <- ?STRING_DECODES]
+ S <- ?STRING_DECODES,
+ SS <- ?SENDERS,
+ ST <- ?CALLBACKS,
+ CS <- ?SENDERS]
+ ++
+ [{T, [], groups([[T,R,D,A,C,S,SS,ST,CS]
+ || R <- ?ENCODINGS,
+ D <- ?RFCS,
+ A <- ?ENCODINGS,
+ C <- ?CONTAINERS,
+ S <- ?STRING_DECODES,
+ SS <- ?SENDERS,
+ ST <- ?CALLBACKS,
+ CS <- ?SENDERS,
+ SS orelse CS])} %% avoid deadlock
+ || T <- ?TRANSPORTS]
++
- [{traffic, [], [{group, ?util:name([T,R,D,A,C,SD,CD])}
- || T <- ?TRANSPORTS,
- T /= sctp orelse Sctp,
- R <- ?ENCODINGS,
- D <- ?RFCS,
- A <- ?ENCODINGS,
- C <- ?CONTAINERS,
- SD <- ?STRING_DECODES,
- CD <- ?STRING_DECODES]}].
+ [{traffic, [], [{group, T} || T <- ?TRANSPORTS]}].
+
+%groups(_) -> %% debug
+% Name = [sctp,record,rfc6733,record,pkt,false,false,false,false],
+% [{group, ?util:name(Name)}];
+groups(Names) ->
+ [{group, ?util:name(L)} || L <- Names].
+
+%tc([N|_]) -> %% debug
+% [N];
+tc(L) ->
+ L.
+
+%% --------------------
+
+init_per_suite(Config) ->
+ [{sctp, ?util:have_sctp()} | Config].
+
+end_per_suite(_Config) ->
+ ok.
+
+%% --------------------
+
+init_per_group(Name, Config)
+ when Name == shuffle;
+ Name == parallel ->
+ start_services(Config),
+ add_transports(Config);
+
+init_per_group(sctp = Name, Config) ->
+ {_, Sctp} = lists:keyfind(Name, 1, Config),
+ if Sctp ->
+ Config;
+ true ->
+ {skip, Name}
+ end;
init_per_group(Name, Config) ->
case ?util:name(Name) of
- [T,R,D,A,C,SD,CD] ->
+ [T,R,D,A,C,S,SS,ST,CS] ->
G = #group{transport = T,
+ strings = S,
client_service = [$C|?util:unique_string()],
client_encoding = R,
client_dict0 = dict0(D),
- client_strings = CD,
+ client_sender = CS,
server_service = [$S|?util:unique_string()],
server_encoding = A,
server_container = C,
- server_strings = SD},
- [{group, G} | Config];
+ server_sender = SS,
+ server_throttle = ST},
+ %% Limit the number of testcase, since the number of
+ %% groups is large.
+ All = ?util:scramble(tc()),
+ TCs = lists:sublist(All, rand:uniform(32)),
+ [{group, G}, {runlist, TCs} | Config];
_ ->
Config
end.
+end_per_group(Name, Config)
+ when Name == shuffle;
+ Name == parallel ->
+ remove_transports(Config),
+ stop_services(Config);
+
end_per_group(_, _) ->
ok.
+%% --------------------
+
%% Skip testcases that can reasonably fail under SCTP.
init_per_testcase(Name, Config) ->
- case [skip || #group{transport = sctp}
- <- [proplists:get_value(group, Config)],
- send_maxlen == Name
- orelse send_long == Name]
+ TCs = proplists:get_value(runlist, Config, []),
+ Run = [] == TCs orelse lists:member(Name, TCs),
+ case [G || #group{transport = sctp} = G
+ <- [proplists:get_value(group, Config)]]
of
- [skip] ->
+ [_] when Name == send_maxlen;
+ Name == send_long ->
{skip, sctp};
- [] ->
+ _ when not Run ->
+ {skip, random};
+ _ ->
[{testcase, Name} | Config]
end.
end_per_testcase(_, _) ->
ok.
+%% --------------------
+
%% Testcases to run when services are started and connections
%% established.
tc() ->
@@ -377,28 +441,34 @@ start(_Config) ->
ok = diameter:start().
start_services(Config) ->
- #group{client_service = CN,
- client_strings = CD,
- server_service = SN,
- server_strings = SD}
+ #group{strings = S,
+ client_service = CN,
+ server_service = SN}
= group(Config),
- ok = diameter:start_service(SN, ?SERVICE(SN, SD)),
+ ok = diameter:start_service(SN, ?SERVICE(SN, S)),
ok = diameter:start_service(CN, [{sequence, ?CLIENT_MASK}
- | ?SERVICE(CN, CD)]).
+ | ?SERVICE(CN, S)]).
add_transports(Config) ->
#group{transport = T,
client_service = CN,
- server_service = SN}
+ client_sender = CS,
+ server_service = SN,
+ server_sender = SS,
+ server_throttle = ST}
= group(Config),
LRef = ?util:listen(SN,
- T,
+ [T,
+ {sender, SS},
+ {message_cb, ST andalso {?MODULE, message, [4]}}
+ | [{packet, hd(?util:scramble([false, raw]))}
+ || T == sctp andalso CS]],
[{capabilities_cb, fun capx/2},
{pool_size, 8},
{spawn_opt, [{min_heap_size, 8096}]},
{applications, apps(rfc3588)}]),
Cs = [?util:connect(CN,
- T,
+ [T, {sender, CS}],
LRef,
[{id, Id},
{capabilities, [{'Origin-State-Id', origin(Id)}]},
@@ -415,11 +485,6 @@ apps(D0) ->
D = dict0(D0),
[acct(D), D].
-%% Ensure there are no outstanding requests in request table.
-outstanding(_Config) ->
- [] = [T || T <- ets:tab2list(diameter_request),
- is_atom(element(1,T))].
-
remove_transports(Config) ->
#group{client_service = CN,
server_service = SN}
@@ -689,14 +754,14 @@ send_unexpected_mandatory(Config) ->
%% Send something long that will be fragmented by TCP.
send_long(Config) ->
Req = ['STR', {'Termination-Cause', ?LOGOUT},
- {'User-Name', [lists:duplicate(1 bsl 20, $X)]}],
+ {'User-Name', [binary:copy(<<$X>>, 1 bsl 20)]}],
['STA', {'Session-Id', _}, {'Result-Code', ?SUCCESS} | _]
= call(Config, Req).
%% Send something longer than the configure incoming_maxlen.
send_maxlen(Config) ->
Req = ['STR', {'Termination-Cause', ?LOGOUT},
- {'User-Name', [lists:duplicate(1 bsl 21, $X)]}],
+ {'User-Name', [binary:copy(<<$X>>, 1 bsl 21)]}],
{timeout, _} = call(Config, Req).
%% Send something for which pick_peer finds no suitable peer.
@@ -875,7 +940,7 @@ group(Config) ->
#group{} = proplists:get_value(group, Config).
string(V, Config) ->
- #group{client_strings = B} = group(Config),
+ #group{strings = B} = group(Config),
decode(V,B).
decode(S, true)
@@ -995,7 +1060,7 @@ pick_peer(Peers, _, [$C|_], _State, {send_detach, Group}, _, {_,_}) ->
find(#group{client_service = CN,
server_encoding = A,
server_container = C},
- Peers) ->
+ [_|_] = Peers) ->
Id = {A,C},
[P] = [P || P <- Peers, id(Id, P, CN)],
{ok, P}.
@@ -1429,3 +1494,33 @@ request(#diameter_base_STR{'Session-Id' = SId},
%% send_error/send_timeout
request(#diameter_base_RAR{}, _Caps) ->
receive after 2000 -> {protocol_error, ?TOO_BUSY} end.
+
+%% message/3
+%%
+%% Limit the number of messages received. More can be received if read
+%% in the same packet.
+
+message(recv = D, {[_], Bin}, N) ->
+ message(D, Bin, N);
+message(Dir, #diameter_packet{bin = Bin}, N) ->
+ message(Dir, Bin, N);
+
+%% incoming request
+message(recv, <<_:32, 1, _/bits>> = Bin, N) ->
+ [Bin, 1 < N, fun ?MODULE:message/3, N-1];
+
+%% incoming answer
+message(recv, Bin, _) ->
+ [Bin];
+
+%% outgoing
+message(send, Bin, _) ->
+ [Bin];
+
+%% sent request
+message(ack, <<_:32, 1, _/bits>>, _) ->
+ [];
+
+%% sent answer or discarded request
+message(ack, _, N) ->
+ [0 =< N, fun ?MODULE:message/3, N+1].
diff --git a/lib/diameter/test/diameter_transport_SUITE.erl b/lib/diameter/test/diameter_transport_SUITE.erl
index c94f46b7a5..9d981d0a2b 100644
--- a/lib/diameter/test/diameter_transport_SUITE.erl
+++ b/lib/diameter/test/diameter_transport_SUITE.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2016. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2017. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -294,10 +294,17 @@ init(gen_accept, {Prot, Ref}) ->
{ok, PortNr} = inet:port(LSock),
true = diameter_reg:add_new(?TEST_LISTENER(Ref, PortNr)),
- %% Accept a connection, receive a message and send it back.
+ %% Accept a connection, receive a message send it back, and wait
+ %% for the peer to close the connection.
{ok, Sock} = gen_accept(Prot, LSock),
Bin = gen_recv(Prot, Sock),
- ok = gen_send(Prot, Sock, Bin);
+ ok = gen_send(Prot, Sock, Bin),
+ receive
+ {tcp_closed, Sock} = T ->
+ T;
+ ?SCTP(Sock, {_, #sctp_assoc_change{}}) = T ->
+ T
+ end;
init(connect, {Prot, Ref}) ->
%% Lookup the peer's listening socket.
@@ -311,12 +318,7 @@ init(connect, {Prot, Ref}) ->
%% Send a message and receive it back.
Bin = make_msg(),
TPid ! ?TMSG({send, Bin}),
- Bin = bin(Prot, ?RECV(?TMSG({recv, P}), P)),
-
- %% Expect the transport process to die as a result of the peer
- %% closing the connection.
- MRef = erlang:monitor(process, TPid),
- ?RECV({'DOWN', MRef, process, _, _}).
+ Bin = bin(Prot, ?RECV(?TMSG({recv, P}), P)).
bin(sctp, #diameter_packet{bin = Bin}) ->
Bin;
@@ -336,15 +338,11 @@ make_msg() ->
<<1:8, Len:24, Bin/binary>>.
%% crypto:rand_bytes/1 isn't available on all platforms (since openssl
-%% isn't) so roll our own.
+%% isn't) so roll our own. Not particularly random, but less verbose
+%% in trace.
rand_bytes(N) ->
- rand_bytes(N, <<>>).
-
-rand_bytes(0, Bin) ->
- Bin;
-rand_bytes(N, Bin) ->
Oct = rand:uniform(256) - 1,
- rand_bytes(N-1, <<Oct, Bin/binary>>).
+ binary:copy(<<Oct>>, N).
%% ===========================================================================
diff --git a/lib/diameter/test/diameter_util.erl b/lib/diameter/test/diameter_util.erl
index cca28dd23c..03f79096ac 100644
--- a/lib/diameter/test/diameter_util.erl
+++ b/lib/diameter/test/diameter_util.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2016. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2017. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -172,18 +172,7 @@ recvl([{MRef, F} | L], Ref, Fun, Acc) ->
%% Sort a list into random order.
scramble(L) ->
- foldl(fun(true, _, S, false) -> S end,
- false,
- [[fun s/1, L]]).
-
-s(L) ->
- s([], L).
-
-s(Acc, []) ->
- Acc;
-s(Acc, L) ->
- {H, [T|Rest]} = lists:split(rand:uniform(length(L)) - 1, L),
- s([T|Acc], H ++ Rest).
+ [X || {_,X} <- lists:sort([{rand:uniform(), T} || T <- L])].
%% ---------------------------------------------------------------------------
%% unique_string/0
@@ -195,21 +184,22 @@ unique_string() ->
%% have_sctp/0
have_sctp() ->
- case erlang:system_info(system_architecture) of
- %% We do not support the sctp version present in solaris
- %% version "sparc-sun-solaris2.10", that behaves differently
- %% from later versions and linux
- "sparc-sun-solaris2.10" ->
- false;
- _->
- case gen_sctp:open() of
- {ok, Sock} ->
- gen_sctp:close(Sock),
- true;
- {error, E} when E == eprotonosupport;
- E == esocktnosupport -> %% fail on any other reason
- false
- end
+ have_sctp(erlang:system_info(system_architecture)).
+
+%% Don't run SCTP on platforms where it's either known to be flakey or
+%% isn't available.
+
+have_sctp("sparc-sun-solaris2.10") ->
+ false;
+
+have_sctp(_) ->
+ case gen_sctp:open() of
+ {ok, Sock} ->
+ gen_sctp:close(Sock),
+ true;
+ {error, E} when E == eprotonosupport;
+ E == esocktnosupport -> %% fail on any other reason
+ false
end.
%% ---------------------------------------------------------------------------
@@ -313,17 +303,23 @@ listen(SvcName, Prot, Opts) ->
connect(Client, Prot, LRef) ->
connect(Client, Prot, LRef, []).
-connect(Client, Prot, LRef, Opts) ->
+connect(Client, ProtOpts, LRef, Opts) ->
+ Prot = head(ProtOpts),
[PortNr] = lport(Prot, LRef),
Client = diameter:service_info(Client, name), %% assert
true = diameter:subscribe(Client),
- Ref = add_transport(Client, {connect, opts(Prot, PortNr) ++ Opts}),
+ Ref = add_transport(Client, {connect, opts(ProtOpts, PortNr) ++ Opts}),
true = transport(Client, Ref), %% assert
diameter_lib:for_n(fun(_) -> ok = up(Client, Ref, Prot, PortNr) end,
proplists:get_value(pool_size, Opts, 1)),
Ref.
+head([T|_]) ->
+ T;
+head(T) ->
+ T.
+
up(Client, Ref, Prot, PortNr) ->
receive
{diameter_event, Client, {up, Ref, _, _, _}} -> ok
@@ -366,10 +362,13 @@ tmod(sctp) ->
tmod(any) ->
[diameter_sctp, diameter_tcp].
-opts(Prot, T) ->
- tmo(T, lists:append([[{transport_module, M}, {transport_config, C}]
+opts([Prot | Opts], T) ->
+ tmo(T, lists:append([[{transport_module, M}, {transport_config, C ++ Opts}]
|| M <- tmod(Prot),
- C <- [cfg(M,T) ++ cfg(M) ++ cfg(T)]])).
+ C <- [cfg(M,T) ++ cfg(M) ++ cfg(T)]]));
+
+opts(Prot, T) ->
+ opts([Prot], T).
tmo(listen, Opts) ->
Opts;
diff --git a/lib/diameter/test/diameter_watchdog_SUITE.erl b/lib/diameter/test/diameter_watchdog_SUITE.erl
index 6d22ddcc18..39c4f051a5 100644
--- a/lib/diameter/test/diameter_watchdog_SUITE.erl
+++ b/lib/diameter/test/diameter_watchdog_SUITE.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2015. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2017. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -44,13 +44,8 @@
-export([peer_up/3,
peer_down/3]).
-%% gen_tcp-ish interface
--export([listen/2,
- accept/1,
- connect/3,
- send/2,
- setopts/2,
- close/1]).
+%% diameter_tcp message_cb
+-export([message/3]).
-include("diameter.hrl").
-include("diameter_ct.hrl").
@@ -161,9 +156,9 @@ reopen(Type, Test, Ref, Wd, N, M) ->
reopen(Type, Test, SvcName, TRef, Wd, N, M).
cfg(Type, Type, Wd) ->
- {Wd, [], []};
+ {Wd, [], false};
cfg(_Type, _Test, _Wd) ->
- {?WD(?PEER_WD), [{okay, 0}], [{module, ?MODULE}]}.
+ {?WD(?PEER_WD), [{okay, 0}], true}.
%% reopen/7
@@ -346,7 +341,7 @@ recv_reopen(listen, Ref) ->
%% reg/3
%%
%% Lookup the pid of the transport process and publish a term for
-%% send/2 to lookup.
+%% message/3 to lookup.
reg(TRef, SvcName, T) ->
TPid = tpid(TRef, diameter:service_info(SvcName, transport)),
true = diameter_reg:add_new({?MODULE, TPid, T}).
@@ -394,7 +389,7 @@ suspect(_) ->
suspect(Type, Fake, Ref, N)
when is_reference(Ref) ->
{SvcName, TRef}
- = start(Type, Ref, {?WD(10000), [{suspect, N}], mod(Fake)}),
+ = start(Type, Ref, {?WD(10000), [{suspect, N}], Fake}),
{initial, okay} = ?WD_EVENT(TRef),
suspect(TRef, Fake, SvcName, N);
@@ -436,11 +431,6 @@ abuse([F|A], Test) ->
abuse(F, Test) ->
abuse([F], Test).
-mod(true) ->
- [{module, ?MODULE}];
-mod(false) ->
- [].
-
%% ===========================================================================
%% # okay/1
%% ===========================================================================
@@ -456,7 +446,7 @@ okay(Type, Fake, Ref, N)
{SvcName, TRef}
= start(Type, Ref, {?WD(10000),
[{okay, choose(Fake, 0, N)}],
- mod(Fake)}),
+ Fake}),
{initial, okay} = ?WD_EVENT(TRef),
okay(TRef,
Fake,
@@ -515,12 +505,17 @@ start(Type, Ref, T) ->
true = diameter_reg:add_new({Type, Ref, Name}),
{Name, TRef}.
-opts(Type, Ref, {Timer, Config, Mod}) ->
+opts(Type, Ref, {Timer, Config, Fake})
+ when is_boolean(Fake) ->
[{transport_module, diameter_tcp},
- {transport_config, Mod ++ [{ip, ?ADDR}, {port, 0}] ++ cfg(Type, Ref)},
+ {transport_config, mod(Fake) ++ [{ip, ?ADDR}, {port, 0}]
+ ++ cfg(Type, Ref)},
{watchdog_timer, Timer},
{watchdog_config, Config}].
+mod(B) ->
+ [{message_cb, [fun message/3, capx]} || B].
+
cfg(listen, _) ->
[];
cfg(connect, Ref) ->
@@ -531,37 +526,29 @@ cfg(connect, Ref) ->
%% ===========================================================================
-listen(PortNr, Opts) ->
- gen_tcp:listen(PortNr, Opts).
-
-accept(LSock) ->
- gen_tcp:accept(LSock).
+%% message/3
-connect(Addr, Port, Opts) ->
- gen_tcp:connect(Addr, Port, Opts).
+message(send, Bin, X) ->
+ send(Bin, X);
-setopts(Sock, Opts) ->
- inet:setopts(Sock, Opts).
+message(recv, Bin, _) ->
+ [Bin];
-send(Sock, Bin) ->
- send(getr(config), Sock, Bin).
-
-close(Sock) ->
- gen_tcp:close(Sock).
+message(_, _, _) ->
+ [].
-%% send/3
+%% send/2
%% First outgoing message from a new transport process is CER/CEA.
%% Remaining outgoing messages are either DWR or DWA.
-send(undefined, Sock, Bin) ->
- <<_:32, _:8, 257:24, _/binary>> = Bin,
- putr(config, init),
- gen_tcp:send(Sock, Bin);
+send(Bin, capx) ->
+ <<_:32, _:8, 257:24, _/binary>> = Bin, %% assert on CER/CEA
+ [Bin, fun message/3, init];
%% Outgoing DWR: fake reception of DWA. Use the fact that AVP values
%% are ignored. This is to ensure that the peer's watchdog state
%% transitions are only induced by responses to messages it sends.
-send(_, Sock, <<_:32, 1:1, _:7, 280:24, _:32, EId:32, HId:32, _/binary>>) ->
+send(<<_:32, 1:1, _:7, 280:24, _:32, EId:32, HId:32, _/binary>>, _) ->
Pkt = #diameter_packet{header = #diameter_header{version = 1,
end_to_end_id = EId,
hop_by_hop_id = HId},
@@ -569,47 +556,36 @@ send(_, Sock, <<_:32, 1:1, _:7, 280:24, _:32, EId:32, HId:32, _/binary>>) ->
{'Origin-Host', "XXX"},
{'Origin-Realm', ?REALM}]},
#diameter_packet{bin = Bin} = diameter_codec:encode(?BASE, Pkt),
- self() ! {tcp, Sock, Bin},
- ok;
+ [recv, Bin];
%% First outgoing DWA.
-send(init, Sock, Bin) ->
+send(Bin, init) ->
[{{?MODULE, _, T}, _}] = diameter_reg:wait({?MODULE, self(), '_'}),
- putr(config, T),
- send(Sock, Bin);
+ send(Bin, T);
%% First transport process.
-send({SvcName, {_,_,_} = T}, Sock, Bin) ->
+send(Bin, {SvcName, {_,_,_} = T}) ->
[{'Origin-Host', _} = OH, {'Origin-Realm', _} = OR | _]
= ?SERVICE(SvcName),
putr(origin, [OH, OR]),
- putr(config, T),
- send(Sock, Bin);
+ send(Bin, T);
%% Discard DWA, failback after another timeout in the peer.
-send({Wd, 0 = No, Msg}, Sock, Bin) ->
+send(Bin, {Wd, 0 = No, Msg}) ->
Origin = getr(origin),
- spawn(fun() -> failback(?ONE_WD(Wd), Msg, Sock, Bin, Origin) end),
- putr(config, No),
- ok;
+ [{defer, ?ONE_WD(Wd), [msg(Msg, Bin, Origin)]}, fun message/3, No];
%% Send DWA while we're in the mood (aka 0 < N).
-send({Wd, N, Msg}, Sock, Bin) ->
- putr(config, {Wd, N-1, Msg}),
- gen_tcp:send(Sock, Bin);
+send(Bin, {Wd, N, Msg}) ->
+ [Bin, fun message/3, {Wd, N-1, Msg}];
%% Discard DWA.
-send(0, _Sock, _Bin) ->
- ok;
+send(_Bin, 0 = No) ->
+ [fun message/3, No];
%% Send DWA.
-send(N, Sock, <<_:32, 0:1, _:7, 280:24, _/binary>> = Bin) ->
- putr(config, N-1),
- gen_tcp:send(Sock, Bin).
-
-failback(Tmo, Msg, Sock, Bin, Origin) ->
- timer:sleep(Tmo),
- ok = gen_tcp:send(Sock, msg(Msg, Bin, Origin)).
+send(<<_:32, 0:1, _:7, 280:24, _/binary>> = DWA, N) ->
+ [DWA, fun message/3, N-1].
%% msg/2