diff options
Diffstat (limited to 'lib/diameter/src')
-rw-r--r-- | lib/diameter/src/.gitignore | 1 | ||||
-rw-r--r-- | lib/diameter/src/Makefile | 48 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter.appup.src | 25 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_codec.erl | 11 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_config.erl | 5 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_peer_fsm.erl | 21 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_service.erl | 493 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_stats.erl | 265 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_watchdog.erl | 47 | ||||
-rw-r--r-- | lib/diameter/src/transport/diameter_sctp.erl | 88 | ||||
-rw-r--r-- | lib/diameter/src/transport/diameter_tcp.erl | 2 |
11 files changed, 638 insertions, 368 deletions
diff --git a/lib/diameter/src/.gitignore b/lib/diameter/src/.gitignore index feeb378fd8..cc06720fd1 100644 --- a/lib/diameter/src/.gitignore +++ b/lib/diameter/src/.gitignore @@ -1,2 +1,3 @@ /depend.mk +/otp.plt diff --git a/lib/diameter/src/Makefile b/lib/diameter/src/Makefile index dbfaa4e140..99c343275b 100644 --- a/lib/diameter/src/Makefile +++ b/lib/diameter/src/Makefile @@ -1,7 +1,7 @@ # # %CopyrightBegin% # -# Copyright Ericsson AB 2010-2011. All Rights Reserved. +# Copyright Ericsson AB 2010-2012. All Rights Reserved. # # The contents of this file are subject to the Erlang Public License, # Version 1.1, (the "License"); you may not use this file except in @@ -181,6 +181,31 @@ clean: rm -f $(TARGET_FILES) gen/* rm -f depend.mk +realclean: clean + rm -f ../ebin/* +# Not $(EBIN) just to be a bit paranoid + +PLT = ./otp.plt + +plt: + dialyzer --build_plt \ + --apps erts stdlib kernel \ + xmerl ssl public_key crypto \ + compiler syntax_tools runtime_tools \ + --output_plt $(PLT) \ + --verbose + +dialyze: opt $(PLT) + dialyzer --plt $(PLT) \ + --verbose \ + -Wno_improper_lists \ + $(EBIN)/diameter_gen_base_rfc3588.$(EMULATOR) \ + $(patsubst %, $(EBIN)/%.$(EMULATOR), \ + $(notdir $(RT_MODULES) $(CT_MODULES))) +# Omit all but the common dictionary module since these +# (diameter_gen_relay in particular) generate warning depending on how +# much of the included diameter_gen.hrl they use. + # ---------------------------------------------------- # Release targets # ---------------------------------------------------- @@ -195,27 +220,27 @@ endif release_spec: opt for d in bin ebin include src/dict; do \ - $(INSTALL_DIR) $(RELSYSDIR)/$$d; \ + $(INSTALL_DIR) "$(RELSYSDIR)/$$d"; \ done - $(INSTALL_SCRIPT) $(BINS:%=../bin/%) $(RELSYSDIR)/bin - $(INSTALL_DATA) $(TARGET_FILES) $(RELSYSDIR)/ebin + $(INSTALL_SCRIPT) $(BINS:%=../bin/%) "$(RELSYSDIR)/bin" + $(INSTALL_DATA) $(TARGET_FILES) "$(RELSYSDIR)/ebin" $(INSTALL_DATA) $(EXTERNAL_HRLS:%=../include/%) $(DICT_HRLS) \ - $(RELSYSDIR)/include - $(INSTALL_DATA) $(DICTS:%=dict/%.dia) $(RELSYSDIR)/src/dict + "$(RELSYSDIR)/include" + $(INSTALL_DATA) $(DICTS:%=dict/%.dia) "$(RELSYSDIR)/src/dict" $(MAKE) $(TARGET_DIRS:%/=release_src_%) $(MAKE) $(EXAMPLE_DIRS:%/=release_examples_%) $(TARGET_DIRS:%/=release_src_%): release_src_%: - $(INSTALL_DIR) $(RELSYSDIR)/src/$* + $(INSTALL_DIR) "$(RELSYSDIR)/src/$*" $(INSTALL_DATA) $(filter $*/%, $(TARGET_MODULES:%=%.erl) \ $(INTERNAL_HRLS)) \ $(filter $*/%, compiler/$(DICT_YRL).yrl) \ - $(RELSYSDIR)/src/$* + "$(RELSYSDIR)/src/$*" $(EXAMPLE_DIRS:%/=release_examples_%): release_examples_%: - $(INSTALL_DIR) $(RELSYSDIR)/examples/$* + $(INSTALL_DIR) "$(RELSYSDIR)/examples/$*" $(INSTALL_DATA) $(patsubst %, ../examples/%, $(filter $*/%, $(EXAMPLES))) \ - $(RELSYSDIR)/examples/$* + "$(RELSYSDIR)/examples/$*" release_docs_spec: @@ -245,10 +270,11 @@ depend.mk: depend.sed $(MODULES:%=%.erl) Makefile -include depend.mk -.PHONY: app clean depend dict info release_subdir +.PHONY: app clean realclean depend dict info release_subdir .PHONY: debug opt release_docs_spec release_spec .PHONY: $(TARGET_DIRS:%/=%) $(TARGET_DIRS:%/=release_src_%) .PHONY: $(EXAMPLE_DIRS:%/=release_examples_%) +.PHONY: plt dialyze # Keep intermediate files. .SECONDARY: $(DICT_ERLS) $(DICT_HRLS) gen/$(DICT_YRL:%=%.erl) diff --git a/lib/diameter/src/base/diameter.appup.src b/lib/diameter/src/base/diameter.appup.src index 2ebdad598f..9b2a7d18ab 100644 --- a/lib/diameter/src/base/diameter.appup.src +++ b/lib/diameter/src/base/diameter.appup.src @@ -2,7 +2,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% Copyright Ericsson AB 2010-2012. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -22,13 +22,28 @@ [ {"0.9", [{restart_application, diameter}]}, {"0.10", [{restart_application, diameter}]}, - {"1.0", [{update, diameter_service}, - {update, diameter_watchdog}]} + {"1.0", [{restart_application, diameter}]}, + {"1.1", [%% new code + {add_module, diameter_transport}, + %% modified code + {load, diameter_sctp}, + {load, diameter_stats}, + {load, diameter_service}, + {load, diameter_config}, + {load, diameter_codec}, + {load, diameter_watchdog}, + {load, diameter_peer}, + {load, diameter_peer_fsm}, + {load, diameter}, + %% unmodified but including modified diameter.hrl + {load, diameter_callback}, + {load, diameter_capx}, + {load, diameter_types}]} ], [ {"0.9", [{restart_application, diameter}]}, {"0.10", [{restart_application, diameter}]}, - {"1.0", [{update, diameter_watchdog}, - {update, diameter_service}]} + {"1.0", [{restart_application, diameter}]}, + {"1.1", [{restart_application, diameter}]} ] }. diff --git a/lib/diameter/src/base/diameter_codec.erl b/lib/diameter/src/base/diameter_codec.erl index fe1212b7e0..421e280422 100644 --- a/lib/diameter/src/base/diameter_codec.erl +++ b/lib/diameter/src/base/diameter_codec.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% Copyright Ericsson AB 2010-2012. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -63,9 +63,9 @@ encode(Mod, #diameter_packet{} = Pkt) -> e(Mod, Pkt) catch error: Reason -> - %% Be verbose rather than letting the emulator truncate the - %% error report. - X = {Reason, ?STACK}, + %% Be verbose since a crash report may be truncated and + %% encode errors are self-inflicted. + X = {?MODULE, encode, {Reason, ?STACK}}, diameter_lib:error_report(X, {?MODULE, encode, [Mod, Pkt]}), exit(X) end; @@ -91,7 +91,8 @@ e(_, #diameter_packet{msg = [#diameter_header{} = Hdr | As]} = Pkt) -> Flags = make_flags(0, Hdr), - Pkt#diameter_packet{bin = <<Vsn:8, Length:24, + Pkt#diameter_packet{header = Hdr, + bin = <<Vsn:8, Length:24, Flags:8, Code:24, Aid:32, Hid:32, diff --git a/lib/diameter/src/base/diameter_config.erl b/lib/diameter/src/base/diameter_config.erl index 9253af0de2..e47f63f814 100644 --- a/lib/diameter/src/base/diameter_config.erl +++ b/lib/diameter/src/base/diameter_config.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% Copyright Ericsson AB 2010-2012. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -519,6 +519,7 @@ rm(SvcName, L) -> Refs = lists:map(fun(#transport{ref = R}) -> R end, L), case stop_transport(SvcName, Refs) of ok -> + diameter_stats:flush(Refs), lists:foreach(fun delete_object/1, L); {error, _} = No -> No @@ -600,7 +601,7 @@ app_acc({application, Opts}, Acc) -> module = init_mod(Mod), init_state = ModS, mutable = init_mutable(M), - answer_errors = init_answers(A)} + options = [{answer_errors, init_answers(A)}]} | Acc]; app_acc(_, Acc) -> Acc. diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl index bbda62c32b..302540e76b 100644 --- a/lib/diameter/src/base/diameter_peer_fsm.erl +++ b/lib/diameter/src/base/diameter_peer_fsm.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% Copyright Ericsson AB 2010-2012. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -131,11 +131,10 @@ %% specified on the transport in question. Check here that the list is %% still non-empty. -start({_, Ref} = Type, Opts, #diameter_service{applications = Apps} = Svc) -> +start({_,_} = Type, Opts, #diameter_service{applications = Apps} = Svc) -> [] /= Apps orelse ?ERROR({no_apps, Type, Opts}), T = {self(), Type, Opts, Svc}, {ok, Pid} = diameter_peer_fsm_sup:start_child(T), - diameter_stats:reg(Pid, Ref), Pid. start_link(T) -> @@ -157,6 +156,7 @@ init(T) -> i({WPid, T, Opts, #diameter_service{capabilities = Caps} = Svc}) -> putr(?DWA_KEY, dwa(Caps)), {M, Ref} = T, + diameter_stats:reg(Ref), {[Ts], Rest} = proplists:split(Opts, [capabilities_cb]), putr(?CB_KEY, {Ref, [F || {_,F} <- Ts]}), erlang:monitor(process, WPid), @@ -250,14 +250,27 @@ handle_info(T, #state{} = State) -> ?LOG(stop, T), x(T, State) catch + exit: {diameter_codec, encode, _} = Reason -> + close_wd(Reason, State#state.parent), + ?LOG(stop, Reason), + %% diameter_codec:encode/2 emits an error report. Only + %% indicate the probable reason here. + diameter_lib:info_report(probable_configuration_error, + insufficient_capabilities), + {stop, {shutdown, Reason}, State}; {?MODULE, Tag, Reason} -> ?LOG(Tag, {Reason, T}), {stop, {shutdown, Reason}, State} end. -%% The form of the exception caught here is historical. It's +%% The form of the throw caught here is historical. It's %% significant that it's not a 2-tuple, as in ?FAILURE(Reason), %% since these are caught elsewhere. +%% Note that there's no guarantee that the service and transport +%% capabilities are good enough to build a CER/CEA that can be +%% succesfully encoded. It's not checked at diameter:add_transport/2 +%% since this can be called before creating the service. + x(Reason, #state{} = S) -> close_wd(Reason, S), {stop, {shutdown, Reason}, S}. diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl index 3dfdcee2b2..5b8a399758 100644 --- a/lib/diameter/src/base/diameter_service.erl +++ b/lib/diameter/src/base/diameter_service.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% Copyright Ericsson AB 2010-2012. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -43,8 +43,7 @@ subscriptions/0, services/0, services/1, - whois/1, - flush_stats/1]). + whois/1]). %% test/debug -export([call_module/3, @@ -65,13 +64,33 @@ -include_lib("diameter/include/diameter.hrl"). -include("diameter_internal.hrl"). +%% The "old" states maintained in this module historically. -define(STATE_UP, up). -define(STATE_DOWN, down). +-type op_state() :: ?STATE_UP + | ?STATE_DOWN. + +%% The RFC 3539 watchdog states that are now maintained, albeit +%% along with the old up/down. okay = up, else down. +-define(WD_INITIAL, initial). +-define(WD_OKAY, okay). +-define(WD_SUSPECT, suspect). +-define(WD_DOWN, down). +-define(WD_REOPEN, reopen). + +-type wd_state() :: ?WD_INITIAL + | ?WD_OKAY + | ?WD_SUSPECT + | ?WD_DOWN + | ?WD_REOPEN. + -define(DEFAULT_TC, 30000). %% RFC 3588 ch 2.1 -define(DEFAULT_TIMEOUT, 5000). %% for outgoing requests -define(RESTART_TC, 1000). %% if restart was this recent +-define(RELAY, ?DIAMETER_DICT_RELAY). + %% Used to be able to swap this with anything else dict-like but now %% rely on the fact that a service's #state{} record does not change %% in storing in it ?STATE table and not always going through the @@ -117,7 +136,8 @@ type :: match(connect | accept), ref :: match(reference()), %% key into diameter_config options :: match([diameter:transport_opt()]),%% from start_transport - op_state = ?STATE_DOWN :: match(?STATE_DOWN | ?STATE_UP), + op_state = {?STATE_DOWN, ?WD_INITIAL} + :: match(op_state() | {op_state(), wd_state()}), started = now(), %% at process start conn = false :: match(boolean() | pid())}). %% true at accept, pid() at connection_up (connT key) @@ -388,15 +408,6 @@ whois(SvcName) -> undefined end. -%%% --------------------------------------------------------------------------- -%%% # flush_stats/1 -%%% -%%% Output: list of {{SvcName, Alias, Counter}, Value} -%%% --------------------------------------------------------------------------- - -flush_stats(TPid) -> - diameter_stats:flush(TPid). - %% =========================================================================== %% =========================================================================== @@ -516,6 +527,34 @@ transition({reconnect, Pid}, S) -> reconnect(Pid, S), ok; +%% Watchdog is sending notification of a state transition. Note that +%% the connection_up/down messages are pre-date this message and are +%% still used. A 'watchdog' message will follow these and communicate +%% the same state as was set in handling connection_up/down. +transition({watchdog, Pid, {TPid, From, To}}, #state{service_name = SvcName, + peerT = PeerT}) -> + #peer{ref = Ref, type = T, options = Opts, op_state = {OS,_}} + = P + = fetch(PeerT, Pid), + insert(PeerT, P#peer{op_state = {OS, To}}), + send_event(SvcName, {watchdog, Ref, TPid, {From, To}, {T, Opts}}), + ok; +%% Death of a peer process results in the removal of it's peer and any +%% associated conn record when 'DOWN' is received (after this) but the +%% states will be {?STATE_UP, ?WD_DOWN} for a short time. (No real +%% problem since ?WD_* is only used in service_info.) We set ?WD_OKAY +%% as a consequence of connection_up since we know a watchdog is +%% coming. We can't set anything at connection_down since we don't +%% know if the subsequent watchdog message will be ?WD_DOWN or +%% ?WD_SUSPECT. We don't (yet) set ?STATE_* as a consequence of a +%% watchdog message since this requires changing some of the matching +%% on ?STATE_*. +%% +%% Death of a conn process results in connection_down followed by +%% watchdog ?WD_DOWN. The latter doesn't result in the conn record +%% being deleted since 'DOWN' from death of its peer doesn't (yet) +%% deal with the record having been removed. + %% Monitor process has died. Just die with a reason that tells %% diameter_config about the happening. If a cleaner shutdown is %% required then someone should stop us. @@ -879,7 +918,14 @@ accepted(Pid, _TPid, #state{peerT = PeerT} = S) -> fetch(Tid, Key) -> [T] = ets:lookup(Tid, Key), - T. + case T of + #peer{op_state = ?STATE_UP} = P -> + P#peer{op_state = {?STATE_UP, ?WD_OKAY}}; + #peer{op_state = ?STATE_DOWN} = P -> + P#peer{op_state = {?STATE_DOWN, ?WD_DOWN}}; + _ -> + T + end. %%% --------------------------------------------------------------------------- %%% # connection_up/3 @@ -925,12 +971,12 @@ connection_up(T, P, C, #state{peerT = PeerT, service = #diameter_service{applications = Apps}} = S) -> - #peer{conn = TPid, op_state = ?STATE_DOWN} + #peer{conn = TPid, op_state = {?STATE_DOWN, _}} = P, #conn{apps = SApps, caps = Caps} = C, - insert(PeerT, P#peer{op_state = ?STATE_UP}), + insert(PeerT, P#peer{op_state = {?STATE_UP, ?WD_OKAY}}), request_peer_up(TPid), report_status(up, P, C, S, T), @@ -945,27 +991,35 @@ ilp({Id, Alias}, {TC, SA}, LDict) -> init_conn(Id, Alias, TC, SA), ?Dict:append(Alias, TC, LDict). -init_conn(Id, Alias, TC, {SvcName, Apps}) -> +init_conn(Id, Alias, {TPid, _} = TC, {SvcName, Apps}) -> #diameter_app{module = ModX, id = Id} %% assert = find_app(Alias, Apps), - peer_cb({ModX, peer_up, [SvcName, TC]}, Alias). + peer_cb({ModX, peer_up, [SvcName, TC]}, Alias) + orelse exit(TPid, kill). %% fake transport failure + +%% find_app/2 find_app(Alias, Apps) -> - lists:keyfind(Alias, #diameter_app.alias, Apps). + case lists:keyfind(Alias, #diameter_app.alias, Apps) of + #diameter_app{options = E} = A when is_atom(E) -> %% upgrade + A#diameter_app{options = [{answer_errors, E}]}; + A -> + A + end. -%% A failing peer callback brings down the service. In the case of -%% peer_up we could just kill the transport and emit an error but for -%% peer_down we have no way to cleanup any state change that peer_up -%% may have introduced. +%% Don't bring down the service (and all associated connections) +%% regardless of what happens. peer_cb(MFA, Alias) -> try state_cb(MFA, Alias) of ModS -> - mod_state(Alias, ModS) + mod_state(Alias, ModS), + true catch - E: Reason -> - ?ERROR({E, Reason, MFA, ?STACK}) + E:R -> + diameter_lib:error_report({failure, {E, R, Alias, ?STACK}}, MFA), + false end. %%% --------------------------------------------------------------------------- @@ -979,22 +1033,22 @@ peer_cb(MFA, Alias) -> connection_down(Pid, #state{peerT = PeerT, connT = ConnT} = S) -> - #peer{op_state = ?STATE_UP, %% assert + #peer{op_state = {?STATE_UP, WS}, %% assert conn = TPid} = P = fetch(PeerT, Pid), C = fetch(ConnT, TPid), - insert(PeerT, P#peer{op_state = ?STATE_DOWN}), + insert(PeerT, P#peer{op_state = {?STATE_DOWN, WS}}), connection_down(P,C,S). %% connection_down/3 -connection_down(#peer{op_state = ?STATE_DOWN}, _, S) -> +connection_down(#peer{op_state = {?STATE_DOWN, _}}, _, S) -> S; connection_down(#peer{conn = TPid, - op_state = ?STATE_UP} + op_state = {?STATE_UP, _}} = P, #conn{caps = Caps, apps = SApps} @@ -1043,7 +1097,7 @@ peer_down(Pid, Reason, #state{peerT = PeerT} = S) -> %% Send an event at connection establishment failure. closed({shutdown, {close, _TPid, Reason}}, - #peer{op_state = ?STATE_DOWN, + #peer{op_state = {?STATE_DOWN, _}, ref = Ref, type = Type, options = Opts}, @@ -1352,7 +1406,7 @@ send_request(Pkt, TPid, Caps, App, Opts, Caller, SvcName) -> #diameter_app{alias = Alias, dictionary = Dict, module = ModX, - answer_errors = AE} + options = [{answer_errors, AE} | _]} = App, EPkt = encode(Dict, Pkt), @@ -1935,6 +1989,12 @@ is_loop(Code, Vid, OH, Avps) -> %% %% Send a locally originating reply. +%% Skip the setting of Result-Code and Failed-AVP's below. +reply([Msg], Dict, TPid, Pkt) + when is_list(Msg); + is_tuple(Msg) -> + reply(Msg, Dict, TPid, Pkt#diameter_packet{errors = []}); + %% No errors or a diameter_header/avp list. reply(Msg, Dict, TPid, #diameter_packet{errors = Es, transport_data = TD} @@ -1942,7 +2002,7 @@ reply(Msg, Dict, TPid, #diameter_packet{errors = Es, when [] == Es; is_record(hd(Msg), diameter_header) -> Pkt = diameter_codec:encode(Dict, make_answer_packet(Msg, ReqPkt)), - incr(send, Pkt, TPid), %% count result codes in sent answers + incr(send, Pkt, Dict, TPid), %% count result codes in sent answers send(TPid, Pkt#diameter_packet{transport_data = TD}); %% Or not: set Result-Code and Failed-AVP AVP's. @@ -1983,7 +2043,10 @@ rc(RC) -> rc(Rec, RC, Failed, Dict) when is_integer(RC) -> - set(Rec, [{'Result-Code', RC} | failed_avp(Rec, Failed, Dict)], Dict). + set(Rec, + lists:append([rc(Rec, {'Result-Code', RC}, Dict), + failed_avp(Rec, Failed, Dict)]), + Dict). %% Reply as name and tuple list ... set([_|_] = Ans, Avps, _) -> @@ -1993,6 +2056,22 @@ set([_|_] = Ans, Avps, _) -> set(Rec, Avps, Dict) -> Dict:'#set-'(Avps, Rec). +%% rc/3 +%% +%% Turn the result code into a list if its optional and only set it if +%% the arity is 1 or {0,1}. In other cases (which probably shouldn't +%% exist in practise) we can't know what's appropriate. + +rc([MsgName | _], {'Result-Code' = K, RC} = T, Dict) -> + case Dict:avp_arity(MsgName, 'Result-Code') of + 1 -> [T]; + {0,1} -> [{K, [RC]}]; + _ -> [] + end; + +rc(Rec, T, Dict) -> + rc([Dict:rec2msg(element(1, Rec))], T, Dict). + %% failed_avp/3 failed_avp(_, [] = No, _) -> @@ -2200,44 +2279,39 @@ handle_answer(SvcName, _, {error, Req, Reason}) -> handle_answer(SvcName, AnswerErrors, {answer, #request{dictionary = Dict} = Req, Pkt}) -> - a(examine(diameter_codec:decode(Dict, Pkt)), - SvcName, - AnswerErrors, - Req). + answer(examine(diameter_codec:decode(Dict, Pkt)), + SvcName, + AnswerErrors, + Req). %% We don't really need to do a full decode if we're a relay and will %% just resend with a new hop by hop identifier, but might a proxy %% want to examine the answer? -a(#diameter_packet{errors = []} - = Pkt, - SvcName, - AE, - #request{transport = TPid, - caps = Caps, - packet = P} - = Req) -> +answer(Pkt, SvcName, AE, #request{transport = TPid, + dictionary = Dict} + = Req) -> try - incr(in, Pkt, TPid) + incr(recv, Pkt, Dict, TPid) of - _ -> - cb(Req, handle_answer, [Pkt, msg(P), SvcName, {TPid, Caps}]) + _ -> a(Pkt, SvcName, AE, Req) catch exit: {invalid_error_bit, _} = E -> - e(Pkt#diameter_packet{errors = [E]}, SvcName, AE, Req) - end; - -a(#diameter_packet{} = Pkt, SvcName, AE, Req) -> - e(Pkt, SvcName, AE, Req). + a(Pkt#diameter_packet{errors = [E]}, SvcName, AE, Req) + end. -e(Pkt, SvcName, callback, #request{transport = TPid, - caps = Caps, - packet = Pkt} - = Req) -> - cb(Req, handle_answer, [Pkt, msg(Pkt), SvcName, {TPid, Caps}]); -e(Pkt, SvcName, report, Req) -> +a(#diameter_packet{errors = Es} = Pkt, SvcName, AE, #request{transport = TPid, + caps = Caps, + packet = P} + = Req) + when [] == Es; + callback == AE -> + cb(Req, handle_answer, [Pkt, msg(P), SvcName, {TPid, Caps}]); + +a(Pkt, SvcName, report, Req) -> x(errors, handle_answer, [SvcName, Req, Pkt]); -e(Pkt, SvcName, discard, Req) -> + +a(Pkt, SvcName, discard, Req) -> x({errors, handle_answer, [SvcName, Req, Pkt]}). %% Note that we don't check that the application id in the answer's @@ -2249,17 +2323,19 @@ e(Pkt, SvcName, discard, Req) -> %% Increment a stats counter for an incoming or outgoing message. %% TODO: fix -incr(_, #diameter_packet{msg = undefined}, _) -> +incr(_, #diameter_packet{msg = undefined}, _, _) -> ok; -incr(Dir, Pkt, TPid) - when is_pid(TPid) -> +incr(recv = D, #diameter_packet{header = H, errors = [_|_]}, _, TPid) -> + incr(TPid, {diameter_codec:msg_id(H), D, error}); + +incr(Dir, Pkt, Dict, TPid) -> #diameter_packet{header = #diameter_header{is_error = E} = Hdr, msg = Rec} = Pkt, - RC = int(get_avp_value(?BASE, 'Result-Code', Rec)), + RC = int(get_avp_value(Dict, 'Result-Code', Rec)), PE = is_protocol_error(RC), %% Check that the E bit is set only for 3xxx result codes. @@ -2267,15 +2343,21 @@ incr(Dir, Pkt, TPid) orelse (E andalso PE) orelse x({invalid_error_bit, RC}, answer, [Dir, Pkt]), - Ctr = rc_counter(Rec, RC), - is_tuple(Ctr) - andalso incr(TPid, {diameter_codec:msg_id(Hdr), Dir, Ctr}). + irc(TPid, Hdr, Dir, rc_counter(Dict, Rec, RC)). + +irc(_, _, _, undefined) -> + false; + +irc(TPid, Hdr, Dir, Ctr) -> + incr(TPid, {diameter_codec:msg_id(Hdr), Dir, Ctr}). %% incr/2 incr(TPid, Counter) -> diameter_stats:incr(Counter, TPid, 1). +%% error_counter/2 + %% RFC 3588, 7.6: %% %% All Diameter answer messages defined in vendor-specific @@ -2285,26 +2367,27 @@ incr(TPid, Counter) -> %% Maintain statistics assuming one or the other, not both, which is %% surely the intent of the RFC. -rc_counter(_, RC) - when is_integer(RC) -> - {'Result-Code', RC}; -rc_counter(Rec, _) -> - rcc(get_avp_value(?BASE, 'Experimental-Result', Rec)). +rc_counter(Dict, Rec, undefined) -> + er(get_avp_value(Dict, 'Experimental-Result', Rec)); +rc_counter(_, _, RC) -> + {'Result-Code', RC}. %% Outgoing answers may be in any of the forms messages can be sent %% in. Incoming messages will be records. We're assuming here that the %% arity of the result code AVP's is 0 or 1. -rcc([{_,_,RC} = T]) - when is_integer(RC) -> +er([{_,_,N} = T | _]) + when is_integer(N) -> T; -rcc({_,_,RC} = T) - when is_integer(RC) -> +er({_,_,N} = T) + when is_integer(N) -> T; -rcc(_) -> +er(_) -> undefined. -int([N]) +%% Extract the first good looking integer. There's no guarantee +%% that what we're looking for has arity 1. +int([N|_]) when is_integer(N) -> N; int(N) @@ -2349,8 +2432,11 @@ rt(#request{packet = #diameter_packet{msg = undefined}}, _) -> false; %% TODO: Not what we should do. %% ... or not. -rt(#request{packet = #diameter_packet{msg = Msg}} = Req, S) -> - find_transport(get_destination(Msg), Req, S). +rt(#request{packet = #diameter_packet{msg = Msg}, + dictionary = Dict} + = Req, + S) -> + find_transport(get_destination(Dict, Msg), Req, S). %%% --------------------------------------------------------------------------- %%% # report_status/5 @@ -2462,12 +2548,12 @@ find_transport({alias, Alias}, Msg, Opts, #state{service = Svc} = S) -> find_transport(#diameter_app{} = App, Msg, Opts, S) -> ft(App, Msg, Opts, S). -ft(#diameter_app{module = Mod} = App, Msg, Opts, S) -> +ft(#diameter_app{module = Mod, dictionary = Dict} = App, Msg, Opts, S) -> #options{filter = Filter, extra = Xtra} = Opts, pick_peer(App#diameter_app{module = Mod ++ Xtra}, - get_destination(Msg), + get_destination(Dict, Msg), Filter, S); ft(false = No, _, _, _) -> @@ -2503,11 +2589,11 @@ find_transport([_,_] = RH, Filter, S). -%% get_destination/1 +%% get_destination/2 -get_destination(Msg) -> - [str(get_avp_value(?BASE, 'Destination-Realm', Msg)), - str(get_avp_value(?BASE, 'Destination-Host', Msg))]. +get_destination(Dict, Msg) -> + [str(get_avp_value(Dict, 'Destination-Realm', Msg)), + str(get_avp_value(Dict, 'Destination-Host', Msg))]. %% This is not entirely correct. The avp could have an arity 1, in %% which case an empty list is a DiameterIdentity of length 0 rather @@ -2531,6 +2617,9 @@ str(T) -> %% question. The third form allows messages to be sent as is, without %% a dictionary, which is needed in the case of relay agents, for one. +get_avp_value(?RELAY, Name, Msg) -> + get_avp_value(?BASE, Name, Msg); + get_avp_value(Dict, Name, [#diameter_header{} | Avps]) -> try {Code, _, VId} = Dict:avp_header(Name), @@ -2746,20 +2835,45 @@ transports(#state{peerT = PeerT}) -> 'Vendor-Specific-Application-Id', 'Firmware-Revision']). +%% The config returned by diameter:service_info(SvcName, all). -define(ALL_INFO, [capabilities, applications, transport, - pending, - statistics]). + pending]). + +%% The rest. +-define(OTHER_INFO, [connections, + name, + peers, + statistics]). -service_info(Items, S) - when is_list(Items) -> - [{complete(I), service_info(I,S)} || I <- Items]; service_info(Item, S) when is_atom(Item) -> - service_info(Item, S, true). + case tagged_info(Item, S) of + {_, T} -> T; + undefined = No -> No + end; + +service_info(Items, S) -> + tagged_info(Items, S). -service_info(Item, #state{service = Svc} = S, Complete) -> +tagged_info(Item, S) + when is_atom(Item) -> + case complete(Item) of + {value, I} -> + {I, complete_info(I,S)}; + false -> + undefined + end; + +tagged_info(Items, S) + when is_list(Items) -> + [T || I <- Items, T <- [tagged_info(I,S)], T /= undefined, T /= []]; + +tagged_info(_, _) -> + undefined. + +complete_info(Item, #state{service = Svc} = S) -> case Item of name -> S#state.service_name; @@ -2803,70 +2917,119 @@ service_info(Item, #state{service = Svc} = S, Complete) -> applications -> info_apps(S); transport -> info_transport(S); pending -> info_pending(S); - statistics -> info_stats(S); - keys -> ?ALL_INFO ++ ?CAP_INFO; %% mostly for test + keys -> ?ALL_INFO ++ ?CAP_INFO ++ ?OTHER_INFO; all -> service_info(?ALL_INFO, S); - _ when Complete -> service_info(complete(Item), S, false); - _ -> undefined + statistics -> info_stats(S); + connections -> info_connections(S); + peers -> info_peers(S) end. +complete(I) + when I == keys; + I == all -> + {value, I}; complete(Pre) -> P = atom_to_list(Pre), - case [I || I <- [name | ?ALL_INFO] ++ ?CAP_INFO, + case [I || I <- ?ALL_INFO ++ ?CAP_INFO ++ ?OTHER_INFO, lists:prefix(P, atom_to_list(I))] of - [I] -> I; - _ -> Pre + [I] -> {value, I}; + _ -> false end. +%% info_stats/1 + info_stats(#state{peerT = PeerT}) -> - Peers = ets:select(PeerT, [{#peer{ref = '$1', conn = '$2', _ = '_'}, - [{'is_pid', '$2'}], - [['$1', '$2']]}]), - diameter_stats:read(lists:append(Peers)). -%% TODO: include peer identities in return value - -info_transport(#state{peerT = PeerT, connT = ConnT}) -> - dict:fold(fun it/3, + MatchSpec = [{#peer{ref = '$1', conn = '$2', _ = '_'}, + [{'is_pid', '$2'}], + [['$1', '$2']]}], + diameter_stats:read(lists:append(ets:select(PeerT, MatchSpec))). + +%% info_transport/1 +%% +%% One entry per configured transport. Statistics for each entry are +%% the accumulated values for the ref and associated peer pids. + +info_transport(S) -> + PeerD = peer_dict(S), + RefsD = dict:map(fun(_, Ls) -> [P || L <- Ls, {peer, {P,_}} <- L] end, + PeerD), + Refs = lists:append(dict:fold(fun(R, Ps, A) -> [[R|Ps] | A] end, + [], + RefsD)), + Stats = diameter_stats:read(Refs), + dict:fold(fun(R, Ls, A) -> + Ps = dict:fetch(R, RefsD), + [[{ref, R} | transport(Ls)] ++ [stats([R|Ps], Stats)] + | A] + end, [], - ets:foldl(fun(T,A) -> it_acc(ConnT, A, T) end, - dict:new(), - PeerT)). - -it(Ref, [[{type, connect} | _] = L], Acc) -> - [[{ref, Ref} | L] | Acc]; -it(Ref, [[{type, accept}, {options, Opts} | _] | _] = L, Acc) -> - [[{ref, Ref}, - {type, listen}, - {options, Opts}, - {accept, [lists:nthtail(2,A) || A <- L]}] - | Acc]. -%% Each entry has the same Opts. (TODO) - -it_acc(ConnT, Acc, #peer{pid = Pid, - type = Type, - ref = Ref, - options = Opts, - op_state = OS, - started = T, - conn = TPid}) -> + PeerD). + +transport([[{type, connect} | _] = L]) -> + L; + +transport([[{type, accept}, {options, Opts} | _] | _] = Ls) -> + [{type, listen}, + {options, Opts}, + {accept, [lists:nthtail(2,L) || L <- Ls]}]. +%% Note that all peer records for a listening transport (ie. same Ref) +%% have the same options. (TODO) + +peer_dict(#state{peerT = PeerT, connT = ConnT}) -> + ets:foldl(fun(T,A) -> peer_acc(ConnT, A, T) end, dict:new(), PeerT). + +peer_acc(ConnT, Acc, #peer{pid = Pid, + type = Type, + ref = Ref, + options = Opts, + op_state = OS, + started = T, + conn = TPid}) -> + WS = wd_state(OS), dict:append(Ref, [{type, Type}, {options, Opts}, - {watchdog, {Pid, T, OS}} - | info_conn(ConnT, TPid)], + {watchdog, {Pid, T, WS}} + | info_conn(ConnT, TPid, WS /= ?WD_DOWN)], Acc). -info_conn(ConnT, TPid) -> - info_conn(ets:lookup(ConnT, TPid)). +info_conn(ConnT, TPid, true) + when is_pid(TPid) -> + info_conn(ets:lookup(ConnT, TPid)); +info_conn(_, _, _) -> + []. + +wd_state({_,S}) -> + S; +wd_state(?STATE_UP) -> + ?WD_OKAY; +wd_state(?STATE_DOWN) -> + ?WD_DOWN. info_conn([#conn{pid = Pid, apps = SApps, caps = Caps, started = T}]) -> [{peer, {Pid, T}}, {apps, SApps}, - {caps, info_caps(Caps)}]; + {caps, info_caps(Caps)} + | try [{port, info_port(Pid)}] catch _:_ -> [] end]; info_conn([] = No) -> No. +%% Extract information that the processes involved are expected to +%% "publish" in their process dictionaries. Simple but backhanded. +info_port(Pid) -> + {_, PD} = process_info(Pid, dictionary), + {_, T} = lists:keyfind({diameter_peer_fsm, start}, 1, PD), + {TPid, {_Type, TMod, _Cfg}} = T, + {_, TD} = process_info(TPid, dictionary), + {_, Data} = lists:keyfind({TMod, info}, 1, TD), + [{owner, TPid}, {module, TMod} | [_|_] = TMod:info(Data)]. + +%% Use the fields names from diameter_caps instead of +%% diameter_base_CER to distinguish between the 2-tuple values +%% compared to the single capabilities values. Note also that the +%% returned list is tagged 'caps' rather than 'capabilities' to +%% emphasize the difference. info_caps(#diameter_caps{} = C) -> lists:zip(record_info(fields, diameter_caps), tl(tuple_to_list(C))). @@ -2882,6 +3045,10 @@ mk_app(#diameter_app{alias = Alias, {module, ModX}, {id, Id}]. +%% info_pending/1 +%% +%% One entry for each outgoing request whose answer is outstanding. + info_pending(#state{} = S) -> MatchSpec = [{{'$1', #request{transport = '$2', @@ -2895,3 +3062,59 @@ info_pending(#state{} = S) -> {{from, '$3'}}]}}]}], ets:select(?REQUEST_TABLE, MatchSpec). + +%% info_connections/1 +%% +%% One entry per transport connection. Statistics for each entry are +%% for the peer pid only. + +info_connections(S) -> + ConnL = conn_list(S), + Stats = diameter_stats:read([P || L <- ConnL, {peer, {P,_}} <- L]), + [L ++ [stats([P], Stats)] || L <- ConnL, {peer, {P,_}} <- L]. + +conn_list(S) -> + lists:append(dict:fold(fun conn_acc/3, [], peer_dict(S))). + +conn_acc(Ref, Peers, Acc) -> + [[[{ref, Ref} | L] || L <- Peers, lists:keymember(peer, 1, L)] + | Acc]. + +stats(Refs, Stats) -> + {statistics, dict:to_list(lists:foldl(fun(R,D) -> + stats_acc(R, D, Stats) + end, + dict:new(), + Refs))}. + +stats_acc(Ref, Dict, Stats) -> + lists:foldl(fun({C,N}, D) -> dict:update_counter(C, N, D) end, + Dict, + proplists:get_value(Ref, Stats, [])). + +%% info_peers/1 +%% +%% One entry per peer Origin-Host. Statistics for each entry are +%% accumulated values for all associated transport refs and peer pids. + +info_peers(S) -> + ConnL = conn_list(S), + {PeerD, RefD} = lists:foldl(fun peer_acc/2, + {dict:new(), dict:new()}, + ConnL), + Refs = lists:append(dict:fold(fun(_, Rs, A) -> [lists:append(Rs) | A] end, + [], + RefD)), + Stats = diameter_stats:read(Refs), + dict:fold(fun(OH, Cs, A) -> + Rs = lists:append(dict:fetch(OH, RefD)), + [{OH, [{connections, Cs}, stats(Rs, Stats)]} + | A] + end, + [], + PeerD). + +peer_acc(Peer, {PeerD, RefD}) -> + [Ref, {TPid, _}, [{origin_host, {_, OH}} | _]] + = [proplists:get_value(K, Peer) || K <- [ref, peer, caps]], + {dict:append(OH, Peer, PeerD), dict:append(OH, [Ref, TPid], RefD)}. diff --git a/lib/diameter/src/base/diameter_stats.erl b/lib/diameter/src/base/diameter_stats.erl index 71479afa95..70727d068e 100644 --- a/lib/diameter/src/base/diameter_stats.erl +++ b/lib/diameter/src/base/diameter_stats.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% Copyright Ericsson AB 2010-2012. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -22,14 +22,13 @@ %% -module(diameter_stats). --compile({no_auto_import, [monitor/2]}). -behaviour(gen_server). --export([reg/1, reg/2, - incr/1, incr/2, incr/3, +-export([reg/2, reg/1, + incr/3, incr/1, read/1, - flush/0, flush/1]). + flush/1]). %% supervisor callback -export([start_link/0]). @@ -48,123 +47,105 @@ -include("diameter_internal.hrl"). -%% ets table containing stats. reg(Pid, Ref) inserts a {Pid, Ref}, -%% incr(Counter, X, N) updates the counter keyed at {Counter, X}, and -%% Pid death causes counters keyed on {Counter, Pid} to be deleted and -%% added to those keyed on {Counter, Ref}. +%% ets table containing 2-tuple stats. reg(Pid, Ref) inserts a {Pid, +%% Ref}, incr(Counter, X, N) updates the counter keyed at {Counter, +%% X}, and Pid death causes counters keyed on {Counter, Pid} to be +%% deleted and added to those keyed on {Counter, Ref}. -define(TABLE, ?MODULE). %% Name of registered server. -define(SERVER, ?MODULE). -%% Entries in the table. --define(REC(Key, Value), {Key, Value}). - %% Server state. -record(state, {id = now()}). -type counter() :: any(). --type contrib() :: any(). - -%%% --------------------------------------------------------------------------- -%%% # reg(Pid, Contrib) -%%% -%%% Description: Register a process as a contributor of statistics -%%% associated with a specified term. Statistics can be -%%% contributed by specifying either Pid or Contrib as -%%% the second argument to incr/3. Statistics contributed -%%% by Pid are folded into the corresponding entry for -%%% Contrib when the process dies. -%%% -%%% Contrib can be any term but should not be a pid -%%% passed as the first argument to reg/2. Subsequent -%%% registrations for the same Pid overwrite the association -%%% --------------------------------------------------------------------------- - --spec reg(pid(), contrib()) - -> true. +-type ref() :: any(). + +%% --------------------------------------------------------------------------- +%% # reg(Pid, Ref) +%% +%% Register a process as a contributor of statistics associated with a +%% specified term. Statistics can be contributed by specifying either +%% Pid or Ref as the second argument to incr/3. Statistics contributed +%% by Pid are folded into the corresponding entry for Ref when the +%% process dies. +%% --------------------------------------------------------------------------- + +-spec reg(pid(), ref()) + -> boolean(). -reg(Pid, Contrib) +reg(Pid, Ref) when is_pid(Pid) -> - call({reg, Pid, Contrib}). + call({reg, Pid, Ref}). --spec reg(contrib()) +-spec reg(ref()) -> true. reg(Ref) -> reg(self(), Ref). -%%% --------------------------------------------------------------------------- -%%% # incr(Counter, Contrib, N) -%%% -%%% Description: Increment a counter for the specified contributor. -%%% -%%% Contrib will typically be an argument passed to reg/2 -%%% but there's nothing that requires this. In particular, -%%% if Contrib is a pid that hasn't been registered then -%%% counters are unaffected by the death of the process. -%%% --------------------------------------------------------------------------- - --spec incr(counter(), contrib(), integer()) - -> integer(). +%% --------------------------------------------------------------------------- +%% # incr(Counter, Ref, N) +%% +%% Increment a counter for the specified contributor. +%% +%% Ref will typically be an argument passed to reg/2 but there's +%% nothing that requires this. Only registered pids can contribute +%% counters however, otherwise incr/3 is a no-op. +%% --------------------------------------------------------------------------- -incr(Ctr, Contrib, N) -> - update_counter({Ctr, Contrib}, N). +-spec incr(counter(), ref(), integer()) + -> integer() | false. -incr(Ctr, N) +incr(Ctr, Ref, N) when is_integer(N) -> - incr(Ctr, self(), N); - -incr(Ctr, Contrib) -> - incr(Ctr, Contrib, 1). + update_counter({Ctr, Ref}, N). incr(Ctr) -> incr(Ctr, self(), 1). -%%% --------------------------------------------------------------------------- -%%% # read(Contribs) -%%% -%%% Description: Retrieve counters for the specified contributors. -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # read(Refs) +%% +%% Retrieve counters for the specified contributors. +%% --------------------------------------------------------------------------- --spec read([contrib()]) - -> [{contrib(), [{counter(), integer()}]}]. +-spec read([ref()]) + -> [{ref(), [{counter(), integer()}]}]. -read(Contribs) -> - lists:foldl(fun(?REC({T,C}, N), D) -> orddict:append(C, {T,N}, D) end, +read(Refs) -> + read(Refs, false). + +read(Refs, B) -> + MatchSpec = [{{{'_', '$1'}, '_'}, + [?ORCOND([{'=:=', '$1', {const, R}} + || R <- Refs])], + ['$_']}], + L = ets:select(?TABLE, MatchSpec), + B andalso delete(L), + lists:foldl(fun({{C,R}, N}, D) -> orddict:append(R, {C,N}, D) end, orddict:new(), - ets:select(?TABLE, [{?REC({'_', '$1'}, '_'), - [?ORCOND([{'=:=', '$1', {const, C}} - || C <- Contribs])], - ['$_']}])). - -%%% --------------------------------------------------------------------------- -%%% # flush(Contrib) -%%% -%%% Description: Retrieve and delete statistics for the specified -%%% contributor. -%%% -%%% If Contrib is a pid registered with reg/2 then statistics -%%% for both and its associated contributor are retrieved. -%%% --------------------------------------------------------------------------- - --spec flush(contrib()) - -> [{counter(), integer()}]. + L). + +%% --------------------------------------------------------------------------- +%% # flush(Refs) +%% +%% Retrieve and delete statistics for the specified contributors. +%% --------------------------------------------------------------------------- + +-spec flush([ref()]) + -> [{ref(), {counter(), integer()}}]. -flush(Contrib) -> +flush(Refs) -> try - call({flush, Contrib}) + call({flush, Refs}) catch exit: _ -> [] end. -flush() -> - flush(self()). - -%%% --------------------------------------------------------- -%%% EXPORTED INTERNAL FUNCTIONS -%%% --------------------------------------------------------- +%% =========================================================================== start_link() -> ServerName = {local, ?SERVER}, @@ -179,18 +160,16 @@ state() -> uptime() -> call(uptime). -%%% ---------------------------------------------------------- -%%% # init(_) -%%% -%%% Output: {ok, State} -%%% ---------------------------------------------------------- +%% ---------------------------------------------------------- +%% # init/1 +%% ---------------------------------------------------------- init([]) -> ets:new(?TABLE, [named_table, ordered_set, public]), {ok, #state{}}. %% ---------------------------------------------------------- -%% handle_call(Request, From, State) +%% # handle_call/3 %% ---------------------------------------------------------- handle_call(state, _, State) -> @@ -199,31 +178,31 @@ handle_call(state, _, State) -> handle_call(uptime, _, #state{id = Time} = State) -> {reply, diameter_lib:now_diff(Time), State}; -handle_call({reg, Pid, Contrib}, _From, State) -> - monitor(not ets:member(?TABLE, Pid), Pid), - {reply, insert(?REC(Pid, Contrib)), State}; +handle_call({incr, T}, _, State) -> + {reply, update_counter(T), State}; -handle_call({flush, Contrib}, _From, State) -> - {reply, fetch(Contrib), State}; +handle_call({reg, Pid, Ref}, _From, State) -> + B = ets:insert_new(?TABLE, {Pid, Ref}), + B andalso erlang:monitor(process, Pid), + {reply, B, State}; + +handle_call({flush, Refs}, _From, State) -> + {reply, read(Refs, true), State}; handle_call(Req, From, State) -> ?UNEXPECTED([Req, From]), {reply, nok, State}. %% ---------------------------------------------------------- -%% handle_cast(Request, State) +%% # handle_cast/2 %% ---------------------------------------------------------- -handle_cast({incr, Rec}, State) -> - update_counter(Rec), - {noreply, State}; - handle_cast(Msg, State) -> ?UNEXPECTED([Msg]), {noreply, State}. %% ---------------------------------------------------------- -%% handle_info(Request, State) +%% # handle_info/2 %% ---------------------------------------------------------- handle_info({'DOWN', _MRef, process, Pid, _}, State) -> @@ -235,91 +214,62 @@ handle_info(Info, State) -> {noreply, State}. %% ---------------------------------------------------------- -%% terminate(Reason, State) +%% # terminate/2 %% ---------------------------------------------------------- terminate(_Reason, _State) -> ok. %% ---------------------------------------------------------- -%% code_change(OldVsn, State, Extra) +%% # code_change/3 %% ---------------------------------------------------------- code_change(_OldVsn, State, _Extra) -> {ok, State}. -%%% --------------------------------------------------------- -%%% INTERNAL FUNCTIONS -%%% --------------------------------------------------------- - -%% monitor/2 - -monitor(true, Pid) -> - erlang:monitor(process, Pid); -monitor(false = No, _) -> - No. +%% =========================================================================== %% down/1 down(Pid) -> - L = ets:match_object(?TABLE, ?REC({'_', Pid}, '_')), - [?REC(_, Ref) = T] = lookup(Pid), + down(lookup(Pid), ets:match_object(?TABLE, {{'_', Pid}, '_'})). + +down([{_, Ref} = T], L) -> fold(Ref, L), - delete_object(T), + delete([T|L]); +down([], L) -> %% flushed delete(L). -%% Fold Pid-based entries into Ref-based ones. +%% Fold pid-based entries into ref-based ones. fold(Ref, L) -> - lists:foreach(fun(?REC({K, _}, V)) -> update_counter({{K, Ref}, V}) end, - L). - -delete(Objs) -> - lists:foreach(fun delete_object/1, Objs). - -%% fetch/1 - -fetch(X) -> - MatchSpec = [{?REC({'_', '$1'}, '_'), - [?ORCOND([{'==', '$1', {const, T}} || T <- [X | ref(X)]])], - ['$_']}], - L = ets:select(?TABLE, MatchSpec), - delete(L), - D = lists:foldl(fun sum/2, dict:new(), L), - dict:to_list(D). - -sum({{Ctr, _}, N}, Dict) -> - dict:update(Ctr, fun(V) -> V+N end, N, Dict). - -ref(Pid) - when is_pid(Pid) -> - ets:select(?TABLE, [{?REC(Pid, '$1'), [], ['$1']}]); -ref(_) -> - []. + lists:foreach(fun({{K, _}, V}) -> update_counter({{K, Ref}, V}) end, L). %% update_counter/2 %% -%% From an arbitrary request process. Cast to the server process to -%% insert a new element if the counter doesn't exists so that two -%% processes don't do so simultaneously. +%% From an arbitrary process. Call to the server process to insert a +%% new element if the counter doesn't exists so that two processes +%% don't insert simultaneously. update_counter(Key, N) -> try ets:update_counter(?TABLE, Key, N) catch error: badarg -> - cast({incr, ?REC(Key, N)}) + call({incr, {Key, N}}) end. %% update_counter/1 %% -%% From the server process. +%% From the server process, when update_counter/2 failed due to a +%% non-existent entry. -update_counter(?REC(Key, N) = T) -> +update_counter({{_Ctr, Ref} = Key, N} = T) -> try ets:update_counter(?TABLE, Key, N) catch error: badarg -> - insert(T) + (not is_pid(Ref) orelse ets:member(?TABLE, Ref)) + andalso begin insert(T), N end end. insert(T) -> @@ -328,13 +278,8 @@ insert(T) -> lookup(Key) -> ets:lookup(?TABLE, Key). -delete_object(T) -> - ets:delete_object(?TABLE, T). - -%% cast/1 - -cast(Msg) -> - gen_server:cast(?SERVER, Msg). +delete(Objs) -> + lists:foreach(fun({K,_}) -> ets:delete(?TABLE, K) end, Objs). %% call/1 diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl index fb22fd8275..d7474e5c56 100644 --- a/lib/diameter/src/base/diameter_watchdog.erl +++ b/lib/diameter/src/base/diameter_watchdog.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% Copyright Ericsson AB 2010-2012. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -54,7 +54,7 @@ %% number of DWAs received during reopen %% end PCB parent = self() :: pid(), - transport :: pid(), + transport :: pid() | undefined, tref :: reference(), %% reference for current watchdog timer message_data}). %% term passed into diameter_service with message @@ -64,6 +64,13 @@ %% that a failed capabilities exchange produces the desired exit %% reason. +-spec start(Type, {RecvData, [Opt], SvcName, #diameter_service{}}) + -> {reference(), pid()} + when Type :: {connect|accept, diameter:transport_ref()}, + RecvData :: term(), + Opt :: diameter:transport_opt(), + SvcName :: diameter:service_name(). + start({_,_} = Type, T) -> Ref = make_ref(), {ok, Pid} = diameter_watchdog_sup:start_child({Ref, {Type, self(), T}}), @@ -102,7 +109,7 @@ i({_, Pid, _} = T) -> %% from old code erlang:monitor(process, Pid), make_state(T). -make_state({T, Pid, {ConnT, +make_state({T, Pid, {RecvData, Opts, SvcName, #diameter_service{applications = Apps, @@ -116,7 +123,7 @@ make_state({T, Pid, {ConnT, tw = proplists:get_value(watchdog_timer, Opts, ?DEFAULT_TW_INIT), - message_data = {ConnT, SvcName, Apps}}. + message_data = {RecvData, SvcName, Apps}}. %% handle_call/3 @@ -134,14 +141,36 @@ handle_info(T, State) -> case transition(T, State) of ok -> {noreply, State}; - #watchdog{status = X} = S -> - ?LOGC(X =/= State#watchdog.status, transition, X), + #watchdog{} = S -> + event(State, S), {noreply, S}; stop -> ?LOG(stop, T), + event(State, State#watchdog{status = down}), {stop, {shutdown, T}, State} end. +event(#watchdog{status = T}, #watchdog{status = T}) -> + ok; + +event(#watchdog{transport = undefined}, #watchdog{transport = undefined}) -> + ok; + +event(#watchdog{status = From, transport = F, parent = Pid}, + #watchdog{status = To, transport = T}) -> + E = {tpid(F,T), From, To}, + notify(Pid, E), + ?LOG(transition, {self(), E}). + +tpid(_, Pid) + when is_pid(Pid) -> + Pid; +tpid(Pid, _) -> + Pid. + +notify(Pid, E) -> + Pid ! {watchdog, self(), E}. + %% terminate/2 terminate(_, _) -> @@ -251,8 +280,8 @@ transition({'DOWN', _, process, TPid, _}, status = initial}) -> stop; -transition({'DOWN', _, process, Pid, _}, - #watchdog{transport = Pid} +transition({'DOWN', _, process, TPid, _}, + #watchdog{transport = TPid} = S) -> failover(S), close(S), @@ -385,7 +414,7 @@ recv(Name, Pkt, S) -> rcv(Name, Pkt, S), NS catch - throw: {?MODULE, throwaway, #watchdog{} = NS} -> + {?MODULE, throwaway, #watchdog{} = NS} -> NS end. diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index 9a65834647..79b8b851fb 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -68,6 +68,7 @@ -record(transport, {parent :: pid(), mode :: {accept, pid()} + | accept | {connect, {list(inet:ip_address()), uint(), list()}} %% {RAs, RP, Errors} | connect, @@ -349,6 +350,11 @@ terminate(_, #transport{assoc_id = undefined}) -> ok; terminate(_, #transport{socket = Sock, + mode = accept, + assoc_id = Id}) -> + close(Sock, Id); + +terminate(_, #transport{socket = Sock, mode = {accept, _}, assoc_id = Id}) -> close(Sock, Id); @@ -380,13 +386,16 @@ start_timer(S) -> %% Incoming message from SCTP. l({sctp, Sock, _RA, _RP, Data} = Msg, #listener{socket = Sock} = S) -> - setopts(Sock), - case find(Data, S) of + Id = assoc_id(Data), + + try find(Id, Data, S) of {TPid, NewS} -> - TPid ! Msg, + TPid ! {peeloff, peeloff(Sock, Id, TPid), Msg}, NewS; false -> S + after + setopts(Sock) end; %% Transport is asking message to be sent. See send/3 for why the send @@ -454,15 +463,19 @@ t(T,S) -> %% transition/2 -%% Incoming message. -transition({sctp, Sock, _RA, _RP, Data}, #transport{socket = Sock, - mode = {accept, _}} - = S) -> - recv(Data, S); +%% Listening process is transfering ownership of an association. +transition({peeloff, Sock, {sctp, LSock, _RA, _RP, _Data} = Msg}, + #transport{mode = {accept, _}, + socket = LSock} + = S) -> + transition(Msg, S#transport{socket = Sock}); -transition({sctp, Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) -> +%% Incoming message. +transition({sctp, _Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) -> setopts(Sock), recv(Data, S); +%% Don't match on Sock since in R15B01 it can be the listening socket +%% in the (peeled-off) accept case, which is likely a bug. %% Outgoing message. transition({diameter, {send, Msg}}, S) -> @@ -480,13 +493,18 @@ transition({diameter, {close, Pid}}, #transport{parent = Pid}) -> transition({diameter, {tls, _Ref, _Type, _Bool}}, _) -> stop; +%% Parent process has died. +transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) -> + stop; + %% Listener process has died. transition({'DOWN', _, process, Pid, _}, #transport{mode = {accept, Pid}}) -> stop; -%% Parent process has died. -transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) -> - stop; +%% Ditto but we have ownership of the association. It might be that +%% we'll go down anyway though. +transition({'DOWN', _, process, _Pid, _}, #transport{mode = accept}) -> + ok; %% Request for the local port number. transition({resolve_port, Pid}, #transport{socket = Sock}) @@ -545,14 +563,6 @@ send(Bin, #transport{streams = {_, OS}, %% send/3 -%% Messages have to be sent from the controlling process, which is -%% probably a bug. Sending from here causes an inet_reply, Sock, -%% Status} message to be sent to the controlling process while -%% gen_sctp:send/4 here hangs. -send(StreamId, Bin, #transport{assoc_id = AId, - mode = {accept, LPid}}) -> - LPid ! {send, AId, StreamId, Bin}; - send(StreamId, Bin, #transport{socket = Sock, assoc_id = AId}) -> send(Sock, AId, StreamId, Bin). @@ -635,21 +645,15 @@ up(#transport{parent = Pid, S#transport{mode = C}; up(#transport{parent = Pid, - mode = {accept, _}} + mode = {accept = A, _}} = S) -> diameter_peer:up(Pid), - S. + S#transport{mode = A}. -%% find/2 +%% find/3 -find({[#sctp_sndrcvinfo{assoc_id = Id}], _} - = Data, - #listener{tmap = T} - = S) -> - f(ets:lookup(T, Id), Data, S); - -find({_, Rec} = Data, #listener{tmap = T} = S) -> - f(ets:lookup(T, assoc_id(Rec)), Data, S). +find(Id, Data, #listener{tmap = T} = S) -> + f(ets:lookup(T, Id), Data, S). %% New association and a transport waiting for one: use it. f([], @@ -690,17 +694,29 @@ f([], _, _) -> %% assoc_id/1 -assoc_id(#sctp_shutdown_event{assoc_id = Id}) -> +assoc_id({[#sctp_sndrcvinfo{assoc_id = Id}], _}) -> + Id; +assoc_id({_, Rec}) -> + id(Rec). + +id(#sctp_shutdown_event{assoc_id = Id}) -> Id; -assoc_id(#sctp_assoc_change{assoc_id = Id}) -> +id(#sctp_assoc_change{assoc_id = Id}) -> Id; -assoc_id(#sctp_sndrcvinfo{assoc_id = Id}) -> +id(#sctp_sndrcvinfo{assoc_id = Id}) -> Id; -assoc_id(#sctp_paddr_change{assoc_id = Id}) -> +id(#sctp_paddr_change{assoc_id = Id}) -> Id; -assoc_id(#sctp_adaptation_event{assoc_id = Id}) -> +id(#sctp_adaptation_event{assoc_id = Id}) -> Id. +%% peeloff/3 + +peeloff(LSock, Id, TPid) -> + {ok, Sock} = gen_sctp:peeloff(LSock, Id), + ok = gen_sctp:controlling_process(Sock, TPid), + Sock. + %% connect/4 connect(_, [], _, Reasons) -> diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index 597f2f14d7..f3fbbee609 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -80,7 +80,7 @@ %% Accepting/connecting transport process state. -record(transport, - {socket :: inet:socket(), %% accept or connect socket + {socket :: inet:socket() | ssl:sslsock(), %% accept or connect socket parent :: pid(), %% of process that started us module :: module(), %% gen_tcp-like module frag = <<>> :: binary() | {tref(), frag()}, %% message fragment |