diff options
Diffstat (limited to 'lib/diameter/src')
-rw-r--r-- | lib/diameter/src/.gitignore | 1 | ||||
-rw-r--r-- | lib/diameter/src/Makefile | 48 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter.appup.src | 25 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter.erl | 1 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_codec.erl | 11 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_config.erl | 5 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_peer.erl | 141 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_peer_fsm.erl | 176 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_service.erl | 198 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_stats.erl | 265 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_watchdog.erl | 11 | ||||
-rw-r--r-- | lib/diameter/src/modules.mk | 1 | ||||
-rw-r--r-- | lib/diameter/src/transport/diameter_etcp.erl | 22 | ||||
-rw-r--r-- | lib/diameter/src/transport/diameter_sctp.erl | 129 | ||||
-rw-r--r-- | lib/diameter/src/transport/diameter_tcp.erl | 79 | ||||
-rw-r--r-- | lib/diameter/src/transport/diameter_transport.erl | 55 |
16 files changed, 783 insertions, 385 deletions
diff --git a/lib/diameter/src/.gitignore b/lib/diameter/src/.gitignore index feeb378fd8..cc06720fd1 100644 --- a/lib/diameter/src/.gitignore +++ b/lib/diameter/src/.gitignore @@ -1,2 +1,3 @@ /depend.mk +/otp.plt diff --git a/lib/diameter/src/Makefile b/lib/diameter/src/Makefile index dbfaa4e140..99c343275b 100644 --- a/lib/diameter/src/Makefile +++ b/lib/diameter/src/Makefile @@ -1,7 +1,7 @@ # # %CopyrightBegin% # -# Copyright Ericsson AB 2010-2011. All Rights Reserved. +# Copyright Ericsson AB 2010-2012. All Rights Reserved. # # The contents of this file are subject to the Erlang Public License, # Version 1.1, (the "License"); you may not use this file except in @@ -181,6 +181,31 @@ clean: rm -f $(TARGET_FILES) gen/* rm -f depend.mk +realclean: clean + rm -f ../ebin/* +# Not $(EBIN) just to be a bit paranoid + +PLT = ./otp.plt + +plt: + dialyzer --build_plt \ + --apps erts stdlib kernel \ + xmerl ssl public_key crypto \ + compiler syntax_tools runtime_tools \ + --output_plt $(PLT) \ + --verbose + +dialyze: opt $(PLT) + dialyzer --plt $(PLT) \ + --verbose \ + -Wno_improper_lists \ + $(EBIN)/diameter_gen_base_rfc3588.$(EMULATOR) \ + $(patsubst %, $(EBIN)/%.$(EMULATOR), \ + $(notdir $(RT_MODULES) $(CT_MODULES))) +# Omit all but the common dictionary module since these +# (diameter_gen_relay in particular) generate warning depending on how +# much of the included diameter_gen.hrl they use. + # ---------------------------------------------------- # Release targets # ---------------------------------------------------- @@ -195,27 +220,27 @@ endif release_spec: opt for d in bin ebin include src/dict; do \ - $(INSTALL_DIR) $(RELSYSDIR)/$$d; \ + $(INSTALL_DIR) "$(RELSYSDIR)/$$d"; \ done - $(INSTALL_SCRIPT) $(BINS:%=../bin/%) $(RELSYSDIR)/bin - $(INSTALL_DATA) $(TARGET_FILES) $(RELSYSDIR)/ebin + $(INSTALL_SCRIPT) $(BINS:%=../bin/%) "$(RELSYSDIR)/bin" + $(INSTALL_DATA) $(TARGET_FILES) "$(RELSYSDIR)/ebin" $(INSTALL_DATA) $(EXTERNAL_HRLS:%=../include/%) $(DICT_HRLS) \ - $(RELSYSDIR)/include - $(INSTALL_DATA) $(DICTS:%=dict/%.dia) $(RELSYSDIR)/src/dict + "$(RELSYSDIR)/include" + $(INSTALL_DATA) $(DICTS:%=dict/%.dia) "$(RELSYSDIR)/src/dict" $(MAKE) $(TARGET_DIRS:%/=release_src_%) $(MAKE) $(EXAMPLE_DIRS:%/=release_examples_%) $(TARGET_DIRS:%/=release_src_%): release_src_%: - $(INSTALL_DIR) $(RELSYSDIR)/src/$* + $(INSTALL_DIR) "$(RELSYSDIR)/src/$*" $(INSTALL_DATA) $(filter $*/%, $(TARGET_MODULES:%=%.erl) \ $(INTERNAL_HRLS)) \ $(filter $*/%, compiler/$(DICT_YRL).yrl) \ - $(RELSYSDIR)/src/$* + "$(RELSYSDIR)/src/$*" $(EXAMPLE_DIRS:%/=release_examples_%): release_examples_%: - $(INSTALL_DIR) $(RELSYSDIR)/examples/$* + $(INSTALL_DIR) "$(RELSYSDIR)/examples/$*" $(INSTALL_DATA) $(patsubst %, ../examples/%, $(filter $*/%, $(EXAMPLES))) \ - $(RELSYSDIR)/examples/$* + "$(RELSYSDIR)/examples/$*" release_docs_spec: @@ -245,10 +270,11 @@ depend.mk: depend.sed $(MODULES:%=%.erl) Makefile -include depend.mk -.PHONY: app clean depend dict info release_subdir +.PHONY: app clean realclean depend dict info release_subdir .PHONY: debug opt release_docs_spec release_spec .PHONY: $(TARGET_DIRS:%/=%) $(TARGET_DIRS:%/=release_src_%) .PHONY: $(EXAMPLE_DIRS:%/=release_examples_%) +.PHONY: plt dialyze # Keep intermediate files. .SECONDARY: $(DICT_ERLS) $(DICT_HRLS) gen/$(DICT_YRL:%=%.erl) diff --git a/lib/diameter/src/base/diameter.appup.src b/lib/diameter/src/base/diameter.appup.src index 2ebdad598f..9b2a7d18ab 100644 --- a/lib/diameter/src/base/diameter.appup.src +++ b/lib/diameter/src/base/diameter.appup.src @@ -2,7 +2,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% Copyright Ericsson AB 2010-2012. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -22,13 +22,28 @@ [ {"0.9", [{restart_application, diameter}]}, {"0.10", [{restart_application, diameter}]}, - {"1.0", [{update, diameter_service}, - {update, diameter_watchdog}]} + {"1.0", [{restart_application, diameter}]}, + {"1.1", [%% new code + {add_module, diameter_transport}, + %% modified code + {load, diameter_sctp}, + {load, diameter_stats}, + {load, diameter_service}, + {load, diameter_config}, + {load, diameter_codec}, + {load, diameter_watchdog}, + {load, diameter_peer}, + {load, diameter_peer_fsm}, + {load, diameter}, + %% unmodified but including modified diameter.hrl + {load, diameter_callback}, + {load, diameter_capx}, + {load, diameter_types}]} ], [ {"0.9", [{restart_application, diameter}]}, {"0.10", [{restart_application, diameter}]}, - {"1.0", [{update, diameter_watchdog}, - {update, diameter_service}]} + {"1.0", [{restart_application, diameter}]}, + {"1.1", [{restart_application, diameter}]} ] }. diff --git a/lib/diameter/src/base/diameter.erl b/lib/diameter/src/base/diameter.erl index 336f0c1f2d..789a5db5f0 100644 --- a/lib/diameter/src/base/diameter.erl +++ b/lib/diameter/src/base/diameter.erl @@ -312,6 +312,7 @@ call(SvcName, App, Message) -> -type transport_opt() :: {transport_module, atom()} | {transport_config, any()} + | {transport_config, any(), non_neg_integer() | infinity} | {applications, [app_alias()]} | {capabilities, [capability()]} | {capabilities_cb, evaluable()} diff --git a/lib/diameter/src/base/diameter_codec.erl b/lib/diameter/src/base/diameter_codec.erl index fe1212b7e0..421e280422 100644 --- a/lib/diameter/src/base/diameter_codec.erl +++ b/lib/diameter/src/base/diameter_codec.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% Copyright Ericsson AB 2010-2012. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -63,9 +63,9 @@ encode(Mod, #diameter_packet{} = Pkt) -> e(Mod, Pkt) catch error: Reason -> - %% Be verbose rather than letting the emulator truncate the - %% error report. - X = {Reason, ?STACK}, + %% Be verbose since a crash report may be truncated and + %% encode errors are self-inflicted. + X = {?MODULE, encode, {Reason, ?STACK}}, diameter_lib:error_report(X, {?MODULE, encode, [Mod, Pkt]}), exit(X) end; @@ -91,7 +91,8 @@ e(_, #diameter_packet{msg = [#diameter_header{} = Hdr | As]} = Pkt) -> Flags = make_flags(0, Hdr), - Pkt#diameter_packet{bin = <<Vsn:8, Length:24, + Pkt#diameter_packet{header = Hdr, + bin = <<Vsn:8, Length:24, Flags:8, Code:24, Aid:32, Hid:32, diff --git a/lib/diameter/src/base/diameter_config.erl b/lib/diameter/src/base/diameter_config.erl index 9253af0de2..e47f63f814 100644 --- a/lib/diameter/src/base/diameter_config.erl +++ b/lib/diameter/src/base/diameter_config.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% Copyright Ericsson AB 2010-2012. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -519,6 +519,7 @@ rm(SvcName, L) -> Refs = lists:map(fun(#transport{ref = R}) -> R end, L), case stop_transport(SvcName, Refs) of ok -> + diameter_stats:flush(Refs), lists:foreach(fun delete_object/1, L); {error, _} = No -> No @@ -600,7 +601,7 @@ app_acc({application, Opts}, Acc) -> module = init_mod(Mod), init_state = ModS, mutable = init_mutable(M), - answer_errors = init_answers(A)} + options = [{answer_errors, init_answers(A)}]} | Acc]; app_acc(_, Acc) -> Acc. diff --git a/lib/diameter/src/base/diameter_peer.erl b/lib/diameter/src/base/diameter_peer.erl index 3e78c4caef..74ba709aac 100644 --- a/lib/diameter/src/base/diameter_peer.erl +++ b/lib/diameter/src/base/diameter_peer.erl @@ -27,12 +27,15 @@ up/2]). %% ... and the stack. --export([start/3, +-export([start/1, send/2, close/1, abort/1, notify/2]). +%% Old interface only called from old code. +-export([start/3]). %% < diameter-1.2 (R15B02) + %% Server start. -export([start_link/0]). @@ -57,6 +60,11 @@ %% Server state. -record(state, {id = now()}). +%% Default transport_module/config. +-define(DEFAULT_TMOD, diameter_tcp). +-define(DEFAULT_TCFG, []). +-define(DEFAULT_TTMO, infinity). + %%% --------------------------------------------------------------------------- %%% # notify/2 %%% --------------------------------------------------------------------------- @@ -68,9 +76,119 @@ notify(SvcName, T) -> %%% # start/3 %%% --------------------------------------------------------------------------- -start(T, Opts, #diameter_service{} = Svc) -> - {Mod, Cfg} = split_transport(Opts), - apply(Mod, start, [T, Svc, Cfg]). +%% From old code: make is restart. +start(_T, _Opts, #diameter_service{}) -> + {error, restart}. + +%%% --------------------------------------------------------------------------- +%%% # start/1 +%%% --------------------------------------------------------------------------- + +-spec start({T, [Opt], #diameter_service{}}) + -> {TPid, [Addr], Tmo, Data} + | {error, [term()]} + when T :: {connect|accept, diameter:transport_ref()}, + Opt :: diameter:transport_opt(), + TPid :: pid(), + Addr :: inet:ip_address(), + Tmo :: non_neg_integer() | infinity, + Data :: {{T, Mod, Cfg}, [Mod], [{T, [Mod], Cfg}], [Err]}, + Mod :: module(), + Cfg :: term(), + Err :: term() + ; ({#diameter_service{}, Tmo, Data}) + -> {TPid, [Addr], Tmo, Data} + | {error, [term()]} + when TPid :: pid(), + Addr :: inet:ip_address(), + Tmo :: non_neg_integer() | infinity, + Data :: {{T, Mod, Cfg}, [Mod], [{T, [Mod], Cfg}], [Err]}, + T :: {connect|accept, diameter:transport_ref()}, + Mod :: module(), + Cfg :: term(), + Err :: term(). + +%% Initial start. +start({T, Opts, #diameter_service{} = Svc}) -> + start(T, Svc, pair(Opts, [], []), []); + +%% Subsequent start. +start({#diameter_service{} = Svc, Tmo, {{T, _, Cfg}, Ms, Rest, Errs}}) -> + start(T, Ms, Cfg, Svc, Tmo, Rest, Errs). + +%% pair/3 +%% +%% Pair transport modules with config. + +%% Another transport_module: accumulate it. +pair([{transport_module, M} | Rest], Mods, Acc) -> + pair(Rest, [M|Mods], Acc); + +%% Another transport_config: accumulate another tuple. +pair([{transport_config = T, C} | Rest], Mods, Acc) -> + pair([{T, C, ?DEFAULT_TTMO} | Rest], Mods, Acc); +pair([{transport_config, C, Tmo} | Rest], Mods, Acc) -> + pair(Rest, [], acc({Mods, C, Tmo}, Acc)); + +pair([_ | Rest], Mods, Acc) -> + pair(Rest, Mods, Acc); + +%% No transport_module or transport_config: defaults. +pair([], [], []) -> + [{[?DEFAULT_TMOD], ?DEFAULT_TCFG, ?DEFAULT_TTMO}]; + +%% One transport_module, one transport_config. +pair([], [M], [{[], Cfg, Tmo}]) -> + [{[M], Cfg, Tmo}]; + +%% Trailing transport_module: default transport_config. +pair([], [_|_] = Mods, Acc) -> + lists:reverse(acc({Mods, ?DEFAULT_TCFG, ?DEFAULT_TTMO}, Acc)); + +pair([], [], Acc) -> + lists:reverse(def(Acc)). + +%% acc/2 + +acc(T, Acc) -> + [T | def(Acc)]. + +%% def/1 +%% +%% Default module of previous pair if none were specified. + +def([{[], Cfg, Tmo} | Acc]) -> + [{[?DEFAULT_TMOD], Cfg, Tmo} | Acc]; +def(Acc) -> + Acc. + +%% start/4 + +start(T, Svc, [{Ms, Cfg, Tmo} | Rest], Errs) -> + start(T, Ms, Cfg, Svc, Tmo, Rest, Errs); + +start(_, _, [], Errs) -> + {error, Errs}. + +%% start/7 + +start(T, [], _, Svc, _, Rest, Errs) -> + start(T, Svc, Rest, Errs); + +start(T, [M|Ms], Cfg, Svc, Tmo, Rest, Errs) -> + case start(M, [T, Svc, Cfg]) of + {ok, TPid} -> + {TPid, [], Tmo, {{T, M, Cfg}, Ms, Rest, Errs}}; + {ok, TPid, [_|_] = Addrs} -> + {TPid, Addrs, Tmo, {{T, M, Cfg}, Ms, Rest, Errs}}; + E -> + start(T, Ms, Cfg, Svc, Tmo, Rest, [E|Errs]) + end. + +%% start/2 + +start(Mod, Args) -> + apply(Mod, start, Args). %%% --------------------------------------------------------------------------- %%% # up/[12] @@ -204,21 +322,6 @@ bang(undefined = No, _) -> bang(Pid, T) -> Pid ! T. -%% split_transport/1 -%% -%% Split options into transport module, transport config and -%% remaining options. - -split_transport(Opts) -> - {[M,C], _} = proplists:split(Opts, [transport_module, - transport_config]), - {value(M, diameter_tcp), value(C, [])}. - -value([{_,V}], _) -> - V; -value([], V) -> - V. - %% call/1 call(Request) -> diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl index 99644814d2..302540e76b 100644 --- a/lib/diameter/src/base/diameter_peer_fsm.erl +++ b/lib/diameter/src/base/diameter_peer_fsm.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% Copyright Ericsson AB 2010-2012. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -54,6 +54,12 @@ -define(NO_INBAND_SECURITY, 0). -define(TLS, 1). +%% Keys in process dictionary. +-define(CB_KEY, cb). %% capabilities callback +-define(DWA_KEY, dwa). %% outgoing DWA +-define(Q_KEY, q). %% transport start queue +-define(START_KEY, start). %% start of connected transport + %% A 2xxx series Result-Code. Not necessarily 2001. -define(IS_SUCCESS(N), 2 == (N) div 1000). @@ -115,16 +121,20 @@ %%% Output: Pid %%% --------------------------------------------------------------------------- +-spec start(T, [Opt], #diameter_service{}) + -> pid() + when T :: {connect|accept, diameter:transport_ref()}, + Opt :: diameter:transport_opt(). + %% diameter_config requires a non-empty list of applications on the %% service but diameter_service then constrains the list to any %% specified on the transport in question. Check here that the list is %% still non-empty. -start({_, Ref} = Type, Opts, #diameter_service{applications = Apps} = Svc) -> +start({_,_} = Type, Opts, #diameter_service{applications = Apps} = Svc) -> [] /= Apps orelse ?ERROR({no_apps, Type, Opts}), T = {self(), Type, Opts, Svc}, {ok, Pid} = diameter_peer_fsm_sup:start_child(T), - diameter_stats:reg(Pid, Ref), Pid. start_link(T) -> @@ -143,18 +153,18 @@ init(T) -> proc_lib:init_ack({ok, self()}), gen_server:enter_loop(?MODULE, [], i(T)). -i({WPid, T, Opts, #diameter_service{capabilities = Caps} = Svc0}) -> - putr(dwa, dwa(Caps)), +i({WPid, T, Opts, #diameter_service{capabilities = Caps} = Svc}) -> + putr(?DWA_KEY, dwa(Caps)), {M, Ref} = T, + diameter_stats:reg(Ref), {[Ts], Rest} = proplists:split(Opts, [capabilities_cb]), - putr(capabilities_cb, {Ref, [F || {_,F} <- Ts]}), - {ok, TPid, Svc} = start_transport(T, Rest, Svc0), - erlang:monitor(process, TPid), + putr(?CB_KEY, {Ref, [F || {_,F} <- Ts]}), erlang:monitor(process, WPid), + {TPid, Addrs} = start_transport(T, Rest, Svc), #state{parent = WPid, transport = TPid, mode = M, - service = Svc}. + service = svc(Svc, Addrs)}. %% The transport returns its local ip addresses so that different %% transports on the same service can use different local addresses. %% The local addresses are put into Host-IP-Address avps here when @@ -164,18 +174,56 @@ i({WPid, T, Opts, #diameter_service{capabilities = Caps} = Svc0}) -> %% watchdog start (start/2) succeeds regardless so as not to crash the %% service. -start_transport(T, Opts, Svc) -> - case diameter_peer:start(T, Opts, Svc) of - {ok, TPid} -> - {ok, TPid, Svc}; - {ok, TPid, [_|_] = Addrs} -> - #diameter_service{capabilities = Caps0} = Svc, - Caps = Caps0#diameter_caps{host_ip_address = Addrs}, - {ok, TPid, Svc#diameter_service{capabilities = Caps}}; +start_transport(T, Opts, #diameter_service{capabilities = Caps} = Svc) -> + Addrs0 = Caps#diameter_caps.host_ip_address, + start_transport(Addrs0, {T, Opts, Svc}). + +start_transport(Addrs0, T) -> + case diameter_peer:start(T) of + {TPid, Addrs, Tmo, Data} -> + erlang:monitor(process, TPid), + q_next(TPid, Addrs0, Tmo, Data), + {TPid, addrs(Addrs, Addrs0)}; No -> exit({shutdown, No}) end. +addrs([], Addrs0) -> + Addrs0; +addrs(Addrs, _) -> + Addrs. + +svc(Svc, []) -> + Svc; +svc(Svc, Addrs) -> + readdr(Svc, Addrs). + +readdr(#diameter_service{capabilities = Caps0} = Svc, Addrs) -> + Caps = Caps0#diameter_caps{host_ip_address = Addrs}, + Svc#diameter_service{capabilities = Caps}. + +%% The 4-tuple Data returned from diameter_peer:start/1 identifies the +%% transport module/config use to start the transport process in +%% question as well as any alternates to try if a connection isn't +%% established within Tmo. +q_next(TPid, Addrs0, Tmo, {_,_,_,_} = Data) -> + send_after(Tmo, {connection_timeout, TPid}), + putr(?Q_KEY, {Addrs0, Tmo, Data}). + +%% Connection has been established: retain the started +%% pid/module/config in the process dictionary. This is a part of the +%% interface defined by this module, so that the transport pid can be +%% found when constructing service_info (in order to extract further +%% information from it). +keep_transport(TPid) -> + {_, _, {{_,_,_} = T, _, _, _}} = eraser(?Q_KEY), + putr(?START_KEY, {TPid, T}). + +send_after(infinity, _) -> + ok; +send_after(Tmo, T) -> + erlang:send_after(Tmo, self(), T). + %% handle_call/3 handle_call(_, _, State) -> @@ -202,14 +250,27 @@ handle_info(T, #state{} = State) -> ?LOG(stop, T), x(T, State) catch + exit: {diameter_codec, encode, _} = Reason -> + close_wd(Reason, State#state.parent), + ?LOG(stop, Reason), + %% diameter_codec:encode/2 emits an error report. Only + %% indicate the probable reason here. + diameter_lib:info_report(probable_configuration_error, + insufficient_capabilities), + {stop, {shutdown, Reason}, State}; {?MODULE, Tag, Reason} -> ?LOG(Tag, {Reason, T}), {stop, {shutdown, Reason}, State} end. -%% The form of the exception caught here is historical. It's +%% The form of the throw caught here is historical. It's %% significant that it's not a 2-tuple, as in ?FAILURE(Reason), %% since these are caught elsewhere. +%% Note that there's no guarantee that the service and transport +%% capabilities are good enough to build a CER/CEA that can be +%% succesfully encoded. It's not checked at diameter:add_transport/2 +%% since this can be called before creating the service. + x(Reason, #state{} = S) -> close_wd(Reason, S), {stop, {shutdown, Reason}, S}. @@ -240,25 +301,48 @@ eraser(Key) -> %% Connection to peer. transition({diameter, {TPid, connected, Remote}}, - #state{state = PS, + #state{transport = TPid, + state = PS, mode = M} = S) -> 'Wait-Conn-Ack' = PS, %% assert connect = M, %% - send_CER(S#state{mode = {M, Remote}, - transport = TPid}); + keep_transport(TPid), + send_CER(S#state{mode = {M, Remote}}); %% Connection from peer. transition({diameter, {TPid, connected}}, - #state{state = PS, + #state{transport = TPid, + state = PS, mode = M, parent = Pid} = S) -> 'Wait-Conn-Ack' = PS, %% assert accept = M, %% + keep_transport(TPid), Pid ! {accepted, self()}, - start_timer(S#state{state = recv_CER, - transport = TPid}); + start_timer(S#state{state = recv_CER}); + +%% Connection established after receiving a connection_timeout +%% message. This may be followed by an incoming message which arrived +%% before the transport was killed and this can't be distinguished +%% from one from the transport that's been started to replace it. +transition({diameter, {_, connected}}, _) -> + {stop, connection_timeout}; +transition({diameter, {_, connected, _}}, _) -> + {stop, connection_timeout}; + +%% Connection has timed out: start an alternate. +transition({connection_timeout = T, TPid}, + #state{transport = TPid, + state = 'Wait-Conn-Ack'} + = S) -> + exit(TPid, {shutdown, T}), + start_next(S); + +%% Connect timeout after connection or alternate start: ignore. +transition({connection_timeout, _}, _) -> + ok; %% Incoming message from the transport. transition({diameter, {recv, Pkt}}, S) -> @@ -305,14 +389,21 @@ transition({resolve_port, _Pid} = T, #state{transport = TPid}) -> TPid ! T, ok; -%% Parent or transport has died. -transition({'DOWN', _, process, P, _}, - #state{parent = Pid, - transport = TPid}) - when P == Pid; - P == TPid -> +%% Parent has died. +transition({'DOWN', _, process, WPid, _}, + #state{parent = WPid}) -> stop; +%% Transport has died before connection timeout. +transition({'DOWN', _, process, TPid, _}, + #state{transport = TPid} + = S) -> + start_next(S); + +%% Transport has died after connection timeout. +transition({'DOWN', _, process, _, _}, _) -> + ok; + %% State query. transition({state, Pid}, #state{state = S, transport = TPid}) -> Pid ! {self(), [S, TPid]}, @@ -320,6 +411,19 @@ transition({state, Pid}, #state{state = S, transport = TPid}) -> %% Crash on anything unexpected. +%% start_next/1 + +start_next(#state{service = Svc0} = S) -> + case getr(?Q_KEY) of + {Addrs0, Tmo, Data} -> + Svc = readdr(Svc0, Addrs0), + {TPid, Addrs} = start_transport(Addrs0, {Svc, Tmo, Data}), + S#state{transport = TPid, + service = svc(Svc, Addrs)}; + undefined -> + stop + end. + %% send_CER/1 send_CER(#state{mode = {connect, Remote}, @@ -649,7 +753,7 @@ rc([RC|_]) -> %% answer/2 answer('DWR', _) -> - getr(dwa); + getr(?DWA_KEY); answer(Name, #state{service = #diameter_service{capabilities = Caps}}) -> a(Name, Caps). @@ -749,15 +853,15 @@ caps(#state{service = Svc}) -> %% caps_cb/1 caps_cb(Caps) -> - {Ref, Ts} = eraser(capabilities_cb), - ccb(Ts, [Ref, Caps]). + {Ref, Ts} = eraser(?CB_KEY), + caps_cb(Ts, [Ref, Caps]). -ccb([], _) -> +caps_cb([], _) -> ok; -ccb([F | Rest], T) -> +caps_cb([F | Rest], T) -> case diameter_lib:eval([F|T]) of ok -> - ccb(Rest, T); + caps_cb(Rest, T); N when ?IS_SUCCESS(N) -> %% 2xxx result code: accept immediately N; Res -> diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl index 18460a113f..54594db292 100644 --- a/lib/diameter/src/base/diameter_service.erl +++ b/lib/diameter/src/base/diameter_service.erl @@ -43,8 +43,7 @@ subscriptions/0, services/0, services/1, - whois/1, - flush_stats/1]). + whois/1]). %% test/debug -export([call_module/3, @@ -90,6 +89,8 @@ -define(DEFAULT_TIMEOUT, 5000). %% for outgoing requests -define(RESTART_TC, 1000). %% if restart was this recent +-define(RELAY, ?DIAMETER_DICT_RELAY). + %% Used to be able to swap this with anything else dict-like but now %% rely on the fact that a service's #state{} record does not change %% in storing in it ?STATE table and not always going through the @@ -407,15 +408,6 @@ whois(SvcName) -> undefined end. -%%% --------------------------------------------------------------------------- -%%% # flush_stats/1 -%%% -%%% Output: list of {{SvcName, Alias, Counter}, Value} -%%% --------------------------------------------------------------------------- - -flush_stats(TPid) -> - diameter_stats:flush(TPid). - %% =========================================================================== %% =========================================================================== @@ -999,27 +991,35 @@ ilp({Id, Alias}, {TC, SA}, LDict) -> init_conn(Id, Alias, TC, SA), ?Dict:append(Alias, TC, LDict). -init_conn(Id, Alias, TC, {SvcName, Apps}) -> +init_conn(Id, Alias, {TPid, _} = TC, {SvcName, Apps}) -> #diameter_app{module = ModX, id = Id} %% assert = find_app(Alias, Apps), - peer_cb({ModX, peer_up, [SvcName, TC]}, Alias). + peer_cb({ModX, peer_up, [SvcName, TC]}, Alias) + orelse exit(TPid, kill). %% fake transport failure + +%% find_app/2 find_app(Alias, Apps) -> - lists:keyfind(Alias, #diameter_app.alias, Apps). + case lists:keyfind(Alias, #diameter_app.alias, Apps) of + #diameter_app{options = E} = A when is_atom(E) -> %% upgrade + A#diameter_app{options = [{answer_errors, E}]}; + A -> + A + end. -%% A failing peer callback brings down the service. In the case of -%% peer_up we could just kill the transport and emit an error but for -%% peer_down we have no way to cleanup any state change that peer_up -%% may have introduced. +%% Don't bring down the service (and all associated connections) +%% regardless of what happens. peer_cb(MFA, Alias) -> try state_cb(MFA, Alias) of ModS -> - mod_state(Alias, ModS) + mod_state(Alias, ModS), + true catch - E: Reason -> - ?ERROR({E, Reason, MFA, ?STACK}) + E:R -> + diameter_lib:error_report({failure, {E, R, Alias, ?STACK}}, MFA), + false end. %%% --------------------------------------------------------------------------- @@ -1406,7 +1406,7 @@ send_request(Pkt, TPid, Caps, App, Opts, Caller, SvcName) -> #diameter_app{alias = Alias, dictionary = Dict, module = ModX, - answer_errors = AE} + options = [{answer_errors, AE} | _]} = App, EPkt = encode(Dict, Pkt), @@ -1989,6 +1989,12 @@ is_loop(Code, Vid, OH, Avps) -> %% %% Send a locally originating reply. +%% Skip the setting of Result-Code and Failed-AVP's below. +reply([Msg], Dict, TPid, Pkt) + when is_list(Msg); + is_tuple(Msg) -> + reply(Msg, Dict, TPid, Pkt#diameter_packet{errors = []}); + %% No errors or a diameter_header/avp list. reply(Msg, Dict, TPid, #diameter_packet{errors = Es, transport_data = TD} @@ -1996,7 +2002,7 @@ reply(Msg, Dict, TPid, #diameter_packet{errors = Es, when [] == Es; is_record(hd(Msg), diameter_header) -> Pkt = diameter_codec:encode(Dict, make_answer_packet(Msg, ReqPkt)), - incr(send, Pkt, TPid), %% count result codes in sent answers + incr(send, Pkt, Dict, TPid), %% count result codes in sent answers send(TPid, Pkt#diameter_packet{transport_data = TD}); %% Or not: set Result-Code and Failed-AVP AVP's. @@ -2037,7 +2043,10 @@ rc(RC) -> rc(Rec, RC, Failed, Dict) when is_integer(RC) -> - set(Rec, [{'Result-Code', RC} | failed_avp(Rec, Failed, Dict)], Dict). + set(Rec, + lists:append([rc(Rec, {'Result-Code', RC}, Dict), + failed_avp(Rec, Failed, Dict)]), + Dict). %% Reply as name and tuple list ... set([_|_] = Ans, Avps, _) -> @@ -2047,6 +2056,22 @@ set([_|_] = Ans, Avps, _) -> set(Rec, Avps, Dict) -> Dict:'#set-'(Avps, Rec). +%% rc/3 +%% +%% Turn the result code into a list if its optional and only set it if +%% the arity is 1 or {0,1}. In other cases (which probably shouldn't +%% exist in practise) we can't know what's appropriate. + +rc([MsgName | _], {'Result-Code' = K, RC} = T, Dict) -> + case Dict:avp_arity(MsgName, 'Result-Code') of + 1 -> [T]; + {0,1} -> [{K, [RC]}]; + _ -> [] + end; + +rc(Rec, T, Dict) -> + rc([Dict:rec2msg(element(1, Rec))], T, Dict). + %% failed_avp/3 failed_avp(_, [] = No, _) -> @@ -2254,44 +2279,39 @@ handle_answer(SvcName, _, {error, Req, Reason}) -> handle_answer(SvcName, AnswerErrors, {answer, #request{dictionary = Dict} = Req, Pkt}) -> - a(examine(diameter_codec:decode(Dict, Pkt)), - SvcName, - AnswerErrors, - Req). + answer(examine(diameter_codec:decode(Dict, Pkt)), + SvcName, + AnswerErrors, + Req). %% We don't really need to do a full decode if we're a relay and will %% just resend with a new hop by hop identifier, but might a proxy %% want to examine the answer? -a(#diameter_packet{errors = []} - = Pkt, - SvcName, - AE, - #request{transport = TPid, - caps = Caps, - packet = P} - = Req) -> +answer(Pkt, SvcName, AE, #request{transport = TPid, + dictionary = Dict} + = Req) -> try - incr(in, Pkt, TPid) + incr(recv, Pkt, Dict, TPid) of - _ -> - cb(Req, handle_answer, [Pkt, msg(P), SvcName, {TPid, Caps}]) + _ -> a(Pkt, SvcName, AE, Req) catch exit: {invalid_error_bit, _} = E -> - e(Pkt#diameter_packet{errors = [E]}, SvcName, AE, Req) - end; - -a(#diameter_packet{} = Pkt, SvcName, AE, Req) -> - e(Pkt, SvcName, AE, Req). + a(Pkt#diameter_packet{errors = [E]}, SvcName, AE, Req) + end. -e(Pkt, SvcName, callback, #request{transport = TPid, - caps = Caps, - packet = Pkt} - = Req) -> - cb(Req, handle_answer, [Pkt, msg(Pkt), SvcName, {TPid, Caps}]); -e(Pkt, SvcName, report, Req) -> +a(#diameter_packet{errors = Es} = Pkt, SvcName, AE, #request{transport = TPid, + caps = Caps, + packet = P} + = Req) + when [] == Es; + callback == AE -> + cb(Req, handle_answer, [Pkt, msg(P), SvcName, {TPid, Caps}]); + +a(Pkt, SvcName, report, Req) -> x(errors, handle_answer, [SvcName, Req, Pkt]); -e(Pkt, SvcName, discard, Req) -> + +a(Pkt, SvcName, discard, Req) -> x({errors, handle_answer, [SvcName, Req, Pkt]}). %% Note that we don't check that the application id in the answer's @@ -2303,17 +2323,19 @@ e(Pkt, SvcName, discard, Req) -> %% Increment a stats counter for an incoming or outgoing message. %% TODO: fix -incr(_, #diameter_packet{msg = undefined}, _) -> +incr(_, #diameter_packet{msg = undefined}, _, _) -> ok; -incr(Dir, Pkt, TPid) - when is_pid(TPid) -> +incr(recv = D, #diameter_packet{header = H, errors = [_|_]}, _, TPid) -> + incr(TPid, {diameter_codec:msg_id(H), D, error}); + +incr(Dir, Pkt, Dict, TPid) -> #diameter_packet{header = #diameter_header{is_error = E} = Hdr, msg = Rec} = Pkt, - RC = int(get_avp_value(?BASE, 'Result-Code', Rec)), + RC = int(get_avp_value(Dict, 'Result-Code', Rec)), PE = is_protocol_error(RC), %% Check that the E bit is set only for 3xxx result codes. @@ -2321,15 +2343,21 @@ incr(Dir, Pkt, TPid) orelse (E andalso PE) orelse x({invalid_error_bit, RC}, answer, [Dir, Pkt]), - Ctr = rc_counter(Rec, RC), - is_tuple(Ctr) - andalso incr(TPid, {diameter_codec:msg_id(Hdr), Dir, Ctr}). + irc(TPid, Hdr, Dir, rc_counter(Dict, Rec, RC)). + +irc(_, _, _, undefined) -> + false; + +irc(TPid, Hdr, Dir, Ctr) -> + incr(TPid, {diameter_codec:msg_id(Hdr), Dir, Ctr}). %% incr/2 incr(TPid, Counter) -> diameter_stats:incr(Counter, TPid, 1). +%% error_counter/2 + %% RFC 3588, 7.6: %% %% All Diameter answer messages defined in vendor-specific @@ -2339,26 +2367,27 @@ incr(TPid, Counter) -> %% Maintain statistics assuming one or the other, not both, which is %% surely the intent of the RFC. -rc_counter(_, RC) - when is_integer(RC) -> - {'Result-Code', RC}; -rc_counter(Rec, _) -> - rcc(get_avp_value(?BASE, 'Experimental-Result', Rec)). +rc_counter(Dict, Rec, undefined) -> + er(get_avp_value(Dict, 'Experimental-Result', Rec)); +rc_counter(_, _, RC) -> + {'Result-Code', RC}. %% Outgoing answers may be in any of the forms messages can be sent %% in. Incoming messages will be records. We're assuming here that the %% arity of the result code AVP's is 0 or 1. -rcc([{_,_,RC} = T]) - when is_integer(RC) -> +er([{_,_,N} = T | _]) + when is_integer(N) -> T; -rcc({_,_,RC} = T) - when is_integer(RC) -> +er({_,_,N} = T) + when is_integer(N) -> T; -rcc(_) -> +er(_) -> undefined. -int([N]) +%% Extract the first good looking integer. There's no guarantee +%% that what we're looking for has arity 1. +int([N|_]) when is_integer(N) -> N; int(N) @@ -2403,8 +2432,11 @@ rt(#request{packet = #diameter_packet{msg = undefined}}, _) -> false; %% TODO: Not what we should do. %% ... or not. -rt(#request{packet = #diameter_packet{msg = Msg}} = Req, S) -> - find_transport(get_destination(Msg), Req, S). +rt(#request{packet = #diameter_packet{msg = Msg}, + dictionary = Dict} + = Req, + S) -> + find_transport(get_destination(Dict, Msg), Req, S). %%% --------------------------------------------------------------------------- %%% # report_status/5 @@ -2516,12 +2548,12 @@ find_transport({alias, Alias}, Msg, Opts, #state{service = Svc} = S) -> find_transport(#diameter_app{} = App, Msg, Opts, S) -> ft(App, Msg, Opts, S). -ft(#diameter_app{module = Mod} = App, Msg, Opts, S) -> +ft(#diameter_app{module = Mod, dictionary = Dict} = App, Msg, Opts, S) -> #options{filter = Filter, extra = Xtra} = Opts, pick_peer(App#diameter_app{module = Mod ++ Xtra}, - get_destination(Msg), + get_destination(Dict, Msg), Filter, S); ft(false = No, _, _, _) -> @@ -2557,11 +2589,11 @@ find_transport([_,_] = RH, Filter, S). -%% get_destination/1 +%% get_destination/2 -get_destination(Msg) -> - [str(get_avp_value(?BASE, 'Destination-Realm', Msg)), - str(get_avp_value(?BASE, 'Destination-Host', Msg))]. +get_destination(Dict, Msg) -> + [str(get_avp_value(Dict, 'Destination-Realm', Msg)), + str(get_avp_value(Dict, 'Destination-Host', Msg))]. %% This is not entirely correct. The avp could have an arity 1, in %% which case an empty list is a DiameterIdentity of length 0 rather @@ -2585,6 +2617,9 @@ str(T) -> %% question. The third form allows messages to be sent as is, without %% a dictionary, which is needed in the case of relay agents, for one. +get_avp_value(?RELAY, Name, Msg) -> + get_avp_value(?BASE, Name, Msg); + get_avp_value(Dict, Name, [#diameter_header{} | Avps]) -> try {Code, _, VId} = Dict:avp_header(Name), @@ -2905,11 +2940,10 @@ complete(Pre) -> %% info_stats/1 info_stats(#state{peerT = PeerT}) -> - Peers = ets:select(PeerT, [{#peer{ref = '$1', conn = '$2', _ = '_'}, - [{'is_pid', '$2'}], - [['$1', '$2']]}]), - diameter_stats:read(lists:append(Peers)). -%% TODO: include peer identities in return value + MatchSpec = [{#peer{ref = '$1', conn = '$2', _ = '_'}, + [{'is_pid', '$2'}], + [['$1', '$2']]}], + diameter_stats:read(lists:append(ets:select(PeerT, MatchSpec))). %% info_transport/1 %% diff --git a/lib/diameter/src/base/diameter_stats.erl b/lib/diameter/src/base/diameter_stats.erl index 71479afa95..70727d068e 100644 --- a/lib/diameter/src/base/diameter_stats.erl +++ b/lib/diameter/src/base/diameter_stats.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% Copyright Ericsson AB 2010-2012. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -22,14 +22,13 @@ %% -module(diameter_stats). --compile({no_auto_import, [monitor/2]}). -behaviour(gen_server). --export([reg/1, reg/2, - incr/1, incr/2, incr/3, +-export([reg/2, reg/1, + incr/3, incr/1, read/1, - flush/0, flush/1]). + flush/1]). %% supervisor callback -export([start_link/0]). @@ -48,123 +47,105 @@ -include("diameter_internal.hrl"). -%% ets table containing stats. reg(Pid, Ref) inserts a {Pid, Ref}, -%% incr(Counter, X, N) updates the counter keyed at {Counter, X}, and -%% Pid death causes counters keyed on {Counter, Pid} to be deleted and -%% added to those keyed on {Counter, Ref}. +%% ets table containing 2-tuple stats. reg(Pid, Ref) inserts a {Pid, +%% Ref}, incr(Counter, X, N) updates the counter keyed at {Counter, +%% X}, and Pid death causes counters keyed on {Counter, Pid} to be +%% deleted and added to those keyed on {Counter, Ref}. -define(TABLE, ?MODULE). %% Name of registered server. -define(SERVER, ?MODULE). -%% Entries in the table. --define(REC(Key, Value), {Key, Value}). - %% Server state. -record(state, {id = now()}). -type counter() :: any(). --type contrib() :: any(). - -%%% --------------------------------------------------------------------------- -%%% # reg(Pid, Contrib) -%%% -%%% Description: Register a process as a contributor of statistics -%%% associated with a specified term. Statistics can be -%%% contributed by specifying either Pid or Contrib as -%%% the second argument to incr/3. Statistics contributed -%%% by Pid are folded into the corresponding entry for -%%% Contrib when the process dies. -%%% -%%% Contrib can be any term but should not be a pid -%%% passed as the first argument to reg/2. Subsequent -%%% registrations for the same Pid overwrite the association -%%% --------------------------------------------------------------------------- - --spec reg(pid(), contrib()) - -> true. +-type ref() :: any(). + +%% --------------------------------------------------------------------------- +%% # reg(Pid, Ref) +%% +%% Register a process as a contributor of statistics associated with a +%% specified term. Statistics can be contributed by specifying either +%% Pid or Ref as the second argument to incr/3. Statistics contributed +%% by Pid are folded into the corresponding entry for Ref when the +%% process dies. +%% --------------------------------------------------------------------------- + +-spec reg(pid(), ref()) + -> boolean(). -reg(Pid, Contrib) +reg(Pid, Ref) when is_pid(Pid) -> - call({reg, Pid, Contrib}). + call({reg, Pid, Ref}). --spec reg(contrib()) +-spec reg(ref()) -> true. reg(Ref) -> reg(self(), Ref). -%%% --------------------------------------------------------------------------- -%%% # incr(Counter, Contrib, N) -%%% -%%% Description: Increment a counter for the specified contributor. -%%% -%%% Contrib will typically be an argument passed to reg/2 -%%% but there's nothing that requires this. In particular, -%%% if Contrib is a pid that hasn't been registered then -%%% counters are unaffected by the death of the process. -%%% --------------------------------------------------------------------------- - --spec incr(counter(), contrib(), integer()) - -> integer(). +%% --------------------------------------------------------------------------- +%% # incr(Counter, Ref, N) +%% +%% Increment a counter for the specified contributor. +%% +%% Ref will typically be an argument passed to reg/2 but there's +%% nothing that requires this. Only registered pids can contribute +%% counters however, otherwise incr/3 is a no-op. +%% --------------------------------------------------------------------------- -incr(Ctr, Contrib, N) -> - update_counter({Ctr, Contrib}, N). +-spec incr(counter(), ref(), integer()) + -> integer() | false. -incr(Ctr, N) +incr(Ctr, Ref, N) when is_integer(N) -> - incr(Ctr, self(), N); - -incr(Ctr, Contrib) -> - incr(Ctr, Contrib, 1). + update_counter({Ctr, Ref}, N). incr(Ctr) -> incr(Ctr, self(), 1). -%%% --------------------------------------------------------------------------- -%%% # read(Contribs) -%%% -%%% Description: Retrieve counters for the specified contributors. -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # read(Refs) +%% +%% Retrieve counters for the specified contributors. +%% --------------------------------------------------------------------------- --spec read([contrib()]) - -> [{contrib(), [{counter(), integer()}]}]. +-spec read([ref()]) + -> [{ref(), [{counter(), integer()}]}]. -read(Contribs) -> - lists:foldl(fun(?REC({T,C}, N), D) -> orddict:append(C, {T,N}, D) end, +read(Refs) -> + read(Refs, false). + +read(Refs, B) -> + MatchSpec = [{{{'_', '$1'}, '_'}, + [?ORCOND([{'=:=', '$1', {const, R}} + || R <- Refs])], + ['$_']}], + L = ets:select(?TABLE, MatchSpec), + B andalso delete(L), + lists:foldl(fun({{C,R}, N}, D) -> orddict:append(R, {C,N}, D) end, orddict:new(), - ets:select(?TABLE, [{?REC({'_', '$1'}, '_'), - [?ORCOND([{'=:=', '$1', {const, C}} - || C <- Contribs])], - ['$_']}])). - -%%% --------------------------------------------------------------------------- -%%% # flush(Contrib) -%%% -%%% Description: Retrieve and delete statistics for the specified -%%% contributor. -%%% -%%% If Contrib is a pid registered with reg/2 then statistics -%%% for both and its associated contributor are retrieved. -%%% --------------------------------------------------------------------------- - --spec flush(contrib()) - -> [{counter(), integer()}]. + L). + +%% --------------------------------------------------------------------------- +%% # flush(Refs) +%% +%% Retrieve and delete statistics for the specified contributors. +%% --------------------------------------------------------------------------- + +-spec flush([ref()]) + -> [{ref(), {counter(), integer()}}]. -flush(Contrib) -> +flush(Refs) -> try - call({flush, Contrib}) + call({flush, Refs}) catch exit: _ -> [] end. -flush() -> - flush(self()). - -%%% --------------------------------------------------------- -%%% EXPORTED INTERNAL FUNCTIONS -%%% --------------------------------------------------------- +%% =========================================================================== start_link() -> ServerName = {local, ?SERVER}, @@ -179,18 +160,16 @@ state() -> uptime() -> call(uptime). -%%% ---------------------------------------------------------- -%%% # init(_) -%%% -%%% Output: {ok, State} -%%% ---------------------------------------------------------- +%% ---------------------------------------------------------- +%% # init/1 +%% ---------------------------------------------------------- init([]) -> ets:new(?TABLE, [named_table, ordered_set, public]), {ok, #state{}}. %% ---------------------------------------------------------- -%% handle_call(Request, From, State) +%% # handle_call/3 %% ---------------------------------------------------------- handle_call(state, _, State) -> @@ -199,31 +178,31 @@ handle_call(state, _, State) -> handle_call(uptime, _, #state{id = Time} = State) -> {reply, diameter_lib:now_diff(Time), State}; -handle_call({reg, Pid, Contrib}, _From, State) -> - monitor(not ets:member(?TABLE, Pid), Pid), - {reply, insert(?REC(Pid, Contrib)), State}; +handle_call({incr, T}, _, State) -> + {reply, update_counter(T), State}; -handle_call({flush, Contrib}, _From, State) -> - {reply, fetch(Contrib), State}; +handle_call({reg, Pid, Ref}, _From, State) -> + B = ets:insert_new(?TABLE, {Pid, Ref}), + B andalso erlang:monitor(process, Pid), + {reply, B, State}; + +handle_call({flush, Refs}, _From, State) -> + {reply, read(Refs, true), State}; handle_call(Req, From, State) -> ?UNEXPECTED([Req, From]), {reply, nok, State}. %% ---------------------------------------------------------- -%% handle_cast(Request, State) +%% # handle_cast/2 %% ---------------------------------------------------------- -handle_cast({incr, Rec}, State) -> - update_counter(Rec), - {noreply, State}; - handle_cast(Msg, State) -> ?UNEXPECTED([Msg]), {noreply, State}. %% ---------------------------------------------------------- -%% handle_info(Request, State) +%% # handle_info/2 %% ---------------------------------------------------------- handle_info({'DOWN', _MRef, process, Pid, _}, State) -> @@ -235,91 +214,62 @@ handle_info(Info, State) -> {noreply, State}. %% ---------------------------------------------------------- -%% terminate(Reason, State) +%% # terminate/2 %% ---------------------------------------------------------- terminate(_Reason, _State) -> ok. %% ---------------------------------------------------------- -%% code_change(OldVsn, State, Extra) +%% # code_change/3 %% ---------------------------------------------------------- code_change(_OldVsn, State, _Extra) -> {ok, State}. -%%% --------------------------------------------------------- -%%% INTERNAL FUNCTIONS -%%% --------------------------------------------------------- - -%% monitor/2 - -monitor(true, Pid) -> - erlang:monitor(process, Pid); -monitor(false = No, _) -> - No. +%% =========================================================================== %% down/1 down(Pid) -> - L = ets:match_object(?TABLE, ?REC({'_', Pid}, '_')), - [?REC(_, Ref) = T] = lookup(Pid), + down(lookup(Pid), ets:match_object(?TABLE, {{'_', Pid}, '_'})). + +down([{_, Ref} = T], L) -> fold(Ref, L), - delete_object(T), + delete([T|L]); +down([], L) -> %% flushed delete(L). -%% Fold Pid-based entries into Ref-based ones. +%% Fold pid-based entries into ref-based ones. fold(Ref, L) -> - lists:foreach(fun(?REC({K, _}, V)) -> update_counter({{K, Ref}, V}) end, - L). - -delete(Objs) -> - lists:foreach(fun delete_object/1, Objs). - -%% fetch/1 - -fetch(X) -> - MatchSpec = [{?REC({'_', '$1'}, '_'), - [?ORCOND([{'==', '$1', {const, T}} || T <- [X | ref(X)]])], - ['$_']}], - L = ets:select(?TABLE, MatchSpec), - delete(L), - D = lists:foldl(fun sum/2, dict:new(), L), - dict:to_list(D). - -sum({{Ctr, _}, N}, Dict) -> - dict:update(Ctr, fun(V) -> V+N end, N, Dict). - -ref(Pid) - when is_pid(Pid) -> - ets:select(?TABLE, [{?REC(Pid, '$1'), [], ['$1']}]); -ref(_) -> - []. + lists:foreach(fun({{K, _}, V}) -> update_counter({{K, Ref}, V}) end, L). %% update_counter/2 %% -%% From an arbitrary request process. Cast to the server process to -%% insert a new element if the counter doesn't exists so that two -%% processes don't do so simultaneously. +%% From an arbitrary process. Call to the server process to insert a +%% new element if the counter doesn't exists so that two processes +%% don't insert simultaneously. update_counter(Key, N) -> try ets:update_counter(?TABLE, Key, N) catch error: badarg -> - cast({incr, ?REC(Key, N)}) + call({incr, {Key, N}}) end. %% update_counter/1 %% -%% From the server process. +%% From the server process, when update_counter/2 failed due to a +%% non-existent entry. -update_counter(?REC(Key, N) = T) -> +update_counter({{_Ctr, Ref} = Key, N} = T) -> try ets:update_counter(?TABLE, Key, N) catch error: badarg -> - insert(T) + (not is_pid(Ref) orelse ets:member(?TABLE, Ref)) + andalso begin insert(T), N end end. insert(T) -> @@ -328,13 +278,8 @@ insert(T) -> lookup(Key) -> ets:lookup(?TABLE, Key). -delete_object(T) -> - ets:delete_object(?TABLE, T). - -%% cast/1 - -cast(Msg) -> - gen_server:cast(?SERVER, Msg). +delete(Objs) -> + lists:foreach(fun({K,_}) -> ets:delete(?TABLE, K) end, Objs). %% call/1 diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl index b615bed860..d7474e5c56 100644 --- a/lib/diameter/src/base/diameter_watchdog.erl +++ b/lib/diameter/src/base/diameter_watchdog.erl @@ -64,13 +64,12 @@ %% that a failed capabilities exchange produces the desired exit %% reason. --spec start(Type, {RecvData, Opts, SvcName, Svc}) - -> pid() - when Type :: {connect|accept, reference()}, +-spec start(Type, {RecvData, [Opt], SvcName, #diameter_service{}}) + -> {reference(), pid()} + when Type :: {connect|accept, diameter:transport_ref()}, RecvData :: term(), - Opts :: list(), - SvcName :: term(), - Svc :: #diameter_service{}. + Opt :: diameter:transport_opt(), + SvcName :: diameter:service_name(). start({_,_} = Type, T) -> Ref = make_ref(), diff --git a/lib/diameter/src/modules.mk b/lib/diameter/src/modules.mk index 7a700a6d53..5d3c4157ae 100644 --- a/lib/diameter/src/modules.mk +++ b/lib/diameter/src/modules.mk @@ -58,6 +58,7 @@ RT_MODULES = \ transport/diameter_tcp_sup \ transport/diameter_sctp \ transport/diameter_sctp_sup \ + transport/diameter_transport \ transport/diameter_transport_sup # Handwritten (compile time) modules not included in the app file. diff --git a/lib/diameter/src/transport/diameter_etcp.erl b/lib/diameter/src/transport/diameter_etcp.erl index d925d62545..cd62cf34fa 100644 --- a/lib/diameter/src/transport/diameter_etcp.erl +++ b/lib/diameter/src/transport/diameter_etcp.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% Copyright Ericsson AB 2010-2012. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -36,7 +36,9 @@ send/2, close/1, setopts/2, - port/1]). + sockname/1, + peername/1, + getstat/1]). %% child start -export([start_link/1]). @@ -113,10 +115,20 @@ close(Pid) -> setopts(_, _) -> ok. -%% port/1 +%% sockname/1 -port(_) -> - 3868. %% We have no local port: fake it. +sockname(_) -> + {error, ?MODULE}. + +%% peername/1 + +peername(_) -> + {error, ?MODULE}. + +%% getstat/1 + +getstat(_) -> + {error, ?MODULE}. %% start_link/1 diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index 68b0342cd5..79b8b851fb 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% Copyright Ericsson AB 2010-2012. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -37,12 +37,18 @@ code_change/3, terminate/2]). +-export([info/1]). %% service_info callback + -export([ports/0, ports/1]). -include_lib("kernel/include/inet_sctp.hrl"). -include_lib("diameter/include/diameter.hrl"). +%% Keys into process dictionary. +-define(INFO_KEY, info). +-define(REF_KEY, ref). + -define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})). %% The default port for a listener. @@ -62,6 +68,7 @@ -record(transport, {parent :: pid(), mode :: {accept, pid()} + | accept | {connect, {list(inet:ip_address()), uint(), list()}} %% {RAs, RP, Errors} | connect, @@ -134,6 +141,24 @@ start_link(T) -> diameter_lib:spawn_opts(server, [])). %% --------------------------------------------------------------------------- +%% # info/1 +%% --------------------------------------------------------------------------- + +info({gen_sctp, Sock}) -> + lists:flatmap(fun(K) -> info(K, Sock) end, + [{socket, sockname}, + {peer, peername}, + {statistics, getstat}]). + +info({K,F}, Sock) -> + case inet:F(Sock) of + {ok, V} -> + [{K,V}]; + _ -> + [] + end. + +%% --------------------------------------------------------------------------- %% # init/1 %% --------------------------------------------------------------------------- @@ -157,7 +182,7 @@ i({connect, Pid, Opts, Addrs, Ref}) -> RAs = [diameter_lib:ipaddr(A) || {raddr, A} <- As], [RP] = [P || {rport, P} <- Ps] ++ [P || P <- [?DEFAULT_PORT], [] == Ps], {LAs, Sock} = open(Addrs, Rest, 0), - putr(ref, Ref), + putr(?REF_KEY, Ref), proc_lib:init_ack({ok, self(), LAs}), erlang:monitor(process, Pid), #transport{parent = Pid, @@ -169,7 +194,7 @@ i({connect, _, _, _} = T) -> %% from old code %% An accepting transport spawned by diameter. i({accept, Pid, LPid, Sock, Ref}) when is_pid(Pid) -> - putr(ref, Ref), + putr(?REF_KEY, Ref), proc_lib:init_ack({ok, self()}), erlang:monitor(process, Pid), erlang:monitor(process, LPid), @@ -181,7 +206,7 @@ i({accept, _, _, _} = T) -> %% from old code %% An accepting transport spawned at association establishment. i({accept, Ref, LPid, Sock, Id}) -> - putr(ref, Ref), + putr(?REF_KEY, Ref), proc_lib:init_ack({ok, self()}), MRef = erlang:monitor(process, LPid), %% Wait for a signal that the transport has been started before @@ -325,6 +350,11 @@ terminate(_, #transport{assoc_id = undefined}) -> ok; terminate(_, #transport{socket = Sock, + mode = accept, + assoc_id = Id}) -> + close(Sock, Id); + +terminate(_, #transport{socket = Sock, mode = {accept, _}, assoc_id = Id}) -> close(Sock, Id); @@ -356,13 +386,16 @@ start_timer(S) -> %% Incoming message from SCTP. l({sctp, Sock, _RA, _RP, Data} = Msg, #listener{socket = Sock} = S) -> - setopts(Sock), - case find(Data, S) of + Id = assoc_id(Data), + + try find(Id, Data, S) of {TPid, NewS} -> - TPid ! Msg, + TPid ! {peeloff, peeloff(Sock, Id, TPid), Msg}, NewS; false -> S + after + setopts(Sock) end; %% Transport is asking message to be sent. See send/3 for why the send @@ -430,15 +463,19 @@ t(T,S) -> %% transition/2 -%% Incoming message. -transition({sctp, Sock, _RA, _RP, Data}, #transport{socket = Sock, - mode = {accept, _}} - = S) -> - recv(Data, S); +%% Listening process is transfering ownership of an association. +transition({peeloff, Sock, {sctp, LSock, _RA, _RP, _Data} = Msg}, + #transport{mode = {accept, _}, + socket = LSock} + = S) -> + transition(Msg, S#transport{socket = Sock}); -transition({sctp, Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) -> +%% Incoming message. +transition({sctp, _Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) -> setopts(Sock), recv(Data, S); +%% Don't match on Sock since in R15B01 it can be the listening socket +%% in the (peeled-off) accept case, which is likely a bug. %% Outgoing message. transition({diameter, {send, Msg}}, S) -> @@ -456,13 +493,18 @@ transition({diameter, {close, Pid}}, #transport{parent = Pid}) -> transition({diameter, {tls, _Ref, _Type, _Bool}}, _) -> stop; +%% Parent process has died. +transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) -> + stop; + %% Listener process has died. transition({'DOWN', _, process, Pid, _}, #transport{mode = {accept, Pid}}) -> stop; -%% Parent process has died. -transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) -> - stop; +%% Ditto but we have ownership of the association. It might be that +%% we'll go down anyway though. +transition({'DOWN', _, process, _Pid, _}, #transport{mode = accept}) -> + ok; %% Request for the local port number. transition({resolve_port, Pid}, #transport{socket = Sock}) @@ -521,14 +563,6 @@ send(Bin, #transport{streams = {_, OS}, %% send/3 -%% Messages have to be sent from the controlling process, which is -%% probably a bug. Sending from here causes an inet_reply, Sock, -%% Status} message to be sent to the controlling process while -%% gen_sctp:send/4 here hangs. -send(StreamId, Bin, #transport{assoc_id = AId, - mode = {accept, LPid}}) -> - LPid ! {send, AId, StreamId, Bin}; - send(StreamId, Bin, #transport{socket = Sock, assoc_id = AId}) -> send(Sock, AId, StreamId, Bin). @@ -554,10 +588,9 @@ recv({_, #sctp_assoc_change{state = comm_up, mode = {T, _}, socket = Sock} = S) -> - Ref = getr(ref), + Ref = getr(?REF_KEY), is_reference(Ref) %% started in new code - andalso - (true = diameter_reg:add_new({?MODULE, T, {Ref, {Id, Sock}}})), + andalso publish(T, Ref, Id, Sock), up(S#transport{assoc_id = Id, streams = {IS, OS}}); @@ -599,6 +632,10 @@ recv({_, #sctp_paddr_change{}}, _) -> recv({_, #sctp_pdapi_event{}}, _) -> ok. +publish(T, Ref, Id, Sock) -> + true = diameter_reg:add_new({?MODULE, T, {Ref, {Id, Sock}}}), + putr(?INFO_KEY, {gen_sctp, Sock}). %% for info/1 + %% up/1 up(#transport{parent = Pid, @@ -608,21 +645,15 @@ up(#transport{parent = Pid, S#transport{mode = C}; up(#transport{parent = Pid, - mode = {accept, _}} + mode = {accept = A, _}} = S) -> diameter_peer:up(Pid), - S. + S#transport{mode = A}. -%% find/2 - -find({[#sctp_sndrcvinfo{assoc_id = Id}], _} - = Data, - #listener{tmap = T} - = S) -> - f(ets:lookup(T, Id), Data, S); +%% find/3 -find({_, Rec} = Data, #listener{tmap = T} = S) -> - f(ets:lookup(T, assoc_id(Rec)), Data, S). +find(Id, Data, #listener{tmap = T} = S) -> + f(ets:lookup(T, Id), Data, S). %% New association and a transport waiting for one: use it. f([], @@ -663,17 +694,29 @@ f([], _, _) -> %% assoc_id/1 -assoc_id(#sctp_shutdown_event{assoc_id = Id}) -> +assoc_id({[#sctp_sndrcvinfo{assoc_id = Id}], _}) -> Id; -assoc_id(#sctp_assoc_change{assoc_id = Id}) -> +assoc_id({_, Rec}) -> + id(Rec). + +id(#sctp_shutdown_event{assoc_id = Id}) -> + Id; +id(#sctp_assoc_change{assoc_id = Id}) -> Id; -assoc_id(#sctp_sndrcvinfo{assoc_id = Id}) -> +id(#sctp_sndrcvinfo{assoc_id = Id}) -> Id; -assoc_id(#sctp_paddr_change{assoc_id = Id}) -> +id(#sctp_paddr_change{assoc_id = Id}) -> Id; -assoc_id(#sctp_adaptation_event{assoc_id = Id}) -> +id(#sctp_adaptation_event{assoc_id = Id}) -> Id. +%% peeloff/3 + +peeloff(LSock, Id, TPid) -> + {ok, Sock} = gen_sctp:peeloff(LSock, Id), + ok = gen_sctp:controlling_process(Sock, TPid), + Sock. + %% connect/4 connect(_, [], _, Reasons) -> diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index 78dbda6888..f3fbbee609 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% Copyright Ericsson AB 2010-2012. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -37,11 +37,17 @@ code_change/3, terminate/2]). +-export([info/1]). %% service_info callback + -export([ports/0, ports/1]). -include_lib("diameter/include/diameter.hrl"). +%% Keys into process dictionary. +-define(INFO_KEY, info). +-define(REF_KEY, ref). + -define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})). -define(DEFAULT_PORT, 3868). %% RFC 3588, ch 2.1 @@ -74,7 +80,7 @@ %% Accepting/connecting transport process state. -record(transport, - {socket :: inet:socket(), %% accept or connect socket + {socket :: inet:socket() | ssl:sslsock(), %% accept or connect socket parent :: pid(), %% of process that started us module :: module(), %% gen_tcp-like module frag = <<>> :: binary() | {tref(), frag()}, %% message fragment @@ -111,6 +117,33 @@ start_link(T) -> diameter_lib:spawn_opts(server, [])). %% --------------------------------------------------------------------------- +%% # info/1 +%% --------------------------------------------------------------------------- + +info({Mod, Sock}) -> + lists:flatmap(fun(K) -> info(Mod, K, Sock) end, + [{socket, fun sockname/2}, + {peer, fun peername/2}, + {statistics, fun getstat/2} + | ssl_info(Mod, Sock)]). + +info(Mod, {K,F}, Sock) -> + case F(Mod, Sock) of + {ok, V} -> + [{K,V}]; + _ -> + [] + end. + +ssl_info(ssl = M, Sock) -> + [{M, ssl_info(Sock)}]; +ssl_info(_, _) -> + []. + +ssl_info(Sock) -> + [{peercert, C} || {ok, C} <- [ssl:peercert(Sock)]]. + +%% --------------------------------------------------------------------------- %% # init/1 %% --------------------------------------------------------------------------- @@ -133,7 +166,7 @@ i({T, Ref, Mod, Pid, Opts, Addrs}) MPid ! {stop, self()}, %% tell the monitor to die M = if SslOpts -> ssl; true -> Mod end, setopts(M, Sock), - putr(ref, Ref), + putr(?REF_KEY, Ref), #transport{parent = Pid, module = M, socket = Sock, @@ -191,7 +224,7 @@ i(accept = T, Ref, Mod, Pid, Opts, Addrs) -> {LAddr, LSock} = listener(Ref, {Mod, Opts, Addrs}), proc_lib:init_ack({ok, self(), [LAddr]}), Sock = ok(accept(Mod, LSock)), - true = diameter_reg:add_new({?MODULE, T, {Ref, Sock}}), + publish(Mod, T, Ref, Sock), diameter_peer:up(Pid), Sock; @@ -202,10 +235,14 @@ i(connect = T, Ref, Mod, Pid, Opts, Addrs) -> RPort = get_port(RP), proc_lib:init_ack({ok, self(), [LAddr]}), Sock = ok(connect(Mod, RAddr, RPort, gen_opts(LAddr, Rest))), - true = diameter_reg:add_new({?MODULE, T, {Ref, Sock}}), + publish(Mod, T, Ref, Sock), diameter_peer:up(Pid, {RAddr, RPort}), Sock. +publish(Mod, T, Ref, Sock) -> + true = diameter_reg:add_new({?MODULE, T, {Ref, Sock}}), + putr(?INFO_KEY, {Mod, Sock}). %% for info/1 + ok({ok, T}) -> T; ok(No) -> @@ -521,7 +558,7 @@ tls_handshake(Type, true, #transport{socket = Sock, ssl = Opts} = S) -> {ok, SSock} = tls(Type, Sock, [{cb_info, ?TCP_CB(M)} | Opts]), - Ref = getr(ref), + Ref = getr(?REF_KEY), is_reference(Ref) %% started in new code andalso (true = diameter_reg:add_new({?MODULE, Type, {Ref, SSock}})), @@ -696,12 +733,32 @@ setopts(M, Sock) -> portnr(gen_tcp, Sock) -> inet:port(Sock); -portnr(ssl, Sock) -> - case ssl:sockname(Sock) of +portnr(M, Sock) -> + case M:sockname(Sock) of {ok, {_Addr, PortNr}} -> {ok, PortNr}; {error, _} = No -> No - end; -portnr(M, Sock) -> - M:port(Sock). + end. + +%% sockname/2 + +sockname(gen_tcp, Sock) -> + inet:sockname(Sock); +sockname(M, Sock) -> + M:sockname(Sock). + +%% peername/2 + +peername(gen_tcp, Sock) -> + inet:peername(Sock); +peername(M, Sock) -> + M:peername(Sock). + +%% getstat/2 + +getstat(gen_tcp, Sock) -> + inet:getstat(Sock); +getstat(M, Sock) -> + M:getstat(Sock). +%% Note that ssl:getstat/1 doesn't yet exist in R15B01. diff --git a/lib/diameter/src/transport/diameter_transport.erl b/lib/diameter/src/transport/diameter_transport.erl new file mode 100644 index 0000000000..ff4b6bbc6d --- /dev/null +++ b/lib/diameter/src/transport/diameter_transport.erl @@ -0,0 +1,55 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2012. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% + +-module(diameter_transport). + +%% +%% This module implements a transport start function that +%% evaluates its config argument. +%% + +%% Transport start functions +-export([start/3, + select/3, + eval/3]). + +%% start/3 + +%% Call a start function in this module ... +start(T, Svc, {F,A}) -> + start(T, Svc, {?MODULE, F, [A]}); + +%% ... or some other. +start(T, Svc, F) -> + diameter_lib:eval([F, T, Svc]). + +%% select/3 +%% +%% A start function that whose config argument is expected to return a +%% new start function. + +select(T, Svc, F) -> + start(T, Svc, diameter_lib:eval([F, T, Svc])). + +%% eval/3 +%% +%% A start function that simply evaluates its config argument. + +eval(_, _, F) -> + diameter_lib:eval(F). |