aboutsummaryrefslogblamecommitdiffstats
path: root/lib/diameter/test/diameter_transport_SUITE.erl
blob: 23b972a9a0543c140246b03e1d53d0d4a376e013 (plain) (tree)
1
2
3
4


                   
                                                        
















                                                                         
                       





                                  
                  



                           

                      

                       
                        
                                  
                  





                                             
                         









                                                 








                                                                      























                                                                              



                              

           
                      




                 

                  

                         
                                         

                         







                                                                              


                                                                              








                                                      



                                              




























                                                                              



                                              





                                                                              















                                                                   
                                                   

                                     
                                                     

                     
                                  






                                                                   
                                                                    





                                                                       
                                                














































                                                                              

                                                                              



















                                                                            
                                      

                                                   
                                           
















                                                               

                                                      














                                                                  



















































                                                                              


                                                                     


                            




                                                      
                                                            






















                                                      

                                                                              
                




























































                                                                             
%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2010-2013. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% %CopyrightEnd%
%%

%%
%% Tests of diameter_tcp/sctp as implementations of the diameter
%% transport interface.
%%

-module(diameter_transport_SUITE).

-export([suite/0,
         all/0,
         groups/0,
         init_per_suite/1,
         end_per_suite/1]).

%% testcases
-export([start/1,
         tcp_accept/1,
         tcp_connect/1,
         sctp_accept/1,
         sctp_connect/1,
         reconnect/1, reconnect/0,
         stop/1]).

-export([accept/1,
         connect/1,
         init/2]).

-include_lib("kernel/include/inet_sctp.hrl").
-include("diameter.hrl").

-define(util, diameter_util).

%% Corresponding to diameter_* transport modules.
-define(TRANSPORTS, [tcp, sctp]).

%% Receive a message.
-define(RECV(Pat, Ret), receive Pat -> Ret end).
-define(RECV(Pat), ?RECV(Pat, now())).

%% Sockets are opened on the loopback address.
-define(ADDR, {127,0,0,1}).

%% diameter_tcp doesn't use anything but host_ip_address, and that
%% only is a local address isn't configured as at transport start.
-define(SVC(Addrs), #diameter_service{capabilities
                                      = #diameter_caps{host_ip_address
                                                       = Addrs}}).

%% The term we register after open a listening port with gen_tcp.
-define(TEST_LISTENER(Ref, PortNr),
        {?MODULE, listen, Ref, PortNr}).

%% Message over the transport interface.
-define(TMSG(T), {diameter, T}).

%% Options for gen_tcp/gen_sctp.
-define(TCP_OPTS,  [binary, {active, true}, {packet, 0}]).
-define(SCTP_OPTS, [binary, {active, true}, {sctp_initmsg, ?SCTP_INIT}]).

%% Request a specific number of streams just because we can.
-define(SCTP_INIT, #sctp_initmsg{num_ostreams = 5,
                                 max_instreams = 5}).

%% Messages from gen_sctp.
-define(SCTP(Sock, Data), {sctp, Sock, _, _, Data}).

%% ===========================================================================

suite() ->
    [{timetrap, {minutes, 2}}].

all() ->
    [start,
     {group, all},
     {group, all, [parallel]},
     stop].

groups() ->
    [{all, [], tc()}].

tc() ->
    [tcp_accept,
     tcp_connect,
     sctp_accept,
     sctp_connect,
     reconnect].

init_per_suite(Config) ->
    [{sctp, ?util:have_sctp()} | Config].

end_per_suite(_Config) ->
    ok.

%% ===========================================================================

start(_Config) ->
    ok = diameter:start().

stop(_Config) ->
    ok = diameter:stop().

%% ===========================================================================
%% tcp_accept/1
%% sctp_accept/1
%%
%% diameter transport accepting, test code connecting.

tcp_accept(_) ->
    accept(tcp).

sctp_accept(Config) ->
    case lists:member({sctp, true}, Config) of
        true  -> accept(sctp);
        false -> {skip, no_sctp}
    end.

%% Start multiple accepting transport processes that are connected to
%% with an equal number of connecting processes using gen_tcp/sctp
%% directly.

-define(PEER_COUNT, 8).

accept(Prot) ->
    T = {Prot, make_ref()},
    [] = ?util:run(?util:scramble(acc(2*?PEER_COUNT, T, []))).

acc(0, _, Acc) ->
    Acc;
acc(N, T, Acc) ->
    acc(N-1, T, [{?MODULE, [init,
                            element(1 + N rem 2, {accept, gen_connect}),
                            T]}
                 | Acc]).

%% ===========================================================================
%% tcp_connect/1
%% sctp_connect/1
%%
%% Test code accepting, diameter transport connecting.

tcp_connect(_) ->
    connect(tcp).

sctp_connect(Config) ->
    case lists:member({sctp, true}, Config) of
        true  -> connect(sctp);
        false -> {skip, no_sctp}
    end.

connect(Prot) ->
    T = {Prot, make_ref()},
    [] = ?util:run([{?MODULE, [init, X, T]} || X <- [gen_accept, connect]]).

%% ===========================================================================
%% reconnect/1
%%
%% Exercise reconnection behaviour: that a connecting transport
%% doesn't try to establish a new connection until the old one is
%% broken.

reconnect() ->
    [{timetrap, {minutes, 4}}].

reconnect({listen, Ref}) ->
    SvcName = make_ref(),
    ok = start_service(SvcName),
    LRef = ?util:listen(SvcName, tcp, [{watchdog_timer, 6000}]),
    [_] = diameter_reg:wait({diameter_tcp, listener, {LRef, '_'}}),
    true = diameter_reg:add_new({?MODULE, Ref, LRef}),

    %% Wait for partner to request transport death.
    TPid = abort(SvcName, LRef, Ref),

    %% Kill transport to force the peer to reconnect.
    exit(TPid, kill),

    %% Wait for the partner again.
    abort(SvcName, LRef, Ref);

reconnect({connect, Ref}) ->
    SvcName = make_ref(),
    true = diameter:subscribe(SvcName),
    ok = start_service(SvcName),
    [{{_, _, LRef}, Pid}] = diameter_reg:wait({?MODULE, Ref, '_'}),
    CRef = ?util:connect(SvcName, tcp, LRef, [{connect_timer, 2000},
                                              {watchdog_timer, 6000}]),

    %% Tell partner to kill transport after seeing that there are no
    %% reconnection attempts.
    abort(SvcName, Pid, Ref),

    %% Transport goes down and is reestablished.
    ?RECV(#diameter_event{service = SvcName, info = {down, CRef, _, _}}),
    ?RECV(#diameter_event{service = SvcName, info = {reconnect, CRef, _}}),
    ?RECV(#diameter_event{service = SvcName, info = {up, CRef, _, _, _}}),

    %% Kill again.
    abort(SvcName, Pid, Ref),

    %% Wait for partner to die.
    MRef = erlang:monitor(process, Pid),
    ?RECV({'DOWN', MRef, process, _, _});

reconnect(_) ->
    Ref = make_ref(),
    [] = ?util:run([{?MODULE, [reconnect, {T, Ref}]}
                    || T <- [listen, connect]]).

start_service(SvcName) ->
    OH = io_lib:format("~p-~p-~p", tuple_to_list(now())),
    Opts = [{application, [{dictionary, diameter_gen_base_rfc6733},
                           {module, diameter_callback}]},
            {'Origin-Host', OH},
            {'Origin-Realm', OH ++ ".org"},
            {'Vendor-Id', 0},
            {'Product-Name', "x"},
            {'Auth-Application-Id', [0]}],
    diameter:start_service(SvcName, Opts).

abort(SvcName, Pid, Ref)
  when is_pid(Pid) ->
    receive
        #diameter_event{service = SvcName, info = {reconnect, _, _}} = E ->
            erlang:error(E)
    after 45000 ->
            ok
    end,
    Pid ! {abort, Ref};

abort(SvcName, LRef, Ref)
  when is_reference(LRef) ->
    ?RECV({abort, Ref}),
    [[{ref, LRef}, {type, listen}, {options, _}, {accept, [_,_] = Ts} | _]]
                                                 %% assert on two accepting
        = diameter:service_info(SvcName, transport),
    [TPid] = [P || [{watchdog, {_,_,okay}}, {peer, {P,_}} | _] <- Ts],
    TPid.

%% ===========================================================================
%% ===========================================================================

%% init/2

init(accept, {Prot, Ref}) ->
    %% Start an accepting transport and receive notification of a
    %% connection.
    TPid = start_accept(Prot, Ref),

    %% Receive a message and send it back.
    <<_:8, Len:24, _/binary>> = Bin = bin(Prot, ?RECV(?TMSG({recv, P}), P)),

    Len = size(Bin),
    TPid ! ?TMSG({send, Bin}),

    %% Expect the transport process to die as a result of the peer
    %% closing the connection.
    MRef = erlang:monitor(process, TPid),
    ?RECV({'DOWN', MRef, process, _, _});

init(gen_connect, {Prot, Ref}) ->
    %% Lookup the peer's listening socket.
    [PortNr] = ?util:lport(Prot, Ref),

    %% Connect, send a message and receive it back.
    {ok, Sock} = gen_connect(Prot, PortNr),
    Bin = make_msg(),
    ok = gen_send(Prot, Sock, Bin),
    Bin = gen_recv(Prot, Sock);

init(gen_accept, {Prot, Ref}) ->
    %% Open a listening socket and publish the port number.
    {ok, LSock} = gen_listen(Prot),
    {ok, PortNr} = inet:port(LSock),
    true = diameter_reg:add_new(?TEST_LISTENER(Ref, PortNr)),

    %% Accept a connection, receive a message and send it back.
    {ok, Sock} = gen_accept(Prot, LSock),
    Bin = gen_recv(Prot, Sock),
    ok = gen_send(Prot, Sock, Bin);

init(connect, {Prot, Ref}) ->
    %% Lookup the peer's listening socket.
    [{?TEST_LISTENER(_, PortNr), _}]
        = diameter_reg:wait(?TEST_LISTENER(Ref, '_')),

    %% Start a connecting transport and receive notification of
    %% the connection.
    TPid = start_connect(Prot, PortNr, Ref),

    %% Send a message and receive it back.
    Bin = make_msg(),
    TPid ! ?TMSG({send, Bin}),
    Bin = bin(Prot, ?RECV(?TMSG({recv, P}), P)),

    %% Expect the transport process to die as a result of the peer
    %% closing the connection.
    MRef = erlang:monitor(process, TPid),
    ?RECV({'DOWN', MRef, process, _, _}).

bin(sctp, #diameter_packet{bin = Bin}) ->
    Bin;
bin(tcp, Bin) ->
    Bin.

%% make_msg/0
%%
%% A valid Diameter message in as far as diameter_tcp examines it,
%% the module examining the length in the Diameter header to locate
%% message boundaries.

make_msg() ->
    N = 1024,
    Bin = rand_bytes(4*N),
    Len = 4*(N+1),
    <<1:8, Len:24, Bin/binary>>.

%% crypto:rand_bytes/1 isn't available on all platforms (since openssl
%% isn't) so roll our own.
rand_bytes(N) ->
    random:seed(now()),
    rand_bytes(N, <<>>).

rand_bytes(0, Bin) ->
    Bin;
rand_bytes(N, Bin) ->
    Oct = random:uniform(256) - 1,
    rand_bytes(N-1, <<Oct, Bin/binary>>).

%% ===========================================================================

%% start_connect/3

start_connect(Prot, PortNr, Ref) ->
    {ok, TPid, [?ADDR]} = start_connect(Prot,
                                        {connect, Ref},
                                        ?SVC([]),
                                        [{raddr, ?ADDR},
                                         {rport, PortNr},
                                         {ip, ?ADDR},
                                         {port, 0}]),
    ?RECV(?TMSG({TPid, connected, _})),
    TPid.

start_connect(sctp, T, Svc, Opts) ->
    diameter_sctp:start(T, Svc, [{sctp_initmsg, ?SCTP_INIT} | Opts]);
start_connect(tcp, T, Svc, Opts) ->
    diameter_tcp:start(T, Svc, Opts).

%% start_accept/2
%%
%% Start transports sequentially by having each wait for a message
%% from a job in a queue before commencing. Only one transport with a
%% pending accept is started at a time since diameter_{tcp,sctp}
%% currently assume (and diameter currently implements) this.

start_accept(Prot, Ref) ->
    Pid = sync(accept, Ref),
    {Mod, Opts} = tmod(Prot),

    try
        {ok, TPid, [?ADDR]} = Mod:start({accept, Ref},
                                        ?SVC([?ADDR]),
                                        [{port, 0} | Opts]),
        ?RECV(?TMSG({TPid, connected})),
        TPid
    after
        Pid ! Ref
    end.

sync(What, Ref) ->
    ok = diameter_sync:cast({?MODULE, What, Ref},
                            [fun lock/2, Ref, self()],
                            infinity,
                            infinity),
    receive {start, Ref, Pid} -> Pid end.

lock(Ref, Pid) ->
    Pid ! {start, Ref, self()},
    erlang:monitor(process, Pid),
    Ref = receive T -> T end.

tmod(sctp) ->
    {diameter_sctp, [{sctp_initmsg, ?SCTP_INIT}]};
tmod(tcp) ->
    {diameter_tcp, []}.

%% ===========================================================================

%% gen_connect/2

gen_connect(sctp = P, PortNr) ->
    {ok, Sock} = Ok = gen_sctp:open([{ip, ?ADDR}, {port, 0} | ?SCTP_OPTS]),
    ok = gen_sctp:connect_init(Sock, ?ADDR, PortNr, []),
    Ok = gen_accept(P, Sock);
gen_connect(tcp, PortNr) ->
    gen_tcp:connect(?ADDR, PortNr, ?TCP_OPTS).

%% gen_listen/1

gen_listen(sctp) ->
    {ok, Sock} = gen_sctp:open([{ip, ?ADDR}, {port, 0} | ?SCTP_OPTS]),
    {gen_sctp:listen(Sock, true), Sock};
gen_listen(tcp) ->
    gen_tcp:listen(0, [{ip, ?ADDR} | ?TCP_OPTS]).

%% gen_accept/2

gen_accept(sctp, Sock) ->
    Assoc = ?RECV(?SCTP(Sock, {_, #sctp_assoc_change{state = comm_up,
                                                     outbound_streams = O,
                                                     inbound_streams = I,
                                                     assoc_id = A}}),
                  {O, I, A}),
    putr(assoc, Assoc),
    {ok, Sock};
gen_accept(tcp, LSock) ->
    gen_tcp:accept(LSock).

%% gen_send/3

gen_send(sctp, Sock, Bin) ->
    {OS, _IS, Id} = getr(assoc),
    {_, _, Us} = now(),
    gen_sctp:send(Sock, Id, Us rem OS, Bin);
gen_send(tcp, Sock, Bin) ->
    gen_tcp:send(Sock, Bin).

%% gen_recv/2

gen_recv(sctp, Sock) ->
    {_OS, _IS, Id} = getr(assoc),
    ?RECV(?SCTP(Sock, {[#sctp_sndrcvinfo{assoc_id = Id}], Bin}), Bin);
gen_recv(tcp, Sock) ->
    tcp_recv(Sock, <<>>).

tcp_recv(_, <<_:8, Len:24, _/binary>> = Bin)
  when Len =< size(Bin) ->
    Bin;
tcp_recv(Sock, B) ->
    receive {tcp, Sock, Bin} -> tcp_recv(Sock, <<B/binary, Bin/binary>>) end.

%% putr/2

putr(Key, Val) ->
    put({?MODULE, Key}, Val).

%% getr/1

getr(Key) ->
    get({?MODULE, Key}).