aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src
diff options
context:
space:
mode:
Diffstat (limited to 'lib/diameter/src')
-rw-r--r--lib/diameter/src/Makefile18
-rw-r--r--lib/diameter/src/base/diameter.erl1
-rw-r--r--lib/diameter/src/base/diameter_codec.erl11
-rw-r--r--lib/diameter/src/base/diameter_config.erl5
-rw-r--r--lib/diameter/src/base/diameter_peer.erl130
-rw-r--r--lib/diameter/src/base/diameter_peer_fsm.erl176
-rw-r--r--lib/diameter/src/base/diameter_service.erl493
-rw-r--r--lib/diameter/src/base/diameter_stats.erl265
-rw-r--r--lib/diameter/src/base/diameter_watchdog.erl47
-rw-r--r--lib/diameter/src/modules.mk1
-rw-r--r--lib/diameter/src/transport/diameter_etcp.erl22
-rw-r--r--lib/diameter/src/transport/diameter_sctp.erl129
-rw-r--r--lib/diameter/src/transport/diameter_tcp.erl75
-rw-r--r--lib/diameter/src/transport/diameter_transport.erl55
14 files changed, 996 insertions, 432 deletions
diff --git a/lib/diameter/src/Makefile b/lib/diameter/src/Makefile
index 26f5ae480e..99c343275b 100644
--- a/lib/diameter/src/Makefile
+++ b/lib/diameter/src/Makefile
@@ -220,27 +220,27 @@ endif
release_spec: opt
for d in bin ebin include src/dict; do \
- $(INSTALL_DIR) $(RELSYSDIR)/$$d; \
+ $(INSTALL_DIR) "$(RELSYSDIR)/$$d"; \
done
- $(INSTALL_SCRIPT) $(BINS:%=../bin/%) $(RELSYSDIR)/bin
- $(INSTALL_DATA) $(TARGET_FILES) $(RELSYSDIR)/ebin
+ $(INSTALL_SCRIPT) $(BINS:%=../bin/%) "$(RELSYSDIR)/bin"
+ $(INSTALL_DATA) $(TARGET_FILES) "$(RELSYSDIR)/ebin"
$(INSTALL_DATA) $(EXTERNAL_HRLS:%=../include/%) $(DICT_HRLS) \
- $(RELSYSDIR)/include
- $(INSTALL_DATA) $(DICTS:%=dict/%.dia) $(RELSYSDIR)/src/dict
+ "$(RELSYSDIR)/include"
+ $(INSTALL_DATA) $(DICTS:%=dict/%.dia) "$(RELSYSDIR)/src/dict"
$(MAKE) $(TARGET_DIRS:%/=release_src_%)
$(MAKE) $(EXAMPLE_DIRS:%/=release_examples_%)
$(TARGET_DIRS:%/=release_src_%): release_src_%:
- $(INSTALL_DIR) $(RELSYSDIR)/src/$*
+ $(INSTALL_DIR) "$(RELSYSDIR)/src/$*"
$(INSTALL_DATA) $(filter $*/%, $(TARGET_MODULES:%=%.erl) \
$(INTERNAL_HRLS)) \
$(filter $*/%, compiler/$(DICT_YRL).yrl) \
- $(RELSYSDIR)/src/$*
+ "$(RELSYSDIR)/src/$*"
$(EXAMPLE_DIRS:%/=release_examples_%): release_examples_%:
- $(INSTALL_DIR) $(RELSYSDIR)/examples/$*
+ $(INSTALL_DIR) "$(RELSYSDIR)/examples/$*"
$(INSTALL_DATA) $(patsubst %, ../examples/%, $(filter $*/%, $(EXAMPLES))) \
- $(RELSYSDIR)/examples/$*
+ "$(RELSYSDIR)/examples/$*"
release_docs_spec:
diff --git a/lib/diameter/src/base/diameter.erl b/lib/diameter/src/base/diameter.erl
index 336f0c1f2d..6703841f80 100644
--- a/lib/diameter/src/base/diameter.erl
+++ b/lib/diameter/src/base/diameter.erl
@@ -312,6 +312,7 @@ call(SvcName, App, Message) ->
-type transport_opt()
:: {transport_module, atom()}
| {transport_config, any()}
+ | {transport_timeout, non_neg_integer() | infinity}
| {applications, [app_alias()]}
| {capabilities, [capability()]}
| {capabilities_cb, evaluable()}
diff --git a/lib/diameter/src/base/diameter_codec.erl b/lib/diameter/src/base/diameter_codec.erl
index fe1212b7e0..421e280422 100644
--- a/lib/diameter/src/base/diameter_codec.erl
+++ b/lib/diameter/src/base/diameter_codec.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2011. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2012. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -63,9 +63,9 @@ encode(Mod, #diameter_packet{} = Pkt) ->
e(Mod, Pkt)
catch
error: Reason ->
- %% Be verbose rather than letting the emulator truncate the
- %% error report.
- X = {Reason, ?STACK},
+ %% Be verbose since a crash report may be truncated and
+ %% encode errors are self-inflicted.
+ X = {?MODULE, encode, {Reason, ?STACK}},
diameter_lib:error_report(X, {?MODULE, encode, [Mod, Pkt]}),
exit(X)
end;
@@ -91,7 +91,8 @@ e(_, #diameter_packet{msg = [#diameter_header{} = Hdr | As]} = Pkt) ->
Flags = make_flags(0, Hdr),
- Pkt#diameter_packet{bin = <<Vsn:8, Length:24,
+ Pkt#diameter_packet{header = Hdr,
+ bin = <<Vsn:8, Length:24,
Flags:8, Code:24,
Aid:32,
Hid:32,
diff --git a/lib/diameter/src/base/diameter_config.erl b/lib/diameter/src/base/diameter_config.erl
index 9253af0de2..e47f63f814 100644
--- a/lib/diameter/src/base/diameter_config.erl
+++ b/lib/diameter/src/base/diameter_config.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2011. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2012. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -519,6 +519,7 @@ rm(SvcName, L) ->
Refs = lists:map(fun(#transport{ref = R}) -> R end, L),
case stop_transport(SvcName, Refs) of
ok ->
+ diameter_stats:flush(Refs),
lists:foreach(fun delete_object/1, L);
{error, _} = No ->
No
@@ -600,7 +601,7 @@ app_acc({application, Opts}, Acc) ->
module = init_mod(Mod),
init_state = ModS,
mutable = init_mutable(M),
- answer_errors = init_answers(A)}
+ options = [{answer_errors, init_answers(A)}]}
| Acc];
app_acc(_, Acc) ->
Acc.
diff --git a/lib/diameter/src/base/diameter_peer.erl b/lib/diameter/src/base/diameter_peer.erl
index 3e78c4caef..a2a1c567d8 100644
--- a/lib/diameter/src/base/diameter_peer.erl
+++ b/lib/diameter/src/base/diameter_peer.erl
@@ -27,12 +27,15 @@
up/2]).
%% ... and the stack.
--export([start/3,
+-export([start/1,
send/2,
close/1,
abort/1,
notify/2]).
+%% Old interface only called from old code.
+-export([start/3]). %% < diameter-1.2 (R15B02)
+
%% Server start.
-export([start_link/0]).
@@ -57,6 +60,11 @@
%% Server state.
-record(state, {id = now()}).
+%% Default transport_module/config.
+-define(DEFAULT_TMOD, diameter_tcp).
+-define(DEFAULT_TCFG, []).
+-define(DEFAULT_TTMO, infinity).
+
%%% ---------------------------------------------------------------------------
%%% # notify/2
%%% ---------------------------------------------------------------------------
@@ -68,9 +76,108 @@ notify(SvcName, T) ->
%%% # start/3
%%% ---------------------------------------------------------------------------
-start(T, Opts, #diameter_service{} = Svc) ->
- {Mod, Cfg} = split_transport(Opts),
- apply(Mod, start, [T, Svc, Cfg]).
+%% From old code: make is restart.
+start(_T, _Opts, #diameter_service{}) ->
+ {error, restart}.
+
+%%% ---------------------------------------------------------------------------
+%%% # start/1
+%%% ---------------------------------------------------------------------------
+
+-spec start({T, [Opt], #diameter_service{}})
+ -> {TPid, [Addr], Tmo, Data}
+ | {error, [term()]}
+ when T :: {connect|accept, diameter:transport_ref()},
+ Opt :: diameter:transport_opt(),
+ TPid :: pid(),
+ Addr :: inet:ip_address(),
+ Tmo :: non_neg_integer(),
+ Data :: {{T, Mod, Cfg}, [Mod], [{T, [Mod], Cfg}], [Err]},
+ Mod :: module(),
+ Cfg :: term(),
+ Err :: term().
+
+%% Initial start.
+start({T, Opts, #diameter_service{} = Svc}) ->
+ start(T, Svc, pair(Opts, [], []), []);
+
+%% Subsequent start.
+start({#diameter_service{} = Svc, Tmo, {{T, _, Cfg}, Ms, Rest, Errs}}) ->
+ start(T, Ms, Cfg, Svc, Tmo, Rest, Errs).
+
+%% pair/3
+%%
+%% Pair transport modules with config.
+
+%% Another transport_module: accumulate it.
+pair([{transport_module, M} | Rest], Mods, Acc) ->
+ pair(Rest, [M|Mods], Acc);
+
+%% Another transport_config: accumulate another tuple.
+pair([{transport_config = T, C} | Rest], Mods, Acc) ->
+ pair([{T, C, ?DEFAULT_TTMO} | Rest], Mods, Acc);
+pair([{transport_config, C, Tmo} | Rest], Mods, Acc) ->
+ pair(Rest, [], acc({Mods, C, Tmo}, Acc));
+
+pair([_ | Rest], Mods, Acc) ->
+ pair(Rest, Mods, Acc);
+
+%% No transport_module or transport_config: defaults.
+pair([], [], []) ->
+ [{[?DEFAULT_TMOD], ?DEFAULT_TCFG, ?DEFAULT_TTMO}];
+
+%% One transport_module, one transport_config.
+pair([], [M], [{[], Cfg, Tmo}]) ->
+ [{[M], Cfg, Tmo}];
+
+%% Trailing transport_module: default transport_config.
+pair([], [_|_] = Mods, Acc) ->
+ lists:reverse(acc({Mods, ?DEFAULT_TCFG, ?DEFAULT_TTMO}, Acc));
+
+pair([], [], Acc) ->
+ lists:reverse(def(Acc)).
+
+%% acc/2
+
+acc(T, Acc) ->
+ [T | def(Acc)].
+
+%% def/1
+%%
+%% Default module of previous pair if none were specified.
+
+def([{[], Cfg, Tmo} | Acc]) ->
+ [{[?DEFAULT_TMOD], Cfg, Tmo} | Acc];
+def(Acc) ->
+ Acc.
+
+%% start/4
+
+start(T, Svc, [{Ms, Cfg, Tmo} | Rest], Errs) ->
+ start(T, Ms, Cfg, Svc, Tmo, Rest, Errs);
+
+start(_, _, [], Errs) ->
+ {error, Errs}.
+
+%% start/7
+
+start(T, [], _, Svc, _, Rest, Errs) ->
+ start(T, Svc, Rest, Errs);
+
+start(T, [M|Ms], Cfg, Svc, Tmo, Rest, Errs) ->
+ case start(M, [T, Svc, Cfg]) of
+ {ok, TPid} ->
+ {TPid, [], Tmo, {{T, M, Cfg}, Ms, Rest, Errs}};
+ {ok, TPid, [_|_] = Addrs} ->
+ {TPid, Addrs, Tmo, {{T, M, Cfg}, Ms, Rest, Errs}};
+ E ->
+ start(T, Ms, Cfg, Svc, Tmo, Rest, [E|Errs])
+ end.
+
+%% start/2
+
+start(Mod, Args) ->
+ apply(Mod, start, Args).
%%% ---------------------------------------------------------------------------
%%% # up/[12]
@@ -204,21 +311,6 @@ bang(undefined = No, _) ->
bang(Pid, T) ->
Pid ! T.
-%% split_transport/1
-%%
-%% Split options into transport module, transport config and
-%% remaining options.
-
-split_transport(Opts) ->
- {[M,C], _} = proplists:split(Opts, [transport_module,
- transport_config]),
- {value(M, diameter_tcp), value(C, [])}.
-
-value([{_,V}], _) ->
- V;
-value([], V) ->
- V.
-
%% call/1
call(Request) ->
diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl
index 99644814d2..302540e76b 100644
--- a/lib/diameter/src/base/diameter_peer_fsm.erl
+++ b/lib/diameter/src/base/diameter_peer_fsm.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2011. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2012. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -54,6 +54,12 @@
-define(NO_INBAND_SECURITY, 0).
-define(TLS, 1).
+%% Keys in process dictionary.
+-define(CB_KEY, cb). %% capabilities callback
+-define(DWA_KEY, dwa). %% outgoing DWA
+-define(Q_KEY, q). %% transport start queue
+-define(START_KEY, start). %% start of connected transport
+
%% A 2xxx series Result-Code. Not necessarily 2001.
-define(IS_SUCCESS(N), 2 == (N) div 1000).
@@ -115,16 +121,20 @@
%%% Output: Pid
%%% ---------------------------------------------------------------------------
+-spec start(T, [Opt], #diameter_service{})
+ -> pid()
+ when T :: {connect|accept, diameter:transport_ref()},
+ Opt :: diameter:transport_opt().
+
%% diameter_config requires a non-empty list of applications on the
%% service but diameter_service then constrains the list to any
%% specified on the transport in question. Check here that the list is
%% still non-empty.
-start({_, Ref} = Type, Opts, #diameter_service{applications = Apps} = Svc) ->
+start({_,_} = Type, Opts, #diameter_service{applications = Apps} = Svc) ->
[] /= Apps orelse ?ERROR({no_apps, Type, Opts}),
T = {self(), Type, Opts, Svc},
{ok, Pid} = diameter_peer_fsm_sup:start_child(T),
- diameter_stats:reg(Pid, Ref),
Pid.
start_link(T) ->
@@ -143,18 +153,18 @@ init(T) ->
proc_lib:init_ack({ok, self()}),
gen_server:enter_loop(?MODULE, [], i(T)).
-i({WPid, T, Opts, #diameter_service{capabilities = Caps} = Svc0}) ->
- putr(dwa, dwa(Caps)),
+i({WPid, T, Opts, #diameter_service{capabilities = Caps} = Svc}) ->
+ putr(?DWA_KEY, dwa(Caps)),
{M, Ref} = T,
+ diameter_stats:reg(Ref),
{[Ts], Rest} = proplists:split(Opts, [capabilities_cb]),
- putr(capabilities_cb, {Ref, [F || {_,F} <- Ts]}),
- {ok, TPid, Svc} = start_transport(T, Rest, Svc0),
- erlang:monitor(process, TPid),
+ putr(?CB_KEY, {Ref, [F || {_,F} <- Ts]}),
erlang:monitor(process, WPid),
+ {TPid, Addrs} = start_transport(T, Rest, Svc),
#state{parent = WPid,
transport = TPid,
mode = M,
- service = Svc}.
+ service = svc(Svc, Addrs)}.
%% The transport returns its local ip addresses so that different
%% transports on the same service can use different local addresses.
%% The local addresses are put into Host-IP-Address avps here when
@@ -164,18 +174,56 @@ i({WPid, T, Opts, #diameter_service{capabilities = Caps} = Svc0}) ->
%% watchdog start (start/2) succeeds regardless so as not to crash the
%% service.
-start_transport(T, Opts, Svc) ->
- case diameter_peer:start(T, Opts, Svc) of
- {ok, TPid} ->
- {ok, TPid, Svc};
- {ok, TPid, [_|_] = Addrs} ->
- #diameter_service{capabilities = Caps0} = Svc,
- Caps = Caps0#diameter_caps{host_ip_address = Addrs},
- {ok, TPid, Svc#diameter_service{capabilities = Caps}};
+start_transport(T, Opts, #diameter_service{capabilities = Caps} = Svc) ->
+ Addrs0 = Caps#diameter_caps.host_ip_address,
+ start_transport(Addrs0, {T, Opts, Svc}).
+
+start_transport(Addrs0, T) ->
+ case diameter_peer:start(T) of
+ {TPid, Addrs, Tmo, Data} ->
+ erlang:monitor(process, TPid),
+ q_next(TPid, Addrs0, Tmo, Data),
+ {TPid, addrs(Addrs, Addrs0)};
No ->
exit({shutdown, No})
end.
+addrs([], Addrs0) ->
+ Addrs0;
+addrs(Addrs, _) ->
+ Addrs.
+
+svc(Svc, []) ->
+ Svc;
+svc(Svc, Addrs) ->
+ readdr(Svc, Addrs).
+
+readdr(#diameter_service{capabilities = Caps0} = Svc, Addrs) ->
+ Caps = Caps0#diameter_caps{host_ip_address = Addrs},
+ Svc#diameter_service{capabilities = Caps}.
+
+%% The 4-tuple Data returned from diameter_peer:start/1 identifies the
+%% transport module/config use to start the transport process in
+%% question as well as any alternates to try if a connection isn't
+%% established within Tmo.
+q_next(TPid, Addrs0, Tmo, {_,_,_,_} = Data) ->
+ send_after(Tmo, {connection_timeout, TPid}),
+ putr(?Q_KEY, {Addrs0, Tmo, Data}).
+
+%% Connection has been established: retain the started
+%% pid/module/config in the process dictionary. This is a part of the
+%% interface defined by this module, so that the transport pid can be
+%% found when constructing service_info (in order to extract further
+%% information from it).
+keep_transport(TPid) ->
+ {_, _, {{_,_,_} = T, _, _, _}} = eraser(?Q_KEY),
+ putr(?START_KEY, {TPid, T}).
+
+send_after(infinity, _) ->
+ ok;
+send_after(Tmo, T) ->
+ erlang:send_after(Tmo, self(), T).
+
%% handle_call/3
handle_call(_, _, State) ->
@@ -202,14 +250,27 @@ handle_info(T, #state{} = State) ->
?LOG(stop, T),
x(T, State)
catch
+ exit: {diameter_codec, encode, _} = Reason ->
+ close_wd(Reason, State#state.parent),
+ ?LOG(stop, Reason),
+ %% diameter_codec:encode/2 emits an error report. Only
+ %% indicate the probable reason here.
+ diameter_lib:info_report(probable_configuration_error,
+ insufficient_capabilities),
+ {stop, {shutdown, Reason}, State};
{?MODULE, Tag, Reason} ->
?LOG(Tag, {Reason, T}),
{stop, {shutdown, Reason}, State}
end.
-%% The form of the exception caught here is historical. It's
+%% The form of the throw caught here is historical. It's
%% significant that it's not a 2-tuple, as in ?FAILURE(Reason),
%% since these are caught elsewhere.
+%% Note that there's no guarantee that the service and transport
+%% capabilities are good enough to build a CER/CEA that can be
+%% succesfully encoded. It's not checked at diameter:add_transport/2
+%% since this can be called before creating the service.
+
x(Reason, #state{} = S) ->
close_wd(Reason, S),
{stop, {shutdown, Reason}, S}.
@@ -240,25 +301,48 @@ eraser(Key) ->
%% Connection to peer.
transition({diameter, {TPid, connected, Remote}},
- #state{state = PS,
+ #state{transport = TPid,
+ state = PS,
mode = M}
= S) ->
'Wait-Conn-Ack' = PS, %% assert
connect = M, %%
- send_CER(S#state{mode = {M, Remote},
- transport = TPid});
+ keep_transport(TPid),
+ send_CER(S#state{mode = {M, Remote}});
%% Connection from peer.
transition({diameter, {TPid, connected}},
- #state{state = PS,
+ #state{transport = TPid,
+ state = PS,
mode = M,
parent = Pid}
= S) ->
'Wait-Conn-Ack' = PS, %% assert
accept = M, %%
+ keep_transport(TPid),
Pid ! {accepted, self()},
- start_timer(S#state{state = recv_CER,
- transport = TPid});
+ start_timer(S#state{state = recv_CER});
+
+%% Connection established after receiving a connection_timeout
+%% message. This may be followed by an incoming message which arrived
+%% before the transport was killed and this can't be distinguished
+%% from one from the transport that's been started to replace it.
+transition({diameter, {_, connected}}, _) ->
+ {stop, connection_timeout};
+transition({diameter, {_, connected, _}}, _) ->
+ {stop, connection_timeout};
+
+%% Connection has timed out: start an alternate.
+transition({connection_timeout = T, TPid},
+ #state{transport = TPid,
+ state = 'Wait-Conn-Ack'}
+ = S) ->
+ exit(TPid, {shutdown, T}),
+ start_next(S);
+
+%% Connect timeout after connection or alternate start: ignore.
+transition({connection_timeout, _}, _) ->
+ ok;
%% Incoming message from the transport.
transition({diameter, {recv, Pkt}}, S) ->
@@ -305,14 +389,21 @@ transition({resolve_port, _Pid} = T, #state{transport = TPid}) ->
TPid ! T,
ok;
-%% Parent or transport has died.
-transition({'DOWN', _, process, P, _},
- #state{parent = Pid,
- transport = TPid})
- when P == Pid;
- P == TPid ->
+%% Parent has died.
+transition({'DOWN', _, process, WPid, _},
+ #state{parent = WPid}) ->
stop;
+%% Transport has died before connection timeout.
+transition({'DOWN', _, process, TPid, _},
+ #state{transport = TPid}
+ = S) ->
+ start_next(S);
+
+%% Transport has died after connection timeout.
+transition({'DOWN', _, process, _, _}, _) ->
+ ok;
+
%% State query.
transition({state, Pid}, #state{state = S, transport = TPid}) ->
Pid ! {self(), [S, TPid]},
@@ -320,6 +411,19 @@ transition({state, Pid}, #state{state = S, transport = TPid}) ->
%% Crash on anything unexpected.
+%% start_next/1
+
+start_next(#state{service = Svc0} = S) ->
+ case getr(?Q_KEY) of
+ {Addrs0, Tmo, Data} ->
+ Svc = readdr(Svc0, Addrs0),
+ {TPid, Addrs} = start_transport(Addrs0, {Svc, Tmo, Data}),
+ S#state{transport = TPid,
+ service = svc(Svc, Addrs)};
+ undefined ->
+ stop
+ end.
+
%% send_CER/1
send_CER(#state{mode = {connect, Remote},
@@ -649,7 +753,7 @@ rc([RC|_]) ->
%% answer/2
answer('DWR', _) ->
- getr(dwa);
+ getr(?DWA_KEY);
answer(Name, #state{service = #diameter_service{capabilities = Caps}}) ->
a(Name, Caps).
@@ -749,15 +853,15 @@ caps(#state{service = Svc}) ->
%% caps_cb/1
caps_cb(Caps) ->
- {Ref, Ts} = eraser(capabilities_cb),
- ccb(Ts, [Ref, Caps]).
+ {Ref, Ts} = eraser(?CB_KEY),
+ caps_cb(Ts, [Ref, Caps]).
-ccb([], _) ->
+caps_cb([], _) ->
ok;
-ccb([F | Rest], T) ->
+caps_cb([F | Rest], T) ->
case diameter_lib:eval([F|T]) of
ok ->
- ccb(Rest, T);
+ caps_cb(Rest, T);
N when ?IS_SUCCESS(N) -> %% 2xxx result code: accept immediately
N;
Res ->
diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl
index 3dfdcee2b2..5b8a399758 100644
--- a/lib/diameter/src/base/diameter_service.erl
+++ b/lib/diameter/src/base/diameter_service.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2011. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2012. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -43,8 +43,7 @@
subscriptions/0,
services/0,
services/1,
- whois/1,
- flush_stats/1]).
+ whois/1]).
%% test/debug
-export([call_module/3,
@@ -65,13 +64,33 @@
-include_lib("diameter/include/diameter.hrl").
-include("diameter_internal.hrl").
+%% The "old" states maintained in this module historically.
-define(STATE_UP, up).
-define(STATE_DOWN, down).
+-type op_state() :: ?STATE_UP
+ | ?STATE_DOWN.
+
+%% The RFC 3539 watchdog states that are now maintained, albeit
+%% along with the old up/down. okay = up, else down.
+-define(WD_INITIAL, initial).
+-define(WD_OKAY, okay).
+-define(WD_SUSPECT, suspect).
+-define(WD_DOWN, down).
+-define(WD_REOPEN, reopen).
+
+-type wd_state() :: ?WD_INITIAL
+ | ?WD_OKAY
+ | ?WD_SUSPECT
+ | ?WD_DOWN
+ | ?WD_REOPEN.
+
-define(DEFAULT_TC, 30000). %% RFC 3588 ch 2.1
-define(DEFAULT_TIMEOUT, 5000). %% for outgoing requests
-define(RESTART_TC, 1000). %% if restart was this recent
+-define(RELAY, ?DIAMETER_DICT_RELAY).
+
%% Used to be able to swap this with anything else dict-like but now
%% rely on the fact that a service's #state{} record does not change
%% in storing in it ?STATE table and not always going through the
@@ -117,7 +136,8 @@
type :: match(connect | accept),
ref :: match(reference()), %% key into diameter_config
options :: match([diameter:transport_opt()]),%% from start_transport
- op_state = ?STATE_DOWN :: match(?STATE_DOWN | ?STATE_UP),
+ op_state = {?STATE_DOWN, ?WD_INITIAL}
+ :: match(op_state() | {op_state(), wd_state()}),
started = now(), %% at process start
conn = false :: match(boolean() | pid())}).
%% true at accept, pid() at connection_up (connT key)
@@ -388,15 +408,6 @@ whois(SvcName) ->
undefined
end.
-%%% ---------------------------------------------------------------------------
-%%% # flush_stats/1
-%%%
-%%% Output: list of {{SvcName, Alias, Counter}, Value}
-%%% ---------------------------------------------------------------------------
-
-flush_stats(TPid) ->
- diameter_stats:flush(TPid).
-
%% ===========================================================================
%% ===========================================================================
@@ -516,6 +527,34 @@ transition({reconnect, Pid}, S) ->
reconnect(Pid, S),
ok;
+%% Watchdog is sending notification of a state transition. Note that
+%% the connection_up/down messages are pre-date this message and are
+%% still used. A 'watchdog' message will follow these and communicate
+%% the same state as was set in handling connection_up/down.
+transition({watchdog, Pid, {TPid, From, To}}, #state{service_name = SvcName,
+ peerT = PeerT}) ->
+ #peer{ref = Ref, type = T, options = Opts, op_state = {OS,_}}
+ = P
+ = fetch(PeerT, Pid),
+ insert(PeerT, P#peer{op_state = {OS, To}}),
+ send_event(SvcName, {watchdog, Ref, TPid, {From, To}, {T, Opts}}),
+ ok;
+%% Death of a peer process results in the removal of it's peer and any
+%% associated conn record when 'DOWN' is received (after this) but the
+%% states will be {?STATE_UP, ?WD_DOWN} for a short time. (No real
+%% problem since ?WD_* is only used in service_info.) We set ?WD_OKAY
+%% as a consequence of connection_up since we know a watchdog is
+%% coming. We can't set anything at connection_down since we don't
+%% know if the subsequent watchdog message will be ?WD_DOWN or
+%% ?WD_SUSPECT. We don't (yet) set ?STATE_* as a consequence of a
+%% watchdog message since this requires changing some of the matching
+%% on ?STATE_*.
+%%
+%% Death of a conn process results in connection_down followed by
+%% watchdog ?WD_DOWN. The latter doesn't result in the conn record
+%% being deleted since 'DOWN' from death of its peer doesn't (yet)
+%% deal with the record having been removed.
+
%% Monitor process has died. Just die with a reason that tells
%% diameter_config about the happening. If a cleaner shutdown is
%% required then someone should stop us.
@@ -879,7 +918,14 @@ accepted(Pid, _TPid, #state{peerT = PeerT} = S) ->
fetch(Tid, Key) ->
[T] = ets:lookup(Tid, Key),
- T.
+ case T of
+ #peer{op_state = ?STATE_UP} = P ->
+ P#peer{op_state = {?STATE_UP, ?WD_OKAY}};
+ #peer{op_state = ?STATE_DOWN} = P ->
+ P#peer{op_state = {?STATE_DOWN, ?WD_DOWN}};
+ _ ->
+ T
+ end.
%%% ---------------------------------------------------------------------------
%%% # connection_up/3
@@ -925,12 +971,12 @@ connection_up(T, P, C, #state{peerT = PeerT,
service
= #diameter_service{applications = Apps}}
= S) ->
- #peer{conn = TPid, op_state = ?STATE_DOWN}
+ #peer{conn = TPid, op_state = {?STATE_DOWN, _}}
= P,
#conn{apps = SApps, caps = Caps}
= C,
- insert(PeerT, P#peer{op_state = ?STATE_UP}),
+ insert(PeerT, P#peer{op_state = {?STATE_UP, ?WD_OKAY}}),
request_peer_up(TPid),
report_status(up, P, C, S, T),
@@ -945,27 +991,35 @@ ilp({Id, Alias}, {TC, SA}, LDict) ->
init_conn(Id, Alias, TC, SA),
?Dict:append(Alias, TC, LDict).
-init_conn(Id, Alias, TC, {SvcName, Apps}) ->
+init_conn(Id, Alias, {TPid, _} = TC, {SvcName, Apps}) ->
#diameter_app{module = ModX,
id = Id} %% assert
= find_app(Alias, Apps),
- peer_cb({ModX, peer_up, [SvcName, TC]}, Alias).
+ peer_cb({ModX, peer_up, [SvcName, TC]}, Alias)
+ orelse exit(TPid, kill). %% fake transport failure
+
+%% find_app/2
find_app(Alias, Apps) ->
- lists:keyfind(Alias, #diameter_app.alias, Apps).
+ case lists:keyfind(Alias, #diameter_app.alias, Apps) of
+ #diameter_app{options = E} = A when is_atom(E) -> %% upgrade
+ A#diameter_app{options = [{answer_errors, E}]};
+ A ->
+ A
+ end.
-%% A failing peer callback brings down the service. In the case of
-%% peer_up we could just kill the transport and emit an error but for
-%% peer_down we have no way to cleanup any state change that peer_up
-%% may have introduced.
+%% Don't bring down the service (and all associated connections)
+%% regardless of what happens.
peer_cb(MFA, Alias) ->
try state_cb(MFA, Alias) of
ModS ->
- mod_state(Alias, ModS)
+ mod_state(Alias, ModS),
+ true
catch
- E: Reason ->
- ?ERROR({E, Reason, MFA, ?STACK})
+ E:R ->
+ diameter_lib:error_report({failure, {E, R, Alias, ?STACK}}, MFA),
+ false
end.
%%% ---------------------------------------------------------------------------
@@ -979,22 +1033,22 @@ peer_cb(MFA, Alias) ->
connection_down(Pid, #state{peerT = PeerT,
connT = ConnT}
= S) ->
- #peer{op_state = ?STATE_UP, %% assert
+ #peer{op_state = {?STATE_UP, WS}, %% assert
conn = TPid}
= P
= fetch(PeerT, Pid),
C = fetch(ConnT, TPid),
- insert(PeerT, P#peer{op_state = ?STATE_DOWN}),
+ insert(PeerT, P#peer{op_state = {?STATE_DOWN, WS}}),
connection_down(P,C,S).
%% connection_down/3
-connection_down(#peer{op_state = ?STATE_DOWN}, _, S) ->
+connection_down(#peer{op_state = {?STATE_DOWN, _}}, _, S) ->
S;
connection_down(#peer{conn = TPid,
- op_state = ?STATE_UP}
+ op_state = {?STATE_UP, _}}
= P,
#conn{caps = Caps,
apps = SApps}
@@ -1043,7 +1097,7 @@ peer_down(Pid, Reason, #state{peerT = PeerT} = S) ->
%% Send an event at connection establishment failure.
closed({shutdown, {close, _TPid, Reason}},
- #peer{op_state = ?STATE_DOWN,
+ #peer{op_state = {?STATE_DOWN, _},
ref = Ref,
type = Type,
options = Opts},
@@ -1352,7 +1406,7 @@ send_request(Pkt, TPid, Caps, App, Opts, Caller, SvcName) ->
#diameter_app{alias = Alias,
dictionary = Dict,
module = ModX,
- answer_errors = AE}
+ options = [{answer_errors, AE} | _]}
= App,
EPkt = encode(Dict, Pkt),
@@ -1935,6 +1989,12 @@ is_loop(Code, Vid, OH, Avps) ->
%%
%% Send a locally originating reply.
+%% Skip the setting of Result-Code and Failed-AVP's below.
+reply([Msg], Dict, TPid, Pkt)
+ when is_list(Msg);
+ is_tuple(Msg) ->
+ reply(Msg, Dict, TPid, Pkt#diameter_packet{errors = []});
+
%% No errors or a diameter_header/avp list.
reply(Msg, Dict, TPid, #diameter_packet{errors = Es,
transport_data = TD}
@@ -1942,7 +2002,7 @@ reply(Msg, Dict, TPid, #diameter_packet{errors = Es,
when [] == Es;
is_record(hd(Msg), diameter_header) ->
Pkt = diameter_codec:encode(Dict, make_answer_packet(Msg, ReqPkt)),
- incr(send, Pkt, TPid), %% count result codes in sent answers
+ incr(send, Pkt, Dict, TPid), %% count result codes in sent answers
send(TPid, Pkt#diameter_packet{transport_data = TD});
%% Or not: set Result-Code and Failed-AVP AVP's.
@@ -1983,7 +2043,10 @@ rc(RC) ->
rc(Rec, RC, Failed, Dict)
when is_integer(RC) ->
- set(Rec, [{'Result-Code', RC} | failed_avp(Rec, Failed, Dict)], Dict).
+ set(Rec,
+ lists:append([rc(Rec, {'Result-Code', RC}, Dict),
+ failed_avp(Rec, Failed, Dict)]),
+ Dict).
%% Reply as name and tuple list ...
set([_|_] = Ans, Avps, _) ->
@@ -1993,6 +2056,22 @@ set([_|_] = Ans, Avps, _) ->
set(Rec, Avps, Dict) ->
Dict:'#set-'(Avps, Rec).
+%% rc/3
+%%
+%% Turn the result code into a list if its optional and only set it if
+%% the arity is 1 or {0,1}. In other cases (which probably shouldn't
+%% exist in practise) we can't know what's appropriate.
+
+rc([MsgName | _], {'Result-Code' = K, RC} = T, Dict) ->
+ case Dict:avp_arity(MsgName, 'Result-Code') of
+ 1 -> [T];
+ {0,1} -> [{K, [RC]}];
+ _ -> []
+ end;
+
+rc(Rec, T, Dict) ->
+ rc([Dict:rec2msg(element(1, Rec))], T, Dict).
+
%% failed_avp/3
failed_avp(_, [] = No, _) ->
@@ -2200,44 +2279,39 @@ handle_answer(SvcName, _, {error, Req, Reason}) ->
handle_answer(SvcName,
AnswerErrors,
{answer, #request{dictionary = Dict} = Req, Pkt}) ->
- a(examine(diameter_codec:decode(Dict, Pkt)),
- SvcName,
- AnswerErrors,
- Req).
+ answer(examine(diameter_codec:decode(Dict, Pkt)),
+ SvcName,
+ AnswerErrors,
+ Req).
%% We don't really need to do a full decode if we're a relay and will
%% just resend with a new hop by hop identifier, but might a proxy
%% want to examine the answer?
-a(#diameter_packet{errors = []}
- = Pkt,
- SvcName,
- AE,
- #request{transport = TPid,
- caps = Caps,
- packet = P}
- = Req) ->
+answer(Pkt, SvcName, AE, #request{transport = TPid,
+ dictionary = Dict}
+ = Req) ->
try
- incr(in, Pkt, TPid)
+ incr(recv, Pkt, Dict, TPid)
of
- _ ->
- cb(Req, handle_answer, [Pkt, msg(P), SvcName, {TPid, Caps}])
+ _ -> a(Pkt, SvcName, AE, Req)
catch
exit: {invalid_error_bit, _} = E ->
- e(Pkt#diameter_packet{errors = [E]}, SvcName, AE, Req)
- end;
-
-a(#diameter_packet{} = Pkt, SvcName, AE, Req) ->
- e(Pkt, SvcName, AE, Req).
+ a(Pkt#diameter_packet{errors = [E]}, SvcName, AE, Req)
+ end.
-e(Pkt, SvcName, callback, #request{transport = TPid,
- caps = Caps,
- packet = Pkt}
- = Req) ->
- cb(Req, handle_answer, [Pkt, msg(Pkt), SvcName, {TPid, Caps}]);
-e(Pkt, SvcName, report, Req) ->
+a(#diameter_packet{errors = Es} = Pkt, SvcName, AE, #request{transport = TPid,
+ caps = Caps,
+ packet = P}
+ = Req)
+ when [] == Es;
+ callback == AE ->
+ cb(Req, handle_answer, [Pkt, msg(P), SvcName, {TPid, Caps}]);
+
+a(Pkt, SvcName, report, Req) ->
x(errors, handle_answer, [SvcName, Req, Pkt]);
-e(Pkt, SvcName, discard, Req) ->
+
+a(Pkt, SvcName, discard, Req) ->
x({errors, handle_answer, [SvcName, Req, Pkt]}).
%% Note that we don't check that the application id in the answer's
@@ -2249,17 +2323,19 @@ e(Pkt, SvcName, discard, Req) ->
%% Increment a stats counter for an incoming or outgoing message.
%% TODO: fix
-incr(_, #diameter_packet{msg = undefined}, _) ->
+incr(_, #diameter_packet{msg = undefined}, _, _) ->
ok;
-incr(Dir, Pkt, TPid)
- when is_pid(TPid) ->
+incr(recv = D, #diameter_packet{header = H, errors = [_|_]}, _, TPid) ->
+ incr(TPid, {diameter_codec:msg_id(H), D, error});
+
+incr(Dir, Pkt, Dict, TPid) ->
#diameter_packet{header = #diameter_header{is_error = E}
= Hdr,
msg = Rec}
= Pkt,
- RC = int(get_avp_value(?BASE, 'Result-Code', Rec)),
+ RC = int(get_avp_value(Dict, 'Result-Code', Rec)),
PE = is_protocol_error(RC),
%% Check that the E bit is set only for 3xxx result codes.
@@ -2267,15 +2343,21 @@ incr(Dir, Pkt, TPid)
orelse (E andalso PE)
orelse x({invalid_error_bit, RC}, answer, [Dir, Pkt]),
- Ctr = rc_counter(Rec, RC),
- is_tuple(Ctr)
- andalso incr(TPid, {diameter_codec:msg_id(Hdr), Dir, Ctr}).
+ irc(TPid, Hdr, Dir, rc_counter(Dict, Rec, RC)).
+
+irc(_, _, _, undefined) ->
+ false;
+
+irc(TPid, Hdr, Dir, Ctr) ->
+ incr(TPid, {diameter_codec:msg_id(Hdr), Dir, Ctr}).
%% incr/2
incr(TPid, Counter) ->
diameter_stats:incr(Counter, TPid, 1).
+%% error_counter/2
+
%% RFC 3588, 7.6:
%%
%% All Diameter answer messages defined in vendor-specific
@@ -2285,26 +2367,27 @@ incr(TPid, Counter) ->
%% Maintain statistics assuming one or the other, not both, which is
%% surely the intent of the RFC.
-rc_counter(_, RC)
- when is_integer(RC) ->
- {'Result-Code', RC};
-rc_counter(Rec, _) ->
- rcc(get_avp_value(?BASE, 'Experimental-Result', Rec)).
+rc_counter(Dict, Rec, undefined) ->
+ er(get_avp_value(Dict, 'Experimental-Result', Rec));
+rc_counter(_, _, RC) ->
+ {'Result-Code', RC}.
%% Outgoing answers may be in any of the forms messages can be sent
%% in. Incoming messages will be records. We're assuming here that the
%% arity of the result code AVP's is 0 or 1.
-rcc([{_,_,RC} = T])
- when is_integer(RC) ->
+er([{_,_,N} = T | _])
+ when is_integer(N) ->
T;
-rcc({_,_,RC} = T)
- when is_integer(RC) ->
+er({_,_,N} = T)
+ when is_integer(N) ->
T;
-rcc(_) ->
+er(_) ->
undefined.
-int([N])
+%% Extract the first good looking integer. There's no guarantee
+%% that what we're looking for has arity 1.
+int([N|_])
when is_integer(N) ->
N;
int(N)
@@ -2349,8 +2432,11 @@ rt(#request{packet = #diameter_packet{msg = undefined}}, _) ->
false; %% TODO: Not what we should do.
%% ... or not.
-rt(#request{packet = #diameter_packet{msg = Msg}} = Req, S) ->
- find_transport(get_destination(Msg), Req, S).
+rt(#request{packet = #diameter_packet{msg = Msg},
+ dictionary = Dict}
+ = Req,
+ S) ->
+ find_transport(get_destination(Dict, Msg), Req, S).
%%% ---------------------------------------------------------------------------
%%% # report_status/5
@@ -2462,12 +2548,12 @@ find_transport({alias, Alias}, Msg, Opts, #state{service = Svc} = S) ->
find_transport(#diameter_app{} = App, Msg, Opts, S) ->
ft(App, Msg, Opts, S).
-ft(#diameter_app{module = Mod} = App, Msg, Opts, S) ->
+ft(#diameter_app{module = Mod, dictionary = Dict} = App, Msg, Opts, S) ->
#options{filter = Filter,
extra = Xtra}
= Opts,
pick_peer(App#diameter_app{module = Mod ++ Xtra},
- get_destination(Msg),
+ get_destination(Dict, Msg),
Filter,
S);
ft(false = No, _, _, _) ->
@@ -2503,11 +2589,11 @@ find_transport([_,_] = RH,
Filter,
S).
-%% get_destination/1
+%% get_destination/2
-get_destination(Msg) ->
- [str(get_avp_value(?BASE, 'Destination-Realm', Msg)),
- str(get_avp_value(?BASE, 'Destination-Host', Msg))].
+get_destination(Dict, Msg) ->
+ [str(get_avp_value(Dict, 'Destination-Realm', Msg)),
+ str(get_avp_value(Dict, 'Destination-Host', Msg))].
%% This is not entirely correct. The avp could have an arity 1, in
%% which case an empty list is a DiameterIdentity of length 0 rather
@@ -2531,6 +2617,9 @@ str(T) ->
%% question. The third form allows messages to be sent as is, without
%% a dictionary, which is needed in the case of relay agents, for one.
+get_avp_value(?RELAY, Name, Msg) ->
+ get_avp_value(?BASE, Name, Msg);
+
get_avp_value(Dict, Name, [#diameter_header{} | Avps]) ->
try
{Code, _, VId} = Dict:avp_header(Name),
@@ -2746,20 +2835,45 @@ transports(#state{peerT = PeerT}) ->
'Vendor-Specific-Application-Id',
'Firmware-Revision']).
+%% The config returned by diameter:service_info(SvcName, all).
-define(ALL_INFO, [capabilities,
applications,
transport,
- pending,
- statistics]).
+ pending]).
+
+%% The rest.
+-define(OTHER_INFO, [connections,
+ name,
+ peers,
+ statistics]).
-service_info(Items, S)
- when is_list(Items) ->
- [{complete(I), service_info(I,S)} || I <- Items];
service_info(Item, S)
when is_atom(Item) ->
- service_info(Item, S, true).
+ case tagged_info(Item, S) of
+ {_, T} -> T;
+ undefined = No -> No
+ end;
+
+service_info(Items, S) ->
+ tagged_info(Items, S).
-service_info(Item, #state{service = Svc} = S, Complete) ->
+tagged_info(Item, S)
+ when is_atom(Item) ->
+ case complete(Item) of
+ {value, I} ->
+ {I, complete_info(I,S)};
+ false ->
+ undefined
+ end;
+
+tagged_info(Items, S)
+ when is_list(Items) ->
+ [T || I <- Items, T <- [tagged_info(I,S)], T /= undefined, T /= []];
+
+tagged_info(_, _) ->
+ undefined.
+
+complete_info(Item, #state{service = Svc} = S) ->
case Item of
name ->
S#state.service_name;
@@ -2803,70 +2917,119 @@ service_info(Item, #state{service = Svc} = S, Complete) ->
applications -> info_apps(S);
transport -> info_transport(S);
pending -> info_pending(S);
- statistics -> info_stats(S);
- keys -> ?ALL_INFO ++ ?CAP_INFO; %% mostly for test
+ keys -> ?ALL_INFO ++ ?CAP_INFO ++ ?OTHER_INFO;
all -> service_info(?ALL_INFO, S);
- _ when Complete -> service_info(complete(Item), S, false);
- _ -> undefined
+ statistics -> info_stats(S);
+ connections -> info_connections(S);
+ peers -> info_peers(S)
end.
+complete(I)
+ when I == keys;
+ I == all ->
+ {value, I};
complete(Pre) ->
P = atom_to_list(Pre),
- case [I || I <- [name | ?ALL_INFO] ++ ?CAP_INFO,
+ case [I || I <- ?ALL_INFO ++ ?CAP_INFO ++ ?OTHER_INFO,
lists:prefix(P, atom_to_list(I))]
of
- [I] -> I;
- _ -> Pre
+ [I] -> {value, I};
+ _ -> false
end.
+%% info_stats/1
+
info_stats(#state{peerT = PeerT}) ->
- Peers = ets:select(PeerT, [{#peer{ref = '$1', conn = '$2', _ = '_'},
- [{'is_pid', '$2'}],
- [['$1', '$2']]}]),
- diameter_stats:read(lists:append(Peers)).
-%% TODO: include peer identities in return value
-
-info_transport(#state{peerT = PeerT, connT = ConnT}) ->
- dict:fold(fun it/3,
+ MatchSpec = [{#peer{ref = '$1', conn = '$2', _ = '_'},
+ [{'is_pid', '$2'}],
+ [['$1', '$2']]}],
+ diameter_stats:read(lists:append(ets:select(PeerT, MatchSpec))).
+
+%% info_transport/1
+%%
+%% One entry per configured transport. Statistics for each entry are
+%% the accumulated values for the ref and associated peer pids.
+
+info_transport(S) ->
+ PeerD = peer_dict(S),
+ RefsD = dict:map(fun(_, Ls) -> [P || L <- Ls, {peer, {P,_}} <- L] end,
+ PeerD),
+ Refs = lists:append(dict:fold(fun(R, Ps, A) -> [[R|Ps] | A] end,
+ [],
+ RefsD)),
+ Stats = diameter_stats:read(Refs),
+ dict:fold(fun(R, Ls, A) ->
+ Ps = dict:fetch(R, RefsD),
+ [[{ref, R} | transport(Ls)] ++ [stats([R|Ps], Stats)]
+ | A]
+ end,
[],
- ets:foldl(fun(T,A) -> it_acc(ConnT, A, T) end,
- dict:new(),
- PeerT)).
-
-it(Ref, [[{type, connect} | _] = L], Acc) ->
- [[{ref, Ref} | L] | Acc];
-it(Ref, [[{type, accept}, {options, Opts} | _] | _] = L, Acc) ->
- [[{ref, Ref},
- {type, listen},
- {options, Opts},
- {accept, [lists:nthtail(2,A) || A <- L]}]
- | Acc].
-%% Each entry has the same Opts. (TODO)
-
-it_acc(ConnT, Acc, #peer{pid = Pid,
- type = Type,
- ref = Ref,
- options = Opts,
- op_state = OS,
- started = T,
- conn = TPid}) ->
+ PeerD).
+
+transport([[{type, connect} | _] = L]) ->
+ L;
+
+transport([[{type, accept}, {options, Opts} | _] | _] = Ls) ->
+ [{type, listen},
+ {options, Opts},
+ {accept, [lists:nthtail(2,L) || L <- Ls]}].
+%% Note that all peer records for a listening transport (ie. same Ref)
+%% have the same options. (TODO)
+
+peer_dict(#state{peerT = PeerT, connT = ConnT}) ->
+ ets:foldl(fun(T,A) -> peer_acc(ConnT, A, T) end, dict:new(), PeerT).
+
+peer_acc(ConnT, Acc, #peer{pid = Pid,
+ type = Type,
+ ref = Ref,
+ options = Opts,
+ op_state = OS,
+ started = T,
+ conn = TPid}) ->
+ WS = wd_state(OS),
dict:append(Ref,
[{type, Type},
{options, Opts},
- {watchdog, {Pid, T, OS}}
- | info_conn(ConnT, TPid)],
+ {watchdog, {Pid, T, WS}}
+ | info_conn(ConnT, TPid, WS /= ?WD_DOWN)],
Acc).
-info_conn(ConnT, TPid) ->
- info_conn(ets:lookup(ConnT, TPid)).
+info_conn(ConnT, TPid, true)
+ when is_pid(TPid) ->
+ info_conn(ets:lookup(ConnT, TPid));
+info_conn(_, _, _) ->
+ [].
+
+wd_state({_,S}) ->
+ S;
+wd_state(?STATE_UP) ->
+ ?WD_OKAY;
+wd_state(?STATE_DOWN) ->
+ ?WD_DOWN.
info_conn([#conn{pid = Pid, apps = SApps, caps = Caps, started = T}]) ->
[{peer, {Pid, T}},
{apps, SApps},
- {caps, info_caps(Caps)}];
+ {caps, info_caps(Caps)}
+ | try [{port, info_port(Pid)}] catch _:_ -> [] end];
info_conn([] = No) ->
No.
+%% Extract information that the processes involved are expected to
+%% "publish" in their process dictionaries. Simple but backhanded.
+info_port(Pid) ->
+ {_, PD} = process_info(Pid, dictionary),
+ {_, T} = lists:keyfind({diameter_peer_fsm, start}, 1, PD),
+ {TPid, {_Type, TMod, _Cfg}} = T,
+ {_, TD} = process_info(TPid, dictionary),
+ {_, Data} = lists:keyfind({TMod, info}, 1, TD),
+ [{owner, TPid}, {module, TMod} | [_|_] = TMod:info(Data)].
+
+%% Use the fields names from diameter_caps instead of
+%% diameter_base_CER to distinguish between the 2-tuple values
+%% compared to the single capabilities values. Note also that the
+%% returned list is tagged 'caps' rather than 'capabilities' to
+%% emphasize the difference.
info_caps(#diameter_caps{} = C) ->
lists:zip(record_info(fields, diameter_caps), tl(tuple_to_list(C))).
@@ -2882,6 +3045,10 @@ mk_app(#diameter_app{alias = Alias,
{module, ModX},
{id, Id}].
+%% info_pending/1
+%%
+%% One entry for each outgoing request whose answer is outstanding.
+
info_pending(#state{} = S) ->
MatchSpec = [{{'$1',
#request{transport = '$2',
@@ -2895,3 +3062,59 @@ info_pending(#state{} = S) ->
{{from, '$3'}}]}}]}],
ets:select(?REQUEST_TABLE, MatchSpec).
+
+%% info_connections/1
+%%
+%% One entry per transport connection. Statistics for each entry are
+%% for the peer pid only.
+
+info_connections(S) ->
+ ConnL = conn_list(S),
+ Stats = diameter_stats:read([P || L <- ConnL, {peer, {P,_}} <- L]),
+ [L ++ [stats([P], Stats)] || L <- ConnL, {peer, {P,_}} <- L].
+
+conn_list(S) ->
+ lists:append(dict:fold(fun conn_acc/3, [], peer_dict(S))).
+
+conn_acc(Ref, Peers, Acc) ->
+ [[[{ref, Ref} | L] || L <- Peers, lists:keymember(peer, 1, L)]
+ | Acc].
+
+stats(Refs, Stats) ->
+ {statistics, dict:to_list(lists:foldl(fun(R,D) ->
+ stats_acc(R, D, Stats)
+ end,
+ dict:new(),
+ Refs))}.
+
+stats_acc(Ref, Dict, Stats) ->
+ lists:foldl(fun({C,N}, D) -> dict:update_counter(C, N, D) end,
+ Dict,
+ proplists:get_value(Ref, Stats, [])).
+
+%% info_peers/1
+%%
+%% One entry per peer Origin-Host. Statistics for each entry are
+%% accumulated values for all associated transport refs and peer pids.
+
+info_peers(S) ->
+ ConnL = conn_list(S),
+ {PeerD, RefD} = lists:foldl(fun peer_acc/2,
+ {dict:new(), dict:new()},
+ ConnL),
+ Refs = lists:append(dict:fold(fun(_, Rs, A) -> [lists:append(Rs) | A] end,
+ [],
+ RefD)),
+ Stats = diameter_stats:read(Refs),
+ dict:fold(fun(OH, Cs, A) ->
+ Rs = lists:append(dict:fetch(OH, RefD)),
+ [{OH, [{connections, Cs}, stats(Rs, Stats)]}
+ | A]
+ end,
+ [],
+ PeerD).
+
+peer_acc(Peer, {PeerD, RefD}) ->
+ [Ref, {TPid, _}, [{origin_host, {_, OH}} | _]]
+ = [proplists:get_value(K, Peer) || K <- [ref, peer, caps]],
+ {dict:append(OH, Peer, PeerD), dict:append(OH, [Ref, TPid], RefD)}.
diff --git a/lib/diameter/src/base/diameter_stats.erl b/lib/diameter/src/base/diameter_stats.erl
index 71479afa95..70727d068e 100644
--- a/lib/diameter/src/base/diameter_stats.erl
+++ b/lib/diameter/src/base/diameter_stats.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2011. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2012. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -22,14 +22,13 @@
%%
-module(diameter_stats).
--compile({no_auto_import, [monitor/2]}).
-behaviour(gen_server).
--export([reg/1, reg/2,
- incr/1, incr/2, incr/3,
+-export([reg/2, reg/1,
+ incr/3, incr/1,
read/1,
- flush/0, flush/1]).
+ flush/1]).
%% supervisor callback
-export([start_link/0]).
@@ -48,123 +47,105 @@
-include("diameter_internal.hrl").
-%% ets table containing stats. reg(Pid, Ref) inserts a {Pid, Ref},
-%% incr(Counter, X, N) updates the counter keyed at {Counter, X}, and
-%% Pid death causes counters keyed on {Counter, Pid} to be deleted and
-%% added to those keyed on {Counter, Ref}.
+%% ets table containing 2-tuple stats. reg(Pid, Ref) inserts a {Pid,
+%% Ref}, incr(Counter, X, N) updates the counter keyed at {Counter,
+%% X}, and Pid death causes counters keyed on {Counter, Pid} to be
+%% deleted and added to those keyed on {Counter, Ref}.
-define(TABLE, ?MODULE).
%% Name of registered server.
-define(SERVER, ?MODULE).
-%% Entries in the table.
--define(REC(Key, Value), {Key, Value}).
-
%% Server state.
-record(state, {id = now()}).
-type counter() :: any().
--type contrib() :: any().
-
-%%% ---------------------------------------------------------------------------
-%%% # reg(Pid, Contrib)
-%%%
-%%% Description: Register a process as a contributor of statistics
-%%% associated with a specified term. Statistics can be
-%%% contributed by specifying either Pid or Contrib as
-%%% the second argument to incr/3. Statistics contributed
-%%% by Pid are folded into the corresponding entry for
-%%% Contrib when the process dies.
-%%%
-%%% Contrib can be any term but should not be a pid
-%%% passed as the first argument to reg/2. Subsequent
-%%% registrations for the same Pid overwrite the association
-%%% ---------------------------------------------------------------------------
-
--spec reg(pid(), contrib())
- -> true.
+-type ref() :: any().
+
+%% ---------------------------------------------------------------------------
+%% # reg(Pid, Ref)
+%%
+%% Register a process as a contributor of statistics associated with a
+%% specified term. Statistics can be contributed by specifying either
+%% Pid or Ref as the second argument to incr/3. Statistics contributed
+%% by Pid are folded into the corresponding entry for Ref when the
+%% process dies.
+%% ---------------------------------------------------------------------------
+
+-spec reg(pid(), ref())
+ -> boolean().
-reg(Pid, Contrib)
+reg(Pid, Ref)
when is_pid(Pid) ->
- call({reg, Pid, Contrib}).
+ call({reg, Pid, Ref}).
--spec reg(contrib())
+-spec reg(ref())
-> true.
reg(Ref) ->
reg(self(), Ref).
-%%% ---------------------------------------------------------------------------
-%%% # incr(Counter, Contrib, N)
-%%%
-%%% Description: Increment a counter for the specified contributor.
-%%%
-%%% Contrib will typically be an argument passed to reg/2
-%%% but there's nothing that requires this. In particular,
-%%% if Contrib is a pid that hasn't been registered then
-%%% counters are unaffected by the death of the process.
-%%% ---------------------------------------------------------------------------
-
--spec incr(counter(), contrib(), integer())
- -> integer().
+%% ---------------------------------------------------------------------------
+%% # incr(Counter, Ref, N)
+%%
+%% Increment a counter for the specified contributor.
+%%
+%% Ref will typically be an argument passed to reg/2 but there's
+%% nothing that requires this. Only registered pids can contribute
+%% counters however, otherwise incr/3 is a no-op.
+%% ---------------------------------------------------------------------------
-incr(Ctr, Contrib, N) ->
- update_counter({Ctr, Contrib}, N).
+-spec incr(counter(), ref(), integer())
+ -> integer() | false.
-incr(Ctr, N)
+incr(Ctr, Ref, N)
when is_integer(N) ->
- incr(Ctr, self(), N);
-
-incr(Ctr, Contrib) ->
- incr(Ctr, Contrib, 1).
+ update_counter({Ctr, Ref}, N).
incr(Ctr) ->
incr(Ctr, self(), 1).
-%%% ---------------------------------------------------------------------------
-%%% # read(Contribs)
-%%%
-%%% Description: Retrieve counters for the specified contributors.
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # read(Refs)
+%%
+%% Retrieve counters for the specified contributors.
+%% ---------------------------------------------------------------------------
--spec read([contrib()])
- -> [{contrib(), [{counter(), integer()}]}].
+-spec read([ref()])
+ -> [{ref(), [{counter(), integer()}]}].
-read(Contribs) ->
- lists:foldl(fun(?REC({T,C}, N), D) -> orddict:append(C, {T,N}, D) end,
+read(Refs) ->
+ read(Refs, false).
+
+read(Refs, B) ->
+ MatchSpec = [{{{'_', '$1'}, '_'},
+ [?ORCOND([{'=:=', '$1', {const, R}}
+ || R <- Refs])],
+ ['$_']}],
+ L = ets:select(?TABLE, MatchSpec),
+ B andalso delete(L),
+ lists:foldl(fun({{C,R}, N}, D) -> orddict:append(R, {C,N}, D) end,
orddict:new(),
- ets:select(?TABLE, [{?REC({'_', '$1'}, '_'),
- [?ORCOND([{'=:=', '$1', {const, C}}
- || C <- Contribs])],
- ['$_']}])).
-
-%%% ---------------------------------------------------------------------------
-%%% # flush(Contrib)
-%%%
-%%% Description: Retrieve and delete statistics for the specified
-%%% contributor.
-%%%
-%%% If Contrib is a pid registered with reg/2 then statistics
-%%% for both and its associated contributor are retrieved.
-%%% ---------------------------------------------------------------------------
-
--spec flush(contrib())
- -> [{counter(), integer()}].
+ L).
+
+%% ---------------------------------------------------------------------------
+%% # flush(Refs)
+%%
+%% Retrieve and delete statistics for the specified contributors.
+%% ---------------------------------------------------------------------------
+
+-spec flush([ref()])
+ -> [{ref(), {counter(), integer()}}].
-flush(Contrib) ->
+flush(Refs) ->
try
- call({flush, Contrib})
+ call({flush, Refs})
catch
exit: _ ->
[]
end.
-flush() ->
- flush(self()).
-
-%%% ---------------------------------------------------------
-%%% EXPORTED INTERNAL FUNCTIONS
-%%% ---------------------------------------------------------
+%% ===========================================================================
start_link() ->
ServerName = {local, ?SERVER},
@@ -179,18 +160,16 @@ state() ->
uptime() ->
call(uptime).
-%%% ----------------------------------------------------------
-%%% # init(_)
-%%%
-%%% Output: {ok, State}
-%%% ----------------------------------------------------------
+%% ----------------------------------------------------------
+%% # init/1
+%% ----------------------------------------------------------
init([]) ->
ets:new(?TABLE, [named_table, ordered_set, public]),
{ok, #state{}}.
%% ----------------------------------------------------------
-%% handle_call(Request, From, State)
+%% # handle_call/3
%% ----------------------------------------------------------
handle_call(state, _, State) ->
@@ -199,31 +178,31 @@ handle_call(state, _, State) ->
handle_call(uptime, _, #state{id = Time} = State) ->
{reply, diameter_lib:now_diff(Time), State};
-handle_call({reg, Pid, Contrib}, _From, State) ->
- monitor(not ets:member(?TABLE, Pid), Pid),
- {reply, insert(?REC(Pid, Contrib)), State};
+handle_call({incr, T}, _, State) ->
+ {reply, update_counter(T), State};
-handle_call({flush, Contrib}, _From, State) ->
- {reply, fetch(Contrib), State};
+handle_call({reg, Pid, Ref}, _From, State) ->
+ B = ets:insert_new(?TABLE, {Pid, Ref}),
+ B andalso erlang:monitor(process, Pid),
+ {reply, B, State};
+
+handle_call({flush, Refs}, _From, State) ->
+ {reply, read(Refs, true), State};
handle_call(Req, From, State) ->
?UNEXPECTED([Req, From]),
{reply, nok, State}.
%% ----------------------------------------------------------
-%% handle_cast(Request, State)
+%% # handle_cast/2
%% ----------------------------------------------------------
-handle_cast({incr, Rec}, State) ->
- update_counter(Rec),
- {noreply, State};
-
handle_cast(Msg, State) ->
?UNEXPECTED([Msg]),
{noreply, State}.
%% ----------------------------------------------------------
-%% handle_info(Request, State)
+%% # handle_info/2
%% ----------------------------------------------------------
handle_info({'DOWN', _MRef, process, Pid, _}, State) ->
@@ -235,91 +214,62 @@ handle_info(Info, State) ->
{noreply, State}.
%% ----------------------------------------------------------
-%% terminate(Reason, State)
+%% # terminate/2
%% ----------------------------------------------------------
terminate(_Reason, _State) ->
ok.
%% ----------------------------------------------------------
-%% code_change(OldVsn, State, Extra)
+%% # code_change/3
%% ----------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-%%% ---------------------------------------------------------
-%%% INTERNAL FUNCTIONS
-%%% ---------------------------------------------------------
-
-%% monitor/2
-
-monitor(true, Pid) ->
- erlang:monitor(process, Pid);
-monitor(false = No, _) ->
- No.
+%% ===========================================================================
%% down/1
down(Pid) ->
- L = ets:match_object(?TABLE, ?REC({'_', Pid}, '_')),
- [?REC(_, Ref) = T] = lookup(Pid),
+ down(lookup(Pid), ets:match_object(?TABLE, {{'_', Pid}, '_'})).
+
+down([{_, Ref} = T], L) ->
fold(Ref, L),
- delete_object(T),
+ delete([T|L]);
+down([], L) -> %% flushed
delete(L).
-%% Fold Pid-based entries into Ref-based ones.
+%% Fold pid-based entries into ref-based ones.
fold(Ref, L) ->
- lists:foreach(fun(?REC({K, _}, V)) -> update_counter({{K, Ref}, V}) end,
- L).
-
-delete(Objs) ->
- lists:foreach(fun delete_object/1, Objs).
-
-%% fetch/1
-
-fetch(X) ->
- MatchSpec = [{?REC({'_', '$1'}, '_'),
- [?ORCOND([{'==', '$1', {const, T}} || T <- [X | ref(X)]])],
- ['$_']}],
- L = ets:select(?TABLE, MatchSpec),
- delete(L),
- D = lists:foldl(fun sum/2, dict:new(), L),
- dict:to_list(D).
-
-sum({{Ctr, _}, N}, Dict) ->
- dict:update(Ctr, fun(V) -> V+N end, N, Dict).
-
-ref(Pid)
- when is_pid(Pid) ->
- ets:select(?TABLE, [{?REC(Pid, '$1'), [], ['$1']}]);
-ref(_) ->
- [].
+ lists:foreach(fun({{K, _}, V}) -> update_counter({{K, Ref}, V}) end, L).
%% update_counter/2
%%
-%% From an arbitrary request process. Cast to the server process to
-%% insert a new element if the counter doesn't exists so that two
-%% processes don't do so simultaneously.
+%% From an arbitrary process. Call to the server process to insert a
+%% new element if the counter doesn't exists so that two processes
+%% don't insert simultaneously.
update_counter(Key, N) ->
try
ets:update_counter(?TABLE, Key, N)
catch
error: badarg ->
- cast({incr, ?REC(Key, N)})
+ call({incr, {Key, N}})
end.
%% update_counter/1
%%
-%% From the server process.
+%% From the server process, when update_counter/2 failed due to a
+%% non-existent entry.
-update_counter(?REC(Key, N) = T) ->
+update_counter({{_Ctr, Ref} = Key, N} = T) ->
try
ets:update_counter(?TABLE, Key, N)
catch
error: badarg ->
- insert(T)
+ (not is_pid(Ref) orelse ets:member(?TABLE, Ref))
+ andalso begin insert(T), N end
end.
insert(T) ->
@@ -328,13 +278,8 @@ insert(T) ->
lookup(Key) ->
ets:lookup(?TABLE, Key).
-delete_object(T) ->
- ets:delete_object(?TABLE, T).
-
-%% cast/1
-
-cast(Msg) ->
- gen_server:cast(?SERVER, Msg).
+delete(Objs) ->
+ lists:foreach(fun({K,_}) -> ets:delete(?TABLE, K) end, Objs).
%% call/1
diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl
index fb22fd8275..d7474e5c56 100644
--- a/lib/diameter/src/base/diameter_watchdog.erl
+++ b/lib/diameter/src/base/diameter_watchdog.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2011. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2012. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -54,7 +54,7 @@
%% number of DWAs received during reopen
%% end PCB
parent = self() :: pid(),
- transport :: pid(),
+ transport :: pid() | undefined,
tref :: reference(), %% reference for current watchdog timer
message_data}). %% term passed into diameter_service with message
@@ -64,6 +64,13 @@
%% that a failed capabilities exchange produces the desired exit
%% reason.
+-spec start(Type, {RecvData, [Opt], SvcName, #diameter_service{}})
+ -> {reference(), pid()}
+ when Type :: {connect|accept, diameter:transport_ref()},
+ RecvData :: term(),
+ Opt :: diameter:transport_opt(),
+ SvcName :: diameter:service_name().
+
start({_,_} = Type, T) ->
Ref = make_ref(),
{ok, Pid} = diameter_watchdog_sup:start_child({Ref, {Type, self(), T}}),
@@ -102,7 +109,7 @@ i({_, Pid, _} = T) -> %% from old code
erlang:monitor(process, Pid),
make_state(T).
-make_state({T, Pid, {ConnT,
+make_state({T, Pid, {RecvData,
Opts,
SvcName,
#diameter_service{applications = Apps,
@@ -116,7 +123,7 @@ make_state({T, Pid, {ConnT,
tw = proplists:get_value(watchdog_timer,
Opts,
?DEFAULT_TW_INIT),
- message_data = {ConnT, SvcName, Apps}}.
+ message_data = {RecvData, SvcName, Apps}}.
%% handle_call/3
@@ -134,14 +141,36 @@ handle_info(T, State) ->
case transition(T, State) of
ok ->
{noreply, State};
- #watchdog{status = X} = S ->
- ?LOGC(X =/= State#watchdog.status, transition, X),
+ #watchdog{} = S ->
+ event(State, S),
{noreply, S};
stop ->
?LOG(stop, T),
+ event(State, State#watchdog{status = down}),
{stop, {shutdown, T}, State}
end.
+event(#watchdog{status = T}, #watchdog{status = T}) ->
+ ok;
+
+event(#watchdog{transport = undefined}, #watchdog{transport = undefined}) ->
+ ok;
+
+event(#watchdog{status = From, transport = F, parent = Pid},
+ #watchdog{status = To, transport = T}) ->
+ E = {tpid(F,T), From, To},
+ notify(Pid, E),
+ ?LOG(transition, {self(), E}).
+
+tpid(_, Pid)
+ when is_pid(Pid) ->
+ Pid;
+tpid(Pid, _) ->
+ Pid.
+
+notify(Pid, E) ->
+ Pid ! {watchdog, self(), E}.
+
%% terminate/2
terminate(_, _) ->
@@ -251,8 +280,8 @@ transition({'DOWN', _, process, TPid, _},
status = initial}) ->
stop;
-transition({'DOWN', _, process, Pid, _},
- #watchdog{transport = Pid}
+transition({'DOWN', _, process, TPid, _},
+ #watchdog{transport = TPid}
= S) ->
failover(S),
close(S),
@@ -385,7 +414,7 @@ recv(Name, Pkt, S) ->
rcv(Name, Pkt, S),
NS
catch
- throw: {?MODULE, throwaway, #watchdog{} = NS} ->
+ {?MODULE, throwaway, #watchdog{} = NS} ->
NS
end.
diff --git a/lib/diameter/src/modules.mk b/lib/diameter/src/modules.mk
index 7a700a6d53..5d3c4157ae 100644
--- a/lib/diameter/src/modules.mk
+++ b/lib/diameter/src/modules.mk
@@ -58,6 +58,7 @@ RT_MODULES = \
transport/diameter_tcp_sup \
transport/diameter_sctp \
transport/diameter_sctp_sup \
+ transport/diameter_transport \
transport/diameter_transport_sup
# Handwritten (compile time) modules not included in the app file.
diff --git a/lib/diameter/src/transport/diameter_etcp.erl b/lib/diameter/src/transport/diameter_etcp.erl
index d925d62545..cd62cf34fa 100644
--- a/lib/diameter/src/transport/diameter_etcp.erl
+++ b/lib/diameter/src/transport/diameter_etcp.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2011. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2012. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -36,7 +36,9 @@
send/2,
close/1,
setopts/2,
- port/1]).
+ sockname/1,
+ peername/1,
+ getstat/1]).
%% child start
-export([start_link/1]).
@@ -113,10 +115,20 @@ close(Pid) ->
setopts(_, _) ->
ok.
-%% port/1
+%% sockname/1
-port(_) ->
- 3868. %% We have no local port: fake it.
+sockname(_) ->
+ {error, ?MODULE}.
+
+%% peername/1
+
+peername(_) ->
+ {error, ?MODULE}.
+
+%% getstat/1
+
+getstat(_) ->
+ {error, ?MODULE}.
%% start_link/1
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl
index 68b0342cd5..79b8b851fb 100644
--- a/lib/diameter/src/transport/diameter_sctp.erl
+++ b/lib/diameter/src/transport/diameter_sctp.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2011. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2012. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -37,12 +37,18 @@
code_change/3,
terminate/2]).
+-export([info/1]). %% service_info callback
+
-export([ports/0,
ports/1]).
-include_lib("kernel/include/inet_sctp.hrl").
-include_lib("diameter/include/diameter.hrl").
+%% Keys into process dictionary.
+-define(INFO_KEY, info).
+-define(REF_KEY, ref).
+
-define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})).
%% The default port for a listener.
@@ -62,6 +68,7 @@
-record(transport,
{parent :: pid(),
mode :: {accept, pid()}
+ | accept
| {connect, {list(inet:ip_address()), uint(), list()}}
%% {RAs, RP, Errors}
| connect,
@@ -134,6 +141,24 @@ start_link(T) ->
diameter_lib:spawn_opts(server, [])).
%% ---------------------------------------------------------------------------
+%% # info/1
+%% ---------------------------------------------------------------------------
+
+info({gen_sctp, Sock}) ->
+ lists:flatmap(fun(K) -> info(K, Sock) end,
+ [{socket, sockname},
+ {peer, peername},
+ {statistics, getstat}]).
+
+info({K,F}, Sock) ->
+ case inet:F(Sock) of
+ {ok, V} ->
+ [{K,V}];
+ _ ->
+ []
+ end.
+
+%% ---------------------------------------------------------------------------
%% # init/1
%% ---------------------------------------------------------------------------
@@ -157,7 +182,7 @@ i({connect, Pid, Opts, Addrs, Ref}) ->
RAs = [diameter_lib:ipaddr(A) || {raddr, A} <- As],
[RP] = [P || {rport, P} <- Ps] ++ [P || P <- [?DEFAULT_PORT], [] == Ps],
{LAs, Sock} = open(Addrs, Rest, 0),
- putr(ref, Ref),
+ putr(?REF_KEY, Ref),
proc_lib:init_ack({ok, self(), LAs}),
erlang:monitor(process, Pid),
#transport{parent = Pid,
@@ -169,7 +194,7 @@ i({connect, _, _, _} = T) -> %% from old code
%% An accepting transport spawned by diameter.
i({accept, Pid, LPid, Sock, Ref})
when is_pid(Pid) ->
- putr(ref, Ref),
+ putr(?REF_KEY, Ref),
proc_lib:init_ack({ok, self()}),
erlang:monitor(process, Pid),
erlang:monitor(process, LPid),
@@ -181,7 +206,7 @@ i({accept, _, _, _} = T) -> %% from old code
%% An accepting transport spawned at association establishment.
i({accept, Ref, LPid, Sock, Id}) ->
- putr(ref, Ref),
+ putr(?REF_KEY, Ref),
proc_lib:init_ack({ok, self()}),
MRef = erlang:monitor(process, LPid),
%% Wait for a signal that the transport has been started before
@@ -325,6 +350,11 @@ terminate(_, #transport{assoc_id = undefined}) ->
ok;
terminate(_, #transport{socket = Sock,
+ mode = accept,
+ assoc_id = Id}) ->
+ close(Sock, Id);
+
+terminate(_, #transport{socket = Sock,
mode = {accept, _},
assoc_id = Id}) ->
close(Sock, Id);
@@ -356,13 +386,16 @@ start_timer(S) ->
%% Incoming message from SCTP.
l({sctp, Sock, _RA, _RP, Data} = Msg, #listener{socket = Sock} = S) ->
- setopts(Sock),
- case find(Data, S) of
+ Id = assoc_id(Data),
+
+ try find(Id, Data, S) of
{TPid, NewS} ->
- TPid ! Msg,
+ TPid ! {peeloff, peeloff(Sock, Id, TPid), Msg},
NewS;
false ->
S
+ after
+ setopts(Sock)
end;
%% Transport is asking message to be sent. See send/3 for why the send
@@ -430,15 +463,19 @@ t(T,S) ->
%% transition/2
-%% Incoming message.
-transition({sctp, Sock, _RA, _RP, Data}, #transport{socket = Sock,
- mode = {accept, _}}
- = S) ->
- recv(Data, S);
+%% Listening process is transfering ownership of an association.
+transition({peeloff, Sock, {sctp, LSock, _RA, _RP, _Data} = Msg},
+ #transport{mode = {accept, _},
+ socket = LSock}
+ = S) ->
+ transition(Msg, S#transport{socket = Sock});
-transition({sctp, Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) ->
+%% Incoming message.
+transition({sctp, _Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) ->
setopts(Sock),
recv(Data, S);
+%% Don't match on Sock since in R15B01 it can be the listening socket
+%% in the (peeled-off) accept case, which is likely a bug.
%% Outgoing message.
transition({diameter, {send, Msg}}, S) ->
@@ -456,13 +493,18 @@ transition({diameter, {close, Pid}}, #transport{parent = Pid}) ->
transition({diameter, {tls, _Ref, _Type, _Bool}}, _) ->
stop;
+%% Parent process has died.
+transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) ->
+ stop;
+
%% Listener process has died.
transition({'DOWN', _, process, Pid, _}, #transport{mode = {accept, Pid}}) ->
stop;
-%% Parent process has died.
-transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) ->
- stop;
+%% Ditto but we have ownership of the association. It might be that
+%% we'll go down anyway though.
+transition({'DOWN', _, process, _Pid, _}, #transport{mode = accept}) ->
+ ok;
%% Request for the local port number.
transition({resolve_port, Pid}, #transport{socket = Sock})
@@ -521,14 +563,6 @@ send(Bin, #transport{streams = {_, OS},
%% send/3
-%% Messages have to be sent from the controlling process, which is
-%% probably a bug. Sending from here causes an inet_reply, Sock,
-%% Status} message to be sent to the controlling process while
-%% gen_sctp:send/4 here hangs.
-send(StreamId, Bin, #transport{assoc_id = AId,
- mode = {accept, LPid}}) ->
- LPid ! {send, AId, StreamId, Bin};
-
send(StreamId, Bin, #transport{socket = Sock,
assoc_id = AId}) ->
send(Sock, AId, StreamId, Bin).
@@ -554,10 +588,9 @@ recv({_, #sctp_assoc_change{state = comm_up,
mode = {T, _},
socket = Sock}
= S) ->
- Ref = getr(ref),
+ Ref = getr(?REF_KEY),
is_reference(Ref) %% started in new code
- andalso
- (true = diameter_reg:add_new({?MODULE, T, {Ref, {Id, Sock}}})),
+ andalso publish(T, Ref, Id, Sock),
up(S#transport{assoc_id = Id,
streams = {IS, OS}});
@@ -599,6 +632,10 @@ recv({_, #sctp_paddr_change{}}, _) ->
recv({_, #sctp_pdapi_event{}}, _) ->
ok.
+publish(T, Ref, Id, Sock) ->
+ true = diameter_reg:add_new({?MODULE, T, {Ref, {Id, Sock}}}),
+ putr(?INFO_KEY, {gen_sctp, Sock}). %% for info/1
+
%% up/1
up(#transport{parent = Pid,
@@ -608,21 +645,15 @@ up(#transport{parent = Pid,
S#transport{mode = C};
up(#transport{parent = Pid,
- mode = {accept, _}}
+ mode = {accept = A, _}}
= S) ->
diameter_peer:up(Pid),
- S.
+ S#transport{mode = A}.
-%% find/2
-
-find({[#sctp_sndrcvinfo{assoc_id = Id}], _}
- = Data,
- #listener{tmap = T}
- = S) ->
- f(ets:lookup(T, Id), Data, S);
+%% find/3
-find({_, Rec} = Data, #listener{tmap = T} = S) ->
- f(ets:lookup(T, assoc_id(Rec)), Data, S).
+find(Id, Data, #listener{tmap = T} = S) ->
+ f(ets:lookup(T, Id), Data, S).
%% New association and a transport waiting for one: use it.
f([],
@@ -663,17 +694,29 @@ f([], _, _) ->
%% assoc_id/1
-assoc_id(#sctp_shutdown_event{assoc_id = Id}) ->
+assoc_id({[#sctp_sndrcvinfo{assoc_id = Id}], _}) ->
Id;
-assoc_id(#sctp_assoc_change{assoc_id = Id}) ->
+assoc_id({_, Rec}) ->
+ id(Rec).
+
+id(#sctp_shutdown_event{assoc_id = Id}) ->
+ Id;
+id(#sctp_assoc_change{assoc_id = Id}) ->
Id;
-assoc_id(#sctp_sndrcvinfo{assoc_id = Id}) ->
+id(#sctp_sndrcvinfo{assoc_id = Id}) ->
Id;
-assoc_id(#sctp_paddr_change{assoc_id = Id}) ->
+id(#sctp_paddr_change{assoc_id = Id}) ->
Id;
-assoc_id(#sctp_adaptation_event{assoc_id = Id}) ->
+id(#sctp_adaptation_event{assoc_id = Id}) ->
Id.
+%% peeloff/3
+
+peeloff(LSock, Id, TPid) ->
+ {ok, Sock} = gen_sctp:peeloff(LSock, Id),
+ ok = gen_sctp:controlling_process(Sock, TPid),
+ Sock.
+
%% connect/4
connect(_, [], _, Reasons) ->
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl
index f98ae84cf4..f3fbbee609 100644
--- a/lib/diameter/src/transport/diameter_tcp.erl
+++ b/lib/diameter/src/transport/diameter_tcp.erl
@@ -37,11 +37,17 @@
code_change/3,
terminate/2]).
+-export([info/1]). %% service_info callback
+
-export([ports/0,
ports/1]).
-include_lib("diameter/include/diameter.hrl").
+%% Keys into process dictionary.
+-define(INFO_KEY, info).
+-define(REF_KEY, ref).
+
-define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})).
-define(DEFAULT_PORT, 3868). %% RFC 3588, ch 2.1
@@ -111,6 +117,33 @@ start_link(T) ->
diameter_lib:spawn_opts(server, [])).
%% ---------------------------------------------------------------------------
+%% # info/1
+%% ---------------------------------------------------------------------------
+
+info({Mod, Sock}) ->
+ lists:flatmap(fun(K) -> info(Mod, K, Sock) end,
+ [{socket, fun sockname/2},
+ {peer, fun peername/2},
+ {statistics, fun getstat/2}
+ | ssl_info(Mod, Sock)]).
+
+info(Mod, {K,F}, Sock) ->
+ case F(Mod, Sock) of
+ {ok, V} ->
+ [{K,V}];
+ _ ->
+ []
+ end.
+
+ssl_info(ssl = M, Sock) ->
+ [{M, ssl_info(Sock)}];
+ssl_info(_, _) ->
+ [].
+
+ssl_info(Sock) ->
+ [{peercert, C} || {ok, C} <- [ssl:peercert(Sock)]].
+
+%% ---------------------------------------------------------------------------
%% # init/1
%% ---------------------------------------------------------------------------
@@ -133,7 +166,7 @@ i({T, Ref, Mod, Pid, Opts, Addrs})
MPid ! {stop, self()}, %% tell the monitor to die
M = if SslOpts -> ssl; true -> Mod end,
setopts(M, Sock),
- putr(ref, Ref),
+ putr(?REF_KEY, Ref),
#transport{parent = Pid,
module = M,
socket = Sock,
@@ -191,7 +224,7 @@ i(accept = T, Ref, Mod, Pid, Opts, Addrs) ->
{LAddr, LSock} = listener(Ref, {Mod, Opts, Addrs}),
proc_lib:init_ack({ok, self(), [LAddr]}),
Sock = ok(accept(Mod, LSock)),
- true = diameter_reg:add_new({?MODULE, T, {Ref, Sock}}),
+ publish(Mod, T, Ref, Sock),
diameter_peer:up(Pid),
Sock;
@@ -202,10 +235,14 @@ i(connect = T, Ref, Mod, Pid, Opts, Addrs) ->
RPort = get_port(RP),
proc_lib:init_ack({ok, self(), [LAddr]}),
Sock = ok(connect(Mod, RAddr, RPort, gen_opts(LAddr, Rest))),
- true = diameter_reg:add_new({?MODULE, T, {Ref, Sock}}),
+ publish(Mod, T, Ref, Sock),
diameter_peer:up(Pid, {RAddr, RPort}),
Sock.
+publish(Mod, T, Ref, Sock) ->
+ true = diameter_reg:add_new({?MODULE, T, {Ref, Sock}}),
+ putr(?INFO_KEY, {Mod, Sock}). %% for info/1
+
ok({ok, T}) ->
T;
ok(No) ->
@@ -521,7 +558,7 @@ tls_handshake(Type, true, #transport{socket = Sock,
ssl = Opts}
= S) ->
{ok, SSock} = tls(Type, Sock, [{cb_info, ?TCP_CB(M)} | Opts]),
- Ref = getr(ref),
+ Ref = getr(?REF_KEY),
is_reference(Ref) %% started in new code
andalso
(true = diameter_reg:add_new({?MODULE, Type, {Ref, SSock}})),
@@ -696,12 +733,32 @@ setopts(M, Sock) ->
portnr(gen_tcp, Sock) ->
inet:port(Sock);
-portnr(ssl, Sock) ->
- case ssl:sockname(Sock) of
+portnr(M, Sock) ->
+ case M:sockname(Sock) of
{ok, {_Addr, PortNr}} ->
{ok, PortNr};
{error, _} = No ->
No
- end;
-portnr(M, Sock) ->
- M:port(Sock).
+ end.
+
+%% sockname/2
+
+sockname(gen_tcp, Sock) ->
+ inet:sockname(Sock);
+sockname(M, Sock) ->
+ M:sockname(Sock).
+
+%% peername/2
+
+peername(gen_tcp, Sock) ->
+ inet:peername(Sock);
+peername(M, Sock) ->
+ M:peername(Sock).
+
+%% getstat/2
+
+getstat(gen_tcp, Sock) ->
+ inet:getstat(Sock);
+getstat(M, Sock) ->
+ M:getstat(Sock).
+%% Note that ssl:getstat/1 doesn't yet exist in R15B01.
diff --git a/lib/diameter/src/transport/diameter_transport.erl b/lib/diameter/src/transport/diameter_transport.erl
new file mode 100644
index 0000000000..ff4b6bbc6d
--- /dev/null
+++ b/lib/diameter/src/transport/diameter_transport.erl
@@ -0,0 +1,55 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2012. All Rights Reserved.
+%%
+%% The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved online at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% %CopyrightEnd%
+%%
+
+-module(diameter_transport).
+
+%%
+%% This module implements a transport start function that
+%% evaluates its config argument.
+%%
+
+%% Transport start functions
+-export([start/3,
+ select/3,
+ eval/3]).
+
+%% start/3
+
+%% Call a start function in this module ...
+start(T, Svc, {F,A}) ->
+ start(T, Svc, {?MODULE, F, [A]});
+
+%% ... or some other.
+start(T, Svc, F) ->
+ diameter_lib:eval([F, T, Svc]).
+
+%% select/3
+%%
+%% A start function that whose config argument is expected to return a
+%% new start function.
+
+select(T, Svc, F) ->
+ start(T, Svc, diameter_lib:eval([F, T, Svc])).
+
+%% eval/3
+%%
+%% A start function that simply evaluates its config argument.
+
+eval(_, _, F) ->
+ diameter_lib:eval(F).