aboutsummaryrefslogblamecommitdiffstats
path: root/lib/diameter/src/base/diameter_service.erl
blob: b36160148b4074850ccc5878e82c9d00f3b725c6 (plain) (tree)
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984

































































                                                                         



















































                                                                                
                                                                             







                                                                           
                                                                          









                                                               
                                                                           
                                                                       
                                                  
                                           
                                                     



                                                   
                                                  



























































































































































































































































































































                                                                               
                                            






                                                                               
                                      
















































































                                                                               

                                      



































                                                                               


                                                




































































































































































































































































































































































































                                                                               

                                          








                                                  


                                                       








































                                                                               
                                                    

                                
                         


                   









                                                                   






                                 

                                                               
                                





























































































































































































                                                                               
                                   


                                                                   
                                                     













                                                             
                        

                                                                      
                                                        
 
                        



                                                                


                                                                             
 

                                                            
 

                                                     
 
                        
 
                                 
                                      

                                                               
 

                                                                          
 
                                                                       
                                      
                                                                
 
                                                                       
                                      
                                                                
 
                                                

        
                         

                                
                        



                                                         
                           
                        
                             
 


                                                                    





                                                                      
                                                         


                                                         
                                


























































                                                                              
                                                             
                         
                                                            
                                 
                                                                 
                                 
                                                                     
                                                             
                                                                     
















































































                                                                      
                       








































                                                                          
                                                                              

























































































































































                                                                               


                                                                  

















                                                                       
                                                                        
                          
                                       








                                                                      
               


                                                                          
                                      


                                           
                                       







                                                                     
               

                       
                                                                         
                      
                                       






























































































































                                                                             
                           







                                                              
                                                        
                                                         










                                                                      



                                                                    
           



                                                                  





                                                                      



                                                                             


                                       
























                                                                         
                                           




                                                            
                                                                       
                                                                 

                                                         
                                                





                                                                      
                       
 
                                                
                          

                                
                                                        

                                
                                                                     


                                                                     



                                                             








































































































































                                                                           











































































































                                                                               



                       
                           





























                                                                        
                                                

       
                    





                                                            
                                                       






                                                              
                              
















                                                                    
                 

                        

                                                          


























































                                                                               

                                                              


































































































                                                                               
                             










                                                                               
                                                      



                                                     
                                   


































                                                                       
                    
 


                                                         
 




                                                                     
              




                  
                                                  






























                                                                              


                                                                     






























































































                                                                               

                                                                  








                                                                          




                                    






                               

                            

                                                      

                            

                                                      

                                                                  
 

                                                                    














                                                                 


                                                                    







                                







                                                                         














































































































































































                                                                               
%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2010-2011. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% %CopyrightEnd%
%%

%%
%% Implements the process that represents a service.
%%

-module(diameter_service).
-behaviour(gen_server).

-export([start/1,
         stop/1,
         start_transport/2,
         stop_transport/2,
         info/2,
         call/4]).

%% towards diameter_watchdog
-export([receive_message/3]).

%% service supervisor
-export([start_link/1]).

-export([subscribe/1,
         unsubscribe/1,
         subscriptions/1,
         subscriptions/0,
         services/0,
         services/1,
         whois/1,
         flush_stats/1]).

%% test/debug
-export([call_module/3,
         state/1,
         uptime/1]).

%%% gen_server callbacks
-export([init/1,
         handle_call/3,
         handle_cast/2,
         handle_info/2,
         terminate/2,
         code_change/3]).

%% Other callbacks.
-export([send/1]).

-include_lib("diameter/include/diameter.hrl").
-include("diameter_internal.hrl").

-define(STATE_UP,   up).
-define(STATE_DOWN, down).

-define(DEFAULT_TC,     30000).  %% RFC 3588 ch 2.1
-define(DEFAULT_TIMEOUT, 5000).  %% for outgoing requests
-define(RESTART_TC,      1000).  %% if restart was this recent

%% Used to be able to swap this with anything else dict-like but now
%% rely on the fact that a service's #state{} record does not change
%% in storing in it ?STATE table and not always going through the
%% service process. In particular, rely on the fact that operations on
%% a ?Dict don't change the handle to it.
-define(Dict, diameter_dict).

%% Table containing outgoing requests for which a reply has yet to be
%% received.
-define(REQUEST_TABLE, diameter_request).

%% Maintains state in a table. In contrast to previously, a service's
%% stat is not constant and is accessed outside of the service
%% process.
-define(STATE_TABLE, ?MODULE).

%% Workaround for dialyzer's lack of understanding of match specs.
-type match(T)
   :: T | '_' | '$1' | '$2' | '$3' | '$4'.

%% State of service gen_server.
-record(state,
        {id = now(),
         service_name,      %% as passed to start_service/2, key in ?STATE_TABLE
         service :: #diameter_service{},
         peerT = ets_new(peers) :: ets:tid(), %% #peer{} at start_fsm
         connT = ets_new(conns) :: ets:tid(), %% #conn{} at connection_up
         share_peers = false :: boolean(), %% broadcast peers to remote nodes?
         use_shared_peers = false :: boolean(),  %% use broadcasted peers?
         shared_peers = ?Dict:new(),         %% Alias -> [{TPid, Caps}, ...]
         local_peers = ?Dict:new(),          %% Alias -> [{TPid, Caps}, ...]
         monitor = false :: false | pid()}). %% process to die with
%% shared_peers reflects the peers broadcast from remote nodes. Note
%% that the state term itself doesn't change, which is relevant for
%% the stateless application callbacks since the state is retrieved
%% from ?STATE_TABLE from outside the service process. The pid in the
%% service record is used to determine whether or not we need to call
%% the process for a pick_peer callback.

%% Record representing a watchdog process.
-record(peer,
        {pid  :: match(pid()),
         type :: match(connect | accept),
         ref  :: match(reference()),  %% key into diameter_config
         options :: match([diameter:transport_opt()]),%% from start_transport
         op_state = ?STATE_DOWN :: match(?STATE_DOWN | ?STATE_UP),
         started = now(),      %% at process start
         conn = false :: match(boolean() | pid())}).
                      %% true at accept, pid() at connection_up (connT key)

%% Record representing a peer_fsm process.
-record(conn,
        {pid   :: pid(),
         apps  :: [{0..16#FFFFFFFF, diameter:app_alias()}], %% {Id, Alias}
         caps  :: #diameter_caps{},
         started = now(),  %% at process start
         peer  :: pid()}). %% key into peerT

%% Record stored in diameter_request for each outgoing request.
-record(request,
        {from,               %% arg 2 of handle_call/3
         handler    :: match(pid()), %% request process
         transport  :: match(pid()), %% peer process
         caps       :: match(#diameter_caps{}),
         app        :: match(diameter:app_alias()),  %% #diameter_app.alias
         dictionary :: match(module()),     %% #diameter_app.dictionary
         module     :: match([module() | list()]),
                    %% #diameter_app.module
         filter     :: match(diameter:peer_filter()),
         packet     :: match(#diameter_packet{})}).

%% Record call/4 options are parsed into.
-record(options,
        {filter = none  :: diameter:peer_filter(),
         extra = []     :: list(),
         timeout = ?DEFAULT_TIMEOUT :: 0..16#FFFFFFFF,
         detach = false :: boolean()}).

%% Since RFC 3588 requires that a Diameter agent not modify End-to-End
%% Identifiers, the possibility of explicitly setting an End-to-End
%% Identifier would be needed to be able to implement an agent in
%% which one side of the communication is not implemented on top of
%% diameter. For example, Diameter being sent or received encapsulated
%% in some other protocol, or even another Diameter stack in a
%% non-Erlang environment. (Not that this is likely to be a normal
%% case.)
%%
%% The implemented solution is not an option but to respect any header
%% values set in a diameter_header record returned from a
%% prepare_request callback. A call to diameter:call/4 can communicate
%% values to the callback using the 'extra' option if so desired.

%%% ---------------------------------------------------------------------------
%%% # start(SvcName)
%%% ---------------------------------------------------------------------------

start(SvcName) ->
    diameter_service_sup:start_child(SvcName).

start_link(SvcName) ->
    Options = [{spawn_opt, diameter_lib:spawn_opts(server, [])}],
    gen_server:start_link(?MODULE, [SvcName], Options).
%% Put the arbitrary term SvcName in a list in case we ever want to
%% send more than this and need to distinguish old from new.

%%% ---------------------------------------------------------------------------
%%% # stop(SvcName)
%%% ---------------------------------------------------------------------------

stop(SvcName) ->
    case whois(SvcName) of
        undefined ->
            {error, not_started};
        Pid ->
            stop(call_service(Pid, stop), Pid)
    end.

stop(ok, Pid) ->
    MRef = erlang:monitor(process, Pid),
    receive {'DOWN', MRef, process, _, _} -> ok end;
stop(No, _) ->
    No.

%%% ---------------------------------------------------------------------------
%%% # start_transport(SvcName, {Ref, Type, Opts})
%%% ---------------------------------------------------------------------------

start_transport(SvcName, {_,_,_} = T) ->
    call_service_by_name(SvcName, {start, T}).

%%% ---------------------------------------------------------------------------
%%% # stop_transport(SvcName, Refs)
%%% ---------------------------------------------------------------------------

stop_transport(_, []) ->
    ok;
stop_transport(SvcName, [_|_] = Refs) ->
    call_service_by_name(SvcName, {stop, Refs}).

%%% ---------------------------------------------------------------------------
%%% # info(SvcName, Item)
%%% ---------------------------------------------------------------------------

info(SvcName, Item) ->
    info_rc(call_service_by_name(SvcName, {info, Item})).

info_rc({error, _}) ->
    undefined;
info_rc(Info) ->
    Info.

%%% ---------------------------------------------------------------------------
%%% # receive_message(TPid, Pkt, MessageData)
%%% ---------------------------------------------------------------------------

%% Handle an incoming message in the watchdog process. This used to
%% come through the service process but this avoids that becoming a
%% bottleneck.

receive_message(TPid, Pkt, T)
  when is_pid(TPid) ->
    #diameter_packet{header = #diameter_header{is_request = R}} = Pkt,
    recv(R, (not R) andalso lookup_request(Pkt, TPid), TPid, Pkt, T).

%% Incoming request ...
recv(true, false, TPid, Pkt, T) ->
    try
        spawn(fun() -> recv_request(TPid, Pkt, T) end)
    catch
        error: system_limit = E ->  %% discard
            ?LOG({error, E}, now())
    end;

%% ... answer to known request ...
recv(false, #request{from = {_, Ref}, handler = Pid} = Req, _, Pkt, _) ->
    Pid ! {answer, Ref, Req, Pkt};
%% Note that failover could have happened prior to this message being
%% received and triggering failback. That is, both a failover message
%% and answer may be on their way to the handler process. In the worst
%% case the request process gets notification of the failover and
%% sends to the alternate peer before an answer arrives, so it's
%% always the case that we can receive more than one answer after
%% failover. The first answer received by the request process wins,
%% any others are discarded.

%% ... or not.
recv(false, false, _, _, _) ->
    ok.

%%% ---------------------------------------------------------------------------
%%% # call(SvcName, App, Msg, Options)
%%% ---------------------------------------------------------------------------

call(SvcName, App, Msg, Options)
  when is_list(Options) ->
    Rec = make_options(Options),
    Ref = make_ref(),
    Caller = {self(), Ref},
    Fun = fun() -> exit({Ref, call(SvcName, App, Msg, Rec, Caller)}) end,
    try spawn_monitor(Fun) of
        {_, MRef} ->
            recv(MRef, Ref, Rec#options.detach, false)
    catch
        error: system_limit = E ->
            {error, E}
    end.

%% Don't rely on gen_server:call/3 for the timeout handling since it
%% makes no guarantees about not leaving a reply message in the
%% mailbox if we catch its exit at timeout. It currently *can* do so,
%% which is also undocumented.

recv(MRef, _, true, true) ->
    erlang:demonitor(MRef, [flush]),
    ok;

recv(MRef, Ref, Detach, Sent) ->
    receive
        Ref ->  %% send has been attempted
            recv(MRef, Ref, Detach, true);
        {'DOWN', MRef, process, _, Reason} ->
            call_rc(Reason, Ref, Sent)
    end.

%% call/5 has returned ...
call_rc({Ref, Ans}, Ref, _) ->
    Ans;

%% ... or not. In this case failure/encode are documented.
call_rc(_, _, Sent) ->
    {error, choose(Sent, failure, encode)}.

%% call/5
%%
%% In the process spawned for the outgoing request.

call(SvcName, App, Msg, Opts, Caller) ->
    c(ets:lookup(?STATE_TABLE, SvcName), App, Msg, Opts, Caller).

c([#state{service_name = SvcName} = S], App, Msg, Opts, Caller) ->
    case find_transport(App, Msg, Opts, S) of
        {_,_,_} = T ->
            send_request(T, Msg, Opts, Caller, SvcName);
        false ->
            {error, no_connection};
        {error, _} = No ->
            No
    end;

c([], _, _, _, _) ->
    {error, no_service}.

%% make_options/1

make_options(Options) ->
    lists:foldl(fun mo/2, #options{}, Options).

mo({timeout, T}, Rec)
  when is_integer(T), 0 =< T ->
    Rec#options{timeout = T};

mo({filter, F}, #options{filter = none} = Rec) ->
    Rec#options{filter = F};
mo({filter, F}, #options{filter = {all, Fs}} = Rec) ->
    Rec#options{filter = {all, [F | Fs]}};
mo({filter, F}, #options{filter = F0} = Rec) ->
    Rec#options{filter = {all, [F0, F]}};

mo({extra, L}, #options{extra = X} = Rec)
  when is_list(L) ->
    Rec#options{extra = X ++ L};

mo(detach, Rec) ->
    Rec#options{detach = true};

mo(T, _) ->
    ?ERROR({invalid_option, T}).

%%% ---------------------------------------------------------------------------
%%% # subscribe(SvcName)
%%% # unsubscribe(SvcName)
%%% ---------------------------------------------------------------------------

subscribe(SvcName) ->
    diameter_reg:add({?MODULE, subscriber, SvcName}).

unsubscribe(SvcName) ->
    diameter_reg:del({?MODULE, subscriber, SvcName}).

subscriptions(Pat) ->
    pmap(diameter_reg:match({?MODULE, subscriber, Pat})).

subscriptions() ->
    subscriptions('_').

pmap(Props) ->
    lists:map(fun({{?MODULE, _, Name}, Pid}) -> {Name, Pid} end, Props).

%%% ---------------------------------------------------------------------------
%%% # services(Pattern)
%%% ---------------------------------------------------------------------------

services(Pat) ->
    pmap(diameter_reg:match({?MODULE, service, Pat})).

services() ->
    services('_').

whois(SvcName) ->
    case diameter_reg:match({?MODULE, service, SvcName}) of
        [{_, Pid}] ->
            Pid;
        [] ->
            undefined
    end.

%%% ---------------------------------------------------------------------------
%%% # flush_stats/1
%%%
%%% Output: list of {{SvcName, Alias, Counter}, Value}
%%% ---------------------------------------------------------------------------

flush_stats(TPid) ->
    diameter_stats:flush(TPid).

%% ===========================================================================
%% ===========================================================================

state(Svc) ->
    call_service(Svc, state).

uptime(Svc) ->
    call_service(Svc, uptime).

%% call_module/3

call_module(Service, AppMod, Request) ->
    call_service(Service, {call_module, AppMod, Request}).

%%% ---------------------------------------------------------------------------
%%% # init([SvcName])
%%% ---------------------------------------------------------------------------

init([SvcName]) ->
    process_flag(trap_exit, true),  %% ensure terminate(shutdown, _)
    i(SvcName, diameter_reg:add_new({?MODULE, service, SvcName})).

i(SvcName, true) ->
    {ok, i(SvcName)};
i(_, false) ->
    {stop, {shutdown, already_started}}.

%%% ---------------------------------------------------------------------------
%%% # handle_call(Req, From, State)
%%% ---------------------------------------------------------------------------

handle_call(state, _, S) ->
    {reply, S, S};

handle_call(uptime, _, #state{id = T} = S) ->
    {reply, diameter_lib:now_diff(T), S};

%% Start a transport.
handle_call({start, {Ref, Type, Opts}}, _From, S) ->
    {reply, start(Ref, {Type, Opts}, S), S};

%% Stop transports.
handle_call({stop, Refs}, _From, S) ->
    shutdown(Refs, S),
    {reply, ok, S};

%% pick_peer with mutable state
handle_call({pick_peer, Local, Remote, App}, _From, S) ->
    #diameter_app{mutable = true} = App,  %% assert
    {reply, pick_peer(Local, Remote, self(), S#state.service_name, App), S};

handle_call({call_module, AppMod, Req}, From, S) ->
    call_module(AppMod, Req, From, S);

handle_call({info, Item}, _From, S) ->
    {reply, service_info(Item, S), S};

handle_call(stop, _From, S) ->
    shutdown(S),
    {stop, normal, ok, S};
%% The server currently isn't guaranteed to be dead when the caller
%% gets the reply. We deal with this in the call to the server,
%% stating a monitor that waits for DOWN before returning.

handle_call(Req, From, S) ->
    unexpected(handle_call, [Req, From], S),
    {reply, nok, S}.

%%% ---------------------------------------------------------------------------
%%% # handle_cast(Req, State)
%%% ---------------------------------------------------------------------------

handle_cast(Req, S) ->
    unexpected(handle_cast, [Req], S),
    {noreply, S}.

%%% ---------------------------------------------------------------------------
%%% # handle_info(Req, State)
%%% ---------------------------------------------------------------------------

handle_info(T,S) ->
    case transition(T,S) of
        ok ->
            {noreply, S};
        #state{} = NS ->
            {noreply, NS};
        {stop, Reason} ->
            {stop, {shutdown, Reason}, S}
    end.

%% transition/2

%% Peer process is telling us to start a new accept process.
transition({accepted, Pid, TPid}, S) ->
    accepted(Pid, TPid, S),
    ok;

%% Peer process has a new open connection.
transition({connection_up, Pid, T}, S) ->
    connection_up(Pid, T, S);

%% Peer process has left state open.
transition({connection_down, Pid}, S) ->
    connection_down(Pid, S);

%% Peer process has returned to state open.
transition({connection_up, Pid}, S) ->
    connection_up(Pid, S);

%% Accepting transport has lost connectivity.
transition({close, Pid}, S) ->
    close(Pid, S),
    ok;

%% Connecting transport is being restarted by watchdog.
transition({reconnect, Pid}, S) ->
    reconnect(Pid, S),
    ok;

%% Monitor process has died. Just die with a reason that tells
%% diameter_config about the happening. If a cleaner shutdown is
%% required then someone should stop us.
transition({'DOWN', MRef, process, _, Reason}, #state{monitor = MRef}) ->
    {stop, {monitor, Reason}};

%% Local peer process has died.
transition({'DOWN', _, process, Pid, Reason}, S)
  when node(Pid) == node() ->
    peer_down(Pid, Reason, S);

%% Remote service wants to know about shared transports.
transition({service, Pid}, S) ->
    share_peers(Pid, S),
    ok;

%% Remote service is communicating a shared peer.
transition({peer, TPid, Aliases, Caps}, S) ->
    remote_peer_up(TPid, Aliases, Caps, S);

%% Remote peer process has died.
transition({'DOWN', _, process, TPid, _}, S) ->
    remote_peer_down(TPid, S);

%% Restart after tc expiry.
transition({tc_timeout, T}, S) ->
    tc_timeout(T, S),
    ok;

%% Request process is telling us it may have missed a failover message
%% after a transport went down and the service process looked up
%% outstanding requests.
transition({failover, TRef, Seqs}, S) ->
    failover(TRef, Seqs, S),
    ok;

transition(Req, S) ->
    unexpected(handle_info, [Req], S),
    ok.

%%% ---------------------------------------------------------------------------
%%% # terminate(Reason, State)
%%% ---------------------------------------------------------------------------

terminate(Reason, #state{service_name = Name} = S) ->
    ets:delete(?STATE_TABLE, Name),
    shutdown == Reason  %% application shutdown
        andalso shutdown(S).

%%% ---------------------------------------------------------------------------
%%% # code_change(FromVsn, State, Extra)
%%% ---------------------------------------------------------------------------

code_change(FromVsn,
            #state{service_name = SvcName,
                   service = #diameter_service{applications = Apps}}
            = S,
            Extra) ->
    lists:foreach(fun(A) ->
                          code_change(FromVsn, SvcName, Extra, A)
                  end,
                  Apps),
    {ok, S}.

code_change(FromVsn, SvcName, Extra, #diameter_app{alias = Alias} = A) ->
    {ok, S} = cb(A, code_change, [FromVsn,
                                  mod_state(Alias),
                                  Extra,
                                  SvcName]),
    mod_state(Alias, S).

%% ===========================================================================
%% ===========================================================================

unexpected(F, A, #state{service_name = Name}) ->
    ?UNEXPECTED(F, A ++ [Name]).

cb([_|_] = M, F, A) ->
    eval(M, F, A);
cb(Rec, F, A) ->
    {_, M} = app(Rec),
    eval(M, F, A).

app(#request{app = A, module = M}) ->
    {A,M};
app(#diameter_app{alias = A, module = M}) ->
    {A,M}.

eval([M|X], F, A) ->
    apply(M, F, A ++ X).

%% Callback with state.

state_cb(#diameter_app{mutable = false, init_state = S}, {ModX, F, A}) ->
    eval(ModX, F, A ++ [S]);

state_cb(#diameter_app{mutable = true, alias = Alias}, {_,_,_} = MFA) ->
    state_cb(MFA, Alias);

state_cb({ModX,F,A}, Alias)
  when is_list(ModX) ->
    eval(ModX, F, A ++ [mod_state(Alias)]).

choose(true, X, _)  -> X;
choose(false, _, X) -> X.

ets_new(Tbl) ->
    ets:new(Tbl, [{keypos, 2}]).

insert(Tbl, Rec) ->
    ets:insert(Tbl, Rec),
    Rec.

monitor(Pid) ->
    erlang:monitor(process, Pid),
    Pid.

%% Using the process dictionary for the callback state was initially
%% just a way to make what was horrendous trace (big state record and
%% much else everywhere) somewhat more readable. There's not as much
%% need for it now but it's no worse (except possibly that we don't
%% see the table identifier being passed around) than an ets table so
%% keep it.

mod_state(Alias) ->
    get({?MODULE, mod_state, Alias}).

mod_state(Alias, ModS) ->
    put({?MODULE, mod_state, Alias}, ModS).

%% have_transport/2

have_transport(SvcName, Ref) ->
    [] /= diameter_config:have_transport(SvcName, Ref).

%%% ---------------------------------------------------------------------------
%%% # shutdown/2
%%% ---------------------------------------------------------------------------

shutdown(Refs, #state{peerT = PeerT}) ->
    ets:foldl(fun(P,ok) -> s(P, Refs), ok end, ok, PeerT).

s(#peer{ref = Ref, pid = Pid}, Refs) ->
    s(lists:member(Ref, Refs), Pid);

s(true, Pid) ->
    Pid ! {shutdown, self()};  %% 'DOWN' will cleanup as usual
s(false, _) ->
    ok.

%%% ---------------------------------------------------------------------------
%%% # shutdown/1
%%% ---------------------------------------------------------------------------

shutdown(#state{peerT = PeerT}) ->
    %% A transport might not be alive to receive the shutdown request
    %% but give those that are a chance to shutdown gracefully.
    wait(fun st/2, PeerT),
    %% Kill the watchdogs explicitly in case there was no transport.
    wait(fun sw/2, PeerT).

wait(Fun, T) ->
    diameter_lib:wait(ets:foldl(Fun, [], T)).

st(#peer{conn = B}, Acc)
  when is_boolean(B) ->
    Acc;
st(#peer{conn = Pid}, Acc) ->
    Pid ! shutdown,
    [Pid | Acc].

sw(#peer{pid = Pid}, Acc) ->
    exit(Pid, shutdown),
    [Pid | Acc].

%%% ---------------------------------------------------------------------------
%%% # call_service/2
%%% ---------------------------------------------------------------------------

call_service(Pid, Req)
  when is_pid(Pid) ->
    cs(Pid, Req);
call_service(SvcName, Req) ->
    call_service_by_name(SvcName, Req).

call_service_by_name(SvcName, Req) ->
    cs(whois(SvcName), Req).

cs(Pid, Req)
  when is_pid(Pid) ->
    try
        gen_server:call(Pid, Req, infinity)
    catch
        E: Reason when E == exit ->
            {error, {E, Reason}}
    end;

cs(undefined, _) ->
    {error, no_service}.

%%% ---------------------------------------------------------------------------
%%% # i/1
%%%
%%% Output: #state{}
%%% ---------------------------------------------------------------------------

%% Intialize the state of a service gen_server.

i(SvcName) ->
    %% Split the config into a server state and a list of transports.
    {#state{} = S, CL} = lists:foldl(fun cfg_acc/2,
                                     {false, []},
                                     diameter_config:lookup(SvcName)),

    %% Publish the state in order to be able to access it outside of
    %% the service process. Originally table identifiers were only
    %% known to the service process but we now want to provide the
    %% option of application callbacks being 'stateless' in order to
    %% avoid having to go through a common process. (Eg. An agent that
    %% sends a request for every incoming request.)
    true = ets:insert_new(?STATE_TABLE, S),

    %% Start fsms for each transport.
    lists:foreach(fun(T) -> start_fsm(T,S) end, CL),

    init_shared(S),
    S.

cfg_acc({SvcName, #diameter_service{applications = Apps} = Rec, Opts},
        {false, Acc}) ->
    lists:foreach(fun init_mod/1, Apps),
    S = #state{service_name = SvcName,
               service = Rec#diameter_service{pid = self()},
               share_peers = get_value(share_peers, Opts),
               use_shared_peers = get_value(use_shared_peers, Opts),
               monitor = mref(get_value(monitor, Opts))},
    {S, Acc};

cfg_acc({_Ref, Type, _Opts} = T, {S, Acc})
  when Type == connect;
       Type == listen ->
    {S, [T | Acc]}.

mref(false = No) ->
    No;
mref(P) ->
    erlang:monitor(process, P).

init_shared(#state{use_shared_peers = true,
                   service_name = Svc}) ->
    diameter_peer:notify(Svc, {service, self()});
init_shared(#state{use_shared_peers = false}) ->
    ok.

init_mod(#diameter_app{alias = Alias,
                       init_state = S}) ->
    mod_state(Alias, S).

start_fsm({Ref, Type, Opts}, S) ->
    start(Ref, {Type, Opts}, S).

get_value(Key, Vs) ->
    {_, V} = lists:keyfind(Key, 1, Vs),
    V.

%%% ---------------------------------------------------------------------------
%%% # start/3
%%% ---------------------------------------------------------------------------

%% If the initial start/3 at service/transport start succeeds then
%% subsequent calls to start/4 on the same service will also succeed
%% since they involve the same call to merge_service/2. We merge here
%% rather than earlier since the service may not yet be configured
%% when the transport is configured.

start(Ref, {T, Opts}, S)
  when T == connect;
       T == listen ->
    try
        {ok, start(Ref, type(T), Opts, S)}
    catch
        ?FAILURE(Reason) ->
            {error, Reason}
    end.
%% TODO: don't actually raise any errors yet

%% There used to be a difference here between the handling of
%% configured listening and connecting transports but now we simply
%% tell the transport_module to start an accepting or connecting
%% process respectively, the transport implementation initiating
%% listening on a port as required.
type(listen)      -> accept;
type(accept)      -> listen;
type(connect = T) -> T.

%% start/4

start(Ref, Type, Opts, #state{peerT = PeerT,
                              connT = ConnT,
                              service_name = SvcName,
                              service = Svc})
  when Type == connect;
       Type == accept ->
    Pid = monitor(s(Type, Ref, {ConnT,
                                Opts,
                                SvcName,
                                merge_service(Opts, Svc)})),
    insert(PeerT, #peer{pid = Pid,
                        type = Type,
                        ref = Ref,
                        options = Opts}),
    Pid.

%% Note that the service record passed into the watchdog is the merged
%% record so that each watchdog (and peer_fsm) may get a different
%% record. This record is what is passed back into application
%% callbacks.

s(Type, Ref, T) ->
    diameter_watchdog:start({Type, Ref}, T).

%% merge_service/2

merge_service(Opts, Svc) ->
    lists:foldl(fun ms/2, Svc, Opts).

%% Limit the applications known to the fsm to those in the 'apps'
%% option. That this might be empty is checked by the fsm. It's not
%% checked at config-time since there's no requirement that the
%% service be configured first. (Which could be considered a bit odd.)
ms({applications, As}, #diameter_service{applications = Apps} = S)
  when is_list(As) ->
    S#diameter_service{applications
                       = [A || A <- Apps,
                               lists:member(A#diameter_app.alias, As)]};

%% The fact that all capabilities can be configured on the transports
%% means that the service doesn't necessarily represent a single
%% locally implemented Diameter peer as identified by Origin-Host: a
%% transport can configure its own Origin-Host. This means that the
%% service little more than a placeholder for default capabilities
%% plus a list of applications that individual transports can choose
%% to support (or not).
ms({capabilities, Opts}, #diameter_service{capabilities = Caps0} = Svc)
  when is_list(Opts) ->
    %% make_caps has already succeeded in diameter_config so it will succeed
    %% again here.
    {ok, Caps} = diameter_capx:make_caps(Caps0, Opts),
    Svc#diameter_service{capabilities = Caps};

ms(_, Svc) ->
    Svc.

%%% ---------------------------------------------------------------------------
%%% # accepted/3
%%% ---------------------------------------------------------------------------

accepted(Pid, _TPid, #state{peerT = PeerT} = S) ->
    #peer{ref = Ref, type = accept = T, conn = false, options = Opts}
        = P
        = fetch(PeerT, Pid),
    insert(PeerT, P#peer{conn = true}),  %% mark replacement transport started
    start(Ref, T, Opts, S).              %% start new peer

fetch(Tid, Key) ->
    [T] = ets:lookup(Tid, Key),
    T.

%%% ---------------------------------------------------------------------------
%%% # connection_up/3
%%%
%%% Output: #state{}
%%% ---------------------------------------------------------------------------

%% Peer process has reached the open state.

connection_up(Pid, {TPid, {Caps, SApps, Pkt}}, #state{peerT = PeerT,
                                                      connT = ConnT}
                                               = S) ->
    P = fetch(PeerT, Pid),
    C = #conn{pid = TPid,
              apps = SApps,
              caps = Caps,
              peer = Pid},

    insert(ConnT, C),
    connection_up([Pkt], P#peer{conn = TPid}, C, S).

%%% ---------------------------------------------------------------------------
%%% # connection_up/2
%%%
%%% Output: #state{}
%%% ---------------------------------------------------------------------------

%% Peer process has transitioned back into the open state. Note that there
%% has been no new capabilties exchange in this case.

connection_up(Pid, #state{peerT = PeerT,
                          connT = ConnT}
                   = S) ->
    #peer{conn = TPid} = P = fetch(PeerT, Pid),
    C = fetch(ConnT, TPid),
    connection_up([], P, C, S).

%% connection_up/4

connection_up(T, P, C, #state{peerT = PeerT,
                              local_peers = LDict,
                              service_name = SvcName,
                              service
                              = #diameter_service{applications = Apps}}
                       = S) ->
    #peer{conn = TPid, op_state = ?STATE_DOWN}
        = P,
    #conn{apps = SApps, caps = Caps}
        = C,

    insert(PeerT, P#peer{op_state = ?STATE_UP}),

    request_peer_up(TPid),
    report_status(up, P, C, S, T),
    S#state{local_peers = insert_local_peer(SApps,
                                            {{TPid, Caps}, {SvcName, Apps}},
                                            LDict)}.

insert_local_peer(SApps, T, LDict) ->
    lists:foldl(fun(A,D) -> ilp(A, T, D) end, LDict, SApps).

ilp({Id, Alias}, {TC, SA}, LDict) ->
    init_conn(Id, Alias, TC, SA),
    ?Dict:append(Alias, TC, LDict).

init_conn(Id, Alias, TC, {SvcName, Apps}) ->
    #diameter_app{module = ModX,
                  id = Id}  %% assert
        = find_app(Alias, Apps),

    peer_cb({ModX, peer_up, [SvcName, TC]}, Alias).

find_app(Alias, Apps) ->
    lists:keyfind(Alias, #diameter_app.alias, Apps).

%% A failing peer callback brings down the service. In the case of
%% peer_up we could just kill the transport and emit an error but for
%% peer_down we have no way to cleanup any state change that peer_up
%% may have introduced.
peer_cb(MFA, Alias) ->
    try state_cb(MFA, Alias) of
        ModS ->
            mod_state(Alias, ModS)
    catch
        E: Reason ->
            ?ERROR({E, Reason, MFA, ?STACK})
    end.

%%% ---------------------------------------------------------------------------
%%% # connection_down/2
%%%
%%% Output: #state{}
%%% ---------------------------------------------------------------------------

%% Peer process has transitioned out of the open state.

connection_down(Pid, #state{peerT = PeerT,
                            connT = ConnT}
                     = S) ->
    #peer{op_state = ?STATE_UP,  %% assert
          conn = TPid}
        = P
        = fetch(PeerT, Pid),

    C = fetch(ConnT, TPid),
    insert(PeerT, P#peer{op_state = ?STATE_DOWN}),
    connection_down(P,C,S).

%% connection_down/3

connection_down(#peer{op_state = ?STATE_DOWN}, _, S) ->
    S;

connection_down(#peer{conn = TPid,
                      op_state = ?STATE_UP}
                = P,
                #conn{caps = Caps,
                      apps = SApps}
                = C,
                #state{service_name = SvcName,
                       service = #diameter_service{applications = Apps},
                       local_peers = LDict}
                = S) ->
    report_status(down, P, C, S, []),
    NewS = S#state{local_peers
                   = remove_local_peer(SApps,
                                       {{TPid, Caps}, {SvcName, Apps}},
                                       LDict)},
    request_peer_down(TPid, NewS),
    NewS.

remove_local_peer(SApps, T, LDict) ->
    lists:foldl(fun(A,D) -> rlp(A, T, D) end, LDict, SApps).

rlp({Id, Alias}, {TC, SA}, LDict) ->
    L = ?Dict:fetch(Alias, LDict),
    down_conn(Id, Alias, TC, SA),
    ?Dict:store(Alias, lists:delete(TC, L), LDict).

down_conn(Id, Alias, TC, {SvcName, Apps}) ->
    #diameter_app{module = ModX,
                  id = Id}  %% assert
        = find_app(Alias, Apps),

    peer_cb({ModX, peer_down, [SvcName, TC]}, Alias).

%%% ---------------------------------------------------------------------------
%%% # peer_down/3
%%%
%%% Output: #state{}
%%% ---------------------------------------------------------------------------

%% Peer process has died.

peer_down(Pid, Reason, #state{peerT = PeerT} = S) ->
    P = fetch(PeerT, Pid),
    ets:delete_object(PeerT, P),
    closed(Reason, P, S),
    restart(P,S),
    peer_down(P,S).

%% Send an event at connection establishment failure.
closed({shutdown, {close, _TPid, Reason}},
       #peer{op_state = ?STATE_DOWN,
             ref = Ref,
             type = Type,
             options = Opts},
       #state{service_name = SvcName}) ->
    send_event(SvcName, {closed, Ref, Reason, {type(Type), Opts}});
closed(_, _, _) ->
    ok.

%% The peer has never come up ...
peer_down(#peer{conn = B}, S)
  when is_boolean(B) ->
    S;

%% ... or it has.
peer_down(#peer{conn = TPid} = P, #state{connT = ConnT} = S) ->
    #conn{} = C = fetch(ConnT, TPid),
    ets:delete_object(ConnT, C),
    connection_down(P,C,S).

%% restart/2

restart(P,S) ->
    q_restart(restart(P), S).

%% restart/1

%% Always try to reconnect.
restart(#peer{ref = Ref,
              type = connect = T,
              options = Opts,
              started = Time}) ->
    {Time, {Ref, T, Opts}};

%% Transport connection hasn't yet been accepted ...
restart(#peer{ref = Ref,
              type = accept = T,
              options = Opts,
              conn = false,
              started = Time}) ->
    {Time, {Ref, T, Opts}};

%% ... or it has: a replacement transport has already been spawned.
restart(#peer{type = accept}) ->
    false.

%% q_restart/2

%% Start the reconnect timer.
q_restart({Time, {_Ref, Type, Opts} = T}, S) ->
    start_tc(tc(Time, default_tc(Type, Opts)), T, S);
q_restart(false, _) ->
    ok.

%% RFC 3588, 2.1:
%%
%%   When no transport connection exists with a peer, an attempt to
%%   connect SHOULD be periodically made.  This behavior is handled via
%%   the Tc timer, whose recommended value is 30 seconds.  There are
%%   certain exceptions to this rule, such as when a peer has terminated
%%   the transport connection stating that it does not wish to
%%   communicate.

default_tc(connect, Opts) ->
    proplists:get_value(reconnect_timer, Opts, ?DEFAULT_TC);
default_tc(accept, _) ->
    0.

%% Bound tc below if the peer was restarted recently to avoid
%% continuous in case of faulty config or other problems.
tc(Time, Tc) ->
    choose(Tc > ?RESTART_TC
             orelse timer:now_diff(now(), Time) > 1000*?RESTART_TC,
           Tc,
           ?RESTART_TC).

start_tc(0, T, S) ->
    tc_timeout(T, S);
start_tc(Tc, T, _) ->
    erlang:send_after(Tc, self(), {tc_timeout, T}).

%% tc_timeout/2

tc_timeout({Ref, _Type, _Opts} = T, #state{service_name = SvcName} = S) ->
    tc(have_transport(SvcName, Ref), T, S).

tc(true, {Ref, Type, Opts}, #state{service_name = SvcName}
                            = S) ->
    send_event(SvcName, {reconnect, Ref, Opts}),
    start(Ref, Type, Opts, S);
tc(false = No, _, _) ->  %% removed
    No.

%%% ---------------------------------------------------------------------------
%%% # close/2
%%% ---------------------------------------------------------------------------

%% The watchdog doesn't start a new fsm in the accept case, it
%% simply stays alive until someone tells it to die in order for
%% another watchdog to be able to detect that it should transition
%% from initial into reopen rather than okay. That someone is either
%% the accepting watchdog upon reception of a CER from the previously
%% connected peer, or us after reconnect_timer timeout.

close(Pid, #state{service_name = SvcName,
                  peerT = PeerT}) ->
    #peer{pid = Pid,
          type = accept,
          ref = Ref,
          options = Opts}
        = fetch(PeerT, Pid),

    c(Pid, have_transport(SvcName, Ref), Opts).

%% Tell watchdog to (maybe) die later ...
c(Pid, true, Opts) ->
    Tc = proplists:get_value(reconnect_timer, Opts, 2*?DEFAULT_TC),
    erlang:send_after(Tc, Pid, close);

%% ... or now.
c(Pid, false, _Opts) ->
    Pid ! close.

%% The RFC's only document the behaviour of Tc, our reconnect_timer,
%% for the establishment of connections but we also give
%% reconnect_timer semantics for a listener, being the time within
%% which a new connection attempt is expected of a connecting peer.
%% The value should be greater than the peer's Tc + jitter.

%%% ---------------------------------------------------------------------------
%%% # reconnect/2
%%% ---------------------------------------------------------------------------

reconnect(Pid, #state{service_name = SvcName,
                      peerT = PeerT}) ->
    #peer{ref = Ref,
          type = connect,
          options = Opts}
        = fetch(PeerT, Pid),
    send_event(SvcName, {reconnect, Ref, Opts}).

%%% ---------------------------------------------------------------------------
%%% # call_module/4
%%% ---------------------------------------------------------------------------

%% Backwards compatibility and never documented/advertised. May be
%% removed.

call_module(Mod, Req, From, #state{service
                                   = #diameter_service{applications = Apps},
                                   service_name = Svc}
                            = S) ->
    case cm([A || A <- Apps, Mod == hd(A#diameter_app.module)],
            Req,
            From,
            Svc)
    of
        {reply = T, RC} ->
            {T, RC, S};
        noreply = T ->
            {T, S};
        Reason ->
            {reply, {error, Reason}, S}
    end.

cm([#diameter_app{module = ModX, alias = Alias}], Req, From, Svc) ->
    MFA = {ModX, handle_call, [Req, From, Svc]},

    try state_cb(MFA, Alias) of
        {noreply = T, ModS} ->
            mod_state(Alias, ModS),
            T;
        {reply = T, RC, ModS} ->
            mod_state(Alias, ModS),
            {T, RC};
        T ->
            diameter_lib:error_report({invalid, T}, MFA),
            invalid
    catch
        E: Reason ->
            diameter_lib:error_report({failure, {E, Reason, ?STACK}}, MFA),
            failure
    end;

cm([], _, _, _) ->
    unknown;

cm([_,_|_], _, _, _) ->
    multiple.

%%% ---------------------------------------------------------------------------
%%% # send_request/5
%%% ---------------------------------------------------------------------------

%% Send an outgoing request in its dedicated process.
%%
%% Note that both encode of the outgoing request and of the received
%% answer happens in this process. It's also this process that replies
%% to the caller. The service process only handles the state-retaining
%% callbacks.
%%
%% The mod field of the #diameter_app{} here includes any extra
%% arguments passed to diameter:call/2.

send_request({TPid, Caps, App}, Msg, Opts, Caller, SvcName) ->
    #diameter_app{module = ModX}
        = App,

    Pkt = make_request_packet(Msg),

    case cb(ModX, prepare_request, [Pkt, SvcName, {TPid, Caps}]) of
        {send, P} ->
            send_request(make_request_packet(P, Pkt),
                         TPid,
                         Caps,
                         App,
                         Opts,
                         Caller,
                         SvcName);
        {discard, Reason} ->
            {error, Reason};
        discard ->
            {error, discarded};
        T ->
            ?ERROR({invalid_return, prepare_request, App, T})
    end.

%% make_request_packet/1
%%
%% Turn an outgoing request as passed to call/4 into a diameter_packet
%% record in preparation for a prepare_request callback.

make_request_packet(Bin)
  when is_binary(Bin) ->
    #diameter_packet{header = diameter_codec:decode_header(Bin),
                     bin = Bin};

make_request_packet(#diameter_packet{msg = [#diameter_header{} = Hdr | Avps]}
                    = Pkt) ->
    Pkt#diameter_packet{msg = [make_request_header(Hdr) | Avps]};

make_request_packet(#diameter_packet{header = Hdr} = Pkt) ->
    Pkt#diameter_packet{header = make_request_header(Hdr)};

make_request_packet(Msg) ->
    make_request_packet(#diameter_packet{msg = Msg}).

%% make_request_header/1

make_request_header(undefined) ->
    Seq = diameter_session:sequence(),
    make_request_header(#diameter_header{end_to_end_id = Seq,
                                         hop_by_hop_id = Seq});

make_request_header(#diameter_header{version = undefined} = Hdr) ->
    make_request_header(Hdr#diameter_header{version = ?DIAMETER_VERSION});

make_request_header(#diameter_header{end_to_end_id = undefined} = H) ->
    Seq = diameter_session:sequence(),
    make_request_header(H#diameter_header{end_to_end_id = Seq});

make_request_header(#diameter_header{hop_by_hop_id = undefined} = H) ->
    Seq = diameter_session:sequence(),
    make_request_header(H#diameter_header{hop_by_hop_id = Seq});

make_request_header(#diameter_header{} = Hdr) ->
    Hdr;

make_request_header(T) ->
    ?ERROR({invalid_header, T}).

%% make_request_packet/2
%%
%% Reconstruct a diameter_packet from the return value of
%% prepare_request or prepare_retransmit callback.

make_request_packet(Bin, _)
  when is_binary(Bin) ->
    make_request_packet(Bin);

make_request_packet(#diameter_packet{msg = [#diameter_header{} | _]}
                    = Pkt,
                    _) ->
    Pkt;

%% Returning a diameter_packet with no header from a prepare_request
%% or prepare_retransmit callback retains the header passed into it.
%% This is primarily so that the end to end and hop by hop identifiers
%% are retained.
make_request_packet(#diameter_packet{header = Hdr} = Pkt,
            #diameter_packet{header = Hdr0}) ->
    Pkt#diameter_packet{header = fold_record(Hdr0, Hdr)};

make_request_packet(Msg, Pkt) ->
    Pkt#diameter_packet{msg = Msg}.

%% fold_record/2

fold_record(undefined, R) ->
    R;
fold_record(Rec, R) ->
    diameter_lib:fold_tuple(2, Rec, R).

%% send_request/7

send_request(Pkt, TPid, Caps, App, Opts, Caller, SvcName) ->
    #diameter_app{alias = Alias,
                  dictionary = Dict,
                  module = ModX,
                  answer_errors = AE}
        = App,

    EPkt = encode(Dict, Pkt),

    #options{filter = Filter,
             timeout = Timeout}
        = Opts,

    Req = #request{packet = Pkt,
                   from = Caller,
                   handler = self(),
                   transport = TPid,
                   caps = Caps,
                   app = Alias,
                   filter = Filter,
                   dictionary = Dict,
                   module = ModX},

    try
        TRef = send_request(TPid, EPkt, Req, Timeout),
        ack(Caller),
        handle_answer(SvcName, AE, recv_answer(Timeout, SvcName, {TRef, Req}))
    after
        erase_request(EPkt)
    end.

%% Tell caller a send has been attempted.
ack({Pid, Ref}) ->
    Pid ! Ref.

%% recv_answer/3

recv_answer(Timeout,
            SvcName,
            {TRef, #request{from = {_, Ref}, packet = RPkt} = Req}
            = T) ->

    %% Matching on TRef below ensures we ignore messages that pertain
    %% to a previous transport prior to failover. The answer message
    %% includes the #request{} since it's not necessarily Req; that
    %% is, from the last peer to which we've transmitted.

    receive
        {answer = A, Ref, Rq, Pkt} ->     %% Answer from peer
            {A, Rq, Pkt};
        {timeout = Reason, TRef, _} ->    %% No timely reply
            {error, Req, Reason};
        {failover = Reason, TRef, false} ->  %% No alternate peer
            {error, Req, Reason};
        {failover, TRef, Transport} ->    %% Resend to alternate peer
            try_retransmit(Timeout, SvcName, Req, Transport);
        {failover, TRef} ->  %% May have missed failover notification
            Seqs = diameter_codec:sequence_numbers(RPkt),
            Pid  = whois(SvcName),
            is_pid(Pid) andalso (Pid ! {failover, TRef, Seqs}),
            recv_answer(Timeout, SvcName, T)
    end.
%% Note that failover starts a new timer and that expiry of an old
%% timer value is ignored. This means that an answer could be accepted
%% from a peer after timeout in the case of failover.

try_retransmit(Timeout, SvcName, Req, Transport) ->
    try retransmit(Transport, Req, SvcName, Timeout) of
        T -> recv_answer(Timeout, SvcName, T)
    catch
        ?FAILURE(Reason) -> {error, Req, Reason}
    end.

%% handle_error/3

handle_error(Req, Reason, SvcName) ->
    #request{module = ModX,
             packet = Pkt,
             transport = TPid,
             caps = Caps}
        = Req,
    cb(ModX, handle_error, [Reason, msg(Pkt), SvcName, {TPid, Caps}]).

msg(#diameter_packet{msg = undefined, bin = Bin}) ->
    Bin;
msg(#diameter_packet{msg = Msg}) ->
    Msg.

%% encode/2

%% Note that prepare_request can return a diameter_packet containing
%% header or transport_data. Even allow the returned record to contain
%% an encoded binary. This isn't the usual case but could some in
%% handy, for test at least. (For example, to send garbage.)

%% The normal case: encode the returned message.
encode(Dict, #diameter_packet{msg = Msg, bin = undefined} = Pkt) ->
    D = pick_dictionary([Dict, ?BASE], Msg),
    diameter_codec:encode(D, Pkt);

%% Callback has returned an encoded binary: just send.
encode(_, #diameter_packet{} = Pkt) ->
    Pkt.

%% pick_dictionary/2

%% Pick the first dictionary that declares the application id in the
%% specified header.
pick_dictionary(Ds, [#diameter_header{application_id = Id} | _]) ->
    pd(Ds, fun(D) -> Id = D:id() end);

%% Pick the first dictionary that knows the specified message name.
pick_dictionary(Ds, [MsgName|_]) ->
    pd(Ds, fun(D) -> D:msg2rec(MsgName) end);

%% Pick the first dictionary that knows the name of the specified
%% message record.
pick_dictionary(Ds, Rec) ->
    Name = element(1, Rec),
    pd(Ds, fun(D) -> D:rec2msg(Name) end).

pd([D|Ds], F) ->
    try
        F(D),
        D
    catch
        error:_ ->
            pd(Ds, F)
    end;

pd([], _) ->
    ?ERROR(no_dictionary).

%% send_request/4

send_request(TPid, #diameter_packet{bin = Bin} = Pkt, Req, Timeout)
  when node() == node(TPid) ->
    %% Store the outgoing request before sending to avoid a race with
    %% reply reception.
    TRef = store_request(TPid, Bin, Req, Timeout),
    send(TPid, Pkt),
    TRef;

%% Send using a remote transport: spawn a process on the remote node
%% to relay the answer.
send_request(TPid, #diameter_packet{} = Pkt, Req, Timeout) ->
    TRef = erlang:start_timer(Timeout, self(), timeout),
    T = {TPid, Pkt, Req, Timeout, TRef},
    spawn(node(TPid), ?MODULE, send, [T]),
    TRef.

%% send/1

send({TPid, Pkt, #request{handler = Pid} = Req, Timeout, TRef}) ->
    Ref = send_request(TPid, Pkt, Req#request{handler = self()}, Timeout),
    Pid ! reref(receive T -> T end, Ref, TRef).

reref({T, Ref, R}, Ref, TRef) ->
    {T, TRef, R};
reref(T, _, _) ->
    T.

%% send/2

send(Pid, Pkt) ->
    Pid ! {send, Pkt}.

%% retransmit/4

retransmit({TPid, Caps, #diameter_app{alias = Alias} = App},
           #request{app = Alias,
                    packet = Pkt}
           = Req,
           SvcName,
           Timeout) ->
    have_request(Pkt, TPid)      %% Don't failover to a peer we've
        andalso ?THROW(timeout), %% already sent to.

    case cb(App, prepare_retransmit, [Pkt, SvcName, {TPid, Caps}]) of
        {send, P} ->
            retransmit(make_request_packet(P, Pkt), TPid, Caps, Req, Timeout);
        {discard, Reason} ->
            ?THROW(Reason);
        discard ->
            ?THROW(discarded);
        T ->
            ?ERROR({invalid_return, prepare_retransmit, App, T})
    end.

%% retransmit/5

retransmit(Pkt, TPid, Caps, #request{dictionary = Dict} = Req, Timeout) ->
    EPkt = encode(Dict, Pkt),

    NewReq = Req#request{transport = TPid,
                         packet = Pkt,
                         caps = Caps},

    ?LOG(retransmission, NewReq),
    TRef = send_request(TPid, EPkt, NewReq, Timeout),
    {TRef, NewReq}.

%% store_request/4

store_request(TPid, Bin, Req, Timeout) ->
    Seqs = diameter_codec:sequence_numbers(Bin),
    TRef = erlang:start_timer(Timeout, self(), timeout),
    ets:insert(?REQUEST_TABLE, {Seqs, Req, TRef}),
    ets:member(?REQUEST_TABLE, TPid)
        orelse (self() ! {failover, TRef}),  %% possibly missed failover
    TRef.

%% lookup_request/2

lookup_request(Msg, TPid)
  when is_pid(TPid) ->
    lookup(Msg, TPid, '_');

lookup_request(Msg, TRef)
  when is_reference(TRef) ->
    lookup(Msg, '_', TRef).

lookup(Msg, TPid, TRef) ->
    Seqs = diameter_codec:sequence_numbers(Msg),
    Spec = [{{Seqs, #request{transport = TPid, _ = '_'}, TRef},
             [],
             ['$_']}],
    case ets:select(?REQUEST_TABLE, Spec) of
        [{_, Req, _}] ->
            Req;
        [] ->
            false
    end.

%% erase_request/1

erase_request(Pkt) ->
    ets:delete(?REQUEST_TABLE, diameter_codec:sequence_numbers(Pkt)).

%% match_requests/1

match_requests(TPid) ->
    Pat = {'_', #request{transport = TPid, _ = '_'}, '_'},
    ets:select(?REQUEST_TABLE, [{Pat, [], ['$_']}]).

%% have_request/2

have_request(Pkt, TPid) ->
    Seqs = diameter_codec:sequence_numbers(Pkt),
    Pat = {Seqs, #request{transport = TPid, _ = '_'}, '_'},
    '$end_of_table' /= ets:select(?REQUEST_TABLE, [{Pat, [], ['$_']}], 1).

%% request_peer_up/1

request_peer_up(TPid) ->
    ets:insert(?REQUEST_TABLE, {TPid}).

%% request_peer_down/2

request_peer_down(TPid, S) ->
    ets:delete(?REQUEST_TABLE, TPid),
    lists:foreach(fun(T) -> failover(T,S) end, match_requests(TPid)).
%% Note that a request process can store its request after failover
%% notifications are sent here: store_request/4 sends the notification
%% in that case. Note also that we'll send as many notifications to a
%% given handler as there are peers its sent to. All but one of these
%% will be ignored.

%%% ---------------------------------------------------------------------------
%%% recv_request/3
%%% ---------------------------------------------------------------------------

recv_request(TPid, Pkt, {ConnT, SvcName, Apps}) ->
    try ets:lookup(ConnT, TPid) of
        [C] ->
            recv_request(C, TPid, Pkt, SvcName, Apps);
        [] ->             %% transport has gone down
            ok
    catch
        error: badarg ->  %% service has gone down (and taken table with it)
            ok
    end.

%% recv_request/5

recv_request(#conn{apps = SApps, caps = Caps}, TPid, Pkt, SvcName, Apps) ->
    #diameter_caps{origin_host = {OH,_},
                   origin_realm = {OR,_}}
        = Caps,

    #diameter_packet{header = #diameter_header{application_id = Id}}
        = Pkt,

    recv_request(find_recv_app(Id, SApps),
                 {SvcName, OH, OR},
                 TPid,
                 Apps,
                 Caps,
                 Pkt).

%% find_recv_app/2

%% No one should be sending the relay identifier.
find_recv_app(?APP_ID_RELAY, _) ->
    false;

%% With any other id we either support it locally or as a relay.
find_recv_app(Id, SApps) ->
    keyfind([Id, ?APP_ID_RELAY], 1, SApps).

%% keyfind/3

keyfind([], _, _) ->
    false;
keyfind([Key | Rest], Pos, L) ->
    case lists:keyfind(Key, Pos, L) of
        false ->
            keyfind(Rest, Pos, L);
        T ->
            T
    end.

%% recv_request/6

recv_request({Id, Alias}, T, TPid, Apps, Caps, Pkt) ->
    #diameter_app{dictionary = Dict}
        = A
        = find_app(Alias, Apps),
    recv_request(T, {TPid, Caps}, A, diameter_codec:decode(Id, Dict, Pkt));
%% Note that the decode is different depending on whether or not Id is
%% ?APP_ID_RELAY.

%%   DIAMETER_APPLICATION_UNSUPPORTED   3007
%%      A request was sent for an application that is not supported.

recv_request(false, T, TPid, _, _, Pkt) ->
    As = collect_avps(Pkt),
    protocol_error(3007, T, TPid, Pkt#diameter_packet{avps = As}).

collect_avps(Pkt) ->
    case diameter_codec:collect_avps(Pkt) of
        {_Bs, As} ->
            As;
        As ->
            As
    end.

%% recv_request/4

%% Wrong number of bits somewhere in the message: reply.
%%
%%   DIAMETER_INVALID_AVP_BITS          3009
%%      A request was received that included an AVP whose flag bits are
%%      set to an unrecognized value, or that is inconsistent with the
%%      AVP's definition.
%%
recv_request(T, {TPid, _}, _, #diameter_packet{errors = [Bs | _]} = Pkt)
  when is_bitstring(Bs) ->
    protocol_error(3009, T, TPid, Pkt);

%% Either we support this application but don't recognize the command
%% or we're a relay and the command isn't proxiable.
%%
%%   DIAMETER_COMMAND_UNSUPPORTED       3001
%%      The Request contained a Command-Code that the receiver did not
%%      recognize or support.  This MUST be used when a Diameter node
%%      receives an experimental command that it does not understand.
%%
recv_request(T,
             {TPid, _},
             #diameter_app{id = Id},
             #diameter_packet{header = #diameter_header{is_proxiable = P},
                              msg = M}
             = Pkt)
  when ?APP_ID_RELAY /= Id, undefined == M;
       ?APP_ID_RELAY == Id, not P ->
    protocol_error(3001, T, TPid, Pkt);

%% Error bit was set on a request.
%%
%%   DIAMETER_INVALID_HDR_BITS          3008
%%      A request was received whose bits in the Diameter header were
%%      either set to an invalid combination, or to a value that is
%%      inconsistent with the command code's definition.
%%
recv_request(T,
             {TPid, _},
             _,
             #diameter_packet{header = #diameter_header{is_error = true}}
             = Pkt) ->
    protocol_error(3008, T, TPid, Pkt);

%% A message in a locally supported application or a proxiable message
%% in the relay application. Don't distinguish between the two since
%% each application has its own callback config. That is, the user can
%% easily distinguish between the two cases.
recv_request(T, TC, App, Pkt) ->
    request_cb(T, TC, App, examine(Pkt)).

%% Note that there may still be errors but these aren't protocol
%% (3xxx) errors that lead to an answer-message.

request_cb({SvcName, _OH, _OR} = T, TC, App, Pkt) ->
    request_cb(cb(App, handle_request, [Pkt, SvcName, TC]), App, T, TC, Pkt).

%% examine/1
%%
%% Look for errors in a decoded message. Length errors result in
%% decode failure in diameter_codec.

examine(#diameter_packet{header = #diameter_header{version
                                                   = ?DIAMETER_VERSION}}
        = Pkt) ->
    Pkt;

%%   DIAMETER_UNSUPPORTED_VERSION       5011
%%      This error is returned when a request was received, whose version
%%      number is unsupported.

examine(#diameter_packet{errors = Es} = Pkt) ->
    Pkt#diameter_packet{errors = [5011 | Es]}.
%% It's odd/unfortunate that this isn't a protocol error.

%% request_cb/5

%% A reply may be an answer-message, constructed either here or by
%% the handle_request callback. The header from the incoming request
%% is passed into the encode so that it can retrieve the relevant
%% command code in this case. It will also then ignore Dict and use
%% the base encoder.
request_cb({reply, Ans},
           #diameter_app{dictionary = Dict},
           _,
           {TPid, _},
           Pkt) ->
    reply(Ans, Dict, TPid, Pkt);

%% An 3xxx result code, for which the E-bit is set in the header.
request_cb({protocol_error, RC}, _, T, {TPid, _}, Pkt)
  when 3000 =< RC, RC < 4000 ->
    protocol_error(RC, T, TPid, Pkt);

%% RFC 3588 says we must reply 3001 to anything unrecognized or
%% unsupported. 'noreply' is undocumented (and inappropriately named)
%% backwards compatibility for this, protocol_error the documented
%% alternative.
request_cb(noreply, _, T, {TPid, _}, Pkt) ->
    protocol_error(3001, T, TPid, Pkt);

%% Relay a request to another peer. This is equivalent to doing an
%% explicit call/4 with the message in question except that (1) a loop
%% will be detected by examining Route-Record AVP's, (3) a
%% Route-Record AVP will be added to the outgoing request and (3) the
%% End-to-End Identifier will default to that in the
%% #diameter_header{} without the need for an end_to_end_identifier
%% option.
%%
%% relay and proxy are similar in that they require the same handling
%% with respect to Route-Record and End-to-End identifier. The
%% difference is that a proxy advertises specific applications, while
%% a relay advertises the relay application. If a callback doesn't
%% want to distinguish between the cases in the callback return value
%% then 'resend' is a neutral alternative.
%%
request_cb({A, Opts},
           #diameter_app{id = Id}
           = App,
           T,
           TC,
           Pkt)
  when A == relay, Id == ?APP_ID_RELAY;
       A == proxy, Id /= ?APP_ID_RELAY;
       A == resend ->
    resend(Opts, App, T, TC, Pkt);

request_cb(discard, _, _, _, _) ->
    ok;

request_cb({eval, RC, F}, App, T, TC, Pkt) ->
    request_cb(RC, App, T, TC, Pkt),
    diameter_lib:eval(F).

%% protocol_error/4

protocol_error(RC, {_, OH, OR}, TPid, #diameter_packet{avps = Avps} = Pkt) ->
    ?LOG({error, RC}, Pkt),
    reply(answer_message({OH, OR, RC}, Avps), ?BASE, TPid, Pkt).

%% resend/5
%%
%% Resend a message as a relay or proxy agent.

resend(Opts,
       #diameter_app{} = App,
       {_SvcName, OH, _OR} = T,
       {_TPid, _Caps} = TC,
       #diameter_packet{avps = Avps} = Pkt) ->
    {Code, _Flags, Vid} = ?BASE:avp_header('Route-Record'),
    resend(is_loop(Code, Vid, OH, Avps), Opts, App, T, TC, Pkt).

%%   DIAMETER_LOOP_DETECTED             3005
%%      An agent detected a loop while trying to get the message to the
%%      intended recipient.  The message MAY be sent to an alternate peer,
%%      if one is available, but the peer reporting the error has
%%      identified a configuration problem.

resend(true, _, _, T, {TPid, _}, Pkt) ->  %% Route-Record loop
    protocol_error(3005, T, TPid, Pkt);

%% 6.1.8.  Relaying and Proxying Requests
%%
%%   A relay or proxy agent MUST append a Route-Record AVP to all requests
%%   forwarded.  The AVP contains the identity of the peer the request was
%%   received from.

resend(false,
       Opts,
       App,
       {SvcName, _, _} = T,
       {TPid, #diameter_caps{origin_host = {_, OH}}},
       #diameter_packet{header = Hdr0,
                        avps = Avps}
       = Pkt) ->
    Route = #diameter_avp{data = {?BASE, 'Route-Record', OH}},
    Seq = diameter_session:sequence(),
    Hdr = Hdr0#diameter_header{hop_by_hop_id = Seq},
    Msg = [Hdr, Route | Avps],
    resend(call(SvcName, App, Msg, Opts), T, TPid, Pkt).
%% The incoming request is relayed with the addition of a
%% Route-Record. Note the requirement on the return from call/4 below,
%% which places a requirement on the value returned by the
%% handle_answer callback of the application module in question.
%%
%% Note that there's nothing stopping the request from being relayed
%% back to the sender. A pick_peer callback may want to avoid this but
%% a smart peer might recognize the potential loop and choose another
%% route. A less smart one will probably just relay the request back
%% again and force us to detect the loop. A pick_peer that wants to
%% avoid this can specify filter to avoid the possibility.
%% Eg. {neg, {host, OH} where #diameter_caps{origin_host = {OH, _}}.
%%
%% RFC 6.3 says that a relay agent does not modify Origin-Host but
%% says nothing about a proxy. Assume it should behave the same way.

%% resend/4
%%
%% Relay a reply to a relayed request.

%% Answer from the peer: reset the hop by hop identifier and send.
resend(#diameter_packet{bin = B}
       = Pkt,
       _,
       TPid,
       #diameter_packet{header = #diameter_header{hop_by_hop_id = Id},
                        transport_data = TD}) ->
    send(TPid, Pkt#diameter_packet{bin = diameter_codec:hop_by_hop_id(Id, B),
                                   transport_data = TD});
%% TODO: counters

%% Or not: DIAMETER_UNABLE_TO_DELIVER.
resend(_, T, TPid, Pkt) ->
    protocol_error(3002, T, TPid, Pkt).

%% is_loop/4
%%
%% Is there a Route-Record AVP with our Origin-Host?

is_loop(Code,
        Vid,
        Bin,
        [#diameter_avp{code = Code, vendor_id = Vid, data = Bin} | _]) ->
    true;

is_loop(_, _, _, []) ->
    false;

is_loop(Code, Vid, OH, [_ | Avps])
  when is_binary(OH) ->
    is_loop(Code, Vid, OH, Avps);

is_loop(Code, Vid, OH, Avps) ->
    is_loop(Code, Vid, ?BASE:avp(encode, OH, 'Route-Record'), Avps).

%% reply/4
%%
%% Send a locally originating reply.

%% No errors or a diameter_header/avp list.
reply(Msg, Dict, TPid, #diameter_packet{errors = Es,
                                        transport_data = TD}
                       = ReqPkt)
  when [] == Es;
       is_record(hd(Msg), diameter_header) ->
    Pkt = diameter_codec:encode(Dict, make_answer_packet(Msg, ReqPkt)),
    incr(send, Pkt, TPid),  %% count result codes in sent answers
    send(TPid, Pkt#diameter_packet{transport_data = TD});

%% Or not: set Result-Code and Failed-AVP AVP's.
reply(Msg, Dict, TPid, #diameter_packet{errors = [H|_] = Es} = Pkt) ->
    reply(rc(Msg, rc(H), [A || {_,A} <- Es], Dict),
          Dict,
          TPid,
          Pkt#diameter_packet{errors = []}).

%% make_answer_packet/2

%% Binaries and header/avp lists are sent as-is.
make_answer_packet(Bin, _)
  when is_binary(Bin) ->
    #diameter_packet{bin = Bin};
make_answer_packet([#diameter_header{} | _] = Msg, _) ->
    #diameter_packet{msg = Msg};

%% Otherwise a reply message clears the R and T flags and retains the
%% P flag. The E flag will be set at encode. 6.2 of 3588 requires the
%% same P flag on an answer as on the request.
make_answer_packet(Msg, #diameter_packet{header = ReqHdr}) ->
    Hdr = ReqHdr#diameter_header{version = ?DIAMETER_VERSION,
                                 is_request = false,
                                 is_error = undefined,
                                 is_retransmitted = false},
    #diameter_packet{header = Hdr,
                     msg = Msg}.

%% rc/1

rc({RC, _}) ->
    RC;
rc(RC) ->
    RC.

%% rc/4

rc(Rec, RC, Failed, Dict)
  when is_integer(RC) ->
    set(Rec, [{'Result-Code', RC} | failed_avp(Rec, Failed, Dict)], Dict).

%% Reply as name and tuple list ...
set([_|_] = Ans, Avps, _) ->
    Ans ++ Avps;  %% Values nearer tail take precedence.

%% ... or record.
set(Rec, Avps, Dict) ->
    Dict:'#set-'(Avps, Rec).

%% failed_avp/3

failed_avp(_, [] = No, _) ->
    No;

failed_avp(Rec, Failed, Dict) ->
    [fa(Rec, [{'AVP', Failed}], Dict)].

%% Reply as name and tuple list ...
fa([MsgName | Values], FailedAvp, Dict) ->
    R = Dict:msg2rec(MsgName),
    try
        Dict:'#info-'(R, {index, 'Failed-AVP'}),
        {'Failed-AVP', [FailedAvp]}
    catch
        error: _ ->
            Avps = proplists:get_value('AVP', Values, []),
            A = #diameter_avp{name = 'Failed-AVP',
                              value = FailedAvp},
            {'AVP', [A|Avps]}
    end;

%% ... or record.
fa(Rec, FailedAvp, Dict) ->
    try
        {'Failed-AVP', [FailedAvp]}
    catch
        error: _ ->
            Avps = Dict:'get-'('AVP', Rec),
            A = #diameter_avp{name = 'Failed-AVP',
                              value = FailedAvp},
            {'AVP', [A|Avps]}
    end.

%% 3.  Diameter Header
%%
%%       E(rror)     - If set, the message contains a protocol error,
%%                     and the message will not conform to the ABNF
%%                     described for this command.  Messages with the 'E'
%%                     bit set are commonly referred to as error
%%                     messages.  This bit MUST NOT be set in request
%%                     messages.  See Section 7.2.

%% 3.2.  Command Code ABNF specification
%%
%%    e-bit            = ", ERR"
%%                       ; If present, the 'E' bit in the Command
%%                       ; Flags is set, indicating that the answer
%%                       ; message contains a Result-Code AVP in
%%                       ; the "protocol error" class.

%% 7.1.3.  Protocol Errors
%%
%%    Errors that fall within the Protocol Error category SHOULD be treated
%%    on a per-hop basis, and Diameter proxies MAY attempt to correct the
%%    error, if it is possible.  Note that these and only these errors MUST
%%    only be used in answer messages whose 'E' bit is set.

%% Thus, only construct answers to protocol errors. Other errors
%% require an message-specific answer and must be handled by the
%% application.

%% 6.2.  Diameter Answer Processing
%%
%%    When a request is locally processed, the following procedures MUST be
%%    applied to create the associated answer, in addition to any
%%    additional procedures that MAY be discussed in the Diameter
%%    application defining the command:
%%
%%    -  The same Hop-by-Hop identifier in the request is used in the
%%       answer.
%%
%%    -  The local host's identity is encoded in the Origin-Host AVP.
%%
%%    -  The Destination-Host and Destination-Realm AVPs MUST NOT be
%%       present in the answer message.
%%
%%    -  The Result-Code AVP is added with its value indicating success or
%%       failure.
%%
%%    -  If the Session-Id is present in the request, it MUST be included
%%       in the answer.
%%
%%    -  Any Proxy-Info AVPs in the request MUST be added to the answer
%%       message, in the same order they were present in the request.
%%
%%    -  The 'P' bit is set to the same value as the one in the request.
%%
%%    -  The same End-to-End identifier in the request is used in the
%%       answer.
%%
%%    Note that the error messages (see Section 7.3) are also subjected to
%%    the above processing rules.

%% 7.3.  Error-Message AVP
%%
%%    The Error-Message AVP (AVP Code 281) is of type UTF8String.  It MAY
%%    accompany a Result-Code AVP as a human readable error message.  The
%%    Error-Message AVP is not intended to be useful in real-time, and
%%    SHOULD NOT be expected to be parsed by network entities.

%% answer_message/2

answer_message({OH, OR, RC}, Avps) ->
    {Code, _, Vid} = ?BASE:avp_header('Session-Id'),
    ['answer-message', {'Origin-Host', OH},
                       {'Origin-Realm', OR},
                       {'Result-Code', RC}
                       | session_id(Code, Vid, Avps)].

session_id(Code, Vid, Avps)
  when is_list(Avps) ->
    try
        {value, #diameter_avp{data = D}} = find_avp(Code, Vid, Avps),
        [{'Session-Id', [?BASE:avp(decode, D, 'Session-Id')]}]
    catch
        error: _ ->
            []
    end.

%% find_avp/3

find_avp(Code, Vid, Avps)
  when is_integer(Code), (undefined == Vid orelse is_integer(Vid)) ->
    find(fun(A) -> is_avp(Code, Vid, A) end, Avps).

%% The final argument here could be a list of AVP's, depending on the case,
%% but we're only searching at the top level.
is_avp(Code, Vid, #diameter_avp{code = Code, vendor_id = Vid}) ->
    true;
is_avp(_, _, _) ->
    false.

find(_, []) ->
    false;
find(Pred, [H|T]) ->
    case Pred(H) of
        true ->
            {value, H};
        false ->
            find(Pred, T)
    end.

%% 7.  Error Handling
%%
%%    There are certain Result-Code AVP application errors that require
%%    additional AVPs to be present in the answer.  In these cases, the
%%    Diameter node that sets the Result-Code AVP to indicate the error
%%    MUST add the AVPs.  Examples are:
%%
%%    -  An unrecognized AVP is received with the 'M' bit (Mandatory bit)
%%       set, causes an answer to be sent with the Result-Code AVP set to
%%       DIAMETER_AVP_UNSUPPORTED, and the Failed-AVP AVP containing the
%%       offending AVP.
%%
%%    -  An AVP that is received with an unrecognized value causes an
%%       answer to be returned with the Result-Code AVP set to
%%       DIAMETER_INVALID_AVP_VALUE, with the Failed-AVP AVP containing the
%%       AVP causing the error.
%%
%%    -  A command is received with an AVP that is omitted, yet is
%%       mandatory according to the command's ABNF.  The receiver issues an
%%       answer with the Result-Code set to DIAMETER_MISSING_AVP, and
%%       creates an AVP with the AVP Code and other fields set as expected
%%       in the missing AVP.  The created AVP is then added to the Failed-
%%       AVP AVP.
%%
%%    The Result-Code AVP describes the error that the Diameter node
%%    encountered in its processing.  In case there are multiple errors,
%%    the Diameter node MUST report only the first error it encountered
%%    (detected possibly in some implementation dependent order).  The
%%    specific errors that can be described by this AVP are described in
%%    the following section.

%% 7.5.  Failed-AVP AVP
%%
%%    The Failed-AVP AVP (AVP Code 279) is of type Grouped and provides
%%    debugging information in cases where a request is rejected or not
%%    fully processed due to erroneous information in a specific AVP.  The
%%    value of the Result-Code AVP will provide information on the reason
%%    for the Failed-AVP AVP.
%%
%%    The possible reasons for this AVP are the presence of an improperly
%%    constructed AVP, an unsupported or unrecognized AVP, an invalid AVP
%%    value, the omission of a required AVP, the presence of an explicitly
%%    excluded AVP (see tables in Section 10), or the presence of two or
%%    more occurrences of an AVP which is restricted to 0, 1, or 0-1
%%    occurrences.
%%
%%    A Diameter message MAY contain one Failed-AVP AVP, containing the
%%    entire AVP that could not be processed successfully.  If the failure
%%    reason is omission of a required AVP, an AVP with the missing AVP
%%    code, the missing vendor id, and a zero filled payload of the minimum
%%    required length for the omitted AVP will be added.

%%% ---------------------------------------------------------------------------
%%% # handle_answer/3
%%% ---------------------------------------------------------------------------

%% Process an answer message in call-specific process.

handle_answer(SvcName, _, {error, Req, Reason}) ->
    handle_error(Req, Reason, SvcName);

handle_answer(SvcName,
              AnswerErrors,
              {answer, #request{dictionary = Dict} = Req, Pkt}) ->
    a(examine(diameter_codec:decode(Dict, Pkt)),
      SvcName,
      AnswerErrors,
      Req).

%% We don't really need to do a full decode if we're a relay and will
%% just resend with a new hop by hop identifier, but might a proxy
%% want to examine the answer?

a(#diameter_packet{errors = []}
  = Pkt,
  SvcName,
  AE,
  #request{transport = TPid,
           caps = Caps,
           packet = P}
  = Req) ->
    try
        incr(in, Pkt, TPid)
    of
        _ ->
            cb(Req, handle_answer, [Pkt, msg(P), SvcName, {TPid, Caps}])
    catch
        exit: {invalid_error_bit, _} = E ->
            e(Pkt#diameter_packet{errors = [E]}, SvcName, AE, Req)
    end;

a(#diameter_packet{} = Pkt, SvcName, AE, Req) ->
    e(Pkt, SvcName, AE, Req).

e(Pkt, SvcName, callback, #request{transport = TPid,
                                   caps = Caps,
                                   packet = Pkt}
                          = Req) ->
    cb(Req, handle_answer, [Pkt, msg(Pkt), SvcName, {TPid, Caps}]);
e(Pkt, SvcName, report, Req) ->
    x(errors, handle_answer, [SvcName, Req, Pkt]);
e(Pkt, SvcName, discard, Req) ->
    x({errors, handle_answer, [SvcName, Req, Pkt]}).

%% Note that we don't check that the application id in the answer's
%% header is what we expect. (TODO: Does the rfc says anything about
%% this?)

%% incr/4
%%
%% Increment a stats counter for an incoming or outgoing message.

%% TODO: fix
incr(_, #diameter_packet{msg = undefined}, _) ->
    ok;

incr(Dir, Pkt, TPid)
  when is_pid(TPid) ->
    #diameter_packet{header = #diameter_header{is_error = E}
                            = Hdr,
                     msg = Rec}
        = Pkt,

    RC = int(get_avp_value(?BASE, 'Result-Code', Rec)),
    PE = is_protocol_error(RC),

    %% Check that the E bit is set only for 3xxx result codes.
    (not (E orelse PE))
        orelse (E andalso PE)
        orelse x({invalid_error_bit, RC}, answer, [Dir, Pkt]),

    Ctr = rc_counter(Rec, RC),
    is_tuple(Ctr)
        andalso incr(TPid, {diameter_codec:msg_id(Hdr), Dir, Ctr}).

%% incr/2

incr(TPid, Counter) ->
    diameter_stats:incr(Counter, TPid, 1).

%% RFC 3588, 7.6:
%%
%%   All Diameter answer messages defined in vendor-specific
%%   applications MUST include either one Result-Code AVP or one
%%   Experimental-Result AVP.
%%
%% Maintain statistics assuming one or the other, not both, which is
%% surely the intent of the RFC.

rc_counter(_, RC)
  when is_integer(RC) ->
    {'Result-Code', RC};
rc_counter(Rec, _) ->
    rcc(get_avp_value(?BASE, 'Experimental-Result', Rec)).

%% Outgoing answers may be in any of the forms messages can be sent
%% in. Incoming messages will be records. We're assuming here that the
%% arity of the result code AVP's is 0 or 1.

rcc([{_,_,RC} = T])
  when is_integer(RC) ->
    T;
rcc({_,_,RC} = T)
  when is_integer(RC) ->
    T;
rcc(_) ->
    undefined.

int([N])
  when is_integer(N) ->
    N;
int(N)
  when is_integer(N) ->
    N;
int(_) ->
    undefined.

is_protocol_error(RC) ->
    3000 =< RC andalso RC < 4000.

-spec x(any(), atom(), list()) -> no_return().

%% Warn and exit request process on errors in an incoming answer.
x(Reason, F, A) ->
    diameter_lib:warning_report(Reason, {?MODULE, F, A}),
    x(Reason).

x(T) ->
    exit(T).

%%% ---------------------------------------------------------------------------
%%% # failover/[23]
%%% ---------------------------------------------------------------------------

%% Failover as a consequence of request_peer_down/2.
failover({_, #request{handler = Pid} = Req, TRef}, S) ->
    Pid ! {failover, TRef, rt(Req, S)}.

%% Failover as a consequence of store_request/4.
failover(TRef, Seqs, S)
  when is_reference(TRef) ->
    case lookup_request(Seqs, TRef) of
        #request{} = Req ->
            failover({Seqs, Req, TRef}, S);
        false ->
            ok
    end.

%% prepare_request returned a binary ...
rt(#request{packet = #diameter_packet{msg = undefined}}, _) ->
    false;  %% TODO: Not what we should do.

%% ... or not.
rt(#request{packet = #diameter_packet{msg = Msg}} = Req, S) ->
    find_transport(get_destination(Msg), Req, S).

%%% ---------------------------------------------------------------------------
%%% # report_status/5
%%% ---------------------------------------------------------------------------

report_status(Status,
              #peer{ref = Ref,
                    conn = TPid,
                    type = Type,
                    options = Opts},
              #conn{apps = [_|_] = As,
                    caps = Caps},
              #state{service_name = SvcName}
              = S,
              Extra) ->
    share_peer(Status, Caps, As, TPid, S),
    Info = [Status, Ref, {TPid, Caps}, {type(Type), Opts} | Extra],
    send_event(SvcName, list_to_tuple(Info)).

%% send_event/2

send_event(SvcName, Info) ->
    send_event(#diameter_event{service = SvcName,
                               info = Info}).

send_event(#diameter_event{service = SvcName} = E) ->
    lists:foreach(fun({_, Pid}) -> Pid ! E end, subscriptions(SvcName)).

%%% ---------------------------------------------------------------------------
%%% # share_peer/5
%%% ---------------------------------------------------------------------------

share_peer(up, Caps, Aliases, TPid, #state{share_peers = true,
                                           service_name = Svc}) ->
    diameter_peer:notify(Svc, {peer, TPid, Aliases, Caps});

share_peer(_, _, _, _, _) ->
    ok.

%%% ---------------------------------------------------------------------------
%%% # share_peers/2
%%% ---------------------------------------------------------------------------

share_peers(Pid, #state{share_peers = true,
                        local_peers = PDict}) ->
    ?Dict:fold(fun(A,Ps,ok) -> sp(Pid, A, Ps), ok end, ok, PDict);

share_peers(_, #state{share_peers = false}) ->
    ok.

sp(Pid, Alias, Peers) ->
    lists:foreach(fun({P,C}) -> Pid ! {peer, P, [Alias], C} end, Peers).

%%% ---------------------------------------------------------------------------
%%% # remote_peer_up/4
%%% ---------------------------------------------------------------------------

remote_peer_up(Pid, Aliases, Caps, #state{use_shared_peers = true,
                                          service = Svc,
                                          shared_peers = PDict}
                                   = S) ->
    #diameter_service{applications = Apps} = Svc,
    Update = lists:filter(fun(A) ->
                                  lists:keymember(A, #diameter_app.alias, Apps)
                          end,
                          Aliases),
    S#state{shared_peers = rpu(Pid, Caps, PDict, Update)};

remote_peer_up(_, _, _, #state{use_shared_peers = false} = S) ->
    S.

rpu(_, _, PDict, []) ->
    PDict;
rpu(Pid, Caps, PDict, Aliases) ->
    erlang:monitor(process, Pid),
    T = {Pid, Caps},
    lists:foldl(fun(A,D) -> ?Dict:append(A, T, D) end,
                PDict,
                Aliases).

%%% ---------------------------------------------------------------------------
%%% # remote_peer_down/2
%%% ---------------------------------------------------------------------------

remote_peer_down(Pid, #state{use_shared_peers = true,
                             shared_peers = PDict}
                      = S) ->
    S#state{shared_peers = lists:foldl(fun(A,D) -> rpd(Pid, A, D) end,
                                       PDict,
                                       ?Dict:fetch_keys(PDict))}.

rpd(Pid, Alias, PDict) ->
    ?Dict:update(Alias, fun(Ps) -> lists:keydelete(Pid, 1, Ps) end, PDict).

%%% ---------------------------------------------------------------------------
%%% find_transport/[34]
%%%
%%% Output: {TransportPid, #diameter_caps{}, #diameter_app{}}
%%%         | false
%%%         | {error, Reason}
%%% ---------------------------------------------------------------------------

%% Initial call, from an arbitrary process.
find_transport({alias, Alias}, Msg, Opts, #state{service = Svc} = S) ->
    #diameter_service{applications = Apps} = Svc,
    ft(find_send_app(Alias, Apps), Msg, Opts, S);

%% Relay or proxy send.
find_transport(#diameter_app{} = App, Msg, Opts, S) ->
    ft(App, Msg, Opts, S).

ft(#diameter_app{module = Mod} = App, Msg, Opts, S) ->
    #options{filter = Filter,
             extra = Xtra}
        = Opts,
    pick_peer(App#diameter_app{module = Mod ++ Xtra},
              get_destination(Msg),
              Filter,
              S);
ft(false = No, _, _, _) ->
    No.

%% This can't be used if we're a relay and sending a message
%% in an application not known locally. (TODO)
find_send_app(Alias, Apps) ->
    case lists:keyfind(Alias, #diameter_app.alias, Apps) of
        #diameter_app{id = ?APP_ID_RELAY} ->
            false;
        T ->
            T
    end.

%% Retransmission, in the service process.
find_transport([_,_] = RH,
               Req,
               #state{service = #diameter_service{pid = Pid,
                                                  applications = Apps}}
               = S)
  when self() == Pid ->
    #request{app = Alias,
             filter = Filter,
             module = ModX}
        = Req,
    #diameter_app{}
        = App
        = lists:keyfind(Alias, #diameter_app.alias, Apps),

    pick_peer(App#diameter_app{module = ModX},
              RH,
              Filter,
              S).

%% get_destination/1

get_destination(Msg) ->
    [str(get_avp_value(?BASE, 'Destination-Realm', Msg)),
     str(get_avp_value(?BASE, 'Destination-Host', Msg))].

%% This is not entirely correct. The avp could have an arity 1, in
%% which case an empty list is a DiameterIdentity of length 0 rather
%% than the list of no values we treat it as by mapping to undefined.
%% This behaviour is documented.
str([]) ->
    undefined;
str(T) ->
    T.

%% get_avp_value/3
%%
%% Find an AVP in a message of one of three forms:
%%
%% - a message record (as generated from a .dia spec) or
%% - a list of an atom message name followed by 2-tuple, avp name/value pairs.
%% - a list of a #diameter_header{} followed by #diameter_avp{} records,
%%
%% In the first two forms a dictionary module is used at encode to
%% identify the type of the AVP and its arity in the message in
%% question. The third form allows messages to be sent as is, without
%% a dictionary, which is needed in the case of relay agents, for one.

get_avp_value(Dict, Name, [#diameter_header{} | Avps]) ->
    try
        {Code, _, VId} = Dict:avp_header(Name),
        [A|_] = lists:dropwhile(fun(#diameter_avp{code = C, vendor_id = V}) ->
                                        C /= Code orelse V /= VId
                                end,
                                Avps),
        avp_decode(Dict, Name, A)
    catch
        error: _ ->
            undefined
    end;

get_avp_value(_, Name, [_MsgName | Avps]) ->
    case lists:keyfind(Name, 1, Avps) of
        {_, V} ->
            V;
        _ ->
            undefined
    end;

%% Message is typically a record but not necessarily: diameter:call/4
%% can be passed an arbitrary term.
get_avp_value(Dict, Name, Rec) ->
    try
        Dict:'#get-'(Name, Rec)
    catch
        error:_ ->
            undefined
    end.

avp_decode(Dict, Name, #diameter_avp{value = undefined,
                                     data = Bin}) ->
    Dict:avp(decode, Bin, Name);
avp_decode(_, _, #diameter_avp{value = V}) ->
    V.

%%% ---------------------------------------------------------------------------
%%% # pick_peer(App, [DestRealm, DestHost], Filter, #state{})
%%%
%%% Output: {TransportPid, #diameter_caps{}, App}
%%%         | false
%%%         | {error, Reason}
%%% ---------------------------------------------------------------------------

%% Find transports to a given realm/host.

pick_peer(#diameter_app{alias = Alias}
          = App,
          [_,_] = RH,
          Filter,
          #state{local_peers = L,
                 shared_peers = S,
                 service_name = SvcName,
                 service = #diameter_service{pid = Pid}}) ->
    pick_peer(peers(Alias, RH, Filter, L),
              peers(Alias, RH, Filter, S),
              Pid,
              SvcName,
              App).

%% pick_peer/5

pick_peer([], [], _, _, _) ->
    false;

%% App state is mutable but we're not in the service process: go there.
pick_peer(Local, Remote, Pid, _SvcName, #diameter_app{mutable = true} = App)
  when self() /= Pid ->
    call_service(Pid, {pick_peer, Local, Remote, App});

%% App state isn't mutable or it is and we're in the service process:
%% do the deed.
pick_peer(Local,
          Remote,
          _Pid,
          SvcName,
          #diameter_app{module = ModX,
                        alias = Alias,
                        init_state = S,
                        mutable = M}
          = App) ->
    MFA = {ModX, pick_peer, [Local, Remote, SvcName]},

    try state_cb(App, MFA) of
        {ok, {TPid, #diameter_caps{} = Caps}} when is_pid(TPid) ->
            {TPid, Caps, App};
        {{TPid, #diameter_caps{} = Caps}, ModS} when is_pid(TPid), M ->
            mod_state(Alias, ModS),
            {TPid, Caps, App};
        {false = No, ModS} when M ->
            mod_state(Alias, ModS),
            No;
        {ok, false = No} ->
            No;
        false = No ->
            No;
        {{TPid, #diameter_caps{} = Caps}, S} when is_pid(TPid) ->
            {TPid, Caps, App};     %% Accept returned state in the immutable
        {false = No, S} ->         %% case as long it isn't changed.
            No;
        T ->
            diameter_lib:error_report({invalid, T, App}, MFA)
    catch
        E: Reason ->
            diameter_lib:error_report({failure, {E, Reason, ?STACK}}, MFA)
    end.

%% peers/4

peers(Alias, RH, Filter, Peers) ->
    case ?Dict:find(Alias, Peers) of
        {ok, L} ->
            ps(L, RH, Filter, {[],[]});
        error ->
            []
    end.

%% Place a peer whose Destination-Host/Realm matches those of the
%% request at the front of the result list. Could add some sort of
%% 'sort' option to allow more control.

ps([], _, _, {Ys, Ns}) ->
    lists:reverse(Ys, Ns);
ps([{_TPid, #diameter_caps{} = Caps} = TC | Rest], RH, Filter, Acc) ->
    ps(Rest, RH, Filter, pacc(caps_filter(Caps, RH, Filter),
                              caps_filter(Caps, RH, {all, [host, realm]}),
                              TC,
                              Acc)).

pacc(true, true, Peer, {Ts, Fs}) ->
    {[Peer|Ts], Fs};
pacc(true, false, Peer, {Ts, Fs}) ->
    {Ts, [Peer|Fs]};
pacc(_, _, _, Acc) ->
    Acc.

%% caps_filter/3

caps_filter(C, RH, {neg, F}) ->
    not caps_filter(C, RH, F);

caps_filter(C, RH, {all, L})
  when is_list(L) ->
    lists:all(fun(F) -> caps_filter(C, RH, F) end, L);

caps_filter(C, RH, {any, L})
  when is_list(L) ->
    lists:any(fun(F) -> caps_filter(C, RH, F) end, L);

caps_filter(#diameter_caps{origin_host = {_,OH}}, [_,DH], host) ->
    eq(undefined, DH, OH);

caps_filter(#diameter_caps{origin_realm = {_,OR}}, [DR,_], realm) ->
    eq(undefined, DR, OR);

caps_filter(C, _, Filter) ->
    caps_filter(C, Filter).

%% caps_filter/2

caps_filter(_, none) ->
    true;

caps_filter(#diameter_caps{origin_host = {_,OH}}, {host, H}) ->
    eq(any, H, OH);

caps_filter(#diameter_caps{origin_realm = {_,OR}}, {realm, R}) ->
    eq(any, R, OR);

%% Anything else is expected to be an eval filter. Filter failure is
%% documented as being equivalent to a non-matching filter.

caps_filter(C, T) ->
    try
        {eval, F} = T,
        diameter_lib:eval([F,C])
    catch
        _:_ -> false
    end.

eq(Any, Id, PeerId) ->
    Any == Id orelse try
                         iolist_to_binary(Id) == iolist_to_binary(PeerId)
                     catch
                         _:_ -> false
                     end.
%% OctetString() can be specified as an iolist() so test for string
%% rather then term equality.

%% transports/1

transports(#state{peerT = PeerT}) ->
    ets:select(PeerT, [{#peer{conn = '$1', _ = '_'},
                        [{'is_pid', '$1'}],
                        ['$1']}]).

%%% ---------------------------------------------------------------------------
%%% # service_info/2
%%% ---------------------------------------------------------------------------

%% The config passed to diameter:start_service/2.
-define(CAP_INFO, ['Origin-Host',
                   'Origin-Realm',
                   'Vendor-Id',
                   'Product-Name',
                   'Origin-State-Id',
                   'Host-IP-Address',
                   'Supported-Vendor-Id',
                   'Auth-Application-Id',
                   'Inband-Security-Id',
                   'Acct-Application-Id',
                   'Vendor-Specific-Application-Id',
                   'Firmware-Revision']).

-define(ALL_INFO, [capabilities,
                   applications,
                   transport,
                   pending,
                   statistics]).

service_info(Items, S)
  when is_list(Items) ->
    [{complete(I), service_info(I,S)} || I <- Items];
service_info(Item, S)
  when is_atom(Item) ->
    service_info(Item, S, true).

service_info(Item, #state{service = Svc} = S, Complete) ->
    case Item of
        name ->
            S#state.service_name;
        'Origin-Host' ->
            (Svc#diameter_service.capabilities)
                #diameter_caps.origin_host;
        'Origin-Realm' ->
            (Svc#diameter_service.capabilities)
                #diameter_caps.origin_realm;
        'Vendor-Id' ->
            (Svc#diameter_service.capabilities)
                #diameter_caps.vendor_id;
        'Product-Name' ->
            (Svc#diameter_service.capabilities)
                #diameter_caps.product_name;
        'Origin-State-Id' ->
            (Svc#diameter_service.capabilities)
                #diameter_caps.origin_state_id;
        'Host-IP-Address' ->
            (Svc#diameter_service.capabilities)
                #diameter_caps.host_ip_address;
        'Supported-Vendor-Id' ->
            (Svc#diameter_service.capabilities)
                #diameter_caps.supported_vendor_id;
        'Auth-Application-Id' ->
            (Svc#diameter_service.capabilities)
                #diameter_caps.auth_application_id;
        'Inband-Security-Id'  ->
            (Svc#diameter_service.capabilities)
                #diameter_caps.inband_security_id;
        'Acct-Application-Id' ->
            (Svc#diameter_service.capabilities)
                #diameter_caps.acct_application_id;
        'Vendor-Specific-Application-Id' ->
            (Svc#diameter_service.capabilities)
                #diameter_caps.vendor_specific_application_id;
        'Firmware-Revision' ->
            (Svc#diameter_service.capabilities)
                #diameter_caps.firmware_revision;
        capabilities -> service_info(?CAP_INFO, S);
        applications -> info_apps(S);
        transport    -> info_transport(S);
        pending      -> info_pending(S);
        statistics   -> info_stats(S);
        keys         -> ?ALL_INFO ++ ?CAP_INFO;  %% mostly for test
        all          -> service_info(?ALL_INFO, S);
        _ when Complete   -> service_info(complete(Item), S, false);
        _            -> undefined
    end.

complete(Pre) ->
    P = atom_to_list(Pre),
    case [I || I <- [name | ?ALL_INFO] ++ ?CAP_INFO,
               lists:prefix(P, atom_to_list(I))]
    of
        [I] -> I;
        _   -> Pre
    end.

info_stats(#state{peerT = PeerT}) ->
    Peers = ets:select(PeerT, [{#peer{ref = '$1', conn = '$2', _ = '_'},
                                [{'is_pid', '$2'}],
                                [['$1', '$2']]}]),
    diameter_stats:read(lists:append(Peers)).
%% TODO: include peer identities in return value

info_transport(#state{peerT = PeerT, connT = ConnT}) ->
    dict:fold(fun it/3,
              [],
              ets:foldl(fun(T,A) -> it_acc(ConnT, A, T) end,
                        dict:new(),
                        PeerT)).

it(Ref, [[{type, connect} | _] = L], Acc) ->
    [[{ref, Ref} | L] | Acc];
it(Ref, [[{type, accept}, {options, Opts} | _] | _] = L, Acc) ->
    [[{ref, Ref},
      {type, listen},
      {options, Opts},
      {accept, [lists:nthtail(2,A) || A <- L]}]
     | Acc].
%% Each entry has the same Opts. (TODO)

it_acc(ConnT, Acc, #peer{pid = Pid,
                         type = Type,
                         ref = Ref,
                         options = Opts,
                         op_state = OS,
                         started = T,
                         conn = TPid}) ->
    dict:append(Ref,
                [{type, Type},
                 {options, Opts},
                 {watchdog, {Pid, T, OS}}
                 | info_conn(ConnT, TPid)],
                Acc).

info_conn(ConnT, TPid) ->
    info_conn(ets:lookup(ConnT, TPid)).

info_conn([#conn{pid = Pid, apps = SApps, caps = Caps, started = T}]) ->
    [{peer, {Pid, T}},
     {apps, SApps},
     {caps, info_caps(Caps)}];
info_conn([] = No) ->
    No.

info_caps(#diameter_caps{} = C) ->
    lists:zip(record_info(fields, diameter_caps), tl(tuple_to_list(C))).

info_apps(#state{service = #diameter_service{applications = Apps}}) ->
    lists:map(fun mk_app/1, Apps).

mk_app(#diameter_app{alias = Alias,
                     dictionary = Dict,
                     module = ModX,
                     id = Id}) ->
    [{alias, Alias},
     {dictionary, Dict},
     {module, ModX},
     {id, Id}].

info_pending(#state{} = S) ->
    MatchSpec = [{{'$1',
                   #request{transport = '$2',
                            from = '$3',
                            app = '$4',
                            _ = '_'},
                   '_'},
                  [?ORCOND([{'==', T, '$2'} || T <- transports(S)])],
                  [{{'$1', [{{app, '$4'}},
                            {{transport, '$2'}},
                            {{from, '$3'}}]}}]}],

    ets:select(?REQUEST_TABLE, MatchSpec).