%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2007-2010. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% %CopyrightEnd%
%%
%% @doc Percept database.
%%
%%
-module(percept_db).
-export([
start/0,
stop/0,
insert/1,
select/2,
select/1,
consolidate/0
]).
-include("percept.hrl").
-define(STOP_TIMEOUT, 1000).
%%==========================================================================
%%
%% Type definitions
%%
%%==========================================================================
%% @type activity_option() =
%% {ts_min, timestamp()} |
%% {ts_max, timestamp()} |
%% {ts_exact, bool()} |
%% {mfa, {atom(), atom(), byte()}} |
%% {state, active | inactive} |
%% {id, all | procs | ports | pid() | port()}
%% @type scheduler_option() =
%% {ts_min, timestamp()} |
%% {ts_max, timestamp()} |
%% {ts_exact, bool()} |
%% {id, scheduler_id()}
%% @type system_option() = start_ts | stop_ts
%% @type information_option() =
%% all | procs | ports | pid() | port()
%%==========================================================================
%%
%% Interface functions
%%
%%==========================================================================
%% @spec start() -> {started, Pid} | {restarted, Pid}
%% Pid = pid()
%% @doc Starts the percept database, or restarts it when something is
%% already registered under the name `percept_db'.
-spec start() -> {'started', pid()} | {'restarted', pid()}.
start() ->
    start_or_restart(erlang:whereis(percept_db)).

%% Dispatches on the registry lookup performed by start/0: nothing
%% registered means a plain start, anything else is restarted.
start_or_restart(undefined) ->
    {started, do_start()};
start_or_restart(PerceptDB) ->
    {restarted, restart(PerceptDB)}.
%% @spec restart(pid()) -> pid()
%% @private
%% @doc Stops the given database process synchronously, then starts a
%% fresh one and returns the pid of the new process.
-spec restart(pid()) -> pid().
restart(OldDb) ->
    true = stop_sync(OldDb),
    do_start().
%% @spec do_start() -> pid()
%% @private
%% @doc Spawns the database process, registers it as `percept_db' and
%% returns its pid.
-spec do_start() -> pid().
do_start() ->
    DbPid = spawn(fun init_percept_db/0),
    true = erlang:register(percept_db, DbPid),
    DbPid.
%% @spec stop() -> not_started | {stopped, Pid}
%% Pid = pid()
%% @doc Asks the registered percept database to stop. The request is
%% asynchronous: the call returns as soon as the stop message is sent.
-spec stop() -> 'not_started' | {'stopped', pid()}.
stop() ->
    stop_registered(erlang:whereis(percept_db)).

%% Sends the stop request to whatever start/0 registered, if anything.
stop_registered(undefined) ->
    not_started;
stop_registered(Pid) ->
    erlang:send(Pid, {action, stop}),
    {stopped, Pid}.
%% @spec stop_sync(pid()) -> true
%% @private
%% @doc Stops the percept database process `Pid' and does not return
%% until that process is dead.
%%
%% The stop request is sent straight to `Pid' and its termination is
%% awaited through a monitor. If the process has not terminated within
%% ?STOP_TIMEOUT milliseconds it is killed, and the call then waits for
%% the resulting 'DOWN' message. Waiting matters for restart/1: the old
%% process must be gone before a new one is registered under the same
%% name, otherwise erlang:register/2 fails with badarg.
-spec stop_sync(pid()) -> true.
stop_sync(Pid) ->
    MonitorRef = erlang:monitor(process, Pid),
    Pid ! {action, stop},
    receive
        {'DOWN', MonitorRef, _Type, Pid, _Info} ->
            true
    after ?STOP_TIMEOUT ->
        % 'kill' cannot be trapped, so the 'DOWN' message is guaranteed
        % to arrive; no further timeout is needed here.
        exit(Pid, kill),
        receive
            {'DOWN', MonitorRef, _KillType, Pid, _KillInfo} ->
                true
        end
    end.
%% @spec insert(tuple()) -> ok
%% @doc Hands a trace or profile message to the database process for
%% insertion. The message is sent asynchronously to the process
%% registered as `percept_db'.
insert(Trace) ->
    erlang:send(percept_db, {insert, Trace}),
    ok.
%% @spec select({atom(), Options}) -> Result
%% @doc Synchronous call. Selects information based on a query.
%%
%% <p>Queries:</p>
%% <pre>
%% {system, Option}
%% Option = system_option()
%% Result = timestamp()
%% {information, Options}
%% Options = [information_option()]
%% Result = [#information{}]
%% {scheduler, Options}
%% Options = [scheduler_option()]
%% Result = [#activity{}]
%% {activity, Options}
%% Options = [activity_option()]
%% Result = [#activity{}]
%% </pre>
%% <p>
%% Note: id selections are always combined with OR; all other options
%% are combined with AND.
%% </p>
select(Query) ->
    erlang:send(percept_db, {select, self(), Query}),
    % Block until the database process answers with the query result.
    receive
        {result, Reply} -> Reply
    end.
%% @spec select(atom(), list()) -> Result
%% @equiv select({Table,Options})
select(Table, Options) ->
    % Same request/reply exchange as select/1, with the query tuple
    % assembled from the two arguments.
    select({Table, Options}).
%% @spec consolidate() -> ok
%% @doc Asks the database process to check timestamp and state-flow
%% inconsistencies in the database. The request is asynchronous.
consolidate() ->
    erlang:send(percept_db, {action, consolidate}),
    ok.
%%==========================================================================
%%
%% Database loop
%%
%%==========================================================================
%% Entry point of the database process: creates the private, named ETS
%% tables, initialises the `debug' process-dictionary entry and enters
%% the message loop.
init_percept_db() ->
    Tables = [
        % Proc and Port information
        {pdb_info, #information.id, set},
        % Scheduler runnability
        {pdb_scheduler, #activity.timestamp, ordered_set},
        % Process and Port runnability
        {pdb_activity, #activity.timestamp, ordered_set},
        % System status
        {pdb_system, 1, set},
        % System warnings
        {pdb_warnings, 1, ordered_set}
    ],
    lists:foreach(
        fun({Name, KeyPos, Type}) ->
            ets:new(Name, [named_table, private, {keypos, KeyPos}, Type])
        end,
        Tables),
    put(debug, 0),
    loop_percept_db().
%% Message loop of the database process. Every message is consumed;
%% the loop ends (returning `stopped') only on {action, stop}.
loop_percept_db() ->
    receive
        Message ->
            case handle_db_message(Message) of
                continue -> loop_percept_db();
                stop -> stopped
            end
    end.

%% Handles a single message and tells the loop whether to keep going.
handle_db_message({insert, Trace}) ->
    insert_trace(clean_trace(Trace)),
    continue;
handle_db_message({select, Pid, Query}) ->
    Pid ! {result, select_query(Query)},
    continue;
handle_db_message({action, stop}) ->
    stop;
handle_db_message({action, consolidate}) ->
    consolidate_db(),
    continue;
handle_db_message({operate, Pid, {Table, {Fun, Start}}}) ->
    % Fold a caller-supplied fun over one of the tables and reply.
    Result = ets:foldl(Fun, Start, Table),
    Pid ! {result, Result},
    continue;
handle_db_message(Unhandled) ->
    io:format("loop_percept_db, unhandled query: ~p~n", [Unhandled]),
    continue.
%%==========================================================================
%%
%% Auxiliary functions
%%
%%==========================================================================
%% Cleans trace messages from external pids: walks tuples and lists
%% recursively and rebuilds every pid with its first (node) component
%% set to 0. All other terms are returned unchanged.
clean_trace(Trace) when is_tuple(Trace) ->
    list_to_tuple(clean_trace(tuple_to_list(Trace)));
clean_trace(Trace) when is_list(Trace) ->
    clean_list(Trace, []);
clean_trace(Trace) when is_pid(Trace) ->
    % "<Node.Number.Serial>" -> keep Number and Serial, force Node to 0.
    [_Node, Number, Serial] = string:tokens(pid_to_list(Trace), "<.>"),
    erlang:list_to_pid(lists:append(["<0.", Number, ".", Serial, ">"]));
clean_trace(Trace) ->
    Trace.

%% Cleans every element of Trace; Out holds already cleaned elements in
%% reverse order and ends up in front of the newly cleaned ones.
clean_list(Trace, Out) ->
    lists:reverse(Out, lists:map(fun clean_trace/1, Trace)).
%% insert_trace(Trace) -> ok | ignored
%% Purpose:
%%   Stores one (already cleaned) trace or profile message in the
%%   database tables. Returns `ignored' for runnability messages that
%%   check_activity_consistency/2 rejects and `ok' otherwise, including
%%   for unrecognised messages, which are only reported on stdout.
insert_trace({profile_start, Ts}) ->
    update_system_start_ts(Ts),
    ok;
insert_trace({profile_stop, Ts}) ->
    update_system_stop_ts(Ts),
    ok;
%%% erlang:system_profile, option: runnable_procs
%%% ---------------------------------------------
insert_trace({profile, Id, State, Mfa, TS}) when is_pid(Id) ->
    insert_runnable_activity(procs, Id, State, Mfa, TS);
%%% erlang:system_profile, option: runnable_ports
%%% ---------------------------------------------
insert_trace({profile, Id, State, Mfa, TS}) when is_port(Id) ->
    insert_runnable_activity(ports, Id, State, Mfa, TS);
%%% erlang:system_profile, option: scheduler
insert_trace({profile, scheduler, Id, State, Scheds, Ts}) ->
    % insert scheduler activity
    update_scheduler(#activity{
        id = {scheduler, Id},
        state = State,
        timestamp = Ts,
        where = Scheds}),
    ok;
%%% erlang:trace, option: procs
%%% ---------------------------
insert_trace({trace_ts, Parent, spawn, Pid, Mfa, TS}) ->
    InformativeMfa = mfa2informative(Mfa),
    % Record the new process and add it to its parent's children
    update_information(#information{
        id = Pid, start = TS, parent = Parent, entry = InformativeMfa}),
    update_information_child(Parent, Pid),
    ok;
insert_trace({trace_ts, Pid, exit, _Reason, TS}) ->
    update_information(#information{id = Pid, stop = TS}),
    ok;
insert_trace({trace_ts, Pid, register, Name, _Ts}) when is_pid(Pid) ->
    update_information(#information{id = Pid, name = Name}),
    ok;
insert_trace({trace_ts, _Pid, unregister, _Name, _Ts}) ->
    % Not implemented
    ok;
insert_trace({trace_ts, Pid, Event, _Id, _Ts})
  when is_pid(Pid) andalso
       (Event =:= getting_unlinked orelse Event =:= getting_linked orelse
        Event =:= link orelse Event =:= unlink) ->
    % Link events are accepted but not stored
    ok;
%%% erlang:trace, option: ports
%%% ----------------------------
insert_trace({trace_ts, Caller, open, Port, Driver, TS}) ->
    update_information(#information{
        id = Port, entry = Driver, start = TS, parent = Caller}),
    ok;
insert_trace({trace_ts, Port, closed, _Reason, Ts}) ->
    update_information(#information{id = Port, stop = Ts}),
    ok;
insert_trace(Unhandled) ->
    io:format("insert_trace, unhandled: ~p~n", [Unhandled]).

%% Stores one runnability sample for a process (Type = procs) or a port
%% (Type = ports). The sample is dropped when the state flow for Id is
%% inconsistent; otherwise the running runnable count for Type is
%% updated and stored together with the activity.
insert_runnable_activity(Type, Id, State, Mfa, TS) ->
    case check_activity_consistency(Id, State) of
        invalid_state ->
            ignored;
        ok ->
            Rc = get_runnable_count(Type, State),
            update_activity(#activity{
                id = Id,
                state = State,
                timestamp = TS,
                runnable_count = Rc,
                where = Mfa}),
            ok
    end.
%% mfa2informative(Mfa) -> Mfa'
%% Purpose:
%%   Replaces an uninformative {erlang, apply, Args} entry point with
%%   the module, function and arguments that are actually applied.
%%   apply/3 calls are unwrapped directly; for apply/2 the module and
%%   name are read from the fun. Anything else, including an apply/2
%%   whose first argument is not a fun, is returned unchanged rather
%%   than crashing the database process in erlang:fun_info/1.
mfa2informative({erlang, apply, [M, F, Args]}) ->
    mfa2informative({M, F, Args});
mfa2informative({erlang, apply, [Fun, Args]}) when is_function(Fun) ->
    FunInfo = erlang:fun_info(Fun),
    M = fun_info_or(module, FunInfo, undefined_fun_module),
    F = fun_info_or(name, FunInfo, undefined_fun_function),
    mfa2informative({M, F, Args});
mfa2informative(Mfa) ->
    Mfa.

%% Reads Key from a fun_info property list, substituting Fallback when
%% the entry is missing, `undefined' or an empty list.
fun_info_or(Key, FunInfo, Fallback) ->
    case proplists:get_value(Key, FunInfo, undefined) of
        [] -> Fallback;
        undefined -> Fallback;
        Value -> Value
    end.
%% consolidate_db() -> ok
%% Purpose:
%%   Check start/stop time
%%   Activity consistency
%%
%% A missing system start (stop) timestamp is filled in with the
%% smallest (largest) timestamp found in the tables. When the tables
%% hold no timestamps at all the system timestamp is left undefined
%% instead of crashing in lists:min/1 or lists:max/1. Finally the
%% runnable counts of all activities are recomputed.
consolidate_db() ->
    io:format("Consolidating...~n"),
    % Check start/stop timestamps
    ensure_system_ts(start_ts, fun lists:min/1, fun update_system_start_ts/1),
    ensure_system_ts(stop_ts, fun lists:max/1, fun update_system_stop_ts/1),
    consolidate_runnability(),
    ok.

%% Stores Pick(AllTimestamps) through Update when the system timestamp
%% Which (start_ts | stop_ts) is undefined and at least one timestamp
%% is recorded in the database.
ensure_system_ts(Which, Pick, Update) ->
    case select_query({system, Which}) of
        undefined ->
            case list_all_ts() of
                [] -> ok;
                AllTs -> Update(Pick(AllTs))
            end;
        _ ->
            ok
    end.
%% Resets the running runnable counters for processes and ports and
%% recomputes the runnable count of every stored activity, walking the
%% activity table from its first key.
consolidate_runnability() ->
    lists:foreach(
        fun(Type) -> put({runnable, Type}, undefined) end,
        [procs, ports]),
    consolidate_runnability_loop(ets:first(pdb_activity)).
%% Walks pdb_activity key by key and rewrites the runnable count of
%% each entry from the running counters. Throws `consolidate' when a
%% key does not hold exactly one activity whose id is a pid or a port.
consolidate_runnability_loop('$end_of_table') ->
    ok;
consolidate_runnability_loop(Ts) ->
    {Activity, Type} =
        case ets:lookup(pdb_activity, Ts) of
            [#activity{id = Id} = Act] when is_pid(Id) -> {Act, procs};
            [#activity{id = Id} = Act] when is_port(Id) -> {Act, ports};
            _ -> throw(consolidate)
        end,
    Count = get_runnable_count(Type, Activity#activity.state),
    ets:insert(pdb_activity, Activity#activity{runnable_count = Count}),
    consolidate_runnability_loop(ets:next(pdb_activity, Ts)).
%% Collects the timestamps of all activities, all scheduler activities
%% and the start/stop fields of all information entries. Only
%% three-element tuples are kept, which drops undefined fields.
list_all_ts() ->
    TimestampOf = fun(Act) -> Act#activity.timestamp end,
    ActivityTs = lists:map(TimestampOf, select_query({activity, []})),
    SchedulerTs = lists:map(TimestampOf, select_query({scheduler, []})),
    InfoTs = lists:flatten(
        lists:map(
            fun(Info) -> [Info#information.start, Info#information.stop] end,
            select_query({information, all}))),
    %% Filter out all undefined (non ts)
    lists:filter(
        fun({_, _, _}) -> true;
           (_) -> false
        end,
        ActivityTs ++ SchedulerTs ++ InfoTs).
%% get_runnable_count(Type, State) -> RunnableCount
%% In:
%%   Type = procs | ports
%%   State = active | inactive
%% Out:
%%   RunnableCount = integer()
%% Purpose:
%%   Keep track of the number of runnable ports and processes
%%   during the profile duration. The counter lives in the process
%%   dictionary under {runnable, Type}; `active' increments it and
%%   `inactive' decrements it. Any other state is reported on stdout
%%   and the {Counter, State} pair is returned with the counter left
%%   untouched.
get_runnable_count(Type, State) ->
    Key = {runnable, Type},
    Current = get(Key),
    case next_runnable_count(Current, State) of
        {ok, Count} ->
            put(Key, Count),
            Count;
        unhandled ->
            Unhandled = {Current, State},
            io:format("get_runnable_count, unhandled ~p~n", [Unhandled]),
            Unhandled
    end.

%% Computes the counter value that follows Current for State.
next_runnable_count(undefined, active) -> {ok, 1};
next_runnable_count(N, active) -> {ok, N + 1};
next_runnable_count(N, inactive) -> {ok, N - 1};
next_runnable_count(_, _) -> unhandled.
%% check_activity_consistency(Id, State) -> ok | invalid_state
%% Purpose:
%%   Validates the state flow of Id against the previous state kept in
%%   the process dictionary. Repeating the previous state, or starting
%%   with `inactive', is invalid. A valid state is remembered as the
%%   new previous state.
check_activity_consistency(Id, State) ->
    Key = {previous_state, Id},
    Previous = get(Key),
    if
        Previous =:= State ->
            io:format("check_activity_consistency, state flow invalid.~n"),
            invalid_state;
        Previous =:= undefined, State == inactive ->
            invalid_state;
        true ->
            put(Key, State),
            ok
    end.
%%%
%%% select_query
%%% In:
%%% Query = {Table, Option}
%%% Table = system | activity | scheduler | information
%%% Routes the query to the selector of the named table. Unknown
%%% queries are reported on stdout and yield [].
select_query({system, _} = Query) ->
    select_query_system(Query);
select_query({activity, _} = Query) ->
    select_query_activity(Query);
select_query({scheduler, _} = Query) ->
    select_query_scheduler(Query);
select_query({information, _} = Query) ->
    select_query_information(Query);
select_query(Unhandled) ->
    io:format("select_query, unhandled: ~p~n", [Unhandled]),
    [].
%%% select_query_information
%%% Selects #information{} records from pdb_info: all of them, only
%%% those whose id is a pid (procs) or a port (ports), or the one with
%%% a specific pid/port id. Unknown queries are reported on stdout and
%%% yield [].
select_query_information({information, all}) ->
    select_information_by_id('_', []);
select_query_information({information, procs}) ->
    select_information_by_id('$1', [{is_pid, '$1'}]);
select_query_information({information, ports}) ->
    select_information_by_id('$1', [{is_port, '$1'}]);
select_query_information({information, Id}) when is_port(Id); is_pid(Id) ->
    select_information_by_id(Id, []);
select_query_information(Unhandled) ->
    io:format("select_query_information, unhandled: ~p~n", [Unhandled]),
    [].

%% Runs a match specification over pdb_info that constrains only the
%% id field and returns whole records.
select_information_by_id(IdPattern, Guards) ->
    ets:select(pdb_info, [{
        #information{id = IdPattern, _ = '_'},
        Guards,
        ['$_']
    }]).
%%% select_query_scheduler
%%% Selects scheduler activities from pdb_scheduler that satisfy the
%%% non-id options. Id options are parsed but not used here. Unknown
%%% queries are reported on stdout and yield [].
select_query_scheduler({scheduler, Options}) when is_list(Options) ->
    Head = #activity{
        timestamp = '$1',
        id = '$2',
        state = '$3',
        where = '$4',
        _ = '_'},
    % We don't need id's
    {Constraints, _IDs} = activity_ms_and(Head, Options, [], []),
    ets:select(pdb_scheduler, [{Head, Constraints, ['$_']}]);
select_query_scheduler(Unhandled) ->
    io:format("select_query_scheduler, unhandled: ~p~n", [Unhandled]),
    [].
%%% select_query_system
%%% Returns the stored system start or stop timestamp, or `undefined'
%%% when none is stored. Unknown queries are reported on stdout and
%%% yield [].
select_query_system({system, Which}) when Which =:= start_ts; Which =:= stop_ts ->
    Key = {system, Which},
    case ets:lookup(pdb_system, Key) of
        [] -> undefined;
        [{Key, Ts}] -> Ts
    end;
select_query_system(Unhandled) ->
    io:format("select_query_system, unhandled: ~p~n", [Unhandled]),
    [].
%%% select_query_activity
%%% Selects activities from pdb_activity. With {ts_exact, true} among
%%% the options the exact-interval selection is used, otherwise a plain
%%% match-specification select. A failure inside the selection is
%%% reported on stdout and yields []. Unknown queries are reported on
%%% stdout and yield [].
select_query_activity({activity, Options}) when is_list(Options) ->
    Select =
        case lists:member({ts_exact, true}, Options) of
            true ->
                fun() -> select_query_activity_exact_ts(Options) end;
            false ->
                % The match specification is built outside the catch.
                MS = activity_ms(Options),
                fun() -> ets:select(pdb_activity, MS) end
        end,
    case catch Select() of
        {'EXIT', Reason} ->
            io:format(" - select_query_activity [ catch! ]: ~p~n", [Reason]),
            [];
        Match ->
            Match
    end;
select_query_activity(Unhandled) ->
    io:format("select_query_activity, unhandled: ~p~n", [Unhandled]),
    [].
%% select_query_activity_exact_ts(Options) -> [#activity{}]
%% Purpose:
%%   Selects the activities between ts_min and ts_max and adds an entry
%%   carrying the timestamp ts_min, copied from the table entry that
%%   precedes the first selected one. Returns [] unless both ts_min and
%%   ts_max are present in Options.
select_query_activity_exact_ts(Options) ->
case { proplists:get_value(ts_min, Options, undefined), proplists:get_value(ts_max, Options, undefined) } of
{undefined, undefined} -> [];
{undefined, _ } -> [];
{_ , undefined} -> [];
{TsMin , TsMax } ->
% Remove unwanted options
Opts = lists_filter([ts_exact], Options),
Ms = activity_ms(Opts),
case ets:select(pdb_activity, Ms) of
% no entries within interval
[] ->
% Look for the first activity at or after TsMax instead, keeping the
% remaining (non-timestamp) options.
Opts2 = lists_filter([ts_max, ts_min], Opts) ++ [{ts_min, TsMax}],
Ms2 = activity_ms(Opts2),
case ets:select(pdb_activity, Ms2, 1) of
'$end_of_table' -> [];
{[E], _} ->
% NOTE(review): when E is the first key of the table, ets:prev/2
% returns '$end_of_table', the lookup returns [] and this match
% fails. The caller visible in this file wraps the call in a catch.
[PrevAct] = ets:lookup(pdb_activity, ets:prev(pdb_activity, E#activity.timestamp)),
[PrevAct#activity{ timestamp = TsMin} , E]
end;
Acts ->
[Head| _] = Acts,
if
% An entry exactly at TsMin already exists; nothing to add.
Head#activity.timestamp == TsMin -> Acts;
true ->
% NOTE(review): ets:prev/2 steps to the previous key of the whole
% table, so the preceding entry is not filtered by the id, state or
% mfa options - confirm this is intended.
PrevTs = ets:prev(pdb_activity, Head#activity.timestamp),
case ets:lookup(pdb_activity, PrevTs) of
[] -> Acts;
[PrevAct] -> [PrevAct#activity{timestamp = TsMin}|Acts]
end
end
end
end.
%% lists_filter(Keys, Options) -> Options'
%% Purpose:
%%   Removes every {Key, Value} option whose Key is listed in Keys.
%%   The order of the remaining options is preserved. Options that are
%%   not 2-tuples are kept as they are instead of raising
%%   function_clause, so that the option parser can report them.
lists_filter([], Options) ->
    Options;
lists_filter(Keys, Options) ->
    lists:filter(
        fun ({Key, _}) ->
                not lists:any(fun(Dropped) -> Key == Dropped end, Keys);
            (_Other) ->
                true
        end,
        Options).
% Options:
% {ts_min, timestamp()}
% {ts_max, timestamp()}
% {mfa, mfa()}
% {state, active | inactive}
% {id, all | procs | ports | pid() | port()}
%
% All options are regarded as AND except id, which is regarded as OR.
% For example: [{ts_min, TS1}, {ts_max, TS2}, {id, PID1}, {id, PORT1}] would be
% ({ts_min, TS1} and {ts_max, TS2} and {id, PID1}) or
% ({ts_min, TS1} and {ts_max, TS2} and {id, PORT1}).
%
% Builds the match specification: one clause per id option, each
% carrying the shared non-id conditions. Unknown id values are reported
% on stdout and dropped.
activity_ms(Opts) ->
    % {activity, Timestamp, State, Mfa}
    Head = #activity{
        timestamp = '$1',
        id = '$2',
        state = '$3',
        where = '$4',
        _ = '_'},
    {Conditions, IDs} = activity_ms_and(Head, Opts, [], []),
    IdVar = Head#activity.id,
    Clause = fun(Guards) -> {Head, Guards, ['$_']} end,
    ClausesPerId = lists:flatmap(
        fun ({id, ports}) ->
                [Clause([{is_port, IdVar} | Conditions])];
            ({id, procs}) ->
                [Clause([{is_pid, IdVar} | Conditions])];
            ({id, ID}) when is_pid(ID); is_port(ID) ->
                [Clause([{'==', IdVar, ID} | Conditions])];
            ({id, all}) ->
                [Clause(Conditions)];
            (Option) ->
                io:format("activity_ms id dropped ~p~n", [Option]),
                []
        end,
        IDs),
    % Clauses end up in the reverse order of the collected id options.
    lists:reverse(ClausesPerId).
% activity_ms_and(Head, Options, Constraints, IDs) -> {Constraints', IDs'}
% Splits the options into match-specification guards (the AND part)
% and the list of id options (the OR part). Both are accumulated by
% prepending. When no id option is given, [{id, all}] is returned.
% Unknown options are reported on stdout and dropped.
activity_ms_and(_Head, [], Constraints, []) ->
    {Constraints, [{id, all}]};
activity_ms_and(_Head, [], Constraints, IDs) ->
    {Constraints, IDs};
activity_ms_and(Head, [{ts_min, Min} | Opts], Constraints, IDs) ->
    Guard = {'>=', Head#activity.timestamp, {Min}},
    activity_ms_and(Head, Opts, [Guard | Constraints], IDs);
activity_ms_and(Head, [{ts_max, Max} | Opts], Constraints, IDs) ->
    Guard = {'=<', Head#activity.timestamp, {Max}},
    activity_ms_and(Head, Opts, [Guard | Constraints], IDs);
activity_ms_and(Head, [{id, _} = IdOption | Opts], Constraints, IDs) ->
    activity_ms_and(Head, Opts, Constraints, [IdOption | IDs]);
activity_ms_and(Head, [{state, State} | Opts], Constraints, IDs) ->
    Guard = {'==', Head#activity.state, State},
    activity_ms_and(Head, Opts, [Guard | Constraints], IDs);
activity_ms_and(Head, [{mfa, Mfa} | Opts], Constraints, IDs) ->
    Guard = {'==', Head#activity.where, {Mfa}},
    activity_ms_and(Head, Opts, [Guard | Constraints], IDs);
activity_ms_and(Head, [Opt | Opts], Constraints, IDs) ->
    io:format("activity_ms_and option dropped ~p~n", [Opt]),
    activity_ms_and(Head, Opts, Constraints, IDs).
% Information = information()
%%%
%%% update_information
%%%
%%% Inserts NewInfo into pdb_info, or merges it into the entry already
%%% stored under the same id: a field of NewInfo replaces the stored
%%% one unless it is `undefined' or an empty list.
update_information(#information{id = Id} = NewInfo) ->
    case ets:lookup(pdb_info, Id) of
        [] ->
            ets:insert(pdb_info, NewInfo),
            ok;
        [Info] ->
            % Merge the two records position by position.
            Merged = lists:zipwith(
                fun merge_information_field/2,
                tuple_to_list(NewInfo),
                tuple_to_list(Info)),
            ets:insert(pdb_info, list_to_tuple(Merged)),
            ok
    end.

%% Picks the new field value unless it carries no information.
merge_information_field(undefined, Old) -> Old;
merge_information_field([], Old) -> Old;
merge_information_field(New, _Old) -> New.
%% Adds Child to the children of the information entry stored for Id.
%% When Id has no entry yet, one is created holding only the id and
%% that child.
update_information_child(Id, Child) ->
    Updated =
        case ets:lookup(pdb_info, Id) of
            [] ->
                #information{id = Id, children = [Child]};
            [I] ->
                I#information{children = [Child | I#information.children]}
        end,
    ets:insert(pdb_info, Updated),
    ok.
%%%
%%% update_scheduler / update_activity
%%%
%%% Store one entry in the scheduler table and in the process/port
%%% activity table respectively. Both return the result of ets:insert/2.
update_scheduler(SchedulerActivity) ->
    ets:insert(pdb_scheduler, SchedulerActivity).

update_activity(RunnableActivity) ->
    ets:insert(pdb_activity, RunnableActivity).
%%%
%%% update_system_ts
%%%
%%% Stores TS as the system start timestamp when none is stored yet, or
%%% when ?seconds(Stored, TS) is positive (presumably meaning that TS
%%% is earlier than the stored value; see the macro in percept.hrl).
update_system_start_ts(TS) ->
    Key = {system, start_ts},
    case ets:lookup(pdb_system, Key) of
        [] ->
            ets:insert(pdb_system, {Key, TS});
        [{Key, StartTS}] ->
            DT = ?seconds(StartTS, TS),
            case DT > 0.0 of
                true -> ets:insert(pdb_system, {Key, TS});
                false -> ok
            end;
        Unhandled ->
            io:format("update_system_start_ts, unhandled ~p ~n", [Unhandled])
    end.
%% Stores TS as the system stop timestamp when none is stored yet, or
%% when ?seconds(Stored, TS) is negative (presumably meaning that TS is
%% later than the stored value; see the macro in percept.hrl).
update_system_stop_ts(TS) ->
    Key = {system, stop_ts},
    case ets:lookup(pdb_system, Key) of
        [] ->
            ets:insert(pdb_system, {Key, TS});
        [{Key, StopTS}] ->
            DT = ?seconds(StopTS, TS),
            case DT < 0.0 of
                true -> ets:insert(pdb_system, {Key, TS});
                false -> ok
            end;
        Unhandled ->
            io:format("update_system_stop_ts, unhandled ~p ~n", [Unhandled])
    end.