aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src
diff options
context:
space:
mode:
Diffstat (limited to 'lib/diameter/src')
-rw-r--r--lib/diameter/src/.gitignore1
-rw-r--r--lib/diameter/src/Makefile48
-rw-r--r--lib/diameter/src/base/diameter.appup.src25
-rw-r--r--lib/diameter/src/base/diameter_codec.erl11
-rw-r--r--lib/diameter/src/base/diameter_config.erl5
-rw-r--r--lib/diameter/src/base/diameter_peer_fsm.erl21
-rw-r--r--lib/diameter/src/base/diameter_service.erl493
-rw-r--r--lib/diameter/src/base/diameter_stats.erl265
-rw-r--r--lib/diameter/src/base/diameter_watchdog.erl47
-rw-r--r--lib/diameter/src/transport/diameter_sctp.erl88
-rw-r--r--lib/diameter/src/transport/diameter_tcp.erl2
11 files changed, 638 insertions, 368 deletions
diff --git a/lib/diameter/src/.gitignore b/lib/diameter/src/.gitignore
index feeb378fd8..cc06720fd1 100644
--- a/lib/diameter/src/.gitignore
+++ b/lib/diameter/src/.gitignore
@@ -1,2 +1,3 @@
/depend.mk
+/otp.plt
diff --git a/lib/diameter/src/Makefile b/lib/diameter/src/Makefile
index dbfaa4e140..99c343275b 100644
--- a/lib/diameter/src/Makefile
+++ b/lib/diameter/src/Makefile
@@ -1,7 +1,7 @@
#
# %CopyrightBegin%
#
-# Copyright Ericsson AB 2010-2011. All Rights Reserved.
+# Copyright Ericsson AB 2010-2012. All Rights Reserved.
#
# The contents of this file are subject to the Erlang Public License,
# Version 1.1, (the "License"); you may not use this file except in
@@ -181,6 +181,31 @@ clean:
rm -f $(TARGET_FILES) gen/*
rm -f depend.mk
+realclean: clean
+ rm -f ../ebin/*
+# Not $(EBIN) just to be a bit paranoid
+
+PLT = ./otp.plt
+
+plt:
+ dialyzer --build_plt \
+ --apps erts stdlib kernel \
+ xmerl ssl public_key crypto \
+ compiler syntax_tools runtime_tools \
+ --output_plt $(PLT) \
+ --verbose
+
+dialyze: opt $(PLT)
+ dialyzer --plt $(PLT) \
+ --verbose \
+ -Wno_improper_lists \
+ $(EBIN)/diameter_gen_base_rfc3588.$(EMULATOR) \
+ $(patsubst %, $(EBIN)/%.$(EMULATOR), \
+ $(notdir $(RT_MODULES) $(CT_MODULES)))
+# Omit all but the common dictionary module since these
+# (diameter_gen_relay in particular) generate warning depending on how
+# much of the included diameter_gen.hrl they use.
+
# ----------------------------------------------------
# Release targets
# ----------------------------------------------------
@@ -195,27 +220,27 @@ endif
release_spec: opt
for d in bin ebin include src/dict; do \
- $(INSTALL_DIR) $(RELSYSDIR)/$$d; \
+ $(INSTALL_DIR) "$(RELSYSDIR)/$$d"; \
done
- $(INSTALL_SCRIPT) $(BINS:%=../bin/%) $(RELSYSDIR)/bin
- $(INSTALL_DATA) $(TARGET_FILES) $(RELSYSDIR)/ebin
+ $(INSTALL_SCRIPT) $(BINS:%=../bin/%) "$(RELSYSDIR)/bin"
+ $(INSTALL_DATA) $(TARGET_FILES) "$(RELSYSDIR)/ebin"
$(INSTALL_DATA) $(EXTERNAL_HRLS:%=../include/%) $(DICT_HRLS) \
- $(RELSYSDIR)/include
- $(INSTALL_DATA) $(DICTS:%=dict/%.dia) $(RELSYSDIR)/src/dict
+ "$(RELSYSDIR)/include"
+ $(INSTALL_DATA) $(DICTS:%=dict/%.dia) "$(RELSYSDIR)/src/dict"
$(MAKE) $(TARGET_DIRS:%/=release_src_%)
$(MAKE) $(EXAMPLE_DIRS:%/=release_examples_%)
$(TARGET_DIRS:%/=release_src_%): release_src_%:
- $(INSTALL_DIR) $(RELSYSDIR)/src/$*
+ $(INSTALL_DIR) "$(RELSYSDIR)/src/$*"
$(INSTALL_DATA) $(filter $*/%, $(TARGET_MODULES:%=%.erl) \
$(INTERNAL_HRLS)) \
$(filter $*/%, compiler/$(DICT_YRL).yrl) \
- $(RELSYSDIR)/src/$*
+ "$(RELSYSDIR)/src/$*"
$(EXAMPLE_DIRS:%/=release_examples_%): release_examples_%:
- $(INSTALL_DIR) $(RELSYSDIR)/examples/$*
+ $(INSTALL_DIR) "$(RELSYSDIR)/examples/$*"
$(INSTALL_DATA) $(patsubst %, ../examples/%, $(filter $*/%, $(EXAMPLES))) \
- $(RELSYSDIR)/examples/$*
+ "$(RELSYSDIR)/examples/$*"
release_docs_spec:
@@ -245,10 +270,11 @@ depend.mk: depend.sed $(MODULES:%=%.erl) Makefile
-include depend.mk
-.PHONY: app clean depend dict info release_subdir
+.PHONY: app clean realclean depend dict info release_subdir
.PHONY: debug opt release_docs_spec release_spec
.PHONY: $(TARGET_DIRS:%/=%) $(TARGET_DIRS:%/=release_src_%)
.PHONY: $(EXAMPLE_DIRS:%/=release_examples_%)
+.PHONY: plt dialyze
# Keep intermediate files.
.SECONDARY: $(DICT_ERLS) $(DICT_HRLS) gen/$(DICT_YRL:%=%.erl)
diff --git a/lib/diameter/src/base/diameter.appup.src b/lib/diameter/src/base/diameter.appup.src
index 2ebdad598f..9b2a7d18ab 100644
--- a/lib/diameter/src/base/diameter.appup.src
+++ b/lib/diameter/src/base/diameter.appup.src
@@ -2,7 +2,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2011. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2012. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -22,13 +22,28 @@
[
{"0.9", [{restart_application, diameter}]},
{"0.10", [{restart_application, diameter}]},
- {"1.0", [{update, diameter_service},
- {update, diameter_watchdog}]}
+ {"1.0", [{restart_application, diameter}]},
+ {"1.1", [%% new code
+ {add_module, diameter_transport},
+ %% modified code
+ {load, diameter_sctp},
+ {load, diameter_stats},
+ {load, diameter_service},
+ {load, diameter_config},
+ {load, diameter_codec},
+ {load, diameter_watchdog},
+ {load, diameter_peer},
+ {load, diameter_peer_fsm},
+ {load, diameter},
+ %% unmodified but including modified diameter.hrl
+ {load, diameter_callback},
+ {load, diameter_capx},
+ {load, diameter_types}]}
],
[
{"0.9", [{restart_application, diameter}]},
{"0.10", [{restart_application, diameter}]},
- {"1.0", [{update, diameter_watchdog},
- {update, diameter_service}]}
+ {"1.0", [{restart_application, diameter}]},
+ {"1.1", [{restart_application, diameter}]}
]
}.
diff --git a/lib/diameter/src/base/diameter_codec.erl b/lib/diameter/src/base/diameter_codec.erl
index fe1212b7e0..421e280422 100644
--- a/lib/diameter/src/base/diameter_codec.erl
+++ b/lib/diameter/src/base/diameter_codec.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2011. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2012. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -63,9 +63,9 @@ encode(Mod, #diameter_packet{} = Pkt) ->
e(Mod, Pkt)
catch
error: Reason ->
- %% Be verbose rather than letting the emulator truncate the
- %% error report.
- X = {Reason, ?STACK},
+ %% Be verbose since a crash report may be truncated and
+ %% encode errors are self-inflicted.
+ X = {?MODULE, encode, {Reason, ?STACK}},
diameter_lib:error_report(X, {?MODULE, encode, [Mod, Pkt]}),
exit(X)
end;
@@ -91,7 +91,8 @@ e(_, #diameter_packet{msg = [#diameter_header{} = Hdr | As]} = Pkt) ->
Flags = make_flags(0, Hdr),
- Pkt#diameter_packet{bin = <<Vsn:8, Length:24,
+ Pkt#diameter_packet{header = Hdr,
+ bin = <<Vsn:8, Length:24,
Flags:8, Code:24,
Aid:32,
Hid:32,
diff --git a/lib/diameter/src/base/diameter_config.erl b/lib/diameter/src/base/diameter_config.erl
index 9253af0de2..e47f63f814 100644
--- a/lib/diameter/src/base/diameter_config.erl
+++ b/lib/diameter/src/base/diameter_config.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2011. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2012. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -519,6 +519,7 @@ rm(SvcName, L) ->
Refs = lists:map(fun(#transport{ref = R}) -> R end, L),
case stop_transport(SvcName, Refs) of
ok ->
+ diameter_stats:flush(Refs),
lists:foreach(fun delete_object/1, L);
{error, _} = No ->
No
@@ -600,7 +601,7 @@ app_acc({application, Opts}, Acc) ->
module = init_mod(Mod),
init_state = ModS,
mutable = init_mutable(M),
- answer_errors = init_answers(A)}
+ options = [{answer_errors, init_answers(A)}]}
| Acc];
app_acc(_, Acc) ->
Acc.
diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl
index bbda62c32b..302540e76b 100644
--- a/lib/diameter/src/base/diameter_peer_fsm.erl
+++ b/lib/diameter/src/base/diameter_peer_fsm.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2011. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2012. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -131,11 +131,10 @@
%% specified on the transport in question. Check here that the list is
%% still non-empty.
-start({_, Ref} = Type, Opts, #diameter_service{applications = Apps} = Svc) ->
+start({_,_} = Type, Opts, #diameter_service{applications = Apps} = Svc) ->
[] /= Apps orelse ?ERROR({no_apps, Type, Opts}),
T = {self(), Type, Opts, Svc},
{ok, Pid} = diameter_peer_fsm_sup:start_child(T),
- diameter_stats:reg(Pid, Ref),
Pid.
start_link(T) ->
@@ -157,6 +156,7 @@ init(T) ->
i({WPid, T, Opts, #diameter_service{capabilities = Caps} = Svc}) ->
putr(?DWA_KEY, dwa(Caps)),
{M, Ref} = T,
+ diameter_stats:reg(Ref),
{[Ts], Rest} = proplists:split(Opts, [capabilities_cb]),
putr(?CB_KEY, {Ref, [F || {_,F} <- Ts]}),
erlang:monitor(process, WPid),
@@ -250,14 +250,27 @@ handle_info(T, #state{} = State) ->
?LOG(stop, T),
x(T, State)
catch
+ exit: {diameter_codec, encode, _} = Reason ->
+ close_wd(Reason, State#state.parent),
+ ?LOG(stop, Reason),
+ %% diameter_codec:encode/2 emits an error report. Only
+ %% indicate the probable reason here.
+ diameter_lib:info_report(probable_configuration_error,
+ insufficient_capabilities),
+ {stop, {shutdown, Reason}, State};
{?MODULE, Tag, Reason} ->
?LOG(Tag, {Reason, T}),
{stop, {shutdown, Reason}, State}
end.
-%% The form of the exception caught here is historical. It's
+%% The form of the throw caught here is historical. It's
%% significant that it's not a 2-tuple, as in ?FAILURE(Reason),
%% since these are caught elsewhere.
+%% Note that there's no guarantee that the service and transport
+%% capabilities are good enough to build a CER/CEA that can be
+%% succesfully encoded. It's not checked at diameter:add_transport/2
+%% since this can be called before creating the service.
+
x(Reason, #state{} = S) ->
close_wd(Reason, S),
{stop, {shutdown, Reason}, S}.
diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl
index 3dfdcee2b2..5b8a399758 100644
--- a/lib/diameter/src/base/diameter_service.erl
+++ b/lib/diameter/src/base/diameter_service.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2011. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2012. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -43,8 +43,7 @@
subscriptions/0,
services/0,
services/1,
- whois/1,
- flush_stats/1]).
+ whois/1]).
%% test/debug
-export([call_module/3,
@@ -65,13 +64,33 @@
-include_lib("diameter/include/diameter.hrl").
-include("diameter_internal.hrl").
+%% The "old" states maintained in this module historically.
-define(STATE_UP, up).
-define(STATE_DOWN, down).
+-type op_state() :: ?STATE_UP
+ | ?STATE_DOWN.
+
+%% The RFC 3539 watchdog states that are now maintained, albeit
+%% along with the old up/down. okay = up, else down.
+-define(WD_INITIAL, initial).
+-define(WD_OKAY, okay).
+-define(WD_SUSPECT, suspect).
+-define(WD_DOWN, down).
+-define(WD_REOPEN, reopen).
+
+-type wd_state() :: ?WD_INITIAL
+ | ?WD_OKAY
+ | ?WD_SUSPECT
+ | ?WD_DOWN
+ | ?WD_REOPEN.
+
-define(DEFAULT_TC, 30000). %% RFC 3588 ch 2.1
-define(DEFAULT_TIMEOUT, 5000). %% for outgoing requests
-define(RESTART_TC, 1000). %% if restart was this recent
+-define(RELAY, ?DIAMETER_DICT_RELAY).
+
%% Used to be able to swap this with anything else dict-like but now
%% rely on the fact that a service's #state{} record does not change
%% in storing in it ?STATE table and not always going through the
@@ -117,7 +136,8 @@
type :: match(connect | accept),
ref :: match(reference()), %% key into diameter_config
options :: match([diameter:transport_opt()]),%% from start_transport
- op_state = ?STATE_DOWN :: match(?STATE_DOWN | ?STATE_UP),
+ op_state = {?STATE_DOWN, ?WD_INITIAL}
+ :: match(op_state() | {op_state(), wd_state()}),
started = now(), %% at process start
conn = false :: match(boolean() | pid())}).
%% true at accept, pid() at connection_up (connT key)
@@ -388,15 +408,6 @@ whois(SvcName) ->
undefined
end.
-%%% ---------------------------------------------------------------------------
-%%% # flush_stats/1
-%%%
-%%% Output: list of {{SvcName, Alias, Counter}, Value}
-%%% ---------------------------------------------------------------------------
-
-flush_stats(TPid) ->
- diameter_stats:flush(TPid).
-
%% ===========================================================================
%% ===========================================================================
@@ -516,6 +527,34 @@ transition({reconnect, Pid}, S) ->
reconnect(Pid, S),
ok;
+%% Watchdog is sending notification of a state transition. Note that
+%% the connection_up/down messages are pre-date this message and are
+%% still used. A 'watchdog' message will follow these and communicate
+%% the same state as was set in handling connection_up/down.
+transition({watchdog, Pid, {TPid, From, To}}, #state{service_name = SvcName,
+ peerT = PeerT}) ->
+ #peer{ref = Ref, type = T, options = Opts, op_state = {OS,_}}
+ = P
+ = fetch(PeerT, Pid),
+ insert(PeerT, P#peer{op_state = {OS, To}}),
+ send_event(SvcName, {watchdog, Ref, TPid, {From, To}, {T, Opts}}),
+ ok;
+%% Death of a peer process results in the removal of it's peer and any
+%% associated conn record when 'DOWN' is received (after this) but the
+%% states will be {?STATE_UP, ?WD_DOWN} for a short time. (No real
+%% problem since ?WD_* is only used in service_info.) We set ?WD_OKAY
+%% as a consequence of connection_up since we know a watchdog is
+%% coming. We can't set anything at connection_down since we don't
+%% know if the subsequent watchdog message will be ?WD_DOWN or
+%% ?WD_SUSPECT. We don't (yet) set ?STATE_* as a consequence of a
+%% watchdog message since this requires changing some of the matching
+%% on ?STATE_*.
+%%
+%% Death of a conn process results in connection_down followed by
+%% watchdog ?WD_DOWN. The latter doesn't result in the conn record
+%% being deleted since 'DOWN' from death of its peer doesn't (yet)
+%% deal with the record having been removed.
+
%% Monitor process has died. Just die with a reason that tells
%% diameter_config about the happening. If a cleaner shutdown is
%% required then someone should stop us.
@@ -879,7 +918,14 @@ accepted(Pid, _TPid, #state{peerT = PeerT} = S) ->
fetch(Tid, Key) ->
[T] = ets:lookup(Tid, Key),
- T.
+ case T of
+ #peer{op_state = ?STATE_UP} = P ->
+ P#peer{op_state = {?STATE_UP, ?WD_OKAY}};
+ #peer{op_state = ?STATE_DOWN} = P ->
+ P#peer{op_state = {?STATE_DOWN, ?WD_DOWN}};
+ _ ->
+ T
+ end.
%%% ---------------------------------------------------------------------------
%%% # connection_up/3
@@ -925,12 +971,12 @@ connection_up(T, P, C, #state{peerT = PeerT,
service
= #diameter_service{applications = Apps}}
= S) ->
- #peer{conn = TPid, op_state = ?STATE_DOWN}
+ #peer{conn = TPid, op_state = {?STATE_DOWN, _}}
= P,
#conn{apps = SApps, caps = Caps}
= C,
- insert(PeerT, P#peer{op_state = ?STATE_UP}),
+ insert(PeerT, P#peer{op_state = {?STATE_UP, ?WD_OKAY}}),
request_peer_up(TPid),
report_status(up, P, C, S, T),
@@ -945,27 +991,35 @@ ilp({Id, Alias}, {TC, SA}, LDict) ->
init_conn(Id, Alias, TC, SA),
?Dict:append(Alias, TC, LDict).
-init_conn(Id, Alias, TC, {SvcName, Apps}) ->
+init_conn(Id, Alias, {TPid, _} = TC, {SvcName, Apps}) ->
#diameter_app{module = ModX,
id = Id} %% assert
= find_app(Alias, Apps),
- peer_cb({ModX, peer_up, [SvcName, TC]}, Alias).
+ peer_cb({ModX, peer_up, [SvcName, TC]}, Alias)
+ orelse exit(TPid, kill). %% fake transport failure
+
+%% find_app/2
find_app(Alias, Apps) ->
- lists:keyfind(Alias, #diameter_app.alias, Apps).
+ case lists:keyfind(Alias, #diameter_app.alias, Apps) of
+ #diameter_app{options = E} = A when is_atom(E) -> %% upgrade
+ A#diameter_app{options = [{answer_errors, E}]};
+ A ->
+ A
+ end.
-%% A failing peer callback brings down the service. In the case of
-%% peer_up we could just kill the transport and emit an error but for
-%% peer_down we have no way to cleanup any state change that peer_up
-%% may have introduced.
+%% Don't bring down the service (and all associated connections)
+%% regardless of what happens.
peer_cb(MFA, Alias) ->
try state_cb(MFA, Alias) of
ModS ->
- mod_state(Alias, ModS)
+ mod_state(Alias, ModS),
+ true
catch
- E: Reason ->
- ?ERROR({E, Reason, MFA, ?STACK})
+ E:R ->
+ diameter_lib:error_report({failure, {E, R, Alias, ?STACK}}, MFA),
+ false
end.
%%% ---------------------------------------------------------------------------
@@ -979,22 +1033,22 @@ peer_cb(MFA, Alias) ->
connection_down(Pid, #state{peerT = PeerT,
connT = ConnT}
= S) ->
- #peer{op_state = ?STATE_UP, %% assert
+ #peer{op_state = {?STATE_UP, WS}, %% assert
conn = TPid}
= P
= fetch(PeerT, Pid),
C = fetch(ConnT, TPid),
- insert(PeerT, P#peer{op_state = ?STATE_DOWN}),
+ insert(PeerT, P#peer{op_state = {?STATE_DOWN, WS}}),
connection_down(P,C,S).
%% connection_down/3
-connection_down(#peer{op_state = ?STATE_DOWN}, _, S) ->
+connection_down(#peer{op_state = {?STATE_DOWN, _}}, _, S) ->
S;
connection_down(#peer{conn = TPid,
- op_state = ?STATE_UP}
+ op_state = {?STATE_UP, _}}
= P,
#conn{caps = Caps,
apps = SApps}
@@ -1043,7 +1097,7 @@ peer_down(Pid, Reason, #state{peerT = PeerT} = S) ->
%% Send an event at connection establishment failure.
closed({shutdown, {close, _TPid, Reason}},
- #peer{op_state = ?STATE_DOWN,
+ #peer{op_state = {?STATE_DOWN, _},
ref = Ref,
type = Type,
options = Opts},
@@ -1352,7 +1406,7 @@ send_request(Pkt, TPid, Caps, App, Opts, Caller, SvcName) ->
#diameter_app{alias = Alias,
dictionary = Dict,
module = ModX,
- answer_errors = AE}
+ options = [{answer_errors, AE} | _]}
= App,
EPkt = encode(Dict, Pkt),
@@ -1935,6 +1989,12 @@ is_loop(Code, Vid, OH, Avps) ->
%%
%% Send a locally originating reply.
+%% Skip the setting of Result-Code and Failed-AVP's below.
+reply([Msg], Dict, TPid, Pkt)
+ when is_list(Msg);
+ is_tuple(Msg) ->
+ reply(Msg, Dict, TPid, Pkt#diameter_packet{errors = []});
+
%% No errors or a diameter_header/avp list.
reply(Msg, Dict, TPid, #diameter_packet{errors = Es,
transport_data = TD}
@@ -1942,7 +2002,7 @@ reply(Msg, Dict, TPid, #diameter_packet{errors = Es,
when [] == Es;
is_record(hd(Msg), diameter_header) ->
Pkt = diameter_codec:encode(Dict, make_answer_packet(Msg, ReqPkt)),
- incr(send, Pkt, TPid), %% count result codes in sent answers
+ incr(send, Pkt, Dict, TPid), %% count result codes in sent answers
send(TPid, Pkt#diameter_packet{transport_data = TD});
%% Or not: set Result-Code and Failed-AVP AVP's.
@@ -1983,7 +2043,10 @@ rc(RC) ->
rc(Rec, RC, Failed, Dict)
when is_integer(RC) ->
- set(Rec, [{'Result-Code', RC} | failed_avp(Rec, Failed, Dict)], Dict).
+ set(Rec,
+ lists:append([rc(Rec, {'Result-Code', RC}, Dict),
+ failed_avp(Rec, Failed, Dict)]),
+ Dict).
%% Reply as name and tuple list ...
set([_|_] = Ans, Avps, _) ->
@@ -1993,6 +2056,22 @@ set([_|_] = Ans, Avps, _) ->
set(Rec, Avps, Dict) ->
Dict:'#set-'(Avps, Rec).
+%% rc/3
+%%
+%% Turn the result code into a list if its optional and only set it if
+%% the arity is 1 or {0,1}. In other cases (which probably shouldn't
+%% exist in practise) we can't know what's appropriate.
+
+rc([MsgName | _], {'Result-Code' = K, RC} = T, Dict) ->
+ case Dict:avp_arity(MsgName, 'Result-Code') of
+ 1 -> [T];
+ {0,1} -> [{K, [RC]}];
+ _ -> []
+ end;
+
+rc(Rec, T, Dict) ->
+ rc([Dict:rec2msg(element(1, Rec))], T, Dict).
+
%% failed_avp/3
failed_avp(_, [] = No, _) ->
@@ -2200,44 +2279,39 @@ handle_answer(SvcName, _, {error, Req, Reason}) ->
handle_answer(SvcName,
AnswerErrors,
{answer, #request{dictionary = Dict} = Req, Pkt}) ->
- a(examine(diameter_codec:decode(Dict, Pkt)),
- SvcName,
- AnswerErrors,
- Req).
+ answer(examine(diameter_codec:decode(Dict, Pkt)),
+ SvcName,
+ AnswerErrors,
+ Req).
%% We don't really need to do a full decode if we're a relay and will
%% just resend with a new hop by hop identifier, but might a proxy
%% want to examine the answer?
-a(#diameter_packet{errors = []}
- = Pkt,
- SvcName,
- AE,
- #request{transport = TPid,
- caps = Caps,
- packet = P}
- = Req) ->
+answer(Pkt, SvcName, AE, #request{transport = TPid,
+ dictionary = Dict}
+ = Req) ->
try
- incr(in, Pkt, TPid)
+ incr(recv, Pkt, Dict, TPid)
of
- _ ->
- cb(Req, handle_answer, [Pkt, msg(P), SvcName, {TPid, Caps}])
+ _ -> a(Pkt, SvcName, AE, Req)
catch
exit: {invalid_error_bit, _} = E ->
- e(Pkt#diameter_packet{errors = [E]}, SvcName, AE, Req)
- end;
-
-a(#diameter_packet{} = Pkt, SvcName, AE, Req) ->
- e(Pkt, SvcName, AE, Req).
+ a(Pkt#diameter_packet{errors = [E]}, SvcName, AE, Req)
+ end.
-e(Pkt, SvcName, callback, #request{transport = TPid,
- caps = Caps,
- packet = Pkt}
- = Req) ->
- cb(Req, handle_answer, [Pkt, msg(Pkt), SvcName, {TPid, Caps}]);
-e(Pkt, SvcName, report, Req) ->
+a(#diameter_packet{errors = Es} = Pkt, SvcName, AE, #request{transport = TPid,
+ caps = Caps,
+ packet = P}
+ = Req)
+ when [] == Es;
+ callback == AE ->
+ cb(Req, handle_answer, [Pkt, msg(P), SvcName, {TPid, Caps}]);
+
+a(Pkt, SvcName, report, Req) ->
x(errors, handle_answer, [SvcName, Req, Pkt]);
-e(Pkt, SvcName, discard, Req) ->
+
+a(Pkt, SvcName, discard, Req) ->
x({errors, handle_answer, [SvcName, Req, Pkt]}).
%% Note that we don't check that the application id in the answer's
@@ -2249,17 +2323,19 @@ e(Pkt, SvcName, discard, Req) ->
%% Increment a stats counter for an incoming or outgoing message.
%% TODO: fix
-incr(_, #diameter_packet{msg = undefined}, _) ->
+incr(_, #diameter_packet{msg = undefined}, _, _) ->
ok;
-incr(Dir, Pkt, TPid)
- when is_pid(TPid) ->
+incr(recv = D, #diameter_packet{header = H, errors = [_|_]}, _, TPid) ->
+ incr(TPid, {diameter_codec:msg_id(H), D, error});
+
+incr(Dir, Pkt, Dict, TPid) ->
#diameter_packet{header = #diameter_header{is_error = E}
= Hdr,
msg = Rec}
= Pkt,
- RC = int(get_avp_value(?BASE, 'Result-Code', Rec)),
+ RC = int(get_avp_value(Dict, 'Result-Code', Rec)),
PE = is_protocol_error(RC),
%% Check that the E bit is set only for 3xxx result codes.
@@ -2267,15 +2343,21 @@ incr(Dir, Pkt, TPid)
orelse (E andalso PE)
orelse x({invalid_error_bit, RC}, answer, [Dir, Pkt]),
- Ctr = rc_counter(Rec, RC),
- is_tuple(Ctr)
- andalso incr(TPid, {diameter_codec:msg_id(Hdr), Dir, Ctr}).
+ irc(TPid, Hdr, Dir, rc_counter(Dict, Rec, RC)).
+
+irc(_, _, _, undefined) ->
+ false;
+
+irc(TPid, Hdr, Dir, Ctr) ->
+ incr(TPid, {diameter_codec:msg_id(Hdr), Dir, Ctr}).
%% incr/2
incr(TPid, Counter) ->
diameter_stats:incr(Counter, TPid, 1).
+%% error_counter/2
+
%% RFC 3588, 7.6:
%%
%% All Diameter answer messages defined in vendor-specific
@@ -2285,26 +2367,27 @@ incr(TPid, Counter) ->
%% Maintain statistics assuming one or the other, not both, which is
%% surely the intent of the RFC.
-rc_counter(_, RC)
- when is_integer(RC) ->
- {'Result-Code', RC};
-rc_counter(Rec, _) ->
- rcc(get_avp_value(?BASE, 'Experimental-Result', Rec)).
+rc_counter(Dict, Rec, undefined) ->
+ er(get_avp_value(Dict, 'Experimental-Result', Rec));
+rc_counter(_, _, RC) ->
+ {'Result-Code', RC}.
%% Outgoing answers may be in any of the forms messages can be sent
%% in. Incoming messages will be records. We're assuming here that the
%% arity of the result code AVP's is 0 or 1.
-rcc([{_,_,RC} = T])
- when is_integer(RC) ->
+er([{_,_,N} = T | _])
+ when is_integer(N) ->
T;
-rcc({_,_,RC} = T)
- when is_integer(RC) ->
+er({_,_,N} = T)
+ when is_integer(N) ->
T;
-rcc(_) ->
+er(_) ->
undefined.
-int([N])
+%% Extract the first good looking integer. There's no guarantee
+%% that what we're looking for has arity 1.
+int([N|_])
when is_integer(N) ->
N;
int(N)
@@ -2349,8 +2432,11 @@ rt(#request{packet = #diameter_packet{msg = undefined}}, _) ->
false; %% TODO: Not what we should do.
%% ... or not.
-rt(#request{packet = #diameter_packet{msg = Msg}} = Req, S) ->
- find_transport(get_destination(Msg), Req, S).
+rt(#request{packet = #diameter_packet{msg = Msg},
+ dictionary = Dict}
+ = Req,
+ S) ->
+ find_transport(get_destination(Dict, Msg), Req, S).
%%% ---------------------------------------------------------------------------
%%% # report_status/5
@@ -2462,12 +2548,12 @@ find_transport({alias, Alias}, Msg, Opts, #state{service = Svc} = S) ->
find_transport(#diameter_app{} = App, Msg, Opts, S) ->
ft(App, Msg, Opts, S).
-ft(#diameter_app{module = Mod} = App, Msg, Opts, S) ->
+ft(#diameter_app{module = Mod, dictionary = Dict} = App, Msg, Opts, S) ->
#options{filter = Filter,
extra = Xtra}
= Opts,
pick_peer(App#diameter_app{module = Mod ++ Xtra},
- get_destination(Msg),
+ get_destination(Dict, Msg),
Filter,
S);
ft(false = No, _, _, _) ->
@@ -2503,11 +2589,11 @@ find_transport([_,_] = RH,
Filter,
S).
-%% get_destination/1
+%% get_destination/2
-get_destination(Msg) ->
- [str(get_avp_value(?BASE, 'Destination-Realm', Msg)),
- str(get_avp_value(?BASE, 'Destination-Host', Msg))].
+get_destination(Dict, Msg) ->
+ [str(get_avp_value(Dict, 'Destination-Realm', Msg)),
+ str(get_avp_value(Dict, 'Destination-Host', Msg))].
%% This is not entirely correct. The avp could have an arity 1, in
%% which case an empty list is a DiameterIdentity of length 0 rather
@@ -2531,6 +2617,9 @@ str(T) ->
%% question. The third form allows messages to be sent as is, without
%% a dictionary, which is needed in the case of relay agents, for one.
+get_avp_value(?RELAY, Name, Msg) ->
+ get_avp_value(?BASE, Name, Msg);
+
get_avp_value(Dict, Name, [#diameter_header{} | Avps]) ->
try
{Code, _, VId} = Dict:avp_header(Name),
@@ -2746,20 +2835,45 @@ transports(#state{peerT = PeerT}) ->
'Vendor-Specific-Application-Id',
'Firmware-Revision']).
+%% The config returned by diameter:service_info(SvcName, all).
-define(ALL_INFO, [capabilities,
applications,
transport,
- pending,
- statistics]).
+ pending]).
+
+%% The rest.
+-define(OTHER_INFO, [connections,
+ name,
+ peers,
+ statistics]).
-service_info(Items, S)
- when is_list(Items) ->
- [{complete(I), service_info(I,S)} || I <- Items];
service_info(Item, S)
when is_atom(Item) ->
- service_info(Item, S, true).
+ case tagged_info(Item, S) of
+ {_, T} -> T;
+ undefined = No -> No
+ end;
+
+service_info(Items, S) ->
+ tagged_info(Items, S).
-service_info(Item, #state{service = Svc} = S, Complete) ->
+tagged_info(Item, S)
+ when is_atom(Item) ->
+ case complete(Item) of
+ {value, I} ->
+ {I, complete_info(I,S)};
+ false ->
+ undefined
+ end;
+
+tagged_info(Items, S)
+ when is_list(Items) ->
+ [T || I <- Items, T <- [tagged_info(I,S)], T /= undefined, T /= []];
+
+tagged_info(_, _) ->
+ undefined.
+
+complete_info(Item, #state{service = Svc} = S) ->
case Item of
name ->
S#state.service_name;
@@ -2803,70 +2917,119 @@ service_info(Item, #state{service = Svc} = S, Complete) ->
applications -> info_apps(S);
transport -> info_transport(S);
pending -> info_pending(S);
- statistics -> info_stats(S);
- keys -> ?ALL_INFO ++ ?CAP_INFO; %% mostly for test
+ keys -> ?ALL_INFO ++ ?CAP_INFO ++ ?OTHER_INFO;
all -> service_info(?ALL_INFO, S);
- _ when Complete -> service_info(complete(Item), S, false);
- _ -> undefined
+ statistics -> info_stats(S);
+ connections -> info_connections(S);
+ peers -> info_peers(S)
end.
+complete(I)
+ when I == keys;
+ I == all ->
+ {value, I};
complete(Pre) ->
P = atom_to_list(Pre),
- case [I || I <- [name | ?ALL_INFO] ++ ?CAP_INFO,
+ case [I || I <- ?ALL_INFO ++ ?CAP_INFO ++ ?OTHER_INFO,
lists:prefix(P, atom_to_list(I))]
of
- [I] -> I;
- _ -> Pre
+ [I] -> {value, I};
+ _ -> false
end.
+%% info_stats/1
+
info_stats(#state{peerT = PeerT}) ->
- Peers = ets:select(PeerT, [{#peer{ref = '$1', conn = '$2', _ = '_'},
- [{'is_pid', '$2'}],
- [['$1', '$2']]}]),
- diameter_stats:read(lists:append(Peers)).
-%% TODO: include peer identities in return value
-
-info_transport(#state{peerT = PeerT, connT = ConnT}) ->
- dict:fold(fun it/3,
+ MatchSpec = [{#peer{ref = '$1', conn = '$2', _ = '_'},
+ [{'is_pid', '$2'}],
+ [['$1', '$2']]}],
+ diameter_stats:read(lists:append(ets:select(PeerT, MatchSpec))).
+
+%% info_transport/1
+%%
+%% One entry per configured transport. Statistics for each entry are
+%% the accumulated values for the ref and associated peer pids.
+
+info_transport(S) ->
+ PeerD = peer_dict(S),
+ RefsD = dict:map(fun(_, Ls) -> [P || L <- Ls, {peer, {P,_}} <- L] end,
+ PeerD),
+ Refs = lists:append(dict:fold(fun(R, Ps, A) -> [[R|Ps] | A] end,
+ [],
+ RefsD)),
+ Stats = diameter_stats:read(Refs),
+ dict:fold(fun(R, Ls, A) ->
+ Ps = dict:fetch(R, RefsD),
+ [[{ref, R} | transport(Ls)] ++ [stats([R|Ps], Stats)]
+ | A]
+ end,
[],
- ets:foldl(fun(T,A) -> it_acc(ConnT, A, T) end,
- dict:new(),
- PeerT)).
-
-it(Ref, [[{type, connect} | _] = L], Acc) ->
- [[{ref, Ref} | L] | Acc];
-it(Ref, [[{type, accept}, {options, Opts} | _] | _] = L, Acc) ->
- [[{ref, Ref},
- {type, listen},
- {options, Opts},
- {accept, [lists:nthtail(2,A) || A <- L]}]
- | Acc].
-%% Each entry has the same Opts. (TODO)
-
-it_acc(ConnT, Acc, #peer{pid = Pid,
- type = Type,
- ref = Ref,
- options = Opts,
- op_state = OS,
- started = T,
- conn = TPid}) ->
+ PeerD).
+
+transport([[{type, connect} | _] = L]) ->
+ L;
+
+transport([[{type, accept}, {options, Opts} | _] | _] = Ls) ->
+ [{type, listen},
+ {options, Opts},
+ {accept, [lists:nthtail(2,L) || L <- Ls]}].
+%% Note that all peer records for a listening transport (ie. same Ref)
+%% have the same options. (TODO)
+
+peer_dict(#state{peerT = PeerT, connT = ConnT}) ->
+ ets:foldl(fun(T,A) -> peer_acc(ConnT, A, T) end, dict:new(), PeerT).
+
+peer_acc(ConnT, Acc, #peer{pid = Pid,
+ type = Type,
+ ref = Ref,
+ options = Opts,
+ op_state = OS,
+ started = T,
+ conn = TPid}) ->
+ WS = wd_state(OS),
dict:append(Ref,
[{type, Type},
{options, Opts},
- {watchdog, {Pid, T, OS}}
- | info_conn(ConnT, TPid)],
+ {watchdog, {Pid, T, WS}}
+ | info_conn(ConnT, TPid, WS /= ?WD_DOWN)],
Acc).
-info_conn(ConnT, TPid) ->
- info_conn(ets:lookup(ConnT, TPid)).
+info_conn(ConnT, TPid, true)
+ when is_pid(TPid) ->
+ info_conn(ets:lookup(ConnT, TPid));
+info_conn(_, _, _) ->
+ [].
+
+wd_state({_,S}) ->
+ S;
+wd_state(?STATE_UP) ->
+ ?WD_OKAY;
+wd_state(?STATE_DOWN) ->
+ ?WD_DOWN.
info_conn([#conn{pid = Pid, apps = SApps, caps = Caps, started = T}]) ->
[{peer, {Pid, T}},
{apps, SApps},
- {caps, info_caps(Caps)}];
+ {caps, info_caps(Caps)}
+ | try [{port, info_port(Pid)}] catch _:_ -> [] end];
info_conn([] = No) ->
No.
+%% Extract information that the processes involved are expected to
+%% "publish" in their process dictionaries. Simple but backhanded.
+info_port(Pid) ->
+ {_, PD} = process_info(Pid, dictionary),
+ {_, T} = lists:keyfind({diameter_peer_fsm, start}, 1, PD),
+ {TPid, {_Type, TMod, _Cfg}} = T,
+ {_, TD} = process_info(TPid, dictionary),
+ {_, Data} = lists:keyfind({TMod, info}, 1, TD),
+ [{owner, TPid}, {module, TMod} | [_|_] = TMod:info(Data)].
+
+%% Use the fields names from diameter_caps instead of
+%% diameter_base_CER to distinguish between the 2-tuple values
+%% compared to the single capabilities values. Note also that the
+%% returned list is tagged 'caps' rather than 'capabilities' to
+%% emphasize the difference.
info_caps(#diameter_caps{} = C) ->
lists:zip(record_info(fields, diameter_caps), tl(tuple_to_list(C))).
@@ -2882,6 +3045,10 @@ mk_app(#diameter_app{alias = Alias,
{module, ModX},
{id, Id}].
+%% info_pending/1
+%%
+%% One entry for each outgoing request whose answer is outstanding.
+
info_pending(#state{} = S) ->
MatchSpec = [{{'$1',
#request{transport = '$2',
@@ -2895,3 +3062,59 @@ info_pending(#state{} = S) ->
{{from, '$3'}}]}}]}],
ets:select(?REQUEST_TABLE, MatchSpec).
+
+%% info_connections/1
+%%
+%% One entry per transport connection. Statistics for each entry are
+%% for the peer pid only.
+
+info_connections(S) ->
+ ConnL = conn_list(S),
+ Stats = diameter_stats:read([P || L <- ConnL, {peer, {P,_}} <- L]),
+ [L ++ [stats([P], Stats)] || L <- ConnL, {peer, {P,_}} <- L].
+
+conn_list(S) ->
+ lists:append(dict:fold(fun conn_acc/3, [], peer_dict(S))).
+
+conn_acc(Ref, Peers, Acc) ->
+ [[[{ref, Ref} | L] || L <- Peers, lists:keymember(peer, 1, L)]
+ | Acc].
+
+stats(Refs, Stats) ->
+ {statistics, dict:to_list(lists:foldl(fun(R,D) ->
+ stats_acc(R, D, Stats)
+ end,
+ dict:new(),
+ Refs))}.
+
+stats_acc(Ref, Dict, Stats) ->
+ lists:foldl(fun({C,N}, D) -> dict:update_counter(C, N, D) end,
+ Dict,
+ proplists:get_value(Ref, Stats, [])).
+
+%% info_peers/1
+%%
+%% One entry per peer Origin-Host. Statistics for each entry are
+%% accumulated values for all associated transport refs and peer pids.
+
+info_peers(S) ->
+ ConnL = conn_list(S),
+ {PeerD, RefD} = lists:foldl(fun peer_acc/2,
+ {dict:new(), dict:new()},
+ ConnL),
+ Refs = lists:append(dict:fold(fun(_, Rs, A) -> [lists:append(Rs) | A] end,
+ [],
+ RefD)),
+ Stats = diameter_stats:read(Refs),
+ dict:fold(fun(OH, Cs, A) ->
+ Rs = lists:append(dict:fetch(OH, RefD)),
+ [{OH, [{connections, Cs}, stats(Rs, Stats)]}
+ | A]
+ end,
+ [],
+ PeerD).
+
+peer_acc(Peer, {PeerD, RefD}) ->
+ [Ref, {TPid, _}, [{origin_host, {_, OH}} | _]]
+ = [proplists:get_value(K, Peer) || K <- [ref, peer, caps]],
+ {dict:append(OH, Peer, PeerD), dict:append(OH, [Ref, TPid], RefD)}.
diff --git a/lib/diameter/src/base/diameter_stats.erl b/lib/diameter/src/base/diameter_stats.erl
index 71479afa95..70727d068e 100644
--- a/lib/diameter/src/base/diameter_stats.erl
+++ b/lib/diameter/src/base/diameter_stats.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2011. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2012. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -22,14 +22,13 @@
%%
-module(diameter_stats).
--compile({no_auto_import, [monitor/2]}).
-behaviour(gen_server).
--export([reg/1, reg/2,
- incr/1, incr/2, incr/3,
+-export([reg/2, reg/1,
+ incr/3, incr/1,
read/1,
- flush/0, flush/1]).
+ flush/1]).
%% supervisor callback
-export([start_link/0]).
@@ -48,123 +47,105 @@
-include("diameter_internal.hrl").
-%% ets table containing stats. reg(Pid, Ref) inserts a {Pid, Ref},
-%% incr(Counter, X, N) updates the counter keyed at {Counter, X}, and
-%% Pid death causes counters keyed on {Counter, Pid} to be deleted and
-%% added to those keyed on {Counter, Ref}.
+%% ets table containing 2-tuple stats. reg(Pid, Ref) inserts a {Pid,
+%% Ref}, incr(Counter, X, N) updates the counter keyed at {Counter,
+%% X}, and Pid death causes counters keyed on {Counter, Pid} to be
+%% deleted and added to those keyed on {Counter, Ref}.
-define(TABLE, ?MODULE).
%% Name of registered server.
-define(SERVER, ?MODULE).
-%% Entries in the table.
--define(REC(Key, Value), {Key, Value}).
-
%% Server state.
-record(state, {id = now()}).
-type counter() :: any().
--type contrib() :: any().
-
-%%% ---------------------------------------------------------------------------
-%%% # reg(Pid, Contrib)
-%%%
-%%% Description: Register a process as a contributor of statistics
-%%% associated with a specified term. Statistics can be
-%%% contributed by specifying either Pid or Contrib as
-%%% the second argument to incr/3. Statistics contributed
-%%% by Pid are folded into the corresponding entry for
-%%% Contrib when the process dies.
-%%%
-%%% Contrib can be any term but should not be a pid
-%%% passed as the first argument to reg/2. Subsequent
-%%% registrations for the same Pid overwrite the association
-%%% ---------------------------------------------------------------------------
-
--spec reg(pid(), contrib())
- -> true.
+-type ref() :: any().
+
+%% ---------------------------------------------------------------------------
+%% # reg(Pid, Ref)
+%%
+%% Register a process as a contributor of statistics associated with a
+%% specified term. Statistics can be contributed by specifying either
+%% Pid or Ref as the second argument to incr/3. Statistics contributed
+%% by Pid are folded into the corresponding entry for Ref when the
+%% process dies.
+%% ---------------------------------------------------------------------------
+
+-spec reg(pid(), ref())
+ -> boolean().
-reg(Pid, Contrib)
+reg(Pid, Ref)
when is_pid(Pid) ->
- call({reg, Pid, Contrib}).
+ call({reg, Pid, Ref}).
--spec reg(contrib())
+-spec reg(ref())
-> true.
reg(Ref) ->
reg(self(), Ref).
-%%% ---------------------------------------------------------------------------
-%%% # incr(Counter, Contrib, N)
-%%%
-%%% Description: Increment a counter for the specified contributor.
-%%%
-%%% Contrib will typically be an argument passed to reg/2
-%%% but there's nothing that requires this. In particular,
-%%% if Contrib is a pid that hasn't been registered then
-%%% counters are unaffected by the death of the process.
-%%% ---------------------------------------------------------------------------
-
--spec incr(counter(), contrib(), integer())
- -> integer().
+%% ---------------------------------------------------------------------------
+%% # incr(Counter, Ref, N)
+%%
+%% Increment a counter for the specified contributor.
+%%
+%% Ref will typically be an argument passed to reg/2 but there's
+%% nothing that requires this. Only registered pids can contribute
+%% counters however, otherwise incr/3 is a no-op.
+%% ---------------------------------------------------------------------------
-incr(Ctr, Contrib, N) ->
- update_counter({Ctr, Contrib}, N).
+-spec incr(counter(), ref(), integer())
+ -> integer() | false.
-incr(Ctr, N)
+incr(Ctr, Ref, N)
when is_integer(N) ->
- incr(Ctr, self(), N);
-
-incr(Ctr, Contrib) ->
- incr(Ctr, Contrib, 1).
+ update_counter({Ctr, Ref}, N).
incr(Ctr) ->
incr(Ctr, self(), 1).
-%%% ---------------------------------------------------------------------------
-%%% # read(Contribs)
-%%%
-%%% Description: Retrieve counters for the specified contributors.
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # read(Refs)
+%%
+%% Retrieve counters for the specified contributors.
+%% ---------------------------------------------------------------------------
--spec read([contrib()])
- -> [{contrib(), [{counter(), integer()}]}].
+-spec read([ref()])
+ -> [{ref(), [{counter(), integer()}]}].
-read(Contribs) ->
- lists:foldl(fun(?REC({T,C}, N), D) -> orddict:append(C, {T,N}, D) end,
+read(Refs) ->
+ read(Refs, false).
+
+read(Refs, B) ->
+ MatchSpec = [{{{'_', '$1'}, '_'},
+ [?ORCOND([{'=:=', '$1', {const, R}}
+ || R <- Refs])],
+ ['$_']}],
+ L = ets:select(?TABLE, MatchSpec),
+ B andalso delete(L),
+ lists:foldl(fun({{C,R}, N}, D) -> orddict:append(R, {C,N}, D) end,
orddict:new(),
- ets:select(?TABLE, [{?REC({'_', '$1'}, '_'),
- [?ORCOND([{'=:=', '$1', {const, C}}
- || C <- Contribs])],
- ['$_']}])).
-
-%%% ---------------------------------------------------------------------------
-%%% # flush(Contrib)
-%%%
-%%% Description: Retrieve and delete statistics for the specified
-%%% contributor.
-%%%
-%%% If Contrib is a pid registered with reg/2 then statistics
-%%% for both and its associated contributor are retrieved.
-%%% ---------------------------------------------------------------------------
-
--spec flush(contrib())
- -> [{counter(), integer()}].
+ L).
+
+%% ---------------------------------------------------------------------------
+%% # flush(Refs)
+%%
+%% Retrieve and delete statistics for the specified contributors.
+%% ---------------------------------------------------------------------------
+
+-spec flush([ref()])
+ -> [{ref(), {counter(), integer()}}].
-flush(Contrib) ->
+flush(Refs) ->
try
- call({flush, Contrib})
+ call({flush, Refs})
catch
exit: _ ->
[]
end.
-flush() ->
- flush(self()).
-
-%%% ---------------------------------------------------------
-%%% EXPORTED INTERNAL FUNCTIONS
-%%% ---------------------------------------------------------
+%% ===========================================================================
start_link() ->
ServerName = {local, ?SERVER},
@@ -179,18 +160,16 @@ state() ->
uptime() ->
call(uptime).
-%%% ----------------------------------------------------------
-%%% # init(_)
-%%%
-%%% Output: {ok, State}
-%%% ----------------------------------------------------------
+%% ----------------------------------------------------------
+%% # init/1
+%% ----------------------------------------------------------
init([]) ->
ets:new(?TABLE, [named_table, ordered_set, public]),
{ok, #state{}}.
%% ----------------------------------------------------------
-%% handle_call(Request, From, State)
+%% # handle_call/3
%% ----------------------------------------------------------
handle_call(state, _, State) ->
@@ -199,31 +178,31 @@ handle_call(state, _, State) ->
handle_call(uptime, _, #state{id = Time} = State) ->
{reply, diameter_lib:now_diff(Time), State};
-handle_call({reg, Pid, Contrib}, _From, State) ->
- monitor(not ets:member(?TABLE, Pid), Pid),
- {reply, insert(?REC(Pid, Contrib)), State};
+handle_call({incr, T}, _, State) ->
+ {reply, update_counter(T), State};
-handle_call({flush, Contrib}, _From, State) ->
- {reply, fetch(Contrib), State};
+handle_call({reg, Pid, Ref}, _From, State) ->
+ B = ets:insert_new(?TABLE, {Pid, Ref}),
+ B andalso erlang:monitor(process, Pid),
+ {reply, B, State};
+
+handle_call({flush, Refs}, _From, State) ->
+ {reply, read(Refs, true), State};
handle_call(Req, From, State) ->
?UNEXPECTED([Req, From]),
{reply, nok, State}.
%% ----------------------------------------------------------
-%% handle_cast(Request, State)
+%% # handle_cast/2
%% ----------------------------------------------------------
-handle_cast({incr, Rec}, State) ->
- update_counter(Rec),
- {noreply, State};
-
handle_cast(Msg, State) ->
?UNEXPECTED([Msg]),
{noreply, State}.
%% ----------------------------------------------------------
-%% handle_info(Request, State)
+%% # handle_info/2
%% ----------------------------------------------------------
handle_info({'DOWN', _MRef, process, Pid, _}, State) ->
@@ -235,91 +214,62 @@ handle_info(Info, State) ->
{noreply, State}.
%% ----------------------------------------------------------
-%% terminate(Reason, State)
+%% # terminate/2
%% ----------------------------------------------------------
terminate(_Reason, _State) ->
ok.
%% ----------------------------------------------------------
-%% code_change(OldVsn, State, Extra)
+%% # code_change/3
%% ----------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-%%% ---------------------------------------------------------
-%%% INTERNAL FUNCTIONS
-%%% ---------------------------------------------------------
-
-%% monitor/2
-
-monitor(true, Pid) ->
- erlang:monitor(process, Pid);
-monitor(false = No, _) ->
- No.
+%% ===========================================================================
%% down/1
down(Pid) ->
- L = ets:match_object(?TABLE, ?REC({'_', Pid}, '_')),
- [?REC(_, Ref) = T] = lookup(Pid),
+ down(lookup(Pid), ets:match_object(?TABLE, {{'_', Pid}, '_'})).
+
+down([{_, Ref} = T], L) ->
fold(Ref, L),
- delete_object(T),
+ delete([T|L]);
+down([], L) -> %% flushed
delete(L).
-%% Fold Pid-based entries into Ref-based ones.
+%% Fold pid-based entries into ref-based ones.
fold(Ref, L) ->
- lists:foreach(fun(?REC({K, _}, V)) -> update_counter({{K, Ref}, V}) end,
- L).
-
-delete(Objs) ->
- lists:foreach(fun delete_object/1, Objs).
-
-%% fetch/1
-
-fetch(X) ->
- MatchSpec = [{?REC({'_', '$1'}, '_'),
- [?ORCOND([{'==', '$1', {const, T}} || T <- [X | ref(X)]])],
- ['$_']}],
- L = ets:select(?TABLE, MatchSpec),
- delete(L),
- D = lists:foldl(fun sum/2, dict:new(), L),
- dict:to_list(D).
-
-sum({{Ctr, _}, N}, Dict) ->
- dict:update(Ctr, fun(V) -> V+N end, N, Dict).
-
-ref(Pid)
- when is_pid(Pid) ->
- ets:select(?TABLE, [{?REC(Pid, '$1'), [], ['$1']}]);
-ref(_) ->
- [].
+ lists:foreach(fun({{K, _}, V}) -> update_counter({{K, Ref}, V}) end, L).
%% update_counter/2
%%
-%% From an arbitrary request process. Cast to the server process to
-%% insert a new element if the counter doesn't exists so that two
-%% processes don't do so simultaneously.
+%% From an arbitrary process. Call to the server process to insert a
+%% new element if the counter doesn't exists so that two processes
+%% don't insert simultaneously.
update_counter(Key, N) ->
try
ets:update_counter(?TABLE, Key, N)
catch
error: badarg ->
- cast({incr, ?REC(Key, N)})
+ call({incr, {Key, N}})
end.
%% update_counter/1
%%
-%% From the server process.
+%% From the server process, when update_counter/2 failed due to a
+%% non-existent entry.
-update_counter(?REC(Key, N) = T) ->
+update_counter({{_Ctr, Ref} = Key, N} = T) ->
try
ets:update_counter(?TABLE, Key, N)
catch
error: badarg ->
- insert(T)
+ (not is_pid(Ref) orelse ets:member(?TABLE, Ref))
+ andalso begin insert(T), N end
end.
insert(T) ->
@@ -328,13 +278,8 @@ insert(T) ->
lookup(Key) ->
ets:lookup(?TABLE, Key).
-delete_object(T) ->
- ets:delete_object(?TABLE, T).
-
-%% cast/1
-
-cast(Msg) ->
- gen_server:cast(?SERVER, Msg).
+delete(Objs) ->
+ lists:foreach(fun({K,_}) -> ets:delete(?TABLE, K) end, Objs).
%% call/1
diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl
index fb22fd8275..d7474e5c56 100644
--- a/lib/diameter/src/base/diameter_watchdog.erl
+++ b/lib/diameter/src/base/diameter_watchdog.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2011. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2012. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -54,7 +54,7 @@
%% number of DWAs received during reopen
%% end PCB
parent = self() :: pid(),
- transport :: pid(),
+ transport :: pid() | undefined,
tref :: reference(), %% reference for current watchdog timer
message_data}). %% term passed into diameter_service with message
@@ -64,6 +64,13 @@
%% that a failed capabilities exchange produces the desired exit
%% reason.
+-spec start(Type, {RecvData, [Opt], SvcName, #diameter_service{}})
+ -> {reference(), pid()}
+ when Type :: {connect|accept, diameter:transport_ref()},
+ RecvData :: term(),
+ Opt :: diameter:transport_opt(),
+ SvcName :: diameter:service_name().
+
start({_,_} = Type, T) ->
Ref = make_ref(),
{ok, Pid} = diameter_watchdog_sup:start_child({Ref, {Type, self(), T}}),
@@ -102,7 +109,7 @@ i({_, Pid, _} = T) -> %% from old code
erlang:monitor(process, Pid),
make_state(T).
-make_state({T, Pid, {ConnT,
+make_state({T, Pid, {RecvData,
Opts,
SvcName,
#diameter_service{applications = Apps,
@@ -116,7 +123,7 @@ make_state({T, Pid, {ConnT,
tw = proplists:get_value(watchdog_timer,
Opts,
?DEFAULT_TW_INIT),
- message_data = {ConnT, SvcName, Apps}}.
+ message_data = {RecvData, SvcName, Apps}}.
%% handle_call/3
@@ -134,14 +141,36 @@ handle_info(T, State) ->
case transition(T, State) of
ok ->
{noreply, State};
- #watchdog{status = X} = S ->
- ?LOGC(X =/= State#watchdog.status, transition, X),
+ #watchdog{} = S ->
+ event(State, S),
{noreply, S};
stop ->
?LOG(stop, T),
+ event(State, State#watchdog{status = down}),
{stop, {shutdown, T}, State}
end.
+event(#watchdog{status = T}, #watchdog{status = T}) ->
+ ok;
+
+event(#watchdog{transport = undefined}, #watchdog{transport = undefined}) ->
+ ok;
+
+event(#watchdog{status = From, transport = F, parent = Pid},
+ #watchdog{status = To, transport = T}) ->
+ E = {tpid(F,T), From, To},
+ notify(Pid, E),
+ ?LOG(transition, {self(), E}).
+
+tpid(_, Pid)
+ when is_pid(Pid) ->
+ Pid;
+tpid(Pid, _) ->
+ Pid.
+
+notify(Pid, E) ->
+ Pid ! {watchdog, self(), E}.
+
%% terminate/2
terminate(_, _) ->
@@ -251,8 +280,8 @@ transition({'DOWN', _, process, TPid, _},
status = initial}) ->
stop;
-transition({'DOWN', _, process, Pid, _},
- #watchdog{transport = Pid}
+transition({'DOWN', _, process, TPid, _},
+ #watchdog{transport = TPid}
= S) ->
failover(S),
close(S),
@@ -385,7 +414,7 @@ recv(Name, Pkt, S) ->
rcv(Name, Pkt, S),
NS
catch
- throw: {?MODULE, throwaway, #watchdog{} = NS} ->
+ {?MODULE, throwaway, #watchdog{} = NS} ->
NS
end.
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl
index 9a65834647..79b8b851fb 100644
--- a/lib/diameter/src/transport/diameter_sctp.erl
+++ b/lib/diameter/src/transport/diameter_sctp.erl
@@ -68,6 +68,7 @@
-record(transport,
{parent :: pid(),
mode :: {accept, pid()}
+ | accept
| {connect, {list(inet:ip_address()), uint(), list()}}
%% {RAs, RP, Errors}
| connect,
@@ -349,6 +350,11 @@ terminate(_, #transport{assoc_id = undefined}) ->
ok;
terminate(_, #transport{socket = Sock,
+ mode = accept,
+ assoc_id = Id}) ->
+ close(Sock, Id);
+
+terminate(_, #transport{socket = Sock,
mode = {accept, _},
assoc_id = Id}) ->
close(Sock, Id);
@@ -380,13 +386,16 @@ start_timer(S) ->
%% Incoming message from SCTP.
l({sctp, Sock, _RA, _RP, Data} = Msg, #listener{socket = Sock} = S) ->
- setopts(Sock),
- case find(Data, S) of
+ Id = assoc_id(Data),
+
+ try find(Id, Data, S) of
{TPid, NewS} ->
- TPid ! Msg,
+ TPid ! {peeloff, peeloff(Sock, Id, TPid), Msg},
NewS;
false ->
S
+ after
+ setopts(Sock)
end;
%% Transport is asking message to be sent. See send/3 for why the send
@@ -454,15 +463,19 @@ t(T,S) ->
%% transition/2
-%% Incoming message.
-transition({sctp, Sock, _RA, _RP, Data}, #transport{socket = Sock,
- mode = {accept, _}}
- = S) ->
- recv(Data, S);
+%% Listening process is transfering ownership of an association.
+transition({peeloff, Sock, {sctp, LSock, _RA, _RP, _Data} = Msg},
+ #transport{mode = {accept, _},
+ socket = LSock}
+ = S) ->
+ transition(Msg, S#transport{socket = Sock});
-transition({sctp, Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) ->
+%% Incoming message.
+transition({sctp, _Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) ->
setopts(Sock),
recv(Data, S);
+%% Don't match on Sock since in R15B01 it can be the listening socket
+%% in the (peeled-off) accept case, which is likely a bug.
%% Outgoing message.
transition({diameter, {send, Msg}}, S) ->
@@ -480,13 +493,18 @@ transition({diameter, {close, Pid}}, #transport{parent = Pid}) ->
transition({diameter, {tls, _Ref, _Type, _Bool}}, _) ->
stop;
+%% Parent process has died.
+transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) ->
+ stop;
+
%% Listener process has died.
transition({'DOWN', _, process, Pid, _}, #transport{mode = {accept, Pid}}) ->
stop;
-%% Parent process has died.
-transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) ->
- stop;
+%% Ditto but we have ownership of the association. It might be that
+%% we'll go down anyway though.
+transition({'DOWN', _, process, _Pid, _}, #transport{mode = accept}) ->
+ ok;
%% Request for the local port number.
transition({resolve_port, Pid}, #transport{socket = Sock})
@@ -545,14 +563,6 @@ send(Bin, #transport{streams = {_, OS},
%% send/3
-%% Messages have to be sent from the controlling process, which is
-%% probably a bug. Sending from here causes an inet_reply, Sock,
-%% Status} message to be sent to the controlling process while
-%% gen_sctp:send/4 here hangs.
-send(StreamId, Bin, #transport{assoc_id = AId,
- mode = {accept, LPid}}) ->
- LPid ! {send, AId, StreamId, Bin};
-
send(StreamId, Bin, #transport{socket = Sock,
assoc_id = AId}) ->
send(Sock, AId, StreamId, Bin).
@@ -635,21 +645,15 @@ up(#transport{parent = Pid,
S#transport{mode = C};
up(#transport{parent = Pid,
- mode = {accept, _}}
+ mode = {accept = A, _}}
= S) ->
diameter_peer:up(Pid),
- S.
+ S#transport{mode = A}.
-%% find/2
+%% find/3
-find({[#sctp_sndrcvinfo{assoc_id = Id}], _}
- = Data,
- #listener{tmap = T}
- = S) ->
- f(ets:lookup(T, Id), Data, S);
-
-find({_, Rec} = Data, #listener{tmap = T} = S) ->
- f(ets:lookup(T, assoc_id(Rec)), Data, S).
+find(Id, Data, #listener{tmap = T} = S) ->
+ f(ets:lookup(T, Id), Data, S).
%% New association and a transport waiting for one: use it.
f([],
@@ -690,17 +694,29 @@ f([], _, _) ->
%% assoc_id/1
-assoc_id(#sctp_shutdown_event{assoc_id = Id}) ->
+assoc_id({[#sctp_sndrcvinfo{assoc_id = Id}], _}) ->
+ Id;
+assoc_id({_, Rec}) ->
+ id(Rec).
+
+id(#sctp_shutdown_event{assoc_id = Id}) ->
Id;
-assoc_id(#sctp_assoc_change{assoc_id = Id}) ->
+id(#sctp_assoc_change{assoc_id = Id}) ->
Id;
-assoc_id(#sctp_sndrcvinfo{assoc_id = Id}) ->
+id(#sctp_sndrcvinfo{assoc_id = Id}) ->
Id;
-assoc_id(#sctp_paddr_change{assoc_id = Id}) ->
+id(#sctp_paddr_change{assoc_id = Id}) ->
Id;
-assoc_id(#sctp_adaptation_event{assoc_id = Id}) ->
+id(#sctp_adaptation_event{assoc_id = Id}) ->
Id.
+%% peeloff/3
+
+peeloff(LSock, Id, TPid) ->
+ {ok, Sock} = gen_sctp:peeloff(LSock, Id),
+ ok = gen_sctp:controlling_process(Sock, TPid),
+ Sock.
+
%% connect/4
connect(_, [], _, Reasons) ->
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl
index 597f2f14d7..f3fbbee609 100644
--- a/lib/diameter/src/transport/diameter_tcp.erl
+++ b/lib/diameter/src/transport/diameter_tcp.erl
@@ -80,7 +80,7 @@
%% Accepting/connecting transport process state.
-record(transport,
- {socket :: inet:socket(), %% accept or connect socket
+ {socket :: inet:socket() | ssl:sslsock(), %% accept or connect socket
parent :: pid(), %% of process that started us
module :: module(), %% gen_tcp-like module
frag = <<>> :: binary() | {tref(), frag()}, %% message fragment