%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2010-2015. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% %CopyrightEnd%
%%

%%
%% Tests of the RFC3539 watchdog state machine as implemented by
%% module diameter_watchdog.
%%

-module(diameter_watchdog_SUITE).

-export([suite/0,
         all/0,
         init_per_suite/1,
         end_per_suite/1]).

%% testcases
-export([reopen/0, reopen/1, reopen/4, reopen/6,
         suspect/1, suspect/4,
         okay/1, okay/4]).

-export([id/1,    %% jitter callback
         run1/1,
         abuse/1,
         abuse/2]).

%% diameter_app callbacks
-export([peer_up/3,
         peer_down/3]).

%% gen_tcp-ish interface
-export([listen/2,
         accept/1,
         connect/3,
         send/2,
         setopts/2,
         close/1]).

-include("diameter.hrl").
-include("diameter_ct.hrl").

%% ===========================================================================

%% Shorthand for the module providing run/1, lport/2 and
%% unique_string/0 as used below.
-define(util, diameter_util).

%% Dictionary, realm and address shared by every service started by
%% the suite.
-define(BASE, ?DIAMETER_DICT_COMMON).
-define(REALM, "erlang.org").
-define(ADDR, {127,0,0,1}).

%% Config for diameter:start_service/2. Name is used both as the
%% Origin-Host prefix and as the application alias. The match on 0
%% asserts that the base dictionary has application id 0.
-define(SERVICE(Name),
        [{'Origin-Host', Name ++ "." ++ ?REALM},
         {'Origin-Realm', ?REALM},
         {'Host-IP-Address', [?ADDR]},
         {'Vendor-Id', 42},
         {'Product-Name', "OTP/diameter"},
         {'Auth-Application-Id', [0 = ?BASE:id()]},
         {application, [{alias, Name},
                        {dictionary, ?BASE},
                        {module, ?MODULE}]}]).

%% Watchdog timer as a callback: an MFA on id/1, which returns T
%% unchanged.
-define(WD(T), {?MODULE, id, [T]}).

%% Watchdog timers used by the testcases: one as a plain integer, one
%% as a callback.
-define(WD_TIMERS, [10000, ?WD(10000)]).

%% Watchdog timer of the misbehaving node.
-define(PEER_WD, 10000).

%% A timeout that ensures one watchdog. To ensure only one watchdog
%% requires (Wd + 2000) + 1000 < 2*(Wd - 2000) ==> 7000 < Wd for the
%% case with random jitter. jitter/2 adds the 2000 only when Wd is not
%% a ?WD callback.
-define(ONE_WD(Wd), jitter(Wd,2000) + 1000).

%% Pattern for a diameter event message carrying info T.
-define(INFO(T), #diameter_event{info = T}).

%% Receive an event message from diameter. Blocks until an event
%% matching T arrives, logs it with log_event/1 and returns it.
-define(EVENT(T),    %% apply to not bind T_
        apply(fun() ->
                      receive ?INFO(T = T_) -> log_event(T_) end
              end,
              [])).

%% Receive a watchdog event. The one-argument form blocks and returns
%% the fourth element of the event (a {From, To} tuple, see log_wd/1).
%% The two-argument form returns false if no watchdog event for Ref
%% arrives within Ms milliseconds.
-define(WD_EVENT(Ref), log_wd(element(4, ?EVENT({watchdog, Ref, _, _, _})))).
-define(WD_EVENT(Ref, Ms),
        apply(fun() ->
                      receive ?INFO({watchdog, Ref, _, T_, _}) ->
                              log_wd(T_)
                      after Ms ->
                              false
                      end
              end,
              [])).

%% Log to make failures identifiable. Every line is prefixed with the
%% pid of the calling process.
-define(LOG(T),     ?LOG("~p", [T])).
-define(LOG(F,A),   ct:pal("~p: " ++ F, [self() | A])).
-define(WARN(F,A),  ct:pal(error, "~p: " ++ F, [self() | A])).

%% ===========================================================================

%% suite/0
%%
%% Suite info: a timetrap of 90 seconds.
suite() ->
    Timetrap = {timetrap, {seconds, 90}},
    [Timetrap].

%% all/0
%%
%% The testcases of the suite, in the order listed.
all() ->
    Testcases = [reopen, suspect, okay],
    Testcases.

%% init_per_suite/1
%%
%% Start diameter and return the config unchanged. Fails with a
%% badmatch if diameter:start/0 returns anything but ok.
init_per_suite(Config) ->
    Started = diameter:start(),
    ok = Started,
    Config.

%% end_per_suite/1
%%
%% Stop diameter. Fails with a badmatch if diameter:stop/0 returns
%% anything but ok.
end_per_suite(_) ->
    Stopped = diameter:stop(),
    ok = Stopped.

%% ===========================================================================
%% # reopen/1
%% ===========================================================================

%% Test the watchdog state machine for the required failover, failback
%% and reopen behaviour by examining watchdog events.

%% reopen/0
%%
%% Testcase info for reopen/1: a timetrap of 5 minutes.
reopen() ->
    Limit = {minutes, 5},    %% 20 watchdogs @ 15 sec
    [{timetrap, Limit}].

%% reopen/1
%%
%% Run reopen/4 through run/1 for every combination of the parameters
%% below and require an empty result.
reopen(_) ->
    Tests     = [listen, connect],      %% watchdog to test
    Answers   = [0,1,2],                %% DWR's to answer before ignoring
    Failbacks = ['DWR', 'DWA', 'RAA'],  %% how to induce failback
    Cases = [[reopen, T, W, N, M] || T <- Tests,
                                     W <- ?WD_TIMERS,  %% watchdog_timer value
                                     N <- Answers,
                                     M <- Failbacks],
    [] = run(Cases).

%% reopen/4
%%
%% Run reopen/6 once as the listening and once as the connecting
%% node, with Test identifying which of the two has its watchdog
%% tested.
reopen(Test, Wd, N, M) ->
    %% Publish a ref to ensure that the connecting transport is added
    %% only once events from the listening transport are subscribed
    %% to: start/3 registers the ref and cfg/2 waits for the listening
    %% node's registration.
    Ref = make_ref(),
    Nodes = [[reopen, listen, Test, Ref, Wd, N, M],
             [reopen, connect, Test, Ref, Wd, N, M]],
    [] = run(Nodes).

%% reopen/6
%%
%% Start a service and transport of the given type, configured by
%% cfg/3, then run reopen/7 on it.

reopen(Type, Test, Ref, Wd, N, M) ->
    Cfg = cfg(Type, Test, Wd),
    {SvcName, TRef} = start(Type, Ref, Cfg),
    reopen(Type, Test, SvcName, TRef, Wd, N, M).

%% cfg/3
%%
%% Return {WatchdogTimer, WatchdogConfig, TransportConfig} for
%% start/3. The node under test gets the watchdog timer being tested
%% and no other config. The peer gets a callback timer of ?PEER_WD,
%% {okay, 0} as watchdog config, and this module as transport
%% interface.
cfg(Type, Test, Wd)
  when Type =:= Test ->
    {Wd, [], []};
cfg(_, _, _) ->
    PeerTimer = ?WD(?PEER_WD),
    {PeerTimer, [{okay, 0}], [{module, ?MODULE}]}.

%% reopen/7
%%
%% Follow the expected sequence of diameter events on one node of a
%% reopen test. The first clause (Type equal to Test) is the node
%% whose watchdog is being tested, the second is its misbehaving
%% peer. Any event arriving out of the expected order fails the
%% corresponding match.

%% The watchdog to be tested.
reopen(Type, Type, SvcName, Ref, Wd, N, M) ->
    ?LOG("node ~p", [[Type, SvcName, Ref, Wd, N, M]]),

    %% Connection should come up immediately as a consequence of
    %% starting the watchdog process. In the accepting case this
    %% results in a new watchdog on a transport waiting for a new
    %% connection.

    {initial, okay} = ?WD_EVENT(Ref),
    ?EVENT({up, Ref, _, _, #diameter_packet{}}),

    %%   OKAY          Timer expires &      Failover()
    %%                 Pending              SetWatchdog()        SUSPECT
    %%
    %% The peer replies to N DWR's before becoming silent, we should
    %% go down after N+2 watchdog_timer expirations: that is, after
    %% the first unanswered DWR. Knowing the min/max watchdog timeout
    %% values gives the time interval in which the event is expected.

    %% No watchdog messages have been counted yet. See wd_counts/1
    %% for the order of the counters.
    [0,0,0,0] = wd_counts(SvcName),

    {okay, suspect} = ?WD_EVENT(Ref),
    ?EVENT({down, Ref, _, _}),

    %% N received DWA's
    [_,_,_,N] = wd_counts(SvcName),

    %%   SUSPECT       Receive DWA          Pending = FALSE
    %%                                      Failback()
    %%                                      SetWatchdog()        OKAY
    %%
    %%   SUSPECT       Receive non-DWA      Failback()
    %%                                      SetWatchdog()        OKAY
    %%
    %% The peer sends a message before the expiry of another watchdog
    %% to induce failback.

    {suspect, okay} = ?WD_EVENT(Ref),
    ?EVENT({up, Ref, _, _}),

    %% N+1 sent DWR's, N/N+1 received DWA's
    R1 = N+1,
    A1 = choose(M == 'DWA', R1, N),
    [R1,_,_,A1] = wd_counts(SvcName),

    %%   OKAY          Timer expires &      SendWatchdog()
    %%                 !Pending             SetWatchdog()
    %%                                      Pending = TRUE       OKAY
    %%
    %%   OKAY          Timer expires &      Failover()
    %%                 Pending              SetWatchdog()        SUSPECT
    %%
    %% The peer is now ignoring all watchdogs so the connection goes
    %% back down after either one or two watchdog expiries, depending
    %% on whether or not DWA restored the connection.

    {okay, suspect} = ?WD_EVENT(Ref),
    ?EVENT({down, Ref, _, _}),

    %%   SUSPECT       Timer expires        CloseConnection()
    %%                                      SetWatchdog()        DOWN
    %%
    %% Non-response brings the connection down after another timeout.

    {suspect, down} = ?WD_EVENT(Ref),

    %% One more DWR has been sent only if failback was induced by
    %% DWA. No further DWA has been received.
    R2 = R1 + choose(M == 'DWA', 1, 0),
    A2 = A1,
    [R2,_,_,A2] = wd_counts(SvcName),

    %%   DOWN          Timer expires        AttemptOpen()
    %%                                      SetWatchdog()        DOWN
    %%
    %%   DOWN          Connection up        NumDWA = 0
    %%                                      SendWatchdog()
    %%                                      SetWatchdog()
    %%                                      Pending = TRUE       REOPEN
    %%
    %% The connection is reestablished after another timeout.

    recv_reopen(Type, Ref),

    %%   REOPEN        Receive non-DWA      Throwaway()          REOPEN
    %%
    %%   REOPEN        Receive DWA &        Pending = FALSE
    %%                 NumDWA < 2           NumDWA++             REOPEN
    %%
    %%   REOPEN        Receive DWA &        Pending = FALSE
    %%                 NumDWA == 2          NumDWA++
    %%                                      Failback()           OKAY
    %%
    %%   REOPEN        Timer expires &      SendWatchdog()
    %%                 !Pending             SetWatchdog()
    %%                                      Pending = TRUE       REOPEN
    %%
    %% An exchange of 3 watchdogs (the first directly after
    %% capabilities exchange) brings the connection back up.

    {reopen, okay} = ?WD_EVENT(Ref),
    ?EVENT({up, Ref, _, _, #diameter_packet{}}),

    %% Three DWR's have been answered.
    R3 = R2 + 3,
    A3 = A2 + 3,
    [R3,_,_,A3] = wd_counts(SvcName),

    %% Non-response brings it down again.

    {okay, suspect} = ?WD_EVENT(Ref),
    ?EVENT({down, Ref, _, _}),
    {suspect, down} = ?WD_EVENT(Ref),

    %% One more DWR sent, none answered.
    R4 = R3 + 1,
    A4 = A3,
    [R4,_,_,A4] = wd_counts(SvcName),

    %% Reestablish after another watchdog.

    recv_reopen(Type, Ref),

    %%   REOPEN        Timer expires &      NumDWA = -1
    %%                 Pending &            SetWatchdog()
    %%                 NumDWA >= 0                               REOPEN
    %%
    %%   REOPEN        Timer expires &      CloseConnection()
    %%                 Pending &            SetWatchdog()
    %%                 NumDWA < 0                                DOWN
    %%
    %% The peer is now ignoring all watchdogs, so the connection goes
    %% down again after 2 timeouts.

    {reopen, down} = ?WD_EVENT(Ref);

%% The misbehaving peer. It goes through three transport processes,
%% one per connection established by the node under test. For each
%% one, reg/3 publishes the term that tells send/3 how many DWA's to
%% let through.
reopen(Type, _, SvcName, Ref, Wd, N, M) ->
    ?LOG("peer ~p", [[Type, SvcName, Ref, Wd, N, M]]),

    %% First transport process. The term published here makes send/3
    %% answer N watchdogs and then induce failback with message M.
    {initial, okay} = ?WD_EVENT(Ref),
    ?EVENT({up, Ref, _, _, #diameter_packet{}}),

    reg(Ref, SvcName, {SvcName, {Wd,N,M}}),

    {okay, down} = ?WD_EVENT(Ref),

    %% Second transport process.
    ?EVENT({watchdog, Ref, _, {_, okay}, _}),
    reg(Ref, SvcName, 3),  %% answer 3 watchdogs then fall silent
    ?EVENT({watchdog, Ref, _, {_, down}, _}),

    %% Third transport process.
    ?EVENT({watchdog, Ref, _, {_, okay}, _}),
    reg(Ref, SvcName, 0),  %% disable outgoing DWA
    ?EVENT({watchdog, Ref, _, {_, down}, _}),

    ok.

%% log_wd/1
%%
%% Log a watchdog state transition and return it unchanged.
log_wd({Old, New} = Transition) ->
    ?LOG("~p -> ~p", [Old, New]),
    Transition.

%% log_event/1
%%
%% Log the tag of an event, except for watchdog events, which are
%% logged by log_wd/1 instead. Returns the event unchanged.
log_event(E) ->
    case element(1, E) of
        watchdog ->
            ok;
        Tag ->
            ?LOG("~p", [Tag])
    end,
    E.

%% recv_reopen/2
%%
%% Wait for the watchdog on the given transport to enter REOPEN. On
%% the listening side any previous state is accepted. On the
%% connecting side the transition must be from DOWN and must be
%% followed by a reconnect event.

recv_reopen(listen, TRef) ->
    {_, reopen} = ?WD_EVENT(TRef);

recv_reopen(connect, TRef) ->
    {down, reopen} = ?WD_EVENT(TRef),
    ?EVENT({reconnect, TRef, _}).

%% reg/3
%%
%% Look up the pid of the transport process and publish a term under
%% it for send/3 to look up on that process's first outgoing DWA.
reg(TRef, SvcName, T) ->
    Transports = diameter:service_info(SvcName, transport),
    TPid = tpid(TRef, Transports),
    Key = {?MODULE, TPid, T},
    true = diameter_reg:add_new(Key).

%% tpid/2
%%
%% Extract the pid of the transport process from transport info as
%% returned by diameter:service_info/2. The service must have exactly
%% one transport, with the given ref. In the listening case there
%% must be exactly one accepting transport whose watchdog is in state
%% okay or reopen.

tpid(TRef, [[{ref, TRef},
             {type, connect},
             {options, _},
             {watchdog, _},
             {peer, _},
             {apps, _},
             {caps, _},
             {port, [{owner, Pid} | _]}
             | _]]) ->
    Pid;

tpid(TRef, [[{ref, TRef},
             {type, listen},
             {options, _},
             {accept, Acceptors}
             | _]]) ->
    IsUp = fun([{watchdog, {_,_,State}} | _]) ->
                   lists:member(State, [okay, reopen])
           end,
    [Up] = lists:filter(IsUp, Acceptors),
    [{watchdog, _},
     {peer, _},
     {apps, _},
     {caps, _},
     {port, [{owner, Pid} | _]}
     | _]
        = Up,
    Pid.

%% ===========================================================================
%% # suspect/1
%% ===========================================================================

%% Configure transports to require a set number of watchdog timeouts
%% before moving from OKAY to SUSPECT.

%% suspect/1
%%
%% Run the suspect test through abuse/1 for each of the configured
%% values and require an empty result.
suspect(_) ->
    Cases = [[abuse, [suspect, N]] || N <- [0,1,3]],
    [] = run(Cases).

%% suspect/4
%%
%% The first clause starts a transport with {suspect, N} as watchdog
%% config and waits for the connection to come up. Fake is true on
%% the misbehaving node, which uses this module as transport
%% interface. The remaining clauses follow the expected events on the
%% misbehaving node (Fake = true) or the node under test (Fake =
%% false).

suspect(Type, Fake, Ref, N)
  when is_reference(Ref) ->
    Cfg = {?WD(10000), [{suspect, N}], mod(Fake)},
    {SvcName, TRef} = start(Type, Ref, Cfg),
    {initial, okay} = ?WD_EVENT(TRef),
    suspect(TRef, Fake, SvcName, N);

%% The misbehaving node: leave OKAY at some point.
suspect(TRef, true, SvcName, _) ->
    reg(TRef, SvcName, 0),  %% disable outgoing DWA
    {okay, _} = ?WD_EVENT(TRef);

%% SUSPECT disabled.
suspect(TRef, false, SvcName, 0) ->
    %% Wait 2+ watchdogs and see that only one watchdog has been sent.
    Quiet = 28000,
    false = ?WD_EVENT(TRef, Quiet),
    [1,0,0,0] = wd_counts(SvcName);

suspect(TRef, false, SvcName, N) ->
    %% A single DWR sent and nothing else counted, throughout.
    Counts = [1,0,0,0],
    %% Check that no watchdog transition takes place within N+
    %% watchdogs ...
    Quiet = N*10000+8000,
    false = ?WD_EVENT(TRef, Quiet),
    Counts = wd_counts(SvcName),
    %% ... but that the connection then becomes suspect ...
    {okay, suspect} = ?WD_EVENT(TRef, 10000),
    Counts = wd_counts(SvcName),
    %% ... and goes down.
    {suspect, down} = ?WD_EVENT(TRef, 18000),
    Counts = wd_counts(SvcName).

%% abuse/1
%%
%% Run abuse/2 twice: once with the listening and once with the
%% connecting node as the one to misbehave.

abuse(F) ->
    Cases = [[abuse, F, listen],
             [abuse, F, connect]],
    [] = run(Cases).

%% abuse/2
%%
%% With an argument list of three or more elements: log the arguments
%% and apply the named function in this module to them. Otherwise the
%% second argument is the transport type to misbehave: run both a
%% listening and a connecting node, each passed its type, whether or
%% not it is the one to misbehave, a shared ref, and any extra
%% arguments.

abuse(F, [_,_,_|_] = Argv) ->
    ?LOG("~p", [Argv]),
    apply(?MODULE, F, Argv);

abuse([F|Extra], Test) ->
    Ref = make_ref(),
    Node = fun(T) -> [abuse, F, [T, T == Test, Ref] ++ Extra] end,
    [] = run(lists:map(Node, [listen, connect]));

%% A function name with no extra arguments.
abuse(F, Test) ->
    abuse([F], Test).

%% mod/1
%%
%% Transport config selecting this module as transport interface when
%% the argument is true, and no extra config when it is false.
mod(false) ->
    [];
mod(true) ->
    [{module, ?MODULE}].

%% ===========================================================================
%% # okay/1
%% ===========================================================================

%% Configure the number of watchdog exchanges before moving from
%% REOPEN to OKAY.

%% okay/1
%%
%% Run the okay test through abuse/1 for each of the configured
%% values and require an empty result.
okay(_) ->
    Cases = [[abuse, [okay, N]] || N <- [0,2,3]],
    [] = run(Cases).

%% okay/4
%%
%% Start a transport with {okay, N} as watchdog config, or {okay, 0}
%% on the misbehaving node, wait for the connection to come up, then
%% continue in okay/5. The state from which the watchdog is expected
%% to leave after a lost connection is initial on a listening
%% transport and down on a connecting one.

okay(Type, Fake, Ref, N)
  when is_reference(Ref) ->
    Okay = choose(Fake, 0, N),
    Cfg = {?WD(10000), [{okay, Okay}], mod(Fake)},
    {SvcName, TRef} = start(Type, Ref, Cfg),
    {initial, okay} = ?WD_EVENT(TRef),
    Down = choose(Type == listen, initial, down),
    okay(TRef, Fake, SvcName, Down, N).

%% okay/5
%%
%% Follow the expected watchdog events on the misbehaving node (first
%% clause) or the node under test (second clause). Prev is the state
%% from which the watchdog is expected to leave when the connection
%% is reestablished.

okay(TRef, true, SvcName, Prev, _) ->
    reg(TRef, SvcName, 0),  %% disable outgoing DWA
    {okay, down} = ?WD_EVENT(TRef),
    {Prev, okay} = ?WD_EVENT(TRef),
    reg(TRef, SvcName, -1), %% enable outgoing DWA
    {okay, down} = ?WD_EVENT(TRef);

okay(TRef, false, SvcName, Prev, N) ->
    {okay, suspect} = ?WD_EVENT(TRef),
    %% One DWR sent, nothing received.
    [1,0,0,0] = wd_counts(SvcName),
    {suspect, down} = ?WD_EVENT(TRef),
    ok(TRef, SvcName, Prev, N).

%% ok/4
%%
%% Follow the watchdog events on the node under test as the
%% connection is reestablished. Prev is the state the watchdog is
%% expected to leave, and the last argument is the configured number
%% of watchdog exchanges required before reaching OKAY.

ok(TRef, SvcName, Prev, 0) ->
    %% Connection comes up without watchdog exchange.
    {Prev, okay} = ?WD_EVENT(TRef),
    [1,0,0,0] = wd_counts(SvcName),
    %% Wait 2+ watchdog timeouts to see that the connection stays up
    %% and two watchdogs are exchanged.
    Quiet = 28000,
    false = ?WD_EVENT(TRef, Quiet),
    [3,0,0,2] = wd_counts(SvcName);

ok(TRef, SvcName, Prev, N) ->
    %% Connection required watchdog exchange before reaching OKAY.
    {Prev, reopen} = ?WD_EVENT(TRef),
    {reopen, okay} = ?WD_EVENT(TRef),
    %% One DWR was sent in moving to suspect, plus N more to reopen
    %% the connection. N DWA's were received.
    Expected = [N+1, 0, 0, N],
    Expected = wd_counts(SvcName).

%% ===========================================================================

%% wd_counts/1
%%
%% Return the watchdog message counters of the service's single
%% transport as a four-element list: the {0,280,1} and {0,280,0}
%% counters for send, followed by the same two for recv. A missing
%% counter is returned as 0. Going by the comments in the testcases,
%% the order is sent DWR, sent DWA, received DWR, received DWA.

wd_counts(SvcName) ->
    [Info] = diameter:service_info(SvcName, transport),
    {_, Counters} = lists:keyfind(statistics, 1, Info),
    Keys = [{{0,280,R}, Dir} || Dir <- [send,recv],
                                R <- [1,0]],
    [proplists:get_value(K, Counters, 0) || K <- Keys].

%% start/3
%%
%% Start a uniquely named service with one transport of the given
%% type and return {SvcName, TransportRef}. The calling process
%% subscribes to the service's events before the service is started.
%% Once the transport has been added, {Type, Ref, SvcName} is
%% registered with diameter_reg, which is what cfg/2 waits for on the
%% connecting side.

start(Type, Ref, T) ->
    SvcName = hostname(),
    true = diameter:subscribe(SvcName),
    SvcOpts = [{monitor, self()} | ?SERVICE(SvcName)],
    ok = diameter:start_service(SvcName, SvcOpts),
    TOpts = opts(Type, Ref, T),
    {ok, TRef} = diameter:add_transport(SvcName, {Type, TOpts}),
    true = diameter_reg:add_new({Type, Ref, SvcName}),
    {SvcName, TRef}.

%% opts/3
%%
%% Transport options for diameter:add_transport/2: diameter_tcp on
%% ?ADDR with port 0, any transport config in Mod first, and the
%% type-specific config from cfg/2 last.
opts(Type, Ref, {Timer, Config, Mod}) ->
    TConfig = Mod ++ [{ip, ?ADDR}, {port, 0}] ++ cfg(Type, Ref),
    [{transport_module, diameter_tcp},
     {transport_config, TConfig},
     {watchdog_timer, Timer},
     {watchdog_config, Config}].

%% cfg/2
%%
%% Type-specific transport config. A connecting transport waits for
%% the listening node to register under the shared ref (see start/3),
%% looks up the port its transport is listening on, and connects to
%% that. A listening transport needs nothing more.
cfg(connect, Ref) ->
    [{{_, _, Listener}, _Pid}] = diameter_reg:wait({listen, Ref, '_'}),
    [[{ref, LRef} | _]] = diameter:service_info(Listener, transport),
    [Port] = ?util:lport(tcp, LRef),
    [{raddr, ?ADDR}, {rport, Port}];
cfg(listen, _) ->
    [].

%% ===========================================================================

%% listen/2
%%
%% Transport interface callback: delegates to gen_tcp:listen/2.
listen(Port, Options) ->
    gen_tcp:listen(Port, Options).

%% accept/1
%%
%% Transport interface callback: delegates to gen_tcp:accept/1.
accept(ListenSocket) ->
    gen_tcp:accept(ListenSocket).

%% connect/3
%%
%% Transport interface callback: delegates to gen_tcp:connect/3.
connect(Address, PortNr, Options) ->
    gen_tcp:connect(Address, PortNr, Options).

%% setopts/2
%%
%% Transport interface callback: delegates to inet:setopts/2.
setopts(Socket, Options) ->
    inet:setopts(Socket, Options).

%% send/2
%%
%% Transport interface callback. What happens to the message is
%% decided by send/3 from the config stored in the process dictionary
%% of the calling process.
send(Socket, Msg) ->
    Config = getr(config),
    send(Config, Socket, Msg).

%% close/1
%%
%% Transport interface callback: delegates to gen_tcp:close/1.
close(Socket) ->
    gen_tcp:close(Socket).

%% send/3
%%
%% Decide what to do with an outgoing message on the misbehaving
%% node. The first argument is the config stored in the process
%% dictionary of the transport process by a previous call (undefined
%% on the first call). The clauses are tried in order and each one
%% stores the config for the next call:
%%
%%   undefined         No message sent yet.
%%   init              CER/CEA sent, waiting for the first DWA.
%%   {SvcName, {Wd, N, Msg}}
%%                     Term published by reg/3 for the first
%%                     transport process of a reopen test.
%%   {Wd, N, Msg}      Answer N more DWR's, then discard one DWA and
%%                     induce failback with Msg.
%%   N                 An integer: answer DWR's until N reaches 0,
%%                     then discard every DWA.

%% First outgoing message from a new transport process is CER/CEA.
%% Remaining outgoing messages are either DWR or DWA.
send(undefined, Sock, Bin) ->
    <<_:32, _:8, 257:24, _/binary>> = Bin,  %% assert command code 257
    putr(config, init),
    gen_tcp:send(Sock, Bin);

%% Outgoing DWR: fake reception of DWA. Use the fact that AVP values
%% are ignored. This is to ensure that the peer's watchdog state
%% transitions are only induced by responses to messages it sends.
%%
%% The Hop-by-Hop Identifier precedes the End-to-End Identifier in
%% the Diameter header, so that is the order in which they're
%% extracted here to be echoed in the answer.
send(_, Sock, <<_:32, 1:1, _:7, 280:24, _:32, HId:32, EId:32, _/binary>>) ->
    Pkt = #diameter_packet{header = #diameter_header{version = 1,
                                                     end_to_end_id = EId,
                                                     hop_by_hop_id = HId},
                           msg = ['DWA', {'Result-Code', 2001},
                                         {'Origin-Host', "XXX"},
                                         {'Origin-Realm', ?REALM}]},
    #diameter_packet{bin = Bin} = diameter_codec:encode(?BASE, Pkt),
    %% The DWR is never sent: the answer is handed directly to the
    %% calling process as if it had been received on the socket.
    self() ! {tcp, Sock, Bin},
    ok;

%% First outgoing DWA. Wait for the testcase to publish a term for
%% this process with reg/3, then start over with that as config.
send(init, Sock, Bin) ->
    [{{?MODULE, _, T}, _}] = diameter_reg:wait({?MODULE, self(), '_'}),
    putr(config, T),
    send(Sock, Bin);

%% First transport process. Remember the Origin-Host/Realm of the
%% service for use in a message inducing failback.
send({SvcName, {_,_,_} = T}, Sock, Bin) ->
    [{'Origin-Host', _} = OH, {'Origin-Realm', _} = OR | _]
        = ?SERVICE(SvcName),
    putr(origin, [OH, OR]),
    putr(config, T),
    send(Sock, Bin);

%% Discard DWA, failback after another timeout in the peer. The
%% message inducing failback is sent from a separate process so as
%% not to block the caller, and all subsequent DWA's are discarded.
send({Wd, 0 = No, Msg}, Sock, Bin) ->
    Origin = getr(origin),
    spawn(fun() -> failback(?ONE_WD(Wd), Msg, Sock, Bin, Origin) end),
    putr(config, No),
    ok;

%% Send DWA while we're in the mood (aka 0 < N).
send({Wd, N, Msg}, Sock, Bin) ->
    putr(config, {Wd, N-1, Msg}),
    gen_tcp:send(Sock, Bin);

%% Discard DWA.
send(0, _Sock, _Bin) ->
    ok;

%% Send DWA. The match on the message asserts an answer with command
%% code 280.
send(N, Sock, <<_:32, 0:1, _:7, 280:24, _/binary>> = Bin) ->
    putr(config, N-1),
    gen_tcp:send(Sock, Bin).

%% failback/5
%%
%% Sleep for the given number of milliseconds, then send the message
%% built by msg/3 on the socket. Fails with a badmatch if the send
%% doesn't return ok.
failback(Tmo, Msg, Sock, Bin, Origin) ->
    timer:sleep(Tmo),
    Out = msg(Msg, Bin, Origin),
    ok = gen_tcp:send(Sock, Out).

%% msg/3
%%
%% The binary to send in order to induce failback: the discarded DWA
%% itself, or another message encoded from msg/2.

msg('DWA', Dwa, _Origin) ->
    Dwa;
msg(Msg, _Dwa, Origin) ->
    Pkt = diameter_codec:encode(?BASE, msg(Msg, Origin)),
    #diameter_packet{bin = Encoded} = Pkt,
    Encoded.

%% msg/2
%%
%% A message to encode, as a list headed by the message name and
%% followed by AVPs, with the given AVPs last.

%% An unexpected answer is discarded after passing through the
%% watchdog state machine.
msg('RAA', Avps) ->
    ['RAA', {'Session-Id', diameter:session_id("abc")},
            {'Result-Code', 2001}
          | Avps];

msg('DWR' = Name, Avps) ->
    [Name | Avps].

%% ===========================================================================

%% peer_up/3
%%
%% diameter_app callback: return the state unchanged.
peer_up(_, _, State) ->
    State.

%% peer_down/3
%%
%% diameter_app callback: return the state unchanged.
peer_down(_, _, State) ->
    State.

%% ===========================================================================

%% choose/3
%%
%% Return the second argument if the first is true, the third if it
%% is false.
choose(false, _, Else) -> Else;
choose(true, Then, _)  -> Then.

%% id/1
%%
%% Jitter callback: returns its argument unchanged, so that a
%% watchdog timer configured as ?WD(T) is exactly T.

id(Timer) ->
    Timer.

%% run/1
%%
%% Run each [Function | Args] list through run1/1, which provides a
%% more useful badmatch in case of failure, using the run/1 function
%% of the utility module.

run(Fs) ->
    Calls = [{?MODULE, [run1, F]} || F <- Fs],
    ?util:run(Calls).

%% run1/1
%%
%% Apply the named function in this module to the arguments and
%% return ok. If the call raises then the arguments, exception and
%% stacktrace are logged as an error, and the stacktrace becomes the
%% value of a failed match against ok.
run1([F|A]) ->
    Result = try apply(?MODULE, F, A) of
                 _ ->
                     ok
             catch
                 Class:Reason ->
                     Stack = erlang:get_stacktrace(),
                     ?WARN("~p", [{A, Class, Reason, Stack}]),
                     Stack
             end,
    ok = Result.

%% jitter/2
%%
%% The longest a watchdog timer can be: the timer itself when it is
%% configured as a ?WD callback, otherwise the timer plus the given
%% increment.

jitter(Wd, Delta) ->
    case Wd of
        ?WD(T) ->
            T;
        _ ->
            Wd + Delta
    end.

%% hostname/0
%%
%% Generate a unique string. start/3 uses it as service name, and
%% through ?SERVICE as the host part of Origin-Host.
hostname() ->
    Name = ?util:unique_string(),
    Name.

%% putr/2
%%
%% Store a value in the process dictionary under a key tagged with
%% this module. Returns the previous value, or undefined.
putr(Key, Val) ->
    Slot = {?MODULE, Key},
    erlang:put(Slot, Val).

%% getr/1
%%
%% Read a value stored with putr/2 from the process dictionary.
%% Returns undefined if nothing has been stored under the key.
getr(Key) ->
    Slot = {?MODULE, Key},
    erlang:get(Slot).