%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2010-2013. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% %CopyrightEnd%
%%
%%
%% Tests of diameter_tcp/sctp as implementations of the diameter
%% transport interface.
%%
-module(diameter_transport_SUITE).
-export([suite/0,
all/0,
groups/0,
init_per_suite/1,
end_per_suite/1]).
%% testcases
-export([start/1,
tcp_accept/1,
tcp_connect/1,
sctp_accept/1,
sctp_connect/1,
reconnect/1, reconnect/0,
stop/1]).
-export([accept/1,
connect/1,
init/2]).
-include_lib("kernel/include/inet_sctp.hrl").
-include("diameter.hrl").
%% Shorthand for the test utility module.
-define(util, diameter_util).
%% Corresponding to diameter_* transport modules.
-define(TRANSPORTS, [tcp, sctp]).
%% Receive a message: block (there is no timeout) until a message
%% matching Pat arrives and evaluate to Ret, which may use variables
%% bound by Pat.
-define(RECV(Pat, Ret), receive Pat -> Ret end).
%% As above but evaluating to the now() timestamp taken on receipt.
-define(RECV(Pat), ?RECV(Pat, now())).
%% Sockets are opened on the loopback address.
-define(ADDR, {127,0,0,1}).
%% diameter_tcp doesn't use anything but host_ip_address, and that
%% only if a local address isn't configured at transport start.
-define(SVC(Addrs), #diameter_service{capabilities
= #diameter_caps{host_ip_address
= Addrs}}).
%% The term we register after opening a listening port with
%% gen_tcp/gen_sctp, so that the connecting side can look up the
%% port number.
-define(TEST_LISTENER(Ref, PortNr),
{?MODULE, listen, Ref, PortNr}).
%% Message over the transport interface.
-define(TMSG(T), {diameter, T}).
%% Options for gen_tcp/gen_sctp. Sockets are in active mode so that
%% incoming data arrives as messages to the owning process.
-define(TCP_OPTS, [binary, {active, true}, {packet, 0}]).
-define(SCTP_OPTS, [binary, {active, true}, {sctp_initmsg, ?SCTP_INIT}]).
%% Request a specific number of streams just because we can.
-define(SCTP_INIT, #sctp_initmsg{num_ostreams = 5,
max_instreams = 5}).
%% Messages from gen_sctp: only the socket and the data element are
%% of interest, the two elements in between are ignored.
-define(SCTP(Sock, Data), {sctp, Sock, _, _, Data}).
%% ===========================================================================
%% suite/0
%%
%% Suite-wide Common Test defaults: a two minute timetrap.
suite() ->
    Timetrap = {timetrap, {minutes, 2}},
    [Timetrap].
%% all/0
%%
%% The testcases are bracketed by start and stop, and the group of
%% all transport testcases is listed twice: once with its default
%% properties and once with the parallel property.
all() ->
    Default = {group, all},
    Parallel = {group, all, [parallel]},
    [start, Default, Parallel, stop].
%% groups/0
%%
%% A single group, named all, containing every testcase from tc/0
%% and declaring no group properties of its own.
groups() ->
    Cases = tc(),
    [{all, [], Cases}].
%% tc/0
%%
%% The testcases belonging to the group all: accept/connect for each
%% of tcp and sctp, followed by reconnect.
tc() ->
    Tcp = [tcp_accept, tcp_connect],
    Sctp = [sctp_accept, sctp_connect],
    Tcp ++ Sctp ++ [reconnect].
%% init_per_suite/1
%%
%% Probe for SCTP support once and record the answer in Config under
%% the key sctp, where if_sctp/2 looks for it.
init_per_suite(Config) ->
    HaveSctp = have_sctp(),
    [{sctp, HaveSctp} | Config].
%% end_per_suite/1
%%
%% Nothing to clean up: the Config is ignored.
end_per_suite(_) ->
    ok.
%% ===========================================================================
%% start/1
%%
%% First testcase: start diameter, failing with badmatch on anything
%% but ok.
start(_) ->
    Result = diameter:start(),
    ok = Result.
%% stop/1
%%
%% Last testcase: stop diameter, failing with badmatch on anything
%% but ok.
stop(_) ->
    Result = diameter:stop(),
    ok = Result.
%% ===========================================================================
%% tcp_accept/1
%% sctp_accept/1
%%
%% diameter transport accepting, test code connecting.
%% The TCP variant needs no capability check and ignores its Config.
tcp_accept(_Config) ->
    accept(tcp).
%% The SCTP variant is skipped by if_sctp/2 when Config says that
%% SCTP is unavailable.
sctp_accept(Config) ->
    Run = fun(Prot) -> accept(Prot) end,
    if_sctp(Run, Config).
%% accept/1
%%
%% Start PEER_COUNT accepting transport processes together with an
%% equal number of processes that connect to them using
%% gen_tcp/gen_sctp directly. The jobs are scrambled before being
%% run and all share one {Prot, Ref} tag.
-define(PEER_COUNT, 8).
accept(Prot) ->
    Tag = {Prot, make_ref()},
    Jobs = acc(2 * ?PEER_COUNT, Tag, []),
    Scrambled = ?util:scramble(Jobs),
    [] = ?util:run(Scrambled).
%% acc/3
%%
%% Prepend N jobs to Acc, each of the form {?MODULE, [init, Role, T]}.
%% The role alternates with the parity of the countdown: accept for
%% even N, gen_connect for odd N.
acc(N, T, Acc)
  when N =/= 0 ->
    Role = element(1 + N rem 2, {accept, gen_connect}),
    Job = {?MODULE, [init, Role, T]},
    acc(N - 1, T, [Job | Acc]);
acc(0, _, Acc) ->
    Acc.
%% ===========================================================================
%% tcp_connect/1
%% sctp_connect/1
%%
%% Test code accepting, diameter transport connecting.
%% The TCP variant needs no capability check and ignores its Config.
tcp_connect(_Config) ->
    connect(tcp).
%% The SCTP variant is skipped by if_sctp/2 when Config says that
%% SCTP is unavailable.
sctp_connect(Config) ->
    Run = fun(Prot) -> connect(Prot) end,
    if_sctp(Run, Config).
%% connect/1
%%
%% Run one gen_accept job (test code listening with gen_tcp/gen_sctp)
%% and one connect job (a diameter transport connecting to it), both
%% sharing a {Prot, Ref} tag.
connect(Prot) ->
    Tag = {Prot, make_ref()},
    Jobs = [{?MODULE, [init, gen_accept, Tag]},
            {?MODULE, [init, connect, Tag]}],
    [] = ?util:run(Jobs).
%% ===========================================================================
%% reconnect/1
%%
%% Exercise reconnection behaviour: that a connecting transport
%% doesn't try to establish a new connection until the old one is
%% broken.
%% reconnect/0
%%
%% Testcase info: reconnect/1 gets a four minute timetrap in place of
%% the suite default.
reconnect() ->
    Timetrap = {timetrap, {minutes, 4}},
    [Timetrap].
%% reconnect/1
%%
%% Three roles, selected by the argument:
%%
%%   {listen, Ref}  - the listening side of the test;
%%   {connect, Ref} - the connecting side of the test;
%%   anything else  - the testcase entry point, which runs the two
%%                    roles above as jobs sharing a fresh Ref.
%%
%% The two roles find each other through diameter_reg and thereafter
%% synchronize using {abort, Ref} messages, as sent and received by
%% abort/3.

%% The listening side.
reconnect({listen, Ref}) ->
SvcName = make_ref(),
ok = start_service(SvcName),
%% Add a listening transport and wait for a corresponding
%% {diameter_tcp, listener, _} term to be registered.
LRef = ?util:listen(SvcName, tcp, [{watchdog_timer, 6000}]),
[_] = diameter_reg:wait({diameter_tcp, listener, {LRef, '_'}}),
%% Publish the listener reference: the partner waits on this term
%% to learn both LRef and the pid of this process.
true = diameter_reg:add_new({?MODULE, Ref, LRef}),
%% Wait for partner to request transport death.
TPid = abort(SvcName, LRef, Ref),
%% Kill transport to force the peer to reconnect.
exit(TPid, kill),
%% Wait for the partner again. Returning from here ends this process,
%% which is what the partner is waiting for.
abort(SvcName, LRef, Ref);
%% The connecting side.
reconnect({connect, Ref}) ->
SvcName = make_ref(),
%% Subscribe before starting the service so that the events
%% received below are delivered to this process.
true = diameter:subscribe(SvcName),
ok = start_service(SvcName),
%% Wait for the partner to publish its listener reference.
[{{_, _, LRef}, Pid}] = diameter_reg:wait({?MODULE, Ref, '_'}),
CRef = ?util:connect(SvcName, tcp, LRef, [{connect_timer, 2000},
{watchdog_timer, 6000}]),
%% Tell partner to kill transport after seeing that there are no
%% reconnection attempts.
abort(SvcName, Pid, Ref),
%% Transport goes down and is reestablished.
?RECV(#diameter_event{service = SvcName, info = {down, CRef, _, _}}),
?RECV(#diameter_event{service = SvcName, info = {reconnect, CRef, _}}),
?RECV(#diameter_event{service = SvcName, info = {up, CRef, _, _, _}}),
%% Check once more that there are no reconnection attempts and then
%% notify the partner, which does its final checks and returns.
abort(SvcName, Pid, Ref),
%% Wait for partner to die.
MRef = erlang:monitor(process, Pid),
?RECV({'DOWN', MRef, process, _, _});
%% The testcase itself: the argument (the Common Test Config) is
%% ignored.
reconnect(_) ->
Ref = make_ref(),
[] = ?util:run([{?MODULE, [reconnect, {T, Ref}]}
|| T <- [listen, connect]]).
%% start_service/1
%%
%% Start a diameter service advertising the base application, with an
%% Origin-Host built from the current now() timestamp and an
%% Origin-Realm that is the same string with ".org" appended.
start_service(SvcName) ->
    {MegaSecs, Secs, MicroSecs} = now(),
    Host = io_lib:format("~p-~p-~p", [MegaSecs, Secs, MicroSecs]),
    Realm = Host ++ ".org",
    App = {application, [{dictionary, diameter_gen_base_rfc6733},
                         {module, diameter_callback}]},
    Caps = [{'Origin-Host', Host},
            {'Origin-Realm', Realm},
            {'Vendor-Id', 0},
            {'Product-Name', "x"},
            {'Auth-Application-Id', [0]}],
    diameter:start_service(SvcName, [App | Caps]).
%% abort/3
%%
%% Synchronization between the two sides of the reconnect testcase.
%%
%% Called with a pid (the connecting side): fail if a reconnect event
%% for the service arrives within 45 seconds, otherwise send
%% {abort, Ref} to the pid.
%%
%% Called with a reference (the listening side): wait for
%% {abort, Ref}, require that the listener has exactly two accepting
%% transports, of which exactly one has a watchdog in state okay, and
%% return the peer pid of that transport.
abort(SvcName, Pid, Ref)
  when is_pid(Pid) ->
    receive
        #diameter_event{service = SvcName,
                        info = {reconnect, _, _}}
        = Event ->
            erlang:error(Event)
    after 45000 ->
            ok
    end,
    Pid ! {abort, Ref};

abort(SvcName, LRef, Ref)
  when is_reference(LRef) ->
    ?RECV({abort, Ref}),
    Info = diameter:service_info(SvcName, transport),
    %% Assert on two accepting transports.
    [[{ref, LRef}, {type, listen}, {options, _}, {accept, [_,_] = Accepting}
      | _]]
        = Info,
    [Up] = [Peer || [{watchdog, {_,_,okay}}, {peer, {Peer,_}} | _]
                        <- Accepting],
    Up.
%% ===========================================================================
%% ===========================================================================
%% have_sctp/0
%%
%% Find out whether or not SCTP is usable by trying to open a socket.
%% Only the errors eprotonosupport and esocktnosupport count as "no":
%% any other failure to open crashes with case_clause.
have_sctp() ->
    Opened = gen_sctp:open(),
    case Opened of
        {error, eprotonosupport} ->
            false;
        {error, esocktnosupport} ->
            false;
        {ok, Sock} ->
            gen_sctp:close(Sock),
            true
    end.
%% if_sctp/2
%%
%% Apply F to the atom sctp if Config maps sctp to true, or return a
%% skip tuple if it maps sctp to false.
if_sctp(F, Config) ->
    HaveSctp = proplists:get_value(sctp, Config),
    case HaveSctp of
        false ->
            {skip, no_sctp};
        true ->
            F(sctp)
    end.
%% init/2
%%
%% Entry point for the jobs run by accept/1 and connect/1. The first
%% argument selects one of four roles:
%%
%%   accept      - diameter transport accepting a connection;
%%   gen_connect - test code connecting with gen_tcp/gen_sctp;
%%   gen_accept  - test code accepting with gen_tcp/gen_sctp;
%%   connect     - diameter transport connecting.
%%
%% In each pairing the connecting side sends a message that the
%% accepting side echoes back.

init(accept, {Prot, Ref}) ->
    %% Start an accepting transport, returning once it has reported
    %% a connection.
    Transport = start_accept(Prot, Ref),
    %% Receive a message, check that the length in its header agrees
    %% with its size, and send it back.
    Pkt = ?RECV(?TMSG({recv, P}), P),
    Msg = bin(Prot, Pkt),
    <<_:8, Len:24, _/binary>> = Msg,
    Len = size(Msg),
    Transport ! ?TMSG({send, Msg}),
    %% The transport process is expected to die as a result of the
    %% peer closing the connection.
    Mon = erlang:monitor(process, Transport),
    ?RECV({'DOWN', Mon, process, _, _});

init(gen_connect, {Prot, Ref}) ->
    %% Look up the port that the peer is listening on.
    [Port] = ?util:lport(Prot, Ref),
    %% Connect, send a message and expect the same message back.
    {ok, Sock} = gen_connect(Prot, Port),
    Msg = make_msg(),
    ok = gen_send(Prot, Sock, Msg),
    Msg = gen_recv(Prot, Sock);

init(gen_accept, {Prot, Ref}) ->
    %% Open a listening socket and publish its port number for the
    %% connecting side to find.
    {ok, Listener} = gen_listen(Prot),
    {ok, Port} = inet:port(Listener),
    true = diameter_reg:add_new(?TEST_LISTENER(Ref, Port)),
    %% Accept a connection and echo the first message received on it.
    {ok, Sock} = gen_accept(Prot, Listener),
    Msg = gen_recv(Prot, Sock),
    ok = gen_send(Prot, Sock, Msg);

init(connect, {Prot, Ref}) ->
    %% Wait for the peer to publish its listening port.
    Found = diameter_reg:wait(?TEST_LISTENER(Ref, '_')),
    [{?TEST_LISTENER(_, Port), _}] = Found,
    %% Start a connecting transport, returning once it has reported
    %% the connection.
    Transport = start_connect(Prot, Port, Ref),
    %% Send a message and expect the same message back.
    Msg = make_msg(),
    Transport ! ?TMSG({send, Msg}),
    Pkt = ?RECV(?TMSG({recv, P}), P),
    Msg = bin(Prot, Pkt),
    %% The transport process is expected to die as a result of the
    %% peer closing the connection.
    Mon = erlang:monitor(process, Transport),
    ?RECV({'DOWN', Mon, process, _, _}).
%% bin/2
%%
%% Extract the binary from what a transport delivered in a recv
%% message: the binary itself for tcp, the bin field of a
%% diameter_packet record for sctp.
bin(tcp, Bin) ->
    Bin;
bin(sctp, #diameter_packet{bin = Bin}) ->
    Bin.
%% make_msg/0
%%
%% Build a message that is valid Diameter only in as far as
%% diameter_tcp examines it: that module looks at the length in the
%% Diameter header to locate message boundaries. The message is a
%% 4 byte header (version 1 followed by a 24-bit length that counts
%% the header itself) plus 4096 random bytes.
make_msg() ->
    Words = 1024,
    Payload = rand_bytes(4 * Words),
    Header = <<1:8, (4 * (Words + 1)):24>>,
    <<Header/binary, Payload/binary>>.
%% rand_bytes/1
%%
%% Return a binary of N random bytes. crypto:rand_bytes/1 isn't
%% available on all platforms (since openssl isn't) so roll our own.
%% The generator is reseeded from now() at each call.
rand_bytes(N) ->
    random:seed(now()),
    rand_bytes(N, <<>>).

%% rand_bytes/2
%%
%% Prepend random octets to the accumulated binary until the count
%% reaches zero.
rand_bytes(N, Acc)
  when N =/= 0 ->
    Octet = random:uniform(256) - 1,
    rand_bytes(N - 1, <<Octet, Acc/binary>>);
rand_bytes(0, Acc) ->
    Acc.
%% ===========================================================================
%% start_connect/3
%%
%% Start a connecting transport from the loopback address (any local
%% port) to the given port on the loopback address, wait for the
%% transport to report the connection and return the transport pid.
start_connect(Prot, PortNr, Ref) ->
    Opts = [{raddr, ?ADDR},
            {rport, PortNr},
            {ip, ?ADDR},
            {port, 0}],
    Started = start_connect(Prot, {connect, Ref}, ?SVC([]), Opts),
    {ok, Transport, [?ADDR]} = Started,
    ?RECV(?TMSG({Transport, connected, _})),
    Transport.
%% start_connect/4
%%
%% Dispatch to the transport module for the protocol in question,
%% adding the stream request to the options in the sctp case.
start_connect(tcp, T, Svc, Opts) ->
    diameter_tcp:start(T, Svc, Opts);
start_connect(sctp, T, Svc, Opts) ->
    SctpOpts = [{sctp_initmsg, ?SCTP_INIT} | Opts],
    diameter_sctp:start(T, Svc, SctpOpts).
%% start_accept/2
%%
%% Start transports sequentially by having each wait for a message
%% from a job in a queue before commencing. Only one transport with a
%% pending accept is started at a time since diameter_{tcp,sctp}
%% currently assume (and diameter currently implements) this.
%%
%% Returns the transport pid once the transport has reported a
%% connection. The process returned by sync/2 is sent Ref whether or
%% not the start succeeds.
start_accept(Prot, Ref) ->
    Holder = sync(accept, Ref),
    {Mod, ProtOpts} = tmod(Prot),
    try
        Started = Mod:start({accept, Ref},
                            ?SVC([?ADDR]),
                            [{port, 0} | ProtOpts]),
        {ok, Transport, [?ADDR]} = Started,
        ?RECV(?TMSG({Transport, connected})),
        Transport
    after
        Holder ! Ref
    end.
%% sync/2
%%
%% Cast a lock/2 request to diameter_sync under a name built from
%% What and Ref, then wait for that request to announce itself with
%% a start message. Returns the pid that sent the message.
sync(What, Ref) ->
    Name = {?MODULE, What, Ref},
    Req = [fun lock/2, Ref, self()],
    ok = diameter_sync:cast(Name, Req, infinity, infinity),
    receive
        {start, Ref, Holder} ->
            Holder
    end.
%% lock/2
%%
%% The request cast by sync/2: tell Pid that it may start, monitor
%% it, and then require the next message received to be Ref. Any
%% other message, including a 'DOWN' from the monitor, fails the
%% match.
lock(Ref, Pid) ->
    Pid ! {start, Ref, self()},
    erlang:monitor(process, Pid),
    receive
        Msg ->
            Ref = Msg
    end.
%% tmod/1
%%
%% Map a protocol to its diameter transport module and the options
%% to add when starting an accepting transport.
tmod(tcp) ->
    {diameter_tcp, []};
tmod(sctp) ->
    InitMsg = {sctp_initmsg, ?SCTP_INIT},
    {diameter_sctp, [InitMsg]}.
%% ===========================================================================
%% gen_connect/2
%%
%% Connect to the given port on the loopback address using gen_tcp
%% or gen_sctp directly. In the sctp case the association is
%% initiated and then awaited using gen_accept/2. Returns
%% {ok, Sock} on success.
gen_connect(tcp, PortNr) ->
    gen_tcp:connect(?ADDR, PortNr, ?TCP_OPTS);
gen_connect(sctp, PortNr) ->
    Opened = gen_sctp:open([{ip, ?ADDR}, {port, 0} | ?SCTP_OPTS]),
    {ok, Sock} = Opened,
    ok = gen_sctp:connect_init(Sock, ?ADDR, PortNr, []),
    Opened = gen_accept(sctp, Sock).
%% gen_listen/1
%%
%% Open a listening socket on the loopback address, on a port number
%% chosen by the system. The sctp return value pairs the result of
%% gen_sctp:listen/2 with the socket.
gen_listen(tcp) ->
    gen_tcp:listen(0, [{ip, ?ADDR} | ?TCP_OPTS]);
gen_listen(sctp) ->
    {ok, Sock} = gen_sctp:open([{ip, ?ADDR}, {port, 0} | ?SCTP_OPTS]),
    Listening = gen_sctp:listen(Sock, true),
    {Listening, Sock}.
%% gen_accept/2
%%
%% Wait for a connection. For tcp this is gen_tcp:accept/1 on the
%% listening socket. For sctp it means waiting for a comm_up
%% association change on the socket and saving the stream counts and
%% association id in the process dictionary for gen_send/3 and
%% gen_recv/2 to retrieve.
gen_accept(tcp, LSock) ->
    gen_tcp:accept(LSock);
gen_accept(sctp, Sock) ->
    putr(assoc,
         ?RECV(?SCTP(Sock, {_, #sctp_assoc_change{state = comm_up,
                                                 outbound_streams = Out,
                                                 inbound_streams = In,
                                                 assoc_id = Id}}),
               {Out, In, Id})),
    {ok, Sock}.
%% gen_send/3
%%
%% Send a binary on a socket. For sctp the message is sent on the
%% association saved by gen_accept/2, on an outbound stream chosen
%% from the microseconds of the current now() timestamp.
gen_send(tcp, Sock, Bin) ->
    gen_tcp:send(Sock, Bin);
gen_send(sctp, Sock, Bin) ->
    {OutStreams, _, AssocId} = getr(assoc),
    {_, _, MicroSecs} = now(),
    Stream = MicroSecs rem OutStreams,
    gen_sctp:send(Sock, AssocId, Stream, Bin).
%% gen_recv/2
%%
%% Receive one message from a socket. For tcp, data is accumulated
%% by tcp_recv/2, starting from an empty binary. For sctp, a single
%% data message on the association saved by gen_accept/2 is awaited.
gen_recv(tcp, Sock) ->
    tcp_recv(Sock, <<>>);
gen_recv(sctp, Sock) ->
    {_, _, AssocId} = getr(assoc),
    ?RECV(?SCTP(Sock, {[#sctp_sndrcvinfo{assoc_id = AssocId}], Data}),
          Data).
%% tcp_recv/2
%%
%% Accumulate data received from the socket until the accumulated
%% binary is at least as long as the 24-bit length that follows its
%% first byte, then return everything accumulated.
tcp_recv(_Sock, <<_:8, Len:24, _/binary>> = Msg)
  when byte_size(Msg) >= Len ->
    Msg;
tcp_recv(Sock, Acc) ->
    receive
        {tcp, Sock, Data} ->
            tcp_recv(Sock, <<Acc/binary, Data/binary>>)
    end.
%% putr/2
%%
%% Store a value in the process dictionary under a key that is
%% qualified with the module name. Returns whatever put/2 returns:
%% the previous value or undefined.
putr(Key, Val) ->
    DictKey = {?MODULE, Key},
    put(DictKey, Val).
%% getr/1
%%
%% Retrieve a value from the process dictionary using a key that is
%% qualified with the module name. Returns undefined if there is no
%% such value.
getr(Key) ->
    DictKey = {?MODULE, Key},
    get(DictKey).