diff options
Diffstat (limited to 'lib/diameter/src/base')
25 files changed, 974 insertions, 551 deletions
diff --git a/lib/diameter/src/base/diameter.erl b/lib/diameter/src/base/diameter.erl index 9d71bcbbf8..e8f2f63f86 100644 --- a/lib/diameter/src/base/diameter.erl +++ b/lib/diameter/src/base/diameter.erl @@ -1,18 +1,19 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2015. All Rights Reserved. +%% Copyright Ericsson AB 2010-2016. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% @@ -35,6 +36,8 @@ %% Information. -export([services/0, + peer_info/1, + peer_find/1, service_info/2]). %% Start/stop the application. In a "real" application this should @@ -52,6 +55,7 @@ service_name/0, capability/0, peer_filter/0, + peer_ref/0, service_opt/0, application_opt/0, app_module/0, @@ -146,6 +150,27 @@ service_info(SvcName, Option) -> diameter_service:info(SvcName, Option). %% --------------------------------------------------------------------------- +%% peer_info/2 +%% --------------------------------------------------------------------------- + +-spec peer_info(peer_ref()) + -> [tuple()]. + +peer_info(PeerRef) -> + diameter_service:peer_info(PeerRef). + +%% --------------------------------------------------------------------------- +%% peer_find/1 +%% --------------------------------------------------------------------------- + +-spec peer_find(peer_ref() | pid()) + -> {peer_ref(), pid()} + | false. + +peer_find(Pid) -> + diameter_peer_fsm:find(Pid). + +%% --------------------------------------------------------------------------- %% add_transport/3 %% --------------------------------------------------------------------------- @@ -279,6 +304,9 @@ call(SvcName, App, Message) -> | {all, [peer_filter()]} | {any, [peer_filter()]}. +-opaque peer_ref() + :: pid(). + -type evaluable() :: {module(), atom(), list()} | fun() diff --git a/lib/diameter/src/base/diameter_app.erl b/lib/diameter/src/base/diameter_app.erl index 600f7ff04d..122f60dd88 100644 --- a/lib/diameter/src/base/diameter_app.erl +++ b/lib/diameter/src/base/diameter_app.erl @@ -1,18 +1,19 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% Copyright Ericsson AB 2010-2016. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% diff --git a/lib/diameter/src/base/diameter_callback.erl b/lib/diameter/src/base/diameter_callback.erl index 90431099b0..f479cb6612 100644 --- a/lib/diameter/src/base/diameter_callback.erl +++ b/lib/diameter/src/base/diameter_callback.erl @@ -1,18 +1,19 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2011. All Rights Reserved. -%% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% Copyright Ericsson AB 2010-2016. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% diff --git a/lib/diameter/src/base/diameter_capx.erl b/lib/diameter/src/base/diameter_capx.erl index 7dc61f229f..07a678c617 100644 --- a/lib/diameter/src/base/diameter_capx.erl +++ b/lib/diameter/src/base/diameter_capx.erl @@ -3,16 +3,17 @@ %% %% Copyright Ericsson AB 2010-2015. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% diff --git a/lib/diameter/src/base/diameter_codec.erl b/lib/diameter/src/base/diameter_codec.erl index aab8b5887e..1ea5357924 100644 --- a/lib/diameter/src/base/diameter_codec.erl +++ b/lib/diameter/src/base/diameter_codec.erl @@ -3,16 +3,17 @@ %% %% Copyright Ericsson AB 2010-2015. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% diff --git a/lib/diameter/src/base/diameter_config.erl b/lib/diameter/src/base/diameter_config.erl index 242b6b4d08..fdbbd412a1 100644 --- a/lib/diameter/src/base/diameter_config.erl +++ b/lib/diameter/src/base/diameter_config.erl @@ -1,18 +1,19 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2015. All Rights Reserved. +%% Copyright Ericsson AB 2010-2016. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% @@ -37,17 +38,17 @@ -module(diameter_config). -behaviour(gen_server). --compile({no_auto_import, [monitor/2]}). - -export([start_service/2, stop_service/1, add_transport/2, remove_transport/2, have_transport/2, - lookup/1]). + lookup/1, + subscribe/2]). -%% child server start --export([start_link/0]). +%% server start +-export([start_link/0, + start_link/1]). %% gen_server callbacks -export([init/1, @@ -57,8 +58,8 @@ handle_info/2, code_change/3]). -%% diameter_sync requests. --export([sync/1]). +%% callbacks +-export([sync/1]). %% diameter_sync requests %% debug -export([state/0, @@ -68,7 +69,8 @@ -include("diameter_internal.hrl"). %% Server state. --record(state, {id = now()}). +-record(state, {id = diameter_lib:now(), + role :: server | transport}). %% Registered name of the server. -define(SERVER, ?MODULE). @@ -76,6 +78,9 @@ %% Table config is written to. -define(TABLE, ?MODULE). +%% Key on which a transport-specific child registers itself. +-define(TRANSPORT_KEY(Ref), {?MODULE, transport, Ref}). + %% Workaround for dialyzer's lack of understanding of match specs. -type match(T) :: T | '_' | '$1' | '$2' | '$3' | '$4'. @@ -224,6 +229,13 @@ pred(_) -> ?THROW(pred). %% -------------------------------------------------------------------------- +%% # subscribe/2 +%% -------------------------------------------------------------------------- + +subscribe(Ref, T) -> + diameter_reg:subscribe(?TRANSPORT_KEY(Ref), T). + +%% -------------------------------------------------------------------------- %% # have_transport/2 %% %% Output: true | false @@ -263,6 +275,9 @@ start_link() -> Options = [{spawn_opt, diameter_lib:spawn_opts(server, [])}], gen_server:start_link(ServerName, Module, Args, Options). +start_link(T) -> + proc_lib:start_link(?MODULE, init, [T], infinity, []). + state() -> call(state). @@ -273,8 +288,27 @@ uptime() -> %%% # init/1 %%% ---------------------------------------------------------- +%% ?SERVER start. init([]) -> - {ok, #state{}}. + {ok, #state{role = server}}; + +%% Child start as a consequence of add_transport. +init({SvcName, Type, Opts}) -> + Res = try + add(SvcName, Type, Opts) + catch + ?FAILURE(Reason) -> {error, Reason} + end, + proc_lib:init_ack({ok, self(), Res}), + loop(Res). + +%% loop/1 + +loop({ok, _}) -> + gen_server:enter_loop(?MODULE, [], #state{role = transport}); + +loop({error, _}) -> + ok. %% die %%% ---------------------------------------------------------- %%% # handle_call/2 @@ -283,8 +317,8 @@ init([]) -> handle_call(state, _, State) -> {reply, State, State}; -handle_call(uptime, _, #state{id = Time} = State) -> - {reply, diameter_lib:now_diff(Time), State}; +handle_call(uptime, _, #state{id = Time} = S) -> + {reply, diameter_lib:now_diff(Time), S}; handle_call(Req, From, State) -> ?UNEXPECTED([Req, From]), @@ -303,30 +337,34 @@ handle_cast(Msg, State) -> %%% # handle_info/2 %%% ---------------------------------------------------------- +%% remove_transport is telling published child to die. +handle_info(stop, #state{role = transport} = S) -> + {stop, normal, S}; + %% A service process has died. This is most likely a consequence of %% stop_service, in which case the restart will find no config for the %% service and do nothing. The entry keyed on the monitor ref is only %% removed as a result of the 'DOWN' notification however. -handle_info({'DOWN', MRef, process, _, Reason}, State) -> +handle_info({'DOWN', MRef, process, _, Reason}, #state{role = server} = S) -> [#monitor{service = SvcName} = T] = select([{#monitor{mref = MRef, _ = '_'}, [], ['$_']}]), queue_restart(Reason, SvcName), delete_object(T), - {noreply, State}; + {noreply, S}; -handle_info({monitor, SvcName, Pid}, State) -> - monitor(Pid, SvcName), - {noreply, State}; +handle_info({monitor, SvcName, Pid}, #state{role = server} = S) -> + insert_monitor(Pid, SvcName), + {noreply, S}; -handle_info({restart, SvcName}, State) -> +handle_info({restart, SvcName}, #state{role = server} = S) -> restart(SvcName), - {noreply, State}; + {noreply, S}; -handle_info(restart, State) -> +handle_info(restart, #state{role = server} = S) -> restart(), - {noreply, State}; + {noreply, S}; handle_info(Info, State) -> ?UNEXPECTED([Info]), @@ -403,19 +441,22 @@ sync({start_service, SvcName, Opts}) -> sync({stop_service, SvcName}) -> stop(SvcName); +%% Start a child whose only purpose is to be alive for the lifetime of +%% the transport configuration and publish itself in diameter_reg. +%% This is to provide a way for processes to to be notified when the +%% configuration is removed (diameter_reg:subscribe/2). sync({add, SvcName, Type, Opts}) -> - try - add(SvcName, Type, Opts) - catch - ?FAILURE(Reason) -> {error, Reason} - end; + {ok, _Pid, Res} = diameter_config_sup:start_child({SvcName, Type, Opts}), + Res; sync({remove, SvcName, Pred}) -> - remove(select([{#transport{service = '$1', _ = '_'}, + Recs = select([{#transport{service = '$1', _ = '_'}, [{'=:=', '$1', {const, SvcName}}], ['$_']}]), - SvcName, - Pred). + F = fun(#transport{ref = R, type = T, options = O}) -> + Pred(R,T,O) + end, + remove(SvcName, lists:filter(F, Recs)). %% start/3 @@ -437,8 +478,8 @@ startmon(SvcName, {ok, Pid}) -> startmon(_, {error, _}) -> ok. -monitor(Pid, SvcName) -> - MRef = erlang:monitor(process, Pid), +insert_monitor(Pid, SvcName) -> + MRef = monitor(process, Pid), insert(#monitor{mref = MRef, service = SvcName}). %% queue_restart/2 @@ -502,6 +543,7 @@ add(SvcName, Type, Opts) -> ok = transport_opts(Opts), Ref = make_ref(), + true = diameter_reg:add_new(?TRANSPORT_KEY(Ref)), T = {Ref, Type, Opts}, %% The call to the service returns error if the service isn't %% started yet, which is harmless. The transport will be started @@ -593,26 +635,30 @@ start_transport(SvcName, T) -> No end. -%% remove/3 - -remove(L, SvcName, Pred) -> - rm(SvcName, lists:filter(fun(#transport{ref = R, type = T, options = O}) -> - Pred(R,T,O) - end, - L)). +%% remove/2 -rm(_, []) -> +remove(_, []) -> ok; -rm(SvcName, L) -> + +remove(SvcName, L) -> Refs = lists:map(fun(#transport{ref = R}) -> R end, L), case stop_transport(SvcName, Refs) of ok -> + lists:foreach(fun stop_child/1, Refs), diameter_stats:flush(Refs), lists:foreach(fun delete_object/1, L); {error, _} = No -> No end. +stop_child(Ref) -> + case diameter_reg:match(?TRANSPORT_KEY(Ref)) of + [{_, Pid}] -> %% tell the transport-specific child to die + Pid ! stop; + [] -> %% already removed/dead + ok + end. + stop_transport(SvcName, Refs) -> case diameter_service:stop_transport(SvcName, Refs) of ok -> diff --git a/lib/diameter/src/base/diameter_config_sup.erl b/lib/diameter/src/base/diameter_config_sup.erl new file mode 100644 index 0000000000..9524573378 --- /dev/null +++ b/lib/diameter/src/base/diameter_config_sup.erl @@ -0,0 +1,58 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2016. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% + +%% +%% Supervisor for config processes. +%% + +-module(diameter_config_sup). + +-behaviour(supervisor). + +%% interface +-export([start_link/0, %% supervisor start + start_child/1]). %% config start + +-export([init/1]). + +-define(NAME, ?MODULE). %% supervisor name + +%% start_link/0 + +start_link() -> + SupName = {local, ?NAME}, + supervisor:start_link(SupName, ?MODULE, []). + +%% start_child/1 + +start_child(T) -> + supervisor:start_child(?NAME, [T]). + +%% init/1 + +init([]) -> + Mod = diameter_config, + Flags = {simple_one_for_one, 0, 1}, + ChildSpec = {Mod, + {Mod, start_link, []}, + temporary, + 1000, + worker, + [Mod]}, + {ok, {Flags, [ChildSpec]}}. diff --git a/lib/diameter/src/base/diameter_dict.erl b/lib/diameter/src/base/diameter_dict.erl index 3b9ba00a3f..7db294a1b1 100644 --- a/lib/diameter/src/base/diameter_dict.erl +++ b/lib/diameter/src/base/diameter_dict.erl @@ -1,18 +1,19 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% Copyright Ericsson AB 2010-2016. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% diff --git a/lib/diameter/src/base/diameter_internal.hrl b/lib/diameter/src/base/diameter_internal.hrl index 4b672aa071..a0f4a8567d 100644 --- a/lib/diameter/src/base/diameter_internal.hrl +++ b/lib/diameter/src/base/diameter_internal.hrl @@ -1,18 +1,19 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2013. All Rights Reserved. +%% Copyright Ericsson AB 2010-2016. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% diff --git a/lib/diameter/src/base/diameter_lib.erl b/lib/diameter/src/base/diameter_lib.erl index 26cc6137a2..b835e87967 100644 --- a/lib/diameter/src/base/diameter_lib.erl +++ b/lib/diameter/src/base/diameter_lib.erl @@ -1,28 +1,35 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2015. All Rights Reserved. +%% Copyright Ericsson AB 2010-2016. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% -module(diameter_lib). +-compile({no_auto_import, [now/0]}). -export([info_report/2, error_report/2, warning_report/2, + now/0, + timestamp/0, + timestamp/1, now_diff/1, + micro_diff/1, + micro_diff/2, time/1, eval/1, eval_name/1, @@ -92,21 +99,78 @@ fmt(T) -> end. %% --------------------------------------------------------------------------- +%% # now/0 +%% --------------------------------------------------------------------------- + +-spec now() + -> integer(). + +now() -> + erlang:monotonic_time(). + +%% --------------------------------------------------------------------------- +%% # timestamp/0 +%% --------------------------------------------------------------------------- + +-spec timestamp() + -> erlang:timestamp(). + +timestamp() -> + timestamp(now()). + +%% --------------------------------------------------------------------------- +%% # timestamp/1 +%% --------------------------------------------------------------------------- + +-spec timestamp(integer()) + -> erlang:timestamp(). + +timestamp(MonoT) -> %% monotonic time + MicroSecs = monotonic_to_microseconds(MonoT + erlang:time_offset()), + Secs = MicroSecs div 1000000, + {Secs div 1000000, Secs rem 1000000, MicroSecs rem 1000000}. + +monotonic_to_microseconds(MonoT) -> + erlang:convert_time_unit(MonoT, native, micro_seconds). + +%% --------------------------------------------------------------------------- %% # now_diff/1 %% --------------------------------------------------------------------------- --spec now_diff(NowT :: erlang:timestamp()) +-spec now_diff(T0 :: integer()) -> {Hours, Mins, Secs, MicroSecs} when Hours :: non_neg_integer(), Mins :: 0..59, Secs :: 0..59, MicroSecs :: 0..999999. -%% Return timer:now_diff(now(), NowT) as an {H, M, S, MicroS} tuple -%% instead of as integer microseconds. +%% Return time difference as an {H, M, S, MicroS} tuple instead of as +%% integer microseconds. + +now_diff(T0) -> + time(micro_diff(T0)). + +%% --------------------------------------------------------------------------- +%% # micro_diff/1 +%% --------------------------------------------------------------------------- + +-spec micro_diff(T0 :: integer()) + -> MicroSecs + when MicroSecs :: non_neg_integer(). -now_diff(Time) -> - time(timer:now_diff(now(), Time)). +micro_diff(T0) -> %% monotonic time + monotonic_to_microseconds(erlang:monotonic_time() - T0). + +%% --------------------------------------------------------------------------- +%% # micro_diff/2 +%% --------------------------------------------------------------------------- + +-spec micro_diff(T1 :: integer(), T0 :: integer()) + -> MicroSecs + when MicroSecs :: non_neg_integer(). + +micro_diff(T1, T0) -> %% monotonic time + monotonic_to_microseconds(T1 - T0). %% --------------------------------------------------------------------------- %% # time/1 @@ -114,19 +178,13 @@ now_diff(Time) -> %% Return an elapsed time as an {H, M, S, MicroS} tuple. %% --------------------------------------------------------------------------- --spec time(NowT | Diff) +-spec time(Diff :: non_neg_integer()) -> {Hours, Mins, Secs, MicroSecs} - when NowT :: erlang:timestamp(), - Diff :: non_neg_integer(), - Hours :: non_neg_integer(), + when Hours :: non_neg_integer(), Mins :: 0..59, Secs :: 0..59, MicroSecs :: 0..999999. -time({_,_,_} = NowT) -> %% time of day - %% 24 hours = 24*60*60*1000000 = 86400000000 microsec - time(timer:now_diff(NowT, {0,0,0}) rem 86400000000); - time(Micro) -> %% elapsed time Seconds = Micro div 1000000, H = Seconds div 3600, @@ -226,7 +284,7 @@ ip(T) %% Or not: convert from '.'/':'-separated decimal/hex. ip(Addr) -> - {ok, A} = inet:parse_address(Addr), + {ok, A} = inet_parse:address(Addr), %% documented in inet(3) A. %% --------------------------------------------------------------------------- diff --git a/lib/diameter/src/base/diameter_misc_sup.erl b/lib/diameter/src/base/diameter_misc_sup.erl index 4e40476f14..343688be23 100644 --- a/lib/diameter/src/base/diameter_misc_sup.erl +++ b/lib/diameter/src/base/diameter_misc_sup.erl @@ -1,18 +1,19 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2011. All Rights Reserved. -%% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% Copyright Ericsson AB 2010-2016. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% diff --git a/lib/diameter/src/base/diameter_peer.erl b/lib/diameter/src/base/diameter_peer.erl index 1ae8b567b1..2759f17e64 100644 --- a/lib/diameter/src/base/diameter_peer.erl +++ b/lib/diameter/src/base/diameter_peer.erl @@ -3,16 +3,17 @@ %% %% Copyright Ericsson AB 2010-2015. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% @@ -56,7 +57,7 @@ -define(SERVER, ?MODULE). %% Server state. --record(state, {id = now()}). +-record(state, {id = diameter_lib:now()}). %% Default transport_module/config. -define(DEFAULT_TMOD, diameter_tcp). @@ -201,10 +202,10 @@ match1(Addr, Match) -> match(Addr, {ok, A}, _) -> Addr == A; match(Addr, {error, _}, RE) -> - match == re:run(inet:ntoa(Addr), RE, [{capture, none}, caseless]). + match == re:run(inet_parse:ntoa(Addr), RE, [{capture, none}]). addr([_|_] = A) -> - inet:parse_address(A); + inet_parse:address(A); addr(A) -> {ok, A}. diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl index a9ee4940a3..996e75a8d3 100644 --- a/lib/diameter/src/base/diameter_peer_fsm.erl +++ b/lib/diameter/src/base/diameter_peer_fsm.erl @@ -1,18 +1,19 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2015. All Rights Reserved. +%% Copyright Ericsson AB 2010-2016. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% @@ -31,6 +32,9 @@ -export([start/3, result_code/2]). +%% Interface towards diameter. +-export([find/1]). + %% gen_server callbacks -export([init/1, handle_call/3, @@ -116,10 +120,9 @@ parent :: pid(), %% watchdog process transport :: pid(), %% transport process dictionary :: module(), %% common dictionary - service :: #diameter_service{}, + service :: #diameter_service{} | undefined, dpr = false :: false | true %% DPR received, DPA sent - | {uint32(), uint32()} %% set in old code | {boolean(), uint32(), uint32()}, %% hop by hop and end to end identifiers in %% outgoing DPR; boolean says whether or not @@ -155,8 +158,7 @@ %% # start/3 %% --------------------------------------------------------------------------- --spec start(T, [Opt], {[diameter:service_opt()] - | diameter:sequence(), %% from old code +-spec start(T, [Opt], {[diameter:service_opt()], [node()], module(), #diameter_service{}}) @@ -186,6 +188,25 @@ start_link(T) -> infinity, diameter_lib:spawn_opts(server, [])). +%% find/1 +%% +%% Identify both pids of a peer_fsm/transport pair. + +find(Pid) -> + findl([{?MODULE, '_', Pid}, {?MODULE, Pid, '_'}]). + +findl([]) -> + false; + +findl([Pat | Rest]) -> + try + [{{_, Pid, TPid}, Pid}] = diameter_reg:match(Pat), + {Pid, TPid} + catch + error:_ -> + findl(Rest) + end. + %% --------------------------------------------------------------------------- %% --------------------------------------------------------------------------- @@ -195,9 +216,6 @@ init(T) -> proc_lib:init_ack({ok, self()}), gen_server:enter_loop(?MODULE, [], i(T)). -i({Ack, WPid, T, Opts, {{_,_} = Mask, Nodes, Dict0, Svc}}) -> %% from old code - i({Ack, WPid, T, Opts, {[{sequence, Mask}], Nodes, Dict0, Svc}}); - i({Ack, WPid, {M, Ref} = T, Opts, {SvcOpts, Nodes, Dict0, Svc}}) -> erlang:monitor(process, WPid), wait(Ack, WPid), @@ -219,6 +237,8 @@ i({Ack, WPid, {M, Ref} = T, Opts, {SvcOpts, Nodes, Dict0, Svc}}) -> {TPid, Addrs} = start_transport(T, Rest, Svc), + diameter_reg:add({?MODULE, self(), TPid}), %% lets pairs be discovered + #state{state = {'Wait-Conn-Ack', Tmo}, parent = WPid, transport = TPid, @@ -329,14 +349,11 @@ handle_info(T, #state{} = State) -> {?MODULE, Tag, Reason} -> ?LOG(stop, Tag), {stop, {shutdown, Reason}, State} - end; + end. %% The form of the throw caught here is historical. It's %% significant that it's not a 2-tuple, as in ?FAILURE(Reason), %% since these are caught elsewhere. -handle_info(T, S) -> %% started in old code - handle_info(T, #state{} = erlang:append_element(S, infinity)). - %% Note that there's no guarantee that the service and transport %% capabilities are good enough to build a CER/CEA that can be %% succesfully encoded. It's not checked at diameter:add_transport/2 @@ -371,9 +388,6 @@ eraser(Key) -> %% transition/2 -transition(T, #state{dpr = {Hid, Eid}} = S) -> %% DPR sent from old code - transition(T, S#state{dpr = {false, Hid, Eid}}); - %% Connection to peer. transition({diameter, {TPid, connected, Remote}}, #state{transport = TPid, @@ -426,8 +440,8 @@ transition({connection_timeout, _}, _) -> ok; %% Incoming message from the transport. -transition({diameter, {recv, Pkt}}, S) -> - recv(Pkt, S); +transition({diameter, {recv, MsgT}}, S) -> + incoming(MsgT, S); %% Timeout when still in the same state ... transition({timeout = T, PS}, #state{state = PS}) -> @@ -553,6 +567,28 @@ encode(Rec, Dict) -> diameter_codec:encode(Dict, #diameter_packet{header = Hdr, msg = Rec}). +%% incoming/2 + +incoming({Msg, NPid}, S) -> + try recv(Msg, S) of + T -> + NPid ! {diameter, discard}, + T + catch + {?MODULE, Name, Pkt} -> + S#state.parent ! {recv, self(), Name, {Pkt, NPid}}, + rcv(Name, Pkt, S) + end; + +incoming(Msg, S) -> + try + recv(Msg, S) + catch + {?MODULE, Name, Pkt} -> + S#state.parent ! {recv, self(), Name, Pkt}, + rcv(Name, Pkt, S) + end. + %% recv/2 recv(#diameter_packet{header = #diameter_header{} = Hdr} @@ -607,9 +643,8 @@ recv1('DPA' = N, %% Any other message with a header and no length errors: send to the %% parent. -recv1(Name, Pkt, #state{parent = Pid} = S) -> - Pid ! {recv, self(), Name, Pkt}, - rcv(Name, Pkt, S). +recv1(Name, Pkt, #state{}) -> + throw({?MODULE, Name, Pkt}). %% recv/3 @@ -1301,25 +1336,15 @@ dpa_timer(Tmo) -> erlang:send_after(Tmo, self(), dpa_timeout). dpa_timeout() -> - dpa_timeout(getr(?DPA_KEY)). - -dpa_timeout({_, Tmo}) -> - Tmo; -dpa_timeout(undefined) -> %% set in old code - ?DPA_TIMEOUT; -dpa_timeout(Tmo) -> %% ditto + {_, Tmo} = getr(?DPA_KEY), Tmo. dpr_timer() -> dpa_timer(dpr_timeout()). dpr_timeout() -> - dpr_timeout(getr(?DPA_KEY)). - -dpr_timeout({Tmo, _}) -> - Tmo; -dpr_timeout(_) -> %% set in old code - ?DPR_TIMEOUT. + {Tmo, _} = getr(?DPA_KEY), + Tmo. %% register_everywhere/1 %% diff --git a/lib/diameter/src/base/diameter_peer_fsm_sup.erl b/lib/diameter/src/base/diameter_peer_fsm_sup.erl index 995eaf74d0..cf3c205e3f 100644 --- a/lib/diameter/src/base/diameter_peer_fsm_sup.erl +++ b/lib/diameter/src/base/diameter_peer_fsm_sup.erl @@ -1,18 +1,19 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2011. All Rights Reserved. -%% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% Copyright Ericsson AB 2010-2016. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% diff --git a/lib/diameter/src/base/diameter_reg.erl b/lib/diameter/src/base/diameter_reg.erl index 6bb4710e63..9027130063 100644 --- a/lib/diameter/src/base/diameter_reg.erl +++ b/lib/diameter/src/base/diameter_reg.erl @@ -1,18 +1,19 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2015. All Rights Reserved. +%% Copyright Ericsson AB 2010-2016. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% @@ -24,14 +25,12 @@ -module(diameter_reg). -behaviour(gen_server). --compile({no_auto_import, [monitor/2]}). - -export([add/1, add_new/1, - del/1, - repl/2, + remove/1, match/1, - wait/1]). + wait/1, + subscribe/2]). -export([start_link/0]). @@ -45,29 +44,32 @@ %% test -export([pids/0, - terms/0]). + terms/0, + subs/0, + waits/0]). %% debug -export([state/0, uptime/0]). --include("diameter_internal.hrl"). - -define(SERVER, ?MODULE). -define(TABLE, ?MODULE). -%% Table entry used to keep from starting more than one monitor on the -%% same process. This isn't a problem but there's no point in starting -%% multiple monitors if we can avoid it. Note that we can't have a 2-tuple -%% keyed on Pid since a registered term can be anything. Want the entry -%% keyed on Pid so that lookup is fast. --define(MONITOR(Pid, MRef), {Pid, monitor, MRef}). +-type key() :: term(). +-type from() :: {pid(), term()}. +-type pattern() :: term(). -%% Table entry containing the Term -> Pid mapping. --define(MAPPING(Term, Pid), {Term, Pid}). +-record(state, {id = diameter_lib:now(), + receivers = dict:new() + :: dict:dict(pattern(), [[pid() | term()]%% subscribe + | from()]), %% wait + monitors = sets:new() :: sets:set(pid())}). --record(state, {id = now(), - q = []}). %% [{From, Pat}] +%% The ?TABLE bag contains the Key -> Pid mapping, as {Key, Pid} +%% tuples. Each pid is stored in the monitors set to ensure only one +%% monitor for each pid: more are harmless, but unnecessary. A pattern +%% is added to receivers a result of calls to wait/1 or subscribe/2: +%% changes to ?TABLE causes processes to be notified as required. %% =========================================================================== %% # add(T) @@ -76,18 +78,18 @@ %% this or other assocations can be retrieved using match/1. %% %% An association is removed when the calling process dies or as a -%% result of calling del/1. Adding the same term more than once is -%% equivalent to adding it exactly once. +%% result of calling remove/1. Adding the same term more than once is +%% equivalent to adding it once. %% %% Note that since match/1 takes a pattern as argument, specifying a %% term that contains match variables is probably not a good idea %% =========================================================================== --spec add(any()) +-spec add(key()) -> true. add(T) -> - call({add, fun ets:insert/2, T, self()}). + call({add, false, T}). %% =========================================================================== %% # add_new(T) @@ -96,36 +98,23 @@ add(T) -> %% association, false being returned if an association already exists. %% =========================================================================== --spec add_new(any()) +-spec add_new(key()) -> boolean(). add_new(T) -> - call({add, fun insert_new/2, T, self()}). - -%% =========================================================================== -%% # repl(T, NewT) -%% -%% Like add/1 but only replace an existing association on T, false -%% being returned if it doesn't exist. -%% =========================================================================== - --spec repl(any(), any()) - -> boolean(). - -repl(T, U) -> - call({repl, T, U, self()}). + call({add, true, T}). %% =========================================================================== -%% # del(Term) +%% # remove(Term) %% %% Remove any existing association of Term with self(). %% =========================================================================== --spec del(any()) +-spec remove(key()) -> true. -del(T) -> - call({del, T, self()}). +remove(T) -> + call({remove, T}). %% =========================================================================== %% # match(Pat) @@ -138,12 +127,17 @@ del(T) -> %% associations removed.) %% =========================================================================== --spec match(any()) - -> [{term(), pid()}]. +-spec match(pattern()) + -> [{key(), pid()}]. match(Pat) -> - ets:match_object(?TABLE, ?MAPPING(Pat, '_')). + match(Pat, '_'). + +%% match/2 +match(Pat, Pid) -> + ets:match_object(?TABLE, {Pat, Pid}). + %% =========================================================================== %% # wait(Pat) %% @@ -151,10 +145,29 @@ match(Pat) -> %% It's up to the caller to ensure that the wait won't be forever. %% =========================================================================== +-spec wait(pattern()) + -> [{key(), pid()}]. + wait(Pat) -> + _ = match(Pat), %% ensure match can succeed call({wait, Pat}). %% =========================================================================== +%% # subscribe(Pat, T) +%% +%% Like match/1, but additionally receive messages of the form +%% {T, add|remove, {term(), pid()} when associations are added +%% or removed. +%% =========================================================================== + +-spec subscribe(Pat :: any(), T :: term()) + -> [{term(), pid()}]. + +subscribe(Pat, T) -> + _ = match(Pat), %% ensure match can succeed + call({subscribe, Pat, T}). + +%% =========================================================================== start_link() -> ServerName = {local, ?SERVER}, @@ -168,19 +181,15 @@ uptime() -> call(uptime). %% pids/0 -%% -%% Return: list of {Pid, [Term, ...]} + +-spec pids() + -> [{pid(), [key()]}]. pids() -> to_list(fun swap/1). to_list(Fun) -> - ets:foldl(fun(T,A) -> acc(Fun, T, A) end, orddict:new(), ?TABLE). - -acc(Fun, ?MAPPING(Term, Pid), Dict) -> - append(Fun({Term, Pid}), Dict); -acc(_, _, Dict) -> - Dict. + ets:foldl(fun(T,D) -> append(Fun(T), D) end, orddict:new(), ?TABLE). append({K,V}, Dict) -> orddict:append(K, V, Dict). @@ -188,14 +197,47 @@ append({K,V}, Dict) -> id(T) -> T. %% terms/0 -%% -%% Return: list of {Term, [Pid, ...]} + +-spec terms() + -> [{key(), [pid()]}]. terms() -> to_list(fun id/1). swap({X,Y}) -> {Y,X}. +%% subs/0 + +-spec subs() + -> [{pattern(), [{pid(), term()}]}]. + +subs() -> + #state{receivers = RD} = state(), + dict:fold(fun sub/3, orddict:new(), RD). + +sub(Pat, Ps, Dict) -> + lists:foldl(fun([P|T], D) -> orddict:append(Pat, {P,T}, D); + (_, D) -> D + end, + Dict, + Ps). + +%% waits/0 + +-spec waits() + -> [{pattern(), [{from(), term()}]}]. + +waits() -> + #state{receivers = RD} = state(), + dict:fold(fun wait/3, orddict:new(), RD). + +wait(Pat, Ps, Dict) -> + lists:foldl(fun({_,_} = F, D) -> orddict:append(Pat, F, D); + (_, D) -> D + end, + Dict, + Ps). + %% ---------------------------------------------------------- %% # init/1 %% ---------------------------------------------------------- @@ -208,57 +250,58 @@ init(_) -> %% # handle_call/3 %% ---------------------------------------------------------- -handle_call({add, Fun, Key, Pid}, _, S) -> - B = Fun(?TABLE, {Key, Pid}), - monitor(B andalso no_monitor(Pid), Pid), - {reply, B, pending(B, S)}; - -handle_call({del, Key, Pid}, _, S) -> - {reply, ets:delete_object(?TABLE, ?MAPPING(Key, Pid)), S}; - -handle_call({repl, T, U, Pid}, _, S) -> - MatchSpec = [{?MAPPING('$1', Pid), - [{'=:=', '$1', {const, T}}], - ['$_']}], - {reply, repl(ets:select(?TABLE, MatchSpec), U, Pid), S}; - -handle_call({wait, Pat}, From, #state{q = Q} = S) -> - case find(Pat) of - {ok, L} -> - {reply, L, S}; - false -> - {noreply, S#state{q = [{From, Pat} | Q]}} +handle_call({add, Uniq, Key}, {Pid, _}, S0) -> + Rec = {Key, Pid}, + S1 = flush(Uniq, Rec, S0), + {Res, New} = insert(Uniq, Rec), + {Recvs, S} = add(New, Rec, S1), + notify(Recvs, Rec), + {reply, Res, S}; + +handle_call({remove, Key}, {Pid, _}, S) -> + Rec = {Key, Pid}, + Recvs = delete([Rec], S), + ets:delete_object(?TABLE, Rec), + notify(Recvs, remove), + {reply, true, S}; + +handle_call({wait, Pat}, {Pid, _} = From, #state{receivers = RD} = S) -> + NS = add_monitor(Pid, S), + case match(Pat) of + [_|_] = L -> + {reply, L, NS}; + [] -> + {noreply, NS#state{receivers = dict:append(Pat, From, RD)}} end; +handle_call({subscribe, Pat, T}, {Pid, _}, #state{receivers = RD} = S) -> + NS = add_monitor(Pid, S), + {reply, match(Pat), NS#state{receivers = dict:append(Pat, [Pid | T], RD)}}; + handle_call(state, _, S) -> {reply, S, S}; handle_call(uptime, _, #state{id = Time} = S) -> {reply, diameter_lib:now_diff(Time), S}; -handle_call(Req, From, S) -> - ?UNEXPECTED([Req, From]), +handle_call(_Req, _From, S) -> {reply, nok, S}. %% ---------------------------------------------------------- %% # handle_cast/2 %% ---------------------------------------------------------- -handle_cast(Msg, S)-> - ?UNEXPECTED([Msg]), +handle_cast(_Msg, S)-> {noreply, S}. %% ---------------------------------------------------------- %% # handle_info/2 %% ---------------------------------------------------------- -handle_info({'DOWN', MRef, process, Pid, _}, S) -> - ets:delete_object(?TABLE, ?MONITOR(Pid, MRef)), - ets:match_delete(?TABLE, ?MAPPING('_', Pid)), - {noreply, S}; +handle_info({'DOWN', _MRef, process, Pid, _}, S) -> + {noreply, down(Pid, S)}; -handle_info(Info, S) -> - ?UNEXPECTED([Info]), +handle_info(_Info, S) -> {noreply, S}. %% ---------------------------------------------------------- @@ -277,71 +320,166 @@ code_change(_OldVsn, State, _Extra) -> %% =========================================================================== -monitor(true, Pid) -> - ets:insert(?TABLE, ?MONITOR(Pid, erlang:monitor(process, Pid))); -monitor(false, _) -> - ok. +%% insert/2 + +insert(false, Rec) -> + Spec = [{'$1', [{'==', '$1', {const, Rec}}], ['$_']}], + X = '$end_of_table' /= ets:select(?TABLE, Spec, 1), %% entry exists? + X orelse ets:insert(?TABLE, Rec), + {true, not X}; + +insert(true, Rec) -> + B = ets:insert_new(?TABLE, Rec), %% entry inserted? + {B, B}. -%% Do we need a monitor for the specified Pid? -no_monitor(Pid) -> - [] == ets:match_object(?TABLE, ?MONITOR(Pid, '_')). +%% add/3 -%% insert_new/2 +%% Only add a single monitor for any given process, since there's no +%% use to more. +add(true, {_Key, Pid} = Rec, S) -> + NS = add_monitor(Pid, S), + {Recvs, RD} = add(Rec, NS), + {Recvs, S#state{receivers = RD}}; -insert_new(?TABLE, {Key, _} = T) -> - flush(ets:lookup(?TABLE, Key)), - ets:insert_new(?TABLE, T). +add(false = No, _, S) -> + {No, S}. + +%% add/2 + +%% Notify processes whose patterns match the inserted key. +add({_Key, Pid} = Rec, #state{receivers = RD}) -> + dict:fold(fun(Pt, Ps, A) -> + add(lists:member(Rec, match(Pt, Pid)), Pt, Ps, Rec, A) + end, + {sets:new(), RD}, + RD). + +%% add/5 + +add(true, Pat, Recvs, {_,_} = Rec, {Set, Dict}) -> + {lists:foldl(fun sets:add_element/2, Set, Recvs), + remove(fun erlang:is_list/1, Pat, Recvs, Dict)}; + +add(false, _, _, _, Acc) -> + Acc. + +%% add_monitor/2 + +add_monitor(Pid, #state{monitors = MS} = S) -> + add_monitor(sets:is_element(Pid, MS), Pid, S). + +%% add_monitor/3 + +add_monitor(false, Pid, #state{monitors = MS} = S) -> + monitor(process, Pid), + S#state{monitors = sets:add_element(Pid, MS)}; + +add_monitor(true, _, S) -> + S. + +%% delete/2 + +delete(Recs, #state{receivers = RD}) -> + lists:foldl(fun(R,S) -> delete(R, RD, S) end, sets:new(), Recs). + +%% delete/3 + +delete({_Key, Pid} = Rec, RD, Set) -> + dict:fold(fun(Pt, Ps, S) -> + delete(lists:member(Rec, match(Pt, Pid)), Rec, Ps, S) + end, + Set, + RD). + +%% delete/4 + +%% Entry matches a pattern ... +delete(true, Rec, Recvs, Set) -> + lists:foldl(fun(R,S) -> sets:add_element({R, Rec}, S) end, + Set, + Recvs); + +%% ... or not. +delete(false, _, _, Set) -> + Set. + +%% notify/2 + +notify(false = No, _) -> + No; + +notify(Recvs, remove = Op) -> + sets:fold(fun({P,R}, N) -> send(P, R, Op), N+1 end, 0, Recvs); + +notify(Recvs, {_,_} = Rec) -> + sets:fold(fun(P,N) -> send(P, Rec, add), N+1 end, 0, Recvs). + +%% send/3 + +%% No processes waiting on remove, by construction: they've either +%% received notification at add or aren't waiting. +send([Pid | T], Rec, Op) -> + Pid ! {T, Op, Rec}; + +send({_,_} = From, Rec, add) -> + gen_server:reply(From, [Rec]). + +%% down/2 + +down(Pid, #state{monitors = MS} = S) -> + NS = flush(Pid, S), + Recvs = delete(match('_', Pid), NS), + ets:match_delete(?TABLE, {'_', Pid}), + notify(Recvs, remove), + NS#state{monitors = sets:del_element(Pid, MS)}. + +%% flush/3 %% Remove any processes that are dead but for which we may not have -%% received 'DOWN' yet. This is to ensure that add_new can be used -%% to register a unique name each time a process restarts. -flush(List) -> - lists:foreach(fun({_,P} = T) -> - del(erlang:is_process_alive(P), T) - end, - List). - -del(Alive, T) -> - Alive orelse ets:delete_object(?TABLE, T). - -%% repl/3 - -repl([?MAPPING(_, Pid) = M], Key, Pid) -> - ets:delete_object(?TABLE, M), - true = ets:insert(?TABLE, ?MAPPING(Key, Pid)); -repl([], _, _) -> - false. - -%% pending/1 - -pending(true, #state{q = [_|_] = Q} = S) -> - S#state{q = q(lists:reverse(Q), [])}; %% retain reply order -pending(_, S) -> +%% received 'DOWN' yet, to ensure that add_new can be used to register +%% a unique name each time a registering process restarts. +flush(true, {Key, Pid}, S) -> + Spec = [{{'$1', '$2'}, + [{'andalso', {'==', '$1', {const, Key}}, + {'/=', '$2', Pid}}], + ['$2']}], + lists:foldl(fun down/2, S, [P || P <- ets:select(?TABLE, Spec), + not is_process_alive(P)]); + +flush(false, _, S) -> S. -q([], Q) -> - Q; -q([{From, Pat} = T | Rest], Q) -> - case find(Pat) of - {ok, L} -> - gen_server:reply(From, L), - q(Rest, Q); - false -> - q(Rest, [T|Q]) - end. - -%% find/1 - -find(Pat) -> - try match(Pat) of - [] -> - false; - L -> - {ok, L} - catch - _:_ -> - {ok, []} - end. +%% flush/2 + +%% Process has died and should no longer receive messages/replies. +flush(Pid, #state{receivers = RD} = S) + when is_pid(Pid) -> + S#state{receivers = dict:fold(fun(Pt,Ps,D) -> flush(Pid, Pt, Ps, D) end, + RD, + RD)}. + +%% flush/4 + +flush(Pid, Pat, Recvs, Dict) -> + remove(fun(T) -> Pid /= head(T) end, Pat, Recvs, Dict). + +%% head/1 + +head([P|_]) -> + P; + +head({P,_}) -> + P. + +%% remove/4 + +remove(Pred, Key, Values, Dict) -> + case lists:filter(Pred, Values) of + [] -> + dict:erase(Key, Dict); + Rest -> + dict:store(Key, Rest, Dict) + end. %% call/1 diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl index 7aa7c422c4..ccf68f4d93 100644 --- a/lib/diameter/src/base/diameter_service.erl +++ b/lib/diameter/src/base/diameter_service.erl @@ -3,16 +3,17 @@ %% %% Copyright Ericsson AB 2010-2016. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% @@ -31,6 +32,7 @@ -export([subscribe/1, unsubscribe/1, services/0, + peer_info/1, info/2]). %% towards diameter_config @@ -103,7 +105,7 @@ %% to determine whether or not we need to call the process for a %% pick_peer callback in the statefull case. -record(state, - {id = now(), + {id = diameter_lib:now(), service_name :: diameter:service_name(), %% key in ?STATE_TABLE service :: #diameter_service{}, watchdogT = ets_new(watchdogs) %% #watchdog{} at start @@ -127,14 +129,14 @@ %% Record representing an RFC 3539 watchdog process implemented by %% diameter_watchdog. -record(watchdog, - {pid :: match(pid()), + {pid :: match(pid()) | undefined, type :: match(connect | accept), ref :: match(reference()), %% key into diameter_config options :: match([diameter:transport_opt()]),%% from start_transport state = ?WD_INITIAL :: match(wd_state()), - started = now(), %% at process start + started = diameter_lib:now(),%% at process start peer = false :: match(boolean() | pid())}). - %% true at accepted, pid() at okay/reopen + %% true at accepted/remove, pid() at okay/reopen %% Record representing a Peer State Machine processes implemented by %% diameter_peer_fsm. @@ -143,7 +145,7 @@ apps :: match([{0..16#FFFFFFFF, diameter:app_alias()}] %% {Id, Alias} | [diameter:app_alias()]), %% remote caps :: match(#diameter_caps{}), - started = now(), %% at process start or sharing + started = diameter_lib:now(), %% at process start or sharing watchdog :: match(pid() %% key into watchdogT | undefined)}). %% undefined if remote @@ -217,6 +219,29 @@ lookup_state(SvcName) -> end. %% --------------------------------------------------------------------------- +%% # peer_info/2 +%% --------------------------------------------------------------------------- + +%% An extended version of info_peer/1 for peer_info/1. +peer_info(Pid) -> + try + {_, PD} = process_info(Pid, dictionary), + {_, T} = lists:keyfind({diameter_peer_fsm, start}, 1, PD), + {TPid, {{Type, Ref}, TMod, Cfg}} = T, + {_, TD} = process_info(TPid, dictionary), + {_, Data} = lists:keyfind({TMod, info}, 1, TD), + [{ref, Ref}, + {type, Type}, + {owner, TPid}, + {module, TMod}, + {config, Cfg} + | try TMod:info(Data) catch _:_ -> [] end] + catch + error:_ -> + [] + end. + +%% --------------------------------------------------------------------------- %% # subscribe/1 %% # unsubscribe/1 %% --------------------------------------------------------------------------- @@ -225,7 +250,7 @@ subscribe(SvcName) -> diameter_reg:add({?MODULE, subscriber, SvcName}). unsubscribe(SvcName) -> - diameter_reg:del({?MODULE, subscriber, SvcName}). + diameter_reg:remove({?MODULE, subscriber, SvcName}). subscriptions(Pat) -> pmap(diameter_reg:match({?MODULE, subscriber, Pat})). @@ -651,25 +676,34 @@ mod_state(Alias, ModS) -> %% remove_transport shutdown(Refs, #state{watchdogT = WatchdogT}) when is_list(Refs) -> - ets:foldl(fun(P,ok) -> st(P, Refs), ok end, ok, WatchdogT); + ets:insert(WatchdogT, ets:foldl(fun(R,A) -> st(R, Refs, A) end, + [], + WatchdogT)); %% application/service shutdown shutdown(Reason, #state{watchdogT = WatchdogT}) when Reason == application; Reason == service -> - diameter_lib:wait(ets:foldl(fun(P,A) -> st(P, Reason, A) end, + diameter_lib:wait(ets:foldl(fun(P,A) -> ss(P, Reason, A) end, [], WatchdogT)). -%% st/2 +%% st/3 -st(#watchdog{ref = Ref, pid = Pid}, Refs) -> - lists:member(Ref, Refs) - andalso (Pid ! {shutdown, self(), transport}). %% 'DOWN' cleans up +%% Mark replacement as started so that a subsequent accept doesn't +%% result in a new process that isn't terminated. +st(#watchdog{ref = Ref, pid = Pid, peer = P} = Rec, Refs, Acc) -> + case lists:member(Ref, Refs) of + true -> + Pid ! {shutdown, self(), transport}, %% 'DOWN' cleans up + [Rec#watchdog{peer = true} || P == false] ++ Acc; + false -> + Acc + end. -%% st/3 +%% ss/3 -st(#watchdog{pid = Pid}, Reason, Acc) -> +ss(#watchdog{pid = Pid}, Reason, Acc) -> MRef = monitor(process, Pid), Pid ! {shutdown, self(), Reason}, [MRef | Acc]. @@ -949,11 +983,22 @@ ms(_, Svc) -> %% --------------------------------------------------------------------------- accepted(Pid, _TPid, #state{watchdogT = WatchdogT} = S) -> - #watchdog{ref = Ref, type = accept = T, peer = false, options = Opts} + #watchdog{type = accept = T, peer = P} = Wd = fetch(WatchdogT, Pid), - ets:insert(WatchdogT, Wd#watchdog{peer = true}),%% mark replacement started - start(Ref, T, Opts, S). %% start new watchdog + if not P -> + #watchdog{ref = Ref, options = Opts} = Wd, + %% Mark replacement started, and start new watchdog. + ets:insert(WatchdogT, Wd#watchdog{peer = true}), + start(Ref, T, Opts, S); + P -> + %% Transport removal in progress: true has been set in + %% shutdown/2, and the transport will die as a + %% consequence. + ok + end. + +%% fetch/2 fetch(Tid, Key) -> [T] = ets:lookup(Tid, Key), @@ -1278,7 +1323,7 @@ connect_timer(Opts, Def0) -> %% continuous restarted in case of faulty config or other problems. tc(Time, Tc) -> choose(Tc > ?RESTART_TC - orelse timer:now_diff(now(), Time) > 1000*?RESTART_TC, + orelse diameter_lib:micro_diff(Time) > 1000*?RESTART_TC, Tc, ?RESTART_TC). @@ -1292,8 +1337,7 @@ start_tc(Tc, T, _) -> tc_timeout({Ref, _Type, _Opts} = T, #state{service_name = SvcName} = S) -> tc(diameter_config:have_transport(SvcName, Ref), T, S). -tc(true, {Ref, Type, Opts}, #state{service_name = SvcName} - = S) -> +tc(true, {Ref, Type, Opts}, #state{service_name = SvcName} = S) -> send_event(SvcName, {reconnect, Ref, Opts}), start(Ref, Type, Opts, S); tc(false = No, _, _) -> %% removed diff --git a/lib/diameter/src/base/diameter_service_sup.erl b/lib/diameter/src/base/diameter_service_sup.erl index e3177f0083..369e403fff 100644 --- a/lib/diameter/src/base/diameter_service_sup.erl +++ b/lib/diameter/src/base/diameter_service_sup.erl @@ -3,16 +3,17 @@ %% %% Copyright Ericsson AB 2010-2015. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% diff --git a/lib/diameter/src/base/diameter_session.erl b/lib/diameter/src/base/diameter_session.erl index 3b236f109a..d854bc36a5 100644 --- a/lib/diameter/src/base/diameter_session.erl +++ b/lib/diameter/src/base/diameter_session.erl @@ -1,18 +1,19 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2012. All Rights Reserved. +%% Copyright Ericsson AB 2010-2016. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% @@ -157,10 +158,9 @@ session_id(Host) -> %% --------------------------------------------------------------------------- init() -> - Now = now(), - random:seed(Now), + Now = diameter_lib:timestamp(), Time = time32(Now), - Seq = (?INT32 band (Time bsl 20)) bor (random:uniform(1 bsl 20) - 1), + Seq = (?INT32 band (Time bsl 20)) bor (rand:uniform(1 bsl 20) - 1), ets:insert(diameter_sequence, [{origin_state_id, Time}, {session_base, Time bsl 32}, {sequence, Seq}]), diff --git a/lib/diameter/src/base/diameter_stats.erl b/lib/diameter/src/base/diameter_stats.erl index c4526d3a08..8c10464e98 100644 --- a/lib/diameter/src/base/diameter_stats.erl +++ b/lib/diameter/src/base/diameter_stats.erl @@ -3,16 +3,17 @@ %% %% Copyright Ericsson AB 2010-2015. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% @@ -57,7 +58,7 @@ -define(SERVER, ?MODULE). %% Server state. --record(state, {id = now()}). +-record(state, {id = diameter_lib:now()}). -type counter() :: any(). -type ref() :: any(). diff --git a/lib/diameter/src/base/diameter_sup.erl b/lib/diameter/src/base/diameter_sup.erl index 4ede4086d8..482289cb9a 100644 --- a/lib/diameter/src/base/diameter_sup.erl +++ b/lib/diameter/src/base/diameter_sup.erl @@ -1,18 +1,19 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2015. All Rights Reserved. +%% Copyright Ericsson AB 2010-2016. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% @@ -33,6 +34,7 @@ -export([init/1]). -define(CHILDREN, [diameter_misc_sup, + diameter_config_sup, diameter_watchdog_sup, diameter_peer_fsm_sup, diameter_transport_sup, diff --git a/lib/diameter/src/base/diameter_sync.erl b/lib/diameter/src/base/diameter_sync.erl index cee06b9e96..7fb6888e21 100644 --- a/lib/diameter/src/base/diameter_sync.erl +++ b/lib/diameter/src/base/diameter_sync.erl @@ -3,16 +3,17 @@ %% %% Copyright Ericsson AB 2010-2015. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% @@ -69,7 +70,7 @@ %% Server state. -record(state, - {time = now(), + {time = diameter_lib:now(), pending = 0 :: non_neg_integer(), %% outstanding requests monitor = new() :: ets:tid(), %% MonitorRef -> {Name, From} queue = new() :: ets:tid()}). %% Name -> queue of {Pid, Ref} diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl index 5d39c08213..2112941d5e 100644 --- a/lib/diameter/src/base/diameter_traffic.erl +++ b/lib/diameter/src/base/diameter_traffic.erl @@ -3,16 +3,17 @@ %% %% Copyright Ericsson AB 2013-2016. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% @@ -97,9 +98,6 @@ %% # make_recvdata/1 %% --------------------------------------------------------------------------- -make_recvdata([SvcName, PeerT, Apps, {_,_} = Mask | _]) -> %% from old code - make_recvdata([SvcName, PeerT, Apps, [{sequence, Mask}]]); - make_recvdata([SvcName, PeerT, Apps, SvcOpts | _]) -> {_,_} = Mask = proplists:get_value(sequence, SvcOpts), #recvdata{service_name = SvcName, @@ -232,7 +230,15 @@ pending(TPids) -> %% used to come through the service process but this avoids that %% becoming a bottleneck. -receive_message(TPid, Pkt, Dict0, RecvData) +receive_message(TPid, {Pkt, NPid}, Dict0, RecvData) -> + NPid ! {diameter, incoming(TPid, Pkt, Dict0, RecvData)}; + +receive_message(TPid, Pkt, Dict0, RecvData) -> + incoming(TPid, Pkt, Dict0, RecvData). + +%% incoming/4 + +incoming(TPid, Pkt, Dict0, RecvData) when is_pid(TPid) -> #diameter_packet{header = #diameter_header{is_request = R}} = Pkt, recv(R, @@ -246,11 +252,18 @@ receive_message(TPid, Pkt, Dict0, RecvData) %% Incoming request ... recv(true, false, TPid, Pkt, Dict0, T) -> - spawn_request(TPid, Pkt, Dict0, T); + try + {request, spawn_request(TPid, Pkt, Dict0, T)} + catch + error: system_limit = E -> %% discard + ?LOG(error, E), + discard + end; %% ... answer to known request ... recv(false, #request{ref = Ref, handler = Pid} = Req, _, Pkt, Dict0, _) -> - Pid ! {answer, Ref, Req, Dict0, Pkt}; + Pid ! {answer, Ref, Req, Dict0, Pkt}, + {answer, Pid}; %% Note that failover could have happened prior to this message being %% received and triggering failback. That is, both a failover message @@ -265,7 +278,7 @@ recv(false, #request{ref = Ref, handler = Pid} = Req, _, Pkt, Dict0, _) -> recv(false, false, TPid, Pkt, _, _) -> ?LOG(discarded, Pkt#diameter_packet.header), incr(TPid, {{unknown, 0}, recv, discarded}), - ok. + discard. %% spawn_request/4 @@ -275,12 +288,7 @@ spawn_request(TPid, Pkt, Dict0, RecvData) -> spawn_request(TPid, Pkt, Dict0, ?DEFAULT_SPAWN_OPTS, RecvData). spawn_request(TPid, Pkt, Dict0, Opts, RecvData) -> - try - spawn_opt(fun() -> recv_request(TPid, Pkt, Dict0, RecvData) end, Opts) - catch - error: system_limit = E -> %% discard - ?LOG(error, E) - end. + spawn_opt(fun() -> recv_request(TPid, Pkt, Dict0, RecvData) end, Opts). %% --------------------------------------------------------------------------- %% recv_request/4 @@ -302,13 +310,7 @@ recv_request(TPid, RecvData), TPid, Dict0, - RecvData); - -recv_request(TPid, Pkt, Dict0, RecvData) -> %% from old code - recv_request(TPid, - Pkt, - Dict0, - #recvdata{} = erlang:append_element(RecvData, [])). + RecvData). %% recv_R/5 @@ -1641,9 +1643,6 @@ pick_peer(SvcName, Filter, Xtra})). -pick({{_,_,_} = Transport, Mask}) -> %% from old code; dialyzer complains - {Transport, Mask, []}; %% about this - pick(false) -> {error, no_connection}; diff --git a/lib/diameter/src/base/diameter_types.erl b/lib/diameter/src/base/diameter_types.erl index 87a0f0663d..6ecf385239 100644 --- a/lib/diameter/src/base/diameter_types.erl +++ b/lib/diameter/src/base/diameter_types.erl @@ -3,16 +3,17 @@ %% %% Copyright Ericsson AB 2010-2015. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl index 26bca7a5bc..2ba60a65fb 100644 --- a/lib/diameter/src/base/diameter_watchdog.erl +++ b/lib/diameter/src/base/diameter_watchdog.erl @@ -1,18 +1,19 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2015. All Rights Reserved. +%% Copyright Ericsson AB 2010-2016. All Rights Reserved. %% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% @@ -66,7 +67,7 @@ parent = self() :: pid(), %% service process transport :: pid() | undefined, %% peer_fsm process tref :: reference() %% reference for current watchdog timer - | tuple() %% now() + | integer() %% monotonic time | undefined, dictionary :: module(), %% common dictionary receive_data :: term(), @@ -124,7 +125,6 @@ i({Ack, T, Pid, {RecvData, = Svc}}) -> monitor(process, Pid), wait(Ack, Pid), - random:seed(now()), putr(restart, {T, Opts, Svc, SvcOpts}), %% save seeing it in trace putr(dwr, dwr(Caps)), %% {_,_} = Mask = proplists:get_value(sequence, SvcOpts), @@ -447,8 +447,14 @@ transition({'DOWN', _, process, TPid, _Reason} = D, end; %% Incoming message. -transition({recv, TPid, Name, Pkt}, #watchdog{transport = TPid} = S) -> - recv(Name, Pkt, S); +transition({recv, TPid, Name, PktT}, #watchdog{transport = TPid} = S) -> + try + incoming(Name, PktT, S) + catch + #watchdog{dictionary = Dict0, receive_data = T} = NS -> + diameter_traffic:receive_message(TPid, PktT, Dict0, T), + NS + end; %% Current watchdog has timed out. transition({timeout, TRef, tw}, #watchdog{tref = TRef} = S) -> @@ -457,7 +463,7 @@ transition({timeout, TRef, tw}, #watchdog{tref = TRef} = S) -> %% Message has arrived since the timer was started: subtract time %% already elapsed from new timer. transition({timeout, _, tw}, #watchdog{tref = T0} = S) -> - set_watchdog(timer:now_diff(now(), T0) div 1000, S); + set_watchdog(diameter_lib:micro_diff(T0) div 1000, S); %% State query. transition({state, Pid}, #watchdog{status = S}) -> @@ -539,7 +545,7 @@ set_watchdog(#watchdog{tref = undefined} = S) -> %% Timer already set: start at new one only at expiry. set_watchdog(#watchdog{} = S) -> - S#watchdog{tref = now()}. + S#watchdog{tref = diameter_lib:now()}. %% set_watchdog/2 @@ -557,7 +563,7 @@ tw(TwInit, Ms) -> tw(T) when is_integer(T), T >= 6000 -> - T - 2000 + (random:uniform(4001) - 1); %% RFC3539 jitter of +/- 2 sec. + T - 2000 + (rand:uniform(4001) - 1); %% RFC3539 jitter of +/- 2 sec. tw({M,F,A}) -> apply(M,F,A). @@ -576,22 +582,32 @@ send_watchdog(#watchdog{pending = false, %% Don't count encode errors since we don't expect any on DWR/DWA. +%% incoming/3 + +incoming(Name, {Pkt, NPid}, S) -> + NS = recv(Name, Pkt, S), + NPid ! {diameter, discard}, + NS; + +incoming(Name, Pkt, S) -> + recv(Name, Pkt, S). + %% recv/3 recv(Name, Pkt, S) -> - try rcv(Name, S) of + try rcv(Name, Pkt, rcv(Name, S)) of #watchdog{} = NS -> - rcv(Name, Pkt, S), - NS + throw(NS) catch - {?MODULE, throwaway, #watchdog{} = NS} -> + #watchdog{} = NS -> %% throwaway NS end. %% rcv/3 rcv('DWR', Pkt, #watchdog{transport = TPid, - dictionary = Dict0}) -> + dictionary = Dict0} + = S) -> ?LOG(recv, 'DWR'), DPkt = diameter_codec:decode(Dict0, Pkt), diameter_traffic:incr(recv, DPkt, TPid, Dict0), @@ -608,32 +624,30 @@ rcv('DWR', Pkt, #watchdog{transport = TPid, send(TPid, {send, #diameter_packet{header = H, transport_data = T, bin = Bin}}), - ?LOG(send, 'DWA'); + ?LOG(send, 'DWA'), + throw(S); rcv('DWA', Pkt, #watchdog{transport = TPid, - dictionary = Dict0}) -> + dictionary = Dict0} + = S) -> ?LOG(recv, 'DWA'), diameter_traffic:incr(recv, Pkt, TPid, Dict0), diameter_traffic:incr_rc(recv, diameter_codec:decode(Dict0, Pkt), TPid, - Dict0); + Dict0), + throw(S); -rcv(N, _, _) +rcv(N, _, S) when N == 'CER'; N == 'CEA'; N == 'DPR' -> - false; + throw(S); %% DPR can be sent explicitly with diameter:call/4. Only the %% corresponding DPAs arrive here. -rcv(_, Pkt, #watchdog{transport = TPid, - dictionary = Dict0, - receive_data = T}) -> - diameter_traffic:receive_message(TPid, Pkt, Dict0, T). - -throwaway(S) -> - throw({?MODULE, throwaway, S}). +rcv(_, _, S)-> + S. %% rcv/2 %% @@ -650,20 +664,20 @@ throwaway(S) -> %% INITIAL Receive non-DWA Throwaway() INITIAL rcv('DWA', #watchdog{status = initial} = S) -> - throwaway(S#watchdog{pending = false}); + throw(S#watchdog{pending = false}); rcv(_, #watchdog{status = initial} = S) -> - throwaway(S); + throw(S); %% DOWN Receive DWA Pending = FALSE %% Throwaway() DOWN %% DOWN Receive non-DWA Throwaway() DOWN rcv('DWA', #watchdog{status = down} = S) -> - throwaway(S#watchdog{pending = false}); + throw(S#watchdog{pending = false}); rcv(_, #watchdog{status = down} = S) -> - throwaway(S); + throw(S); %% OKAY Receive DWA Pending = FALSE %% SetWatchdog() OKAY @@ -719,7 +733,7 @@ rcv('DWR', #watchdog{status = reopen} = S) -> S; %% ensure DWA: the RFC isn't explicit about answering rcv(_, #watchdog{status = reopen} = S) -> - throwaway(S). + throw(S). %% timeout/1 %% @@ -826,9 +840,6 @@ restart(S) -> %% reconnect has won race with timeout %% state down rather then initial when receiving notification of an %% open connection. -restart({T, Opts, Svc}, S) -> %% put in old code - restart({T, Opts, Svc, []}, S); - restart({{connect, _} = T, Opts, Svc, SvcOpts}, #watchdog{parent = Pid, restrict = {R,_}, @@ -843,7 +854,7 @@ restart({{connect, _} = T, Opts, Svc, SvcOpts}, %% die. Note that a state machine never enters state REOPEN in this %% case. restart({{accept, _}, _, _, _}, #watchdog{restrict = {_, false}}) -> - stop; %% 'DOWN' was in old code: 'close' was not sent + stop; %% Otherwise hang around until told to die, either by the service or %% by another watchdog. diff --git a/lib/diameter/src/base/diameter_watchdog_sup.erl b/lib/diameter/src/base/diameter_watchdog_sup.erl index fc837fe4ef..7b6669f381 100644 --- a/lib/diameter/src/base/diameter_watchdog_sup.erl +++ b/lib/diameter/src/base/diameter_watchdog_sup.erl @@ -1,18 +1,19 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2011. All Rights Reserved. -%% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% Copyright Ericsson AB 2010-2016. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% %% %CopyrightEnd% %% |
