From e05868f49b442c10cedbded390c110473cda6f00 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Mon, 8 Aug 2011 18:24:36 +0200 Subject: Add transport and gen_sctp suites --- lib/diameter/test/diameter_gen_sctp_SUITE.erl | 354 ++++++++++++++++++ lib/diameter/test/diameter_tcp_test.erl | 482 ------------------------- lib/diameter/test/diameter_transport_SUITE.erl | 454 +++++++++++++++++++++++ lib/diameter/test/modules.mk | 4 +- 4 files changed, 811 insertions(+), 483 deletions(-) create mode 100644 lib/diameter/test/diameter_gen_sctp_SUITE.erl delete mode 100644 lib/diameter/test/diameter_tcp_test.erl create mode 100644 lib/diameter/test/diameter_transport_SUITE.erl (limited to 'lib/diameter/test') diff --git a/lib/diameter/test/diameter_gen_sctp_SUITE.erl b/lib/diameter/test/diameter_gen_sctp_SUITE.erl new file mode 100644 index 0000000000..be6d28beb2 --- /dev/null +++ b/lib/diameter/test/diameter_gen_sctp_SUITE.erl @@ -0,0 +1,354 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% + +%% +%% Some gen_sctp-specific tests demonstrating problems that were +%% encountered during diameter development but have nothing +%% specifically to do with diameter. At least one of them can cause +%% diameter_transport_SUITE testcases to fail. +%% + +-module(diameter_gen_sctp_SUITE). + +-export([suite/0, + all/0, + init_per_suite/1, + end_per_suite/1]). + +%% testcases +-export([send_not_from_controlling_process/1, + send_from_multiple_clients/1, + receive_what_was_sent/1]). + +-include_lib("kernel/include/inet_sctp.hrl"). + +%% Message from gen_sctp are of this form. +-define(SCTP(Sock, Data), {sctp, Sock, _, _, Data}). + +%% Open sockets on the loopback address. +-define(ADDR, {127,0,0,1}). + +%% Snooze, nap, siesta. +-define(SLEEP(T), receive after T -> ok end). + +%% An indescribably long number of milliseconds after which everthing +%% that should have happened has. +-define(FOREVER, 2000). + +%% The first byte in each message we send as a simple guard against +%% not receiving what was sent. +-define(MAGIC, 42). + +%% =========================================================================== + +suite() -> + [{timetrap, {minutes, 2}}]. + +all() -> + [send_not_from_controlling_process, + send_from_multiple_clients, + receive_what_was_sent]. + +init_per_suite(Config) -> + try gen_sctp:open() of + {ok, Sock} -> + gen_sctp:close(Sock), + Config + catch + error: badarg -> + {skip, no_sctp} + end. + +end_per_suite(_Config) -> + ok. + +%% =========================================================================== + +%% send_not_from_controlling_process/1 +%% +%% This testcase failing shows gen_sctp:send/4 hanging when called +%% outside the controlling process of the socket in question. + +send_not_from_controlling_process(_) -> + Pids = send_not_from_controlling_process(), + ?SLEEP(?FOREVER), + try + [] = [{P,I} || P <- Pids, I <- [process_info(P)], I /= undefined] + after + lists:foreach(fun(P) -> exit(P, kill) end, Pids) + end. + +%% send_not_from_controlling_process/0 +%% +%% Returns the pids of three spawned processes: a listening process, a +%% connecting process and a sending process. +%% +%% The expected behaviour is that all three processes exit: +%% +%% - The listening process exits upon receiving an SCTP message +%% sent by the sending process. +%% - The connecting process exits upon listening process exit. +%% - The sending process exits upon gen_sctp:send/4 return. +%% +%% The observed behaviour is that all three processes remain alive +%% indefinitely: +%% +%% - The listening process never receives the SCTP message sent +%% by the sending process. +%% - The connecting process has an inet_reply message in its mailbox +%% as a consequence of the call to gen_sctp:send/4 call from the +%% sending process. +%% - The call to gen_sctp:send/4 in the sending process doesn't return, +%% hanging in prim_inet:getopts/2. + +send_not_from_controlling_process() -> + FPid = self(), + {L, MRef} = spawn_monitor(fun() -> listen(FPid) end),%% listening process + receive + {?MODULE, C, S} -> + erlang:demonitor(MRef, [flush]), + [L,C,S]; + {'DOWN', MRef, process, _, _} = T -> + error(T) + end. + +%% listen/1 + +listen(FPid) -> + {ok, Sock} = open(), + ok = gen_sctp:listen(Sock, true), + {ok, PortNr} = inet:port(Sock), + LPid = self(), + spawn(fun() -> connect1(PortNr, FPid, LPid) end), %% connecting process + Id = assoc(Sock), + ?SCTP(Sock, {[#sctp_sndrcvinfo{assoc_id = Id}], _Bin}) + = recv(). %% Waits with this as current_function. + +%% recv/0 + +recv() -> + receive T -> T end. + +%% connect1/3 + +connect1(PortNr, FPid, LPid) -> + {ok, Sock} = open(), + ok = gen_sctp:connect_init(Sock, ?ADDR, PortNr, []), + Id = assoc(Sock), + FPid ! {?MODULE, + self(), + spawn(fun() -> send(Sock, Id) end)}, %% sending process + MRef = erlang:monitor(process, LPid), + down(MRef). %% Waits with this as current_function. + +%% down/1 + +down(MRef) -> + receive {'DOWN', MRef, process, _, Reason} -> Reason end. + +%% send/2 + +send(Sock, Id) -> + ok = gen_sctp:send(Sock, Id, 0, <<0:32>>). + +%% =========================================================================== + +%% send_from_multiple_clients/0 +%% +%% Demonstrates sluggish delivery of messages. + +send_from_multiple_clients(_) -> + {S, Rs} = T = send_from_multiple_clients(8, 1024), + {false, [], _} = {?FOREVER < S, + Rs -- [OI || {O,_} = OI <- Rs, is_integer(O)], + T}. + +%% send_from_multiple_clients/2 +%% +%% Opens a listening socket and then spawns a specified number of +%% processes, each of which connects to the listening socket. Each +%% connecting process then sends a message, whose size in bytes is +%% passed as an argument, the listening process sends a reply +%% containing the time at which the message was received, and the +%% connecting process then exits upon reception of this reply. +%% +%% Returns the elapsed time for all connecting process to exit +%% together with a list of exit reasons for the connecting processes. +%% In the successful case a connecting process exits with the +%% outbound/inbound transit times for the sent/received message as +%% reason. +%% +%% The observed behaviour is that some outbound messages (that is, +%% from a connecting process to the listening process) can take an +%% unexpectedly long time to complete their journey. The more +%% connecting processes, the longer the possible delay it seems. +%% +%% eg. (With F = fun send_from_multiple_clients/2.) +%% +%% 5> F(2, 1024). +%% {875,[{128,116},{113,139}]} +%% 6> F(4, 1024). +%% {2995290,[{2994022,250},{2994071,80},{200,130},{211,113}]} +%% 7> F(8, 1024). +%% {8997461,[{8996161,116}, +%% {2996471,86}, +%% {2996278,116}, +%% {2996360,95}, +%% {246,112}, +%% {213,159}, +%% {373,173}, +%% {376,118}]} +%% 8> F(8, 1024). +%% {21001891,[{20999968,128}, +%% {8997891,172}, +%% {8997927,91}, +%% {2995716,164}, +%% {2995860,87}, +%% {134,100}, +%% {117,98}, +%% {149,125}]} + +send_from_multiple_clients(N, Sz) + when is_integer(N), 0 < N, is_integer(Sz), 0 < Sz -> + timer:tc(fun listen/2, [N, <>]). + +%% listen/2 + +listen(N, Bin) -> + {ok, Sock} = open(), + ok = gen_sctp:listen(Sock, true), + {ok, PortNr} = inet:port(Sock), + + %% Spawn a middleman that in turn spawns N connecting processes, + %% collects a list of exit reasons and then exits with the list as + %% reason. loop/3 returns when we receive this list from the + %% middleman's 'DOWN'. + + Self = self(), + Fun = fun() -> exit(connect2(Self, PortNr, Bin)) end, + {_, MRef} = spawn_monitor(fun() -> exit(fold(N, Fun)) end), + loop(Sock, MRef, Bin). + +%% fold/2 +%% +%% Spawn N processes and collect their exit reasons in a list. + +fold(N, Fun) -> + start(N, Fun), + acc(N, []). + +start(0, _) -> + ok; +start(N, Fun) -> + spawn_monitor(Fun), + start(N-1, Fun). + +acc(0, Acc) -> + Acc; +acc(N, Acc) -> + receive + {'DOWN', _MRef, process, _, RC} -> + acc(N-1, [RC | Acc]) + end. + +%% loop/3 + +loop(Sock, MRef, Bin) -> + receive + ?SCTP(Sock, {[#sctp_sndrcvinfo{assoc_id = Id}], B}) -> + Sz = size(Bin), + {Sz, Bin} = {size(B), B}, %% assert + ok = send(Sock, Id, mark(Bin)), + loop(Sock, MRef, Bin); + ?SCTP(Sock, _) -> + loop(Sock, MRef, Bin); + {'DOWN', MRef, process, _, Reason} -> + Reason + end. + +%% connect2/3 + +connect2(Pid, PortNr, Bin) -> + erlang:monitor(process, Pid), + + {ok, Sock} = open(), + ok = gen_sctp:connect_init(Sock, ?ADDR, PortNr, []), + Id = assoc(Sock), + + %% T1 = time before send + %% T2 = time after listening process received our message + %% T3 = time after reply is received + + T1 = now(), + ok = send(Sock, Id, Bin), + T2 = unmark(recv(Sock, Id)), + T3 = now(), + {timer:now_diff(T2, T1), timer:now_diff(T3, T2)}. %% {Outbound, Inbound} + +%% recv/2 + +recv(Sock, Id) -> + receive + ?SCTP(Sock, {[#sctp_sndrcvinfo{assoc_id = Id}], Bin}) -> + Bin; + T -> %% eg. 'DOWN' + exit(T) + end. + +%% send/3 + +send(Sock, Id, Bin) -> + gen_sctp:send(Sock, Id, 0, Bin). + +%% mark/1 + +mark(Bin) -> + Info = term_to_binary(now()), + <>. + +%% unmark/1 + +unmark(Bin) -> + {_,_,_} = binary_to_term(Bin). + +%% =========================================================================== + +%% receive_what_was_sent/1 +%% +%% Demonstrates reception of a message that differs from that sent. + +receive_what_was_sent(_Config) -> + send_from_multiple_clients(1, 1024*32). %% fails + +%% =========================================================================== + +%% open/0 + +open() -> + gen_sctp:open([{ip, ?ADDR}, {port, 0}, {active, true}, binary]). + +%% assoc/1 + +assoc(Sock) -> + receive + ?SCTP(Sock, {[], #sctp_assoc_change{state = S, + assoc_id = Id}}) -> + comm_up = S, %% assert + Id + end. diff --git a/lib/diameter/test/diameter_tcp_test.erl b/lib/diameter/test/diameter_tcp_test.erl deleted file mode 100644 index b002a3d289..0000000000 --- a/lib/diameter/test/diameter_tcp_test.erl +++ /dev/null @@ -1,482 +0,0 @@ -%% -%% %CopyrightBegin% -%% -%% Copyright Ericsson AB 2010-2011. All Rights Reserved. -%% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. -%% -%% %CopyrightEnd% -%% - -%% -%%---------------------------------------------------------------------- -%% Purpose: Verify the tcp transport component of the Diameter application -%%---------------------------------------------------------------------- -%% --module(diameter_tcp_test). - --export([ - init_per_testcase/2, fin_per_testcase/2, - - all/0, - groups/0, - init_per_suite/1, end_per_suite/1, - suite_init/1, suite_fin/1, - init_per_group/2, end_per_group/2, - - start_and_stop_transport_plain/1, - start_and_listen/1, - simple_connect/1, - simple_send_and_recv/1 - - ]). - --export([t/0, t/1]). - -%% diameter_peer (internal) callback API --export([up/1, up/3, recv/2]). - --include("diameter_test_lib.hrl"). --include_lib("diameter/include/diameter.hrl"). -%% -include_lib("diameter/src/tcp/diameter_tcp.hrl"). - - -t() -> diameter_test_server:t(?MODULE). -t(Case) -> diameter_test_server:t({?MODULE, Case}). - - -%% Test server callbacks -init_per_testcase(Case, Config) -> - diameter_test_server:init_per_testcase(Case, Config). - -fin_per_testcase(Case, Config) -> - diameter_test_server:fin_per_testcase(Case, Config). - - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - -all() -> - [ - {group, start}, - {group, simple} - ]. - -groups() -> - [ - {start, [], [start_and_stop_transport_plain, start_and_listen]}, - {simple, [], [simple_connect, simple_send_and_recv]} - ]. - -init_per_group(_GroupName, Config) -> - Config. - -end_per_group(_GroupName, Config) -> - Config. - - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - -suite_init(X) -> init_per_suite(X). - -init_per_suite(suite) -> []; -init_per_suite(doc) -> []; -init_per_suite(Config) when is_list(Config) -> - Config. - - -suite_fin(X) -> end_per_suite(X). - -end_per_suite(suite) -> []; -end_per_suite(doc) -> []; -end_per_suite(Config) when is_list(Config) -> - Config. - - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% -%% Test case(s) -%% - - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% -%% Plain start and stop of TCP transport -%% - -start_and_stop_transport_plain(suite) -> - []; -start_and_stop_transport_plain(doc) -> - []; -start_and_stop_transport_plain(Config) when is_list(Config) -> - - ?SKIP(not_yet_implemented), - - %% This has been changed *a lot* since it was written... - - process_flag(trap_exit, true), - Transport = ensure_transport_started(), - TcpTransport = ensure_tcp_transport_started(), - ensure_tcp_transport_stopped(TcpTransport), - ensure_transport_stopped(Transport), - i("done"), - ok. - - - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% -%% Start TCP transport and then create a listen socket -%% - -start_and_listen(suite) -> - []; -start_and_listen(doc) -> - []; -start_and_listen(Config) when is_list(Config) -> - - ?SKIP(not_yet_implemented), - - %% This has been changed *a lot* since it was written... - - process_flag(trap_exit, true), - Transport = ensure_transport_started(), - TcpTransport = ensure_tcp_transport_started(), - - case listen([{port, 0}]) of - {ok, Acceptor} when is_pid(Acceptor) -> - Ref = erlang:monitor(process, Acceptor), - [{Acceptor, Info}] = diameter_tcp:which_listeners(), - case lists:keysearch(socket, 1, Info) of - {value, {_, Listen}} -> - i("Listen socket: ~p" - "~n Opts: ~p" - "~n Stats: ~p" - "~n Name: ~p", - [Listen, - ok(inet:getopts(Listen, [keepalive, delay_send])), - ok(inet:getstat(Listen)), - ok(inet:sockname(Listen)) - ]), - ok; - _ -> - ?FAIL({bad_listener_info, Acceptor, Info}) - end, - Crash = simulate_crash, - exit(Acceptor, Crash), - receive - {'DOWN', Ref, process, Acceptor, Crash} -> - ?SLEEP(1000), - case diameter_tcp:which_listeners() of - [{NewAcceptor, _NewInfo}] -> - diameter_tcp_accept:stop(NewAcceptor), - ?SLEEP(1000), - case diameter_tcp:which_listeners() of - [] -> - ok; - UnexpectedListeners -> - ?FAIL({unexpected_listeners, empty, UnexpectedListeners}) - end; - UnexpectedListeners -> - ?FAIL({unexpected_listeners, non_empty, UnexpectedListeners}) - end - after 5000 -> - ?FAIL({failed_killing, Acceptor}) - end; - Error -> - ?FAIL({failed_creating_acceptor, Error}) - end, - ensure_tcp_transport_stopped(TcpTransport), - ensure_transport_stopped(Transport), - i("done"), - ok. - - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% -%% TCP transport connecting -%% - -simple_connect(suite) -> - []; -simple_connect(doc) -> - []; -simple_connect(Config) when is_list(Config) -> - - ?SKIP(not_yet_implemented), - - %% This has been changed *a lot* since it was written... - - process_flag(trap_exit, true), - Transport = ensure_transport_started(), - TcpTransport = ensure_tcp_transport_started(), - {_Acceptor, Port} = ensure_tcp_listener(), - - {ok, Hostname} = inet:gethostname(), - - i("try connect"), - Opts = [{host, Hostname}, {port, Port}, {module, ?MODULE}], - Conn = case connect(Opts) of - {ok, C} -> - C; - Error -> - ?FAIL({failed_connecting, Error}) - end, - i("connected: ~p", [Conn]), - - %% Up for connect - receive - {diameter, {up, Host, Port}} -> - i("Received expected connect up (~p:~p)", [Host, Port]), - ok - after 5000 -> - ?FAIL(connect_up_confirmation_timeout) - end, - - %% Up for accept - receive - {diameter, {up, _ConnPid}} -> - i("Received expected accept up"), - ok - after 5000 -> - ?FAIL(acceptor_up_confirmation_timeout) - end, - - i("try disconnect"), - diameter_tcp:disconnect(Conn), - ensure_tcp_transport_stopped(TcpTransport), - ensure_transport_stopped(Transport), - i("done"), - ok. - - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% -%% Plain start and stop of TCP transport -%% - -simple_send_and_recv(suite) -> - []; -simple_send_and_recv(doc) -> - []; -simple_send_and_recv(Config) when is_list(Config) -> - - ?SKIP(not_yet_implemented), - - %% This has been changed *a lot* since it was written... - - process_flag(trap_exit, true), - %% -------------------------------------------------- - %% Start the TCP transport sub-system - %% - - Transport = ensure_transport_started(), - TcpTransport = ensure_tcp_transport_started(), - - {_Acceptor, Port} = ensure_tcp_listener(), - - {ok, Hostname} = inet:gethostname(), - - i("try connect"), - Opts = [{host, Hostname}, {port, Port}, {module, ?MODULE}], - Conn = case connect(Opts) of - {ok, C1} -> - C1; - Error -> - ?FAIL({failed_connecting, Error}) - end, - i("connected: ~p", [Conn]), - - %% Up for connect - receive - {diameter, {up, Host, Port}} -> - i("Received expected connect up (~p:~p)", [Host, Port]), - ok - after 5000 -> - ?FAIL(connect_up_confirmation_timeout) - end, - - %% Up for accept - APid = - receive - {diameter, {up, C2}} -> - i("Received expected accept up"), - C2 - after 5000 -> - ?FAIL(acceptor_up_confirmation_timeout) - end, - - %% -------------------------------------------------- - %% Start some stuff needed for the codec to run - %% - - i("start persistent table"), - {ok, _Pers} = diameter_persistent_table:start_link(), - - i("start session"), - {ok, _Session} = diameter_session:start_link(), - - i("try decode a (DWR) message"), - Base = diameter_gen_base_rfc3588, - DWR = ['DWR', - {'Origin-Host', Hostname}, - {'Origin-Realm', "whatever-realm"}, - {'Origin-State-Id', [10]}], - - #diameter_packet{msg = Msg} = diameter_codec:encode(Base, DWR), - - - %% -------------------------------------------------- - %% Now try to send the message - %% - %% This is not the codec-test suite, so we dont really care what we - %% send, as long as it encoded/decodes correctly in the transport - %% - - i("try send from connect side"), - ok = diameter_tcp:send_message(Conn, Msg), - - %% Wait for data on Accept side - APkt = - receive - {diameter, {recv, A}} -> - i("[accept] Received expected data message: ~p", [A]), - A - after 5000 -> - ?FAIL(acceptor_up_confirmation_timeout) - end, - - %% Send the same message back, just to have something to send... - i("try send (\"reply\") from accept side"), - ok = diameter_tcp:send_message(APid, APkt), - - %% Wait for data on Connect side - receive - {diameter, {recv, B}} -> - i("[connect] Received expected data message: ~p", [B]), - ok - after 5000 -> - ?FAIL(acceptor_up_confirmation_timeout) - end, - - i("we are done - now close shop"), - diameter_session:stop(), - diameter_persistent_table:stop(), - - ensure_tcp_transport_stopped(TcpTransport), - ensure_transport_stopped(Transport), - i("done"), - ok. - - - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - -ensure_transport_started() -> -%% i("start diameter transport (top) supervisor"), - case diameter_transport_sup:start_link() of - {ok, TransportSup} -> - TransportSup; - Error -> - ?FAIL({failed_starting_transport_sup, Error}) - end. - -ensure_transport_stopped(Pid) when is_pid(Pid) -> -%% i("stop diameter transport (top) supervisor"), - Stop = fun(P) -> exit(P, kill) end, - ensure_stopped(Pid, Stop, failed_stopping_transport_sup). - -ensure_tcp_transport_started() -> -%% i("start diameter TCP transport"), - case diameter_tcp:start_transport() of - {ok, TcpTransport} when is_pid(TcpTransport) -> - TcpTransport; - Error -> - ?FAIL({failed_starting_transport, Error}) - end. - -ensure_tcp_transport_stopped(Pid) when is_pid(Pid) -> -%% i("stop diameter TCP transport supervisor"), - Stop = fun(P) -> diameter_tcp:stop_transport(P) end, - ensure_stopped(Pid, Stop, failed_stopping_tcp_transport). - - -ensure_tcp_listener() -> -%% i("create diameter TCP transport listen socket"), - case listen([{port, 0}]) of - {ok, Acceptor} -> - [{Acceptor, Info}] = diameter_tcp:which_listeners(), - case lists:keysearch(socket, 1, Info) of - {value, {_, Listen}} -> - {ok, Port} = inet:port(Listen), - {Acceptor, Port}; - _ -> - ?FAIL({failed_retrieving_listen_socket, Info}) - end; - Error -> - ?FAIL({failed_creating_listen_socket, Error}) - end. - - -ensure_stopped(Pid, Stop, ReasonTag) when is_pid(Pid) -> -%% i("ensure_stopped -> create monitor to ~p", [Pid]), - Ref = erlang:monitor(process, Pid), -%% i("ensure_stopped -> try stop"), - Stop(Pid), -%% i("ensure_stopped -> await DOWN message"), - receive - {'DOWN', Ref, process, Pid, _} -> -%% i("ensure_stopped -> received DOWN message"), - ok - after 5000 -> -%% i("ensure_stopped -> timeout"), - ?FAIL({ReasonTag, Pid}) - end. - - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - -listen(Opts) -> - diameter_tcp:listen([{module, ?MODULE} | Opts]). - -connect(Opts) -> - diameter_tcp:connect([{module, ?MODULE} | Opts]). - - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - -up(Pid, Host, Port) -> - Pid ! {diameter, {up, Host, Port}}, - ok. - -up(Pid) -> - Pid ! {diameter, {up, self()}}, - ok. - -recv(Pid, Pkt) -> - Pid ! {diameter, {recv, Pkt}}. - - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - -i(F) -> - i(F, []). - -i(F, A) -> - io:format(F ++ "~n", A). - - -ok({ok, Whatever}) -> - Whatever; -ok(Crap) -> - Crap. - - diff --git a/lib/diameter/test/diameter_transport_SUITE.erl b/lib/diameter/test/diameter_transport_SUITE.erl new file mode 100644 index 0000000000..6de6d6bad3 --- /dev/null +++ b/lib/diameter/test/diameter_transport_SUITE.erl @@ -0,0 +1,454 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% + +%% +%% Tests of diameter_tcp/sctp as implementations of the diameter +%% transport interface. See also the sctp suite for some gen_sctp +%% related issues. +%% + +-module(diameter_transport_SUITE). + +-export([suite/0, + all/0, + init_per_suite/1, + end_per_suite/1]). + +%% testcases +-export([scramble/1, + tcp_accept/1, + tcp_connect/1, + sctp_accept/1, + sctp_connect/1]). + +-export([accept/1, + connect/1, + init/2]). + +-include_lib("kernel/include/inet_sctp.hrl"). +-include_lib("diameter/include/diameter.hrl"). +-include("diameter_ct.hrl"). + +-define(util, diameter_util). + +%% Corresponding to diameter_* transport modules. +-define(TRANSPORTS, [tcp, sctp]). + +%% Receive a message. +-define(RECV(Pat, Ret), receive Pat -> Ret end). +-define(RECV(Pat), ?RECV(Pat, now())). + +%% Or not. +-define(WAIT(Ms), receive after Ms -> now() end). + +%% Sockets are opened on the loopback address. +-define(ADDR, {127,0,0,1}). + +%% diameter_tcp doesn't use anything but host_ip_address, and that +%% only is a local address isn't configured as at transport start. +-define(SVC(Addrs), #diameter_service{capabilities + = #diameter_caps{host_ip_address + = Addrs}}). + +%% The term diameter_tcp/sctp registers after opening a listening +%% socket. This is an implementation detail that should probably be +%% replaced by some documented way of getting at the port number of +%% the listening socket, which is what we're after since we specify +%% port 0 to get something unused. +-define(TCP_LISTENER(Ref, Addr, LSock), + {diameter_tcp, listener, {Ref, {Addr, LSock}}}). +-define(SCTP_LISTENER(Ref, Addr, LSock), + {diameter_sctp, listener, {Ref, {[Addr], LSock}}}). + +%% The term we register after open a listening port with gen_tcp. +-define(TEST_LISTENER(Ref, PortNr), + {?MODULE, listen, Ref, PortNr}). + +%% Message over the transport interface. +-define(TMSG(T), {diameter, T}). + +%% Options for gen_tcp/gen_sctp. +-define(TCP_OPTS, [binary, {active, true}, {packet, 0}]). +-define(SCTP_OPTS, [binary, {active, true}, {sctp_initmsg, ?SCTP_INIT}]). + +%% Request a specific number of streams just because we can. +-define(SCTP_INIT, #sctp_initmsg{num_ostreams = 5, + max_instreams = 5}). + +%% Messages from gen_sctp. +-define(SCTP(Sock, Data), {sctp, Sock, _, _, Data}). + +%% =========================================================================== + +suite() -> + [{timetrap, {minutes, 2}}]. + +all() -> + [scramble | tc()]. + +tc() -> + [tcp_accept, + tcp_connect, + sctp_accept, + sctp_connect]. + +init_per_suite(Config) -> + ok = diameter:start(), + [{sctp, have_sctp()} | Config]. + +end_per_suite(_Config) -> + ok = diameter:stop(). + +%% =========================================================================== +%% scramble/1 + +scramble(Config) -> + [] = ?util:run(?util:scramble([{?MODULE, [F, Config]} || F <- tc()])). + +%% =========================================================================== +%% tcp_accept/1 +%% sctp_accept/1 +%% +%% diameter transport accepting, test code connecting. + +tcp_accept(_) -> + accept(tcp). + +sctp_accept(Config) -> + if_sctp(fun accept/1, Config). + +%% Start multiple accepting transport processes that are connected to +%% with an equal number of connecting processes using gen_tcp/sctp +%% directly. + +-define(PEER_COUNT, 8). + +accept(Prot) -> + T = {Prot, make_ref()}, + [] = ?util:run(?util:scramble(acc(2*?PEER_COUNT, T, []))). + +acc(0, _, Acc) -> + Acc; +acc(N, T, Acc) -> + acc(N-1, T, [{?MODULE, [init, + element(1 + N rem 2, {accept, gen_connect}), + T]} + | Acc]). + +%% =========================================================================== +%% tcp_connect/1 +%% sctp_connect/1 +%% +%% Test code accepting, diameter transport connecting. + +tcp_connect(_) -> + connect(tcp). + +sctp_connect(Config) -> + if_sctp(fun connect/1, Config). + +connect(Prot) -> + T = {Prot, make_ref()}, + [] = ?util:run([{?MODULE, [init, X, T]} || X <- [gen_accept, connect]]). + +%% =========================================================================== +%% =========================================================================== + +%% have_sctp/0 + +have_sctp() -> + try gen_sctp:open() of + {ok, Sock} -> + gen_sctp:close(Sock), + true + catch + error: badarg -> + false + end. + +%% if_sctp/2 + +if_sctp(F, Config) -> + case proplists:get_value(sctp, Config) of + true -> + F(sctp); + false -> + {skip, no_sctp} + end. + +%% init/2 + +init(accept, {Prot, Ref}) -> + %% Start an accepting transport and receive notification of a + %% connection. + TPid = start_accept(Prot, Ref), + + %% Receive a message and send it back. + <<_:8, Len:24, _/binary>> = Bin = bin(Prot, ?RECV(?TMSG({recv, P}), P)), + + Len = size(Bin), + TPid ! ?TMSG({send, Bin}), + + %% Expect the transport process to die as a result of the peer + %% closing the connection. + MRef = erlang:monitor(process, TPid), + ?RECV({'DOWN', MRef, process, _, _}); + +init(gen_connect, {Prot, Ref}) -> + %% Lookup the peer's listening socket. + {ok, PortNr} = inet:port(lsock(Prot, Ref)), + + %% Connect, send a message and receive it back. + {ok, Sock} = gen_connect(Prot, PortNr, Ref), + Bin = make_msg(), + ok = gen_send(Prot, Sock, Bin), + Bin = gen_recv(Prot, Sock); + +init(gen_accept, {Prot, Ref}) -> + %% Open a listening socket and publish the port number. + {ok, LSock} = gen_listen(Prot), + {ok, PortNr} = inet:port(LSock), + true = diameter_reg:add_new(?TEST_LISTENER(Ref, PortNr)), + + %% Accept a connection, receive a message and send it back. + {ok, Sock} = gen_accept(Prot, LSock), + Bin = gen_recv(Prot, Sock), + ok = gen_send(Prot, Sock, Bin); + +init(connect, {Prot, Ref}) -> + %% Lookup the peer's listening socket. + [{?TEST_LISTENER(_, PortNr), _}] = match(?TEST_LISTENER(Ref, '_')), + + %% Start a connecting transport and receive notification of + %% the connection. + TPid = start_connect(Prot, PortNr, Ref), + + %% Send a message and receive it back. + Bin = make_msg(), + TPid ! ?TMSG({send, Bin}), + Bin = bin(Prot, ?RECV(?TMSG({recv, P}), P)), + + %% Expect the transport process to die as a result of the peer + %% closing the connection. + MRef = erlang:monitor(process, TPid), + ?RECV({'DOWN', MRef, process, _, _}). + +lsock(sctp, Ref) -> + [{?SCTP_LISTENER(_ , _, LSock), _}] + = match(?SCTP_LISTENER(Ref, ?ADDR, '_')), + LSock; +lsock(tcp, Ref) -> + [{?TCP_LISTENER(_ , _, LSock), _}] + = match(?TCP_LISTENER(Ref, ?ADDR, '_')), + LSock. + +match(Pat) -> + case diameter_reg:match(Pat) of + [] -> + ?WAIT(50), + match(Pat); + L -> + L + end. + +bin(sctp, #diameter_packet{bin = Bin}) -> + Bin; +bin(tcp, Bin) -> + Bin. + +%% make_msg/0 +%% +%% A valid Diameter message in as far as diameter_tcp examines it, +%% the module examining the length in the Diameter header to locate +%% message boundaries. + +make_msg() -> + N = 1024, + Bin = rand_bytes(4*N), + Len = 4*(N+1), + <<1:8, Len:24, Bin/binary>>. + +%% crypto:rand_bytes/1 isn't available on all platforms (since openssl +%% isn't) so roll our own. +rand_bytes(N) -> + random:seed(now()), + rand_bytes(N, <<>>). + +rand_bytes(0, Bin) -> + Bin; +rand_bytes(N, Bin) -> + Oct = random:uniform(256) - 1, + rand_bytes(N-1, <>). + +%% =========================================================================== + +%% start_connect/3 + +start_connect(Prot, PortNr, Ref) -> + {ok, TPid, [?ADDR]} = start_connect(Prot, + {connect, Ref}, + ?SVC([]), + [{raddr, ?ADDR}, + {rport, PortNr}, + {ip, ?ADDR}, + {port, 0}]), + ?RECV(?TMSG({TPid, connected, _})), + TPid. + +start_connect(sctp, T, Svc, Opts) -> + diameter_sctp:start(T, Svc, [{sctp_initmsg, ?SCTP_INIT} | Opts]); +start_connect(tcp, T, Svc, Opts) -> + diameter_tcp:start(T, Svc, Opts). + +%% start_accept/2 +%% +%% Start transports sequentially by having each wait for a message +%% from a job in a queue before commencing. Only one transport with +%% a pending accept is started at a time since diameter_sctp currently +%% assumes (and diameter currently implements) this. + +start_accept(Prot, Ref) -> + Pid = sync(accept, Ref), + + %% Configure the same port number for transports on the same + %% reference. + PortNr = portnr(Prot, Ref), + {Mod, Opts} = tmod(Prot), + + try + {ok, TPid, [?ADDR]} = Mod:start({accept, Ref}, + ?SVC([?ADDR]), + [{port, PortNr} | Opts]), + ?RECV(?TMSG({TPid, connected})), + TPid + after + Pid ! Ref + end. + +sync(What, Ref) -> + ok = diameter_sync:cast({?MODULE, What, Ref}, + [fun lock/2, Ref, self()], + infinity, + infinity), + receive {start, Ref, Pid} -> Pid end. + +lock(Ref, Pid) -> + Pid ! {start, Ref, self()}, + erlang:monitor(process, Pid), + Ref = receive T -> T end. + +tmod(sctp) -> + {diameter_sctp, [{sctp_initmsg, ?SCTP_INIT}]}; +tmod(tcp) -> + {diameter_tcp, []}. + +portnr(sctp, Ref) -> + case diameter_reg:match(?SCTP_LISTENER(Ref, ?ADDR, '_')) of + [{?SCTP_LISTENER(_, _, LSock), _}] -> + {ok, N} = inet:port(LSock), + N; + [] -> + 0 + end; +portnr(tcp, Ref) -> + case diameter_reg:match(?TCP_LISTENER(Ref, ?ADDR, '_')) of + [{?TCP_LISTENER(_, _, LSock), _}] -> + {ok, N} = inet:port(LSock), + N; + [] -> + 0 + end. + +%% =========================================================================== + +%% gen_connect/3 + +gen_connect(Prot, PortNr, Ref) -> + Pid = sync(connect, Ref), + + %% Stagger connect attempts to avoid the situation that no + %% transport process is accepting yet. + receive after 250 -> ok end, + + try + gen_connect(Prot, PortNr) + after + Pid ! Ref + end. + +gen_connect(sctp = P, PortNr) -> + {ok, Sock} = Ok = gen_sctp:open([{ip, ?ADDR}, {port, 0} | ?SCTP_OPTS]), + ok = gen_sctp:connect_init(Sock, ?ADDR, PortNr, []), + Ok = gen_accept(P, Sock); +gen_connect(tcp, PortNr) -> + gen_tcp:connect(?ADDR, PortNr, ?TCP_OPTS). + +%% gen_listen/1 + +gen_listen(sctp) -> + {ok, Sock} = gen_sctp:open([{ip, ?ADDR}, {port, 0} | ?SCTP_OPTS]), + {gen_sctp:listen(Sock, true), Sock}; +gen_listen(tcp) -> + gen_tcp:listen(0, [{ip, ?ADDR} | ?TCP_OPTS]). + +%% gen_accept/2 + +gen_accept(sctp, Sock) -> + Assoc = ?RECV(?SCTP(Sock, {_, #sctp_assoc_change{state = comm_up, + outbound_streams = O, + inbound_streams = I, + assoc_id = A}}), + {O, I, A}), + putr(assoc, Assoc), + {ok, Sock}; +gen_accept(tcp, LSock) -> + gen_tcp:accept(LSock). + +%% gen_send/3 + +gen_send(sctp, Sock, Bin) -> + {OS, _IS, Id} = getr(assoc), + {_, _, Us} = now(), + gen_sctp:send(Sock, Id, Us rem OS, Bin); +gen_send(tcp, Sock, Bin) -> + gen_tcp:send(Sock, Bin). + +%% gen_recv/2 + +gen_recv(sctp, Sock) -> + {_OS, _IS, Id} = getr(assoc), + ?RECV(?SCTP(Sock, {[#sctp_sndrcvinfo{assoc_id = Id}], Bin}), Bin); +gen_recv(tcp, Sock) -> + tcp_recv(Sock, <<>>). + +tcp_recv(_, <<_:8, Len:24, _/binary>> = Bin) + when Len =< size(Bin) -> + Bin; +tcp_recv(Sock, B) -> + receive {tcp, Sock, Bin} -> tcp_recv(Sock, <>) end. + +%% putr/2 + +putr(Key, Val) -> + put({?MODULE, Key}, Val). + +%% getr/1 + +getr(Key) -> + get({?MODULE, Key}). diff --git a/lib/diameter/test/modules.mk b/lib/diameter/test/modules.mk index daf6ef64ec..218a70017a 100644 --- a/lib/diameter/test/modules.mk +++ b/lib/diameter/test/modules.mk @@ -31,7 +31,9 @@ MODULES = \ diameter_reg_SUITE \ diameter_sync_SUITE \ diameter_stats_SUITE \ - diameter_watchdog_SUITE + diameter_watchdog_SUITE \ + diameter_transport_SUITE \ + diameter_gen_sctp_SUITE INTERNAL_HRL_FILES = \ diameter_ct.hrl -- cgit v1.2.3