From 4bad839a0931d7a926118c49f20cfab6ebcfdb91 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Sun, 11 Jun 2017 16:27:21 +0200 Subject: Remove trailing whitespace From commits 5ca5fb71 and 58091992. --- lib/diameter/src/base/diameter_config.erl | 2 +- lib/diameter/src/base/diameter_reg.erl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/diameter/src/base/diameter_config.erl b/lib/diameter/src/base/diameter_config.erl index 245a3ea7ac..386ae967f2 100644 --- a/lib/diameter/src/base/diameter_config.erl +++ b/lib/diameter/src/base/diameter_config.erl @@ -277,7 +277,7 @@ start_link() -> start_link(T) -> proc_lib:start_link(?MODULE, init, [T], infinity, []). - + state() -> call(state). diff --git a/lib/diameter/src/base/diameter_reg.erl b/lib/diameter/src/base/diameter_reg.erl index 9027130063..4910979219 100644 --- a/lib/diameter/src/base/diameter_reg.erl +++ b/lib/diameter/src/base/diameter_reg.erl @@ -137,7 +137,7 @@ match(Pat) -> match(Pat, Pid) -> ets:match_object(?TABLE, {Pat, Pid}). - + %% =========================================================================== %% # wait(Pat) %% -- cgit v1.2.3 From 6b7ec6db6bd999a509e867aef438b3b8769cae43 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Wed, 15 Mar 2017 16:42:44 +0100 Subject: Fix broken discard acknowledgement A transport process can request acknowledgement of the fate of an incoming message to a specified pid, causing it to receive one of {diameter, {request|answer, pid()} | discard} depending on whether or not diameter passes the message off to a handler process. This was broken in commit a4da06a5 (since recv/3 threw a message that should be received), but is of little consequence since the interface isn't yet documented and is only used from diameter_tcp with configuration that will soon change. --- lib/diameter/src/base/diameter_watchdog.erl | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl index f28b8f2910..4484b7ee2c 100644 --- a/lib/diameter/src/base/diameter_watchdog.erl +++ b/lib/diameter/src/base/diameter_watchdog.erl @@ -590,11 +590,9 @@ incoming(Name, Pkt, false, S) -> recv(Name, Pkt, S); incoming(Name, Pkt, NPid, S) -> - try - recv(Name, Pkt, S) - after - NPid ! {diameter, discard} - end. + NS = recv(Name, Pkt, S), + NPid ! {diameter, discard}, + NS. %% recv/3 -- cgit v1.2.3 From 2d74fa618f3a34a5487f5de37c4f6e2870b58273 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Wed, 15 Mar 2017 22:33:17 +0100 Subject: Fix handling of message length errors Message length errors in incoming messages were misinterpreted with transport_opt() {length_errors, exit} due to the throw introduced in commit 2ffb288: the corresponding catch in incoming/2 caught errors thrown by close/1, leading to failure when the error reason was interpreted as a diameter_packet record. Do away with the throw, that also caused woe in the parent commit. --- lib/diameter/src/base/diameter_peer_fsm.erl | 42 +++++++++++++---------------- 1 file changed, 19 insertions(+), 23 deletions(-) diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl index 7ee1e5fe59..d394156367 100644 --- a/lib/diameter/src/base/diameter_peer_fsm.erl +++ b/lib/diameter/src/base/diameter_peer_fsm.erl @@ -444,7 +444,8 @@ transition({connection_timeout, _}, _) -> %% Incoming message from the transport. transition({diameter, {recv, MsgT}}, S) -> - incoming(MsgT, S); + {Msg, NPid} = msg(MsgT), + incoming(recv(Msg, S), NPid, S); %% Timeout when still in the same state ... transition({timeout = T, PS}, #state{state = PS}) -> @@ -609,31 +610,26 @@ encode(Rec, Dict) -> diameter_codec:encode(Dict, #diameter_packet{header = Hdr, msg = Rec}). -%% incoming/2 +%% incoming/3 -incoming({Msg, NPid}, S) -> - try recv(Msg, S) of - T -> - NPid ! {diameter, discard}, - T - catch - {?MODULE, Name, Pkt} -> - incoming(Name, Pkt, NPid, S) - end; +incoming({recv, Name, Pkt}, NPid, #state{parent = Pid} = S) -> + Pid ! {recv, self(), get_route(Pkt), Name, Pkt, NPid}, + rcv(Name, Pkt, S); -incoming(Msg, S) -> - try - recv(Msg, S) - catch - {?MODULE, Name, Pkt} -> - incoming(Name, Pkt, false, S) - end. +incoming(T, false, _) -> + T; -%% incoming/4 +incoming(T, NPid, _) -> + NPid ! {diameter, discard}, + T. -incoming(Name, Pkt, NPid, #state{parent = Pid} = S) -> - Pid ! {recv, self(), get_route(Pkt), Name, Pkt, NPid}, - rcv(Name, Pkt, S). +%% msg/1 + +msg({_,_} = T) -> + T; + +msg(Msg) -> + {Msg, false}. %% recv/2 @@ -701,7 +697,7 @@ recv1('DPA' = N, %% Any other message with a header and no length errors: send to the %% parent. recv1(Name, Pkt, #state{}) -> - throw({?MODULE, Name, Pkt}). + {recv, Name, Pkt}. %% recv/3 -- cgit v1.2.3 From f91d47f08f853dc9442e60425e5dd301994c9ded Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Sun, 5 Mar 2017 19:59:05 +0100 Subject: Correct comment typo --- lib/diameter/test/diameter_capx_SUITE.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/diameter/test/diameter_capx_SUITE.erl b/lib/diameter/test/diameter_capx_SUITE.erl index ed6641b9fb..0d535b6642 100644 --- a/lib/diameter/test/diameter_capx_SUITE.erl +++ b/lib/diameter/test/diameter_capx_SUITE.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2015. All Rights Reserved. +%% Copyright Ericsson AB 2010-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -433,7 +433,7 @@ server_reject(Config, F, RC) -> ?fail({LRef, OH}) end. -%% cliient_closed/4 +%% client_closed/4 client_closed(Config, Host, F, RC) -> true = diameter:subscribe(?CLIENT), -- cgit v1.2.3 From eb69b55b1f2a490f87e5f3f976fcd1351b82eafb Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Fri, 10 Mar 2017 17:09:58 +0100 Subject: Remove clauses supporting old code Since smooth upgrade won't be supported in this branch. --- lib/diameter/src/transport/diameter_tcp.erl | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index 44abc5c3b4..63642a8168 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2016. All Rights Reserved. +%% Copyright Ericsson AB 2010-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -229,11 +229,6 @@ i({T, Ref, Mod, Pid, Opts, Addrs, SPid}) %% Put the reference in the process dictionary since we now use it %% advertise the ssl socket after TLS upgrade. -i({T, _Ref, _Mod, _Pid, _Opts, _Addrs} = Arg) %% from old code - when T == accept; - T == connect -> - i(erlang:append_element(Arg, _SPid = false)); - %% A monitor process to kill the transport if the parent dies. i(#monitor{parent = Pid, transport = TPid} = S) -> proc_lib:init_ack({ok, self()}), @@ -246,9 +241,6 @@ i(#monitor{parent = Pid, transport = TPid} = S) -> %% death. However, a link can be unlinked and this is exactly what %% gen_tcp seems to so. Links should be left to supervisors. -i({listen = L, Ref, _APid, T}) -> %% from old code - i({L, Ref, T}); - i({listen, Ref, {Mod, Opts, Addrs}}) -> [_] = diameter_config:subscribe(Ref, transport), %% assert existence {[LA, LP], Rest} = proplists:split(Opts, [ip, port]), @@ -535,11 +527,7 @@ l({'DOWN', _, process, Pid, _} = T, #listener{service = Pid, %% Transport has been removed. l({transport, remove, _} = T, #listener{socket = Sock}) -> gen_tcp:close(Sock), - x(T); - -%% Possibly death of an accepting process monitored in old code. -l(_, S) -> - S. + x(T). %% t/2 %% -- cgit v1.2.3 From 69b0c1878a95bdfcfe9043fbccf8a0f7b4545bdc Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Sat, 11 Mar 2017 00:04:58 +0100 Subject: Fix gen_tcp close of ssl socket Should be ssl:close/1. --- lib/diameter/src/transport/diameter_tcp.erl | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index 63642a8168..8e400bc6ee 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -72,6 +72,7 @@ %% Listener process state. -record(listener, {socket :: inet:socket(), + module :: module(), service = false :: false | pid()}). %% service process %% Monitor process state. @@ -250,7 +251,8 @@ i({listen, Ref, {Mod, Opts, Addrs}}) -> LAddr = laddr(LAddrOpt, Mod, LSock), true = diameter_reg:add_new({?MODULE, listener, {Ref, {LAddr, LSock}}}), proc_lib:init_ack({ok, self(), {LAddr, LSock}}), - #listener{socket = LSock}. + #listener{socket = LSock, + module = Mod}. laddr([], Mod, Sock) -> {ok, {Addr, _Port}} = sockname(Mod, Sock), @@ -520,13 +522,15 @@ m({'DOWN', _, process, Pid, _}, #monitor{parent = Pid, %% Service process has died. l({'DOWN', _, process, Pid, _} = T, #listener{service = Pid, - socket = Sock}) -> - gen_tcp:close(Sock), + socket = Sock, + module = M}) -> + M:close(Sock), x(T); %% Transport has been removed. -l({transport, remove, _} = T, #listener{socket = Sock}) -> - gen_tcp:close(Sock), +l({transport, remove, _} = T, #listener{socket = Sock, + module = M}) -> + M:close(Sock), x(T). %% t/2 -- cgit v1.2.3 From 9ff8491996381cb2297671b94b7282a7ffb2136f Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Thu, 9 Feb 2017 17:18:21 +0100 Subject: Don't send from receiving transport processes Both diameter_tcp and diameter_sctp are susceptible to deadlock since a peer that blocks send also prevents additional messages from being received. Send from a process that's paired with the transport process to avoid this. Use the existing monitor process in the TCP case, add one in the SCTP case. This has been the reason for many sporadic testcase failures, mostly in diameter_traffic_SUITE. --- lib/diameter/src/transport/diameter_sctp.erl | 76 ++++++++++++--- lib/diameter/src/transport/diameter_sctp_sup.erl | 3 +- lib/diameter/src/transport/diameter_tcp.erl | 119 ++++++++++++++++------- lib/diameter/test/diameter_watchdog_SUITE.erl | 14 ++- 4 files changed, 160 insertions(+), 52 deletions(-) diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index f48e4347ee..f9feb68874 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2016. All Rights Reserved. +%% Copyright Ericsson AB 2010-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -52,6 +52,7 @@ %% Keys into process dictionary. -define(INFO_KEY, info). -define(REF_KEY, ref). +-define(TRANSPORT_KEY, transport). -define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})). @@ -92,7 +93,14 @@ | undefined, streams :: {uint(), uint()} %% {InStream, OutStream} counts | undefined, - os = 0 :: uint()}). %% next output stream + os = 0 :: uint(), %% next output stream + monitor :: pid() | undefined}). %% sending process + +%% Monitor process state. +-record(monitor, + {transport :: pid(), + socket :: gen_sctp:sctp_socket(), + assoc_id :: gen_sctp:assoc_id()}). %% next output stream %% Listener process state. -record(listener, @@ -216,6 +224,12 @@ init(T) -> %% i/1 +i(#monitor{transport = TPid} = S) -> + monitor(process, TPid), + putr(?TRANSPORT_KEY, TPid), + proc_lib:init_ack({ok, self()}), + S; + %% A process owning a listening socket. i({listen, Ref, {Opts, Addrs}}) -> [_] = diameter_config:subscribe(Ref, transport), %% assert existence @@ -382,6 +396,10 @@ handle_call({{accept, _} = T, Pid, SPid}, From, #listener{service = P} = S) -> S end); +%% Transport is telling us of parent death. +handle_call({stop, _Pid} = Reason, _From, #monitor{} = S) -> + {stop, {shutdown, Reason}, ok, S}; + handle_call(_, _, State) -> {reply, nok, State}. @@ -400,7 +418,11 @@ handle_info(T, #transport{} = S) -> {noreply, #transport{} = t(T,S)}; handle_info(T, #listener{} = S) -> - {noreply, #listener{} = l(T,S)}. + {noreply, #listener{} = l(T,S)}; + +handle_info(T, #monitor{} = S) -> + m(T,S), + {noreply, S}. %% Prior to the possiblity of setting pool_size on in transport %% configuration, a new accepting transport was only started following @@ -422,6 +444,9 @@ code_change(_, State, _) -> %% # terminate/2 %% --------------------------------------------------------------------------- +terminate(_, #monitor{}) -> + ok; + terminate(_, #transport{assoc_id = undefined}) -> ok; @@ -522,8 +547,17 @@ transition({diameter, {close, Pid}}, #transport{parent = Pid}) -> transition({diameter, {tls, _Ref, _Type, _Bool}}, _) -> stop; -%% Parent process has died. -transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) -> +%% Parent process has died: call the monitor to not close the socket +%% during an ongoing send, but don't let it take forever. +transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid, + monitor = MPid}) -> + undefined == MPid + orelse ok == (catch gen_server:call(MPid, {stop, Pid})) + orelse exit(MPid, kill), + stop; + +%% Monitor process has died. +transition({'DOWN', _, process, MPid, _}, #transport{monitor = MPid}) -> stop; %% Timeout after transport process has been started. @@ -536,6 +570,14 @@ transition({resolve_port, Pid}, #transport{socket = Sock}) Pid ! inet:port(Sock), ok. +%% m/2 + +m({Bin, StreamId}, #monitor{} = S) -> + send(StreamId, Bin, S); + +m({'DOWN', _, process, TPid, _} = T, #monitor{transport = TPid}) -> + x(T). + %% Crash on anything unexpected. ok({ok, T}) -> @@ -578,6 +620,17 @@ q(Ref, Pid, #listener{pending = {_,Q}}) -> %% send/2 +%% Start monitor process on first send. +send(Msg, #transport{monitor = undefined, + socket = Sock, + assoc_id = AId} + = S) -> + {ok, MPid} = diameter_sctp_sup:start_child(#monitor{transport = self(), + socket = Sock, + assoc_id = AId}), + monitor(process, MPid), + send(Msg, S#transport{monitor = MPid}); + %% Outbound Diameter message on a specified stream ... send(#diameter_packet{bin = Bin, transport_data = {outstream, SId}}, #transport{streams = {_, OS}} @@ -597,14 +650,13 @@ send(Bin, #transport{streams = {_, OS}, %% send/3 -send(StreamId, Bin, #transport{socket = Sock, - assoc_id = AId}) -> - send(Sock, AId, StreamId, Bin). - -%% send/4 +send(StreamId, Bin, #transport{monitor = MPid}) -> + MPid ! {Bin, StreamId}, + MPid; -send(Sock, AssocId, Stream, Bin) -> - case gen_sctp:send(Sock, AssocId, Stream, Bin) of +send(StreamId, Bin, #monitor{socket = Sock, + assoc_id = AId}) -> + case gen_sctp:send(Sock, AId, StreamId, Bin) of ok -> ok; {error, Reason} -> diff --git a/lib/diameter/src/transport/diameter_sctp_sup.erl b/lib/diameter/src/transport/diameter_sctp_sup.erl index 36050aaf28..e8e26ec7c5 100644 --- a/lib/diameter/src/transport/diameter_sctp_sup.erl +++ b/lib/diameter/src/transport/diameter_sctp_sup.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2016. All Rights Reserved. +%% Copyright Ericsson AB 2010-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -49,6 +49,7 @@ start() -> start_child(T) -> SupRef = case element(1,T) of + monitor -> ?TRANSPORT_SUP; connect -> ?TRANSPORT_SUP; accept -> ?TRANSPORT_SUP; listen -> ?LISTENER_SUP diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index 8e400bc6ee..2a580753a0 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -53,6 +53,7 @@ %% Keys into process dictionary. -define(INFO_KEY, info). -define(REF_KEY, ref). +-define(TRANSPORT_KEY, transport). -define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})). @@ -68,17 +69,22 @@ %% The same gen_server implementation supports three different kinds %% of processes: an actual transport process, one that will club it to %% death should the parent die before a connection is established, and -%% a process owning the listening port. +%% a process owning the listening port. The monitor process +%% historically died after connection establishment, but now lives on +%% as the sender of outgoing messages, so that a blocking send doesn't +%% prevent messages from being received. %% Listener process state. -record(listener, {socket :: inet:socket(), module :: module(), service = false :: false | pid()}). %% service process -%% Monitor process state. +%% Monitor process state. The name monitor predates its role as sender. -record(monitor, - {parent :: pid(), - transport = self() :: pid()}). + {parent :: reference() | false, + transport = self() :: pid(), + socket :: inet:socket() | ssl:sslsocket() | undefined, + module :: module() | undefined}). -type length() :: 0..16#FFFFFF. %% message length from Diameter header -type size() :: non_neg_integer(). %% accumulated binary size @@ -116,7 +122,8 @@ tref = false :: false | reference(), %% fragment timer reference flush = false :: boolean(), %% flush fragment at timeout? throttle_cb :: false | diameter:evaluable(), %% ask to receive - throttled :: boolean() | binary()}). %% stopped receiving? + throttled :: boolean() | binary(), %% stopped receiving? + monitor :: pid()}). %% The usual transport using gen_tcp can be replaced by anything %% sufficiently gen_tcp-like by passing a 'module' option as the first @@ -203,8 +210,8 @@ i({T, Ref, Mod, Pid, Opts, Addrs, SPid}) T == connect -> monitor(process, Pid), %% Since accept/connect might block indefinitely, spawn a process - %% that does nothing but kill us with the parent until call - %% returns. + %% that kills us with the parent until call returns, and then + %% sends outgoing messages. {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}), {[SO|TO], Rest} = proplists:split(Opts, [ssl_options, fragment_timer, @@ -217,8 +224,9 @@ i({T, Ref, Mod, Pid, Opts, Addrs, SPid}) ?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}), Throttle = proplists:get_value(throttle_cb, OwnOpts, false), Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs, SPid), - MPid ! {stop, self()}, %% tell the monitor to die M = if SslOpts -> ssl; true -> Mod end, + monitor(process, MPid), + MPid ! {start, self(), Sock, M}, %% prepare monitor for sending putr(?REF_KEY, Ref), throttle(#transport{parent = Pid, module = M, @@ -226,21 +234,22 @@ i({T, Ref, Mod, Pid, Opts, Addrs, SPid}) ssl = SslOpts, timeout = Tmo, throttle_cb = Throttle, - throttled = false /= Throttle}); + throttled = false /= Throttle, + monitor = MPid}); %% Put the reference in the process dictionary since we now use it %% advertise the ssl socket after TLS upgrade. %% A monitor process to kill the transport if the parent dies. i(#monitor{parent = Pid, transport = TPid} = S) -> + putr(?TRANSPORT_KEY, TPid), proc_lib:init_ack({ok, self()}), - monitor(process, Pid), monitor(process, TPid), - S; + S#monitor{parent = monitor(process, Pid)}; %% In principle a link between the transport and killer processes %% could do the same thing: have the accepting/connecting process be %% killed when the killer process dies as a consequence of parent %% death. However, a link can be unlinked and this is exactly what -%% gen_tcp seems to so. Links should be left to supervisors. +%% gen_tcp seems to do. Links should be left to supervisors. i({listen, Ref, {Mod, Opts, Addrs}}) -> [_] = diameter_config:subscribe(Ref, transport), %% assert existence @@ -452,7 +461,11 @@ handle_call({accept, SPid}, _From, #listener{service = P} = S) -> true -> S end}; - + +%% Transport is telling us of parent death. +handle_call({stop, _Pid} = Reason, _From, #monitor{} = S) -> + {stop, {shutdown, Reason}, ok, S}; + handle_call(_, _, State) -> {reply, nok, State}. @@ -474,8 +487,7 @@ handle_info(T, #listener{} = S) -> {noreply, #listener{} = l(T,S)}; handle_info(T, #monitor{} = S) -> - m(T,S), - x(T). + {noreply, #monitor{} = m(T,S)}. %% --------------------------------------------------------------------------- %% # code_change/3 @@ -491,6 +503,7 @@ code_change(_, State, _) -> terminate(_, _) -> ok. + %% --------------------------------------------------------------------------- putr(Key, Val) -> @@ -503,18 +516,38 @@ getr(Key) -> %% %% Transition monitor state. -%% Transport is telling us to die. -m({stop, TPid}, #monitor{transport = TPid}) -> - ok; +%% Outgoing message. +m(Bin, #monitor{} = S) + when is_binary(Bin) -> + send(Bin, S), + S; -%% Transport has died. -m({'DOWN', _, process, TPid, _}, #monitor{transport = TPid}) -> - ok; +%% Transport is telling us to be ready to send. Stop monitoring on the +%% parent so as not to die before a send from the transport. +m({start, TPid, Sock, Mod}, #monitor{parent = MRef, + transport = TPid} + = S) -> + demonitor(MRef, [flush]), + S#monitor{parent = false, + socket = Sock, + module = Mod}; -%% Transport parent has died. -m({'DOWN', _, process, Pid, _}, #monitor{parent = Pid, - transport = TPid}) -> - exit(TPid, {shutdown, parent}). +%% Transport is telling us to die. +m({stop, TPid} = T, #monitor{transport = TPid}) -> + x(T); + +%% Transport is telling us that TLS has been negotiated after +%% capabilities exchange. +m({tls, SSock}, #monitor{} = S) -> + S#monitor{socket = SSock, + module = ssl}; + +%% Transport or parent has died. +m({'DOWN', M, process, P, _} = T, #monitor{parent = MRef, + transport = TPid}) + when M == MRef; + P == TPid -> + x(T). %% l/2 %% @@ -589,8 +622,9 @@ transition({E, Sock, _Reason} = T, #transport{socket = Sock, ?ERROR({T,S}); %% Outgoing message. -transition({diameter, {send, Bin}}, S) -> - send(Bin, S); +transition({diameter, {send, Bin}}, #transport{} = S) -> + send(Bin, S), + ok; %% Request to close the transport connection. transition({diameter, {close, Pid}}, #transport{parent = Pid, @@ -610,8 +644,16 @@ transition({resolve_port, Pid}, #transport{socket = Sock, Pid ! portnr(M, Sock), ok; -%% Parent process has died. -transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) -> +%% Parent process has died: call the monitor to not close the socket +%% during an ongoing send, but don't let it take forever. +transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid, + monitor = MPid}) -> + ok == (catch gen_server:call(MPid, {stop, Pid})) + orelse exit(MPid, kill), + stop; + +%% Monitor process has died. +transition({'DOWN', _, process, MPid, _}, #transport{monitor = MPid}) -> stop. %% Crash on anything unexpected. @@ -635,11 +677,13 @@ tls_handshake(_, true, #transport{ssl = false}) -> %% Capabilities exchange negotiated TLS: upgrade the connection. tls_handshake(Type, true, #transport{socket = Sock, module = M, - ssl = Opts} + ssl = Opts, + monitor = MPid} = S) -> {ok, SSock} = tls(Type, Sock, [{cb_info, ?TCP_CB(M)} | Opts]), Ref = getr(?REF_KEY), true = diameter_reg:add_new({?MODULE, Type, {Ref, SSock}}), + MPid ! {tls, SSock}, %% tell the monitor process S#transport{socket = SSock, module = ssl}; @@ -805,14 +849,20 @@ connect(Mod, Host, Port, Opts) -> %% send/2 -send(Bin, #transport{socket = Sock, - module = M}) -> +send(Bin, #monitor{socket = Sock, + module = M}) -> case send(M, Sock, Bin) of ok -> ok; {error, Reason} -> x({send, Reason}) - end. + end; + +%% Send from the monitor process to avoid deadlock if both the +%% receiver and the peer were to block in send. +send(Bin, #transport{monitor = MPid}) -> + MPid ! Bin, + MPid. %% send/3 @@ -909,7 +959,6 @@ throttle({NPid, F}, #transport{throttled = Msg} = S) throttle(discard, #transport{throttled = Msg} = S) when is_binary(Msg) -> S; - throttle({discard = T, F}, #transport{throttled = Msg} = S) when is_binary(Msg) -> throttle(T, S#transport{throttle_cb = F}); @@ -920,7 +969,6 @@ throttle(Bin, #transport{throttled = Msg} = S) when is_binary(Bin), is_binary(Msg) -> send(Bin, S), S; - throttle({Bin, F}, #transport{throttled = Msg} = S) when is_binary(Bin), is_binary(Msg) -> throttle(Bin, S#transport{throttle_cb = F}); @@ -929,7 +977,6 @@ throttle({Bin, F}, #transport{throttled = Msg} = S) throttle({timeout, Tmo}, S) -> erlang:send_after(Tmo, self(), throttle), throw(S); - throttle({timeout = T, Tmo, F}, S) -> throttle({T, Tmo}, S#transport{throttle_cb = F}); diff --git a/lib/diameter/test/diameter_watchdog_SUITE.erl b/lib/diameter/test/diameter_watchdog_SUITE.erl index 6d22ddcc18..5ae951f7c2 100644 --- a/lib/diameter/test/diameter_watchdog_SUITE.erl +++ b/lib/diameter/test/diameter_watchdog_SUITE.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2015. All Rights Reserved. +%% Copyright Ericsson AB 2010-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -569,12 +569,12 @@ send(_, Sock, <<_:32, 1:1, _:7, 280:24, _:32, EId:32, HId:32, _/binary>>) -> {'Origin-Host', "XXX"}, {'Origin-Realm', ?REALM}]}, #diameter_packet{bin = Bin} = diameter_codec:encode(?BASE, Pkt), - self() ! {tcp, Sock, Bin}, + tpid(Sock) ! {tcp, Sock, Bin}, ok; %% First outgoing DWA. send(init, Sock, Bin) -> - [{{?MODULE, _, T}, _}] = diameter_reg:wait({?MODULE, self(), '_'}), + [{{?MODULE, _, T}, _}] = diameter_reg:wait({?MODULE, tpid(Sock), '_'}), putr(config, T), send(Sock, Bin); @@ -607,6 +607,14 @@ send(N, Sock, <<_:32, 0:1, _:7, 280:24, _/binary>> = Bin) -> putr(config, N-1), gen_tcp:send(Sock, Bin). +%% tpid/1 + +tpid(Sock) -> + {connected, Pid} = erlang:port_info(Sock, connected), + Pid. + +%%failback/5 + failback(Tmo, Msg, Sock, Bin, Origin) -> timer:sleep(Tmo), ok = gen_tcp:send(Sock, msg(Msg, Bin, Origin)). -- cgit v1.2.3 From c9dd8410fb44034821c4068e1cc4df253b21f9f9 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Tue, 21 Mar 2017 16:38:28 +0100 Subject: Add missing dialyzer types Which dialyzer itself has never complained about. --- lib/diameter/src/transport/diameter_tcp.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index 2a580753a0..f745e2a6f7 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -104,6 +104,7 @@ -type listen_option() :: {accept, match()} | {ssl_options, true | [ssl:listen_option()]} + | option() | ssl:listen_option() | gen_tcp:listen_option(). -- cgit v1.2.3 From 111261d15df900bedd544ba1fab3a5880abda70a Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Fri, 10 Mar 2017 23:09:45 +0100 Subject: Revert "diameter: Do not test SCTP on sparc-sun-solaris2.10" The comment in commit 736ce20a isn't quite true. There's no different behaviour that diameter doesn't support, but there is a quirk with the loopback address that has caused many testcases to fail. This will be addressed in a subsequent commit. Reverts commit 736ce20a. --- lib/diameter/test/diameter_util.erl | 22 +++++++--------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/lib/diameter/test/diameter_util.erl b/lib/diameter/test/diameter_util.erl index cca28dd23c..37fcbbc267 100644 --- a/lib/diameter/test/diameter_util.erl +++ b/lib/diameter/test/diameter_util.erl @@ -195,21 +195,13 @@ unique_string() -> %% have_sctp/0 have_sctp() -> - case erlang:system_info(system_architecture) of - %% We do not support the sctp version present in solaris - %% version "sparc-sun-solaris2.10", that behaves differently - %% from later versions and linux - "sparc-sun-solaris2.10" -> - false; - _-> - case gen_sctp:open() of - {ok, Sock} -> - gen_sctp:close(Sock), - true; - {error, E} when E == eprotonosupport; - E == esocktnosupport -> %% fail on any other reason - false - end + case gen_sctp:open() of + {ok, Sock} -> + gen_sctp:close(Sock), + true; + {error, E} when E == eprotonosupport; + E == esocktnosupport -> %% fail on any other reason + false end. %% --------------------------------------------------------------------------- -- cgit v1.2.3 From 1df74351286b6cd0e2b673fdf07f7219244ce9a7 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Thu, 30 Mar 2017 16:29:02 +0200 Subject: Work around SCTP quirks on sparc-sun-solaris2.10 This addresses the testcase failures mentioned in the parent commit, which has been on account of the behaviour below, in which connect fails on the loopback address. Work around it by finding/using another address if possible. $ erl Erlang/OTP 20 [DEVELOPMENT] [erts-9.0] [smp:2:2] [ds:2:2:10] [async-threads:10] [hipe] [kernel-poll:false] [sharing-preserving] Eshell V9.0 (abort with ^G) 1> {ok, LP} = gen_sctp:open(). {ok,#Port<0.439>} 2> gen_sctp:listen(LP, true). ok 3> inet:socknames(LP). {ok,[{{10,67,16,178},36506},{{127,0,0,1},36506}]} 4> {ok, S} = gen_sctp:open([{ip, {127,0,0,1}}]). {ok,#Port<0.443>} 5> gen_sctp:connect_init(S, {127,0,0,1}, 36506, []). {error,eaddrnotavail} 6> gen_sctp:connect_init(S, {10,67,16,178}, 36506, []). {error,eaddrnotavail} 7> gen_sctp:close(S). ok 8> f(S). ok 9> {ok, S} = gen_sctp:open(). {ok,#Port<0.444>} 10> gen_sctp:connect_init(S, {127,0,0,1}, 36506, []). ok Even the following has been seen on at least one host, so that success of gen_sctp:open/0 is no guarantee. $ ifconfig -a4 lo0: flags=2001000849 mtu 8232 index 1 inet 127.0.0.1 netmask ff000000 bge0: flags=1004843 mtu 1500 index 2 inet 10.67.16.180 netmask ffffff00 broadcast 10.67.16.255 $ erl Erlang/OTP 20 [DEVELOPMENT] [erts-9.0] [source] [smp:2:2] [ds:2:2:10] [async-threads:10] [hipe] [kernel-poll:false] Eshell V9.0 (abort with ^G) 1> {ok, S} = gen_sctp:open(), 1> gen_sctp:connect(S, {127,0,0,1}, 3868, []). {error,eafnosupport} 2> gen_sctp:connect(S, {10,67,16,180}, 3868, []). {error,eafnosupport} --- lib/diameter/test/diameter_examples_SUITE.erl | 8 +++-- lib/diameter/test/diameter_transport_SUITE.erl | 46 +++++++++++++----------- lib/diameter/test/diameter_util.erl | 50 ++++++++++++++++++-------- 3 files changed, 66 insertions(+), 38 deletions(-) diff --git a/lib/diameter/test/diameter_examples_SUITE.erl b/lib/diameter/test/diameter_examples_SUITE.erl index e4ed2b227d..680ce4f366 100644 --- a/lib/diameter/test/diameter_examples_SUITE.erl +++ b/lib/diameter/test/diameter_examples_SUITE.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2013-2015. All Rights Reserved. +%% Copyright Ericsson AB 2013-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -70,6 +70,8 @@ %% Transport protocols over which the example Diameter nodes are run. -define(PROTS, [tcp, sctp]). +-define(ADDR, diameter_util:ip4()). + %% =========================================================================== suite() -> @@ -346,7 +348,7 @@ top(Dir, LibDir) -> start({server, Prot}) -> ok = diameter:start(), ok = server:start(), - {ok, Ref} = server:listen(Prot), + {ok, Ref} = server:listen({Prot, ?ADDR, 3868}), [_] = ?util:lport(Prot, Ref), ok; @@ -354,7 +356,7 @@ start({client = Svc, Prot}) -> ok = diameter:start(), true = diameter:subscribe(Svc), ok = client:start(), - {ok, Ref} = client:connect(Prot), + {ok, Ref} = client:connect({Prot, ?ADDR, ?ADDR, 3868}), receive #diameter_event{info = {up, Ref, _, _, _}} -> ok end; start(Config) -> diff --git a/lib/diameter/test/diameter_transport_SUITE.erl b/lib/diameter/test/diameter_transport_SUITE.erl index c94f46b7a5..ac65cbe618 100644 --- a/lib/diameter/test/diameter_transport_SUITE.erl +++ b/lib/diameter/test/diameter_transport_SUITE.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2016. All Rights Reserved. +%% Copyright Ericsson AB 2010-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -56,8 +56,10 @@ -define(RECV(Pat, Ret), receive Pat -> Ret end). -define(RECV(Pat), ?RECV(Pat, diameter_lib:now())). -%% Sockets are opened on the loopback address. --define(ADDR, {127,0,0,1}). +%% Address to open sockets on. +-define(ADDR(Prot), if sctp == Prot -> diameter_util:ip4(); + true -> {127,0,0,1} + end). %% diameter_tcp doesn't use anything but host_ip_address, and that %% only is a local address isn't configured as at transport start. @@ -351,13 +353,14 @@ rand_bytes(N, Bin) -> %% start_connect/3 start_connect(Prot, PortNr, Ref) -> - {ok, TPid, [?ADDR]} = start_connect(Prot, - {connect, Ref}, - ?SVC([]), - [{raddr, ?ADDR}, - {rport, PortNr}, - {ip, ?ADDR}, - {port, 0}]), + Addr = ?ADDR(Prot), + {ok, TPid, [_]} = start_connect(Prot, + {connect, Ref}, + ?SVC([]), + [{raddr, Addr}, + {rport, PortNr}, + {ip, Addr}, + {port, 0}]), ?RECV(?TMSG({TPid, connected, _})), TPid. @@ -370,9 +373,9 @@ start_connect(tcp, T, Svc, Opts) -> start_accept(Prot, Ref) -> {Mod, Opts} = tmod(Prot), - {ok, TPid, [?ADDR]} = Mod:start({accept, Ref}, - ?SVC([?ADDR]), - [{port, 0} | Opts]), + {ok, TPid, [_]} = Mod:start({accept, Ref}, + ?SVC([?ADDR(Prot)]), + [{port, 0} | Opts]), ?RECV(?TMSG({TPid, connected})), TPid. @@ -386,19 +389,20 @@ tmod(tcp) -> %% gen_connect/2 gen_connect(sctp = P, PortNr) -> - {ok, Sock} = Ok = gen_sctp:open([{ip, ?ADDR}, {port, 0} | ?SCTP_OPTS]), - ok = gen_sctp:connect_init(Sock, ?ADDR, PortNr, []), + Addr = ?ADDR(P), + {ok, Sock} = Ok = gen_sctp:open([{ip, Addr}, {port, 0} | ?SCTP_OPTS]), + ok = gen_sctp:connect_init(Sock, Addr, PortNr, []), Ok = gen_accept(P, Sock); -gen_connect(tcp, PortNr) -> - gen_tcp:connect(?ADDR, PortNr, ?TCP_OPTS). +gen_connect(tcp = P, PortNr) -> + gen_tcp:connect(?ADDR(P), PortNr, ?TCP_OPTS). %% gen_listen/1 -gen_listen(sctp) -> - {ok, Sock} = gen_sctp:open([{ip, ?ADDR}, {port, 0} | ?SCTP_OPTS]), +gen_listen(sctp = P) -> + {ok, Sock} = gen_sctp:open([{ip, ?ADDR(P)}, {port, 0} | ?SCTP_OPTS]), {gen_sctp:listen(Sock, true), Sock}; -gen_listen(tcp) -> - gen_tcp:listen(0, [{ip, ?ADDR} | ?TCP_OPTS]). +gen_listen(tcp = P) -> + gen_tcp:listen(0, [{ip, ?ADDR(P)} | ?TCP_OPTS]). %% gen_accept/2 diff --git a/lib/diameter/test/diameter_util.erl b/lib/diameter/test/diameter_util.erl index 37fcbbc267..8df5c907d0 100644 --- a/lib/diameter/test/diameter_util.erl +++ b/lib/diameter/test/diameter_util.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2016. All Rights Reserved. +%% Copyright Ericsson AB 2010-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -32,7 +32,8 @@ foldl/3, scramble/1, unique_string/0, - have_sctp/0]). + have_sctp/0, + ip4/0]). %% diameter-specific -export([lport/2, @@ -197,8 +198,11 @@ unique_string() -> have_sctp() -> case gen_sctp:open() of {ok, Sock} -> + RC = gen_sctp:connect(Sock, ip4(), 3868, []), gen_sctp:close(Sock), - true; + %% Connect has been seen to return eafnosupport on at least + %% one SunOS 10 Sparc host, for reasons unknown. + RC /= {error, eafnosupport}; {error, E} when E == eprotonosupport; E == esocktnosupport -> %% fail on any other reason false @@ -361,7 +365,8 @@ tmod(any) -> opts(Prot, T) -> tmo(T, lists:append([[{transport_module, M}, {transport_config, C}] || M <- tmod(Prot), - C <- [cfg(M,T) ++ cfg(M) ++ cfg(T)]])). + C <- [buf(M,T) ++ [{ip, addr(M)}, {port, 0}] + ++ remote(M,T)]])). tmo(listen, Opts) -> Opts; @@ -377,21 +382,38 @@ tmo([M, C | Opts]) -> %% Listening SCTP socket need larger-than-default buffers to avoid %% resends on some platforms (eg. SLES 11). -cfg(diameter_sctp, listen) -> +buf(diameter_sctp, listen) -> [{recbuf, 1 bsl 16}, {sndbuf, 1 bsl 16}]; - -cfg(_, _) -> +buf(_, _) -> []. -cfg(M) - when M == diameter_tcp; - M == diameter_sctp -> - [{ip, ?ADDR}, {port, 0}]; +addr(diameter_tcp) -> + {127,0,0,1}; +addr(diameter_sctp) -> + ip4(). -cfg(listen) -> +remote(_, listen) -> [{accept, M} || M <- [{256,0,0,1}, ["256.0.0.1", ["^.+$"]]]]; -cfg(PortNr) -> - [{raddr, ?ADDR}, {rport, PortNr}]. +remote(Mod, PortNr) -> + [{raddr, addr(Mod)}, {rport, PortNr}]. + +%% Try to use something other than the loopback address where this +%% address is known to be problematic for gen_sctp. +ip4() -> + try + "sparc-sun-solaris2.10" = erlang:system_info(system_architecture), + {ok, List} = inet:getifaddrs(), + hd(lists:flatmap(fun ip4/1, List)) + catch + error:_ -> + ?ADDR + end. + +ip4({_, Opts}) -> + {flags, Flags} = lists:keyfind(flags, 1, Opts), + [A || lists:member(up, Flags), + not lists:member(loopback, Flags), + {addr, {_,_,_,_} = A} <- Opts]. %% --------------------------------------------------------------------------- %% info/0 -- cgit v1.2.3 From a47ab3e0021a3d27936f16b464112c2f08a2d2f0 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Fri, 31 Mar 2017 23:32:26 +0200 Subject: Fix diameter_transport_SUITE race The server sent and died, but there's no guarantee that it won't take the connection down before the client has receive its bytes. Make the server wait for the client to take down the connection. --- lib/diameter/test/diameter_transport_SUITE.erl | 28 ++++++++++++-------------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/lib/diameter/test/diameter_transport_SUITE.erl b/lib/diameter/test/diameter_transport_SUITE.erl index ac65cbe618..14c748ad20 100644 --- a/lib/diameter/test/diameter_transport_SUITE.erl +++ b/lib/diameter/test/diameter_transport_SUITE.erl @@ -296,10 +296,17 @@ init(gen_accept, {Prot, Ref}) -> {ok, PortNr} = inet:port(LSock), true = diameter_reg:add_new(?TEST_LISTENER(Ref, PortNr)), - %% Accept a connection, receive a message and send it back. + %% Accept a connection, receive a message send it back, and wait + %% for the peer to close the connection. {ok, Sock} = gen_accept(Prot, LSock), Bin = gen_recv(Prot, Sock), - ok = gen_send(Prot, Sock, Bin); + ok = gen_send(Prot, Sock, Bin), + receive + {tcp_closed, Sock} = T -> + T; + ?SCTP(Sock, {_, #sctp_assoc_change{}}) = T -> + T + end; init(connect, {Prot, Ref}) -> %% Lookup the peer's listening socket. @@ -313,12 +320,7 @@ init(connect, {Prot, Ref}) -> %% Send a message and receive it back. Bin = make_msg(), TPid ! ?TMSG({send, Bin}), - Bin = bin(Prot, ?RECV(?TMSG({recv, P}), P)), - - %% Expect the transport process to die as a result of the peer - %% closing the connection. - MRef = erlang:monitor(process, TPid), - ?RECV({'DOWN', MRef, process, _, _}). + Bin = bin(Prot, ?RECV(?TMSG({recv, P}), P)). bin(sctp, #diameter_packet{bin = Bin}) -> Bin; @@ -338,15 +340,11 @@ make_msg() -> <<1:8, Len:24, Bin/binary>>. %% crypto:rand_bytes/1 isn't available on all platforms (since openssl -%% isn't) so roll our own. +%% isn't) so roll our own. Not particularly random, but less verbose +%% in trace. rand_bytes(N) -> - rand_bytes(N, <<>>). - -rand_bytes(0, Bin) -> - Bin; -rand_bytes(N, Bin) -> Oct = rand:uniform(256) - 1, - rand_bytes(N-1, <>). + binary:copy(<>, N). %% =========================================================================== -- cgit v1.2.3 From 008b12c642bdcfb0fa96543da2d5142502b210d7 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Tue, 4 Apr 2017 17:17:21 +0200 Subject: Rework gen_sctp suite to demonstrate remaining problems In particular, that transmission can be very slow. The problem appears to be linked to sndbuf/recbuf, but even with buffers that are large enough to hold all messages being sent, turnaround times can still vary by hundreds of milliseconds in a reasonable test environment. Use multiple streams and a sender process to more closely mirror the usage in diameter_sctp, but neither is the source of the problems. --- lib/diameter/test/diameter_gen_sctp_SUITE.erl | 437 +++++++++++++++----------- 1 file changed, 250 insertions(+), 187 deletions(-) diff --git a/lib/diameter/test/diameter_gen_sctp_SUITE.erl b/lib/diameter/test/diameter_gen_sctp_SUITE.erl index 79db39ca45..457d9a0a85 100644 --- a/lib/diameter/test/diameter_gen_sctp_SUITE.erl +++ b/lib/diameter/test/diameter_gen_sctp_SUITE.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2016. All Rights Reserved. +%% Copyright Ericsson AB 2010-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -33,8 +33,8 @@ end_per_suite/1]). %% testcases --export([send_not_from_controlling_process/1, - send_from_multiple_clients/1, send_from_multiple_clients/0, +-export([send_one_from_many/1, send_one_from_many/0, + send_many_from_one/1, send_many_from_one/0, receive_what_was_sent/1]). -include_lib("kernel/include/inet_sctp.hrl"). @@ -45,16 +45,24 @@ %% Open sockets on the loopback address. -define(ADDR, {127,0,0,1}). -%% Snooze, nap, siesta. --define(SLEEP(T), receive after T -> ok end). - %% An indescribably long number of milliseconds after which everthing %% that should have happened has. -define(FOREVER, 2000). +%% How many milliseconds to tolerate between the fastest and slowest +%% turnaround times. +-define(VARIANCE, 100). + %% The first byte in each message we send as a simple guard against %% not receiving what was sent. --define(MAGIC, 42). +-define(MAGIC, 0). + +%% Requested number of inbound/outbound streams. +-define(STREAMS, 5). + +%% Success for send_multiple. Match in each testcase rather than in +%% send_multiple itself for a better failure in common_test. +-define(OK, {_, true, _, [true, true], [], _}). %% =========================================================================== @@ -62,8 +70,8 @@ suite() -> [{timetrap, {seconds, 10}}]. all() -> - [send_not_from_controlling_process, - send_from_multiple_clients, + [send_one_from_many, + send_many_from_one, receive_what_was_sent]. init_per_suite(Config) -> @@ -81,130 +89,37 @@ end_per_suite(_Config) -> %% =========================================================================== -%% send_not_from_controlling_process/1 -%% -%% This testcase failing shows gen_sctp:send/4 hanging when called -%% outside the controlling process of the socket in question. - -send_not_from_controlling_process(_) -> - Pids = send_not_from_controlling_process(), - ?SLEEP(?FOREVER), - try - [] = [{P,I} || P <- Pids, I <- [process_info(P)], I /= undefined] - after - lists:foreach(fun(P) -> exit(P, kill) end, Pids) - end. - -%% send_not_from_controlling_process/0 -%% -%% Returns the pids of three spawned processes: a listening process, a -%% connecting process and a sending process. -%% -%% The expected behaviour is that all three processes exit: -%% -%% - The listening process exits upon receiving an SCTP message -%% sent by the sending process. -%% - The connecting process exits upon listening process exit. -%% - The sending process exits upon gen_sctp:send/4 return. -%% -%% The observed behaviour is that all three processes remain alive -%% indefinitely: -%% -%% - The listening process never receives the SCTP message sent -%% by the sending process. -%% - The connecting process has an inet_reply message in its mailbox -%% as a consequence of the call to gen_sctp:send/4 call from the -%% sending process. -%% - The call to gen_sctp:send/4 in the sending process doesn't return, -%% hanging in prim_inet:getopts/2. - -send_not_from_controlling_process() -> - FPid = self(), - {L, MRef} = spawn_monitor(fun() -> listen(FPid) end), - receive - {?MODULE, C, S} -> - demonitor(MRef, [flush]), - [L,C,S]; - {'DOWN', MRef, process, _, _} = T -> - error(T) - end. - -%% listen/1 - -listen(FPid) -> - {ok, Sock} = open(), - ok = gen_sctp:listen(Sock, true), - {ok, PortNr} = inet:port(Sock), - LPid = self(), - spawn(fun() -> connect1(PortNr, FPid, LPid) end), %% connecting process - Id = assoc(Sock), - recv(Sock, Id). - -%% connect1/3 - -connect1(PortNr, FPid, LPid) -> - {ok, Sock} = open(), - ok = gen_sctp:connect_init(Sock, ?ADDR, PortNr, []), - Id = assoc(Sock), - FPid ! {?MODULE, - self(), - spawn(fun() -> send(Sock, Id) end)}, %% sending process - MRef = monitor(process, LPid), - down(MRef). %% Waits with this as current_function. - -%% down/1 - -down(MRef) -> - receive {'DOWN', MRef, process, _, Reason} -> Reason end. - -%% send/2 - -send(Sock, Id) -> - ok = gen_sctp:send(Sock, Id, 0, <<0:32>>). - -%% =========================================================================== - -%% send_from_multiple_clients/0 +%% send_one_from_many/0 %% %% Demonstrates sluggish delivery of messages. -send_from_multiple_clients() -> - [{timetrap, {seconds, 60}}]. +send_one_from_many() -> + [{timetrap, {seconds, 30}}]. -send_from_multiple_clients(_) -> - {S, Rs} = T = send_from_multiple_clients(8, 1024), - Max = ?FOREVER*1000, - {false, [], _} = {Max < S, - Rs -- [OI || {O,_} = OI <- Rs, is_integer(O)], - T}. +send_one_from_many(_) -> + ?OK = send_multiple(128, 1, 1024). -%% send_from_multiple_clients/2 +%% send_one_from_many/2 %% %% Opens a listening socket and then spawns a specified number of -%% processes, each of which connects to the listening socket. Each -%% connecting process then sends a message, whose size in bytes is -%% passed as an argument, the listening process sends a reply -%% containing the time at which the message was received, and the -%% connecting process then exits upon reception of this reply. +%% processes, each of which connects, sends a message, receives a +%% reply, and exits. %% %% Returns the elapsed time for all connecting process to exit -%% together with a list of exit reasons for the connecting processes. -%% In the successful case a connecting process exits with the -%% outbound/inbound transit times for the sent/received message as -%% reason. +%% together with a list of exit reasons. In the successful case a +%% connecting process exits with the outbound/inbound transit times +%% for the sent/received message as reason. %% %% The observed behaviour is that some outbound messages (that is, %% from a connecting process to the listening process) can take an %% unexpectedly long time to complete their journey. The more -%% connecting processes, the longer the possible delay it seems. -%% -%% eg. (With F = fun send_from_multiple_clients/2.) +%% connecting processes, the longer it can take it seems. %% -%% 5> F(2, 1024). +%% eg. 5> send_one_from_many(2, 1024). %% {875,[{128,116},{113,139}]} -%% 6> F(4, 1024). +%% 6> send_one_from_many(4, 1024). %% {2995290,[{2994022,250},{2994071,80},{200,130},{211,113}]} -%% 7> F(8, 1024). +%% 7> send_one_from_many(8, 1024). %% {8997461,[{8996161,116}, %% {2996471,86}, %% {2996278,116}, @@ -213,7 +128,7 @@ send_from_multiple_clients(_) -> %% {213,159}, %% {373,173}, %% {376,118}]} -%% 8> F(8, 1024). +%% 8> send_one_from_many(8, 1024). %% {21001891,[{20999968,128}, %% {8997891,172}, %% {8997927,91}, @@ -223,120 +138,261 @@ send_from_multiple_clients(_) -> %% {117,98}, %% {149,125}]} %% -%% This turns out to have been due to SCTP resends as a consequence of -%% the listener having an insufficient recbuf. Increasing the size -%% solves the problem. -%% - -send_from_multiple_clients(N, Sz) - when is_integer(N), 0 < N, is_integer(Sz), 0 < Sz -> - timer:tc(fun listen/2, [N, <>]). - -%% listen/2 -listen(N, Bin) -> +send_multiple(Clients, Msgs, Sz) + when is_integer(Clients), 0 < Clients, + is_integer(Msgs), 0 < Msgs, + is_integer(Sz), 0 < Sz -> + T0 = diameter_lib:now(), + {S, Res} = timer:tc(fun listen/3, [Clients, Msgs, Sz]), + report(T0, Res), + Ts = lists:append(Res), + Outgoing = [DT || {_,{_,_,DT},{_,_,_},_} <- Ts], + Incoming = [DT || {_,{_,_,_},{_,_,DT},_} <- Ts], + Diffs = [lists:max(L) - lists:min(L) || L <- [Outgoing, Incoming]], + {S, + S < ?FOREVER*1000, + Diffs, + [D < V || V <- [?VARIANCE*1000], D <- Diffs], + [T || T <- Ts, [] == [T || {_,{_,_,_},{_,_,_},_} <- [T]]], + Res}. + +%% listen/3 + +listen(Clients, Msgs, Sz) -> {ok, Sock} = open(), ok = gen_sctp:listen(Sock, true), {ok, PortNr} = inet:port(Sock), %% Spawn a middleman that in turn spawns N connecting processes, %% collects a list of exit reasons and then exits with the list as - %% reason. loop/3 returns when we receive this list from the + %% reason. accept/2 returns when we receive this list from the %% middleman's 'DOWN'. Self = self(), - Fun = fun() -> exit(connect2(Self, PortNr, Bin)) end, - {_, MRef} = spawn_monitor(fun() -> exit(fold(N, Fun)) end), - loop(Sock, MRef, Bin). + Fun = fun() -> exit(client(Self, PortNr, Msgs, Sz)) end, %% start clients + {_, MRef} = spawn_monitor(fun() -> exit(clients(Clients, Fun)) end), + accept_loop(Sock, MRef). -%% fold/2 +%% fclients/2 %% %% Spawn N processes and collect their exit reasons in a list. -fold(N, Fun) -> +clients(N, Fun) -> start(N, Fun), acc(N, []). +%% start/2 + start(0, _) -> ok; + start(N, Fun) -> spawn_monitor(Fun), start(N-1, Fun). +%% acc/2 + acc(0, Acc) -> Acc; + acc(N, Acc) -> receive {'DOWN', _MRef, process, _, RC} -> acc(N-1, [RC | Acc]) end. -%% loop/3 +%% accept_loop/2 -loop(Sock, MRef, Bin) -> +accept_loop(Sock, MRef) -> + ok = inet:setopts(Sock, [{active, once}]), receive - ?SCTP(Sock, {[#sctp_sndrcvinfo{assoc_id = Id}], B}) + ?SCTP(Sock, {_, #sctp_assoc_change{state = comm_up, + outbound_streams = OS, + assoc_id = Id}}) -> + Self = self(), + TPid = spawn(fun() -> assoc(monitor(process, Self), Id, OS) end), + NewSock = peeloff(Sock, Id, TPid), + TPid ! {peeloff, NewSock}, + accept_loop(Sock, MRef); + ?SCTP(Sock, _) -> + accept_loop(Sock, MRef); + {'DOWN', MRef, process, _, Reason} -> + Reason; + T -> + error(T) + end. + +%% assoc/3 +%% +%% Server process that answers incoming messages as long as the parent +%% lives. + +assoc(MRef, Id, OS) + when is_reference(MRef) -> + {peeloff, Sock} = receive T -> T end, + recv_loop(Sock, Id, sender(Sock, Id, OS), MRef). + +%% recv_loop/4 + +recv_loop(Sock, Id, Pid, MRef) -> + ok = inet:setopts(Sock, [{active, once}]), + receive + ?SCTP(Sock, {[#sctp_sndrcvinfo{assoc_id = I}], B}) when is_binary(B) -> - Sz = size(Bin), - {Sz, Bin} = {size(B), B}, %% assert - ok = send(Sock, Id, mark(Bin)), - loop(Sock, MRef, Bin); + T2 = diameter_lib:now(), + I = Id, %% assert + <> = B, %% assert + {[_,_,_,Sz] = L, Bytes} = unmark(Bin), + Sz = size(Bin) - Bytes, %% assert + <<_:Bytes/binary, Body:Sz/binary>> = Bin, + send(Pid, [T2|L], Body), %% answer + recv_loop(Sock, Id, Pid, MRef); ?SCTP(Sock, _) -> - loop(Sock, MRef, Bin); + recv_loop(Sock, Id, Pid, MRef); {'DOWN', MRef, process, _, Reason} -> - Reason + Reason; + T -> + error(T) end. -%% connect2/3 +%% send/3 -connect2(Pid, PortNr, Bin) -> - monitor(process, Pid), +send(Pid, Header, Body) -> + Pid ! {send, Header, Body}. + +%% sender/3 +%% +%% Start a process that sends, so as not to block the controlling process. +sender(Sock, Id, OS) -> + Pid = self(), + spawn(fun() -> send_loop(Sock, Id, OS, 1, monitor(process, Pid)) end). + +%% send_loop/5 + +send_loop(Sock, Id, OS, N, MRef) -> + receive + {send, L, Body} -> + Stream = N rem OS, + ok = send(Sock, Id, Stream, mark(Body, [N, Stream | L])), + send_loop(Sock, Id, OS, N+1, MRef); + {'DOWN', MRef, process, _, _} = T -> + T; + T -> + error(T) + end. + +%% peeloff/3 + +peeloff(LSock, Id, TPid) -> + {ok, Sock} = gen_sctp:peeloff(LSock, Id), + ok = gen_sctp:controlling_process(Sock, TPid), + Sock. + +%% client/4 + +client(Pid, PortNr, Msgs, Sz) -> + monitor(process, Pid), {ok, Sock} = open(), ok = gen_sctp:connect_init(Sock, ?ADDR, PortNr, []), - Id = assoc(Sock), + recv_loop(Sock, Msgs, Sz). - %% T1 = time before send - %% T2 = time after listening process received our message - %% T3 = time after reply is received +%% recv_loop/3 - T1 = diameter_lib:now(), - ok = send(Sock, Id, Bin), - T2 = unmark(recv(Sock, Id)), - T3 = diameter_lib:now(), - {diameter_lib:micro_diff(T2, T1), %% Outbound - diameter_lib:micro_diff(T3, T2)}. %% Inbound +recv_loop(_, 0, T) -> + [_,_|Acc] = T, + Acc; -%% recv/2 +recv_loop(Sock, Msgs, T) -> + ok = inet:setopts(Sock, [{active, once}]), + {I, NewT} = recv(Sock, Msgs, T, receive X -> X end), + recv_loop(Sock, Msgs - I, NewT). -recv(Sock, Id) -> - receive - ?SCTP(Sock, {[#sctp_sndrcvinfo{assoc_id = I}], Bin}) - when is_binary(Bin) -> - Id = I, %% assert - Bin; - ?SCTP(S, _) -> - Sock = S, %% assert - recv(Sock, Id); - T -> - exit(T) +%% recv/4 + +recv(Sock, Msgs, Sz, ?SCTP(Sock, {_, #sctp_assoc_change{} = A})) -> + #sctp_assoc_change{state = comm_up, %% assert + assoc_id = Id, + outbound_streams = OS} + = A, + true = is_integer(Sz), %% assert + send_n(Msgs, sender(Sock, Id, OS), Sz), + {0, [Id, OS]}; + +recv(Sock, _, T, ?SCTP(Sock, {[#sctp_sndrcvinfo{assoc_id = Id}], Bin})) -> + T4 = diameter_lib:now(), + [Id, OS | Acc] = T, + {1, [Id, OS, stat(T4, Bin) | Acc]}; + +recv(Sock, _, T, ?SCTP(Sock, _)) -> + {0, [_,_|_] = T}; + +recv(_, _, _, T) -> + error(T). + +%% send_n/3 +%% +%% Send messages to the server from dedicated processes. + +send_n(0, _, _) -> + ok; + +send_n(N, Pid, Sz) -> + M = rand:uniform(255), + send(Pid, [Sz], binary:copy(<>, Sz)), + send_n(N-1, Pid, Sz). + +%% send/4 + +send(Sock, Id, Stream, Bin) -> + case gen_sctp:send(Sock, Id, Stream, <>) of + {error, eagain} -> + send(Sock, Id, Stream, Bin); + RC -> + RC end. -%% send/3 +%% stat/2 -send(Sock, Id, Bin) -> - gen_sctp:send(Sock, Id, 0, Bin). +stat(T4, <>) -> + %% T1 = time at send + %% T2 = time at reception by server + %% T3 = time at reception by server's sender + %% T4 = time at reception of answer -%% mark/1 + {[T3,NI,SI,T2,T1,NO,SO,Sz], Bytes} = unmark(Bin), -mark(Bin) -> - Info = term_to_binary(diameter_lib:now()), - <>. + Sz = size(Bin) - Bytes, %% assert + {T1, + {NO, SO, diameter_lib:micro_diff(T2, T1)}, %% Outbound + {NI, SI, diameter_lib:micro_diff(T4, T3)}, %% Inbound + T4}. + +%% mark/2 + +mark(Bin, T) -> + Info = term_to_binary([diameter_lib:now() | T]), + <>. + %% unmark/1 unmark(Bin) -> - binary_to_term(Bin). + T = binary_to_term(Bin), + {T, size(term_to_binary(T))}. + +%% =========================================================================== + +%% send_many_from_one/0 +%% +%% Demonstrates sluggish delivery of messages. + +send_many_from_one() -> + [{timetrap, {seconds, 30}}]. + +send_many_from_one(_) -> + ?OK = send_multiple(1, 128, 1024). %% =========================================================================== @@ -345,7 +401,7 @@ unmark(Bin) -> %% Demonstrates reception of a message that differs from that sent. receive_what_was_sent(_Config) -> - send_from_multiple_clients(1, 1024*32). %% fails + ?OK = send_multiple(1, 1, 1024*32). %% =========================================================================== @@ -357,16 +413,23 @@ open() -> %% open/1 open(Opts) -> - gen_sctp:open([{ip, ?ADDR}, {port, 0}, {active, true}, binary, + gen_sctp:open([{ip, ?ADDR}, {port, 0}, {active, false}, binary, + {sctp_initmsg, #sctp_initmsg{num_ostreams = ?STREAMS, + max_instreams = ?STREAMS}}, {recbuf, 1 bsl 16}, {sndbuf, 1 bsl 16} | Opts]). -%% assoc/1 +%% report/2 -assoc(Sock) -> - receive - ?SCTP(Sock, {_, #sctp_assoc_change{state = S, - assoc_id = Id}}) -> - comm_up = S, %% assert - Id - end. +report(T0, Ts) -> + ct:pal("~p~n", [lists:sort([sort([{diameter_lib:micro_diff(T1,T0), + OT, + IT, + diameter_lib:micro_diff(T4,T0)} + || {T1,OT,IT,T4} <- L]) + || L <- Ts])]). + +%% sort/1 + +sort(L) -> + lists:sort(fun({_,{N,_,_},_,_}, {_,{M,_,_},_,_}) -> N =< M end, L). -- cgit v1.2.3 From 31a1ee5a679dea077007812d4718a784c918a2f2 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Wed, 5 Apr 2017 16:02:56 +0200 Subject: Remove superfluous traffic testcase Testcase is already run elsewhere on the suite. --- lib/diameter/test/diameter_traffic_SUITE.erl | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/diameter/test/diameter_traffic_SUITE.erl b/lib/diameter/test/diameter_traffic_SUITE.erl index 4c82d4dee2..105c0ffefa 100644 --- a/lib/diameter/test/diameter_traffic_SUITE.erl +++ b/lib/diameter/test/diameter_traffic_SUITE.erl @@ -254,7 +254,6 @@ groups() -> [], [start_services, add_transports, - result_codes, {group, SD orelse CD}, remove_transports, stop_services]} -- cgit v1.2.3 From 6759b2b5f0603c080277d2b019789d1693ff5e57 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Wed, 5 Apr 2017 16:34:55 +0200 Subject: Make traffic suite fail less brutally Autoskip traffic testcases if transport isn't established instead of having traffic cases run and fail. --- lib/diameter/test/diameter_traffic_SUITE.erl | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/lib/diameter/test/diameter_traffic_SUITE.erl b/lib/diameter/test/diameter_traffic_SUITE.erl index 105c0ffefa..e1d3d186a3 100644 --- a/lib/diameter/test/diameter_traffic_SUITE.erl +++ b/lib/diameter/test/diameter_traffic_SUITE.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2016. All Rights Reserved. +%% Copyright Ericsson AB 2010-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -252,11 +252,7 @@ groups() -> ++ [{?util:name([T,R,D,A,C,SD,CD]), [], - [start_services, - add_transports, - {group, SD orelse CD}, - remove_transports, - stop_services]} + [{group, SD orelse CD}]} || T <- ?TRANSPORTS, T /= sctp orelse Sctp, R <- ?ENCODINGS, @@ -276,6 +272,11 @@ groups() -> SD <- ?STRING_DECODES, CD <- ?STRING_DECODES]}]. +init_per_group(B, Config) + when is_boolean(B) -> + start_services(Config), + add_transports(Config); + init_per_group(Name, Config) -> case ?util:name(Name) of [T,R,D,A,C,SD,CD] -> @@ -293,6 +294,11 @@ init_per_group(Name, Config) -> Config end. +end_per_group(B, Config) + when is_boolean(B) -> + remove_transports(Config), + stop_services(Config); + end_per_group(_, _) -> ok. -- cgit v1.2.3 From c02abb8185bc9c33b93d13cd720539e5bf5e2fad Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Fri, 7 Apr 2017 13:47:39 +0200 Subject: Make skipped SCTP testcases more visible in traffic suite By explicitly skipping instead of omitting testcases from groups. --- lib/diameter/test/diameter_traffic_SUITE.erl | 73 ++++++++++++++++++---------- 1 file changed, 48 insertions(+), 25 deletions(-) diff --git a/lib/diameter/test/diameter_traffic_SUITE.erl b/lib/diameter/test/diameter_traffic_SUITE.erl index e1d3d186a3..5ae05a8bb6 100644 --- a/lib/diameter/test/diameter_traffic_SUITE.erl +++ b/lib/diameter/test/diameter_traffic_SUITE.erl @@ -27,6 +27,8 @@ -export([suite/0, all/0, groups/0, + init_per_suite/1, + end_per_suite/1, init_per_group/2, end_per_group/2, init_per_testcase/2, @@ -246,15 +248,12 @@ all() -> [start, result_codes, {group, traffic}, outstanding, empty, stop]. groups() -> - Ts = tc(), - Sctp = ?util:have_sctp(), - [{B, [P], Ts} || {B,P} <- [{true, shuffle}, {false, parallel}]] + [{P, [P], Ts} || Ts <- [tc()], P <- [shuffle, parallel]] ++ [{?util:name([T,R,D,A,C,SD,CD]), [], - [{group, SD orelse CD}]} + [{group, if SD orelse CD -> shuffle; true -> parallel end}]} || T <- ?TRANSPORTS, - T /= sctp orelse Sctp, R <- ?ENCODINGS, D <- ?RFCS, A <- ?ENCODINGS, @@ -262,21 +261,41 @@ groups() -> SD <- ?STRING_DECODES, CD <- ?STRING_DECODES] ++ - [{traffic, [], [{group, ?util:name([T,R,D,A,C,SD,CD])} - || T <- ?TRANSPORTS, - T /= sctp orelse Sctp, - R <- ?ENCODINGS, - D <- ?RFCS, - A <- ?ENCODINGS, - C <- ?CONTAINERS, - SD <- ?STRING_DECODES, - CD <- ?STRING_DECODES]}]. - -init_per_group(B, Config) - when is_boolean(B) -> + [{T, [], [{group, ?util:name([T,R,D,A,C,SD,CD])} + || R <- ?ENCODINGS, + D <- ?RFCS, + A <- ?ENCODINGS, + C <- ?CONTAINERS, + SD <- ?STRING_DECODES, + CD <- ?STRING_DECODES]} + || T <- ?TRANSPORTS] + ++ + [{traffic, [], [{group, T} || T <- ?TRANSPORTS]}]. + +%% -------------------- + +init_per_suite(Config) -> + [{sctp, ?util:have_sctp()} | Config]. + +end_per_suite(_Config) -> + ok. + +%% -------------------- + +init_per_group(Name, Config) + when Name == shuffle; + Name == parallel -> start_services(Config), add_transports(Config); +init_per_group(sctp = Name, Config) -> + {_, Sctp} = lists:keyfind(Name, 1, Config), + if Sctp -> + Config; + true -> + {skip, Name} + end; + init_per_group(Name, Config) -> case ?util:name(Name) of [T,R,D,A,C,SD,CD] -> @@ -294,30 +313,34 @@ init_per_group(Name, Config) -> Config end. -end_per_group(B, Config) - when is_boolean(B) -> +end_per_group(Name, Config) + when Name == shuffle; + Name == parallel -> remove_transports(Config), stop_services(Config); end_per_group(_, _) -> ok. +%% -------------------- + %% Skip testcases that can reasonably fail under SCTP. init_per_testcase(Name, Config) -> - case [skip || #group{transport = sctp} - <- [proplists:get_value(group, Config)], - send_maxlen == Name - orelse send_long == Name] + case [G || #group{transport = sctp} = G + <- [proplists:get_value(group, Config)]] of - [skip] -> + [_] when Name == send_maxlen; + Name == send_long -> {skip, sctp}; - [] -> + _ -> [{testcase, Name} | Config] end. end_per_testcase(_, _) -> ok. +%% -------------------- + %% Testcases to run when services are started and connections %% established. tc() -> -- cgit v1.2.3 From 618acfe5dcb0aa3d8ec0101704cd8fc774ac6c90 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Mon, 10 Apr 2017 15:28:43 +0200 Subject: Deal with SCTP association id quirk on Solaris In particular, that the association id received in messages on a one-to-one socket after peeloff may be different from the id received on the listen socket at comm_up. This seems odd, since it's then not possible to send until the id is discovered by reception of an SCTP message containing it, but it's unclear if this is a bug or a feature, or if it's specific to certain platforms. Treat it as a feature in this commit, and get the association id as mentioned, an incoming CER being expected before anything is sent. Commit da3e5d67 has more history. --- lib/diameter/src/transport/diameter_sctp.erl | 22 +++++++--- lib/diameter/test/diameter_gen_sctp_SUITE.erl | 58 ++++++++++++++++++--------- 2 files changed, 55 insertions(+), 25 deletions(-) diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index f9feb68874..2059a0e029 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -88,7 +88,9 @@ %% {RAs, RP, Errors} | connect, socket :: gen_sctp:sctp_socket() | undefined, - assoc_id :: gen_sctp:assoc_id(), %% association identifier + assoc_id :: gen_sctp:assoc_id() %% association identifier + | undefined + | true, peer :: {[inet:ip_address()], uint()} %% {RAs, RP} | undefined, streams :: {uint(), uint()} %% {InStream, OutStream} counts @@ -676,7 +678,9 @@ recv({_, #sctp_assoc_change{state = comm_up, = S) -> Ref = getr(?REF_KEY), publish(T, Ref, Id, Sock), - up(S#transport{assoc_id = Id, + %% Deal with different association id after peeloff on Solaris by + %% taking the id from the first reception. + up(S#transport{assoc_id = T == accept orelse Id, streams = {IS, OS}}); %% ... or not: try the next address. @@ -691,16 +695,24 @@ recv({_, #sctp_assoc_change{} = E}, recv({_, #sctp_assoc_change{}}, _) -> stop; +%% First inbound on an accepting transport. +recv({[#sctp_sndrcvinfo{assoc_id = Id}], _Bin} + = T, + #transport{assoc_id = true} + = S) -> + recv(T, S#transport{assoc_id = Id}); + %% Inbound Diameter message. -recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, #transport{parent = Pid}) +recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, #transport{parent = Pid} = S) when is_binary(Bin) -> diameter_peer:recv(Pid, #diameter_packet{transport_data = {stream, Id}, bin = Bin}), - ok; + S; recv({_, #sctp_shutdown_event{assoc_id = A}}, #transport{assoc_id = Id}) - when A == Id; + when Id; + A == Id; A == 0 -> stop; diff --git a/lib/diameter/test/diameter_gen_sctp_SUITE.erl b/lib/diameter/test/diameter_gen_sctp_SUITE.erl index 457d9a0a85..d76d2bdbd3 100644 --- a/lib/diameter/test/diameter_gen_sctp_SUITE.erl +++ b/lib/diameter/test/diameter_gen_sctp_SUITE.erl @@ -228,33 +228,49 @@ accept_loop(Sock, MRef) -> %% Server process that answers incoming messages as long as the parent %% lives. -assoc(MRef, Id, OS) +assoc(MRef, _Id, OS) when is_reference(MRef) -> {peeloff, Sock} = receive T -> T end, - recv_loop(Sock, Id, sender(Sock, Id, OS), MRef). + recv_loop(Sock, false, sender(Sock, false, OS), MRef). %% recv_loop/4 recv_loop(Sock, Id, Pid, MRef) -> ok = inet:setopts(Sock, [{active, once}]), - receive - ?SCTP(Sock, {[#sctp_sndrcvinfo{assoc_id = I}], B}) - when is_binary(B) -> - T2 = diameter_lib:now(), - I = Id, %% assert - <> = B, %% assert - {[_,_,_,Sz] = L, Bytes} = unmark(Bin), - Sz = size(Bin) - Bytes, %% assert - <<_:Bytes/binary, Body:Sz/binary>> = Bin, - send(Pid, [T2|L], Body), %% answer - recv_loop(Sock, Id, Pid, MRef); - ?SCTP(Sock, _) -> - recv_loop(Sock, Id, Pid, MRef); - {'DOWN', MRef, process, _, Reason} -> - Reason; - T -> - error(T) - end. + recv(Sock, Id, Pid, MRef, receive T -> T end). + +%% recv/5 + +%% Association id can change on a peeloff socket on some versions of +%% Solaris. +recv(Sock, + false, + Pid, + MRef, + ?SCTP(Sock, {[#sctp_sndrcvinfo{assoc_id = Id}], _}) + = T) -> + Pid ! {assoc_id, Id}, + recv(Sock, Id, Pid, MRef, T); + +recv(Sock, Id, Pid, MRef, ?SCTP(Sock, {[#sctp_sndrcvinfo{assoc_id = I}], B})) + when is_binary(B) -> + T2 = diameter_lib:now(), + Id = I, %% assert + <> = B, %% assert + {[_,_,_,Sz] = L, Bytes} = unmark(Bin), + Sz = size(Bin) - Bytes, %% assert + <<_:Bytes/binary, Body:Sz/binary>> = Bin, + send(Pid, [T2|L], Body), %% answer + recv_loop(Sock, Id, Pid, MRef); + +recv(Sock, Id, Pid, MRef, ?SCTP(Sock, _)) -> + recv_loop(Sock, Id, Pid, MRef); + +recv(_, _, _, MRef, {'DOWN', MRef, process, _, Reason}) -> + Reason; + +recv(_, _, _, _, T) -> + error(T). %% send/3 @@ -273,6 +289,8 @@ sender(Sock, Id, OS) -> send_loop(Sock, Id, OS, N, MRef) -> receive + {assoc_id, I} -> + send_loop(Sock, I, OS, N, MRef); {send, L, Body} -> Stream = N rem OS, ok = send(Sock, Id, Stream, mark(Body, [N, Stream | L])), -- cgit v1.2.3 From d7f83ba37b7fb9a9af635213e7ca8877a4438c05 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Wed, 12 Apr 2017 18:40:13 +0200 Subject: Use binary:copy/2 when generating largish data in test suites Faster than lists:duplicate/2. --- lib/diameter/test/diameter_gen_tcp_SUITE.erl | 4 ++-- lib/diameter/test/diameter_traffic_SUITE.erl | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/diameter/test/diameter_gen_tcp_SUITE.erl b/lib/diameter/test/diameter_gen_tcp_SUITE.erl index 2be2cf4b35..db42ea813e 100644 --- a/lib/diameter/test/diameter_gen_tcp_SUITE.erl +++ b/lib/diameter/test/diameter_gen_tcp_SUITE.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2014-2015. All Rights Reserved. +%% Copyright Ericsson AB 2014-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -54,7 +54,7 @@ all() -> send_long(_) -> {Sock, SendF} = connection(), - B = list_to_binary(lists:duplicate(1 bsl 20, $X)), + B = binary:copy(<<$X>>, 1 bsl 20), ok = SendF(B), B = recv(Sock, size(B), []). diff --git a/lib/diameter/test/diameter_traffic_SUITE.erl b/lib/diameter/test/diameter_traffic_SUITE.erl index 5ae05a8bb6..7fd13b0536 100644 --- a/lib/diameter/test/diameter_traffic_SUITE.erl +++ b/lib/diameter/test/diameter_traffic_SUITE.erl @@ -717,14 +717,14 @@ send_unexpected_mandatory(Config) -> %% Send something long that will be fragmented by TCP. send_long(Config) -> Req = ['STR', {'Termination-Cause', ?LOGOUT}, - {'User-Name', [lists:duplicate(1 bsl 20, $X)]}], + {'User-Name', [binary:copy(<<$X>>, 1 bsl 20)]}], ['STA', {'Session-Id', _}, {'Result-Code', ?SUCCESS} | _] = call(Config, Req). %% Send something longer than the configure incoming_maxlen. send_maxlen(Config) -> Req = ['STR', {'Termination-Cause', ?LOGOUT}, - {'User-Name', [lists:duplicate(1 bsl 21, $X)]}], + {'User-Name', [binary:copy(<<$X>>, 1 bsl 21)]}], {timeout, _} = call(Config, Req). %% Send something for which pick_peer finds no suitable peer. -- cgit v1.2.3 From 9a90f20f365a90612ee5e5f7db14fd71b4875b84 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Mon, 24 Apr 2017 11:07:27 +0200 Subject: Deal with (another) SCTP association id quirk on Solaris Shutdown events have been seen to get a different association id. For example, first incoming message with association id = 0: + {trace_ts,<6421.268.0>,call, {diameter_sctp,handle_info, [{sctp,#Port<6421.2588>, {10,67,16,179}, 44159, {[{sctp_sndrcvinfo,0,0,[],0,0,0,269950872,269950872,0}], <<1,0,0,156,128,0,1,1,0,0,0,0,6,193,40,137,6,193,40,137,0,0, 1,8,64,0,0,30,67,45,49,51,52,50,49,55,52,52,49,46,101,114, 108,97,110,103,46,111,114,103,0,0,0,0,1,40,64,0,0,18,101, 114,108,97,110,103,46,111,114,103,0,0,0,0,1,1,64,0,0,14,0, 1,127,0,0,1,0,0,0,0,1,10,64,0,0,12,0,0,48,57,0,0,1,13,0,0, 0,20,79,84,80,47,100,105,97,109,101,116,101,114,0,0,1,22, 64,0,0,12,0,0,0,1,0,0,1,2,64,0,0,12,0,0,0,0,0,0,1,3,64,0,0, 12,0,0,0,3>>}}, {transport,<6421.252.0>,accept,#Port<6421.2588>,true,undefined, {32,32}, 0,undefined}]}, {1493,21505,577938}} Later, a shutdown event with association id 1536: + {trace_ts,<6421.268.0>,call, {diameter_sctp,handle_info, [{sctp,#Port<6421.2588>, {10,67,16,179}, 44159, {[],{sctp_shutdown_event,1536}}}, {transport,<6421.252.0>,accept,#Port<6421.2588>,0,undefined, {32,32}, 2,<6421.304.0>}]}, {1493,21505,746929}} Both this and the grandparent commit are on this: $ uname -a SunOS beren 5.10 Generic_118833-33 sun4v sparc SUNW,Netra-T2000 --- lib/diameter/src/transport/diameter_sctp.erl | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index 2059a0e029..1e64804666 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -709,11 +709,7 @@ recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, #transport{parent = Pid} = S) bin = Bin}), S; -recv({_, #sctp_shutdown_event{assoc_id = A}}, - #transport{assoc_id = Id}) - when Id; - A == Id; - A == 0 -> +recv({_, #sctp_shutdown_event{}}, _) -> stop; %% Note that diameter_sctp(3) documents that sctp_events cannot be -- cgit v1.2.3 From ea339754d579b7e917dab0afa9a4694689f1f990 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Sat, 11 Mar 2017 21:11:48 +0100 Subject: Strip throttling callbacks from diameter_tcp Commits starting at 472a080c added a throttle_cb option to diameter_tcp to let a callback apply backpressure when it decides that additional requests should not be read. It didn't provide a hook for knowing that an answer was sent however, which is needed when sends no longer take place in the receiver process, and is more complicated than it should be. Strip it all away, in preparation for a simpler incarnation. --- lib/diameter/src/transport/diameter_tcp.erl | 221 +++++++--------------------- 1 file changed, 51 insertions(+), 170 deletions(-) diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index f745e2a6f7..c81701b624 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -19,7 +19,6 @@ %% -module(diameter_tcp). --dialyzer({no_fail_call, throttle/2}). -behaviour(gen_server). @@ -109,22 +108,19 @@ | gen_tcp:listen_option(). -type option() :: {port, non_neg_integer()} - | {fragment_timer, 0..16#FFFFFFFF} - | {throttle_cb, diameter:evaluable()}. + | {fragment_timer, 0..16#FFFFFFFF}. %% Accepting/connecting transport process state. -record(transport, {socket :: inet:socket() | ssl:sslsocket(), %% accept/connect socket parent :: pid(), %% of process that started us module :: module(), %% gen_tcp-like module - frag = <<>> :: frag(), %% message fragment ssl :: [term()] | boolean(), %% ssl options, ssl or not + frag = <<>> :: frag(), %% message fragment timeout :: infinity | 0..16#FFFFFFFF, %% fragment timeout tref = false :: false | reference(), %% fragment timer reference flush = false :: boolean(), %% flush fragment at timeout? - throttle_cb :: false | diameter:evaluable(), %% ask to receive - throttled :: boolean() | binary(), %% stopped receiving? - monitor :: pid()}). + monitor :: pid()}). %% monitor/sender process %% The usual transport using gen_tcp can be replaced by anything %% sufficiently gen_tcp-like by passing a 'module' option as the first @@ -213,30 +209,26 @@ i({T, Ref, Mod, Pid, Opts, Addrs, SPid}) %% Since accept/connect might block indefinitely, spawn a process %% that kills us with the parent until call returns, and then %% sends outgoing messages. - {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}), {[SO|TO], Rest} = proplists:split(Opts, [ssl_options, - fragment_timer, - throttle_cb]), + fragment_timer]), SslOpts = ssl_opts(SO), OwnOpts = lists:append(TO), Tmo = proplists:get_value(fragment_timer, OwnOpts, ?DEFAULT_FRAGMENT_TIMEOUT), ?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}), - Throttle = proplists:get_value(throttle_cb, OwnOpts, false), + {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}), Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs, SPid), M = if SslOpts -> ssl; true -> Mod end, monitor(process, MPid), MPid ! {start, self(), Sock, M}, %% prepare monitor for sending putr(?REF_KEY, Ref), - throttle(#transport{parent = Pid, - module = M, - socket = Sock, - ssl = SslOpts, - timeout = Tmo, - throttle_cb = Throttle, - throttled = false /= Throttle, - monitor = MPid}); + setopts(#transport{parent = Pid, + module = M, + socket = Sock, + ssl = SslOpts, + timeout = Tmo, + monitor = MPid}); %% Put the reference in the process dictionary since we now use it %% advertise the ssl socket after TLS upgrade. @@ -518,7 +510,7 @@ getr(Key) -> %% Transition monitor state. %% Outgoing message. -m(Bin, #monitor{} = S) +m(Bin, S) when is_binary(Bin) -> send(Bin, S), S; @@ -539,7 +531,7 @@ m({stop, TPid} = T, #monitor{transport = TPid}) -> %% Transport is telling us that TLS has been negotiated after %% capabilities exchange. -m({tls, SSock}, #monitor{} = S) -> +m({tls, SSock}, S) -> S#monitor{socket = SSock, module = ssl}; @@ -583,22 +575,14 @@ t(T,S) -> %% transition/2 -%% Incoming message. +%% Incoming packets. transition({P, Sock, Bin}, #transport{socket = Sock, - ssl = B, - throttled = T} + ssl = B} = S) when P == ssl, true == B; P == tcp -> - false = T, %% assert recv(Bin, S); -%% Make a new throttling callback after a timeout. -transition(throttle, #transport{throttled = false}) -> - ok; -transition(throttle, S) -> - throttle(S); - %% Capabilties exchange has decided on whether or not to run over TLS. transition({diameter, {tls, Ref, Type, B}}, #transport{parent = Pid} = S) -> @@ -607,7 +591,7 @@ transition({diameter, {tls, Ref, Type, B}}, #transport{parent = Pid} = NS = tls_handshake(Type, B, S), Pid ! {diameter, {tls, Ref}}, - throttle(NS#transport{ssl = B}); + NS#transport{ssl = B}; transition({C, Sock}, #transport{socket = Sock, ssl = B}) @@ -623,8 +607,8 @@ transition({E, Sock, _Reason} = T, #transport{socket = Sock, ?ERROR({T,S}); %% Outgoing message. -transition({diameter, {send, Bin}}, #transport{} = S) -> - send(Bin, S), +transition({diameter, {send, Msg}}, #transport{} = S) -> + send(Msg, S), ok; %% Request to close the transport connection. @@ -703,24 +687,16 @@ tls(accept, Sock, Opts) -> %% using Nagle. %% Receive packets until a full message is received, -recv(Bin, #transport{frag = Head, throttled = false} = S) -> +recv(Bin, #transport{parent = Pid, frag = Head} = S) -> case rcv(Head, Bin) of - {Msg, B} -> - throttle(S#transport{frag = B, throttled = Msg}); - Frag -> - setopts(S), - start_fragment_timer(S#transport{frag = Frag, - flush = false}) + {Msg, B} -> %% have a complete message ... + diameter_peer:recv(Pid, Msg), + recv(<<>>, S#transport{frag = B}); + Frag -> %% read more on the socket + start_fragment_timer(setopts(S#transport{frag = Frag, + flush = false})) end. -%% recv/1 - -recv(#transport{throttled = false} = S) -> - recv(<<>>, S); - -recv(#transport{} = S) -> - S. - %% rcv/2 %% No previous fragment. @@ -780,13 +756,16 @@ recv1(Len, Bin) -> <> = Bin, {Msg, Rest}. -%% bin/1-2 +%% bin/2 bin(Head, Acc) -> list_to_binary([Head | lists:reverse(Acc)]). +%% bin/1 + bin({_, _, Head, Acc}) -> bin(Head, Acc); + bin(Bin) when is_binary(Bin) -> Bin. @@ -805,9 +784,7 @@ bin(Bin) %% also eventually lead to watchdog failover. %% No fragment to flush or not receiving messages. -flush(#transport{frag = Frag, throttled = B} = S) - when Frag == <<>>; - B /= false -> +flush(#transport{frag = <<>>} = S) -> S; %% Messages have been received since last timer expiry. @@ -850,14 +827,11 @@ connect(Mod, Host, Port, Opts) -> %% send/2 -send(Bin, #monitor{socket = Sock, - module = M}) -> - case send(M, Sock, Bin) of - ok -> - ok; - {error, Reason} -> - x({send, Reason}) - end; +send(#diameter_packet{bin = Bin}, S) -> + send(Bin, S); + +send(Bin, #monitor{} = S) -> + send1(Bin, S); %% Send from the monitor process to avoid deadlock if both the %% receiver and the peer were to block in send. @@ -865,6 +839,17 @@ send(Bin, #transport{monitor = MPid}) -> MPid ! Bin, MPid. +%% send1/2 + +send1(Bin, #monitor{socket = Sock, + module = M}) -> + case send(M, Sock, Bin) of + ok -> + ok; + {error, Reason} -> + x({send, Reason}) + end. + %% send/3 send(gen_tcp, Sock, Bin) -> @@ -885,116 +870,12 @@ setopts(M, Sock, Opts) -> %% setopts/1 -setopts(#transport{socket = Sock, module = M}) -> - setopts(M, Sock). - -%% setopts/2 - -setopts(M, Sock) -> +setopts(#transport{socket = Sock, + module = M} + = S)-> case setopts(M, Sock, [{active, once}]) of - ok -> ok; - X -> x({setopts, M, Sock, X}) %% possibly on peer disconnect - end. - -%% throttle/1 - -%% Still collecting packets for a complete message: keep receiving. -throttle(#transport{throttled = false} = S) -> - recv(S); - -%% Decide whether to receive another, or whether to accept a message -%% that's been received. -throttle(#transport{throttle_cb = F, throttled = T} = S) -> - Res = cb(F, T), - - try throttle(Res, S) of - #transport{ssl = SB} = NS when is_boolean(SB) -> - throttle(defrag(NS)); - #transport{throttled = Msg} = NS when is_binary(Msg) -> - %% Initial incoming message when we might need to upgrade - %% to TLS: wait for reception of a tls tuple. - defrag(NS) - catch - #transport{} = NS -> - recv(NS) - end. - -%% cb/2 - -cb(false, _) -> - ok; - -cb(F, B) -> - diameter_lib:eval([F, true /= B andalso B]). - -%% throttle/2 - -%% Callback says to receive another message. -throttle(ok, #transport{throttled = true} = S) -> - throw(S#transport{throttled = false}); - -%% Callback says to accept a received message. -throttle(ok, #transport{parent = Pid, throttled = Msg} = S) - when is_binary(Msg) -> - diameter_peer:recv(Pid, Msg), - S; - -throttle({ok = T, F}, S) -> - throttle(T, S#transport{throttle_cb = F}); - -%% Callback says to accept a received message and acknowledged the -%% returned pid with a {request, Pid} message if a request pid is -%% spawned, a discard message otherwise. The latter does not mean that -%% the message was necessarily discarded: it could have been an -%% answer. -throttle(NPid, #transport{parent = Pid, throttled = Msg} = S) - when is_pid(NPid), is_binary(Msg) -> - diameter_peer:recv(Pid, {Msg, NPid}), - S; - -throttle({NPid, F}, #transport{throttled = Msg} = S) - when is_pid(NPid), is_binary(Msg) -> - throttle(NPid, S#transport{throttle_cb = F}); - -%% Callback to accept a received message says to discard it. -throttle(discard, #transport{throttled = Msg} = S) - when is_binary(Msg) -> - S; -throttle({discard = T, F}, #transport{throttled = Msg} = S) - when is_binary(Msg) -> - throttle(T, S#transport{throttle_cb = F}); - -%% Callback to accept a received message says to answer it with the -%% supplied binary. -throttle(Bin, #transport{throttled = Msg} = S) - when is_binary(Bin), is_binary(Msg) -> - send(Bin, S), - S; -throttle({Bin, F}, #transport{throttled = Msg} = S) - when is_binary(Bin), is_binary(Msg) -> - throttle(Bin, S#transport{throttle_cb = F}); - -%% Callback says to ask again in the specified number of milliseconds. -throttle({timeout, Tmo}, S) -> - erlang:send_after(Tmo, self(), throttle), - throw(S); -throttle({timeout = T, Tmo, F}, S) -> - throttle({T, Tmo}, S#transport{throttle_cb = F}); - -throttle(T, #transport{throttle_cb = F}) -> - ?ERROR({invalid_return, T, F}). - -%% defrag/1 -%% -%% Try to extract another message from packets already read before -%% another throttling callback. - -defrag(#transport{frag = Head} = S) -> - case rcv(Head, <<>>) of - {Msg, B} -> - S#transport{throttled = Msg, frag = B}; - _ -> - S#transport{throttled = true} + ok -> S; + X -> x({setopts, Sock, M, X}) %% possibly on peer disconnect end. %% portnr/2 -- cgit v1.2.3 From ca09cf7b697798aca5a4f81a11d5ad1d90f4107e Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Wed, 15 Mar 2017 17:04:44 +0100 Subject: Simplify acks to transport processes What's interesting when implementing some form of load regulation is when an incoming request has been answered or discarded. Acknowledge exactly this, not the identity of handler processes as previously. A transport process can request acks of nonforthcoming answers by sending {diameter, ack} to the parent peer_fsm, a handler processes identifies itself with a {handler, pid()} message, and the peer_fsm monitors on this to be able to send a notification to the transport if the handler dies before sending an answer. --- lib/diameter/src/base/diameter_peer_fsm.erl | 138 +++++++++++++++++----------- lib/diameter/src/base/diameter_traffic.erl | 80 ++++++++-------- lib/diameter/src/base/diameter_watchdog.erl | 37 +++----- 3 files changed, 136 insertions(+), 119 deletions(-) diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl index d394156367..d2af8fe425 100644 --- a/lib/diameter/src/base/diameter_peer_fsm.erl +++ b/lib/diameter/src/base/diameter_peer_fsm.erl @@ -129,6 +129,7 @@ %% the request was sent explicitly with %% diameter:call/4. strict :: boolean(), + ack = false :: boolean(), length_errors :: exit | handle | discard, incoming_maxlen :: integer() | infinity}). @@ -235,7 +236,7 @@ i({Ack, WPid, {M, Ref} = T, Opts, {SvcOpts, Nodes, Dict0, Svc}}) -> Tmo = proplists:get_value(capx_timeout, Opts, ?CAPX_TIMEOUT), Strictness = proplists:get_value(capx_strictness, Opts, true), - OnLengthErr = proplists:get_value(length_errors, Opts, exit), + LengthErr = proplists:get_value(length_errors, Opts, exit), {TPid, Addrs} = start_transport(T, Rest, Svc), @@ -247,7 +248,7 @@ i({Ack, WPid, {M, Ref} = T, Opts, {SvcOpts, Nodes, Dict0, Svc}}) -> dictionary = Dict0, mode = M, service = svc(Svc, Addrs), - length_errors = OnLengthErr, + length_errors = LengthErr, strict = Strictness, incoming_maxlen = Maxlen}. %% The transport returns its local ip addresses so that different @@ -442,10 +443,18 @@ transition({connection_timeout = T, TPid}, transition({connection_timeout, _}, _) -> ok; +%% Requests for acknowledgements to the transport. +transition({diameter, ack}, S) -> + S#state{ack = true}; + %% Incoming message from the transport. -transition({diameter, {recv, MsgT}}, S) -> - {Msg, NPid} = msg(MsgT), - incoming(recv(Msg, S), NPid, S); +transition({diameter, {recv, Msg}}, S) -> + incoming(recv(Msg, S), S); + +%% Handler of an incoming request is telling of its existence. +transition({handler, Pid}, _) -> + put_route(Pid), + ok; %% Timeout when still in the same state ... transition({timeout = T, PS}, #state{state = PS}) -> @@ -459,7 +468,7 @@ transition({timeout, _}, _) -> transition({send, Msg}, S) -> outgoing(Msg, S); transition({send, Msg, Route}, S) -> - put_route(Route), + route_outgoing(Route), outgoing(Msg, S); %% Request for graceful shutdown at remove_transport, stop_service of @@ -488,12 +497,13 @@ transition({'DOWN', _, process, WPid, _}, transition({'DOWN', _, process, TPid, _}, #state{transport = TPid} = S) -> - start_next(S); + start_next(S#state{ack = false}); %% Transport has died after connection timeout, or handler process has %% died. -transition({'DOWN', _, process, Pid, _}, _) -> - erase_route(Pid), +transition({'DOWN', _, process, Pid, _}, #state{transport = TPid}) -> + is_reference(erase_route(Pid)) + andalso send(TPid, false), %% answer not forthcoming ok; %% State query. @@ -503,37 +513,56 @@ transition({state, Pid}, #state{state = S, transport = TPid}) -> %% Crash on anything unexpected. -%% put_route/1 -%% +%% route_outgoing/1 + %% Map identifiers in an outgoing request to be able to lookup the %% handler process when the answer is received. - -put_route({Pid, Ref, Seqs}) -> +route_outgoing({Pid, Ref, Seqs}) -> %% request MRef = monitor(process, Pid), put(Pid, Seqs), - put(Seqs, {Pid, Ref, MRef}). + put(Seqs, {Pid, Ref, MRef}); -%% get_route/1 +%% Remove a mapping made for an incoming request. +route_outgoing(Pid) + when is_pid(Pid) -> %% answer + MRef = erase_route(Pid), + undefined == MRef orelse demonitor(MRef). -get_route(#diameter_packet{header = #diameter_header{is_request = false}} - = Pkt) -> +%% put_route/1 + +%% Monitor on a handler process for an incoming request. +put_route(Pid) -> + MRef = monitor(process, Pid), + put(Pid, MRef). + +%% get_route/2 + +%% incoming answer +get_route(_, #diameter_packet{header = #diameter_header{is_request = false}} + = Pkt) -> Seqs = diameter_codec:sequence_numbers(Pkt), case erase(Seqs) of {Pid, Ref, MRef} -> demonitor(MRef), erase(Pid), {Pid, Ref, self()}; - undefined -> + undefined -> %% request unknown false end; -get_route(_) -> - false. +%% incoming request +get_route(Ack, _) -> + Ack. %% erase_route/1 erase_route(Pid) -> - erase(erase(Pid)). + case erase(Pid) of + {_,_} = Seqs -> + erase(Seqs); + T -> + T + end. %% capx/1 @@ -610,26 +639,26 @@ encode(Rec, Dict) -> diameter_codec:encode(Dict, #diameter_packet{header = Hdr, msg = Rec}). -%% incoming/3 +%% incoming/2 -incoming({recv, Name, Pkt}, NPid, #state{parent = Pid} = S) -> - Pid ! {recv, self(), get_route(Pkt), Name, Pkt, NPid}, +incoming({recv = T, Name, Pkt}, #state{parent = Pid, ack = Ack} = S) -> + Pid ! {T, self(), get_route(Ack, Pkt), Name, Pkt}, rcv(Name, Pkt, S); -incoming(T, false, _) -> - T; - -incoming(T, NPid, _) -> - NPid ! {diameter, discard}, - T. +incoming(#diameter_header{is_request = R}, #state{transport = TPid, + ack = Ack}) -> + R andalso Ack andalso send(TPid, false), + ok; -%% msg/1 +incoming(<<_:32, 1:1, _/bits>>, #state{ack = true} = S) -> + send(S#state.transport, false), + ok; -msg({_,_} = T) -> - T; +incoming(<<_/bits>>, _) -> + ok; -msg(Msg) -> - {Msg, false}. +incoming(T, _) -> + T. %% recv/2 @@ -654,18 +683,19 @@ recv1(_, #diameter_packet{header = H, bin = Bin}, #state{incoming_maxlen = M}) when M < size(Bin) -> - invalid(false, incoming_maxlen_exceeded, {size(Bin), H}); + invalid(false, incoming_maxlen_exceeded, {size(Bin), H}), + H; %% Ignore anything but an expected CER/CEA if so configured. This is %% non-standard behaviour. -recv1(Name, _, #state{state = {'Wait-CEA', _, _}, - strict = false}) +recv1(Name, #diameter_packet{header = H}, #state{state = {'Wait-CEA', _, _}, + strict = false}) when Name /= 'CEA' -> - ok; -recv1(Name, _, #state{state = recv_CER, - strict = false}) + H; +recv1(Name, #diameter_packet{header = H}, #state{state = recv_CER, + strict = false}) when Name /= 'CER' -> - ok; + H; %% Incoming request after outgoing DPR: discard. Don't discard DPR, so %% both ends don't do so when sending simultaneously. @@ -673,13 +703,15 @@ recv1(Name, #diameter_packet{header = #diameter_header{is_request = true} = H}, #state{dpr = {_,_,_}}) when Name /= 'DPR' -> - invalid(false, recv_after_outgoing_dpr, H); + invalid(false, recv_after_outgoing_dpr, H), + H; %% Incoming request after incoming DPR: discard. recv1(_, #diameter_packet{header = #diameter_header{is_request = true} = H}, #state{dpr = true}) -> - invalid(false, recv_after_incoming_dpr, H); + invalid(false, recv_after_incoming_dpr, H), + H; %% DPA with identifier mismatch, or in response to a DPR initiated by %% the service. @@ -716,10 +748,12 @@ recv(#diameter_header{} #diameter_packet{bin = Bin}, #state{length_errors = E}) -> T = {size(Bin), bit_size(Bin) rem 8, H}, - invalid(E, message_length_mismatch, T); + invalid(E, message_length_mismatch, T), + Bin; recv(false, #diameter_packet{bin = Bin}, #state{length_errors = E}) -> - invalid(E, truncated_header, Bin). + invalid(E, truncated_header, Bin), + Bin. %% Note that counters here only count discarded messages. invalid(E, Reason, T) -> @@ -775,14 +809,10 @@ rcv('DPA' = N, diameter_peer:close(TPid), {stop, N}; -%% Ignore anything else, an unsolicited DPA in particular. Note that -%% dpa_timeout deals with the case in which the peer sends the wrong -%% identifiers in DPA. -rcv(N, #diameter_packet{header = H}, _) - when N == 'CER'; - N == 'CEA'; - N == 'DPR'; - N == 'DPA' -> +%% Ignore an unsolicited DPA in particular. Note that dpa_timeout +%% deals with the case in which the peer sends the wrong identifiers +%% in DPA. +rcv('DPA' = N, #diameter_packet{header = H}, _) -> ?LOG(ignored, N), %% Note that these aren't counted in the normal recv counter. diameter_stats:incr({diameter_codec:msg_id(H), recv, ignored}), diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl index bc1ccf4feb..96f3a307f9 100644 --- a/lib/diameter/src/base/diameter_traffic.erl +++ b/lib/diameter/src/base/diameter_traffic.erl @@ -30,7 +30,7 @@ -export([send_request/4]). %% towards diameter_watchdog --export([receive_message/6]). +-export([receive_message/5]). %% towards diameter_peer_fsm and diameter_watchdog -export([incr/4, @@ -93,7 +93,7 @@ packet :: #diameter_packet{} | undefined}). %% of request %% --------------------------------------------------------------------------- -%% # make_recvdata/1 +%% make_recvdata/1 %% --------------------------------------------------------------------------- make_recvdata([SvcName, PeerT, Apps, SvcOpts | _]) -> @@ -206,42 +206,36 @@ incr_rc(Dir, Pkt, TPid, Dict0) -> incr_rc(Dir, Pkt, TPid, {Dict0, Dict0, Dict0}). %% --------------------------------------------------------------------------- -%% # receive_message/6 +%% receive_message/5 %% -%% Handle an incoming Diameter message. +%% Handle an incoming Diameter message in a watchdog process. %% --------------------------------------------------------------------------- -%% Handle an incoming Diameter message in the watchdog process. - -receive_message(TPid, Route, Pkt, false, Dict0, RecvData) -> - incoming(TPid, Route, Pkt, Dict0, RecvData); - -receive_message(TPid, Route, Pkt, NPid, Dict0, RecvData) -> - NPid ! {diameter, incoming(TPid, Route, Pkt, Dict0, RecvData)}. - -%% incoming/4 - -incoming(TPid, Route, Pkt, Dict0, RecvData) - when is_pid(TPid) -> +-spec receive_message(pid(), Route, #diameter_packet{}, module(), #recvdata{}) + -> pid() + | boolean() + when Route :: {Handler, RequestRef, Seqs} + | Ack, + Handler :: pid(), + RequestRef :: reference(), + Seqs :: {0..16#FFFFFFFF, 0..16#FFFFFFFF}, + Ack :: boolean(). + +receive_message(TPid, Route, Pkt, Dict0, RecvData) -> #diameter_packet{header = #diameter_header{is_request = R}} = Pkt, recv(R, Route, TPid, Pkt, Dict0, RecvData). %% recv/6 %% Incoming request ... -recv(true, false, TPid, Pkt, Dict0, T) -> - try - {request, spawn_request(TPid, Pkt, Dict0, T)} - catch - error: system_limit = E -> %% discard - ?LOG(error, E), - discard - end; +recv(true, Ack, TPid, Pkt, Dict0, T) + when is_boolean(Ack) -> + spawn_request(Ack, TPid, Pkt, Dict0, T); %% ... answer to known request ... recv(false, {Pid, Ref, TPid}, _, Pkt, Dict0, _) -> Pid ! {answer, Ref, TPid, Dict0, Pkt}, - {answer, Pid}; + true; %% Note that failover could have happened prior to this message being %% received and triggering failback. That is, both a failover message @@ -256,23 +250,27 @@ recv(false, {Pid, Ref, TPid}, _, Pkt, Dict0, _) -> recv(false, false, TPid, Pkt, _, _) -> ?LOG(discarded, Pkt#diameter_packet.header), incr(TPid, {{unknown, 0}, recv, discarded}), - discard. + false. -%% spawn_request/4 +%% spawn_request/5 -spawn_request(TPid, Pkt, Dict0, {Opts, RecvData}) -> - spawn_request(TPid, Pkt, Dict0, Opts, RecvData); -spawn_request(TPid, Pkt, Dict0, RecvData) -> - spawn_request(TPid, Pkt, Dict0, ?DEFAULT_SPAWN_OPTS, RecvData). +spawn_request(Ack, TPid, Pkt, Dict0, {Opts, RecvData}) -> + spawn_request(Ack, TPid, Pkt, Dict0, Opts, RecvData); +spawn_request(Ack, TPid, Pkt, Dict0, RecvData) -> + spawn_request(Ack, TPid, Pkt, Dict0, ?DEFAULT_SPAWN_OPTS, RecvData). -spawn_request(TPid, Pkt, Dict0, Opts, RecvData) -> - spawn_opt(fun() -> recv_request(TPid, Pkt, Dict0, RecvData) end, Opts). +spawn_request(Ack, TPid, Pkt, Dict0, Opts, RecvData) -> + spawn_opt(fun() -> + recv_request(Ack, TPid, Pkt, Dict0, RecvData) + end, + Opts). %% --------------------------------------------------------------------------- -%% recv_request/4 +%% recv_request/5 %% --------------------------------------------------------------------------- -recv_request(TPid, +recv_request(Ack, + TPid, #diameter_packet{header = #diameter_header{application_id = Id}} = Pkt, Dict0, @@ -280,6 +278,7 @@ recv_request(TPid, apps = Apps, codec = Opts} = RecvData) -> + Ack andalso (TPid ! {handler, self()}), diameter_codec:setopts([{common_dictionary, Dict0} | Opts]), send_A(recv_R(diameter_service:find_incoming_app(PeerT, TPid, Id, Apps), TPid, @@ -511,7 +510,7 @@ send_A(T, TPid, {AppDict, Dict0} = DictT0, ReqPkt, EvalPktFs, EvalFs) -> {MsgDict, Pkt} = reply(T, TPid, DictT0, EvalPktFs, ReqPkt), incr(send, Pkt, TPid, AppDict), incr_rc(send, Pkt, TPid, {MsgDict, AppDict, Dict0}), %% count outgoing - send(TPid, Pkt), + send(TPid, Pkt, _Route = self()), lists:foreach(fun diameter_lib:eval/1, EvalFs). %% answer/6 @@ -1207,7 +1206,7 @@ x(T) -> exit(T). %% --------------------------------------------------------------------------- -%% # send_request/4 +%% send_request/4 %% %% Handle an outgoing Diameter request. %% --------------------------------------------------------------------------- @@ -1296,7 +1295,7 @@ mo(T, _) -> ?ERROR({invalid_option, T}). %% --------------------------------------------------------------------------- -%% # send_request/6 +%% send_request/6 %% --------------------------------------------------------------------------- %% Send an outgoing request in its dedicated process. @@ -1745,11 +1744,6 @@ recv(TPid, Pid, TRef, {LocalTRef, MRef}) -> exit({timeout, LocalTRef, TPid} = T) end. -%% send/2 - -send(Pid, Pkt) -> - Pid ! {send, Pkt}. - %% send/3 send(Pid, Pkt, Route) -> diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl index 4484b7ee2c..a2eb661870 100644 --- a/lib/diameter/src/base/diameter_watchdog.erl +++ b/lib/diameter/src/base/diameter_watchdog.erl @@ -283,7 +283,7 @@ event(Msg, ?LOG(transition, {From, To}). data(Msg, TPid, reopen, okay) -> - {recv, TPid, false, 'DWA', _Pkt, _NPid} = Msg, %% assert + {recv, TPid, _, 'DWA', _Pkt} = Msg, %% assert {TPid, T} = eraser(open), [T]; @@ -302,6 +302,8 @@ tpid(_, Pid) tpid(Pid, _) -> Pid. +%% send/2 + send(Pid, T) -> Pid ! T. @@ -447,14 +449,15 @@ transition({'DOWN', _, process, TPid, _Reason} = D, end; %% Incoming message. -transition({recv, TPid, Route, Name, Pkt, NPid}, +transition({recv, TPid, Route, Name, Pkt}, #watchdog{transport = TPid} = S) -> - try - incoming(Name, Pkt, NPid, S) - catch + try incoming(Route, Name, Pkt, S) of #watchdog{dictionary = Dict0, receive_data = T} = NS -> - diameter_traffic:receive_message(TPid, Route, Pkt, NPid, Dict0, T), + diameter_traffic:receive_message(TPid, Route, Pkt, Dict0, T), + NS + catch + #watchdog{} = NS -> NS end; @@ -586,23 +589,13 @@ send_watchdog(#watchdog{pending = false, %% incoming/4 -incoming(Name, Pkt, false, S) -> - recv(Name, Pkt, S); - -incoming(Name, Pkt, NPid, S) -> - NS = recv(Name, Pkt, S), - NPid ! {diameter, discard}, - NS. - -%% recv/3 - -recv(Name, Pkt, S) -> - try rcv(Name, Pkt, rcv(Name, S)) of - #watchdog{} = NS -> - throw(NS) +incoming(Route, Name, Pkt, S) -> + try rcv(Name, S) of + NS -> rcv(Name, Pkt, NS) catch - #watchdog{} = NS -> %% throwaway - NS + #watchdog{transport = TPid} = NS when Route -> %% incoming request + send(TPid, {send, false}), %% requiring ack + throw(NS) end. %% rcv/3 -- cgit v1.2.3 From 200697ef596dbe689443834aaf5aa0303971ca5d Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Tue, 4 Apr 2017 11:24:48 +0200 Subject: Fix incomprehensible dialyzer warning This: diameter_tcp.erl:241: Record construction #transport{parent::'false',ssl::boolean() | maybe_improper_list(),frag::<<>>,tref::'false',flush::'false',pending::0,reset::{1 | 4,0 | 2},throttled::boolean(),q::{0,queue:queue(_)},monitor::'undefined' | pid()} violates the declared type of field parent::pid() The problem isn't #transport.pid at all, it's #monitor.pid, and the only relation is that the pid that's assigned to the latter is also (later) assigned to the former. There is no record construction that assigns false to #transport.parent. Introduced in commit 33a535e4. --- lib/diameter/src/transport/diameter_tcp.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index c81701b624..8f2e9027fc 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -80,7 +80,7 @@ %% Monitor process state. The name monitor predates its role as sender. -record(monitor, - {parent :: reference() | false, + {parent :: reference() | false | pid(), transport = self() :: pid(), socket :: inet:socket() | ssl:sslsocket() | undefined, module :: module() | undefined}). -- cgit v1.2.3 From 034089ed1ba3f78c732edcfc84d85a6ed4a4854e Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Sat, 10 Jun 2017 06:34:44 +0200 Subject: Remove upgrade from diameter_sctp; tweak diameter_tcp to match Added in commit 2afd1fe5. Only rename variables in diameter_tcp, no functional change. --- lib/diameter/src/transport/diameter_sctp.erl | 26 ++++++++++-------------- lib/diameter/src/transport/diameter_tcp.erl | 30 ++++++++++++++-------------- 2 files changed, 25 insertions(+), 31 deletions(-) diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index 1e64804666..d23e56b413 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -108,7 +108,7 @@ -record(listener, {ref :: reference(), socket :: gen_sctp:sctp_socket(), - service = false :: false | pid(), %% service process + service :: pid(), %% service process pending = {0, queue:new()}, accept :: [match()]}). %% Field pending implements two queues: the first of transport-to-be @@ -142,11 +142,11 @@ start(T, Svc, Opts) when is_list(Opts) -> #diameter_service{capabilities = Caps, - pid = SPid} + pid = Pid} = Svc, diameter_sctp_sup:start(), %% start supervisors on demand Addrs = Caps#diameter_caps.host_ip_address, - s(T, Addrs, SPid, lists:map(fun ip/1, Opts)). + s(T, Addrs, Pid, lists:map(fun ip/1, Opts)). ip({ifaddr, A}) -> {ip, A}; @@ -157,9 +157,9 @@ ip(T) -> %% when there is not yet an association to assign it, or at comm_up on %% a new association in which case the call retrieves a transport from %% the pending queue. -s({accept, Ref} = A, Addrs, SPid, Opts) -> - {ok, LPid, LAs} = listener(Ref, {Opts, Addrs}), - try gen_server:call(LPid, {A, self(), SPid}, infinity) of +s({accept, Ref} = A, Addrs, SvcPid, Opts) -> + {ok, LPid, LAs} = listener(Ref, {Opts, SvcPid, Addrs}), + try gen_server:call(LPid, {A, self()}, infinity) of {ok, TPid} -> {ok, TPid, LAs}; No -> @@ -172,7 +172,7 @@ s({accept, Ref} = A, Addrs, SPid, Opts) -> %% gen_sctp in order to be able to accept a new association only %% *after* an accepting transport has been spawned. -s({connect = C, Ref}, Addrs, _SPid, Opts) -> +s({connect = C, Ref}, Addrs, _SvcPid, Opts) -> diameter_sctp_sup:start_child({C, self(), Opts, Addrs, Ref}). %% start_link/1 @@ -233,7 +233,8 @@ i(#monitor{transport = TPid} = S) -> S; %% A process owning a listening socket. -i({listen, Ref, {Opts, Addrs}}) -> +i({listen, Ref, {Opts, SvcPid, Addrs}}) -> + monitor(process, SvcPid), [_] = diameter_config:subscribe(Ref, transport), %% assert existence {[Matches], Rest} = proplists:split(Opts, [accept]), {LAs, Sock} = AS = open(Addrs, Rest, ?DEFAULT_PORT), @@ -241,6 +242,7 @@ i({listen, Ref, {Opts, Addrs}}) -> true = diameter_reg:add_new({?MODULE, listener, {Ref, AS}}), proc_lib:init_ack({ok, self(), LAs}), #listener{ref = Ref, + service = SvcPid, socket = Sock, accept = [[M] || {accept, M} <- Matches]}; @@ -390,14 +392,6 @@ handle_call({{accept, Ref}, Pid}, _, #listener{ref = Ref} = S) -> {TPid, NewS} = accept(Ref, Pid, S), {reply, {ok, TPid}, NewS}; -handle_call({{accept, _} = T, Pid, SPid}, From, #listener{service = P} = S) -> - handle_call({T, Pid}, From, if not is_pid(P), is_pid(SPid) -> - monitor(process, SPid), - S#listener{service = SPid}; - true -> - S - end); - %% Transport is telling us of parent death. handle_call({stop, _Pid} = Reason, _From, #monitor{} = S) -> {stop, {shutdown, Reason}, ok, S}; diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index 8f2e9027fc..427b2395b9 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -142,13 +142,13 @@ start({T, Ref}, Svc, Opts) -> #diameter_service{capabilities = Caps, - pid = SPid} + pid = SvcPid} = Svc, diameter_tcp_sup:start(), %% start tcp supervisors on demand {Mod, Rest} = split(Opts), Addrs = Caps#diameter_caps.host_ip_address, - Arg = {T, Ref, Mod, self(), Rest, Addrs, SPid}, + Arg = {T, Ref, Mod, self(), Rest, Addrs, SvcPid}, diameter_tcp_sup:start_child(Arg). split([{module, M} | Opts]) -> @@ -202,7 +202,7 @@ init(T) -> %% i/1 %% A transport process. -i({T, Ref, Mod, Pid, Opts, Addrs, SPid}) +i({T, Ref, Mod, Pid, Opts, Addrs, SvcPid}) when T == accept; T == connect -> monitor(process, Pid), @@ -218,7 +218,7 @@ i({T, Ref, Mod, Pid, Opts, Addrs, SPid}) ?DEFAULT_FRAGMENT_TIMEOUT), ?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}), {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}), - Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs, SPid), + Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs, SvcPid), M = if SslOpts -> ssl; true -> Mod end, monitor(process, MPid), MPid ! {start, self(), Sock, M}, %% prepare monitor for sending @@ -275,19 +275,19 @@ ssl_opts(T) -> %% init/8 %% Establish a TLS connection before capabilities exchange ... -init(Type, Ref, Mod, Pid, true, Opts, Addrs, SPid) -> - init(Type, Ref, ssl, Pid, [{cb_info, ?TCP_CB(Mod)} | Opts], Addrs, SPid); +init(Type, Ref, Mod, Pid, true, Opts, Addrs, SvcPid) -> + init(Type, Ref, ssl, Pid, [{cb_info, ?TCP_CB(Mod)} | Opts], Addrs, SvcPid); %% ... or not. -init(Type, Ref, Mod, Pid, _, Opts, Addrs, SPid) -> - init(Type, Ref, Mod, Pid, Opts, Addrs, SPid). +init(Type, Ref, Mod, Pid, _, Opts, Addrs, SvcPid) -> + init(Type, Ref, Mod, Pid, Opts, Addrs, SvcPid). %% init/7 -init(accept = T, Ref, Mod, Pid, Opts, Addrs, SPid) -> +init(accept = T, Ref, Mod, Pid, Opts, Addrs, SvcPid) -> {[Matches], Rest} = proplists:split(Opts, [accept]), {ok, LPid, {LAddr, LSock}} = listener(Ref, {Mod, Rest, Addrs}), - ok = gen_server:call(LPid, {accept, SPid}, infinity), + ok = gen_server:call(LPid, {accept, SvcPid}, infinity), proc_lib:init_ack({ok, self(), [LAddr]}), Sock = ok(accept(Mod, LSock)), ok = accept_peer(Mod, Sock, accept(Matches)), @@ -295,7 +295,7 @@ init(accept = T, Ref, Mod, Pid, Opts, Addrs, SPid) -> diameter_peer:up(Pid), Sock; -init(connect = T, Ref, Mod, Pid, Opts, Addrs, _SPid) -> +init(connect = T, Ref, Mod, Pid, Opts, Addrs, _SvcPid) -> {[LA, RA, RP], Rest} = proplists:split(Opts, [ip, raddr, rport]), LAddrOpt = get_addr(LA, Addrs), RAddr = get_addr(RA), @@ -447,10 +447,10 @@ portnr(Sock) -> %% # handle_call/3 %% --------------------------------------------------------------------------- -handle_call({accept, SPid}, _From, #listener{service = P} = S) -> - {reply, ok, if not is_pid(P), is_pid(SPid) -> - monitor(process, SPid), - S#listener{service = SPid}; +handle_call({accept, SvcPid}, _From, #listener{service = P} = S) -> + {reply, ok, if not is_pid(P), is_pid(SvcPid) -> + monitor(process, SvcPid), + S#listener{service = SvcPid}; true -> S end}; -- cgit v1.2.3 From eadf4efc7e264fe8dd30befb42a42a02cdef58f1 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Sat, 10 Jun 2017 23:40:53 +0200 Subject: Make diameter_{tcp,sctp} sender configurable With sends still from the receiving process by default, since changing the default behaviour may well have negative effects. A separate sender probably implies a greater need for some form of load regulation for one, since a blocking send would no longer imply that incoming messages are no longer recevied. Dealing with this could result in the same deadlock that the sending process intends to avoid, but the user should be in control over how/when incoming traffic is regulated. --- lib/diameter/src/transport/diameter_sctp.erl | 59 +++++++++++++------- lib/diameter/src/transport/diameter_tcp.erl | 81 +++++++++++++++++----------- 2 files changed, 90 insertions(+), 50 deletions(-) diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index d23e56b413..3919596cb1 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -68,6 +68,7 @@ -type connect_option() :: {raddr, inet:ip_address()} | {rport, inet:port_number()} + | option() | term(). %% gen_sctp:open_option(). -type match() :: inet:ip_address() @@ -75,8 +76,12 @@ | [match()]. -type listen_option() :: {accept, match()} + | option() | term(). %% gen_sctp:open_option(). +-type option() :: {sender, boolean()} + | sender. + -type uint() :: non_neg_integer(). %% Accepting/connecting transport process state. @@ -96,7 +101,7 @@ streams :: {uint(), uint()} %% {InStream, OutStream} counts | undefined, os = 0 :: uint(), %% next output stream - monitor :: pid() | undefined}). %% sending process + send = false :: pid() | boolean()}). %% sending process %% Monitor process state. -record(monitor, @@ -110,7 +115,8 @@ socket :: gen_sctp:sctp_socket(), service :: pid(), %% service process pending = {0, queue:new()}, - accept :: [match()]}). + accept :: [match()], + sender :: boolean()}). %% Field pending implements two queues: the first of transport-to-be %% processes to which an association has been assigned but for which %% diameter hasn't yet spawned a transport process, a short-lived @@ -236,7 +242,8 @@ i(#monitor{transport = TPid} = S) -> i({listen, Ref, {Opts, SvcPid, Addrs}}) -> monitor(process, SvcPid), [_] = diameter_config:subscribe(Ref, transport), %% assert existence - {[Matches], Rest} = proplists:split(Opts, [accept]), + {Split, Rest} = proplists:split(Opts, [accept, sender]), + OwnOpts = lists:append(Split), {LAs, Sock} = AS = open(Addrs, Rest, ?DEFAULT_PORT), ok = gen_sctp:listen(Sock, true), true = diameter_reg:add_new({?MODULE, listener, {Ref, AS}}), @@ -244,12 +251,14 @@ i({listen, Ref, {Opts, SvcPid, Addrs}}) -> #listener{ref = Ref, service = SvcPid, socket = Sock, - accept = [[M] || {accept, M} <- Matches]}; + accept = [[M] || {accept, M} <- OwnOpts], + sender = proplists:get_value(sender, OwnOpts, false)}; %% A connecting transport. i({connect, Pid, Opts, Addrs, Ref}) -> - {[As, Ps], Rest} = proplists:split(Opts, [raddr, rport]), - RAs = [diameter_lib:ipaddr(A) || {raddr, A} <- As], + {[Ps | Split], Rest} = proplists:split(Opts, [rport, raddr, sender]), + OwnOpts = lists:append(Split), + RAs = [diameter_lib:ipaddr(A) || {raddr, A} <- OwnOpts], [RP] = [P || {rport, P} <- Ps] ++ [P || P <- [?DEFAULT_PORT], [] == Ps], {LAs, Sock} = open(Addrs, Rest, 0), putr(?REF_KEY, Ref), @@ -257,7 +266,8 @@ i({connect, Pid, Opts, Addrs, Ref}) -> monitor(process, Pid), #transport{parent = Pid, mode = {connect, connect(Sock, RAs, RP, [])}, - socket = Sock}; + socket = Sock, + send = proplists:get_value(sender, OwnOpts, false)}; %% An accepting transport spawned by diameter, not yet owning an %% association. @@ -291,11 +301,12 @@ i({K, Ref}, #transport{mode = {accept, _}} = S) -> receive {Ref, Pid} when K == parent -> %% transport process started S#transport{parent = Pid}; - {K, T, Matches} when K == peeloff -> %% association + {K, T, Matches, Bool} when K == peeloff -> %% association {sctp, Sock, _RA, _RP, _Data} = T, ok = accept_peer(Sock, Matches), demonitor(Ref, [flush]), - t(T, S#transport{socket = Sock}); + t(T, S#transport{socket = Sock, + send = Bool}); accept_timeout = T -> x(T); {'DOWN', _, process, _, _} = T -> @@ -466,11 +477,12 @@ getr(Key) -> %% Incoming message from SCTP. l({sctp, Sock, _RA, _RP, Data} = T, #listener{socket = Sock, - accept = Matches} + accept = Matches, + sender = Sender} = S) -> Id = assoc_id(Data), {TPid, NewS} = accept(S), - TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Matches}, + TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Matches, Sender}, setopts(Sock), NewS; @@ -546,14 +558,15 @@ transition({diameter, {tls, _Ref, _Type, _Bool}}, _) -> %% Parent process has died: call the monitor to not close the socket %% during an ongoing send, but don't let it take forever. transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid, - monitor = MPid}) -> - undefined == MPid + send = MPid}) -> + is_boolean(MPid) orelse ok == (catch gen_server:call(MPid, {stop, Pid})) orelse exit(MPid, kill), stop; %% Monitor process has died. -transition({'DOWN', _, process, MPid, _}, #transport{monitor = MPid}) -> +transition({'DOWN', _, process, MPid, _}, #transport{send = MPid}) + when is_pid(MPid) -> stop; %% Timeout after transport process has been started. @@ -617,7 +630,7 @@ q(Ref, Pid, #listener{pending = {_,Q}}) -> %% send/2 %% Start monitor process on first send. -send(Msg, #transport{monitor = undefined, +send(Msg, #transport{send = true, socket = Sock, assoc_id = AId} = S) -> @@ -625,7 +638,7 @@ send(Msg, #transport{monitor = undefined, socket = Sock, assoc_id = AId}), monitor(process, MPid), - send(Msg, S#transport{monitor = MPid}); + send(Msg, S#transport{send = MPid}); %% Outbound Diameter message on a specified stream ... send(#diameter_packet{bin = Bin, transport_data = {outstream, SId}}, @@ -646,13 +659,23 @@ send(Bin, #transport{streams = {_, OS}, %% send/3 -send(StreamId, Bin, #transport{monitor = MPid}) -> +send(StreamId, Bin, #transport{send = false, + socket = Sock, + assoc_id = AId}) -> + send(Sock, AId, StreamId, Bin); + +send(StreamId, Bin, #transport{send = MPid}) -> MPid ! {Bin, StreamId}, MPid; send(StreamId, Bin, #monitor{socket = Sock, assoc_id = AId}) -> - case gen_sctp:send(Sock, AId, StreamId, Bin) of + send(Sock, AId, StreamId, Bin). + +%% send/4 + +send(Sock, AssocId, StreamId, Bin) -> + case gen_sctp:send(Sock, AssocId, StreamId, Bin) of ok -> ok; {error, Reason} -> diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index 427b2395b9..edbbec1709 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -69,16 +69,16 @@ %% of processes: an actual transport process, one that will club it to %% death should the parent die before a connection is established, and %% a process owning the listening port. The monitor process -%% historically died after connection establishment, but now lives on -%% as the sender of outgoing messages, so that a blocking send doesn't -%% prevent messages from being received. +%% historically died after connection establishment, but can now live +%% on as the sender of outgoing messages, so that a blocking send +%% doesn't prevent messages from being received. %% Listener process state. -record(listener, {socket :: inet:socket(), module :: module(), service = false :: false | pid()}). %% service process -%% Monitor process state. The name monitor predates its role as sender. +%% Monitor process state. -record(monitor, {parent :: reference() | false | pid(), transport = self() :: pid(), @@ -108,6 +108,7 @@ | gen_tcp:listen_option(). -type option() :: {port, non_neg_integer()} + | {sender, boolean()} | {fragment_timer, 0..16#FFFFFFFF}. %% Accepting/connecting transport process state. @@ -120,7 +121,7 @@ timeout :: infinity | 0..16#FFFFFFFF, %% fragment timeout tref = false :: false | reference(), %% fragment timer reference flush = false :: boolean(), %% flush fragment at timeout? - monitor :: pid()}). %% monitor/sender process + send :: pid() | false}). %% sending process %% The usual transport using gen_tcp can be replaced by anything %% sufficiently gen_tcp-like by passing a 'module' option as the first @@ -210,25 +211,27 @@ i({T, Ref, Mod, Pid, Opts, Addrs, SvcPid}) %% that kills us with the parent until call returns, and then %% sends outgoing messages. {[SO|TO], Rest} = proplists:split(Opts, [ssl_options, + sender, fragment_timer]), SslOpts = ssl_opts(SO), OwnOpts = lists:append(TO), Tmo = proplists:get_value(fragment_timer, OwnOpts, ?DEFAULT_FRAGMENT_TIMEOUT), + Sender = proplists:get_value(sender, OwnOpts, false), ?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}), {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}), Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs, SvcPid), M = if SslOpts -> ssl; true -> Mod end, - monitor(process, MPid), - MPid ! {start, self(), Sock, M}, %% prepare monitor for sending + Sender andalso monitor(process, MPid), + MPid ! {start, self(), Sender andalso {Sock, M}}, %% prepare for sending putr(?REF_KEY, Ref), setopts(#transport{parent = Pid, module = M, socket = Sock, ssl = SslOpts, timeout = Tmo, - monitor = MPid}); + send = Sender andalso MPid}); %% Put the reference in the process dictionary since we now use it %% advertise the ssl socket after TLS upgrade. @@ -515,15 +518,22 @@ m(Bin, S) send(Bin, S), S; -%% Transport is telling us to be ready to send. Stop monitoring on the +%% Transport has established a connection. Stop monitoring on the %% parent so as not to die before a send from the transport. -m({start, TPid, Sock, Mod}, #monitor{parent = MRef, - transport = TPid} - = S) -> - demonitor(MRef, [flush]), - S#monitor{parent = false, - socket = Sock, - module = Mod}; +m({start, TPid, T} = M, #monitor{transport = TPid} = S) -> + case T of + {Sock, Mod} -> + demonitor(S#monitor.parent, [flush]), + S#monitor{parent = false, + socket = Sock, + module = Mod}; + false -> %% monitor not sending + x(M) + end; + +%% Transport is telling us to die. +m({stop, TPid} = T, #monitor{transport = TPid}) -> + x(T); %% Transport is telling us to die. m({stop, TPid} = T, #monitor{transport = TPid}) -> @@ -632,13 +642,15 @@ transition({resolve_port, Pid}, #transport{socket = Sock, %% Parent process has died: call the monitor to not close the socket %% during an ongoing send, but don't let it take forever. transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid, - monitor = MPid}) -> - ok == (catch gen_server:call(MPid, {stop, Pid})) - orelse exit(MPid, kill), + send = MPid}) -> + false == MPid + orelse (ok == gen_server:call(MPid, {stop, self()}, 1000)) + orelse exit(MPid, {shutdown, parent}), stop; %% Monitor process has died. -transition({'DOWN', _, process, MPid, _}, #transport{monitor = MPid}) -> +transition({'DOWN', _, process, MPid, _}, #transport{send = MPid}) + when is_pid(MPid) -> stop. %% Crash on anything unexpected. @@ -663,12 +675,12 @@ tls_handshake(_, true, #transport{ssl = false}) -> tls_handshake(Type, true, #transport{socket = Sock, module = M, ssl = Opts, - monitor = MPid} + send = MPid} = S) -> {ok, SSock} = tls(Type, Sock, [{cb_info, ?TCP_CB(M)} | Opts]), Ref = getr(?REF_KEY), true = diameter_reg:add_new({?MODULE, Type, {Ref, SSock}}), - MPid ! {tls, SSock}, %% tell the monitor process + false == MPid orelse (MPid ! {tls, SSock}), %% tell the sender process S#transport{socket = SSock, module = ssl}; @@ -827,29 +839,34 @@ connect(Mod, Host, Port, Opts) -> %% send/2 +send(false, #transport{}) -> %% ack + ok; + send(#diameter_packet{bin = Bin}, S) -> send(Bin, S); -send(Bin, #monitor{} = S) -> - send1(Bin, S); +send(Bin, #transport{socket = Sock, module = M, send = false}) -> + send1(M, Sock, Bin); + +send(Bin, #monitor{socket = Sock, module = M}) -> + send1(M, Sock, Bin); %% Send from the monitor process to avoid deadlock if both the %% receiver and the peer were to block in send. -send(Bin, #transport{monitor = MPid}) -> - MPid ! Bin, - MPid. +send(Bin, #transport{send = Pid}) -> + Pid ! Bin, + ok. -%% send1/2 +%% send1/3 -send1(Bin, #monitor{socket = Sock, - module = M}) -> - case send(M, Sock, Bin) of +send1(Mod, Sock, Bin) -> + case send(Mod, Sock, Bin) of ok -> ok; {error, Reason} -> x({send, Reason}) end. - + %% send/3 send(gen_tcp, Sock, Bin) -> -- cgit v1.2.3 From 636a719927b23751c12563b8e137ea8698e2abd5 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Sat, 10 Jun 2017 23:15:37 +0200 Subject: Add diameter_tcp send/recv callbacks From the receiver process, that can return binaries to send/receive and stop the transport process from reading on the socket. This is still undocumented, and may change. --- lib/diameter/src/transport/diameter_tcp.erl | 156 +++++++++++++++++++++----- lib/diameter/test/diameter_watchdog_SUITE.erl | 112 +++++++----------- 2 files changed, 165 insertions(+), 103 deletions(-) diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index edbbec1709..5819d52bdc 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -82,6 +82,7 @@ -record(monitor, {parent :: reference() | false | pid(), transport = self() :: pid(), + ack = false :: boolean(), socket :: inet:socket() | ssl:sslsocket() | undefined, module :: module() | undefined}). @@ -109,11 +110,15 @@ -type option() :: {port, non_neg_integer()} | {sender, boolean()} + | sender + | {message_cb, false | diameter:evaluable()} | {fragment_timer, 0..16#FFFFFFFF}. %% Accepting/connecting transport process state. -record(transport, {socket :: inet:socket() | ssl:sslsocket(), %% accept/connect socket + active = false :: boolean(), %% is socket active? + recv = true :: boolean(), %% should it be active? parent :: pid(), %% of process that started us module :: module(), %% gen_tcp-like module ssl :: [term()] | boolean(), %% ssl options, ssl or not @@ -121,7 +126,8 @@ timeout :: infinity | 0..16#FFFFFFFF, %% fragment timeout tref = false :: false | reference(), %% fragment timer reference flush = false :: boolean(), %% flush fragment at timeout? - send :: pid() | false}). %% sending process + message_cb :: false | diameter:evaluable(), + send :: pid() | false}). %% sending process %% The usual transport using gen_tcp can be replaced by anything %% sufficiently gen_tcp-like by passing a 'module' option as the first @@ -212,24 +218,28 @@ i({T, Ref, Mod, Pid, Opts, Addrs, SvcPid}) %% sends outgoing messages. {[SO|TO], Rest} = proplists:split(Opts, [ssl_options, sender, + message_cb, fragment_timer]), SslOpts = ssl_opts(SO), OwnOpts = lists:append(TO), Tmo = proplists:get_value(fragment_timer, OwnOpts, ?DEFAULT_FRAGMENT_TIMEOUT), - Sender = proplists:get_value(sender, OwnOpts, false), + [CB, Sender] = [proplists:get_value(K, OwnOpts, false) + || K <- [message_cb, sender]], ?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}), {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}), Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs, SvcPid), M = if SslOpts -> ssl; true -> Mod end, Sender andalso monitor(process, MPid), - MPid ! {start, self(), Sender andalso {Sock, M}}, %% prepare for sending + false == CB orelse (Pid ! {diameter, ack}), + MPid ! {start, self(), Sender andalso {Sock, M}, false /= CB}, putr(?REF_KEY, Ref), setopts(#transport{parent = Pid, module = M, socket = Sock, ssl = SslOpts, + message_cb = CB, timeout = Tmo, send = Sender andalso MPid}); %% Put the reference in the process dictionary since we now use it @@ -520,13 +530,14 @@ m(Bin, S) %% Transport has established a connection. Stop monitoring on the %% parent so as not to die before a send from the transport. -m({start, TPid, T} = M, #monitor{transport = TPid} = S) -> +m({start, TPid, T, Ack} = M, #monitor{transport = TPid} = S) -> case T of {Sock, Mod} -> demonitor(S#monitor.parent, [flush]), S#monitor{parent = false, socket = Sock, - module = Mod}; + module = Mod, + ack = Ack}; false -> %% monitor not sending x(M) end; @@ -591,7 +602,7 @@ transition({P, Sock, Bin}, #transport{socket = Sock, = S) when P == ssl, true == B; P == tcp -> - recv(Bin, S); + recv(Bin, S#transport{active = false}); %% Capabilties exchange has decided on whether or not to run over TLS. transition({diameter, {tls, Ref, Type, B}}, #transport{parent = Pid} @@ -618,8 +629,16 @@ transition({E, Sock, _Reason} = T, #transport{socket = Sock, %% Outgoing message. transition({diameter, {send, Msg}}, #transport{} = S) -> - send(Msg, S), - ok; + message(send, Msg, S); + +%% Monitor has sent an outgoing message. +transition(Bin, S) + when is_binary(Bin) -> + message(ack, Bin, S); + +%% Deferred actions from a message_cb. +transition({actions, Dir, Acts}, S) -> + actions(Acts, Dir, S); %% Request to close the transport connection. transition({diameter, {close, Pid}}, #transport{parent = Pid, @@ -699,12 +718,11 @@ tls(accept, Sock, Opts) -> %% using Nagle. %% Receive packets until a full message is received, -recv(Bin, #transport{parent = Pid, frag = Head} = S) -> +recv(Bin, #transport{frag = Head} = S) -> case rcv(Head, Bin) of {Msg, B} -> %% have a complete message ... - diameter_peer:recv(Pid, Msg), - recv(<<>>, S#transport{frag = B}); - Frag -> %% read more on the socket + message(recv, Msg, S#transport{frag = B}); + Frag -> %% read more on the socket start_fragment_timer(setopts(S#transport{frag = Frag, flush = false})) end. @@ -804,9 +822,8 @@ flush(#transport{flush = false} = S) -> start_fragment_timer(S#transport{flush = true}); %% No messages since last expiry. -flush(#transport{frag = Frag, parent = Pid} = S) -> - diameter_peer:recv(Pid, bin(Frag)), - S#transport{frag = <<>>}. +flush(#transport{frag = Frag} = S) -> + message(recv, bin(Frag), S#transport{frag = <<>>}). %% start_fragment_timer/1 %% @@ -839,23 +856,19 @@ connect(Mod, Host, Port, Opts) -> %% send/2 -send(false, #transport{}) -> %% ack - ok; - -send(#diameter_packet{bin = Bin}, S) -> - send(Bin, S); - -send(Bin, #transport{socket = Sock, module = M, send = false}) -> - send1(M, Sock, Bin); +send(Bin, #monitor{socket = Sock, module = M, transport = TPid, ack = B}) -> + send1(M, Sock, Bin), + B andalso (TPid ! Bin); -send(Bin, #monitor{socket = Sock, module = M}) -> - send1(M, Sock, Bin); +send(Bin, #transport{socket = Sock, module = M, send = false} = S) -> + send1(M, Sock, Bin), + message(ack, Bin, S); %% Send from the monitor process to avoid deadlock if both the %% receiver and the peer were to block in send. -send(Bin, #transport{send = Pid}) -> +send(Bin, #transport{send = Pid} = S) -> Pid ! Bin, - ok. + S. %% send1/3 @@ -866,7 +879,7 @@ send1(Mod, Sock, Bin) -> {error, Reason} -> x({send, Reason}) end. - + %% send/3 send(gen_tcp, Sock, Bin) -> @@ -888,12 +901,18 @@ setopts(M, Sock, Opts) -> %% setopts/1 setopts(#transport{socket = Sock, + active = A, + recv = B, module = M} - = S)-> + = S) + when B, not A -> case setopts(M, Sock, [{active, once}]) of - ok -> S; + ok -> S#transport{active = true}; X -> x({setopts, Sock, M, X}) %% possibly on peer disconnect - end. + end; + +setopts(S) -> + S. %% portnr/2 @@ -928,3 +947,78 @@ getstat(gen_tcp, Sock) -> getstat(M, Sock) -> M:getstat(Sock). %% Note that ssl:getstat/1 doesn't yet exist in R15B01. + +%% A message_cb is invoked whenever a message is sent or received, or +%% to provide acknowledgement of a completed send or discarded +%% request. Ignoring possible extra arguments, calls are of the +%% following form. +%% +%% cb(recv, Bin) Pass a received message into diameter? +%% cb(send, Bin) Send a message? +%% cb(ack, Bin) Acknowledgement of a completed send. +%% cb(ack, false) Acknowledgement of a discarded request. +%% +%% Callbacks return a list of the following form. +%% +%% [boolean() | send | recv | binary()] +%% +%% The atoms are meaningless by themselves, but say whether subsequent +%% binaries are to be sent or received. A boolean says whether or not +%% to continue reading on the socket. Messages can be received even +%% after false is returned if these arrived in the same packet. A +%% leading recv or send is implicit on the corresponding callbacks. A +%% new callback can be returned as the tail of a returned list: any +%% value not of the aforementioned list type is interpreted as a +%% callback. + +%% message/3 + +message(send, false = M, S) -> + message(ack, M, S); + +message(ack, _, #transport{message_cb = false} = S) -> + S; + +message(Dir, #diameter_packet{bin = Bin}, S) -> + message(Dir, Bin, S); + +message(Dir, Bin, #transport{message_cb = CB} = S) -> + recv(<<>>, actions(cb(CB, Dir, Bin), Dir, S)). + +%% actions/3 + +actions([], _, S) -> + S; + +actions([B | As], Dir, S) + when is_boolean(B) -> + actions(As, Dir, S#transport{recv = B}); + +actions([Dir | As], _, S) + when Dir == send; + Dir == recv -> + actions(As, Dir, S); + +actions([Bin | As], send = Dir, #transport{} = S) + when is_binary(Bin) -> + actions(As, Dir, send(Bin, S)); + +actions([Bin | As], recv = Dir, #transport{parent = Pid} = S) + when is_binary(Bin) -> + diameter_peer:recv(Pid, Bin), + actions(As, Dir, S); + +actions([{defer, Tmo, Acts} | As], Dir, S) -> + erlang:send_after(Tmo, self(), {actions, Dir, Acts}), + actions(As, Dir, S); + +actions(CB, _, S) -> + S#transport{message_cb = CB}. + +%% cb/3 + +cb(false, _, Bin) -> + [Bin]; + +cb(CB, Dir, Msg) -> + diameter_lib:eval([CB, Dir, Msg]). diff --git a/lib/diameter/test/diameter_watchdog_SUITE.erl b/lib/diameter/test/diameter_watchdog_SUITE.erl index 5ae951f7c2..39c4f051a5 100644 --- a/lib/diameter/test/diameter_watchdog_SUITE.erl +++ b/lib/diameter/test/diameter_watchdog_SUITE.erl @@ -44,13 +44,8 @@ -export([peer_up/3, peer_down/3]). -%% gen_tcp-ish interface --export([listen/2, - accept/1, - connect/3, - send/2, - setopts/2, - close/1]). +%% diameter_tcp message_cb +-export([message/3]). -include("diameter.hrl"). -include("diameter_ct.hrl"). @@ -161,9 +156,9 @@ reopen(Type, Test, Ref, Wd, N, M) -> reopen(Type, Test, SvcName, TRef, Wd, N, M). cfg(Type, Type, Wd) -> - {Wd, [], []}; + {Wd, [], false}; cfg(_Type, _Test, _Wd) -> - {?WD(?PEER_WD), [{okay, 0}], [{module, ?MODULE}]}. + {?WD(?PEER_WD), [{okay, 0}], true}. %% reopen/7 @@ -346,7 +341,7 @@ recv_reopen(listen, Ref) -> %% reg/3 %% %% Lookup the pid of the transport process and publish a term for -%% send/2 to lookup. +%% message/3 to lookup. reg(TRef, SvcName, T) -> TPid = tpid(TRef, diameter:service_info(SvcName, transport)), true = diameter_reg:add_new({?MODULE, TPid, T}). @@ -394,7 +389,7 @@ suspect(_) -> suspect(Type, Fake, Ref, N) when is_reference(Ref) -> {SvcName, TRef} - = start(Type, Ref, {?WD(10000), [{suspect, N}], mod(Fake)}), + = start(Type, Ref, {?WD(10000), [{suspect, N}], Fake}), {initial, okay} = ?WD_EVENT(TRef), suspect(TRef, Fake, SvcName, N); @@ -436,11 +431,6 @@ abuse([F|A], Test) -> abuse(F, Test) -> abuse([F], Test). -mod(true) -> - [{module, ?MODULE}]; -mod(false) -> - []. - %% =========================================================================== %% # okay/1 %% =========================================================================== @@ -456,7 +446,7 @@ okay(Type, Fake, Ref, N) {SvcName, TRef} = start(Type, Ref, {?WD(10000), [{okay, choose(Fake, 0, N)}], - mod(Fake)}), + Fake}), {initial, okay} = ?WD_EVENT(TRef), okay(TRef, Fake, @@ -515,12 +505,17 @@ start(Type, Ref, T) -> true = diameter_reg:add_new({Type, Ref, Name}), {Name, TRef}. -opts(Type, Ref, {Timer, Config, Mod}) -> +opts(Type, Ref, {Timer, Config, Fake}) + when is_boolean(Fake) -> [{transport_module, diameter_tcp}, - {transport_config, Mod ++ [{ip, ?ADDR}, {port, 0}] ++ cfg(Type, Ref)}, + {transport_config, mod(Fake) ++ [{ip, ?ADDR}, {port, 0}] + ++ cfg(Type, Ref)}, {watchdog_timer, Timer}, {watchdog_config, Config}]. +mod(B) -> + [{message_cb, [fun message/3, capx]} || B]. + cfg(listen, _) -> []; cfg(connect, Ref) -> @@ -531,37 +526,29 @@ cfg(connect, Ref) -> %% =========================================================================== -listen(PortNr, Opts) -> - gen_tcp:listen(PortNr, Opts). - -accept(LSock) -> - gen_tcp:accept(LSock). - -connect(Addr, Port, Opts) -> - gen_tcp:connect(Addr, Port, Opts). +%% message/3 -setopts(Sock, Opts) -> - inet:setopts(Sock, Opts). +message(send, Bin, X) -> + send(Bin, X); -send(Sock, Bin) -> - send(getr(config), Sock, Bin). +message(recv, Bin, _) -> + [Bin]; -close(Sock) -> - gen_tcp:close(Sock). +message(_, _, _) -> + []. -%% send/3 +%% send/2 %% First outgoing message from a new transport process is CER/CEA. %% Remaining outgoing messages are either DWR or DWA. -send(undefined, Sock, Bin) -> - <<_:32, _:8, 257:24, _/binary>> = Bin, - putr(config, init), - gen_tcp:send(Sock, Bin); +send(Bin, capx) -> + <<_:32, _:8, 257:24, _/binary>> = Bin, %% assert on CER/CEA + [Bin, fun message/3, init]; %% Outgoing DWR: fake reception of DWA. Use the fact that AVP values %% are ignored. This is to ensure that the peer's watchdog state %% transitions are only induced by responses to messages it sends. -send(_, Sock, <<_:32, 1:1, _:7, 280:24, _:32, EId:32, HId:32, _/binary>>) -> +send(<<_:32, 1:1, _:7, 280:24, _:32, EId:32, HId:32, _/binary>>, _) -> Pkt = #diameter_packet{header = #diameter_header{version = 1, end_to_end_id = EId, hop_by_hop_id = HId}, @@ -569,55 +556,36 @@ send(_, Sock, <<_:32, 1:1, _:7, 280:24, _:32, EId:32, HId:32, _/binary>>) -> {'Origin-Host', "XXX"}, {'Origin-Realm', ?REALM}]}, #diameter_packet{bin = Bin} = diameter_codec:encode(?BASE, Pkt), - tpid(Sock) ! {tcp, Sock, Bin}, - ok; + [recv, Bin]; %% First outgoing DWA. -send(init, Sock, Bin) -> - [{{?MODULE, _, T}, _}] = diameter_reg:wait({?MODULE, tpid(Sock), '_'}), - putr(config, T), - send(Sock, Bin); +send(Bin, init) -> + [{{?MODULE, _, T}, _}] = diameter_reg:wait({?MODULE, self(), '_'}), + send(Bin, T); %% First transport process. -send({SvcName, {_,_,_} = T}, Sock, Bin) -> +send(Bin, {SvcName, {_,_,_} = T}) -> [{'Origin-Host', _} = OH, {'Origin-Realm', _} = OR | _] = ?SERVICE(SvcName), putr(origin, [OH, OR]), - putr(config, T), - send(Sock, Bin); + send(Bin, T); %% Discard DWA, failback after another timeout in the peer. -send({Wd, 0 = No, Msg}, Sock, Bin) -> +send(Bin, {Wd, 0 = No, Msg}) -> Origin = getr(origin), - spawn(fun() -> failback(?ONE_WD(Wd), Msg, Sock, Bin, Origin) end), - putr(config, No), - ok; + [{defer, ?ONE_WD(Wd), [msg(Msg, Bin, Origin)]}, fun message/3, No]; %% Send DWA while we're in the mood (aka 0 < N). -send({Wd, N, Msg}, Sock, Bin) -> - putr(config, {Wd, N-1, Msg}), - gen_tcp:send(Sock, Bin); +send(Bin, {Wd, N, Msg}) -> + [Bin, fun message/3, {Wd, N-1, Msg}]; %% Discard DWA. -send(0, _Sock, _Bin) -> - ok; +send(_Bin, 0 = No) -> + [fun message/3, No]; %% Send DWA. -send(N, Sock, <<_:32, 0:1, _:7, 280:24, _/binary>> = Bin) -> - putr(config, N-1), - gen_tcp:send(Sock, Bin). - -%% tpid/1 - -tpid(Sock) -> - {connected, Pid} = erlang:port_info(Sock, connected), - Pid. - -%%failback/5 - -failback(Tmo, Msg, Sock, Bin, Origin) -> - timer:sleep(Tmo), - ok = gen_tcp:send(Sock, msg(Msg, Bin, Origin)). +send(<<_:32, 0:1, _:7, 280:24, _/binary>> = DWA, N) -> + [DWA, fun message/3, N-1]. %% msg/2 -- cgit v1.2.3 From ff367b200930d5107666e0a059d09e3218740567 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Tue, 21 Mar 2017 13:01:15 +0100 Subject: Remove upgrade from diameter_traffic --- lib/diameter/src/base/diameter_traffic.erl | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl index 96f3a307f9..ccfab22e9c 100644 --- a/lib/diameter/src/base/diameter_traffic.erl +++ b/lib/diameter/src/base/diameter_traffic.erl @@ -54,9 +54,6 @@ -define(RELAY, ?DIAMETER_DICT_RELAY). -define(BASE, ?DIAMETER_DICT_COMMON). %% Note: the RFC 3588 dictionary --define(DEFAULT_TIMEOUT, 5000). %% for outgoing requests --define(DEFAULT_SPAWN_OPTS, []). - %% Table containing outgoing entries that live and die with %% peer_up/down. The name is historic, since the table used to contain %% information about outgoing requests for which an answer has yet to @@ -67,7 +64,7 @@ -record(options, {filter = none :: diameter:peer_filter(), extra = [] :: list(), - timeout = ?DEFAULT_TIMEOUT :: 0..16#FFFFFFFF, + timeout = 5000 :: 0..16#FFFFFFFF, %% for outgoing requests detach = false :: boolean()}). %% Term passed back to receive_message/6 with every incoming message. @@ -211,11 +208,13 @@ incr_rc(Dir, Pkt, TPid, Dict0) -> %% Handle an incoming Diameter message in a watchdog process. %% --------------------------------------------------------------------------- --spec receive_message(pid(), Route, #diameter_packet{}, module(), #recvdata{}) +-spec receive_message(pid(), Route, #diameter_packet{}, module(), RecvData) -> pid() | boolean() when Route :: {Handler, RequestRef, Seqs} | Ack, + RecvData :: {[SpawnOpt], #recvdata{}}, + SpawnOpt :: term(), Handler :: pid(), RequestRef :: reference(), Seqs :: {0..16#FFFFFFFF, 0..16#FFFFFFFF}, @@ -255,11 +254,6 @@ recv(false, false, TPid, Pkt, _, _) -> %% spawn_request/5 spawn_request(Ack, TPid, Pkt, Dict0, {Opts, RecvData}) -> - spawn_request(Ack, TPid, Pkt, Dict0, Opts, RecvData); -spawn_request(Ack, TPid, Pkt, Dict0, RecvData) -> - spawn_request(Ack, TPid, Pkt, Dict0, ?DEFAULT_SPAWN_OPTS, RecvData). - -spawn_request(Ack, TPid, Pkt, Dict0, Opts, RecvData) -> spawn_opt(fun() -> recv_request(Ack, TPid, Pkt, Dict0, RecvData) end, -- cgit v1.2.3 From 04cb786904558d9e9c64ccbb91cc15c49129e336 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Sat, 10 Jun 2017 05:51:34 +0200 Subject: Exercise diameter_{tcp,sctp} sender in traffic suite --- lib/diameter/test/diameter_traffic_SUITE.erl | 34 ++++++++++++++++++++-------- lib/diameter/test/diameter_util.erl | 19 ++++++++++++---- 2 files changed, 38 insertions(+), 15 deletions(-) diff --git a/lib/diameter/test/diameter_traffic_SUITE.erl b/lib/diameter/test/diameter_traffic_SUITE.erl index 7fd13b0536..d7df1f217b 100644 --- a/lib/diameter/test/diameter_traffic_SUITE.erl +++ b/lib/diameter/test/diameter_traffic_SUITE.erl @@ -152,16 +152,21 @@ %% Which transport protocol to use. -define(TRANSPORTS, [tcp, sctp]). +%% Send from a dedicated process? +-define(SENDERS, [true, false]). + -record(group, {transport, client_service, client_encoding, client_dict0, client_strings, + client_sender, server_service, server_encoding, server_container, - server_strings}). + server_strings, + server_sender}). %% Not really what we should be setting unless the message is sent in %% the common application but diameter doesn't care. @@ -250,7 +255,7 @@ all() -> groups() -> [{P, [P], Ts} || Ts <- [tc()], P <- [shuffle, parallel]] ++ - [{?util:name([T,R,D,A,C,SD,CD]), + [{?util:name([T,R,D,A,C,SD,SS,CD,CS]), [], [{group, if SD orelse CD -> shuffle; true -> parallel end}]} || T <- ?TRANSPORTS, @@ -259,15 +264,20 @@ groups() -> A <- ?ENCODINGS, C <- ?CONTAINERS, SD <- ?STRING_DECODES, - CD <- ?STRING_DECODES] + SS <- ?SENDERS, + CD <- ?STRING_DECODES, + CS <- ?SENDERS] ++ - [{T, [], [{group, ?util:name([T,R,D,A,C,SD,CD])} + [{T, [], [{group, ?util:name([T,R,D,A,C,SD,SS,CD,CS])} || R <- ?ENCODINGS, D <- ?RFCS, A <- ?ENCODINGS, C <- ?CONTAINERS, SD <- ?STRING_DECODES, - CD <- ?STRING_DECODES]} + SS <- ?SENDERS, + CD <- ?STRING_DECODES, + CS <- ?SENDERS, + SS orelse CS]} %% avoid deadlock || T <- ?TRANSPORTS] ++ [{traffic, [], [{group, T} || T <- ?TRANSPORTS]}]. @@ -298,16 +308,18 @@ init_per_group(sctp = Name, Config) -> init_per_group(Name, Config) -> case ?util:name(Name) of - [T,R,D,A,C,SD,CD] -> + [T,R,D,A,C,SD,SS,CD,CS] -> G = #group{transport = T, client_service = [$C|?util:unique_string()], client_encoding = R, client_dict0 = dict0(D), client_strings = CD, + client_sender = CS, server_service = [$S|?util:unique_string()], server_encoding = A, server_container = C, - server_strings = SD}, + server_strings = SD, + server_sender = SS}, [{group, G} | Config]; _ -> Config @@ -417,16 +429,18 @@ start_services(Config) -> add_transports(Config) -> #group{transport = T, client_service = CN, - server_service = SN} + client_sender = CS, + server_service = SN, + server_sender = SS} = group(Config), LRef = ?util:listen(SN, - T, + [T, {sender, SS}], [{capabilities_cb, fun capx/2}, {pool_size, 8}, {spawn_opt, [{min_heap_size, 8096}]}, {applications, apps(rfc3588)}]), Cs = [?util:connect(CN, - T, + [T, {sender, CS}], LRef, [{id, Id}, {capabilities, [{'Origin-State-Id', origin(Id)}]}, diff --git a/lib/diameter/test/diameter_util.erl b/lib/diameter/test/diameter_util.erl index 8df5c907d0..81b43913de 100644 --- a/lib/diameter/test/diameter_util.erl +++ b/lib/diameter/test/diameter_util.erl @@ -309,17 +309,23 @@ listen(SvcName, Prot, Opts) -> connect(Client, Prot, LRef) -> connect(Client, Prot, LRef, []). -connect(Client, Prot, LRef, Opts) -> +connect(Client, ProtOpts, LRef, Opts) -> + Prot = head(ProtOpts), [PortNr] = lport(Prot, LRef), Client = diameter:service_info(Client, name), %% assert true = diameter:subscribe(Client), - Ref = add_transport(Client, {connect, opts(Prot, PortNr) ++ Opts}), + Ref = add_transport(Client, {connect, opts(ProtOpts, PortNr) ++ Opts}), true = transport(Client, Ref), %% assert diameter_lib:for_n(fun(_) -> ok = up(Client, Ref, Prot, PortNr) end, proplists:get_value(pool_size, Opts, 1)), Ref. +head([T|_]) -> + T; +head(T) -> + T. + up(Client, Ref, Prot, PortNr) -> receive {diameter_event, Client, {up, Ref, _, _, _}} -> ok @@ -362,11 +368,14 @@ tmod(sctp) -> tmod(any) -> [diameter_sctp, diameter_tcp]. -opts(Prot, T) -> - tmo(T, lists:append([[{transport_module, M}, {transport_config, C}] +opts([Prot | Opts], T) -> + tmo(T, lists:append([[{transport_module, M}, {transport_config, C ++ Opts}] || M <- tmod(Prot), C <- [buf(M,T) ++ [{ip, addr(M)}, {port, 0}] - ++ remote(M,T)]])). + ++ remote(M,T)]])); + +opts(Prot, T) -> + opts([Prot], T). tmo(listen, Opts) -> Opts; -- cgit v1.2.3 From 090898729237d33b0a6968ece9f11741cde7c27b Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Sat, 10 Jun 2017 08:08:30 +0200 Subject: Exercise diameter_tcp message callbacks in traffic suite --- lib/diameter/test/diameter_traffic_SUITE.erl | 88 ++++++++++++++++++++++------ 1 file changed, 69 insertions(+), 19 deletions(-) diff --git a/lib/diameter/test/diameter_traffic_SUITE.erl b/lib/diameter/test/diameter_traffic_SUITE.erl index d7df1f217b..6aa3ffb7af 100644 --- a/lib/diameter/test/diameter_traffic_SUITE.erl +++ b/lib/diameter/test/diameter_traffic_SUITE.erl @@ -108,6 +108,9 @@ handle_error/6, handle_request/3]). +%% diameter_tcp callbacks +-export([message/3]). + -include("diameter.hrl"). -include("diameter_gen_base_rfc3588.hrl"). -include("diameter_gen_base_accounting.hrl"). @@ -147,7 +150,7 @@ %% Whether to decode stringish Diameter types to strings, or leave %% them as binary. --define(STRING_DECODES, [true, false]). +-define(STRING_DECODES, [false, true]). %% Which transport protocol to use. -define(TRANSPORTS, [tcp, sctp]). @@ -155,6 +158,9 @@ %% Send from a dedicated process? -define(SENDERS, [true, false]). +%% Message callbacks from diameter_tcp? +-define(CALLBACKS, [true, false]). + -record(group, {transport, client_service, @@ -166,7 +172,8 @@ server_encoding, server_container, server_strings, - server_sender}). + server_sender, + server_throttle}). %% Not really what we should be setting unless the message is sent in %% the common application but diameter doesn't care. @@ -253,9 +260,9 @@ all() -> [start, result_codes, {group, traffic}, outstanding, empty, stop]. groups() -> - [{P, [P], Ts} || Ts <- [tc()], P <- [shuffle, parallel]] + [{P, [P], Ts} || Ts <- [tc(tc())], P <- [shuffle, parallel]] ++ - [{?util:name([T,R,D,A,C,SD,SS,CD,CS]), + [{?util:name([T,R,D,A,C,SD,SS,ST,CD,CS]), [], [{group, if SD orelse CD -> shuffle; true -> parallel end}]} || T <- ?TRANSPORTS, @@ -265,23 +272,36 @@ groups() -> C <- ?CONTAINERS, SD <- ?STRING_DECODES, SS <- ?SENDERS, + ST <- ?CALLBACKS, CD <- ?STRING_DECODES, CS <- ?SENDERS] ++ - [{T, [], [{group, ?util:name([T,R,D,A,C,SD,SS,CD,CS])} - || R <- ?ENCODINGS, - D <- ?RFCS, - A <- ?ENCODINGS, - C <- ?CONTAINERS, - SD <- ?STRING_DECODES, - SS <- ?SENDERS, - CD <- ?STRING_DECODES, - CS <- ?SENDERS, - SS orelse CS]} %% avoid deadlock + [{T, [], groups([[T,R,D,A,C,SD,SS,ST,CD,CS] + || R <- ?ENCODINGS, + D <- ?RFCS, + A <- ?ENCODINGS, + C <- ?CONTAINERS, + SD <- ?STRING_DECODES, + SS <- ?SENDERS, + ST <- ?CALLBACKS, + CD <- ?STRING_DECODES, + CS <- ?SENDERS, + SS orelse CS])} %% avoid deadlock || T <- ?TRANSPORTS] ++ [{traffic, [], [{group, T} || T <- ?TRANSPORTS]}]. +%groups(_) -> %% debug +% Name = [sctp,record,rfc6733,record,pkt,false,false,false,false,false], +% [{group, ?util:name(Name)}]; +groups(Names) -> + [{group, ?util:name(L)} || L <- Names]. + +%tc([N|_]) -> %% debug +% [N]; +tc(L) -> + L. + %% -------------------- init_per_suite(Config) -> @@ -308,7 +328,7 @@ init_per_group(sctp = Name, Config) -> init_per_group(Name, Config) -> case ?util:name(Name) of - [T,R,D,A,C,SD,SS,CD,CS] -> + [T,R,D,A,C,SD,SS,ST,CD,CS] -> G = #group{transport = T, client_service = [$C|?util:unique_string()], client_encoding = R, @@ -319,7 +339,8 @@ init_per_group(Name, Config) -> server_encoding = A, server_container = C, server_strings = SD, - server_sender = SS}, + server_sender = SS, + server_throttle = ST}, [{group, G} | Config]; _ -> Config @@ -431,10 +452,14 @@ add_transports(Config) -> client_service = CN, client_sender = CS, server_service = SN, - server_sender = SS} + server_sender = SS, + server_throttle = ST} = group(Config), LRef = ?util:listen(SN, - [T, {sender, SS}], + [T, + {sender, SS} + | [{message_cb, {?MODULE, message, [4]}} + || ST andalso T == tcp]], [{capabilities_cb, fun capx/2}, {pool_size, 8}, {spawn_opt, [{min_heap_size, 8096}]}, @@ -1037,7 +1062,7 @@ pick_peer(Peers, _, [$C|_], _State, {send_detach, Group}, _, {_,_}) -> find(#group{client_service = CN, server_encoding = A, server_container = C}, - Peers) -> + [_|_] = Peers) -> Id = {A,C}, [P] = [P || P <- Peers, id(Id, P, CN)], {ok, P}. @@ -1471,3 +1496,28 @@ request(#diameter_base_STR{'Session-Id' = SId}, %% send_error/send_timeout request(#diameter_base_RAR{}, _Caps) -> receive after 2000 -> {protocol_error, ?TOO_BUSY} end. + +%% message/3 +%% +%% Limit the number of messages received. More can be received if read +%% in the same packet. + +%% incoming request +message(recv, <<_:32, 1, _/bits>> = Bin, N) -> + [Bin, 1 < N, fun ?MODULE:message/3, N-1]; + +%% incoming answer +message(recv, Bin, _) -> + [Bin]; + +%% outgoing +message(send, Bin, _) -> + [Bin]; + +%% sent request +message(ack, <<_:32, 1, _/bits>>, _) -> + []; + +%% sent answer or discarded request +message(ack, _, N) -> + [0 =< N, fun ?MODULE:message/3, N+1]. -- cgit v1.2.3 From 9296f611de2055c65bdae9425ce80e63fba6610f Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Sat, 10 Jun 2017 15:33:49 +0200 Subject: Randomly select traffic testcases Since the number of configuration variants tested makes for (too) many. Randomly select a subset of testcases in each configuration group. --- lib/diameter/test/diameter_traffic_SUITE.erl | 10 +++++++++- lib/diameter/test/diameter_util.erl | 13 +------------ 2 files changed, 10 insertions(+), 13 deletions(-) diff --git a/lib/diameter/test/diameter_traffic_SUITE.erl b/lib/diameter/test/diameter_traffic_SUITE.erl index 6aa3ffb7af..bb10638cd2 100644 --- a/lib/diameter/test/diameter_traffic_SUITE.erl +++ b/lib/diameter/test/diameter_traffic_SUITE.erl @@ -341,7 +341,11 @@ init_per_group(Name, Config) -> server_strings = SD, server_sender = SS, server_throttle = ST}, - [{group, G} | Config]; + %% Limit the number of testcase, since the number of + %% groups is large. + All = ?util:scramble(tc()), + TCs = lists:sublist(All, rand:uniform(32)), + [{group, G}, {runlist, TCs} | Config]; _ -> Config end. @@ -359,12 +363,16 @@ end_per_group(_, _) -> %% Skip testcases that can reasonably fail under SCTP. init_per_testcase(Name, Config) -> + TCs = proplists:get_value(runlist, Config, []), + Run = [] == TCs orelse lists:member(Name, TCs), case [G || #group{transport = sctp} = G <- [proplists:get_value(group, Config)]] of [_] when Name == send_maxlen; Name == send_long -> {skip, sctp}; + _ when not Run -> + {skip, random}; _ -> [{testcase, Name} | Config] end. diff --git a/lib/diameter/test/diameter_util.erl b/lib/diameter/test/diameter_util.erl index 81b43913de..7266d3678c 100644 --- a/lib/diameter/test/diameter_util.erl +++ b/lib/diameter/test/diameter_util.erl @@ -173,18 +173,7 @@ recvl([{MRef, F} | L], Ref, Fun, Acc) -> %% Sort a list into random order. scramble(L) -> - foldl(fun(true, _, S, false) -> S end, - false, - [[fun s/1, L]]). - -s(L) -> - s([], L). - -s(Acc, []) -> - Acc; -s(Acc, L) -> - {H, [T|Rest]} = lists:split(rand:uniform(length(L)) - 1, L), - s([T|Acc], H ++ Rest). + [X || {_,X} <- lists:sort([{rand:uniform(), T} || T <- L])]. %% --------------------------------------------------------------------------- %% unique_string/0 -- cgit v1.2.3 From 84bfb4980a5d6dd806cff07c8dc1c9f2ef85fc20 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Mon, 12 Jun 2017 15:35:22 +0200 Subject: Let diameter_tcp send/recv callbacks deal in diameter_packet To let a recv callback for an incoming request set transport_data and have it returned in a send callback. --- lib/diameter/src/transport/diameter_tcp.erl | 73 ++++++++++++++++------------- 1 file changed, 40 insertions(+), 33 deletions(-) diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index 5819d52bdc..a2f393d5d4 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -523,9 +523,10 @@ getr(Key) -> %% Transition monitor state. %% Outgoing message. -m(Bin, S) - when is_binary(Bin) -> - send(Bin, S), +m(Msg, S) + when is_record(Msg, diameter_packet); + is_binary(Msg) -> + send(Msg, S), S; %% Transport has established a connection. Stop monitoring on the @@ -632,9 +633,10 @@ transition({diameter, {send, Msg}}, #transport{} = S) -> message(send, Msg, S); %% Monitor has sent an outgoing message. -transition(Bin, S) - when is_binary(Bin) -> - message(ack, Bin, S); +transition(Msg, S) + when is_record(Msg, diameter_packet); + is_binary(Msg) -> + message(ack, Msg, S); %% Deferred actions from a message_cb. transition({actions, Dir, Acts}, S) -> @@ -856,22 +858,25 @@ connect(Mod, Host, Port, Opts) -> %% send/2 -send(Bin, #monitor{socket = Sock, module = M, transport = TPid, ack = B}) -> - send1(M, Sock, Bin), - B andalso (TPid ! Bin); +send(Msg, #monitor{socket = Sock, module = M, transport = TPid, ack = B}) -> + send1(M, Sock, Msg), + B andalso (TPid ! Msg); -send(Bin, #transport{socket = Sock, module = M, send = false} = S) -> - send1(M, Sock, Bin), - message(ack, Bin, S); +send(Msg, #transport{socket = Sock, module = M, send = false} = S) -> + send1(M, Sock, Msg), + message(ack, Msg, S); %% Send from the monitor process to avoid deadlock if both the %% receiver and the peer were to block in send. -send(Bin, #transport{send = Pid} = S) -> - Pid ! Bin, +send(Msg, #transport{send = Pid} = S) -> + Pid ! Msg, S. %% send1/3 +send1(Mod, Sock, #diameter_packet{bin = Bin}) -> + send1(Mod, Sock, Bin); + send1(Mod, Sock, Bin) -> case send(Mod, Sock, Bin) of ok -> @@ -953,17 +958,20 @@ getstat(M, Sock) -> %% request. Ignoring possible extra arguments, calls are of the %% following form. %% -%% cb(recv, Bin) Pass a received message into diameter? -%% cb(send, Bin) Send a message? -%% cb(ack, Bin) Acknowledgement of a completed send. +%% cb(recv, Msg) Receive a message into diameter? +%% cb(send, Msg) Send a message on the socket? +%% cb(ack, Msg) Acknowledgement of a completed send. %% cb(ack, false) Acknowledgement of a discarded request. %% -%% Callbacks return a list of the following form. +%% Msg will be binary() in a recv callback, but can be a +%% diameter_packet record in a send/ack callback if a recv/send +%% callback returns a record. Callbacks return a list of the following +%% form. %% -%% [boolean() | send | recv | binary()] +%% [boolean() | send | recv | binary() | #diameter_packet{}] %% %% The atoms are meaningless by themselves, but say whether subsequent -%% binaries are to be sent or received. A boolean says whether or not +%% messages are to be sent or received. A boolean says whether or not %% to continue reading on the socket. Messages can be received even %% after false is returned if these arrived in the same packet. A %% leading recv or send is implicit on the corresponding callbacks. A @@ -979,11 +987,8 @@ message(send, false = M, S) -> message(ack, _, #transport{message_cb = false} = S) -> S; -message(Dir, #diameter_packet{bin = Bin}, S) -> - message(Dir, Bin, S); - -message(Dir, Bin, #transport{message_cb = CB} = S) -> - recv(<<>>, actions(cb(CB, Dir, Bin), Dir, S)). +message(Dir, Msg, #transport{message_cb = CB} = S) -> + recv(<<>>, actions(cb(CB, Dir, Msg), Dir, S)). %% actions/3 @@ -999,13 +1004,15 @@ actions([Dir | As], _, S) Dir == recv -> actions(As, Dir, S); -actions([Bin | As], send = Dir, #transport{} = S) - when is_binary(Bin) -> - actions(As, Dir, send(Bin, S)); +actions([Msg | As], send = Dir, S) + when is_binary(Msg); + is_record(Msg, diameter_packet) -> + actions(As, Dir, send(Msg, S)); -actions([Bin | As], recv = Dir, #transport{parent = Pid} = S) - when is_binary(Bin) -> - diameter_peer:recv(Pid, Bin), +actions([Msg | As], recv = Dir, #transport{parent = Pid} = S) + when is_binary(Msg); + is_record(Msg, diameter_packet) -> + diameter_peer:recv(Pid, Msg), actions(As, Dir, S); actions([{defer, Tmo, Acts} | As], Dir, S) -> @@ -1017,8 +1024,8 @@ actions(CB, _, S) -> %% cb/3 -cb(false, _, Bin) -> - [Bin]; +cb(false, _, Msg) -> + [Msg]; cb(CB, Dir, Msg) -> diameter_lib:eval([CB, Dir, Msg]). -- cgit v1.2.3 From 373cd07c28bbe3e299eaca1c96b1441623ad4979 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Sun, 11 Jun 2017 01:13:36 +0200 Subject: Add diameter_sctp send/recv callbacks Corresponding to diameter_tcp callbacks a few commits back. Exercise the callbacks in the traffic suite. --- lib/diameter/src/transport/diameter_sctp.erl | 183 ++++++++++++++++++++------- lib/diameter/test/diameter_traffic_SUITE.erl | 12 +- 2 files changed, 144 insertions(+), 51 deletions(-) diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index 3919596cb1..470a6aa9bf 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -59,9 +59,6 @@ %% The default port for a listener. -define(DEFAULT_PORT, 3868). %% RFC 3588, ch 2.1 -%% Remote addresses to accept connections from. --define(DEFAULT_ACCEPT, []). %% any - %% How long to wait for a transport process to attach after %% association establishment. -define(ACCEPT_TIMEOUT, 5000). @@ -80,7 +77,8 @@ | term(). %% gen_sctp:open_option(). -type option() :: {sender, boolean()} - | sender. + | sender + | {message_cb, false | diameter:evaluable()}. -type uint() :: non_neg_integer(). @@ -93,6 +91,8 @@ %% {RAs, RP, Errors} | connect, socket :: gen_sctp:sctp_socket() | undefined, + active = false :: boolean(), %% is socket active? + recv = true :: boolean(), %% should it be active? assoc_id :: gen_sctp:assoc_id() %% association identifier | undefined | true, @@ -101,11 +101,13 @@ streams :: {uint(), uint()} %% {InStream, OutStream} counts | undefined, os = 0 :: uint(), %% next output stream + message_cb :: false | diameter:evaluable(), send = false :: pid() | boolean()}). %% sending process %% Monitor process state. -record(monitor, {transport :: pid(), + ack = false :: boolean(), socket :: gen_sctp:sctp_socket(), assoc_id :: gen_sctp:assoc_id()}). %% next output stream @@ -115,8 +117,7 @@ socket :: gen_sctp:sctp_socket(), service :: pid(), %% service process pending = {0, queue:new()}, - accept :: [match()], - sender :: boolean()}). + opts :: [[match()] | boolean() | diameter:evaluable()]}). %% Field pending implements two queues: the first of transport-to-be %% processes to which an association has been assigned but for which %% diameter hasn't yet spawned a transport process, a short-lived @@ -242,7 +243,7 @@ i(#monitor{transport = TPid} = S) -> i({listen, Ref, {Opts, SvcPid, Addrs}}) -> monitor(process, SvcPid), [_] = diameter_config:subscribe(Ref, transport), %% assert existence - {Split, Rest} = proplists:split(Opts, [accept, sender]), + {Split, Rest} = proplists:split(Opts, [accept, sender, message_cb]), OwnOpts = lists:append(Split), {LAs, Sock} = AS = open(Addrs, Rest, ?DEFAULT_PORT), ok = gen_sctp:listen(Sock, true), @@ -251,13 +252,17 @@ i({listen, Ref, {Opts, SvcPid, Addrs}}) -> #listener{ref = Ref, service = SvcPid, socket = Sock, - accept = [[M] || {accept, M} <- OwnOpts], - sender = proplists:get_value(sender, OwnOpts, false)}; + opts = [[[M] || {accept, M} <- OwnOpts] + | [proplists:get_value(K, OwnOpts, false) + || K <- [sender, message_cb]]]}; %% A connecting transport. i({connect, Pid, Opts, Addrs, Ref}) -> - {[Ps | Split], Rest} = proplists:split(Opts, [rport, raddr, sender]), + {[Ps | Split], Rest} + = proplists:split(Opts, [rport, raddr, sender, message_cb]), OwnOpts = lists:append(Split), + CB = proplists:get_value(message_cb, OwnOpts, false), + false == CB orelse (Pid ! {diameter, ack}), RAs = [diameter_lib:ipaddr(A) || {raddr, A} <- OwnOpts], [RP] = [P || {rport, P} <- Ps] ++ [P || P <- [?DEFAULT_PORT], [] == Ps], {LAs, Sock} = open(Addrs, Rest, 0), @@ -267,6 +272,7 @@ i({connect, Pid, Opts, Addrs, Ref}) -> #transport{parent = Pid, mode = {connect, connect(Sock, RAs, RP, [])}, socket = Sock, + message_cb = CB, send = proplists:get_value(sender, OwnOpts, false)}; %% An accepting transport spawned by diameter, not yet owning an @@ -301,12 +307,15 @@ i({K, Ref}, #transport{mode = {accept, _}} = S) -> receive {Ref, Pid} when K == parent -> %% transport process started S#transport{parent = Pid}; - {K, T, Matches, Bool} when K == peeloff -> %% association + {K, T, Opts} when K == peeloff -> %% association {sctp, Sock, _RA, _RP, _Data} = T, + [Matches, Sender, CB] = Opts, ok = accept_peer(Sock, Matches), demonitor(Ref, [flush]), + false == CB orelse (S#transport.parent ! {diameter, ack}), t(T, S#transport{socket = Sock, - send = Bool}); + message_cb = CB, + send = Sender}); accept_timeout = T -> x(T); {'DOWN', _, process, _, _} = T -> @@ -477,12 +486,11 @@ getr(Key) -> %% Incoming message from SCTP. l({sctp, Sock, _RA, _RP, Data} = T, #listener{socket = Sock, - accept = Matches, - sender = Sender} + opts = Opts} = S) -> Id = assoc_id(Data), {TPid, NewS} = accept(S), - TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Matches, Sender}, + TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Opts}, setopts(Sock), NewS; @@ -536,12 +544,21 @@ t(T,S) -> %% Incoming message. transition({sctp, Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) -> - setopts(Sock), - recv(Data, S); + setopts(S, recv(Data, S#transport{active = false})); %% Outgoing message. transition({diameter, {send, Msg}}, S) -> - send(Msg, S); + message(send, Msg, S); + +%% Monitor has sent an outgoing message. +transition(Msg, S) + when is_record(Msg, diameter_packet); + is_binary(Msg) -> + message(ack, Msg, S); + +%% Deferred actions from a message_cb. +transition({actions, Dir, Acts}, S) -> + actions(Acts, Dir, S); %% Request to close the transport connection. transition({diameter, {close, Pid}}, #transport{parent = Pid}) -> @@ -581,8 +598,12 @@ transition({resolve_port, Pid}, #transport{socket = Sock}) %% m/2 -m({Bin, StreamId}, #monitor{} = S) -> - send(StreamId, Bin, S); +m({Msg, StreamId}, #monitor{socket = Sock, + transport = TPid, + assoc_id = AId, + ack = B}) -> + send(Sock, AId, StreamId, Msg), + B andalso (TPid ! Msg); m({'DOWN', _, process, TPid, _} = T, #monitor{transport = TPid}) -> x(T). @@ -632,48 +653,47 @@ q(Ref, Pid, #listener{pending = {_,Q}}) -> %% Start monitor process on first send. send(Msg, #transport{send = true, socket = Sock, - assoc_id = AId} + assoc_id = AId, + message_cb = CB} = S) -> {ok, MPid} = diameter_sctp_sup:start_child(#monitor{transport = self(), socket = Sock, - assoc_id = AId}), + assoc_id = AId, + ack = false /= CB}), monitor(process, MPid), send(Msg, S#transport{send = MPid}); %% Outbound Diameter message on a specified stream ... -send(#diameter_packet{bin = Bin, transport_data = {outstream, SId}}, +send(#diameter_packet{transport_data = {outstream, SId}} + = Msg, #transport{streams = {_, OS}} = S) -> - send(SId rem OS, Bin, S), - S; + send(SId rem OS, Msg, S); %% ... or not: rotate through all streams. -send(#diameter_packet{bin = Bin}, S) -> - send(Bin, S); -send(Bin, #transport{streams = {_, OS}, +send(Msg, #transport{streams = {_, OS}, os = N} - = S) - when is_binary(Bin) -> - send(N, Bin, S), - S#transport{os = (N + 1) rem OS}. + = S) -> + send(N, Msg, S#transport{os = (N + 1) rem OS}). %% send/3 -send(StreamId, Bin, #transport{send = false, +send(StreamId, Msg, #transport{send = false, socket = Sock, - assoc_id = AId}) -> - send(Sock, AId, StreamId, Bin); - -send(StreamId, Bin, #transport{send = MPid}) -> - MPid ! {Bin, StreamId}, - MPid; + assoc_id = AId} + = S) -> + send(Sock, AId, StreamId, Msg), + message(ack, Msg, S); -send(StreamId, Bin, #monitor{socket = Sock, - assoc_id = AId}) -> - send(Sock, AId, StreamId, Bin). +send(StreamId, Msg, #transport{send = MPid} = S) -> + MPid ! {Msg, StreamId}, + S. %% send/4 +send(Sock, AssocId, StreamId, #diameter_packet{bin = Bin}) -> + send(Sock, AssocId, StreamId, Bin); + send(Sock, AssocId, StreamId, Bin) -> case gen_sctp:send(Sock, AssocId, StreamId, Bin) of ok -> @@ -720,11 +740,10 @@ recv({[#sctp_sndrcvinfo{assoc_id = Id}], _Bin} recv(T, S#transport{assoc_id = Id}); %% Inbound Diameter message. -recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, #transport{parent = Pid} = S) +recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, S) when is_binary(Bin) -> - diameter_peer:recv(Pid, #diameter_packet{transport_data = {stream, Id}, - bin = Bin}), - S; + Pkt = #diameter_packet{bin = Bin, transport_data = {stream, Id}}, + message(recv, Pkt, S); recv({_, #sctp_shutdown_event{}}, _) -> stop; @@ -842,6 +861,23 @@ connect(Sock, [Addr | AT] = As, Port, Reasons) -> connect(Sock, AT, Port, [{Addr, E} | Reasons]) end. +%% setopts/2 + +setopts(_, #transport{socket = Sock, + active = A, + recv = B} + = S) + when B, not A -> + setopts(Sock), + S#transport{active = true}; + +setopts(_, #transport{} = S) -> + S; + +setopts(#transport{socket = Sock}, T) -> + setopts(Sock), + T. + %% setopts/1 setopts(Sock) -> @@ -849,3 +885,58 @@ setopts(Sock) -> ok -> ok; X -> x({setopts, Sock, X}) %% possibly on peer disconnect end. + +%% A message_cb is invoked whenever a message is sent or received, or +%% to provide acknowledgement of a completed send or discarded +%% request. See diameter_tcp for semantics. + +%% message/3 + +message(send, false = M, S) -> + message(ack, M, S); + +message(ack, _, #transport{message_cb = false} = S) -> + S; + +message(Dir, Msg, #transport{message_cb = CB} = S) -> + setopts(S, actions(cb(CB, Dir, Msg), Dir, S)). + +%% actions/3 + +actions([], _, S) -> + S; + +actions([B | As], Dir, S) + when is_boolean(B) -> + actions(As, Dir, S#transport{recv = B}); + +actions([Dir | As], _, S) + when Dir == send; + Dir == recv -> + actions(As, Dir, S); + +actions([Msg | As], send = Dir, S) + when is_record(Msg, diameter_packet); + is_binary(Msg) -> + actions(As, Dir, send(Msg, S)); + +actions([Msg | As], recv = Dir, #transport{parent = Pid} = S) + when is_record(Msg, diameter_packet); + is_binary(Msg) -> + diameter_peer:recv(Pid, Msg), + actions(As, Dir, S); + +actions([{defer, Tmo, Acts} | As], Dir, S) -> + erlang:send_after(Tmo, self(), {actions, Dir, Acts}), + actions(As, Dir, S); + +actions(CB, _, S) -> + S#transport{message_cb = CB}. + +%% cb/3 + +cb(false, _, Msg) -> + [Msg]; + +cb(CB, Dir, Msg) -> + diameter_lib:eval([CB, Dir, Msg]). diff --git a/lib/diameter/test/diameter_traffic_SUITE.erl b/lib/diameter/test/diameter_traffic_SUITE.erl index bb10638cd2..95339127d4 100644 --- a/lib/diameter/test/diameter_traffic_SUITE.erl +++ b/lib/diameter/test/diameter_traffic_SUITE.erl @@ -108,7 +108,7 @@ handle_error/6, handle_request/3]). -%% diameter_tcp callbacks +%% diameter_{tcp,sctp} callbacks -export([message/3]). -include("diameter.hrl"). @@ -158,7 +158,7 @@ %% Send from a dedicated process? -define(SENDERS, [true, false]). -%% Message callbacks from diameter_tcp? +%% Message callbacks from diameter_{tcp,sctp}? -define(CALLBACKS, [true, false]). -record(group, @@ -465,9 +465,8 @@ add_transports(Config) -> = group(Config), LRef = ?util:listen(SN, [T, - {sender, SS} - | [{message_cb, {?MODULE, message, [4]}} - || ST andalso T == tcp]], + {sender, SS}, + {message_cb, ST andalso {?MODULE, message, [4]}}], [{capabilities_cb, fun capx/2}, {pool_size, 8}, {spawn_opt, [{min_heap_size, 8096}]}, @@ -1510,6 +1509,9 @@ request(#diameter_base_RAR{}, _Caps) -> %% Limit the number of messages received. More can be received if read %% in the same packet. +message(Dir, #diameter_packet{bin = Bin}, N) -> + message(Dir, Bin, N); + %% incoming request message(recv, <<_:32, 1, _/bits>> = Bin, N) -> [Bin, 1 < N, fun ?MODULE:message/3, N-1]; -- cgit v1.2.3 From c591056bb2ce9147cc946e068e980050be67dcc1 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Mon, 12 Jun 2017 15:26:45 +0200 Subject: Add diameter_sctp option packet To determine the wrapping of messages passed to recv callbacks and into diameter. The default passing of the input stream in transport_data is probably of no practical use, but has been set since time immemorial. --- lib/diameter/src/transport/diameter_sctp.erl | 53 ++++++++++++++++++++++------ lib/diameter/test/diameter_traffic_SUITE.erl | 6 +++- 2 files changed, 47 insertions(+), 12 deletions(-) diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index 470a6aa9bf..e47febaf99 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -78,6 +78,7 @@ -type option() :: {sender, boolean()} | sender + | {packet, boolean() | raw} | {message_cb, false | diameter:evaluable()}. -type uint() :: non_neg_integer(). @@ -101,6 +102,8 @@ streams :: {uint(), uint()} %% {InStream, OutStream} counts | undefined, os = 0 :: uint(), %% next output stream + packet = true :: boolean() %% legacy transport_data? + | raw, message_cb :: false | diameter:evaluable(), send = false :: pid() | boolean()}). %% sending process @@ -243,7 +246,8 @@ i(#monitor{transport = TPid} = S) -> i({listen, Ref, {Opts, SvcPid, Addrs}}) -> monitor(process, SvcPid), [_] = diameter_config:subscribe(Ref, transport), %% assert existence - {Split, Rest} = proplists:split(Opts, [accept, sender, message_cb]), + {Split, Rest} + = proplists:split(Opts, [accept, packet, sender, message_cb]), OwnOpts = lists:append(Split), {LAs, Sock} = AS = open(Addrs, Rest, ?DEFAULT_PORT), ok = gen_sctp:listen(Sock, true), @@ -252,14 +256,15 @@ i({listen, Ref, {Opts, SvcPid, Addrs}}) -> #listener{ref = Ref, service = SvcPid, socket = Sock, - opts = [[[M] || {accept, M} <- OwnOpts] + opts = [[[M] || {accept, M} <- OwnOpts], + proplists:get_value(packet, OwnOpts, true) | [proplists:get_value(K, OwnOpts, false) || K <- [sender, message_cb]]]}; %% A connecting transport. i({connect, Pid, Opts, Addrs, Ref}) -> {[Ps | Split], Rest} - = proplists:split(Opts, [rport, raddr, sender, message_cb]), + = proplists:split(Opts, [rport, raddr, packet, sender, message_cb]), OwnOpts = lists:append(Split), CB = proplists:get_value(message_cb, OwnOpts, false), false == CB orelse (Pid ! {diameter, ack}), @@ -273,6 +278,7 @@ i({connect, Pid, Opts, Addrs, Ref}) -> mode = {connect, connect(Sock, RAs, RP, [])}, socket = Sock, message_cb = CB, + packet = proplists:get_value(packet, OwnOpts, true), send = proplists:get_value(sender, OwnOpts, false)}; %% An accepting transport spawned by diameter, not yet owning an @@ -309,12 +315,13 @@ i({K, Ref}, #transport{mode = {accept, _}} = S) -> S#transport{parent = Pid}; {K, T, Opts} when K == peeloff -> %% association {sctp, Sock, _RA, _RP, _Data} = T, - [Matches, Sender, CB] = Opts, + [Matches, Packet, Sender, CB] = Opts, ok = accept_peer(Sock, Matches), demonitor(Ref, [flush]), false == CB orelse (S#transport.parent ! {diameter, ack}), t(T, S#transport{socket = Sock, message_cb = CB, + packet = Packet, send = Sender}); accept_timeout = T -> x(T); @@ -740,10 +747,9 @@ recv({[#sctp_sndrcvinfo{assoc_id = Id}], _Bin} recv(T, S#transport{assoc_id = Id}); %% Inbound Diameter message. -recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, S) +recv({[#sctp_sndrcvinfo{}], Bin} = Msg, S) when is_binary(Bin) -> - Pkt = #diameter_packet{bin = Bin, transport_data = {stream, Id}}, - message(recv, Pkt, S); + message(recv, Msg, S); recv({_, #sctp_shutdown_event{}}, _) -> stop; @@ -888,7 +894,9 @@ setopts(Sock) -> %% A message_cb is invoked whenever a message is sent or received, or %% to provide acknowledgement of a completed send or discarded -%% request. See diameter_tcp for semantics. +%% request. See diameter_tcp for semantics, the only difference being +%% that a recv callback can get a diameter_packet record as Msg +%% depending on how/if option packet has been specified. %% message/3 @@ -898,8 +906,8 @@ message(send, false = M, S) -> message(ack, _, #transport{message_cb = false} = S) -> S; -message(Dir, Msg, #transport{message_cb = CB} = S) -> - setopts(S, actions(cb(CB, Dir, Msg), Dir, S)). +message(Dir, Msg, S) -> + setopts(S, actions(cb(S, Dir, Msg), Dir, S)). %% actions/3 @@ -935,8 +943,31 @@ actions(CB, _, S) -> %% cb/3 -cb(false, _, Msg) -> +cb(#transport{message_cb = false, packet = P}, recv, Msg) -> + [pkt(P, true, Msg)]; + +cb(#transport{message_cb = CB, packet = P}, recv = D, Msg) -> + cb(CB, D, pkt(P, false, Msg)); + +cb(#transport{message_cb = CB}, Dir, Msg) -> + cb(CB, Dir, Msg); + +cb(false, send, Msg) -> [Msg]; cb(CB, Dir, Msg) -> diameter_lib:eval([CB, Dir, Msg]). + +%% pkt/3 + +pkt(false, _, {_Info, Bin}) -> + Bin; + +pkt(true, _, {[#sctp_sndrcvinfo{stream = Id}], Bin}) -> + #diameter_packet{bin = Bin, transport_data = {stream, Id}}; + +pkt(raw, true, {[Info], Bin}) -> + #diameter_packet{bin = Bin, transport_data = Info}; + +pkt(raw, false, {[_], _} = Msg) -> + Msg. diff --git a/lib/diameter/test/diameter_traffic_SUITE.erl b/lib/diameter/test/diameter_traffic_SUITE.erl index 95339127d4..f567a6f367 100644 --- a/lib/diameter/test/diameter_traffic_SUITE.erl +++ b/lib/diameter/test/diameter_traffic_SUITE.erl @@ -466,7 +466,9 @@ add_transports(Config) -> LRef = ?util:listen(SN, [T, {sender, SS}, - {message_cb, ST andalso {?MODULE, message, [4]}}], + {message_cb, ST andalso {?MODULE, message, [4]}} + | [{packet, hd(?util:scramble([false, raw]))} + || T == sctp andalso CS]], [{capabilities_cb, fun capx/2}, {pool_size, 8}, {spawn_opt, [{min_heap_size, 8096}]}, @@ -1509,6 +1511,8 @@ request(#diameter_base_RAR{}, _Caps) -> %% Limit the number of messages received. More can be received if read %% in the same packet. +message(recv = D, {[_], Bin}, N) -> + message(D, Bin, N); message(Dir, #diameter_packet{bin = Bin}, N) -> message(Dir, Bin, N); -- cgit v1.2.3 From 946cc275f79fb8b96039d7e6c0e49b3e822ecdd6 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Mon, 12 Jun 2017 02:23:00 +0200 Subject: Remove client/server string decode from traffic suite Decode on both ends or not, since the choice doesn't affect the peer. --- lib/diameter/test/diameter_traffic_SUITE.erl | 35 ++++++++++++---------------- 1 file changed, 15 insertions(+), 20 deletions(-) diff --git a/lib/diameter/test/diameter_traffic_SUITE.erl b/lib/diameter/test/diameter_traffic_SUITE.erl index f567a6f367..f2e796005d 100644 --- a/lib/diameter/test/diameter_traffic_SUITE.erl +++ b/lib/diameter/test/diameter_traffic_SUITE.erl @@ -163,15 +163,14 @@ -record(group, {transport, + strings, client_service, client_encoding, client_dict0, - client_strings, client_sender, server_service, server_encoding, server_container, - server_strings, server_sender, server_throttle}). @@ -262,29 +261,27 @@ all() -> groups() -> [{P, [P], Ts} || Ts <- [tc(tc())], P <- [shuffle, parallel]] ++ - [{?util:name([T,R,D,A,C,SD,SS,ST,CD,CS]), + [{?util:name([T,R,D,A,C,S,SS,ST,CS]), [], - [{group, if SD orelse CD -> shuffle; true -> parallel end}]} + [{group, if S -> shuffle; not S -> parallel end}]} || T <- ?TRANSPORTS, R <- ?ENCODINGS, D <- ?RFCS, A <- ?ENCODINGS, C <- ?CONTAINERS, - SD <- ?STRING_DECODES, + S <- ?STRING_DECODES, SS <- ?SENDERS, ST <- ?CALLBACKS, - CD <- ?STRING_DECODES, CS <- ?SENDERS] ++ - [{T, [], groups([[T,R,D,A,C,SD,SS,ST,CD,CS] + [{T, [], groups([[T,R,D,A,C,S,SS,ST,CS] || R <- ?ENCODINGS, D <- ?RFCS, A <- ?ENCODINGS, C <- ?CONTAINERS, - SD <- ?STRING_DECODES, + S <- ?STRING_DECODES, SS <- ?SENDERS, ST <- ?CALLBACKS, - CD <- ?STRING_DECODES, CS <- ?SENDERS, SS orelse CS])} %% avoid deadlock || T <- ?TRANSPORTS] @@ -292,7 +289,7 @@ groups() -> [{traffic, [], [{group, T} || T <- ?TRANSPORTS]}]. %groups(_) -> %% debug -% Name = [sctp,record,rfc6733,record,pkt,false,false,false,false,false], +% Name = [sctp,record,rfc6733,record,pkt,false,false,false,false], % [{group, ?util:name(Name)}]; groups(Names) -> [{group, ?util:name(L)} || L <- Names]. @@ -328,17 +325,16 @@ init_per_group(sctp = Name, Config) -> init_per_group(Name, Config) -> case ?util:name(Name) of - [T,R,D,A,C,SD,SS,ST,CD,CS] -> + [T,R,D,A,C,S,SS,ST,CS] -> G = #group{transport = T, + strings = S, client_service = [$C|?util:unique_string()], client_encoding = R, client_dict0 = dict0(D), - client_strings = CD, client_sender = CS, server_service = [$S|?util:unique_string()], server_encoding = A, server_container = C, - server_strings = SD, server_sender = SS, server_throttle = ST}, %% Limit the number of testcase, since the number of @@ -446,14 +442,13 @@ start(_Config) -> ok = diameter:start(). start_services(Config) -> - #group{client_service = CN, - client_strings = CD, - server_service = SN, - server_strings = SD} + #group{strings = S, + client_service = CN, + server_service = SN} = group(Config), - ok = diameter:start_service(SN, ?SERVICE(SN, SD)), + ok = diameter:start_service(SN, ?SERVICE(SN, S)), ok = diameter:start_service(CN, [{sequence, ?CLIENT_MASK} - | ?SERVICE(CN, CD)]). + | ?SERVICE(CN, S)]). add_transports(Config) -> #group{transport = T, @@ -951,7 +946,7 @@ group(Config) -> #group{} = proplists:get_value(group, Config). string(V, Config) -> - #group{client_strings = B} = group(Config), + #group{strings = B} = group(Config), decode(V,B). decode(S, true) -- cgit v1.2.3 From 19ad1805bfc895b12ea692e713cbfd252c44d147 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Mon, 12 Jun 2017 20:42:52 +0200 Subject: Fix dialyzer warnings diameter_sctp.erl:292: Record construction #transport{parent::pid(),mode::{'accept',atom() | pid() | port() | {atom(),atom()}},active::'false',recv::'true',os::0,packet::'true',message_cb::'undefined',send::'false'} violates the declared type of field message_cb::'false' | fun() | maybe_improper_list(fun() | maybe_improper_list(any(),[any()]) | {atom(),atom(),[any()]},[any()]) | {atom(),atom(),[any()]} diameter_sctp.erl:302: Record construction #transport{mode::{'accept',atom() | pid() | port() | {atom(),atom()}},active::'false',recv::'true',os::0,packet::'true',message_cb::'undefined',send::'false'} violates the declared type of field message_cb::'false' | fun() | maybe_improper_list(fun() | maybe_improper_list(any(),[any()]) | {atom(),atom(),[any()]},[any()]) | {atom(),atom(),[any()]} --- lib/diameter/src/transport/diameter_sctp.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index e47febaf99..f243f2b7cd 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -104,7 +104,7 @@ os = 0 :: uint(), %% next output stream packet = true :: boolean() %% legacy transport_data? | raw, - message_cb :: false | diameter:evaluable(), + message_cb = false :: false | diameter:evaluable(), send = false :: pid() | boolean()}). %% sending process %% Monitor process state. -- cgit v1.2.3 From 2994dfc0357c5c1b6f3a9c28bdfdafb482183536 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Mon, 12 Jun 2017 23:13:13 +0200 Subject: Remove obsolete traffic testcase Outgoing requests no longer write to the request table, as of commit a4da06a5. --- lib/diameter/test/diameter_traffic_SUITE.erl | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/lib/diameter/test/diameter_traffic_SUITE.erl b/lib/diameter/test/diameter_traffic_SUITE.erl index f2e796005d..c6d63a9345 100644 --- a/lib/diameter/test/diameter_traffic_SUITE.erl +++ b/lib/diameter/test/diameter_traffic_SUITE.erl @@ -92,7 +92,6 @@ send_multiple_filters_2/1, send_multiple_filters_3/1, send_anything/1, - outstanding/1, remove_transports/1, empty/1, stop_services/1, @@ -256,7 +255,7 @@ suite() -> [{timetrap, {seconds, 10}}]. all() -> - [start, result_codes, {group, traffic}, outstanding, empty, stop]. + [start, result_codes, {group, traffic}, empty, stop]. groups() -> [{P, [P], Ts} || Ts <- [tc(tc())], P <- [shuffle, parallel]] @@ -486,11 +485,6 @@ apps(D0) -> D = dict0(D0), [acct(D), D]. -%% Ensure there are no outstanding requests in request table. -outstanding(_Config) -> - [] = [T || T <- ets:tab2list(diameter_request), - is_atom(element(1,T))]. - remove_transports(Config) -> #group{client_service = CN, server_service = SN} -- cgit v1.2.3 From 69c5a74179e13e145da3da70e02dd43881a82008 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Mon, 12 Jun 2017 23:49:43 +0200 Subject: Capitulate on SCTP vs sparc-sun-solaris2.10 Despite the efforts of commits 1df74351 and 111261d1 to salvage it, SCTP is just flakey on sparc-sun-solaris2.10. In addition to the woes of the loopback address, even connect on other addresses sporadically returns {error, eafnosupport}, so the initial check for a working SCTP (aka resistance) is futile. Revert both commits. --- lib/diameter/test/diameter_examples_SUITE.erl | 6 +-- lib/diameter/test/diameter_transport_SUITE.erl | 44 +++++++++----------- lib/diameter/test/diameter_util.erl | 57 ++++++++++---------------- 3 files changed, 44 insertions(+), 63 deletions(-) diff --git a/lib/diameter/test/diameter_examples_SUITE.erl b/lib/diameter/test/diameter_examples_SUITE.erl index 680ce4f366..fad54d62b2 100644 --- a/lib/diameter/test/diameter_examples_SUITE.erl +++ b/lib/diameter/test/diameter_examples_SUITE.erl @@ -70,8 +70,6 @@ %% Transport protocols over which the example Diameter nodes are run. -define(PROTS, [tcp, sctp]). --define(ADDR, diameter_util:ip4()). - %% =========================================================================== suite() -> @@ -348,7 +346,7 @@ top(Dir, LibDir) -> start({server, Prot}) -> ok = diameter:start(), ok = server:start(), - {ok, Ref} = server:listen({Prot, ?ADDR, 3868}), + {ok, Ref} = server:listen(Prot), [_] = ?util:lport(Prot, Ref), ok; @@ -356,7 +354,7 @@ start({client = Svc, Prot}) -> ok = diameter:start(), true = diameter:subscribe(Svc), ok = client:start(), - {ok, Ref} = client:connect({Prot, ?ADDR, ?ADDR, 3868}), + {ok, Ref} = client:connect(Prot), receive #diameter_event{info = {up, Ref, _, _, _}} -> ok end; start(Config) -> diff --git a/lib/diameter/test/diameter_transport_SUITE.erl b/lib/diameter/test/diameter_transport_SUITE.erl index 14c748ad20..9d981d0a2b 100644 --- a/lib/diameter/test/diameter_transport_SUITE.erl +++ b/lib/diameter/test/diameter_transport_SUITE.erl @@ -56,10 +56,8 @@ -define(RECV(Pat, Ret), receive Pat -> Ret end). -define(RECV(Pat), ?RECV(Pat, diameter_lib:now())). -%% Address to open sockets on. --define(ADDR(Prot), if sctp == Prot -> diameter_util:ip4(); - true -> {127,0,0,1} - end). +%% Sockets are opened on the loopback address. +-define(ADDR, {127,0,0,1}). %% diameter_tcp doesn't use anything but host_ip_address, and that %% only is a local address isn't configured as at transport start. @@ -351,14 +349,13 @@ rand_bytes(N) -> %% start_connect/3 start_connect(Prot, PortNr, Ref) -> - Addr = ?ADDR(Prot), - {ok, TPid, [_]} = start_connect(Prot, - {connect, Ref}, - ?SVC([]), - [{raddr, Addr}, - {rport, PortNr}, - {ip, Addr}, - {port, 0}]), + {ok, TPid, [?ADDR]} = start_connect(Prot, + {connect, Ref}, + ?SVC([]), + [{raddr, ?ADDR}, + {rport, PortNr}, + {ip, ?ADDR}, + {port, 0}]), ?RECV(?TMSG({TPid, connected, _})), TPid. @@ -371,9 +368,9 @@ start_connect(tcp, T, Svc, Opts) -> start_accept(Prot, Ref) -> {Mod, Opts} = tmod(Prot), - {ok, TPid, [_]} = Mod:start({accept, Ref}, - ?SVC([?ADDR(Prot)]), - [{port, 0} | Opts]), + {ok, TPid, [?ADDR]} = Mod:start({accept, Ref}, + ?SVC([?ADDR]), + [{port, 0} | Opts]), ?RECV(?TMSG({TPid, connected})), TPid. @@ -387,20 +384,19 @@ tmod(tcp) -> %% gen_connect/2 gen_connect(sctp = P, PortNr) -> - Addr = ?ADDR(P), - {ok, Sock} = Ok = gen_sctp:open([{ip, Addr}, {port, 0} | ?SCTP_OPTS]), - ok = gen_sctp:connect_init(Sock, Addr, PortNr, []), + {ok, Sock} = Ok = gen_sctp:open([{ip, ?ADDR}, {port, 0} | ?SCTP_OPTS]), + ok = gen_sctp:connect_init(Sock, ?ADDR, PortNr, []), Ok = gen_accept(P, Sock); -gen_connect(tcp = P, PortNr) -> - gen_tcp:connect(?ADDR(P), PortNr, ?TCP_OPTS). +gen_connect(tcp, PortNr) -> + gen_tcp:connect(?ADDR, PortNr, ?TCP_OPTS). %% gen_listen/1 -gen_listen(sctp = P) -> - {ok, Sock} = gen_sctp:open([{ip, ?ADDR(P)}, {port, 0} | ?SCTP_OPTS]), +gen_listen(sctp) -> + {ok, Sock} = gen_sctp:open([{ip, ?ADDR}, {port, 0} | ?SCTP_OPTS]), {gen_sctp:listen(Sock, true), Sock}; -gen_listen(tcp = P) -> - gen_tcp:listen(0, [{ip, ?ADDR(P)} | ?TCP_OPTS]). +gen_listen(tcp) -> + gen_tcp:listen(0, [{ip, ?ADDR} | ?TCP_OPTS]). %% gen_accept/2 diff --git a/lib/diameter/test/diameter_util.erl b/lib/diameter/test/diameter_util.erl index 7266d3678c..03f79096ac 100644 --- a/lib/diameter/test/diameter_util.erl +++ b/lib/diameter/test/diameter_util.erl @@ -32,8 +32,7 @@ foldl/3, scramble/1, unique_string/0, - have_sctp/0, - ip4/0]). + have_sctp/0]). %% diameter-specific -export([lport/2, @@ -185,13 +184,19 @@ unique_string() -> %% have_sctp/0 have_sctp() -> + have_sctp(erlang:system_info(system_architecture)). + +%% Don't run SCTP on platforms where it's either known to be flakey or +%% isn't available. + +have_sctp("sparc-sun-solaris2.10") -> + false; + +have_sctp(_) -> case gen_sctp:open() of {ok, Sock} -> - RC = gen_sctp:connect(Sock, ip4(), 3868, []), gen_sctp:close(Sock), - %% Connect has been seen to return eafnosupport on at least - %% one SunOS 10 Sparc host, for reasons unknown. - RC /= {error, eafnosupport}; + true; {error, E} when E == eprotonosupport; E == esocktnosupport -> %% fail on any other reason false @@ -360,8 +365,7 @@ tmod(any) -> opts([Prot | Opts], T) -> tmo(T, lists:append([[{transport_module, M}, {transport_config, C ++ Opts}] || M <- tmod(Prot), - C <- [buf(M,T) ++ [{ip, addr(M)}, {port, 0}] - ++ remote(M,T)]])); + C <- [cfg(M,T) ++ cfg(M) ++ cfg(T)]])); opts(Prot, T) -> opts([Prot], T). @@ -380,38 +384,21 @@ tmo([M, C | Opts]) -> %% Listening SCTP socket need larger-than-default buffers to avoid %% resends on some platforms (eg. SLES 11). -buf(diameter_sctp, listen) -> +cfg(diameter_sctp, listen) -> [{recbuf, 1 bsl 16}, {sndbuf, 1 bsl 16}]; -buf(_, _) -> + +cfg(_, _) -> []. -addr(diameter_tcp) -> - {127,0,0,1}; -addr(diameter_sctp) -> - ip4(). +cfg(M) + when M == diameter_tcp; + M == diameter_sctp -> + [{ip, ?ADDR}, {port, 0}]; -remote(_, listen) -> +cfg(listen) -> [{accept, M} || M <- [{256,0,0,1}, ["256.0.0.1", ["^.+$"]]]]; -remote(Mod, PortNr) -> - [{raddr, addr(Mod)}, {rport, PortNr}]. - -%% Try to use something other than the loopback address where this -%% address is known to be problematic for gen_sctp. -ip4() -> - try - "sparc-sun-solaris2.10" = erlang:system_info(system_architecture), - {ok, List} = inet:getifaddrs(), - hd(lists:flatmap(fun ip4/1, List)) - catch - error:_ -> - ?ADDR - end. - -ip4({_, Opts}) -> - {flags, Flags} = lists:keyfind(flags, 1, Opts), - [A || lists:member(up, Flags), - not lists:member(loopback, Flags), - {addr, {_,_,_,_} = A} <- Opts]. +cfg(PortNr) -> + [{raddr, ?ADDR}, {rport, PortNr}]. %% --------------------------------------------------------------------------- %% info/0 -- cgit v1.2.3