From 84adefa331c4159d432d22840663c38f155cd4c1 Mon Sep 17 00:00:00 2001 From: Erlang/OTP Date: Fri, 20 Nov 2009 14:54:40 +0000 Subject: The R13B03 release. --- lib/percept/src/percept_db.erl | 768 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 768 insertions(+) create mode 100644 lib/percept/src/percept_db.erl (limited to 'lib/percept/src/percept_db.erl') diff --git a/lib/percept/src/percept_db.erl b/lib/percept/src/percept_db.erl new file mode 100644 index 0000000000..dc85fa3510 --- /dev/null +++ b/lib/percept/src/percept_db.erl @@ -0,0 +1,768 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2007-2009. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% + +%% +%% @doc Percept database. +%% +%% + +-module(percept_db). + +-export([ + start/0, + stop/0, + insert/1, + select/2, + select/1, + consolidate/0 + ]). + +-include("percept.hrl"). + +%%========================================================================== +%% +%% Type definitions +%% +%%========================================================================== + +%% @type activity_option() = +%% {ts_min, timestamp()} | +%% {ts_max, timestamp()} | +%% {ts_exact, bool()} | +%% {mfa, {atom(), atom(), byte()}} | +%% {state, active | inactive} | +%% {id, all | procs | ports | pid() | port()} + +%% @type scheduler_option() = +%% {ts_min, timestamp()} | +%% {ts_max, timestamp()} | +%% {ts_exact, bool()} | +%% {id, scheduler_id()} + +%% @type system_option() = start_ts | stop_ts + +%% @type information_option() = +%% all | procs | ports | pid() | port() + + + + +%%========================================================================== +%% +%% Interface functions +%% +%%========================================================================== + +%% @spec start() -> ok | {started, Pid} | {restarted, Pid} +%% Pid = pid() +%% @doc Starts or restarts the percept database. + +-spec(start/0 :: () -> {'started', pid()} | {'restarted', pid()}). + +start() -> + case erlang:whereis(percept_db) of + undefined -> + Pid = spawn( fun() -> init_percept_db() end), + erlang:register(percept_db, Pid), + {started, Pid}; + PerceptDB -> + erlang:unregister(percept_db), + PerceptDB ! {action, stop}, + Pid = spawn( fun() -> init_percept_db() end), + erlang:register(percept_db, Pid), + {restarted, Pid} + end. + +%% @spec stop() -> not_started | {stopped, Pid} +%% Pid = pid() +%% @doc Stops the percept database. + +-spec(stop/0 :: () -> 'not_started' | {'stopped', pid()}). + +stop() -> + case erlang:whereis(percept_db) of + undefined -> + not_started; + Pid -> + Pid ! {action, stop}, + {stopped, Pid} + end. + +%% @spec insert(tuple()) -> ok +%% @doc Inserts a trace or profile message to the database. + +insert(Trace) -> + percept_db ! {insert, Trace}, + ok. + + +%% @spec select({atom(), Options}) -> Result +%% @doc Synchronous call. Selects information based on a query. +%% +%%

Queries:

+%%
+%% {system, Option}
+%%	Option = system_option()
+%%	Result = timestamp() 
+%% {information, Options}
+%%	Options = [information_option()]
+%%	Result = [#information{}] 
+%% {scheduler, Options}
+%%	Options = [sceduler_option()]
+%%	Result = [#activity{}]
+%% {activity, Options}
+%%	Options = [activity_option()]
+%%	Result = [#activity{}]
+%% 
+%%

+%% Note: selection of Id's are always OR all other options are considered AND. +%%

+ +select(Query) -> + percept_db ! {select, self(), Query}, + receive Match -> Match end. + +%% @spec select(atom(), list()) -> Result +%% @equiv select({Table,Options}) + +select(Table, Options) -> + percept_db ! {select, self(), {Table, Options}}, + receive Match -> Match end. + +%% @spec consolidate() -> Result +%% @doc Checks timestamp and state-flow inconsistencies in the +%% the database. + +consolidate() -> + percept_db ! {action, consolidate}, + ok. + +%%========================================================================== +%% +%% Database loop +%% +%%========================================================================== + +init_percept_db() -> + % Proc and Port information + ets:new(pdb_info, [named_table, private, {keypos, #information.id}, set]), + + % Scheduler runnability + ets:new(pdb_scheduler, [named_table, private, {keypos, #activity.timestamp}, ordered_set]), + + % Process and Port runnability + ets:new(pdb_activity, [named_table, private, {keypos, #activity.timestamp}, ordered_set]), + + % System status + ets:new(pdb_system, [named_table, private, {keypos, 1}, set]), + + % System warnings + ets:new(pdb_warnings, [named_table, private, {keypos, 1}, ordered_set]), + put(debug, 0), + loop_percept_db(). + +loop_percept_db() -> + receive + {insert, Trace} -> + insert_trace(clean_trace(Trace)), + loop_percept_db(); + {select, Pid, Query} -> + Pid ! select_query(Query), + loop_percept_db(); + {action, stop} -> + stopped; + {action, consolidate} -> + consolidate_db(), + loop_percept_db(); + {operate, Pid, {Table, {Fun, Start}}} -> + Result = ets:foldl(Fun, Start, Table), + Pid ! Result, + loop_percept_db(); + Unhandled -> + io:format("loop_percept_db, unhandled query: ~p~n", [Unhandled]), + loop_percept_db() + end. + +%%========================================================================== +%% +%% Auxiliary functions +%% +%%========================================================================== + +%% cleans trace messages from external pids + +clean_trace(Trace) when is_tuple(Trace) -> list_to_tuple(clean_trace(tuple_to_list(Trace))); +clean_trace(Trace) when is_list(Trace) -> clean_list(Trace, []); +clean_trace(Trace) when is_pid(Trace) -> + PidStr = pid_to_list(Trace), + [_,P2,P3p] = string:tokens(PidStr,"."), + P3 = lists:sublist(P3p, 1, length(P3p) - 1), + erlang:list_to_pid("<0." ++ P2 ++ "." ++ P3 ++ ">"); +clean_trace(Trace) -> Trace. + +clean_list([], Out) -> lists:reverse(Out); +clean_list([Element|Trace], Out) -> + clean_list(Trace, [clean_trace(Element)|Out]). + + +insert_trace(Trace) -> + case Trace of + {profile_start, Ts} -> + update_system_start_ts(Ts), + ok; + {profile_stop, Ts} -> + update_system_stop_ts(Ts), + ok; + %%% erlang:system_profile, option: runnable_procs + %%% --------------------------------------------- + {profile, Id, State, Mfa, TS} when is_pid(Id) -> + % Update runnable count in activity and db + + case check_activity_consistency(Id, State) of + invalid_state -> + ignored; + ok -> + Rc = get_runnable_count(procs, State), + % Update registered procs + % insert proc activity + update_activity(#activity{ + id = Id, + state = State, + timestamp = TS, + runnable_count = Rc, + where = Mfa}), + ok + end; + %%% erlang:system_profile, option: runnable_ports + %%% --------------------------------------------- + {profile, Id, State, Mfa, TS} when is_port(Id) -> + case check_activity_consistency(Id, State) of + invalid_state -> + ignored; + ok -> + % Update runnable count in activity and db + Rc = get_runnable_count(ports, State), + + % Update registered ports + % insert port activity + update_activity(#activity{ + id = Id, + state = State, + timestamp = TS, + runnable_count = Rc, + where = Mfa}), + + ok + end; + %%% erlang:system_profile, option: scheduler + {profile, scheduler, Id, State, Scheds, Ts} -> + % insert scheduler activity + update_scheduler(#activity{ + id = {scheduler, Id}, + state = State, + timestamp = Ts, + where = Scheds}), + ok; + + %%% erlang:trace, option: procs + %%% --------------------------- + {trace_ts, Parent, spawn, Pid, Mfa, TS} -> + InformativeMfa = mfa2informative(Mfa), + % Update id_information + update_information(#information{id = Pid, start = TS, parent = Parent, entry = InformativeMfa}), + update_information_child(Parent, Pid), + ok; + {trace_ts, Pid, exit, _Reason, TS} -> + % Update registered procs + + % Update id_information + update_information(#information{id = Pid, stop = TS}), + + ok; + {trace_ts, Pid, register, Name, _Ts} when is_pid(Pid) -> + % Update id_information + update_information(#information{id = Pid, name = Name}), + ok; + {trace_ts, Pid, register, Name, _Ts} when is_pid(Pid) -> + % Update id_information + update_information(#information{id = Pid, name = Name}), + ok; + {trace_ts, _Pid, unregister, _Name, _Ts} -> + % Not implemented + ok; + {trace_ts, Pid, getting_unlinked, _Id, _Ts} when is_pid(Pid) -> + % Update id_information + ok; + {trace_ts, Pid, getting_linked, _Id, _Ts} when is_pid(Pid)-> + % Update id_information + ok; + {trace_ts, Pid, link, _Id, _Ts} when is_pid(Pid)-> + % Update id_information + ok; + {trace_ts, Pid, unlink, _Id, _Ts} when is_pid(Pid) -> + % Update id_information + ok; + + %%% erlang:trace, option: ports + %%% ---------------------------- + {trace_ts, Caller, open, Port, Driver, TS} -> + % Update id_information + update_information(#information{ + id = Port, entry = Driver, start = TS, parent = Caller}), + ok; + {trace_ts, Port, closed, _Reason, Ts} -> + % Update id_information + update_information(#information{id = Port, stop = Ts}), + ok; + + Unhandled -> + io:format("insert_trace, unhandled: ~p~n", [Unhandled]) + end. + +mfa2informative({erlang, apply, [M, F, Args]}) -> mfa2informative({M, F,Args}); +mfa2informative({erlang, apply, [Fun, Args]}) -> + FunInfo = erlang:fun_info(Fun), + M = case proplists:get_value(module, FunInfo, undefined) of + [] -> undefined_fun_module; + undefined -> undefined_fun_module; + Module -> Module + end, + F = case proplists:get_value(name, FunInfo, undefined) of + [] -> undefined_fun_function; + undefined -> undefined_fun_function; + Function -> Function + end, + mfa2informative({M, F, Args}); +mfa2informative(Mfa) -> Mfa. + +%% consolidate_db() -> bool() +%% Purpose: +%% Check start/stop time +%% Activity consistency + +consolidate_db() -> + io:format("Consolidating...~n"), + % Check start/stop timestamps + case select_query({system, start_ts}) of + undefined -> + Min = lists:min(list_all_ts()), + update_system_start_ts(Min); + _ -> ok + end, + case select_query({system, stop_ts}) of + undefined -> + Max = lists:max(list_all_ts()), + update_system_stop_ts(Max); + _ -> ok + end, + consolidate_runnability(), + ok. + +consolidate_runnability() -> + put({runnable, procs}, undefined), + put({runnable, ports}, undefined), + consolidate_runnability_loop(ets:first(pdb_activity)). + +consolidate_runnability_loop('$end_of_table') -> ok; +consolidate_runnability_loop(Key) -> + case ets:lookup(pdb_activity, Key) of + [#activity{id = Id, state = State } = A] when is_pid(Id) -> + Rc = get_runnable_count(procs, State), + ets:insert(pdb_activity, A#activity{ runnable_count = Rc}); + [#activity{id = Id, state = State } = A] when is_port(Id) -> + Rc = get_runnable_count(ports, State), + ets:insert(pdb_activity, A#activity{ runnable_count = Rc}); + _ -> throw(consolidate) + end, + consolidate_runnability_loop(ets:next(pdb_activity, Key)). + +list_all_ts() -> + ATs = [ Act#activity.timestamp || + Act <- select_query({activity, []})], + STs = [ Act#activity.timestamp || + Act <- select_query({scheduler, []})], + ITs = lists:flatten([ + [I#information.start, + I#information.stop] || + I <- select_query({information, all})]), + % Filter out all undefined (non ts) + TsList = lists:filter( + fun(Element) -> + case Element of + {_,_,_} -> true; + _ -> false + end + end, ATs ++ STs ++ ITs), + TsList. + +%% get_runnable_count(Type, State) -> RunnableCount +%% In: +%% Type = procs | ports +%% State = active | inactive +%% Out: +%% RunnableCount = integer() +%% Purpose: +%% Keep track of the number of runnable ports and processes +%% during the profile duration. + +get_runnable_count(Type, State) -> + case {get({runnable, Type}), State} of + {undefined, active} -> + put({runnable, Type}, 1), + 1; + {N, active} -> + put({runnable, Type}, N + 1), + N + 1; + {N, inactive} -> + put({runnable, Type}, N - 1), + N - 1; + Unhandled -> + io:format("get_runnable_count, unhandled ~p~n", [Unhandled]), + Unhandled + end. + +check_activity_consistency(Id, State) -> + case get({previous_state, Id}) of + State -> + io:format("check_activity_consistency, state flow invalid.~n"), + invalid_state; + undefined when State == inactive -> + invalid_state; + _ -> + put({previous_state, Id}, State), + ok + end. +%%% +%%% select_query +%%% In: +%%% Query = {Table, Option} +%%% Table = system | activity | scheduler | information + + +select_query(Query) -> + case Query of + {system, _ } -> + select_query_system(Query); + {activity, _ } -> + select_query_activity(Query); + {scheduler, _} -> + select_query_scheduler(Query); + {information, _ } -> + select_query_information(Query); + Unhandled -> + io:format("select_query, unhandled: ~p~n", [Unhandled]), + [] + end. + +%%% select_query_information + +select_query_information(Query) -> + case Query of + {information, all} -> + ets:select(pdb_info, [{ + #information{ _ = '_'}, + [], + ['$_'] + }]); + {information, procs} -> + ets:select(pdb_info, [{ + #information{ id = '$1', _ = '_'}, + [{is_pid, '$1'}], + ['$_'] + }]); + {information, ports} -> + ets:select(pdb_info, [{ + #information{ id = '$1', _ = '_'}, + [{is_port, '$1'}], + ['$_'] + }]); + {information, Id} when is_port(Id) ; is_pid(Id) -> + ets:select(pdb_info, [{ + #information{ id = Id, _ = '_'}, + [], + ['$_'] + }]); + Unhandled -> + io:format("select_query_information, unhandled: ~p~n", [Unhandled]), + [] + end. + +%%% select_query_scheduler + +select_query_scheduler(Query) -> + case Query of + {scheduler, Options} when is_list(Options) -> + Head = #activity{ + timestamp = '$1', + id = '$2', + state = '$3', + where = '$4', + _ = '_'}, + Body = ['$_'], + % We don't need id's + {Constraints, _ } = activity_ms_and(Head, Options, [], []), + ets:select(pdb_scheduler, [{Head, Constraints, Body}]); + Unhandled -> + io:format("select_query_scheduler, unhandled: ~p~n", [Unhandled]), + [] + end. + +%%% select_query_system + +select_query_system(Query) -> + case Query of + {system, start_ts} -> + case ets:lookup(pdb_system, {system, start_ts}) of + [] -> undefined; + [{{system, start_ts}, StartTS}] -> StartTS + end; + {system, stop_ts} -> + case ets:lookup(pdb_system, {system, stop_ts}) of + [] -> undefined; + [{{system, stop_ts}, StopTS}] -> StopTS + end; + Unhandled -> + io:format("select_query_system, unhandled: ~p~n", [Unhandled]), + [] + end. + +%%% select_query_activity + +select_query_activity(Query) -> + case Query of + {activity, Options} when is_list(Options) -> + case lists:member({ts_exact, true},Options) of + true -> + case catch select_query_activity_exact_ts(Options) of + {'EXIT', Reason} -> + io:format(" - select_query_activity [ catch! ]: ~p~n", [Reason]), + []; + Match -> + Match + end; + false -> + MS = activity_ms(Options), + case catch ets:select(pdb_activity, MS) of + {'EXIT', Reason} -> + io:format(" - select_query_activity [ catch! ]: ~p~n", [Reason]), + []; + Match -> + Match + end + end; + Unhandled -> + io:format("select_query_activity, unhandled: ~p~n", [Unhandled]), + [] + end. + +select_query_activity_exact_ts(Options) -> + case { proplists:get_value(ts_min, Options, undefined), proplists:get_value(ts_max, Options, undefined) } of + {undefined, undefined} -> []; + {undefined, _ } -> []; + {_ , undefined} -> []; + {TsMin , TsMax } -> + % Remove unwanted options + Opts = lists_filter([ts_exact], Options), + Ms = activity_ms(Opts), + case ets:select(pdb_activity, Ms) of + % no entries within interval + [] -> + Opts2 = lists_filter([ts_max, ts_min], Opts) ++ [{ts_min, TsMax}], + Ms2 = activity_ms(Opts2), + case ets:select(pdb_activity, Ms2, 1) of + '$end_of_table' -> []; + {[E], _} -> + [PrevAct] = ets:lookup(pdb_activity, ets:prev(pdb_activity, E#activity.timestamp)), + [PrevAct#activity{ timestamp = TsMin} , E] + end; + Acts -> + [Head| _] = Acts, + if + Head#activity.timestamp == TsMin -> Acts; + true -> + PrevTs = ets:prev(pdb_activity, Head#activity.timestamp), + case ets:lookup(pdb_activity, PrevTs) of + [] -> Acts; + [PrevAct] -> [PrevAct#activity{timestamp = TsMin}|Acts] + end + end + end + end. + +lists_filter([], Options) -> Options; +lists_filter([D|Ds], Options) -> + lists_filter(Ds, lists:filter( + fun ({Pred, _}) -> + if + Pred == D -> false; + true -> true + end + end, Options)). + +% Options: +% {ts_min, timestamp()} +% {ts_max, timestamp()} +% {mfa, mfa()} +% {state, active | inactive} +% {id, all | procs | ports | pid() | port()} +% +% All options are regarded as AND expect id which are regarded as OR +% For example: [{ts_min, TS1}, {ts_max, TS2}, {id, PID1}, {id, PORT1}] would be +% ({ts_min, TS1} and {ts_max, TS2} and {id, PID1}) or +% ({ts_min, TS1} and {ts_max, TS2} and {id, PORT1}). + +activity_ms(Opts) -> + % {activity, Timestamp, State, Mfa} + Head = #activity{ + timestamp = '$1', + id = '$2', + state = '$3', + where = '$4', + _ = '_'}, + + {Conditions, IDs} = activity_ms_and(Head, Opts, [], []), + Body = ['$_'], + + lists:foldl( + fun (Option, MS) -> + case Option of + {id, ports} -> + [{Head, [{is_port, Head#activity.id} | Conditions], Body} | MS]; + {id, procs} -> + [{Head,[{is_pid, Head#activity.id} | Conditions], Body} | MS]; + {id, ID} when is_pid(ID) ; is_port(ID) -> + [{Head,[{'==', Head#activity.id, ID} | Conditions], Body} | MS]; + {id, all} -> + [{Head, Conditions,Body} | MS]; + _ -> + io:format("activity_ms id dropped ~p~n", [Option]), + MS + end + end, [], IDs). + +activity_ms_and(_, [], Constraints, []) -> + {Constraints, [{id, all}]}; +activity_ms_and(_, [], Constraints, IDs) -> + {Constraints, IDs}; +activity_ms_and(Head, [Opt|Opts], Constraints, IDs) -> + case Opt of + {ts_min, Min} -> + activity_ms_and(Head, Opts, + [{'>=', Head#activity.timestamp, {Min}} | Constraints], IDs); + {ts_max, Max} -> + activity_ms_and(Head, Opts, + [{'=<', Head#activity.timestamp, {Max}} | Constraints], IDs); + {id, ID} -> + activity_ms_and(Head, Opts, + Constraints, [{id, ID} | IDs]); + {state, State} -> + activity_ms_and(Head, Opts, + [{'==', Head#activity.state, State} | Constraints], IDs); + {mfa, Mfa} -> + activity_ms_and(Head, Opts, + [{'==', Head#activity.where, {Mfa}}| Constraints], IDs); + _ -> + io:format("activity_ms_and option dropped ~p~n", [Opt]), + activity_ms_and(Head, Opts, Constraints, IDs) + end. + +% Information = information() + +%%% +%%% update_information +%%% + + +update_information(#information{id = Id} = NewInfo) -> + case ets:lookup(pdb_info, Id) of + [] -> + ets:insert(pdb_info, NewInfo), + ok; + [Info] -> + % Remake NewInfo and Info to lists then substitute + % old values for new values that are not undefined or empty lists. + + {_, Result} = lists:foldl( + fun (InfoElem, {[NewInfoElem | Tail], Out}) -> + case NewInfoElem of + undefined -> + {Tail, [InfoElem | Out]}; + [] -> + {Tail, [InfoElem | Out]}; + NewInfoElem -> + {Tail, [NewInfoElem | Out]} + end + end, {tuple_to_list(NewInfo), []}, tuple_to_list(Info)), + ets:insert(pdb_info, list_to_tuple(lists:reverse(Result))), + ok + end. + +update_information_child(Id, Child) -> + case ets:lookup(pdb_info, Id) of + [] -> + ets:insert(pdb_info,#information{ + id = Id, + children = [Child]}), + ok; + [I] -> + ets:insert(pdb_info,I#information{children = [Child | I#information.children]}), + ok + end. + +%%% +%%% update_activity +%%% +update_scheduler(Activity) -> + ets:insert(pdb_scheduler, Activity). + +update_activity(Activity) -> + ets:insert(pdb_activity, Activity). + +%%% +%%% update_system_ts +%%% + +update_system_start_ts(TS) -> + case ets:lookup(pdb_system, {system, start_ts}) of + [] -> + ets:insert(pdb_system, {{system, start_ts}, TS}); + [{{system, start_ts}, StartTS}] -> + DT = ?seconds(StartTS, TS), + if + DT > 0.0 -> ets:insert(pdb_system, {{system, start_ts}, TS}); + true -> ok + end; + Unhandled -> + io:format("update_system_start_ts, unhandled ~p ~n", [Unhandled]) + end. + +update_system_stop_ts(TS) -> + case ets:lookup(pdb_system, {system, stop_ts}) of + [] -> + ets:insert(pdb_system, {{system, stop_ts}, TS}); + [{{system, stop_ts}, StopTS}] -> + DT = ?seconds(StopTS, TS), + if + DT < 0.0 -> ets:insert(pdb_system, {{system, stop_ts}, TS}); + true -> ok + end; + Unhandled -> + io:format("update_system_stop_ts, unhandled ~p ~n", [Unhandled]) + end. + + -- cgit v1.2.3