diff options
Diffstat (limited to 'lib/diameter/src/base')
-rw-r--r-- | lib/diameter/src/base/diameter.appup.src | 18 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter.erl | 10 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_capx.erl | 145 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_codec.erl | 57 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_config.erl | 83 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_internal.hrl | 4 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_peer.erl | 11 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_peer_fsm.erl | 311 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_reg.erl | 9 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_service.erl | 2661 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_stats.erl | 4 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_traffic.erl | 1705 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_watchdog.erl | 293 |
13 files changed, 2761 insertions, 2550 deletions
diff --git a/lib/diameter/src/base/diameter.appup.src b/lib/diameter/src/base/diameter.appup.src index f6d772b534..2ce89579ff 100644 --- a/lib/diameter/src/base/diameter.appup.src +++ b/lib/diameter/src/base/diameter.appup.src @@ -20,14 +20,15 @@ {"%VSN%", [ - {"0.9", [{restart_application, diameter}]}, - {"0.10", [{restart_application, diameter}]}, - {"1.0", [{restart_application, diameter}]}, - {"1.1", [{restart_application, diameter}]}, - {"1.2", [{restart_application, diameter}]}, + {"0.9", [{restart_application, diameter}]}, %% R14B03 + {"0.10", [{restart_application, diameter}]}, %% R14B04 + {"1.0", [{restart_application, diameter}]}, %% R15B + {"1.1", [{restart_application, diameter}]}, %% R15B01 + {"1.2", [{restart_application, diameter}]}, %% R15B02 {"1.2.1", [{restart_application, diameter}]}, - {"1.3", [{restart_application, diameter}]}, - {"1.3.1", [{restart_application, diameter}]} + {"1.3", [{restart_application, diameter}]}, %% R15B03 + {"1.3.1", [{restart_application, diameter}]}, + {"1.4", [{restart_application, diameter}]} %% R16A ], [ {"0.9", [{restart_application, diameter}]}, @@ -37,6 +38,7 @@ {"1.2", [{restart_application, diameter}]}, {"1.2.1", [{restart_application, diameter}]}, {"1.3", [{restart_application, diameter}]}, - {"1.3.1", [{restart_application, diameter}]} + {"1.3.1", [{restart_application, diameter}]}, + {"1.4", [{restart_application, diameter}]} ] }. diff --git a/lib/diameter/src/base/diameter.erl b/lib/diameter/src/base/diameter.erl index 8f9901907a..c67fba5f89 100644 --- a/lib/diameter/src/base/diameter.erl +++ b/lib/diameter/src/base/diameter.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2012. All Rights Reserved. +%% Copyright Ericsson AB 2010-2013. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -213,7 +213,7 @@ origin_state_id() -> -> any(). call(SvcName, App, Message, Options) -> - diameter_service:call(SvcName, {alias, App}, Message, Options). + diameter_traffic:send_request(SvcName, {alias, App}, Message, Options). call(SvcName, App, Message) -> call(SvcName, App, Message, []). @@ -306,7 +306,8 @@ call(SvcName, App, Message) -> | {module, app_module()} | {state, any()} | {call_mutates_state, boolean()} - | {answer_errors, callback|report|discard}. + | {answer_errors, callback|report|discard} + | {request_errors, answer_3xxx|answer|callback}. -type app_alias() :: any(). @@ -332,8 +333,9 @@ call(SvcName, App, Message) -> | {capabilities_cb, evaluable()} | {capx_timeout, 'Unsigned32'()} | {disconnect_cb, evaluable()} - | {watchdog_timer, 'Unsigned32'() | {module(), atom(), list()}} + | {length_errors, exit | handle | discard} | {reconnect_timer, 'Unsigned32'()} + | {watchdog_timer, 'Unsigned32'() | {module(), atom(), list()}} | {private, any()}. %% Predicate passed to remove_transport/2 diff --git a/lib/diameter/src/base/diameter_capx.erl b/lib/diameter/src/base/diameter_capx.erl index c6c3d2934d..715b15628c 100644 --- a/lib/diameter/src/base/diameter_capx.erl +++ b/lib/diameter/src/base/diameter_capx.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2012. All Rights Reserved. +%% Copyright Ericsson AB 2010-2013. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -47,14 +47,13 @@ -module(diameter_capx). --export([build_CER/1, - recv_CER/2, - recv_CEA/2, +-export([build_CER/2, + recv_CER/3, + recv_CEA/3, make_caps/2]). -include_lib("diameter/include/diameter.hrl"). -include("diameter_internal.hrl"). --include("diameter_gen_base_rfc3588.hrl"). -define(SUCCESS, 2001). %% DIAMETER_SUCCESS -define(NOAPP, 5010). %% DIAMETER_NO_COMMON_APPLICATION @@ -67,27 +66,31 @@ -type tried(T) :: {ok, T} | {error, {term(), list()}}. --spec build_CER(#diameter_caps{}) - -> tried(#diameter_base_CER{}). +-spec build_CER(#diameter_caps{}, module()) + -> tried(CER) + when CER :: tuple(). -build_CER(Caps) -> - try_it([fun bCER/1, Caps]). +build_CER(Caps, Dict) -> + try_it([fun bCER/2, Caps, Dict]). --spec recv_CER(#diameter_base_CER{}, #diameter_service{}) +-spec recv_CER(CER, #diameter_service{}, module()) -> tried({[diameter:'Unsigned32'()], #diameter_caps{}, - #diameter_base_CEA{}}). + CEA}) + when CER :: tuple(), + CEA :: tuple(). -recv_CER(CER, Svc) -> - try_it([fun rCER/2, CER, Svc]). +recv_CER(CER, Svc, Dict) -> + try_it([fun rCER/3, CER, Svc, Dict]). --spec recv_CEA(#diameter_base_CEA{}, #diameter_service{}) +-spec recv_CEA(CEA, #diameter_service{}, module()) -> tried({[diameter:'Unsigned32'()], [diameter:'Unsigned32'()], - #diameter_caps{}}). + #diameter_caps{}}) + when CEA :: tuple(). -recv_CEA(CEA, Svc) -> - try_it([fun rCEA/2, CEA, Svc]). +recv_CEA(CEA, Svc, Dict) -> + try_it([fun rCEA/3, CEA, Svc, Dict]). make_caps(Caps, Opts) -> try_it([fun mk_caps/2, Caps, Opts]). @@ -161,16 +164,17 @@ ipaddr(A) -> ?THROW(T) end. -%% bCER/1 +%% bCER/2 %% %% Build a CER record to send to a remote peer. %% Use the fact that diameter_caps has the same field names as CER. -bCER(#diameter_caps{} = Rec) -> - #diameter_base_CER{} - = list_to_tuple([diameter_base_CER | tl(tuple_to_list(Rec))]). +bCER(#diameter_caps{} = Rec, Dict) -> + Values = lists:zip(Dict:'#info-'(diameter_base_CER, fields), + tl(tuple_to_list(Rec))), + Dict:'#new-'(diameter_base_CER, Values). -%% rCER/2 +%% rCER/3 %% %% Build a CEA record to send to a remote peer in response to an %% incoming CER. RFC 3588 gives no guidance on what should be sent @@ -214,12 +218,9 @@ bCER(#diameter_caps{} = Rec) -> %% TLS 1 %% This node supports TLS security, as defined by [TLS]. -rCER(CER, #diameter_service{capabilities = LCaps} = Svc) -> - #diameter_base_CEA{} - = CEA - = cea_from_cer(bCER(LCaps)), - - RCaps = capx_to_caps(CER), +rCER(CER, #diameter_service{capabilities = LCaps} = Svc, Dict) -> + CEA = cea_from_cer(bCER(LCaps, Dict), Dict), + RCaps = capx_to_caps(CER, Dict), SApps = common_applications(LCaps, RCaps, Svc), {SApps, @@ -227,17 +228,18 @@ rCER(CER, #diameter_service{capabilities = LCaps} = Svc) -> build_CEA(SApps, LCaps, RCaps, - CEA#diameter_base_CEA{'Result-Code' = ?SUCCESS})}. + Dict, + Dict:'#set-'({'Result-Code', ?SUCCESS}, CEA))}. -build_CEA([], _, _, CEA) -> - CEA#diameter_base_CEA{'Result-Code' = ?NOAPP}; +build_CEA([], _, _, Dict, CEA) -> + Dict:'#set-'({'Result-Code', ?NOAPP}, CEA); -build_CEA(_, LCaps, RCaps, CEA) -> +build_CEA(_, LCaps, RCaps, Dict, CEA) -> case common_security(LCaps, RCaps) of [] -> - CEA#diameter_base_CEA{'Result-Code' = ?NOSECURITY}; + Dict:'#set-'({'Result-Code', ?NOSECURITY}, CEA); [_] = IS -> - CEA#diameter_base_CEA{'Inband-Security-Id' = IS} + Dict:'#set-'({'Inband-Security-Id', IS}, CEA) end. %% common_security/2 @@ -275,46 +277,41 @@ cs(LS, RS) -> %% practice something there may be a need for more synchronization %% than notification by way of an event subscription offers. -%% cea_from_cer/1 +%% cea_from_cer/2 %% CER is a subset of CEA, the latter adding Result-Code and a few %% more AVP's. -cea_from_cer(#diameter_base_CER{} = CER) -> - lists:foldl(fun(F,A) -> to_cea(CER, F, A) end, - #diameter_base_CEA{}, - record_info(fields, diameter_base_CER)). - -to_cea(CER, Field, CEA) -> - try ?BASE:'#get-'(Field, CER) of - V -> ?BASE:'#set-'({Field, V}, CEA) - catch - error: _ -> CEA - end. - -%% rCEA/2 +cea_from_cer(CER, Dict) -> + [diameter_base_CER | Values] = Dict:'#get-'(CER), + Dict:'#set-'(Values, Dict:'#new-'(diameter_base_CEA)). -rCEA(CEA, #diameter_service{capabilities = LCaps} = Svc) -> - RCaps = capx_to_caps(CEA), +%% rCEA/3 + +rCEA(CEA, #diameter_service{capabilities = LCaps} = Svc, Dict) -> + RCaps = capx_to_caps(CEA, Dict), SApps = common_applications(LCaps, RCaps, Svc), IS = common_security(LCaps, RCaps), {SApps, IS, RCaps}. -%% capx_to_caps/1 - -capx_to_caps(#diameter_base_CEA{'Origin-Host' = OH, - 'Origin-Realm' = OR, - 'Host-IP-Address' = IP, - 'Vendor-Id' = VId, - 'Product-Name' = PN, - 'Origin-State-Id' = OSI, - 'Supported-Vendor-Id' = SV, - 'Auth-Application-Id' = Auth, - 'Inband-Security-Id' = IS, - 'Acct-Application-Id' = Acct, - 'Vendor-Specific-Application-Id' = VSA, - 'Firmware-Revision' = FR, - 'AVP' = X}) -> +%% capx_to_caps/2 + +capx_to_caps(CEX, Dict) -> + [OH, OR, IP, VId, PN, OSI, SV, Auth, IS, Acct, VSA, FR, X] + = Dict:'#get-'(['Origin-Host', + 'Origin-Realm', + 'Host-IP-Address', + 'Vendor-Id', + 'Product-Name', + 'Origin-State-Id', + 'Supported-Vendor-Id', + 'Auth-Application-Id', + 'Inband-Security-Id', + 'Acct-Application-Id', + 'Vendor-Specific-Application-Id', + 'Firmware-Revision', + 'AVP'], + CEX), #diameter_caps{origin_host = OH, origin_realm = OR, vendor_id = VId, @@ -327,10 +324,7 @@ capx_to_caps(#diameter_base_CEA{'Origin-Host' = OH, acct_application_id = Acct, vendor_specific_application_id = VSA, firmware_revision = FR, - avp = X}; - -capx_to_caps(#diameter_base_CER{} = CER) -> - capx_to_caps(cea_from_cer(CER)). + avp = X}. %% --------------------------------------------------------------------------- %% --------------------------------------------------------------------------- @@ -365,13 +359,12 @@ app_union(#diameter_caps{auth_application_id = U, vendor_specific_application_id = V}) -> set_list(U ++ C ++ lists:flatmap(fun vsa_apps/1, V)). -vsa_apps(#'diameter_base_Vendor-Specific-Application-Id' - {'Auth-Application-Id' = U, - 'Acct-Application-Id' = C}) -> - U ++ C; -vsa_apps(L) -> - Rec = ?BASE:'#new-'('diameter_base_Vendor-Specific-Application-Id', L), - vsa_apps(Rec). +vsa_apps([_ | [_,_] = Ids]) -> + lists:append(Ids); +vsa_apps(Rec) + when is_tuple(Rec) -> + [_|T] = tuple_to_list(Rec), + vsa_apps(T). %% It's a configuration error for a locally advertised application not %% to be represented in Apps. Don't just match on lists:keyfind/3 in diff --git a/lib/diameter/src/base/diameter_codec.erl b/lib/diameter/src/base/diameter_codec.erl index 0b0bfe3f0a..e446a0209c 100644 --- a/lib/diameter/src/base/diameter_codec.erl +++ b/lib/diameter/src/base/diameter_codec.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2012. All Rights Reserved. +%% Copyright Ericsson AB 2010-2013. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -26,7 +26,7 @@ decode_header/1, sequence_numbers/1, hop_by_hop_id/2, - msg_name/1, + msg_name/2, msg_id/1]). %% Towards generated encoders (from diameter_gen.hrl). @@ -99,13 +99,13 @@ e(_, #diameter_packet{msg = [#diameter_header{} = Hdr | As]} = Pkt) -> Eid:32, Avps/binary>>}; -e(Mod0, #diameter_packet{header = Hdr, msg = Msg} = Pkt) -> +e(Mod, #diameter_packet{header = Hdr, msg = Msg} = Pkt) -> #diameter_header{version = Vsn, hop_by_hop_id = Hid, end_to_end_id = Eid} = Hdr, - {Mod, MsgName} = rec2msg(Mod0, Msg), + MsgName = rec2msg(Mod, Msg), {Code, Flags0, Aid} = msg_header(Mod, MsgName, Hdr), Flags = make_flags(Flags0, Hdr), @@ -192,11 +192,11 @@ encode_avps(Avps) -> %% msg_header/3 msg_header(Mod, 'answer-message' = MsgName, Header) -> - ?BASE = Mod, + 0 = Mod:id(), %% assert #diameter_header{application_id = Aid, cmd_code = Code} = Header, - {-1, Flags, ?DIAMETER_APP_ID_COMMON} = ?BASE:msg_header(MsgName), + {-1, Flags, ?DIAMETER_APP_ID_COMMON} = Mod:msg_header(MsgName), {Code, Flags, Aid}; msg_header(Mod, MsgName, _) -> @@ -204,22 +204,12 @@ msg_header(Mod, MsgName, _) -> %% rec2msg/2 -rec2msg(_, ['answer-message' = M | _]) -> - {?BASE, M}; - -rec2msg(Mod, [MsgName|_]) - when is_atom(MsgName) -> - {Mod, MsgName}; +rec2msg(_, [Name|_]) + when is_atom(Name) -> + Name; rec2msg(Mod, Rec) -> - R = element(1, Rec), - A = 'answer-message', - case ?BASE:msg2rec(A) of - R -> - {?BASE, A}; - _ -> - {Mod, Mod:rec2msg(R)} - end. + Mod:rec2msg(element(1, Rec)). %%% --------------------------------------------------------------------------- %%% # decode/2 @@ -243,20 +233,19 @@ decode(?APP_ID_RELAY, _, #diameter_packet{} = Pkt) -> end; %% Otherwise decode using the dictionary. -decode(_, Mod, #diameter_packet{header = Hdr} = Pkt) - when is_atom(Mod) -> +decode(_, Mod, #diameter_packet{header = Hdr} = Pkt) -> #diameter_header{cmd_code = CmdCode, is_request = IsRequest, is_error = IsError} = Hdr, - {M, MsgName} = if IsError andalso not IsRequest -> - {?BASE, 'answer-message'}; - true -> - {Mod, Mod:msg_name(CmdCode, IsRequest)} - end, + MsgName = if IsError andalso not IsRequest -> + 'answer-message'; + true -> + Mod:msg_name(CmdCode, IsRequest) + end, - decode_avps(MsgName, M, Pkt, collect_avps(Pkt)); + decode_avps(MsgName, Mod, Pkt, collect_avps(Pkt)); decode(Id, Mod, Bin) when is_bitstring(Bin) -> @@ -360,15 +349,15 @@ hop_by_hop_id(Id, <<H:12/binary, _:32, T/binary>>) -> <<H/binary, Id:32, T/binary>>. %%% --------------------------------------------------------------------------- -%%% # msg_name/1 +%%% # msg_name/2 %%% --------------------------------------------------------------------------- -msg_name(#diameter_header{application_id = ?APP_ID_COMMON, - cmd_code = C, - is_request = R}) -> - ?BASE:msg_name(C,R); +msg_name(Dict0, #diameter_header{application_id = ?APP_ID_COMMON, + cmd_code = C, + is_request = R}) -> + Dict0:msg_name(C,R); -msg_name(Hdr) -> +msg_name(_, Hdr) -> msg_id(Hdr). %% Note that messages in different applications could have the same diff --git a/lib/diameter/src/base/diameter_config.erl b/lib/diameter/src/base/diameter_config.erl index 63d28f25a2..9f73815756 100644 --- a/lib/diameter/src/base/diameter_config.erl +++ b/lib/diameter/src/base/diameter_config.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2012. All Rights Reserved. +%% Copyright Ericsson AB 2010-2013. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -113,15 +113,22 @@ -define(VALUES(Rec), tl(tuple_to_list(Rec))). +%% The RFC 3588 common dictionary is used to validate capabilities +%% configuration. That a given transport may use the RFC 6733 +%% dictionary is of no consequence. +-define(BASE, diameter_gen_base_rfc3588). + %%% The return values below assume the server diameter_config is started. %%% The functions will exit if it isn't. %% -------------------------------------------------------------------------- -%% # start_service(SvcName, Opts) -%% -%% Output: ok | {error, Reason} +%% # start_service/2 %% -------------------------------------------------------------------------- +-spec start_service(diameter:service_name(), [diameter:service_opt()]) + -> ok + | {error, term()}. + start_service(SvcName, Opts) when is_list(Opts) -> start_rc(sync(SvcName, {start_service, SvcName, Opts})). @@ -134,22 +141,23 @@ start_rc(timeout) -> {error, application_not_started}. %% -------------------------------------------------------------------------- -%% # stop_service(SvcName) -%% -%% Output: ok +%% # stop_service/1 %% -------------------------------------------------------------------------- +-spec stop_service(diameter:service_name()) + -> ok. + stop_service(SvcName) -> sync(SvcName, {stop_service, SvcName}). %% -------------------------------------------------------------------------- -%% # add_transport(SvcName, {Type, Opts}) -%% -%% Input: Type = connect | listen -%% -%% Output: {ok, Ref} | {error, Reason} +%% # add_transport/2 %% -------------------------------------------------------------------------- +-spec add_transport(diameter:service_name(), {connect|listen, [diameter:transport_opt()]}) + -> {ok, diameter:transport_ref()} + | {error, term()}. + add_transport(SvcName, {T, Opts}) when is_list(Opts), (T == connect orelse T == listen) -> sync(SvcName, {add, SvcName, T, Opts}). @@ -171,6 +179,10 @@ add_transport(SvcName, {T, Opts}) %% Output: ok | {error, Reason} %% -------------------------------------------------------------------------- +-spec remove_transport(diameter:service_name(), diameter:transport_pred()) + -> ok + | {error, term()}. + remove_transport(SvcName, Pred) -> try sync(SvcName, {remove, SvcName, pred(Pred)}) @@ -473,6 +485,10 @@ stop(SvcName) -> %% add/3 +%% Can't check for a single common dictionary since a transport may +%% restrict applications so that that there's one while the service +%% has many. + add(SvcName, Type, Opts) -> %% Ensure usable capabilities. diameter_service:merge_service/2 %% depends on this. @@ -545,7 +561,7 @@ make_config(SvcName, Opts) -> [] == Apps andalso ?THROW(no_apps), %% Use the fact that diameter_caps has the same field names as CER. - Fields = diameter_gen_base_rfc3588:'#info-'(diameter_base_CER) -- ['AVP'], + Fields = ?BASE:'#info-'(diameter_base_CER) -- ['AVP'], COpts = [T || {K,_} = T <- Opts, lists:member(K, Fields)], Caps = make_caps(#diameter_caps{}, COpts), @@ -608,14 +624,14 @@ opt(sequence = K, F) -> E:R -> ?THROW({value, {K, E, R, ?STACK}}) end; - + opt(K, _) -> ?THROW({value, K}). sequence({H,N} = T) when 0 =< N, N =< 32, 0 =< H, 0 == H bsr N -> T; - + sequence(_) -> ?THROW({value, sequence}). @@ -629,7 +645,8 @@ make_caps(Caps, Opts) -> %% Validate types by encoding a CER. encode_CER(Opts) -> - {ok, CER} = diameter_capx:build_CER(make_caps(?EXAMPLE_CAPS, Opts)), + {ok, CER} = diameter_capx:build_CER(make_caps(?EXAMPLE_CAPS, Opts), + ?BASE), Hdr = #diameter_header{version = ?DIAMETER_VERSION, end_to_end_id = 0, @@ -653,15 +670,17 @@ app_acc({application, Opts}, Acc) -> [Dict, Mod] = get_opt([dictionary, module], Opts), Alias = get_opt(alias, Opts, Dict), ModS = get_opt(state, Opts, Alias), - M = get_opt(call_mutates_state, Opts, false), - A = get_opt(answer_errors, Opts, report), + M = get_opt(call_mutates_state, Opts, false, [true]), + A = get_opt(answer_errors, Opts, report, [callback, discard]), + P = get_opt(request_errors, Opts, answer_3xxx, [answer, callback]), [#diameter_app{alias = Alias, dictionary = Dict, id = cb(Dict, id), module = init_mod(Mod), init_state = ModS, - mutable = init_mutable(M), - options = [{answer_errors, init_answers(A)}]} + mutable = M, + options = [{answer_errors, A}, + {request_errors, P}]} | Acc]; app_acc(_, Acc) -> Acc. @@ -690,20 +709,16 @@ init_cb(List) -> V <- [proplists:get_value(F, List, D)]], #diameter_callback{} = list_to_tuple([diameter_callback | Values]). -init_mutable(M) - when M == true; - M == false -> - M; -init_mutable(M) -> - ?THROW({call_mutates_state, M}). - -init_answers(A) - when callback == A; - report == A; - discard == A -> - A; -init_answers(A) -> - ?THROW({answer_errors, A}). +%% Retreive and validate. +get_opt(Key, List, Def, Other) -> + init_opt(Key, get_opt(Key, List, Def), [Def|Other]). + +init_opt(_, V, [V|_]) -> + V; +init_opt(Name, V, [_|Vals]) -> + init_opt(Name, V, Vals); +init_opt(Name, V, []) -> + ?THROW({Name, V}). %% Get a single value at the specified key. get_opt(Keys, List) diff --git a/lib/diameter/src/base/diameter_internal.hrl b/lib/diameter/src/base/diameter_internal.hrl index 63b35550a8..4b672aa071 100644 --- a/lib/diameter/src/base/diameter_internal.hrl +++ b/lib/diameter/src/base/diameter_internal.hrl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% Copyright Ericsson AB 2010-2013. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -58,8 +58,6 @@ -define(APP_ID_COMMON, 0). -define(APP_ID_RELAY, 16#FFFFFFFF). --define(BASE, diameter_gen_base_rfc3588). - %%% --------------------------------------------------------- %%% RFC 3588, ch 2.6 Peer table diff --git a/lib/diameter/src/base/diameter_peer.erl b/lib/diameter/src/base/diameter_peer.erl index 25c9eab4cb..130bedda84 100644 --- a/lib/diameter/src/base/diameter_peer.erl +++ b/lib/diameter/src/base/diameter_peer.erl @@ -123,7 +123,7 @@ pair([_ | Rest], Mods, Acc) -> pair(Rest, Mods, Acc); %% No transport_module or transport_config: defaults. -pair([], [], []) -> +pair([], [], []) -> [{[?DEFAULT_TMOD], ?DEFAULT_TCFG, ?DEFAULT_TTMO}]; %% One transport_module, one transport_config. @@ -272,7 +272,7 @@ handle_cast(Msg, State) -> %% Remote service is distributing a message. handle_info({notify, SvcName, T}, S) -> - bang(diameter_service:whois(SvcName), T), + diameter_service:notify(SvcName, T), {noreply, S}; handle_info(Info, State) -> @@ -304,13 +304,6 @@ code_change(_OldVsn, State, _Extra) -> ifc_send(Pid, T) -> Pid ! {diameter, T}. -%% bang/2 - -bang(undefined = No, _) -> - No; -bang(Pid, T) -> - Pid ! T. - %% call/1 call(Request) -> diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl index 5dab6214b1..66342f7b62 100644 --- a/lib/diameter/src/base/diameter_peer_fsm.erl +++ b/lib/diameter/src/base/diameter_peer_fsm.erl @@ -18,10 +18,10 @@ %% %% -%% This module implements (as a process) the RFC 3588 Peer State +%% This module implements (as a process) the RFC 3588/6733 Peer State %% Machine modulo the necessity of adapting the peer election to the -%% fact that we don't know the identity of a peer until we've -%% received a CER/CEA from it. +%% fact that we don't know the identity of a peer until we've received +%% a CER/CEA from it. %% -module(diameter_peer_fsm). @@ -46,16 +46,19 @@ -include_lib("diameter/include/diameter.hrl"). -include("diameter_internal.hrl"). --include("diameter_gen_base_rfc3588.hrl"). %% Values of Disconnect-Cause in DPR. --define(GOAWAY, ?'DIAMETER_BASE_DISCONNECT-CAUSE_DO_NOT_WANT_TO_TALK_TO_YOU'). --define(REBOOT, ?'DIAMETER_BASE_DISCONNECT-CAUSE_REBOOTING'). --define(BUSY, ?'DIAMETER_BASE_DISCONNECT-CAUSE_BUSY'). +-define(GOAWAY, 2). %% DO_NOT_WANT_TO_TALK_TO_YOU +-define(BUSY, 1). %% BUSY +-define(REBOOT, 0). %% REBOOTING +%% Values of Inband-Security-Id. -define(NO_INBAND_SECURITY, 0). -define(TLS, 1). +%% Note that the a common dictionary hrl is purposely not included +%% since the common dictionary is an argument to start/3. + %% Keys in process dictionary. -define(CB_KEY, cb). %% capabilities callback -define(DPR_KEY, dpr). %% disconnect callback @@ -100,11 +103,13 @@ | {'Wait-CEA', uint32(), uint32()} | 'Open', mode :: accept | connect | {connect, reference()}, - parent :: pid(), %% watchdog process - transport :: pid(), %% transport process + parent :: pid(), %% watchdog process + transport :: pid(), %% transport process + dictionary :: module(), %% common dictionary service :: #diameter_service{}, - dpr = false :: false | {uint32(), uint32()}}). + dpr = false :: false | {uint32(), uint32()}, %% | hop by hop and end to end identifiers + length_errors :: exit | handle | discard}). %% There are non-3588 states possible as a consequence of 5.6.1 of the %% standard and the corresponding problem for incoming CEA's: we don't @@ -126,24 +131,18 @@ %% State Machine rather than closer to the transport. This is what we %% now do below: connect/accept call diameter_watchdog and return the %% pid of the watchdog process, and the watchdog in turn calls start/3 -%% below to start the process implementing the Peer State Machine. The -%% former is a "peer" in diameter_service while the latter is a -%% "conn". In a sense, diameter_service sees the watchdog as -%% implementing the Peer State Machine and the process implemented -%% here as being the transport, not being aware of the watchdog at -%% all. +%% below to start the process implementing the Peer State Machine. %% -%%% --------------------------------------------------------------------------- -%%% # start({connect|accept, Ref}, Opts, Service) -%%% -%%% Output: Pid -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # start/3 +%% --------------------------------------------------------------------------- -spec start(T, [Opt], {diameter:sequence(), - diameter:restriction(), + [node()], + module(), #diameter_service{}}) - -> pid() + -> {reference(), pid()} when T :: {connect|accept, diameter:transport_ref()}, Opt :: diameter:transport_opt(). @@ -152,9 +151,15 @@ %% specified on the transport in question. Check here that the list is %% still non-empty. -start({_,_} = Type, Opts, MS) -> - {ok, Pid} = diameter_peer_fsm_sup:start_child({self(), Type, Opts, MS}), - Pid. +start({_,_} = Type, Opts, S) -> + Ack = make_ref(), + T = {Ack, self(), Type, Opts, S}, + {ok, Pid} = diameter_peer_fsm_sup:start_child(T), + try + {erlang:monitor(process, Pid), Pid} + after + Pid ! Ack + end. start_link(T) -> {ok, _} = proc_lib:start_link(?MODULE, @@ -163,8 +168,8 @@ start_link(T) -> infinity, diameter_lib:spawn_opts(server, [])). -%%% --------------------------------------------------------------------------- -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- %% init/1 @@ -172,12 +177,14 @@ init(T) -> proc_lib:init_ack({ok, self()}), gen_server:enter_loop(?MODULE, [], i(T)). -i({WPid, T, Opts, {Mask, Nodes, #diameter_service{applications = Apps, - capabilities = LCaps} - = Svc}}) -> - [] /= Apps orelse ?ERROR({no_apps, T, Opts}), +i({Ack, WPid, {M, Ref} = T, Opts, {Mask, + Nodes, + Dict0, + #diameter_service{capabilities = LCaps} + = Svc}}) -> + erlang:monitor(process, WPid), + wait(Ack, WPid), putr(?DWA_KEY, dwa(LCaps)), - {M, Ref} = T, diameter_stats:reg(Ref), {[Cs,Ds], Rest} = proplists:split(Opts, [capabilities_cb, disconnect_cb]), putr(?CB_KEY, {Ref, [F || {_,F} <- Cs]}), @@ -185,23 +192,39 @@ i({WPid, T, Opts, {Mask, Nodes, #diameter_service{applications = Apps, putr(?REF_KEY, Ref), putr(?SEQUENCE_KEY, Mask), putr(?RESTRICT_KEY, Nodes), - erlang:monitor(process, WPid), - {TPid, Addrs} = start_transport(T, Rest, Svc), + Tmo = proplists:get_value(capx_timeout, Opts, ?EVENT_TIMEOUT), ?IS_TIMEOUT(Tmo) orelse ?ERROR({invalid, {capx_timeout, Tmo}}), + OnLengthErr = proplists:get_value(length_errors, Opts, exit), + lists:member(OnLengthErr, [exit, handle, discard]) + orelse ?ERROR({invalid, {length_errors, OnLengthErr}}), + + {TPid, Addrs} = start_transport(T, Rest, Svc), + #state{state = {'Wait-Conn-Ack', Tmo}, parent = WPid, transport = TPid, + dictionary = Dict0, mode = M, - service = svc(Svc, Addrs)}. + service = svc(Svc, Addrs), + length_errors = OnLengthErr}. %% The transport returns its local ip addresses so that different %% transports on the same service can use different local addresses. %% The local addresses are put into Host-IP-Address avps here when %% sending capabilities exchange messages. %% %% Invalid transport config may cause us to crash but note that the -%% watchdog start (start/2) succeeds regardless so as not to crash the -%% service. +%% watchdog start (start/2) succeeds regardless. + +%% Wait for the caller to have a monitor to avoid a race with our +%% death. (Since the exit reason is used in diameter_service.) +wait(Ref, Pid) -> + receive + Ref -> + ok; + {'DOWN', _, process, Pid, _} = D -> + exit({shutdown, D}) + end. start_transport(T, Opts, #diameter_service{capabilities = LCaps} = Svc) -> Addrs0 = LCaps#diameter_caps.host_ip_address, @@ -274,13 +297,12 @@ handle_info(T, #state{} = State) -> {noreply, S}; {stop, Reason} -> ?LOG(stop, Reason), - x(Reason, State); + {stop, {shutdown, Reason}, State}; stop -> ?LOG(stop, T), - x(T, State) + {stop, {shutdown, T}, State} catch exit: {diameter_codec, encode, _} = Reason -> - close_wd(Reason, State#state.parent), ?LOG(stop, Reason), %% diameter_codec:encode/2 emits an error report. Only %% indicate the probable reason here. @@ -300,10 +322,6 @@ handle_info(T, #state{} = State) -> %% succesfully encoded. It's not checked at diameter:add_transport/2 %% since this can be called before creating the service. -x(Reason, #state{} = S) -> - close_wd(Reason, S), - {stop, {shutdown, Reason}, S}. - %% terminate/2 terminate(_, _) -> @@ -314,8 +332,8 @@ terminate(_, _) -> code_change(_, State, _) -> {ok, State}. -%%% --------------------------------------------------------------------------- -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- putr(Key, Val) -> put({?MODULE, Key}, Val). @@ -378,9 +396,8 @@ transition({diameter, {recv, Pkt}}, S) -> recv(Pkt, S); %% Timeout when still in the same state ... -transition({timeout = T, PS}, #state{state = PS} = S) -> - close({capx(PS), T}, S), - stop; +transition({timeout = T, PS}, #state{state = PS}) -> + {stop, {capx(PS), T}}; %% ... or not. transition({timeout, _}, _) -> @@ -393,8 +410,6 @@ transition({send, Msg}, #state{transport = TPid}) -> %% Request for graceful shutdown at remove_transport, stop_service of %% application shutdown. -transition({shutdown = T, Pid}, S) -> - transition({T, Pid, transport}, S); transition({shutdown, Pid, Reason}, #state{parent = Pid, dpr = false} = S) -> dpr(Reason, S); transition({shutdown, Pid, _}, #state{parent = Pid}) -> @@ -454,18 +469,19 @@ start_next(#state{service = Svc0} = S) -> send_CER(#state{state = {'Wait-Conn-Ack', Tmo}, mode = {connect, Remote}, service = #diameter_service{capabilities = LCaps}, - transport = TPid} + transport = TPid, + dictionary = Dict} = S) -> OH = LCaps#diameter_caps.origin_host, req_send_CER(OH, Remote) orelse - close({already_connected, Remote, LCaps}, S), + close({already_connected, Remote, LCaps}), CER = build_CER(S), ?LOG(send, 'CER'), #diameter_packet{header = #diameter_header{end_to_end_id = Eid, hop_by_hop_id = Hid}} = Pkt - = encode(CER), + = encode(CER, Dict), send(TPid, Pkt), start_timer(Tmo, S#state{state = {'Wait-CEA', Hid, Eid}}). @@ -487,42 +503,29 @@ start_timer(Tmo, #state{state = PS} = S) -> %% build_CER/1 -build_CER(#state{service = #diameter_service{capabilities = LCaps}}) -> - {ok, CER} = diameter_capx:build_CER(LCaps), +build_CER(#state{service = #diameter_service{capabilities = LCaps}, + dictionary = Dict}) -> + {ok, CER} = diameter_capx:build_CER(LCaps, Dict), CER. -%% encode/1 +%% encode/2 -encode(Rec) -> +encode(Rec, Dict) -> Seq = diameter_session:sequence({_,_} = getr(?SEQUENCE_KEY)), Hdr = #diameter_header{version = ?DIAMETER_VERSION, end_to_end_id = Seq, hop_by_hop_id = Seq}, - diameter_codec:encode(?BASE, #diameter_packet{header = Hdr, - msg = Rec}). + diameter_codec:encode(Dict, #diameter_packet{header = Hdr, + msg = Rec}). %% recv/2 -%% RFC 3588 has result code 5015 for an invalid length but if a -%% transport is detecting message boundaries using the length header -%% then a length error will likely lead to further errors. - -recv(#diameter_packet{header = #diameter_header{length = Len} - = Hdr, - bin = Bin}, - S) - when Len < 20; - (0 /= Len rem 4 orelse bit_size(Bin) /= 8*Len) -> - discard(invalid_message_length, recv, [size(Bin), - bit_size(Bin) rem 8, - Hdr, - S]); - recv(#diameter_packet{header = #diameter_header{} = Hdr} = Pkt, - #state{parent = Pid} + #state{parent = Pid, + dictionary = Dict0} = S) -> - Name = diameter_codec:msg_name(Hdr), + Name = diameter_codec:msg_name(Dict0, Hdr), Pid ! {recv, self(), Name, Pkt}, diameter_stats:incr({msg_id(Name, Hdr), recv}), %% count received rcv(Name, Pkt, S); @@ -531,29 +534,52 @@ recv(#diameter_packet{header = undefined, bin = Bin} = Pkt, S) -> - recv(Pkt#diameter_packet{header = diameter_codec:decode_header(Bin)}, S); + recv(diameter_codec:decode_header(Bin), Pkt, S); -recv(Bin, S) - when is_binary(Bin) -> - recv(#diameter_packet{bin = Bin}, S); +recv(Bin, S) -> + recv(#diameter_packet{bin = Bin}, S). -recv(#diameter_packet{header = false} = Pkt, S) -> - discard(truncated_header, recv, [Pkt, S]). +%% recv/3 -msg_id({_,_,_} = T, _) -> - T; -msg_id(_, Hdr) -> - diameter_codec:msg_id(Hdr). +recv(#diameter_header{length = Len} + = H, + #diameter_packet{bin = Bin} + = Pkt, + #state{length_errors = E} + = S) + when E == handle; + 0 == Len rem 4, bit_size(Bin) == 8*Len -> + recv(Pkt#diameter_packet{header = H}, S); + +recv(#diameter_header{} + = H, + #diameter_packet{bin = Bin}, + #state{length_errors = E} + = S) -> + invalid(E, + invalid_message_length, + recv, + [size(Bin), bit_size(Bin) rem 8, H, S]); -%% Treat invalid length as a transport error and die. Especially in -%% the TCP case, in which there's no telling where the next message -%% begins in the incoming byte stream, keeping a crippled connection -%% alive may just make things worse. +recv(false, Pkt, #state{length_errors = E} = S) -> + invalid(E, truncated_header, recv, [Pkt, S]). -discard(Reason, F, A) -> +%% Note that counters here only count discarded messages. +invalid(E, Reason, F, A) -> diameter_stats:incr(Reason), + abort(E, Reason, F, A). + +abort(exit, Reason, F, A) -> diameter_lib:warning_report(Reason, {?MODULE, F, A}), - throw({?MODULE, abort, Reason}). + throw({?MODULE, abort, Reason}); + +abort(_, _, _, _) -> + ok. + +msg_id({_,_,_} = T, _) -> + T; +msg_id(_, Hdr) -> + {_,_,_} = diameter_codec:msg_id(Hdr). %% rcv/3 @@ -607,13 +633,13 @@ send(Pid, Msg) -> %% handle_request/3 -handle_request(Type, #diameter_packet{} = Pkt, S) -> +handle_request(Type, #diameter_packet{} = Pkt, #state{dictionary = D} = S) -> ?LOG(recv, Type), - send_answer(Type, diameter_codec:decode(?BASE, Pkt), S). + send_answer(Type, diameter_codec:decode(D, Pkt), S). %% send_answer/3 -send_answer(Type, ReqPkt, #state{transport = TPid} = S) -> +send_answer(Type, ReqPkt, #state{transport = TPid, dictionary = Dict} = S) -> #diameter_packet{header = H, transport_data = TD} = ReqPkt, @@ -630,13 +656,15 @@ send_answer(Type, ReqPkt, #state{transport = TPid} = S) -> msg = Msg, transport_data = TD}, - send(TPid, diameter_codec:encode(?BASE, Pkt)), + send(TPid, diameter_codec:encode(Dict, Pkt)), eval(PostF, S). eval([F|A], S) -> apply(F, A ++ [S]); eval(ok, S) -> - S. + S; +eval(T, _) -> + close(T). %% build_answer/3 @@ -647,11 +675,11 @@ build_answer('CER', is_error = false}, errors = []} = Pkt, - S) -> - {SupportedApps, RCaps, #diameter_base_CEA{'Result-Code' = RC, - 'Inband-Security-Id' = IS} - = CEA} - = recv_CER(CER, S), + #state{dictionary = Dict0} + = S) -> + {SupportedApps, RCaps, CEA} = recv_CER(CER, S), + + [RC, IS] = Dict0:'#get-'(['Result-Code', 'Inband-Security-Id'], CEA), #diameter_caps{origin_host = {OH, DH}} = Caps @@ -664,10 +692,10 @@ build_answer('CER', orelse ?THROW(4003), %% DIAMETER_ELECTION_LOST caps_cb(Caps) of - N -> {cea(CEA, N), [fun open/5, Pkt, - SupportedApps, - Caps, - {accept, hd([_] = IS)}]} + N -> {cea(CEA, N, Dict0), [fun open/5, Pkt, + SupportedApps, + Caps, + {accept, hd([_] = IS)}]} catch ?FAILURE(Reason) -> rejected(Reason, {'CER', Reason, Caps, Pkt}, S) @@ -684,25 +712,25 @@ build_answer(Type, RC = rc(H, Es), {answer(Type, RC, Es, S), post(Type, RC, Pkt, S)}. -cea(CEA, ok) -> +cea(CEA, ok, _) -> CEA; -cea(CEA, 2001) -> +cea(CEA, 2001, _) -> CEA; -cea(CEA, RC) -> - CEA#diameter_base_CEA{'Result-Code' = RC}. +cea(CEA, RC, Dict0) -> + Dict0:'#set-'({'Result-Code', RC}, CEA). post('CER' = T, RC, Pkt, S) -> - [fun close/2, {T, caps(S), {RC, Pkt}}]; + {T, caps(S), {RC, Pkt}}; post(_, _, _, _) -> ok. rejected({capabilities_cb, _F, Reason}, T, S) -> rejected(Reason, T, S); -rejected(discard, T, S) -> - close(T, S); +rejected(discard, T, _) -> + close(T); rejected({N, Es}, T, S) -> - {answer('CER', N, Es, S), [fun close/2, T]}; + {answer('CER', N, Es, S), T}; rejected(N, T, S) -> rejected({N, []}, T, S). @@ -728,7 +756,7 @@ is_origin({N, _}) -> orelse N == 'Origin-State-Id'. %% failed_avp/1 - + failed_avp([] = No) -> No; failed_avp(Avps) -> @@ -817,22 +845,23 @@ a('DPR', #diameter_caps{origin_host = {Host, _}, %% recv_CER/2 -recv_CER(CER, #state{service = Svc}) -> - {ok, T} = diameter_capx:recv_CER(CER, Svc), +recv_CER(CER, #state{service = Svc, dictionary = Dict}) -> + {ok, T} = diameter_capx:recv_CER(CER, Svc, Dict), T. %% handle_CEA/1 handle_CEA(#diameter_packet{bin = Bin} = Pkt, - #state{service = #diameter_service{capabilities = LCaps}} + #state{dictionary = Dict0, + service = #diameter_service{capabilities = LCaps}} = S) when is_binary(Bin) -> ?LOG(recv, 'CEA'), #diameter_packet{msg = CEA} = DPkt - = diameter_codec:decode(?BASE, Pkt), + = diameter_codec:decode(Dict0, Pkt), {SApps, IS, RCaps} = recv_CEA(DPkt, S), @@ -840,8 +869,7 @@ handle_CEA(#diameter_packet{bin = Bin} = Caps = capz(LCaps, RCaps), - #diameter_base_CEA{'Result-Code' = RC} - = CEA, + RC = Dict0:'#get-'('Result-Code', CEA), %% Ensure that we don't already have a connection to the peer in %% question. This isn't the peer election of 3588 except in the @@ -862,7 +890,7 @@ handle_CEA(#diameter_packet{bin = Bin} of _ -> open(DPkt, SApps, Caps, {connect, hd([_] = IS)}, S) catch - ?FAILURE(Reason) -> close({'CEA', Reason, Caps, DPkt}, S) + ?FAILURE(Reason) -> close({'CEA', Reason, Caps, DPkt}) end. %% Check more than the result code since the peer could send success %% regardless. If not 2001 then a peer_up callback could do anything @@ -877,12 +905,13 @@ recv_CEA(#diameter_packet{header = #diameter_header{version is_error = false}, msg = CEA, errors = []}, - #state{service = Svc}) -> - {ok, T} = diameter_capx:recv_CEA(CEA, Svc), + #state{service = Svc, + dictionary = Dict}) -> + {ok, T} = diameter_capx:recv_CEA(CEA, Svc, Dict), T; recv_CEA(Pkt, S) -> - close({'CEA', caps(S), Pkt}, S). + close({'CEA', caps(S), Pkt}). caps(#diameter_service{capabilities = Caps}) -> Caps; @@ -935,14 +964,14 @@ open(Pkt, SupportedApps, Caps, {Type, IS}, #state{parent = Pid, %% We've advertised TLS support: tell the transport the result %% and expect a reply when the handshake is complete. -tls_ack(true, Caps, Type, IS, #state{transport = TPid} = S) -> +tls_ack(true, Caps, Type, IS, #state{transport = TPid}) -> Ref = make_ref(), TPid ! {diameter, {tls, Ref, Type, IS == ?TLS}}, receive {diameter, {tls, Ref}} -> ok; {'DOWN', _, process, TPid, Reason} -> - close({tls_ack, Reason, Caps}, S) + close({tls_ack, Reason, Caps}) end; %% Or not. Don't send anything to the transport so that transports @@ -955,25 +984,11 @@ capz(#diameter_caps{} = L, #diameter_caps{} = R) -> = list_to_tuple([diameter_caps | lists:zip(tl(tuple_to_list(L)), tl(tuple_to_list(R)))]). -%% close/2 +%% close/1 -%% Tell the watchdog that our death isn't due to transport failure. -close(Reason, #state{parent = Pid}) -> - close_wd(Reason, Pid), +close(Reason) -> throw({?MODULE, close, Reason}). -%% close_wd/2 - -%% Ensure the watchdog dies if DPR has been sent ... -close_wd(_, #state{dpr = false}) -> - ok; -close_wd(Reason, #state{parent = Pid}) -> - close_wd(Reason, Pid); - -%% ... or otherwise -close_wd(Reason, Pid) -> - Pid ! {close, self(), Reason}. - %% dwa/1 dwa(#diameter_caps{origin_host = OH, @@ -1035,13 +1050,14 @@ dpr([CB|Rest], [Reason | _] = Args, S) -> diameter_lib:error_report(failure, No), {stop, No} end; - + dpr([], [Reason | _], S) -> send_dpr(Reason, [], S). -record(opts, {cause, timeout = ?DPA_TIMEOUT}). send_dpr(Reason, Opts, #state{transport = TPid, + dictionary = Dict, service = #diameter_service{capabilities = Caps}} = S) -> #opts{cause = Cause, timeout = Tmo} @@ -1061,7 +1077,8 @@ send_dpr(Reason, Opts, #state{transport = TPid, = Pkt = encode(['DPR', {'Origin-Host', OH}, {'Origin-Realm', OR}, - {'Disconnect-Cause', Cause}]), + {'Disconnect-Cause', Cause}], + Dict), send(TPid, Pkt), dpa_timer(Tmo), ?LOG(send, 'DPR'), diff --git a/lib/diameter/src/base/diameter_reg.erl b/lib/diameter/src/base/diameter_reg.erl index 619b12ecad..ac58e4bf5b 100644 --- a/lib/diameter/src/base/diameter_reg.erl +++ b/lib/diameter/src/base/diameter_reg.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2012. All Rights Reserved. +%% Copyright Ericsson AB 2010-2013. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -208,10 +208,6 @@ init(_) -> %% # handle_call/3 %% ---------------------------------------------------------- -handle_call(Req, From, S) - when not is_record(S, state) -> - handle_call(Req, From, upgrade(S)); - handle_call({add, Fun, Key, Pid}, _, S) -> B = Fun(?TABLE, {Key, Pid}), monitor(B andalso no_monitor(Pid), Pid), @@ -281,9 +277,6 @@ code_change(_OldVsn, State, _Extra) -> %% =========================================================================== -upgrade(S) -> - #state{} = list_to_tuple(tuple_to_list(S) ++ [[]]). - monitor(true, Pid) -> ets:insert(?TABLE, ?MONITOR(Pid, erlang:monitor(process, Pid))); monitor(false, _) -> diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl index d2a416166f..f1342df16c 100644 --- a/lib/diameter/src/base/diameter_service.erl +++ b/lib/diameter/src/base/diameter_service.erl @@ -24,33 +24,38 @@ -module(diameter_service). -behaviour(gen_server). +%% towards diameter_service_sup +-export([start_link/1]). + +%% towards diameter +-export([subscribe/1, + unsubscribe/1, + services/0, + info/2]). + +%% towards diameter_config -export([start/1, stop/1, start_transport/2, - stop_transport/2, - info/2, - call/4]). + stop_transport/2]). -%% towards diameter_watchdog --export([receive_message/3]). +%% towards diameter_peer +-export([notify/2]). -%% service supervisor --export([start_link/1]). +%% towards diameter_traffic +-export([find_incoming_app/4, + pick_peer/3]). --export([subscribe/1, - unsubscribe/1, +%% test/debug +-export([services/1, subscriptions/1, subscriptions/0, - services/0, - services/1, - whois/1]). - -%% test/debug --export([call_module/3, + call_module/3, + whois/1, state/1, uptime/1]). -%%% gen_server callbacks +%% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, @@ -58,21 +63,10 @@ terminate/2, code_change/3]). -%% Other callbacks. --export([send/1]). - -include_lib("diameter/include/diameter.hrl"). -include("diameter_internal.hrl"). -%% The states mirrored by peer_up/peer_down callbacks. --define(STATE_UP, up). --define(STATE_DOWN, down). - --type op_state() :: ?STATE_UP - | ?STATE_DOWN. - -%% The RFC 3539 watchdog states that are now maintained, albeit -%% along with the old up/down. okay = up, else down. +%% RFC 3539 watchdog states. -define(WD_INITIAL, initial). -define(WD_OKAY, okay). -define(WD_SUSPECT, suspect). @@ -86,11 +80,8 @@ | ?WD_REOPEN. -define(DEFAULT_TC, 30000). %% RFC 3588 ch 2.1 --define(DEFAULT_TIMEOUT, 5000). %% for outgoing requests -define(RESTART_TC, 1000). %% if restart was this recent --define(RELAY, ?DIAMETER_DICT_RELAY). - %% Used to be able to swap this with anything else dict-like but now %% rely on the fact that a service's #state{} record does not change %% in storing in it ?STATE table and not always going through the @@ -98,10 +89,6 @@ %% a ?Dict don't change the handle to it. -define(Dict, diameter_dict). -%% Table containing outgoing requests for which a reply has yet to be -%% received. --define(REQUEST_TABLE, diameter_request). - %% Maintains state in a table. In contrast to previously, a service's %% stat is not constant and is accessed outside of the service %% process. @@ -115,83 +102,58 @@ %% Workaround for dialyzer's lack of understanding of match specs. -type match(T) - :: T | '_' | '$1' | '$2' | '$3' | '$4'. - -%% State of service gen_server. + :: T | '_' | '$1' | '$2'. + +%% State of service gen_server. Note that the state term itself +%% doesn't change, which is relevant for the stateless application +%% callbacks since the state is retrieved from ?STATE_TABLE from +%% outside the service process. The pid in the service record is used +%% to determine whether or not we need to call the process for a +%% pick_peer callback in the statefull case. -record(state, {id = now(), - service_name, %% as passed to start_service/2, key in ?STATE_TABLE + service_name :: diameter:service_name(), %% key in ?STATE_TABLE service :: #diameter_service{}, - peerT = ets_new(peers) :: ets:tid(),%% #peer{} at start_fsm - connT = ets_new(conns) :: ets:tid(),%% #conn{} at connection_up/reopen - shared_peers = ?Dict:new(), %% Alias -> [{TPid, Caps}, ...] - local_peers = ?Dict:new(), %% Alias -> [{TPid, Caps}, ...] + watchdogT = ets_new(watchdogs) %% #watchdog{} at start + :: ets:tid(), + peerT = ets_new(peers) %% #peer{pid = TPid} at okay/reopen + :: ets:tid(), + shared_peers = ?Dict:new() %% Alias -> [{TPid, Caps}, ...] + :: ets:tid(), + local_peers = ?Dict:new() %% Alias -> [{TPid, Caps}, ...] + :: ets:tid(), monitor = false :: false | pid(), %% process to die with options :: [{sequence, diameter:sequence()} %% sequence mask | {restrict_connections, diameter:restriction()} | {share_peers, boolean()} %% broadcast peers to remote nodes? | {use_shared_peers, boolean()}]}).%% use broadcasted peers? -%% shared_peers reflects the peers broadcast from remote nodes. Note -%% that the state term itself doesn't change, which is relevant for -%% the stateless application callbacks since the state is retrieved -%% from ?STATE_TABLE from outside the service process. The pid in the -%% service record is used to determine whether or not we need to call -%% the process for a pick_peer callback. - -%% Record representing a watchdog process as implemented by -%% diameter_watchdog. The term "peer" here is historical, made -%% especially confusing by the fact that a peer_ref() in the -%% documentation is the key of a #conn{} record, not a #peer{} record. -%% The name is also unfortunate given the meaning of peer in the -%% Diameter sense. --record(peer, +%% shared_peers reflects the peers broadcast from remote nodes. + +%% Record representing an RFC 3539 watchdog process implemented by +%% diameter_watchdog. +-record(watchdog, {pid :: match(pid()), type :: match(connect | accept), ref :: match(reference()), %% key into diameter_config options :: match([diameter:transport_opt()]),%% from start_transport - op_state = {?STATE_DOWN, ?WD_INITIAL} - :: match(op_state() | {op_state(), wd_state()}), + state = ?WD_INITIAL :: match(wd_state()), started = now(), %% at process start - conn = false :: match(boolean() | pid())}). - %% true at accepted, pid() at connection_up or reopen - -%% Record representing a peer process as implemented by -%% diameter_peer_fsm. The term "conn" is historical. Despite the name -%% here, comments refer to watchdog and peer processes, that are keys -%% in #peer{} and #conn{} records respectively. To add to the -%% confusion, a #request.transport is a peer process = key in a -%% #conn{} record. The actual transport process (that the peer process -%% knows about and that has a transport connection) isn't seen here. --record(conn, + peer = false :: match(boolean() | pid())}). + %% true at accepted, pid() at okay/reopen + +%% Record representing an Peer State Machine processes implemented by +%% diameter_peer_fsm. +-record(peer, {pid :: pid(), apps :: [{0..16#FFFFFFFF, diameter:app_alias()}], %% {Id, Alias} caps :: #diameter_caps{}, started = now(), %% at process start - peer :: pid()}). %% key into peerT - -%% Record stored in diameter_request for each outgoing request. --record(request, - {from, %% arg 2 of handle_call/3 - handler :: match(pid()), %% request process - transport :: match(pid()), %% peer process - caps :: match(#diameter_caps{}), - app :: match(diameter:app_alias()),%% #diameter_app.alias - dictionary :: match(module()), %% #diameter_app.dictionary - module :: match([module() | list()]), %% #diameter_app.module - filter :: match(diameter:peer_filter()), - packet :: match(#diameter_packet{})}). - -%% Record call/4 options are parsed into. --record(options, - {filter = none :: diameter:peer_filter(), - extra = [] :: list(), - timeout = ?DEFAULT_TIMEOUT :: 0..16#FFFFFFFF, - detach = false :: boolean()}). - -%%% --------------------------------------------------------------------------- -%%% # start(SvcName) -%%% --------------------------------------------------------------------------- + watchdog :: pid()}). %% key into watchdogT + +%% --------------------------------------------------------------------------- +%% # start/1 +%% --------------------------------------------------------------------------- start(SvcName) -> diameter_service_sup:start_child(SvcName). @@ -202,9 +164,9 @@ start_link(SvcName) -> %% Put the arbitrary term SvcName in a list in case we ever want to %% send more than this and need to distinguish old from new. -%%% --------------------------------------------------------------------------- -%%% # stop(SvcName) -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # stop/1 +%% --------------------------------------------------------------------------- stop(SvcName) -> case whois(SvcName) of @@ -220,180 +182,43 @@ stop(ok, Pid) -> stop(No, _) -> No. -%%% --------------------------------------------------------------------------- -%%% # start_transport(SvcName, {Ref, Type, Opts}) -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # start_transport/3 +%% --------------------------------------------------------------------------- -start_transport(SvcName, {_,_,_} = T) -> +start_transport(SvcName, {_Ref, _Type, _Opts} = T) -> call_service_by_name(SvcName, {start, T}). -%%% --------------------------------------------------------------------------- -%%% # stop_transport(SvcName, Refs) -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # stop_transport/2 +%% --------------------------------------------------------------------------- stop_transport(_, []) -> ok; stop_transport(SvcName, [_|_] = Refs) -> call_service_by_name(SvcName, {stop, Refs}). -%%% --------------------------------------------------------------------------- -%%% # info(SvcName, Item) -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # info/2 +%% --------------------------------------------------------------------------- info(SvcName, Item) -> - case find_state(SvcName) of - #state{} = S -> + case lookup_state(SvcName) of + [#state{} = S] -> service_info(Item, S); - false -> + [] -> undefined end. -%%% --------------------------------------------------------------------------- -%%% # receive_message(TPid, Pkt, MessageData) -%%% --------------------------------------------------------------------------- - -%% Handle an incoming Diameter message in the watchdog process. This -%% used to come through the service process but this avoids that -%% becoming a bottleneck. - -receive_message(TPid, Pkt, T) - when is_pid(TPid) -> - #diameter_packet{header = #diameter_header{is_request = R}} = Pkt, - recv(R, (not R) andalso lookup_request(Pkt, TPid), TPid, Pkt, T). - -%% Incoming request ... -recv(true, false, TPid, Pkt, T) -> - try - spawn(fun() -> recv_request(TPid, Pkt, T) end) - catch - error: system_limit = E -> %% discard - ?LOG({error, E}, now()) - end; - -%% ... answer to known request ... -recv(false, #request{from = {_, Ref}, handler = Pid} = Req, _, Pkt, _) -> - Pid ! {answer, Ref, Req, Pkt}; -%% Note that failover could have happened prior to this message being -%% received and triggering failback. That is, both a failover message -%% and answer may be on their way to the handler process. In the worst -%% case the request process gets notification of the failover and -%% sends to the alternate peer before an answer arrives, so it's -%% always the case that we can receive more than one answer after -%% failover. The first answer received by the request process wins, -%% any others are discarded. - -%% ... or not. -recv(false, false, _, _, _) -> - ok. - -%%% --------------------------------------------------------------------------- -%%% # call(SvcName, App, Msg, Options) -%%% --------------------------------------------------------------------------- - -call(SvcName, App, Msg, Options) - when is_list(Options) -> - Rec = make_options(Options), - Ref = make_ref(), - Caller = {self(), Ref}, - Fun = fun() -> exit({Ref, call(SvcName, App, Msg, Rec, Caller)}) end, - try spawn_monitor(Fun) of - {_, MRef} -> - recv(MRef, Ref, Rec#options.detach, false) - catch - error: system_limit = E -> - {error, E} - end. - -%% Don't rely on gen_server:call/3 for the timeout handling since it -%% makes no guarantees about not leaving a reply message in the -%% mailbox if we catch its exit at timeout. It currently *can* do so, -%% which is also undocumented. - -recv(MRef, _, true, true) -> - erlang:demonitor(MRef, [flush]), - ok; - -recv(MRef, Ref, Detach, Sent) -> - receive - Ref -> %% send has been attempted - recv(MRef, Ref, Detach, true); - {'DOWN', MRef, process, _, Reason} -> - call_rc(Reason, Ref, Sent) - end. - -%% call/5 has returned ... -call_rc({Ref, Ans}, Ref, _) -> - Ans; - -%% ... or not. In this case failure/encode are documented. -call_rc(_, _, Sent) -> - {error, choose(Sent, failure, encode)}. - -%% call/5 -%% -%% In the process spawned for the outgoing request. - -call(SvcName, App, Msg, Opts, Caller) -> - c(find_state(SvcName), App, Msg, Opts, Caller). - -c(#state{service_name = Svc, options = [{_, Mask} | _]} = S, - App, - Msg, - Opts, - Caller) -> - case find_transport(App, Msg, Opts, S) of - {_,_,_} = T -> - send_request(T, Mask, Msg, Opts, Caller, Svc); - false -> - {error, no_connection}; - {error, _} = No -> - No - end; - -c(false, _, _, _, _) -> - {error, no_service}. - -%% find_state/1 - -find_state(SvcName) -> - fs(ets:lookup(?STATE_TABLE, SvcName)). - -fs([#state{} = S]) -> - S; - -fs([]) -> - false. - -%% make_options/1 +%% lookup_state/1 -make_options(Options) -> - lists:foldl(fun mo/2, #options{}, Options). +lookup_state(SvcName) -> + ets:lookup(?STATE_TABLE, SvcName). -mo({timeout, T}, Rec) - when is_integer(T), 0 =< T -> - Rec#options{timeout = T}; - -mo({filter, F}, #options{filter = none} = Rec) -> - Rec#options{filter = F}; -mo({filter, F}, #options{filter = {all, Fs}} = Rec) -> - Rec#options{filter = {all, [F | Fs]}}; -mo({filter, F}, #options{filter = F0} = Rec) -> - Rec#options{filter = {all, [F0, F]}}; - -mo({extra, L}, #options{extra = X} = Rec) - when is_list(L) -> - Rec#options{extra = X ++ L}; - -mo(detach, Rec) -> - Rec#options{detach = true}; - -mo(T, _) -> - ?ERROR({invalid_option, T}). - -%%% --------------------------------------------------------------------------- -%%% # subscribe(SvcName) -%%% # unsubscribe(SvcName) -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # subscribe/1 +%% # unsubscribe/1 +%% --------------------------------------------------------------------------- subscribe(SvcName) -> diameter_reg:add({?MODULE, subscriber, SvcName}). @@ -410,9 +235,9 @@ subscriptions() -> pmap(Props) -> lists:map(fun({{?MODULE, _, Name}, Pid}) -> {Name, Pid} end, Props). -%%% --------------------------------------------------------------------------- -%%% # services(Pattern) -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # services/1 +%% --------------------------------------------------------------------------- services(Pat) -> pmap(diameter_reg:match({?MODULE, service, Pat})). @@ -428,6 +253,86 @@ whois(SvcName) -> undefined end. +%% --------------------------------------------------------------------------- +%% # pick_peer/3 +%% --------------------------------------------------------------------------- + +-spec pick_peer(SvcName, AppOrAlias, Opts) + -> {{TPid, Caps, App}, Mask} + | false + | {error, term()} + when SvcName :: diameter:service_name(), + AppOrAlias :: {alias, diameter:app_alias()} | #diameter_app{}, + Opts :: tuple(), + TPid :: pid(), + Caps :: #diameter_caps{}, + App :: #diameter_app{}, + Mask :: diameter:sequence(). + +pick_peer(SvcName, App, Opts) -> + pick(lookup_state(SvcName), App, Opts). + +pick([], _, _) -> + {error, no_service}; + +pick([S], App, Opts) -> + pick(S, App, Opts); + +pick(#state{service = #diameter_service{applications = Apps}} + = S, + {alias, Alias}, + Opts) -> %% initial call from diameter:call/4 + pick(S, find_outgoing_app(Alias, Apps), Opts); + +pick(_, false, _) -> + false; + +pick(#state{options = [{_, Mask} | _]} + = S, + #diameter_app{module = ModX, dictionary = Dict} + = App0, + {DestF, Filter, Xtra}) -> + App = App0#diameter_app{module = ModX ++ Xtra}, + [_,_] = RealmAndHost = diameter_lib:eval([DestF, Dict]), + case pick_peer(App, RealmAndHost, Filter, S) of + {TPid, Caps} -> + {{TPid, Caps, App}, Mask}; + false = No -> + No + end. + +%% --------------------------------------------------------------------------- +%% # find_incoming_app/4 +%% --------------------------------------------------------------------------- + +-spec find_incoming_app(PeerT, TPid, Id, Apps) + -> {#diameter_app{}, #diameter_caps{}} %% connection and suitable app + | #diameter_caps{} %% connection but no suitable app + | false %% no connection + when PeerT :: ets:tid(), + TPid :: pid(), + Id :: non_neg_integer(), + Apps :: [#diameter_app{}]. + +find_incoming_app(PeerT, TPid, Id, Apps) -> + try ets:lookup(PeerT, TPid) of + [#peer{} = P] -> + find_incoming_app(P, Id, Apps); + [] -> %% transport has gone down + false + catch + error: badarg -> %% service has gone down (and taken table with it) + false + end. + +%% --------------------------------------------------------------------------- +%% # notify/2 +%% --------------------------------------------------------------------------- + +notify(SvcName, Msg) -> + Pid = whois(SvcName), + is_pid(Pid) andalso (Pid ! Msg). + %% =========================================================================== %% =========================================================================== @@ -442,9 +347,9 @@ uptime(Svc) -> call_module(Service, AppMod, Request) -> call_service(Service, {call_module, AppMod, Request}). -%%% --------------------------------------------------------------------------- -%%% # init([SvcName]) -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # init/1 +%% --------------------------------------------------------------------------- init([SvcName]) -> process_flag(trap_exit, true), %% ensure terminate(shutdown, _) @@ -455,9 +360,9 @@ i(SvcName, true) -> i(_, false) -> {stop, {shutdown, already_started}}. -%%% --------------------------------------------------------------------------- -%%% # handle_call(Req, From, State) -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # handle_call/3 +%% --------------------------------------------------------------------------- handle_call(state, _, S) -> {reply, S, S}; @@ -493,17 +398,17 @@ handle_call(Req, From, S) -> unexpected(handle_call, [Req, From], S), {reply, nok, S}. -%%% --------------------------------------------------------------------------- -%%% # handle_cast(Req, State) -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # handle_cast/2 +%% --------------------------------------------------------------------------- handle_cast(Req, S) -> unexpected(handle_cast, [Req], S), {noreply, S}. -%%% --------------------------------------------------------------------------- -%%% # handle_info(Req, State) -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # handle_info/2 +%% --------------------------------------------------------------------------- handle_info(T, #state{} = S) -> case transition(T,S) of @@ -520,67 +425,39 @@ transition({accepted, Pid, TPid}, S) -> accepted(Pid, TPid, S), ok; -%% Peer process has a new open connection. -transition({connection_up, Pid, T}, S) -> - connection_up(Pid, T, S), - ok; - -%% Watchdog has a new connection that will be opened after DW[RA] -%% exchange. This message was added long after connection_up, to -%% communicate the information as soon as it's available. Leave -%% connection_up as is it for now, duplicated information and all. -transition({reopen, Pid, T}, S) -> - reopen(Pid, T, S), - ok; - -%% Watchdog has left state OKAY. -transition({connection_down, Pid}, S) -> - connection_down(Pid, S), - ok; - -%% Watchdog has returned to state OKAY. -transition({connection_up, Pid}, S) -> - connection_up(Pid, S), - ok; - -%% Accepting transport has lost connectivity. -transition({close, Pid}, S) -> - close(Pid, S), - ok; - %% Connecting transport is being restarted by watchdog. transition({reconnect, Pid}, S) -> reconnect(Pid, S), ok; -%% Watchdog is sending notification of a state transition. Note that -%% the connection_up/down messages pre-date this message and are still -%% used. A watchdog message will follow these and communicate the same -%% state as was set in handling connection_up/down. -transition({watchdog, Pid, {TPid, From, To}}, #state{service_name = SvcName, - peerT = PeerT}) -> - #peer{ref = Ref, type = T, options = Opts, op_state = {OS,_}} - = P - = fetch(PeerT, Pid), - insert(PeerT, P#peer{op_state = {OS, To}}), +%% Watchdog is sending notification of transport death. +transition({close, Pid, Reason}, #state{service_name = SvcName, + watchdogT = WatchdogT}) -> + #watchdog{state = WS, + ref = Ref, + type = Type, + options = Opts} + = fetch(WatchdogT, Pid), + WS /= ?WD_OKAY + andalso + send_event(SvcName, {closed, Ref, Reason, {type(Type), Opts}}), + ok; + +%% Watchdog is sending notification of a state transition. +transition({watchdog, Pid, {[TPid | Data], From, To}}, + #state{service_name = SvcName, + watchdogT = WatchdogT} + = S) -> + #watchdog{ref = Ref, type = T, options = Opts} + = Wd + = fetch(WatchdogT, Pid), + watchdog(TPid, Data, From, To, Wd, S), send_event(SvcName, {watchdog, Ref, TPid, {From, To}, {T, Opts}}), ok; -%% Death of a watchdog process (#peer.pid) results in the removal of -%% it's peer and any associated conn record when 'DOWN' is received -%% (after this) but the states will be {?STATE_UP, ?WD_DOWN} for a -%% short time. (No real problem since ?WD_* is only used in -%% service_info.) We set ?WD_OKAY as a consequence of connection_up -%% since we know a watchdog is coming. We can't set anything at -%% connection_down since we don't know if the subsequent watchdog -%% message will be ?WD_DOWN or ?WD_SUSPECT. We don't (yet) set -%% ?STATE_* as a consequence of a watchdog message since this requires -%% changing some of the matching on ?STATE_*. -%% -%% Death of a peer process process (#conn.pid, #peer.conn) results in -%% connection_down followed by watchdog ?WD_DOWN. The latter doesn't -%% result in the conn record being deleted since 'DOWN' from death of -%% its watchdog doesn't (yet) deal with the record having been -%% removed. +%% Death of a watchdog process (#watchdog.pid) results in the removal of +%% it's peer and any associated conn record when 'DOWN' is received. +%% Death of a peer process process (#peer.pid, #watchdog.peer) results in +%% ?WD_DOWN. %% Monitor process has died. Just die with a reason that tells %% diameter_config about the happening. If a cleaner shutdown is @@ -589,9 +466,9 @@ transition({'DOWN', MRef, process, _, Reason}, #state{monitor = MRef}) -> {stop, {monitor, Reason}}; %% Local watchdog process has died. -transition({'DOWN', _, process, Pid, Reason}, S) +transition({'DOWN', _, process, Pid, _Reason}, S) when node(Pid) == node() -> - peer_down(Pid, Reason, S), + watchdog_down(Pid, S), ok; %% Remote service wants to know about shared peers. @@ -614,20 +491,13 @@ transition({tc_timeout, T}, S) -> tc_timeout(T, S), ok; -%% Request process is telling us it may have missed a failover message -%% after a transport went down and the service process looked up -%% outstanding requests. -transition({failover, TRef, Seqs}, S) -> - failover(TRef, Seqs, S), - ok; - transition(Req, S) -> unexpected(handle_info, [Req], S), ok. -%%% --------------------------------------------------------------------------- -%%% # terminate(Reason, State) -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # terminate/2 +%% --------------------------------------------------------------------------- terminate(Reason, #state{service_name = Name} = S) -> send_event(Name, stop), @@ -635,9 +505,9 @@ terminate(Reason, #state{service_name = Name} = S) -> shutdown == Reason %% application shutdown andalso shutdown(application, S). -%%% --------------------------------------------------------------------------- -%%% # code_change(FromVsn, State, Extra) -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # code_change/3 +%% --------------------------------------------------------------------------- code_change(FromVsn, #state{service_name = SvcName, @@ -663,30 +533,20 @@ code_change(FromVsn, SvcName, Extra, #diameter_app{alias = Alias} = A) -> unexpected(F, A, #state{service_name = Name}) -> ?UNEXPECTED(F, A ++ [Name]). -cb([_|_] = M, F, A) -> - eval(M, F, A); -cb(Rec, F, A) -> - {_, M} = app(Rec), +cb(#diameter_app{module = [_|_] = M}, F, A) -> eval(M, F, A). -app(#request{app = A, module = M}) -> - {A,M}; -app(#diameter_app{alias = A, module = M}) -> - {A,M}. - eval([M|X], F, A) -> apply(M, F, A ++ X). %% Callback with state. -state_cb(#diameter_app{mutable = false, init_state = S}, {ModX, F, A}) -> +state_cb(#diameter_app{module = ModX, mutable = false, init_state = S}, + pick_peer = F, + A) -> eval(ModX, F, A ++ [S]); -state_cb(#diameter_app{mutable = true, alias = Alias}, {_,_,_} = MFA) -> - state_cb(MFA, Alias); - -state_cb({ModX,F,A}, Alias) - when is_list(ModX) -> +state_cb(#diameter_app{module = ModX, alias = Alias}, F, A) -> eval(ModX, F, A ++ [mod_state(Alias)]). choose(true, X, _) -> X; @@ -712,57 +572,38 @@ mod_state(Alias) -> mod_state(Alias, ModS) -> put({?MODULE, mod_state, Alias}, ModS). -%%% --------------------------------------------------------------------------- -%%% # shutdown/2 -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # shutdown/2 +%% --------------------------------------------------------------------------- -%% remove_transport: ask watchdogs to terminate their transport. -shutdown(Refs, #state{peerT = PeerT}) +%% remove_transport +shutdown(Refs, #state{watchdogT = WatchdogT}) when is_list(Refs) -> - ets:foldl(fun(P,ok) -> sp(P, Refs), ok end, ok, PeerT); - -%% application/service shutdown: ask transports to terminate themselves. -shutdown(Reason, #state{peerT = PeerT}) -> - %% A transport might not be alive to receive the shutdown request - %% but give those that are a chance to shutdown gracefully. - shutdown(conn, Reason, PeerT), - %% Kill the watchdogs explicitly in case there was no transport. - shutdown(peer, Reason, PeerT). - -%% sp/2 - -sp(#peer{ref = Ref, pid = Pid}, Refs) -> - lists:member(Ref, Refs) - andalso (Pid ! {shutdown, self()}). %% 'DOWN' cleans up + ets:foldl(fun(P,ok) -> st(P, Refs), ok end, ok, WatchdogT); -%% shutdown/3 - -shutdown(Who, Reason, T) -> - diameter_lib:wait(ets:foldl(fun(X,A) -> shutdown(Who, X, Reason, A) end, +%% application/service shutdown +shutdown(Reason, #state{watchdogT = WatchdogT}) + when Reason == application; + Reason == service -> + diameter_lib:wait(ets:foldl(fun(P,A) -> st(P, Reason, A) end, [], - T)). + WatchdogT)). -shutdown(conn = Who, #peer{op_state = {OS,_}} = P, Reason, Acc) -> - shutdown(Who, P#peer{op_state = OS}, Reason, Acc); +%% st/2 -shutdown(conn, - #peer{pid = Pid, op_state = ?STATE_UP, conn = TPid}, - Reason, - Acc) -> - TPid ! {shutdown, Pid, Reason}, - [TPid | Acc]; +st(#watchdog{ref = Ref, pid = Pid}, Refs) -> + lists:member(Ref, Refs) + andalso (Pid ! {shutdown, self(), transport}). %% 'DOWN' cleans up -shutdown(peer, #peer{pid = Pid}, _Reason, Acc) - when is_pid(Pid) -> - exit(Pid, shutdown), - [Pid | Acc]; +%% st/3 -shutdown(_, #peer{}, _, Acc) -> - Acc. +st(#watchdog{pid = Pid}, Reason, Acc) -> + Pid ! {shutdown, self(), Reason}, + [Pid | Acc]. -%%% --------------------------------------------------------------------------- -%%% # call_service/2 -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # call_service/2 +%% --------------------------------------------------------------------------- call_service(Pid, Req) when is_pid(Pid) -> @@ -785,11 +626,9 @@ cs(Pid, Req) cs(undefined, _) -> {error, no_service}. -%%% --------------------------------------------------------------------------- -%%% # i/1 -%%% -%%% Output: #state{} -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # i/1 +%% --------------------------------------------------------------------------- %% Intialize the state of a service gen_server. @@ -859,9 +698,9 @@ get_value(Key, Vs) -> {_, V} = lists:keyfind(Key, 1, Vs), V. -%%% --------------------------------------------------------------------------- -%%% # start/3 -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # start/3 +%% --------------------------------------------------------------------------- %% If the initial start/3 at service/transport start succeeds then %% subsequent calls to start/4 on the same service will also succeed @@ -891,22 +730,28 @@ type(connect = T) -> T. %% start/4 -start(Ref, Type, Opts, #state{peerT = PeerT, - connT = ConnT, +start(Ref, Type, Opts, #state{watchdogT = WatchdogT, + peerT = PeerT, options = SvcOpts, service_name = SvcName, - service = Svc}) + service = Svc0}) when Type == connect; Type == accept -> - Pid = s(Type, Ref, {ConnT, + #diameter_service{applications = Apps} + = Svc + = merge_service(Opts, Svc0), + {_,_} = Mask = proplists:get_value(sequence, SvcOpts), + Pid = s(Type, Ref, {diameter_traffic:make_recvdata([SvcName, + PeerT, + Apps, + Mask]), Opts, - SvcName, SvcOpts, - merge_service(Opts, Svc)}), - insert(PeerT, #peer{pid = Pid, - type = Type, - ref = Ref, - options = Opts}), + Svc}), + insert(WatchdogT, #watchdog{pid = Pid, + type = Type, + ref = Ref, + options = Opts}), Pid. %% Note that the service record passed into the watchdog is the merged @@ -949,100 +794,115 @@ ms({capabilities, Opts}, #diameter_service{capabilities = Caps0} = Svc) ms(_, Svc) -> Svc. -%%% --------------------------------------------------------------------------- -%%% # accepted/3 -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # accepted/3 +%% --------------------------------------------------------------------------- -accepted(Pid, _TPid, #state{peerT = PeerT} = S) -> - #peer{ref = Ref, type = accept = T, conn = false, options = Opts} - = P - = fetch(PeerT, Pid), - insert(PeerT, P#peer{conn = true}), %% mark replacement as started - start(Ref, T, Opts, S). %% start new watchdog +accepted(Pid, _TPid, #state{watchdogT = WatchdogT} = S) -> + #watchdog{ref = Ref, type = accept = T, peer = false, options = Opts} + = Wd + = fetch(WatchdogT, Pid), + insert(WatchdogT, Wd#watchdog{peer = true}),%% mark replacement as started + start(Ref, T, Opts, S). %% start new watchdog fetch(Tid, Key) -> [T] = ets:lookup(Tid, Key), - case T of - #peer{op_state = ?STATE_UP} = P -> - P#peer{op_state = {?STATE_UP, ?WD_OKAY}}; - #peer{op_state = ?STATE_DOWN} = P -> - P#peer{op_state = {?STATE_DOWN, ?WD_DOWN}}; - _ -> - T - end. + T. -%%% --------------------------------------------------------------------------- -%%% # connection_up/3 -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # watchdog/6 +%% +%% React to a watchdog state transition. +%% --------------------------------------------------------------------------- -%% Watchdog process has reached state OKAY. +%% Watchdog has a new open connection. +watchdog(TPid, [T], _, ?WD_OKAY, Wd, State) -> + connection_up({TPid, T}, Wd, State); -connection_up(Pid, {TPid, {Caps, SApps, Pkt}}, #state{peerT = PeerT, - connT = ConnT} - = S) -> - P = fetch(PeerT, Pid), - C = #conn{pid = TPid, - apps = SApps, - caps = Caps, - peer = Pid}, - - insert(ConnT, C), - connection_up([Pkt], P#peer{conn = TPid}, C, S). - -%%% --------------------------------------------------------------------------- -%%% # reopen/3 -%%% --------------------------------------------------------------------------- - -%% Note that this connection_up/3 rewrites the same #conn{} now -%% written here. Both do so in case reopen has not happened in old -%% code. - -reopen(Pid, {TPid, {Caps, SApps, _Pkt}}, #state{peerT = PeerT, - connT = ConnT}) -> - P = fetch(PeerT, Pid), - C = #conn{pid = TPid, - apps = SApps, - caps = Caps, - peer = Pid}, - - insert(ConnT, C), - #peer{op_state = {?STATE_DOWN, _}} - = P, - insert(PeerT, P#peer{op_state = {?STATE_DOWN, ?WD_REOPEN}, - conn = TPid}). - -%%% --------------------------------------------------------------------------- -%%% # connection_up/2 -%%% --------------------------------------------------------------------------- - -%% Peer process has transitioned back into the open state. Note that there -%% has been no new capabilties exchange in this case. - -connection_up(Pid, #state{peerT = PeerT, - connT = ConnT} - = S) -> - #peer{conn = TPid} = P = fetch(PeerT, Pid), - C = fetch(ConnT, TPid), - connection_up([], P, C, S). +%% Watchdog has a new connection that will be opened after DW[RA] +%% exchange. +watchdog(TPid, [T], _, ?WD_REOPEN, Wd, State) -> + reopen({TPid, T}, Wd, State); + +%% Watchdog has recovered a suspect connection. +watchdog(TPid, [], ?WD_SUSPECT, ?WD_OKAY, Wd, State) -> + #watchdog{peer = TPid} = Wd, %% assert + connection_up(Wd, State); + +%% Watchdog has an unresponsive connection. +watchdog(TPid, [], ?WD_OKAY, ?WD_SUSPECT = To, Wd, State) -> + #watchdog{peer = TPid} = Wd, %% assert + connection_down(Wd, To, State); + +%% Watchdog has lost its connection. +watchdog(TPid, [], _, ?WD_DOWN = To, Wd, #state{peerT = PeerT} = S) -> + close(Wd, S), + connection_down(Wd, To, S), + ets:delete(PeerT, TPid); + +watchdog(_, [], _, _, _, _) -> + ok. -%% connection_up/4 +%% --------------------------------------------------------------------------- +%% # connection_up/3 +%% --------------------------------------------------------------------------- -connection_up(T, P, C, #state{peerT = PeerT, - local_peers = LDict, - service_name = SvcName, - service - = #diameter_service{applications = Apps}} - = S) -> - #peer{conn = TPid, op_state = {?STATE_DOWN, _}} - = P, - #conn{apps = SApps, caps = Caps} - = C, +%% Watchdog process has reached state OKAY. + +connection_up({TPid, {Caps, SupportedApps, Pkt}}, + #watchdog{pid = Pid} + = Wd, + #state{peerT = PeerT} + = S) -> + Pr = #peer{pid = TPid, + apps = SupportedApps, + caps = Caps, + watchdog = Pid}, + insert(PeerT, Pr), + connection_up([Pkt], Wd#watchdog{peer = TPid}, Pr, S). + +%% --------------------------------------------------------------------------- +%% # reopen/3 +%% --------------------------------------------------------------------------- + +reopen({TPid, {Caps, SupportedApps, _Pkt}}, + #watchdog{pid = Pid} + = Wd, + #state{watchdogT = WatchdogT, + peerT = PeerT}) -> + insert(PeerT, #peer{pid = TPid, + apps = SupportedApps, + caps = Caps, + watchdog = Pid}), + insert(WatchdogT, Wd#watchdog{state = ?WD_REOPEN, + peer = TPid}). + +%% --------------------------------------------------------------------------- +%% # connection_up/2 +%% --------------------------------------------------------------------------- + +%% Watchdog has recovered as suspect connection. Note that there has +%% been no new capabilties exchange in this case. + +connection_up(#watchdog{peer = TPid} = Wd, #state{peerT = PeerT} = S) -> + connection_up([], Wd, fetch(PeerT, TPid), S). - insert(PeerT, P#peer{op_state = {?STATE_UP, ?WD_OKAY}}), +%% connection_up/4 - request_peer_up(TPid), +connection_up(Extra, + #watchdog{peer = TPid} + = Wd, + #peer{apps = SApps, caps = Caps} + = Pr, + #state{watchdogT = WatchdogT, + local_peers = LDict, + service_name = SvcName, + service = #diameter_service{applications = Apps}} + = S) -> + insert(WatchdogT, Wd#watchdog{state = ?WD_OKAY}), + diameter_traffic:peer_up(TPid), insert_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict), - report_status(up, P, C, S, T). + report_status(up, Wd, Pr, S, Extra). insert_local_peer(SApps, T, LDict) -> lists:foldl(fun(A,D) -> ilp(A, T, D) end, LDict, SApps). @@ -1052,13 +912,57 @@ ilp({Id, Alias}, {TC, SA}, LDict) -> ?Dict:append(Alias, TC, LDict). init_conn(Id, Alias, {TPid, _} = TC, {SvcName, Apps}) -> - #diameter_app{module = ModX, - id = Id} %% assert + #diameter_app{id = Id} %% assert + = App = find_app(Alias, Apps), - peer_cb({ModX, peer_up, [SvcName, TC]}, Alias) + peer_cb(App, peer_up, [SvcName, TC]) orelse exit(TPid, kill). %% fake transport failure +%% --------------------------------------------------------------------------- +%% # find_incoming_app/3 +%% --------------------------------------------------------------------------- + +%% No one should be sending the relay identifier. +find_incoming_app(#peer{caps = Caps}, ?APP_ID_RELAY, _) -> + Caps; + +find_incoming_app(Peer, Id, Apps) + when is_integer(Id) -> + find_incoming_app(Peer, [Id, ?APP_ID_RELAY], Apps); + +%% Note that the apps represented in SApps may be a strict subset of +%% those in Apps. +find_incoming_app(#peer{apps = SApps, caps = Caps}, Ids, Apps) -> + case keyfind(Ids, 1, SApps) of + {_Id, Alias} -> + {#diameter_app{} = find_app(Alias, Apps), Caps}; + false -> + Caps + end. + +%% keyfind/3 + +keyfind([], _, _) -> + false; +keyfind([Key | Rest], Pos, L) -> + case lists:keyfind(Key, Pos, L) of + false -> + keyfind(Rest, Pos, L); + T -> + T + end. + +%% find_outgoing_app/2 + +find_outgoing_app(Alias, Apps) -> + case find_app(Alias, Apps) of + #diameter_app{id = ?APP_ID_RELAY} -> + false; + A -> + A + end. + %% find_app/2 find_app(Alias, Apps) -> @@ -1066,53 +970,51 @@ find_app(Alias, Apps) -> %% Don't bring down the service (and all associated connections) %% regardless of what happens. -peer_cb(MFA, Alias) -> - try state_cb(MFA, Alias) of +peer_cb(App, F, A) -> + try state_cb(App, F, A) of ModS -> - mod_state(Alias, ModS), + mod_state(App#diameter_app.alias, ModS), true catch E:R -> - diameter_lib:error_report({failure, {E, R, Alias, ?STACK}}, MFA), + diameter_lib:error_report({failure, {E, R, ?STACK}}, + {App, F, A}), false end. -%%% --------------------------------------------------------------------------- -%%% # connection_down/2 -%%% --------------------------------------------------------------------------- - -%% Watchdog has transitioned out of state OKAY. - -connection_down(Pid, #state{peerT = PeerT, - connT = ConnT} - = S) -> - #peer{op_state = {?STATE_UP, WS}, %% assert - conn = TPid} - = P - = fetch(PeerT, Pid), +%% --------------------------------------------------------------------------- +%% # connection_down/3 +%% --------------------------------------------------------------------------- - C = fetch(ConnT, TPid), - insert(PeerT, P#peer{op_state = {?STATE_DOWN, WS}}), - connection_down(P,C,S). - -%% connection_down/3 - -connection_down(#peer{op_state = {?STATE_DOWN, _}}, _, _) -> - ok; - -connection_down(#peer{conn = TPid, - op_state = {?STATE_UP, _}} - = P, - #conn{caps = Caps, +connection_down(#watchdog{state = ?WD_OKAY, + peer = TPid} + = Wd, + #peer{caps = Caps, apps = SApps} - = C, + = Pr, #state{service_name = SvcName, service = #diameter_service{applications = Apps}, local_peers = LDict} = S) -> - report_status(down, P, C, S, []), + report_status(down, Wd, Pr, S, []), remove_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict), - request_peer_down(TPid, S). + diameter_traffic:peer_down(TPid); + +connection_down(#watchdog{}, #peer{}, _) -> + ok; + +connection_down(#watchdog{state = WS, + peer = TPid} + = Wd, + To, + #state{watchdogT = WatchdogT, + peerT = PeerT} + = S) + when is_atom(To) -> + insert(WatchdogT, Wd#watchdog{state = To}), + ?WD_OKAY == WS + andalso + connection_down(Wd, fetch(PeerT, TPid), S). remove_local_peer(SApps, T, LDict) -> lists:foldl(fun(A,D) -> rlp(A, T, D) end, LDict, SApps). @@ -1123,71 +1025,58 @@ rlp({Id, Alias}, {TC, SA}, LDict) -> ?Dict:store(Alias, lists:delete(TC, L), LDict). down_conn(Id, Alias, TC, {SvcName, Apps}) -> - #diameter_app{module = ModX, - id = Id} %% assert + #diameter_app{id = Id} %% assert + = App = find_app(Alias, Apps), - peer_cb({ModX, peer_down, [SvcName, TC]}, Alias). + peer_cb(App, peer_down, [SvcName, TC]). -%%% --------------------------------------------------------------------------- -%%% # peer_down/3 -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # watchdog_down/2 +%% --------------------------------------------------------------------------- %% Watchdog process has died. -peer_down(Pid, Reason, #state{peerT = PeerT} = S) -> - P = fetch(PeerT, Pid), - ets:delete_object(PeerT, P), - closed(Reason, P, S), - restart(P,S), - peer_down(P,S). - -%% Send an event at connection establishment failure. -closed({shutdown, {close, _TPid, Reason}}, - #peer{op_state = {?STATE_DOWN, _}, - ref = Ref, - type = Type, - options = Opts}, - #state{service_name = SvcName}) -> - send_event(SvcName, {closed, Ref, Reason, {type(Type), Opts}}); -closed(_, _, _) -> - ok. +watchdog_down(Pid, #state{watchdogT = WatchdogT} = S) -> + Wd = fetch(WatchdogT, Pid), + ets:delete_object(WatchdogT, Wd), + restart(Wd,S), + wd_down(Wd,S). -%% The watchdog has never reached OKAY ... -peer_down(#peer{conn = B}, _) +%% Watchdog has never reached OKAY ... +wd_down(#watchdog{peer = B}, _) when is_boolean(B) -> ok; %% ... or maybe it has. -peer_down(#peer{conn = TPid} = P, #state{connT = ConnT} = S) -> - #conn{} = C = fetch(ConnT, TPid), - ets:delete_object(ConnT, C), - connection_down(P,C,S). +wd_down(#watchdog{peer = TPid} = Wd, #state{peerT = PeerT} = S) -> + connection_down(Wd, ?WD_DOWN, S), + ets:delete(PeerT, TPid). %% restart/2 -restart(P,S) -> - q_restart(restart(P), S). +restart(Wd, S) -> + q_restart(restart(Wd), S). %% restart/1 %% Always try to reconnect. -restart(#peer{ref = Ref, - type = connect = T, - options = Opts, - started = Time}) -> +restart(#watchdog{ref = Ref, + type = connect = T, + options = Opts, + started = Time}) -> {Time, {Ref, T, Opts}}; %% Transport connection hasn't yet been accepted ... -restart(#peer{ref = Ref, - type = accept = T, - options = Opts, - conn = false, - started = Time}) -> +restart(#watchdog{ref = Ref, + type = accept = T, + options = Opts, + peer = false, + started = Time}) -> {Time, {Ref, T, Opts}}; %% ... or it has: a replacement has already been spawned. -restart(#peer{type = accept}) -> +restart(#watchdog{type = accept}) -> false. %% q_restart/2 @@ -1237,9 +1126,9 @@ tc(true, {Ref, Type, Opts}, #state{service_name = SvcName} tc(false = No, _, _) -> %% removed No. -%%% --------------------------------------------------------------------------- -%%% # close/2 -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # close/2 +%% --------------------------------------------------------------------------- %% The watchdog doesn't start a new fsm in the accept case, it %% simply stays alive until someone tells it to die in order for @@ -1248,14 +1137,13 @@ tc(false = No, _, _) -> %% removed %% the accepting watchdog upon reception of a CER from the previously %% connected peer, or us after reconnect_timer timeout. -close(Pid, #state{service_name = SvcName, - peerT = PeerT}) -> - #peer{pid = Pid, - type = accept, - ref = Ref, - options = Opts} - = fetch(PeerT, Pid), - +close(#watchdog{type = connect}, _) -> + ok; +close(#watchdog{type = accept, + pid = Pid, + ref = Ref, + options = Opts}, + #state{service_name = SvcName}) -> c(Pid, diameter_config:have_transport(SvcName, Ref), Opts). %% Tell watchdog to (maybe) die later ... @@ -1273,21 +1161,21 @@ c(Pid, false, _Opts) -> %% which a new connection attempt is expected of a connecting peer. %% The value should be greater than the peer's Tc + jitter. -%%% --------------------------------------------------------------------------- -%%% # reconnect/2 -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # reconnect/2 +%% --------------------------------------------------------------------------- reconnect(Pid, #state{service_name = SvcName, - peerT = PeerT}) -> - #peer{ref = Ref, - type = connect, - options = Opts} - = fetch(PeerT, Pid), + watchdogT = WatchdogT}) -> + #watchdog{ref = Ref, + type = connect, + options = Opts} + = fetch(WatchdogT, Pid), send_event(SvcName, {reconnect, Ref, Opts}). -%%% --------------------------------------------------------------------------- -%%% # call_module/4 -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # call_module/4 +%% --------------------------------------------------------------------------- %% Backwards compatibility and never documented/advertised. May be %% removed. @@ -1309,10 +1197,10 @@ call_module(Mod, Req, From, #state{service {reply, {error, Reason}, S} end. -cm([#diameter_app{module = ModX, alias = Alias}], Req, From, Svc) -> - MFA = {ModX, handle_call, [Req, From, Svc]}, +cm([#diameter_app{alias = Alias} = App], Req, From, Svc) -> + Args = [Req, From, Svc], - try state_cb(MFA, Alias) of + try state_cb(App, handle_call, Args) of {noreply = T, ModS} -> mod_state(Alias, ModS), T; @@ -1320,11 +1208,13 @@ cm([#diameter_app{module = ModX, alias = Alias}], Req, From, Svc) -> mod_state(Alias, ModS), {T, RC}; T -> - diameter_lib:error_report({invalid, T}, MFA), + diameter_lib:error_report({invalid, T}, + {App, handle_call, Args}), invalid catch E: Reason -> - diameter_lib:error_report({failure, {E, Reason, ?STACK}}, MFA), + diameter_lib:error_report({failure, {E, Reason, ?STACK}}, + {App, handle_call, Args}), failure end; @@ -1334,1270 +1224,16 @@ cm([], _, _, _) -> cm([_,_|_], _, _, _) -> multiple. -%%% --------------------------------------------------------------------------- -%%% # send_request/6 -%%% --------------------------------------------------------------------------- - -%% Send an outgoing request in its dedicated process. -%% -%% Note that both encode of the outgoing request and of the received -%% answer happens in this process. It's also this process that replies -%% to the caller. The service process only handles the state-retaining -%% callbacks. -%% -%% The mod field of the #diameter_app{} here includes any extra -%% arguments passed to diameter:call/2. - -send_request({TPid, Caps, App} = T, Mask, Msg, Opts, Caller, SvcName) -> - #diameter_app{module = ModX} - = App, - - Pkt = make_prepare_packet(Mask, Msg), - - send_req(cb(ModX, prepare_request, [Pkt, SvcName, {TPid, Caps}]), - Pkt, - T, - Opts, - Caller, - SvcName, - []). - -send_req({send, P}, Pkt, T, Opts, Caller, SvcName, Fs) -> - send_req(make_request_packet(P, Pkt), T, Opts, Caller, SvcName, Fs); - -send_req({discard, Reason} , _, _, _, _, _, _) -> - {error, Reason}; - -send_req(discard, _, _, _, _, _, _) -> - {error, discarded}; - -send_req({eval_packet, RC, F}, Pkt, T, Opts, Caller, SvcName, Fs) -> - send_req(RC, Pkt, T, Opts, Caller, SvcName, [F|Fs]); - -send_req(E, _, {_, _, App}, _, _, _, _) -> - ?ERROR({invalid_return, prepare_request, App, E}). - -%% make_prepare_packet/2 -%% -%% Turn an outgoing request as passed to call/4 into a diameter_packet -%% record in preparation for a prepare_request callback. - -make_prepare_packet(_, Bin) - when is_binary(Bin) -> - #diameter_packet{header = diameter_codec:decode_header(Bin), - bin = Bin}; - -make_prepare_packet(Mask, #diameter_packet{msg = [#diameter_header{} = Hdr - | Avps]} - = Pkt) -> - Pkt#diameter_packet{msg = [make_prepare_header(Mask, Hdr) | Avps]}; - -make_prepare_packet(Mask, #diameter_packet{header = Hdr} = Pkt) -> - Pkt#diameter_packet{header = make_prepare_header(Mask, Hdr)}; - -make_prepare_packet(Mask, Msg) -> - make_prepare_packet(Mask, #diameter_packet{msg = Msg}). - -%% make_prepare_header/2 - -make_prepare_header(Mask, undefined) -> - Seq = diameter_session:sequence(Mask), - make_prepare_header(#diameter_header{end_to_end_id = Seq, - hop_by_hop_id = Seq}); - -make_prepare_header(Mask, #diameter_header{end_to_end_id = undefined, - hop_by_hop_id = undefined} - = H) -> - Seq = diameter_session:sequence(Mask), - make_prepare_header(H#diameter_header{end_to_end_id = Seq, - hop_by_hop_id = Seq}); - -make_prepare_header(Mask, #diameter_header{end_to_end_id = undefined} = H) -> - Seq = diameter_session:sequence(Mask), - make_prepare_header(H#diameter_header{end_to_end_id = Seq}); - -make_prepare_header(Mask, #diameter_header{hop_by_hop_id = undefined} = H) -> - Seq = diameter_session:sequence(Mask), - make_prepare_header(H#diameter_header{hop_by_hop_id = Seq}); - -make_prepare_header(_, Hdr) -> - make_prepare_header(Hdr). - -%% make_prepare_header/1 - -make_prepare_header(#diameter_header{version = undefined} = Hdr) -> - make_prepare_header(Hdr#diameter_header{version = ?DIAMETER_VERSION}); - -make_prepare_header(#diameter_header{} = Hdr) -> - Hdr; - -make_prepare_header(T) -> - ?ERROR({invalid_header, T}). - -%% make_request_packet/2 -%% -%% Reconstruct a diameter_packet from the return value of -%% prepare_request or prepare_retransmit callback. - -make_request_packet(Bin, _) - when is_binary(Bin) -> - make_prepare_packet(false, Bin); - -make_request_packet(#diameter_packet{msg = [#diameter_header{} | _]} - = Pkt, - _) -> - Pkt; - -%% Returning a diameter_packet with no header from a prepare_request -%% or prepare_retransmit callback retains the header passed into it. -%% This is primarily so that the end to end and hop by hop identifiers -%% are retained. -make_request_packet(#diameter_packet{header = Hdr} = Pkt, - #diameter_packet{header = Hdr0}) -> - Pkt#diameter_packet{header = fold_record(Hdr0, Hdr)}; - -make_request_packet(Msg, Pkt) -> - Pkt#diameter_packet{msg = Msg}. - -%% fold_record/2 - -fold_record(undefined, R) -> - R; -fold_record(Rec, R) -> - diameter_lib:fold_tuple(2, Rec, R). - -%% send_req/6 - -send_req(Pkt, {TPid, Caps, App}, Opts, Caller, SvcName, Fs) -> - #diameter_app{alias = Alias, - dictionary = Dict, - module = ModX, - options = [{answer_errors, AE} | _]} - = App, - - EPkt = encode(Dict, Pkt, Fs), - - #options{filter = Filter, - timeout = Timeout} - = Opts, - - Req = #request{packet = Pkt, - from = Caller, - handler = self(), - transport = TPid, - caps = Caps, - app = Alias, - filter = Filter, - dictionary = Dict, - module = ModX}, - - try - TRef = send_request(TPid, EPkt, Req, Timeout), - ack(Caller), - handle_answer(SvcName, AE, recv_answer(Timeout, SvcName, {TRef, Req})) - after - erase_request(EPkt) - end. - -%% Tell caller a send has been attempted. -ack({Pid, Ref}) -> - Pid ! Ref. - -%% recv_answer/3 - -recv_answer(Timeout, - SvcName, - {TRef, #request{from = {_, Ref}, packet = RPkt} = Req} - = T) -> - - %% Matching on TRef below ensures we ignore messages that pertain - %% to a previous transport prior to failover. The answer message - %% includes the #request{} since it's not necessarily Req; that - %% is, from the last peer to which we've transmitted. - - receive - {answer = A, Ref, Rq, Pkt} -> %% Answer from peer - {A, Rq, Pkt}; - {timeout = Reason, TRef, _} -> %% No timely reply - {error, Req, Reason}; - {failover = Reason, TRef, false} -> %% No alternate peer - {error, Req, Reason}; - {failover, TRef, Transport} -> %% Resend to alternate peer - try_retransmit(Timeout, SvcName, Req, Transport); - {failover, TRef} -> %% May have missed failover notification - Seqs = diameter_codec:sequence_numbers(RPkt), - Pid = whois(SvcName), - is_pid(Pid) andalso (Pid ! {failover, TRef, Seqs}), - recv_answer(Timeout, SvcName, T) - end. -%% Note that failover starts a new timer and that expiry of an old -%% timer value is ignored. This means that an answer could be accepted -%% from a peer after timeout in the case of failover. - -try_retransmit(Timeout, SvcName, Req, Transport) -> - try retransmit(Transport, Req, SvcName, Timeout) of - T -> recv_answer(Timeout, SvcName, T) - catch - ?FAILURE(Reason) -> {error, Req, Reason} - end. - -%% handle_error/3 - -handle_error(Req, Reason, SvcName) -> - #request{module = ModX, - packet = Pkt, - transport = TPid, - caps = Caps} - = Req, - cb(ModX, handle_error, [Reason, msg(Pkt), SvcName, {TPid, Caps}]). - -msg(#diameter_packet{msg = undefined, bin = Bin}) -> - Bin; -msg(#diameter_packet{msg = Msg}) -> - Msg. - -%% encode/3 - -encode(Dict, Pkt, Fs) -> - P = encode(Dict, Pkt), - eval_packet(P, Fs), - P. - -%% encode/2 - -%% Note that prepare_request can return a diameter_packet containing -%% header or transport_data. Even allow the returned record to contain -%% an encoded binary. This isn't the usual case but could some in -%% handy, for test at least. (For example, to send garbage.) - -%% The normal case: encode the returned message. -encode(Dict, #diameter_packet{msg = Msg, bin = undefined} = Pkt) -> - D = pick_dictionary([Dict, ?BASE], Msg), - diameter_codec:encode(D, Pkt); - -%% Callback has returned an encoded binary: just send. -encode(_, #diameter_packet{} = Pkt) -> - Pkt. - -%% pick_dictionary/2 - -%% Pick the first dictionary that declares the application id in the -%% specified header. -pick_dictionary(Ds, [#diameter_header{application_id = Id} | _]) -> - pd(Ds, fun(D) -> Id = D:id() end); - -%% Pick the first dictionary that knows the specified message name. -pick_dictionary(Ds, [MsgName|_]) -> - pd(Ds, fun(D) -> D:msg2rec(MsgName) end); - -%% Pick the first dictionary that knows the name of the specified -%% message record. -pick_dictionary(Ds, Rec) -> - Name = element(1, Rec), - pd(Ds, fun(D) -> D:rec2msg(Name) end). - -pd([D|Ds], F) -> - try - F(D), - D - catch - error:_ -> - pd(Ds, F) - end; - -pd([], _) -> - ?ERROR(no_dictionary). - -%% send_request/4 - -send_request(TPid, #diameter_packet{bin = Bin} = Pkt, Req, Timeout) - when node() == node(TPid) -> - %% Store the outgoing request before sending to avoid a race with - %% reply reception. - TRef = store_request(TPid, Bin, Req, Timeout), - send(TPid, Pkt), - TRef; - -%% Send using a remote transport: spawn a process on the remote node -%% to relay the answer. -send_request(TPid, #diameter_packet{} = Pkt, Req, Timeout) -> - TRef = erlang:start_timer(Timeout, self(), timeout), - T = {TPid, Pkt, Req, Timeout, TRef}, - spawn(node(TPid), ?MODULE, send, [T]), - TRef. - -%% send/1 - -send({TPid, Pkt, #request{handler = Pid} = Req, Timeout, TRef}) -> - Ref = send_request(TPid, Pkt, Req#request{handler = self()}, Timeout), - Pid ! reref(receive T -> T end, Ref, TRef). - -reref({T, Ref, R}, Ref, TRef) -> - {T, TRef, R}; -reref(T, _, _) -> - T. - -%% send/2 - -send(Pid, Pkt) -> - Pid ! {send, Pkt}. - -%% retransmit/4 - -retransmit({TPid, Caps, #diameter_app{alias = Alias} = App} = T, - #request{app = Alias, packet = Pkt0} - = Req, - SvcName, - Timeout) -> - have_request(Pkt0, TPid) %% Don't failover to a peer we've - andalso ?THROW(timeout), %% already sent to. - - #diameter_packet{header = Hdr0} = Pkt0, - Hdr = Hdr0#diameter_header{is_retransmitted = true}, - Pkt = Pkt0#diameter_packet{header = Hdr}, - - resend_req(cb(App, prepare_retransmit, [Pkt, SvcName, {TPid, Caps}]), - T, - Req#request{packet = Pkt}, - Timeout, - []). - -resend_req({send, P}, T, #request{packet = Pkt} = Req, Timeout, Fs) -> - retransmit(make_request_packet(P, Pkt), T, Req, Timeout, Fs); - -resend_req({discard, Reason}, _, _, _, _) -> - ?THROW(Reason); - -resend_req(discard, _, _, _, _) -> - ?THROW(discarded); - -resend_req({eval_packet, RC, F}, T, Req, Timeout, Fs) -> - resend_req(RC, T, Req, Timeout, [F|Fs]); - -resend_req(T, {_, _, App}, _, _, _) -> - ?ERROR({invalid_return, prepare_retransmit, App, T}). - -%% retransmit/6 - -retransmit(Pkt, {TPid, Caps, _}, #request{dictionary = D} = Req0, Tmo, Fs) -> - EPkt = encode(D, Pkt, Fs), - - Req = Req0#request{transport = TPid, - packet = Pkt, - caps = Caps}, - - ?LOG(retransmission, Req), - TRef = send_request(TPid, EPkt, Req, Tmo), - {TRef, Req}. - -%% store_request/4 - -store_request(TPid, Bin, Req, Timeout) -> - Seqs = diameter_codec:sequence_numbers(Bin), - TRef = erlang:start_timer(Timeout, self(), timeout), - ets:insert(?REQUEST_TABLE, {Seqs, Req, TRef}), - ets:member(?REQUEST_TABLE, TPid) - orelse (self() ! {failover, TRef}), %% possibly missed failover - TRef. - -%% lookup_request/2 - -lookup_request(Msg, TPid) - when is_pid(TPid) -> - lookup(Msg, TPid, '_'); - -lookup_request(Msg, TRef) - when is_reference(TRef) -> - lookup(Msg, '_', TRef). - -lookup(Msg, TPid, TRef) -> - Seqs = diameter_codec:sequence_numbers(Msg), - Spec = [{{Seqs, #request{transport = TPid, _ = '_'}, TRef}, - [], - ['$_']}], - case ets:select(?REQUEST_TABLE, Spec) of - [{_, Req, _}] -> - Req; - [] -> - false - end. - -%% erase_request/1 - -erase_request(Pkt) -> - ets:delete(?REQUEST_TABLE, diameter_codec:sequence_numbers(Pkt)). - -%% match_requests/1 - -match_requests(TPid) -> - Pat = {'_', #request{transport = TPid, _ = '_'}, '_'}, - ets:select(?REQUEST_TABLE, [{Pat, [], ['$_']}]). - -%% have_request/2 - -have_request(Pkt, TPid) -> - Seqs = diameter_codec:sequence_numbers(Pkt), - Pat = {Seqs, #request{transport = TPid, _ = '_'}, '_'}, - '$end_of_table' /= ets:select(?REQUEST_TABLE, [{Pat, [], ['$_']}], 1). - -%% request_peer_up/1 - -request_peer_up(TPid) -> - ets:insert(?REQUEST_TABLE, {TPid}). - -%% request_peer_down/2 - -request_peer_down(TPid, S) -> - ets:delete(?REQUEST_TABLE, TPid), - lists:foreach(fun(T) -> failover(T,S) end, match_requests(TPid)). -%% Note that a request process can store its request after failover -%% notifications are sent here: store_request/4 sends the notification -%% in that case. Note also that we'll send as many notifications to a -%% given handler as there are peers its sent to. All but one of these -%% will be ignored. - -%%% --------------------------------------------------------------------------- -%%% recv_request/3 -%%% --------------------------------------------------------------------------- - -recv_request(TPid, Pkt, {ConnT, SvcName, Apps, Mask}) -> - try ets:lookup(ConnT, TPid) of - [C] -> - recv_request(C, TPid, Pkt, SvcName, Apps, Mask); - [] -> %% transport has gone down - ok - catch - error: badarg -> %% service has gone down (and taken table with it) - ok - end. - -%% recv_request/5 - -recv_request(#conn{apps = SApps, caps = Caps}, - TPid, - Pkt, - SvcName, - Apps, - Mask) -> - #diameter_caps{origin_host = {OH,_}, - origin_realm = {OR,_}} - = Caps, - - #diameter_packet{header = #diameter_header{application_id = Id}} - = Pkt, - - recv_request(find_recv_app(Id, SApps), - {SvcName, OH, OR}, - TPid, - Apps, - Mask, - Caps, - Pkt). - -%% find_recv_app/2 - -%% No one should be sending the relay identifier. -find_recv_app(?APP_ID_RELAY, _) -> - false; - -%% With any other id we either support it locally or as a relay. -find_recv_app(Id, SApps) -> - keyfind([Id, ?APP_ID_RELAY], 1, SApps). - -%% keyfind/3 - -keyfind([], _, _) -> - false; -keyfind([Key | Rest], Pos, L) -> - case lists:keyfind(Key, Pos, L) of - false -> - keyfind(Rest, Pos, L); - T -> - T - end. - -%% recv_request/7 - -recv_request({Id, Alias}, T, TPid, Apps, Mask, Caps, Pkt) -> - #diameter_app{dictionary = Dict} - = A - = find_app(Alias, Apps), - recv_request(T, - {TPid, Caps}, - A, - Mask, - diameter_codec:decode(Id, Dict, Pkt)); -%% Note that the decode is different depending on whether or not Id is -%% ?APP_ID_RELAY. - -%% DIAMETER_APPLICATION_UNSUPPORTED 3007 -%% A request was sent for an application that is not supported. - -recv_request(false, T, TPid, _, _, _, Pkt) -> - As = collect_avps(Pkt), - protocol_error(3007, T, TPid, Pkt#diameter_packet{avps = As}). - -collect_avps(Pkt) -> - case diameter_codec:collect_avps(Pkt) of - {_Bs, As} -> - As; - As -> - As - end. - -%% recv_request/5 - -%% Wrong number of bits somewhere in the message: reply. -%% -%% DIAMETER_INVALID_AVP_BITS 3009 -%% A request was received that included an AVP whose flag bits are -%% set to an unrecognized value, or that is inconsistent with the -%% AVP's definition. -%% -recv_request(T, {TPid, _}, _, _, #diameter_packet{errors = [Bs | _]} = Pkt) - when is_bitstring(Bs) -> - protocol_error(3009, T, TPid, Pkt); - -%% Either we support this application but don't recognize the command -%% or we're a relay and the command isn't proxiable. -%% -%% DIAMETER_COMMAND_UNSUPPORTED 3001 -%% The Request contained a Command-Code that the receiver did not -%% recognize or support. This MUST be used when a Diameter node -%% receives an experimental command that it does not understand. -%% -recv_request(T, - {TPid, _}, - #diameter_app{id = Id}, - _, - #diameter_packet{header = #diameter_header{is_proxiable = P}, - msg = M} - = Pkt) - when ?APP_ID_RELAY /= Id, undefined == M; - ?APP_ID_RELAY == Id, not P -> - protocol_error(3001, T, TPid, Pkt); - -%% Error bit was set on a request. -%% -%% DIAMETER_INVALID_HDR_BITS 3008 -%% A request was received whose bits in the Diameter header were -%% either set to an invalid combination, or to a value that is -%% inconsistent with the command code's definition. -%% -recv_request(T, - {TPid, _}, - _, - _, - #diameter_packet{header = #diameter_header{is_error = true}} - = Pkt) -> - protocol_error(3008, T, TPid, Pkt); - -%% A message in a locally supported application or a proxiable message -%% in the relay application. Don't distinguish between the two since -%% each application has its own callback config. That is, the user can -%% easily distinguish between the two cases. -recv_request(T, TC, App, Mask, Pkt) -> - request_cb(T, TC, App, Mask, examine(Pkt)). - -%% Note that there may still be errors but these aren't protocol -%% (3xxx) errors that lead to an answer-message. - -request_cb({SvcName, _OH, _OR} = T, TC, App, Mask, Pkt) -> - request_cb(cb(App, handle_request, [Pkt, SvcName, TC]), - App, - Mask, - T, - TC, - [], - Pkt). - -%% examine/1 -%% -%% Look for errors in a decoded message. Length errors result in -%% decode failure in diameter_codec. - -examine(#diameter_packet{header = #diameter_header{version - = ?DIAMETER_VERSION}} - = Pkt) -> - Pkt; - -%% DIAMETER_UNSUPPORTED_VERSION 5011 -%% This error is returned when a request was received, whose version -%% number is unsupported. - -examine(#diameter_packet{errors = Es} = Pkt) -> - Pkt#diameter_packet{errors = [5011 | Es]}. -%% It's odd/unfortunate that this isn't a protocol error. - -%% request_cb/7 - -%% A reply may be an answer-message, constructed either here or by -%% the handle_request callback. The header from the incoming request -%% is passed into the encode so that it can retrieve the relevant -%% command code in this case. It will also then ignore Dict and use -%% the base encoder. -request_cb({reply, Ans}, - #diameter_app{dictionary = Dict}, - _, - _, - {TPid, _}, - Fs, - Pkt) -> - reply(Ans, Dict, TPid, Fs, Pkt); - -%% An 3xxx result code, for which the E-bit is set in the header. -request_cb({protocol_error, RC}, _, _, T, {TPid, _}, Fs, Pkt) - when 3000 =< RC, RC < 4000 -> - protocol_error(RC, T, TPid, Fs, Pkt); - -%% RFC 3588 says we must reply 3001 to anything unrecognized or -%% unsupported. 'noreply' is undocumented (and inappropriately named) -%% backwards compatibility for this, protocol_error the documented -%% alternative. -request_cb(noreply, _, _, T, {TPid, _}, Fs, Pkt) -> - protocol_error(3001, T, TPid, Fs, Pkt); - -%% Relay a request to another peer. This is equivalent to doing an -%% explicit call/4 with the message in question except that (1) a loop -%% will be detected by examining Route-Record AVP's, (3) a -%% Route-Record AVP will be added to the outgoing request and (3) the -%% End-to-End Identifier will default to that in the -%% #diameter_header{} without the need for an end_to_end_identifier -%% option. -%% -%% relay and proxy are similar in that they require the same handling -%% with respect to Route-Record and End-to-End identifier. The -%% difference is that a proxy advertises specific applications, while -%% a relay advertises the relay application. If a callback doesn't -%% want to distinguish between the cases in the callback return value -%% then 'resend' is a neutral alternative. -%% -request_cb({A, Opts}, - #diameter_app{id = Id} - = App, - Mask, - T, - TC, - Fs, - Pkt) - when A == relay, Id == ?APP_ID_RELAY; - A == proxy, Id /= ?APP_ID_RELAY; - A == resend -> - resend(Opts, App, Mask, T, TC, Fs, Pkt); - -request_cb(discard, _, _, _, _, _, _) -> - ok; - -request_cb({eval_packet, RC, F}, App, Mask, T, TC, Fs, Pkt) -> - request_cb(RC, App, Mask, T, TC, [F|Fs], Pkt); - -request_cb({eval, RC, F}, App, Mask, T, TC, Fs, Pkt) -> - request_cb(RC, App, Mask, T, TC, Fs, Pkt), - diameter_lib:eval(F). - -%% protocol_error/5 - -protocol_error(RC, {_, OH, OR}, TPid, Fs, Pkt) -> - #diameter_packet{avps = Avps, errors = Es} = Pkt, - ?LOG({error, RC}, Pkt), - reply(answer_message({OH, OR, RC}, Avps), - ?BASE, - TPid, - Fs, - Pkt#diameter_packet{errors = [RC | Es]}). -%% Note that reply/5 may set the result code once more. It's set in -%% answer_message/2 in case reply/5 doesn't. - -%% protocol_error/4 - -protocol_error(RC, T, TPid, Pkt) -> - protocol_error(RC, T, TPid, [], Pkt). - -%% resend/7 -%% -%% Resend a message as a relay or proxy agent. - -resend(Opts, - #diameter_app{} = App, - Mask, - {_SvcName, OH, _OR} = T, - {_TPid, _Caps} = TC, - Fs, - #diameter_packet{avps = Avps} = Pkt) -> - {Code, _Flags, Vid} = ?BASE:avp_header('Route-Record'), - resend(is_loop(Code, Vid, OH, Avps), Opts, App, Mask, T, TC, Fs, Pkt). - -%% DIAMETER_LOOP_DETECTED 3005 -%% An agent detected a loop while trying to get the message to the -%% intended recipient. The message MAY be sent to an alternate peer, -%% if one is available, but the peer reporting the error has -%% identified a configuration problem. - -resend(true, _, _, _, T, {TPid, _}, Fs, Pkt) -> %% Route-Record loop - protocol_error(3005, T, TPid, Fs, Pkt); - -%% 6.1.8. Relaying and Proxying Requests -%% -%% A relay or proxy agent MUST append a Route-Record AVP to all requests -%% forwarded. The AVP contains the identity of the peer the request was -%% received from. - -resend(false, - Opts, - App, - Mask, - {SvcName, _, _} = T, - {TPid, #diameter_caps{origin_host = {_, OH}}}, - Fs, - #diameter_packet{header = Hdr0, - avps = Avps} - = Pkt) -> - Route = #diameter_avp{data = {?BASE, 'Route-Record', OH}}, - Seq = diameter_session:sequence(Mask), - Hdr = Hdr0#diameter_header{hop_by_hop_id = Seq}, - Msg = [Hdr, Route | Avps], - resend(call(SvcName, App, Msg, Opts), T, TPid, Fs, Pkt). -%% The incoming request is relayed with the addition of a -%% Route-Record. Note the requirement on the return from call/4 below, -%% which places a requirement on the value returned by the -%% handle_answer callback of the application module in question. -%% -%% Note that there's nothing stopping the request from being relayed -%% back to the sender. A pick_peer callback may want to avoid this but -%% a smart peer might recognize the potential loop and choose another -%% route. A less smart one will probably just relay the request back -%% again and force us to detect the loop. A pick_peer that wants to -%% avoid this can specify filter to avoid the possibility. -%% Eg. {neg, {host, OH} where #diameter_caps{origin_host = {OH, _}}. -%% -%% RFC 6.3 says that a relay agent does not modify Origin-Host but -%% says nothing about a proxy. Assume it should behave the same way. - -%% resend/4 -%% -%% Relay a reply to a relayed request. - -%% Answer from the peer: reset the hop by hop identifier and send. -resend(#diameter_packet{bin = B} - = Pkt, - _, - TPid, - Fs, - #diameter_packet{header = #diameter_header{hop_by_hop_id = Id}, - transport_data = TD}) -> - P = Pkt#diameter_packet{bin = diameter_codec:hop_by_hop_id(Id, B), - transport_data = TD}, - eval_packet(P, Fs), - send(TPid, P); -%% TODO: counters - -%% Or not: DIAMETER_UNABLE_TO_DELIVER. -resend(_, T, TPid, Fs, Pkt) -> - protocol_error(3002, T, TPid, Fs, Pkt). - -%% is_loop/4 -%% -%% Is there a Route-Record AVP with our Origin-Host? - -is_loop(Code, - Vid, - Bin, - [#diameter_avp{code = Code, vendor_id = Vid, data = Bin} | _]) -> - true; - -is_loop(_, _, _, []) -> - false; - -is_loop(Code, Vid, OH, [_ | Avps]) - when is_binary(OH) -> - is_loop(Code, Vid, OH, Avps); - -is_loop(Code, Vid, OH, Avps) -> - is_loop(Code, Vid, ?BASE:avp(encode, OH, 'Route-Record'), Avps). - -%% reply/5 -%% -%% Send a locally originating reply. - -%% Skip the setting of Result-Code and Failed-AVP's below. This is -%% currently undocumented. -reply([Msg], Dict, TPid, Fs, Pkt) - when is_list(Msg); - is_tuple(Msg) -> - reply(Msg, Dict, TPid, Fs, Pkt#diameter_packet{errors = []}); - -%% No errors or a diameter_header/avp list. -reply(Msg, Dict, TPid, Fs, #diameter_packet{errors = Es} = ReqPkt) - when [] == Es; - is_record(hd(Msg), diameter_header) -> - Pkt = diameter_codec:encode(Dict, make_answer_packet(Msg, ReqPkt)), - eval_packet(Pkt, Fs), - incr(send, Pkt, Dict, TPid), %% count result codes in sent answers - send(TPid, Pkt); - -%% Or not: set Result-Code and Failed-AVP AVP's. -reply(Msg, Dict, TPid, Fs, #diameter_packet{errors = [H|_] = Es} = Pkt) -> - reply(rc(Msg, rc(H), [A || {_,A} <- Es], Dict), - Dict, - TPid, - Fs, - Pkt#diameter_packet{errors = []}). - -eval_packet(Pkt, Fs) -> - lists:foreach(fun(F) -> diameter_lib:eval([F,Pkt]) end, Fs). - -%% make_answer_packet/2 - -%% A reply message clears the R and T flags and retains the P flag. -%% The E flag will be set at encode. 6.2 of 3588 requires the same P -%% flag on an answer as on the request. A #diameter_packet{} returned -%% from a handle_request callback can circumvent this by setting its -%% own header values. -make_answer_packet(#diameter_packet{header = Hdr, - msg = Msg, - transport_data = TD}, - #diameter_packet{header = ReqHdr}) -> - Hdr0 = ReqHdr#diameter_header{version = ?DIAMETER_VERSION, - is_request = false, - is_error = undefined, - is_retransmitted = false}, - #diameter_packet{header = fold_record(Hdr0, Hdr), - msg = Msg, - transport_data = TD}; - -%% Binaries and header/avp lists are sent as-is. -make_answer_packet(Bin, #diameter_packet{transport_data = TD}) - when is_binary(Bin) -> - #diameter_packet{bin = Bin, - transport_data = TD}; -make_answer_packet([#diameter_header{} | _] = Msg, - #diameter_packet{transport_data = TD}) -> - #diameter_packet{msg = Msg, - transport_data = TD}; - -%% Otherwise, preserve transport_data. -make_answer_packet(Msg, #diameter_packet{transport_data = TD} = Pkt) -> - make_answer_packet(#diameter_packet{msg = Msg, transport_data = TD}, Pkt). - -%% rc/1 - -rc({RC, _}) -> - RC; -rc(RC) -> - RC. - -%% rc/4 - -rc(#diameter_packet{msg = Rec} = Pkt, RC, Failed, Dict) -> - Pkt#diameter_packet{msg = rc(Rec, RC, Failed, Dict)}; - -rc(Rec, RC, Failed, Dict) - when is_integer(RC) -> - set(Rec, - lists:append([rc(Rec, {'Result-Code', RC}, Dict), - failed_avp(Rec, Failed, Dict)]), - Dict). - -%% Reply as name and tuple list ... -set([_|_] = Ans, Avps, _) -> - Ans ++ Avps; %% Values nearer tail take precedence. - -%% ... or record. -set(Rec, Avps, Dict) -> - Dict:'#set-'(Avps, Rec). - -%% rc/3 -%% -%% Turn the result code into a list if its optional and only set it if -%% the arity is 1 or {0,1}. In other cases (which probably shouldn't -%% exist in practise) we can't know what's appropriate. - -rc([MsgName | _], {'Result-Code' = K, RC} = T, Dict) -> - case Dict:avp_arity(MsgName, 'Result-Code') of - 1 -> [T]; - {0,1} -> [{K, [RC]}]; - _ -> [] - end; - -rc(Rec, T, Dict) -> - rc([Dict:rec2msg(element(1, Rec))], T, Dict). - -%% failed_avp/3 - -failed_avp(_, [] = No, _) -> - No; - -failed_avp(Rec, Failed, Dict) -> - [fa(Rec, [{'AVP', Failed}], Dict)]. - -%% Reply as name and tuple list ... -fa([MsgName | Values], FailedAvp, Dict) -> - R = Dict:msg2rec(MsgName), - try - Dict:'#info-'(R, {index, 'Failed-AVP'}), - {'Failed-AVP', [FailedAvp]} - catch - error: _ -> - Avps = proplists:get_value('AVP', Values, []), - A = #diameter_avp{name = 'Failed-AVP', - value = FailedAvp}, - {'AVP', [A|Avps]} - end; - -%% ... or record. -fa(Rec, FailedAvp, Dict) -> - try - {'Failed-AVP', [FailedAvp]} - catch - error: _ -> - Avps = Dict:'get-'('AVP', Rec), - A = #diameter_avp{name = 'Failed-AVP', - value = FailedAvp}, - {'AVP', [A|Avps]} - end. - -%% 3. Diameter Header -%% -%% E(rror) - If set, the message contains a protocol error, -%% and the message will not conform to the ABNF -%% described for this command. Messages with the 'E' -%% bit set are commonly referred to as error -%% messages. This bit MUST NOT be set in request -%% messages. See Section 7.2. - -%% 3.2. Command Code ABNF specification -%% -%% e-bit = ", ERR" -%% ; If present, the 'E' bit in the Command -%% ; Flags is set, indicating that the answer -%% ; message contains a Result-Code AVP in -%% ; the "protocol error" class. - -%% 7.1.3. Protocol Errors -%% -%% Errors that fall within the Protocol Error category SHOULD be treated -%% on a per-hop basis, and Diameter proxies MAY attempt to correct the -%% error, if it is possible. Note that these and only these errors MUST -%% only be used in answer messages whose 'E' bit is set. - -%% Thus, only construct answers to protocol errors. Other errors -%% require an message-specific answer and must be handled by the -%% application. - -%% 6.2. Diameter Answer Processing -%% -%% When a request is locally processed, the following procedures MUST be -%% applied to create the associated answer, in addition to any -%% additional procedures that MAY be discussed in the Diameter -%% application defining the command: -%% -%% - The same Hop-by-Hop identifier in the request is used in the -%% answer. -%% -%% - The local host's identity is encoded in the Origin-Host AVP. -%% -%% - The Destination-Host and Destination-Realm AVPs MUST NOT be -%% present in the answer message. -%% -%% - The Result-Code AVP is added with its value indicating success or -%% failure. -%% -%% - If the Session-Id is present in the request, it MUST be included -%% in the answer. -%% -%% - Any Proxy-Info AVPs in the request MUST be added to the answer -%% message, in the same order they were present in the request. -%% -%% - The 'P' bit is set to the same value as the one in the request. -%% -%% - The same End-to-End identifier in the request is used in the -%% answer. -%% -%% Note that the error messages (see Section 7.3) are also subjected to -%% the above processing rules. - -%% 7.3. Error-Message AVP -%% -%% The Error-Message AVP (AVP Code 281) is of type UTF8String. It MAY -%% accompany a Result-Code AVP as a human readable error message. The -%% Error-Message AVP is not intended to be useful in real-time, and -%% SHOULD NOT be expected to be parsed by network entities. - -%% answer_message/2 - -answer_message({OH, OR, RC}, Avps) -> - {Code, _, Vid} = ?BASE:avp_header('Session-Id'), - ['answer-message', {'Origin-Host', OH}, - {'Origin-Realm', OR}, - {'Result-Code', RC} - | session_id(Code, Vid, Avps)]. - -session_id(Code, Vid, Avps) - when is_list(Avps) -> - try - {value, #diameter_avp{data = D}} = find_avp(Code, Vid, Avps), - [{'Session-Id', [?BASE:avp(decode, D, 'Session-Id')]}] - catch - error: _ -> - [] - end. - -%% find_avp/3 - -find_avp(Code, Vid, Avps) - when is_integer(Code), (undefined == Vid orelse is_integer(Vid)) -> - find(fun(A) -> is_avp(Code, Vid, A) end, Avps). - -%% The final argument here could be a list of AVP's, depending on the case, -%% but we're only searching at the top level. -is_avp(Code, Vid, #diameter_avp{code = Code, vendor_id = Vid}) -> - true; -is_avp(_, _, _) -> - false. - -find(_, []) -> - false; -find(Pred, [H|T]) -> - case Pred(H) of - true -> - {value, H}; - false -> - find(Pred, T) - end. - -%% 7. Error Handling -%% -%% There are certain Result-Code AVP application errors that require -%% additional AVPs to be present in the answer. In these cases, the -%% Diameter node that sets the Result-Code AVP to indicate the error -%% MUST add the AVPs. Examples are: -%% -%% - An unrecognized AVP is received with the 'M' bit (Mandatory bit) -%% set, causes an answer to be sent with the Result-Code AVP set to -%% DIAMETER_AVP_UNSUPPORTED, and the Failed-AVP AVP containing the -%% offending AVP. -%% -%% - An AVP that is received with an unrecognized value causes an -%% answer to be returned with the Result-Code AVP set to -%% DIAMETER_INVALID_AVP_VALUE, with the Failed-AVP AVP containing the -%% AVP causing the error. -%% -%% - A command is received with an AVP that is omitted, yet is -%% mandatory according to the command's ABNF. The receiver issues an -%% answer with the Result-Code set to DIAMETER_MISSING_AVP, and -%% creates an AVP with the AVP Code and other fields set as expected -%% in the missing AVP. The created AVP is then added to the Failed- -%% AVP AVP. -%% -%% The Result-Code AVP describes the error that the Diameter node -%% encountered in its processing. In case there are multiple errors, -%% the Diameter node MUST report only the first error it encountered -%% (detected possibly in some implementation dependent order). The -%% specific errors that can be described by this AVP are described in -%% the following section. - -%% 7.5. Failed-AVP AVP -%% -%% The Failed-AVP AVP (AVP Code 279) is of type Grouped and provides -%% debugging information in cases where a request is rejected or not -%% fully processed due to erroneous information in a specific AVP. The -%% value of the Result-Code AVP will provide information on the reason -%% for the Failed-AVP AVP. -%% -%% The possible reasons for this AVP are the presence of an improperly -%% constructed AVP, an unsupported or unrecognized AVP, an invalid AVP -%% value, the omission of a required AVP, the presence of an explicitly -%% excluded AVP (see tables in Section 10), or the presence of two or -%% more occurrences of an AVP which is restricted to 0, 1, or 0-1 -%% occurrences. -%% -%% A Diameter message MAY contain one Failed-AVP AVP, containing the -%% entire AVP that could not be processed successfully. If the failure -%% reason is omission of a required AVP, an AVP with the missing AVP -%% code, the missing vendor id, and a zero filled payload of the minimum -%% required length for the omitted AVP will be added. - -%%% --------------------------------------------------------------------------- -%%% # handle_answer/3 -%%% --------------------------------------------------------------------------- - -%% Process an answer message in call-specific process. - -handle_answer(SvcName, _, {error, Req, Reason}) -> - handle_error(Req, Reason, SvcName); - -handle_answer(SvcName, - AnswerErrors, - {answer, #request{dictionary = Dict} = Req, Pkt}) -> - answer(examine(diameter_codec:decode(Dict, Pkt)), - SvcName, - AnswerErrors, - Req). - -%% We don't really need to do a full decode if we're a relay and will -%% just resend with a new hop by hop identifier, but might a proxy -%% want to examine the answer? - -answer(Pkt, SvcName, AE, #request{transport = TPid, - dictionary = Dict} - = Req) -> - try - incr(recv, Pkt, Dict, TPid) - of - _ -> a(Pkt, SvcName, AE, Req) - catch - exit: {invalid_error_bit, _} = E -> - a(Pkt#diameter_packet{errors = [E]}, SvcName, AE, Req) - end. - -a(#diameter_packet{errors = Es} = Pkt, SvcName, AE, #request{transport = TPid, - caps = Caps, - packet = P} - = Req) - when [] == Es; - callback == AE -> - cb(Req, handle_answer, [Pkt, msg(P), SvcName, {TPid, Caps}]); - -a(Pkt, SvcName, report, Req) -> - x(errors, handle_answer, [SvcName, Req, Pkt]); - -a(Pkt, SvcName, discard, Req) -> - x({errors, handle_answer, [SvcName, Req, Pkt]}). - -%% Note that we don't check that the application id in the answer's -%% header is what we expect. (TODO: Does the rfc says anything about -%% this?) - -%% incr/4 -%% -%% Increment a stats counter for an incoming or outgoing message. - -%% TODO: fix -incr(_, #diameter_packet{msg = undefined}, _, _) -> - ok; - -incr(recv = D, #diameter_packet{header = H, errors = [_|_]}, _, TPid) -> - incr(TPid, {diameter_codec:msg_id(H), D, error}); - -incr(Dir, Pkt, Dict, TPid) -> - #diameter_packet{header = #diameter_header{is_error = E} - = Hdr, - msg = Rec} - = Pkt, - - RC = int(get_avp_value(Dict, 'Result-Code', Rec)), - PE = is_protocol_error(RC), - - %% Check that the E bit is set only for 3xxx result codes. - (not (E orelse PE)) - orelse (E andalso PE) - orelse x({invalid_error_bit, RC}, answer, [Dir, Pkt]), - - irc(TPid, Hdr, Dir, rc_counter(Dict, Rec, RC)). - -irc(_, _, _, undefined) -> - false; - -irc(TPid, Hdr, Dir, Ctr) -> - incr(TPid, {diameter_codec:msg_id(Hdr), Dir, Ctr}). - -%% incr/2 - -incr(TPid, Counter) -> - diameter_stats:incr(Counter, TPid, 1). - -%% error_counter/2 - -%% RFC 3588, 7.6: -%% -%% All Diameter answer messages defined in vendor-specific -%% applications MUST include either one Result-Code AVP or one -%% Experimental-Result AVP. -%% -%% Maintain statistics assuming one or the other, not both, which is -%% surely the intent of the RFC. - -rc_counter(Dict, Rec, undefined) -> - er(get_avp_value(Dict, 'Experimental-Result', Rec)); -rc_counter(_, _, RC) -> - {'Result-Code', RC}. - -%% Outgoing answers may be in any of the forms messages can be sent -%% in. Incoming messages will be records. We're assuming here that the -%% arity of the result code AVP's is 0 or 1. - -er([{_,_,N} = T | _]) - when is_integer(N) -> - T; -er({_,_,N} = T) - when is_integer(N) -> - T; -er(_) -> - undefined. - -%% Extract the first good looking integer. There's no guarantee -%% that what we're looking for has arity 1. -int([N|_]) - when is_integer(N) -> - N; -int(N) - when is_integer(N) -> - N; -int(_) -> - undefined. - -is_protocol_error(RC) -> - 3000 =< RC andalso RC < 4000. - --spec x(any(), atom(), list()) -> no_return(). - -%% Warn and exit request process on errors in an incoming answer. -x(Reason, F, A) -> - diameter_lib:warning_report(Reason, {?MODULE, F, A}), - x(Reason). - -x(T) -> - exit(T). - -%%% --------------------------------------------------------------------------- -%%% # failover/[23] -%%% --------------------------------------------------------------------------- - -%% Failover as a consequence of request_peer_down/2. -failover({_, #request{handler = Pid} = Req, TRef}, S) -> - Pid ! {failover, TRef, rt(Req, S)}. - -%% Failover as a consequence of store_request/4. -failover(TRef, Seqs, S) - when is_reference(TRef) -> - case lookup_request(Seqs, TRef) of - #request{} = Req -> - failover({Seqs, Req, TRef}, S); - false -> - ok - end. - -%% prepare_request returned a binary ... -rt(#request{packet = #diameter_packet{msg = undefined}}, _) -> - false; %% TODO: Not what we should do. - -%% ... or not. -rt(#request{packet = #diameter_packet{msg = Msg}, - dictionary = Dict} - = Req, - S) -> - find_transport(get_destination(Dict, Msg), Req, S). - -%%% --------------------------------------------------------------------------- -%%% # report_status/5 -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # report_status/5 +%% --------------------------------------------------------------------------- report_status(Status, - #peer{ref = Ref, - conn = TPid, - type = Type, - options = Opts}, - #conn{apps = [_|_] = As, + #watchdog{ref = Ref, + peer = TPid, + type = Type, + options = Opts}, + #peer{apps = [_|_] = As, caps = Caps}, #state{service_name = SvcName} = S, @@ -2615,9 +1251,9 @@ send_event(SvcName, Info) -> send_event(#diameter_event{service = SvcName} = E) -> lists:foreach(fun({_, Pid}) -> Pid ! E end, subscriptions(SvcName)). -%%% --------------------------------------------------------------------------- -%%% # share_peer/5 -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # share_peer/5 +%% --------------------------------------------------------------------------- share_peer(up, Caps, Aliases, TPid, #state{options = [_, {_, true} | _], service_name = Svc}) -> @@ -2626,9 +1262,9 @@ share_peer(up, Caps, Aliases, TPid, #state{options = [_, {_, true} | _], share_peer(_, _, _, _, _) -> ok. -%%% --------------------------------------------------------------------------- -%%% # share_peers/2 -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # share_peers/2 +%% --------------------------------------------------------------------------- share_peers(Pid, #state{options = [_, {_, true} | _], local_peers = PDict}) -> @@ -2640,9 +1276,9 @@ share_peers(_, _) -> sp(Pid, Alias, Peers) -> lists:foreach(fun({P,C}) -> Pid ! {peer, P, [Alias], C} end, Peers). -%%% --------------------------------------------------------------------------- -%%% # remote_peer_up/4 -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # remote_peer_up/4 +%% --------------------------------------------------------------------------- remote_peer_up(Pid, Aliases, Caps, #state{options = [_, _, {_, true} | _], service = Svc, @@ -2662,9 +1298,9 @@ rpu(Pid, Caps, PDict, Aliases) -> T = {Pid, Caps}, lists:foreach(fun(A) -> ?Dict:append(A, T, PDict) end, Aliases). -%%% --------------------------------------------------------------------------- -%%% # remote_peer_down/2 -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # remote_peer_down/2 +%% --------------------------------------------------------------------------- remote_peer_down(Pid, #state{options = [_, _, {_, true} | _], shared_peers = PDict}) -> @@ -2673,164 +1309,20 @@ remote_peer_down(Pid, #state{options = [_, _, {_, true} | _], rpd(Pid, Alias, PDict) -> ?Dict:update(Alias, fun(Ps) -> lists:keydelete(Pid, 1, Ps) end, PDict). -%%% --------------------------------------------------------------------------- -%%% find_transport/[34] -%%% -%%% Output: {TransportPid, #diameter_caps{}, #diameter_app{}} -%%% | false -%%% | {error, Reason} -%%% --------------------------------------------------------------------------- - -%% Initial call, from an arbitrary process. -find_transport({alias, Alias}, Msg, Opts, #state{service = Svc} = S) -> - #diameter_service{applications = Apps} = Svc, - ft(find_send_app(Alias, Apps), Msg, Opts, S); - -%% Relay or proxy send. -find_transport(#diameter_app{} = App, Msg, Opts, S) -> - ft(App, Msg, Opts, S). - -ft(#diameter_app{module = Mod, dictionary = Dict} = App, Msg, Opts, S) -> - #options{filter = Filter, - extra = Xtra} - = Opts, - pick_peer(App#diameter_app{module = Mod ++ Xtra}, - get_destination(Dict, Msg), - Filter, - S); -ft(false = No, _, _, _) -> - No. - -%% This can't be used if we're a relay and sending a message -%% in an application not known locally. (TODO) -find_send_app(Alias, Apps) -> - case lists:keyfind(Alias, #diameter_app.alias, Apps) of - #diameter_app{id = ?APP_ID_RELAY} -> - false; - T -> - T - end. - -%% Retransmission, in the service process. -find_transport([_,_] = RH, - Req, - #state{service = #diameter_service{pid = Pid, - applications = Apps}} - = S) - when self() == Pid -> - #request{app = Alias, - filter = Filter, - module = ModX} - = Req, - #diameter_app{} - = App - = lists:keyfind(Alias, #diameter_app.alias, Apps), - - pick_peer(App#diameter_app{module = ModX}, - RH, - Filter, - S). - -%% get_destination/2 - -get_destination(Dict, Msg) -> - [str(get_avp_value(Dict, 'Destination-Realm', Msg)), - str(get_avp_value(Dict, 'Destination-Host', Msg))]. - -%% This is not entirely correct. The avp could have an arity 1, in -%% which case an empty list is a DiameterIdentity of length 0 rather -%% than the list of no values we treat it as by mapping to undefined. -%% This behaviour is documented. -str([]) -> - undefined; -str(T) -> - T. - -%% get_avp_value/3 -%% -%% Find an AVP in a message of one of three forms: -%% -%% - a message record (as generated from a .dia spec) or -%% - a list of an atom message name followed by 2-tuple, avp name/value pairs. -%% - a list of a #diameter_header{} followed by #diameter_avp{} records, -%% -%% In the first two forms a dictionary module is used at encode to -%% identify the type of the AVP and its arity in the message in -%% question. The third form allows messages to be sent as is, without -%% a dictionary, which is needed in the case of relay agents, for one. - -%% Messages will be header/avps list as a relay and the only AVP's we -%% look for are in the common dictionary. This is required since the -%% relay dictionary doesn't inherit the common dictionary (which maybe -%% it should). -get_avp_value(?RELAY, Name, Msg) -> - get_avp_value(?BASE, Name, Msg); - -%% Message sent as a header/avps list, probably a relay case but not -%% necessarily. -get_avp_value(Dict, Name, [#diameter_header{} | Avps]) -> - try - {Code, _, VId} = Dict:avp_header(Name), - [A|_] = lists:dropwhile(fun(#diameter_avp{code = C, vendor_id = V}) -> - C /= Code orelse V /= VId - end, - Avps), - avp_decode(Dict, Name, A) - catch - error: _ -> - undefined - end; - -%% Outgoing message as a name/values list. -get_avp_value(_, Name, [_MsgName | Avps]) -> - case lists:keyfind(Name, 1, Avps) of - {_, V} -> - V; - _ -> - undefined - end; - -%% Record might be an answer message in the common dictionary. -get_avp_value(Dict, Name, Rec) - when Dict /= ?BASE, element(1, Rec) == 'diameter_base_answer-message' -> - get_avp_value(?BASE, Name, Rec); - -%% Message is typically a record but not necessarily: diameter:call/4 -%% can be passed an arbitrary term. -get_avp_value(Dict, Name, Rec) -> - try - Dict:'#get-'(Name, Rec) - catch - error:_ -> - undefined - end. - -avp_decode(Dict, Name, #diameter_avp{value = undefined, - data = Bin}) -> - Dict:avp(decode, Bin, Name); -avp_decode(_, _, #diameter_avp{value = V}) -> - V. - -%%% --------------------------------------------------------------------------- -%%% # pick_peer(App, [DestRealm, DestHost], Filter, #state{}) -%%% -%%% Output: {TransportPid, #diameter_caps{}, App} -%%% | false -%%% | {error, Reason} -%%% --------------------------------------------------------------------------- - -%% Find transports to a given realm/host. +%% --------------------------------------------------------------------------- +%% pick_peer/4 +%% --------------------------------------------------------------------------- pick_peer(#diameter_app{alias = Alias} = App, - [_,_] = RH, + RealmAndHost, Filter, #state{local_peers = L, shared_peers = S, service_name = SvcName, service = #diameter_service{pid = Pid}}) -> - pick_peer(peers(Alias, RH, Filter, L), - peers(Alias, RH, Filter, S), + pick_peer(peers(Alias, RealmAndHost, Filter, L), + peers(Alias, RealmAndHost, Filter, S), Pid, SvcName, App). @@ -2843,7 +1335,12 @@ pick_peer([], [], _, _, _) -> %% App state is mutable but we're not in the service process: go there. pick_peer(Local, Remote, Pid, _SvcName, #diameter_app{mutable = true} = App) when self() /= Pid -> - call_service(Pid, {pick_peer, Local, Remote, App}); + case call_service(Pid, {pick_peer, Local, Remote, App}) of + {TPid, _} = T when is_pid(TPid) -> + T; + {error, _} -> + false + end; %% App state isn't mutable or it is and we're in the service process: %% do the deed. @@ -2851,19 +1348,18 @@ pick_peer(Local, Remote, _Pid, SvcName, - #diameter_app{module = ModX, - alias = Alias, + #diameter_app{alias = Alias, init_state = S, mutable = M} = App) -> - MFA = {ModX, pick_peer, [Local, Remote, SvcName]}, + Args = [Local, Remote, SvcName], - try state_cb(App, MFA) of - {ok, {TPid, #diameter_caps{} = Caps}} when is_pid(TPid) -> - {TPid, Caps, App}; - {{TPid, #diameter_caps{} = Caps}, ModS} when is_pid(TPid), M -> + try state_cb(App, pick_peer, Args) of + {ok, {TPid, #diameter_caps{}} = T} when is_pid(TPid) -> + T; + {{TPid, #diameter_caps{}} = T, ModS} when is_pid(TPid), M -> mod_state(Alias, ModS), - {TPid, Caps, App}; + T; {false = No, ModS} when M -> mod_state(Alias, ModS), No; @@ -2871,15 +1367,17 @@ pick_peer(Local, No; false = No -> No; - {{TPid, #diameter_caps{} = Caps}, S} when is_pid(TPid) -> - {TPid, Caps, App}; %% Accept returned state in the immutable + {{TPid, #diameter_caps{}} = T, S} when is_pid(TPid) -> + T; %% Accept returned state in the immutable {false = No, S} -> %% case as long it isn't changed. No; T -> - diameter_lib:error_report({invalid, T, App}, MFA) + diameter_lib:error_report({invalid, T, App}, + {App, pick_peer, Args}) catch E: Reason -> - diameter_lib:error_report({failure, {E, Reason, ?STACK}}, MFA) + diameter_lib:error_report({failure, {E, Reason, ?STACK}}, + {App, pick_peer, Args}) end. %% peers/4 @@ -2966,14 +1464,14 @@ eq(Any, Id, PeerId) -> %% transports/1 -transports(#state{peerT = PeerT}) -> - ets:select(PeerT, [{#peer{conn = '$1', _ = '_'}, +transports(#state{watchdogT = WatchdogT}) -> + ets:select(WatchdogT, [{#watchdog{peer = '$1', _ = '_'}, [{'is_pid', '$1'}], ['$1']}]). -%%% --------------------------------------------------------------------------- -%%% # service_info/2 -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # service_info/2 +%% --------------------------------------------------------------------------- %% The config passed to diameter:start_service/2. -define(CAP_INFO, ['Origin-Host', @@ -3021,11 +1519,12 @@ tagged_info(Item, S) undefined end; -tagged_info(TPid, #state{peerT = PT, connT = CT}) +tagged_info(TPid, #state{watchdogT = WatchdogT, peerT = PeerT}) when is_pid(TPid) -> try - [#conn{peer = Pid}] = ets:lookup(CT, TPid), - [#peer{ref = Ref, type = Type, options = Opts}] = ets:lookup(PT, Pid), + [#peer{watchdog = Pid}] = ets:lookup(PeerT, TPid), + [#watchdog{ref = Ref, type = Type, options = Opts}] + = ets:lookup(WatchdogT, Pid), [{ref, Ref}, {type, Type}, {options, Opts}] @@ -3108,11 +1607,11 @@ complete(Pre) -> %% info_stats/1 -info_stats(#state{peerT = PeerT}) -> - MatchSpec = [{#peer{ref = '$1', conn = '$2', _ = '_'}, +info_stats(#state{watchdogT = WatchdogT}) -> + MatchSpec = [{#watchdog{ref = '$1', peer = '$2', _ = '_'}, [{'is_pid', '$2'}], [['$1', '$2']]}], - try ets:select(PeerT, MatchSpec) of + try ets:select(WatchdogT, MatchSpec) of L -> diameter_stats:read(lists:append(L)) catch @@ -3122,7 +1621,8 @@ info_stats(#state{peerT = PeerT}) -> %% info_transport/1 %% %% One entry per configured transport. Statistics for each entry are -%% the accumulated values for the ref and associated peer pids. +%% the accumulated values for the ref and associated watchdog/peer +%% pids. info_transport(S) -> PeerD = peer_dict(S, config_dict(S)), @@ -3155,43 +1655,42 @@ transport([[_,_] | L]) -> %% Possibly many peer entries for a listening transport. Note that all %% have the same options by construction, which is not terribly space -%% efficient. (TODO: all entries for the same Ref should share options.) +%% efficient. transport([[{type, accept}, {options, Opts} | _] | _] = Ls) -> [{type, listen}, {options, Opts}, {accept, [lists:nthtail(2,L) || L <- Ls]}]. -peer_dict(#state{peerT = PeerT, connT = ConnT}, Dict0) -> - try ets:tab2list(PeerT) of +peer_dict(#state{watchdogT = WatchdogT, peerT = PeerT}, Dict0) -> + try ets:tab2list(WatchdogT) of L -> - lists:foldl(fun(T,A) -> peer_acc(ConnT, A, T) end, Dict0, L) + lists:foldl(fun(T,A) -> peer_acc(PeerT, A, T) end, Dict0, L) catch error: badarg -> Dict0 %% service has gone down end. -peer_acc(ConnT, Acc, #peer{pid = Pid, - type = Type, - ref = Ref, - options = Opts, - op_state = OS, - started = T, - conn = TPid}) -> - WS = wd_state(OS), +peer_acc(PeerT, Acc, #watchdog{pid = Pid, + type = Type, + ref = Ref, + options = Opts, + state = WS, + started = At, + peer = TPid}) -> dict:append(Ref, [{type, Type}, {options, Opts}, - {watchdog, {Pid, T, WS}} - | info_conn(ConnT, TPid, WS /= ?WD_DOWN)], + {watchdog, {Pid, At, WS}} + | info_peer(PeerT, TPid, WS)], Acc). -info_conn(ConnT, TPid, true) - when is_pid(TPid) -> - try ets:lookup(ConnT, TPid) of - T -> info_conn(T) +info_peer(PeerT, TPid, WS) + when is_pid(TPid), WS /= ?WD_DOWN -> + try ets:lookup(PeerT, TPid) of + T -> info_peer(T) catch error: badarg -> [] %% service has gone down end; -info_conn(_, _, _) -> +info_peer(_, _, _) -> []. %% The point of extracting the config here is so that 'transport' info @@ -3210,19 +1709,12 @@ config_acc({Ref, T, Opts}, Dict) config_acc(_, Dict) -> Dict. -wd_state({_,S}) -> - S; -wd_state(?STATE_UP) -> - ?WD_OKAY; -wd_state(?STATE_DOWN) -> - ?WD_DOWN. - -info_conn([#conn{pid = Pid, apps = SApps, caps = Caps, started = T}]) -> +info_peer([#peer{pid = Pid, apps = SApps, caps = Caps, started = T}]) -> [{peer, {Pid, T}}, {apps, SApps}, {caps, info_caps(Caps)} | try [{port, info_port(Pid)}] catch _:_ -> [] end]; -info_conn([] = No) -> +info_peer([] = No) -> No. %% Extract information that the processes involved are expected to @@ -3256,22 +1748,7 @@ mk_app(#diameter_app{} = A) -> %% One entry for each outgoing request whose answer is outstanding. info_pending(#state{} = S) -> - MatchSpec = [{{'$1', - #request{transport = '$2', - from = '$3', - app = '$4', - _ = '_'}, - '_'}, - [?ORCOND([{'==', T, '$2'} || T <- transports(S)])], - [{{'$1', [{{app, '$4'}}, - {{transport, '$2'}}, - {{from, '$3'}}]}}]}], - - try - ets:select(?REQUEST_TABLE, MatchSpec) - catch - error: badarg -> [] %% service has gone down - end. + diameter_traffic:pending(transports(S)). %% info_connections/1 %% diff --git a/lib/diameter/src/base/diameter_stats.erl b/lib/diameter/src/base/diameter_stats.erl index 70727d068e..8fd5ded300 100644 --- a/lib/diameter/src/base/diameter_stats.erl +++ b/lib/diameter/src/base/diameter_stats.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2012. All Rights Reserved. +%% Copyright Ericsson AB 2010-2013. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -136,7 +136,7 @@ read(Refs, B) -> -spec flush([ref()]) -> [{ref(), {counter(), integer()}}]. - + flush(Refs) -> try call({flush, Refs}) diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl new file mode 100644 index 0000000000..f527f7c754 --- /dev/null +++ b/lib/diameter/src/base/diameter_traffic.erl @@ -0,0 +1,1705 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2013. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% + +%% +%% Implements the handling of incoming and outgoing Diameter messages +%% except CER/CEA, DWR/DWA and DPR/DPA. That is, the messages that a +%% diameter client sends and receives. +%% + +-module(diameter_traffic). + +%% towards diameter +-export([send_request/4]). + +%% towards diameter_watchdog +-export([receive_message/4]). + +%% towards diameter_service +-export([make_recvdata/1, + peer_up/1, + peer_down/1, + failover/1, + pending/1]). + +%% towards ?MODULE +-export([send/1]). %% send from remote node + +-include_lib("diameter/include/diameter.hrl"). +-include("diameter_internal.hrl"). + +-define(RELAY, ?DIAMETER_DICT_RELAY). +-define(BASE, ?DIAMETER_DICT_COMMON). %% Note: the RFC 3588 dictionary + +-define(DEFAULT_TIMEOUT, 5000). %% for outgoing requests + +%% Table containing outgoing requests for which a reply has yet to be +%% received. +-define(REQUEST_TABLE, diameter_request). + +%% Workaround for dialyzer's lack of understanding of match specs. +-type match(T) + :: T | '_' | '$1' | '$2' | '$3' | '$4'. + +%% Record diameter:call/4 options are parsed into. +-record(options, + {filter = none :: diameter:peer_filter(), + extra = [] :: list(), + timeout = ?DEFAULT_TIMEOUT :: 0..16#FFFFFFFF, + detach = false :: boolean()}). + +%% Term passed back to receive_message/4 with every incoming message. +-record(recvdata, + {peerT :: ets:tid(), + service_name :: diameter:service_name(), + apps :: [#diameter_app{}], + sequence :: diameter:sequence()}). + +%% Record stored in diameter_request for each outgoing request. +-record(request, + {ref :: match(reference()), %% used to receive answer + caller :: match(pid()), %% calling process + handler :: match(pid()), %% request process + transport :: match(pid()), %% peer process + caps :: match(#diameter_caps{}), %% of connection + packet :: match(#diameter_packet{})}). %% of request + +%% --------------------------------------------------------------------------- +%% # make_recvdata/1 +%% --------------------------------------------------------------------------- + +make_recvdata([SvcName, PeerT, Apps, Mask | _]) -> + #recvdata{service_name = SvcName, + peerT = PeerT, + apps = Apps, + sequence = Mask}. +%% Take a list so that the caller (diameter_service) can be upgraded +%% first if new members are added. Note that receive_message/4 might +%% still get an old term from any watchdog started in old code. + +%% --------------------------------------------------------------------------- +%% peer_up/1 +%% --------------------------------------------------------------------------- + +%% Insert an element that is used to detect whether or not there has +%% been a failover when inserting an outgoing request. +peer_up(TPid) -> + ets:insert(?REQUEST_TABLE, {TPid}). + +%% --------------------------------------------------------------------------- +%% peer_down/1 +%% --------------------------------------------------------------------------- + +peer_down(TPid) -> + ets:delete(?REQUEST_TABLE, TPid), + failover(TPid). + +%% --------------------------------------------------------------------------- +%% pending/1 +%% --------------------------------------------------------------------------- + +pending(TPids) -> + MatchSpec = [{{'$1', + #request{caller = '$2', + handler = '$3', + transport = '$4', + _ = '_'}, + '_'}, + [?ORCOND([{'==', T, '$4'} || T <- TPids])], + [{{'$1', [{{caller, '$2'}}, + {{handler, '$3'}}, + {{transport, '$4'}}]}}]}], + + try + ets:select(?REQUEST_TABLE, MatchSpec) + catch + error: badarg -> [] %% service has gone down + end. + +%% --------------------------------------------------------------------------- +%% # receive_message/4 +%% +%% Handle an incoming Diameter message. +%% --------------------------------------------------------------------------- + +%% Handle an incoming Diameter message in the watchdog process. This +%% used to come through the service process but this avoids that +%% becoming a bottleneck. + +receive_message(TPid, Pkt, Dict0, RecvData) + when is_pid(TPid) -> + #diameter_packet{header = #diameter_header{is_request = R}} = Pkt, + recv(R, + (not R) andalso lookup_request(Pkt, TPid), + TPid, + Pkt, + Dict0, + RecvData). + +%% Incoming request ... +recv(true, false, TPid, Pkt, Dict0, RecvData) -> + try + spawn(fun() -> recv_request(TPid, Pkt, Dict0, RecvData) end) + catch + error: system_limit = E -> %% discard + ?LOG({error, E}, now()) + end; + +%% ... answer to known request ... +recv(false, #request{ref = Ref, handler = Pid} = Req, _, Pkt, Dict0, _) -> + Pid ! {answer, Ref, Req, Dict0, Pkt}; +%% Note that failover could have happened prior to this message being +%% received and triggering failback. That is, both a failover message +%% and answer may be on their way to the handler process. In the worst +%% case the request process gets notification of the failover and +%% sends to the alternate peer before an answer arrives, so it's +%% always the case that we can receive more than one answer after +%% failover. The first answer received by the request process wins, +%% any others are discarded. + +%% ... or not. +recv(false, false, _, _, _, _) -> + ok. + +%% --------------------------------------------------------------------------- +%% recv_request/4 +%% --------------------------------------------------------------------------- + +recv_request(TPid, + #diameter_packet{header = #diameter_header{application_id = Id}} + = Pkt, + Dict0, + #recvdata{peerT = PeerT, apps = Apps} + = RecvData) -> + send_A(recv_R(diameter_service:find_incoming_app(PeerT, TPid, Id, Apps), + TPid, + Pkt, + Dict0, + RecvData), + TPid, + Dict0, + RecvData). + +%% recv_R/5 + +recv_R({#diameter_app{id = Id, dictionary = Dict} = App, Caps}, + TPid, + Pkt0, + Dict0, + RecvData) -> + Pkt = errors(Id, diameter_codec:decode(Id, Dict, Pkt0)), + {Caps, Pkt, App, recv_R(App, TPid, Dict0, Caps, RecvData, Pkt)}; +%% Note that the decode is different depending on whether or not Id is +%% ?APP_ID_RELAY. + +%% DIAMETER_APPLICATION_UNSUPPORTED 3007 +%% A request was sent for an application that is not supported. + +recv_R(#diameter_caps{} + = Caps, + _TPid, + #diameter_packet{errors = Es} + = Pkt, + _Dict0, + _RecvData) -> + {Caps, Pkt#diameter_packet{avps = collect_avps(Pkt), + errors = [3007 | Es]}}; + +recv_R(false = No, _, _, _, _) -> %% transport has gone down + No. + +collect_avps(Pkt) -> + case diameter_codec:collect_avps(Pkt) of + {_Bs, As} -> + As; + As -> + As + end. + +%% recv_R/6 + +%% Answer errors ourselves ... +recv_R(#diameter_app{options = [_, {request_errors, E} | _]}, + _TPid, + Dict0, + _Caps, + _RecvData, + #diameter_packet{errors = [RC|_]}) %% a detected 3xxx is hd + when E == answer, (Dict0 /= ?BASE orelse 3 == RC div 1000); + E == answer_3xxx, 3 == RC div 1000 -> + {{answer_message, rc(RC)}, [], []}; + +%% ... or make a handle_request callback. Note that +%% Pkt#diameter_packet.msg = undefined in the 3001 case. +recv_R(App, + TPid, + _Dict0, + Caps, + #recvdata{service_name = SvcName}, + Pkt) -> + request_cb(cb(App, handle_request, [Pkt, SvcName, {TPid, Caps}]), + App, + [], + []). + +rc({N,_}) -> + N; +rc(N) -> + N. + +%% errors/1 +%% +%% Look for additional errors in a decoded message, prepending the +%% errors field with the first detected error. It's odd/unfortunate +%% that 501[15] aren't protocol errors. With RFC 3588 this means that +%% a handle_request callback has to formulate the answer. With RFC +%% 6733 it's acceptable for 5xxx to be sent in an answer-message. + +%% DIAMETER_INVALID_MESSAGE_LENGTH 5015 +%% This error is returned when a request is received with an invalid +%% message length. + +errors(_, #diameter_packet{header = #diameter_header{length = Len}, + bin = Bin, + errors = Es} + = Pkt) + when Len < 20; + 0 /= Len rem 4; + 8*Len /= bit_size(Bin) -> + Pkt#diameter_packet{errors = [5015 | Es]}; + +%% DIAMETER_UNSUPPORTED_VERSION 5011 +%% This error is returned when a request was received, whose version +%% number is unsupported. + +errors(_, #diameter_packet{header = #diameter_header{version = V}, + errors = Es} + = Pkt) + when V /= ?DIAMETER_VERSION -> + Pkt#diameter_packet{errors = [5011 | Es]}; + +%% DIAMETER_INVALID_AVP_BITS 3009 +%% A request was received that included an AVP whose flag bits are +%% set to an unrecognized value, or that is inconsistent with the +%% AVP's definition. + +errors(_, #diameter_packet{errors = [Bs | Es]} = Pkt) + when is_bitstring(Bs) -> + Pkt#diameter_packet{errors = [3009 | Es]}; + +%% DIAMETER_COMMAND_UNSUPPORTED 3001 +%% The Request contained a Command-Code that the receiver did not +%% recognize or support. This MUST be used when a Diameter node +%% receives an experimental command that it does not understand. + +errors(Id, #diameter_packet{header = #diameter_header{is_proxiable = P}, + msg = M, + errors = Es} + = Pkt) + when ?APP_ID_RELAY /= Id, undefined == M; %% don't know the command + ?APP_ID_RELAY == Id, not P -> %% command isn't proxiable + Pkt#diameter_packet{errors = [3001 | Es]}; + +%% DIAMETER_INVALID_HDR_BITS 3008 +%% A request was received whose bits in the Diameter header were +%% either set to an invalid combination, or to a value that is +%% inconsistent with the command code's definition. + +errors(_, #diameter_packet{header = #diameter_header{is_request = true, + is_error = true}, + errors = Es} + = Pkt) -> + Pkt#diameter_packet{errors = [3008 | Es]}; + +%% Green. +errors(_, Pkt) -> + Pkt. + +%% request_cb/4 + +%% A reply may be an answer-message, constructed either here or by +%% the handle_request callback. The header from the incoming request +%% is passed into the encode so that it can retrieve the relevant +%% command code in this case. It will also then ignore Dict and use +%% the base encoder. +request_cb({reply, _Ans} = T, _App, EvalPktFs, EvalFs) -> + {T, EvalPktFs, EvalFs}; + +%% An 3xxx result code, for which the E-bit is set in the header. +request_cb({protocol_error, RC}, _App, EvalPktFs, EvalFs) + when 3 == RC div 1000 -> + {{answer_message, RC}, EvalPktFs, EvalFs}; + +request_cb({answer_message, RC} = T, _App, EvalPktFs, EvalFs) + when 3 == RC div 1000; + 5 == RC div 1000 -> + {T, EvalPktFs, EvalFs}; + +%% RFC 3588 says we must reply 3001 to anything unrecognized or +%% unsupported. 'noreply' is undocumented (and inappropriately named) +%% backwards compatibility for this, protocol_error the documented +%% alternative. +request_cb(noreply, _App, EvalPktFs, EvalFs) -> + {{answer_message, 3001}, EvalPktFs, EvalFs}; + +%% Relay a request to another peer. This is equivalent to doing an +%% explicit call/4 with the message in question except that (1) a loop +%% will be detected by examining Route-Record AVP's, (3) a +%% Route-Record AVP will be added to the outgoing request and (3) the +%% End-to-End Identifier will default to that in the +%% #diameter_header{} without the need for an end_to_end_identifier +%% option. +%% +%% relay and proxy are similar in that they require the same handling +%% with respect to Route-Record and End-to-End identifier. The +%% difference is that a proxy advertises specific applications, while +%% a relay advertises the relay application. If a callback doesn't +%% want to distinguish between the cases in the callback return value +%% then 'resend' is a neutral alternative. +%% +request_cb({A, Opts}, #diameter_app{id = Id}, EvalPktFs, EvalFs) + when A == relay, Id == ?APP_ID_RELAY; + A == proxy, Id /= ?APP_ID_RELAY; + A == resend -> + {{call, Opts}, EvalPktFs, EvalFs}; + +request_cb(discard = No, _, _, _) -> + No; + +request_cb({eval_packet, RC, F}, App, Fs, EvalFs) -> + request_cb(RC, App, [F|Fs], EvalFs); + +request_cb({eval, RC, F}, App, EvalPktFs, Fs) -> + request_cb(RC, App, EvalPktFs, [F|Fs]); + +request_cb(T, App, _, _) -> + ?ERROR({invalid_return, T, handle_request, App}). + +%% send_A/4 + +send_A({Caps, Pkt}, TPid, Dict0, _RecvData) -> %% unsupported application + #diameter_packet{errors = [RC|_]} = Pkt, + send_A(answer_message(RC, Caps, Dict0, Pkt), + TPid, + Dict0, + Pkt, + [], + []); + +send_A({Caps, Pkt, App, {T, EvalPktFs, EvalFs}}, TPid, Dict0, RecvData) -> + send_A(answer(T, Caps, Pkt, App, Dict0, RecvData), + TPid, + Dict0, + Pkt, + EvalPktFs, + EvalFs); + +send_A(_, _, _, _) -> + ok. + +%% send_A/6 + +send_A(T, TPid, Dict0, ReqPkt, EvalPktFs, EvalFs) -> + reply(T, TPid, Dict0, EvalPktFs, ReqPkt), + lists:foreach(fun diameter_lib:eval/1, EvalFs). + +%% answer/6 + +answer({reply, Ans}, _Caps, _Pkt, App, Dict0, _RecvData) -> + {dict(App#diameter_app.dictionary, Dict0, Ans), Ans}; + +answer({call, Opts}, Caps, Pkt, App, Dict0, RecvData) -> + #diameter_caps{origin_host = {OH,_}} + = Caps, + #diameter_packet{avps = Avps} + = Pkt, + {Code, _Flags, Vid} = Dict0:avp_header('Route-Record'), + resend(is_loop(Code, Vid, OH, Dict0, Avps), + Opts, + Caps, + Pkt, + App, + Dict0, + RecvData); + +%% RFC 3588 only allows 3xxx errors in an answer-message. RFC 6733 +%% added the possibility of setting 5xxx. +answer({answer_message, RC} = T, Caps, Pkt, App, Dict0, _RecvData) -> + Dict0 /= ?BASE orelse 3 == RC div 1000 + orelse ?ERROR({invalid_return, T, handle_request, App}), + answer_message(RC, Caps, Dict0, Pkt). + +%% dict/3 + +%% An incoming answer, not yet decoded. +dict(Dict, Dict0, #diameter_packet{header + = #diameter_header{is_request = false, + is_error = E}, + msg = undefined}) -> + if E -> Dict0; true -> Dict end; + +dict(Dict, Dict0, [Msg]) -> + dict(Dict, Dict0, Msg); + +dict(Dict, Dict0, #diameter_packet{msg = Msg}) -> + dict(Dict, Dict0, Msg); + +dict(Dict, Dict0, Msg) -> + choose(is_answer_message(Msg, Dict0), Dict0, Dict). + +is_answer_message([Name | _], _) -> + Name == 'answer-message'; + +is_answer_message(Rec, Dict) -> + try + 'answer-message' == Dict:rec2msg(element(1,Rec)) + catch + error:_ -> false + end. + +%% answer_message/4 + +answer_message(RC, + #diameter_caps{origin_host = {OH,_}, + origin_realm = {OR,_}}, + Dict0, + #diameter_packet{avps = Avps} + = Pkt) -> + ?LOG({error, RC}, Pkt), + {Dict0, answer_message(OH, OR, RC, Dict0, Avps)}. + +%% resend/7 + +%% DIAMETER_LOOP_DETECTED 3005 +%% An agent detected a loop while trying to get the message to the +%% intended recipient. The message MAY be sent to an alternate peer, +%% if one is available, but the peer reporting the error has +%% identified a configuration problem. + +resend(true, _Opts, Caps, Pkt, _App, Dict0, _RecvData) -> + answer_message(3005, Caps, Dict0, Pkt); + +%% 6.1.8. Relaying and Proxying Requests +%% +%% A relay or proxy agent MUST append a Route-Record AVP to all requests +%% forwarded. The AVP contains the identity of the peer the request was +%% received from. + +resend(false, + Opts, + #diameter_caps{origin_host = {_,OH}} + = Caps, + #diameter_packet{header = Hdr0, + avps = Avps} + = Pkt, + App, + Dict0, + #recvdata{service_name = SvcName, + sequence = Mask}) -> + Route = #diameter_avp{data = {Dict0, 'Route-Record', OH}}, + Seq = diameter_session:sequence(Mask), + Hdr = Hdr0#diameter_header{hop_by_hop_id = Seq}, + Msg = [Hdr, Route | Avps], + resend(send_request(SvcName, App, Msg, Opts), Caps, Dict0, Pkt). +%% The incoming request is relayed with the addition of a +%% Route-Record. Note the requirement on the return from call/4 below, +%% which places a requirement on the value returned by the +%% handle_answer callback of the application module in question. +%% +%% Note that there's nothing stopping the request from being relayed +%% back to the sender. A pick_peer callback may want to avoid this but +%% a smart peer might recognize the potential loop and choose another +%% route. A less smart one will probably just relay the request back +%% again and force us to detect the loop. A pick_peer that wants to +%% avoid this can specify filter to avoid the possibility. +%% Eg. {neg, {host, OH} where #diameter_caps{origin_host = {OH, _}}. +%% +%% RFC 6.3 says that a relay agent does not modify Origin-Host but +%% says nothing about a proxy. Assume it should behave the same way. + +%% resend/4 +%% +%% Relay a reply to a relayed request. + +%% Answer from the peer: reset the hop by hop identifier and send. +resend(#diameter_packet{bin = B} + = Pkt, + _Caps, + _Dict0, + #diameter_packet{header = #diameter_header{hop_by_hop_id = Id}, + transport_data = TD}) -> + Pkt#diameter_packet{bin = diameter_codec:hop_by_hop_id(Id, B), + transport_data = TD}; +%% TODO: counters + +%% Or not: DIAMETER_UNABLE_TO_DELIVER. +resend(_, Caps, Dict0, Pkt) -> + answer_message(3002, Caps, Dict0, Pkt). + +%% is_loop/5 +%% +%% Is there a Route-Record AVP with our Origin-Host? + +is_loop(Code, + Vid, + Bin, + _Dict0, + [#diameter_avp{code = Code, vendor_id = Vid, data = Bin} | _]) -> + true; + +is_loop(_, _, _, _, []) -> + false; + +is_loop(Code, Vid, OH, Dict0, [_ | Avps]) + when is_binary(OH) -> + is_loop(Code, Vid, OH, Dict0, Avps); + +is_loop(Code, Vid, OH, Dict0, Avps) -> + is_loop(Code, Vid, Dict0:avp(encode, OH, 'Route-Record'), Dict0, Avps). + +%% reply/5 + +%% Local answer ... +reply({Dict, Ans}, TPid, Dict0, Fs, ReqPkt) -> + reply(Ans, Dict, TPid, Dict0, Fs, ReqPkt); + +%% ... or relayed. +reply(#diameter_packet{} = Pkt, TPid, _Dict0, Fs, _ReqPkt) -> + eval_packet(Pkt, Fs), + send(TPid, Pkt). + +%% reply/6 +%% +%% Send a locally originating reply. + +%% Skip the setting of Result-Code and Failed-AVP's below. This is +%% undocumented and shouldn't be relied on. +reply([Msg], Dict, TPid, Dict0, Fs, ReqPkt) + when is_list(Msg); + is_tuple(Msg) -> + reply(Msg, Dict, TPid, Dict0, Fs, ReqPkt#diameter_packet{errors = []}); + +%% No errors or a diameter_header/avp list. +reply(Msg, Dict, TPid, Dict0, Fs, ReqPkt) -> + Pkt = encode(Dict, reset(make_answer_packet(Msg, ReqPkt), Dict), Fs), + incr(send, Pkt, Dict, TPid, Dict0), %% count outgoing result codes + send(TPid, Pkt). + +%% reset/2 + +%% Header/avps list: send as is. +reset(#diameter_packet{msg = [#diameter_header{} | _]} = Pkt, _) -> + Pkt; + +%% No errors to set or errors explicitly ignored. +reset(#diameter_packet{errors = Es} = Pkt, _) + when Es == []; + Es == false -> + Pkt; + +%% Otherwise possibly set Result-Code and/or Failed-AVP. +reset(#diameter_packet{msg = Msg, errors = Es} = Pkt, Dict) -> + Pkt#diameter_packet{msg = reset(Msg, Dict, Es)}. + +%% reset/3 + +reset(Msg, Dict, Es) + when is_list(Es) -> + {E3, E5, Fs} = partition(Es), + FailedAVP = failed_avp(Msg, lists:reverse(Fs), Dict), + reset(set(Msg, FailedAVP, Dict), + Dict, + choose(is_answer_message(Msg, Dict), E3, E5)); + +reset(Msg, Dict, N) + when is_integer(N) -> + ResultCode = rc(Msg, {'Result-Code', N}, Dict), + set(Msg, ResultCode, Dict); + +reset(Msg, _, _) -> + Msg. + +partition(Es) -> + lists:foldl(fun pacc/2, {false, false, []}, Es). + +%% Note that the errors list can contain not only integer() and +%% {integer(), #diameter_avp{}} but also #diameter_avp{}. The latter +%% isn't something that's returned by decode but can be set in a reply +%% for encode. + +pacc({RC, #diameter_avp{} = A}, {E3, E5, Acc}) + when is_integer(RC) -> + pacc(RC, {E3, E5, [A|Acc]}); + +pacc(#diameter_avp{} = A, {E3, E5, Acc}) -> + {E3, E5, [A|Acc]}; + +pacc(RC, {false, E5, Acc}) + when 3 == RC div 1000 -> + {RC, E5, Acc}; + +pacc(RC, {E3, false, Acc}) + when 5 == RC div 1000 -> + {E3, RC, Acc}; + +pacc(_, Acc) -> + Acc. + +eval_packet(Pkt, Fs) -> + lists:foreach(fun(F) -> diameter_lib:eval([F,Pkt]) end, Fs). + +%% make_answer_packet/2 + +%% Use decode errors to set Result-Code and/or Failed-AVP unless the +%% the errors field has been explicitly set. Unfortunately, the +%% default value is the empty list rather than 'undefined' so use the +%% atom 'false' for "set nothing". (This is historical and changing +%% the default value would require modules including diameter.hrl to +%% be recompiled.) +make_answer_packet(#diameter_packet{errors = []} + = Pkt, + #diameter_packet{errors = [_|_] = Es} + = ReqPkt) -> + make_answer_packet(Pkt#diameter_packet{errors = Es}, ReqPkt); + +%% A reply message clears the R and T flags and retains the P flag. +%% The E flag will be set at encode. 6.2 of 3588 requires the same P +%% flag on an answer as on the request. A #diameter_packet{} returned +%% from a handle_request callback can circumvent this by setting its +%% own header values. +make_answer_packet(#diameter_packet{header = Hdr, + msg = Msg, + errors = Es, + transport_data = TD}, + #diameter_packet{header = ReqHdr}) -> + Hdr0 = ReqHdr#diameter_header{version = ?DIAMETER_VERSION, + is_request = false, + is_error = undefined, + is_retransmitted = false}, + #diameter_packet{header = fold_record(Hdr0, Hdr), + msg = Msg, + errors = Es, + transport_data = TD}; + +%% Binaries and header/avp lists are sent as-is. +make_answer_packet(Bin, #diameter_packet{transport_data = TD}) + when is_binary(Bin) -> + #diameter_packet{bin = Bin, + transport_data = TD}; +make_answer_packet([#diameter_header{} | _] = Msg, + #diameter_packet{transport_data = TD}) -> + #diameter_packet{msg = Msg, + transport_data = TD}; + +%% Otherwise, preserve transport_data. +make_answer_packet(Msg, #diameter_packet{transport_data = TD} = Pkt) -> + make_answer_packet(#diameter_packet{msg = Msg, transport_data = TD}, Pkt). + +%% Reply as name and tuple list ... +set([_|_] = Ans, Avps, _) -> + Ans ++ Avps; %% Values nearer tail take precedence. + +%% ... or record. +set(Rec, Avps, Dict) -> + Dict:'#set-'(Avps, Rec). + +%% rc/3 +%% +%% Turn the result code into a list if its optional and only set it if +%% the arity is 1 or {0,1}. In other cases (which probably shouldn't +%% exist in practise) we can't know what's appropriate. + +rc([MsgName | _], {'Result-Code' = K, RC} = T, Dict) -> + case Dict:avp_arity(MsgName, 'Result-Code') of + 1 -> [T]; + {0,1} -> [{K, [RC]}]; + _ -> [] + end; + +rc(Rec, T, Dict) -> + rc([Dict:rec2msg(element(1, Rec))], T, Dict). + +%% failed_avp/3 + +failed_avp(_, [] = No, _) -> + No; + +failed_avp(Rec, Failed, Dict) -> + [fa(Rec, [{'AVP', Failed}], Dict)]. + +%% Reply as name and tuple list ... +fa([MsgName | Values], FailedAvp, Dict) -> + R = Dict:msg2rec(MsgName), + try + Dict:'#info-'(R, {index, 'Failed-AVP'}), + {'Failed-AVP', [FailedAvp]} + catch + error: _ -> + Avps = proplists:get_value('AVP', Values, []), + A = #diameter_avp{name = 'Failed-AVP', + value = FailedAvp}, + {'AVP', [A|Avps]} + end; + +%% ... or record. +fa(Rec, FailedAvp, Dict) -> + try + {'Failed-AVP', [FailedAvp]} + catch + error: _ -> + Avps = Dict:'get-'('AVP', Rec), + A = #diameter_avp{name = 'Failed-AVP', + value = FailedAvp}, + {'AVP', [A|Avps]} + end. + +%% 3. Diameter Header +%% +%% E(rror) - If set, the message contains a protocol error, +%% and the message will not conform to the ABNF +%% described for this command. Messages with the 'E' +%% bit set are commonly referred to as error +%% messages. This bit MUST NOT be set in request +%% messages. See Section 7.2. + +%% 3.2. Command Code ABNF specification +%% +%% e-bit = ", ERR" +%% ; If present, the 'E' bit in the Command +%% ; Flags is set, indicating that the answer +%% ; message contains a Result-Code AVP in +%% ; the "protocol error" class. + +%% 7.1.3. Protocol Errors +%% +%% Errors that fall within the Protocol Error category SHOULD be treated +%% on a per-hop basis, and Diameter proxies MAY attempt to correct the +%% error, if it is possible. Note that these and only these errors MUST +%% only be used in answer messages whose 'E' bit is set. + +%% Thus, only construct answers to protocol errors. Other errors +%% require an message-specific answer and must be handled by the +%% application. + +%% 6.2. Diameter Answer Processing +%% +%% When a request is locally processed, the following procedures MUST be +%% applied to create the associated answer, in addition to any +%% additional procedures that MAY be discussed in the Diameter +%% application defining the command: +%% +%% - The same Hop-by-Hop identifier in the request is used in the +%% answer. +%% +%% - The local host's identity is encoded in the Origin-Host AVP. +%% +%% - The Destination-Host and Destination-Realm AVPs MUST NOT be +%% present in the answer message. +%% +%% - The Result-Code AVP is added with its value indicating success or +%% failure. +%% +%% - If the Session-Id is present in the request, it MUST be included +%% in the answer. +%% +%% - Any Proxy-Info AVPs in the request MUST be added to the answer +%% message, in the same order they were present in the request. +%% +%% - The 'P' bit is set to the same value as the one in the request. +%% +%% - The same End-to-End identifier in the request is used in the +%% answer. +%% +%% Note that the error messages (see Section 7.3) are also subjected to +%% the above processing rules. + +%% 7.3. Error-Message AVP +%% +%% The Error-Message AVP (AVP Code 281) is of type UTF8String. It MAY +%% accompany a Result-Code AVP as a human readable error message. The +%% Error-Message AVP is not intended to be useful in real-time, and +%% SHOULD NOT be expected to be parsed by network entities. + +%% answer_message/5 + +answer_message(OH, OR, RC, Dict0, Avps) -> + {Code, _, Vid} = Dict0:avp_header('Session-Id'), + ['answer-message', {'Origin-Host', OH}, + {'Origin-Realm', OR}, + {'Result-Code', RC} + | session_id(Code, Vid, Dict0, Avps)]. + +session_id(Code, Vid, Dict0, Avps) + when is_list(Avps) -> + try + {value, #diameter_avp{data = D}} = find_avp(Code, Vid, Avps), + [{'Session-Id', [Dict0:avp(decode, D, 'Session-Id')]}] + catch + error: _ -> + [] + end. + +%% find_avp/3 + +find_avp(Code, Vid, Avps) + when is_integer(Code), (undefined == Vid orelse is_integer(Vid)) -> + find(fun(A) -> is_avp(Code, Vid, A) end, Avps). + +%% The final argument here could be a list of AVP's, depending on the case, +%% but we're only searching at the top level. +is_avp(Code, Vid, #diameter_avp{code = Code, vendor_id = Vid}) -> + true; +is_avp(_, _, _) -> + false. + +find(_, []) -> + false; +find(Pred, [H|T]) -> + case Pred(H) of + true -> + {value, H}; + false -> + find(Pred, T) + end. + +%% 7. Error Handling +%% +%% There are certain Result-Code AVP application errors that require +%% additional AVPs to be present in the answer. In these cases, the +%% Diameter node that sets the Result-Code AVP to indicate the error +%% MUST add the AVPs. Examples are: +%% +%% - An unrecognized AVP is received with the 'M' bit (Mandatory bit) +%% set, causes an answer to be sent with the Result-Code AVP set to +%% DIAMETER_AVP_UNSUPPORTED, and the Failed-AVP AVP containing the +%% offending AVP. +%% +%% - An AVP that is received with an unrecognized value causes an +%% answer to be returned with the Result-Code AVP set to +%% DIAMETER_INVALID_AVP_VALUE, with the Failed-AVP AVP containing the +%% AVP causing the error. +%% +%% - A command is received with an AVP that is omitted, yet is +%% mandatory according to the command's ABNF. The receiver issues an +%% answer with the Result-Code set to DIAMETER_MISSING_AVP, and +%% creates an AVP with the AVP Code and other fields set as expected +%% in the missing AVP. The created AVP is then added to the Failed- +%% AVP AVP. +%% +%% The Result-Code AVP describes the error that the Diameter node +%% encountered in its processing. In case there are multiple errors, +%% the Diameter node MUST report only the first error it encountered +%% (detected possibly in some implementation dependent order). The +%% specific errors that can be described by this AVP are described in +%% the following section. + +%% 7.5. Failed-AVP AVP +%% +%% The Failed-AVP AVP (AVP Code 279) is of type Grouped and provides +%% debugging information in cases where a request is rejected or not +%% fully processed due to erroneous information in a specific AVP. The +%% value of the Result-Code AVP will provide information on the reason +%% for the Failed-AVP AVP. +%% +%% The possible reasons for this AVP are the presence of an improperly +%% constructed AVP, an unsupported or unrecognized AVP, an invalid AVP +%% value, the omission of a required AVP, the presence of an explicitly +%% excluded AVP (see tables in Section 10), or the presence of two or +%% more occurrences of an AVP which is restricted to 0, 1, or 0-1 +%% occurrences. +%% +%% A Diameter message MAY contain one Failed-AVP AVP, containing the +%% entire AVP that could not be processed successfully. If the failure +%% reason is omission of a required AVP, an AVP with the missing AVP +%% code, the missing vendor id, and a zero filled payload of the minimum +%% required length for the omitted AVP will be added. + +%% incr/4 +%% +%% Increment a stats counter for result codes in incoming and outgoing +%% answers. + +%% Outgoing message as binary: don't count. (Sending binaries is only +%% partially supported.) +incr(_, #diameter_packet{msg = undefined}, _, _, _) -> + ok; + +%% Incoming with decode errors. +incr(recv = D, #diameter_packet{header = H, errors = [_|_]}, _, TPid, _) -> + incr(TPid, {diameter_codec:msg_id(H), D, error}); + +%% Incoming without errors or outgoing. Outgoing with encode errors +%% never gets here since encode fails. +incr(Dir, Pkt, Dict, TPid, Dict0) -> + #diameter_packet{header = #diameter_header{is_error = E} + = Hdr, + msg = Rec} + = Pkt, + + RC = int(get_avp_value(Dict, 'Result-Code', Rec)), + + %% Exit on an improper Result-Code. + is_result(RC, E, Dict0) + orelse x({invalid_error_bit, RC}, answer, [Dir, Pkt]), + + irc(TPid, Hdr, Dir, rc_counter(Dict, Rec, RC)). + +%% No E-bit: can't be 3xxx. +is_result(RC, false, _Dict0) -> + RC < 3000 orelse 4000 =< RC; + +%% E-bit in RFC 3588: only 3xxx. +is_result(RC, true, ?BASE) -> + 3000 =< RC andalso RC < 4000; + +%% E-bit in RFC 6733: 3xxx or 5xxx. +is_result(RC, true, _) -> + 3000 =< RC andalso RC < 4000 + orelse + 5000 =< RC andalso RC < 6000. + +irc(_, _, _, undefined) -> + false; + +irc(TPid, Hdr, Dir, Ctr) -> + incr(TPid, {diameter_codec:msg_id(Hdr), Dir, Ctr}). + +%% incr/2 + +incr(TPid, Counter) -> + diameter_stats:incr(Counter, TPid, 1). + +%% rc_counter/2 + +%% RFC 3588, 7.6: +%% +%% All Diameter answer messages defined in vendor-specific +%% applications MUST include either one Result-Code AVP or one +%% Experimental-Result AVP. +%% +%% Maintain statistics assuming one or the other, not both, which is +%% surely the intent of the RFC. + +rc_counter(Dict, Rec, undefined) -> + rcc(get_avp_value(Dict, 'Experimental-Result', Rec)); +rc_counter(_, _, RC) -> + {'Result-Code', RC}. + +%% Outgoing answers may be in any of the forms messages can be sent +%% in. Incoming messages will be records. We're assuming here that the +%% arity of the result code AVP's is 0 or 1. + +rcc([{_,_,N} = T | _]) + when is_integer(N) -> + T; +rcc({_,_,N} = T) + when is_integer(N) -> + T; +rcc(_) -> + undefined. + +%% Extract the first good looking integer. There's no guarantee +%% that what we're looking for has arity 1. +int([N|_]) + when is_integer(N) -> + N; +int(N) + when is_integer(N) -> + N; +int(_) -> + undefined. + +-spec x(any(), atom(), list()) -> no_return(). + +%% Warn and exit request process on errors in an incoming answer. +x(Reason, F, A) -> + diameter_lib:warning_report(Reason, {?MODULE, F, A}), + x(Reason). + +x(T) -> + exit(T). + +%% --------------------------------------------------------------------------- +%% # send_request/4 +%% +%% Handle an outgoing Diameter request. +%% --------------------------------------------------------------------------- + +send_request(SvcName, AppOrAlias, Msg, Options) + when is_list(Options) -> + Rec = make_options(Options), + Ref = make_ref(), + Caller = {self(), Ref}, + ReqF = fun() -> + exit({Ref, send_R(SvcName, AppOrAlias, Msg, Rec, Caller)}) + end, + try spawn_monitor(ReqF) of + {_, MRef} -> + recv_A(MRef, Ref, Rec#options.detach, false) + catch + error: system_limit = E -> + {error, E} + end. +%% The R in send_R is because Diameter request are usually given short +%% names of the form XXR. (eg. CER, DWR, etc.) Similarly, answers have +%% names of the form XXA. + +%% Don't rely on gen_server:call/3 for the timeout handling since it +%% makes no guarantees about not leaving a reply message in the +%% mailbox if we catch its exit at timeout. It currently *can* do so, +%% which is also undocumented. + +recv_A(MRef, _, true, true) -> + erlang:demonitor(MRef, [flush]), + ok; + +recv_A(MRef, Ref, Detach, Sent) -> + receive + Ref -> %% send has been attempted + recv_A(MRef, Ref, Detach, true); + {'DOWN', MRef, process, _, Reason} -> + answer_rc(Reason, Ref, Sent) + end. + +%% send_R/5 has returned ... +answer_rc({Ref, Ans}, Ref, _) -> + Ans; + +%% ... or not. Note that failure/encode are documented return values. +answer_rc(_, _, Sent) -> + {error, choose(Sent, failure, encode)}. + +%% send_R/5 +%% +%% In the process spawned for the outgoing request. + +send_R(SvcName, AppOrAlias, Msg, Opts, Caller) -> + case pick_peer(SvcName, AppOrAlias, Msg, Opts) of + {{_,_,_} = Transport, Mask} -> + send_request(Transport, Mask, Msg, Opts, Caller, SvcName); + false -> + {error, no_connection}; + {error, _} = No -> + No + end. + +%% make_options/1 + +make_options(Options) -> + lists:foldl(fun mo/2, #options{}, Options). + +mo({timeout, T}, Rec) + when is_integer(T), 0 =< T -> + Rec#options{timeout = T}; + +mo({filter, F}, #options{filter = none} = Rec) -> + Rec#options{filter = F}; +mo({filter, F}, #options{filter = {all, Fs}} = Rec) -> + Rec#options{filter = {all, [F | Fs]}}; +mo({filter, F}, #options{filter = F0} = Rec) -> + Rec#options{filter = {all, [F0, F]}}; + +mo({extra, L}, #options{extra = X} = Rec) + when is_list(L) -> + Rec#options{extra = X ++ L}; + +mo(detach, Rec) -> + Rec#options{detach = true}; + +mo(T, _) -> + ?ERROR({invalid_option, T}). + +%% --------------------------------------------------------------------------- +%% # send_request/6 +%% --------------------------------------------------------------------------- + +%% Send an outgoing request in its dedicated process. +%% +%% Note that both encode of the outgoing request and of the received +%% answer happens in this process. It's also this process that replies +%% to the caller. The service process only handles the state-retaining +%% callbacks. +%% +%% The module field of the #diameter_app{} here includes any extra +%% arguments passed to diameter:call/4. + +send_request({TPid, Caps, App} + = Transport, + Mask, + Msg, + Opts, + Caller, + SvcName) -> + Pkt = make_prepare_packet(Mask, Msg), + + send_R(cb(App, prepare_request, [Pkt, SvcName, {TPid, Caps}]), + Pkt, + Transport, + Opts, + Caller, + SvcName, + []). + +send_R({send, Msg}, Pkt, Transport, Opts, Caller, SvcName, Fs) -> + send_R(make_request_packet(Msg, Pkt), + Transport, + Opts, + Caller, + SvcName, + Fs); + +send_R({discard, Reason} , _, _, _, _, _, _) -> + {error, Reason}; + +send_R(discard, _, _, _, _, _, _) -> + {error, discarded}; + +send_R({eval_packet, RC, F}, Pkt, T, Opts, Caller, SvcName, Fs) -> + send_R(RC, Pkt, T, Opts, Caller, SvcName, [F|Fs]); + +send_R(E, _, {_, _, App}, _, _, _, _) -> + ?ERROR({invalid_return, E, prepare_request, App}). + +%% make_prepare_packet/2 +%% +%% Turn an outgoing request as passed to call/4 into a diameter_packet +%% record in preparation for a prepare_request callback. + +make_prepare_packet(_, Bin) + when is_binary(Bin) -> + #diameter_packet{header = diameter_codec:decode_header(Bin), + bin = Bin}; + +make_prepare_packet(Mask, #diameter_packet{msg = [#diameter_header{} = Hdr + | Avps]} + = Pkt) -> + Pkt#diameter_packet{msg = [make_prepare_header(Mask, Hdr) | Avps]}; + +make_prepare_packet(Mask, #diameter_packet{header = Hdr} = Pkt) -> + Pkt#diameter_packet{header = make_prepare_header(Mask, Hdr)}; + +make_prepare_packet(Mask, Msg) -> + make_prepare_packet(Mask, #diameter_packet{msg = Msg}). + +%% make_prepare_header/2 + +make_prepare_header(Mask, undefined) -> + Seq = diameter_session:sequence(Mask), + make_prepare_header(#diameter_header{end_to_end_id = Seq, + hop_by_hop_id = Seq}); + +make_prepare_header(Mask, #diameter_header{end_to_end_id = undefined, + hop_by_hop_id = undefined} + = H) -> + Seq = diameter_session:sequence(Mask), + make_prepare_header(H#diameter_header{end_to_end_id = Seq, + hop_by_hop_id = Seq}); + +make_prepare_header(Mask, #diameter_header{end_to_end_id = undefined} = H) -> + Seq = diameter_session:sequence(Mask), + make_prepare_header(H#diameter_header{end_to_end_id = Seq}); + +make_prepare_header(Mask, #diameter_header{hop_by_hop_id = undefined} = H) -> + Seq = diameter_session:sequence(Mask), + make_prepare_header(H#diameter_header{hop_by_hop_id = Seq}); + +make_prepare_header(_, Hdr) -> + make_prepare_header(Hdr). + +%% make_prepare_header/1 + +make_prepare_header(#diameter_header{version = undefined} = Hdr) -> + make_prepare_header(Hdr#diameter_header{version = ?DIAMETER_VERSION}); + +make_prepare_header(#diameter_header{} = Hdr) -> + Hdr; + +make_prepare_header(T) -> + ?ERROR({invalid_header, T}). + +%% make_request_packet/2 +%% +%% Reconstruct a diameter_packet from the return value of +%% prepare_request or prepare_retransmit callback. + +make_request_packet(Bin, _) + when is_binary(Bin) -> + make_prepare_packet(false, Bin); + +make_request_packet(#diameter_packet{msg = [#diameter_header{} | _]} + = Pkt, + _) -> + Pkt; + +%% Returning a diameter_packet with no header from a prepare_request +%% or prepare_retransmit callback retains the header passed into it. +%% This is primarily so that the end to end and hop by hop identifiers +%% are retained. +make_request_packet(#diameter_packet{header = Hdr} = Pkt, + #diameter_packet{header = Hdr0}) -> + Pkt#diameter_packet{header = fold_record(Hdr0, Hdr)}; + +make_request_packet(Msg, Pkt) -> + Pkt#diameter_packet{msg = Msg}. + +%% fold_record/2 + +fold_record(undefined, R) -> + R; +fold_record(Rec, R) -> + diameter_lib:fold_tuple(2, Rec, R). + +%% send_R/6 + +send_R(Pkt0, + {TPid, Caps, #diameter_app{dictionary = Dict} = App}, + Opts, + {Pid, Ref}, + SvcName, + Fs) -> + Pkt = encode(Dict, Pkt0, Fs), + + #options{timeout = Timeout} + = Opts, + + Req = #request{ref = Ref, + caller = Pid, + handler = self(), + transport = TPid, + caps = Caps, + packet = Pkt0}, + + try + TRef = send_request(TPid, Pkt, Req, SvcName, Timeout), + Pid ! Ref, %% tell caller a send has been attempted + handle_answer(SvcName, + App, + recv_A(Timeout, SvcName, App, Opts, {TRef, Req})) + after + erase_requests(Pkt) + end. + +%% recv_A/5 + +recv_A(Timeout, SvcName, App, Opts, {TRef, #request{ref = Ref} = Req}) -> + %% Matching on TRef below ensures we ignore messages that pertain + %% to a previous transport prior to failover. The answer message + %% includes the #request{} since it's not necessarily Req; that + %% is, from the last peer to which we've transmitted. + receive + {answer = A, Ref, Rq, Dict0, Pkt} -> %% Answer from peer + {A, Rq, Dict0, Pkt}; + {timeout = Reason, TRef, _} -> %% No timely reply + {error, Req, Reason}; + {failover, TRef} -> %% Service says peer has gone down + retransmit(pick_peer(SvcName, App, Req, Opts), + Req, + Opts, + SvcName, + Timeout) + end. + +%% handle_answer/3 + +handle_answer(SvcName, App, {error, Req, Reason}) -> + handle_error(App, Req, Reason, SvcName); + +handle_answer(SvcName, + #diameter_app{dictionary = Dict, + id = Id} + = App, + {answer, Req, Dict0, Pkt}) -> + Mod = dict(Dict, Dict0, Pkt), + handle_A(errors(Id, diameter_codec:decode(Mod, Pkt)), + SvcName, + Mod, + Dict0, + App, + Req). + +%% We don't really need to do a full decode if we're a relay and will +%% just resend with a new hop by hop identifier, but might a proxy +%% want to examine the answer? + +handle_A(Pkt, SvcName, Dict, Dict0, App, #request{transport = TPid} = Req) -> + try + incr(recv, Pkt, Dict, TPid, Dict0) %% count incoming result codes + of + _ -> answer(Pkt, SvcName, App, Req) + catch + exit: {invalid_error_bit, RC} -> + #diameter_packet{errors = Es} + = Pkt, + E = {5004, #diameter_avp{name = 'Result-Code', value = RC}}, + answer(Pkt#diameter_packet{errors = [E|Es]}, SvcName, App, Req) + end. + +answer(Pkt, + SvcName, + #diameter_app{module = ModX, + options = [{answer_errors, AE} | _]}, + Req) -> + a(Pkt, SvcName, ModX, AE, Req). + +a(#diameter_packet{errors = Es} + = Pkt, + SvcName, + ModX, + AE, + #request{transport = TPid, + caps = Caps, + packet = P}) + when [] == Es; + callback == AE -> + cb(ModX, handle_answer, [Pkt, msg(P), SvcName, {TPid, Caps}]); + +a(Pkt, SvcName, _, report, Req) -> + x(errors, handle_answer, [SvcName, Req, Pkt]); + +a(Pkt, SvcName, _, discard, Req) -> + x({errors, handle_answer, [SvcName, Req, Pkt]}). + +%% Note that we don't check that the application id in the answer's +%% header is what we expect. (TODO: Does the rfc says anything about +%% this?) + +%% Note that failover starts a new timer and that expiry of an old +%% timer value is ignored. This means that an answer could be accepted +%% from a peer after timeout in the case of failover. + +retransmit({{_,_,App} = Transport, _Mask}, Req, Opts, SvcName, Timeout) -> + try retransmit(Transport, Req, SvcName, Timeout) of + T -> recv_A(Timeout, SvcName, App, Opts, T) + catch + ?FAILURE(Reason) -> {error, Req, Reason} + end; + +retransmit(_, Req, _, _, _) -> %% no alternate peer + {error, Req, failover}. + +%% pick_peer/4 + +%% Retransmission after failover: call-specific arguments have already +%% been appended in App. +pick_peer(SvcName, + App, + #request{packet = #diameter_packet{msg = Msg}}, + Opts) -> + pick_peer(SvcName, App, Msg, Opts#options{extra = []}); + +pick_peer(_, _, undefined, _) -> + false; + +pick_peer(SvcName, + AppOrAlias, + Msg, + #options{filter = Filter, extra = Xtra}) -> + diameter_service:pick_peer(SvcName, + AppOrAlias, + {fun(D) -> get_destination(D, Msg) end, + Filter, + Xtra}). + +%% handle_error/4 + +handle_error(App, + #request{packet = Pkt, + transport = TPid, + caps = Caps}, + Reason, + SvcName) -> + cb(App, handle_error, [Reason, msg(Pkt), SvcName, {TPid, Caps}]). + +msg(#diameter_packet{msg = undefined, bin = Bin}) -> + Bin; +msg(#diameter_packet{msg = Msg}) -> + Msg. + +%% encode/3 + +encode(Dict, Pkt, Fs) -> + P = encode(Dict, Pkt), + eval_packet(P, Fs), + P. + +%% encode/2 + +%% Note that prepare_request can return a diameter_packet containing a +%% header or transport_data. Even allow the returned record to contain +%% an encoded binary. This isn't the usual case and doesn't properly +%% support retransmission but is useful for test. + +%% A message to be encoded. +encode(Dict, #diameter_packet{bin = undefined} = Pkt) -> + diameter_codec:encode(Dict, Pkt); + +%% An encoded binary: just send. +encode(_, #diameter_packet{} = Pkt) -> + Pkt. + +%% send_request/5 + +send_request(TPid, #diameter_packet{bin = Bin} = Pkt, Req, _SvcName, Timeout) + when node() == node(TPid) -> + %% Store the outgoing request before sending to avoid a race with + %% reply reception. + TRef = store_request(TPid, Bin, Req, Timeout), + send(TPid, Pkt), + TRef; + +%% Send using a remote transport: spawn a process on the remote node +%% to relay the answer. +send_request(TPid, #diameter_packet{} = Pkt, Req, SvcName, Timeout) -> + TRef = erlang:start_timer(Timeout, self(), TPid), + T = {TPid, Pkt, Req, SvcName, Timeout, TRef}, + spawn(node(TPid), ?MODULE, send, [T]), + TRef. + +%% send/1 + +send({TPid, Pkt, #request{handler = Pid} = Req, SvcName, Timeout, TRef}) -> + Ref = send_request(TPid, + Pkt, + Req#request{handler = self()}, + SvcName, + Timeout), + Pid ! reref(receive T -> T end, Ref, TRef). + +reref({T, Ref, R}, Ref, TRef) -> + {T, TRef, R}; +reref(T, _, _) -> + T. + +%% send/2 + +send(Pid, Pkt) -> + Pid ! {send, Pkt}. + +%% retransmit/4 + +retransmit({TPid, Caps, App} + = Transport, + #request{packet = Pkt0} + = Req, + SvcName, + Timeout) -> + have_request(Pkt0, TPid) %% Don't failover to a peer we've + andalso ?THROW(timeout), %% already sent to. + + #diameter_packet{header = Hdr0} = Pkt0, + Hdr = Hdr0#diameter_header{is_retransmitted = true}, + Pkt = Pkt0#diameter_packet{header = Hdr}, + + retransmit(cb(App, prepare_retransmit, [Pkt, SvcName, {TPid, Caps}]), + Transport, + Req#request{packet = Pkt}, + SvcName, + Timeout, + []). + +retransmit({send, Msg}, + Transport, + #request{packet = Pkt} + = Req, + SvcName, + Timeout, + Fs) -> + resend_request(make_request_packet(Msg, Pkt), + Transport, + Req, + SvcName, + Timeout, + Fs); + +retransmit({discard, Reason}, _, _, _, _, _) -> + ?THROW(Reason); + +retransmit(discard, _, _, _, _, _) -> + ?THROW(discarded); + +retransmit({eval_packet, RC, F}, Transport, Req, SvcName, Timeout, Fs) -> + retransmit(RC, Transport, Req, SvcName, Timeout, [F|Fs]); + +retransmit(T, {_, _, App}, _, _, _, _) -> + ?ERROR({invalid_return, T, prepare_retransmit, App}). + +resend_request(Pkt0, + {TPid, Caps, #diameter_app{dictionary = Dict}}, + Req0, + SvcName, + Tmo, + Fs) -> + Pkt = encode(Dict, Pkt0, Fs), + + Req = Req0#request{transport = TPid, + packet = Pkt0, + caps = Caps}, + + ?LOG(retransmission, Req), + TRef = send_request(TPid, Pkt, Req, SvcName, Tmo), + {TRef, Req}. + +%% store_request/4 + +store_request(TPid, Bin, Req, Timeout) -> + Seqs = diameter_codec:sequence_numbers(Bin), + TRef = erlang:start_timer(Timeout, self(), timeout), + ets:insert(?REQUEST_TABLE, {Seqs, Req, TRef}), + ets:member(?REQUEST_TABLE, TPid) + orelse (self() ! {failover, TRef}), %% failover/1 may have missed + TRef. + +%% lookup_request/2 + +lookup_request(Msg, TPid) -> + Seqs = diameter_codec:sequence_numbers(Msg), + Spec = [{{Seqs, #request{transport = TPid, _ = '_'}, '_'}, + [], + ['$_']}], + case ets:select(?REQUEST_TABLE, Spec) of + [{_, Req, _}] -> + Req; + [] -> + false + end. + +%% erase_requests/1 + +erase_requests(Pkt) -> + ets:delete(?REQUEST_TABLE, diameter_codec:sequence_numbers(Pkt)). + +%% match_requests/1 + +match_requests(TPid) -> + Pat = {'_', #request{transport = TPid, _ = '_'}, '_'}, + ets:select(?REQUEST_TABLE, [{Pat, [], ['$_']}]). + +%% have_request/2 + +have_request(Pkt, TPid) -> + Seqs = diameter_codec:sequence_numbers(Pkt), + Pat = {Seqs, #request{transport = TPid, _ = '_'}, '_'}, + '$end_of_table' /= ets:select(?REQUEST_TABLE, [{Pat, [], ['$_']}], 1). + +%% --------------------------------------------------------------------------- +%% # failover/1-2 +%% --------------------------------------------------------------------------- + +failover(TPid) + when is_pid(TPid) -> + lists:foreach(fun failover/1, match_requests(TPid)); +%% Note that a request process can store its request after failover +%% notifications are sent here: store_request/4 sends the notification +%% in that case. + +%% Failover as a consequence of request_peer_down/1: inform the +%% request process. +failover({_, Req, TRef}) -> + #request{handler = Pid, + packet = #diameter_packet{msg = M}} + = Req, + M /= undefined andalso (Pid ! {failover, TRef}). +%% Failover is not performed when msg = binary() since sending +%% pre-encoded binaries is only partially supported. (Mostly for +%% test.) + +%% get_destination/2 + +get_destination(Dict, Msg) -> + [str(get_avp_value(Dict, D, Msg)) || D <- ['Destination-Realm', + 'Destination-Host']]. + +%% This is not entirely correct. The avp could have an arity 1, in +%% which case an empty list is a DiameterIdentity of length 0 rather +%% than the list of no values we treat it as by mapping to undefined. +%% This behaviour is documented. +str([]) -> + undefined; +str(T) -> + T. + +%% get_avp_value/3 +%% +%% Find an AVP in a message of one of three forms: +%% +%% - a message record (as generated from a .dia spec) or +%% - a list of an atom message name followed by 2-tuple, avp name/value pairs. +%% - a list of a #diameter_header{} followed by #diameter_avp{} records, +%% +%% In the first two forms a dictionary module is used at encode to +%% identify the type of the AVP and its arity in the message in +%% question. The third form allows messages to be sent as is, without +%% a dictionary, which is needed in the case of relay agents, for one. + +%% Messages will be header/avps list as a relay and the only AVP's we +%% look for are in the common dictionary. This is required since the +%% relay dictionary doesn't inherit the common dictionary (which maybe +%% it should). +get_avp_value(?RELAY, Name, Msg) -> + get_avp_value(?BASE, Name, Msg); + +%% Message sent as a header/avps list, probably a relay case but not +%% necessarily. +get_avp_value(Dict, Name, [#diameter_header{} | Avps]) -> + try + {Code, _, VId} = Dict:avp_header(Name), + [A|_] = lists:dropwhile(fun(#diameter_avp{code = C, vendor_id = V}) -> + C /= Code orelse V /= VId + end, + Avps), + avp_decode(Dict, Name, A) + catch + error: _ -> + undefined + end; + +%% Outgoing message as a name/values list. +get_avp_value(_, Name, [_MsgName | Avps]) -> + case lists:keyfind(Name, 1, Avps) of + {_, V} -> + V; + _ -> + undefined + end; + +%% Message is typically a record but not necessarily. +get_avp_value(Dict, Name, Rec) -> + try + Dict:'#get-'(Name, Rec) + catch + error:_ -> + undefined + end. + +avp_decode(Dict, Name, #diameter_avp{value = undefined, + data = Bin}) -> + Dict:avp(decode, Bin, Name); +avp_decode(_, _, #diameter_avp{value = V}) -> + V. + +cb(#diameter_app{module = [_|_] = M}, F, A) -> + eval(M, F, A); +cb([_|_] = M, F, A) -> + eval(M, F, A). + +eval([M|X], F, A) -> + apply(M, F, A ++ X). + +choose(true, X, _) -> X; +choose(false, _, X) -> X. diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl index a5429c967c..073a415d10 100644 --- a/lib/diameter/src/base/diameter_watchdog.erl +++ b/lib/diameter/src/base/diameter_watchdog.erl @@ -45,6 +45,8 @@ -define(DEFAULT_TW_INIT, 30000). %% RFC 3539 ch 3.4.1 -define(NOMASK, {0,32}). %% default sequence mask +-define(BASE, ?DIAMETER_DICT_COMMON). + -record(watchdog, {%% PCB - Peer Control Block; see RFC 3539, Appendix A status = initial :: initial | okay | suspect | down | reopen, @@ -56,33 +58,36 @@ %% end PCB parent = self() :: pid(), %% service process transport :: pid() | undefined, %% peer_fsm process - tref :: reference(), %% reference for current watchdog timer - message_data, %% term passed into diameter_service with message + tref :: reference(), %% reference for current watchdog timer + dictionary :: module(), %% common dictionary + receive_data :: term(), + %% term passed into diameter_service with incoming message sequence :: diameter:sequence(), %% mask restrict :: {diameter:restriction(), boolean()}, shutdown = false :: boolean()}). +%% --------------------------------------------------------------------------- %% start/2 %% %% Start a monitor before the watchdog is allowed to proceed to ensure %% that a failed capabilities exchange produces the desired exit %% reason. +%% --------------------------------------------------------------------------- --spec start(Type, {RecvData, [Opt], SvcName, SvcOpts, #diameter_service{}}) +-spec start(Type, {RecvData, [Opt], SvcOpts, #diameter_service{}}) -> {reference(), pid()} when Type :: {connect|accept, diameter:transport_ref()}, RecvData :: term(), Opt :: diameter:transport_opt(), - SvcOpts :: [diameter:service_opt()], - SvcName :: diameter:service_name(). + SvcOpts :: [diameter:service_opt()]. start({_,_} = Type, T) -> - Ref = make_ref(), - {ok, Pid} = diameter_watchdog_sup:start_child({Ref, {Type, self(), T}}), + Ack = make_ref(), + {ok, Pid} = diameter_watchdog_sup:start_child({Ack, Type, self(), T}), try {erlang:monitor(process, Pid), Pid} after - Pid ! Ref + send(Pid, Ack) end. start_link(T) -> @@ -101,39 +106,95 @@ init(T) -> proc_lib:init_ack({ok, self()}), gen_server:enter_loop(?MODULE, [], i(T)). -i({Ref, {_, Pid, _} = T}) -> - MRef = erlang:monitor(process, Pid), - receive - Ref -> - make_state(T); - {'DOWN', MRef, process, _, _} = D -> - exit({shutdown, D}) - end. - -make_state({T, Pid, {RecvData, - Opts, - SvcName, - SvcOpts, - #diameter_service{applications = Apps, - capabilities = Caps} - = Svc}}) -> +i({Ack, T, Pid, {RecvData, + Opts, + SvcOpts, + #diameter_service{applications = Apps, + capabilities = Caps} + = Svc}}) -> + erlang:monitor(process, Pid), + wait(Ack, Pid), random:seed(now()), putr(restart, {T, Opts, Svc}), %% save seeing it in trace putr(dwr, dwr(Caps)), %% {_,_} = Mask = proplists:get_value(sequence, SvcOpts), Restrict = proplists:get_value(restrict_connections, SvcOpts), Nodes = restrict_nodes(Restrict), + Dict0 = common_dictionary(Apps), #watchdog{parent = Pid, - transport = monitor(diameter_peer_fsm:start(T, - Opts, - {Mask, Nodes, Svc})), + transport = start(T, Opts, Mask, Nodes, Dict0, Svc), tw = proplists:get_value(watchdog_timer, Opts, ?DEFAULT_TW_INIT), - message_data = {RecvData, SvcName, Apps, Mask}, + receive_data = RecvData, + dictionary = Dict0, sequence = Mask, restrict = {Restrict, lists:member(node(), Nodes)}}. +wait(Ref, Pid) -> + receive + Ref -> + ok; + {'DOWN', _, process, Pid, _} = D -> + exit({shutdown, D}) + end. + +%% start/5 + +start(T, Opts, Mask, Nodes, Dict0, Svc) -> + {_MRef, Pid} + = diameter_peer_fsm:start(T, Opts, {Mask, Nodes, Dict0, Svc}), + Pid. + +%% common_dictionary/1 +%% +%% Determine the dictionary of the Diameter common application with +%% Application Id 0. Fail on config errors. + +common_dictionary(Apps) -> + case + orddict:fold(fun dict0/3, + false, + lists:foldl(fun(#diameter_app{dictionary = M}, D) -> + orddict:append(M:id(), M, D) + end, + orddict:new(), + Apps)) + of + {value, Mod} -> + Mod; + false -> + %% A transport should configure a common dictionary but + %% don't require it. Not configuring a common dictionary + %% means a user won't be able either send of receive + %% messages in the common dictionary: incoming request + %% will be answered with 3007 and outgoing requests cannot + %% be sent. The dictionary returned here is oly used for + %% messages diameter sends and receives: CER/CEA, DPR/DPA + %% and DWR/DWA. + ?BASE + end. + +%% Each application should be represented by a single dictionary. +dict0(Id, [_,_|_] = Ms, _) -> + config_error({multiple_dictionaries, Ms, {application_id, Id}}); + +%% An explicit common dictionary. +dict0(?APP_ID_COMMON, [Mod], _) -> + {value, Mod}; + +%% A pure relay, in which case the common application is implicit. +%% This uses the fact that the common application will already have +%% been folded. +dict0(?APP_ID_RELAY, _, false) -> + {value, ?BASE}; + +dict0(_, _, Acc) -> + Acc. + +config_error(T) -> + ?ERROR({configuration_error, T}). + %% handle_call/3 handle_call(_, _, State) -> @@ -151,41 +212,59 @@ handle_info(T, #watchdog{} = State) -> ok -> {noreply, State}; #watchdog{} = S -> - event(State, S), + close(T, State), %% service expects 'close' message + event(T, State, S), %% before 'watchdog' {noreply, S}; stop -> ?LOG(stop, T), - event(State, State#watchdog{status = down}), + event(T, State, State#watchdog{status = down}), {stop, {shutdown, T}, State} - end; + end. -handle_info(T, S) -> - handle_info(T, upgrade(S)). +close({'DOWN', _, process, TPid, {shutdown, Reason}}, + #watchdog{transport = TPid, + parent = Pid}) -> + send(Pid, {close, self(), Reason}); -upgrade(S) -> - #watchdog{} = list_to_tuple(tuple_to_list(S) - ++ [?NOMASK, {nodes, true}, false]). +close(_, _) -> + ok. -event(#watchdog{status = T}, #watchdog{status = T}) -> +event(_, #watchdog{status = T}, #watchdog{status = T}) -> ok; -event(#watchdog{transport = undefined}, #watchdog{transport = undefined}) -> +event(_, #watchdog{transport = undefined}, #watchdog{transport = undefined}) -> ok; -event(#watchdog{status = From, transport = F, parent = Pid}, +event(Msg, + #watchdog{status = From, transport = F, parent = Pid}, #watchdog{status = To, transport = T}) -> - E = {tpid(F,T), From, To}, - notify(Pid, E), + TPid = tpid(F,T), + E = {[TPid | data(Msg, TPid, From, To)], From, To}, + send(Pid, {watchdog, self(), E}), ?LOG(transition, {self(), E}). +data(Msg, TPid, reopen, okay) -> + {recv, TPid, 'DWA', _Pkt} = Msg, %% assert + {TPid, T} = eraser(open), + [T]; + +data({open, TPid, _Hosts, T}, TPid, _From, To) + when To == okay; + To == reopen -> + [T]; + +data(_, _, _, _) -> + []. + tpid(_, Pid) when is_pid(Pid) -> Pid; + tpid(Pid, _) -> Pid. -notify(Pid, E) -> - Pid ! {watchdog, self(), E}. +send(Pid, T) -> + Pid ! T. %% terminate/2 @@ -215,15 +294,13 @@ transition(close, #watchdog{}) -> ok; %% Service is asking for the peer to be taken down gracefully. -transition({shutdown, Pid}, #watchdog{parent = Pid, - transport = undefined, - status = S}) -> - down = S, %% sanity check +transition({shutdown, Pid, _}, #watchdog{parent = Pid, + transport = undefined}) -> stop; -transition({shutdown = T, Pid}, #watchdog{parent = Pid, - transport = TPid} - = S) -> - TPid ! {T, self()}, +transition({shutdown = T, Pid, Reason}, #watchdog{parent = Pid, + transport = TPid} + = S) -> + send(TPid, {T, self(), Reason}), S#watchdog{shutdown = true}; %% Parent process has died, @@ -234,13 +311,9 @@ transition({'DOWN', _, process, Pid, _Reason}, %% Transport has accepted a connection. transition({accepted = T, TPid}, #watchdog{transport = TPid, parent = Pid}) -> - Pid ! {T, self(), TPid}, + send(Pid, {T, self(), TPid}), ok; -%% Transport is telling us that its impending death isn't failure. -transition({close, TPid, _Reason}, #watchdog{transport = TPid}) -> - stop; - %% STATE Event Actions New State %% ===== ------ ------- ---------- %% INITIAL Connection up SetWatchdog() OKAY @@ -255,15 +328,13 @@ transition({close, TPid, _Reason}, #watchdog{transport = TPid}) -> %% know the identity of the peer (ie. now) that we know that we're in %% state down rather than initial. -transition({open, TPid, Hosts, T} = Open, +transition({open, TPid, Hosts, _} = Open, #watchdog{transport = TPid, status = initial, - parent = Pid, restrict = {_, R}} = S) -> case okay(getr(restart), Hosts, R) of okay -> - open(Pid, {TPid, T}), set_watchdog(S#watchdog{status = okay}); reopen -> transition(Open, S#watchdog{status = down}) @@ -274,17 +345,15 @@ transition({open, TPid, Hosts, T} = Open, %% SetWatchdog() %% Pending = TRUE REOPEN -transition({open = P, TPid, _Hosts, T}, +transition({open = Key, TPid, _Hosts, T}, #watchdog{transport = TPid, - parent = Pid, status = down} = S) -> %% Store the info we need to notify the parent to reopen the %% connection after the requisite DWA's are received, at which %% time we eraser(open). The reopen message is a later addition, %% to communicate the new capabilities as soon as they're known. - putr(P, {TPid, T}), - Pid ! {reopen, self(), {TPid, T}}, + putr(Key, {TPid, T}), set_watchdog(send_watchdog(S#watchdog{status = reopen, num_dwa = 0})); @@ -296,26 +365,18 @@ transition({open = P, TPid, _Hosts, T}, %% REOPEN Connection down CloseConnection() %% SetWatchdog() DOWN -transition({'DOWN', _, process, TPid, _}, +transition({'DOWN', _, process, TPid, _Reason}, #watchdog{transport = TPid, - status = S, - shutdown = D}) - when S == initial; - D -> + shutdown = true}) -> stop; -transition({'DOWN', _, process, TPid, _}, - #watchdog{transport = TPid} +transition({'DOWN', _, process, TPid, _Reason}, + #watchdog{transport = TPid, + status = T} = S) -> - failover(S), - close(S), - set_watchdog(S#watchdog{status = down, + set_watchdog(S#watchdog{status = case T of initial -> T; _ -> down end, pending = false, transport = undefined}); -%% Any outstanding pending (or other messages from the transport) will -%% have arrived before 'DOWN' since the message comes from the same -%% process. Note that we could also get this message in the initial -%% state. %% Incoming message. transition({recv, TPid, Name, Pkt}, #watchdog{transport = TPid} = S) -> @@ -331,15 +392,11 @@ transition({timeout, _, tw}, #watchdog{}) -> %% State query. transition({state, Pid}, #watchdog{status = S}) -> - Pid ! {self(), S}, + send(Pid, {self(), S}), ok. %% =========================================================================== -monitor(Pid) -> - erlang:monitor(process, Pid), - Pid. - putr(Key, Val) -> put({?MODULE, Key}, Val). @@ -349,16 +406,16 @@ getr(Key) -> eraser(Key) -> erase({?MODULE, Key}). -%% encode/2 +%% encode/3 -encode(Msg, Mask) -> +encode(Msg, Mask, Dict) -> Seq = diameter_session:sequence(Mask), Hdr = #diameter_header{version = ?DIAMETER_VERSION, end_to_end_id = Seq, hop_by_hop_id = Seq}, Pkt = #diameter_packet{header = Hdr, msg = Msg}, - #diameter_packet{bin = Bin} = diameter_codec:encode(?BASE, Pkt), + #diameter_packet{bin = Bin} = diameter_codec:encode(Dict, Pkt), Bin. %% okay/3 @@ -386,7 +443,7 @@ okay([{_,P}]) -> %% ... or it has. okay(C) -> - [_|_] = [P ! close || {_,P} <- C, self() /= P], + [_|_] = [send(P, close) || {_,P} <- C, self() /= P], reopen. %% set_watchdog/1 @@ -408,36 +465,14 @@ tw(T) tw({M,F,A}) -> apply(M,F,A). -%% open/2 - -open(Pid, {_,_} = T) -> - Pid ! {connection_up, self(), T}. - -%% failover/1 - -failover(#watchdog{status = okay, - parent = Pid}) -> - Pid ! {connection_down, self()}; - -failover(_) -> - ok. - -%% close/1 - -close(#watchdog{status = down}) -> - ok; - -close(#watchdog{parent = Pid}) -> - {{T, _}, _, _} = getr(restart), - T == accept andalso (Pid ! {close, self()}). - %% send_watchdog/1 send_watchdog(#watchdog{pending = false, transport = TPid, + dictionary = Dict0, sequence = Mask} = S) -> - TPid ! {send, encode(getr(dwr), Mask)}, + send(TPid, {send, encode(getr(dwr), Mask, Dict0)}), ?LOG(send, 'DWR'), S#watchdog{pending = true}. @@ -465,8 +500,9 @@ rcv(N, _, _) false; rcv(_, Pkt, #watchdog{transport = TPid, - message_data = T}) -> - diameter_service:receive_message(TPid, Pkt, T). + dictionary = Dict0, + receive_data = T}) -> + diameter_traffic:receive_message(TPid, Pkt, Dict0, T). throwaway(S) -> throw({?MODULE, throwaway, S}). @@ -518,12 +554,10 @@ rcv(_, #watchdog{status = okay} = S) -> %% SetWatchdog() OKAY rcv('DWA', #watchdog{status = suspect} = S) -> - failback(S), set_watchdog(S#watchdog{status = okay, pending = false}); rcv(_, #watchdog{status = suspect} = S) -> - failback(S), set_watchdog(S#watchdog{status = okay}); %% REOPEN Receive DWA & Pending = FALSE @@ -531,10 +565,8 @@ rcv(_, #watchdog{status = suspect} = S) -> %% Failback() OKAY rcv('DWA', #watchdog{status = reopen, - num_dwa = 2 = N, - parent = Pid} + num_dwa = 2 = N} = S) -> - open(Pid, eraser(open)), S#watchdog{status = okay, num_dwa = N+1, pending = false}; @@ -553,11 +585,6 @@ rcv('DWA', #watchdog{status = reopen, rcv(_, #watchdog{status = reopen} = S) -> throwaway(S). -%% failback/1 - -failback(#watchdog{parent = Pid}) -> - Pid ! {connection_up, self()}. - %% timeout/1 %% %% The caller sets the watchdog on the return value. @@ -582,7 +609,6 @@ timeout(#watchdog{status = T, timeout(#watchdog{status = okay, pending = true} = S) -> - failover(S), S#watchdog{status = suspect}; %% SUSPECT Timer expires CloseConnection() @@ -599,7 +625,6 @@ timeout(#watchdog{status = T, when T == suspect; T == reopen, P, N < 0 -> exit(TPid, {shutdown, watchdog_timeout}), - close(S), S#watchdog{status = down}; %% REOPEN Timer expires & NumDWA = -1 @@ -633,7 +658,9 @@ timeout(#watchdog{status = reopen, %% process has died. We only need to handle state down since we start %% the first watchdog when transitioning out of initial. -timeout(#watchdog{status = down} = S) -> +timeout(#watchdog{status = T} = S) + when T == initial; + T == down -> restart(S). %% restart/1 @@ -655,15 +682,15 @@ restart(S) -> %% state down rather then initial when receiving notification of an %% open connection. -restart({{connect, _} = T, Opts, Svc}, #watchdog{parent = Pid, - sequence = Mask, - restrict = {R,_}} - = S) -> - Pid ! {reconnect, self()}, +restart({{connect, _} = T, Opts, Svc}, + #watchdog{parent = Pid, + sequence = Mask, + restrict = {R,_}, + dictionary = Dict0} + = S) -> + send(Pid, {reconnect, self()}), Nodes = restrict_nodes(R), - S#watchdog{transport = monitor(diameter_peer_fsm:start(T, - Opts, - {Mask, Nodes, Svc})), + S#watchdog{transport = start(T, Opts, Mask, Nodes, Dict0, Svc), restrict = {R, lists:member(node(), Nodes)}}; %% No restriction on the number of connections to the same peer: just |