%% %% %CopyrightBegin% %% %% Copyright Ericsson AB 2007-2010. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in %% compliance with the License. You should have received a copy of the %% Erlang Public License along with this software. If not, it can be %% retrieved online at http://www.erlang.org/. %% %% Software distributed under the License is distributed on an "AS IS" %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See %% the License for the specific language governing rights and limitations %% under the License. %% %% %CopyrightEnd% %% %% @doc Percept database. %% %% -module(percept_db). -export([ start/0, stop/0, insert/1, select/2, select/1, consolidate/0 ]). -include("percept.hrl"). -define(STOP_TIMEOUT, 1000). %%========================================================================== %% %% Type definitions %% %%========================================================================== %% @type activity_option() = %% {ts_min, timestamp()} | %% {ts_max, timestamp()} | %% {ts_exact, bool()} | %% {mfa, {atom(), atom(), byte()}} | %% {state, active | inactive} | %% {id, all | procs | ports | pid() | port()} %% @type scheduler_option() = %% {ts_min, timestamp()} | %% {ts_max, timestamp()} | %% {ts_exact, bool()} | %% {id, scheduler_id()} %% @type system_option() = start_ts | stop_ts %% @type information_option() = %% all | procs | ports | pid() | port() %%========================================================================== %% %% Interface functions %% %%========================================================================== %% @spec start() -> ok | {started, Pid} | {restarted, Pid} %% Pid = pid() %% @doc Starts or restarts the percept database. -spec start() -> {'started', pid()} | {'restarted', pid()}. start() -> case erlang:whereis(percept_db) of undefined -> {started, do_start()}; PerceptDB -> {restarted, restart(PerceptDB)} end. %% @spec restart(pid()) -> pid() %% @private %% @doc restarts the percept database. -spec restart(pid())-> pid(). restart(PerceptDB)-> stop_sync(PerceptDB), do_start(). %% @spec do_start() -> pid() %% @private %% @doc starts the percept database. -spec do_start()-> pid(). do_start()-> Pid = spawn( fun() -> init_percept_db() end), erlang:register(percept_db, Pid), Pid. %% @spec stop() -> not_started | {stopped, Pid} %% Pid = pid() %% @doc Stops the percept database. -spec stop() -> 'not_started' | {'stopped', pid()}. stop() -> case erlang:whereis(percept_db) of undefined -> not_started; Pid -> Pid ! {action, stop}, {stopped, Pid} end. %% @spec stop_sync(pid()) -> true %% @private %% @doc Stops the percept database, with a synchronous call. -spec stop_sync(pid())-> true. stop_sync(Pid)-> MonitorRef = erlang:monitor(process, Pid), stop(), receive {'DOWN', MonitorRef, _Type, Pid, _Info}-> true after ?STOP_TIMEOUT-> erlang:demonitor(MonitorRef, [flush]), exit(Pid, kill) end. %% @spec insert(tuple()) -> ok %% @doc Inserts a trace or profile message to the database. insert(Trace) -> percept_db ! {insert, Trace}, ok. %% @spec select({atom(), Options}) -> Result %% @doc Synchronous call. Selects information based on a query. %% %%
Queries:
%%%% {system, Option} %% Option = system_option() %% Result = timestamp() %% {information, Options} %% Options = [information_option()] %% Result = [#information{}] %% {scheduler, Options} %% Options = [sceduler_option()] %% Result = [#activity{}] %% {activity, Options} %% Options = [activity_option()] %% Result = [#activity{}] %%%%
%% Note: selection of Id's are always OR all other options are considered AND. %%
select(Query) -> percept_db ! {select, self(), Query}, receive Match -> Match end. %% @spec select(atom(), list()) -> Result %% @equiv select({Table,Options}) select(Table, Options) -> percept_db ! {select, self(), {Table, Options}}, receive Match -> Match end. %% @spec consolidate() -> Result %% @doc Checks timestamp and state-flow inconsistencies in the %% the database. consolidate() -> percept_db ! {action, consolidate}, ok. %%========================================================================== %% %% Database loop %% %%========================================================================== init_percept_db() -> % Proc and Port information ets:new(pdb_info, [named_table, private, {keypos, #information.id}, set]), % Scheduler runnability ets:new(pdb_scheduler, [named_table, private, {keypos, #activity.timestamp}, ordered_set]), % Process and Port runnability ets:new(pdb_activity, [named_table, private, {keypos, #activity.timestamp}, ordered_set]), % System status ets:new(pdb_system, [named_table, private, {keypos, 1}, set]), % System warnings ets:new(pdb_warnings, [named_table, private, {keypos, 1}, ordered_set]), put(debug, 0), loop_percept_db(). loop_percept_db() -> receive {insert, Trace} -> insert_trace(clean_trace(Trace)), loop_percept_db(); {select, Pid, Query} -> Pid ! select_query(Query), loop_percept_db(); {action, stop} -> stopped; {action, consolidate} -> consolidate_db(), loop_percept_db(); {operate, Pid, {Table, {Fun, Start}}} -> Result = ets:foldl(Fun, Start, Table), Pid ! Result, loop_percept_db(); Unhandled -> io:format("loop_percept_db, unhandled query: ~p~n", [Unhandled]), loop_percept_db() end. %%========================================================================== %% %% Auxiliary functions %% %%========================================================================== %% cleans trace messages from external pids clean_trace(Trace) when is_tuple(Trace) -> list_to_tuple(clean_trace(tuple_to_list(Trace))); clean_trace(Trace) when is_list(Trace) -> clean_list(Trace, []); clean_trace(Trace) when is_pid(Trace) -> PidStr = pid_to_list(Trace), [_,P2,P3p] = string:tokens(PidStr,"."), P3 = lists:sublist(P3p, 1, length(P3p) - 1), erlang:list_to_pid("<0." ++ P2 ++ "." ++ P3 ++ ">"); clean_trace(Trace) -> Trace. clean_list([], Out) -> lists:reverse(Out); clean_list([Element|Trace], Out) -> clean_list(Trace, [clean_trace(Element)|Out]). insert_trace(Trace) -> case Trace of {profile_start, Ts} -> update_system_start_ts(Ts), ok; {profile_stop, Ts} -> update_system_stop_ts(Ts), ok; %%% erlang:system_profile, option: runnable_procs %%% --------------------------------------------- {profile, Id, State, Mfa, TS} when is_pid(Id) -> % Update runnable count in activity and db case check_activity_consistency(Id, State) of invalid_state -> ignored; ok -> Rc = get_runnable_count(procs, State), % Update registered procs % insert proc activity update_activity(#activity{ id = Id, state = State, timestamp = TS, runnable_count = Rc, where = Mfa}), ok end; %%% erlang:system_profile, option: runnable_ports %%% --------------------------------------------- {profile, Id, State, Mfa, TS} when is_port(Id) -> case check_activity_consistency(Id, State) of invalid_state -> ignored; ok -> % Update runnable count in activity and db Rc = get_runnable_count(ports, State), % Update registered ports % insert port activity update_activity(#activity{ id = Id, state = State, timestamp = TS, runnable_count = Rc, where = Mfa}), ok end; %%% erlang:system_profile, option: scheduler {profile, scheduler, Id, State, Scheds, Ts} -> % insert scheduler activity update_scheduler(#activity{ id = {scheduler, Id}, state = State, timestamp = Ts, where = Scheds}), ok; %%% erlang:trace, option: procs %%% --------------------------- {trace_ts, Parent, spawn, Pid, Mfa, TS} -> InformativeMfa = mfa2informative(Mfa), % Update id_information update_information(#information{id = Pid, start = TS, parent = Parent, entry = InformativeMfa}), update_information_child(Parent, Pid), ok; {trace_ts, Pid, exit, _Reason, TS} -> % Update registered procs % Update id_information update_information(#information{id = Pid, stop = TS}), ok; {trace_ts, Pid, register, Name, _Ts} when is_pid(Pid) -> % Update id_information update_information(#information{id = Pid, name = Name}), ok; {trace_ts, Pid, register, Name, _Ts} when is_pid(Pid) -> % Update id_information update_information(#information{id = Pid, name = Name}), ok; {trace_ts, _Pid, unregister, _Name, _Ts} -> % Not implemented ok; {trace_ts, Pid, getting_unlinked, _Id, _Ts} when is_pid(Pid) -> % Update id_information ok; {trace_ts, Pid, getting_linked, _Id, _Ts} when is_pid(Pid)-> % Update id_information ok; {trace_ts, Pid, link, _Id, _Ts} when is_pid(Pid)-> % Update id_information ok; {trace_ts, Pid, unlink, _Id, _Ts} when is_pid(Pid) -> % Update id_information ok; %%% erlang:trace, option: ports %%% ---------------------------- {trace_ts, Caller, open, Port, Driver, TS} -> % Update id_information update_information(#information{ id = Port, entry = Driver, start = TS, parent = Caller}), ok; {trace_ts, Port, closed, _Reason, Ts} -> % Update id_information update_information(#information{id = Port, stop = Ts}), ok; Unhandled -> io:format("insert_trace, unhandled: ~p~n", [Unhandled]) end. mfa2informative({erlang, apply, [M, F, Args]}) -> mfa2informative({M, F,Args}); mfa2informative({erlang, apply, [Fun, Args]}) -> FunInfo = erlang:fun_info(Fun), M = case proplists:get_value(module, FunInfo, undefined) of [] -> undefined_fun_module; undefined -> undefined_fun_module; Module -> Module end, F = case proplists:get_value(name, FunInfo, undefined) of [] -> undefined_fun_function; undefined -> undefined_fun_function; Function -> Function end, mfa2informative({M, F, Args}); mfa2informative(Mfa) -> Mfa. %% consolidate_db() -> bool() %% Purpose: %% Check start/stop time %% Activity consistency consolidate_db() -> io:format("Consolidating...~n"), % Check start/stop timestamps case select_query({system, start_ts}) of undefined -> Min = lists:min(list_all_ts()), update_system_start_ts(Min); _ -> ok end, case select_query({system, stop_ts}) of undefined -> Max = lists:max(list_all_ts()), update_system_stop_ts(Max); _ -> ok end, consolidate_runnability(), ok. consolidate_runnability() -> put({runnable, procs}, undefined), put({runnable, ports}, undefined), consolidate_runnability_loop(ets:first(pdb_activity)). consolidate_runnability_loop('$end_of_table') -> ok; consolidate_runnability_loop(Key) -> case ets:lookup(pdb_activity, Key) of [#activity{id = Id, state = State } = A] when is_pid(Id) -> Rc = get_runnable_count(procs, State), ets:insert(pdb_activity, A#activity{ runnable_count = Rc}); [#activity{id = Id, state = State } = A] when is_port(Id) -> Rc = get_runnable_count(ports, State), ets:insert(pdb_activity, A#activity{ runnable_count = Rc}); _ -> throw(consolidate) end, consolidate_runnability_loop(ets:next(pdb_activity, Key)). list_all_ts() -> ATs = [Act#activity.timestamp || Act <- select_query({activity, []})], STs = [Act#activity.timestamp || Act <- select_query({scheduler, []})], ITs = lists:flatten([ [I#information.start, I#information.stop] || I <- select_query({information, all})]), %% Filter out all undefined (non ts) [Elem || Elem = {_,_,_} <- ATs ++ STs ++ ITs]. %% get_runnable_count(Type, State) -> RunnableCount %% In: %% Type = procs | ports %% State = active | inactive %% Out: %% RunnableCount = integer() %% Purpose: %% Keep track of the number of runnable ports and processes %% during the profile duration. get_runnable_count(Type, State) -> case {get({runnable, Type}), State} of {undefined, active} -> put({runnable, Type}, 1), 1; {N, active} -> put({runnable, Type}, N + 1), N + 1; {N, inactive} -> put({runnable, Type}, N - 1), N - 1; Unhandled -> io:format("get_runnable_count, unhandled ~p~n", [Unhandled]), Unhandled end. check_activity_consistency(Id, State) -> case get({previous_state, Id}) of State -> io:format("check_activity_consistency, state flow invalid.~n"), invalid_state; undefined when State == inactive -> invalid_state; _ -> put({previous_state, Id}, State), ok end. %%% %%% select_query %%% In: %%% Query = {Table, Option} %%% Table = system | activity | scheduler | information select_query(Query) -> case Query of {system, _ } -> select_query_system(Query); {activity, _ } -> select_query_activity(Query); {scheduler, _} -> select_query_scheduler(Query); {information, _ } -> select_query_information(Query); Unhandled -> io:format("select_query, unhandled: ~p~n", [Unhandled]), [] end. %%% select_query_information select_query_information(Query) -> case Query of {information, all} -> ets:select(pdb_info, [{ #information{ _ = '_'}, [], ['$_'] }]); {information, procs} -> ets:select(pdb_info, [{ #information{ id = '$1', _ = '_'}, [{is_pid, '$1'}], ['$_'] }]); {information, ports} -> ets:select(pdb_info, [{ #information{ id = '$1', _ = '_'}, [{is_port, '$1'}], ['$_'] }]); {information, Id} when is_port(Id) ; is_pid(Id) -> ets:select(pdb_info, [{ #information{ id = Id, _ = '_'}, [], ['$_'] }]); Unhandled -> io:format("select_query_information, unhandled: ~p~n", [Unhandled]), [] end. %%% select_query_scheduler select_query_scheduler(Query) -> case Query of {scheduler, Options} when is_list(Options) -> Head = #activity{ timestamp = '$1', id = '$2', state = '$3', where = '$4', _ = '_'}, Body = ['$_'], % We don't need id's {Constraints, _ } = activity_ms_and(Head, Options, [], []), ets:select(pdb_scheduler, [{Head, Constraints, Body}]); Unhandled -> io:format("select_query_scheduler, unhandled: ~p~n", [Unhandled]), [] end. %%% select_query_system select_query_system(Query) -> case Query of {system, start_ts} -> case ets:lookup(pdb_system, {system, start_ts}) of [] -> undefined; [{{system, start_ts}, StartTS}] -> StartTS end; {system, stop_ts} -> case ets:lookup(pdb_system, {system, stop_ts}) of [] -> undefined; [{{system, stop_ts}, StopTS}] -> StopTS end; Unhandled -> io:format("select_query_system, unhandled: ~p~n", [Unhandled]), [] end. %%% select_query_activity select_query_activity(Query) -> case Query of {activity, Options} when is_list(Options) -> case lists:member({ts_exact, true},Options) of true -> case catch select_query_activity_exact_ts(Options) of {'EXIT', Reason} -> io:format(" - select_query_activity [ catch! ]: ~p~n", [Reason]), []; Match -> Match end; false -> MS = activity_ms(Options), case catch ets:select(pdb_activity, MS) of {'EXIT', Reason} -> io:format(" - select_query_activity [ catch! ]: ~p~n", [Reason]), []; Match -> Match end end; Unhandled -> io:format("select_query_activity, unhandled: ~p~n", [Unhandled]), [] end. select_query_activity_exact_ts(Options) -> case { proplists:get_value(ts_min, Options, undefined), proplists:get_value(ts_max, Options, undefined) } of {undefined, undefined} -> []; {undefined, _ } -> []; {_ , undefined} -> []; {TsMin , TsMax } -> % Remove unwanted options Opts = lists_filter([ts_exact], Options), Ms = activity_ms(Opts), case ets:select(pdb_activity, Ms) of % no entries within interval [] -> Opts2 = lists_filter([ts_max, ts_min], Opts) ++ [{ts_min, TsMax}], Ms2 = activity_ms(Opts2), case ets:select(pdb_activity, Ms2, 1) of '$end_of_table' -> []; {[E], _} -> [PrevAct] = ets:lookup(pdb_activity, ets:prev(pdb_activity, E#activity.timestamp)), [PrevAct#activity{ timestamp = TsMin} , E] end; Acts -> [Head| _] = Acts, if Head#activity.timestamp == TsMin -> Acts; true -> PrevTs = ets:prev(pdb_activity, Head#activity.timestamp), case ets:lookup(pdb_activity, PrevTs) of [] -> Acts; [PrevAct] -> [PrevAct#activity{timestamp = TsMin}|Acts] end end end end. lists_filter([], Options) -> Options; lists_filter([D|Ds], Options) -> lists_filter(Ds, lists:filter( fun ({Pred, _}) -> if Pred == D -> false; true -> true end end, Options)). % Options: % {ts_min, timestamp()} % {ts_max, timestamp()} % {mfa, mfa()} % {state, active | inactive} % {id, all | procs | ports | pid() | port()} % % All options are regarded as AND expect id which are regarded as OR % For example: [{ts_min, TS1}, {ts_max, TS2}, {id, PID1}, {id, PORT1}] would be % ({ts_min, TS1} and {ts_max, TS2} and {id, PID1}) or % ({ts_min, TS1} and {ts_max, TS2} and {id, PORT1}). activity_ms(Opts) -> % {activity, Timestamp, State, Mfa} Head = #activity{ timestamp = '$1', id = '$2', state = '$3', where = '$4', _ = '_'}, {Conditions, IDs} = activity_ms_and(Head, Opts, [], []), Body = ['$_'], lists:foldl( fun (Option, MS) -> case Option of {id, ports} -> [{Head, [{is_port, Head#activity.id} | Conditions], Body} | MS]; {id, procs} -> [{Head,[{is_pid, Head#activity.id} | Conditions], Body} | MS]; {id, ID} when is_pid(ID) ; is_port(ID) -> [{Head,[{'==', Head#activity.id, ID} | Conditions], Body} | MS]; {id, all} -> [{Head, Conditions,Body} | MS]; _ -> io:format("activity_ms id dropped ~p~n", [Option]), MS end end, [], IDs). activity_ms_and(_, [], Constraints, []) -> {Constraints, [{id, all}]}; activity_ms_and(_, [], Constraints, IDs) -> {Constraints, IDs}; activity_ms_and(Head, [Opt|Opts], Constraints, IDs) -> case Opt of {ts_min, Min} -> activity_ms_and(Head, Opts, [{'>=', Head#activity.timestamp, {Min}} | Constraints], IDs); {ts_max, Max} -> activity_ms_and(Head, Opts, [{'=<', Head#activity.timestamp, {Max}} | Constraints], IDs); {id, ID} -> activity_ms_and(Head, Opts, Constraints, [{id, ID} | IDs]); {state, State} -> activity_ms_and(Head, Opts, [{'==', Head#activity.state, State} | Constraints], IDs); {mfa, Mfa} -> activity_ms_and(Head, Opts, [{'==', Head#activity.where, {Mfa}}| Constraints], IDs); _ -> io:format("activity_ms_and option dropped ~p~n", [Opt]), activity_ms_and(Head, Opts, Constraints, IDs) end. % Information = information() %%% %%% update_information %%% update_information(#information{id = Id} = NewInfo) -> case ets:lookup(pdb_info, Id) of [] -> ets:insert(pdb_info, NewInfo), ok; [Info] -> % Remake NewInfo and Info to lists then substitute % old values for new values that are not undefined or empty lists. {_, Result} = lists:foldl( fun (InfoElem, {[NewInfoElem | Tail], Out}) -> case NewInfoElem of undefined -> {Tail, [InfoElem | Out]}; [] -> {Tail, [InfoElem | Out]}; NewInfoElem -> {Tail, [NewInfoElem | Out]} end end, {tuple_to_list(NewInfo), []}, tuple_to_list(Info)), ets:insert(pdb_info, list_to_tuple(lists:reverse(Result))), ok end. update_information_child(Id, Child) -> case ets:lookup(pdb_info, Id) of [] -> ets:insert(pdb_info,#information{ id = Id, children = [Child]}), ok; [I] -> ets:insert(pdb_info,I#information{children = [Child | I#information.children]}), ok end. %%% %%% update_activity %%% update_scheduler(Activity) -> ets:insert(pdb_scheduler, Activity). update_activity(Activity) -> ets:insert(pdb_activity, Activity). %%% %%% update_system_ts %%% update_system_start_ts(TS) -> case ets:lookup(pdb_system, {system, start_ts}) of [] -> ets:insert(pdb_system, {{system, start_ts}, TS}); [{{system, start_ts}, StartTS}] -> DT = ?seconds(StartTS, TS), if DT > 0.0 -> ets:insert(pdb_system, {{system, start_ts}, TS}); true -> ok end; Unhandled -> io:format("update_system_start_ts, unhandled ~p ~n", [Unhandled]) end. update_system_stop_ts(TS) -> case ets:lookup(pdb_system, {system, stop_ts}) of [] -> ets:insert(pdb_system, {{system, stop_ts}, TS}); [{{system, stop_ts}, StopTS}] -> DT = ?seconds(StopTS, TS), if DT < 0.0 -> ets:insert(pdb_system, {{system, stop_ts}, TS}); true -> ok end; Unhandled -> io:format("update_system_stop_ts, unhandled ~p ~n", [Unhandled]) end.