%% %% %CopyrightBegin% %% %% Copyright Ericsson AB 2010-2011. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in %% compliance with the License. You should have received a copy of the %% Erlang Public License along with this software. If not, it can be %% retrieved online at http://www.erlang.org/. %% %% Software distributed under the License is distributed on an "AS IS" %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See %% the License for the specific language governing rights and limitations %% under the License. %% %% %CopyrightEnd% %% %% %% Statistics collector. %% -module(diameter_stats). -compile({no_auto_import, [monitor/2]}). -behaviour(gen_server). -export([reg/1, reg/2, incr/1, incr/2, incr/3, read/1, flush/0, flush/1]). %% supervisor callback -export([start_link/0]). %% gen_server callbacks -export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2, code_change/3]). %% debug -export([state/0, uptime/0]). -include("diameter_internal.hrl"). %% ets table containing stats. reg(Pid, Ref) inserts a {Pid, Ref}, %% incr(Counter, X, N) updates the counter keyed at {Counter, X}, and %% Pid death causes counters keyed on {Counter, Pid} to be deleted and %% added to those keyed on {Counter, Ref}. -define(TABLE, ?MODULE). %% Name of registered server. -define(SERVER, ?MODULE). %% Entries in the table. -define(REC(Key, Value), {Key, Value}). %% Server state. -record(state, {id = now()}). -type counter() :: any(). -type contrib() :: any(). %%% --------------------------------------------------------------------------- %%% # reg(Pid, Contrib) %%% %%% Description: Register a process as a contributor of statistics %%% associated with a specified term. Statistics can be %%% contributed by specifying either Pid or Contrib as %%% the second argument to incr/3. Statistics contributed %%% by Pid are folded into the corresponding entry for %%% Contrib when the process dies. %%% %%% Contrib can be any term but should not be a pid %%% passed as the first argument to reg/2. Subsequent %%% registrations for the same Pid overwrite the association %%% --------------------------------------------------------------------------- -spec reg(pid(), contrib()) -> true. reg(Pid, Contrib) when is_pid(Pid) -> call({reg, Pid, Contrib}). -spec reg(contrib()) -> true. reg(Ref) -> reg(self(), Ref). %%% --------------------------------------------------------------------------- %%% # incr(Counter, Contrib, N) %%% %%% Description: Increment a counter for the specified contributor. %%% %%% Contrib will typically be an argument passed to reg/2 %%% but there's nothing that requires this. In particular, %%% if Contrib is a pid that hasn't been registered then %%% counters are unaffected by the death of the process. %%% --------------------------------------------------------------------------- -spec incr(counter(), contrib(), integer()) -> integer(). incr(Ctr, Contrib, N) -> update_counter({Ctr, Contrib}, N). incr(Ctr, N) when is_integer(N) -> incr(Ctr, self(), N); incr(Ctr, Contrib) -> incr(Ctr, Contrib, 1). incr(Ctr) -> incr(Ctr, self(), 1). %%% --------------------------------------------------------------------------- %%% # read(Contribs) %%% %%% Description: Retrieve counters for the specified contributors. %%% --------------------------------------------------------------------------- -spec read([contrib()]) -> [{contrib(), [{counter(), integer()}]}]. read(Contribs) -> lists:foldl(fun(?REC({T,C}, N), D) -> orddict:append(C, {T,N}, D) end, orddict:new(), ets:select(?TABLE, [{?REC({'_', '$1'}, '_'), [?ORCOND([{'=:=', '$1', {const, C}} || C <- Contribs])], ['$_']}])). %%% --------------------------------------------------------------------------- %%% # flush(Contrib) %%% %%% Description: Retrieve and delete statistics for the specified %%% contributor. %%% %%% If Contrib is a pid registered with reg/2 then statistics %%% for both and its associated contributor are retrieved. %%% --------------------------------------------------------------------------- -spec flush(contrib()) -> [{counter(), integer()}]. flush(Contrib) -> try call({flush, Contrib}) catch exit: _ -> [] end. flush() -> flush(self()). %%% --------------------------------------------------------- %%% EXPORTED INTERNAL FUNCTIONS %%% --------------------------------------------------------- start_link() -> ServerName = {local, ?SERVER}, Module = ?MODULE, Args = [], Options = [{spawn_opt, diameter_lib:spawn_opts(server, [])}], gen_server:start_link(ServerName, Module, Args, Options). state() -> call(state). uptime() -> call(uptime). %%% ---------------------------------------------------------- %%% # init(_) %%% %%% Output: {ok, State} %%% ---------------------------------------------------------- init([]) -> ets:new(?TABLE, [named_table, ordered_set, public]), {ok, #state{}}. %% ---------------------------------------------------------- %% handle_call(Request, From, State) %% ---------------------------------------------------------- handle_call(state, _, State) -> {reply, State, State}; handle_call(uptime, _, #state{id = Time} = State) -> {reply, diameter_lib:now_diff(Time), State}; handle_call({reg, Pid, Contrib}, _From, State) -> monitor(not ets:member(?TABLE, Pid), Pid), {reply, insert(?REC(Pid, Contrib)), State}; handle_call({flush, Contrib}, _From, State) -> {reply, fetch(Contrib), State}; handle_call(Req, From, State) -> warning_msg("received unexpected request from ~p:~n~w", [From, Req]), {reply, nok, State}. %% ---------------------------------------------------------- %% handle_cast(Request, State) %% ---------------------------------------------------------- handle_cast({incr, Rec}, State) -> update_counter(Rec), {noreply, State}; handle_cast(Msg, State) -> warning_msg("received unexpected message:~n~w", [Msg]), {noreply, State}. %% ---------------------------------------------------------- %% handle_info(Request, State) %% ---------------------------------------------------------- handle_info({'DOWN', _MRef, process, Pid, _}, State) -> down(Pid), {noreply, State}; handle_info(Info, State) -> warning_msg("received unknown info:~n~w", [Info]), {noreply, State}. %% ---------------------------------------------------------- %% terminate(Reason, State) %% ---------------------------------------------------------- terminate(_Reason, _State) -> ok. %% ---------------------------------------------------------- %% code_change(OldVsn, State, Extra) %% ---------------------------------------------------------- code_change(_OldVsn, State, _Extra) -> {ok, State}. %%% --------------------------------------------------------- %%% INTERNAL FUNCTIONS %%% --------------------------------------------------------- %% monitor/2 monitor(true, Pid) -> erlang:monitor(process, Pid); monitor(false = No, _) -> No. %% down/1 down(Pid) -> L = ets:match_object(?TABLE, ?REC({'_', Pid}, '_')), [?REC(_, Ref) = T] = lookup(Pid), fold(Ref, L), delete_object(T), delete(L). %% Fold Pid-based entries into Ref-based ones. fold(Ref, L) -> lists:foreach(fun(?REC({K, _}, V)) -> update_counter({{K, Ref}, V}) end, L). delete(Objs) -> lists:foreach(fun delete_object/1, Objs). %% fetch/1 fetch(X) -> MatchSpec = [{?REC({'_', '$1'}, '_'), [?ORCOND([{'==', '$1', {const, T}} || T <- [X | ref(X)]])], ['$_']}], L = ets:select(?TABLE, MatchSpec), delete(L), D = lists:foldl(fun sum/2, dict:new(), L), dict:to_list(D). sum({{Ctr, _}, N}, Dict) -> dict:update(Ctr, fun(V) -> V+N end, N, Dict). ref(Pid) when is_pid(Pid) -> ets:select(?TABLE, [{?REC(Pid, '$1'), [], ['$1']}]); ref(_) -> []. %% update_counter/2 %% %% From an arbitrary request process. Cast to the server process to %% insert a new element if the counter doesn't exists so that two %% processes don't do so simultaneously. update_counter(Key, N) -> try ets:update_counter(?TABLE, Key, N) catch error: badarg -> cast({incr, ?REC(Key, N)}) end. %% update_counter/1 %% %% From the server process. update_counter(?REC(Key, N) = T) -> try ets:update_counter(?TABLE, Key, N) catch error: badarg -> insert(T) end. insert(T) -> ets:insert(?TABLE, T). lookup(Key) -> ets:lookup(?TABLE, Key). delete_object(T) -> ets:delete_object(?TABLE, T). %% cast/1 cast(Msg) -> gen_server:cast(?SERVER, Msg). %% call/1 call(Request) -> gen_server:call(?SERVER, Request, infinity). %% warning_msg/2 warning_msg(F, A) -> ?diameter_warning("~p: " ++ F, [?MODULE | A]).